Coverage for nineties/parser.py: 100%
82 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:53:48 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:53:48 +00:00
1# -*- coding: utf-8 -*-
2"""Parsers for issues from the Nineties."""
3import datetime as dti
4import operator
5from collections import Counter
6from typing import Tuple, no_type_check
# Default separator between project identifier and serial of an issue key.
DASH = '-'

# Layout and length of the leading (offset free) part of a text timestamp.
ISO_FMT = '%Y-%m-%dT%H:%M:%S.%f'
ISO_LENGTH = len('YYYY-mm-ddTHH:MM:SS.fff')
# Map offset sign to the operation yielding UTC (+ indicates ahead of UTC, so subtract).
TZ_OP = {'+': operator.sub, '-': operator.add}

# Marker for missing values in the source texts.
JR_NULL = '<null>'
# Replacement the DSL parser stores for a final value that equals JR_NULL.
NA = 'n/a'

# Delimiters of the DSL entry payload.
START_DATA = '['
END_DATA = ']'
# Separators between records and between key and value of a record.
REC_SEP = ','
KV_SEP = '='
# Default name of the last key in a DSL entry.
FINAL_DSL_KEY = 'final'
def split_at(text_fragment: str, pos: int) -> Tuple[str, str]:
    """Cut the text fragment in two at position and return the (head, tail) tuple."""
    head = text_fragment[:pos]
    tail = text_fragment[pos:]
    return head, tail
@no_type_check
def parse_timestamp(text_stamp):
    """
    Convert a timestamp text as found in REST responses from the Nineties.

    The first ISO_LENGTH characters are read per ISO_FMT; any remainder is taken
    as offset to UTC made of a sign from TZ_OP, two hour digits, an optional
    colon, and two minute digits.

    Return None for None or the JR_NULL marker, the parsed value as is when no
    offset follows, and else the value shifted to UTC (implicit, naive datetime).
    Offsets not matching the expected shape fail the assertions.
    """
    if text_stamp is None or text_stamp == JR_NULL:
        return None

    stamp_part, zone_part = text_stamp[:ISO_LENGTH], text_stamp[ISO_LENGTH:]
    moment = dti.datetime.strptime(stamp_part, ISO_FMT)
    if not zone_part:
        return moment

    sign = zone_part[0]
    assert sign in TZ_OP  # nosec B101

    # Minutes start one position later when hours and minutes are colon separated.
    minutes_at = 4 if ':' in zone_part else 3
    assert len(zone_part) == minutes_at + 2  # nosec B101

    hours = int(zone_part[1:3])
    minutes = int(zone_part[minutes_at:])
    to_utc = TZ_OP[sign]

    return to_utc(moment, dti.timedelta(hours=hours, minutes=minutes))
@no_type_check
def split_kv(text_pair, sep):
    """Split the text pair once at the separator and return the (key, value) tuple.

    Return (None, text_pair) when the split raises TypeError or ValueError
    (e.g. separator absent or empty) and (None, None) when the key is empty.
    """
    try:
        key, value = text_pair.split(sep, 1)
    except (TypeError, ValueError):
        return None, text_pair
    return (key, value) if key else (None, None)
@no_type_check
def split_issue_key(text_pair, sep=DASH):
    """Return the (project, serial) tuple of an issue key with the serial as integer.

    The key is split once at the separator (default dash, as many issue tracking
    systems from the Nineties use it to separate the two scopes).

    Raise ValueError when no project part can be split off or the serial is no integer.
    """
    project, serial = split_kv(text_pair, sep=sep)
    if not project:
        raise ValueError('%s is not a valid issue key composed of project and serial' % (text_pair,))

    return project, int(serial)
@no_type_check
def sorted_issue_keys_gen(key_iter, sep=DASH):
    """Yield the issue keys sorted by project first and serial (as integer) second.

    Every key is split via split_issue_key using the separator and joined again
    with that same separator, so keys using a non default separator keep it.
    The serial passes through int(), hence any zero padding is not retained.

    Raise ValueError (from split_issue_key) on the first invalid issue key.
    """
    for project, serial in sorted(split_issue_key(txt, sep=sep) for txt in key_iter):
        # Join with the separator used for splitting instead of a fixed dash.
        yield '{}{}{}'.format(project, sep, serial)
@no_type_check
def most_common_issue_projects(key_iter, n=None, sep=DASH):
    """Count the issue keys per project and return (project, count) pairs, most frequent first.

    At most n pairs are returned (all if n is None).
    """
    tally = Counter()
    for txt in key_iter:
        project, _ = split_issue_key(txt, sep=sep)
        tally[project] += 1
    return tally.most_common(n)
@no_type_check
def stable_make_unique(key_iter):
    """Return a tuple of the hashable elements of key_iter without duplicates in first seen order."""
    first_seen = dict.fromkeys(key_iter)
    return tuple(first_seen)
@no_type_check
def parse_dsl_entry(text_entry, final_key=None):
    """
    Parse some dict() like argument list text where the final right hand side is untrusted.

    The payload follows the first START_DATA and holds key=value records
    separated by REC_SEP; everything after the final key (default
    FINAL_DSL_KEY) is taken as its value, separators included.

    Values are converted by key name: keys containing 'date' via
    parse_timestamp, keys containing 'id' or 'sequence' via int, and a
    final value equal to JR_NULL becomes NA.

    Return a dict of the pairs, or an empty dict for falsy input.
    """
    if not text_entry:
        return {}

    # Remove any double quotes and closing brackets from both ends.
    trimmed = text_entry.strip('"' + END_DATA)
    _, payload = trimmed.split(START_DATA, 1)  # final rhs may be anything

    if final_key is None:
        final_key = FINAL_DSL_KEY
    final_marker = REC_SEP + final_key + KV_SEP
    null_final_marker = final_marker + JR_NULL

    final_is_plain_null = payload.endswith(null_final_marker) and payload.count(final_marker) == 1
    if final_is_plain_null:
        text_pairs = payload.split(REC_SEP)
    else:
        # Keep the untrusted final value in one piece even if it contains separators.
        others, final = payload.split(final_marker, 1)
        text_pairs = others.split(REC_SEP)
        text_pairs.append(final_key + KV_SEP + final)

    raw = {}
    for text_pair in text_pairs:
        key, value = split_kv(text_pair, KV_SEP)
        if key:
            raw[key] = value

    record = {}
    for key, value in raw.items():
        lowered = key.lower()
        if 'date' in lowered:
            record[key] = parse_timestamp(value)
        elif 'id' in lowered or 'sequence' in lowered:
            record[key] = int(value)
        elif key == final_key and value == JR_NULL:
            record[key] = NA
        else:
            record[key] = value

    return record