Coverage for gelee/gelee.py: 83.33%
166 statements
coverage.py v7.4.1, created at 2024-02-04 18:20:23 +00:00

# -*- coding: utf-8 -*-
# pylint: disable=c-extension-no-member,expression-not-assigned,line-too-long,logging-fstring-interpolation
"""Do the lint."""
import configparser
import csv
import json
import logging
import pathlib
import sys
from typing import Any, Tuple, Union, no_type_check

import toml
from lxml import etree  # type: ignore
from yaml import load as load_yaml

try:
    from yaml import CLoader as LoaderYaml
except ImportError:
    from yaml import Loader as LoaderYaml  # type: ignore

ENCODING = 'utf-8'

APP = 'gelee'

LOG = logging.getLogger()  # Temporary refactoring: module level logger
LOG_FOLDER = pathlib.Path('logs')
LOG_FILE = f'{APP}.log'
LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
LOG_LEVEL = logging.INFO

FAILURE_PATH_REASON = 'Failed validation for path %s with error: %s'


@no_type_check
def init_logger(name=None, level=None):
    """Initialize module level logger"""
    global LOG  # pylint: disable=global-statement

    log_format = {
        'format': '%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s',
        'datefmt': '%Y-%m-%dT%H:%M:%S',
        # 'filename': LOG_PATH,
        'level': LOG_LEVEL if level is None else level,
    }
    logging.basicConfig(**log_format)
    LOG = logging.getLogger(APP if name is None else name)
    LOG.propagate = True


def load_xml(document_path: pathlib.Path) -> Tuple[Any, str]:
    """
    Parse the document at path (to ensure it is well-formed XML) to obtain an ElementTree object.
    The return value is an ordered pair of either None or an ElementTree object, and a message string.
    """
    try:
        doc = etree.parse(str(document_path), etree.XMLParser(encoding=ENCODING))
    except IOError as err:
        return None, f'file {document_path} failed with IO error {err}'
    except etree.XMLSyntaxError as err:
        return (
            None,
            f'parsing from {document_path} failed with XMLSyntaxError error {err}',
        )

    return doc, f'well-formed xml tree from {document_path}'
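
# Illustrative usage sketch (the file name is hypothetical, shown for orientation only):
#     tree, note = load_xml(pathlib.Path('config.xml'))
#     if tree is None:
#         print(note)  # diagnostic describing the IO or XML syntax error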


@no_type_check
def walk_tree_explicit(base_path):
    """Visit the files in the folders below base path."""
    if base_path.is_file():
        yield base_path
    else:
        for entry in base_path.iterdir():
            if entry.is_dir():
                for file_path in entry.iterdir():
                    yield file_path
            else:
                yield entry


@no_type_check
def visit(tree_or_file_path):
    """Visit tree and yield the leaves."""
    thing = pathlib.Path(tree_or_file_path)
    if thing.is_file():
        yield thing
    else:
        for path in thing.rglob('*'):
            yield path


@no_type_check
def slugify(error) -> str:
    """Remove newlines so the error message renders on a single line."""
    return str(error).replace('\n', '')
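
# Note on the 'COHDA protocol' referenced in the docstrings below: as used in this
# module it is simply the pair (valid: bool, message: str), where message is empty
# on success and a slugified error text on failure.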


def parse_csv(path: pathlib.Path) -> Tuple[bool, str]:
    """Opinionated csv as config parser returning the COHDA protocol."""
    if not path.stat().st_size:
        return False, 'ERROR: Empty CSV file'

    with open(path, newline='') as handle:
        try:
            try:
                dialect = csv.Sniffer().sniff(handle.read(1024), ',\t; ')
                handle.seek(0)
            except csv.Error as err:
                if 'could not determine delimiter' in str(err).lower():
                    dialect = csv.Dialect()  # type: ignore
                    dialect.delimiter = ','
                    dialect.quoting = csv.QUOTE_NONE
                    dialect.strict = True
                else:
                    return False, slugify(err)
            try:
                reader = csv.reader(handle, dialect)
                for _ in reader:
                    pass
                return True, ''
            except csv.Error as err:
                return False, slugify(err)
        except (Exception, csv.Error) as err:
            return False, slugify(err)


def parse_ini(path: pathlib.Path) -> Tuple[bool, str]:
    """Simple ini as config parser returning the COHDA protocol."""
    config = configparser.ConfigParser()
    try:
        config.read(path)
        return True, ''
    except (
        configparser.NoSectionError,
        configparser.DuplicateSectionError,
        configparser.DuplicateOptionError,
        configparser.NoOptionError,
        configparser.InterpolationDepthError,
        configparser.InterpolationMissingOptionError,
        configparser.InterpolationSyntaxError,
        configparser.InterpolationError,
        configparser.MissingSectionHeaderError,
        configparser.ParsingError,
    ) as err:
        return False, slugify(err)


@no_type_check
def parse_json(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
    """Simple json as config parser returning the COHDA protocol."""
    return parse_generic(path, json.load)


@no_type_check
def parse_toml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
    """Simple toml as config parser returning the COHDA protocol."""
    return parse_generic(path, toml.load)


def parse_xml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
    """Simple xml as config parser returning the COHDA protocol."""
    if not path.stat().st_size:
        return False, 'ERROR: Empty XML file'

    xml_tree, message = load_xml(path)
    if xml_tree:
        return True, ''

    return False, slugify(message)


@no_type_check
def parse_yaml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
    """Simple yaml as config parser returning the COHDA protocol."""
    return parse_generic(path, load_yaml, {'Loader': LoaderYaml})


@no_type_check
def parse_generic(path: pathlib.Path, loader, loader_options=None) -> Tuple[bool, str]:
    """Simple generic parser proxy."""
    if loader_options is None:
        loader_options = {}
    with open(path, 'rt', encoding=ENCODING) as handle:
        try:
            _ = loader(handle, **loader_options)
            return True, ''
        except Exception as err:
            return False, slugify(err)
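
# Illustrative usage sketch (the path is hypothetical); any loader that accepts an
# open text handle can be proxied the same way the json/toml/yaml parsers above are:
#     ok, message = parse_generic(pathlib.Path('example.json'), json.load)
#     # -> (True, '') for a well-formed document, (False, '<slugified error>') otherwise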


@no_type_check
def process(path, handler, success, failure):
    """Generic processing of path yields an amended COHDA protocol."""
    valid, message = handler(path)
    if valid:
        return True, message, success + 1, failure

    return False, message, success, failure + 1
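
# For example, dispatching a broken CSV file amends the (valid, message) pair from
# parse_csv into (False, message, csvs, failures + 1), which advances the counters
# that feed the summary logging in main().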


@no_type_check
def main(argv=None, abort=False, debug=None):
    """Drive the validator.
    This function acts as the command line interface backend.
    There is some duplication to support testability.
    """
    init_logger(level=logging.DEBUG if debug else None)
    forest = argv if argv else sys.argv[1:]
    if not forest:
        print('Usage: gelee paths-to-files')
        return 0, 'USAGE'

    num_trees = len(forest)
    LOG.debug('Guarded dispatch forest=%s, num_trees=%d', forest, num_trees)

    LOG.info(
        'Starting validation visiting a forest with %d tree%s',
        num_trees,
        '' if num_trees == 1 else 's',
    )
    total, folders, ignored, csvs, inis, jsons, tomls, xmls, yamls = (
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
        0,
    )
    failures = 0
    for tree in forest:
        for path in visit(tree):
            LOG.debug(' - path=%s, total=%d', path, total)
            total += 1
            if not path.is_file():
                folders += 1
                continue

            final_suffix = '' if not path.suffixes else path.suffixes[-1].lower()

            if final_suffix == '.csv':
                valid, message, csvs, failures = process(path, parse_csv, csvs, failures)
            elif final_suffix == '.ini':
                valid, message, inis, failures = process(path, parse_ini, inis, failures)
            elif final_suffix in ('.geojson', '.json'):
                valid, message, jsons, failures = process(path, parse_json, jsons, failures)
            elif final_suffix == '.toml':
                valid, message, tomls, failures = process(path, parse_toml, tomls, failures)
            elif final_suffix == '.xml':
                valid, message, xmls, failures = process(path, parse_xml, xmls, failures)
            elif final_suffix in ('.yaml', '.yml'):
                valid, message, yamls, failures = process(path, parse_yaml, yamls, failures)
            else:
                ignored += 1
                continue

            if not valid:
                LOG.error(FAILURE_PATH_REASON, path, message)
                if abort:
                    return 1, message

    success = 'Successfully validated'
    pairs = (
        (csvs, 'CSV'),
        (inis, 'INI'),
        (jsons, 'JSON'),
        (tomls, 'TOML'),
        (xmls, 'XML'),
        (yamls, 'YAML'),
    )
    for count, kind in pairs:
        if count:
            LOG.info(
                '- %s %d total %s file%s.',
                success,
                count,
                kind,
                '' if count == 1 else 's',
            )

    configs = csvs + inis + jsons + tomls + xmls + yamls
    LOG.info(  # TODO remove f-strings also here
        f"Finished validation of {configs} configuration file{'' if configs == 1 else 's'}"
        f" with {failures} failure{'' if failures == 1 else 's'}"
        f" visiting {total} path{'' if total == 1 else 's'}"
        f" (ignored {ignored} non-config file{'' if ignored == 1 else 's'}"
        f" in {folders} folder{'' if folders == 1 else 's'})"
    )
    print(f"{'OK' if not failures else 'FAIL'}")

    return 0, ''
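
# Usage sketch (the paths are hypothetical, shown for illustration only):
#     code, message = main(['configs/app.toml', 'data/'], abort=True, debug=True)
#     # returns (1, message) when abort is requested and a file fails to validate;
#     # otherwise it logs a summary, prints OK or FAIL, and returns (0, '').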