Coverage for kiirastuli/kiirastuli.py: 30.28%

76 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2023-01-02 20:15 +0100

1"""Purge monotonically named files in folders keeping range endpoints. 

2 

3Implementation uses sha256 hashes for identity and assumes that 

4the natural order relates to the notion of fresher or better. 

5""" 

6import argparse 

7import datetime as dti 

8import logging 

9import os 

10import typing 

11 

12from puhdistusalue.puhdistusalue import read_folder, triage_hashes # type: ignore 

13from puristaa.puristaa import prefix_compression # type: ignore 

14 

15from kiirastuli import log 

16 

# Buffer size in bytes: 2 << 15 == 65536 (64 KiB).
# NOTE(review): not referenced by any function visible in this module - confirm it is still needed.
BUFFER_BYTES = 2 << 15

18 

19 

@typing.no_type_check
def humanize_mass(total_less_bytes: int):
    """Render a byte count as a (quantity, unit) pair of strings.

    The unit is selected with decimal thresholds (1e9, 1e6, 1e3) while the
    quantity is scaled by repeated division by 1024, so a value just above a
    threshold is reported as slightly less than one unit
    (e.g. 1000 -> '0.977' 'total kilobytes').

    Scaled quantities are rounded to and formatted with three decimals;
    counts below 1e3 are formatted as a plain integer with unit 'total bytes'.
    """
    # (selection threshold, number of divisions by 1024, unit label), largest first
    scales = (
        (1e9, 3, 'total gigabytes'),
        (1e6, 2, 'total megabytes'),
        (1e3, 1, 'total kilobytes'),
    )
    for threshold, steps, unit in scales:
        if total_less_bytes >= threshold:
            scaled = total_less_bytes
            for _ in range(steps):
                scaled = scaled / 1024
            return f'{round(scaled, 3) :.3f}', unit

    return f'{total_less_bytes :d}', 'total bytes'

31 

32 

@typing.no_type_check
def humanize_duration(duration_seconds: float):
    """Render a duration given in seconds as a (quantity, unit) pair of strings.

    The unit is 'hours' from 3600 s, 'minutes' from 60 s, 'seconds' from 1 s,
    and 'millis' below that. The quantity is always rounded to and formatted
    with three decimals.
    """
    if duration_seconds >= 3600:
        amount, unit = duration_seconds / 60 / 60, 'hours'
    elif duration_seconds >= 60:
        amount, unit = duration_seconds / 60, 'minutes'
    elif duration_seconds >= 1:
        amount, unit = duration_seconds, 'seconds'
    else:
        # Sub-second durations are reported in milliseconds.
        amount, unit = duration_seconds * 1e3, 'millis'

    return f'{round(amount, 3) :.3f}', unit

44 

45 

@typing.no_type_check
def list_dir(folder_path):
    """Return the local entry names found inside the folder at folder_path.

    Thin wrapper around os.listdir: the result is a list (not a generator)
    and any exception raised by os.listdir propagates unchanged.
    """
    entry_names = os.listdir(folder_path)
    return entry_names

50 

51 

@typing.no_type_check
def elements_of_gen(folder_path):
    """Yield (name, path) pairs for the entries of a folder in sorted name order.

    Names come from list_dir(folder_path); each path is the name joined onto
    folder_path with os.path.join.
    """
    ordered_names = list(list_dir(folder_path))
    ordered_names.sort()
    for entry in ordered_names:
        yield entry, os.path.join(folder_path, entry)

57 

58 

@typing.no_type_check
def main(options: argparse.Namespace) -> int:
    """Process the files separately per folder.

    Reads three attributes from options:

    - verbose: when truthy, sets the root logger to DEBUG and logs a
      per-folder summary
    - human: when truthy, the final summary uses humanize_mass and
      humanize_duration instead of raw bytes and seconds
    - folders: iterable of folder paths to process

    For every folder the hash map from read_folder is triaged by
    triage_hashes into names to keep and names to remove; every name to
    remove is deleted from disk with os.remove after its size has been added
    to the byte counters. Folders raising FileNotFoundError in read_folder
    are skipped with a warning; folders yielding an empty hash map are
    skipped silently.

    Errors from os.path.getsize or os.remove are not handled here and
    propagate to the caller.

    Returns 0 on completion.
    """
    # Timezone-aware replacement for datetime.utcnow(), which is deprecated since Python 3.12.
    start_time = dti.datetime.now(dti.timezone.utc)
    verbose = options.verbose
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    human = options.human
    folders = options.folders
    total_removed, total_less_bytes = 0, 0
    for a_path in folders:
        try:
            hash_map = read_folder(a_path)
        except FileNotFoundError as err:
            log.warning(f'WARNING: Skipping non-existing path ({a_path}) -> "{err}"')
            continue
        if not hash_map:
            # Nothing to triage for this folder.
            continue

        keep_these, remove_those = triage_hashes(hash_map)
        for this in keep_these:
            log.debug(f'KEEP file {this}')
        folder_removed, folder_less_bytes = 0, 0
        for that in remove_those:
            log.debug(f'REMOVE file {that}')
            target = os.path.join(a_path, that)
            # Measure before deleting; the size is unavailable afterwards.
            folder_less_bytes += os.path.getsize(target)
            os.remove(target)
            folder_removed += 1

        if verbose:
            log.info(
                f'removed {folder_removed} redundant objects or {folder_less_bytes}'
                f' combined bytes from folder at {a_path}'
            )
        total_less_bytes += folder_less_bytes
        total_removed += folder_removed

    # NOTE(review): presumably splits the folders into a shared prefix and
    # the remaining relative parts - confirm against puristaa.
    prefix, rel_paths = prefix_compression(folders, policy=lambda x: x == '/')
    if len(rel_paths) > 5:
        # Abbreviate long folder lists: first three entries plus the last one.
        folders_disp = f"{prefix}[{', '.join(rel_paths[:3])}, ... {rel_paths[-1]}]"
    else:
        folders_disp = f'{folders}' if folders else '[<EMPTY>]'

    duration_seconds = (dti.datetime.now(dti.timezone.utc) - start_time).total_seconds()
    if human:
        m_quantity, m_unit = humanize_mass(total_less_bytes)
        d_quantity, d_unit = humanize_duration(duration_seconds)
    else:
        m_quantity, m_unit = f'{total_less_bytes :d}', 'total bytes'
        d_quantity, d_unit = f'{round(duration_seconds, 3) :.3f}', 'seconds'

    log.info(
        f'removed {total_removed} total redundant objects or'
        f' {m_quantity} {m_unit} from folders at {folders_disp}'
        f' in {d_quantity} {d_unit}'
    )
    return 0