Coverage for liitos/tools.py: 80.58%

277 statements  

coverage.py v7.10.6, created at 2025-08-31 13:07:35 +00:00

1import datetime as dti 

2import difflib 

3import hashlib 

4import json 

5import os 

6import pathlib 

7import platform 

8import re 

9import subprocess # nosec B404 

10import uuid 

11from typing import Any, Callable, Generator, Union, no_type_check 

12 

13import yaml 

14 

15import foran.foran as api # type: ignore 

16from foran.report import generate_report # type: ignore 

17from taksonomia.taksonomia import Taxonomy # type: ignore 

18 

19from liitos import ( 

20 CONTEXT, 

21 ENCODING, 

22 KEYS_REQUIRED, 

23 LATEX_PAYLOAD_NAME, 

24 TOOL_VERSION_COMMAND_MAP, 

25 ToolKey, 

26 log, 

27) 

28 

29PathLike = Union[str, pathlib.Path] 

30 

31DOC_BASE = pathlib.Path('..', '..') 

32STRUCTURE_PATH = DOC_BASE / 'structure.yml' 

33IMAGES_FOLDER = 'images/' 

34DIAGRAMS_FOLDER = 'diagrams/' 

35PATCH_SPEC_NAME = 'patch.yml' 

36CHUNK_SIZE = 2 << 15 

37TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00' 

38LOG_SEPARATOR = '- ' * 80 

39INTER_PROCESS_SYNC_SECS = 0.1 

40INTER_PROCESS_SYNC_ATTEMPTS = 10 

41 

42IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.') 

43HAS_WARNING = re.compile(r'[Ww]arning') 

44HAS_ERROR = re.compile(r'[Ee]rror') 

45 

46 

47def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str: 

48 """Return the SHA512 hex digest of the data from file. 

49 

50 Examples: 

51 

52 >>> import pathlib, tempfile 

53 >>> empty_sha512 = ( 

54 ... 'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce' 

55 ... '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e' 

56 ... ) 

57 >>> with tempfile.NamedTemporaryFile() as handle: 

58 ... empty_hash = hash_file(handle.name) 

59 >>> assert empty_hash == empty_sha512 

60 """ 

61 if hasher is None: 

62 hasher = hashlib.sha512 

63 the_hash = hasher() 

64 with open(path, 'rb') as handle: 

65 while chunk := handle.read(CHUNK_SIZE): 

66 the_hash.update(chunk) 

67 return the_hash.hexdigest() 

68 

69 

70@no_type_check 

71def vcs_probe(): 

72 """Are we in front, on par, or behind with the upstream?""" 

73 CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control' 

74 CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control' 

75 try: 

76 repo = api.Repo('.', search_parent_directories=True) 

77 status = api.Status(repo) 

78 CONTEXT['source_hash'] = f'sha1:{status.commit}' 

79 

80 try: 

81 repo_root_folder = repo.git.rev_parse(show_toplevel=True) 

82 path = pathlib.Path(repo_root_folder) 

83 anchor = path.parent.name 

84 here = path.name 

85 CONTEXT['source_hint'] = f'{anchor}/{here}' 

86 yield f'Root ({repo_root_folder})' 

87 except Exception: # noqa 

88 yield 'WARNING - ignored exception when assessing repo root folder location' 

89 for line in generate_report(status): 

90 yield line.rstrip() 

91 

92 except Exception as err: # noqa 

93 yield f'WARNING - we seem to not be within a git repository clone ({err})' 

94 
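# Illustrative usage sketch, not part of tools.py: vcs_probe() is a generator of report
# lines, so a caller would typically drain it into the log; print() stands in for that here.
from liitos import tools

for line in tools.vcs_probe():
    print(line)  # inside a build this would rather be tools.log.info(line)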

95 

96def node_id() -> str: 

97 """Generate the build node identifier. 

98 

99 Examples: 

100 

101 >>> nid = node_id() 

102 >>> assert len(nid) == 36 

103 >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23])) 

104 """ 

105 return str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node())) 

106 

107 

108def report_taxonomy(target_path: pathlib.Path) -> None: 

109 """Convenience function to report date, size, and checksums of the deliverable.""" 

110 taxonomy = Taxonomy(target_path, excludes='', key_function='md5') 

111 for path in sorted(target_path.parent.rglob('*')): 

112 taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path) 

113 log.warning('- Writing render/pdf folder taxonomy to inventory.json ...') 

114 taxonomy.dump(sink='inventory', format_type='json', base64_encode=False) 

115 

116 stat = target_path.stat() 

117 size_bytes = stat.st_size 

118 mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT) 

119 sha612_hash = hash_file(target_path, hashlib.sha512) 

120 sha256_hash = hash_file(target_path, hashlib.sha256) 

121 sha1_hash = hash_file(target_path, hashlib.sha1) 

122 md5_hash = hash_file(target_path, hashlib.md5) 

123 log.warning('- Ephemeral:') 

124 log.warning(f' + name: {target_path.name}') 

125 log.warning(f' + size: {size_bytes} bytes') 

126 log.warning(f' + date: {mod_time}') 

127 log.warning('- Characteristic:') 

128 log.warning(' + Checksums:') 

129 log.warning(f' sha512:{sha612_hash}') 

130 log.warning(f' sha256:{sha256_hash}') 

131 log.warning(f' sha1:{sha1_hash}') 

132 log.warning(f' md5:{md5_hash}') 

133 log.warning(' + Fonts:') 

134 
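# Illustrative sketch, not part of tools.py: report_taxonomy() expects the path of the
# rendered deliverable; the PDF path below is a placeholder. It logs name, size, date,
# and checksums, and writes the folder taxonomy to inventory.json.
import pathlib

from liitos import tools

tools.report_taxonomy(pathlib.Path('render/pdf/document.pdf'))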

135 

136@no_type_check 

137def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'): 

138 """Derive the unified diff between left and right lists of strings as generator of strings. 

139 

140 Examples: 

141 

142 >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+')) 

143 >>> lines 

144 ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d'] 

145 """ 

146 for line in difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label): 

147 yield line.rstrip() 

148 

149 

150@no_type_check 

151def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'): 

152 """Do the log bridging of the diff.""" 

153 log.info(LOG_SEPARATOR) 

154 for line in unified_diff(left, right, left_label, right_label): 

155 for fine in line.split('\n'): 

156 log.info(fine) 

157 log.info(LOG_SEPARATOR) 

158 

159 

160@no_type_check 

161def ensure_separate_log_lines(sourcer: Callable, trampoline: Callable = log.info, *args: Union[list[object], None]): 

162 """Wrapping idiom breaking up any strings containing newlines.""" 

163 trampoline(LOG_SEPARATOR) 

164 for line in sourcer(*args) if args else sourcer(): 

165 for fine in line.split('\n'): 

166 trampoline(fine) 

167 trampoline(LOG_SEPARATOR) 

168 
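# Minimal sketch, not part of tools.py: ensure_separate_log_lines() wraps any line-yielding
# callable; the sourcer below is a made-up example to show the newline splitting.
from liitos import tools

def sourcer():
    yield 'first\nsecond'
    yield 'third'

tools.ensure_separate_log_lines(sourcer, tools.log.warning)
# -> separator, 'first', 'second', 'third', separator - one log record per line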

169 

170@no_type_check 

171def log_subprocess_output(pipe, prefix: str) -> list[str]: 

172 log_buffer = [] 

173 for line in iter(pipe.readline, b''): # b'\n'-separated lines 

174 cand = line.decode(encoding=ENCODING).rstrip() 

175 msg = prefix + ': ' + cand 

176 log_buffer.append(msg) 

177 if HAS_ERROR.search(cand):  # 177 ↛ 178: line 177 didn't jump to line 178 because the condition on line 177 was never true

178 log.error(msg) 

179 continue 

180 if HAS_WARNING.search(cand) and not ( 

181 'latex' in prefix 

182 and any( 

183 ( 

184 '"calc" is loaded -- this is not' in cand, 

185 'Package microtype Warning: Unable to apply patch' in cand, 

186 'Unknown document division name (startatroot)' in cand, 

187 'Unknown slot number of character' in cand, 

188 ) 

189 ) 

190 ): 

191 log.warning(msg) 

192 continue 

193 if IS_BORING.search(cand): 

194 log.debug(msg) 

195 continue 

196 log.info(msg) 

197 

198 return log_buffer 

199 

200 

201@no_type_check 

202def delegate(command: list[str], marker: str, do_shell: bool = False, is_quiet: bool = False) -> int: 

203 """Execute command in subprocess and follow requests. 

204 

205 Hints on LaTeX noise reduction per special variables: 

206 

207 - max_print_line=1000 

208 - error_line=254 

209 - half_error_line=238 

210 

211 So, set in texmf.cnf or in the shell process environment, these reduce the number of lines ... 

212 

213 max_print_line=1000 error_line=254 half_error_line=238 

214 """ 

215 try: 

216 if 'latex' in marker: 

217 env = dict(os.environ) 

218 env['max_print_line'] = '1000' 

219 env['error_line'] = '254' 

220 env['half_error_line'] = '238' 

221 process = subprocess.Popen( 

222 command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell, env=env # nosec B602 

223 ) 

224 else: 

225 process = subprocess.Popen( 

226 command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell # nosec B602 

227 ) 

228 with process.stdout: 

229 log_buffer = log_subprocess_output(process.stdout, marker) 

230 code = process.wait() 

231 if code < 0:  # 231 ↛ 232: line 231 didn't jump to line 232 because the condition on line 231 was never true

232 log.error(f'{marker} process ({command}) was terminated by signal {-code}; (cf. below for hints)') 

233 elif code > 0: 

234 log.error(f'{marker} process ({command}) returned {code}; (cf. below for hints)') 

235 else: 

236 log.info(f'{marker} process succeeded') 

237 except Exception as err: 

238 log.error(f'failed executing tool with error: {err}; (cf. below for hints)') 

239 code = 42 

240 

241 if code != 0 and is_quiet:  # 241 ↛ 242: line 241 didn't jump to line 242 because the condition on line 241 was never true

242 for msg in log_buffer: 

243 if 'latex' in marker: 

244 payload = msg.replace(f'{marker}:', '').strip() 

245 if not payload: 

246 continue 

247 if '(microtype)' in payload: 

248 continue 

249 if 'Package microtype Warning: Unknown slot number of character' in payload: 

250 continue 

251 if IS_BORING.search(payload): 

252 continue 

253 if any( 

254 ( 

255 '"calc" is loaded -- this is not' in payload, 

256 'Package microtype Warning: Unable to apply patch' in payload, 

257 'Unknown document division name (startatroot)' in payload, 

258 'Unknown slot number of character' in payload, 

259 ) 

260 ): 

261 continue 

262 cleansed = payload.replace('[', '').replace(']', '').replace('|', '') 

263 if not cleansed.strip(): 

264 continue 

265 if not payload.replace(')', ''): 

266 continue 

267 log.error(msg) 

268 

269 if code == 0 and is_quiet and marker in ('label-pdf', '', 'assess-pdf-fonts'): 

270 for msg in log_buffer: 

271 log.warning(msg) 

272 

273 return code 

274 
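# Hedged sketch, not part of tools.py: a LaTeX pass routed through delegate(); the lualatex
# command line and the marker value are assumptions for illustration. Because the marker
# contains 'latex', max_print_line/error_line/half_error_line are exported to the child
# process as described in the docstring above.
from liitos import tools

code = tools.delegate(['lualatex', '--interaction=nonstopmode', 'this.tex'], 'latex-pass-1')
if code != 0:
    tools.log.error(f'LaTeX pass returned {code}')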

275 

276@no_type_check 

277def report(on: ToolKey) -> int: 

278 """Execute the tool specific version command.""" 

279 tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {}) 

280 tool_version_call_text = str(tool_context.get('command', '')).strip() 

281 tool_version_call = tool_version_call_text.split() 

282 tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip() 

283 if not tool_version_call: 

284 log.warning(f'cowardly avoiding undefined call for tool key ({on})') 

285 log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})') 

286 return 42 

287 

288 log.info(LOG_SEPARATOR) 

289 log.info(f'requesting tool version information from environment per ({tool_version_call})') 

290 log.info(f'- {tool_reason_banner}') 

291 code = delegate(tool_version_call, f'tool-version-of-{on}') 

292 log.info(LOG_SEPARATOR) 

293 

294 return code 

295 
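# Hedged sketch, not part of tools.py: report() only accepts keys present in
# TOOL_VERSION_COMMAND_MAP; the key below is a hypothetical placeholder, and an unknown
# key is answered with a warning plus return code 42.
from liitos import tools

code = tools.report('some-tool-key')  # hypothetical key, see TOOL_VERSION_COMMAND_MAP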

296 

297@no_type_check 

298def execute_filter( 

299 the_filter: Callable, 

300 head: str, 

301 backup: str, 

302 label: str, 

303 text_lines: list[str], 

304 lookup: Union[dict[str, str], None] = None, 

305) -> list[str]: 

306 """Chain filter calls by storing in and out lies in files and return the resulting lines.""" 

307 log.info(LOG_SEPARATOR) 

308 log.info(head) 

309 doc_before_caps_patch = backup 

310 with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle: 

311 handle.write('\n'.join(text_lines)) 

312 patched_lines = the_filter(text_lines, lookup=lookup) 

313 with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle: 

314 handle.write('\n'.join(patched_lines)) 

315 log.info(f'diff of the ({label}) filter result:') 

316 log_unified_diff(text_lines, patched_lines) 

317 

318 return patched_lines 

319 
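# Minimal sketch, not part of tools.py: any callable taking (lines, lookup=...) and
# returning lines fits the filter slot; the uppercase filter, the backup file name, and
# the input lines are made up for illustration. Note that the call writes both the backup
# file and LATEX_PAYLOAD_NAME in the working directory.
from liitos import tools

def shout(lines, lookup=None):
    return [line.upper() for line in lines]

patched = tools.execute_filter(
    shout,
    head='Applying shout filter ...',
    backup='document-before-shout.tex.txt',
    label='shout',
    text_lines=['\\section{Intro}', 'body text'],
)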

320 

321@no_type_check 

322def load_target( 

323 target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH 

324) -> tuple[bool, dict[str, str]]: 

325 """DRY.""" 

326 if not structure_path.is_file() or not structure_path.stat().st_size:  # 326 ↛ 327: line 326 didn't jump to line 327 because the condition on line 326 was never true

327 log.error(f'render failed to find non-empty structure file at {structure_path}') 

328 return False, {} 

329 

330 with open(structure_path, 'rt', encoding=ENCODING) as handle: 

331 structure = yaml.safe_load(handle) 

332 

333 targets = sorted(structure.keys()) 

334 

335 if not targets:  # 335 ↛ 336: line 335 didn't jump to line 336 because the condition on line 335 was never true

336 log.error(f'structure at ({structure_path}) does not provide any targets') 

337 return False, {} 

338 

339 if target_code not in targets:  # 339 ↛ 340: line 339 didn't jump to line 340 because the condition on line 339 was never true

340 log.error(f'structure does not provide ({target_code})') 

341 return False, {} 

342 

343 if len(targets) != 1:  # 343 ↛ 344: line 343 didn't jump to line 344 because the condition on line 343 was never true

344 log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})') 

345 return True, {} 

346 

347 target = targets[0] 

348 facets = sorted(list(facet.keys())[0] for facet in structure[target]) 

349 log.info(f'found single target ({target}) with facets ({facets})') 

350 

351 if facet_code not in facets: 

352 log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})') 

353 return False, {} 

354 

355 aspect_map = {} 

356 for data in structure[target]:  # 356 ↛ 360: line 356 didn't jump to line 360 because the loop on line 356 didn't complete

357 if facet_code in data: 

358 aspect_map = data[facet_code] 

359 break 

360 missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map] 

361 if missing_keys:  # 361 ↛ 362: line 361 didn't jump to line 362 because the condition on line 361 was never true

362 log.error( 

363 f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}' 

364 f' for target ({target_code}) and facet ({facet_code})' 

365 ) 

366 log.error(f'- the found aspects: {sorted(aspect_map.keys())}') 

367 log.error(f'- missing aspects: {sorted(missing_keys)}') 

368 return False, {} 

369 

370 if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED): 

371 log.debug( 

372 f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}' 

373 f' for target ({target_code}) and facet ({facet_code})' 

374 ) 

375 log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())}') 

376 

377 return True, aspect_map 

378 
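# Hedged sketch, not part of tools.py: the structure.yml that load_target() walks maps a
# single target key to a list of one-key facet maps, each holding an aspect map; the names
# below are placeholders and the real required aspect keys come from KEYS_REQUIRED in the
# liitos package.
#
#   abc:                             # target code
#     - prod:                        # facet code
#         approvals: approvals.yml   # placeholder aspect entries
#         bind: bind.txt
#     - dev:
#         approvals: approvals.yml
#         bind: bind.txt
#
# With such a file at STRUCTURE_PATH (relative to DOC_BASE) the call would look like:
from liitos import tools

ok, aspect_map = tools.load_target('abc', 'prod')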

379 

380@no_type_check 

381def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]: 

382 """Separation of concerns.""" 

383 doc = json.load(open(json_ast_path, 'rt', encoding=ENCODING)) 

384 blocks = doc['blocks'] 

385 mermaid_caption_map = {} 

386 for b in blocks: 

387 if b['t'] == 'CodeBlock' and b['c'][0]: 

388 try: 

389 is_mermaid = b['c'][0][1][0] == 'mermaid' 

390 atts = b['c'][0][2] 

391 except IndexError: 

392 continue 

393 

394 if not is_mermaid: 

395 continue 

396 m_caption, m_filename, m_format, m_loc = '', '', '', '' 

397 for k, v in atts: 

398 if k == 'caption': 

399 m_caption = v 

400 elif k == 'filename': 

401 m_filename = v 

402 elif k == 'format': 

403 m_format = v 

404 elif k == 'loc': 

405 m_loc = v 

406 else: 

407 pass 

408 token = f'{m_loc}/{m_filename}.{m_format}' # noqa 

409 if token in mermaid_caption_map:  # 409 ↛ 410: line 409 didn't jump to line 410 because the condition on line 409 was never true

410 log.warning('Duplicate token, same caption?') 

411 log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}') 

412 log.warning(f'- current: {token} -> {m_caption}') 

413 mermaid_caption_map[token] = m_caption 

414 return mermaid_caption_map 

415 
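# Hedged sketch, not part of tools.py: the Pandoc JSON AST fragment this parser matches is
# a CodeBlock whose first class is 'mermaid'; the attribute keys mirror the ones read above,
# while the concrete values are placeholders.
import json
import pathlib
import tempfile

from liitos import tools

ast_sketch = {
    'blocks': [
        {
            't': 'CodeBlock',
            'c': [
                ['', ['mermaid'], [['caption', 'Data flow'], ['filename', 'flow'],
                                   ['format', 'svg'], ['loc', 'diagrams']]],
                'graph TD; A --> B',
            ],
        },
    ],
}
with tempfile.TemporaryDirectory() as folder:
    ast_path = pathlib.Path(folder, 'ast.json')
    ast_path.write_text(json.dumps(ast_sketch), encoding='utf-8')
    print(tools.mermaid_captions_from_json_ast(ast_path))  # {'diagrams/flow.svg': 'Data flow'}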

416 

417def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]: 

418 """Return generator that yields only the lines beyond the cut mark region skipping lines in [from, thru]. 

419 

420 Examples: 

421 

422 >>> lines = ['a', 'b', 'c', 'd'] 

423 >>> filtered = list(remove_target_region_gen(lines, 'b', 'c')) 

424 >>> filtered 

425 ['a', 'd'] 

426 """ 

427 in_section = False 

428 for line in text_lines: 

429 if not in_section: 

430 if from_cut in line: 

431 in_section = True 

432 continue 

433 if in_section: 

434 if thru_cut in line: 

435 in_section = False 

436 continue 

437 yield line