Coverage for liitos/tools.py: 88.89%
246 statements
« prev ^ index » next — coverage.py v7.9.1, created at 2025-06-28 20:14:46 +00:00
1import datetime as dti
2import difflib
3import hashlib
4import json
5import pathlib
6import platform
7import re
8import subprocess # nosec B404
9import uuid
10from typing import Any, Callable, Generator, Union, no_type_check
12import yaml
14import foran.foran as api # type: ignore
15from foran.report import generate_report # type: ignore
16from taksonomia.taksonomia import Taxonomy # type: ignore
18from liitos import (
19 CONTEXT,
20 ENCODING,
21 KEYS_REQUIRED,
22 LATEX_PAYLOAD_NAME,
23 TOOL_VERSION_COMMAND_MAP,
24 ToolKey,
25 log,
26)
# Anything accepted where a filesystem path is expected
PathLike = Union[str, pathlib.Path]

# Builds are expected to run two directory levels below the document root
DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
# 64 KiB read granularity for file hashing (2 << 15 == 65536)
CHUNK_SIZE = 2 << 15
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

# Patterns classifying tool (LaTeX) output lines for log level routing
IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
HAS_WARNING = re.compile(r'[Ww]arning')
HAS_ERROR = re.compile(r'[Ee]rror')
def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the SHA512 hex digest of the data from file.

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    algorithm = hashlib.sha512 if hasher is None else hasher
    digest = algorithm()
    with open(path, 'rb') as source:
        # read in fixed-size chunks so arbitrarily large files stay memory friendly
        for chunk in iter(lambda: source.read(CHUNK_SIZE), b''):
            digest.update(chunk)
    return digest.hexdigest()
@no_type_check
def log_subprocess_output(pipe, prefix: str):
    """Route subprocess output lines to the logger by severity.

    Reads b''-terminated lines from the binary pipe, decodes them with the
    project encoding, and forwards each line prefixed with the marker:
    error pattern -> error, warning pattern (minus known harmless noise)
    -> warning, TeX distribution chatter -> debug, everything else -> info.
    """
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        if HAS_ERROR.search(cand):
            log.error(prefix + ': ' + cand)
            continue
        # Known-harmless upstream warnings are not escalated to warning level
        if HAS_WARNING.search(cand) and not any(
            (
                '"calc" is loaded -- this is not' in cand,
                'Package microtype Warning: Unable to apply patch' in cand,
                'Unknown document division name (startatroot)' in cand,
                'Unknown slot number of character' in cand,
            )
        ):
            log.warning(prefix + ': ' + cand)
            continue
        # Lines referencing the texmf-dist tree are routine noise
        if IS_BORING.search(cand):
            log.debug(prefix + ': ' + cand)
            continue
        log.info(prefix + ': ' + cand)
@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?

    Generator of human-readable report lines. As a side effect updates
    CONTEXT['source_hash'] and CONTEXT['source_hint']; both keep their
    plain-build fallback values when no git repository clone is found.
    """
    # Pessimistic defaults - overwritten below when a repository is detected
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            # Derive a parent-folder/repo-folder hint from the repo root path
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception: # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception: # noqa
        yield 'WARNING - we seem to not be within a git repository clone'
def node_id() -> str:
    """Generate the build node identifier.

    Examples:

    >>> nid = node_id()
    >>> assert len(nid) == 36
    >>> assert all(c == '-' for c in (nid[8], nid[13], nid[18], nid[23]))
    """
    # Deterministic per host: name-based UUID (MD5) over the network node name
    host_name = platform.node()
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, host_name))
def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for entry in sorted(target_path.parent.rglob('*')):
        if entry.is_dir():
            taxonomy.add_branch(entry)
        else:
            taxonomy.add_leaf(entry)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    # Digests of the deliverable itself (strongest first)
    sha512_hash = hash_file(target_path, hashlib.sha512)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f' + name: {target_path.name}')
    log.info(f' + size: {size_bytes} bytes')
    log.info(f' + date: {mod_time}')
    log.info('- Characteristic:')
    log.info(' + Checksums:')
    log.info(f' sha512:{sha512_hash}')
    log.info(f' sha256:{sha256_hash}')
    log.info(f' sha1:{sha1_hash}')
    log.info(f' md5:{md5_hash}')
    log.info(' + Fonts:')
@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as generator of strings.

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    raw = difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label)
    yield from (entry.rstrip() for entry in raw)
@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    log.info(LOG_SEPARATOR)
    # Split multi-line diff entries so every log record is a single line
    for entry in unified_diff(left, right, left_label, right_label):
        for piece in entry.split('\n'):
            log.info(piece)
    log.info(LOG_SEPARATOR)
@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    log.info(LOG_SEPARATOR)
    produced = sourcer(*args) if args else sourcer()
    for entry in produced:
        # guarantee one log record per physical line
        for piece in entry.split('\n'):
            log.info(piece)
    log.info(LOG_SEPARATOR)
@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute command in subprocess and follow requests.

    Streams the combined stdout/stderr of the child through
    log_subprocess_output (tagged with marker) and returns the exit code;
    42 signals that spawning the process itself failed.
    """
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            # POSIX: negative return code encodes the terminating signal
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42

    return code
@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call = str(tool_context.get('command', '')).strip().split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        # Unknown tool key - refuse instead of running an empty command
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    # Persist the pre-filter state as a backup before transforming
    with open(backup, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    # The filtered result becomes the new LaTeX payload
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines
@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """DRY.

    Load the structure file and resolve the aspect map for the given target
    and facet codes. Returns (ok, aspect_map); ok is False when the structure
    file is missing/empty or does not provide the requested target/facet, or
    when required aspect keys are absent.
    """
    # Honor the declared PathLike contract - callers may pass a plain str
    structure_path = pathlib.Path(structure_path)
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        # Multiple targets are unexpected but tolerated - signal success without data
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        # Extra keys beyond the required set are allowed - log at debug only
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())} instead')

    return True, aspect_map
@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns.

    Scan the pandoc JSON AST for mermaid code blocks and map each block's
    'loc/filename.format' token to its caption attribute.
    """
    # Context manager instead of a bare open() so the handle is always closed
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                # fix: the prior caption is the one already stored in the map,
                # the current one is the caption just parsed (was swapped before)
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Return generator that yields only the lines beyond the cut mark region skipping lines in [from, thru].

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> filtered = list(remove_target_region_gen(lines, 'b', 'c'))
    >>> filtered
    ['a', 'd']
    """
    skipping = False
    for candidate in text_lines:
        if skipping:
            # still inside the cut region; the thru marker line is dropped too
            if thru_cut in candidate:
                skipping = False
            continue
        if from_cut in candidate:
            # the from marker line itself is also dropped
            skipping = True
            continue
        yield candidate