Coverage for subtractor/subtractor.py: 92.91%

188 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 22:33:07 +00:00

1# -*- coding: utf-8 -*- 

2# pylint: disable=c-extension-no-member,expression-not-assigned,invalid-name,line-too-long,logging-fstring-interpolation 

3"""Do the diff.""" 

4import logging 

5import pathlib 

6import subprocess # nosec B404 

7import sys 

8import typing 

9from typing import Tuple 

10 

11import subtractor.pixel as pixel # To access pixel.OPTIONS['threshold'] 

12from subtractor.pixel import diff_img, shape_of_png 

13from subtractor.stream import final_suffix_in, visit 

14 

# Text encoding used when writing parameter files for external diff tools.
ENCODING = 'utf-8'

# Application name; also the default logger name and the stem of the log file name.
APP = 'subtractor'

LOG = logging.getLogger()  # Temporary refactoring: module level logger
LOG_FOLDER = pathlib.Path('logs')
LOG_FILE = f'{APP}.log'
# Place the log file inside the logs folder when that folder exists, else use the bare file name.
LOG_PATH = LOG_FOLDER / LOG_FILE if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
LOG_LEVEL = logging.INFO

FAILURE_PATH_REASON = 'Failed action for path %s with error: %s'

# Keyword arguments forwarded to visit() when walking folders for PNG files.
VISIT_OPTIONS = {
    'pre_filter': sorted,
    'pre_filter_options': {'reverse': True},
    'post_filter': final_suffix_in,
    'post_filter_options': {'suffixes': ('.png',)},
}

# Maximum length of logged external tool output and the marker appended when it is cut.
SLUG_CAP = 164
SLUG_ETC = ' ...'

# Separators of the diff template mini language (executor, param file content, param file name).
DSL_MAIN_SPLIT = ':$file:'
DSL_SUB_SPLIT = ':$name:'

39 

40 

@typing.no_type_check
def init_logger(name=None, level=None):
    """Configure basic logging and rebind the module level logger.

    The module level LOG becomes the logger called name (default: APP).
    The basic configuration receives level (default: LOG_LEVEL).
    """
    global LOG  # pylint: disable=global-statement

    effective_level = level if level is not None else LOG_LEVEL
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s',
        datefmt='%Y-%m-%dT%H:%M:%S',
        level=effective_level,
    )
    logger_name = name if name is not None else APP
    LOG = logging.getLogger(logger_name)
    LOG.propagate = True

55 

56 

@typing.no_type_check
def slugify(thing, these=('\n',), those=(' ',)) -> str:
    """Return the string of thing with every entry of these replaced by its partner from those.

    A single entry in those serves as replacement for all entries of these.
    With several entries in those the replacement is pairwise in order.
    Empty these or those yield the unchanged string of thing.
    Raises ValueError if those has more entries than these.
    """
    if not (these and those):
        return str(thing)
    if len(those) > len(these):
        raise ValueError('slugify called with more replacement targets than sources')

    if len(those) != 1:
        pairs = zip(these, those)
    else:
        target = those[0]
        if len(these) == 1:
            pairs = [(these[0], target)]
        else:
            pairs = ((source, target) for source in these)

    text = str(thing)
    for source, target in pairs:
        text = text.replace(source, target)
    return text

78 

79 

def file_has_content(path: pathlib.Path) -> Tuple[bool, str]:
    """Report if path is a non-empty file and provide the size in bytes (or the reason) as string."""
    if path.is_file():
        size_in_bytes = path.stat().st_size
        return size_in_bytes > 0, str(size_in_bytes)
    return False, f'{path} is no file'

86 

87 

@typing.no_type_check
def process(path, handler, success, failure):
    """Apply handler to path and return verdict, message, and the updated success and failure counts.

    The handler is called with path and has to return a pair of validity and message.
    Exactly one of the two counts is incremented by one depending on the validity.
    """
    valid, message = handler(path)
    return (True, message, success + 1, failure) if valid else (False, message, success, failure + 1)

96 

97 

def _run_external_diff(invoke, ref_path, obs_path):
    """Run the external diff tool described by invoke for the pair and return the completed process."""
    ref_text, obs_text = str(ref_path), str(obs_path)
    args = invoke['executor'].replace('$ref', ref_text).replace('$obs', obs_text).split()
    if invoke['param_file_name']:
        param_file_content = invoke['param_file_content'].replace('$ref', ref_text).replace('$obs', obs_text)
        with open(invoke['param_file_name'], 'wt', encoding=ENCODING) as handle:
            handle.write(param_file_content)
    # check=False on purpose: a non-zero exit status has to reach the caller as return code
    # so the pair is counted as bad instead of raising CalledProcessError.
    return subprocess.run(args, capture_output=True, check=False)  # nosec B603


@typing.no_type_check
def process_pair(invoke, good, bad, obs_path, present, ref_path):
    """Compare one pair of reference and observation and return the updated (good, bad) counts.

    Parameters:
        invoke: Falsy to use the built-in pixel diff, else a mapping with the keys
            'executor', 'param_file_content', and 'param_file_name' describing an external tool.
        good: Count of good pairs so far.
        bad: Count of bad pairs so far.
        obs_path: Path of the observation (falsy if there is none).
        present: Target for the difference image; if it is a folder the image is stored
            inside as diff-of-<final part of obs_path>.
        ref_path: Path of the reference (falsy if there is none).

    A pair is bad if one side is missing, if one side is not analyzed as OK by shape_of_png,
    if the pixel diff reports a mismatch, or if the external tool exits with non-zero status.
    """
    LOG.info('Pair ref=%s, obs=%s', ref_path, obs_path)
    if not (ref_path and obs_path):
        return good, bad + 1

    # The counts returned by process are ignored here, only verdict and size are logged.
    ok_ref, size, _, _ = process(ref_path, file_has_content, good, bad)
    LOG.info(' Found ref=%s to be %s with size %s bytes', ref_path, 'OK' if ok_ref else 'NOK', size)
    ok_ref, width, height, info = shape_of_png(ref_path)
    LOG.info(
        ' Analyzed ref=%s as PNG to be %s with %s',
        ref_path,
        'OK' if ok_ref else 'NOK',
        f'shape {width}x{height}' if ok_ref else info['error'],
    )

    ok_obs, size, _, _ = process(obs_path, file_has_content, good, bad)
    LOG.info(' Found obs=%s to be %s with size %s bytes', obs_path, 'OK' if ok_obs else 'NOK', size)
    ok_obs, width, height, info = shape_of_png(obs_path)
    LOG.info(
        ' Analyzed obs=%s as PNG to be %s with %s',
        obs_path,
        'OK' if ok_obs else 'NOK',
        f'shape {width}x{height}' if ok_obs else info['error'],
    )

    present_path = pathlib.Path(present, f'diff-of-{obs_path.parts[-1]}') if present.is_dir() else present
    if not (ok_ref and ok_obs):
        return good, bad + 1

    if not invoke:
        # Width and height stem from the observation (the shape analyzed last).
        pixel_count = width * height
        mismatch, _, _ = diff_img(ref_path, obs_path, present_path)
        if mismatch:
            LOG.info(
                ' Mismatch of obs=%s is %d of %d pixels or %0.1f %%',
                obs_path,
                mismatch,
                pixel_count,
                round(100 * mismatch / pixel_count, 1),
            )
            return good, bad + 1
        LOG.info(' Match of obs=%s', obs_path)
        return good + 1, bad

    completed = _run_external_diff(invoke, ref_path, obs_path)
    if completed.returncode:
        bad += 1
    else:
        good += 1

    # The captured output is bytes; decode before slugifying so new lines really become spaces.
    slug = slugify(completed.stdout.decode(ENCODING, errors='replace'))
    if len(slug) > SLUG_CAP:
        slug = slug[:SLUG_CAP] + SLUG_ETC
    LOG.info(slug)

    return good, bad

161 

162 

def present_from(ref: pathlib.Path, obs: pathlib.Path) -> pathlib.Path:
    """Derive the place for differences as sibling of obs.

    For obs being a file the result is the sibling file path diff-of-<obs name> (nothing is created).
    Otherwise the sibling folder diff-of-<ref name>_<obs name> is created if needed and returned.
    """
    ref_leaf = ref.parts[-1]
    obs_trunk, obs_leaf = obs.parts[:-1], obs.parts[-1]

    if not obs.is_file():
        folder = pathlib.Path(*obs_trunk, f'diff-of-{ref_leaf}_{obs_leaf}')
        folder.mkdir(parents=True, exist_ok=True)
        return folder

    return pathlib.Path(*obs_trunk, f'diff-of-{obs_leaf}')

172 

173 

@typing.no_type_check
def causal_triplet(trunks) -> tuple:
    """Return past, present, and future as paths where present is None for inconsistent trunks.

    The first two trunks are past and future; they are consistent if both are folders or both are files.
    A third trunk (if given) is taken as present, else the present is derived from past and future.
    """
    past, future = (pathlib.Path(entry) for entry in trunks[:2])

    both_folders = past.is_dir() and future.is_dir()
    if not (both_folders or (past.is_file() and future.is_file())):
        return past, None, future

    if len(trunks) == 3:
        return past, pathlib.Path(trunks[-1]), future
    return past, present_from(past, future), future

190 

191 

class Splicer:
    """Hollow splicer - take paths apart and put them together again."""

    @typing.no_type_check
    @staticmethod
    def split(thing):
        """Return the leading parts of thing (as tuple) and the final part of thing."""
        parts = thing.parts
        return parts[:-1], parts[-1]

    @typing.no_type_check
    @staticmethod
    def merge(left, right):
        """Return the path joining left and right."""
        return pathlib.Path(left).joinpath(right)

206 

207 

@typing.no_type_check
def names_of(root, splicer: Splicer, options):
    """Yield the split result of the splicer for every path visited below root.

    The options are forwarded as keyword arguments to visit.
    """
    yield from (splicer.split(path) for path in visit(root, **options))

213 

214 

@typing.no_type_check
def matching_zipper(ref, obs, splicer: Splicer, gen, gen_options: dict):
    """Yield pairs of (reference path, observation path) for the sorted union of names below ref and obs.

    The names are the second items of the pairs generated by gen(root, splicer, gen_options).
    A side that lacks the name is reported as None, else as splicer.merge(root, name).

    The membership on the reference side is decided from the reference names only, so a name
    generated more than once for obs but never for ref does not fabricate a reference path.
    """
    ref_names = {name for _, name in gen(ref, splicer, gen_options)}
    obs_names = {name for _, name in gen(obs, splicer, gen_options)}
    for name in sorted(ref_names | obs_names):
        yield (
            splicer.merge(ref, name) if name in ref_names else None,
            splicer.merge(obs, name) if name in obs_names else None,
        )

224 

225 

@typing.no_type_check
def parse_template(text):
    """Split the diff template text into executor, param file content, and param file name.

    Without the main separator the complete text is the executor and both param file entries are None.
    With the main separator the part behind it has to hold content and name divided by the sub separator,
    else the ValueError from the failed split is logged as critical and raised again.
    """
    if DSL_MAIN_SPLIT not in text:
        executor = {'executor': text, 'param_file_content': None, 'param_file_name': None}
    else:
        LOG.info('Detected magic (%s) in template', DSL_MAIN_SPLIT)
        command, remainder = text.split(DSL_MAIN_SPLIT)
        try:
            content, name = remainder.split(DSL_SUB_SPLIT)
        except ValueError:
            LOG.critical(
                'Template with (%s) lacks (%s) to link executor part (%s) with param file content via name.',
                DSL_MAIN_SPLIT,
                DSL_SUB_SPLIT,
                command,
            )
            raise
        executor = dict(executor=command, param_file_content=content, param_file_name=name)

    LOG.info('Parsed diff template (%s) into executor ...', text)
    LOG.info(' ... into executor (%s)', str(executor))
    return executor

248 

249 

@typing.no_type_check
def main(argv=None, abort=False, debug=None, threshold=None, diff_template=''):
    """Drive the subtractor as backend of the command line interface.

    Parameters:
        argv: Two or three trunks (past, future, and optionally present); falls back to sys.argv[1:].
        abort: Stop visiting further pairs in folder mode after the first bad pair.
        debug: Truthy to log on debug level.
        threshold: Fraction of tolerated pixel mismatch stored in pixel.OPTIONS['threshold'] (falsy means 0.00).
        diff_template: Template of an external diff tool call (empty for the built-in pixel diff).

    Returns a pair of code and message: (0, 'USAGE') for a wrong number of trunks,
    (2, 'USAGE') for a mix of folders and files, and (0, '') after the comparisons.
    The verdict of the comparisons (OK or FAIL) is printed.
    """
    init_logger(level=logging.DEBUG if debug else None)

    forest = argv or sys.argv[1:]
    if not 2 <= len(forest) <= 3:
        print('Usage: subtractor past future [present]')
        return 0, 'USAGE'

    executor = {}
    if diff_template:
        LOG.info('Requested external diff tool per template(%s)', diff_template)
        executor = parse_template(diff_template)

    LOG.debug('Guarded dispatch forest=%s', forest)
    past, present, future = causal_triplet(forest)
    if not present:
        print('ERROR: Either all args are dirs or files, but no mix')
        return 2, 'USAGE'

    present_is_dir = present.is_dir()
    LOG.debug('Timeline past=%s, present=%s, and future=%s', past, present, future)
    mode_display = 'folder' if present_is_dir else 'file'
    LOG.info('Starting comparisons visiting past=%s and future=%s in %s mode', past, future, mode_display)

    threshold_fraction = threshold if threshold else 0.00
    pixel.OPTIONS['threshold'] = threshold_fraction
    LOG.info(
        ' Threshold for pixel mismatch is %d%s', int(100 * threshold_fraction), ' %' if threshold_fraction > 0 else ''
    )

    good, bad = 0, 0
    if present_is_dir:
        for ref_path, obs_path in matching_zipper(past, future, Splicer(), names_of, VISIT_OPTIONS):
            good, bad = process_pair(executor, good, bad, obs_path, present, ref_path)
            if abort and bad:
                LOG.error('Requested abort and encountered a bad pair')
                break
    else:
        good, bad = process_pair(executor, good, bad, future, present, past)

    LOG.info('Finished comparisons finding good=%d and bad=%d in %s mode', good, bad, mode_display)

    print('FAIL' if bad else 'OK')

    return 0, ''