Coverage for turvallisuusneuvonta/turvallisuusneuvonta.py: 66.74%
303 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-18 20:29:38 +00:00
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-18 20:29:38 +00:00
1# -*- coding: utf-8 -*-
2# pylint: disable=expression-not-assigned,line-too-long
3"""Security advisory (Finnish: turvallisuusneuvonta) audit tool. API.
5Minimal length of CSAF (spam) JSON is 116 bytes:
60 1 2 3 4 5 6 7 8 9
712345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012
8{"document":{"category":" ","csaf_version":"2.0","publisher":{},"title":" ","tracking":{}}}}
9"""
10import os
11import pathlib
12import sys
13from itertools import chain
14from typing import Dict, Iterator, List, Optional, Tuple, Union, no_type_check
16import jmespath
17import msgspec
18from langcodes import tag_is_valid
19from lazr.uri import URI, InvalidURIError # type: ignore
21from turvallisuusneuvonta.csaf.core.rules.mandatory.mandatory import (
22 is_valid,
23 is_valid_category,
24 is_valid_defined_group_ids,
25 is_valid_defined_product_ids,
26 is_valid_translator,
27 is_valid_unique_group_ids,
28 is_valid_unique_product_ids,
29)
31DEBUG_VAR = 'TURVALLISUUSNEUVONTA_DEBUG'
32DEBUG = os.getenv(DEBUG_VAR)
34ENCODING = 'utf-8'
35ENCODING_ERRORS_POLICY = 'ignore'
37DEFAULT_CONFIG_NAME = '.turvallisuusneuvonta.json'
39STDIN, STDOUT = 'STDIN', 'STDOUT'
40DISPATCH = {
41 STDIN: sys.stdin,
42 STDOUT: sys.stdout,
43}
45CSAF_MIN_BYTES = 92
46CSAF_VERSION_STRING = '2.0'
49@no_type_check
50def document_optional_acknowledgments(values):
51 """Verify optional properties of document/acknowledgments if present follow rules."""
52 parent, prop = 'document', 'acknowledgments'
53 if not isinstance(values, list):
54 return 1, f'optional {parent} property {prop} present but no array'
55 if not values:
56 return 1, f'optional {parent} property {prop} present but empty'
57 ack_opt_props = ('names', 'organization', 'summary', 'urls')
58 min_props, max_props = 1, len(ack_opt_props)
59 ack_known_props = {el for el in ack_opt_props}
60 for pos, value in enumerate(values):
61 jp = f'properties of {parent}.{prop}[{pos}]'
62 # print(pos, value)
63 ack_found_props = {el for el in value}
64 # print(ack_found_props)
65 if ack_found_props <= ack_known_props: 65 ↛ 67line 65 didn't jump to line 67 because the condition on line 65 was always true
66 print(f'set of {jp} only contains known properties')
67 if ack_found_props < ack_known_props: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true
68 print(f'set of {jp} is a proper subset of the known properties')
69 nr_distinct_found_props = len(ack_found_props)
70 if nr_distinct_found_props < min_props: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
72 if max_props < nr_distinct_found_props: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true
73 return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'
75 for what in ('names', 'urls'):
76 if what not in ack_found_props: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true
77 continue
78 seq = value[what]
79 if not isinstance(seq, list):
80 return 1, f'optional {jp} property {what} present but no array'
81 if not len(seq):
82 return 1, f'optional {jp} property {what} present but empty'
83 for ndx, text in enumerate(seq):
84 jpn = f'{jp}[{ndx}]'
85 if not isinstance(text, str):
86 return 1, f'optional {jpn} property {what} entry present but no text'
87 if not len(text):
88 return 1, f'optional {jpn} property {what} entry present but empty'
89 if what == 'urls':
90 try:
91 _ = URI(text)
92 except InvalidURIError as err:
93 return 1, f'optional {jpn} property {what} entry present but invalid as URI({err})'
95 for what in ('organization', 'summary'):
96 if what not in ack_found_props: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true
97 continue
98 text = value[what]
99 if not isinstance(text, str):
100 return 1, f'optional {jp} property {what} present but no text'
101 if not len(text):
102 return 1, f'optional {jp} property {what} present but empty'
103 return 0, ''
106@no_type_check
107def document_aggregate_severity(value):
108 """Verify properties of document/aggregate_severity present follow rules."""
109 parent, prop = 'document', 'aggregate_severity'
110 jp = f'{parent}.{prop}'
111 if not isinstance(value, dict): 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true
112 return 1, f'optional property {jp} present but no object'
113 if not value: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 return 1, f'optional property {jp} present but empty'
115 agg_norm_props = ('text',)
116 agg_opt_props = ('namespace',)
117 agg_known_props = {el for el in chain(agg_norm_props, agg_opt_props)}
118 min_props, max_props = 1, len(agg_known_props)
119 agg_found_props = {el for el in value}
120 if agg_found_props <= agg_known_props: 120 ↛ 122line 120 didn't jump to line 122 because the condition on line 120 was always true
121 print(f'set of {jp} properties only contains known properties')
122 if agg_found_props < agg_known_props: 122 ↛ 124line 122 didn't jump to line 124 because the condition on line 122 was always true
123 print(f'set of {jp} properties is a proper subset of the known properties')
124 nr_distinct_found_props = len(agg_found_props)
125 if nr_distinct_found_props < min_props: 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true
126 return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
127 if max_props < nr_distinct_found_props: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'
130 sub = 'text'
131 jps = f'property {parent}.{prop}.{sub}'
132 entry = value.get(sub)
133 if entry is None: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true
134 return 1, f'mandatory {jps} not present'
135 if not isinstance(entry, str): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 return 1, f'mandatory {jps} present but no text'
137 if not entry: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 return 1, f'mandatory {jps} present but empty'
140 sub = 'namespace'
141 jps = f'optional property {parent}.{prop}.{sub}'
142 entry = value.get(sub)
143 if entry is None: 143 ↛ 145line 143 didn't jump to line 145 because the condition on line 143 was always true
144 return 0, ''
145 if not isinstance(entry, str):
146 return 1, f'{jps} present but no text'
147 if not entry:
148 return 1, f'mandatory {jps} present but empty'
149 try:
150 _ = URI(entry)
151 except InvalidURIError as err:
152 return 1, f'{jps} present but invalid as URI({err})'
154 return 0, ''
157@no_type_check
158def document_category(value):
159 """Verify value of document/category follow rules."""
160 parent, prop = 'document', 'category'
161 jp = f'property {parent}.{prop}'
162 if not isinstance(value, str): 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 return 1, f'{jp} present but no text'
164 if not value: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 return 1, f'{jp} present but empty'
167 return 0, ''
170@no_type_check
171def document_csaf_version(value):
172 """Verify value of document/csaf_version follow rules."""
173 parent, prop = 'document', 'csaf_version'
174 jp = f'property {parent}.{prop}'
175 if not isinstance(value, str):
176 return 1, f'{jp} present but no text'
177 if not value: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 return 1, f'{jp} present but empty'
179 if value != CSAF_VERSION_STRING:
180 return 1, f'{jp} present but ({value}) not matching CSAF version 2.0'
182 return 0, ''
185@no_type_check
186def document_lang(value):
187 """Verify value of document/lang follow rules."""
188 parent, prop = 'document', 'lang'
189 jp = f'property {parent}.{prop}'
190 if not isinstance(value, str):
191 return 1, f'{jp} present but no text'
192 if not value:
193 return 1, f'{jp} present but empty'
194 if not tag_is_valid(value):
195 return 1, f'{jp} present but ({value}) is no valid language tag'
197 return 0, ''
200@no_type_check
201def document_optional(document):
202 """Verify optional properties of document if present follow rules."""
203 norm_props = ('category', 'csaf_version', 'publisher', 'title', 'tracking')
204 opt_props = ('acknowledgments', 'aggregate_severity', 'distribution', 'lang', 'notes', 'references', 'source_lang')
205 known_props = {el for el in chain(norm_props, opt_props)}
206 opt_map = {el: None for el in opt_props}
207 parent = 'document'
208 for prop in opt_props:
209 value = jmespath.search(f'{prop}', document)
210 if value is not None:
211 opt_map[prop] = value
213 prop = 'acknowledgments'
214 if opt_map[prop] is not None:
215 error, message = document_optional_acknowledgments(opt_map[prop])
216 if error:
217 return error, message
219 prop = 'aggregate_severity'
220 if opt_map[prop] is not None:
221 error, message = document_aggregate_severity(opt_map[prop])
222 if error: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true
223 return error, message
225 found_props = {el for el in document}
226 if found_props <= known_props:
227 print(f'set of {parent} properties only contains known properties')
228 if found_props < known_props:
229 print(f'set of {parent} properties is a proper subset of the known properties')
231 return 0, 'NotImplemented'
234@no_type_check
235def verify_document(document):
236 """Root of /document member verifier"""
237 parent = 'document'
238 for prop in ('category', 'csaf_version', 'publisher', 'title', 'tracking'):
239 if not jmespath.search(f'{prop}', document):
240 return 1, f'missing {parent} property ({prop})'
242 parent = 'document'
243 prop = 'category'
244 if not jmespath.search(f'{prop}', document).strip(): 244 ↛ 246line 244 didn't jump to line 246 because the condition on line 244 was always true
245 print(f'warning - {parent} property {prop} value is space-only')
246 error, message = document_category(document[prop])
247 if error: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 return error, message
250 prop = 'csaf_version'
251 csaf_version = jmespath.search(f'{prop}', document)
252 error, message = document_csaf_version(csaf_version)
253 if error:
254 return error, message
256 prop = 'lang'
257 lang = jmespath.search(f'{prop}', document)
258 if lang is not None: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 error, message = document_lang(lang)
260 if error:
261 return error, message
263 # Publisher (publisher) is object requires ('category', 'name', 'namespace')
264 parent = 'document.publisher'
265 for prop in ('category', 'name', 'namespace'): 265 ↛ 269line 265 didn't jump to line 269 because the loop on line 265 didn't complete
266 if not jmespath.search(f'publisher.{prop}', document): 266 ↛ 265line 266 didn't jump to line 265 because the condition on line 266 was always true
267 return 1, f'missing {parent} property ({prop})'
269 parent = 'document'
270 prop = 'title'
271 if not jmespath.search(f'{prop}', document).strip():
272 print(f'warning - {parent} property {prop} value is space-only')
274 # Tracking (tracking) is object requires:
275 # ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version')
276 parent = 'document'
277 prop = 'tracking'
278 for sub in ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version'):
279 if jmespath.search(f'{prop}.{sub}', document) is None:
280 return 1, f'missing {parent}.{prop} property ({sub})'
282 return document_optional(document)
285@no_type_check
286def level_zero(csaf_doc):
287 """Most superficial verification."""
288 if not csaf_doc.get('document'):
289 return 1, 'missing document property'
291 error, message = verify_document(csaf_doc['document'])
292 if error: 292 ↛ 295line 292 didn't jump to line 295 because the condition on line 292 was always true
293 return error, message
295 return 0, ''
298def reader(path: str) -> Iterator[str]:
299 """Context wrapper / generator to read the lines."""
300 with open(pathlib.Path(path), 'rt', encoding=ENCODING) as handle:
301 for line in handle: 301 ↛ exitline 301 didn't jump to the function exit
302 yield line
305def peek(data: str) -> str:
306 """Determine trivial format of data."""
307 if len(data) < CSAF_MIN_BYTES:
308 return 'TOO_SHORT'
309 sample = data[:CSAF_MIN_BYTES].strip()
310 if sample.startswith('{'):
311 return 'JSON'
312 if sample.startswith('<'):
313 return 'XML'
314 return 'UNKNOWN'
317def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
318 """Fail with grace."""
319 if not argv or len(argv) != 3:
320 return 2, 'received wrong number of arguments', ['']
322 command, inp, config = argv
324 if command not in ('verify',):
325 return 2, 'received unknown command', ['']
327 if inp:
328 if not pathlib.Path(str(inp)).is_file():
329 return 1, 'source is no file', ['']
331 if not config: 331 ↛ 334line 331 didn't jump to line 334 because the condition on line 331 was always true
332 return 2, 'configuration missing', ['']
334 config_path = pathlib.Path(str(config))
335 if not config_path.is_file():
336 return 1, f'config ({config_path}) is no file', ['']
337 if not ''.join(config_path.suffixes).lower().endswith('.json'):
338 return 1, 'config has no .json extension', ['']
340 return 0, '', argv
343def verify_json(data: str) -> Tuple[int, str, List[str], Dict[str, object]]:
344 """Verify the JSON as CSAF."""
345 try:
346 doc = msgspec.json.decode(data)
347 except msgspec.DecodeError:
348 return 1, 'advisory is no valid JSON', [], {}
350 error, message = level_zero(doc)
351 if error: 351 ↛ 353line 351 didn't jump to line 353 because the condition on line 351 was always true
352 return error, message, [], {}
353 return 0, 'OK', [], doc
356def main(argv: Union[List[str], None] = None) -> int:
357 """Drive the lookup."""
358 error, message, strings = verify_request(argv)
359 if error: 359 ↛ 363line 359 didn't jump to line 363 because the condition on line 359 was always true
360 print(message, file=sys.stderr)
361 return error
363 command, inp, config = strings
365 with open(config, 'rb') as handle:
366 configuration = msgspec.json.decode(handle.read())
368 print(f'using configuration ({configuration})')
369 source = sys.stdin if not inp else reader(inp)
370 data = ''.join(line for line in source)
372 guess = peek(data)
374 if guess == 'TOO_SHORT':
375 print('advisory is too short to be valid')
376 return 1
378 if guess == 'UNKNOWN':
379 print('advisory is of unknown format')
380 return 1
382 if guess == 'JSON':
383 error, message, strings, doc = verify_json(data)
384 if error:
385 print(message, file=sys.stderr)
386 return error
387 # Later post process the business rules (spec tests) here
388 # Like that:
389 if is_valid(doc) is False: # For now, we return NotImplemented, sorry
390 print('advisory fails mandatory rules:')
391 # Why not execute the rules multiple times (until we have traits in place to report the failing rule)?
392 if not is_valid_category(doc):
393 print('- invalid category')
394 if not is_valid_defined_group_ids(doc):
395 print('- undefined group ids')
396 if not is_valid_defined_product_ids(doc):
397 print('- undefined product ids')
398 if not is_valid_translator(doc):
399 print('- invalid translator')
400 if not is_valid_unique_group_ids(doc):
401 print('- non-unique group ids')
402 if not is_valid_unique_product_ids(doc):
403 print('- non-unique product ids')
404 return 1
405 print('OK')
406 return 0
408 print('advisory may be XML')
409 if 'DocumentTitle>' not in data:
410 print('advisory is no valid CVRF')
411 return 1
413 print('advisory may be valid CVRF')
414 return 0