Coverage for liitos/tools.py: 89.66%
241 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-05 17:22:35 +00:00
1import datetime as dti
2import difflib
3import hashlib
4import json
5import pathlib
6import platform
7import re
8import subprocess # nosec B404
9import uuid
10from typing import Any, Callable, Generator, Union, no_type_check
12import yaml
14import foran.foran as api # type: ignore
15from foran.report import generate_report # type: ignore
16from taksonomia.taksonomia import Taxonomy # type: ignore
18from liitos import (
19 CONTEXT,
20 ENCODING,
21 KEYS_REQUIRED,
22 LATEX_PAYLOAD_NAME,
23 TOOL_VERSION_COMMAND_MAP,
24 ToolKey,
25 log,
26)
# Type alias for anything accepted where a filesystem path is expected.
PathLike = Union[str, pathlib.Path]

DOC_BASE = pathlib.Path('..', '..')  # document root resolved two levels above the render folder
STRUCTURE_PATH = DOC_BASE / 'structure.yml'  # default structure map consumed by load_target
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15  # 64 KiB read size used by hash_file
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'  # timestamp format for deliverable reports
LOG_SEPARATOR = '- ' * 80  # visual separator between log sections
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

# Matches chatter about TeX distribution files (texmf-dist) that is demoted to debug level.
IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')
def hash_file(path: PathLike, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the hex digest of the file content (SHA512 unless another hasher factory is given).

    Examples:

    >>> import pathlib, tempfile
    >>> empty_sha512 = (
    ...     'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    ...     '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
    ... )
    >>> with tempfile.NamedTemporaryFile() as handle:
    ...     empty_hash = hash_file(handle.name)
    >>> assert empty_hash == empty_sha512
    """
    digest = (hashlib.sha512 if hasher is None else hasher)()
    with open(path, 'rb') as source:
        # Feed the digest in fixed-size chunks so arbitrarily large files stay memory-bounded.
        for chunk in iter(lambda: source.read(CHUNK_SIZE), b''):
            digest.update(chunk)
    return digest.hexdigest()
@no_type_check
def log_subprocess_output(pipe, prefix: str):
    """Stream subprocess output lines to the log, demoting known-noisy TeX chatter to debug.

    Args:
        pipe: binary file-like object (e.g. a Popen stdout) yielding newline-terminated lines.
        prefix: marker prepended to every logged line to identify the producing process.
    """
    for raw in iter(pipe.readline, b''):  # b'\n'-separated lines until EOF
        cand = raw.decode(encoding=ENCODING).rstrip()
        if IS_BORING.search(cand):  # texmf-dist file chatter goes straight to debug
            log.debug(cand)
            continue
        if not cand.strip().strip('[])yex'):  # skip pure page-number/bracket noise
            continue
        is_known_noise = any(
            (
                'microtype' in cand,
                'xassoccnt' in cand,
                'texlive/2022/texmf-dist/tex/' in cand,
                cand == 'erns.sty)',
                cand == '(see the transcript file for additional information)',
                cand.startswith(r'Overfull \hbox ')
                and cand.endswith(r'pt too wide) has occurred while \output is active'),
            )
        )
        if is_known_noise:
            # fix: pure lazy %-formatting (original mixed an f-string with %s args)
            log.debug('%s: %s', prefix, cand)
        else:
            log.info('%s: %s', prefix, cand)
@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?

    Generator yielding human-readable repository status lines; as a side effect fills
    CONTEXT['source_hash'] and CONTEXT['source_hint'] (defaults mark an out-of-VCS build).
    """
    # Pessimistic defaults - overwritten below when a repository clone is detected.
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            # Hint is the last two path components: parent-folder/repo-folder.
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception:  # noqa
        yield 'WARNING - we seem to not be within a git repository clone'
def node_id() -> str:
    """Generate the build node identifier (UUIDv3 over the DNS namespace and the host name).

    Examples:

    >>> nid = node_id()
    >>> len(nid)
    36
    """
    host_name = platform.node()
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, host_name))
def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable.

    Builds a taxonomy inventory of the folder containing target_path, dumps it to
    inventory.json, and logs size, timestamp, and four checksums of the deliverable.
    """
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    # NOTE(review): st_ctime (change time on POSIX) is used for the 'date' field,
    # not st_mtime - confirm that is intended.
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)  # fix: local was misnamed sha612_hash
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f'  + name: {target_path.name}')
    log.info(f'  + size: {size_bytes} bytes')
    log.info(f'  + date: {mod_time}')
    log.info('- Characteristic:')
    log.info('  + Checksums:')
    log.info(f'    sha512:{sha512_hash}')
    log.info(f'    sha256:{sha256_hash}')
    log.info(f'      sha1:{sha1_hash}')
    log.info(f'       md5:{md5_hash}')
    log.info('  + Fonts:')
@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Yield the unified diff between the left and right string lists, one right-stripped line at a time.

    Examples:

    >>> lines = list(unified_diff(['a', 'b'], ['aa', 'b', 'd'], '-', '+'))
    >>> lines
    ['--- -', '+++ +', '@@ -1,2 +1,3 @@', '-a', '+aa', ' b', '+d']
    """
    produced = difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label)
    yield from (entry.rstrip() for entry in produced)
@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Do the log bridging of the diff."""
    log.info(LOG_SEPARATOR)
    diff_entries = unified_diff(left, right, left_label, right_label)
    for entry in diff_entries:
        # Split defensively - a diff entry should not contain newlines, but one log call per line.
        for part in entry.split('\n'):
            log.info(part)
    log.info(LOG_SEPARATOR)
@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    log.info(LOG_SEPARATOR)
    produced = sourcer(*args) if args else sourcer()
    for chunk in produced:
        for single in chunk.split('\n'):
            log.info(single)
    log.info(LOG_SEPARATOR)
@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute command in subprocess and follow requests."""
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        # Drain and log stdout/stderr before waiting so the pipe cannot fill up.
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code == 0:
            log.info(f'{marker} process succeeded')
        elif code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        else:
            log.error(f'{marker} process ({command}) returned {code}')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42  # sentinel return code for "could not even execute"

    return code
@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call = str(tool_context.get('command', '')).strip().split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        # Unknown key: refuse with the sentinel code rather than invoking an empty command.
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code
@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines.

    Writes the incoming lines to the backup path, applies the_filter, writes the
    patched lines to LATEX_PAYLOAD_NAME, logs a unified diff, and returns the result.
    """
    log.info(LOG_SEPARATOR)
    log.info(head)
    # Persist the pre-filter state so a failed filter step can be inspected/rolled back.
    doc_before_caps_patch = backup
    with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines
@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """DRY.

    Load the structure file and resolve the aspect map for (target_code, facet_code).

    Returns:
        (True, aspect_map) on success; (False, {}) on any validation failure;
        (True, {}) when the structure declares more than one target (only warned about).
    """
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        # NOTE(review): multiple targets report success with an empty map - confirm intent.
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    # Each facet entry is a single-key mapping; collect the facet names.
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    # Extra aspects beyond the required set are tolerated - only logged at debug level.
    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())} instead')

    return True, aspect_map
@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Separation of concerns.

    Scan a pandoc JSON AST for mermaid code blocks and map their
    '{loc}/{filename}.{format}' tokens to the caption attribute values.
    """
    # fix: close the AST file deterministically (original leaked the open() handle)
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass  # unknown attribute keys are ignored
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                # fix: prior is the stored value, current the newly parsed one (labels were swapped)
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map
def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Yield only the lines outside the cut mark region, dropping every line in [from, thru].

    Matching is by substring containment; the marker lines themselves are dropped too.

    Examples:

    >>> lines = ['a', 'b', 'c', 'd']
    >>> list(remove_target_region_gen(lines, 'b', 'c'))
    ['a', 'd']
    """
    skipping = False
    for candidate in text_lines:
        if not skipping and from_cut in candidate:
            skipping = True  # entering the cut region; drop this marker line
            continue
        if skipping:
            if thru_cut in candidate:
                skipping = False  # closing marker also dropped
            continue
        yield candidate