Coverage for csaf_lint/lint.py: 87.59%
280 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 16:41:21 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 16:41:21 +00:00
1"""Visit CSAF/CVRF files and validate them against envelope (core) and given body profiles."""
3import json
4import logging
5import os
6import pathlib
7import sys
8import typing
10import jsonschema # type: ignore
11from lxml import etree # type: ignore
# Text encoding used when reading documents and schemas.
ENCODING = 'utf-8'

APP = 'csaf-lint'

LOG = logging.getLogger()  # Temporary refactoring: module level logger
LOG_FOLDER = pathlib.Path('logs')
LOG_FILE = f'{APP}.log'
# Use the logs folder when it exists in the working directory, else the working directory itself.
LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
LOG_LEVEL = logging.INFO

CSAF_DEFAULT_SEMANTIC_VERSION = '2.0'
CRVF_DEFAULT_SEMANTIC_VERSION = '1.2'
CRVF_PRE_OASIS_SEMANTIC_VERSION = '1.1'
CRVF_KNOWN_SEMANTIC_VERSIONS = (CRVF_DEFAULT_SEMANTIC_VERSION, CRVF_PRE_OASIS_SEMANTIC_VERSION)
CVRF_PARTS = ('cvrf', 'vuln', 'prod')
CSAF_2_0_SCHEMA_PATH = pathlib.Path('csaf_lint', 'schema', 'csaf', CSAF_DEFAULT_SEMANTIC_VERSION, 'csaf.json')

CVRF_OASIS_ROOT = 'http://docs.oasis-open.org/csaf/csaf-cvrf/'
CVRF_OASIS_NS_ROOT = 'http://docs.oasis-open.org/csaf/ns/csaf-cvrf/'

CVRF_DEFAULT_SCHEMA = f'{CVRF_OASIS_ROOT}v{CRVF_DEFAULT_SEMANTIC_VERSION}/cs01/schemas/cvrf.xsd'
# Namespaces per document part in Clark notation ({uri}); the doubled braces yield the literal braces.
# The values must be f-strings - without the prefix every entry is the unexpanded template text.
CVRF_DEFAULT_NAMESPACES = {
    part.upper(): f'{{{CVRF_OASIS_NS_ROOT}v{CRVF_DEFAULT_SEMANTIC_VERSION}/{part}}}' for part in CVRF_PARTS
}

CVRF_DEFAULT_CATALOG = pathlib.Path(
    'csaf_lint', 'schema', f'catalog_{CRVF_DEFAULT_SEMANTIC_VERSION.replace(".", "_")}.xml'
)
CVRF_DEFAULT_SCHEMA_FILE = pathlib.Path('csaf_lint', 'schema', 'cvrf', f'{CRVF_DEFAULT_SEMANTIC_VERSION}/cvrf.xsd')

CVRF_PRE_OASIS_ROOT = 'http://www.icasi.org/CVRF/schema/cvrf/'
# Pre-OASIS (ICASI) namespaces carry the part before the version: .../schema/<part>/<version>
CVRF_PRE_OASIS_NS_ROOT = 'http://www.icasi.org/CVRF/schema/'

CVRF_PRE_OASIS_SCHEMA = f'{CVRF_PRE_OASIS_ROOT}{CRVF_PRE_OASIS_SEMANTIC_VERSION}/cs01/schemas/cvrf.xsd'
CVRF_PRE_OASIS_NAMESPACES = {
    part.upper(): f'{{{CVRF_PRE_OASIS_NS_ROOT}{part}/{CRVF_PRE_OASIS_SEMANTIC_VERSION}}}' for part in CVRF_PARTS
}

CVRF_PRE_OASIS_CATALOG = pathlib.Path(
    'csaf_lint', 'schema', f'catalog_{CRVF_PRE_OASIS_SEMANTIC_VERSION.replace(".", "_")}.xml'
)
CVRF_PRE_OASIS_SCHEMA_FILE = pathlib.Path('csaf_lint', 'schema', 'cvrf', f'{CRVF_PRE_OASIS_SEMANTIC_VERSION}/cvrf.xsd')

# Lookup tables keyed by the semantic version string.
CVRF_VERSION_NS_MAP = {
    CRVF_DEFAULT_SEMANTIC_VERSION: f'{CVRF_OASIS_NS_ROOT}v{CRVF_DEFAULT_SEMANTIC_VERSION}/cvrf',
    CRVF_PRE_OASIS_SEMANTIC_VERSION: f'{CVRF_OASIS_NS_ROOT}v{CRVF_PRE_OASIS_SEMANTIC_VERSION}/cvrf',
}

CVRF_VERSION_SCHEMA_MAP = {
    CRVF_DEFAULT_SEMANTIC_VERSION: CVRF_DEFAULT_SCHEMA_FILE,
    CRVF_PRE_OASIS_SEMANTIC_VERSION: CVRF_PRE_OASIS_SCHEMA_FILE,
}

CVRF_VERSION_CATALOG_MAP = {
    CRVF_DEFAULT_SEMANTIC_VERSION: CVRF_DEFAULT_CATALOG,
    CRVF_PRE_OASIS_SEMANTIC_VERSION: CVRF_PRE_OASIS_CATALOG,
}

# Any non-empty value of the environment variable switches debug mode on.
DEBUG_VAR = 'CSL_DEBUG'
DEBUG = bool(os.getenv(DEBUG_VAR, ''))
@typing.no_type_check
def read_stdin():
    """Return everything available on standard input as a single string."""
    LOG.debug('call site loading from stdin')
    stdin_text = sys.stdin.read()
    return stdin_text
@typing.no_type_check
def load(file_path):
    """Open file_path as text (module ENCODING) and return the parsed JSON value."""
    LOG.debug('call site file loading file_path=%s', file_path)
    with open(file_path, mode='rt', encoding=ENCODING) as source:
        parsed = json.load(source)
    return parsed
@typing.no_type_check
def version_peek(document_path):
    """HACK A DID ACK derives schema version from reading the first lines from path.

    Detection happens in two steps:

    1. cheap: a known version string occurring anywhere in the path text wins
       (the pre-OASIS version is tried first);
    2. deep: the lines from the first one containing ``<cvrf`` up to the first
       subsequent line containing ``>`` are collected and searched for the
       quoted namespace URI of a known version.

    Something like:

    <?xml version="1.0" encoding="UTF-8"?>
    <cvrfdoc xmlns="http://www.icasi.org/CVRF/schema/cvrf/1.1" xmlns:cvrf="http://www.icasi.org/CVRF/schema/cvrf/1.1">

    or (in addition should work with <cvrf:cvrfdoc style xml documents):

    <?xml version="1.0" encoding="UTF-8"?>
    <cvrfdoc
      xmlns:xsd="http://www.w3.org/2001/XMLSchema"
      xmlns:cpe="http://cpe.mitre.org/language/2.0"
      xmlns:cvrf="http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v1.2/cvrf"
      xmlns:cvrf-common="http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v1.2/common"
      xmlns:cvssv2="http://scap.nist.gov/schema/cvss-v2/1.0"
      xmlns:cvssv3="https://www.first.org/cvss/cvss-v3.0.xsd"
      xmlns:dc="http://purl.org/dc/elements/1.1/"
      xmlns:ns0="http://purl.org/dc/elements/1.1/"
      xmlns:prod="http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v1.2/prod"
      xmlns:scap-core="http://scap.nist.gov/schema/scap-core/1.0"
      xmlns:sch="http://purl.oclc.org/dsdl/schematron"
      xmlns:vuln="http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v1.2/vuln"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v1.2/cvrf"
      >

    Returns the detected semantic version string or None when nothing matched.
    Errors from opening the file propagate to the caller.
    """
    LOG.debug('version peek cheap detect on path string document_path=%s', document_path)
    path_text = str(document_path)
    if CRVF_PRE_OASIS_SEMANTIC_VERSION in path_text:
        return CRVF_PRE_OASIS_SEMANTIC_VERSION
    if CRVF_DEFAULT_SEMANTIC_VERSION in path_text:
        return CRVF_DEFAULT_SEMANTIC_VERSION

    LOG.debug('version peek naive but deep detect on path content document_path=%s', document_path)
    cvrf_element_start = '<cvrf'
    cvrf_element_end = '>'
    naive = []
    # Explicit encoding keeps the scan independent of the platform default; only ASCII tokens are
    # searched, so undecodable bytes are replaced instead of aborting the peek.
    with open(document_path, 'rt', encoding=ENCODING, errors='replace') as handle:
        # Iterate lazily - only the start tag of the root element is needed, not the whole file.
        for line in handle:
            LOG.debug('version peek scanner line=%s', line)
            if cvrf_element_start in line or naive:
                naive.append(line.strip())
                LOG.debug('version peek parser triggered cvrf_element_start=%s, naive=%s', cvrf_element_start, naive)
                # Harvesting stops with the first collected chunk that closes the start tag.
                if cvrf_element_end in naive[-1]:
                    LOG.debug(
                        'version peek harvest done triggered cvrf_element_end=%s, naive=%s', cvrf_element_end, naive
                    )
                    break
    LOG.debug('version peek normal harvest naive=%s', naive)

    oasis_token = f'"http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v{CRVF_DEFAULT_SEMANTIC_VERSION}/cvrf"'
    if any(oasis_token in chunk for chunk in naive):
        return CRVF_DEFAULT_SEMANTIC_VERSION
    pre_oasis_token = f'"http://www.icasi.org/CVRF/schema/cvrf/{CRVF_PRE_OASIS_SEMANTIC_VERSION}"'
    if any(pre_oasis_token in chunk for chunk in naive):
        return CRVF_PRE_OASIS_SEMANTIC_VERSION

    LOG.debug('version peek finally failed')
    return None
@typing.no_type_check
def version_from(schema_path, document_path):
    """HACK A DID ACK derive the version from the schema path text, else by peeking into the document.

    The pre-OASIS version is tried before the default version; when neither occurs in the
    text form of schema_path the decision is delegated to version_peek(document_path).
    """
    LOG.debug('xml version derivation flat inspection schema_path=%s', schema_path)
    schema_text = str(schema_path)
    for candidate in (CRVF_PRE_OASIS_SEMANTIC_VERSION, CRVF_DEFAULT_SEMANTIC_VERSION):
        if candidate in schema_text:
            return candidate
    LOG.debug('xml version derivation deep call document_path=%s', document_path)
    return version_peek(document_path)
@typing.no_type_check
def validate_json(document, schema, conformance=None) -> typing.Tuple[int, str]:
    """Validate the JSON document against the schema.

    document and schema are expected to be mappings (their keys are logged).
    conformance is the format checker handed to jsonschema; a draft 7 checker is used when falsy.

    Returns a (code, message) pair:
    - (0, 'OK') when the document is valid,
    - (1, <error text>) when the document violates the schema,
    - (2, <error text>) when the schema itself is invalid.
    """
    if not conformance:
        # Prefer the checker attached to the validator class where the installed jsonschema offers it
        # and fall back to the module level attribute otherwise.
        conformance = getattr(jsonschema.Draft7Validator, 'FORMAT_CHECKER', None)
        if conformance is None:
            conformance = jsonschema.draft7_format_checker
    LOG.debug(
        f'caller site json validation list(document.keys())={list(document.keys())},'
        f' list(schema.keys())={list(schema.keys())}, format_checker={conformance}'
    )
    code, message = 0, 'OK'
    try:
        jsonschema.validate(document, schema, format_checker=conformance)
    except jsonschema.exceptions.ValidationError as err:
        LOG.error(f'err.message={err.message} [err.validator={err.validator}] err.relative_path={err.relative_path}')
        code, message = 1, f'{err}'
    except jsonschema.exceptions.SchemaError as err:
        LOG.error(f'err.message={err.message} [err.validator={err.validator}] err.relative_path={err.relative_path}')
        code, message = 2, f'{err}'

    # Only claim success in the log when the validation did succeed.
    outcome = 'failure' if code else 'success'
    LOG.debug(f'{outcome} in JSON validation: code={code}, message={message}')
    return code, message
@typing.no_type_check
def validate(document, schema, conformance=None) -> typing.Tuple[int, str]:
    """Validate the document against the schema.

    A dict document is treated as JSON and delegated to validate_json (returning its result).
    Anything else is handed to load_xml as the location of an XML document and validated
    via xml_validate.

    Returns (0, 'OK') for a valid XML document and (1, 'ERROR') when loading or validation failed.
    """
    if isinstance(document, dict):  # HACK A DID ACK
        return validate_json(document, schema, conformance)

    LOG.debug(f'caller site xml loading document={document}, schema={schema}, conformance={conformance}')
    xml_tree, message = load_xml(document)
    if xml_tree is None:  # load_xml signals failure with None as tree
        LOG.error(message)
        return 1, 'ERROR'
    request_version = version_from(schema, document)
    LOG.debug(f'version detected schema={schema}, document={document}, request_version={request_version}')
    found, version, namespace = versions_xml(xml_tree, request_version)
    LOG.debug(f'versions consistency found={found}, version={version}, namespace={namespace}')
    # Version detection may yield None (or an unmapped value); do not crash with a KeyError but
    # leave the catalog choice to the fallback inside xml_validate.
    catalog = CVRF_VERSION_CATALOG_MAP.get(request_version)
    if catalog is None:
        LOG.warning(f'no catalog mapped for request_version={request_version}, relying on fallback catalog')
    LOG.debug(
        f'caller site validation: schema={schema}, catalog={catalog}, xml_tree={xml_tree},'
        f' request_version={request_version}'
    )
    status, message = xml_validate(schema, catalog, xml_tree, request_version)
    LOG.debug(f'validation xml results status={status}, message={message}')
    if status:
        return 0, 'OK'
    LOG.warning(message)
    return 1, 'ERROR'
@typing.no_type_check
def load_xml(document_path):
    """
    Parse the document first (ensuring it is well-formed XML) to obtain the ElementTree object
    that is later passed to the CVRF validator/parser.

    Returns a pair (tree, message); tree is None when reading or parsing failed.
    """
    parser = etree.XMLParser(encoding=ENCODING)
    try:
        tree = etree.parse(document_path, parser)
    except IOError as err:
        failure = f'file {document_path} failed with IO error {err}'
        return None, failure
    except etree.XMLSyntaxError as err:
        failure = f'parsing from {document_path} failed with XMLSyntaxError error {err}'
        return None, failure

    return tree, f'well-formed xml tree from {document_path}'
@typing.no_type_check
def derive_version_from_namespace(root):
    """Detect the version of an XML document from the text form of its root element.

    Returns (version, namespace) of the first CVRF_VERSION_NS_MAP entry whose version
    string occurs in str(root), or ('', None) when root is None or nothing matches.
    """
    LOG.debug('versions from namespace callee site root=%s', root)
    if root is None:
        return '', None

    root_text = str(root)
    LOG.debug(f'versions from namespace callee site naive match str_rep_root={root_text} start')
    for version, namespace in CVRF_VERSION_NS_MAP.items():
        LOG.debug(
            f'versions from namespace callee site naive trial str_rep_root={root_text},'
            f' version={version}, namespace={namespace}'
        )
        if version not in root_text:
            LOG.debug(
                f'versions from namespace callee site naive miss root={root}, version={version}, namespace={namespace}'
            )
            continue
        LOG.debug(
            f'versions from namespace callee site naive match root={root}, version={version}, namespace={namespace}'
        )
        return version, namespace

    return '', None
@typing.no_type_check
def versions_xml(xml_tree, request_version):
    """Compare the cvrf namespace derived from the xml tree with the one of the request version.

    Returns (matches, document_namespace, requested_namespace); the first two entries are
    False and None when no namespace could be derived from the document.
    """
    sem_ver, doc_cvrf_version = derive_version_from_namespace(xml_tree.getroot())
    req_cvrf_version = f'http://docs.oasis-open.org/csaf/ns/csaf-cvrf/v{request_version}/cvrf'

    LOG.debug(f'versions xml callee site sem_ver={sem_ver}, doc_cvrf_version={doc_cvrf_version}, xml_tree={xml_tree}')
    if not doc_cvrf_version:
        return False, None, req_cvrf_version

    is_consistent = doc_cvrf_version == req_cvrf_version
    return is_consistent, doc_cvrf_version, req_cvrf_version
@typing.no_type_check
def cvrf_validate(handle: typing.IO, xml_tree: etree.ElementTree) -> typing.Tuple[bool, str]:
    """Validates a CVRF document.

    handle is an open file object (its name attribute is used in messages) providing the
    XML schema; xml_tree is the parsed document to check.

    Returns (True, 'Valid') on success and (False, <reason>) otherwise, where the reason is a
    message for an unusable schema or the error log of the schema validator for an invalid document.
    """
    try:
        xmlschema_doc = etree.parse(handle)
    except etree.XMLSyntaxError as err:
        return False, f'Parsing error, schema document "{handle.name}" is not well-formed: {err}'

    # Well-formed XML is not necessarily a valid schema - report that case instead of crashing.
    try:
        xmlschema = etree.XMLSchema(xmlschema_doc)
    except etree.XMLSchemaParseError as err:
        return False, f'Parsing error, schema document "{handle.name}" is not a valid XML schema: {err}'

    try:
        xmlschema.assertValid(xml_tree)
    except etree.DocumentInvalid:
        return False, xmlschema.error_log
    return True, 'Valid'
@typing.no_type_check
def push_catalog(catalog, request_version):
    """Isolate side effect interface to os env -> libxml2 <- lxml.

    Exports the catalog (or the version matching fallback when catalog is falsy) through
    the XML_CATALOG_FILES environment variable and returns the catalog that was exported.
    """
    if not catalog:
        is_default = request_version == CRVF_DEFAULT_SEMANTIC_VERSION
        catalog = CVRF_DEFAULT_CATALOG if is_default else CVRF_PRE_OASIS_CATALOG

    # If the supplied file is not a valid catalog.xml or doesn't exist lxml will fall back to using remote validation
    os.environ['XML_CATALOG_FILES'] = str(catalog)

    return catalog
@typing.no_type_check
def derive_schema_path(catalog, request_version, schema):
    """Return the given schema, or the locally provided schema file of the request version when it is falsy."""
    catalog_env = os.getenv('XML_CATALOG_FILES')
    if schema:
        LOG.debug(
            f'xml validate try reading schema catalog={catalog},'
            f" schema={schema}, catalog env=({catalog_env})"
        )
        return schema

    LOG.debug(
        f'xml validate try reading local implicit schema catalog={catalog},'
        f" schema={schema}, catalog env=({catalog_env})"
    )
    # try to use local schema file
    if request_version == CRVF_DEFAULT_SEMANTIC_VERSION:
        return CVRF_DEFAULT_SCHEMA_FILE
    return CVRF_PRE_OASIS_SCHEMA_FILE
@typing.no_type_check
def xml_validate(schema, catalog, xml_tree, request_version):
    """Validate xml tree against given xml schema of request version assisted by catalog.

    Returns (True, message) when the validation succeeded and (False, message) when the
    schema could not be read or the validation failed.
    """
    LOG.debug(
        f'xml validate parameters: schema={schema}, catalog={catalog},'
        f' xml_tree={xml_tree}, request_version={request_version}'
    )
    catalog = push_catalog(catalog, request_version)
    schema = derive_schema_path(catalog, request_version, schema)

    try:
        with open(schema, 'r') as schema_handle:
            LOG.debug(
                f'xml validate success reading schema catalog={catalog},'
                f" schema={schema}, catalog env=({os.getenv('XML_CATALOG_FILES')})"
            )
            outcome = cvrf_validate(schema_handle, xml_tree)
    except IOError as err:
        return False, f'validation of {xml_tree} against {schema} not performed due to IO error: {err}'

    code, result = outcome
    if code is not False:
        return True, f'validation of {xml_tree} against {schema} succeeded with result: {result}'
    return False, f'validation of {xml_tree} against {schema} failed with error: {result}'
@typing.no_type_check
def dispatch_embedding(argv, embedded, num_args, pos_args):
    """Dispatch of embedded inputs (documents as arguments).

    Embedded arguments are classified by their first character, other arguments by their
    file extension. When not embedded and no argument looks like JSON or XML, the document
    is read from standard input and classified by its first character.

    Returns (document, document_data, is_json, is_xml, schema) where document and schema
    are always empty strings and document_data is non-empty only for data read from stdin.
    """
    if embedded:
        LOG.debug(f'embedded dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args}')
        json_token, xml_token = '{', '<'
        matches = str.startswith
    else:
        LOG.debug(f'non-embedded dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args}')
        json_token, xml_token = '.json', '.xml'
        matches = str.endswith

    is_json = any(arg and matches(str(arg), json_token) for arg in pos_args)
    is_xml = not is_json and any(arg and matches(str(arg), xml_token) for arg in pos_args)

    document_data, document, schema = '', '', ''
    needs_stream = not (embedded or is_json or is_xml)
    if needs_stream:
        LOG.debug(
            f'streaming dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args},'
            f' is_json={is_json}, is_xml={is_xml}'
        )
        document_data = read_stdin()
        is_json = document_data.startswith('{')
        is_xml = not is_json and document_data.startswith('<')
    return document, document_data, is_json, is_xml, schema
@typing.no_type_check
def init_logger(name=None, level=None):
    """Temporary refactoring: Initialize module level logger

    Configures the logging package and rebinds the module level LOG to the logger named
    name (APP when None); level falls back to LOG_LEVEL when None.
    """
    global LOG  # pylint: disable=global-statement

    effective_level = LOG_LEVEL if level is None else level
    logging.basicConfig(
        format='%(asctime)s %(levelname)s [%(name)s]: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        # filename=LOG_PATH,
        level=effective_level,
    )
    logger_name = APP if name is None else name
    LOG = logging.getLogger(logger_name)
@typing.no_type_check
def inputs_xml(num_args, pos_args):
    """Derive document and schema inputs for XML format tasks.

    - two arguments: the schema file path comes first, the document file path second;
    - one argument: it is the document file path and the schema is looked up from the
      version derived for that document (None when no version could be derived);
    - otherwise: both are None.

    Returns the pair (document, schema).
    """
    if num_args == 2:  # Schema file path is first
        return pos_args[1], pos_args[0]

    if num_args == 1:  # Assume schema implicit, argument given is document file path
        document = pos_args[0]
        # Version detection may fail (None); yield no schema then instead of raising a KeyError.
        schema = CVRF_VERSION_SCHEMA_MAP.get(version_from(None, document))
        return document, schema

    return None, None
@typing.no_type_check
def inputs_json(document_data, embedded, num_args, pos_args):
    """Derive document and schema inputs for JSON format tasks.

    Arguments are parsed as JSON text when embedded and loaded as file paths otherwise.
    Without an explicit schema (fewer than two arguments) the CSAF 2.0 schema file is used;
    without any argument the document is parsed from document_data.

    Returns the pair (document, schema).
    """

    def obtain(argument):
        """Turn a positional argument into a JSON object (from text or from file)."""
        return json.loads(argument) if embedded else load(argument)

    if num_args == 2:  # Schema file path is first
        schema = obtain(pos_args[0])
        document = obtain(pos_args[1])
        return document, schema

    schema = load(CSAF_2_0_SCHEMA_PATH)
    # Assume schema implicit, a single argument given is the document
    document = obtain(pos_args[0]) if num_args == 1 else json.loads(document_data)
    return document, schema
@typing.no_type_check
def main(argv=None, embedded=False, debug=None):
    """Drive the validator.

    This function acts as the command line interface backend.
    There is some duplication to support testability.

    Returns the code from validate, or 2 on usage errors.

    TODO(sthagen) the dispatch has become Rococo - needs Bauhaus again.
    """
    debug = DEBUG if debug is None else debug is True
    init_logger(level=logging.DEBUG if debug else None)
    if not argv:
        argv = sys.argv[1:]
    num_args = len(argv)
    LOG.debug(f'guarded dispatch embedded={embedded}, argv={argv}, num_args={num_args}')
    if num_args > 2:  # Unclear what the inputs beyond two may be
        LOG.error('Usage error (num_args > 2)')
        print('Usage: csaf-lint [schema.json] document.json')
        print(' or: csaf-lint < document.json')
        return 2

    # Exactly three slots; missing or falsy arguments become None.
    padded = list(argv) + [None, None, None]
    pos_args = tuple(slot if slot else None for slot in padded[:3])

    document, document_data, is_json, is_xml, schema = dispatch_embedding(argv, embedded, num_args, pos_args)

    LOG.debug(
        f'post dispatch embedded={embedded}, argv={argv}, num_args={num_args}, pos_args={pos_args},'
        f' is_json={is_json}, is_xml={is_xml}'
    )

    if is_json:
        document, schema = inputs_json(document_data, embedded, num_args, pos_args)

        code, message = validate(document, schema)
        LOG.info(f'Validation(JSON): code={code}, message={message}')
        return code

    if embedded:
        # JSON was handled above, so embedded input is a usage error either way here.
        if is_xml:
            LOG.error('Usage error (embedded and is_xml)')
            print('Usage: csaf-lint [schema.xsd] document.xml')
            print(' note: no embedding supported for xsd/xml')
        else:
            LOG.error('Usage error (embedded and not is_xml and not is_json)')
            print('Usage: csaf-lint [schema.xsd] document.xml')
            print(' note: no embedding support for non xml/json data')
        return 2

    if num_args and is_xml:
        document, schema = inputs_xml(num_args, pos_args)
    if document is None:
        LOG.error('Usage error (no embedding supported for xsd/xml)')
        print('Usage: csaf-lint [schema.xsd] document.xml')
        print(' note: no embedding supported for xsd/xml')
        return 2

    code, message = validate(document, schema)
    LOG.info(f'Validation(XML): code={code}, message={message}')
    return code