17.98%

194 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-25 15:36:16 +00:00

1"""Gather the structure and discover the content.""" 

2 

3import json 

4import os 

5import pathlib 

6from typing import Dict, List, Set, Tuple, Union 

7 

8import yaml 

9 

10from liitos import ( 

11 DEFAULT_STRUCTURE_NAME, 

12 KEY_APPROVALS, 

13 KEY_BIND, 

14 KEY_CHANGES, 

15 KEY_LAYOUT, 

16 KEY_META, 

17 KEYS_REQUIRED, 

18 ENCODING, 

19 log, 

20) 

21 

# Anything accepted where a filesystem location is expected.
PathLike = Union[str, pathlib.Path]

# Shapes exchanged between the functions of this module.
Approvals = Dict[str, Union[List[str], List[List[str]]]]
# Nested lookup as used by the loaders: assets[target][facet][key] -> link.
Assets = Dict[str, Dict[str, Dict[str, str]]]
# Non-blank, stripped lines of a binder file.
Binder = List[str]
Changes = Dict[str, Union[List[str], List[List[str]]]]
Layout = Dict[str, str]
Meta = Dict[str, str]
# Parsed structure file: target -> list of {facet: assets} mappings.
Structure = Dict[str, List[Dict[str, str]]]
Targets = Set[str]
Facets = Dict[str, Targets]
Payload = Union[Approvals, Binder, Changes, Meta]
# Predicate plus message (the message is empty on success).
Verification = Tuple[bool, str]

35 

36 

def load_structure(path: PathLike = DEFAULT_STRUCTURE_NAME) -> Structure:
    """Load the structure information and content links from the YAML file per convention.

    The file at path is opened as text using ENCODING and parsed with
    yaml.safe_load. The parsed document is returned as is; no validation of
    its shape happens here.
    """
    with open(path, 'rt', encoding=ENCODING) as handle:
        return yaml.safe_load(handle)  # type: ignore

41 

42 

def targets(structure: Structure) -> Targets:
    """Extract the targets from the given structure information item.

    The targets are the top-level keys of the structure mapping.
    """
    return set(structure)

46 

47 

def facets(structure: Structure) -> Facets:
    """Extract the facets per target from the given structure information item.

    Every target maps to the set of keys found across all mappings in its list.
    """
    return {
        target: {facet for facet_data in content for facet in facet_data}
        for target, content in structure.items()
    }

51 

52 

def assets(structure: Structure) -> Assets:
    """Map the assets to facets of targets.

    The list of single facet mappings per target is flattened into one
    mapping per target; when a facet occurs more than once for a target,
    the last occurrence wins.
    """
    asset_map = {}
    for target, content in structure.items():
        per_facet = {}
        for facet_data in content:
            per_facet.update(facet_data)
        asset_map[target] = per_facet
    return asset_map  # type: ignore

56 

57 

def verify_target(name: str, target_set: Targets) -> Verification:
    """Verify presence of target yielding predicate and message (in case of failure).

    On failure the message lists the known targets in sorted order.
    """
    if name in target_set:
        return True, ''
    return False, f'target ({name}) not in {sorted(target_set)}'

61 

62 

def verify_facet(name: str, target: str, facet_map: Facets) -> Verification:
    """Verify presence of facet for target yielding predicate and message (in case of failure).

    Raises KeyError when target is not a key of facet_map, because the
    facets of the target are looked up by subscription.
    """
    known_facets = facet_map[target]
    if name in known_facets:
        return True, ''
    return False, f'facet ({name}) of target ({target}) not in {sorted(known_facets)}'

68 

69 

def error_context(
    payload: Payload,
    label: str,
    facet: str,
    target: str,
    path: PathLike,
    err: Union[FileNotFoundError, KeyError, ValueError],
) -> Tuple[Payload, str]:
    """Provide harmonized context for the error situation as per parameters.

    Returns a pair of fallback payload and message selected by the type of err:

    - FileNotFoundError: the given payload and a message naming the path
    - KeyError: an empty list and a message that the label is not in the assets
    - ValueError: an empty list and a message that json or yaml is required

    Note that only the FileNotFoundError branch hands back the given payload;
    the other two branches always answer with an empty list.

    Raises NotImplementedError for any other kind of error.
    """
    location = f'facet ({facet}) of target ({target})'
    if isinstance(err, FileNotFoundError):
        return payload, f'{label} link not found at ({path}) or invalid for {location}'
    if isinstance(err, KeyError):
        return [], f'{label} not found in assets for {location}'
    if isinstance(err, ValueError):
        return [], f'{label} requires json or yaml format in assets for {location}'
    raise NotImplementedError(f'error context not implemented for error ({err})')

86 

87 

def load_binder(facet: str, target: str, path: PathLike) -> Tuple[Binder, str]:
    """Yield the binder for facet of target from path and message (in case of failure).

    The binder is the list of stripped, non-blank lines of the file at path.
    A missing file yields the fallback payload and message from error_context.
    """
    try:
        with open(path, 'rt', encoding=ENCODING) as handle:
            # Strip every line exactly once and drop the ones that end up empty.
            return [entry for entry in map(str.strip, handle) if entry], ''
    except FileNotFoundError as err:
        return error_context([], 'Binder', facet, target, path, err)  # type: ignore

95 

96 

def binder(facet: str, target: str, asset_struct: Assets) -> Tuple[Binder, str]:
    """Yield the binder for facet of target from link in assets and message (in case of failure).

    The link is looked up under KEY_BIND for the facet of the target; a
    missing target, facet, or key yields the KeyError context instead.
    """
    try:
        path = pathlib.Path(asset_struct[target][facet][KEY_BIND])
    except KeyError as err:
        return error_context([], 'Binder', facet, target, '', err)  # type: ignore
    return load_binder(facet, target, path)

104 

105 

def load_layout(facet: str, target: str, path: PathLike) -> Tuple[Layout, str]:
    """Yield the layout for facet of target from path and message (in case of failure).

    The file at path is parsed with yaml.safe_load. A missing file yields the
    fallback payload and message from error_context.
    """
    try:
        with open(path, 'rt', encoding=ENCODING) as handle:
            return yaml.safe_load(handle), ''
    except FileNotFoundError as err:
        # Label the failure as Layout so the message names the asset that is actually missing.
        return error_context({}, 'Layout', facet, target, path, err)  # type: ignore

113 

114 

def layout(facet: str, target: str, asset_struct: Assets) -> Tuple[Layout, str]:
    """Yield the layout for facet of target from link in assets and message (in case of failure).

    The link is looked up under KEY_LAYOUT for the facet of the target; a
    missing target, facet, or key yields the KeyError context instead.
    """
    try:
        path = pathlib.Path(asset_struct[target][facet][KEY_LAYOUT])
    except KeyError as err:
        return error_context({}, 'Layout', facet, target, '', err)  # type: ignore
    return load_layout(facet, target, path)

122 

123 

def load_meta(facet: str, target: str, path: PathLike) -> Tuple[Meta, str]:
    """Yield the metadata for facet of target from path and message (in case of failure).

    The file at path is parsed with yaml.safe_load. A missing file yields the
    fallback payload and message from error_context.
    """
    try:
        with open(path, 'rt', encoding=ENCODING) as handle:
            return yaml.safe_load(handle), ''
    except FileNotFoundError as err:
        return error_context({}, 'Metadata', facet, target, path, err)  # type: ignore

131 

132 

def meta(facet: str, target: str, asset_struct: Assets) -> Tuple[Meta, str]:
    """Yield the metadata for facet of target from link in assets and message (in case of failure).

    The link is looked up under KEY_META for the facet of the target; a
    missing target, facet, or key yields the KeyError context instead.
    """
    try:
        path = pathlib.Path(asset_struct[target][facet][KEY_META])
    except KeyError as err:
        return error_context({}, 'Metadata', facet, target, '', err)  # type: ignore
    return load_meta(facet, target, path)

140 

141 

def load_approvals(facet: str, target: str, path: PathLike) -> Tuple[Approvals, str]:
    """Yield the approvals for facet of target from path and message (in case of failure).

    The parser is selected by the lower-cased end of the path: json.load for
    'json', yaml.safe_load for 'yaml' or 'yml'. Any other ending yields the
    ValueError context without touching the file system. A missing file
    yields the FileNotFoundError context.
    """
    lowered = str(path).lower()
    if lowered.endswith('json'):
        parse = json.load
    elif lowered.endswith(('yaml', 'yml')):
        parse = yaml.safe_load
    else:
        return error_context({}, 'Approvals', facet, target, path, ValueError('json or yaml required'))  # type: ignore

    try:
        with open(path, 'rt', encoding=ENCODING) as handle:
            return parse(handle), ''
    except FileNotFoundError as err:
        return error_context({}, 'Approvals', facet, target, path, err)  # type: ignore

158 

159 

def approvals(facet: str, target: str, asset_struct: Assets) -> Tuple[Approvals, str]:
    """Yield the approvals for facet of target from link in assets and message (in case of failure).

    The link is looked up under KEY_APPROVALS for the facet of the target; a
    missing target, facet, or key yields the KeyError context instead.
    """
    try:
        path = pathlib.Path(asset_struct[target][facet][KEY_APPROVALS])
    except KeyError as err:
        return error_context({}, 'Approvals', facet, target, '', err)  # type: ignore
    return load_approvals(facet, target, path)

167 

168 

def load_changes(facet: str, target: str, path: PathLike) -> Tuple[Changes, str]:
    """Yield the changes for facet of target from path and message (in case of failure).

    The parser is selected by the lower-cased end of the path: json.load for
    'json', yaml.safe_load for 'yaml' or 'yml'. Any other ending yields the
    ValueError context without touching the file system. A missing file
    yields the FileNotFoundError context.
    """
    lowered = str(path).lower()
    if lowered.endswith('json'):
        parse = json.load
    elif lowered.endswith(('yaml', 'yml')):
        parse = yaml.safe_load
    else:
        return error_context({}, 'Changes', facet, target, path, ValueError('json or yaml required'))  # type: ignore

    try:
        with open(path, 'rt', encoding=ENCODING) as handle:
            return parse(handle), ''
    except FileNotFoundError as err:
        return error_context({}, 'Changes', facet, target, path, err)  # type: ignore

185 

186 

def changes(facet: str, target: str, asset_struct: Assets) -> Tuple[Changes, str]:
    """Yield the changes for facet of target from link in assets and message (in case of failure).

    The link is looked up under KEY_CHANGES for the facet of the target; a
    missing target, facet, or key yields the KeyError context instead.
    """
    try:
        path = pathlib.Path(asset_struct[target][facet][KEY_CHANGES])
    except KeyError as err:
        return error_context({}, 'Changes', facet, target, '', err)  # type: ignore
    return load_changes(facet, target, path)

194 

195 

def verify_asset_keys(facet: str, target: str, asset_struct: Assets) -> Verification:
    """Verify presence of required keys for facet of target yielding predicate and message (in case of failure).

    Every key in KEYS_REQUIRED has to be present in the assets of the facet.
    Raises KeyError when the target or the facet itself is unknown, because
    both are looked up by subscription.
    """
    facet_assets = asset_struct[target][facet]
    if all(key in facet_assets for key in KEYS_REQUIRED):
        return True, ''
    return False, f'keys in {sorted(KEYS_REQUIRED)} for facet ({facet}) of target ({target}) are missing'

201 

202 

def verify_asset_links(facet: str, target: str, asset_struct: Assets) -> Verification:
    """Verify presence of asset links for facet of target yielding predicate and message (in case of failure).

    First the presence of the required keys is verified. Then every required
    link has to point to an existing regular file that is not empty. Links are
    used as given, so relative links resolve against the current working
    directory. The first failing link ends the verification.
    """
    predicate, message = verify_asset_keys(facet, target, asset_struct)
    if not predicate:
        return predicate, message
    for key in KEYS_REQUIRED:
        link = pathlib.Path(asset_struct[target][facet][key])
        log.debug(f'  + verifying: {pathlib.Path.cwd() / link}')
        # The size is only queried when the link is a file (short-circuit evaluation).
        if not link.is_file() or not link.stat().st_size:
            return False, f'{key} asset link ({link}) for facet ({facet}) of target ({target}) is invalid'
    return True, ''

214 

215 

# Dispatch from asset key to the function that loads that asset via its link in the assets.
# Every action takes (facet, target, asset_struct) and returns a pair of payload and message.
ASSET_KEY_ACTION = {
    KEY_APPROVALS: approvals,
    KEY_BIND: binder,
    KEY_CHANGES: changes,
    KEY_LAYOUT: layout,
    KEY_META: meta,
}

223 

224 

def verify_assets(facet: str, target: str, asset_struct: Assets) -> Verification:
    """Verify assets for facet of target yielding predicate and message (in case of failure).

    After the asset links passed verification, every action registered in
    ASSET_KEY_ACTION is executed; an action yielding an empty (falsy) payload
    fails the verification for its key.
    """
    predicate, message = verify_asset_links(facet, target, asset_struct)
    if not predicate:
        return predicate, message
    for key, action in ASSET_KEY_ACTION.items():
        # Only the payload matters here; the detail message of the action is not reported.
        asset, _ = action(facet, target, asset_struct)
        if not asset:
            return False, f'{key} asset for facet ({facet}) of target ({target}) is invalid'
    return True, ''

235 

236 

def prelude(
    doc_root: Union[str, pathlib.Path], structure_name: str, target_key: str, facet_key: str, command: str
) -> tuple[Structure, Assets]:
    """Change into the document root, load the structure, and derive the asset map (DRY).

    Returns the pair of loaded structure and asset map. The target key, facet
    key, and command are only used for the log message.

    Side effect: the working directory of the process is changed to doc_root
    and is not changed back by this function.
    """
    doc_root = pathlib.Path(doc_root)
    idem = os.getcwd()  # remember the origin for the log message before changing directory
    os.chdir(doc_root)
    job_description = (
        f'facet ({facet_key}) of target ({target_key}) with structure map ({structure_name})'
        f' in document root ({doc_root}) coming from ({idem})'
    )
    log.info(f'executing prelude of command ({command}) for {job_description}')
    structure = load_structure(structure_name)
    asset_map = assets(structure)
    return structure, asset_map

252 

253 

def verify(
    doc_root: Union[str, pathlib.Path],
    structure_name: str,
    target_key: str,
    facet_key: str,
    options: dict[str, Union[bool, str]],
) -> int:
    """Drive the verification.

    Changes into doc_root, loads the structure, and verifies in this order:
    the target, the facet of the target, and the assets of the facet. The
    first failure is logged as error and ends the run with return value 1.
    On success the approvals, changes, layout, and metadata are loaded once
    more and logged, and 0 is returned.

    The options parameter is accepted but not used by this function.

    Side effect: the working directory of the process is changed to doc_root
    and is not changed back by this function.
    """
    doc_root = pathlib.Path(doc_root)
    os.chdir(doc_root)
    facet = facet_key
    target = target_key
    job_description = (
        f'facet ({facet}) for target ({target}) with structure map ({structure_name}) in document root ({doc_root})'
    )
    log.info(f'starting verification of {job_description}')
    structure = load_structure(structure_name)
    target_set = targets(structure)
    facet_map = facets(structure)
    asset_map = assets(structure)

    predicate, message = verify_target(target, target_set)
    if not predicate:
        log.error(f'failed verification with: {message}')
        return 1
    log.info(f'- target ({target}) OK')

    predicate, message = verify_facet(facet, target, facet_map)
    if not predicate:
        log.error(f'failed verification with: {message}')
        return 1
    log.info(f'- facet ({facet}) of target ({target}) OK')

    predicate, message = verify_assets(facet, target, asset_map)
    if not predicate:
        log.error(f'failed verification with: {message}')
        return 1
    log.info(f'- assets ({", ".join(sorted(KEYS_REQUIRED))}) for facet ({facet}) of target ({target}) OK')

    # The loaders return (payload, message) pairs, so the values logged below are those pairs.
    # The local names are part of the log output because of the self-documenting {name=} form.
    signatures_path = asset_map[target][facet][KEY_APPROVALS]
    log.info(f'loading signatures from {signatures_path=}')
    signatures = load_approvals(facet, target, signatures_path)
    log.info(f'{signatures=}')

    history_path = asset_map[target][facet][KEY_CHANGES]
    log.info(f'loading history from {history_path=}')
    history = load_changes(facet, target, history_path)
    log.info(f'{history=}')

    layout_path = asset_map[target][facet][KEY_LAYOUT]
    log.info(f'loading layout from {layout_path=}')
    design = load_layout(facet, target, layout_path)
    log.info(f'{design=}')

    metadata_path = asset_map[target][facet][KEY_META]
    log.info(f'loading metadata from {metadata_path=}')
    info = load_meta(facet, target, metadata_path)
    log.info(f'{info=}')

    log.info('successful verification')
    return 0