Coverage for subtractor/subtractor.py: 92.91%
188 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 22:33:07 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 22:33:07 +00:00
1# -*- coding: utf-8 -*-
2# pylint: disable=c-extension-no-member,expression-not-assigned,invalid-name,line-too-long,logging-fstring-interpolation
3"""Do the diff."""
4import logging
5import pathlib
6import subprocess # nosec B404
7import sys
8import typing
9from typing import Tuple
11import subtractor.pixel as pixel # To access pixel.OPTIONS['threshold']
12from subtractor.pixel import diff_img, shape_of_png
13from subtractor.stream import final_suffix_in, visit
15ENCODING = 'utf-8'
17APP = 'subtractor'
19LOG = logging.getLogger() # Temporary refactoring: module level logger
20LOG_FOLDER = pathlib.Path('logs')
21LOG_FILE = f'{APP}.log'
22LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
23LOG_LEVEL = logging.INFO
25FAILURE_PATH_REASON = 'Failed action for path %s with error: %s'
27VISIT_OPTIONS = {
28 'pre_filter': sorted,
29 'pre_filter_options': {'reverse': True},
30 'post_filter': final_suffix_in,
31 'post_filter_options': {'suffixes': ('.png',)},
32}
34SLUG_CAP = 164
35SLUG_ETC = ' ...'
37DSL_MAIN_SPLIT = ':$file:'
38DSL_SUB_SPLIT = ':$name:'
41@typing.no_type_check
42def init_logger(name=None, level=None):
43 """Initialize module level logger"""
44 global LOG # pylint: disable=global-statement
46 log_format = {
47 'format': '%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s',
48 'datefmt': '%Y-%m-%dT%H:%M:%S',
49 # 'filename': LOG_PATH,
50 'level': LOG_LEVEL if level is None else level,
51 }
52 logging.basicConfig(**log_format)
53 LOG = logging.getLogger(APP if name is None else name)
54 LOG.propagate = True
57@typing.no_type_check
58def slugify(thing, these=('\n',), those=(' ',)) -> str:
59 """Replace these (default: new lines) by those (default: space) and return string of thing."""
60 if not these or not those:
61 return str(thing)
62 if len(these) < len(those):
63 raise ValueError('slugify called with more replacement targets than sources')
64 if len(those) == 1:
65 that = those[0] # HACK A DID ACK
66 if len(these) == 1:
67 these = these[0]
68 return str(thing).replace(these, that)
69 hook = str(thing)
70 for this in these:
71 hook = hook.replace(this, that)
72 return hook
74 hook = str(thing)
75 for this, that in zip(these, those):
76 hook = hook.replace(this, that)
77 return hook
80def file_has_content(path: pathlib.Path) -> Tuple[bool, str]:
81 """Simplistic handler to develop generic processing function."""
82 if not path.is_file():
83 return False, f'{path} is no file'
84 byte_size = path.stat().st_size
85 return byte_size > 0, str(byte_size)
88@typing.no_type_check
89def process(path, handler, success, failure):
90 """Generic processing of path yields a,ended COHDA protocol."""
91 valid, message = handler(path)
92 if valid:
93 return True, message, success + 1, failure
95 return False, message, success, failure + 1
98@typing.no_type_check
99def process_pair(invoke, good, bad, obs_path, present, ref_path):
100 """The main per pair processing code."""
101 LOG.info('Pair ref=%s, obs=%s', ref_path, obs_path)
102 if ref_path and obs_path: 102 ↛ 158line 102 didn't jump to line 158, because the condition on line 102 was never false
103 ok_ref, size, _, _ = process(ref_path, file_has_content, good, bad)
104 LOG.info(' Found ref=%s to be %s with size %s bytes', ref_path, 'OK' if ok_ref else 'NOK', size)
105 ok_ref, width, height, info = shape_of_png(ref_path)
106 LOG.info(
107 ' Analyzed ref=%s as PNG to be %s with %s',
108 ref_path,
109 'OK' if ok_ref else 'NOK',
110 f'shape {width}x{height}' if ok_ref else info['error'],
111 )
112 ok_obs, size, _, _ = process(obs_path, file_has_content, good, bad)
113 LOG.info(' Found obs=%s to be %s with size %s bytes', obs_path, 'OK' if ok_obs else 'NOK', size)
114 ok_obs, width, height, info = shape_of_png(obs_path)
115 LOG.info(
116 ' Analyzed obs=%s as PNG to be %s with %s',
117 obs_path,
118 'OK' if ok_obs else 'NOK',
119 f'shape {width}x{height}' if ok_obs else info['error'],
120 )
121 present_path = pathlib.Path(present, f'diff-of-{obs_path.parts[-1]}') if present.is_dir() else present
122 if not all([ok_ref, ok_obs]): 122 ↛ 123line 122 didn't jump to line 123, because the condition on line 122 was never true
123 bad += 1
124 else:
125 if not invoke:
126 pixel_count = width * height
127 mismatch, _, _ = diff_img(ref_path, obs_path, present_path)
128 if mismatch: 128 ↛ 129line 128 didn't jump to line 129, because the condition on line 128 was never true
129 LOG.info(
130 ' Mismatch of obs=%s is %d of %d pixels or %0.1f %%',
131 obs_path,
132 mismatch,
133 pixel_count,
134 round(100 * mismatch / pixel_count, 1),
135 )
136 bad += 1
137 else:
138 LOG.info(' Match of obs=%s', obs_path)
139 good += 1
140 else:
141 args = invoke['executor'].replace('$ref', str(ref_path)).replace('$obs', str(obs_path)).split()
142 if invoke['param_file_name']:
143 param_file_content = (
144 invoke['param_file_content'].replace('$ref', str(ref_path)).replace('$obs', str(obs_path))
145 )
146 with open(invoke['param_file_name'], 'wt', encoding=ENCODING) as handle:
147 handle.write(param_file_content)
148 completed = subprocess.run(args, capture_output=True, check=True) # nosec B603
149 if not completed.returncode: 149 ↛ 152line 149 didn't jump to line 152, because the condition on line 149 was never false
150 good += 1
151 else:
152 bad += 1
153 slug = slugify(completed.stdout)
154 if len(slug) > SLUG_CAP: 154 ↛ 155line 154 didn't jump to line 155, because the condition on line 154 was never true
155 slug = slug[:SLUG_CAP] + SLUG_ETC
156 LOG.info(slug)
157 else:
158 bad += 1
160 return good, bad
163def present_from(ref: pathlib.Path, obs: pathlib.Path) -> pathlib.Path:
164 """Build a somehow least surprising difference folder from ref and obs."""
165 ref_code = ref.parts[-1]
166 if obs.is_file():
167 return pathlib.Path(*obs.parts[:-1], f'diff-of-{obs.parts[-1]}')
169 present = pathlib.Path(*obs.parts[:-1], f'diff-of-{ref_code}_{obs.parts[-1]}')
170 present.mkdir(parents=True, exist_ok=True)
171 return present
174@typing.no_type_check
175def causal_triplet(trunks) -> tuple:
176 """Generate past, present, and future from trunks or include a present of None."""
177 past, future = tuple(pathlib.Path(entry) for entry in trunks[:2])
179 if any([past.is_dir(), future.is_dir()]):
180 consistent_args = past.is_dir() and future.is_dir()
181 elif any([past.is_file(), future.is_file()]): 181 ↛ 184line 181 didn't jump to line 184, because the condition on line 181 was never false
182 consistent_args = past.is_file() and future.is_file()
183 else:
184 consistent_args = False
185 if not consistent_args:
186 return past, None, future
188 present = pathlib.Path(trunks[-1]) if len(trunks) == 3 else present_from(past, future)
189 return past, present, future
192class Splicer:
193 """Hollow splicer - split and merge."""
195 @typing.no_type_check
196 @staticmethod
197 def split(thing):
198 """Split thing into things."""
199 return thing.parts[:-1], thing.parts[-1]
201 @typing.no_type_check
202 @staticmethod
203 def merge(left, right):
204 """Merge left and right back into thing."""
205 return pathlib.Path(left, right)
208@typing.no_type_check
209def names_of(root, splicer: Splicer, options):
210 """Yield file names of root."""
211 for path in visit(root, **options):
212 yield splicer.split(path)
215@typing.no_type_check
216def matching_zipper(ref, obs, splicer: Splicer, gen, gen_options: dict):
217 """Generate a complete matching zipper for the longest matching sequence."""
218 x_p = {name: (name, None) for _, name in gen(ref, splicer, gen_options)}
219 for _, name in gen(obs, splicer, gen_options):
220 x_p[name] = (name, name) if name in x_p else (None, name)
221 for key in sorted(x_p):
222 r, o = x_p[key]
223 yield (r if r is None else splicer.merge(ref, r), o if o is None else splicer.merge(obs, o))
226@typing.no_type_check
227def parse_template(text):
228 """Hack to return exec and param file template until config language is clear."""
229 if DSL_MAIN_SPLIT in text:
230 LOG.info('Detected magic (%s) in template', DSL_MAIN_SPLIT)
231 executor, rest = text.split(DSL_MAIN_SPLIT)
232 try:
233 content, name = rest.split(DSL_SUB_SPLIT)
234 except ValueError:
235 LOG.critical(
236 'Template with (%s) lacks (%s) to link executor part (%s) with param file content via name.',
237 DSL_MAIN_SPLIT,
238 DSL_SUB_SPLIT,
239 executor,
240 )
241 raise
242 executor = {'executor': executor, 'param_file_content': content, 'param_file_name': name}
243 else:
244 executor = {'executor': text, 'param_file_content': None, 'param_file_name': None}
245 LOG.info('Parsed diff template (%s) into executor ...', text)
246 LOG.info(' ... into executor (%s)', str(executor))
247 return executor
250@typing.no_type_check
251def main(argv=None, abort=False, debug=None, threshold=None, diff_template=''):
252 """Drive the subtractor.
253 This function acts as the command line interface backend.
254 There is some duplication to support testability.
255 """
256 init_logger(level=logging.DEBUG if debug else None)
257 forest = argv if argv else sys.argv[1:]
258 if not forest or len(forest) < 2 or len(forest) > 3:
259 print('Usage: subtractor past future [present]')
260 return 0, 'USAGE'
262 executor = {}
263 if diff_template:
264 LOG.info('Requested external diff tool per template(%s)', diff_template)
265 executor = parse_template(diff_template)
267 LOG.debug('Guarded dispatch forest=%s', forest)
268 past, present, future = causal_triplet(forest)
270 if not present:
271 print('ERROR: Either all args are dirs or files, but no mix')
272 return 2, 'USAGE'
274 present_is_dir = present.is_dir()
275 LOG.debug('Timeline past=%s, present=%s, and future=%s', past, present, future)
277 mode_display = 'folder' if present_is_dir else 'file'
279 LOG.info('Starting comparisons visiting past=%s and future=%s in %s mode', past, future, mode_display)
280 threshold_fraction = 0.00
281 if threshold: 281 ↛ 283line 281 didn't jump to line 283, because the condition on line 281 was never false
282 threshold_fraction = threshold
283 pixel.OPTIONS['threshold'] = threshold_fraction
284 LOG.info(
285 ' Threshold for pixel mismatch is %d%s', int(100 * threshold_fraction), ' %' if threshold_fraction > 0 else ''
286 )
287 good, bad = 0, 0
289 if not present_is_dir:
290 good, bad = process_pair(executor, good, bad, future, present, past)
291 else:
292 for ref_path, obs_path in matching_zipper(past, future, Splicer(), names_of, VISIT_OPTIONS):
293 good, bad = process_pair(executor, good, bad, obs_path, present, ref_path)
294 if abort and bad: 294 ↛ 295line 294 didn't jump to line 295, because the condition on line 294 was never true
295 LOG.error('Requested abort and encountered a bad pair')
296 break
298 LOG.info('Finished comparisons finding good=%d and bad=%d in %s mode', good, bad, mode_display)
300 print(f"{'OK' if not bad else 'FAIL'}")
302 return 0, ''