Coverage for tekstialue/tekstialue.py: 57.38%
168 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:20:46 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:20:46 +00:00
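"""Transform longtable blocks in LaTeX text into a ruled four-column layout.

Implementation scans the input lines for longtable environments, replaces the
table preamble and header with templates (column labels come from the configuration),
adds a horizontal rule after every data row, and wraps the line following the
table as a footnotesize annotation.
"""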
7import argparse
8import datetime as dti
9import json
10import logging
11import pathlib
12from typing import no_type_check
14from tekstialue import DEFAULT_CONFIG_NAME, ENCODING, log
16TAB_START_TOK = r'\begin{longtable}[]{@{}'
17TOP_RULE = r'\toprule()'
18MID_RULE = r'\midrule()'
19END_HEAD = r'\endhead'
20END_DATA_ROW = r'\\'
21BOT_RULE = r'\bottomrule()'
22TAB_END_TOK = r'\end{longtable}'
24TAB_NEW_START = r"""\begin{small}
25\begin{longtable}[]{|
26>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.1500}}|
27>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.5500}}|
28>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.1500}}|
29>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.2000}}|}
30\hline"""
32TAB_HACKED_HEAD = r"""\begin{minipage}[b]{\linewidth}\raggedright
33\ \mbox{\textbf{$COL1$}}
34\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
35\mbox{\textbf{$COL2$}}
36\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
37\mbox{\textbf{$COL3_A$}} \mbox{\textbf{$COL3_B$}}
38\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
39\mbox{\textbf{$COL4_A$}} \mbox{\textbf{$COL4_B$}}
40\end{minipage} \\
41\hline
42\endfirsthead
43\multicolumn{4}{@{}l}{\small \ldots continued}\\\hline
44\hline
45\begin{minipage}[b]{\linewidth}\raggedright
46\ \mbox{\textbf{$COL1$}}
47\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
48\mbox{\textbf{$COL2$}}
49\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
50\mbox{\textbf{$COL3_A$}} \mbox{\textbf{$COL3_B$}}
51\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright
52\mbox{\textbf{$COL4_A$}} \mbox{\textbf{$COL4_B$}}
53\end{minipage} \\
54\endhead
55\hline"""
57COL_1 = 'a'
58COL_2 = 'b'
59COL_3_A = 'c'
60COL_3_B = 'cc'
61COL_4_A = 'd'
62COL_4_B = 'dd'
64NEW_RULE = r'\hline'
66TAB_NEW_END = r"""\end{longtable}
67\end{small}
68\vspace*{-2em}
69\begin{footnotesize}
70ANNOTATION
71\end{footnotesize}"""
73Slots = list[tuple[int, int]]
74TableRanges = list[dict[str, int | list[int]]]
@no_type_check
def discover_configuration(conf: str) -> tuple[int, dict[str, object], str]:
    """Retrieve the configuration following the "(explicit, local, parents, home)
    first one wins" strategy.

    Args:
        conf: Explicit path to a JSON configuration file. When empty, the
            current working directory, each of its parents, and finally the
            home directory are probed for a non-empty DEFAULT_CONFIG_NAME file.

    Returns:
        A ``(code, configuration, path)`` triple:

        - ``(1, {}, '')`` when an explicit path is given but is no file or empty.
        - ``(0, <parsed JSON>, <path>)`` for the first configuration file found.
        - ``(0, None, <home candidate path>)`` when discovery finds nothing;
          callers must be prepared for the ``None`` configuration.

    Raises:
        json.JSONDecodeError: If the selected file does not contain valid JSON.
    """
    if conf:
        cp = pathlib.Path(conf)
        if not cp.is_file() or not cp.stat().st_size:
            log.error('Given configuration path is no file or empty')
            return 1, {}, ''
        log.debug(f'Reading configuration file {cp} as requested...')
        with cp.open(encoding=ENCODING) as handle:
            return 0, json.load(handle), str(cp)

    # No explicit path: walk from the working directory up to the root.
    cwd = pathlib.Path.cwd().resolve()
    for pp in (cwd, *cwd.parents):
        cp = pp / DEFAULT_CONFIG_NAME
        if cp.is_file() and cp.stat().st_size:
            log.debug(f'Reading from discovered configuration path {cp}')
            # Same encoding as the explicit branch, not the platform default.
            with cp.open(encoding=ENCODING) as handle:
                return 0, json.load(handle), str(cp)

    # Last resort: the user's home directory.
    cp = pathlib.Path.home() / DEFAULT_CONFIG_NAME
    if cp.is_file() and cp.stat().st_size:
        log.debug(f'Reading configuration file {cp} from home directory at {pathlib.Path.home()} ...')
        with cp.open(encoding=ENCODING) as handle:
            return 0, json.load(handle), str(cp)

    log.debug(f'User home configuration path to {cp} is no file or empty - ignoring configuration data')
    return 0, None, str(cp)
@no_type_check
def cue_tables(lines: list[str]) -> TableRanges:
    """Locate every table in *lines* and record the line indices of its elements.

    Each returned mapping holds ``start``, ``end_data_row`` (a list of the
    indices of rows ending in END_DATA_ROW after the head), ``end`` and
    ``amend``, plus ``top_rule``, ``mid_rule``, ``end_head`` and
    ``bottom_rule`` whenever the corresponding token was seen. A table is only
    reported once the line two positions after its end token has been reached;
    that line is recorded as ``amend``.
    """
    head_marks = ((TOP_RULE, 'top_rule'), (MID_RULE, 'mid_rule'), (END_HEAD, 'end_head'))
    found = []
    current = {}
    inside = in_head = expect_note = False
    note_at = 0
    for index, row in enumerate(lines):
        if not inside:
            # Outside of any table only the start token is of interest.
            if row.startswith(TAB_START_TOK):
                current['start'] = index
                current['end_data_row'] = []
                inside = in_head = True
            continue

        # The head markers are tested first, in this fixed order.
        mark = next((key for token, key in head_marks if row.startswith(token)), None)
        if mark is not None:
            current[mark] = index
            if mark == 'end_head':
                in_head = False
        elif not in_head and row.strip().endswith(END_DATA_ROW):
            current['end_data_row'].append(index)
        elif row.startswith(BOT_RULE):
            current['bottom_rule'] = index
        elif row.startswith(TAB_END_TOK):
            current['end'] = index
            expect_note = True
            note_at = index + 2  # the annotation is taken from two lines below
        elif expect_note and index == note_at:
            current['amend'] = index
            found.append(current)
            current = {}
            inside = expect_note = False

    return found
@no_type_check
def extract_slots(table_ranges: TableRanges) -> Slots:
    """Return one ``(on, off)`` pair per table for output processing.

    ``on`` is the table's ``start`` index and ``off`` is one past its
    ``amend`` index.
    """
    return [(entry['start'], entry['amend'] + 1) for entry in table_ranges]
@no_type_check
def weave_table(lines: list[str], on_off_slots: Slots, table_ranges: TableRanges, tab_hacked_head: str) -> list[str]:
    """Generate the output lines with every cued table rewritten.

    Lines outside of tables are copied unchanged. For each table the start
    line is replaced with TAB_NEW_START followed by *tab_hacked_head*, the
    original lines up to and including ``end_head`` are dropped, body lines
    are copied (with NEW_RULE after each recorded data row end), the lines
    from ``bottom_rule`` up to the annotation are dropped, and the annotation
    line is embedded into TAB_NEW_END.
    """
    woven = []
    slot = 0
    slot_count = len(on_off_slots)
    for number, text in enumerate(lines):
        pending = slot < slot_count
        if pending:
            begin, _ = on_off_slots[slot]
            table = table_ranges[slot]
        if not pending or begin is None:
            # All tables handled: pass the remainder through.
            woven.append(text)
            continue

        if number < begin:
            woven.append(text)
        elif number == begin:
            woven.extend((TAB_NEW_START, tab_hacked_head))
        elif number <= table['end_head']:
            pass  # original preamble and head are superseded by the templates
        elif number < table['bottom_rule']:
            woven.append(text)
            if number in table['end_data_row']:
                woven.append(NEW_RULE)
        elif table['bottom_rule'] <= number < table['amend']:
            pass  # bottom rule, end token and the line before the annotation
        elif number == table['amend']:
            woven.append(TAB_NEW_END.replace('ANNOTATION', text))
            slot += 1

    return woven
@no_type_check
def main(options: argparse.Namespace) -> int:
    """Rewrite the tables of the input file and store the result in the output file.

    Args:
        options: Parsed command line; the attributes ``verbose``, ``in_file``,
            ``out_file`` and ``cfg_file`` are read.

    Returns:
        0 on success, the non-zero code from configuration discovery on
        failure there, or 1 when no configuration with a ``columns`` entry
        is available.
    """
    # Timezone-aware clock; datetime.utcnow() is deprecated since Python 3.12.
    start_time = dti.datetime.now(dti.timezone.utc)
    if options.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    in_file, out_file = options.in_file, options.out_file

    code, cfg, cp = discover_configuration(options.cfg_file)
    if code:
        return code
    # Discovery yields None when no configuration file exists anywhere;
    # report that instead of failing on the subscript below.
    if not isinstance(cfg, dict) or 'columns' not in cfg:
        log.error(f'No configuration with columns available (last candidate path was {cp})')
        return 1
    log.info(f'Read configuration from {cp}')
    log.debug(f'{cfg=}')

    cols = cfg['columns']
    substitutions = (
        ('$COL1$', cols['col_1'][0]),
        ('$COL2$', cols['col_2'][0]),
        ('$COL3_A$', cols['col_3'][0]),
        ('$COL3_B$', cols['col_3'][1]),
        ('$COL4_A$', cols['col_4'][0]),
        ('$COL4_B$', cols['col_4'][1]),
    )
    tab_hacked_head = TAB_HACKED_HEAD
    for placeholder, label in substitutions:
        tab_hacked_head = tab_hacked_head.replace(placeholder, label)

    with open(in_file, 'rt', encoding=ENCODING) as handle:
        # NOTE(review): the leading '' entry is copied through by weave_table,
        # so the output starts with an empty line - confirm this is intended.
        lines = [''] + [line.rstrip() for line in handle]
    log.debug(f'Read {len(lines)} lines from {in_file}')

    table_ranges = cue_tables(lines)
    on_off_slots = extract_slots(table_ranges)

    out = weave_table(lines, on_off_slots, table_ranges, tab_hacked_head)
    payload = '\n'.join(out) + '\n'
    with open(out_file, 'wt', encoding=ENCODING) as handle:
        handle.write(payload)
    # Count what was actually written; template entries span several lines.
    lines_written = payload.count('\n')
    log.debug(f'Wrote {lines_written} lines to {out_file}')

    duration_seconds = (dti.datetime.now(dti.timezone.utc) - start_time).total_seconds()

    log.info(f'transformed tables in {in_file} into {out_file}' f' in {duration_seconds} secs')
    return 0