Coverage for taksonomia/taksonomia.py: 87.54%
213 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 23:02:14 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 23:02:14 +00:00
1"""Taxonomy (Finnish: taksonomia) of a folder tree, guided by conventions. (implementation)."""
3import argparse
4import base64
5import datetime as dti
6import hashlib
7import lzma
8import os
9import pathlib
10import sys
11from typing import no_type_check
13import msgspec
14import yaml
16import taksonomia.anglify as anglify
17from taksonomia import (
18 APP_ALIAS,
19 COMMA,
20 ENCODING,
21 KNOWN_FORMATS,
22 KNOWN_KEY_FUNCTIONS,
23 TS_FORMAT,
24 VERSION_INFO,
25 log,
26 parse_csl,
27)
28from taksonomia.machine import Machine
# Number of bytes read per iteration when hashing file content (64 KiB).
CHUNK_SIZE = 1 << 16
DOCTYPE = '<?xml version="1.0" encoding="UTF-8"?>'

# Hex digests of the empty byte string; used as initial values for aggregate hashes.
EMPTY_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
EMPTY_SHA512 = ''.join(
    (
        'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce',
        '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e',
    )
)
# Insertion order matters: it defines the hash algorithm preference order below.
EMPTY = dict(sha512=EMPTY_SHA512, sha256=EMPTY_SHA256)

ENCODING_ERRORS_POLICY = 'ignore'
HASH_ALGO_PREFS = tuple(EMPTY.keys())
TAX = 'taxonomy'
XMLNS = 'https://pypi.org/project/taksonomia/api/v1'
XZ_EXT = '.xz'

# LZMA2 filter chain at preset 7 combined with the "extreme" flag.
XZ_FILTERS = [dict(id=lzma.FILTER_LZMA2, preset=lzma.PRESET_EXTREME | 7)]
# Keyword arguments shared by all lzma.open calls (SHA256 integrity check).
LZMA_KWARGS = dict(check=lzma.CHECK_SHA256, filters=XZ_FILTERS)
def elf_hash(some_bytes: bytes) -> int:
    """Return the ELF hash (Extremely Lossy Function) of a byte sequence.

    Python rendition of the classic C routine (also used in the ELF format):

        unsigned long ElfHash(const unsigned char *s) {
            unsigned long h = 0, high;
            while (*s) {
                h = (h << 4) + *s++;
                if (high = h & 0xF0000000)
                    h ^= high >> 24;
                h &= ~high;
            }
            return h;
        }

    Unlike the C loop, which stops at the first NUL byte, every byte of
    the input is folded into the result here.
    """
    digest = 0
    for octet in some_bytes:
        digest = (digest << 4) + octet
        top_nibble = digest & 0xF0000000
        # Fold the top nibble back into the low bits and clear it; when the
        # nibble is zero both operations leave the value unchanged.
        digest = (digest ^ (top_nibble >> 24)) & ~top_nibble
    return digest
@no_type_check
class Taxonomy:
    """Collector of topological and size information on files in a tree.

    Folders are registered as branches and all other paths as leaves. For every
    leaf the content digests (one per entry in HASH_ALGO_PREFS), size, and
    modification time are recorded. Aggregated counts, sizes, and digests are
    maintained for the complete tree and for every registered ancestor branch.
    """

    def __init__(self, root: pathlib.Path, excludes: str, key_function: str = 'elf') -> None:
        """Construct a collector instance for root.

        Args:
            root: Root folder of the tree to assess.
            excludes: Comma separated partial strings; paths containing any of them are ignored.
            key_function: Name of the function deriving keys from path strings (case-insensitive).

        Raises:
            ValueError: If the lower-cased key function name is not in KNOWN_KEY_FUNCTIONS.
        """
        self.root = root
        self.excludes = sorted(part.strip() for part in excludes.split(COMMA) if part.strip())
        self.key_function = key_function.lower()
        if self.key_function not in KNOWN_KEY_FUNCTIONS:
            raise ValueError(f'key function {key_function} not in {KNOWN_KEY_FUNCTIONS}')

        self.perspective = str(pathlib.Path.cwd())
        self.closed = False
        # Factories for fresh hash objects, keyed by algorithm name.
        self.hasher = {
            'sha512': hashlib.sha512,
            'sha256': hashlib.sha256,
        }
        self.pid = os.getpid()
        self.machine = Machine(str(self.root), self.pid)
        self.start_time = dti.datetime.now(tz=dti.timezone.utc)

        self.tree = {
            TAX: {
                'hash_algo_prefs': list(HASH_ALGO_PREFS),
                'key_function': self.key_function,
                'generator': {
                    'name': APP_ALIAS,
                    'version_info': list(VERSION_INFO),
                    'source': f'https://pypi.org/project/taksonomia/{".".join(VERSION_INFO[:3])}/',
                    'sbom': 'https://codes.dilettant.life/docs/taksonomia/third-party/',
                },
                'context': {
                    'start_ts': self.start_time.strftime(TS_FORMAT),
                    'end_ts': None,
                    'duration_secs': 0,
                    **self.machine.context(),
                    'pwd': self.perspective,
                    'tree_root': str(self.root),
                    'excludes': self.excludes,
                    'machine_perf': {
                        'pre': self.machine.perf(),
                        'post': None,
                    },
                },
                'summary': {
                    'hash_hexdigest': {algo: EMPTY[algo] for algo in HASH_ALGO_PREFS},
                    'count_branches': 0,
                    'count_leaves': 0,
                    'size_bytes': 0,
                },
                'branches': {},
                'leaves': {},
            }
        }
        # Running hash objects behind the published hex digests: one set for the
        # complete tree plus one set per registered branch (filled by add_branch).
        self.shadow = {algo: self.hasher[algo]() for algo in HASH_ALGO_PREFS}
        self.shadow['branches'] = {}

    def ignore(self, path: pathlib.Path) -> bool:
        """Return True if the string form of path contains any of the exclude partials."""
        text = str(path)
        # any() over an empty sequence is False, so no separate emptiness guard is needed.
        return any(exclude in text for exclude in self.excludes)

    def key(self, path_str: str) -> str:
        """Derive the key for a path string using the configured key function.

        Any key function other than 'elf' and 'md5' falls through to BLAKE2b.
        """
        payload = path_str.encode(ENCODING)
        if self.key_function == 'elf':
            return str(elf_hash(payload))
        if self.key_function == 'md5':
            return hashlib.md5(payload).hexdigest()  # nosec B324
        return hashlib.blake2b(payload).hexdigest()

    def add_branch(self, path: pathlib.Path) -> None:
        """Add a folder (sub tree) entry unless the path is excluded.

        Registers the branch with empty digests, bumps the tree level branch
        count and the branch count of every already registered ancestor.
        """
        if self.ignore(path):
            return

        st = path.stat()
        branch = str(path)
        branch_key = self.key(branch)
        branches = self.tree[TAX]['branches']
        branches[branch_key] = {
            'path': branch,
            'hash_hexdigest': {algo: EMPTY[algo] for algo in HASH_ALGO_PREFS},
            'summary': {
                'count_branches': 1,
                'count_leaves': 0,
                'size_bytes': 0,
            },
            'mod_time': dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT),
        }
        self.shadow['branches'][branch_key] = {algo: self.hasher[algo]() for algo in HASH_ALGO_PREFS}
        self.tree[TAX]['summary']['count_branches'] += 1
        for parent in path.parents:
            parent_key = self.key(str(parent))
            if parent_key in branches:
                branches[parent_key]['summary']['count_branches'] += 1

    def hash_file(self, path: pathlib.Path, algo: str = 'sha512') -> str:
        """Return the hex digest of the data from file for the requested algorithm.

        Args:
            path: File to read (in chunks of CHUNK_SIZE bytes).
            algo: Name of the hash algorithm; defaults to 'sha512'.

        Raises:
            KeyError: If no hasher is registered for algo.
        """
        if algo not in self.hasher:
            raise KeyError(f'Unsupported hash algorithm requested - {algo} is not in {HASH_ALGO_PREFS}')

        hasher = self.hasher[algo]()
        with open(path, 'rb') as handle:
            while chunk := handle.read(CHUNK_SIZE):
                hasher.update(chunk)
        return hasher.hexdigest()

    def _hash_file_multi(self, path: pathlib.Path) -> dict:
        """Return the hex digests for all preferred algorithms, reading the file only once."""
        hashers = {algo: self.hasher[algo]() for algo in HASH_ALGO_PREFS}
        with open(path, 'rb') as handle:
            while chunk := handle.read(CHUNK_SIZE):
                for hasher in hashers.values():
                    hasher.update(chunk)
        return {algo: hasher.hexdigest() for algo, hasher in hashers.items()}

    def add_leaf(self, path: pathlib.Path) -> None:
        """Add a file (leaf) entry unless the path is excluded.

        Records path, parent branch key, content digests, size, and modification
        time of the leaf; then updates counts, sizes, and aggregate digests of
        the complete tree and of every already registered ancestor branch.
        The aggregate digests are fed with the encoded hex digests of the leaves.
        """
        if self.ignore(path):
            return

        st = path.stat()
        size_bytes = st.st_size
        mod_time = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT)
        leaf = str(path)
        digests = self._hash_file_multi(path)
        self.tree[TAX]['leaves'][self.key(leaf)] = {
            'path': leaf,
            'branch': self.key(str(path.parent)),
            'hash_hexdigest': digests,
            'size_bytes': size_bytes,
            'mod_time': mod_time,
        }

        hexdig = 'hash_hexdigest'
        # Encode once; the same payloads feed the tree level and all branch level aggregates.
        encoded = {algo: digests[algo].encode(ENCODING) for algo in HASH_ALGO_PREFS}
        summary = self.tree[TAX]['summary']
        for algo in HASH_ALGO_PREFS:
            self.shadow[algo].update(encoded[algo])
            summary[hexdig][algo] = self.shadow[algo].hexdigest()
        summary['size_bytes'] += size_bytes
        summary['count_leaves'] += 1

        branches = self.tree[TAX]['branches']
        for parent in path.parents:
            bk = self.key(str(parent))
            if bk not in branches:
                continue
            branch = branches[bk]
            branch['summary']['count_leaves'] += 1
            branch['summary']['size_bytes'] += size_bytes
            shadow_sum = self.shadow['branches'][bk]
            for algo in HASH_ALGO_PREFS:
                shadow_sum[algo].update(encoded[algo])
                branch[hexdig][algo] = shadow_sum[algo].hexdigest()

    def close(self) -> None:
        """Create the post visitation machine context perf entry (if needed).

        Also records the end timestamp and the duration; subsequent calls are no-ops.
        """
        if self.closed:
            return
        context = self.tree[TAX]['context']
        context['machine_perf']['post'] = self.machine.perf()
        end_time = dti.datetime.now(tz=dti.timezone.utc)
        context['end_ts'] = end_time.strftime(TS_FORMAT)
        context['duration_secs'] = (end_time - self.start_time).total_seconds()
        self.closed = True

    @no_type_check
    def report(self):
        """Create the post visitation machine context perf entry (if needed) and report the taxonomy."""
        self.close()
        return self.tree

    def __repr__(self) -> str:
        """Express yourself (as formatted JSON text of the tree)."""
        return msgspec.json.format(msgspec.json.encode(self.tree)).decode()

    @no_type_check
    def json_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in json format to sink.

        Args:
            sink: Either sys.stdout or a path stem that receives a matching extension.
            base64_encode: Emit the BASE64 encoding of the JSON bytes instead of plain JSON.
            xz_compress: Write an xz compressed file; ignored (with a warning) for standard out
                and taking precedence over base64_encode for files.
        """
        self.close()
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as json output goes to std out')
            if base64_encode:
                # Encode for real and print text, not the repr of the raw JSON bytes.
                print(base64.b64encode(msgspec.json.encode(self.tree)).decode(encoding=ENCODING))
                return
            print(repr(self))
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.json.xz'), 'wb', **LZMA_KWARGS) as handle:
                handle.write(msgspec.json.encode(self.tree))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.json.b64'), 'wb') as handle:
                handle.write(base64.b64encode(msgspec.json.encode(self.tree)))
        else:
            with open(pathlib.Path(f'{sink}.json'), 'wb') as handle:
                handle.write(msgspec.json.encode(self.tree))

    @no_type_check
    def xml_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in xml format to sink.

        Args:
            sink: Either sys.stdout or a path stem that receives a matching extension.
            base64_encode: Emit the BASE64 encoding of the XML text instead of plain XML.
            xz_compress: Write an xz compressed file; ignored (with a warning) for standard out
                and taking precedence over base64_encode for files.
        """
        self.close()
        xml_str = anglify.as_xml(self.tree)
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as xml output goes to std out')
            if base64_encode:
                print(base64.b64encode(xml_str.encode(encoding=ENCODING)).decode(encoding=ENCODING))
                return
            print(xml_str)
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.xml.xz'), 'w', **LZMA_KWARGS) as handle:
                handle.write(xml_str.encode(encoding=ENCODING, errors=ENCODING_ERRORS_POLICY))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.xml.b64'), 'wt', encoding=ENCODING) as handle:
                handle.write(base64.b64encode(xml_str.encode(encoding=ENCODING)).decode(encoding=ENCODING))
        else:
            with open(pathlib.Path(f'{sink}.xml'), 'wt', encoding=ENCODING) as handle:
                handle.write(xml_str)

    @no_type_check
    def yaml_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in yaml format to sink.

        Args:
            sink: Either sys.stdout or a path stem that receives a matching extension.
            base64_encode: Emit the BASE64 encoding of the YAML text instead of plain YAML.
            xz_compress: Write an xz compressed file; ignored (with a warning) for standard out
                and taking precedence over base64_encode for files.
        """
        self.close()
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as yaml output goes to std out')
            if base64_encode:
                print(base64.b64encode(yaml.dump(self.tree).encode(encoding=ENCODING)).decode(encoding=ENCODING))
                return
            print(yaml.dump(self.tree))
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.yml.xz'), 'w', **LZMA_KWARGS) as handle:
                handle.write(yaml.dump(self.tree).encode(encoding=ENCODING, errors=ENCODING_ERRORS_POLICY))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.yml.b64'), 'wt', encoding=ENCODING) as handle:
                handle.write(base64.b64encode(yaml.dump(self.tree).encode(encoding=ENCODING)).decode(encoding=ENCODING))
        else:
            with open(pathlib.Path(f'{sink}.yml'), 'wt', encoding=ENCODING) as handle:
                yaml.dump(self.tree, handle)

    @no_type_check
    def dump(self, sink: object, format_type: str, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Dump the assumed to be final taxonomy (tree) in json, xml, or yaml format.

        Args:
            sink: Either sys.stdout or a path stem passed on to the format specific writer.
            format_type: Requested format name (case-insensitive); must be in KNOWN_FORMATS.
            base64_encode: Passed on to the format specific writer.
            xz_compress: Passed on to the format specific writer.

        Raises:
            ValueError: If the lower-cased format name is not in KNOWN_FORMATS.
        """
        fmt = format_type.lower()
        if fmt not in KNOWN_FORMATS:
            raise ValueError(f'requested format {format_type} for taxonomy dump not in {KNOWN_FORMATS}')

        if fmt == 'json':
            return self.json_to(sink, base64_encode, xz_compress)
        if fmt == 'xml':
            return self.xml_to(sink, base64_encode, xz_compress)
        # Every other known format is served by the yaml writer.
        return self.yaml_to(sink, base64_encode, xz_compress)
def parse():  # type: ignore
    """Placeholder without any parsing logic yet; returns the ``NotImplemented`` singleton."""
    return NotImplemented
def main(options: argparse.Namespace) -> int:
    """Visit the folder tree below root and yield the taxonomy.

    Reads tree_root, out_path, excludes, xz_compress, base64_encode, key_function,
    and format_type_csl from options; walks the tree in sorted path order adding
    folders as branches and everything else as leaves; then dumps the taxonomy
    once per requested format.

    Returns:
        0 after all requested dumps have been written.
    """
    tree_root = pathlib.Path(options.tree_root)
    log.info(f'Assessing taxonomy of folder {tree_root}')
    log.info(f'Output channel is {"STDOUT" if options.out_path is sys.stdout else options.out_path}')
    if options.excludes.strip():
        exploded = tuple(options.excludes.strip().split(COMMA))
        log.info(f'Requested exclusion of ({", ".join(exploded)}) partial{"" if len(exploded) == 1 else "s"}')
    if options.xz_compress:
        log.info('Requested xz compression (LZMA)')
    if options.base64_encode:
        log.info('Requested encoding (BASE64)')

    taxonomy = Taxonomy(tree_root, options.excludes, options.key_function)
    for path in sorted(tree_root.rglob('*')):
        if path.is_dir():
            log.info(f'Detected branch {path}')
            taxonomy.add_branch(path)
            continue
        # Log before processing (as for branches) so a failing leaf is identifiable in the log.
        log.info(f'Detected leaf {path}')
        taxonomy.add_leaf(path)

    for fmt in sorted(parse_csl(options.format_type_csl)):
        log.info(f'- Dumping taxonomy as {fmt} format')
        taxonomy.dump(
            sink=options.out_path,
            format_type=fmt,
            base64_encode=options.base64_encode,
            xz_compress=options.xz_compress,
        )

    # The dumps close the taxonomy; close explicitly (a no-op when already closed)
    # so the duration is also valid in case no format was requested.
    taxonomy.close()
    duration_secs = taxonomy.tree[TAX]['context']['duration_secs']  # type: ignore
    log.info(f'Assessed taxonomy of folder {tree_root} in {duration_secs} secs')

    return 0