Coverage for turvallisuusneuvonta/turvallisuusneuvonta.py: 67.27%

304 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-05 19:27:17 +00:00

1# -*- coding: utf-8 -*- 

2# pylint: disable=expression-not-assigned,line-too-long 

3"""Security advisory (Finnish: turvallisuusneuvonta) audit tool. API. 

4 

5Minimal length of CSAF (spam) JSON is 92 bytes: 

60 1 2 3 4 5 6 7 8 9 

712345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012 

8{"document":{"category":" ","csaf_version":"2.0","publisher":{},"title":" ","tracking":{}}}} 

9""" 

10import os 

11import pathlib 

12import sys 

13from itertools import chain 

14from typing import Dict, Iterator, List, Optional, Tuple, Union, no_type_check 

15 

16import jmespath 

17import msgspec 

18from langcodes import tag_is_valid 

19from lazr.uri import URI, InvalidURIError # type: ignore 

20 

21from turvallisuusneuvonta.csaf.core.rules.mandatory.mandatory import ( 

22 is_valid, 

23 is_valid_category, 

24 is_valid_defined_group_ids, 

25 is_valid_defined_product_ids, 

26 is_valid_translator, 

27 is_valid_unique_group_ids, 

28 is_valid_unique_product_ids, 

29) 

30 

# Environment variable consulted for the debug setting; read once at import time.
DEBUG_VAR = 'TURVALLISUUSNEUVONTA_DEBUG'
DEBUG = os.environ.get(DEBUG_VAR)  # None when the variable is not set

# Text encoding applied by reader() when opening advisory files.
ENCODING = 'utf-8'
ENCODING_ERRORS_POLICY = 'ignore'

DEFAULT_CONFIG_NAME = '.turvallisuusneuvonta.json'

# Symbolic stream names mapped onto the process standard streams.
STDIN = 'STDIN'
STDOUT = 'STDOUT'
DISPATCH = {STDIN: sys.stdin, STDOUT: sys.stdout}

# peek() classifies any input with fewer characters than this as TOO_SHORT.
CSAF_MIN_BYTES = 92
# The only value document_csaf_version() accepts for document.csaf_version.
CSAF_VERSION_STRING = '2.0'

47 

48 

@no_type_check
def document_optional_acknowledgments(values):
    """Verify that document/acknowledgments, when present, follows the rules.

    The value has to be a non-empty list of objects. Every object must carry
    between one and four properties. Where present, `names` and `urls` must be
    non-empty lists of non-empty strings (entries of `urls` must also be
    accepted by URI), and `organization` and `summary` must be non-empty
    strings.

    Returns a pair (error, message): (0, '') when everything passes, otherwise
    (1, reason) describing the first violation encountered.
    """
    parent, prop = 'document', 'acknowledgments'
    if not isinstance(values, list):
        return 1, f'optional {parent} property {prop} present but no array'
    if not values:
        return 1, f'optional {parent} property {prop} present but empty'

    list_props = ('names', 'urls')
    text_props = ('organization', 'summary')
    ack_known_props = {*list_props, *text_props}
    min_props, max_props = 1, len(ack_known_props)

    for pos, value in enumerate(values):
        jp = f'properties of {parent}.{prop}[{pos}]'
        if not isinstance(value, dict):
            # Guard before iterating: a number would raise TypeError here and a
            # string would be mistaken for a set of single-character properties.
            return 1, f'optional {parent} property {prop}[{pos}] present but no object'

        ack_found_props = set(value)
        if ack_found_props <= ack_known_props:
            print(f'set of {jp} only contains known properties')
        if ack_found_props < ack_known_props:
            print(f'set of {jp} is a proper subset of the known properties')
        nr_distinct_found_props = len(ack_found_props)
        if nr_distinct_found_props < min_props:
            return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
        if max_props < nr_distinct_found_props:
            return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

        for what in list_props:
            if what not in ack_found_props:
                continue
            seq = value[what]
            if not isinstance(seq, list):
                return 1, f'optional {jp} property {what} present but no array'
            if not seq:
                return 1, f'optional {jp} property {what} present but empty'
            for ndx, text in enumerate(seq):
                jpn = f'{jp}[{ndx}]'
                if not isinstance(text, str):
                    return 1, f'optional {jpn} property {what} entry present but no text'
                if not text:
                    return 1, f'optional {jpn} property {what} entry present but empty'
                if what == 'urls':
                    # Constructing the URI is the validity check; the object is discarded.
                    try:
                        _ = URI(text)
                    except InvalidURIError as err:
                        return 1, f'optional {jpn} property {what} entry present but invalid as URI({err})'

        for what in text_props:
            if what not in ack_found_props:
                continue
            text = value[what]
            if not isinstance(text, str):
                return 1, f'optional {jp} property {what} present but no text'
            if not text:
                return 1, f'optional {jp} property {what} present but empty'
    return 0, ''

104 

105 

@no_type_check
def document_aggregate_severity(value):
    """Verify that document/aggregate_severity, when present, follows the rules.

    The value has to be a non-empty object with at most two properties. The
    `text` property is mandatory and must be a non-empty string. The
    `namespace` property is optional; when given it must be a non-empty string
    that URI accepts.

    Returns a pair (error, message): (0, '') when everything passes, otherwise
    (1, reason) describing the first violation encountered.
    """
    parent, prop = 'document', 'aggregate_severity'
    jp = f'{parent}.{prop}'
    if not isinstance(value, dict):
        return 1, f'optional property {jp} present but no object'
    if not value:
        return 1, f'optional property {jp} present but empty'

    agg_known_props = {'text', 'namespace'}
    min_props, max_props = 1, len(agg_known_props)
    agg_found_props = set(value)
    if agg_found_props <= agg_known_props:
        print(f'set of {jp} properties only contains known properties')
    if agg_found_props < agg_known_props:
        print(f'set of {jp} properties is a proper subset of the known properties')
    nr_distinct_found_props = len(agg_found_props)
    if nr_distinct_found_props < min_props:
        return 1, f'found too few properties ({nr_distinct_found_props}) for {jp}'
    if max_props < nr_distinct_found_props:
        return 1, f'found too many properties ({nr_distinct_found_props}) for {jp}'

    sub = 'text'
    jps = f'property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 1, f'mandatory {jps} not present'
    if not isinstance(entry, str):
        return 1, f'mandatory {jps} present but no text'
    if not entry:
        return 1, f'mandatory {jps} present but empty'

    sub = 'namespace'
    jps = f'optional property {parent}.{prop}.{sub}'
    entry = value.get(sub)
    if entry is None:
        return 0, ''
    if not isinstance(entry, str):
        return 1, f'{jps} present but no text'
    if not entry:
        # jps already says "optional"; a "mandatory" prefix here would contradict it.
        return 1, f'{jps} present but empty'
    # Constructing the URI is the validity check; the object is discarded.
    try:
        _ = URI(entry)
    except InvalidURIError as err:
        return 1, f'{jps} present but invalid as URI({err})'

    return 0, ''

155 

156 

@no_type_check
def document_category(value):
    """Check that the document/category value is a non-empty string.

    Returns (0, '') when the value passes, otherwise (1, reason).
    """
    label = 'property document.category'
    if isinstance(value, str):
        return (0, '') if value else (1, f'{label} present but empty')
    return 1, f'{label} present but no text'

168 

169 

@no_type_check
def document_csaf_version(value):
    """Check that the document/csaf_version value equals the supported version.

    The value must be a non-empty string identical to CSAF_VERSION_STRING.

    Returns (0, '') when the value passes, otherwise (1, reason).
    """
    parent, prop = 'document', 'csaf_version'
    jp = f'property {parent}.{prop}'
    if not isinstance(value, str):
        return 1, f'{jp} present but no text'
    if not value:
        return 1, f'{jp} present but empty'
    if value != CSAF_VERSION_STRING:
        # Quote the constant so the message stays true if the supported version changes.
        return 1, f'{jp} present but ({value}) not matching CSAF version {CSAF_VERSION_STRING}'

    return 0, ''

183 

184 

@no_type_check
def document_lang(value):
    """Check that the document/lang value is a valid language tag.

    The value must be a non-empty string that tag_is_valid accepts.

    Returns (0, '') when the value passes, otherwise (1, reason).
    """
    complaint = ''
    if not isinstance(value, str):
        complaint = 'present but no text'
    elif not value:
        complaint = 'present but empty'
    elif not tag_is_valid(value):
        complaint = f'present but ({value}) is no valid language tag'

    if complaint:
        return 1, f'property document.lang {complaint}'
    return 0, ''

198 

199 

@no_type_check
def document_optional(document):
    """Verify the optional properties of document that are present.

    Only `acknowledgments` and `aggregate_severity` are inspected in detail;
    the other optional properties are looked up but not examined further. Two
    informational lines about the set of property names may be printed.

    Returns the (error, message) pair of the first failing check, otherwise
    (0, 'NotImplemented').
    """
    parent = 'document'
    mandatory_names = ('category', 'csaf_version', 'publisher', 'title', 'tracking')
    optional_names = (
        'acknowledgments',
        'aggregate_severity',
        'distribution',
        'lang',
        'notes',
        'references',
        'source_lang',
    )
    known_props = set(mandatory_names) | set(optional_names)

    # A None entry means the search found nothing for that property.
    present = {name: jmespath.search(f'{name}', document) for name in optional_names}

    detailed_checks = (
        ('acknowledgments', document_optional_acknowledgments),
        ('aggregate_severity', document_aggregate_severity),
    )
    for name, check in detailed_checks:
        candidate = present[name]
        if candidate is None:
            continue
        error, message = check(candidate)
        if error:
            return error, message

    found_props = set(document)
    if found_props <= known_props:
        print(f'set of {parent} properties only contains known properties')
    if found_props < known_props:
        print(f'set of {parent} properties is a proper subset of the known properties')

    return 0, 'NotImplemented'

232 

233 

@no_type_check
def verify_document(document):
    """Root of /document member verifier.

    Checks, in order: presence of the five mandatory properties, the category
    value, the csaf_version value, the lang value (only when present), the
    three mandatory publisher properties, the title value, and the six
    mandatory tracking properties. Space-only category or title values only
    produce a printed warning. Finally delegates to document_optional.

    Returns (1, reason) for the first violation, otherwise whatever
    document_optional returns.
    """
    parent = 'document'
    for prop in ('category', 'csaf_version', 'publisher', 'title', 'tracking'):
        if not jmespath.search(f'{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    prop = 'category'
    category = jmespath.search(f'{prop}', document)
    # Type-check before calling .strip(): a truthy non-string category (number,
    # list, object) would otherwise raise AttributeError instead of being reported.
    error, message = document_category(category)
    if error:
        return error, message
    if not category.strip():
        print(f'warning - {parent} property {prop} value is space-only')

    prop = 'csaf_version'
    csaf_version = jmespath.search(f'{prop}', document)
    error, message = document_csaf_version(csaf_version)
    if error:
        return error, message

    prop = 'lang'
    lang = jmespath.search(f'{prop}', document)
    if lang is not None:
        error, message = document_lang(lang)
        if error:
            return error, message

    # Publisher (publisher) is object requires ('category', 'name', 'namespace')
    parent = 'document.publisher'
    for prop in ('category', 'name', 'namespace'):
        if not jmespath.search(f'publisher.{prop}', document):
            return 1, f'missing {parent} property ({prop})'

    parent = 'document'
    prop = 'title'
    title = jmespath.search(f'{prop}', document)
    # Same guard as for category: only strings offer .strip().
    if not isinstance(title, str):
        return 1, f'property {parent}.{prop} present but no text'
    if not title.strip():
        print(f'warning - {parent} property {prop} value is space-only')

    # Tracking (tracking) is object requires:
    # ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version')
    prop = 'tracking'
    for sub in ('current_release_date', 'id', 'initial_release_date', 'revision_history', 'status', 'version'):
        if jmespath.search(f'{prop}.{sub}', document) is None:
            return 1, f'missing {parent}.{prop} property ({sub})'

    return document_optional(document)

283 

284 

@no_type_check
def level_zero(csaf_doc):
    """Run the most superficial verification of a decoded advisory.

    Requires a truthy `document` member and hands it to verify_document.

    Returns (1, 'missing document property') when the member is absent or
    falsy, the failing pair from verify_document when it reports an error,
    otherwise (0, '').
    """
    document = csaf_doc.get('document')
    if not document:
        return 1, 'missing document property'

    error, message = verify_document(document)
    return (error, message) if error else (0, '')

296 

297 

def reader(path: str) -> Iterator[str]:
    """Yield the lines of the text file at path one by one.

    The file is opened in text mode with the module-level ENCODING and is
    closed once the generator is exhausted or closed.
    """
    with pathlib.Path(path).open('rt', encoding=ENCODING) as source:
        yield from source

303 

304 

def peek(data: str) -> str:
    """Guess the trivial format of data from its beginning.

    Returns 'TOO_SHORT' when data has fewer than CSAF_MIN_BYTES characters.
    Otherwise the first CSAF_MIN_BYTES characters are examined with leading
    whitespace ignored: 'JSON' if they start with '{', 'XML' if they start
    with '<', and 'UNKNOWN' in every other case.
    """
    if len(data) < CSAF_MIN_BYTES:
        return 'TOO_SHORT'
    first_char = data[:CSAF_MIN_BYTES].lstrip()[:1]
    formats = {'{': 'JSON', '<': 'XML'}
    return formats.get(first_char, 'UNKNOWN')

315 

316 

def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate the request triple (command, input path, config path).

    Returns (0, '', argv) when the request is acceptable. Otherwise returns
    (code, reason, ['']) where code is 2 for a malformed request (wrong
    argument count, unknown command, missing configuration) and 1 for a path
    problem (input or config is no file, config lacks a .json extension).
    An empty input path is accepted without a file check.
    """

    def refuse(code: int, reason: str) -> Tuple[int, str, List[str]]:
        """Build the failure triple with its placeholder argument list."""
        return code, reason, ['']

    if not argv or len(argv) != 3:
        return refuse(2, 'received wrong number of arguments')

    command, inp, config = argv

    if command != 'verify':
        return refuse(2, 'received unknown command')

    if inp and not pathlib.Path(str(inp)).is_file():
        return refuse(1, 'source is no file')

    if not config:
        return refuse(2, 'configuration missing')

    config_path = pathlib.Path(str(config))
    if not config_path.is_file():
        return refuse(1, f'config ({config_path}) is no file')

    joined_suffixes = ''.join(config_path.suffixes).lower()
    if not joined_suffixes.endswith('.json'):
        return refuse(1, 'config has no .json extension')

    return 0, '', argv

341 

342 

def verify_json(data: str) -> Tuple[int, str, List[str], Dict[str, object]]:
    """Verify the JSON text as CSAF.

    Decodes data and runs the level_zero verification on the result.

    Returns a 4-tuple (error, message, strings, doc). On success this is
    (0, 'OK', [], doc) with the decoded document; on any failure error is
    non-zero, message names the reason, and doc is an empty dict.
    """
    try:
        doc = msgspec.json.decode(data)
    except msgspec.DecodeError:
        return 1, 'advisory is no valid JSON', [], {}

    if not isinstance(doc, dict):
        # level_zero calls .get on the root, so valid JSON with an array or
        # scalar root has to be rejected here rather than crash there.
        return 1, 'advisory is no JSON object', [], {}

    error, message = level_zero(doc)
    if error:
        return error, message, [], {}
    return 0, 'OK', [], doc

354 

355 

def main(argv: Union[List[str], None] = None) -> int:
    """Drive the verification of a single advisory.

    Validates the request, loads the JSON configuration, reads the advisory
    from the given path (or from standard input when the path is empty),
    guesses its format, and verifies it accordingly.

    Returns 0 when the advisory is accepted and a non-zero code otherwise.
    Request and JSON verification failures are reported on standard error;
    all other messages go to standard output.
    """
    error, message, strings = verify_request(argv)
    if error:
        print(message, file=sys.stderr)
        return error

    _, inp, config = strings

    with open(config, 'rb') as handle:
        raw_configuration = handle.read()
    # A configuration file that is no valid JSON is a user error, not a crash.
    try:
        configuration = msgspec.json.decode(raw_configuration)
    except msgspec.DecodeError:
        print(f'config ({config}) is no valid JSON', file=sys.stderr)
        return 1

    print(f'using configuration ({configuration})')
    source = sys.stdin if not inp else reader(inp)
    data = ''.join(source)

    guess = peek(data)

    if guess == 'TOO_SHORT':
        print('advisory is too short to be valid')
        return 1

    if guess == 'UNKNOWN':
        print('advisory is of unknown format')
        return 1

    if guess == 'JSON':
        error, message, _, doc = verify_json(data)
        if error:
            print(message, file=sys.stderr)
            return error
        # Only an explicit False counts as failure; the original note says
        # is_valid may return NotImplemented for now.
        if is_valid(doc) is False:
            print('advisory fails mandatory rules:')
            # Re-run each rule on its own to name the failing ones, until the
            # rules can report that themselves.
            rule_reports = (
                (is_valid_category, '- invalid category'),
                (is_valid_defined_group_ids, '- undefined group ids'),
                (is_valid_defined_product_ids, '- undefined product ids'),
                (is_valid_translator, '- invalid translator'),
                (is_valid_unique_group_ids, '- non-unique group ids'),
                (is_valid_unique_product_ids, '- non-unique product ids'),
            )
            for rule, report in rule_reports:
                if not rule(doc):
                    print(report)
            return 1
        print('OK')
        return 0

    # Remaining guess is XML: apply a crude marker test for CVRF.
    print('advisory may be XML')
    if 'DocumentTitle>' not in data:
        print('advisory is no valid CVRF')
        return 1

    print('advisory may be valid CVRF')
    return 0