Coverage for laskea/api/jira.py: 68.77%

322 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-10 22:19:18 +00:00

1# -*- coding: utf-8 -*- 

2"""JIRA proxy connector API for code generation.""" 

3import copy 

4import json 

5import os 

6import sys 

7from typing import Iterable, Mapping, Sized, Union, no_type_check 

8 

9import jmespath 

10from atlassian import Jira # type: ignore # noqa 

11from requests.exceptions import HTTPError 

12 

13import laskea 

14from laskea import log 

15import laskea.transform as tr 

16 

17FilterMapType = dict[str, Union[dict[str, Union[str, list[list[str]]]]]] 

18API_BASE_URL = 'https://example.com' 

19 

20DEFAULT_COLUMN_FIELDS = ['Key', 'Summary', ['Priority', 'P'], 'Status', 'Custom Field Wun', 'Custom Field Other (CFO)'] 

21 

22WUN_ID = 'customfield_11501' 

23ANOTHER_ID = 'customfield_13901' 

24KNOWN_CI_FIELDS = { 

25 'key': ['key', 'key'], 

26 'summary': ['summary', 'fields.summary'], 

27 'priority': ['priority', 'fields.priority.name'], 

28 'status': ['status', 'fields.status.name'], 

29 'custom field name': [WUN_ID, f'fields.{WUN_ID}'], 

30 'custom field other': [ANOTHER_ID, f'fields.{ANOTHER_ID}[].value'], 

31} 

32KNOWN_CI_FIELD_FILTERS: FilterMapType = { 

33 'key': {}, 

34 'summary': {}, 

35 'priority': {}, 

36 'status': {}, 

37 'custom field name': {}, 

38 'custom field other': {}, 

39} 

40 

41BASE_USER = os.getenv(f'{laskea.APP_ENV}_USER', '') 

42BASE_PASS = os.getenv(f'{laskea.APP_ENV}_TOKEN', '') 

43BASE_URL = os.getenv(f'{laskea.APP_ENV}_BASE_URL', '') 

44BASE_IS_CLOUD = bool(os.getenv(f'{laskea.APP_ENV}_IS_CLOUD', '')) 

45BASE_COL_FIELDS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FIELDS', json.dumps(DEFAULT_COLUMN_FIELDS))) 

46BASE_COL_MAPS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_MAPS', json.dumps(KNOWN_CI_FIELDS))) 

47BASE_COL_FILTERS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FILTERS', json.dumps(KNOWN_CI_FIELD_FILTERS))) 

48BASE_JOIN_STRING = os.getenv(f'{laskea.APP_ENV}_JOIN_STRING', ' <br>') 

49BASE_LF_ONLY = bool(os.getenv(f'{laskea.APP_ENV}_LF_ONLY', 'YES')) 

50BASE_CAPTION = bool(os.getenv(f'{laskea.APP_ENV}_CAPTION', laskea.DEFAULT_CAPTION)) 

51LF = '\n' 

52 

53 

54def mock(number: int) -> int: 

55 """Intermediate for starting the dev env in a valid state.""" 

56 return number 

57 

58 

59def login(user: str = '', token: str = '', url: str = '', is_cloud: bool = False) -> Jira: # nosec 

60 """LatAli""" 

61 if not user: 

62 user = BASE_USER 

63 if not token: 

64 token = BASE_PASS 

65 if not url: 

66 url = BASE_URL 

67 if not is_cloud: 

68 is_cloud = BASE_IS_CLOUD 

69 if not user or not token or not url: 

70 raise ValueError('User, Token, and URL are all required for login.') 

71 return Jira(url=url, username=user, password=token, cloud=is_cloud) 

72 

73 

74@no_type_check 

75def query(handle: Jira, jql_text: str, column_fields=None, column_filters=None) -> dict: 

76 """EggLayingWoolMilkDear.""" 

77 

78 if not column_fields: 78 ↛ 81line 78 didn't jump to line 81, because the condition on line 78 was never false

79 column_fields = BASE_COL_FIELDS 

80 

81 if not column_filters: 81 ↛ 84line 81 didn't jump to line 84, because the condition on line 81 was never false

82 column_filters = BASE_COL_FILTERS 

83 

84 if not jql_text.strip(): 84 ↛ 90line 84 didn't jump to line 90, because the condition on line 84 was never false

85 return { 

86 'jql_text': jql_text, 

87 'error': 'Empty JIRA Query Language text detected', 

88 } 

89 

90 completed_column_fields = [] 

91 for entry in column_fields: 

92 if isinstance(entry, str): 

93 candidate, concept, label = entry.lower(), entry, entry 

94 else: 

95 try: 

96 concept, label = entry 

97 candidate = concept.lower() 

98 except TypeError: 

99 return { 

100 'jql_text': jql_text, 

101 'column_fields': column_fields, 

102 'column_filters': column_filters, 

103 'parsed_columns': completed_column_fields, 

104 'error': f'The column ({entry}) is neither a string nor a pair of (concept, label)', 

105 } 

106 

107 for field in BASE_COL_MAPS.keys(): 

108 if field in candidate: 

109 completed_column_fields.append( 

110 { 

111 'path': BASE_COL_MAPS[field][1], 

112 'id': BASE_COL_MAPS[field][0], 

113 'concept': concept, 

114 'label': label, 

115 'field': field, 

116 } 

117 ) 

118 

119 if not completed_column_fields: 

120 return { 

121 'jql_text': jql_text, 

122 'column_fields': column_fields, 

123 'column_filters': column_filters, 

124 'error': 'Completed column fields empty (no known fields?)', 

125 } 

126 

127 try: 

128 issues = handle.jql(jql_text, limit=1000) 

129 except (HTTPError, RuntimeError) as err: 

130 return { 

131 'jql_text': jql_text, 

132 'column_fields': column_fields, 

133 'column_filters': column_filters, 

134 'parsed_columns': completed_column_fields, 

135 'error': str(err), 

136 } 

137 

138 transformer = {k: tr.FilterMap(k, v) for k, v in column_filters.items()} if column_filters else {} 

139 log.debug(f'{transformer=}') 

140 triplets = [(col['label'], col['path'], col['field']) for col in completed_column_fields] 

141 rows = [] 

142 for issue in issues['issues']: 

143 row = {} 

144 for label, path, field in triplets: 

145 log.debug(f'{field=}, {path=} ...') 

146 entries_read = jmespath.search(path, issue) or [''] 

147 entries = [] 

148 trx = transformer.get(field) 

149 if trx is not None: 

150 log.debug(f'{field}:->{trx.operations=} for column ({trx.column})') 

151 else: 

152 log.debug(f'transformer class is None for column ({field})') 

153 if isinstance(entries_read, list): 

154 for entry in entries_read: 

155 trx = transformer.get(field) 

156 if trx: 

157 log.debug(f'sequence::{trx=}:({entry=}) -> ({trx.apply(entry)})') 

158 else: 

159 log.debug(f'no transform for sequence::{entry=}') 

160 processed = trx.apply(entry) if trx else entry 

161 if processed: 

162 entries.append(processed) 

163 else: 

164 entry = entries_read 

165 if trx: 

166 log.debug(f'scalar::{trx=}:({entry=}) -> ({trx.apply(entry)})') 

167 else: 

168 log.debug(f'no transform for scalar::{entry=}') 

169 entries.append(trx.apply(entry) if trx else entry) 

170 

171 row[label] = entries 

172 rows.append(row) 

173 

174 return { 

175 'jql_text': jql_text, 

176 'column_fields': column_fields, 

177 'column_filters': column_filters, 

178 'parsed_columns': completed_column_fields, 

179 'error': None, 

180 'rows': rows, 

181 } 

182 

183 

184@no_type_check 

185def separated_values_list( 

186 handle: Jira, 

187 jql_text: str, 

188 column_fields=None, 

189 column_filters=None, 

190 key_magic: bool = False, 

191 field_sep: str = laskea.PIPE, 

192 replacement: str = laskea.FS_SLUG, 

193 data: Mapping[str, Union[object, Iterable, Sized]] = None, 

194) -> str: 

195 """Yes we can ... document later.""" 

196 if data is None: 196 ↛ 197line 196 didn't jump to line 197, because the condition on line 196 was never true

197 data = query(handle, jql_text, column_fields) 

198 if data.get('error', ''): 198 ↛ 199line 198 didn't jump to line 199, because the condition on line 198 was never true

199 return json.dumps(data, indent=2) 

200 

201 fs = field_sep # alias 

202 if not data['rows']: 

203 if laskea.STRICT: 203 ↛ 204line 203 didn't jump to line 204, because the condition on line 203 was never true

204 fs_disp = 'RS' if fs == laskea.RS else fs 

205 message = f'WARNING: received 0 results for JQL ({jql_text}) and ({fs_disp}) separated values list' 

206 if not laskea.DRY_RUN: 

207 print(message, file=sys.stderr) 

208 return message 

209 return '' 

210 

211 table = copy.deepcopy(data['rows']) 

212 header_cells = list(table[0].keys()) # noqa 

213 for slot, record in enumerate(table): 

214 for key, cell in record.items(): 

215 if key_magic and key.lower() == 'key': 215 ↛ 216line 215 didn't jump to line 216, because the condition on line 215 was never true

216 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa 

217 if not isinstance(cell, str): 217 ↛ 218line 217 didn't jump to line 218, because the condition on line 217 was never true

218 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa 

219 

220 header = f'{fs.join(cell.replace(fs, replacement) for cell in header_cells)}' 

221 rows = [f'{fs.join(str(v).replace(fs, replacement) for v in line.values())}' for line in table] 

222 the_sv_list = '\n'.join([header] + rows) + '\n' 

223 return the_sv_list.replace('\r', '') if BASE_LF_ONLY else the_sv_list 

224 

225 

226@no_type_check 

227def markdown_table( 

228 handle: Jira, 

229 jql_text: str, 

230 caption: str = '', 

231 column_fields=None, 

232 column_filters=None, 

233 data: Mapping[str, Union[object, Iterable, Sized]] = None, 

234) -> str: 

235 """Yes we can ... document later.""" 

236 if data is None: 236 ↛ 237line 236 didn't jump to line 237, because the condition on line 236 was never true

237 data = query(handle, jql_text, column_fields) 

238 if data.get('error', ''): 238 ↛ 239line 238 didn't jump to line 239, because the condition on line 238 was never true

239 return json.dumps(data, indent=2) 

240 

241 if not data['rows']: 

242 if laskea.STRICT: 

243 message = f'WARNING: received 0 results for JQL ({jql_text}) and table' 

244 if not laskea.DRY_RUN: 244 ↛ 246line 244 didn't jump to line 246, because the condition on line 244 was never false

245 print(message, file=sys.stderr) 

246 return message 

247 return '' 

248 

249 table = copy.deepcopy(data['rows']) 

250 columns = list(table[0].keys()) # noqa 

251 col_wid = {key: len(key) for key in columns} 

252 for slot, record in enumerate(table): 

253 for key, cell in record.items(): 

254 if key.lower() == 'key': 

255 table[slot][key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' # noqa 

256 if not isinstance(cell, str): 256 ↛ 257line 256 didn't jump to line 257, because the condition on line 256 was never true

257 table[slot][key] = BASE_JOIN_STRING.join(cell) # noqa 

258 col_wid[key] = max(len(table[slot][key]), col_wid[key]) # noqa 

259 

260 header_cells = [key.ljust(col_wid[key]) for key in columns] 

261 header = f'| {" | ".join(header_cells)} |' 

262 

263 separator_cells = ['-' * (col_wid[key] + 1) for key in columns] 

264 separator = f'|:{"|:".join(separator_cells)}|' 

265 

266 rows = [f'| {" | ".join(str(v).ljust(col_wid[k]) for k, v in line.items())} |' for line in table] 

267 issues = len(table) 

268 # TODO(sthagen) - spike only 

269 if caption: 269 ↛ 277line 269 didn't jump to line 277, because the condition on line 269 was never false

270 summary = ( 

271 caption.replace('$QUERY_TEXT$', jql_text) 

272 .replace('$NL$', '\n') 

273 .replace('$ISSUE_COUNT$', str(issues)) 

274 .replace('$SINGULAR$$PLURAL$s$', '' if issues == 1 else 's') 

275 ) 

276 else: 

277 summary = '' 

278 

279 the_table = '\n'.join([header] + [separator] + rows) + summary 

280 return the_table.replace('\r', '') if BASE_LF_ONLY else the_table 

281 

282 

283@no_type_check 

284def markdown_list( 

285 handle: Jira, 

286 jql_text: str, 

287 column_fields=None, 

288 list_type: str = 'ul', 

289 data: Mapping[str, Union[object, Iterable, Sized]] = None, 

290) -> str: 

291 """Yes we can ... document later.""" 

292 if data is None: 292 ↛ 293line 292 didn't jump to line 293, because the condition on line 292 was never true

293 data = query(handle, jql_text, column_fields) 

294 if data.get('error', ''): 294 ↛ 295line 294 didn't jump to line 295, because the condition on line 294 was never true

295 return json.dumps(data, indent=2) 

296 

297 if not data['rows']: 297 ↛ 298line 297 didn't jump to line 298, because the condition on line 297 was never true

298 if laskea.STRICT: 

299 message = f'WARNING: received 0 results for JQL ({jql_text}) and {list_type}' 

300 if not laskea.DRY_RUN: 

301 print(message, file=sys.stderr) 

302 return message 

303 return '' 

304 

305 items = [] 

306 for record in data['rows']: 

307 k, v = '', '' 

308 for key, cell in record.items(): 

309 if key.lower() not in ('key', 'summary'): 309 ↛ 310line 309 didn't jump to line 310, because the condition on line 309 was never true

310 continue 

311 if key.lower() == 'key': 

312 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' 

313 else: 

314 v = cell 

315 items.append((k, v)) 

316 

317 if list_type in ('ol', 'ul'): 

318 lt = '-' if list_type == 'ul' else '1.' # implicit 'ol' 

319 xl = tuple(f'{lt} {key} - {summary}' for key, summary in items) 

320 the_list = '\n'.join(xl) + '\n' 

321 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list 

322 if list_type == 'dl': 322 ↛ 329line 322 didn't jump to line 329, because the condition on line 322 was never false

323 # 'Term' 

324 # ':definition of term' 

325 # 

326 xl = tuple(f'{key}\n:{summary}\n' for key, summary in items) 

327 the_list = '\n'.join(xl) + '\n' 

328 return the_list.replace('\r', '') if BASE_LF_ONLY else the_list 

329 return f'Unexpected list type ({list_type}) in markdown_list not in ({("dl", "ol", "ul")})' + '\n' 

330 

331 

332@no_type_check 

333def markdown_heading( 

334 handle: Jira, 

335 jql_text: str, 

336 column_fields=None, 

337 level: int = 1, 

338 data: Mapping[str, Union[object, Iterable, Sized]] = None, 

339) -> str: 

340 """Yes we can ... document later.""" 

341 if data is None: 341 ↛ 342line 341 didn't jump to line 342, because the condition on line 341 was never true

342 data = query(handle, jql_text, column_fields) 

343 if data.get('error', ''): 343 ↛ 344line 343 didn't jump to line 344, because the condition on line 343 was never true

344 return json.dumps(data, indent=2) 

345 

346 if not data['rows']: 

347 if laskea.STRICT: 

348 message = f'WARNING: received 0 results instead of 1 for JQL ({jql_text}) and h{level}' 

349 if not laskea.DRY_RUN: 349 ↛ 351line 349 didn't jump to line 351, because the condition on line 349 was never false

350 print(message, file=sys.stderr) 

351 return message 

352 return '' 

353 

354 items = [] 

355 for record in data['rows']: 

356 k, v = '', '' 

357 for key, cell in record.items(): 

358 if key.lower() not in ('key', 'summary'): 358 ↛ 359line 358 didn't jump to line 359, because the condition on line 358 was never true

359 continue 

360 if key.lower() == 'key': 

361 k = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})' 

362 else: 

363 v = cell 

364 items.append((k, v)) 

365 received = len(items) 

366 if received != 1: 

367 if laskea.STRICT: 

368 message = f'WARNING: received {received} results instead of 1 for JQL ({jql_text}) and h{level}' 

369 if not laskea.DRY_RUN: 369 ↛ 371line 369 didn't jump to line 371, because the condition on line 369 was never false

370 print(message, file=sys.stderr) 

371 return message.replace('\r', '') if BASE_LF_ONLY else message 

372 return '' 

373 level_range = tuple(range(1, 6 + 1)) 

374 if level in level_range: 374 ↛ 379line 374 didn't jump to line 379, because the condition on line 374 was never false

375 heading_token = '#' * level 

376 xl = tuple(f'{heading_token} {key} - {summary}' for key, summary in items) 

377 the_heading = '\n'.join(xl) 

378 return the_heading.replace('\r', '') if BASE_LF_ONLY else the_heading 

379 message = f'Unexpected level for heading ({level}) in markdown_heading not in ({level_range})' 

380 if not laskea.DRY_RUN: 

381 print(message, file=sys.stderr) 

382 return message 

383 

384 

385@no_type_check 

386def fetch_jql(handle: Jira, jql_text: str) -> dict: 

387 """Expose the JIRA result structure directly.""" 

388 if not jql_text.strip(): 

389 return { 

390 'jql_text': jql_text, 

391 'error': 'Empty JIRA Query Language text detected', 

392 } 

393 

394 try: 

395 issues = handle.jql(jql_text, limit=1000) 

396 except (HTTPError, RuntimeError) as err: 

397 return { 

398 'jql_text': jql_text, 

399 'error': str(err), 

400 } 

401 

402 return { 

403 'jql_text': jql_text, 

404 'data': issues, 

405 'error': None, 

406 } 

407 

408 

409@no_type_check 

410def parent_children_sections( 

411 handle: Jira, 

412 parent_jql: str, 

413 children_jql: str, 

414 parent_type_name: str, 

415 children_type_name: str, 

416 data: Mapping[str, Union[object, Iterable, Sized]] = None, 

417) -> str: 

418 """Create sub(sub)section level content representing the issue content from parent children filter results.""" 

419 if data is None: 419 ↛ 420line 419 didn't jump to line 420

420 data = { 

421 'parent_data': fetch_jql(handle, jql_text=parent_jql), 

422 'children_data': fetch_jql(handle, jql_text=children_jql), 

423 } 

424 if data['parent_data'].get('error', '') or data['children_data'].get('error', ''): 424 ↛ 425line 424 didn't jump to line 425, because the condition on line 424 was never true

425 return json.dumps(data, indent=2) 

426 

427 doc = {} 

428 has_parents = {} 

429 

430 for parent in data['parent_data']['data']['issues']: 

431 p_id = parent['id'] 

432 p_key = parent['key'] 

433 

434 p_field = parent['fields'] 

435 

436 p_itn = p_field['issuetype']['name'] 

437 # assert p_itn == parent_type_name 

438 p_sum = p_field['summary'] 

439 p_des = p_field['description'] # None for parent type names 

440 p_epic = p_field['customfield_10006'] # TODO assuming here ... 

441 p_created = p_field['created'] # Textual timestamps like "2019-03-12T10:01:25.000+0100" 

442 p_updated = p_field['updated'] 

443 

444 doc[p_key] = { 

445 'id': p_id, 

446 'type': p_itn, 

447 'summary': p_sum, 

448 'description': p_des, 

449 'epic': p_epic, 

450 'created': p_created, 

451 'updated': p_updated, 

452 'children': {}, 

453 } 

454 

455 children = p_field['subtasks'] 

456 

457 for child in children: 

458 c_id = child['id'] 

459 c_key = child['key'] 

460 

461 c_field = child['fields'] 

462 

463 c_itn = c_field['issuetype']['name'] 

464 # assert c_itn == children_type_name 

465 c_sum = c_field['summary'] 

466 

467 doc[p_key]['children'][c_key] = { 

468 'id': c_id, 

469 'type': c_itn, 

470 'summary': c_sum, 

471 'description': None, 

472 'created': None, 

473 'updated': None, 

474 } 

475 if c_key not in has_parents: 475 ↛ 477line 475 didn't jump to line 477, because the condition on line 475 was never false

476 has_parents[c_key] = [] 

477 has_parents[c_key].append(p_key) 

478 

479 for child in data['children_data']['data']['issues']: 

480 c_id = child['id'] 

481 c_key = child['key'] 

482 

483 c_field = child['fields'] 

484 

485 p_key = c_field['parent']['key'] 

486 # assert p_key in has_parents[c_key] 

487 # p_itn = c_field['parent']['issuetype']['name'] 

488 # assert p_itn == parent_type_name 

489 

490 c_itn = c_field['issuetype']['name'] 

491 # assert c_itn == children_type_name 

492 c_sum = c_field['summary'] 

493 c_des = c_field['description'] # Table or list or notes for children type names 

494 c_created = c_field['created'] 

495 c_updated = c_field['updated'] 

496 

497 doc[p_key]['children'][c_key]['description'] = c_des 

498 doc[p_key]['children'][c_key]['created'] = c_created 

499 doc[p_key]['children'][c_key]['updated'] = c_updated 

500 

501 return doc_to_markdown(doc, parent_type_name, children_type_name) 

502 

503 

504@no_type_check 

505def doc_to_markdown(doc, parent_type_name: str, children_type_name: str) -> str: # noqa 

506 """Transform the document content to markdown.""" 

507 md = [] 

508 for p_tree in doc.values(): 

509 p_head = f'## {p_tree["summary"]}'.strip().strip(LF) 

510 c_count = len(p_tree['children']) 

511 c_type_disp = f'{children_type_name}{"" if c_count == 1 else "s"}' 

512 p_para = f'The {p_tree["type"]} consists of {c_count} {c_type_disp}'.strip(LF) 

513 

514 c_parts = [] 

515 double_pipe, ast_pipe, pipe_ast = '||', '|*', '*|' 

516 nbsp = '&nbsp;' 

517 for c_data in p_tree['children'].values(): 

518 c_head = f'### {c_data["summary"]}'.strip().strip(LF) 

519 c_in = list(c_data['description'].replace(nbsp, ' ').strip().split(LF)) 

520 c_out = [] 

521 for line in c_in: 

522 if line.startswith(double_pipe) or line.startswith(ast_pipe): 

523 # patch confluence markdown like table heads ... 

524 line_s = line.strip() 

525 extra_line = '' 

526 if line_s.startswith(double_pipe) and line_s.endswith(double_pipe): 

527 line = line.replace(double_pipe, '|') 

528 extra_line = ( 

529 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |') 

530 ) 

531 elif line_s.startswith(ast_pipe) and line_s.endswith(pipe_ast): 531 ↛ 535line 531 didn't jump to line 535, because the condition on line 531 was never false

532 extra_line = ( 

533 ''.join(c if c == '|' else '-' for c in line).replace('|-', '|:').replace('-|', ' |') 

534 ) 

535 c_out.append(line) 

536 if extra_line: 536 ↛ 521line 536 didn't jump to line 521, because the condition on line 536 was never false

537 c_out.append(extra_line) 

538 else: 

539 c_out.append(line) 

540 

541 c_parts.extend([LF, c_head, LF, *c_out]) 

542 

543 md.extend([LF, p_head, LF, p_para]) 

544 md.extend(c_parts) 

545 

546 md.append(LF) 

547 return LF.join(md).replace(LF + LF, LF)