Coverage for sammen/sammen.py: 31.54%
86 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 22:12:00 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 22:12:00 +00:00
1"""Dansk for together - multiple watch to ensure special events come together or go away."""
3import argparse
4import datetime as dti
5import logging
6import os
7import pathlib
8from typing import no_type_check
10from watchfiles import Change, awatch
12from sammen import log
# Separator of a family argument: splits the primary from its secondaries and the secondaries from each other.
COMMA = ','
# Leading marker of a secondary that is given as a suffix to be appended to the primary path.
DOT = '.'
def paths_to_watch(families: dict[str, tuple[str, ...]]) -> tuple[str, ...]:
    """Extract the minimal spanning paths to watch.

    The parent folders of every primary and every secondary are collected and
    then reduced, so that no returned folder lies below another returned folder
    (at any depth, not only directly below it).

    NOTE(review): dropping nested folders relies on the watcher observing the
    selected folders recursively - confirm if the watch call ever disables that.

    Args:
        families: mapping of primary file path to the paths of its secondaries.

    Returns:
        The sorted string forms of the remaining folders; empty if no families are given.
    """
    folders = {pathlib.Path(primary).parent for primary in families}
    folders.update(
        pathlib.Path(secondary).parent for secondaries in families.values() for secondary in secondaries
    )

    spanning: set[pathlib.Path] = set()
    # Shallowest folders first: every ancestor has been decided before any of its descendants
    # (plain string order would place e.g. '-x' ahead of its ancestor '.').
    for folder in sorted(folders, key=lambda candidate: (len(candidate.parts), str(candidate))):
        if not any(ancestor in spanning for ancestor in folder.parents):
            spanning.add(folder)

    return tuple(sorted(str(folder) for folder in spanning))
def paths_exist(paths: tuple[str, ...]) -> tuple[bool, str]:
    """Verify that all given paths are directories.

    Args:
        paths: the folder paths to check, in the order they shall be checked.

    Returns:
        ``(True, '')`` if every path is a directory, else ``(False, diagnostic)``
        where the diagnostic names the first path that is not a directory.
    """
    candidates = (pathlib.Path(path) for path in paths)
    offender = next((folder for folder in candidates if not folder.is_dir()), None)
    if offender is None:
        return True, ''
    return False, f'{offender} is no directory'
def display_change(file_change: tuple[Change, str]) -> str:
    """Render a (change, entry) pair as a short log-friendly text.

    Added entries are labelled ``ADD``, modified ones ``MOD``, and anything
    else is labelled ``DEL``.
    """
    kind, entry = file_change
    if kind == Change.added:
        label = 'ADD'
    elif kind == Change.modified:
        label = 'MOD'
    else:
        label = 'DEL'
    return f'Entry({entry}): {label}'
@no_type_check
async def process(options, families, changes) -> None:
    """Remove (or report) the secondaries of every primary that was deleted.

    Args:
        options: object whose ``dry_run`` attribute selects reporting over removing.
        families: mapping of primary path to the paths of its secondaries.
        changes: iterable of (change, entry) pairs as handed over by the watcher.

    Every change is logged. For a deletion whose entry equals the resolved path
    of a primary, each secondary that currently is a file gets removed - or, in
    dry-run mode, only reported. Failures to remove are logged, not raised.
    """
    presence = {primary: pathlib.Path(primary).is_file() for primary in families}
    log.debug(presence)
    # Index the primaries by resolved path, the form compared against the entries of the changes.
    primary_of = {str(pathlib.Path(primary).resolve()): primary for primary in presence}
    log.debug(primary_of)

    for file_change in changes:
        log.info(display_change(file_change))
        kind, entry = file_change
        if kind != Change.deleted or entry not in primary_of:
            continue

        primary = primary_of[entry]
        for secondary in families[primary]:
            if not pathlib.Path(secondary).is_file():
                continue
            context = f'secondary {secondary} of gone primary {primary}'
            if options.dry_run:
                log.info(f'Would remove {context}')
                continue
            try:
                os.remove(secondary)
                log.info(f'Removed {context}')
            except Exception as err:
                log.info(f'Failed to remove {context} with error {err}')
async def main(options: argparse.Namespace) -> int:
    """Parse the family arguments, watch their folders, and process every batch of changes.

    Args:
        options: namespace providing ``families`` (the positional family
            arguments) and the flags ``quiet``, ``verbose``, and ``dry_run``.

    Returns:
        2 if no family argument contains a comma, 1 if a folder to watch is no
        directory, and 0 once the watch has ended.
    """
    family_specs = options.families

    # Quiet wins over verbose; without either flag the root logger level stays untouched.
    if options.quiet:
        level = logging.ERROR
    elif options.verbose:
        level = logging.DEBUG
    else:
        level = None
    if level is not None:
        logging.getLogger().setLevel(level)

    started = dti.datetime.now(tz=dti.timezone.utc)

    families = {}
    for spec in family_specs:
        primary, separator, remainder = spec.partition(COMMA)
        if not separator:
            # Arguments without a comma are skipped.
            continue
        names = {name.strip() for name in remainder.strip().split(COMMA)}
        # A name with a leading dot is a suffix of the primary, any other name is taken as is.
        secondaries = [f'{primary}{name}' if name.startswith(DOT) else name for name in names]
        secondaries.sort()
        families[primary] = tuple(secondaries)

    if not families:
        log.error('families should be given as comma separated paths')
        return 2

    mode = ''
    if options.dry_run:
        mode = ' dry-run (no changes)'
    log.info(f'Watching for orphaned secondaries of {families}{mode}')

    paths = paths_to_watch(families)
    log.debug(f'- set up watchers along the paths {paths}')
    all_present, diagnostic = paths_exist(paths)
    if not all_present:
        log.error(diagnostic)
        return 1

    try:
        async for changes in awatch(*paths):
            await process(options, families, changes)
    except RuntimeError:
        log.debug('Noisy request handling - preparing termination')
    except KeyboardInterrupt:
        log.debug('Received keyboard interrupt - preparing termination')

    finished = dti.datetime.now(tz=dti.timezone.utc)
    log.info(f'Orphanage watch complete after {(finished - started).total_seconds()} seconds')
    return 0