Coverage for liitos/tools.py: 80.58%
277 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-08-31 13:07:35 +00:00
1import datetime as dti
2import difflib
3import hashlib
4import json
5import os
6import pathlib
7import platform
8import re
9import subprocess # nosec B404
10import uuid
11from typing import Any, Callable, Generator, Union, no_type_check
13import yaml
15import foran.foran as api # type: ignore
16from foran.report import generate_report # type: ignore
17from taksonomia.taksonomia import Taxonomy # type: ignore
19from liitos import (
20 CONTEXT,
21 ENCODING,
22 KEYS_REQUIRED,
23 LATEX_PAYLOAD_NAME,
24 TOOL_VERSION_COMMAND_MAP,
25 ToolKey,
26 log,
27)
# Accepted path argument type throughout this module.
PathLike = Union[str, pathlib.Path]

# The document source tree is processed two levels below the repository root.
DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15  # 64 KiB read buffer used when hashing files
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'  # timestamp rendering format (UTC offset spelled out)
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

# Classification patterns applied to captured subprocess log lines:
IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')  # lines referencing the TeX distribution tree
HAS_WARNING = re.compile(r'[Ww]arning')
HAS_ERROR = re.compile(r'[Ee]rror')
def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the hex digest of the file data (SHA512 unless another hasher factory is given).

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    factory = hashlib.sha512 if hasher is None else hasher
    digest = factory()
    # Stream the file in fixed-size chunks to keep memory usage flat for large files.
    with open(path, 'rb') as source:
        for chunk in iter(lambda: source.read(CHUNK_SIZE), b''):
            digest.update(chunk)
    return digest.hexdigest()
@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?"""
    # Pessimistic defaults in case we are not inside a version control clone:
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repository = api.Repo('.', search_parent_directories=True)
        state = api.Status(repository)
        CONTEXT['source_hash'] = f'sha1:{state.commit}'
        try:
            repo_root_folder = repository.git.rev_parse(show_toplevel=True)
            root = pathlib.Path(repo_root_folder)
            # Hint is the parent folder name plus the clone folder name.
            CONTEXT['source_hint'] = f'{root.parent.name}/{root.name}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(state):
            yield line.rstrip()
    except Exception as err:  # noqa
        yield f'WARNING - we seem to not be within a git repository clone ({err})'
def node_id() -> str:
    """Generate the build node identifier.

    A name-based UUID (version 3) keyed on the host name yields a stable
    identifier per build machine.

    Examples:

    >>> nid = node_id()
    >>> assert len(nid) == 36
    >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    host_name = platform.node()
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, host_name))
def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable.

    Walks the parent folder of the deliverable, dumps a taxonomy inventory
    (inventory.json), and logs size, modification date, and four checksums.
    """
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.warning('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    # Fix: use st_mtime (content modification time); st_ctime is metadata change time on POSIX.
    mod_time = dti.datetime.fromtimestamp(stat.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)  # fix: was misspelled sha612_hash
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.warning('- Ephemeral:')
    log.warning(f' + name: {target_path.name}')
    log.warning(f' + size: {size_bytes} bytes')
    log.warning(f' + date: {mod_time}')
    log.warning('- Characteristic:')
    log.warning(' + Checksums:')
    log.warning(f' sha512:{sha512_hash}')
    log.warning(f' sha256:{sha256_hash}')
    log.warning(f' sha1:{sha1_hash}')
    log.warning(f' md5:{md5_hash}')
    log.warning(' + Fonts:')
@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Yield the unified diff lines between two lists of strings (trailing whitespace stripped).

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    delta = difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label)
    for entry in delta:
        yield entry.rstrip()
@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    log.info(LOG_SEPARATOR)
    for chunk in unified_diff(left, right, left_label, right_label):
        # A diff entry may still contain embedded newlines - log each piece separately.
        for piece in chunk.split('\n'):
            log.info(piece)
    log.info(LOG_SEPARATOR)
@no_type_check
def ensure_separate_log_lines(sourcer: Callable, trampoline: Callable = log.info, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    trampoline(LOG_SEPARATOR)
    produced = sourcer(*args) if args else sourcer()
    for line in produced:
        for piece in line.split('\n'):
            trampoline(piece)
    trampoline(LOG_SEPARATOR)
@no_type_check
def log_subprocess_output(pipe, prefix: str) -> list[str]:
    """Stream lines from the pipe to the log, classified by severity, and return all of them."""
    collected = []
    for raw in iter(pipe.readline, b''):  # b'\n'-separated lines
        text = raw.decode(encoding=ENCODING).rstrip()
        entry = prefix + ': ' + text
        collected.append(entry)
        if HAS_ERROR.search(text):
            log.error(entry)
        elif HAS_WARNING.search(text) and not (
            'latex' in prefix
            and any(
                (
                    '"calc" is loaded -- this is not' in text,
                    'Package microtype Warning: Unable to apply patch' in text,
                    'Unknown document division name (startatroot)' in text,
                    'Unknown slot number of character' in text,
                )
            )
        ):
            # Known-noisy LaTeX warnings above are demoted out of the warning channel.
            log.warning(entry)
        elif IS_BORING.search(text):
            log.debug(entry)
        else:
            log.info(entry)

    return collected
@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False, is_quiet: bool = False) -> int:
    """Execute command in subprocess and follow requests.

    Hints on LaTeX noise reduction per special variables:

    - max_print_line=1000
    - error_line=254
    - half_error_line=238

    So, in texmf.cnf or in shell process, these reduce the amount of lines ...

    max_print_line=1000 error_line=254 half_error_line=238

    Returns the subprocess exit code, or 42 when spawning the process failed.
    """
    # Fix: log_buffer must exist even when Popen raises before any output was captured,
    # otherwise the quiet-mode replay below raises UnboundLocalError.
    log_buffer: list[str] = []
    try:
        if 'latex' in marker:
            # Widen the engine's line limits to reduce wrapped-log noise.
            env = dict(os.environ)
            env['max_print_line'] = '1000'
            env['error_line'] = '254'
            env['half_error_line'] = '238'
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell, env=env  # nosec B602
            )
        else:
            process = subprocess.Popen(
                command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
            )
        with process.stdout:
            log_buffer = log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}; (cf. below for hints)')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}; (cf. below for hints)')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}; (cf. below for hints)')
        code = 42

    if code != 0 and is_quiet:
        # Replay the buffered output as errors, filtering known-noisy LaTeX lines.
        for msg in log_buffer:
            if 'latex' in marker:
                payload = msg.replace(f'{marker}:', '').strip()
                if not payload:
                    continue
                if '(microtype)' in payload:
                    continue
                if 'Package microtype Warning: Unknown slot number of character' in payload:
                    continue
                if IS_BORING.search(payload):
                    continue
                if any(
                    (
                        '"calc" is loaded -- this is not' in payload,
                        'Package microtype Warning: Unable to apply patch' in payload,
                        'Unknown document division name (startatroot)' in payload,
                        'Unknown slot number of character' in payload,
                    )
                ):
                    continue
                cleansed = payload.replace('[', '').replace(']', '').replace('|', '')
                if not cleansed.strip():
                    continue
                if not payload.replace(')', ''):
                    continue
            log.error(msg)

    if code == 0 and is_quiet and marker in ('label-pdf', '', 'assess-pdf-fonts'):
        # Selected quiet runs still surface their output as warnings on success.
        for msg in log_buffer:
            log.warning(msg)

    return code
@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call = str(tool_context.get('command', '')).strip().split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        # Unknown tool key - refuse and hint at the known keys.
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    # Persist the pre-filter state so the transformation can be audited later.
    with open(backup, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines
@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """DRY.

    Load the structure file, validate the single expected target and requested
    facet, and return (success, aspect_map) for the facet.
    """
    # Fix: PathLike admits plain strings, but pathlib methods are called below - coerce first.
    structure_path = pathlib.Path(structure_path)
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        # Multiple targets are unexpected but tolerated - nothing to load though.
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        # Extra aspects beyond the required set are accepted with a debug note.
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())} instead')

    return True, aspect_map
@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns.

    Extract a map from 'loc/filename.format' tokens to captions for every
    mermaid code block found in the pandoc JSON AST at json_ast_path.
    """
    # Fix: close the file deterministically instead of leaking the handle.
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                # Fix: prior is the caption already stored, current is the incoming one (was swapped).
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Return generator that yields only the lines beyond the cut mark region skipping lines in [from, thru].

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
    >>> filtered
    ['a', 'd']
    """
    skipping = False
    for candidate in text_lines:
        if skipping:
            # Inside the cut region: swallow lines until (and including) the closing mark.
            if thru_cut in candidate:
                skipping = False
            continue
        if from_cut in candidate:
            skipping = True  # the opening mark line itself is also dropped
            continue
        yield candidate