Coverage for kiirastuli/kiirastuli.py: 36.13%

76 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 18:51:34 +00:00

1"""Purge monotonically named files in folders keeping range endpoints. 

2 

3Implementation uses sha256 hashes for identity and assumes that 

4the natural order relates to the notion of fresher or better. 

5""" 

6 

7import argparse 

8import datetime as dti 

9import logging 

10import os 

11import typing 

12 

13from puhdistusalue.puhdistusalue import read_folder, triage_hashes # type: ignore 

14from puristaa.puristaa import prefix_compression # type: ignore 

15 

16from kiirastuli import log 

17 

# Buffer size in bytes: 1 << 16 == 2 << 15 == 65536 (64 KiB).
# NOTE(review): no function visible in this module references this constant;
# presumably intended as a chunk size for buffered reads - confirm before removing.
BUFFER_BYTES = 1 << 16

19 

20 

@typing.no_type_check
def humanize_mass(total_less_bytes: int):
    """Render a byte count as a (quantity, unit) pair of strings.

    The unit is selected with decimal thresholds (1e9, 1e6, 1e3) while the
    quantity is scaled by repeated division by 1024, so e.g. 1000 bytes are
    reported as '0.977' 'total kilobytes'. Scaled quantities are rounded and
    formatted with three decimals; counts below 1e3 are formatted as a plain
    integer with the unit 'total bytes'.
    """
    # (lower bound in bytes, number of divisions by 1024, unit label) - largest first
    ladder = (
        (1e9, 3, 'total gigabytes'),
        (1e6, 2, 'total megabytes'),
        (1e3, 1, 'total kilobytes'),
    )
    for lower_bound, divisions, unit in ladder:
        if total_less_bytes >= lower_bound:
            scaled = total_less_bytes
            for _ in range(divisions):
                scaled = scaled / 1024
            return f'{round(scaled, 3):.3f}', unit

    return f'{total_less_bytes:d}', 'total bytes'

32 

33 

@typing.no_type_check
def humanize_duration(duration_seconds: float):
    """Render a duration given in seconds as a (quantity, unit) pair of strings.

    The first unit whose lower bound the duration reaches is used, checking
    hours (3600 s), minutes (60 s), then seconds (1 s); any other value is
    reported in milliseconds with the unit 'millis'. The quantity is rounded
    and formatted with three decimals.
    """
    # (lower bound in seconds, number of divisions by 60, unit label) - largest first
    ladder = (
        (3600, 2, 'hours'),
        (60, 1, 'minutes'),
        (1, 0, 'seconds'),
    )
    for lower_bound, divisions, unit in ladder:
        if duration_seconds >= lower_bound:
            scaled = duration_seconds
            for _ in range(divisions):
                scaled = scaled / 60
            return f'{round(scaled, 3):.3f}', unit

    return f'{round(duration_seconds * 1e3, 3):.3f}', 'millis'

45 

46 

@typing.no_type_check
def list_dir(folder_path):
    """Return the names of the entries inside folder_path.

    Thin wrapper around os.listdir: the names are returned as given by that
    call (not sorted here) and any error it raises propagates to the caller.
    """
    entry_names = os.listdir(folder_path)
    return entry_names

51 

52 

@typing.no_type_check
def elements_of_gen(folder_path):
    """Yield (name, path) pairs for the entries of folder_path in sorted name order.

    The names come from list_dir; each path is folder_path joined with the name.
    """
    ordered_names = sorted(list_dir(folder_path))
    for entry in ordered_names:
        yield entry, os.path.join(folder_path, entry)

58 

59 

@typing.no_type_check
def main(options: argparse.Namespace) -> int:
    """Process the files separately per folder and log a summary.

    Every entry of ``options.folders`` is read with ``read_folder``; a non-empty
    result is split by ``triage_hashes`` into entries to keep and entries to
    remove, and every entry to remove is deleted from that folder.

    Args:
        options: Parsed command line options; the attributes ``verbose``,
            ``human``, and ``folders`` are read.

    Returns:
        Always ``0``.

    Note:
        Only ``FileNotFoundError`` raised by ``read_folder`` is handled (logged
        as warning, folder skipped); errors from ``os.path.getsize`` or
        ``os.remove`` propagate to the caller.
    """
    # Timezone-aware replacement for the deprecated (Python 3.12+) datetime.utcnow();
    # only the difference of two such timestamps is used, so the duration is unchanged.
    start_time = dti.datetime.now(dti.timezone.utc)
    verbose = options.verbose
    if verbose:
        # Raises the level of the root logger, not only of this package's logger.
        logging.getLogger().setLevel(logging.DEBUG)
    human = options.human
    folders = options.folders
    total_removed, total_less_bytes = 0, 0
    for a_path in folders:
        hash_map = {}
        try:
            hash_map = read_folder(a_path)
        except FileNotFoundError as err:
            log.warning(f'WARNING: Skipping non-existing path ({a_path}) -> "{err}"')
        if hash_map:
            keep_these, remove_those = triage_hashes(hash_map)
            for this in keep_these:
                log.debug(f'KEEP file {this}')
            folder_removed, folder_less_bytes = 0, 0
            for that in remove_those:
                log.debug(f'REMOVE file {that}')
                target = os.path.join(a_path, that)
                # Size must be taken before the removal, afterwards the file is gone.
                folder_less_bytes += os.path.getsize(target)
                os.remove(target)
                folder_removed += 1

            if verbose:
                log.info(
                    f'removed {folder_removed} redundant objects or {folder_less_bytes}'
                    f' combined bytes from folder at {a_path}'
                )
            total_less_bytes += folder_less_bytes
            total_removed += folder_removed

    # Abbreviate the folder listing in the summary when more than five folders were given.
    prefix, rel_paths = prefix_compression(folders, policy=lambda x: x == '/')
    if len(rel_paths) > 5:
        folders_disp = f"{prefix}[{', '.join(rel_paths[:3])}, ... {rel_paths[-1]}]"
    else:
        folders_disp = f'{folders}' if folders else '[<EMPTY>]'

    duration_seconds = (dti.datetime.now(dti.timezone.utc) - start_time).total_seconds()
    if human:
        m_quantity, m_unit = humanize_mass(total_less_bytes)
        d_quantity, d_unit = humanize_duration(duration_seconds)
    else:
        m_quantity, m_unit = f'{total_less_bytes :d}', 'total bytes'
        d_quantity, d_unit = f'{round(duration_seconds, 3) :.3f}', 'seconds'

    log.info(
        f'removed {total_removed} total redundant objects or'
        f' {m_quantity} {m_unit} from folders at {folders_disp}'
        f' in {d_quantity} {d_unit}'
    )
    return 0