Coverage for tutkia/api.py: 0.00%

144 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-05 19:32:08 +00:00

1"""Explore (Finnish: tutkia) ticket system trees. - application programming interface.""" 

2 

3import argparse 

4import inspect 

5import os 

6from typing import Union, no_type_check 

7 

8from atlassian import Jira # type: ignore 

9from tutkia import APP_ALIAS 

10 

11COMMA = ',' 

12FIELDS = 'fields' 

13NAME = 'name' 

14NONE = 'None' 

15PERCENT = 'percent' 

16SEMI = ';' 

17VALUE = 'value' 

18CONN_SERVER = os.getenv(f'{APP_ALIAS}_SERVER', '').rstrip(SEMI) 

19CONN_USER = os.getenv(f'{APP_ALIAS}_USER', '') 

20CONN_TOKEN = os.getenv(f'{APP_ALIAS}_TOKEN', '') 

21 

22 

23@no_type_check 

24def extract(per, seq, slot: int): 

25 """Extract per functors from sequence at slot.""" 

26 return [[extractor.__name__, extractor(seq[slot][FIELDS])] for extractor in per] 

27 

28 

29def cf(n: int) -> str: 

30 """Shorthand for data interpolation.""" 

31 return f'customfield_{n}' 

32 

33 

34CF = { 

35 'Acceptance Criteria': cf(11000), 

36 'Customer Project Code': cf(13601), 

37 'Department': cf(11616), 

38 'ORG Engineering Service': cf(13801), 

39 'Lifecycle Stage': cf(13701), 

40 'Start date': cf(13500), 

41 'ORG customers': cf(11621), 

42 'ORG Product': cf(12317), 

43} 

44 

45FNI = inspect.currentframe 

46 

47 

48@no_type_check 

49def unwrap(d, sk, default: Union[int, str] = NONE, pk=None): 

50 if pk is None: 

51 pk = FNI().f_back.f_code.co_name 

52 data = d.get(pk, None) 

53 if data is None: 

54 return default 

55 return data.get(sk, default) if d[pk] else default 

56 

57 

58@no_type_check 

59def summary(d): 

60 return d[FNI().f_code.co_name] 

61 

62 

63@no_type_check 

64def issuetype(d): 

65 return unwrap(d, NAME) 

66 

67 

68@no_type_check 

69def priority(d): 

70 return unwrap(d, NAME) 

71 

72 

73@no_type_check 

74def status(d): 

75 return unwrap(d, NAME) 

76 

77 

78@no_type_check 

79def resolution(d): 

80 return unwrap(d, NAME) 

81 

82 

83@no_type_check 

84def reporter(d): 

85 return unwrap(d, NAME) 

86 

87 

88@no_type_check 

89def assignee(d): 

90 return unwrap(d, NAME) 

91 

92 

93@no_type_check 

94def department(d): 

95 return unwrap(d, VALUE, pk=CF['Department']) 

96 

97 

98@no_type_check 

99def org_eng_service(d): 

100 return unwrap(d, VALUE, pk=CF['ORG Engineering Service']) 

101 

102 

103@no_type_check 

104def life_cycle_stage(d): 

105 return unwrap(d, VALUE, pk=CF['Lifecycle Stage']) 

106 

107 

108@no_type_check 

109def org_customer(d): 

110 return unwrap(d, VALUE, pk=CF['ORG customers']) 

111 

112 

113@no_type_check 

114def customer_project_code(d): 

115 return d.get(CF['Customer Project Code'], NONE) 

116 

117 

118@no_type_check 

119def org_product(d): 

120 return unwrap(d, VALUE, pk=CF['ORG Product']) 

121 

122 

123@no_type_check 

124def created(d): 

125 return d[FNI().f_code.co_name] 

126 

127 

128@no_type_check 

129def start_date(d): 

130 return d.get(CF['Start date'], NONE) 

131 

132 

133@no_type_check 

134def updated(d): 

135 return d[FNI().f_code.co_name] 

136 

137 

138@no_type_check 

139def due_date(d): 

140 return d.get('duedate', NONE) 

141 

142 

143@no_type_check 

144def aggregateprogress(d): 

145 return unwrap(d, PERCENT) 

146 

147 

148@no_type_check 

149def labels(d): 

150 return d[FNI().f_code.co_name] 

151 

152 

153@no_type_check 

154def aggregatetimeoriginalestimate(d): 

155 return d[FNI().f_code.co_name] 

156 

157 

158@no_type_check 

159def timeoriginalestimate(d): 

160 return d[FNI().f_code.co_name] 

161 

162 

163@no_type_check 

164def timeestimate(d): 

165 return d[FNI().f_code.co_name] 

166 

167 

168@no_type_check 

169def original_estimate_seconds(d): 

170 return unwrap(d, 'originalEstimateSeconds', default=0, pk='timetracking') 

171 

172 

173@no_type_check 

174def remaining_estimate_seconds(d): 

175 return unwrap(d, 'remainingEstimateSeconds', default=0, pk='timetracking') 

176 

177 

178@no_type_check 

179def time_spent_seconds(d): 

180 return unwrap(d, 'timeSpentSeconds', default=0, pk='timetracking') 

181 

182 

183@no_type_check 

184def process(options: argparse.Namespace): 

185 """Process the command line request.""" 

186 global CONN_TOKEN 

187 query = options.query 

188 jira = Jira(url=CONN_SERVER, username=CONN_USER, password=CONN_TOKEN, cloud=False) 

189 CONN_TOKEN = '*' * 42 # overwrite after use 

190 

191 try: 

192 issues = jira.jql(query) 

193 except Exception as err: # noqa 

194 print(err) 

195 return 1 

196 

197 retrieved = len(issues['issues']) 

198 total = issues['total'] 

199 print(f'Retrieved {retrieved} of {total} matching issues') 

200 page_first = issues['startAt'] 

201 page_max = issues['maxResults'] 

202 max_page_frame = (page_first, page_max - 1) 

203 print(f'Current maximal page frame is {max_page_frame}') 

204 eff_page_frame = max_page_frame if total > page_max else (page_first, total - 1) 

205 print(f'Current effective page frame is {eff_page_frame}') 

206 

207 try: 

208 expand_dims = issues['expand'].split(COMMA) 

209 except KeyError: 

210 print(f'No issues match query ({query})') 

211 print(f'- received response: {issues}') 

212 return 0 

213 

214 print(f'Expanded dimensions is {tuple(str(v) for v in expand_dims)}') 

215 issue_seq = issues['issues'] 

216 keys_present = [issue['key'] for issue in issue_seq] 

217 print(f'Keys present: {tuple(str(v) for v in keys_present)}') 

218 

219 # list(issues['issues'][0]['fields'].keys()) 

220 via = ( 

221 summary, 

222 issuetype, 

223 priority, 

224 status, 

225 resolution, 

226 reporter, 

227 assignee, 

228 department, 

229 org_eng_service, 

230 life_cycle_stage, 

231 org_customer, 

232 customer_project_code, 

233 org_product, 

234 created, 

235 start_date, 

236 updated, 

237 due_date, 

238 aggregateprogress, 

239 labels, 

240 aggregatetimeoriginalestimate, 

241 timeoriginalestimate, 

242 timeestimate, 

243 original_estimate_seconds, 

244 remaining_estimate_seconds, 

245 time_spent_seconds, 

246 ) 

247 

248 print() 

249 for slot, key in enumerate(keys_present): 

250 pairs = extract(via, issue_seq, slot) 

251 print(f'{slot + 1}. {key}') 

252 for k, v in pairs: 

253 print(f' - {k :42s}: {v}') 

254 print() 

255 return 1