Coverage for laskea/api/jira.py: 68.77%
322 statements
coverage.py v7.3.2, created at 2023-12-10 22:19:18 +00:00
1# -*- coding: utf-8 -*-
2"""JIRA proxy connector API for code generation."""
3import copy
4import json
5import os
6import sys
7from typing import Iterable, Mapping, Sized, Union, no_type_check
9import jmespath
10from atlassian import Jira # type: ignore # noqa
11from requests.exceptions import HTTPError
13import laskea
14from laskea import log
15import laskea.transform as tr
17FilterMapType = dict[str, Union[dict[str, Union[str, list[list[str]]]]]]
18API_BASE_URL = 'https://example.com'
20DEFAULT_COLUMN_FIELDS = ['Key', 'Summary', ['Priority', 'P'], 'Status', 'Custom Field Wun', 'Custom Field Other (CFO)']
22WUN_ID = 'customfield_11501'
23ANOTHER_ID = 'customfield_13901'
24KNOWN_CI_FIELDS = {
25 'key': ['key', 'key'],
26 'summary': ['summary', 'fields.summary'],
27 'priority': ['priority', 'fields.priority.name'],
28 'status': ['status', 'fields.status.name'],
29 'custom field name': [WUN_ID, f'fields.{WUN_ID}'],
30 'custom field other': [ANOTHER_ID, f'fields.{ANOTHER_ID}[].value'],
31}
32KNOWN_CI_FIELD_FILTERS: FilterMapType = {
33 'key': {},
34 'summary': {},
35 'priority': {},
36 'status': {},
37 'custom field name': {},
38 'custom field other': {},
39}
41BASE_USER = os.getenv(f'{laskea.APP_ENV}_USER', '')
42BASE_PASS = os.getenv(f'{laskea.APP_ENV}_TOKEN', '')
43BASE_URL = os.getenv(f'{laskea.APP_ENV}_BASE_URL', '')
44BASE_IS_CLOUD = bool(os.getenv(f'{laskea.APP_ENV}_IS_CLOUD', ''))
45BASE_COL_FIELDS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FIELDS', json.dumps(DEFAULT_COLUMN_FIELDS)))
46BASE_COL_MAPS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_MAPS', json.dumps(KNOWN_CI_FIELDS)))
47BASE_COL_FILTERS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FILTERS', json.dumps(KNOWN_CI_FIELD_FILTERS)))
48BASE_JOIN_STRING = os.getenv(f'{laskea.APP_ENV}_JOIN_STRING', ' <br>')
49BASE_LF_ONLY = bool(os.getenv(f'{laskea.APP_ENV}_LF_ONLY', 'YES'))
50BASE_CAPTION = bool(os.getenv(f'{laskea.APP_ENV}_CAPTION', laskea.DEFAULT_CAPTION))
51LF = '\n'
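# Editorial sketch, not part of the measured source: the BASE_* constants above are
# resolved once, at import time, from environment variables whose names are derived
# from laskea.APP_ENV, so the concrete variable names depend on that prefix.
def _demo_expected_environment() -> list[str]:  # pragma: no cover
    """Illustrative only: list the environment variable names this module consults."""
    suffixes = (
        'USER', 'TOKEN', 'BASE_URL', 'IS_CLOUD', 'COL_FIELDS',
        'COL_MAPS', 'COL_FILTERS', 'JOIN_STRING', 'LF_ONLY', 'CAPTION',
    )
    return [f'{laskea.APP_ENV}_{suffix}' for suffix in suffixes]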
54def mock(number: int) -> int:
55 """Intermediate for starting the dev env in a valid state."""
56 return number
59def login(user: str = '', token: str = '', url: str = '', is_cloud: bool = False) -> Jira: # nosec
60 """LatAli"""
61 if not user:
62 user = BASE_USER
63 if not token:
64 token = BASE_PASS
65 if not url:
66 url = BASE_URL
67 if not is_cloud:
68 is_cloud = BASE_IS_CLOUD
69 if not user or not token or not url:
70 raise ValueError('User, Token, and URL are all required for login.')
71 return Jira(url=url, username=user, password=token, cloud=is_cloud)
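# Editorial sketch, not part of the measured source: login() falls back to the BASE_*
# environment values, so a fully configured environment needs no arguments.  The
# explicit credentials shown here are hypothetical placeholders.
def _demo_login() -> Jira:  # pragma: no cover
    """Illustrative only: obtain a Jira handle from explicit or environment credentials."""
    if BASE_USER and BASE_PASS and BASE_URL:
        return login()  # rely entirely on the environment
    return login(user='jane.doe', token='not-a-real-token', url='https://jira.example.com', is_cloud=False)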
74@no_type_check
75def query(handle: Jira, jql_text: str, column_fields=None, column_filters=None) -> dict:
76 """EggLayingWoolMilkDear."""
78 if not column_fields:  # 78 ↛ 81: condition on line 78 was never false
79 column_fields = BASE_COL_FIELDS
81 if not column_filters:  # 81 ↛ 84: condition on line 81 was never false
82 column_filters = BASE_COL_FILTERS
84 if not jql_text.strip():  # 84 ↛ 90: condition on line 84 was never false
85 return {
86 'jql_text': jql_text,
87 'error': 'Empty JIRA Query Language text detected',
88 }
90 completed_column_fields = []
91 for entry in column_fields:
92 if isinstance(entry, str):
93 candidate, concept, label = entry.lower(), entry, entry
94 else:
95 try:
96 concept, label = entry
97 candidate = concept.lower()
98 except TypeError:
99 return {
100 'jql_text': jql_text,
101 'column_fields': column_fields,
102 'column_filters': column_filters,
103 'parsed_columns': completed_column_fields,
104 'error': f'The column ({entry}) is neither a string nor a pair of (concept, label)',
105 }
107 for field in BASE_COL_MAPS.keys():
108 if field in candidate:
109 completed_column_fields.append(
110 {
111 'path': BASE_COL_MAPS[field][1],
112 'id': BASE_COL_MAPS[field][0],
113 'concept': concept,
114 'label': label,
115 'field': field,
116 }
117 )
119 if not completed_column_fields:
120 return {
121 'jql_text': jql_text,
122 'column_fields': column_fields,
123 'column_filters': column_filters,
124 'error': 'Completed column fields empty (no known fields?)',
125 }
127 try:
128 issues = handle.jql(jql_text, limit=1000)
129 except (HTTPError, RuntimeError) as err:
130 return {
131 'jql_text': jql_text,
132 'column_fields': column_fields,
133 'column_filters': column_filters,
134 'parsed_columns': completed_column_fields,
135 'error': str(err),
136 }
138 transformer = {k: tr.FilterMap(k, v) for k, v in column_filters.items()} if column_filters else {}
139 log.debug(f'{transformer=}')
140 triplets = [(col['label'], col['path'], col['field']) for col in completed_column_fields]
141 rows = []
142 for issue in issues['issues']:
143 row = {}
144 for label, path, field in triplets:
145 log.debug(f'{field=}, {path=} ...')
146 entries_read = jmespath.search(path, issue) or ['']
147 entries = []
148 trx = transformer.get(field)
149 if trx is not None:
150 log.debug(f'{field}:->{trx.operations=} for column ({trx.column})')
151 else:
152 log.debug(f'transformer class is None for column ({field})')
153 if isinstance(entries_read, list):
154 for entry in entries_read:
155 trx = transformer.get(field)
156 if trx:
157 log.debug(f'sequence::{trx=}:({entry=}) -> ({trx.apply(entry)})')
158 else:
159 log.debug(f'no transform for sequence::{entry=}')
160 processed = trx.apply(entry) if trx else entry
161 if processed:
162 entries.append(processed)
163 else:
164 entry = entries_read
165 if trx:
166 log.debug(f'scalar::{trx=}:({entry=}) -> ({trx.apply(entry)})')
167 else:
168 log.debug(f'no transform for scalar::{entry=}')
169 entries.append(trx.apply(entry) if trx else entry)
171 row[label] = entries
172 rows.append(row)
174 return {
175 'jql_text': jql_text,
176 'column_fields': column_fields,
177 'column_filters': column_filters,
178 'parsed_columns': completed_column_fields,
179 'error': None,
180 'rows': rows,
181 }
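# Editorial sketch, not part of the measured source: the shape of a successful query()
# response as constructed above - 'error' is None and 'rows' holds one dict per issue,
# keyed by the column labels, each value a list of entries.  The JQL text is hypothetical.
def _demo_query_rows(handle: Jira) -> list:  # pragma: no cover
    """Illustrative only: run a query and pull out the per-issue rows."""
    result = query(handle, 'project = ABC ORDER BY key ASC', column_fields=['Key', 'Summary', 'Status'])
    if result['error'] is not None:
        raise RuntimeError(result['error'])
    return result['rows']  # e.g. [{'Key': ['ABC-1'], 'Summary': ['...'], 'Status': ['Open']}, ...]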
184@no_type_check
185def separated_values_list(
186 handle: Jira,
187 jql_text: str,
188 column_fields=None,
189 column_filters=None,
190 key_magic: bool = False,
191 field_sep: str = laskea.PIPE,
192 replacement: str = laskea.FS_SLUG,
193 data: Mapping[str, Union[object, Iterable, Sized]] = None,
194) -> str:
195 """Yes we can ... document later."""
196 if data is None:  # 196 ↛ 197: condition on line 196 was never true
197 data = query(handle, jql_text, column_fields)
198 if data.get('error', ''):  # 198 ↛ 199: condition on line 198 was never true
199 return json.dumps(data, indent=2)
201 fs = field_sep # alias
202 if not data['rows']:
203 if laskea.STRICT:  # 203 ↛ 204: condition on line 203 was never true
204 fs_disp = 'RS' if fs == laskea.RS else fs
205 message = f'WARNING: received 0 results for JQL ({jql_text}) and ({fs_disp}) separated values list'
206 if not laskea.DRY_RUN:
207 print(message, file=sys.stderr)
208 return message
209 return ''
211 table = copy.deepcopy(data['rows'])
212 header_cells = list(table[0].keys()) # noqa
213 for slot, record in enumerate(table):
214 for key, cell in record.items():
215 if key_magic and key.lower() == 'key':  # 215 ↛ 216: condition on line 215 was never true
216 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa
217 if not isinstance(cell, str):  # 217 ↛ 218: condition on line 217 was never true
218 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa
220 header = f'{fs.join(cell.replace(fs, replacement) for cell in header_cells)}'
221 rows = [f'{fs.join(str(v).replace(fs, replacement) for v in line.values())}' for line in table]
222 the_sv_list = '\n'.join([header] + rows) + '\n'
223 return the_sv_list.replace('\r', '') if BASE_LF_ONLY else the_sv_list
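# Editorial sketch, not part of the measured source: separated_values_list() renders the
# query rows as a header line plus one line per issue, joined with field_sep
# (laskea.PIPE by default).  The JQL text is hypothetical.
def _demo_pipe_separated(handle: Jira) -> str:  # pragma: no cover
    """Illustrative only: produce a pipe separated listing with linked issue keys."""
    return separated_values_list(handle, 'project = ABC AND resolution = Unresolved', key_magic=True)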
226@no_type_check
227def markdown_table(
228 handle: Jira,
229 jql_text: str,
230 caption: str = '',
231 column_fields=None,
232 column_filters=None,
233 data: Mapping[str, Union[object, Iterable, Sized]] = None,
234) -> str:
235 """Yes we can ... document later."""
236 if data is None:  # 236 ↛ 237: condition on line 236 was never true
237 data = query(handle, jql_text, column_fields)
238 if data.get('error', ''):  # 238 ↛ 239: condition on line 238 was never true
239 return json.dumps(data, indent=2)
241 if not data['rows']:
242 if laskea.STRICT:
243 message = f'WARNING: received 0 results for JQL ({jql_text}) and table'
244 if not laskea.DRY_RUN:  # 244 ↛ 246: condition on line 244 was never false
245 print(message, file=sys.stderr)
246 return message
247 return ''
249 table = copy.deepcopy(data['rows'])
250 columns = list(table[0].keys()) # noqa
251 col_wid = {key: len(key) for key in columns}
252 for slot, record in enumerate(table):
253 for key, cell in record.items():
254 if key.lower() == 'key':
255 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa
256 if not isinstance(cell, str):  # 256 ↛ 257: condition on line 256 was never true
257 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa
258 col_wid[key] = max(len(table[slot][key]), col_wid[key]) # noqa
260 header_cells = [key.ljust(col_wid[key]) for key in columns]
261 header = f'| {" | ".join(header_cells)} |'
263 separator_cells = ['-' * (col_wid[key] + 1) for key in columns]
264 separator = f'|:{"|:".join(separator_cells)}|'
266 rows = [f'| {" | ".join(str(v).ljust(col_wid[k]) for k, v in line.items())} |' for line in table]
267 issues = len(table)
268 # TODO(sthagen) - spike only
269 if caption:  # 269 ↛ 277: condition on line 269 was never false
270 summary = (
271 caption.replace('$QUERY_TEXT$', jql_text)
272 .replace('$NL$', '\n')
273 .replace('$ISSUE_COUNT$', str(issues))
274 .replace('$SINGULAR$$PLURAL$s$', '' if issues == 1 else 's')
275 )
276 else:
277 summary = ''
279 the_table = '\n'.join([header] + [separator] + rows) + summary
280 return the_table.replace('\r', '') if BASE_LF_ONLY else the_table
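# Editorial sketch, not part of the measured source: markdown_table() emits a pipe
# table and substitutes $QUERY_TEXT$, $ISSUE_COUNT$, $NL$, and $SINGULAR$$PLURAL$s$
# in the caption, as implemented above.  The JQL text and caption wording are hypothetical.
def _demo_markdown_table(handle: Jira) -> str:  # pragma: no cover
    """Illustrative only: render a captioned markdown table for open issues."""
    return markdown_table(
        handle,
        'project = ABC AND status = Open',
        caption='$NL$Table: $ISSUE_COUNT$ open issue$SINGULAR$$PLURAL$s$ for ($QUERY_TEXT$)$NL$',
    )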
283@no_type_check
284def markdown_list(
285 handle: Jira,
286 jql_text: str,
287 column_fields=None,
288 list_type: str = 'ul',
289 data: Mapping[str, Union[object, Iterable, Sized]] = None,
290) -> str:
291 """Yes we can ... document later."""
292 if data is None:  # 292 ↛ 293: condition on line 292 was never true
293 data = query(handle, jql_text, column_fields)
294 if data.get('error', ''):  # 294 ↛ 295: condition on line 294 was never true
295 return json.dumps(data, indent=2)
297 if not data['rows']:  # 297 ↛ 298: condition on line 297 was never true
298 if laskea.STRICT:
299 message = f'WARNING: received 0 results for JQL ({jql_text}) and {list_type}'
300 if not laskea.DRY_RUN:
301 print(message, file=sys.stderr)
302 return message
303 return ''
305 items = []
306 for record in data['rows']:
307 k, v = '', ''
308 for key, cell in record.items():
309 if key.lower() not in ('key', 'summary'):  # 309 ↛ 310: condition on line 309 was never true
310 continue
311 if key.lower() == 'key':
312 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
313 else:
314 v = cell
315 items.append((k, v))
317 if list_type in ('ol', 'ul'):
318 lt = '-' if list_type == 'ul' else '1.' # implicit 'ol'
319 xl = tuple(f'{lt} {key} - {summary}' for key, summary in items)
320 the_list = '\n'.join(xl) + '\n'
321 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list
322 if list_type == 'dl':  # 322 ↛ 329: condition on line 322 was never false
323 # 'Term'
324 # ':definition of term'
325 #
326 xl = tuple(f'{key}\n:{summary}\n' for key, summary in items)
327 the_list = '\n'.join(xl) + '\n'
328 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list
329 return f'Unexpected list type ({list_type}) in markdown_list not in ({("dl", "ol", "ul")})' + '\n'
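# Editorial sketch, not part of the measured source: markdown_list() supports unordered
# ('ul'), ordered ('ol'), and definition ('dl') lists built from the Key and Summary
# columns.  The JQL text is hypothetical.
def _demo_markdown_list(handle: Jira) -> str:  # pragma: no cover
    """Illustrative only: render open issues as a definition list."""
    return markdown_list(handle, 'project = ABC AND status = Open', list_type='dl')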
332@no_type_check
333def markdown_heading(
334 handle: Jira,
335 jql_text: str,
336 column_fields=None,
337 level: int = 1,
338 data: Mapping[str, Union[object, Iterable, Sized]] = None,
339) -> str:
340 """Yes we can ... document later."""
341 if data is None:  # 341 ↛ 342: condition on line 341 was never true
342 data = query(handle, jql_text, column_fields)
343 if data.get('error', ''):  # 343 ↛ 344: condition on line 343 was never true
344 return json.dumps(data, indent=2)
346 if not data['rows']:
347 if laskea.STRICT:
348 message = f'WARNING: received 0 results instead of 1 for JQL ({jql_text}) and h{level}'
349 if not laskea.DRY_RUN:  # 349 ↛ 351: condition on line 349 was never false
350 print(message, file=sys.stderr)
351 return message
352 return ''
354 items = []
355 for record in data['rows']:
356 k, v = '', ''
357 for key, cell in record.items():
358 if key.lower() not in ('key', 'summary'):  # 358 ↛ 359: condition on line 358 was never true
359 continue
360 if key.lower() == 'key':
361 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
362 else:
363 v = cell
364 items.append((k, v))
365 received = len(items)
366 if received != 1:
367 if laskea.STRICT:
368 message = f'WARNING: received {received} results instead of 1 for JQL ({jql_text}) and h{level}'
369 if not laskea.DRY_RUN:  # 369 ↛ 371: condition on line 369 was never false
370 print(message, file=sys.stderr)
371 return message.replace('\r', '') if BASE_LF_ONLY else message
372 return ''
373 level_range = tuple(range(1, 6 + 1))
374 if level in level_range:  # 374 ↛ 379: condition on line 374 was never false
375 heading_token = '#' * level
376 xl = tuple(f'{heading_token} {key} - {summary}' for key, summary in items)
377 the_heading = '\n'.join(xl)
378 return the_heading.replace('\r', '') if BASE_LF_ONLY else the_heading
379 message = f'Unexpected level for heading ({level}) in markdown_heading not in ({level_range})'
380 if not laskea.DRY_RUN:
381 print(message, file=sys.stderr)
382 return message
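# Editorial sketch, not part of the measured source: markdown_heading() expects exactly
# one matching issue and turns it into a level-N heading.  The JQL text is hypothetical.
def _demo_markdown_heading(handle: Jira) -> str:  # pragma: no cover
    """Illustrative only: render a single issue as a second level heading."""
    return markdown_heading(handle, 'key = ABC-1', level=2)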
385@no_type_check
386def fetch_jql(handle: Jira, jql_text: str) -> dict:
387 """Expose the JIRA result structure directly."""
388 if not jql_text.strip():
389 return {
390 'jql_text': jql_text,
391 'error': 'Empty JIRA Query Language text detected',
392 }
394 try:
395 issues = handle.jql(jql_text, limit=1000)
396 except (HTTPError, RuntimeError) as err:
397 return {
398 'jql_text': jql_text,
399 'error': str(err),
400 }
402 return {
403 'jql_text': jql_text,
404 'data': issues,
405 'error': None,
406 }
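# Editorial sketch, not part of the measured source: fetch_jql() hands back the raw
# JIRA payload under 'data' without any column mapping.  The JQL text is hypothetical.
def _demo_issue_keys(handle: Jira) -> list:  # pragma: no cover
    """Illustrative only: collect the keys of all matching issues."""
    result = fetch_jql(handle, 'project = ABC ORDER BY created DESC')
    if result['error'] is not None:
        raise RuntimeError(result['error'])
    return [issue['key'] for issue in result['data']['issues']]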
409@no_type_check
410def parent_children_sections(
411 handle: Jira,
412 parent_jql: str,
413 children_jql: str,
414 parent_type_name: str,
415 children_type_name: str,
416 data: Mapping[str, Union[object, Iterable, Sized]] = None,
417) -> str:
418 """Create sub(sub)section level content representing the issue content from parent children filter results."""
419 if data is None:  # 419 ↛ 420: line 419 didn't jump to line 420
420 data = {
421 'parent_data': fetch_jql(handle, jql_text=parent_jql),
422 'children_data': fetch_jql(handle, jql_text=children_jql),
423 }
424 if data['parent_data'].get('error', '') or data['children_data'].get('error', ''):  # 424 ↛ 425: condition on line 424 was never true
425 return json.dumps(data, indent=2)
427 doc = {}
428 has_parents = {}
430 for parent in data['parent_data']['data']['issues']:
431 p_id = parent['id']
432 p_key = parent['key']
434 p_field = parent['fields']
436 p_itn = p_field['issuetype']['name']
437 # assert p_itn == parent_type_name
438 p_sum = p_field['summary']
439 p_des = p_field['description'] # None for parent type names
440 p_epic = p_field['customfield_10006'] # TODO assuming here ...
441 p_created = p_field['created'] # Textual timestamps like "2019-03-12T10:01:25.000+0100"
442 p_updated = p_field['updated']
444 doc[p_key] = {
445 'id': p_id,
446 'type': p_itn,
447 'summary': p_sum,
448 'description': p_des,
449 'epic': p_epic,
450 'created': p_created,
451 'updated': p_updated,
452 'children': {},
453 }
455 children = p_field['subtasks']
457 for child in children:
458 c_id = child['id']
459 c_key = child['key']
461 c_field = child['fields']
463 c_itn = c_field['issuetype']['name']
464 # assert c_itn == children_type_name
465 c_sum = c_field['summary']
467 doc[p_key]['children'][c_key] = {
468 'id': c_id,
469 'type': c_itn,
470 'summary': c_sum,
471 'description': None,
472 'created': None,
473 'updated': None,
474 }
475 if c_key not in has_parents:  # 475 ↛ 477: condition on line 475 was never false
476 has_parents[c_key] = []
477 has_parents[c_key].append(p_key)
479 for child in data['children_data']['data']['issues']:
480 c_id = child['id']
481 c_key = child['key']
483 c_field = child['fields']
485 p_key = c_field['parent']['key']
486 # assert p_key in has_parents[c_key]
487 # p_itn = c_field['parent']['issuetype']['name']
488 # assert p_itn == parent_type_name
490 c_itn = c_field['issuetype']['name']
491 # assert c_itn == children_type_name
492 c_sum = c_field['summary']
493 c_des = c_field['description'] # Table or list or notes for children type names
494 c_created = c_field['created']
495 c_updated = c_field['updated']
497 doc[p_key]['children'][c_key]['description'] = c_des
498 doc[p_key]['children'][c_key]['created'] = c_created
499 doc[p_key]['children'][c_key]['updated'] = c_updated
501 return doc_to_markdown(doc, parent_type_name, children_type_name)
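# Editorial sketch, not part of the measured source: parent_children_sections() takes
# two JQL filters - one for the parents and one for their children - and renders nested
# markdown sections.  The filters and type names are hypothetical.
def _demo_parent_children(handle: Jira) -> str:  # pragma: no cover
    """Illustrative only: build sections from parent issues and their subtasks."""
    return parent_children_sections(
        handle,
        parent_jql='project = ABC AND issuetype = "Parent Type"',
        children_jql='project = ABC AND issuetype = "Child Type"',
        parent_type_name='Parent Type',
        children_type_name='Child Type',
    )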
504@no_type_check
505def doc_to_markdown(doc, parent_type_name: str, children_type_name: str) -> str: # noqa
506 """Transform the document content to markdown."""
507 md = []
508 for p_tree in doc.values():
509 p_head = f'## {p_tree["summary"]}'.strip().strip(LF)
510 c_count = len(p_tree['children'])
511 c_type_disp = f'{children_type_name}{"" if c_count == 1 else "s"}'
512 p_para = f'The {p_tree["type"]} consists of {c_count} {c_type_disp}'.strip(LF)
514 c_parts = []
515 double_pipe, ast_pipe, pipe_ast = '||', '|*', '*|'
516 nbsp = ' '
517 for c_data in p_tree['children'].values():
518 c_head = f'### {c_data["summary"]}'.strip().strip(LF)
519 c_in = list(c_data['description'].replace(nbsp, ' ').strip().split(LF))
520 c_out = []
521 for line in c_in:
522 if line.startswith(double_pipe) or line.startswith(ast_pipe):
523 # patch confluence markdown like table heads ...
524 line_s = line.strip()
525 extra_line = ''
526 if line_s.startswith(double_pipe) and line_s.endswith(double_pipe):
527 line = line.replace(double_pipe, '|')
528 extra_line = (
529 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |')
530 )
531 elif line_s.startswith(ast_pipe) and line_s.endswith(pipe_ast):  # 531 ↛ 535: condition on line 531 was never false
532 extra_line = (
533 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |')
534 )
535 c_out.append(line)
536 if extra_line:  # 536 ↛ 521: condition on line 536 was never false
537 c_out.append(extra_line)
538 else:
539 c_out.append(line)
541 c_parts.extend([LF, c_head, LF, *c_out])
543 md.extend([LF, p_head, LF, p_para])
544 md.extend(c_parts)
546 md.append(LF)
547 return LF.join(md).replace(LF + LF, LF)
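# Editorial sketch, not part of the measured source: doc_to_markdown() consumes the
# nested dict built by parent_children_sections() - parent entries carrying a
# 'children' mapping - and joins everything into one markdown string.  The document
# below is a minimal, hypothetical example.
def _demo_doc_to_markdown() -> str:  # pragma: no cover
    """Illustrative only: render a single parent with one child."""
    doc = {
        'ABC-1': {
            'id': '1',
            'type': 'Parent Type',
            'summary': 'Collect requirements',
            'description': None,
            'epic': None,
            'created': '2019-03-12T10:01:25.000+0100',
            'updated': '2019-03-12T10:01:25.000+0100',
            'children': {
                'ABC-2': {
                    'id': '2',
                    'type': 'Child Type',
                    'summary': 'Interview stakeholders',
                    'description': 'Notes line one\nNotes line two',
                    'created': '2019-03-13T08:00:00.000+0100',
                    'updated': '2019-03-13T08:00:00.000+0100',
                },
            },
        },
    }
    return doc_to_markdown(doc, 'Parent Type', 'Child Type')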