Coverage for laskea/api/jira.py: 75.22%
284 statements
coverage.py v7.3.0, created at 2023-08-17 13:24:57 +00:00
1# -*- coding: utf-8 -*-
2"""JIRA proxy connector API for code generation."""
3import copy
4import json
5import os
6import sys
7from typing import Iterable, Mapping, Sized, Union, no_type_check
9import jmespath
10from atlassian import Jira # type: ignore # noqa
11from requests.exceptions import HTTPError
13import laskea
15API_BASE_URL = 'https://example.com'
17DEFAULT_COLUMN_FIELDS = ['Key', 'Summary', ['Priority', 'P'], 'Status', 'Custom Field Wun', 'Custom Field Other (CFO)']
19WUN_ID = 'customfield_11501'
20ANOTHER_ID = 'customfield_13901'
21KNOWN_CI_FIELDS = {
22 'key': ['key', 'key'],
23 'summary': ['summary', 'fields.summary'],
24 'priority': ['priority', 'fields.priority.name'],
25 'status': ['status', 'fields.status.name'],
26 'custom field name': [WUN_ID, f'fields.{WUN_ID}'],
27 'custom field other': [ANOTHER_ID, f'fields.{ANOTHER_ID}[].value'],
28}
30BASE_USER = os.getenv(f'{laskea.APP_ENV}_USER', '')
31BASE_PASS = os.getenv(f'{laskea.APP_ENV}_TOKEN', '')
32BASE_URL = os.getenv(f'{laskea.APP_ENV}_BASE_URL', '')
33BASE_IS_CLOUD = bool(os.getenv(f'{laskea.APP_ENV}_IS_CLOUD', ''))
34BASE_COL_FIELDS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FIELDS', json.dumps(DEFAULT_COLUMN_FIELDS)))
35BASE_COL_MAPS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_MAPS', json.dumps(KNOWN_CI_FIELDS)))
36BASE_JOIN_STRING = os.getenv(f'{laskea.APP_ENV}_JOIN_STRING', ' <br>')
37BASE_LF_ONLY = bool(os.getenv(f'{laskea.APP_ENV}_LF_ONLY', 'YES'))
38LF = '\n'
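All module defaults above can be overridden through environment variables keyed on the laskea.APP_ENV prefix; they are read once at import time. A minimal sketch of such an override, using the placeholder prefix LASKEA (the real prefix comes from laskea.APP_ENV) and a placeholder JIRA URL:

    import json
    import os

    PREFIX = 'LASKEA'  # placeholder; the actual value is laskea.APP_ENV
    os.environ[f'{PREFIX}_BASE_URL'] = 'https://jira.example.com'
    os.environ[f'{PREFIX}_COL_FIELDS'] = json.dumps(['Key', 'Summary', ['Priority', 'P']])

    from laskea.api import jira  # noqa: E402  import after the overrides so the BASE_* constants pick them up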
41def mock(number: int) -> int:
42 """Intermediate for starting the dev env in a valid state."""
43 return number
46def login(user: str = '', token: str = '', url: str = '', is_cloud: bool = False) -> Jira: # nosec
47 """LatAli"""
48 if not user:
49 user = BASE_USER
50 if not token:
51 token = BASE_PASS
52 if not url:
53 url = BASE_URL
54 if not is_cloud:
55 is_cloud = BASE_IS_CLOUD
56 if not user or not token or not url:
57 raise ValueError('User, Token, and URL are all required for login.')
58 return Jira(url=url, username=user, password=token, cloud=is_cloud)
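A usage sketch for login, assuming the module is importable as laskea.api.jira; the credentials below are placeholders, and omitted arguments fall back to the BASE_* environment values (missing values raise ValueError):

    from laskea.api import jira

    handle = jira.login(
        user='svc-reporting',            # placeholder account
        token='api-token-value',         # placeholder token
        url='https://jira.example.com',  # placeholder server
        is_cloud=False,
    )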
61@no_type_check
62def query(handle: Jira, jql_text: str, column_fields=None) -> dict:
63 """EggLayingWoolMilkDear."""
65 if not column_fields:  # 65 ↛ 68: the condition on line 65 was never false
66 column_fields = BASE_COL_FIELDS
68 if not jql_text.strip():  # 68 ↛ 74: the condition on line 68 was never false
69 return {
70 'jql_text': jql_text,
71 'error': 'Empty JIRA Query Language text detected',
72 }
74 completed_column_fields = []
75 for entry in column_fields:
76 if isinstance(entry, str):
77 candidate, concept, label = entry.lower(), entry, entry
78 else:
79 try:
80 concept, label = entry
81 candidate = concept.lower()
82 except TypeError:
83 return {
84 'jql_text': jql_text,
85 'column_fields': column_fields,
86 'parsed_columns': completed_column_fields,
87 'error': f'The column ({entry}) is neither a string nor a pair of (concept, label)',
88 }
90 for field in BASE_COL_MAPS.keys():
91 if field in candidate:
92 completed_column_fields.append(
93 {
94 'path': BASE_COL_MAPS[field][1],
95 'id': BASE_COL_MAPS[field][0],
96 'concept': concept,
97 'label': label,
98 'field': field,
99 }
100 )
102 if not completed_column_fields:
103 return {
104 'jql_text': jql_text,
105 'column_fields': column_fields,
106 'error': 'Completed column fields empty (no known fields?)',
107 }
109 try:
110 issues = handle.jql(jql_text, limit=1000)
111 except (HTTPError, RuntimeError) as err:
112 return {
113 'jql_text': jql_text,
114 'column_fields': column_fields,
115 'parsed_columns': completed_column_fields,
116 'error': str(err),
117 }
119 pairs = [(col['label'], col['path']) for col in completed_column_fields]
120 rows = [{label: jmespath.search(path, issue) or [''] for label, path in pairs} for issue in issues['issues']]
121 return {
122 'jql_text': jql_text,
123 'column_fields': column_fields,
124 'parsed_columns': completed_column_fields,
125 'error': None,
126 'rows': rows,
127 }
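A sketch of calling query with a mixed column specification (plain strings plus a (concept, label) pair); on success the result carries 'rows' keyed by the column labels, otherwise only an 'error' entry. The project key and JQL text are placeholders:

    from laskea.api import jira

    handle = jira.login()  # environment-based credentials
    result = jira.query(
        handle,
        jql_text='project = ABC AND status != Done ORDER BY priority DESC',
        column_fields=['Key', 'Summary', ['Priority', 'P']],
    )
    if result['error'] is None:
        for row in result['rows']:
            print(row['Key'], row['P'])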
130@no_type_check
131def separated_values_list(
132 handle: Jira,
133 jql_text: str,
134 column_fields=None,
135 key_magic: bool = False,
136 field_sep: str = laskea.PIPE,
137 replacement: str = laskea.FS_SLUG,
138 data: Mapping[str, Union[object, Iterable, Sized]] = None,
139) -> str:
140 """Yes we can ... document later."""
141 if data is None:  # 141 ↛ 142: the condition on line 141 was never true
142 data = query(handle, jql_text, column_fields)
143 if data.get('error', ''):  # 143 ↛ 144: the condition on line 143 was never true
144 return json.dumps(data, indent=2)
146 fs = field_sep # alias
147 if not data['rows']:
148 if laskea.STRICT:  # 148 ↛ 149: the condition on line 148 was never true
149 fs_disp = 'RS' if fs == laskea.RS else fs
150 message = f'WARNING: received 0 results for JQL ({jql_text}) and ({fs_disp}) separated values list'
151 if not laskea.DRY_RUN:
152 print(message, file=sys.stderr)
153 return message
154 return ''
156 table = copy.deepcopy(data['rows'])
157 header_cells = list(table[0].keys()) # noqa
158 for slot, record in enumerate(table):
159 for key, cell in record.items():
160 if key_magic and key.lower() == 'key':  # 160 ↛ 161: the condition on line 160 was never true
161 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa
162 if not isinstance(cell, str):  # 162 ↛ 163: the condition on line 162 was never true
163 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa
165 header = f'{fs.join(cell.replace(fs, replacement) for cell in header_cells)}'
166 rows = [f'{fs.join(str(v).replace(fs, replacement) for v in line.values())}' for line in table]
167 the_sv_list = '\n'.join([header] + rows) + '\n'
168 return the_sv_list.replace('\r', '') if BASE_LF_ONLY else the_sv_list
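A sketch of producing a separated values list with linked keys; the default separator is laskea.PIPE, key_magic turns Key cells into markdown links below BASE_URL, and passing data from an earlier query avoids a second round trip. The JQL is a placeholder:

    from laskea.api import jira

    handle = jira.login()
    print(jira.separated_values_list(
        handle,
        jql_text='project = ABC AND fixVersion = 1.2.3',
        key_magic=True,
    ))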
171@no_type_check
172def markdown_table(
173 handle: Jira, jql_text: str, column_fields=None, data: Mapping[str, Union[object, Iterable, Sized]] = None
174) -> str:
175 """Yes we can ... document later."""
176 if data is None:  # 176 ↛ 177: the condition on line 176 was never true
177 data = query(handle, jql_text, column_fields)
178 if data.get('error', ''):  # 178 ↛ 179: the condition on line 178 was never true
179 return json.dumps(data, indent=2)
181 if not data['rows']:
182 if laskea.STRICT:
183 message = f'WARNING: received 0 results for JQL ({jql_text}) and table'
184 if not laskea.DRY_RUN:  # 184 ↛ 186: the condition on line 184 was never false
185 print(message, file=sys.stderr)
186 return message
187 return ''
189 table = copy.deepcopy(data['rows'])
190 columns = list(table[0].keys()) # noqa
191 col_wid = {key: len(key) for key in columns}
192 for slot, record in enumerate(table):
193 for key, cell in record.items():
194 if key.lower() == 'key':
195 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa
196 if not isinstance(cell, str):  # 196 ↛ 197: the condition on line 196 was never true
197 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa
198 col_wid[key] = max(len(table[slot][key]), col_wid[key]) # noqa
200 header_cells = [key.ljust(col_wid[key]) for key in columns]
201 header = f'| {" | ".join(header_cells)} |'
203 separator_cells = ['-' * (col_wid[key] + 1) for key in columns]
204 separator = f'|:{"|:".join(separator_cells)}|'
206 rows = [f'| {" | ".join(str(v).ljust(col_wid[k]) for k, v in line.items())} |' for line in table]
207 issues = len(table)
208 summary = f'\n\n{issues} issue{"" if issues == 1 else "s"}'
209 the_table = '\n'.join([header] + [separator] + rows) + summary
210 return the_table.replace('\r', '') if BASE_LF_ONLY else the_table
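A sketch of rendering a query as a markdown table; columns are padded to their widest cell and a trailing 'N issues' summary is appended. The JQL is a placeholder:

    from laskea.api import jira

    handle = jira.login()
    print(jira.markdown_table(handle, jql_text='project = ABC AND type = Bug'))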
213@no_type_check
214def markdown_list(
215 handle: Jira,
216 jql_text: str,
217 column_fields=None,
218 list_type: str = 'ul',
219 data: Mapping[str, Union[object, Iterable, Sized]] = None,
220) -> str:
221 """Yes we can ... document later."""
222 if data is None:  # 222 ↛ 223: the condition on line 222 was never true
223 data = query(handle, jql_text, column_fields)
224 if data.get('error', ''):  # 224 ↛ 225: the condition on line 224 was never true
225 return json.dumps(data, indent=2)
227 if not data['rows']:  # 227 ↛ 228: the condition on line 227 was never true
228 if laskea.STRICT:
229 message = f'WARNING: received 0 results for JQL ({jql_text}) and {list_type}'
230 if not laskea.DRY_RUN:
231 print(message, file=sys.stderr)
232 return message
233 return ''
235 items = []
236 for record in data['rows']:
237 k, v = '', ''
238 for key, cell in record.items():
239 if key.lower() not in ('key', 'summary'):  # 239 ↛ 240: the condition on line 239 was never true
240 continue
241 if key.lower() == 'key':
242 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
243 else:
244 v = cell
245 items.append((k, v))
247 if list_type in ('ol', 'ul'):
248 lt = '-' if list_type == 'ul' else '1.' # implicit 'ol'
249 xl = tuple(f'{lt} {key} - {summary}' for key, summary in items)
250 the_list = '\n'.join(xl) + '\n'
251 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list
252 if list_type == 'dl':  # 252 ↛ 259: the condition on line 252 was never false
253 # 'Term'
254 # ':definition of term'
255 #
256 xl = tuple(f'{key}\n:{summary}\n' for key, summary in items)
257 the_list = '\n'.join(xl) + '\n'
258 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list
259 return f'Unexpected list type ({list_type}) in markdown_list not in ({("dl", "ol", "ul")})' + '\n'
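A sketch of the three supported list types; 'ul' yields '- KEY - Summary' items, 'ol' items all carry the '1.' prefix (markdown renderers renumber them), and 'dl' emits term/definition pairs. The JQL is a placeholder:

    from laskea.api import jira

    handle = jira.login()
    for kind in ('ul', 'ol', 'dl'):
        print(jira.markdown_list(handle, jql_text='project = ABC AND sprint in openSprints()', list_type=kind))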
262@no_type_check
263def markdown_heading(
264 handle: Jira,
265 jql_text: str,
266 column_fields=None,
267 level: int = 1,
268 data: Mapping[str, Union[object, Iterable, Sized]] = None,
269) -> str:
270 """Yes we can ... document later."""
271 if data is None:  # 271 ↛ 272: the condition on line 271 was never true
272 data = query(handle, jql_text, column_fields)
273 if data.get('error', ''):  # 273 ↛ 274: the condition on line 273 was never true
274 return json.dumps(data, indent=2)
276 if not data['rows']:
277 if laskea.STRICT:
278 message = f'WARNING: received 0 results instead of 1 for JQL ({jql_text}) and h{level}'
279 if not laskea.DRY_RUN:  # 279 ↛ 281: the condition on line 279 was never false
280 print(message, file=sys.stderr)
281 return message
282 return ''
284 items = []
285 for record in data['rows']:
286 k, v = '', ''
287 for key, cell in record.items():
288 if key.lower() not in ('key', 'summary'):  # 288 ↛ 289: the condition on line 288 was never true
289 continue
290 if key.lower() == 'key':
291 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
292 else:
293 v = cell
294 items.append((k, v))
295 received = len(items)
296 if received != 1:
297 if laskea.STRICT:
298 message = f'WARNING: received {received} results instead of 1 for JQL ({jql_text}) and h{level}'
299 if not laskea.DRY_RUN:  # 299 ↛ 301: the condition on line 299 was never false
300 print(message, file=sys.stderr)
301 return message.replace('\r', '') if BASE_LF_ONLY else message
302 return ''
303 level_range = tuple(range(1, 6 + 1))
304 if level in level_range:  # 304 ↛ 309: the condition on line 304 was never false
305 heading_token = '#' * level
306 xl = tuple(f'{heading_token} {key} - {summary}' for key, summary in items)
307 the_heading = '\n'.join(xl)
308 return the_heading.replace('\r', '') if BASE_LF_ONLY else the_heading
309 message = f'Unexpected level for heading ({level}) in markdown_heading not in ({level_range})'
310 if not laskea.DRY_RUN:
311 print(message, file=sys.stderr)
312 return message
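A sketch for markdown_heading, which expects the JQL to match exactly one issue and a level within 1..6; the issue key and level shown are placeholders:

    from laskea.api import jira

    handle = jira.login()
    print(jira.markdown_heading(handle, jql_text='key = ABC-42', level=2))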
315@no_type_check
316def fetch_jql(handle: Jira, jql_text: str) -> dict:
317 """Expose the JIRA result structure directly."""
318 if not jql_text.strip():
319 return {
320 'jql_text': jql_text,
321 'error': 'Empty JIRA Query Language text detected',
322 }
324 try:
325 issues = handle.jql(jql_text, limit=1000)
326 except (HTTPError, RuntimeError) as err:
327 return {
328 'jql_text': jql_text,
329 'error': str(err),
330 }
332 return {
333 'jql_text': jql_text,
334 'data': issues,
335 'error': None,
336 }
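A sketch of the raw passthrough; on success the JIRA response is returned untouched under 'data'. The JQL is a placeholder:

    from laskea.api import jira

    handle = jira.login()
    raw = jira.fetch_jql(handle, 'project = ABC ORDER BY created DESC')
    if raw['error'] is None:
        print(len(raw['data']['issues']), 'issues fetched')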
339@no_type_check
340def parent_children_sections(
341 handle: Jira,
342 parent_jql: str,
343 children_jql: str,
344 parent_type_name: str,
345 children_type_name: str,
346 data: Mapping[str, Union[object, Iterable, Sized]] = None,
347) -> str:
348 """Create sub(sub)section level content representing the issue content from parent children filter results."""
349 if data is None:  # 349 ↛ 350: the branch to line 350 was never taken
350 data = {
351 'parent_data': fetch_jql(handle, jql_text=parent_jql),
352 'children_data': fetch_jql(handle, jql_text=children_jql),
353 }
354 if data['parent_data'].get('error', '') or data['children_data'].get('error', ''):  # 354 ↛ 355: the condition on line 354 was never true
355 return json.dumps(data, indent=2)
357 doc = {}
358 has_parents = {}
360 for parent in data['parent_data']['data']['issues']:
361 p_id = parent['id']
362 p_key = parent['key']
364 p_field = parent['fields']
366 p_itn = p_field['issuetype']['name']
367 # assert p_itn == parent_type_name
368 p_sum = p_field['summary']
369 p_des = p_field['description'] # None for parent type names
370 p_epic = p_field['customfield_10006'] # TODO assuming here ...
371 p_created = p_field['created'] # Textual timestamps like "2019-03-12T10:01:25.000+0100"
372 p_updated = p_field['updated']
374 doc[p_key] = {
375 'id': p_id,
376 'type': p_itn,
377 'summary': p_sum,
378 'description': p_des,
379 'epic': p_epic,
380 'created': p_created,
381 'updated': p_updated,
382 'children': {},
383 }
385 children = p_field['subtasks']
387 for child in children:
388 c_id = child['id']
389 c_key = child['key']
391 c_field = child['fields']
393 c_itn = c_field['issuetype']['name']
394 # assert c_itn == children_type_name
395 c_sum = c_field['summary']
397 doc[p_key]['children'][c_key] = {
398 'id': c_id,
399 'type': c_itn,
400 'summary': c_sum,
401 'description': None,
402 'created': None,
403 'updated': None,
404 }
405 if c_key not in has_parents:  # 405 ↛ 407: the condition on line 405 was never false
406 has_parents[c_key] = []
407 has_parents[c_key].append(p_key)
409 for child in data['children_data']['data']['issues']:
410 c_id = child['id']
411 c_key = child['key']
413 c_field = child['fields']
415 p_key = c_field['parent']['key']
416 # assert p_key in has_parents[c_key]
417 # p_itn = c_field['parent']['issuetype']['name']
418 # assert p_itn == parent_type_name
420 c_itn = c_field['issuetype']['name']
421 # assert c_itn == children_type_name
422 c_sum = c_field['summary']
423 c_des = c_field['description'] # Table or list or notes for children type names
424 c_created = c_field['created']
425 c_updated = c_field['updated']
427 doc[p_key]['children'][c_key]['description'] = c_des
428 doc[p_key]['children'][c_key]['created'] = c_created
429 doc[p_key]['children'][c_key]['updated'] = c_updated
431 return doc_to_markdown(doc, parent_type_name, children_type_name)
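A sketch of combining two filters into sectioned markdown; the filters and type names are placeholders, and whether the hard-coded epic field (customfield_10006) fits depends on the JIRA instance, as the TODO above notes:

    from laskea.api import jira

    handle = jira.login()
    print(jira.parent_children_sections(
        handle,
        parent_jql='project = ABC AND type = Epic',        # placeholder parent filter
        children_jql='project = ABC AND type = Sub-task',  # placeholder children filter
        parent_type_name='Epic',
        children_type_name='Sub-task',
    ))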
434@no_type_check
435def doc_to_markdown(doc, parent_type_name: str, children_type_name: str) -> str: # noqa
436 """Transform the document content to markdown."""
437 md = []
438 for p_tree in doc.values():
439 p_head = f'## {p_tree["summary"]}'.strip().strip(LF)
440 c_count = len(p_tree['children'])
441 c_type_disp = f'{children_type_name}{"" if c_count == 1 else "s"}'
442 p_para = f'The {p_tree["type"]} consists of {c_count} {c_type_disp}'.strip(LF)
444 c_parts = []
445 double_pipe, ast_pipe, pipe_ast = '||', '|*', '*|'
446 nbsp = ' '
447 for c_data in p_tree['children'].values():
448 c_head = f'### {c_data["summary"]}'.strip().strip(LF)
449 c_in = list(c_data['description'].replace(nbsp, ' ').strip().split(LF))
450 c_out = []
451 for line in c_in:
452 if line.startswith(double_pipe) or line.startswith(ast_pipe):
453 # patch confluence markdown like table heads ...
454 line_s = line.strip()
455 extra_line = ''
456 if line_s.startswith(double_pipe) and line_s.endswith(double_pipe):
457 line = line.replace(double_pipe, '|')
458 extra_line = (
459 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |')
460 )
461 elif line_s.startswith(ast_pipe) and line_s.endswith(pipe_ast):  # 461 ↛ 465: the condition on line 461 was never false
462 extra_line = (
463 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |')
464 )
465 c_out.append(line)
466 if extra_line:  # 466 ↛ 451: the condition on line 466 was never false
467 c_out.append(extra_line)
468 else:
469 c_out.append(line)
471 c_parts.extend([LF, c_head, LF, *c_out])
473 md.extend([LF, p_head, LF, p_para])
474 md.extend(c_parts)
476 md.append(LF)
477 return LF.join(md).replace(LF + LF, LF)
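A sketch feeding doc_to_markdown a hand-built document in the shape parent_children_sections assembles; every value is a placeholder, and the child description shows the confluence-style '||' table head the function rewrites into a markdown separator row:

    from laskea.api import jira

    doc = {
        'ABC-1': {
            'id': '10001',
            'type': 'Epic',
            'summary': 'Parent summary',
            'description': None,
            'epic': None,
            'created': '2023-01-02T03:04:05.000+0100',
            'updated': '2023-01-03T03:04:05.000+0100',
            'children': {
                'ABC-2': {
                    'id': '10002',
                    'type': 'Sub-task',
                    'summary': 'Child summary',
                    'description': '||Step||Owner||\n|collect|team|',
                    'created': '2023-01-02T04:04:05.000+0100',
                    'updated': '2023-01-02T05:04:05.000+0100',
                },
            },
        },
    }
    print(jira.doc_to_markdown(doc, 'Epic', 'Sub-task'))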