Coverage for liitos/tools.py: 81.05%

289 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-07 19:29:53 +00:00

1import datetime as dti 

2import difflib 

3import hashlib 

4import json 

5import os 

6import pathlib 

7import platform 

8import re 

9import subprocess # nosec B404 

10import uuid 

11from typing import Any, Callable, Generator, Union, no_type_check 

12 

13import yaml 

14 

15import foran.foran as api # type: ignore 

16from foran.report import generate_report # type: ignore 

17from taksonomia.taksonomia import Taxonomy # type: ignore 

18 

19from liitos import ( 

20 CONTEXT, 

21 ENCODING, 

22 KEYS_REQUIRED, 

23 LATEX_PAYLOAD_NAME, 

24 TOOL_VERSION_COMMAND_MAP, 

25 ToolKey, 

26 log, 

27) 

28 

# Accept both plain strings and pathlib paths wherever file locations travel.
PathLike = Union[str, pathlib.Path]

SPACE = ' '

# Rendering runs from a nested working folder; the document root sits two levels up.
DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15  # 64 KiB read size used by hash_file
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

# Heuristics to classify LaTeX tool output lines (cf. log_subprocess_output):
# IS_BORING matches TeX distribution tree scans, the other two detect warnings/errors.
IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
HAS_WARNING = re.compile(r'[Ww]arning')
HAS_ERROR = re.compile(r'[Ee]rror')

47 

48 

def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the SHA512 hex digest of the data from file.

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    # Default to SHA512 when no hasher factory is given.
    digest = (hasher or hashlib.sha512)()
    with open(path, 'rb') as source:
        # Feed the file in fixed-size chunks to keep memory flat for large inputs.
        for chunk in iter(lambda: source.read(CHUNK_SIZE), b''):
            digest.update(chunk)
    return digest.hexdigest()

70 

71 

@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?"""
    fallback = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hash'] = fallback
    CONTEXT['source_hint'] = fallback
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'

        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            root = pathlib.Path(repo_root_folder)
            # Hint is the last two path components: parent-folder/repo-folder.
            CONTEXT['source_hint'] = f'{root.parent.name}/{root.name}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for report_line in generate_report(status):
            yield report_line.rstrip()

    except Exception as err:  # noqa
        yield f'WARNING - we seem to not be within a git repository clone ({err})'

96 

97 

def node_id() -> str:
    """Generate the build node identifier.

    Examples:

    >>> nid = node_id()
    >>> assert len(nid) == 36
    >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    # Name-based UUID (v3) so the identifier is stable per host name.
    host_name = platform.node()
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, host_name))

108 

109 

def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.warning('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    # NOTE(review): st_ctime is inode-change time on POSIX, not modification time —
    # confirm whether st_mtime was intended for the reported date.
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)  # fixed local name (was sha612_hash)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.warning('- Ephemeral:')
    log.warning(f' + name: {target_path.name}')
    log.warning(f' + size: {size_bytes} bytes')
    log.warning(f' + date: {mod_time}')
    log.warning('- Characteristic:')
    log.warning(' + Checksums:')
    log.warning(f' sha512:{sha512_hash}')
    log.warning(f' sha256:{sha256_hash}')
    log.warning(f' sha1:{sha1_hash}')
    log.warning(f' md5:{md5_hash}')
    log.warning(' + Fonts:')

136 

137 

@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as generator of strings.

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    raw_diff = difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label)
    yield from (entry.rstrip() for entry in raw_diff)

150 

151 

@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    log.info(LOG_SEPARATOR)
    for diff_entry in unified_diff(left, right, left_label, right_label):
        # Diff entries may carry embedded newlines; emit one log record per physical line.
        for part in diff_entry.split('\n'):
            log.info(part)
    log.info(LOG_SEPARATOR)

160 

161 

@no_type_check
def ensure_separate_log_lines(sourcer: Callable, trampoline: Callable = log.info, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    trampoline(LOG_SEPARATOR)
    produced = sourcer(*args) if args else sourcer()
    for entry in produced:
        # Split multi-line strings so every log record is a single physical line.
        for part in entry.split('\n'):
            trampoline(part)
    trampoline(LOG_SEPARATOR)

170 

171 

@no_type_check
def log_subprocess_output(pipe, prefix: str) -> list[str]:
    """Stream a subprocess stdout pipe to the log, classifying lines, and return all messages.

    Every decoded line is prefixed with ``prefix: `` and appended to the returned
    buffer regardless of its log level, so callers can replay the output later.
    """
    log_buffer = []
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        msg = prefix + ': ' + cand
        log_buffer.append(msg)
        # Classification order matters: errors win over warnings win over boring noise.
        if HAS_ERROR.search(cand):
            log.error(msg)
            continue
        # Well-known harmless LaTeX warnings are exempted from the warning level.
        if HAS_WARNING.search(cand) and not (
            'latex' in prefix
            and any(
                (
                    '"calc" is loaded -- this is not' in cand,
                    'Package microtype Warning: Unable to apply patch' in cand,
                    'Unknown document division name (startatroot)' in cand,
                    'Unknown slot number of character' in cand,
                )
            )
        ):
            log.warning(msg)
            continue
        # Lines from scanning the TeX distribution tree only surface at debug level.
        if IS_BORING.search(cand):
            log.debug(msg)
            continue
        log.info(msg)

    return log_buffer

201 

202 

@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False, is_quiet: bool = False) -> int:
    """Execute command in subprocess and follow requests.

    Returns the subprocess exit code, or 42 when spawning the process itself failed.

    Hints on LaTeX noise reduction per special variables:

    - max_print_line=1000
    - error_line=254
    - half_error_line=238

    So, in texmf.cnf or in shell process, these reduce the amount of lines ...

    max_print_line=1000 error_line=254 half_error_line=238
    """
    log_buffer: list[str] = []  # bugfix: ensure defined even when Popen raises before capture
    try:
        if 'latex' in marker:
            # Widen LaTeX log lines so messages are not wrapped (cf. docstring hints).
            env = dict(os.environ)
            env['max_print_line'] = '1000'
            env['error_line'] = '254'
            env['half_error_line'] = '238'
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell, env=env  # nosec B602
            )
        else:
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
            )
        with process.stdout:
            log_buffer = log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}; (cf. below for hints)')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}; (cf. below for hints)')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}; (cf. below for hints)')
        code = 42

    if code != 0 and is_quiet:
        # On quiet failures replay the buffered output; for LaTeX runs filter well-known noise.
        for msg in log_buffer:
            if 'latex' in marker:
                payload = msg.replace(f'{marker}:', '').strip()
                if not payload:
                    continue
                if '(microtype)' in payload:
                    continue
                if 'Package microtype Warning: Unknown slot number of character' in payload:
                    continue
                if IS_BORING.search(payload):
                    continue
                if any(
                    (
                        '"calc" is loaded -- this is not' in payload,
                        'Package microtype Warning: Unable to apply patch' in payload,
                        'Unknown document division name (startatroot)' in payload,
                        'Unknown slot number of character' in payload,
                    )
                ):
                    continue
                cleansed = payload.replace('[', '').replace(']', '').replace('|', '')
                if not cleansed.strip():
                    continue
                if not payload.replace(')', ''):
                    continue
            log.error(msg)

    # NOTE(review): the empty-string marker member looks intentional but is suspicious — confirm.
    if code == 0 and is_quiet and marker in ('label-pdf', '', 'assess-pdf-fonts'):
        for msg in log_buffer:
            log.warning(msg)

    return code

276 

277 

@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call = str(tool_context.get('command', '')).strip().split()
    banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    # Unknown keys yield no command tokens; refuse to delegate an empty call.
    if not tool_version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code

297 

298 

@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    # Persist the pre-filter state so the transformation can be inspected afterwards.
    with open(backup, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines

321 

322 

@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """Load and validate the aspect map for the given target and facet from the structure file.

    Returns a (success, aspect_map) pair; the map is empty whenever validation fails
    or more than one target is present.
    """
    # Bugfix: the signature accepts PathLike (plain str allowed) but the body used
    # pathlib-only methods; coerce so str arguments no longer crash.
    structure_path = pathlib.Path(structure_path)
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    # Each facet entry is a single-key mapping; collect the facet names.
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    # Extra (non-required) aspects are tolerated; only note the mismatch at debug level.
    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())} instead')

    return True, aspect_map

380 

381 

def incoherent_math_mode_in_caption(caption: str, phase_info: str = '') -> list[str]:
    """Heuristics to warn on underscores and carets outside of math mode in captions.

    Returns a (possibly empty) list of human readable finding messages; phase_info,
    when given, is appended to each message (space-prefixed).
    """
    findings: list[str] = []
    if phase_info and not phase_info.startswith(' '):
        phase_info = ' ' + phase_info
    # Caption counts as math-mode-safe when it contains at least one '$' with an even count.
    has_balanced_math = bool(caption) and '$' in caption and not caption.count('$') % 2
    if caption and '_' in caption and not has_balanced_math:
        findings.append(f'Underscore (_) and no LaTeX math mode tokens in caption ({caption}){phase_info}')
    if caption and '^' in caption and not has_balanced_math:
        findings.append(f'Caret (^) and no LaTeX math mode tokens in caption ({caption}){phase_info}')
    return findings

392 

393 

@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Extract captions of mermaid code blocks from a pandoc JSON AST file.

    Returns a map from '{loc}/{filename}.{format}' tokens to caption strings.
    """
    # Bugfix: use a context manager so the file handle is not leaked.
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                # Bugfix: prior is the caption already stored, current the one in hand (was swapped).
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            for msg in incoherent_math_mode_in_caption(m_caption, phase_info=f'for mermaid image ({token})'):
                log.warning(msg)
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map

431 

432 

def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Return generator that yields only the lines beyond the cut mark region skipping lines in [from, thru].

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
    >>> filtered
    ['a', 'd']
    """
    skipping = False
    for candidate in text_lines:
        if skipping:
            # Inside the cut region: drop the line, and close the region on the thru mark.
            if thru_cut in candidate:
                skipping = False
            continue
        if from_cut in candidate:
            # The from mark itself is dropped as part of the region.
            skipping = True
            continue
        yield candidate