Coverage for liitos/tools.py: 88.89% (246 statements)

import datetime as dti
import difflib
import hashlib
import json
import pathlib
import platform
import re
import subprocess  # nosec B404
import uuid
from typing import Any, Callable, Generator, Union, no_type_check

import yaml

import foran.foran as api  # type: ignore
from foran.report import generate_report  # type: ignore
from taksonomia.taksonomia import Taxonomy  # type: ignore

from liitos import (
    CONTEXT,
    ENCODING,
    KEYS_REQUIRED,
    LATEX_PAYLOAD_NAME,
    TOOL_VERSION_COMMAND_MAP,
    ToolKey,
    log,
)

PathLike = Union[str, pathlib.Path]

DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
HAS_WARNING = re.compile(r'[Ww]arning')
HAS_ERROR = re.compile(r'[Ee]rror')


def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the SHA512 hex digest of the data from file.

    Examples:

        >>> import pathlib, tempfile
        >>> empty_sha512 = (
        ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
        ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
        ... )
        >>> with tempfile.NamedTemporaryFile() as handle:
        ...     empty_hash = hash_file(handle.name)
        >>> assert empty_hash == empty_sha512
    """
    if hasher is None:
        hasher = hashlib.sha512
    the_hash = hasher()
    with open(path, 'rb') as handle:
        while chunk := handle.read(CHUNK_SIZE):
            the_hash.update(chunk)
    return the_hash.hexdigest()


@no_type_check
def log_subprocess_output(pipe, prefix: str):
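    """Forward a subprocess output pipe to the log, classifying each line as error, warning, debug, or info.

    A minimal sketch (an assumption for illustration: any object with a bytes readline works,
    such as an in-memory stream; real callers pass process.stdout):

        import io
        log_subprocess_output(io.BytesIO(b'some boring line\n'), 'demo')
    """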

    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        if HAS_ERROR.search(cand):
            log.error(prefix + ': ' + cand)
            continue
        if HAS_WARNING.search(cand) and not any(
            (
                '"calc" is loaded -- this is not' in cand,
                'Package microtype Warning: Unable to apply patch' in cand,
                'Unknown document division name (startatroot)' in cand,
                'Unknown slot number of character' in cand,
            )
        ):
            log.warning(prefix + ': ' + cand)
            continue
        if IS_BORING.search(cand):
            log.debug(prefix + ': ' + cand)
            continue
        log.info(prefix + ': ' + cand)


@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?
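
    A usage sketch (what the generator yields depends on the checkout, if any, around the build):

        for line in vcs_probe():
            log.info(line)
    """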

    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception:  # noqa
        yield 'WARNING - we seem to not be within a git repository clone'


def node_id() -> str:
    """Generate the build node identifier.

    Examples:

        >>> nid = node_id()
        >>> assert len(nid) == 36
        >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node()))


def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable.
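
    A usage sketch (the path is a placeholder for an actual deliverable in the render/pdf folder):

        report_taxonomy(pathlib.Path('render', 'pdf', 'document.pdf'))
    """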

    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f'  + name: {target_path.name}')
    log.info(f'  + size: {size_bytes} bytes')
    log.info(f'  + date: {mod_time}')
    log.info('- Characteristic:')
    log.info('  + Checksums:')
    log.info(f'    sha512:{sha512_hash}')
    log.info(f'    sha256:{sha256_hash}')
    log.info(f'    sha1:{sha1_hash}')
    log.info(f'    md5:{md5_hash}')
    log.info('  + Fonts:')


@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as a generator of strings.

    Examples:

        >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
        >>> lines
        ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    for line in difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label):
        yield line.rstrip()


@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Log the unified diff of the left and right line lists, framed by separator lines.
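
    A sketch (all output goes to the module logger; nothing is printed or returned):

        log_unified_diff(['a', 'b'], ['a', 'c'], 'old', 'new')
    """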

    log.info(LOG_SEPARATOR)
    for line in unified_diff(left, right, left_label, right_label):
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)


@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom that logs the sourced lines, breaking up any strings containing newlines.
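
    A sketch (vcs_probe is one sourcer defined above; any callable yielding strings works):

        ensure_separate_log_lines(vcs_probe)
    """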

    log.info(LOG_SEPARATOR)
    for line in sourcer(*args) if args else sourcer():
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)


@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute the command in a subprocess, bridge its output into the log, and return the exit code.
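
    A sketch (assuming a pandoc binary on the PATH; any command vector works the same way):

        code = delegate(['pandoc', '--version'], 'pandoc-version')
    """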

    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42

    return code


@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command.
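
    Examples:

        A sketch exercising the lookup miss path (assuming the key is not in TOOL_VERSION_COMMAND_MAP):

        >>> report('no-such-tool-key')
        42
    """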

    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call_text = str(tool_context.get('command', '')).strip()
    tool_version_call = tool_version_call_text.split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code


@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines.
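
    A sketch (caps_filter is a placeholder for a callable mapping text lines plus lookup to patched lines):

        patched = execute_filter(caps_filter, 'apply caps patches', 'doc-before-caps.tex.txt', 'caps', lines)
    """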

    log.info(LOG_SEPARATOR)
    log.info(head)
    doc_before_caps_patch = backup
    with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines


@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """Load the aspect map for the given target and facet from the structure file (DRY helper).
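
    A structure file sketch this loader accepts (a single target mapping to a list of facet maps;
    the target and facet names are placeholders, and the aspect keys come from KEYS_REQUIRED):

        prod_kind:
          - deep:
              approvals: approvals.yml
              bind: bind.txt
              changes: changes.yml
              meta: meta.yml
    """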

    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())}')

    return True, aspect_map


@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Extract the mapping from mermaid diagram artifact tokens to captions out of a pandoc JSON AST.
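
    Examples:

        A minimal constructed AST fragment (a sketch, not a complete pandoc document):

        >>> import json, pathlib, tempfile
        >>> atts = [['caption', 'Flow'], ['filename', 'flow'], ['format', 'svg'], ['loc', 'diagrams']]
        >>> block = {'t': 'CodeBlock', 'c': [['', ['mermaid'], atts], 'graph TD;']}
        >>> with tempfile.TemporaryDirectory() as folder:
        ...     path = pathlib.Path(folder, 'ast.json')
        ...     _ = path.write_text(json.dumps({'blocks': [block]}), encoding='utf-8')
        ...     mermaid_captions_from_json_ast(path)
        {'diagrams/flow.svg': 'Flow'}
    """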

    doc = json.load(open(json_ast_path, 'rt', encoding=ENCODING))
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map


def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Yield only the lines outside the cut mark region, skipping all lines from from_cut through thru_cut inclusive.

    Examples:

        >>> lines = ['a', 'b', 'c', 'd']
        >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
        >>> filtered
        ['a', 'd']
    """
    in_section = False
    for line in text_lines:
        if not in_section:
            if from_cut in line:
                in_section = True
                continue
        if in_section:
            if thru_cut in line:
                in_section = False
            continue
        yield line