Coverage for sammen/sammen.py: 31.54%

86 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 22:12:00 +00:00

1"""Dansk for together - multiple watch to ensure special events come together or go away.""" 

2 

3import argparse 

4import datetime as dti 

5import logging 

6import os 

7import pathlib 

8from typing import no_type_check 

9 

10from watchfiles import Change, awatch 

11 

12from sammen import log 

13 

14COMMA = ',' 

15DOT = '.' 

16 

17 

18def paths_to_watch(families: dict[str, tuple[str, ...]]) -> tuple[str, ...]: 

19 """Extract the minimal spanning paths to watch.""" 

20 path_set = set() 

21 for primary, secondaries in families.items(): 

22 path_set.add(str(pathlib.Path(primary).parent)) 

23 for secondary in secondaries: 

24 path_set.add(str(pathlib.Path(secondary).parent)) 

25 

26 paths: set[str] = set() 

27 for path in sorted(path_set): 

28 if not paths or path not in paths and str(pathlib.Path(path).parent) not in paths: 28 ↛ 27line 28 didn't jump to line 27, because the condition on line 28 was never false

29 paths.add(path) 

30 

31 return tuple(sorted(paths)) 

32 

33 

34def paths_exist(paths: tuple[str, ...]) -> tuple[bool, str]: 

35 """First error or all good.""" 

36 for path in paths: 36 ↛ 40line 36 didn't jump to line 40, because the loop on line 36 didn't complete

37 folder = pathlib.Path(path) 

38 if not folder.is_dir(): 38 ↛ 36line 38 didn't jump to line 36, because the condition on line 38 was never false

39 return False, f'{folder} is no directory' 

40 return True, '' 

41 

42 

43def display_change(file_change: tuple[Change, str]) -> str: 

44 """Temporary debugging dryness helper.""" 

45 change, entry = file_change 

46 disp = 'ADD' if change == Change.added else ('MOD' if change == Change.modified else 'DEL') 

47 return f'Entry({entry}): {disp}' 

48 

49 

50@no_type_check 

51async def process(options, families, changes) -> None: 

52 """Correlate and eventually act.""" 

53 primary_present = {primary: pathlib.Path(primary).is_file() for primary in families} 

54 log.debug(primary_present) 

55 primary_abs_paths = {str(pathlib.Path(p).resolve()): p for p in primary_present} 

56 log.debug(primary_abs_paths) 

57 for file_change in changes: 

58 log.info(display_change(file_change)) 

59 change, entry = file_change 

60 if change == Change.deleted and entry in primary_abs_paths: 

61 for secondary in families[primary_abs_paths[entry]]: 

62 if pathlib.Path(secondary).is_file(): 

63 context = f'secondary {secondary} of gone primary {primary_abs_paths[entry]}' 

64 if options.dry_run: 

65 log.info(f'Would remove {context}') 

66 else: 

67 try: 

68 os.remove(secondary) 

69 log.info(f'Removed {context}') 

70 except Exception as err: 

71 log.info(f'Failed to remove {context} with error {err}') 

72 

73 

74async def main(options: argparse.Namespace) -> int: 

75 pos_args = options.families 

76 if options.quiet: 

77 logging.getLogger().setLevel(logging.ERROR) 

78 elif options.verbose: 

79 logging.getLogger().setLevel(logging.DEBUG) 

80 

81 start_time = dti.datetime.now(tz=dti.timezone.utc) 

82 families = {} 

83 for csl in pos_args: 

84 if COMMA not in csl: 

85 continue 

86 p, s = csl.split(COMMA, 1) 

87 xs = set(x.strip() for x in s.strip().split(COMMA)) 

88 families[p] = tuple(sorted(f'{p}{x}' if x.startswith(DOT) else x for x in xs)) 

89 if not families: 

90 log.error('families should be given as comma separated paths') 

91 return 2 

92 

93 mode = ' dry-run (no changes)' if options.dry_run else '' 

94 log.info(f'Watching for orphaned secondaries of {families}{mode}') 

95 paths = paths_to_watch(families) 

96 log.debug(f'- set up watchers along the paths {paths}') 

97 ok, diagnostic = paths_exist(paths) 

98 if not ok: 

99 log.error(diagnostic) 

100 return 1 

101 

102 try: 

103 async for changes in awatch(*paths): 

104 await process(options, families, changes) 

105 except RuntimeError: 

106 log.debug('Noisy request handling - preparing termination') 

107 except KeyboardInterrupt: 

108 log.debug('Received keyboard interrupt - preparing termination') 

109 

110 end_time = dti.datetime.now(tz=dti.timezone.utc) 

111 log.info(f'Orphanage watch complete after {(end_time - start_time).total_seconds()} seconds') 

112 return 0