Coverage for laskea/api/jira.py: 75.22%

284 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-17 13:24:57 +00:00

1# -*- coding: utf-8 -*- 

2"""JIRA proxy connector API for code generation.""" 

3import copy 

4import json 

5import os 

6import sys 

7from typing import Iterable, Mapping, Sized, Union, no_type_check 

8 

9import jmespath 

10from atlassian import Jira # type: ignore # noqa 

11from requests.exceptions import HTTPError 

12 

13import laskea 

14 

# Example base URL; not referenced by the functions visible in this module.
API_BASE_URL = 'https://example.com'

# Default column requests: each entry is either a plain field name or a [concept, label] pair
# (query() accepts both shapes).
DEFAULT_COLUMN_FIELDS = ['Key', 'Summary', ['Priority', 'P'], 'Status', 'Custom Field Wun', 'Custom Field Other (CFO)']

# Identifiers of the two custom fields used in the default column mapping below.
WUN_ID = 'customfield_11501'
ANOTHER_ID = 'customfield_13901'
# Lower-case field name -> [field id, JMESPath expression evaluated against an issue].
# query() matches these names as substrings of the lower-cased requested column.
KNOWN_CI_FIELDS = {
    'key': ['key', 'key'],
    'summary': ['summary', 'fields.summary'],
    'priority': ['priority', 'fields.priority.name'],
    'status': ['status', 'fields.status.name'],
    'custom field name': [WUN_ID, f'fields.{WUN_ID}'],
    'custom field other': [ANOTHER_ID, f'fields.{ANOTHER_ID}[].value'],
}

# Settings read once at import time from environment variables prefixed with laskea.APP_ENV.
BASE_USER = os.getenv(f'{laskea.APP_ENV}_USER', '')
BASE_PASS = os.getenv(f'{laskea.APP_ENV}_TOKEN', '')
BASE_URL = os.getenv(f'{laskea.APP_ENV}_BASE_URL', '')
# NOTE(review): bool() of a string is true for ANY non-empty value, so e.g. "0" or "false" enable cloud mode.
BASE_IS_CLOUD = bool(os.getenv(f'{laskea.APP_ENV}_IS_CLOUD', ''))
# Column requests and column mapping may be overridden with JSON documents.
BASE_COL_FIELDS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_FIELDS', json.dumps(DEFAULT_COLUMN_FIELDS)))
BASE_COL_MAPS = json.loads(os.getenv(f'{laskea.APP_ENV}_COL_MAPS', json.dumps(KNOWN_CI_FIELDS)))
# Separator used when a non-string cell is joined into a single string.
BASE_JOIN_STRING = os.getenv(f'{laskea.APP_ENV}_JOIN_STRING', ' <br>')
# When true, the renderers remove '\r' from their output.
# NOTE(review): defaults to true, and only an empty value disables it (even "NO" is truthy).
BASE_LF_ONLY = bool(os.getenv(f'{laskea.APP_ENV}_LF_ONLY', 'YES'))
LF = '\n'

39 

40 

def mock(number: int) -> int:
    """Hand back the given number unchanged (placeholder that keeps the dev env in a valid state)."""
    echoed = number
    return echoed

44 

45 

def login(user: str = '', token: str = '', url: str = '', is_cloud: bool = False) -> Jira:  # nosec
    """Create a JIRA client from explicit credentials or the environment-derived defaults.

    Every falsy argument is replaced by its module-level counterpart
    (BASE_USER, BASE_PASS, BASE_URL, BASE_IS_CLOUD).

    Raises:
        ValueError: if user, token, or url is still empty after applying the fallbacks.
    """
    user = user or BASE_USER
    token = token or BASE_PASS
    url = url or BASE_URL
    is_cloud = is_cloud or BASE_IS_CLOUD

    if not (user and token and url):
        raise ValueError('User, Token, and URL are all required for login.')

    return Jira(url=url, username=user, password=token, cloud=is_cloud)

59 

60 

@no_type_check
def query(handle: Jira, jql_text: str, column_fields=None) -> dict:
    """Run a JQL query and project every returned issue onto the requested columns.

    Args:
        handle: JIRA client; only its ``jql(text, limit=...)`` method is called.
        jql_text: The JIRA Query Language text to execute.
        column_fields: Column requests, each either a string or a ``(concept, label)``
            pair; ``BASE_COL_FIELDS`` is used when this is falsy.

    Returns:
        A dict that always holds ``jql_text`` and ``error``. On success ``error`` is
        ``None`` and ``rows`` holds one ``{label: value}`` dict per issue; on failure
        ``error`` is a message and ``rows`` is absent. Apart from the empty-JQL case the
        dict also echoes ``column_fields``, and (once parsing started) ``parsed_columns``.
    """
    if not column_fields:
        column_fields = BASE_COL_FIELDS

    if not jql_text.strip():
        return {
            'jql_text': jql_text,
            'error': 'Empty JIRA Query Language text detected',
        }

    completed_column_fields = []
    for entry in column_fields:
        if isinstance(entry, str):
            candidate, concept, label = entry.lower(), entry, entry
        else:
            try:
                concept, label = entry
                candidate = concept.lower()
            except (AttributeError, TypeError, ValueError):
                # TypeError: entry is not iterable; ValueError: not exactly two items;
                # AttributeError: the concept is not a string. All are malformed requests.
                return {
                    'jql_text': jql_text,
                    'column_fields': column_fields,
                    'parsed_columns': completed_column_fields,
                    'error': f'The column ({entry}) is neither a string nor a pair of (concept, label)',
                }

        # A known field name matches when it occurs anywhere in the lower-cased request,
        # so a single request may yield more than one column.
        for field, mapping in BASE_COL_MAPS.items():
            if field in candidate:
                completed_column_fields.append(
                    {
                        'path': mapping[1],
                        'id': mapping[0],
                        'concept': concept,
                        'label': label,
                        'field': field,
                    }
                )

    if not completed_column_fields:
        return {
            'jql_text': jql_text,
            'column_fields': column_fields,
            'error': 'Completed column fields empty (no known fields?)',
        }

    try:
        issues = handle.jql(jql_text, limit=1000)
    except (HTTPError, RuntimeError) as err:
        return {
            'jql_text': jql_text,
            'column_fields': column_fields,
            'parsed_columns': completed_column_fields,
            'error': str(err),
        }

    pairs = [(col['label'], col['path']) for col in completed_column_fields]
    # Falsy search results (missing field, empty value) are represented as [''].
    rows = [{label: jmespath.search(path, issue) or [''] for label, path in pairs} for issue in issues['issues']]
    return {
        'jql_text': jql_text,
        'column_fields': column_fields,
        'parsed_columns': completed_column_fields,
        'error': None,
        'rows': rows,
    }

128 

129 

@no_type_check
def separated_values_list(
    handle: Jira,
    jql_text: str,
    column_fields=None,
    key_magic: bool = False,
    field_sep: str = laskea.PIPE,
    replacement: str = laskea.FS_SLUG,
    data: Mapping[str, Union[object, Iterable, Sized]] = None,
) -> str:
    """Render query results as lines of separated values with a header line.

    Args:
        handle: JIRA client, used only when ``data`` is not given.
        jql_text: JQL text to execute (and to mention in warnings).
        column_fields: Column requests forwarded to ``query``.
        key_magic: When true, cells of a column named ``key`` (case-insensitive)
            become markdown links below ``BASE_URL``.
        field_sep: Separator placed between the cells of a line.
        replacement: Text substituted for occurrences of ``field_sep`` inside cells.
        data: Result of an earlier ``query`` call; skips querying when given.

    Returns:
        The header and one line per row, each terminated by a newline; the JSON dump of
        ``data`` when it carries an error; for zero rows a warning message in strict mode
        and the empty string otherwise.
    """
    if data is None:
        data = query(handle, jql_text, column_fields)
    if data.get('error', ''):
        return json.dumps(data, indent=2)

    fs = field_sep  # alias
    if not data['rows']:
        if laskea.STRICT:
            fs_disp = 'RS' if fs == laskea.RS else fs
            message = f'WARNING: received 0 results for JQL ({jql_text}) and ({fs_disp}) separated values list'
            if not laskea.DRY_RUN:
                print(message, file=sys.stderr)
            return message
        return ''

    # Work on a copy so the caller's data is not modified by the cell rewriting below.
    table = copy.deepcopy(data['rows'])
    header_cells = list(table[0].keys())  # noqa
    for record in table:
        for key, cell in record.items():
            if key_magic and key.lower() == 'key':
                record[key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'  # noqa
            if not isinstance(cell, str):
                # Flatten multi-valued cells; scalar values (e.g. numbers) and non-string
                # members are rendered via str() instead of failing inside str.join.
                try:
                    record[key] = BASE_JOIN_STRING.join(str(part) for part in cell)  # noqa
                except TypeError:
                    record[key] = str(cell)  # noqa

    header = f'{fs.join(cell.replace(fs, replacement) for cell in header_cells)}'
    rows = [f'{fs.join(str(v).replace(fs, replacement) for v in line.values())}' for line in table]
    the_sv_list = '\n'.join([header] + rows) + '\n'
    return the_sv_list.replace('\r', '') if BASE_LF_ONLY else the_sv_list

169 

170 

@no_type_check
def markdown_table(
    handle: Jira, jql_text: str, column_fields=None, data: Mapping[str, Union[object, Iterable, Sized]] = None
) -> str:
    """Render query results as a left-aligned markdown table followed by an issue count.

    Args:
        handle: JIRA client, used only when ``data`` is not given.
        jql_text: JQL text to execute (and to mention in warnings).
        column_fields: Column requests forwarded to ``query``.
        data: Result of an earlier ``query`` call; skips querying when given.

    Returns:
        The table text ending in a blank line and ``N issue(s)``; the JSON dump of ``data``
        when it carries an error; for zero rows a warning message in strict mode and the
        empty string otherwise.
    """
    if data is None:
        data = query(handle, jql_text, column_fields)
    if data.get('error', ''):
        return json.dumps(data, indent=2)

    if not data['rows']:
        if laskea.STRICT:
            message = f'WARNING: received 0 results for JQL ({jql_text}) and table'
            if not laskea.DRY_RUN:
                print(message, file=sys.stderr)
            return message
        return ''

    # Work on a copy so the caller's data is not modified by the cell rewriting below.
    table = copy.deepcopy(data['rows'])
    columns = list(table[0].keys())  # noqa
    col_wid = {key: len(key) for key in columns}
    for record in table:
        for key, cell in record.items():
            if key.lower() == 'key':
                record[key] = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'  # noqa
            if not isinstance(cell, str):
                # Flatten multi-valued cells; scalar values (e.g. numbers) and non-string
                # members are rendered via str() instead of failing inside str.join.
                try:
                    record[key] = BASE_JOIN_STRING.join(str(part) for part in cell)  # noqa
                except TypeError:
                    record[key] = str(cell)  # noqa
            # Track the widest rendered cell per column for padding.
            col_wid[key] = max(len(record[key]), col_wid[key])  # noqa

    header_cells = [key.ljust(col_wid[key]) for key in columns]
    header = f'| {" | ".join(header_cells)} |'

    # One dash more than the column width: the ':' alignment marker takes the place of a pad blank.
    separator_cells = ['-' * (col_wid[key] + 1) for key in columns]
    separator = f'|:{"|:".join(separator_cells)}|'

    rows = [f'| {" | ".join(str(v).ljust(col_wid[k]) for k, v in line.items())} |' for line in table]
    issues = len(table)
    summary = f'\n\n{issues} issue{"" if issues == 1 else "s"}'
    the_table = '\n'.join([header] + [separator] + rows) + summary
    return the_table.replace('\r', '') if BASE_LF_ONLY else the_table

211 

212 

@no_type_check
def markdown_list(
    handle: Jira,
    jql_text: str,
    column_fields=None,
    list_type: str = 'ul',
    data: Mapping[str, Union[object, Iterable, Sized]] = None,
) -> str:
    """Render query results as a markdown list of ``key - summary`` entries.

    Supported ``list_type`` values are ``ul`` (dash items), ``ol`` (``1.`` items), and
    ``dl`` (term / definition pairs); any other value yields an explanatory message.
    Only the columns named ``key`` and ``summary`` (case-insensitive) are used, and the
    key is turned into a markdown link below ``BASE_URL``.

    Returns the JSON dump of ``data`` when it carries an error; for zero rows a warning
    message in strict mode and the empty string otherwise.
    """
    if data is None:
        data = query(handle, jql_text, column_fields)
    if data.get('error', ''):
        return json.dumps(data, indent=2)

    if not data['rows']:
        if not laskea.STRICT:
            return ''
        message = f'WARNING: received 0 results for JQL ({jql_text}) and {list_type}'
        if not laskea.DRY_RUN:
            print(message, file=sys.stderr)
        return message

    # Collect one (linked key, summary) pair per row; other columns are ignored.
    items = []
    for record in data['rows']:
        link, text = '', ''
        for name, cell in record.items():
            lowered = name.lower()
            if lowered == 'key':
                link = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
            elif lowered == 'summary':
                text = cell
        items.append((link, text))

    if list_type == 'dl':
        # 'Term'
        # ':definition of term'
        #
        entries = [f'{key}\n:{summary}\n' for key, summary in items]
    elif list_type in ('ol', 'ul'):
        bullet = '1.' if list_type == 'ol' else '-'
        entries = [f'{bullet} {key} - {summary}' for key, summary in items]
    else:
        return f'Unexpected list type ({list_type}) in markdown_list not in ({("dl", "ol", "ul")})' + '\n'

    the_list = '\n'.join(entries) + '\n'
    if BASE_LF_ONLY:
        return the_list.replace('\r', '')
    return the_list

260 

261 

@no_type_check
def markdown_heading(
    handle: Jira,
    jql_text: str,
    column_fields=None,
    level: int = 1,
    data: Mapping[str, Union[object, Iterable, Sized]] = None,
) -> str:
    """Render the single issue matched by a query as a markdown heading ``key - summary``.

    Exactly one row is expected. For any other count a warning message is returned in
    strict mode and the empty string otherwise. ``level`` must be within 1..6; any other
    value yields an explanatory message. The JSON dump of ``data`` is returned when it
    carries an error.
    """
    if data is None:
        data = query(handle, jql_text, column_fields)
    if data.get('error', ''):
        return json.dumps(data, indent=2)

    if not data['rows']:
        if not laskea.STRICT:
            return ''
        message = f'WARNING: received 0 results instead of 1 for JQL ({jql_text}) and h{level}'
        if not laskea.DRY_RUN:
            print(message, file=sys.stderr)
        return message

    # Collect one (linked key, summary) pair per row; other columns are ignored.
    items = []
    for record in data['rows']:
        link, text = '', ''
        for name, cell in record.items():
            lowered = name.lower()
            if lowered == 'key':
                link = f'[{cell}]({BASE_URL.strip("/")}/browse/{cell})'
            elif lowered == 'summary':
                text = cell
        items.append((link, text))

    received = len(items)
    if received != 1:
        if not laskea.STRICT:
            return ''
        message = f'WARNING: received {received} results instead of 1 for JQL ({jql_text}) and h{level}'
        if not laskea.DRY_RUN:
            print(message, file=sys.stderr)
        if BASE_LF_ONLY:
            return message.replace('\r', '')
        return message

    level_range = (1, 2, 3, 4, 5, 6)
    if level not in level_range:
        message = f'Unexpected level for heading ({level}) in markdown_heading not in ({level_range})'
        if not laskea.DRY_RUN:
            print(message, file=sys.stderr)
        return message

    # Exactly one item remains at this point.
    key, summary = items[0]
    heading_token = '#' * level
    the_heading = f'{heading_token} {key} - {summary}'
    if BASE_LF_ONLY:
        return the_heading.replace('\r', '')
    return the_heading

313 

314 

@no_type_check
def fetch_jql(handle: Jira, jql_text: str) -> dict:
    """Execute a JQL query and return the unprocessed JIRA result structure.

    The returned dict always holds ``jql_text`` and ``error``; ``data`` is present only
    when the query succeeded (``error`` is then ``None``). Blank JQL text and
    HTTPError / RuntimeError from the client are reported through ``error``.
    """
    outcome = {'jql_text': jql_text}

    if not jql_text.strip():
        outcome['error'] = 'Empty JIRA Query Language text detected'
        return outcome

    try:
        outcome['data'] = handle.jql(jql_text, limit=1000)
    except (HTTPError, RuntimeError) as err:
        outcome['error'] = str(err)
        return outcome

    outcome['error'] = None
    return outcome

337 

338 

@no_type_check
def parent_children_sections(
    handle: Jira,
    parent_jql: str,
    children_jql: str,
    parent_type_name: str,
    children_type_name: str,
    data: Mapping[str, Union[object, Iterable, Sized]] = None,
) -> str:
    """Create sub(sub)section level content representing the issue content from parent children filter results.

    Args:
        handle: JIRA client, used only when ``data`` is not given.
        parent_jql: JQL text selecting the parent issues.
        children_jql: JQL text selecting the child issues.
        parent_type_name: Forwarded to ``doc_to_markdown``.
        children_type_name: Forwarded to ``doc_to_markdown``.
        data: Optional dict with the keys ``parent_data`` and ``children_data``, each
            shaped like a ``fetch_jql`` result; skips querying when given.

    Returns:
        The markdown produced by ``doc_to_markdown``, or the JSON dump of ``data`` when
        either of the two results carries an error.
    """
    if data is None:
        data = {
            'parent_data': fetch_jql(handle, jql_text=parent_jql),
            'children_data': fetch_jql(handle, jql_text=children_jql),
        }
    if data['parent_data'].get('error', '') or data['children_data'].get('error', ''):
        return json.dumps(data, indent=2)

    doc = {}

    # First pass: one entry per parent, with child stubs taken from its subtasks list.
    for parent in data['parent_data']['data']['issues']:
        p_field = parent['fields']
        doc[parent['key']] = {
            'id': parent['id'],
            'type': p_field['issuetype']['name'],
            'summary': p_field['summary'],
            'description': p_field['description'],  # None for parent type names
            # TODO the epic field id is an assumption; tolerate instances lacking the field.
            'epic': p_field.get('customfield_10006'),
            'created': p_field['created'],  # Textual timestamps like "2019-03-12T10:01:25.000+0100"
            'updated': p_field['updated'],
            'children': {
                child['key']: {
                    'id': child['id'],
                    'type': child['fields']['issuetype']['name'],
                    'summary': child['fields']['summary'],
                    'description': None,
                    'created': None,
                    'updated': None,
                }
                for child in p_field['subtasks']
            },
        }

    # Second pass: complete the child stubs with details only the children result carries.
    for child in data['children_data']['data']['issues']:
        c_field = child['fields']

        p_key = (c_field.get('parent') or {}).get('key')
        family = doc.get(p_key)
        if family is None:
            # The child has no parent or its parent is not part of the parent result.
            continue

        # A child missing from the parent's subtasks list is added instead of failing.
        entry = family['children'].setdefault(
            child['key'],
            {
                'id': child['id'],
                'type': c_field['issuetype']['name'],
                'summary': c_field['summary'],
                'description': None,
                'created': None,
                'updated': None,
            },
        )
        entry['description'] = c_field['description']  # Table or list or notes for children type names
        entry['created'] = c_field['created']
        entry['updated'] = c_field['updated']

    return doc_to_markdown(doc, parent_type_name, children_type_name)

432 

433 

@no_type_check
def doc_to_markdown(doc, parent_type_name: str, children_type_name: str) -> str:  # noqa
    """Transform the document content to markdown.

    Args:
        doc: Mapping of parent key to a dict with ``summary``, ``type``, and ``children``;
            ``children`` maps child key to a dict with ``summary`` and ``description``.
        parent_type_name: Not used by this function.
        children_type_name: Singular display name of the child type; an ``s`` is appended
            unless a parent has exactly one child.

    Returns:
        Markdown with a level 2 heading and a counting sentence per parent and a level 3
        heading plus the description per child. Description lines that look like
        confluence table heads get a markdown alignment row appended. A description of
        ``None`` is treated as empty text.
    """
    double_pipe, ast_pipe, pipe_ast = '||', '|*', '*|'
    nbsp = '&nbsp;'

    md = []
    for p_tree in doc.values():
        p_head = f'## {p_tree["summary"]}'.strip().strip(LF)
        c_count = len(p_tree['children'])
        c_type_disp = f'{children_type_name}{"" if c_count == 1 else "s"}'
        p_para = f'The {p_tree["type"]} consists of {c_count} {c_type_disp}'.strip(LF)

        c_parts = []
        for c_data in p_tree['children'].values():
            c_head = f'### {c_data["summary"]}'.strip().strip(LF)
            # Children without a fetched description carry None.
            description = c_data['description'] or ''
            c_out = []
            for line in description.replace(nbsp, ' ').strip().split(LF):
                if line.startswith((double_pipe, ast_pipe)):
                    # patch confluence markdown like table heads ...
                    line_s = line.strip()
                    is_table_head = False
                    if line_s.startswith(double_pipe) and line_s.endswith(double_pipe):
                        line = line.replace(double_pipe, '|')
                        is_table_head = True
                    elif line_s.startswith(ast_pipe) and line_s.endswith(pipe_ast):
                        is_table_head = True
                    c_out.append(line)
                    if is_table_head:
                        # Alignment row: keep the pipes, dash out everything else, then mark
                        # each column as left-aligned.
                        dashed = ''.join(c if c == '|' else '-' for c in line)
                        c_out.append(dashed.replace('|-', '|:').replace('-|', ' |'))
                else:
                    c_out.append(line)

            c_parts.extend([LF, c_head, LF, *c_out])

        md.extend([LF, p_head, LF, p_para])
        md.extend(c_parts)

    md.append(LF)
    return LF.join(md).replace(LF + LF, LF)