Coverage for muuntaa/vuln.py: 0.00% of 193 statements (coverage.py v7.6.0, created at 2024-07-21 12:16:47 +00:00)
1"""Vulnerabilities type."""
3import bisect
4import logging
5import re
6from typing import Union, no_type_check
8from collections import defaultdict
9from itertools import chain
11import lxml.objectify # nosec B410
13from muuntaa.ack import Acknowledgments
14from muuntaa.dialect import SCORE_CVSS_V2, SCORE_CVSS_V3, REMEDIATION_CATEGORY
15from muuntaa.notes import Notes
16from muuntaa.refs import References
17from muuntaa.strftime import get_utc_timestamp
18from muuntaa.subtree import Subtree
19from muuntaa import ConfigType
21RootType = lxml.objectify.ObjectifiedElement
22RevHistType = list[dict[str, Union[str, None, tuple[int, ...]]]]


class Vulnerabilities(Subtree):
    """Represents the Vulnerabilities type.

    (
        /cvrf:cvrfdoc/vuln:Vulnerability,
    )
    """

    def __init__(self, config: ConfigType):
        super().__init__()
        self.config = config
        self.remove_cvss_values_without_vector = config['remove_CVSS_values_without_vector']
        self.default_cvss_version = config['default_CVSS3_version']
        if self.tree.get('vulnerabilities') is None:
            self.tree['vulnerabilities'] = []
        self.hook = self.tree['vulnerabilities']
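
    # Illustrative note (not part of the original module): the constructor reads
    # only the two configuration keys used above, i.e. a mapping providing
    # 'remove_CVSS_values_without_vector' (bool) and 'default_CVSS3_version'
    # (a version string such as '3.0' or '3.1'); other Subtree consumers like
    # References may expect further keys in the same config mapping.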

    def always(self, root: RootType) -> None:
        pass

    @no_type_check
    def _handle_involvements(self, root: RootType):
        involvements = []
        for involvement_elem in root.Involvement:
            involvement = {
                'party': involvement_elem.attrib['Party'].lower(),
                'status': involvement_elem.attrib['Status'].lower().replace(' ', '_'),
            }

            if hasattr(involvement_elem, 'Description'):
                involvement['summary'] = involvement_elem.Description.text
            involvements.append(involvement)

        return involvements
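
    # Illustrative sketch (not part of the original module): a CVRF fragment like
    #   <Involvement Party="Vendor" Status="In Progress"><Description>...</Description></Involvement>
    # would be mapped to
    #   {'party': 'vendor', 'status': 'in_progress', 'summary': '...'}
    # with 'summary' present only when a Description child exists.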

    @no_type_check
    def _handle_product_statuses(self, root: RootType):
        statuses = defaultdict(list)
        for status_elem in root.Status:
            status_type = status_elem.attrib['Type'].lower().replace(' ', '_')
            product_ids = [product_id.text for product_id in status_elem.ProductID]
            statuses[status_type].extend(product_ids)

        return statuses
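
    # Illustrative sketch (not part of the original module): two Status elements
    # with Type="Known Affected" and Type="First Fixed", holding ProductID texts
    # 'P-1' and 'P-2' respectively, would collect into
    #   {'known_affected': ['P-1'], 'first_fixed': ['P-2']}
    # (a defaultdict, so repeated status types extend the same list).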

    @no_type_check
    def _handle_threats(self, root: RootType):
        threats = []
        for threat_elem in root.Threat:
            threat = {
                'details': threat_elem.Description.text,
                'category': threat_elem.attrib['Type'].lower().replace(' ', '_'),
            }

            if product_ids := threat_elem.ProductID:
                threat['product_ids'] = [product_id.text for product_id in product_ids]

            if group_ids := threat_elem.GroupID:
                threat['group_ids'] = [group_id.text for group_id in group_ids]

            if 'Date' in threat_elem.attrib:
                threat['date'] = get_utc_timestamp(threat_elem.attrib['Date'])

            threats.append(threat)

        return threats

    @no_type_check
    def _handle_remediations(self, root: RootType, product_status):

        remediations = []
        for remediation_elem in root.Remediation:
            remediation = {
                'category': REMEDIATION_CATEGORY[remediation_elem.attrib['Type']],
                'details': remediation_elem.Description.text,
            }

            if entitlements := remediation_elem.Entitlement:
                remediation['entitlements'] = [entitlement.text for entitlement in entitlements]

            if url := remediation_elem.URL:
                remediation['url'] = url.text

            if product_ids := remediation_elem.ProductID:
                remediation['product_ids'] = [product_id.text for product_id in product_ids]

            if group_ids := remediation_elem.GroupID:
                remediation['group_ids'] = [group_id.text for group_id in group_ids]

            if not any(('product_ids' in remediation, 'group_ids' in remediation)):
                if product_status:  # try to fix
                    product_ids = self._parse_affected_product_ids(product_status)

                    if len(product_ids):
                        remediation['product_ids'] = product_ids
                else:
                    self.some_error = True
                    logging.error('No product_ids or group_ids entries for remediation.')

            if 'Date' in remediation_elem.attrib:
                remediation['date'] = get_utc_timestamp(remediation_elem.attrib['Date'])

            remediations.append(remediation)

        return remediations

    @staticmethod
    def _base_score_to_severity(base_score: float) -> str:
        base_severity = ((0, 'NONE'), (3.9, 'LOW'), (6.9, 'MEDIUM'), (8.9, 'HIGH'), (10, 'CRITICAL'))
        return base_severity[bisect.bisect_right(base_severity, (base_score, ''))][1]
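
    # Worked example (not part of the original module): bisect_right against the
    # threshold tuples maps base scores onto the CVSS v3 qualitative scale, e.g.
    #   0.0 -> 'NONE', 3.9 -> 'LOW', 4.0 -> 'MEDIUM', 7.2 -> 'HIGH', 9.8 -> 'CRITICAL'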

    @no_type_check
    def _parse_affected_product_ids(self, product_status):
        """Parses ProductIDs with the states 'known_affected', 'first_affected' or 'last_affected'
        from product_status.
        """
        states = ('known_affected', 'first_affected', 'last_affected')
        return sorted(
            set(affected_product_id for state in states for affected_product_id in product_status.get(state, []))
        )
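
    # Illustrative sketch (not part of the original module): for a product_status
    # mapping like
    #   {'known_affected': ['P-2', 'P-1'], 'first_affected': ['P-1'], 'fixed': ['P-3']}
    # the result is the sorted, de-duplicated union over the three affected states
    # only, i.e. ['P-1', 'P-2'] (the 'fixed' entry is ignored).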

    @no_type_check
    def _parse_score_set(self, score_set_element, mapping, version, json_property, product_status):
        """Parses a ScoreSetV2 or ScoreSetV3 element."""
        cvss_score = {
            csaf: score_set_element.find(f'{{*}}{cvrf}').text
            for cvrf, csaf in mapping.items()
            if score_set_element.find(f'{{*}}{cvrf}') is not None
        }

        scores = ['baseScore', 'temporalScore', 'environmentalScore']
        for score in scores:
            if cvss_score.get(score):
                cvss_score[score] = float(cvss_score[score])

        if json_property == 'cvss_v3':  # Only cvss_v3 has baseSeverity
            cvss_score['baseSeverity'] = self._base_score_to_severity(cvss_score['baseScore'])

        products = []
        if product_ids := score_set_element.ProductID:
            products = [product_id.text for product_id in product_ids]
        elif product_status:  # try fix missing product ids
            products = self._parse_affected_product_ids(product_status)

        if len(products) == 0:
            self.some_error = True
            logging.error('No product_id entry for CVSS score set.')

        # if the vector string is missing, conversion fails unless remove_CVSS_values_without_vector is true
        # if remove_CVSS_values_without_vector is true, we just ignore the score_set
        if 'vectorString' not in cvss_score:
            if self.remove_cvss_values_without_vector:
                logging.warning(
                    'No CVSS vector string found on the input,'
                    ' ignoring ScoreSet element due to "remove_CVSS_values_without_vector" option.'
                )
                return None

            self.some_error = True
            logging.error('No CVSS vector string found on the input.')

        # DETERMINE CVSS v 3.x from namespace
        cvss_3_regex = r'.*cvss-v(3\.[01]).*'
        match = re.match(cvss_3_regex, score_set_element.tag)
        if match:
            version = match.groups()[0]

        # DETERMINE CVSS v 3.x from vector if present
        if 'vectorString' in cvss_score and json_property == 'cvss_v3':
            # Regex for determining the CVSS version
            regex = r'CVSS:(3\.[01]).*'
            match = re.match(regex, cvss_score['vectorString'])
            if not match:
                self.some_error = True
                logging.error('CVSS vector %s is not valid.', cvss_score['vectorString'])
            else:
                version = match.groups()[0]

        cvss_score['version'] = version

        score = {json_property: cvss_score, 'products': products}

        return score
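
    # Illustrative sketch (not part of the original module): for a ScoreSetV3
    # element the returned value has the shape
    #   {'cvss_v3': {'version': '3.1', 'vectorString': 'CVSS:3.1/AV:N/...',
    #                'baseScore': 9.8, 'baseSeverity': 'CRITICAL'},
    #    'products': ['P-1']}
    # with the CVSS keys taken from the SCORE_CVSS_V3 mapping and the version
    # derived from the vector string (or the element namespace) when available.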

    @no_type_check
    def _remove_cvssv3_duplicates(self, scores):
        """Removes CVSS v3.0 score sets for products that also have a v3.1 score.

        Three-step approach:

        - find products having both versions specified
        - remove those products from score sets with version 3.0
        - remove score sets that are left with no products
        """
        products_v3_1 = set(
            chain.from_iterable(
                [
                    score_set['products']
                    for score_set in scores
                    if 'cvss_v3' in score_set and score_set['cvss_v3']['version'] == '3.1'
                ]
            )
        )
        products_v3_0 = set(
            chain.from_iterable(
                [
                    score_set['products']
                    for score_set in scores
                    if 'cvss_v3' in score_set and score_set['cvss_v3']['version'] == '3.0'
                ]
            )
        )
        both_versions = products_v3_0.intersection(products_v3_1)

        for score_set in scores:
            if 'cvss_v3' in score_set and score_set['cvss_v3']['version'] == '3.0':
                score_set['products'] = [product for product in score_set['products'] if product not in both_versions]

        return [score_set for score_set in scores if len(score_set['products']) > 0]
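
    # Illustrative sketch (not part of the original module): given
    #   scores = [
    #       {'cvss_v3': {'version': '3.0', ...}, 'products': ['P-1', 'P-2']},
    #       {'cvss_v3': {'version': '3.1', ...}, 'products': ['P-1']},
    #   ]
    # 'P-1' carries both versions, so it is dropped from the v3.0 score set,
    # leaving ['P-2'] there and the v3.1 entry unchanged; a v3.0 entry whose
    # product list ends up empty is removed from the result altogether.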

    @no_type_check
    def _handle_scores(self, root: RootType, product_status):
        score_variants = (
            ('ScoreSetV2', SCORE_CVSS_V2, '2.0', 'cvss_v2'),
            ('ScoreSetV3', SCORE_CVSS_V3, self.default_cvss_version, 'cvss_v3'),
        )

        scores = []
        for score_variant, mapping, score_version, target in score_variants:
            for score_set in root.findall(f'{{*}}{score_variant}'):
                score = self._parse_score_set(score_set, mapping, score_version, target, product_status)
                if score is not None:
                    scores.append(score)

        return self._remove_cvssv3_duplicates(scores)

    def sometimes(self, root: RootType) -> None:
        vulnerability = {}
        if acknowledgments := root.Acknowledgments:
            acks = Acknowledgments(lc_parent_code='vuln')
            acks.load(acknowledgments)
            vulnerability['acknowledgments'] = acks.dump()

        if cve := root.CVE:
            # Note: "^CVE-[0-9]{4}-[0-9]{4,}$" differs from CVRF regex -> delegate to JSON Schema validation
            vulnerability['cve'] = cve.text  # type: ignore

        if cwes := root.CWE:
            if len(cwes) > 1:
                logging.warning('%s CWE elements found, using only the first one.', len(cwes))
            vulnerability['cwe'] = {'id': cwes[0].attrib['ID'], 'name': cwes[0].text}

        if discovery_date_in := root.DiscoveryDate:
            discovery_date, problems = get_utc_timestamp(discovery_date_in.text or '')
            for level, problem in problems:
                logging.log(level, problem)
            vulnerability['discovery_date'] = discovery_date  # type: ignore

        if vuln_id := root.ID:
            vulnerability['ids'] = [
                {'system_name': vuln_id.attrib['SystemName'], 'text': vuln_id.text},  # type: ignore
            ]

        if involvements := root.Involvements:
            vulnerability['involvements'] = self._handle_involvements(involvements)

        if notes_root := root.Notes:
            notes = Notes(lc_parent_code='vuln')
            notes.load(notes_root)
            vulnerability['notes'] = notes.dump()

        if product_statuses := root.ProductStatuses:
            vulnerability['product_status'] = self._handle_product_statuses(product_statuses)

        if references_root := root.References:
            references = References(config=self.config, lc_parent_code='vuln')
            references.load(references_root)
            vulnerability['references'] = references.dump()

        if release_date_in := root.ReleaseDate:
            release_date, problems = get_utc_timestamp(release_date_in.text or '')
            for level, problem in problems:
                logging.log(level, problem)
            vulnerability['release_date'] = release_date  # type: ignore

        if remediations := root.Remediations:
            product_status = vulnerability.get('product_status')
            vulnerability['remediations'] = self._handle_remediations(remediations, product_status)

        if scores_root := root.CVSSScoreSets:
            if len(scores := self._handle_scores(scores_root, vulnerability.get('product_status'))):
                vulnerability['scores'] = scores
            else:
                logging.warning('None of the ScoreSet elements parsed, removing "scores" entry from the output.')

        if threats := root.Threats:
            vulnerability['threats'] = self._handle_threats(threats)

        if title := root.Title:
            vulnerability['title'] = title.text  # type: ignore

        self.hook.append(vulnerability)
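

# Illustrative usage sketch, not part of the original module: feed the subtree
# with the vuln:Vulnerability children of a parsed CVRF document. The file name,
# the minimal config mapping, and the assumption that Subtree exposes a dump()
# accessor (as Acknowledgments, Notes, and References do above) are hypothetical.
if __name__ == '__main__':
    config = {
        'remove_CVSS_values_without_vector': True,
        'default_CVSS3_version': '3.0',
    }
    cvrf_root = lxml.objectify.parse('advisory-cvrf-1.2.xml').getroot()  # placeholder file name
    vulnerabilities = Vulnerabilities(config)
    for vulnerability_elem in cvrf_root.iterfind('{*}Vulnerability'):  # namespace-agnostic child lookup
        vulnerabilities.always(vulnerability_elem)
        vulnerabilities.sometimes(vulnerability_elem)
    print(vulnerabilities.dump())  # assumed Subtree accessor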