Coverage for tekstialue/tekstialue.py: 57.38%

168 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-05 19:20:46 +00:00

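"""Rewrite LaTeX longtable environments in a text file into ruled, small-font tables.

Tables are located by their start and end tokens, the original column
specification and header are replaced by a fixed four column layout whose
header labels come from a JSON configuration, a horizontal rule is added after
every data row, and the line found two lines after the end of the table is
wrapped as a footnote-sized annotation.
"""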

6 

7import argparse 

8import datetime as dti 

9import json 

10import logging 

11import pathlib 

12from typing import no_type_check 

13 

14from tekstialue import DEFAULT_CONFIG_NAME, ENCODING, log 

15 

16TAB_START_TOK = r'\begin{longtable}[]{@{}' 

17TOP_RULE = r'\toprule()' 

18MID_RULE = r'\midrule()' 

19END_HEAD = r'\endhead' 

20END_DATA_ROW = r'\\' 

21BOT_RULE = r'\bottomrule()' 

22TAB_END_TOK = r'\end{longtable}' 

23 

24TAB_NEW_START = r"""\begin{small} 

25\begin{longtable}[]{| 

26>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.1500}}| 

27>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.5500}}| 

28>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.1500}}| 

29>{\raggedright\arraybackslash}p{(\columnwidth - 12\tabcolsep) * \real{0.2000}}|} 

30\hline""" 

31 

32TAB_HACKED_HEAD = r"""\begin{minipage}[b]{\linewidth}\raggedright 

33\ \mbox{\textbf{$COL1$}} 

34\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

35\mbox{\textbf{$COL2$}} 

36\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

37\mbox{\textbf{$COL3_A$}} \mbox{\textbf{$COL3_B$}} 

38\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

39\mbox{\textbf{$COL4_A$}} \mbox{\textbf{$COL4_B$}} 

40\end{minipage} \\ 

41\hline 

42\endfirsthead 

43\multicolumn{4}{@{}l}{\small \ldots continued}\\\hline 

44\hline 

45\begin{minipage}[b]{\linewidth}\raggedright 

46\ \mbox{\textbf{$COL1$}} 

47\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

48\mbox{\textbf{$COL2$}} 

49\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

50\mbox{\textbf{$COL3_A$}} \mbox{\textbf{$COL3_B$}} 

51\end{minipage} & \begin{minipage}[b]{\linewidth}\raggedright 

52\mbox{\textbf{$COL4_A$}} \mbox{\textbf{$COL4_B$}} 

53\end{minipage} \\ 

54\endhead 

55\hline""" 

56 

57COL_1 = 'a' 

58COL_2 = 'b' 

59COL_3_A = 'c' 

60COL_3_B = 'cc' 

61COL_4_A = 'd' 

62COL_4_B = 'dd' 

63 

64NEW_RULE = r'\hline' 

65 

66TAB_NEW_END = r"""\end{longtable} 

67\end{small} 

68\vspace*{-2em} 

69\begin{footnotesize} 

70ANNOTATION 

71\end{footnotesize}""" 

72 

73Slots = list[tuple[int, int]] 

74TableRanges = list[dict[str, int | list[int]]] 

75 

76 

@no_type_check
def discover_configuration(conf: str) -> tuple[int, dict[str, object], str]:
    """Load the JSON configuration; the first source found wins.

    Lookup order:

    1. the explicit path ``conf`` when it is truthy,
    2. a file named DEFAULT_CONFIG_NAME in the current working directory or any of its parents,
    3. a file named DEFAULT_CONFIG_NAME in the home directory.

    Empty files are treated like missing files.

    Returns a triple (code, configuration, path):

    - ``(1, {}, '')`` when an explicit path was given but is no file or is empty,
    - ``(0, <parsed JSON>, <path>)`` when a configuration was loaded,
    - ``(0, None, <home candidate path>)`` when no configuration was found at all.

    Errors from reading or parsing the file (for example json.JSONDecodeError) are not caught.
    """
    if conf:
        cp = pathlib.Path(conf)
        if not cp.is_file() or not cp.stat().st_size:
            log.error('Given configuration path is no file or empty')
            return 1, {}, ''
        log.debug(f'Reading configuration file {cp} as requested...')
        with cp.open(encoding=ENCODING) as handle:
            return 0, json.load(handle), str(cp)

    cwd = pathlib.Path.cwd().resolve()
    for folder in (cwd, *cwd.parents):
        cp = folder / DEFAULT_CONFIG_NAME
        if cp.is_file() and cp.stat().st_size:
            log.debug(f'Reading from discovered configuration path {cp}')
            # Use the same explicit encoding as for the requested path instead of the locale default.
            with cp.open(encoding=ENCODING) as handle:
                return 0, json.load(handle), str(cp)

    home = pathlib.Path.home()
    cp = home / DEFAULT_CONFIG_NAME
    if cp.is_file() and cp.stat().st_size:
        log.debug(f'Reading configuration file {cp} from home directory at {home} ...')
        with cp.open(encoding=ENCODING) as handle:
            return 0, json.load(handle), str(cp)

    log.debug(f'User home configuration path to {cp} is no file or empty - ignoring configuration data')
    # Nothing found: success code, but no configuration data (None), as before.
    return 0, None, str(cp)

111 

112 

@no_type_check
def cue_tables(lines: list[str]) -> TableRanges:
    """Scan the lines and record, per table, the line indices of its elements.

    A table starts at a line beginning with TAB_START_TOK. Within a table the
    indices of the top rule, mid rule, end of head, bottom rule and end token
    are stored under 'top_rule', 'mid_rule', 'end_head', 'bottom_rule' and
    'end'; the indices of lines ending with END_DATA_ROW after the head are
    collected in 'end_data_row'. The line two positions after the end token is
    stored as 'amend' and completes the table. A table for which that line is
    never reached is not part of the result.
    """
    found = []
    current = {}
    inside_table = in_head = expect_annotation = False
    annotation_at = 0
    for index, content in enumerate(lines):
        if not inside_table:
            # Outside of tables only a table start is of interest.
            if content.startswith(TAB_START_TOK):
                current['start'] = index
                current['end_data_row'] = []
                inside_table = in_head = True
        elif content.startswith(TOP_RULE):
            current['top_rule'] = index
        elif content.startswith(MID_RULE):
            current['mid_rule'] = index
        elif content.startswith(END_HEAD):
            current['end_head'] = index
            in_head = False
        elif not in_head and content.strip().endswith(END_DATA_ROW):
            current['end_data_row'].append(index)
        elif content.startswith(BOT_RULE):
            current['bottom_rule'] = index
        elif content.startswith(TAB_END_TOK):
            current['end'] = index
            expect_annotation = True
            # The annotation is expected two lines below the end token.
            annotation_at = index + 2
        elif expect_annotation and index == annotation_at:
            current['amend'] = index
            found.append(current)
            current = {}
            inside_table = expect_annotation = False

    return found

164 

165 

@no_type_check
def extract_slots(table_ranges: TableRanges) -> Slots:
    """Return one (on, off) pair per table: its 'start' index and one past its 'amend' index."""
    return [(table['start'], table['amend'] + 1) for table in table_ranges]

177 

178 

@no_type_check
def weave_table(lines: list[str], on_off_slots: Slots, table_ranges: TableRanges, tab_hacked_head: str) -> list[str]:
    """Produce the output lines with every cued table rewritten.

    Lines outside of tables are copied. For each table the start line is
    replaced by TAB_NEW_START followed by ``tab_hacked_head``, the remaining
    original head is dropped, body lines are copied with NEW_RULE added after
    each data row end, the lines from the bottom rule up to the annotation are
    dropped, and the annotation line is embedded into TAB_NEW_END.
    """
    woven = []
    slot_index = 0
    slot_count = len(on_off_slots)
    for index, content in enumerate(lines):
        if slot_index >= slot_count:
            # All tables are done: copy the remainder.
            woven.append(content)
            continue

        begin, _ = on_off_slots[slot_index]
        table = table_ranges[slot_index]
        if begin is None or index < begin:
            woven.append(content)
            continue

        if index == begin:
            woven.extend((TAB_NEW_START, tab_hacked_head))
        elif index <= table['end_head']:
            # The original head is superseded by the replacement head.
            pass
        elif index < table['bottom_rule']:
            woven.append(content)
            if index in table['end_data_row']:
                woven.append(NEW_RULE)
        elif table['bottom_rule'] <= index < table['amend']:
            # Bottom rule, table end and the line before the annotation are dropped.
            pass
        elif index == table['amend']:
            woven.append(TAB_NEW_END.replace('ANNOTATION', content))
            slot_index += 1

    return woven

215 

216 

@no_type_check
def main(options: argparse.Namespace) -> int:
    """Read the input file, rewrite its tables and write the result.

    ``options`` provides ``verbose``, ``in_file``, ``out_file`` and ``cfg_file``.
    Returns the non-zero code from discover_configuration when the
    configuration could not be read, otherwise 0. Errors from file access or
    from a configuration lacking the expected 'columns' entries are not caught.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow(); only used for the duration.
    start_time = dti.datetime.now(dti.timezone.utc)
    verbose = options.verbose
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    in_file, out_file = options.in_file, options.out_file

    code, cfg, cp = discover_configuration(options.cfg_file)
    if code:
        return code
    log.info(f'Read configuration from {cp}')
    log.debug(f'{cfg=}')
    # NOTE(review): cfg is None when no configuration file was found; the lookup below then raises.
    cols = cfg['columns']
    # Substitution order is kept as a sequence because replacements are applied one after another.
    substitutions = (
        ('$COL1$', cols['col_1'][0]),
        ('$COL2$', cols['col_2'][0]),
        ('$COL3_A$', cols['col_3'][0]),
        ('$COL3_B$', cols['col_3'][1]),
        ('$COL4_A$', cols['col_4'][0]),
        ('$COL4_B$', cols['col_4'][1]),
    )
    tab_hacked_head = TAB_HACKED_HEAD
    for placeholder, label in substitutions:
        tab_hacked_head = tab_hacked_head.replace(placeholder, label)

    with open(in_file, 'rt', encoding=ENCODING) as handle:
        # The leading empty entry shifts indices by one; it is also copied to the output.
        lines = ['', *(line.rstrip() for line in handle)]
    log.debug(f'Read {len(lines)} lines from {in_file}')

    table_ranges = cue_tables(lines)
    on_off_slots = extract_slots(table_ranges)

    out = weave_table(lines, on_off_slots, table_ranges, tab_hacked_head)
    text = '\n'.join(out) + '\n'
    with open(out_file, 'wt', encoding=ENCODING) as handle:
        handle.write(text)
    # Report what was written (entries of out may span several lines), not the input size.
    written = text.count('\n')
    log.debug(f'Wrote {written} lines to {out_file}')

    duration_seconds = (dti.datetime.now(dti.timezone.utc) - start_time).total_seconds()

    log.info(f'transformed tables in {in_file} into {out_file}' f' in {duration_seconds} secs')
    return 0