Coverage for muuntaa/vuln.py: 0.00%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-21 12:16:47 +00:00

1"""Vulnerabilities type.""" 

2 

3import bisect 

4import logging 

5import re 

6from typing import Union, no_type_check 

7 

8from collections import defaultdict 

9from itertools import chain 

10 

11import lxml.objectify # nosec B410 

12 

13from muuntaa.ack import Acknowledgments 

14from muuntaa.dialect import SCORE_CVSS_V2, SCORE_CVSS_V3, REMEDIATION_CATEGORY 

15from muuntaa.notes import Notes 

16from muuntaa.refs import References 

17from muuntaa.strftime import get_utc_timestamp 

18from muuntaa.subtree import Subtree 

19from muuntaa import ConfigType 

20 

# Element type used to annotate the ``root`` argument of the handlers in this module.
RootType = lxml.objectify.ObjectifiedElement

# List of string-keyed dicts whose values are a string, None, or a tuple of ints.
# NOTE(review): presumably revision-history records (going by the name) — confirm the user.
RevHistType = list[dict[str, Union[str, None, tuple[int, ...]]]]

23 

24 

class Vulnerabilities(Subtree):
    """Represents the Vulnerabilities type.

    (
        /cvrf:cvrfdoc/vuln:Vulnerability,
    )

    Every element handed to :meth:`sometimes` is converted into one dictionary
    that is appended to the ``vulnerabilities`` list of the tree.
    """

    def __init__(self, config: ConfigType):
        """Store the conversion options and make sure the output list exists.

        Reads the keys ``remove_CVSS_values_without_vector`` and
        ``default_CVSS3_version`` from *config*; a missing key raises ``KeyError``.
        """
        super().__init__()
        self.config = config
        self.remove_cvss_values_without_vector = config['remove_CVSS_values_without_vector']
        self.default_cvss_version = config['default_CVSS3_version']
        if self.tree.get('vulnerabilities') is None:
            self.tree['vulnerabilities'] = []
        # All converted vulnerabilities are appended to this list.
        self.hook = self.tree['vulnerabilities']

    def always(self, root: RootType) -> None:
        """No-op for this type."""
        pass

    @staticmethod
    @no_type_check
    def _utc_timestamp(raw):
        """Convert *raw* with ``get_utc_timestamp`` and return only the timestamp.

        ``get_utc_timestamp`` hands back ``(timestamp, problems)``; every reported
        ``(level, message)`` problem is forwarded to the logging module.
        """
        timestamp, problems = get_utc_timestamp(raw or '')
        for level, problem in problems:
            logging.log(level, problem)
        return timestamp

    @no_type_check
    def _handle_involvements(self, root: RootType):
        """Return one dict (party, status, optional summary) per ``Involvement`` child."""
        involvements = []
        for involvement_elem in root.Involvement:
            involvement = {
                'party': involvement_elem.attrib['Party'].lower(),
                'status': involvement_elem.attrib['Status'].lower().replace(' ', '_'),
            }

            if hasattr(involvement_elem, 'Description'):
                involvement['summary'] = involvement_elem.Description.text
            involvements.append(involvement)

        return involvements

    @no_type_check
    def _handle_product_statuses(self, root: RootType):
        """Group the product ids of all ``Status`` children by normalized status type."""
        statuses = defaultdict(list)
        for status_elem in root.Status:
            # 'Known Affected' -> 'known_affected'
            status_type = status_elem.attrib['Type'].lower().replace(' ', '_')
            product_ids = [product_id.text for product_id in status_elem.ProductID]
            statuses[status_type].extend(product_ids)

        return statuses

    @no_type_check
    def _handle_threats(self, root: RootType):
        """Return one dict per ``Threat`` child.

        ``product_ids``, ``group_ids`` and ``date`` are only set when the
        corresponding child element / attribute is present.
        """
        threats = []
        for threat_elem in root.Threat:
            threat = {
                'details': threat_elem.Description.text,
                'category': threat_elem.attrib['Type'].lower().replace(' ', '_'),
            }

            # getattr with a default: plain attribute access on a missing child
            # raises AttributeError in lxml.objectify.
            if product_ids := getattr(threat_elem, 'ProductID', None):
                threat['product_ids'] = [product_id.text for product_id in product_ids]

            if group_ids := getattr(threat_elem, 'GroupID', None):
                threat['group_ids'] = [group_id.text for group_id in group_ids]

            if 'Date' in threat_elem.attrib:
                # Store the timestamp only, not the (timestamp, problems) pair.
                threat['date'] = self._utc_timestamp(threat_elem.attrib['Date'])

            threats.append(threat)

        return threats

    @no_type_check
    def _handle_remediations(self, root: RootType, product_status):
        """Return one dict per ``Remediation`` child.

        When a remediation names neither products nor groups, the affected
        product ids from *product_status* are used instead; if that yields
        nothing, ``self.some_error`` is set and an error is logged.
        """
        remediations = []
        for remediation_elem in root.Remediation:
            remediation = {
                'category': REMEDIATION_CATEGORY[remediation_elem.attrib['Type']],
                'details': remediation_elem.Description.text,
            }

            if entitlements := getattr(remediation_elem, 'Entitlement', None):
                remediation['entitlements'] = [entitlement.text for entitlement in entitlements]

            if url := getattr(remediation_elem, 'URL', None):
                remediation['url'] = url.text

            if product_ids := getattr(remediation_elem, 'ProductID', None):
                remediation['product_ids'] = [product_id.text for product_id in product_ids]

            if group_ids := getattr(remediation_elem, 'GroupID', None):
                remediation['group_ids'] = [group_id.text for group_id in group_ids]

            if 'product_ids' not in remediation and 'group_ids' not in remediation:
                # Try to fix the missing reference from the product status.
                fallback_ids = self._parse_affected_product_ids(product_status) if product_status else []
                if fallback_ids:
                    remediation['product_ids'] = fallback_ids
                else:
                    self.some_error = True
                    logging.error('No product_ids or group_ids entries for remediation.')

            if 'Date' in remediation_elem.attrib:
                # Store the timestamp only, not the (timestamp, problems) pair.
                remediation['date'] = self._utc_timestamp(remediation_elem.attrib['Date'])

            remediations.append(remediation)

        return remediations

    @staticmethod
    def _base_score_to_severity(base_score: float) -> str:
        """Map a CVSS base score to its severity label.

        0 -> NONE, up to 3.9 -> LOW, up to 6.9 -> MEDIUM, up to 8.9 -> HIGH,
        up to 10 -> CRITICAL. A score above 10 raises ``IndexError``.
        """
        base_severity = ((0, 'NONE'), (3.9, 'LOW'), (6.9, 'MEDIUM'), (8.9, 'HIGH'), (10, 'CRITICAL'))
        # The empty string sorts before every label, so a score equal to a
        # threshold lands on that threshold's own label.
        return base_severity[bisect.bisect_right(base_severity, (base_score, ''))][1]

    @staticmethod
    @no_type_check
    def _parse_affected_product_ids(product_status):
        """Parses ProductIDs with the states 'known_affected', 'first_affected' or 'last_affected'
        from product_status.

        Returns the distinct ids as a sorted list. Declared static so that it can
        be called through an instance as well as through the class.
        """
        states = ('known_affected', 'first_affected', 'last_affected')
        return sorted(
            {affected_product_id for state in states for affected_product_id in product_status.get(state, [])}
        )

    @no_type_check
    def _parse_score_set(self, score_set_element, mapping, version, json_property, product_status):
        """Parses ScoreSetV2 or ScoreSetV3 element.

        *mapping* translates CVRF child tags to CSAF property names, *version* is
        the fallback CVSS version and *json_property* is ``'cvss_v2'`` or
        ``'cvss_v3'``. Returns ``{json_property: ..., 'products': [...]}``, or
        ``None`` when the vector is missing and such score sets shall be dropped.
        """
        cvss_score = {}
        for cvrf, csaf in mapping.items():
            found = score_set_element.find(f'{{*}}{cvrf}')
            # Test presence and text explicitly: truthiness of an lxml.objectify
            # data element can reflect its value, which would drop a 0.0 score.
            if found is not None and found.text:
                cvss_score[csaf] = found.text

        scores = ['baseScore', 'temporalScore', 'environmentalScore']
        for score in scores:
            if cvss_score.get(score):
                cvss_score[score] = float(cvss_score[score])

        if json_property == 'cvss_v3':  # Only cvss_v3 has baseSeverity
            if 'baseScore' in cvss_score:
                cvss_score['baseSeverity'] = self._base_score_to_severity(cvss_score['baseScore'])
            else:
                self.some_error = True
                logging.error('No CVSS base score found on the input.')

        products = []
        if product_ids := getattr(score_set_element, 'ProductID', None):
            products = [product_id.text for product_id in product_ids]
        elif product_status:  # try fix missing product ids
            products = self._parse_affected_product_ids(product_status)

        if not products:
            self.some_error = True
            logging.error('No product_id entry for CVSS score set.')

        # if missing, conversion fails unless remove_CVSS_values_without_vector is true
        # if remove_CVSS_values_without_vector is true, we just ignore the score_set
        if 'vectorString' not in cvss_score:
            if self.remove_cvss_values_without_vector:
                logging.warning(
                    'No CVSS vector string found on the input,'
                    ' ignoring ScoreSet element due to "remove_CVSS_values_without_vector" option.'
                )
                return None

            self.some_error = True
            logging.error('No CVSS vector string found on the input.')

        # DETERMINE CVSS v 3.x from namespace
        cvss_3_regex = r'.*cvss-v(3\.[01]).*'
        match = re.match(cvss_3_regex, score_set_element.tag)
        if match:
            version = match.groups()[0]

        # DETERMINE CVSS v 3.x from vector if present
        if 'vectorString' in cvss_score and json_property == 'cvss_v3':
            # Regex for determining the CVSS version
            regex = r'CVSS:(3\.[01]).*'
            match = re.match(regex, cvss_score['vectorString'])
            if not match:
                self.some_error = True
                logging.error('CVSS vector %s is not valid.', cvss_score['vectorString'])
            else:
                version = match.groups()[0]

        cvss_score['version'] = version

        return {json_property: cvss_score, 'products': products}

    @no_type_check
    def _remove_cvssv3_duplicates(self, scores):
        """Removes products/cvssv3.x score sets for products having both v3.0 and v3.1 score.

        Three-step approach:

        - find products having both versions specified
        - remove those products from score set with version 3.0
        - removes score sets with no products
        """

        def products_with_version(wanted):
            """Collect the products of all cvss_v3 score sets carrying version *wanted*."""
            return set(
                chain.from_iterable(
                    score_set['products']
                    for score_set in scores
                    if 'cvss_v3' in score_set and score_set['cvss_v3']['version'] == wanted
                )
            )

        both_versions = products_with_version('3.0') & products_with_version('3.1')

        for score_set in scores:
            if 'cvss_v3' in score_set and score_set['cvss_v3']['version'] == '3.0':
                score_set['products'] = [product for product in score_set['products'] if product not in both_versions]

        return [score_set for score_set in scores if len(score_set['products']) > 0]

    @no_type_check
    def _handle_scores(self, root: RootType, product_status):
        """Parse all ScoreSetV2 / ScoreSetV3 descendants-by-tag of *root* into score dicts.

        Score sets for which ``_parse_score_set`` returns ``None`` are skipped;
        the result is passed through ``_remove_cvssv3_duplicates``.
        """
        score_variants = (
            ('ScoreSetV2', SCORE_CVSS_V2, '2.0', 'cvss_v2'),
            ('ScoreSetV3', SCORE_CVSS_V3, self.default_cvss_version, 'cvss_v3'),
        )

        scores = []
        for score_variant, mapping, score_version, target in score_variants:
            # '{*}' matches the tag in any namespace.
            for score_set in root.findall(f'{{*}}{score_variant}'):
                score = self._parse_score_set(score_set, mapping, score_version, target, product_status)
                if score is not None:
                    scores.append(score)

        return self._remove_cvssv3_duplicates(scores)

    def sometimes(self, root: RootType) -> None:
        """Convert one vulnerability element and append the result to ``self.hook``.

        Only children that are present (and truthy) on *root* contribute a key.
        Children are read with ``getattr(..., None)`` because plain attribute
        access on a missing child raises ``AttributeError`` in lxml.objectify.
        """
        vulnerability: dict[str, object] = {}

        if acknowledgments := getattr(root, 'Acknowledgments', None):
            acks = Acknowledgments(lc_parent_code='vuln')
            acks.load(acknowledgments)
            vulnerability['acknowledgments'] = acks.dump()

        if cve := getattr(root, 'CVE', None):
            # Note: "^CVE-[0-9]{4}-[0-9]{4,}$" differs from CVRF regex -> delegate to JSON Schema validation
            vulnerability['cve'] = cve.text

        if cwes := getattr(root, 'CWE', None):
            if len(cwes) > 1:
                logging.warning('%s CWE elements found, using only the first one.', len(cwes))
            vulnerability['cwe'] = {'id': cwes[0].attrib['ID'], 'name': cwes[0].text}

        if discovery_date_in := getattr(root, 'DiscoveryDate', None):
            vulnerability['discovery_date'] = self._utc_timestamp(discovery_date_in.text)

        if vuln_id := getattr(root, 'ID', None):
            vulnerability['ids'] = [
                {'system_name': vuln_id.attrib['SystemName'], 'text': vuln_id.text},
            ]

        if involvements := getattr(root, 'Involvements', None):
            vulnerability['involvements'] = self._handle_involvements(involvements)

        if notes_root := getattr(root, 'Notes', None):
            notes = Notes(lc_parent_code='vuln')
            notes.load(notes_root)
            vulnerability['notes'] = notes.dump()

        if product_statuses := getattr(root, 'ProductStatuses', None):
            vulnerability['product_status'] = self._handle_product_statuses(product_statuses)

        if references_root := getattr(root, 'References', None):
            references = References(config=self.config, lc_parent_code='vuln')
            references.load(references_root)
            vulnerability['references'] = references.dump()

        if release_date_in := getattr(root, 'ReleaseDate', None):
            vulnerability['release_date'] = self._utc_timestamp(release_date_in.text)

        # Remediations and scores fall back to the product status parsed above
        # when they do not name any product themselves.
        if remediations := getattr(root, 'Remediations', None):
            product_status = vulnerability.get('product_status')
            vulnerability['remediations'] = self._handle_remediations(remediations, product_status)

        if scores_root := getattr(root, 'CVSSScoreSets', None):
            if len(scores := self._handle_scores(scores_root, vulnerability.get('product_status'))):
                vulnerability['scores'] = scores
            else:
                logging.warning('None of the ScoreSet elements parsed, removing "scores" entry from the output.')

        if threats := getattr(root, 'Threats', None):
            vulnerability['threats'] = self._handle_threats(threats)

        if title := getattr(root, 'Title', None):
            vulnerability['title'] = title.text

        self.hook.append(vulnerability)

325 

326 self.hook.append(vulnerability)