Coverage for taksonomia/taksonomia.py: 87.54%

213 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 23:02:14 +00:00

1"""Taxonomy (Finnish: taksonomia) of a folder tree, guided by conventions. (implementation).""" 

2 

3import argparse 

4import base64 

5import datetime as dti 

6import hashlib 

7import lzma 

8import os 

9import pathlib 

10import sys 

11from typing import no_type_check 

12 

13import msgspec 

14import yaml 

15 

16import taksonomia.anglify as anglify 

17from taksonomia import ( 

18 APP_ALIAS, 

19 COMMA, 

20 ENCODING, 

21 KNOWN_FORMATS, 

22 KNOWN_KEY_FUNCTIONS, 

23 TS_FORMAT, 

24 VERSION_INFO, 

25 log, 

26 parse_csl, 

27) 

28from taksonomia.machine import Machine 

29 

# Number of bytes requested per read when streaming file content into a hasher.
CHUNK_SIZE = 2 << 15
# XML declaration line.
DOCTYPE = '<?xml version="1.0" encoding="UTF-8"?>'
# Hex digests of the empty byte string - initial values before any leaf contributed.
EMPTY_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
EMPTY_SHA512 = (
    'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce'
    '47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e'
)
# Insertion order matters: it defines the order of HASH_ALGO_PREFS below.
EMPTY = dict(
    sha512=EMPTY_SHA512,
    sha256=EMPTY_SHA256,
)
ENCODING_ERRORS_POLICY = 'ignore'
HASH_ALGO_PREFS = tuple(EMPTY.keys())
# Top level key of the taxonomy tree.
TAX = 'taxonomy'
XMLNS = 'https://pypi.org/project/taksonomia/api/v1'
XZ_EXT = '.xz'
# LZMA2 filter chain and integrity check handed to lzma.open when writing compressed output.
XZ_FILTERS = [dict(id=lzma.FILTER_LZMA2, preset=7 | lzma.PRESET_EXTREME)]
LZMA_KWARGS = dict(check=lzma.CHECK_SHA256, filters=XZ_FILTERS)

48 

49 

def elf_hash(some_bytes: bytes) -> int:
    """Return the ELF style hash of the given bytes as a non-negative integer.

    Per byte: shift the accumulator left by four bits, add the byte, fold the
    nibble in bits 28..31 (if any bit is set there) back into the low bits via
    XOR, and then clear that nibble.

    Note: Python integers are unbounded and no further truncation is applied
    here beyond clearing bits 28..31.
    """
    accumulator = 0
    for octet in some_bytes:
        accumulator = (accumulator << 4) + octet
        top_nibble = accumulator & 0xF0000000
        # A zero top nibble turns both operations into no-ops, so no branch is needed.
        accumulator = (accumulator ^ (top_nibble >> 24)) & ~top_nibble
    return accumulator

71 

72 

@no_type_check
class Taxonomy:
    """Collector of topological and size information on files in a tree.

    The collected data lives in ``self.tree`` below the single top level key TAX
    with the sections ``generator``, ``context``, ``summary``, ``branches``, and
    ``leaves``. Running hasher objects backing the aggregated hex digests are
    kept separately in ``self.shadow`` (hasher objects are not part of the
    reported tree).
    """

    def __init__(self, root: pathlib.Path, excludes: str, key_function: str = 'elf') -> None:
        """Construct a collector instance for root.

        Args:
            root: Root folder of the tree to assess.
            excludes: Comma separated partial strings; paths containing any of them are skipped.
            key_function: Name of the function hashing path strings into keys (case insensitive).

        Raises:
            ValueError: If the lower cased key function name is not in KNOWN_KEY_FUNCTIONS.
        """
        self.root = root
        self.excludes = sorted(part.strip() for part in excludes.split(COMMA) if part.strip())
        self.key_function = key_function.lower()
        if self.key_function not in KNOWN_KEY_FUNCTIONS:
            raise ValueError(f'key function {key_function} not in {KNOWN_KEY_FUNCTIONS}')

        self.perspective = str(pathlib.Path.cwd())
        self.closed = False
        self.hasher = {
            'sha512': hashlib.sha512,
            'sha256': hashlib.sha256,
        }
        self.pid = os.getpid()
        self.machine = Machine(str(self.root), self.pid)
        self.start_time = dti.datetime.now(tz=dti.timezone.utc)

        self.tree = {
            TAX: {
                'hash_algo_prefs': list(HASH_ALGO_PREFS),
                'key_function': self.key_function,
                'generator': {
                    'name': APP_ALIAS,
                    'version_info': list(VERSION_INFO),
                    'source': f'https://pypi.org/project/taksonomia/{".".join(VERSION_INFO[:3])}/',
                    'sbom': 'https://codes.dilettant.life/docs/taksonomia/third-party/',
                },
                'context': {
                    'start_ts': self.start_time.strftime(TS_FORMAT),
                    'end_ts': None,  # set by close()
                    'duration_secs': 0,  # set by close()
                    **self.machine.context(),
                    'pwd': self.perspective,
                    'tree_root': str(self.root),
                    'excludes': self.excludes,
                    'machine_perf': {
                        'pre': self.machine.perf(),
                        'post': None,  # set by close()
                    },
                },
                'summary': {
                    'hash_hexdigest': {algo: EMPTY[algo] for algo in HASH_ALGO_PREFS},
                    'count_branches': 0,
                    'count_leaves': 0,
                    'size_bytes': 0,
                },
                'branches': {},
                'leaves': {},
            }
        }
        # One running hasher per algorithm for the whole tree plus a per branch map filled by add_branch.
        self.shadow = {**{algo: self.hasher[algo]() for algo in HASH_ALGO_PREFS}, 'branches': {}}

    def ignore(self, path: pathlib.Path) -> bool:
        """Dry place for the filter hook (excludes).

        Returns True if any exclude partial is a substring of the path string
        (and thus always False when there are no excludes).
        """
        text = str(path)
        return any(exclude in text for exclude in self.excludes)

    def key(self, path_str: str) -> str:
        """Hashing function for the path keys.

        Dispatches on the configured key function: 'elf' yields the decimal
        string of the ELF hash, 'md5' the MD5 hex digest, and any other
        configured value the BLAKE2b hex digest of the encoded path string.
        """
        data = path_str.encode(ENCODING)
        if self.key_function == 'elf':
            return str(elf_hash(data))
        if self.key_function == 'md5':
            return hashlib.md5(data).hexdigest()  # nosec B324
        return hashlib.blake2b(data).hexdigest()

    def add_branch(self, path: pathlib.Path) -> None:
        """Add a folder (sub tree) entry.

        Registers the branch (unless excluded) with empty digests, increments the
        global branch count and the branch count of every already registered
        ancestor.
        """
        if self.ignore(path):
            return

        st = path.stat()
        branch = str(path)
        branch_key = self.key(branch)
        taxonomy = self.tree[TAX]
        taxonomy['branches'][branch_key] = {
            'path': branch,
            'hash_hexdigest': {algo: EMPTY[algo] for algo in HASH_ALGO_PREFS},
            'summary': {
                'count_branches': 1,
                'count_leaves': 0,
                'size_bytes': 0,
            },
            'mod_time': dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT),
        }
        self.shadow['branches'][branch_key] = {algo: self.hasher[algo]() for algo in HASH_ALGO_PREFS}
        taxonomy['summary']['count_branches'] += 1
        for parent in path.parents:
            parent_key = self.key(str(parent))
            if parent_key in taxonomy['branches']:
                taxonomy['branches'][parent_key]['summary']['count_branches'] += 1

    def hash_file(self, path: pathlib.Path, algo: str = 'sha512') -> str:
        """Return the hex digest of the data from file for the requested algorithm.

        The file is read in chunks of CHUNK_SIZE bytes.

        Raises:
            KeyError: If algo is not one of the supported hash algorithms.
        """
        if algo not in self.hasher:
            raise KeyError(f'Unsupported hash algorithm requested - {algo} is not in {HASH_ALGO_PREFS}')

        digest = self.hasher[algo]()
        with open(path, 'rb') as handle:
            while chunk := handle.read(CHUNK_SIZE):
                digest.update(chunk)
        return digest.hexdigest()

    def add_leaf(self, path: pathlib.Path) -> None:
        """Add a file (leaf) entry.

        Registers the leaf (unless excluded) with size, modification time, and
        content digests; then updates the global summary and the summaries of
        every already registered ancestor branch (counts, sizes, and digests).
        """
        if self.ignore(path):
            return

        st = path.stat()
        size_bytes = st.st_size
        mod_time = dti.datetime.fromtimestamp(st.st_mtime, tz=dti.timezone.utc).strftime(TS_FORMAT)
        leaf = str(path)
        leaf_key = self.key(leaf)
        digests = {algo: self.hash_file(path, algo) for algo in HASH_ALGO_PREFS}
        taxonomy = self.tree[TAX]
        taxonomy['leaves'][leaf_key] = {
            'path': leaf,
            'branch': self.key(str(path.parent)),
            'hash_hexdigest': digests,
            'size_bytes': size_bytes,
            'mod_time': mod_time,
        }

        hexdig = 'hash_hexdigest'
        summary = taxonomy['summary']
        # Aggregated digests are hashes over the sequence of leaf hex digests (not over file content).
        for algo in HASH_ALGO_PREFS:
            self.shadow[algo].update(digests[algo].encode(ENCODING))
            summary[hexdig][algo] = self.shadow[algo].hexdigest()
        summary['size_bytes'] += size_bytes
        summary['count_leaves'] += 1

        branches = taxonomy['branches']
        for parent in path.parents:
            bk = self.key(str(parent))
            if bk not in branches:
                continue
            branches[bk]['summary']['count_leaves'] += 1
            branches[bk]['summary']['size_bytes'] += size_bytes
            shadow_sum = self.shadow['branches'][bk]
            for algo in HASH_ALGO_PREFS:
                shadow_sum[algo].update(digests[algo].encode(ENCODING))
                branches[bk][hexdig][algo] = shadow_sum[algo].hexdigest()

    def close(self) -> None:
        """Create the post visitation machine context perf entry (if needed).

        Also records the end timestamp and the duration; subsequent calls are no-ops.
        """
        if self.closed:
            return
        context = self.tree[TAX]['context']
        context['machine_perf']['post'] = self.machine.perf()
        end_time = dti.datetime.now(tz=dti.timezone.utc)
        context['end_ts'] = end_time.strftime(TS_FORMAT)
        context['duration_secs'] = (end_time - self.start_time).total_seconds()
        self.closed = True

    @no_type_check
    def report(self):
        """Create the post visitation machine context perf entry (if needed) and report the taxonomy."""
        self.close()
        return self.tree

    def __repr__(self) -> str:
        """Express yourself (as formatted JSON text of the tree)."""
        return msgspec.json.format(msgspec.json.encode(self.tree)).decode()

    @no_type_check
    def json_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in json format to sink.

        If sink is sys.stdout the (optionally base64 encoded) JSON is printed and
        xz compression is ignored with a warning. Otherwise sink is used as the
        path stem and the suffix .json.xz, .json.b64, or .json is appended;
        xz compression takes precedence over base64 encoding.
        """
        self.close()
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as json output goes to std out')
            if base64_encode:
                # Encode for real (matching xml_to and yaml_to); printing the raw bytes object
                # would emit its repr (b'...') and not base64 data.
                print(base64.b64encode(msgspec.json.encode(self.tree)).decode(encoding=ENCODING))
                return
            print(self.__repr__())
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.json.xz'), 'wb', **LZMA_KWARGS) as handle:
                handle.write(msgspec.json.encode(self.tree))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.json.b64'), 'wb') as handle:
                handle.write(base64.b64encode(msgspec.json.encode(self.tree)))
        else:
            with open(pathlib.Path(f'{sink}.json'), 'wb') as handle:
                handle.write(msgspec.json.encode(self.tree))

    @no_type_check
    def xml_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in xml format to sink.

        If sink is sys.stdout the (optionally base64 encoded) XML is printed and
        xz compression is ignored with a warning. Otherwise sink is used as the
        path stem and the suffix .xml.xz, .xml.b64, or .xml is appended;
        xz compression takes precedence over base64 encoding.
        """
        self.close()
        xml_str = anglify.as_xml(self.tree)
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as xml output goes to std out')
            if base64_encode:
                print(base64.b64encode(xml_str.encode(encoding=ENCODING)).decode(encoding=ENCODING))
                return
            print(xml_str)
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.xml.xz'), 'w', **LZMA_KWARGS) as handle:
                handle.write(xml_str.encode(encoding=ENCODING, errors=ENCODING_ERRORS_POLICY))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.xml.b64'), 'wt', encoding=ENCODING) as handle:
                handle.write(base64.b64encode(xml_str.encode(encoding=ENCODING)).decode(encoding=ENCODING))
        else:
            with open(pathlib.Path(f'{sink}.xml'), 'wt', encoding=ENCODING) as handle:
                handle.write(xml_str)

    @no_type_check
    def yaml_to(self, sink: object, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Close the taxonomy collection and write tree in yaml format to sink.

        If sink is sys.stdout the (optionally base64 encoded) YAML is printed and
        xz compression is ignored with a warning. Otherwise sink is used as the
        path stem and the suffix .yml.xz, .yml.b64, or .yml is appended;
        xz compression takes precedence over base64 encoding.
        """
        self.close()
        if sink is sys.stdout:
            if xz_compress:
                log.warning('ignoring --xz-compress for now as yaml output goes to std out')
            if base64_encode:
                print(base64.b64encode(yaml.dump(self.tree).encode(encoding=ENCODING)).decode(encoding=ENCODING))
                return
            print(yaml.dump(self.tree))
            return

        if xz_compress:
            with lzma.open(pathlib.Path(f'{sink}.yml.xz'), 'w', **LZMA_KWARGS) as handle:
                handle.write(yaml.dump(self.tree).encode(encoding=ENCODING, errors=ENCODING_ERRORS_POLICY))
            return

        if base64_encode:
            with open(pathlib.Path(f'{sink}.yml.b64'), 'wt', encoding=ENCODING) as handle:
                handle.write(base64.b64encode(yaml.dump(self.tree).encode(encoding=ENCODING)).decode(encoding=ENCODING))
        else:
            with open(pathlib.Path(f'{sink}.yml'), 'wt', encoding=ENCODING) as handle:
                yaml.dump(self.tree, handle)

    @no_type_check
    def dump(self, sink: object, format_type: str, base64_encode: bool = False, xz_compress: bool = False) -> None:
        """Dump the assumed to be final taxonomy (tree) in json, xml, or yaml format.

        The format type is matched case insensitively; every known format other
        than json and xml is written as yaml.

        Raises:
            ValueError: If the lower cased format type is not in KNOWN_FORMATS.
        """
        fmt = format_type.lower()
        if fmt not in KNOWN_FORMATS:
            raise ValueError(f'requested format {format_type} for taxonomy dump not in {KNOWN_FORMATS}')

        if fmt == 'json':
            return self.json_to(sink, base64_encode, xz_compress)
        if fmt == 'xml':
            return self.xml_to(sink, base64_encode, xz_compress)
        return self.yaml_to(sink, base64_encode, xz_compress)

323 

324 

def parse():  # type: ignore
    """Placeholder for a parser that is not implemented; always yields NotImplemented."""
    return NotImplemented

327 

328 

def main(options: argparse.Namespace) -> int:
    """Visit the folder tree below root and yield the taxonomy.

    Reads tree_root, out_path, excludes, key_function, format_type_csl,
    base64_encode, and xz_compress from the options, collects all entries found
    below the tree root in sorted order, dumps the taxonomy once per requested
    format (in sorted order), and returns 0.
    """
    tree_root = pathlib.Path(options.tree_root)
    log.info(f'Assessing taxonomy of folder {tree_root}')
    channel = 'STDOUT' if options.out_path is sys.stdout else options.out_path
    log.info(f'Output channel is {channel}')

    requested_excludes = options.excludes.strip()
    if requested_excludes:
        exploded = tuple(requested_excludes.split(COMMA))
        plural = '' if len(exploded) == 1 else 's'
        log.info(f'Requested exclusion of ({", ".join(exploded)}) partial{plural}')
    if options.xz_compress:
        log.info('Requested xz compression (LZMA)')
    if options.base64_encode:
        log.info('Requested encoding (BASE64)')

    taxonomy = Taxonomy(tree_root, options.excludes, options.key_function)
    for path in sorted(tree_root.rglob('*')):
        if not path.is_dir():
            # Leaves are reported only after they have been added.
            taxonomy.add_leaf(path)
            log.info(f'Detected leaf {path}')
            continue
        log.info(f'Detected branch {path}')
        taxonomy.add_branch(path)

    dump_kwargs = {'base64_encode': options.base64_encode, 'xz_compress': options.xz_compress}
    for fmt in sorted(parse_csl(options.format_type_csl)):
        log.info(f'- Dumping taxonomy as {fmt} format')
        taxonomy.dump(sink=options.out_path, format_type=fmt, **dump_kwargs)

    context = taxonomy.tree['taxonomy']['context']  # type: ignore
    duration_secs = context['duration_secs']
    log.info(f'Assessed taxonomy of folder {tree_root} in {duration_secs} secs')

    return 0