Coverage for tutkia/api.py: 0.00%
144 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:32:08 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:32:08 +00:00
1"""Explore (Finnish: tutkia) ticket system trees. - application programming interface."""
3import argparse
4import inspect
5import os
6from typing import Union, no_type_check
8from atlassian import Jira # type: ignore
9from tutkia import APP_ALIAS
11COMMA = ','
12FIELDS = 'fields'
13NAME = 'name'
14NONE = 'None'
15PERCENT = 'percent'
16SEMI = ';'
17VALUE = 'value'
18CONN_SERVER = os.getenv(f'{APP_ALIAS}_SERVER', '').rstrip(SEMI)
19CONN_USER = os.getenv(f'{APP_ALIAS}_USER', '')
20CONN_TOKEN = os.getenv(f'{APP_ALIAS}_TOKEN', '')
23@no_type_check
24def extract(per, seq, slot: int):
25 """Extract per functors from sequence at slot."""
26 return [[extractor.__name__, extractor(seq[slot][FIELDS])] for extractor in per]
29def cf(n: int) -> str:
30 """Shorthand for data interpolation."""
31 return f'customfield_{n}'
34CF = {
35 'Acceptance Criteria': cf(11000),
36 'Customer Project Code': cf(13601),
37 'Department': cf(11616),
38 'ORG Engineering Service': cf(13801),
39 'Lifecycle Stage': cf(13701),
40 'Start date': cf(13500),
41 'ORG customers': cf(11621),
42 'ORG Product': cf(12317),
43}
45FNI = inspect.currentframe
48@no_type_check
49def unwrap(d, sk, default: Union[int, str] = NONE, pk=None):
50 if pk is None:
51 pk = FNI().f_back.f_code.co_name
52 data = d.get(pk, None)
53 if data is None:
54 return default
55 return data.get(sk, default) if d[pk] else default
58@no_type_check
59def summary(d):
60 return d[FNI().f_code.co_name]
63@no_type_check
64def issuetype(d):
65 return unwrap(d, NAME)
68@no_type_check
69def priority(d):
70 return unwrap(d, NAME)
73@no_type_check
74def status(d):
75 return unwrap(d, NAME)
78@no_type_check
79def resolution(d):
80 return unwrap(d, NAME)
83@no_type_check
84def reporter(d):
85 return unwrap(d, NAME)
88@no_type_check
89def assignee(d):
90 return unwrap(d, NAME)
93@no_type_check
94def department(d):
95 return unwrap(d, VALUE, pk=CF['Department'])
98@no_type_check
99def org_eng_service(d):
100 return unwrap(d, VALUE, pk=CF['ORG Engineering Service'])
103@no_type_check
104def life_cycle_stage(d):
105 return unwrap(d, VALUE, pk=CF['Lifecycle Stage'])
108@no_type_check
109def org_customer(d):
110 return unwrap(d, VALUE, pk=CF['ORG customers'])
113@no_type_check
114def customer_project_code(d):
115 return d.get(CF['Customer Project Code'], NONE)
118@no_type_check
119def org_product(d):
120 return unwrap(d, VALUE, pk=CF['ORG Product'])
123@no_type_check
124def created(d):
125 return d[FNI().f_code.co_name]
128@no_type_check
129def start_date(d):
130 return d.get(CF['Start date'], NONE)
133@no_type_check
134def updated(d):
135 return d[FNI().f_code.co_name]
138@no_type_check
139def due_date(d):
140 return d.get('duedate', NONE)
143@no_type_check
144def aggregateprogress(d):
145 return unwrap(d, PERCENT)
148@no_type_check
149def labels(d):
150 return d[FNI().f_code.co_name]
153@no_type_check
154def aggregatetimeoriginalestimate(d):
155 return d[FNI().f_code.co_name]
158@no_type_check
159def timeoriginalestimate(d):
160 return d[FNI().f_code.co_name]
163@no_type_check
164def timeestimate(d):
165 return d[FNI().f_code.co_name]
168@no_type_check
169def original_estimate_seconds(d):
170 return unwrap(d, 'originalEstimateSeconds', default=0, pk='timetracking')
173@no_type_check
174def remaining_estimate_seconds(d):
175 return unwrap(d, 'remainingEstimateSeconds', default=0, pk='timetracking')
178@no_type_check
179def time_spent_seconds(d):
180 return unwrap(d, 'timeSpentSeconds', default=0, pk='timetracking')
183@no_type_check
184def process(options: argparse.Namespace):
185 """Process the command line request."""
186 global CONN_TOKEN
187 query = options.query
188 jira = Jira(url=CONN_SERVER, username=CONN_USER, password=CONN_TOKEN, cloud=False)
189 CONN_TOKEN = '*' * 42 # overwrite after use
191 try:
192 issues = jira.jql(query)
193 except Exception as err: # noqa
194 print(err)
195 return 1
197 retrieved = len(issues['issues'])
198 total = issues['total']
199 print(f'Retrieved {retrieved} of {total} matching issues')
200 page_first = issues['startAt']
201 page_max = issues['maxResults']
202 max_page_frame = (page_first, page_max - 1)
203 print(f'Current maximal page frame is {max_page_frame}')
204 eff_page_frame = max_page_frame if total > page_max else (page_first, total - 1)
205 print(f'Current effective page frame is {eff_page_frame}')
207 try:
208 expand_dims = issues['expand'].split(COMMA)
209 except KeyError:
210 print(f'No issues match query ({query})')
211 print(f'- received response: {issues}')
212 return 0
214 print(f'Expanded dimensions is {tuple(str(v) for v in expand_dims)}')
215 issue_seq = issues['issues']
216 keys_present = [issue['key'] for issue in issue_seq]
217 print(f'Keys present: {tuple(str(v) for v in keys_present)}')
219 # list(issues['issues'][0]['fields'].keys())
220 via = (
221 summary,
222 issuetype,
223 priority,
224 status,
225 resolution,
226 reporter,
227 assignee,
228 department,
229 org_eng_service,
230 life_cycle_stage,
231 org_customer,
232 customer_project_code,
233 org_product,
234 created,
235 start_date,
236 updated,
237 due_date,
238 aggregateprogress,
239 labels,
240 aggregatetimeoriginalestimate,
241 timeoriginalestimate,
242 timeestimate,
243 original_estimate_seconds,
244 remaining_estimate_seconds,
245 time_spent_seconds,
246 )
248 print()
249 for slot, key in enumerate(keys_present):
250 pairs = extract(via, issue_seq, slot)
251 print(f'{slot + 1}. {key}')
252 for k, v in pairs:
253 print(f' - {k :42s}: {v}')
254 print()
255 return 1