Coverage for afasi/afasi.py: 95.02%
128 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-21 13:35:37 +00:00
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-21 13:35:37 +00:00
1# -*- coding: utf-8 -*-
2# pylint: disable=expression-not-assigned,line-too-long
3"""Fuzz a language by mixing up only few words. API."""
4import difflib
5import json
6import pathlib
7import sys
8import typing
9from json.decoder import JSONDecodeError
10from typing import Iterator, Union
12import yaml
14import afasi.tabel as tb
15from afasi import ENCODING, log
17STDIN, STDOUT = 'STDIN', 'STDOUT'
18DISPATCH = {
19 STDIN: sys.stdin,
20 STDOUT: sys.stdout,
21}
24def filter_table(pairs: list[tuple[str, str]]) -> tuple[tuple[str, str], ...]:
25 """Filter same -> same and redundant repl -> ace from redundant table of pairs."""
26 table = []
27 for repl, ace in pairs:
28 s_repl, s_ace = str(repl), str(ace)
29 if s_repl != s_ace:
30 pair = (s_repl, s_ace)
31 if pair not in table:
32 table.append(pair)
34 return tuple(table)
37def load_translation_table(path: pathlib.Path) -> tuple[tuple[str, str], ...]:
38 """Load the translation table into a tuple of unique non-idempotent pairs."""
39 if not path:
40 raise ValueError('translation table path not given')
42 if not path.is_file():
43 raise ValueError('translation table path must lead to a file')
45 suffix = path.suffix.lower()
46 if suffix not in ('.json', '.yaml', '.yml'):
47 raise ValueError('translation table path must have a .json, yaml, or .yml suffix')
48 elif suffix == '.json': 48 ↛ 55line 48 didn't jump to line 55 because the condition on line 48 was always true
49 with open(path, 'r', encoding=ENCODING) as handle:
50 try:
51 table = json.load(handle)
52 except JSONDecodeError:
53 raise ValueError('translation table path must lead to a JSON file')
54 else:
55 with open(path, 'r', encoding=ENCODING) as handle:
56 try:
57 table = yaml.safe_load(handle)
58 except yaml.YAMLError:
59 raise ValueError('translation table path must lead to a YAML file')
61 if not table:
62 raise ValueError('translation table is empty')
64 if any(len(entry) != 2 for entry in table):
65 raise ValueError('translation table is not array of two element arrays')
67 return filter_table([(repl, ace) for repl, ace in table])
70def report_request(trans: tuple[tuple[str, str], ...]) -> list[str]:
71 """Generate report of request per list of lines."""
72 report = ['* translations (in order):']
73 repl_col_width = max(len(repl) for repl, _ in trans) + 1
74 for rank, (repl, ace) in enumerate(trans, start=1):
75 lim_repl = "'" if "'" not in repl else ''
76 lim_ace = "'" if "'" not in ace else ''
77 repl_cell = f'{lim_repl}{repl}{lim_repl}'.ljust(repl_col_width)
78 report.append(f' {rank:>2d}. {repl_cell} -> {lim_ace}{ace}{lim_ace}')
80 return report + ['']
83def replace(trans: tuple[tuple[str, str], ...], text: str) -> str:
84 """Naive replacer."""
85 for repl, ace in trans:
86 text = text.replace(repl, ace)
87 return text
90def reader(path: str) -> Iterator[str]:
91 """Context wrapper / generator to read the lines."""
92 with open(pathlib.Path(path), 'rt', encoding=ENCODING) as handle:
93 for line in handle:
94 yield line
97def verify_request(argv: Union[list[str], None]) -> tuple[int, str, list[str]]:
98 """Gail with grace."""
99 if not argv or len(argv) != 5:
100 return 2, 'received wrong number of arguments', ['']
102 command, inp, out, translation_table_path, dryrun = argv
104 if command not in ('translate',):
105 return 2, 'received unknown command', ['']
107 if inp:
108 if not pathlib.Path(str(inp)).is_file():
109 return 1, 'source is no file', ['']
111 if out:
112 if pathlib.Path(str(out)).is_file():
113 return 1, 'target file exists', ['']
115 return 0, '', argv
118@typing.no_type_check
119def speculative_table_loader(path: pathlib.Path):
120 """Try loading table data as pod or as object."""
121 try:
122 return True, load_translation_table(path)
123 except ValueError:
124 pass
126 try:
127 return False, tb.load_table(path)
128 except (IsADirectoryError, ValueError):
129 pass
131 log.warning('neither plain old parallel array nor object table data given')
132 return True, (tuple(),)
135def main(argv: Union[list[str], None] = None) -> int:
136 """Drive the translation."""
137 error, message, strings = verify_request(argv)
138 if error:
139 log.error(message)
140 return error
142 command, inp, out, translation_table_path, dryrun = strings
144 is_pod, meta = speculative_table_loader(pathlib.Path(translation_table_path))
145 if is_pod and not meta[0]:
146 return 1
148 source = sys.stdin if not inp else reader(inp)
149 if dryrun:
150 log.info('dryrun requested\n# ---')
151 log.info('* resources used:')
152 inp_disp = 'STDIN' if not inp else f'"{inp}"'
153 out_disp = 'STDOUT' if not out else f'"{out}"'
154 log.info(f' - input from: {inp_disp}')
155 log.info(f' - output to: {out_disp}')
156 log.info(f' - translation from: "{translation_table_path}"')
157 if is_pod:
158 log.info('\n'.join(report_request(meta)))
159 else:
160 log.info(f'* {meta}')
161 src, tgt = [], []
162 for line in source:
163 src.append(line)
164 if is_pod:
165 tgt.append(replace(meta, line))
166 else:
167 tgt.append(meta.translate(line))
168 log.info('* diff of source to target:')
169 log.info(''.join(line for line in difflib.unified_diff(src, tgt, fromfile='SOURCE', tofile='TARGET')).strip())
170 log.info('# ---')
171 else:
172 if out:
173 with open(pathlib.Path(out), 'wt', encoding=ENCODING) as target:
174 for line in source:
175 if is_pod:
176 target.write(replace(meta, line))
177 else:
178 target.write(meta.translate(line))
179 else:
180 for line in source:
181 if is_pod:
182 sys.stdout.write(replace(meta, line))
183 else:
184 sys.stdout.write(meta.translate(line))
186 return 0