Coverage for liitos/tools.py: 89.66%

241 statements  

coverage.py v7.6.10, created at 2025-01-05 17:22:35 +00:00

import datetime as dti
import difflib
import hashlib
import json
import pathlib
import platform
import re
import subprocess  # nosec B404
import uuid
from typing import Any, Callable, Generator, Union, no_type_check

import yaml

import foran.foran as api  # type: ignore
from foran.report import generate_report  # type: ignore
from taksonomia.taksonomia import Taxonomy  # type: ignore

from liitos import (
    CONTEXT,
    ENCODING,
    KEYS_REQUIRED,
    LATEX_PAYLOAD_NAME,
    TOOL_VERSION_COMMAND_MAP,
    ToolKey,
    log,
)

PathLike = Union[str, pathlib.Path]

DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')

def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the SHA512 hex digest of the data from file.

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    if hasher is None:
        hasher = hashlib.sha512
    the_hash = hasher()
    with open(path, 'rb') as handle:
        while chunk := handle.read(CHUNK_SIZE):
            the_hash.update(chunk)
    return the_hash.hexdigest()
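
The hasher parameter accepts any hashlib-style constructor, so the same helper yields other digests; a minimal sketch in the doctest style used above (the empty temporary file mirrors the SHA512 example):

>>> import tempfile
>>> with tempfile.NamedTemporaryFile() as handle:
...     md5_hex = hash_file(handle.name, hashlib.md5)
>>> assert md5_hex == 'd41d8cd98f00b204e9800998ecf8427e'  # MD5 of empty input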

@no_type_check
def log_subprocess_output(pipe, prefix: str):
    """Forward subprocess output to the log, demoting known-noisy LaTeX lines to debug level."""
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        if IS_BORING.search(cand):
            log.debug(cand)
            continue
        if cand.strip().strip('[])yex'):
            if any(
                [
                    'microtype' in cand,
                    'xassoccnt' in cand,
                    'texlive/2022/texmf-dist/tex/' in cand,
                    cand == 'erns.sty)',
                    cand == '(see the transcript file for additional information)',
                    cand.startswith(r'Overfull \hbox ')
                    and cand.endswith(r'pt too wide) has occurred while \output is active'),
                ]
            ):
                log.debug(f'{prefix}: %s', cand)
            else:
                log.info(f'{prefix}: %s', cand)

@no_type_check
def vcs_probe():
    """Are we ahead of, on par with, or behind the upstream?"""
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception:  # noqa
        yield 'WARNING - we seem to not be within a git repository clone'

def node_id() -> str:
    """Generate the build node identifier.

    Examples:

    >>> nid = node_id()
    >>> assert len(nid) == 36
    >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node()))

def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        if path.is_dir():
            taxonomy.add_branch(path)
        else:
            taxonomy.add_leaf(path)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)  # note: st_ctime, not st_mtime
    sha512_hash = hash_file(target_path, hashlib.sha512)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f'  + name: {target_path.name}')
    log.info(f'  + size: {size_bytes} bytes')
    log.info(f'  + date: {mod_time}')
    log.info('- Characteristic:')
    log.info('  + Checksums:')
    log.info(f'    sha512:{sha512_hash}')
    log.info(f'    sha256:{sha256_hash}')
    log.info(f'    sha1:{sha1_hash}')
    log.info(f'    md5:{md5_hash}')
    log.info('  + Fonts:')
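
A hedged usage sketch for the reporter; the deliverable path is hypothetical and not part of liitos:

# Hypothetical deliverable path; the function only logs and writes inventory.json.
report_taxonomy(pathlib.Path('render', 'pdf', 'document.pdf'))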

@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as a generator of strings.

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    for line in difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label):
        yield line.rstrip()

@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Bridge the unified diff between left and right into the log, one record per line."""
    log.info(LOG_SEPARATOR)
    for line in unified_diff(left, right, left_label, right_label):
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)

@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom: log every line the sourcer yields, breaking up any strings containing newlines."""
    log.info(LOG_SEPARATOR)
    for line in sourcer(*args) if args else sourcer():
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)
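
One plausible sourcer for this wrapper (an assumption from the matching shapes, not something this module pins down) is the vcs_probe generator above:

# Assumed pairing: log the version control report line by line.
ensure_separate_log_lines(vcs_probe)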

@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute command in subprocess and follow requests."""
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:  # coverage: condition never true in the recorded runs
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42

    return code
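
A minimal call sketch, assuming a tool available on PATH (the command is illustrative only):

# Illustrative command; any argv-style list works while do_shell stays False.
exit_code = delegate(['python', '--version'], 'tool-version-of-python')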

@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool-specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call_text = str(tool_context.get('command', '')).strip()
    tool_version_call = tool_version_call_text.split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
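
The two .get() lookups above imply that every TOOL_VERSION_COMMAND_MAP entry carries a 'command' string and a 'banner' string; a hand-written sketch of that shape (key and values hypothetical):

# Hypothetical entry shape inferred from the lookups in report():
EXAMPLE_TOOL_ENTRY = {
    'command': 'lualatex --version',
    'banner': 'The engine rendering the LaTeX payload',
}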

@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    doc_before_caps_patch = backup
    with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines
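
A sketch of one chained call, assuming a filter callable with the (text_lines, lookup=...) signature forwarded above (filter and file names hypothetical):

# Hypothetical filter honoring the signature execute_filter expects.
def shout_filter(lines, lookup=None):
    return [line.upper() for line in lines]

patched = execute_filter(
    shout_filter,
    head='applying the (shout) filter ...',
    backup='document-before-shout.tex.txt',
    label='shout',
    text_lines=['a', 'b'],
    lookup=None,
)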

@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """DRY helper: load the structure file and resolve the aspect map for the given target and facet."""
    structure_path = pathlib.Path(structure_path)  # tolerate str per PathLike
    if not structure_path.is_file() or not structure_path.stat().st_size:  # coverage: condition never true in the recorded runs
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:  # coverage: condition never true in the recorded runs
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:  # coverage: condition never true in the recorded runs
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:  # coverage: condition never true in the recorded runs
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:  # coverage: loop always exits via break in the recorded runs
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:  # coverage: condition never true in the recorded runs
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())}')

    return True, aspect_map
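
A hedged sketch of the single-target happy path, assuming a structure.yml whose parsed shape matches the walk above (target and facet names hypothetical; the aspect keys must cover KEYS_REQUIRED):

# Parsed structure shape the code walks: {target: [{facet: aspect_map}, ...]}
ok, aspect_map = load_target('abc', 'full')
if not ok:
    log.error('structure did not resolve for the requested target and facet')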

@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns: map mermaid diagram tokens (loc/filename.format) to their captions."""
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:  # coverage: condition never true in the recorded runs
                log.warning('Duplicate token, same caption?')
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
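
For orientation, a hand-written sketch of one pandoc CodeBlock entry this walker matches; a real AST comes from pandoc -t json, so all values here are illustrative:

# CodeBlock payload is [attr, code] with attr = [identifier, classes, key-value pairs].
example_block = {
    't': 'CodeBlock',
    'c': [
        ['', ['mermaid'], [['caption', 'Data Flow'], ['filename', 'flow'], ['format', 'svg'], ['loc', 'diagrams']]],
        'graph TD; A --> B;',
    ],
}
# The walker would map the token 'diagrams/flow.svg' to the caption 'Data Flow'.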

def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Return a generator that yields only the lines outside the cut mark region, skipping lines in [from_cut, thru_cut].

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
    >>> filtered
    ['a', 'd']
    """
    in_section = False
    for line in text_lines:
        if not in_section:
            if from_cut in line:
                in_section = True
                continue
        if in_section:
            if thru_cut in line:
                in_section = False
            continue
        yield line