Coverage for nineties/parser.py: 100%

82 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 20:53:48 +00:00

1# -*- coding: utf-8 -*- 

2"""Parsers for issues from the Nineties.""" 

3import datetime as dti 

4import operator 

5from collections import Counter 

6from typing import Tuple, no_type_check 

7 

# Separator between project identifier and serial number inside an issue key.
DASH = '-'

# Layout of the date-time part of a timestamp and the exact width of that part
# (millisecond resolution); any text beyond that width is treated as the UTC offset.
ISO_FMT = '%Y-%m-%dT%H:%M:%S.%f'
ISO_LENGTH = len('YYYY-mm-ddTHH:MM:SS.fff')

# Maps the offset sign to the operation yielding UTC from local time:
# + indicates ahead of UTC, hence the offset is subtracted (and added for -).
TZ_OP = {'+': operator.sub, '-': operator.add}

# Textual null marker as received, and the replacement used for a null final value.
JR_NULL = '<null>'
NA = 'n/a'

# Delimiters of the DSL payload: data brackets, record separator, and key-value separator.
START_DATA = '['
END_DATA = ']'
REC_SEP = ','
KV_SEP = '='

# Default name of the last key of a DSL entry (its value is untrusted free text).
FINAL_DSL_KEY = 'final'

19 

20 

def split_at(text_fragment: str, pos: int) -> Tuple[str, str]:
    """Cut the text fragment at index pos and return the (head, tail) pair."""
    head = text_fragment[:pos]
    tail = text_fragment[pos:]
    return head, tail

24 

25 

@no_type_check
def parse_timestamp(text_stamp):
    """
    Convert the timestamp texts found in REST responses from the Nineties.

    The first ISO_LENGTH characters are read per ISO_FMT; any remaining text must be
    a signed offset written as +HHMM or +HH:MM (or with a leading - instead of +).

    Return a naive datetime in UTC (implicit) or None if the input is None or the null marker.
    """
    if text_stamp is None or text_stamp == JR_NULL:
        return None

    iso_part, offset = split_at(text_stamp, ISO_LENGTH)
    naive = dti.datetime.strptime(iso_part, ISO_FMT)
    if not offset:
        # No offset given: the value is taken as is.
        return naive

    sign = offset[0]
    assert offset and sign in TZ_OP  # nosec B101

    # The minutes start one character later when hours and minutes are colon separated.
    minutes_at = 4 if ':' in offset else 3
    assert len(offset) == minutes_at + 2  # nosec B101

    hours = int(offset[1:3])
    minutes = int(offset[minutes_at:])

    return TZ_OP[sign](naive, dti.timedelta(hours=hours, minutes=minutes))

50 

51 

@no_type_check
def split_kv(text_pair, sep):
    """Split the text pair at the first sep into a (key, value) tuple.

    Return (None, text_pair) if the split is not possible (sep missing in the text,
    empty sep, or sep of unsuitable type) and (None, None) if the key part is empty.
    """
    try:
        left, right = text_pair.split(sep, 1)
    except (TypeError, ValueError):
        # Either the split call itself failed or it did not produce exactly two parts.
        return None, text_pair
    return (left, right) if left else (None, None)

64 

65 

@no_type_check
def split_issue_key(text_pair, sep=DASH):
    """Separate the project identifier text (left) from the integer local id (right).

    Many issue tracking systems from the Nineties use dash (-) to separate the two scopes.

    Raise ValueError if no project part can be split off or the local id is not an integer.
    """
    prefix, number = split_kv(text_pair, sep=sep)
    if not prefix:
        raise ValueError('%s is not a valid issue key composed of project and serial' % (text_pair,))

    return prefix, int(number)

76 

77 

@no_type_check
def sorted_issue_keys_gen(key_iter, sep=DASH):
    """Yield the issue keys sorted by project first and (integer) serial second.

    Every text from key_iter is split with sep via split_issue_key and the sorted pairs
    are joined with the same sep again, so that the yielded keys stay parseable with the
    separator they came with (a hard-coded dash would only round-trip for the default).

    All keys are split and sorted when the first item is requested; an invalid key
    makes split_issue_key raise ValueError at that point.
    """
    ordered = sorted(split_issue_key(txt, sep=sep) for txt in key_iter)
    for project, serial in ordered:
        yield '{}{}{}'.format(project, sep, serial)

83 

84 

@no_type_check
def most_common_issue_projects(key_iter, n=None, sep=DASH):
    """Count the issue keys per project and return (project, count) pairs, most frequent first.

    The n most frequent projects are returned, or all of them if n is None.
    """
    tally = Counter()
    for txt in key_iter:
        project, _ = split_issue_key(txt, sep=sep)
        tally[project] += 1
    return tally.most_common(n)

89 

90 

@no_type_check
def stable_make_unique(key_iter):
    """Return the hashable elements of key_iter as tuple without duplicates in first-seen order."""
    # A dict keeps its keys in insertion order and ignores repeated keys.
    first_seen = dict.fromkeys(key_iter)
    return tuple(first_seen)

95 

96 

@no_type_check
def parse_dsl_entry(text_entry, final_key=None):
    """
    Parse some nifty dict() like argument list where the final rhs is untrusted.

    The entry is expected in the shape ``anything[key=value,key=value,...,final=anything]``
    (optionally wrapped in double quotes), where the name of the last key is given by
    final_key and defaults to FINAL_DSL_KEY. Pairs without a key are dropped.

    Values are converted depending on their key (case-insensitive substring match):
    keys containing 'date' go through parse_timestamp, keys containing 'id' or 'sequence'
    go through int(), and a null marker as value of the final key is replaced by NA.

    Return a dict of the pairs upon success or empty otherwise, i.e. if the entry is
    empty, has no payload start marker, or the payload lacks the final key.
    Errors from converting date, id, or sequence values are not swallowed but propagate.
    """
    if not text_entry:
        return {}

    # Remove surrounding quotes and the closing bracket, then drop everything before the payload.
    text_entry = text_entry.strip('"' + END_DATA)
    _, start_found, payload = text_entry.partition(START_DATA)
    if not start_found:
        return {}  # no payload at all - honor the documented empty result instead of raising

    final_key = FINAL_DSL_KEY if final_key is None else final_key
    final_key_indicator = REC_SEP + final_key + KV_SEP
    empty_goal_indicator = final_key_indicator + JR_NULL
    if payload.endswith(empty_goal_indicator) and payload.count(final_key_indicator) == 1:
        # The final value is the null marker: every record separator is a real one.
        text_pairs = payload.split(REC_SEP)
    else:
        # final rhs may be anything (including separators): only split what precedes it.
        others, final_found, final = payload.partition(final_key_indicator)
        if not final_found:
            return {}  # final key missing - honor the documented empty result instead of raising
        text_pairs = others.split(REC_SEP) + [final_key + KV_SEP + final]

    pairs = [split_kv(text_pair, KV_SEP) for text_pair in text_pairs]
    record = {k: v for k, v in pairs if k}

    # Build a fresh dict instead of assigning into record while iterating over it.
    # NOTE(review): substring matching also hits keys like 'width' ('id') - confirm this is intended.
    converted = {}
    for key, value in record.items():
        key_lower = key.lower()
        if 'date' in key_lower:
            value = parse_timestamp(value)
        elif 'id' in key_lower or 'sequence' in key_lower:
            value = int(value)
        elif key == final_key and value == JR_NULL:
            value = NA
        converted[key] = value

    return converted

131 record[key] = NA 

132 

133 return record