Coverage for csaf/csaf.py: 13.42%

337 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-18 20:12:48 +00:00

1"""CSAF Document model. 

2 

3Minimal length of CSAF (spam) JSON is 116 bytes: 

40 1 2 3 4 5 6 7 8 9 

512345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012 

6{"document":{"category":" ","csaf_version":"2.0","publisher":{},"title":" ","tracking":{}}}} 

7""" 

8 

9from __future__ import annotations 

10 

11import pathlib 

12from itertools import chain 

13from typing import Annotated, Dict, Iterator, List, Mapping, Optional, Tuple, no_type_check 

14 

15import jmespath 

16import msgspec 

17from langcodes import tag_is_valid 

18from lazr.uri import URI, InvalidURIError # type: ignore 

19from pydantic import BaseModel, Field, field_validator 

20 

21import csaf 

22from csaf import log 

23from csaf.document import Document 

24from csaf.mandatory.rules import ( 

25 is_valid, 

26 is_valid_category, 

27 is_valid_defined_group_ids, 

28 is_valid_defined_product_ids, 

29 is_valid_translator, 

30 is_valid_unique_group_ids, 

31 is_valid_unique_product_ids, 

32) 

33from csaf.product import ProductTree 

34from csaf.vulnerability import Vulnerability 

35 

# Name of an encoding error handling policy (not referenced by the code visible in this module).
ENCODING_ERRORS_POLICY = 'ignore'
# Inputs with fewer characters than this are classified as 'TOO_SHORT' by peek().
CSAF_MIN_BYTES = 92
# JSON inputs longer than this (15 * 2**20) get the '_MAYBE_TOO_LARGE' marker from peek().
CSAF_WARN_MAX_BYTES = 15 * 1024 * 1024
# The only value document_csaf_version() accepts for document.csaf_version.
CSAF_VERSION_STRING = '2.0'

40 

41 

class CSAF(BaseModel):
    """
    Representation of security advisory information as a JSON document.

    The model has one mandatory member (``document``) and two optional members
    (``product_tree`` and ``vulnerabilities``) that both default to ``None``.
    """

    # Mandatory: document level meta-data.
    document: Annotated[
        Document,
        Field(
            description='Captures the meta-data about this document describing a particular set of'
            ' security advisories.',
            title='Document level meta-data',
        ),
    ]
    # Optional: product tree; None when absent.
    product_tree: Annotated[
        Optional[ProductTree],
        Field(
            description='Is a container for all fully qualified product names that can be referenced elsewhere'
            ' in the document.',
            title='Product tree',
        ),
    ] = None
    # Optional: list of vulnerabilities; None when absent, declared with min_length=1.
    vulnerabilities: Annotated[
        Optional[List[Vulnerability]],
        Field(
            description='Represents a list of all relevant vulnerability information items.',
            min_length=1,
            title='Vulnerabilities',
        ),
    ] = None

    @no_type_check
    def model_dump_json(self, *args, **kwargs):
        """Dump the model as JSON, defaulting ``by_alias`` to True unless the caller sets it."""
        kwargs.setdefault('by_alias', True)
        return super().model_dump_json(*args, **kwargs)

    # NOTE(review): pydantic documents ``@field_validator`` as the outermost decorator with
    # ``@classmethod`` beneath it; with the order used here the validator may not get
    # registered on the model - TODO confirm against the pydantic version in use.
    @classmethod
    @no_type_check
    @field_validator('vulnerabilities')
    def check_len(cls, v):
        """Raise ValueError for a falsy ``vulnerabilities`` value; return the value unchanged otherwise."""
        if not v:
            raise ValueError('vulnerabilities present but empty')
        return v

84 

85 

@no_type_check
def document_optional_acknowledgments(values):
    """Verify optional properties of document/acknowledgments if present follow rules.

    Args:
        values: the decoded value found at ``document.acknowledgments``.

    Returns:
        A pair ``(error, message)``: ``(0, '')`` when every check passes, otherwise
        ``(1, reason)`` describing the first violated rule.
    """
    parent, prop = 'document', 'acknowledgments'
    if not isinstance(values, list):
        return 1, f'optional {parent} property {prop} present but no array'
    if not values:
        return 1, f'optional {parent} property {prop} present but empty'
    ack_opt_props = ('names', 'organization', 'summary', 'urls')
    min_props, max_props = 1, len(ack_opt_props)
    ack_known_props = set(ack_opt_props)
    for pos, value in enumerate(values):
        # Every entry has to be an object: collecting the keys of anything else either
        # raises (numbers, null) or silently iterates characters / items (text, arrays).
        if not isinstance(value, dict):
            return 1, f'optional {parent} property {prop}[{pos}] present but no object'
        jp = f'properties of {parent}.{prop}[{pos}]'
        ack_found_props = set(value)
        if ack_found_props <= ack_known_props:
            log.info(f'set of {jp} only contains known properties')
        if ack_found_props < ack_known_props:
            log.info(f'set of {jp} is a proper subset of the known properties')
        # Only the number of distinct properties is enforced here; unknown names are not rejected.
        nr_distinct_found_props = len(ack_found_props)
        if nr_distinct_found_props < min_props:
            return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
        if max_props < nr_distinct_found_props:
            return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

        # Array valued members: non-empty arrays of non-empty texts (urls must also parse as URI).
        for what in ('names', 'urls'):
            if what not in ack_found_props:
                continue
            seq = value[what]
            if not isinstance(seq, list):
                return 1, f'optional {jp} property {what} present but no array'
            if not seq:
                return 1, f'optional {jp} property {what} present but empty'
            for ndx, text in enumerate(seq):
                jpn = f'{jp}[{ndx}]'
                if not isinstance(text, str):
                    return 1, f'optional {jpn} property {what} entry present but no text'
                if not text:
                    return 1, f'optional {jpn} property {what} entry present but empty'
                if what == 'urls':
                    try:
                        _ = URI(text)
                    except InvalidURIError as err:
                        return 1, f'optional {jpn} property {what} entry present but invalid as URI({err})'

        # Text valued members: non-empty texts.
        for what in ('organization', 'summary'):
            if what not in ack_found_props:
                continue
            text = value[what]
            if not isinstance(text, str):
                return 1, f'optional {jp} property {what} present but no text'
            if not text:
                return 1, f'optional {jp} property {what} present but empty'
    return 0, ''

141 

142 

@no_type_check
def document_aggregate_severity(value):
    """Verify properties of document/aggregate_severity present follow rules.

    Args:
        value: the decoded value found at ``document.aggregate_severity``.

    Returns:
        A pair ``(error, message)``: ``(0, '')`` when every check passes, otherwise
        ``(1, reason)`` describing the first violated rule.
    """
    parent, prop = 'document', 'aggregate_severity'
    jp = f'{parent}.{prop}'
    if not isinstance(value, dict):
        return 1, f'optional property {jp} present but no object'
    if not value:
        return 1, f'optional property {jp} present but empty'
    agg_norm_props = ('text',)
    agg_opt_props = ('namespace',)
    agg_known_props = set(chain(agg_norm_props, agg_opt_props))
    min_props, max_props = 1, len(agg_known_props)
    agg_found_props = set(value)
    if agg_found_props <= agg_known_props:
        log.info(f'set of {jp} properties only contains known properties')
    if agg_found_props < agg_known_props:
        log.info(f'set of {jp} properties is a proper subset of the known properties')
    nr_distinct_found_props = len(agg_found_props)
    if nr_distinct_found_props < min_props:
        return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
    if max_props < nr_distinct_found_props:
        return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

    # The text member is mandatory and has to be a non-empty text.
    sub = 'text'
    jps = f'property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 1, f'mandatory {jps} not present'
    if not isinstance(entry, str):
        return 1, f'mandatory {jps} present but no text'
    if not entry:
        return 1, f'mandatory {jps} present but empty'

    # The namespace member is optional; when present it has to be a non-empty text parsing as URI.
    sub = 'namespace'
    jps = f'optional property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 0, ''
    if not isinstance(entry, str):
        return 1, f'{jps} present but no text'
    if not entry:
        # The label already says "optional"; no contradicting "mandatory" prefix here.
        return 1, f'{jps} present but empty'
    try:
        _ = URI(entry)
    except InvalidURIError as err:
        return 1, f'{jps} present but invalid as URI({err})'

    return 0, ''

192 

193 

@no_type_check
def document_category(value):
    """Check that the document/category value is a non-empty text.

    Returns ``(0, '')`` on success and ``(1, reason)`` otherwise.
    """
    label = 'property ' + '.'.join(('document', 'category'))
    if isinstance(value, str):
        if value:
            return 0, ''
        return 1, f'{label} present but empty'
    return 1, f'{label} present but no text'

205 

206 

@no_type_check
def document_csaf_version(value):
    """Check that the document/csaf_version value is a text equal to CSAF_VERSION_STRING.

    Returns ``(0, '')`` on success and ``(1, reason)`` otherwise.
    """
    label = 'property ' + '.'.join(('document', 'csaf_version'))
    if isinstance(value, str):
        if not value:
            return 1, f'{label} present but empty'
        if value == CSAF_VERSION_STRING:
            return 0, ''
        return 1, f'{label} present but ({value}) not matching CSAF version 2.0'
    return 1, f'{label} present but no text'

220 

221 

@no_type_check
def document_lang(value):
    """Check that the document/lang value is a non-empty text accepted by ``tag_is_valid``.

    Returns ``(0, '')`` on success and ``(1, reason)`` otherwise.
    """
    label = 'property ' + '.'.join(('document', 'lang'))
    if isinstance(value, str):
        if not value:
            return 1, f'{label} present but empty'
        if tag_is_valid(value):
            return 0, ''
        return 1, f'{label} present but ({value}) is no valid language tag'
    return 1, f'{label} present but no text'

235 

236 

@no_type_check
def document_optional(document):
    """Check the optional members of the document object that have a verifier.

    Only ``acknowledgments`` and ``aggregate_severity`` are verified; the first failing
    verifier result is returned as is. Otherwise the result is ``(0, 'NotImplemented')``.
    """
    mandatory = ('category', 'csaf_version', 'publisher', 'title', 'tracking')
    optional = ('acknowledgments', 'aggregate_severity', 'distribution', 'lang', 'notes', 'references', 'source_lang')
    recognized = set(mandatory) | set(optional)
    scope = 'document'

    # Look up every optional member first (None marks an absent member).
    present = {}
    for name in optional:
        present[name] = jmespath.search(name, document)

    verifiers = (
        ('acknowledgments', document_optional_acknowledgments),
        ('aggregate_severity', document_aggregate_severity),
    )
    for name, verifier in verifiers:
        candidate = present[name]
        if candidate is None:
            continue
        error, message = verifier(candidate)
        if error:
            return error, message

    seen = set(document)
    if seen <= recognized:
        log.info(f'set of {scope} properties only contains known properties')
    if seen < recognized:
        log.info(f'set of {scope} properties is a proper subset of the known properties')

    return 0, 'NotImplemented'

269 

270 

@no_type_check
def verify_document(document):
    """Root of /document member verifier.

    Checks, in this order: presence (truthiness) of the five mandatory members, the
    category, the csaf_version, the optional lang, the mandatory publisher members,
    the title, the mandatory tracking members, and finally the optional members.

    Args:
        document: the decoded value of the top level ``document`` member.

    Returns:
        A pair ``(error, message)``; ``error`` is 0 when no check failed, else 1 with
        ``message`` describing the first failed check.
    """
    parent = 'document'
    for prop in ('category', 'csaf_version', 'publisher', 'title', 'tracking'):
        if not jmespath.search(f'{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    prop = 'category'
    category = jmespath.search(f'{prop}', document)
    # Only text can be space-only; any other type is rejected by document_category below
    # instead of failing on a missing strip method.
    if isinstance(category, str) and not category.strip():
        log.warning(f'warning - {parent} property {prop} value is space-only')
    error, message = document_category(category)
    if error:
        return error, message

    prop = 'csaf_version'
    csaf_version = jmespath.search(f'{prop}', document)
    error, message = document_csaf_version(csaf_version)
    if error:
        return error, message

    prop = 'lang'
    lang = jmespath.search(f'{prop}', document)
    if lang is not None:
        error, message = document_lang(lang)
        if error:
            return error, message

    # Publisher (publisher) is object requires ('category', 'name', 'namespace')
    parent = 'document.publisher'
    for prop in ('category', 'name', 'namespace'):
        if not jmespath.search(f'publisher.{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    parent = 'document'
    prop = 'title'
    title = jmespath.search(f'{prop}', document)
    # A truthy title of another type (number, array, object) is reported, not stripped.
    if not isinstance(title, str):
        return 1, f'property {parent}.{prop} present but no text'
    if not title.strip():
        log.warning(f'warning - {parent} property {prop} value is space-only')

    # Tracking (tracking) is object requires:
    # ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version')
    prop = 'tracking'
    for sub in ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version'):
        if jmespath.search(f'{prop}.{sub}', document) is None:
            return 1, f'missing {parent}.{prop} property ({sub})'

    return document_optional(document)

320 

321 

@no_type_check
def level_zero(csaf_doc):
    """Run the shallowest verification: a truthy document member that passes verify_document.

    Returns ``(0, '')`` on success and ``(1, reason)`` for a missing document member;
    a failing verify_document result is handed back unchanged.
    """
    document = csaf_doc.get('document')
    if not document:
        return 1, 'missing document property'

    error, message = verify_document(document)
    return (error, message) if error else (0, '')

333 

334 

def reader(path: str) -> Iterator[str]:
    """Yield the lines of the text file at path, decoded with csaf.ENCODING."""
    source = pathlib.Path(path)
    with source.open('rt', encoding=csaf.ENCODING) as handle:
        yield from handle

340 

341 

def peek(data: str) -> str:
    """Classify data by length and by the first character of its stripped head.

    Returns one of 'TOO_SHORT', 'JSON', 'JSON_MAYBE_TOO_LARGE', 'XML', or 'UNKNOWN'.
    """
    size = len(data)
    if size < CSAF_MIN_BYTES:
        return 'TOO_SHORT'

    head = data[:CSAF_MIN_BYTES].strip()
    if head.startswith('<'):
        return 'XML'
    if not head.startswith('{'):
        return 'UNKNOWN'
    return 'JSON_MAYBE_TOO_LARGE' if size > CSAF_WARN_MAX_BYTES else 'JSON'

354 

355 

def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Vet a three element request of command, input path, and configuration path.

    Returns ``(0, '', argv)`` for an acceptable request, otherwise a triple of a
    non-zero code (1 or 2), a reason, and a list holding one empty string.
    """

    def refuse(code: int, reason: str) -> Tuple[int, str, List[str]]:
        # Every refusal carries its own fresh placeholder list.
        return code, reason, ['']

    if not argv or len(argv) != 3:
        return refuse(2, 'received wrong number of arguments')

    command, inp, config = argv

    if command != 'verify':
        return refuse(2, 'received unknown command')

    # An empty input is tolerated; a given input has to be an existing file.
    if inp and not pathlib.Path(str(inp)).is_file():
        return refuse(1, 'source is no file')

    if not config:
        return refuse(2, 'configuration missing')

    config_path = pathlib.Path(str(config))
    if not config_path.is_file():
        return refuse(1, f'config ({config_path}) is no file')

    all_suffixes = ''.join(config_path.suffixes).lower()
    if not all_suffixes.endswith('.json'):
        return refuse(1, 'config has no .json extension')

    return 0, '', argv

380 

381 

def verify_json(data: str) -> Tuple[int, str, List[str], Dict[str, object]]:
    """Decode data as JSON and run the level zero verification on the result.

    Returns ``(0, 'OK', [], doc)`` on success; on failure a non-zero code, a reason,
    an empty list, and an empty dict.
    """
    try:
        doc = msgspec.json.decode(data)
    except msgspec.DecodeError:
        return 1, 'advisory is no valid JSON', [], {}

    error, message = level_zero(doc)
    return (error, message, [], {}) if error else (0, 'OK', [], doc)

393 

394 

def is_valid_(path: str, options: Mapping[str, bool]) -> bool:
    """Public API: report whether the advisory at path passes validation.

    Runs ``process('validate', 'commit', path, options)`` and logs any message it
    returns as an error.

    Returns:
        True when process reports code 0, False for any other code.
    """
    code, message = process('validate', 'commit', path, options)
    if message:
        log.error(message)
    # process signals success with code 0, so validity is the absence of an error code.
    return code == 0

401 

402 

@no_type_check
def walk_tree_explicit(base_path):
    """Yield base path itself when it is a file, else its entries one folder level deep.

    Entries of base path that are folders are replaced by their direct entries
    (files and folders alike); there is no deeper descent.
    """
    if base_path.is_file():
        yield base_path
        return

    for entry in base_path.iterdir():
        if not entry.is_dir():
            yield entry
            continue
        yield from entry.iterdir()

415 

416 

@no_type_check
def visit(tree_or_file_path):
    """Yield the path itself when it is a file, else everything found recursively below it."""
    root = pathlib.Path(tree_or_file_path)
    if not root.is_file():
        yield from root.rglob('*')
        return
    yield root

426 

427 

@no_type_check
def slugify(error):
    """Replace newlines by space.

    Args:
        error: any object; its ``str()`` form is processed.

    Returns:
        The text with every newline character replaced by a single space, so that the
        words on adjacent lines do not get fused together.
    """
    return str(error).replace('\n', ' ')

432 

433 

434def process(command: str, transaction_mode: str, path: str, options: Mapping[str, object]) -> Tuple[int, str]: 

435 """Drive the verification and validation. 

436 This function acts as the command line interface backend. 

437 There is some duplication to support testability. 

438 """ 

439 # bail_out = options.get('bail_out', False) 

440 if command != 'validate': 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true

441 log.error('Usage: csaf validate ...') 

442 return 2, 'USAGE' 

443 if not path.strip(): 

444 log.error('Usage: csaf validate path-to-file') 

445 return 2, 'USAGE' 

446 

447 if transaction_mode == 'dry-run': 447 ↛ 450line 447 didn't jump to line 450 because the condition on line 447 was always true

448 log.info('Operating in dry run mode (no changes persisted).') 

449 

450 data = ''.join(line for line in reader(path)) 

451 

452 guess = peek(data) 

453 

454 if guess == 'TOO_SHORT': 

455 return 1, 'advisory is too short to be valid' 

456 

457 if guess == 'UNKNOWN': 

458 return 1, 'advisory is of unknown format' 

459 

460 if guess.startswith('JSON'): 

461 if guess.endswith('_MAYBE_TOO_LARGE'): 

462 log.warning('File of %d bytes may be above known file size limits' % len(data)) 

463 error, message, strings, doc = verify_json(data) 

464 if error: 

465 log.error(message) 

466 return error, message 

467 # Later post process the business rules (spec tests) here 

468 # Like that: 

469 if is_valid(doc) is False: # For now, we return NotImplemented, sorry 

470 messages = [] 

471 log.error('advisory fails mandatory rules:') 

472 # Why not execute the rules multiple times (until we have traits in place to report the failing rule)? 

473 if not is_valid_category(doc): 

474 messages.append('invalid category') 

475 if not is_valid_defined_group_ids(doc): 

476 messages.append('undefined group ids') 

477 if not is_valid_defined_product_ids(doc): 

478 messages.append('undefined product ids') 

479 if not is_valid_translator(doc): 

480 messages.append('invalid translator') 

481 if not is_valid_unique_group_ids(doc): 

482 messages.append('non-unique group ids') 

483 if not is_valid_unique_product_ids(doc): 

484 messages.append('non-unique product ids') 

485 return 1, ', '.join(messages) 

486 return 0, '' 

487 

488 return 1, 'XML IS OUT OF SCOPE'