Coverage for csaf_lint/ 87.59%
280 statements
« prev ^ index » next v7.4.1, created at 2024-02-04 16:41:21 +00:00
« prev ^ index » next v7.4.1, created at 2024-02-04 16:41:21 +00:00
1"""Visit CSAF/CVRF files and validate them against envelope (core) and given body profiles."""
3import json
4import logging
5import os
6import pathlib
7import sys
8import typing
10import jsonschema # type: ignore
11from lxml import etree # type: ignore
# Text encoding used when reading JSON documents and parsing XML documents.
ENCODING = 'utf-8'

# Application name; also used as the default logger name and log file stem.
APP = 'csaf-lint'

LOG = logging.getLogger()  # Temporary refactoring: module level logger
LOG_FOLDER = pathlib.Path('logs')
LOG_FILE = f'{APP}.log'
# Log into the 'logs' folder when it exists, otherwise into the current working directory.
LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
LOG_LEVEL = logging.INFO
27CVRF_PARTS = ('cvrf', 'vuln', 'prod')
28CSAF_2_0_SCHEMA_PATH = pathlib.Path('csaf_lint', 'schema', 'csaf', CSAF_DEFAULT_SEMANTIC_VERSION, 'csaf.json')
34 part.upper(): '{{{CVRF_OASIS_ROOT}v{CRVF_DEFAULT_SEMANTIC_VERSION}/{part}}}' for part in CVRF_PARTS
37CVRF_DEFAULT_CATALOG = pathlib.Path(
38 'csaf_lint', 'schema', f'catalog_{CRVF_DEFAULT_SEMANTIC_VERSION.replace(".", "_")}.xml'
40CVRF_DEFAULT_SCHEMA_FILE = pathlib.Path('csaf_lint', 'schema', 'cvrf', f'{CRVF_DEFAULT_SEMANTIC_VERSION}/cvrf.xsd')
46 part.upper(): '{{{CVRF_OASIS_ROOT}v{CRVF_DEFAULT_SEMANTIC_VERSION}/{part}}}' for part in CVRF_PARTS
49CVRF_PRE_OASIS_CATALOG = pathlib.Path(
50 'csaf_lint', 'schema', f'catalog_{CRVF_PRE_OASIS_SEMANTIC_VERSION.replace(".", "_")}.xml'
52CVRF_PRE_OASIS_SCHEMA_FILE = pathlib.Path('csaf_lint', 'schema', 'cvrf', f'{CRVF_PRE_OASIS_SEMANTIC_VERSION}/cvrf.xsd')
71DEBUG = bool(os.getenv(DEBUG_VAR, ''))
def read_stdin():
    """Create document from stdin data.

    Returns:
        The complete text available on standard input as a single string.
    """
    LOG.debug('call site loading from stdin')
    # The caller inspects the first character of the returned text, so a string is required.
    return sys.stdin.read()
def load(file_path):
    """Create JSON object from file.

    Args:
        file_path: path of the JSON file to read (decoded using ENCODING).

    Returns:
        The object parsed from the file content by ``json.load``.
    """
    LOG.debug('call site file loading file_path=%s', file_path)
    with open(file_path, 'rt', encoding=ENCODING) as handle:
        return json.load(handle)
def version_peek(document_path):
    """HACK A DID ACK derives schema version from reading the first lines from path.

    Something like:

        <?xml version="1.0" encoding="UTF-8"?>
        <cvrfdoc xmlns="" xmlns:cvrf="">

    or (in addition should work with <cvrf:cvrfdoc style xml documents):

        <?xml version="1.0" encoding="UTF-8"?>
        <cvrfdoc
            xmlns:xsd=""
            xmlns:cpe=""
            xmlns:cvrf=""
            xmlns:cvrf-common=""
            xmlns:cvssv2=""
            xmlns:cvssv3=""
            xmlns:dc=""
            xmlns:ns0=""
            xmlns:prod=""
            xmlns:scap-core=""
            xmlns:sch=""
            xmlns:vuln=""
            xmlns:xsi=""
            xmlns=""
        >

    Args:
        document_path: path of the XML document; first inspected as a string, then opened and scanned.

    Returns:
        The matching semantic version constant, or None when neither the path string
        nor the harvested element start reveals a known version.
    """
    LOG.debug('version peek cheap detect on path string document_path=%s', document_path)
    # Cheap detection: a version embedded in the path itself wins without touching the file.
    if CRVF_PRE_OASIS_SEMANTIC_VERSION in str(document_path):
        return CRVF_PRE_OASIS_SEMANTIC_VERSION
    if CRVF_DEFAULT_SEMANTIC_VERSION in str(document_path):
        return CRVF_DEFAULT_SEMANTIC_VERSION

    LOG.debug('version peek naive but deep detect on path content document_path=%s', document_path)
    cvrf_element_start = '<cvrf'
    cvrf_element_end = '>'
    naive = []
    with open(document_path) as handle:
        for line in handle:
            LOG.debug('version peek scanner line=%s', line)
            # Start harvesting at the first line mentioning the element, then keep every following line.
            if cvrf_element_start in line or naive:
                naive.append(line.strip())
                LOG.debug('version peek parser triggered cvrf_element_start=%s, naive=%s', cvrf_element_start, naive)
            # Stop as soon as any harvested chunk closes the start tag.
            if naive and any(cvrf_element_end in chunk for chunk in naive):
                LOG.debug('version peek harvest done triggered cvrf_element_end=%s, naive=%s', cvrf_element_end, naive)
                break
    LOG.debug('version peek normal harvest naive=%s', naive)

    # NOTE(review): both tokens include the surrounding double quotes, so they only match an
    # attribute value that starts right at the version text - confirm against real namespaces.
    oasis_token = f'"{CRVF_DEFAULT_SEMANTIC_VERSION}/cvrf"'
    if any(oasis_token in chunk for chunk in naive):
        return CRVF_DEFAULT_SEMANTIC_VERSION
    pre_oasis_token = f'"{CRVF_PRE_OASIS_SEMANTIC_VERSION}"'
    if any(pre_oasis_token in chunk for chunk in naive):
        return CRVF_PRE_OASIS_SEMANTIC_VERSION

    LOG.debug('version peek finally failed')
    return None
def version_from(schema_path, document_path):
    """HACK A DID ACK derives non-default 1.1 version from path.

    Args:
        schema_path: schema path (or None); inspected as a string only.
        document_path: document path handed to version_peek when the schema path is inconclusive.

    Returns:
        The semantic version constant found in the schema path, else the result of version_peek.
    """
    LOG.debug('xml version derivation flat inspection schema_path=%s', schema_path)
    if CRVF_PRE_OASIS_SEMANTIC_VERSION in str(schema_path):
        return CRVF_PRE_OASIS_SEMANTIC_VERSION
    if CRVF_DEFAULT_SEMANTIC_VERSION in str(schema_path):
        return CRVF_DEFAULT_SEMANTIC_VERSION
    LOG.debug('xml version derivation deep call document_path=%s', document_path)
    return version_peek(document_path)
def validate_json(document, schema, conformance=None) -> typing.Tuple[int, str]:
    """Validate the JSON document against the schema.

    Args:
        document: parsed JSON document (a mapping; its keys are logged).
        schema: parsed JSON schema (a mapping; its keys are logged).
        conformance: format checker handed to ``jsonschema.validate``;
            ``jsonschema.draft7_format_checker`` is used when falsy.

    Returns:
        ``(0, 'OK')`` when the document is valid, ``(1, <error text>)`` on a validation error
        and ``(2, <error text>)`` on a schema error.
    """
    conformance = conformance if conformance else jsonschema.draft7_format_checker
    LOG.debug(
        f'caller site json validation list(document.keys())={list(document.keys())},'
        f' list(schema.keys())={list(schema.keys())}, format_checker={conformance}'
    )
    code, message = 0, 'OK'
    try:
        jsonschema.validate(document, schema, format_checker=conformance)
    except jsonschema.exceptions.ValidationError as err:
        LOG.error(f'err.message={err.message} [err.validator={err.validator}] err.relative_path={err.relative_path}')
        code, message = 1, f'{err}'
    except jsonschema.exceptions.SchemaError as err:
        LOG.error(f'err.message={err.message} [err.validator={err.validator}] err.relative_path={err.relative_path}')
        code, message = 2, f'{err}'

    # Emitted for every outcome; the code value tells success (0) from failure (1 or 2).
    LOG.debug(f'success in JSON validation: code={code}, message={message}')
    return code, message
def validate(document, schema, conformance=None) -> typing.Tuple[int, str]:
    """Validate the document against the schema.

    A dict document is treated as parsed JSON and delegated to validate_json;
    anything else is handed to load_xml and validated as XML.

    Args:
        document: parsed JSON document (dict) or the XML document to load.
        schema: parsed JSON schema or XML schema path (may be empty for the implicit schema).
        conformance: optional format checker, only used for JSON documents.

    Returns:
        ``(0, 'OK')`` on success, ``(1, 'ERROR')`` when XML loading or validation fails,
        or whatever validate_json returns for JSON documents.
    """
    if isinstance(document, dict):  # HACK A DID ACK
        return validate_json(document, schema, conformance)

    LOG.debug(f'caller site xml loading document={document}, schema={schema}, conformance={conformance}')
    xml_tree, message = load_xml(document)
    if xml_tree is None:  # load_xml signals failure with None and an explanatory message
        LOG.error(message)
        return 1, 'ERROR'

    request_version = version_from(schema, document)
    LOG.debug(f'version detected schema={schema}, document={document}, request_version={request_version}')
    found, version, namespace = versions_xml(xml_tree, request_version)
    LOG.debug(f'versions consistency found={found}, version={version}, namespace={namespace}')
    # NOTE(review): raises KeyError when request_version is not a key of the map
    # (e.g. None after a failed version peek) - confirm this is intended.
    catalog = CVRF_VERSION_CATALOG_MAP[request_version]
    LOG.debug(
        f'caller site validation: schema={schema}, catalog={catalog}, xml_tree={xml_tree},'
        f' request_version={request_version}'
    )
    status, message = xml_validate(schema, catalog, xml_tree, request_version)
    LOG.debug(f'validation xml results status={status}, message={message}')
    if status:
        return 0, 'OK'

    LOG.warning(message)
    return 1, 'ERROR'
def load_xml(document_path):
    """
    First things first: parse the document (to ensure it is well-formed XML) to obtain an ElementTree object
    to pass to the CVRF validator/parser

    Args:
        document_path: source handed to ``etree.parse`` (parsed with an ENCODING parser).

    Returns:
        ``(tree, message)`` on success, ``(None, message)`` when reading or parsing fails.
    """
    try:
        cvrf_doc = etree.parse(document_path, etree.XMLParser(encoding=ENCODING))
    except IOError as err:
        return None, f'file {document_path} failed with IO error {err}'
    except etree.XMLSyntaxError as err:
        return None, f'parsing from {document_path} failed with XMLSyntaxError error {err}'

    return cvrf_doc, f'well-formed xml tree from {document_path}'
def derive_version_from_namespace(root):
    """Version detection of XML document per element tree object root.

    The match is naive: each known version is searched for in ``str(root)``.

    Args:
        root: root element of the document, or None.

    Returns:
        ``(version, namespace)`` of the first entry of CVRF_VERSION_NS_MAP whose version
        occurs in the string representation of root, otherwise ``('', None)``.
    """
    LOG.debug('versions from namespace callee site root=%s', root)
    not_found = '', None
    if root is None:
        return not_found

    str_rep_root = str(root)
    LOG.debug(f'versions from namespace callee site naive match str_rep_root={str_rep_root} start')
    for version, namespace in CVRF_VERSION_NS_MAP.items():
        LOG.debug(
            f'versions from namespace callee site naive trial str_rep_root={str_rep_root},'
            f' version={version}, namespace={namespace}'
        )
        if version in str_rep_root:
            LOG.debug(
                f'versions from namespace callee site naive match root={root},'
                f' version={version}, namespace={namespace}'
            )
            return version, namespace
        LOG.debug(
            f'versions from namespace callee site naive miss root={root}, version={version}, namespace={namespace}'
        )

    return not_found
def versions_xml(xml_tree, request_version):
    """Versions from cvrf namespace in xml tree and request version.

    Args:
        xml_tree: parsed document offering ``getroot()``.
        request_version: version requested for validation.

    Returns:
        ``(consistent, document_version, requested_version)`` where ``consistent`` tells whether the
        value derived from the document equals ``f'{request_version}/cvrf'``; when nothing could be
        derived from the document the result is ``(False, None, requested_version)``.
    """
    sem_ver, doc_cvrf_version = derive_version_from_namespace(xml_tree.getroot())
    req_cvrf_version = f'{request_version}/cvrf'

    LOG.debug(f'versions xml callee site sem_ver={sem_ver}, doc_cvrf_version={doc_cvrf_version}, xml_tree={xml_tree}')
    if doc_cvrf_version:
        return doc_cvrf_version == req_cvrf_version, doc_cvrf_version, req_cvrf_version

    return False, None, req_cvrf_version
def cvrf_validate(handle: typing.IO, xml_tree: etree.ElementTree) -> typing.Tuple[bool, str]:
    """Validates a CVRF document.

    Args:
        handle: open handle of the XML schema document.
        xml_tree: parsed document to validate against that schema.

    Returns:
        ``(True, 'Valid')`` when the document conforms, ``(False, <parse error text>)`` when the
        schema document is not well-formed, and ``(False, <schema error log>)`` when the document
        is invalid (the error log object is passed on as-is; callers render it into a string).
    """
    try:
        xmlschema_doc = etree.parse(handle)
    except etree.XMLSyntaxError as err:
        # Not every file-like object carries a name; fall back to the handle representation.
        schema_name = getattr(handle, 'name', handle)
        return False, f'Parsing error, schema document "{schema_name}" is not well-formed: {err}'
    xmlschema = etree.XMLSchema(xmlschema_doc)

    try:
        xmlschema.assertValid(xml_tree)
    except etree.DocumentInvalid:
        return False, xmlschema.error_log
    return True, 'Valid'
def push_catalog(catalog, request_version):
    """Isolate side effect interface to os env -> libxml2 <- lxml.

    Args:
        catalog: catalog path to publish; when falsy, the catalog matching request_version is used.
        request_version: requested version; anything but the default version selects the pre-OASIS catalog.

    Returns:
        The catalog that was published through the XML_CATALOG_FILES environment variable.
    """
    if not catalog:
        if request_version != CRVF_DEFAULT_SEMANTIC_VERSION:
            catalog = CVRF_PRE_OASIS_CATALOG
        else:
            catalog = CVRF_DEFAULT_CATALOG

    # If the supplied file is not a valid catalog.xml or doesn't exist lxml will fall back to using remote validation
    os.environ['XML_CATALOG_FILES'] = str(catalog)

    return catalog
def derive_schema_path(catalog, request_version, schema):
    """Handle the implicit schema case by falling back on locally provided schema (matching the catalog).

    Args:
        catalog: catalog in use (only logged here).
        request_version: requested version; selects the fallback schema file.
        schema: explicit schema path, or a falsy value to request the local fallback.

    Returns:
        The given schema when truthy, otherwise the local schema file for the requested version.
    """
    if schema:
        LOG.debug(
            f'xml validate try reading schema catalog={catalog},'
            f" schema={schema}, catalog env=({os.getenv('XML_CATALOG_FILES')})"
        )
        return schema

    LOG.debug(
        f'xml validate try reading local implicit schema catalog={catalog},'
        f" schema={schema}, catalog env=({os.getenv('XML_CATALOG_FILES')})"
    )
    # try to use local schema file
    if request_version != CRVF_DEFAULT_SEMANTIC_VERSION:
        return CVRF_PRE_OASIS_SCHEMA_FILE
    return CVRF_DEFAULT_SCHEMA_FILE
def xml_validate(schema, catalog, xml_tree, request_version):
    """Validate xml tree against given xml schema of request version assisted by catalog.

    Args:
        schema: schema path, or a falsy value to use the local schema of the requested version.
        catalog: catalog path, or a falsy value to use the catalog of the requested version.
        xml_tree: parsed document to validate.
        request_version: requested version used to resolve the fallbacks.

    Returns:
        ``(True, message)`` when validation succeeded, ``(False, message)`` when the schema
        could not be read or the document failed validation.
    """
    LOG.debug(
        f'xml validate parameters: schema={schema}, catalog={catalog},'
        f' xml_tree={xml_tree}, request_version={request_version}'
    )
    # The catalog is published to the environment before the schema gets parsed.
    catalog = push_catalog(catalog, request_version)
    schema = derive_schema_path(catalog, request_version, schema)

    try:
        with open(schema, 'r') as handle:
            LOG.debug(
                f'xml validate success reading schema catalog={catalog},'
                f" schema={schema}, catalog env=({os.getenv('XML_CATALOG_FILES')})"
            )
            code, result = cvrf_validate(handle, xml_tree)
    except IOError as err:
        return False, f'validation of {xml_tree} against {schema} not performed due to IO error: {err}'

    if code is False:
        return False, f'validation of {xml_tree} against {schema} failed with error: {result}'

    return True, f'validation of {xml_tree} against {schema} succeeded with result: {result}'
def dispatch_embedding(argv, embedded, num_args, pos_args):
    """Dispatch of embedded inputs (documents as arguments).

    Embedded arguments are classified by their first character ('{' or '<'), non-embedded
    arguments by their file suffix ('.json' or '.xml'). When nothing is embedded and no
    argument classifies, the document is read from stdin and classified by its first character.

    Args:
        argv: raw argument list (only logged).
        embedded: whether the positional arguments carry the documents themselves.
        num_args: number of arguments given (only logged).
        pos_args: positional arguments, padded with None.

    Returns:
        ``(document, document_data, is_json, is_xml, schema)``; document and schema are always
        empty strings here, document_data is only filled when stdin was read.
    """
    if embedded:
        LOG.debug(f'embedded dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args}')
        json_token, xml_token = '{', '<'
        is_json = any(arg and str(arg).startswith(json_token) for arg in pos_args)
        is_xml = not is_json and any(arg and str(arg).startswith(xml_token) for arg in pos_args)
    else:
        LOG.debug(f'non-embedded dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args}')
        json_token, xml_token = '.json', '.xml'
        is_json = any(arg and str(arg).endswith(json_token) for arg in pos_args)
        is_xml = not is_json and any(arg and str(arg).endswith(xml_token) for arg in pos_args)

    document_data, document, schema = '', '', ''
    if not (embedded or is_json or is_xml):  # type: ignore
        LOG.debug(
            f'streaming dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args},'
            f' is_json={is_json}, is_xml={is_xml}'  # type: ignore
        )
        document_data = read_stdin()
        json_token, xml_token = '{', '<'
        is_json = document_data.startswith(json_token)
        is_xml = not is_json and document_data.startswith(xml_token)

    return document, document_data, is_json, is_xml, schema
def init_logger(name=None, level=None):
    """Temporary refactoring: Initialize module level logger

    Args:
        name: logger name; APP is used when None.
        level: logging level for the basic configuration; LOG_LEVEL is used when None.
    """
    global LOG  # pylint: disable=global-statement

    log_format = {
        'format': '%(asctime)s %(levelname)s [%(name)s]: %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
        # 'filename': LOG_PATH,
        'level': LOG_LEVEL if level is None else level,
    }
    logging.basicConfig(**log_format)
    # Rebind the module level logger so all functions of this module log under the chosen name.
    LOG = logging.getLogger(APP if name is None else name)
def inputs_xml(num_args, pos_args):
    """Derive document and schema inputs for XML format tasks.

    Args:
        num_args: number of arguments given.
        pos_args: positional arguments, padded with None.

    Returns:
        ``(document, schema)``: with two arguments the schema path comes first; with one argument
        the schema is looked up for the version derived from the document; otherwise ``(None, None)``.
    """
    if num_args == 2:  # Schema file path is first
        return pos_args[1], pos_args[0]

    if num_args == 1:  # Assume schema implicit, argument given is document file path
        document = pos_args[0]
        return document, CVRF_VERSION_SCHEMA_MAP[version_from(None, document)]

    return None, None
def inputs_json(document_data, embedded, num_args, pos_args):
    """Derive document and schema inputs for JSON format tasks.

    Args:
        document_data: JSON text read from stdin (used when no argument names the document).
        embedded: whether the positional arguments are JSON texts instead of file paths.
        num_args: number of arguments given.
        pos_args: positional arguments, padded with None.

    Returns:
        ``(document, schema)`` as parsed JSON objects; the schema is loaded from
        CSAF_2_0_SCHEMA_PATH unless two arguments were given (schema first).
    """
    # Embedded arguments are parsed directly, otherwise they are file paths to load.
    parse = json.loads if embedded else load

    if num_args == 2:  # Schema file path is first
        schema = parse(pos_args[0])
        document = parse(pos_args[1])
        return document, schema

    schema = load(CSAF_2_0_SCHEMA_PATH)
    if num_args == 1:  # Assume schema implicit, argument given is document file path
        document = parse(pos_args[0])
    else:
        document = json.loads(document_data)

    return document, schema
def main(argv=None, embedded=False, debug=None):
    """Drive the validator.

    This function acts as the command line interface backend.
    There is some duplication to support testability.
    TODO(sthagen) the dispatch has become Rococo - needs Bauhaus again.

    Args:
        argv: argument list; ``sys.argv[1:]`` is used when falsy.
        embedded: whether the arguments carry the documents themselves instead of paths.
        debug: True enables debug logging; None defers to the module level DEBUG flag.

    Returns:
        2 on usage errors, otherwise the code reported by validate.
    """
    debug = DEBUG if debug is None else debug is True  # debug is None and DEBUG or debug is True
    init_logger(level=logging.DEBUG if debug else None)
    argv = argv if argv else sys.argv[1:]
    num_args = len(argv)
    LOG.debug(f'guarded dispatch embedded={embedded}, argv={argv}, num_args={num_args}')
    if num_args > 2:  # Unclear what the inputs beyond two may be
        LOG.error('Usage error (num_args > 2)')
        print('Usage: csaf-lint [schema.json] document.json')
        print('   or: csaf-lint < document.json')
        return 2
    # Pad to three positions so later lookups never fail; empty arguments become None.
    pos_args = tuple(argv[n] if n < num_args and argv[n] else None for n in range(3))

    document, document_data, is_json, is_xml, schema = dispatch_embedding(argv, embedded, num_args, pos_args)

    LOG.debug(
        f'post dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args},'
        f' is_json={is_json}, is_xml={is_xml}'
    )

    if is_json:
        document, schema = inputs_json(document_data, embedded, num_args, pos_args)

        code, message = validate(document, schema)
        LOG.info(f'Validation(JSON): code={code}, message={message}')
        return code

    if embedded and not is_xml and not is_json:
        LOG.error('Usage error (embedded and not is_xml and not is_json)')
        print('Usage: csaf-lint [schema.xsd] document.xml')
        print(' note: no embedding support for non xml/json data')
        return 2

    if embedded and is_xml:
        LOG.error('Usage error (embedded and is_xml)')
        print('Usage: csaf-lint [schema.xsd] document.xml')
        print(' note: no embedding supported for xsd/xml')
        return 2

    if num_args and is_xml:
        document, schema = inputs_xml(num_args, pos_args)
    if document is None:
        LOG.error('Usage error (no embedding supported for xsd/xml)')
        print('Usage: csaf-lint [schema.xsd] document.xml')
        print(' note: no embedding supported for xsd/xml')
        return 2

    code, message = validate(document, schema)
    LOG.info(f'Validation(XML): code={code}, message={message}')
    return code