Coverage for navigaattori/structures.py: 85.02%

264 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 20:46:10 +00:00

1import copy 

2import logging 

3import pathlib 

4from typing import Union, no_type_check 

5 

6import yaml 

7 

8from navigaattori import ( 

9 DEFAULT_STRUCTURE_NAME, 

10 ENCODING, 

11 GUESSED_STRUCTURES_FOLDER_NAME, 

12 HUB_NAME, 

13 STRUCTURES_KEY, 

14 log, 

15 parse_csl, 

16) 

17from navigaattori.approvals import Approvals 

18from navigaattori.bind import Binder 

19from navigaattori.changes import Changes 

20from navigaattori.layout import Layout 

21from navigaattori.meta import Meta 

22 

23 

@no_type_check
class Structures:
    """Model for structures as top level information to navigate all target types.

    Constructing an instance runs the exploration pipeline against the given
    document root: verify the root folder, locate the structures file, derive
    the target types (read from file or guessed), screen and assess them, log
    the resulting tree, derive the overall state, and - in guess mode without
    an existing structures file - dump the proposals.

    A stage only runs while no earlier stage has set a non-zero state code.
    The outcome is available through `is_valid`, `code_details`, `container`,
    and `structures_map`.
    """

    # Bullet prefixes per nesting level used when logging the assessed tree.
    _TREE_BULLETS = (' + ', ' * ', ' - ', ' + ')

    @no_type_check
    def __init__(self, doc_root: Union[str, pathlib.Path], options: dict[str, bool]):
        """Explore the document root and assess the structures found.

        Args:
            doc_root: folder expected to hold the structures file.
            options: option map; the keys read here are `quiet`, `strict`,
                `verbose`, `guess`, and `excludes`. The map is also handed
                on to the per resource models.
        """
        self._options = options
        self.quiet: bool = self._options.get('quiet', False)
        self.strict: bool = self._options.get('strict', False)
        self.verbose: bool = self._options.get('verbose', False)
        self.guess: bool = self._options.get('guess', False)

        # Precedence is quiet > strict > verbose; note this adjusts the root logger.
        if self.quiet:
            logging.getLogger().setLevel(logging.ERROR)
        elif self.strict:
            logging.getLogger().setLevel(logging.DEBUG)
            log.debug('- set logging level to debug (strict mode)')
        elif self.verbose:
            logging.getLogger().setLevel(logging.INFO)
            log.debug('- set logging level to info (verbose mode - the default)')

        self.excludes_csl: str = self._options.get('excludes', '.git/,render/pdf/')
        self.excludes: tuple[str, ...] = tuple()
        if self.excludes_csl.strip():
            self.excludes = parse_csl(self.excludes_csl)
            excl_sin_plu = f'partial{"" if len(self.excludes) == 1 else "s"}'
            log.info(f'- will exclude ({", ".join(self.excludes)}) path {excl_sin_plu}')

        self.fs_root: pathlib.Path = pathlib.Path(doc_root)
        self.structures_path: pathlib.Path = self.fs_root / HUB_NAME
        self.has_structures_path = True
        self.structures = {}
        self.target_types = {}

        # Template copied for every facet found in a structure file.
        self.facet_block = {
            'approvals': '',  # resource pointer to fs
            'bind': '',  # resource pointer to fs
            'changes': '',  # resource pointer to fs
            'layout': '',  # resource pointer to fs
            'meta': '',  # resource pointer to fs
            'render': True,
            'formats': [],
            'options': {},
        }
        self.expected_facet_keys = list(self.facet_block)
        # The first four keys are approvals, bind, changes, and layout.
        # NOTE(review): 'meta' is marked as a resource pointer above but is not
        # part of this slice, so it is never checked during assessment - confirm
        # whether that is intended before widening the slice (doing so would
        # make a missing meta resource invalidate the target type).
        self.resource_keys = self.expected_facet_keys[:4]

        self.state_code = 0
        self.state_message = ''

        self.fs_root_is_dir()

        if not self.state_code:
            self.explore_structure_path()

        if not self.state_code:
            self.bootstrap_target_types()

        if not self.state_code:
            self.screen_target_types()

        if not self.state_code:
            self.assess_target_types()

            # Reporting and the verdict only follow a completed assessment, so
            # a specific failure message from an earlier stage is kept as is.
            self.log_assessed_tree()
            self.validate_on_screening_level()

            self.dump_guesses()

    def fs_root_is_dir(self) -> None:
        """Ensure we received a folder to bootstrap."""
        if self.fs_root.is_dir():
            return
        self.state_code = 1
        self.state_message = f'root ({self.fs_root}) is no directory'
        log.error(self.state_message)

    def explore_structure_path(self) -> None:
        """Try to read from the structures filesystem path.

        A missing or empty structures file is a failure unless guess mode is
        active; in guess mode it is only recorded and warned about.
        """
        if self.structures_path.is_file() and self.structures_path.stat().st_size:
            return

        self.state_message = f'structures file ({self.structures_path}) does not exist or is empty'
        if not self.guess:
            log.debug(self.state_message)
            log.info(
                '... you may want to try the --guess option to the explore command to bootstrap a structures file'
            )
            self.state_code = 1
            return

        self.has_structures_path = False
        log.warning(self.state_message)

    def bootstrap_target_types(self) -> None:
        """Fill in structures and target_types data as per mode (guess or not).

        Without usable structures data the target types are guessed (guess
        mode) or the state is set to failed (otherwise). With structures data
        the target types are always read from that data.
        """
        structures = {}
        if self.has_structures_path:
            with open(self.structures_path, 'rt', encoding=ENCODING) as handle:
                structures = yaml.safe_load(handle)

        if not self.guess:
            if not structures:
                self.state_message = f'structures information read from file ({self.structures_path}) is empty'
                log.error(self.state_message)
                self.state_code = 1
                return

            # YAML may yield a list or scalar at top level; only a mapping can carry the key.
            if not isinstance(structures, dict) or STRUCTURES_KEY not in structures:
                self.state_message = f'structures information is missing the ({STRUCTURES_KEY}) key'
                log.error(self.state_message)
                self.state_code = 1
                return

        if not structures:
            # Only reachable in guess mode, the other case returned above.
            self._guess_target_types()
        else:
            self._read_target_types(structures)

    def _guess_target_types(self) -> None:
        """Derive the target types from structure files found below the root folder."""
        log.info(f'guessing target types from recursive search for ({DEFAULT_STRUCTURE_NAME}) files ...')
        self.target_types = {}
        for path in self.fs_root.rglob('*'):
            text_path = str(path)
            if not text_path.endswith(DEFAULT_STRUCTURE_NAME):
                continue
            if any(partial in text_path for partial in self.excludes):
                continue
            t_type = path.parent.name
            self.target_types[t_type] = {
                'dir': self._relative_dir(self.fs_root, path.parent),
                'file': path.name,
                'structure': {},
                'valid': True,
            }
            log.info(f'- guessed target type ({t_type}) from path ({path})')

    def _read_target_types(self, structures) -> None:
        """Derive the target types from the data loaded from the structures file."""
        log.info(f'not guessing but reading target types from ({self.structures_path}) data instead ...')
        spanning_map = structures.get(STRUCTURES_KEY, {}) if isinstance(structures, dict) else None
        if not isinstance(spanning_map, dict) or not spanning_map:
            self.state_message = (
                f'the ({STRUCTURES_KEY}) key does not provide a map of target types to structure paths'
            )
            log.error(self.state_message)
            self.state_code = 1
            return

        self.target_types = {
            t: {
                'dir': str(pathlib.Path(sp).parent),
                'file': pathlib.Path(sp).name,
                'structure': {},
                'valid': True,
            }
            for t, sp in spanning_map.items()
        }

    @staticmethod
    def _relative_dir(root: pathlib.Path, folder: pathlib.Path) -> str:
        """Return folder relative to root with forward slashes ('' if both are the same)."""
        relative = folder.relative_to(root).as_posix()
        return '' if relative == '.' else relative

    @staticmethod
    def _structure_location(spec: dict) -> str:
        """Return the dir/file location of a target type spec without a leading slash."""
        return f'{spec["dir"]}/{spec["file"]}' if spec['dir'] else spec['file']

    @no_type_check
    def screen_target_types(self) -> None:
        """Ensure we have backing in the file system for all target types."""
        for target_type, spec in self.target_types.items():
            log.info(f'screening target type ({target_type}) ...')
            spec_path = self.fs_root / spec['dir'] / spec['file']
            if spec_path.is_file() and spec_path.stat().st_size:
                continue
            log.error(f'spec_path file ({spec_path}) for target type ({target_type}) does not exist or is empty')
            spec['valid'] = False

    @no_type_check
    def assess_target_types(self) -> None:
        """Assess and eventually fill in information from target types.

        Target types already marked invalid are skipped. For the others the
        structure file is loaded, every facet of every target is recorded, and
        the resources listed in `resource_keys` are checked; any failure marks
        the target type as invalid.
        """
        for target_type, spec in self.target_types.items():
            if not spec['valid']:
                log.info(f'skipping invalid target ({target_type})')
                continue

            log.info(f'assessing target type ({target_type}) ...')
            structure = self._load_structure(target_type, self.fs_root / spec['dir'] / spec['file'])
            if not structure:
                spec['valid'] = False
                continue

            for target, facet_container in structure.items():
                log.info(f'- assessing target ({target}) with target type ({target_type}) ...')
                spec['structure'][target] = {}
                for facet in facet_container:
                    for fk, fd in facet.items():
                        self._assess_facet(target_type, spec, target, fk, fd)

    @staticmethod
    def _load_structure(target_type, spec_path):
        """Load the structure file of a target type; log and return None if unreadable or empty."""
        try:
            with open(spec_path, 'rt', encoding=ENCODING) as handle:
                structure = yaml.safe_load(handle)
        except Exception as ex:  # deliberately broad: any read or parse problem only invalidates this type
            log.error(f'reading spec_path file ({spec_path}) for target type ({target_type}) errs with ({ex})')
            return None

        if not structure:
            log.error(
                f'structure information for target type ({target_type}) read from file ({spec_path}) is empty'
            )
            return None
        return structure

    def _assess_facet(self, target_type, spec, target, fk, fd) -> None:
        """Record one facet of a target and check the resources it points to."""
        block = copy.deepcopy(self.facet_block)
        spec['structure'][target][fk] = block
        # Keys missing in the facet data end up as None, replacing the template default.
        for efk in self.expected_facet_keys:
            block[efk] = fd.get(efk)

        for erfk in self.resource_keys:
            erv = block[erfk]
            resource_path = self.fs_root / spec['dir'] / erv if erv else None
            if resource_path is None or not resource_path.is_file():
                if erfk == 'layout':
                    # The layout resource is optional, so its absence is only reported.
                    log.info(
                        f' + optional ({erfk}) resource is ({erv}) for facet ({fk}) of target ({target})'  # noqa
                        f' with target type ({target_type}) - resource does not exist or is no file'
                    )
                else:
                    log.error(
                        f' + invalid ({erfk}) resource ({erv}) for facet ({fk}) of target ({target})'
                        f' with target type ({target_type}) - resource does not exist or is no file'
                    )
                    spec['valid'] = False
                continue

            if self._resource_fails(erfk, resource_path):
                spec['valid'] = False

    def _resource_fails(self, erfk, resource_path) -> bool:
        """Delegate the check of an existing resource; True if it reports a non-zero code."""
        assessors = {
            'approvals': ('approvals', self.assess_approvals),
            'bind': ('binder', self.assess_binder),
            'changes': ('changes', self.assess_changes),
            'layout': ('layout', self.assess_layout),
            'meta': ('meta', self.assess_meta),
        }
        entry = assessors.get(erfk)
        if entry is None:
            return False
        label, assess = entry
        log.info(f'assessing {label} ({resource_path}) yielding:')
        code, _details = assess(resource_path)
        return bool(code)

    def _assess_with(self, model_class, resource_path):
        """Build the model for a resource; return (0, container) if valid else its code details."""
        model = model_class(resource_path, options=self._options)
        if model.is_valid():
            return 0, model.container()

        return model.code_details()

    @no_type_check
    def assess_approvals(self, approvals_path: Union[str, pathlib.Path]):
        """Delegate the verification to an instance of the Approvals class."""
        return self._assess_with(Approvals, approvals_path)

    @no_type_check
    def assess_binder(self, binder_path: Union[str, pathlib.Path]):
        """Delegate the verification to an instance of the Binder class."""
        return self._assess_with(Binder, binder_path)

    @no_type_check
    def assess_changes(self, changes_path: Union[str, pathlib.Path]):
        """Delegate the verification to an instance of the Changes class."""
        return self._assess_with(Changes, changes_path)

    @no_type_check
    def assess_layout(self, layout_path: Union[str, pathlib.Path]):
        """Delegate the verification to an instance of the Layout class."""
        return self._assess_with(Layout, layout_path)

    @no_type_check
    def assess_meta(self, meta_top_path: Union[str, pathlib.Path]):
        """Delegate the verification to an instance of the Meta class."""
        return self._assess_with(Meta, meta_top_path)

    def log_assessed_tree(self) -> None:
        """Log out the tree we found."""
        for target_type, spec in self.target_types.items():
            log.info(f'reporting target type ({target_type}) ...')
            log.info(f'- {target_type=}:')
            self._log_aspects(spec)

    def _log_aspects(self, aspects, depth: int = 0) -> None:
        """Log a mapping level by level; the deepest bullet level prints its values as is."""
        bullet = self._TREE_BULLETS[depth]
        deepest = depth == len(self._TREE_BULLETS) - 1
        for key, aspect in aspects.items():
            if deepest or not isinstance(aspect, dict) or not aspect:
                log.info(f'{bullet}{key} -> {aspect}')
            else:
                log.info(f'{bullet}{key} =>')
                self._log_aspects(aspect, depth + 1)

    def validate_on_screening_level(self) -> None:
        """Let's wrap this up if any invalid target type is present."""
        if not self.target_types:
            self.state_message = 'target types are not present - invalid structures file?'
            log.error(self.state_message)
            self.state_code = 1
            return

        invalid = sorted(target_type for target_type, spec in self.target_types.items() if not spec['valid'])
        if invalid:
            target_sin_plu = 'target type' if len(invalid) == 1 else 'target types'
            be_sin_plu = 'is' if len(invalid) == 1 else 'are'
            self.state_message = f'specifications for {target_sin_plu} ({", ".join(invalid)}) {be_sin_plu} invalid'
            log.error(self.state_message)
            self.state_code = 1
            return

        self.state_message = 'structures appear to be valid (on file system screening level)'
        log.info(self.state_message)

    @no_type_check
    def dump_guesses(self) -> None:
        """In case we are in guess mode and there is no existing structures file - dump what we suggest."""
        if not self.guess or self.has_structures_path:
            return

        # The folder is created relative to the current working directory.
        guessing_path = pathlib.Path(GUESSED_STRUCTURES_FOLDER_NAME)
        guessing_path.mkdir(parents=True, exist_ok=True)

        log.info(f'dumping proposed global expanded file from guessing to ({guessing_path / "tree.yml"}) ...')
        with open(guessing_path / 'tree.yml', 'wt', encoding=ENCODING) as handle:
            yaml.dump(self.container(complete=True), handle)

        log.info(f'dumping proposed structures file from guessing to ({guessing_path / HUB_NAME}) ...')
        with open(guessing_path / HUB_NAME, 'wt', encoding=ENCODING) as handle:
            yaml.dump(self.structures_map(), handle)

    def is_valid(self) -> bool:
        """Is the model valid?"""
        return not self.state_code

    def code_details(self) -> tuple[int, str]:
        """Return an ordered pair of state code and message"""
        return self.state_code, self.state_message

    @no_type_check
    def container(self, complete=False):
        """Return either the complete assessed tree or only the target types map.

        The result is a deep copy; with `complete` set it is wrapped in a map
        under the structures key.
        """
        target_types = copy.deepcopy(self.target_types)
        return {STRUCTURES_KEY: target_types} if complete else target_types

    def structures_map(self) -> dict[str, dict[str, str]]:
        """Return the assessed content in the shape of a structures file.

        Every target type maps to its `dir/file` location; an empty dir yields
        the bare file name instead of a location starting with a slash.
        """
        return {STRUCTURES_KEY: {k: self._structure_location(v) for k, v in self.target_types.items()}}

377 """Return the assessed content in the shape of a structures file.""" 

378 return {STRUCTURES_KEY: {k: f'{v["dir"]}/{v["file"]}' for k, v in self.target_types.items()}}