Coverage for navigaattori/structures.py: 85.02%
264 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:46:10 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:46:10 +00:00
1import copy
2import logging
3import pathlib
4from typing import Union, no_type_check
6import yaml
8from navigaattori import (
9 DEFAULT_STRUCTURE_NAME,
10 ENCODING,
11 GUESSED_STRUCTURES_FOLDER_NAME,
12 HUB_NAME,
13 STRUCTURES_KEY,
14 log,
15 parse_csl,
16)
17from navigaattori.approvals import Approvals
18from navigaattori.bind import Binder
19from navigaattori.changes import Changes
20from navigaattori.layout import Layout
21from navigaattori.meta import Meta
24@no_type_check
25class Structures:
26 """Model for structures as top level information to navigate all target types."""
28 def fs_root_is_dir(self) -> None:
29 """Ensure we received a folder to bootstrap."""
30 if not self.fs_root.is_dir():
31 self.state_code = 1
32 self.state_message = f'root ({self.fs_root}) is no directory'
33 log.error(self.state_message)
35 def explore_structure_path(self) -> None:
36 """Try to read from the structures filesystem path."""
37 if not self.structures_path.is_file() or not self.structures_path.stat().st_size:
38 self.state_message = f'structures file ({self.structures_path}) does not exist or is empty'
39 if not self.guess:
40 log.debug(self.state_message)
41 log.info(
42 '... you may want to try the --guess option to the explore command to bootstrap a structures file'
43 )
44 self.state_code = 1
45 return
46 self.has_structures_path = False
47 log.warning(self.state_message)
49 def bootstrap_target_types(self) -> None:
50 """Fill in structures and target_types data as per mode (guess or not)."""
51 structures = {}
52 if self.has_structures_path:
53 with open(self.structures_path, 'rt', encoding=ENCODING) as handle:
54 structures = yaml.safe_load(handle)
56 if not structures and not self.guess:
57 self.state_message = f'structures information read from file ({self.structures_path}) is empty'
58 log.error(self.state_message)
59 self.state_code = 1
60 return
62 if structures and not self.guess and STRUCTURES_KEY not in structures:
63 self.state_message = f'structures information is missing the ({STRUCTURES_KEY}) key'
64 log.error(self.state_message)
65 self.state_code = 1
66 return
68 if not structures and self.guess:
69 log.info(f'guessing target types from recursive search for ({DEFAULT_STRUCTURE_NAME}) files ...')
70 self.target_types = {}
71 for path in self.fs_root.rglob('*'):
72 if str(path).endswith(DEFAULT_STRUCTURE_NAME):
73 if all((partial not in str(path) for partial in self.excludes)): 73 ↛ 71line 73 didn't jump to line 71, because the condition on line 73 was never false
74 t_type = path.parent.name
75 t_rel_dir = str(path.parent).split(f'{self.fs_root}', 1)[1].lstrip('/')
76 t_file = path.name
77 self.target_types[t_type] = {
78 'dir': t_rel_dir,
79 'file': t_file,
80 'structure': {},
81 'valid': True,
82 }
83 log.info(f'- guessed target type ({t_type}) from path ({path})')
84 else:
85 log.info(f'not guessing but reading target types from ({self.structures_path}) data instead ...')
86 spanning_map = structures.get(STRUCTURES_KEY, {})
87 if not isinstance(spanning_map, dict) or not spanning_map:
88 self.state_message = (
89 f'the ({STRUCTURES_KEY}) key does not provide a map of target types to structure paths'
90 )
91 log.error(self.state_message)
92 self.state_code = 1
93 return
95 self.target_types = {
96 t: {
97 'dir': str(pathlib.Path(sp).parent),
98 'file': pathlib.Path(sp).name,
99 'structure': {},
100 'valid': True,
101 }
102 for t, sp in spanning_map.items()
103 }
105 @no_type_check
106 def screen_target_types(self) -> None:
107 """Ensure we have backing in the file system for all target types."""
108 for target_type, spec in self.target_types.items():
109 log.info(f'screening target type ({target_type}) ...')
110 spec_path = self.fs_root / spec['dir'] / spec['file']
111 if not spec_path.is_file() or not spec_path.stat().st_size:
112 log.error(f'spec_path file ({spec_path}) for target type ({target_type}) does not exist or is empty')
113 self.target_types[target_type]['valid'] = False
115 @no_type_check
116 def assess_target_types(self) -> None:
117 """Assess and eventually fill in information from target types."""
118 for target_type, spec in self.target_types.items():
119 if not spec['valid']:
120 log.info(f'skipping invalid target ({target_type})')
121 continue
122 log.info(f'assessing target type ({target_type}) ...')
123 structure = {}
124 spec_path = self.fs_root / spec['dir'] / spec['file']
125 try:
126 with open(spec_path, 'rt', encoding=ENCODING) as handle:
127 structure = yaml.safe_load(handle)
128 if not structure: 128 ↛ 129line 128 didn't jump to line 129, because the condition on line 128 was never true
129 log.error(
130 f'structure information for target type ({target_type}) read from file ({spec_path}) is empty'
131 )
132 self.target_types[target_type]['valid'] = False
133 except Exception as ex:
134 log.error(f'reading spec_path file ({spec_path}) for target type ({target_type}) errs with ({ex})')
135 self.target_types[target_type]['valid'] = False
136 continue
138 if not structure: 138 ↛ 139line 138 didn't jump to line 139, because the condition on line 138 was never true
139 self.target_types[target_type]['valid'] = False
140 continue
142 for target, facet_container in structure.items():
143 log.info(f'- assessing target ({target}) with target type ({target_type}) ...')
144 self.target_types[target_type]['structure'][target] = {}
145 for facet in facet_container:
146 for fk, fd in facet.items():
147 self.target_types[target_type]['structure'][target][fk] = copy.deepcopy(self.facet_block)
148 for efk in self.expected_facet_keys:
149 self.target_types[target_type]['structure'][target][fk][efk] = fd.get(efk)
150 for erfk in self.resource_keys:
151 erv = self.target_types[target_type]['structure'][target][fk][erfk]
152 if not erv or not (self.fs_root / spec['dir'] / erv).is_file():
153 if erfk == 'layout':
154 log.info(
155 f' + optional ({erfk}) resource is ({erv}) for facet ({fk}) of target ({target})' # noqa
156 f' with target type ({target_type}) - resource does not exist or is no file'
157 )
158 else:
159 log.error(
160 f' + invalid ({erfk}) resource ({erv}) for facet ({fk}) of target ({target})'
161 f' with target type ({target_type}) - resource does not exist or is no file'
162 )
163 self.target_types[target_type]['valid'] = False
164 else:
165 if erfk == 'approvals':
166 approvals_path = self.fs_root / self.target_types[target_type]['dir'] / erv
167 log.info(f'assessing approvals ({approvals_path}) yielding:')
168 code, details = self.assess_approvals(approvals_path)
169 if code: 169 ↛ 170line 169 didn't jump to line 170, because the condition on line 169 was never true
170 self.target_types[target_type]['valid'] = False
171 elif erfk == 'bind':
172 binder_path = self.fs_root / self.target_types[target_type]['dir'] / erv
173 log.info(f'assessing binder ({binder_path}) yielding:')
174 code, details = self.assess_binder(binder_path)
175 if code: 175 ↛ 176line 175 didn't jump to line 176, because the condition on line 175 was never true
176 self.target_types[target_type]['valid'] = False
177 elif erfk == 'changes': 177 ↛ 183line 177 didn't jump to line 183, because the condition on line 177 was never false
178 changes_path = self.fs_root / self.target_types[target_type]['dir'] / erv
179 log.info(f'assessing changes ({changes_path}) yielding:')
180 code, details = self.assess_changes(changes_path)
181 if code: 181 ↛ 182line 181 didn't jump to line 182, because the condition on line 181 was never true
182 self.target_types[target_type]['valid'] = False
183 elif erfk == 'layout':
184 layout_path = self.fs_root / self.target_types[target_type]['dir'] / erv
185 log.info(f'assessing layout ({layout_path}) yielding:')
186 code, details = self.assess_layout(layout_path)
187 if code:
188 self.target_types[target_type]['valid'] = False
189 elif erfk == 'meta':
190 meta_top_path = self.fs_root / self.target_types[target_type]['dir'] / erv
191 log.info(f'assessing changes ({meta_top_path}) yielding:')
192 code, details = self.assess_meta(meta_top_path)
193 if code:
194 self.target_types[target_type]['valid'] = False
196 @no_type_check
197 def assess_approvals(self, approvals_path: Union[str, pathlib.Path]):
198 """Delegate the verification to an instance of the Approvals class."""
199 approvals = Approvals(approvals_path, options=self._options)
200 if approvals.is_valid(): 200 ↛ 203line 200 didn't jump to line 203, because the condition on line 200 was never false
201 return 0, approvals.container()
203 return approvals.code_details()
205 @no_type_check
206 def assess_binder(self, binder_path: Union[str, pathlib.Path]):
207 """Delegate the verification to an instance of the Binder class."""
208 binder = Binder(binder_path, options=self._options)
209 if binder.is_valid(): 209 ↛ 212line 209 didn't jump to line 212, because the condition on line 209 was never false
210 return 0, binder.container()
212 return binder.code_details()
214 @no_type_check
215 def assess_changes(self, changes_path: Union[str, pathlib.Path]):
216 """Delegate the verification to an instance of the Changes class."""
217 changes = Changes(changes_path, options=self._options)
218 if changes.is_valid(): 218 ↛ 221line 218 didn't jump to line 221, because the condition on line 218 was never false
219 return 0, changes.container()
221 return changes.code_details()
223 @no_type_check
224 def assess_layout(self, layout_path: Union[str, pathlib.Path]):
225 """Delegate the verification to an instance of the Layout class."""
226 layout = Layout(layout_path, options=self._options)
227 if layout.is_valid():
228 return 0, layout.container()
230 return layout.code_details()
232 @no_type_check
233 def assess_meta(self, meta_top_path: Union[str, pathlib.Path]):
234 """Delegate the verification to an instance of the Meta class."""
235 meta = Meta(meta_top_path, options=self._options)
236 if meta.is_valid():
237 return 0, meta.container()
239 return meta.code_details()
241 def log_assessed_tree(self) -> None:
242 """Log out the tree we found."""
243 for target_type, spec in self.target_types.items():
244 log.info(f'reporting target type ({target_type}) ...')
245 log.info(f'- {target_type=}:')
246 for key, aspect in spec.items():
247 if not isinstance(aspect, dict) or not aspect:
248 log.info(f' + {key} -> {aspect}')
249 else:
250 log.info(f' + {key} =>')
251 for this, that in aspect.items():
252 if not isinstance(that, dict) or not that: 252 ↛ 253line 252 didn't jump to line 253, because the condition on line 252 was never true
253 log.info(f' * {this} -> {that}')
254 else:
255 log.info(f' * {this} =>')
256 for k, v in that.items():
257 if not isinstance(v, dict) or not v: 257 ↛ 258line 257 didn't jump to line 258, because the condition on line 257 was never true
258 log.info(f' - {k} -> {v}')
259 else:
260 log.info(f' - {k} =>')
261 for kf, vf in v.items():
262 log.info(f' + {kf} -> {vf}')
264 def validate_on_screening_level(self) -> None:
265 """Let's wrap this up if any invalid target type is present."""
266 if not self.target_types:
267 self.state_message = 'target types are not present - invalid structures file?'
268 log.error(self.state_message)
269 self.state_code = 1
270 return
272 if any(not spec['valid'] for spec in self.target_types.values()):
273 invalid = sorted(target_type for target_type, spec in self.target_types.items() if not spec['valid'])
274 target_sin_plu = 'target type' if len(invalid) == 1 else 'target types'
275 be_sin_plu = 'is' if len(invalid) == 1 else 'are'
276 self.state_message = f'specifications for {target_sin_plu} ({", ".join(invalid)}) {be_sin_plu} invalid'
277 log.error(self.state_message)
278 self.state_code = 1
279 return
280 self.state_message = 'structures appear to be valid (on file system screening level)'
281 log.info(self.state_message)
283 @no_type_check
284 def dump_guesses(self) -> None:
285 """In case we are in guess mode and there is no existing structures file - dump what we suggest."""
286 if self.guess and not self.has_structures_path:
287 guessing_path = pathlib.Path(GUESSED_STRUCTURES_FOLDER_NAME)
288 guessing_path.mkdir(parents=True, exist_ok=True)
290 log.info(f'dumping proposed global expanded file from guessing to ({guessing_path / "tree.yml"}) ...')
291 with open(guessing_path / 'tree.yml', 'wt', encoding=ENCODING) as handle:
292 yaml.dump(self.container(complete=True), handle)
294 log.info(f'dumping proposed structures file from guessing to ({guessing_path / HUB_NAME}) ...')
295 with open(guessing_path / HUB_NAME, 'wt', encoding=ENCODING) as handle:
296 yaml.dump(self.structures_map(), handle)
298 @no_type_check
299 def __init__(self, doc_root: Union[str, pathlib.Path], options: dict[str, bool]):
300 self._options = options
301 self.quiet: bool = self._options.get('quiet', False)
302 self.strict: bool = self._options.get('strict', False)
303 self.verbose: bool = self._options.get('verbose', False)
304 self.guess: bool = self._options.get('guess', False)
306 if self.quiet: 306 ↛ 307line 306 didn't jump to line 307, because the condition on line 306 was never true
307 logging.getLogger().setLevel(logging.ERROR)
308 elif self.strict:
309 logging.getLogger().setLevel(logging.DEBUG)
310 log.debug('- set logging level to debug (strict mode)')
311 elif self.verbose:
312 logging.getLogger().setLevel(logging.INFO)
313 log.debug('- set logging level to info (verbose mode - the default)')
315 self.excludes_csl: str = self._options.get('excludes', '.git/,render/pdf/')
316 self.excludes: tuple[str, ...] = tuple()
317 if self.excludes_csl.strip():
318 self.excludes = parse_csl(self.excludes_csl)
319 excl_sin_plu = f'partial{"" if len(self.excludes) == 1 else "s"}'
320 log.info(f'- will exclude ({", ".join(self.excludes)}) path {excl_sin_plu}')
322 self.fs_root: pathlib.Path = pathlib.Path(doc_root)
323 self.structures_path: pathlib.Path = self.fs_root / HUB_NAME
324 self.has_structures_path = True
325 self.structures = {}
326 self.target_types = {}
328 self.facet_block = {
329 'approvals': '', # resource pointer to fs
330 'bind': '', # resource pointer to fs
331 'changes': '', # resource pointer to fs
332 'layout': '', # resource pointer to fs
333 'meta': '', # resource pointer to fs
334 'render': True,
335 'formats': [],
336 'options': {},
337 }
338 self.expected_facet_keys = list(self.facet_block)
339 self.resource_keys = self.expected_facet_keys[:4]
341 self.state_code = 0
342 self.state_message = ''
344 self.fs_root_is_dir()
346 if not self.state_code:
347 self.explore_structure_path()
349 if not self.state_code:
350 self.bootstrap_target_types()
352 if not self.state_code:
353 self.screen_target_types()
355 if not self.state_code:
356 self.assess_target_types()
358 self.log_assessed_tree()
359 self.validate_on_screening_level()
361 self.dump_guesses()
363 def is_valid(self) -> bool:
364 """Is the model valid?"""
365 return not self.state_code
367 def code_details(self) -> tuple[int, str]:
368 """Return an ordered pair of state code and message"""
369 return self.state_code, self.state_message
371 @no_type_check
372 def container(self, complete=False):
373 """Return either the complete assessed tree or only the target types map."""
374 return {STRUCTURES_KEY: copy.deepcopy(self.target_types)} if complete else copy.deepcopy(self.target_types)
376 def structures_map(self) -> dict[str, dict[str, str]]:
377 """Return the assessed content in the shape of a structures file."""
378 return {STRUCTURES_KEY: {k: f'{v["dir"]}/{v["file"]}' for k, v in self.target_types.items()}}