Coverage for liitos/tools.py: 87.77% (241 statements)
coverage.py v7.6.8, created at 2024-11-25 15:36:16 +00:00

import datetime as dti
import difflib
import hashlib
import json
import pathlib
import platform
import re
import subprocess  # nosec B404
import uuid
from typing import Any, Callable, Generator, Union, no_type_check

import yaml

import foran.foran as api  # type: ignore
from foran.report import generate_report  # type: ignore
from taksonomia.taksonomia import Taxonomy  # type: ignore

from liitos import (
    CONTEXT,
    ENCODING,
    KEYS_REQUIRED,
    LATEX_PAYLOAD_NAME,
    TOOL_VERSION_COMMAND_MAP,
    ToolKey,
    log,
)

PathLike = Union[str, pathlib.Path]

DOC_BASE = pathlib.Path('..', '..')
STRUCTURE_PATH = DOC_BASE / 'structure.yml'
IMAGES_FOLDER = 'images/'
DIAGRAMS_FOLDER = 'diagrams/'
PATCH_SPEC_NAME = 'patch.yml'
CHUNK_SIZE = 2 << 15
TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f +00:00'
LOG_SEPARATOR = '- ' * 80
INTER_PROCESS_SYNC_SECS = 0.1
INTER_PROCESS_SYNC_ATTEMPTS = 10

IS_BORING = re.compile(r'\(.*texmf-dist/tex.*\.')

def hash_file(path: pathlib.Path, hasher: Union[Callable[..., Any], None] = None) -> str:
    """Return the hex digest of the file contents per the given hasher (default: hashlib.sha512)."""
    if hasher is None:
        hasher = hashlib.sha512
    the_hash = hasher()
    with open(path, 'rb') as handle:
        while chunk := handle.read(CHUNK_SIZE):
            the_hash.update(chunk)
    return the_hash.hexdigest()

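# A minimal usage sketch (the path is illustrative, not part of this module):
#
#     digest = hash_file(pathlib.Path('render/pdf/document.pdf'))
#     short_digest = hash_file(pathlib.Path('render/pdf/document.pdf'), hashlib.sha256)
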
@no_type_check
def log_subprocess_output(pipe, prefix: str):
    """Stream the pipe line by line into the log, demoting known TeX noise to debug level."""
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        cand = line.decode(encoding=ENCODING).rstrip()
        if IS_BORING.search(cand):
            log.debug(cand)
            continue
        if cand.strip().strip('[])yex'):
            if any(
                [
                    'microtype' in cand,
                    'xassoccnt' in cand,
                    'texlive/2022/texmf-dist/tex/' in cand,
                    cand == 'erns.sty)',
                    cand == '(see the transcript file for additional information)',
                    cand.startswith(r'Overfull \hbox ')
                    and cand.endswith(r'pt too wide) has occurred while \output is active'),
                ]
            ):
                log.debug('%s: %s', prefix, cand)
            else:
                log.info('%s: %s', prefix, cand)

@no_type_check
def vcs_probe():
    """Are we in front, on par, or behind with the upstream?"""
    CONTEXT['source_hash'] = 'info:plain:built-outside-of-version-control'
    CONTEXT['source_hint'] = 'info:plain:built-outside-of-version-control'
    try:
        repo = api.Repo('.', search_parent_directories=True)
        status = api.Status(repo)
        api.local_commits(repo, status)
        api.local_staged(repo, status)
        api.local_files(repo, status)
        CONTEXT['source_hash'] = f'sha1:{status.commit}'
        try:
            repo_root_folder = repo.git.rev_parse(show_toplevel=True)
            path = pathlib.Path(repo_root_folder)
            anchor = path.parent.name
            here = path.name
            CONTEXT['source_hint'] = f'{anchor}/{here}'
            yield f'Root ({repo_root_folder})'
        except Exception:  # noqa
            yield 'WARNING - ignored exception when assessing repo root folder location'
        for line in generate_report(status):
            yield line.rstrip()
    except Exception:  # noqa
        yield 'WARNING - we seem to not be within a git repository clone'

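# vcs_probe is a generator; a typical consumption pattern (sketch):
#
#     for line in vcs_probe():
#         log.info(line)
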
def node_id() -> str:
    """Generate the build node identifier."""
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, platform.node()))

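# uuid3 on the DNS namespace is deterministic per host name, so repeated calls
# on the same machine yield the same identifier (sketch):
#
#     assert node_id() == node_id()
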
def report_taxonomy(target_path: pathlib.Path) -> None:
    """Convenience function to report date, size, and checksums of the deliverable."""
    taxonomy = Taxonomy(target_path, excludes='', key_function='md5')
    for path in sorted(target_path.parent.rglob('*')):
        taxonomy.add_branch(path) if path.is_dir() else taxonomy.add_leaf(path)
    log.info('- Writing render/pdf folder taxonomy to inventory.json ...')
    taxonomy.dump(sink='inventory', format_type='json', base64_encode=False)

    stat = target_path.stat()
    size_bytes = stat.st_size
    mod_time = dti.datetime.fromtimestamp(stat.st_ctime, tz=dti.timezone.utc).strftime(TS_FORMAT)
    sha512_hash = hash_file(target_path, hashlib.sha512)
    sha256_hash = hash_file(target_path, hashlib.sha256)
    sha1_hash = hash_file(target_path, hashlib.sha1)
    md5_hash = hash_file(target_path, hashlib.md5)
    log.info('- Ephemeral:')
    log.info(f'  + name: {target_path.name}')
    log.info(f'  + size: {size_bytes} bytes')
    log.info(f'  + date: {mod_time}')
    log.info('- Characteristic:')
    log.info('  + Checksums:')
    log.info(f'    sha512:{sha512_hash}')
    log.info(f'    sha256:{sha256_hash}')
    log.info(f'    sha1:{sha1_hash}')
    log.info(f'    md5:{md5_hash}')
    log.info('  + Fonts:')

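# A plausible call site (sketch; the path is illustrative):
#
#     report_taxonomy(pathlib.Path('render/pdf/document.pdf'))
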
@no_type_check
def unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Derive the unified diff between left and right lists of strings as generator of strings."""
    for line in difflib.unified_diff(left, right, fromfile=left_label, tofile=right_label):
        yield line.rstrip()

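# Behavior sketch (inputs are illustrative):
#
#     list(unified_diff(['a', 'b'], ['a', 'c']))
#     # -> ['--- before', '+++ after', '@@ -1,2 +1,2 @@', ' a', '-b', '+c']
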
@no_type_check
def log_unified_diff(left: list[str], right: list[str], left_label: str = 'before', right_label: str = 'after'):
    """Bridge the unified diff into the log, one record per line."""
    log.info(LOG_SEPARATOR)
    for line in unified_diff(left, right, left_label, right_label):
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)

@no_type_check
def ensure_separate_log_lines(sourcer: Callable, *args: Union[list[object], None]):
    """Wrapping idiom breaking up any strings containing newlines."""
    log.info(LOG_SEPARATOR)
    for line in sourcer(*args) if args else sourcer():
        for fine in line.split('\n'):
            log.info(fine)
    log.info(LOG_SEPARATOR)

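# A plausible call site (sketch) feeding the vcs_probe generator through the
# newline-splitting wrapper:
#
#     ensure_separate_log_lines(vcs_probe)
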
@no_type_check
def delegate(command: list[str], marker: str, do_shell: bool = False) -> int:
    """Execute the command in a subprocess, bridge its output into the log, and return the exit code."""
    try:
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=do_shell  # nosec B602
        )
        with process.stdout:
            log_subprocess_output(process.stdout, marker)
        code = process.wait()
        if code < 0:
            log.error(f'{marker} process ({command}) was terminated by signal {-code}')
        elif code > 0:
            log.error(f'{marker} process ({command}) returned {code}')
        else:
            log.info(f'{marker} process succeeded')
    except Exception as err:
        log.error(f'failed executing tool with error: {err}')
        code = 42

    return code

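# A minimal delegation sketch (the command is illustrative):
#
#     code = delegate(['pandoc', '--version'], 'tool-version-of-pandoc')
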
@no_type_check
def report(on: ToolKey) -> int:
    """Execute the tool-specific version command."""
    tool_context = TOOL_VERSION_COMMAND_MAP.get(on, {})
    tool_version_call_text = str(tool_context.get('command', '')).strip()
    tool_version_call = tool_version_call_text.split()
    tool_reason_banner = str(tool_context.get('banner', 'No reason for the tool known')).strip()
    if not tool_version_call:
        log.warning(f'cowardly avoiding undefined call for tool key ({on})')
        log.info(f'- known tool keys are: ({", ".join(sorted(TOOL_VERSION_COMMAND_MAP))})')
        return 42

    log.info(LOG_SEPARATOR)
    log.info(f'requesting tool version information from environment per ({tool_version_call})')
    log.info(f'- {tool_reason_banner}')
    code = delegate(tool_version_call, f'tool-version-of-{on}')
    log.info(LOG_SEPARATOR)

    return code

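# Judging from the lookups above, entries in TOOL_VERSION_COMMAND_MAP follow
# this shape (sketch; the key and values here are assumptions):
#
#     TOOL_VERSION_COMMAND_MAP = {
#         'pandoc': {'command': 'pandoc --version', 'banner': 'converts markdown to LaTeX'},
#     }
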
@no_type_check
def execute_filter(
    the_filter: Callable,
    head: str,
    backup: str,
    label: str,
    text_lines: list[str],
    lookup: Union[dict[str, str], None] = None,
) -> list[str]:
    """Chain filter calls by storing in and out lines in files and return the resulting lines."""
    log.info(LOG_SEPARATOR)
    log.info(head)
    doc_before_caps_patch = backup
    with open(doc_before_caps_patch, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(text_lines))
    patched_lines = the_filter(text_lines, lookup=lookup)
    with open(LATEX_PAYLOAD_NAME, 'wt', encoding=ENCODING) as handle:
        handle.write('\n'.join(patched_lines))
    log.info(f'diff of the ({label}) filter result:')
    log_unified_diff(text_lines, patched_lines)

    return patched_lines

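# Any callable mapping lines to lines fits; a no-op filter shows the contract
# (sketch; the filter name and file arguments are illustrative):
#
#     def identity(lines, lookup=None):
#         return lines
#
#     patched = execute_filter(identity, 'no-op pass', 'doc-before-identity.tex.txt', 'identity', lines)
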
@no_type_check
def load_target(
    target_code: str, facet_code: str, structure_path: PathLike = STRUCTURE_PATH
) -> tuple[bool, dict[str, str]]:
    """Load the aspect map for the given target and facet from the structure file (DRY)."""
    structure_path = pathlib.Path(structure_path)  # the signature accepts str as well as Path
    if not structure_path.is_file() or not structure_path.stat().st_size:
        log.error(f'render failed to find non-empty structure file at {structure_path}')
        return False, {}

    with open(structure_path, 'rt', encoding=ENCODING) as handle:
        structure = yaml.safe_load(handle)

    targets = sorted(structure.keys())

    if not targets:
        log.error(f'structure at ({structure_path}) does not provide any targets')
        return False, {}

    if target_code not in targets:
        log.error(f'structure does not provide ({target_code})')
        return False, {}

    if len(targets) != 1:
        log.warning(f'unexpected count of targets ({len(targets)}) from ({targets})')
        return True, {}

    target = targets[0]
    facets = sorted(list(facet.keys())[0] for facet in structure[target])
    log.info(f'found single target ({target}) with facets ({facets})')

    if facet_code not in facets:
        log.error(f'structure does not provide facet ({facet_code}) for target ({target_code})')
        return False, {}

    aspect_map = {}
    for data in structure[target]:
        if facet_code in data:
            aspect_map = data[facet_code]
            break
    missing_keys = [key for key in KEYS_REQUIRED if key not in aspect_map]
    if missing_keys:
        log.error(
            f'structure does not provide all expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.error(f'- the found aspects: {sorted(aspect_map.keys())}')
        log.error(f'- missing aspects: {sorted(missing_keys)}')
        return False, {}

    if sorted(aspect_map.keys()) != sorted(KEYS_REQUIRED):
        log.debug(
            f'structure does not strictly provide the expected aspects {sorted(KEYS_REQUIRED)}'
            f' for target ({target_code}) and facet ({facet_code})'
        )
        log.debug(f'- found the following aspects instead: {sorted(aspect_map.keys())}')

    return True, aspect_map

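# A structure.yml satisfying this loader could look like the following
# (sketch; the target and facet names are illustrative, and the aspect keys
# are assumptions about what KEYS_REQUIRED contains):
#
#     abc:
#       - missing:
#           approvals: approvals.yml
#           bind: bind.txt
#           changes: changes.yml
#           meta: meta-missing.yml
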
@no_type_check
def mermaid_captions_from_json_ast(json_ast_path: Union[str, pathlib.Path]) -> dict[str, str]:
    """Map mermaid code block tokens (loc/filename.format) to captions (separation of concerns)."""
    with open(json_ast_path, 'rt', encoding=ENCODING) as handle:
        doc = json.load(handle)
    blocks = doc['blocks']
    mermaid_caption_map = {}
    for b in blocks:
        if b['t'] == 'CodeBlock' and b['c'][0]:
            try:
                is_mermaid = b['c'][0][1][0] == 'mermaid'
                atts = b['c'][0][2]
            except IndexError:
                continue

            if not is_mermaid:
                continue
            m_caption, m_filename, m_format, m_loc = '', '', '', ''
            for k, v in atts:
                if k == 'caption':
                    m_caption = v
                elif k == 'filename':
                    m_filename = v
                elif k == 'format':
                    m_format = v
                elif k == 'loc':
                    m_loc = v
                else:
                    pass
            token = f'{m_loc}/{m_filename}.{m_format}'  # noqa
            if token in mermaid_caption_map:
                log.warning('Duplicate token, same caption?')
                log.warning(f'- prior: {token} -> {mermaid_caption_map[token]}')
                log.warning(f'- current: {token} -> {m_caption}')
            mermaid_caption_map[token] = m_caption
    return mermaid_caption_map

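# The JSON AST consumed here is pandoc's (sketch; file names are illustrative):
#
#     # pandoc --from markdown --to json document.md --output ast.json
#     captions = mermaid_captions_from_json_ast('ast.json')
#     # e.g. {'diagrams/flow.svg': 'The flow of things'}
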
def remove_target_region_gen(text_lines: list[str], from_cut: str, thru_cut: str) -> Generator[str, None, None]:
    """Yield only the lines outside the cut-mark region, skipping every line from from_cut through thru_cut."""
    in_section = False
    for line in text_lines:
        if not in_section:
            if from_cut in line:
                in_section = True
                continue
        if in_section:
            if thru_cut in line:
                in_section = False
            continue
        yield line

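# Behavior sketch (the cut markers are illustrative):
#
#     lines = ['keep', '% CUT-START', 'drop', '% CUT-STOP', 'keep too']
#     list(remove_target_region_gen(lines, '% CUT-START', '% CUT-STOP'))
#     # -> ['keep', 'keep too']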