Coverage for liitos/tools.py: 81.05%
289 statements
« prev ^ index » next — coverage.py v7.10.7, created at 2025-10-07 19:29:53 +00:00
1import datetime as dti
2import difflib
3import hashlib
4import json
5import os
6import pathlib
7import platform
8import re
9import subprocess # nosec B404
10import uuid
11from typing import Any, Callable, Generator, Union, no_type_check
13import yaml
15import foran.foran as api # type: ignore
16from foran.report import generate_report # type: ignore
17from taksonomia.taksonomia import Taxonomy # type: ignore
19from liitos import (
20 CONTEXT,
21 ENCODING,
22 KEYS_REQUIRED,
23 LATEX_PAYLOAD_NAME,
24 TOOL_VERSION_COMMAND_MAP,
25 ToolKey,
26 log,
27)
# Alias for path-like arguments accepted throughout this module.
PathLike = Union[str, pathlib.Path]

# Single space used when normalizing phase-info prefixes in caption checks.
SPACE = ' '

# Document tree is expected two levels above the render working directory.
DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
# 64 KiB read granularity used when hashing file contents.
CHUNK_SIZE = 2 << 15
# Timestamp format for reported deliverable dates (explicit UTC offset).
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

# Classifiers for subprocess output lines (LaTeX noise vs. warnings vs. errors).
IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
HAS_WARNING = re.compile(r'[Ww]arning')
HAS_ERROR = re.compile(r'[Ee]rror')
def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the SHA512 hex digest of the data from file.

    A different hash constructor (e.g. hashlib.sha256) may be passed as hasher.

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    algorithm = hashlib.sha512 if hasher is None else hasher
    digest = algorithm()
    with open(path, 'rb') as stream:
        for chunk in iter(lambda: stream.read(CHUNK_SIZE), b''):
            digest.update(chunk)
    return digest.hexdigest()
@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?"""
    fallback = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hash'] = fallback
    CONTEXT['source_hint'] = fallback
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            root_path = pathlib.Path(repo_root_folder)
            CONTEXT['source_hint'] = f'{root_path.parent.name}/{root_path.name}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for report_line in generate_report(status):
            yield report_line.rstrip()

    except Exception as err:  # noqa
        yield f'WARNING - we seem to not be within a git repository clone ({err})'
def node_id() -> str:
    """Generate the build node identifier.

    The identifier is a stable UUIDv3 derived from this host's network name.

    Examples:

    >>> nid = node_id()
    >>> assert len(nid) == 36
    >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    host_name = platform.node()
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, host_name))
def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for entry in sorted(target_path.parent.rglob('*')):
        if entry.is_dir():
            taxonomy.add_branch(entry)
        else:
            taxonomy.add_leaf(entry)
    log.warning('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    # NOTE(review): st_ctime (not st_mtime) feeds the reported date - confirm this is intended
    timestamp = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    checksums = {
        'sha512': hash_file(target_path, hashlib.sha512),
        'sha256': hash_file(target_path, hashlib.sha256),
        'sha1': hash_file(target_path, hashlib.sha1),
        'md5': hash_file(target_path, hashlib.md5),
    }
    log.warning('- Ephemeral:')
    log.warning(f' + name: {target_path.name}')
    log.warning(f' + size: {stat.st_size} bytes')
    log.warning(f' + date: {timestamp}')
    log.warning('- Characteristic:')
    log.warning(' + Checksums:')
    for algorithm, value in checksums.items():
        log.warning(f' {algorithm}:{value}')
    log.warning(' + Fonts:')
@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as generator of strings.

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    raw = difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label)
    yield from (entry.rstrip() for entry in raw)
@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    emit = log.info
    emit(LOG_SEPARATOR)
    for diff_line in unified_diff(left, right, left_label, right_label):
        for piece in diff_line.split('\n'):
            emit(piece)
    emit(LOG_SEPARATOR)
@no_type_check
def ensure_separate_log_lines(sourcer: Callable, trampoline: Callable = log.info, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    trampoline(LOG_SEPARATOR)
    produced = sourcer(*args) if args else sourcer()
    for line in produced:
        for piece in line.split('\n'):
            trampoline(piece)
    trampoline(LOG_SEPARATOR)
@no_type_check
def log_subprocess_output(pipe, prefix: str) -> list[str]:
    """Stream lines from a subprocess pipe to the log, classified by severity.

    Each decoded line is prefixed with the marker and collected; the full
    buffer is returned so callers can replay it later (e.g. on failure).
    Classification order matters: error beats warning beats boring/debug.
    """
    log_buffer = []
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        msg = prefix + ': ' + cand
        log_buffer.append(msg)
        if HAS_ERROR.search(cand):
            log.error(msg)
            continue
        # Known-noisy LaTeX warnings are downgraded (fall through to info/debug below)
        if HAS_WARNING.search(cand) and not (
            'latex' in prefix
            and any(
                (
                    '"calc" is loaded -- this is not' in cand,
                    'Package microtype Warning: Unable to apply patch' in cand,
                    'Unknown document division name (startatroot)' in cand,
                    'Unknown slot number of character' in cand,
                )
            )
        ):
            log.warning(msg)
            continue
        # texmf-dist chatter is only interesting at debug level
        if IS_BORING.search(cand):
            log.debug(msg)
            continue
        log.info(msg)

    return log_buffer
@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False, is_quiet: bool = False) -> int:
    """Execute command in subprocess and follow requests.

    Returns the process exit code (42 when spawning the process itself failed).

    Hints on LaTeX noise reduction per special variables:

    - max_print_line=1000
    - error_line=254
    - half_error_line=238

    So, in texmf.copf or in shell process, these reduce the amount of lines ...

    max_print_line=1000 error_line=254 half_error_line=238
    """
    # Bug fix: ensure the buffer exists even when Popen raises before it is assigned,
    # otherwise the quiet-failure replay below raised NameError.
    log_buffer: list[str] = []
    try:
        if 'latex' in marker:
            # Widen LaTeX log lines so messages are not wrapped and can be classified line-wise
            env = dict(os.environ)
            env['max_print_line'] = '1000'
            env['error_line'] = '254'
            env['half_error_line'] = '238'
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell, env=env  # nosec B602
            )
        else:
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
            )
        with process.stdout:
            log_buffer = log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}; (cf. below for hints)')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}; (cf. below for hints)')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}; (cf. below for hints)')
        code = 42

    if code != 0 and is_quiet:
        # On quiet failures replay the buffered lines, filtering known LaTeX noise
        for msg in log_buffer:
            if 'latex' in marker:
                payload = msg.replace(f'{marker}:', '').strip()
                if not payload:
                    continue
                if '(microtype)' in payload:
                    continue
                if 'Package microtype Warning: Unknown slot number of character' in payload:
                    continue
                if IS_BORING.search(payload):
                    continue
                if any(
                    (
                        '"calc" is loaded -- this is not' in payload,
                        'Package microtype Warning: Unable to apply patch' in payload,
                        'Unknown document division name (startatroot)' in payload,
                        'Unknown slot number of character' in payload,
                    )
                ):
                    continue
                # Lines consisting only of page/group punctuation carry no signal
                cleansed = payload.replace('[', '').replace(']', '').replace('|', '')
                if not cleansed.strip():
                    continue
                if not payload.replace(')', ''):
                    continue
                log.error(msg)

    if code == 0 and is_quiet and marker in ('label-pdf', '', 'assess-pdf-fonts'):
        for msg in log_buffer:
            log.warning(msg)

    return code
@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    version_call = str(tool_context.get('command', '')).strip().split()
    reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({version_call})')
    log.info(f'- {reason_banner}')
    code = delegate(version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    # Persist the pre-filter state for later inspection
    with open(backup, 'wt', encoding=ENCODING) as sink:
        sink.write('\n'.join(text_lines))
    filtered_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as sink:
        sink.write('\n'.join(filtered_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, filtered_lines)

    return filtered_lines
@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """Load and validate the aspect map for (target, facet) from the structure file (DRY).

    Returns a (success, aspect_map) pair; on validation failure the map is empty.
    """
    # Bug fix: the declared PathLike contract accepts plain str, but the checks
    # below require pathlib.Path methods - coerce once up front.
    structure_path = pathlib.Path(structure_path)
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        # Multiple targets are not supported here - signal success but provide no aspects
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        # Extra aspects beyond the required set are tolerated, only logged at debug level
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())} instead')

    return True, aspect_map
def incoherent_math_mode_in_caption(caption: str, phase_info: str = '') -> list[str]:
    """Heuristics to warn on underscores and carets outside of math mode in captions."""
    findings: list[str] = []
    if phase_info and phase_info[0] != ' ':
        phase_info = ' ' + phase_info
    # An even, non-zero count of dollar signs is taken as balanced math mode
    balanced_math = '$' in caption and not caption.count('$') % 2
    if caption and not balanced_math:
        if '_' in caption:
            findings.append(f'Underscore (_) and no LaTeX math mode tokens in caption ({caption}){phase_info}')
        if '^' in caption:
            findings.append(f'Caret (^) and no LaTeX math mode tokens in caption ({caption}){phase_info}')
    return findings
@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns: map mermaid tokens (loc/filename.format) to their captions.

    Scans the pandoc JSON AST for mermaid code blocks and collects their
    caption attributes, warning on duplicates and on suspicious math mode.
    """
    # Bug fix: open the AST file via a context manager so the handle is closed
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                # NOTE(review): 'prior' logs the incoming caption and 'current' the stored
                # one - the labels look swapped; confirm intent before changing the output
                log.warning('Duplicate token, same caption?')
                log.warning(f'- prior: {token} -> {m_caption}')
                log.warning(f'- current: {token} -> {mermaid_caption_map[token]}')
            for msg in incoherent_math_mode_in_caption(m_caption, phase_info=f'for mermaid image ({token})'):
                log.warning(msg)
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Return generator that yields only the lines beyond the cut mark region skipping lines in [from, thru].

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
    >>> filtered
    ['a', 'd']
    """
    skipping = False
    for candidate in text_lines:
        if skipping:
            if thru_cut in candidate:
                skipping = False
            continue
        if from_cut in candidate:
            skipping = True
            continue
        yield candidate