Coverage for liitos/tools.py: 87.77%

241 statements  

coverage.py v7.6.8, created at 2024-11-25 15:36:16 +00:00

import datetime as dti
import difflib
import hashlib
import json
import pathlib
import platform
import re
import subprocess  # nosec B404
import uuid
from typing import Any, Callable, Generator, Union, no_type_check

import yaml

import foran.foran as api  # type: ignore
from foran.report import generate_report  # type: ignore
from taksonomia.taksonomia import Taxonomy  # type: ignore

from liitos import (
    CONTEXT,
    ENCODING,
    KEYS_REQUIRED,
    LATEX_PAYLOAD_NAME,
    TOOL_VERSION_COMMAND_MAP,
    ToolKey,
    log,
)

PathLike = Union[str, pathlib.Path]

DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
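# Notes (added for orientation): CHUNK_SIZE is 2 << 15 == 65536 bytes (64 KiB) per
# read in hash_file below. IS_BORING matches noisy TeX distribution chatter; an
# illustrative (hypothetical) hit would be a log line such as:
#   (/usr/local/texlive/2022/texmf-dist/tex/latex/base/size10.clo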

def hash_file(path: pathlib.Path, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the hex digest of the file data (SHA512 unless another hasher factory is given)."""
    if hasher is None:
        hasher = hashlib.sha512
    the_hash = hasher()
    with open(path, 'rb') as handle:
        while chunk := handle.read(CHUNK_SIZE):
            the_hash.update(chunk)
    return the_hash.hexdigest()
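# Minimal usage sketch (the path is hypothetical, not part of the module flow):
#   digest = hash_file(pathlib.Path('render/pdf/document.pdf'))  # sha512 hex digest
#   short = hash_file(pathlib.Path('render/pdf/document.pdf'), hashlib.md5)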

@no_type_check
def log_subprocess_output(pipe, prefix: str):
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        if IS_BORING.search(cand):
            log.debug(cand)
            continue
        if cand.strip().strip('[])yex'):
            if any(
                [
                    'microtype' in cand,
                    'xassoccnt' in cand,
                    'texlive/2022/texmf-dist/tex/' in cand,
                    cand == 'erns.sty)',
                    cand == '(see the transcript file for additional information)',
                    cand.startswith(r'Overfull \hbox ')
                    and cand.endswith(r'pt too wide) has occurred while \output is active'),
                ]
            ):
                log.debug(f'{prefix}: %s', cand)
            else:
                log.info(f'{prefix}: %s', cand)

@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?"""
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception:  # noqa
        yield 'WARNING - we seem to not be within a git repository clone'
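# vcs_probe is a generator of report lines; a plausible consumer sketch:
#   for line in vcs_probe():
#       log.info(line)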

def node_id() -> str:
    """Generate the build node identifier."""
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node()))
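# uuid3 is deterministic over its inputs, so the same hostname always yields the
# same node identifier (illustrative):
#   node_id() == node_id()  # True on any single machine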

def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f'  + name: {target_path.name}')
    log.info(f'  + size: {size_bytes} bytes')
    log.info(f'  + date: {mod_time}')
    log.info('- Characteristic:')
    log.info('  + Checksums:')
    log.info(f'    sha512:{sha512_hash}')
    log.info(f'    sha256:{sha256_hash}')
    log.info(f'    sha1:{sha1_hash}')
    log.info(f'    md5:{md5_hash}')
    log.info('  + Fonts:')
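# Usage sketch (hypothetical deliverable path):
#   report_taxonomy(pathlib.Path('render/pdf/document.pdf'))
# Note: the logged date derives from st_ctime (inode change time), not st_mtime.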

@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as a generator of strings."""
    for line in difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label):
        yield line.rstrip()

@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    log.info(LOG_SEPARATOR)
    for line in unified_diff(left, right, left_label, right_label):
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)

@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    log.info(LOG_SEPARATOR)
    for line in sourcer(*args) if args else sourcer():
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)
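# Sketch of the wrapping idiom with sourcers from this module (before/after are
# hypothetical lists of strings):
#   ensure_separate_log_lines(vcs_probe)                    # no-args sourcer
#   ensure_separate_log_lines(unified_diff, before, after)  # sourcer with args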

@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute command in subprocess and follow requests."""
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:  # coverage: condition never true in the test suite
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        elif code > 0:  # coverage: condition never true in the test suite
            log.error(f'{marker} process ({command}) returned {code}')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42

    return code
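# Delegation sketch (hypothetical command):
#   code = delegate(['pandoc', '--version'], 'pandoc-version')
# code is 0 on success; signal terminations and nonzero returns are logged as
# errors, and 42 flags a failure to launch the tool at all.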

@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool-specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call_text = str(tool_context.get('command', '')).strip()
    tool_version_call = tool_version_call_text.split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
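# Assumed shape of a TOOL_VERSION_COMMAND_MAP entry, inferred from the lookups
# above (the key and values here are hypothetical):
#   {'pandoc': {'command': 'pandoc --version', 'banner': 'converts markdown to LaTeX'}}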

@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    doc_before_caps_patch = backup
    with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines
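# Filter chaining sketch; uppercase_filter is hypothetical, its signature matches
# how the_filter is called above:
#   def uppercase_filter(lines: list[str], lookup=None) -> list[str]:
#       return [line.upper() for line in lines]
#   lines = execute_filter(uppercase_filter, head='Shouting ...', backup='before-upper.tex',
#                          label='upper', text_lines=lines)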

@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """DRY helper: load and validate the aspect map for the given target and facet from the structure file."""
    structure_path = pathlib.Path(structure_path)
    if not structure_path.is_file() or not structure_path.stat().st_size:  # coverage: condition never true in the test suite
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:  # coverage: condition never true in the test suite
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:  # coverage: condition never true in the test suite
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:  # coverage: condition never true in the test suite
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:  # coverage: condition never true in the test suite
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:  # coverage: loop always left via the break below
        if facet_code in data:  # coverage: condition always true in the test suite
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:  # coverage: condition never true in the test suite
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())}')

    return True, aspect_map
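# Illustrative structure.yml shape expected above: a single target maps to a list
# of single-key facet maps, each holding an aspect map whose required keys come
# from KEYS_REQUIRED (all names below are hypothetical):
#   mydoc:
#     - default:
#         approvals: approvals.yml
#         meta: meta.yml
# Usage sketch:
#   ok, aspect_map = load_target('mydoc', 'default')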

@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns: map mermaid diagram tokens to captions from a pandoc JSON AST."""
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:  # coverage: condition never true in the test suite
                log.warning('Duplicate token, same caption?')
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
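# Illustrative pandoc AST CodeBlock this scanner matches (values hypothetical):
#   {"t": "CodeBlock",
#    "c": [["", ["mermaid"], [["caption", "Flow"], ["filename", "flow"],
#                             ["format", "svg"], ["loc", "diagrams"]]],
#          "graph TD; A-->B;"]}
# which would yield the map entry {'diagrams/flow.svg': 'Flow'}.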

def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Yield only the lines outside the cut marker region, skipping every line from from_cut through thru_cut inclusive."""
    in_section = False
    for line in text_lines:
        if not in_section:
            if from_cut in line:
                in_section = True
                continue
        if in_section:
            if thru_cut in line:
                in_section = False
            continue
        yield line
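# Region removal sketch (marker strings are hypothetical; matching is substring
# based, so any line containing the marker counts as a cut mark):
#   kept = list(remove_target_region_gen(lines, r'\begin{comment}', r'\end{comment}'))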