Coverage for laskea/api/tabulator.py: 8.55%

90 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-10 22:19:18 +00:00

1# -*- coding: utf-8 -*- 

2"""Tabulator style REST/JSON data proxy connector API for code generation.""" 

3from typing import Mapping, no_type_check 

4 

5import jmespath 

6import requests # noqa 

7from urllib3.exceptions import InsecureRequestWarning # noqa 

8 

9import laskea 

10 

# Suppress urllib3's InsecureRequestWarning process-wide: the table builders in this module pass
# the configured 'verify_server_certificate' value to requests, which may switch verification off.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)  # type: ignore

12 

13 

@no_type_check
def parse_matrix(configuration: Mapping[str, object]) -> Mapping[str, object]:
    """Derive the per-column lookup structures from the configured column matrix.

    ``configuration['matrix']`` is iterated as rows of exactly four items:
    ``(key, label, is_numeric, alignment)``.

    Returns a dict with these entries (all in matrix order):

    - ``columns``: tuple of the keys
    - ``labels``: tuple of the labels
    - ``is_numeric``: list of the keys whose numeric flag is truthy
    - ``align``: list of the alignment codes
    - ``out_round``: tuple of ``(key, 1)`` for numeric keys, ``(key, 0)`` otherwise
    - ``out_format``: tuple of ``(key, '5.1f')`` for numeric keys, ``(key, '')`` otherwise
    - ``column_fields``: tuple of ``(key, label)`` pairs

    Raises:
        KeyError: if ``configuration`` has no ``'matrix'`` entry.
        ValueError: if a matrix row does not unpack into exactly four items.
    """
    matrix = configuration['matrix']

    columns = tuple(key for key, _, _, _ in matrix)  # noqa
    labels = tuple(label for _, label, _, _ in matrix)  # noqa
    numeric_keys = [key for key, _, is_num, _ in matrix if is_num]  # noqa
    alignments = [lcr for _, _, _, lcr in matrix]  # noqa

    # Membership is tested against the key list (not the per-row flag) so that a key counts as
    # numeric wherever it appears as soon as any of its rows is flagged numeric.
    return {
        'columns': columns,
        'labels': labels,
        'is_numeric': numeric_keys,
        'align': alignments,
        'out_round': tuple((key, 1 if key in numeric_keys else 0) for key in columns),
        'out_format': tuple((key, '5.1f' if key in numeric_keys else '') for key in columns),
        'column_fields': tuple(zip(columns, labels)),
    }

34 

35 

def _align_text(text: str, width: int, lcr: str) -> str:
    """Pad text to width: 'R' right-justifies, 'L' left-justifies, any other code centers."""
    if lcr == 'R':
        return text.rjust(width)
    if lcr == 'L':
        return text.ljust(width)
    return text.center(width)


@no_type_check
def tabulator_overview_table(configuration: Mapping[str, object]) -> str:
    """Fetch the overview data for every configured year and render it as a pipe table.

    Reads from ``configuration``: ``matrix`` (via ``parse_matrix``), ``base_url``, ``path``
    (in which the literal ``$year$`` is replaced per year), ``years``, and
    ``verify_server_certificate`` (handed to ``requests.get`` as ``verify``).

    Each JSON response is queried for ``data_version`` and ``data[]``; every data entry
    contributes one row holding the values of the configured column keys. The reported data
    version is the one from the last response fetched.

    Returns:
        Header line, alignment separator line, and one line per row joined by newlines, followed
        by a blank line and ``Data version: ...``. Carriage returns are stripped when
        ``laskea.BASE_LF_ONLY`` is truthy.
    """
    m = parse_matrix(configuration)
    url_template = f'{configuration["base_url"]}{configuration["path"]}'
    http_options = {'timeout': laskea.REQUESTS_TIMEOUT_SECS}

    data = []
    data_version = ''
    for year in configuration['years']:  # noqa
        source = url_template.replace('$year$', str(year))
        r = requests.get(source, verify=configuration['verify_server_certificate'], **http_options)  # nosec B113
        as_json = r.json()
        data_version = jmespath.search('data_version', as_json)
        for entry in jmespath.search('data[]', as_json):
            record = [entry[key] for key, _ in m['column_fields']]  # noqa
            for i, (_, spec) in enumerate(m['out_format']):  # noqa
                # The first column is never reformatted, and neither are columns without a format.
                if not (i and spec):
                    continue
                if record[i] is None:
                    record[i] = ''
                    continue
                try:
                    value = float(record[i])
                except (TypeError, ValueError):
                    # Best effort: values that are not number-like are shown unformatted.
                    continue
                record[i] = f'{round(value, m["out_round"][i][1]): {spec}}'  # noqa
            data.append(record)

    # Start from the label widths and grow to the widest rendered cell. Cells are rendered with
    # str() below, so measure str() here too (covers None and non-string values).
    widths = [len(label) for _, label in m['column_fields']]  # noqa
    for record in data:
        for i, cell in enumerate(record):
            widths[i] = max(widths[i], len(str(cell)))

    header_cells = [
        _align_text(label, width, lcr) for label, width, lcr in zip(m['labels'], widths, m['align'])  # noqa
    ]
    header = f'| {" | ".join(header_cells)} |'

    # Each separator cell spans the column width plus the two padding blanks of the data cells;
    # the colon position encodes left, right, or (both ends) centered alignment.
    separator_cells = ['-' * (width + 1) for width in widths]
    separator = [
        f':{dashes}' if lcr == 'L' else (f'{dashes}:' if lcr == 'R' else f':{dashes[:-1]}:')
        for lcr, dashes in zip(m['align'], separator_cells)  # noqa
    ]
    separator_display = f'|{"|".join(separator)}|'

    rows_display = [
        f'| {" | ".join(_align_text(str(v), w, lcr) for v, w, lcr in zip(row, widths, m["align"]))} |'  # noqa
        for row in data
    ]

    summary = f'\n\nData version: {data_version}'
    the_table = '\n'.join([header, separator_display] + rows_display) + summary

    return the_table.replace('\r', '') if laskea.BASE_LF_ONLY else the_table

109 

110 

@no_type_check
def tabulator_kpi_table(configuration: Mapping[str, object], selected: str) -> str:
    """Fetch the selected KPI data for every configured year and render the top rows as a pipe table.

    Reads from ``configuration``: ``matrix`` (via ``parse_matrix``), ``base_url``,
    ``paths[selected]`` (in which the literal ``$year$`` is replaced per year), ``years``, and
    ``verify_server_certificate`` (handed to ``requests.get`` as ``verify``).

    Each JSON response is queried for ``data_version`` and ``data[]``; every data entry
    contributes one row holding the values of the configured column keys. Rows are sorted in
    descending order and only the first 12 are rendered. The reported data version is the one
    from the last response fetched.

    Returns:
        Header line, alignment separator line (first column left, all others right), and one
        line per row joined by newlines, followed by a blank line and ``Data version: ...``.
        Carriage returns are stripped when ``laskea.BASE_LF_ONLY`` is truthy.
    """
    m = parse_matrix(configuration)
    http_options = {'timeout': laskea.REQUESTS_TIMEOUT_SECS}

    data = []
    data_version = ''
    for year in configuration['years']:  # noqa
        the_path = configuration['paths'][selected].replace('$year$', str(year))  # noqa
        source = f'{configuration["base_url"]}{the_path}'
        # The certificate switch must go in by keyword: the second positional parameter of
        # requests.get is ``params``, so a positional value would be sent as query data and
        # leave certificate verification at its default.
        r = requests.get(source, verify=configuration['verify_server_certificate'], **http_options)  # nosec B113
        as_json = r.json()
        data_version = jmespath.search('data_version', as_json)
        for entry in jmespath.search('data[]', as_json):
            record = [entry[key] for key, _ in m['column_fields']]  # noqa
            for i, (_, spec) in enumerate(m['out_format']):  # noqa
                # The first column is never reformatted, and neither are columns without a format.
                if i and spec and record[i] is not None:
                    record[i] = f'{round(record[i], m["out_round"][i][1]): {spec}}'  # noqa
            data.append(record)

    widths = [len(label) for _, label in m['column_fields']]  # noqa
    selection = sorted(data, reverse=True)[:12]
    # Cells are rendered with str() below, so measure str() here too (covers None and
    # non-string values such as an unformatted first column).
    for record in selection:
        for i, cell in enumerate(record):
            widths[i] = max(widths[i], len(str(cell)))

    header_cells = [
        label.rjust(width) if index else label.ljust(width)
        for index, (label, width) in enumerate(zip(m['labels'], widths))  # noqa
    ]
    header = f'| {" | ".join(header_cells)} |'

    # Built per cell so that a single-column matrix does not produce a dangling ':|' cell; for
    # two or more columns the result is identical to '|:c0|c1:|c2:|'.
    separator_cells = [
        f'{dashes}:' if index else f':{dashes}' for index, dashes in enumerate('-' * (width + 1) for width in widths)
    ]
    separator = f'|{"|".join(separator_cells)}|'

    rows = [f'| {" | ".join(str(v).rjust(widths[k]) for k, v in enumerate(line))} |' for line in selection]

    summary = f'\n\nData version: {data_version}'
    the_table = '\n'.join([header, separator] + rows) + summary

    return the_table.replace('\r', '') if laskea.BASE_LF_ONLY else the_table