Coverage for csaf/csaf.py: 18.58%
337 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 16:28:45 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 16:28:45 +00:00
1"""CSAF Document model.
3Minimal length of CSAF (spam) JSON is 92 bytes:
40 1 2 3 4 5 6 7 8 9
512345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012
6{"document":{"category":" ","csaf_version":"2.0","publisher":{},"title":" ","tracking":{}}}}
7"""
9from __future__ import annotations
11import pathlib
12from itertools import chain
13from typing import Annotated, Dict, Iterator, List, Mapping, Optional, Tuple, no_type_check
15import jmespath
16import msgspec
17from langcodes import tag_is_valid
18from lazr.uri import URI, InvalidURIError # type: ignore
19from pydantic import BaseModel, Field, field_validator
21import csaf
22from csaf import log
23from csaf.document import Document
24from csaf.mandatory.rules import (
25 is_valid,
26 is_valid_category,
27 is_valid_defined_group_ids,
28 is_valid_defined_product_ids,
29 is_valid_translator,
30 is_valid_unique_group_ids,
31 is_valid_unique_product_ids,
32)
33from csaf.product import ProductTree
34from csaf.vulnerability import Vulnerability
# Codec error handling policy name; not referenced in the visible part of this module - TODO confirm usage.
ENCODING_ERRORS_POLICY = 'ignore'
# Inputs shorter than this are classified as too short by peek().
CSAF_MIN_BYTES = 92
# Size above which peek() marks JSON input as possibly too large (15 MiB).
CSAF_WARN_MAX_BYTES = 15 * 1024 * 1024
# The only csaf_version value accepted by document_csaf_version().
CSAF_VERSION_STRING = '2.0'
class CSAF(BaseModel):
    """Representation of security advisory information as a JSON document.

    Members:
        document: mandatory document level meta-data.
        product_tree: optional container of product names (defaults to None).
        vulnerabilities: optional list of vulnerability items with at least one
            entry when present (defaults to None).
    """

    document: Annotated[
        Document,
        Field(
            description='Captures the meta-data about this document describing a particular set of'
            ' security advisories.',
            title='Document level meta-data',
        ),
    ]
    product_tree: Annotated[
        Optional[ProductTree],
        Field(
            description='Is a container for all fully qualified product names that can be referenced elsewhere'
            ' in the document.',
            title='Product tree',
        ),
    ] = None
    vulnerabilities: Annotated[
        Optional[List[Vulnerability]],
        Field(
            description='Represents a list of all relevant vulnerability information items.',
            min_length=1,
            title='Vulnerabilities',
        ),
    ] = None

    @no_type_check
    def model_dump_json(self, *args, **kwargs):
        """Serialize to JSON using field aliases unless the caller sets ``by_alias`` explicitly."""
        kwargs.setdefault('by_alias', True)
        return super().model_dump_json(*args, **kwargs)

    # The validator decorator must be outermost: pydantic collects validators by looking for
    # its own proxy objects in the class namespace, and a classmethod wrapped around such a
    # proxy hides it, so the validator would never be registered.
    @field_validator('vulnerabilities')
    @classmethod
    @no_type_check
    def check_len(cls, v):
        """Reject a vulnerabilities member that is present but holds no items.

        None is the declared default for an absent member and passes through unchanged.

        Raises:
            ValueError: if the value is an empty collection.
        """
        if v is not None and not v:
            raise ValueError('vulnerabilities present but empty')
        return v
@no_type_check
def document_optional_acknowledgments(values):
    """Verify optional properties of document/acknowledgments if present follow rules.

    Args:
        values: decoded JSON value found at document.acknowledgments.

    Returns:
        Pair of (code, message): (0, '') when every check passes, otherwise (1, reason)
        describing the first violation encountered.
    """
    parent, prop = 'document', 'acknowledgments'
    if not isinstance(values, list):
        return 1, f'optional {parent} property {prop} present but no array'
    if not values:
        return 1, f'optional {parent} property {prop} present but empty'
    ack_opt_props = ('names', 'organization', 'summary', 'urls')
    min_props, max_props = 1, len(ack_opt_props)
    ack_known_props = set(ack_opt_props)
    for pos, value in enumerate(values):
        # Each entry has to be an object: iterating a string would treat its characters as
        # property names and iterating a number would raise.
        if not isinstance(value, dict):
            return 1, f'optional {parent} property {prop}[{pos}] present but no object'
        jp = f'properties of {parent}.{prop}[{pos}]'
        ack_found_props = set(value)
        if ack_found_props <= ack_known_props:
            log.info(f'set of {jp} only contains known properties')
        if ack_found_props < ack_known_props:
            log.info(f'set of {jp} is a proper subset of the known properties')
        nr_distinct_found_props = len(ack_found_props)
        if nr_distinct_found_props < min_props:
            return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
        if max_props < nr_distinct_found_props:
            return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

        # Array valued members: non-empty arrays of non-empty text (urls additionally parse as URI).
        for what in ('names', 'urls'):
            if what not in ack_found_props:
                continue
            seq = value[what]
            if not isinstance(seq, list):
                return 1, f'optional {jp} property {what} present but no array'
            if not seq:
                return 1, f'optional {jp} property {what} present but empty'
            for ndx, text in enumerate(seq):
                jpn = f'{jp}[{ndx}]'
                if not isinstance(text, str):
                    return 1, f'optional {jpn} property {what} entry present but no text'
                if not text:
                    return 1, f'optional {jpn} property {what} entry present but empty'
                if what == 'urls':
                    try:
                        _ = URI(text)
                    except InvalidURIError as err:
                        return 1, f'optional {jpn} property {what} entry present but invalid as URI({err})'

        # Text valued members: non-empty text.
        for what in ('organization', 'summary'):
            if what not in ack_found_props:
                continue
            text = value[what]
            if not isinstance(text, str):
                return 1, f'optional {jp} property {what} present but no text'
            if not text:
                return 1, f'optional {jp} property {what} present but empty'
    return 0, ''
@no_type_check
def document_aggregate_severity(value):
    """Verify properties of document/aggregate_severity present follow rules.

    Args:
        value: decoded JSON value found at document.aggregate_severity.

    Returns:
        Pair of (code, message): (0, '') when every check passes, otherwise (1, reason)
        describing the first violation encountered.
    """
    parent, prop = 'document', 'aggregate_severity'
    jp = f'{parent}.{prop}'
    if not isinstance(value, dict):
        return 1, f'optional property {jp} present but no object'
    if not value:
        return 1, f'optional property {jp} present but empty'
    agg_norm_props = ('text',)
    agg_opt_props = ('namespace',)
    agg_known_props = set(chain(agg_norm_props, agg_opt_props))
    min_props, max_props = 1, len(agg_known_props)
    agg_found_props = set(value)
    if agg_found_props <= agg_known_props:
        log.info(f'set of {jp} properties only contains known properties')
    if agg_found_props < agg_known_props:
        log.info(f'set of {jp} properties is a proper subset of the known properties')
    nr_distinct_found_props = len(agg_found_props)
    # The lower bound cannot trigger for a non-empty object; kept as a guard.
    if nr_distinct_found_props < min_props:
        return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
    if max_props < nr_distinct_found_props:
        return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

    sub = 'text'
    jps = f'property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 1, f'mandatory {jps} not present'
    if not isinstance(entry, str):
        return 1, f'mandatory {jps} present but no text'
    if not entry:
        return 1, f'mandatory {jps} present but empty'

    sub = 'namespace'
    jps = f'optional property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 0, ''
    if not isinstance(entry, str):
        return 1, f'{jps} present but no text'
    if not entry:
        # The namespace is optional, so the message must not label it mandatory.
        return 1, f'{jps} present but empty'
    try:
        _ = URI(entry)
    except InvalidURIError as err:
        return 1, f'{jps} present but invalid as URI({err})'

    return 0, ''
@no_type_check
def document_category(value):
    """Check that the document/category value is non-empty text.

    Returns:
        (0, '') when the value is a non-empty string, otherwise (1, reason).
    """
    jp = 'property document.category'
    if not isinstance(value, str):
        return 1, f'{jp} present but no text'
    return (0, '') if value else (1, f'{jp} present but empty')
@no_type_check
def document_csaf_version(value):
    """Check that the document/csaf_version value is text equal to CSAF_VERSION_STRING.

    Returns:
        (0, '') on a match, otherwise (1, reason) naming the first failed check.
    """
    jp = 'property document.csaf_version'
    if not isinstance(value, str):
        problem = 'present but no text'
    elif not value:
        problem = 'present but empty'
    elif value != CSAF_VERSION_STRING:
        problem = f'present but ({value}) not matching CSAF version 2.0'
    else:
        return 0, ''
    return 1, f'{jp} {problem}'
@no_type_check
def document_lang(value):
    """Check that the document/lang value is non-empty text accepted by tag_is_valid.

    Returns:
        (0, '') when all checks pass, otherwise (1, reason) naming the first failed check.
    """
    jp = 'property document.lang'
    if not isinstance(value, str):
        return 1, f'{jp} present but no text'
    if not value:
        return 1, f'{jp} present but empty'
    if tag_is_valid(value):
        return 0, ''
    return 1, f'{jp} present but ({value}) is no valid language tag'
@no_type_check
def document_optional(document):
    """Check the optional members of the document object that have a verifier.

    Only acknowledgments and aggregate_severity are verified here; the remaining optional
    members are looked up but not inspected.

    Returns:
        The (code, message) pair of the first failing verifier, else (0, 'NotImplemented').
    """
    parent = 'document'
    mandatory_names = ('category', 'csaf_version', 'publisher', 'title', 'tracking')
    optional_names = (
        'acknowledgments',
        'aggregate_severity',
        'distribution',
        'lang',
        'notes',
        'references',
        'source_lang',
    )
    recognized = set(mandatory_names) | set(optional_names)
    # Absent members resolve to None, which doubles as the "not present" marker below.
    present = {name: jmespath.search(name, document) for name in optional_names}

    verifiers = (
        ('acknowledgments', document_optional_acknowledgments),
        ('aggregate_severity', document_aggregate_severity),
    )
    for name, verifier in verifiers:
        candidate = present[name]
        if candidate is None:
            continue
        error, message = verifier(candidate)
        if error:
            return error, message

    seen = set(document)
    if seen <= recognized:
        log.info(f'set of {parent} properties only contains known properties')
    if seen < recognized:
        log.info(f'set of {parent} properties is a proper subset of the known properties')

    return 0, 'NotImplemented'
@no_type_check
def verify_document(document):
    """Root of /document member verifier.

    Checks presence of the mandatory members, then category, csaf_version, the optional lang,
    the publisher and tracking sub-members, and finally delegates to document_optional.

    Args:
        document: decoded JSON value of the top-level document member.

    Returns:
        Pair of (code, message): (1, reason) for the first violation, otherwise the result
        of document_optional.
    """
    parent = 'document'
    # Falsy values (empty text, empty object, ...) count as missing here.
    for prop in ('category', 'csaf_version', 'publisher', 'title', 'tracking'):
        if not jmespath.search(f'{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    prop = 'category'
    category = jmespath.search(f'{prop}', document)
    # Verify the type before calling strip(): a truthy non-text value would otherwise raise
    # AttributeError instead of yielding an error pair.
    error, message = document_category(category)
    if error:
        return error, message
    if not category.strip():
        log.warning(f'warning - {parent} property {prop} value is space-only')

    prop = 'csaf_version'
    csaf_version = jmespath.search(f'{prop}', document)
    error, message = document_csaf_version(csaf_version)
    if error:
        return error, message

    prop = 'lang'
    lang = jmespath.search(f'{prop}', document)
    if lang is not None:
        error, message = document_lang(lang)
        if error:
            return error, message

    # Publisher (publisher) is object requires ('category', 'name', 'namespace')
    parent = 'document.publisher'
    for prop in ('category', 'name', 'namespace'):
        if not jmespath.search(f'publisher.{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    parent = 'document'
    prop = 'title'
    title = jmespath.search(f'{prop}', document)
    # Same reasoning as for category: report a non-text title instead of raising on strip().
    if not isinstance(title, str):
        return 1, f'property {parent}.{prop} present but no text'
    if not title.strip():
        log.warning(f'warning - {parent} property {prop} value is space-only')

    # Tracking (tracking) is object requires:
    # ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version')
    prop = 'tracking'
    for sub in ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version'):
        if jmespath.search(f'{prop}.{sub}', document) is None:
            return 1, f'missing {parent}.{prop} property ({sub})'

    return document_optional(document)
@no_type_check
def level_zero(csaf_doc):
    """Run the most superficial verification of a decoded advisory.

    Returns:
        (1, 'missing document property') when the document member is absent or falsy, the
        failing (code, message) pair from verify_document, or (0, '') on success.
    """
    document = csaf_doc.get('document')
    if not document:
        return 1, 'missing document property'

    error, message = verify_document(document)
    # A success message from the verifier is deliberately not passed on.
    return (error, message) if error else (0, '')
def reader(path: str) -> Iterator[str]:
    """Yield the lines of the text file at path, decoded with csaf.ENCODING.

    The file stays open only while the generator is being consumed.
    """
    source = pathlib.Path(path)
    with source.open('rt', encoding=csaf.ENCODING) as handle:
        yield from handle
def peek(data: str) -> str:
    """Classify data by length and by its first non-blank character.

    Returns:
        'TOO_SHORT' when data has fewer than CSAF_MIN_BYTES characters, 'JSON' or
        'JSON_MAYBE_TOO_LARGE' when the leading sample starts with '{' (the latter when
        data is longer than CSAF_WARN_MAX_BYTES), 'XML' when it starts with '<', and
        'UNKNOWN' otherwise.
    """
    if len(data) < CSAF_MIN_BYTES:
        return 'TOO_SHORT'

    # Only the leading slice is inspected; surrounding whitespace is ignored.
    head = data[:CSAF_MIN_BYTES].strip()
    if head.startswith('{'):
        if len(data) > CSAF_WARN_MAX_BYTES:
            return 'JSON_MAYBE_TOO_LARGE'
        return 'JSON'
    return 'XML' if head.startswith('<') else 'UNKNOWN'
def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate a (command, input path, config path) request vector without raising.

    Returns:
        (0, '', argv) when the request is acceptable, otherwise (code, reason, ['']) with
        code 2 for usage problems and code 1 for file problems.
    """

    def rejected(code: int, reason: str) -> Tuple[int, str, List[str]]:
        # All failures share the same placeholder payload.
        return code, reason, ['']

    if not argv or len(argv) != 3:
        return rejected(2, 'received wrong number of arguments')

    command, inp, config = argv

    if command != 'verify':
        return rejected(2, 'received unknown command')

    # An empty input path is tolerated; a given one has to point at a file.
    if inp and not pathlib.Path(str(inp)).is_file():
        return rejected(1, 'source is no file')

    if not config:
        return rejected(2, 'configuration missing')

    config_path = pathlib.Path(str(config))
    if not config_path.is_file():
        return rejected(1, f'config ({config_path}) is no file')

    all_suffixes = ''.join(config_path.suffixes).lower()
    if not all_suffixes.endswith('.json'):
        return rejected(1, 'config has no .json extension')

    return 0, '', argv
def verify_json(data: str) -> Tuple[int, str, List[str], Dict[str, object]]:
    """Decode data as JSON and run the level zero verification on the result.

    Returns:
        (0, 'OK', [], doc) on success; otherwise (code, reason, [], {}) where the reason is
        either the decoding failure text or the message from level_zero.
    """
    try:
        doc = msgspec.json.decode(data)
    except msgspec.DecodeError:
        return 1, 'advisory is no valid JSON', [], {}

    error, message = level_zero(doc)
    return (error, message, [], {}) if error else (0, 'OK', [], doc)
def is_valid_(path: str, options: Mapping[str, bool]) -> bool:
    """Public API: report whether the advisory at path passes validation.

    Args:
        path: location of the advisory file handed to process.
        options: option mapping forwarded unchanged to process.

    Returns:
        True when process reports code 0 (success), False for any other code. A non-empty
        message from process is logged as an error.
    """
    code, message = process('validate', 'commit', path, options)
    if message:
        log.error(message)
    # process signals success with code 0, so validity is the absence of an error code.
    return code == 0
@no_type_check
def walk_tree_explicit(base_path):
    """Yield base_path itself if it is a file, else its entries down to one folder level.

    Entries of direct sub folders are yielded as found (files and folders alike); deeper
    levels are not descended into.
    """
    if base_path.is_file():
        yield base_path
        return

    for entry in base_path.iterdir():
        if not entry.is_dir():
            yield entry
            continue
        yield from entry.iterdir()
@no_type_check
def visit(tree_or_file_path):
    """Yield the path itself when it is a file, else every path found recursively below it."""
    root = pathlib.Path(tree_or_file_path)
    if root.is_file():
        yield root
        return
    yield from root.rglob('*')
@no_type_check
def slugify(error):
    """Replace newlines by space.

    Args:
        error: any object; its str() form is used.

    Returns:
        The text form of error on a single line, with each newline turned into one space so
        that words from adjacent lines do not run together.
    """
    return str(error).replace('\n', ' ')
def process(command: str, transaction_mode: str, path: str, options: Mapping[str, object]) -> Tuple[int, str]:
    """Drive the verification and validation.

    This function acts as the command line interface backend.
    There is some duplication to support testability.

    Returns:
        Pair of (code, message): (0, '') for an advisory that passes, (2, 'USAGE') for a
        wrong command or blank path, and (1, reason) for every other rejection.
    """
    if command != 'validate':
        log.error('Usage: csaf validate ...')
        return 2, 'USAGE'
    if not path.strip():
        log.error('Usage: csaf validate path-to-file')
        return 2, 'USAGE'

    if transaction_mode == 'dry-run':
        log.info('Operating in dry run mode (no changes persisted).')

    data = ''.join(reader(path))
    guess = peek(data)

    rejections = {
        'TOO_SHORT': 'advisory is too short to be valid',
        'UNKNOWN': 'advisory is of unknown format',
    }
    if guess in rejections:
        return 1, rejections[guess]

    if not guess.startswith('JSON'):
        return 1, 'XML IS OUT OF SCOPE'

    if guess.endswith('_MAYBE_TOO_LARGE'):
        log.warning('File of %d bytes may be above known file size limits' % len(data))
    error, message, _strings, doc = verify_json(data)
    if error:
        log.error(message)
        return error, message

    # Later post process the business rules (spec tests) here.
    # Only an explicit False counts as failure; presumably is_valid may also return a
    # non-boolean marker such as NotImplemented - TODO confirm.
    if is_valid(doc) is not False:
        return 0, ''

    log.error('advisory fails mandatory rules:')
    # The rules are executed again one by one to learn which of them failed.
    rule_findings = (
        (is_valid_category, 'invalid category'),
        (is_valid_defined_group_ids, 'undefined group ids'),
        (is_valid_defined_product_ids, 'undefined product ids'),
        (is_valid_translator, 'invalid translator'),
        (is_valid_unique_group_ids, 'non-unique group ids'),
        (is_valid_unique_product_ids, 'non-unique product ids'),
    )
    messages = [finding for rule, finding in rule_findings if not rule(doc)]
    return 1, ', '.join(messages)