Coverage for laskea/api/tabulator.py: 8.55%
90 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-10 22:19:18 +00:00
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-10 22:19:18 +00:00
1# -*- coding: utf-8 -*-
2"""Tabulator style REST/JSON data proxy connector API for code generation."""
3from typing import Mapping, no_type_check
5import jmespath
6import requests # noqa
7from urllib3.exceptions import InsecureRequestWarning # noqa
9import laskea
11requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # type: ignore
@no_type_check
def parse_matrix(configuration: Mapping[str, object]) -> Mapping[str, object]:
    """Unpack the ``matrix`` entry of the configuration into per-column lookups.

    Every row of ``configuration['matrix']`` has to unpack into exactly four items,
    in the order ``(key, label, is_numeric, alignment)``.

    The returned mapping provides:

    - ``columns``: tuple of the keys in matrix order
    - ``labels``: tuple of the labels in matrix order
    - ``is_numeric``: list of the keys whose numeric flag is truthy
    - ``align``: list of the alignment markers in matrix order
    - ``out_round``: tuple of ``(key, 1)`` for numeric keys and ``(key, 0)`` for all others
    - ``out_format``: tuple of ``(key, '5.1f')`` for numeric keys and ``(key, '')`` for all others
    - ``column_fields``: tuple of ``(key, label)`` pairs
    """
    matrix = configuration['matrix']

    columns = tuple(name for name, _, _, _ in matrix)  # noqa
    labels = tuple(title for _, title, _, _ in matrix)  # noqa
    numeric_keys = [name for name, _, flag, _ in matrix if flag]  # noqa
    alignments = [side for _, _, _, side in matrix]  # noqa
    column_fields = tuple(zip(columns, labels))

    return {
        'columns': columns,
        'labels': labels,
        'is_numeric': numeric_keys,
        'align': alignments,
        'out_round': tuple((name, int(name in numeric_keys)) for name, _ in column_fields),
        'out_format': tuple((name, '5.1f' if name in numeric_keys else '') for name, _ in column_fields),
        'column_fields': column_fields,
    }
def _align_cell(text: str, width: int, lcr: str) -> str:
    """Pad text to width: 'R' right-justifies, 'L' left-justifies, anything else centers."""
    if lcr == 'R':
        return text.rjust(width)
    if lcr == 'L':
        return text.ljust(width)
    return text.center(width)


@no_type_check
def tabulator_overview_table(configuration: Mapping[str, object]) -> str:
    """Fetch the overview data for all configured years and render it as a pipe-delimited text table.

    Entries read from ``configuration``:

    - ``matrix``: the column layout as understood by ``parse_matrix``
    - ``base_url`` and ``path``: concatenated into the source URL; ``$year$`` is replaced per year
    - ``years``: iterable of the years to request (one GET request each)
    - ``verify_server_certificate``: handed to ``requests.get`` as ``verify``

    Values of numeric columns - except for the very first column, which is never reformatted - are
    converted to float, rounded, and formatted. Values failing the float conversion are kept as
    received and ``None`` values are replaced by the empty string.

    Returns the header line, the alignment separator line, and one line per record, followed by a blank
    line and the data version reported by the last response. Carriage returns are removed from the
    result when ``laskea.BASE_LF_ONLY`` is truthy.
    """
    m = parse_matrix(configuration)
    over_view = f'{configuration["base_url"]}{configuration["path"]}'
    http_options = {'timeout': laskea.REQUESTS_TIMEOUT_SECS}

    data = []
    data_version = ''
    for year in configuration['years']:  # noqa
        source = over_view.replace('$year$', str(year))
        r = requests.get(source, verify=configuration['verify_server_certificate'], **http_options)  # nosec B113
        as_json = r.json()
        data_version = jmespath.search('data_version', as_json)
        for entry in jmespath.search('data[]', as_json):
            record = [entry[key] for key, _ in m['column_fields']]  # noqa
            for i, (_, f) in enumerate(m['out_format']):  # noqa
                if not (i and f):
                    continue  # the first column and all non-numeric columns stay as received
                if record[i] is None:
                    record[i] = ''
                    continue
                try:
                    v = float(record[i])
                except ValueError:
                    continue  # best effort: text that is no number is displayed unchanged
                record[i] = f'{round(v, m["out_round"][i][1]): {f}}'  # noqa

            data.append(record)

    widths = [len(label) for _, label in m['column_fields']]  # noqa
    for record in data:
        for i, cell in enumerate(record):
            # Rows are rendered through str(), so measure that very text; raw cells may be None or non-strings.
            widths[i] = max(widths[i], len(str(cell)))

    header_cells = [
        _align_cell(label, width, lcr) for label, width, lcr in zip(m['labels'], widths, m['align'])  # noqa
    ]
    header = f'| {" | ".join(header_cells)} |'

    # One dash more than the column width so the colon markers fit inside the padding of the cells.
    separator_cells = ['-' * (width + 1) for width in widths]
    separator = [
        f':{v}' if lcr == 'L' else (f'{v}:' if lcr == 'R' else f':{v[:-1]}:')
        for lcr, v in zip(m['align'], separator_cells)  # noqa
    ]
    separator_display = f'|{"|".join(separator)}|'

    rows_display = []
    for record in data:
        cells = [_align_cell(str(v), widths[k], m['align'][k]) for k, v in enumerate(record)]  # noqa
        rows_display.append(f'| {" | ".join(cells)} |')

    summary = f'\n\nData version: {data_version}'
    the_table = '\n'.join([header] + [separator_display] + rows_display) + summary

    return the_table.replace('\r', '') if laskea.BASE_LF_ONLY else the_table
@no_type_check
def tabulator_kpi_table(configuration: Mapping[str, object], selected: str) -> str:
    """Fetch the data of the selected KPI for all configured years and render it as a pipe-delimited text table.

    Entries read from ``configuration``:

    - ``matrix``: the column layout as understood by ``parse_matrix``
    - ``base_url`` and ``paths[selected]``: concatenated into the source URL; ``$year$`` is replaced per year
    - ``years``: iterable of the years to request (one GET request each)
    - ``verify_server_certificate``: handed to ``requests.get`` as ``verify``

    Values of numeric columns that are not ``None`` - except for the very first column, which is never
    reformatted - are rounded and formatted. The table displays at most the first 12 records of the
    collected data in descending sort order.

    Returns the header line, the alignment separator line (first column left, all others right), and one
    line per displayed record, followed by a blank line and the data version reported by the last
    response. Carriage returns are removed from the result when ``laskea.BASE_LF_ONLY`` is truthy.
    """
    m = parse_matrix(configuration)
    http_options = {'timeout': laskea.REQUESTS_TIMEOUT_SECS}
    data = []
    data_version = ''
    for year in configuration['years']:  # noqa
        the_path = configuration['paths'][selected].replace('$year$', str(year))  # noqa
        source = f'{configuration["base_url"]}{the_path}'
        # The flag has to go by keyword: the second positional parameter of requests.get is params, not verify.
        r = requests.get(source, verify=configuration['verify_server_certificate'], **http_options)  # nosec B113
        as_json = r.json()
        data_version = jmespath.search('data_version', as_json)
        for entry in jmespath.search('data[]', as_json):
            record = [entry[key] for key, _ in m['column_fields']]  # noqa
            for i, (_, f) in enumerate(m['out_format']):  # noqa
                if i and f and record[i] is not None:
                    record[i] = f'{round(record[i], m["out_round"][i][1]): {f}}'  # noqa
            data.append(record)

    widths = [len(label) for _, label in m['column_fields']]  # noqa
    selection = sorted(data, reverse=True)[:12]
    for record in selection:
        for i, cell in enumerate(record):
            # Rows are rendered through str(), so measure that very text; raw cells may be None or non-strings.
            widths[i] = max(widths[i], len(str(cell)))

    header_cells = [
        label.rjust(width) if index else label.ljust(width)
        for index, (label, width) in enumerate(zip(m['labels'], widths))  # noqa
    ]
    header = f'| {" | ".join(header_cells)} |'

    # One dash more than the column width so the colon markers fit inside the padding of the cells.
    separator_cells = ['-' * (width + 1) for width in widths]
    separator = f'|:{separator_cells[0]}|{":|".join(separator_cells[1:])}:|'

    rows = [f'| {" | ".join(str(v).rjust(widths[k]) for k, v in enumerate(line))} |' for line in selection]

    summary = f'\n\nData version: {data_version}'
    the_table = '\n'.join([header] + [separator] + rows) + summary

    return the_table.replace('\r', '') if laskea.BASE_LF_ONLY else the_table