Coverage for aikasilta/bridge.py: 34.46%
119 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 15:10:26 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 15:10:26 +00:00
1# -*- coding: utf-8 -*-
2# pylint: disable=c-extension-no-member,expression-not-assigned,line-too-long,logging-fstring-interpolation
3"""Build a bridge back from the Nineties."""
4import configparser
5import csv
6import json
7import logging
8import pathlib
9import sys
10from typing import no_type_check
12# from atlassian import Bitbucket, Confluence, Jira
15ENCODING = 'utf-8'
17APP = 'aikasilta'
19LOG = logging.getLogger() # Temporary refactoring: module level logger
20LOG_FOLDER = pathlib.Path('logs')
21LOG_FILE = f'{APP}.log'
22LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
23LOG_LEVEL = logging.INFO
25FAILURE_PATH_REASON = 'Failed validation for path %s with error: %s'
28@no_type_check
29def init_logger(name=None, level=None):
30 """Initialize module level logger"""
31 global LOG # pylint: disable=global-statement
33 log_format = {
34 'format': '%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s',
35 'datefmt': '%Y-%m-%dT%H:%M:%S',
36 # 'filename': LOG_PATH,
37 'level': LOG_LEVEL if level is None else level,
38 }
39 logging.basicConfig(**log_format)
40 LOG = logging.getLogger(APP if name is None else name)
41 LOG.propagate = True
44@no_type_check
45def walk_tree_explicit(base_path):
46 """Visit the files in the folders below base path."""
47 if base_path.is_file():
48 yield base_path
49 else:
50 for entry in base_path.iterdir():
51 if entry.is_dir():
52 for file_path in entry.iterdir():
53 yield file_path
54 else:
55 yield entry
58@no_type_check
59def visit(tree_or_file_path):
60 """Visit tree and yield the leaves."""
61 thing = pathlib.Path(tree_or_file_path)
62 if thing.is_file():
63 yield thing
64 else:
65 for path in thing.rglob('*'):
66 yield path
69@no_type_check
70def slugify(error):
71 """Replace newlines by space."""
72 return str(error).replace('\n', '')
75@no_type_check
76def parse_csv(path):
77 """Opinionated csv as config parser returning the COHDA protocol."""
78 if not path.stat().st_size:
79 return False, 'ERROR: Empty CSV file'
81 with open(path, newline='') as handle:
82 try:
83 try:
84 dialect = csv.Sniffer().sniff(handle.read(1024), ',\t; ')
85 handle.seek(0)
86 except csv.Error as err:
87 if 'could not determine delimiter' in str(err).lower():
88 dialect = csv.Dialect()
89 dialect.delimiter = ','
90 dialect.quoting = csv.QUOTE_NONE
91 dialect.strict = True
92 else:
93 return False, slugify(err)
94 try:
95 reader = csv.reader(handle, dialect)
96 for _ in reader:
97 pass
98 return True, ''
99 except csv.Error as err:
100 return False, slugify(err)
101 except (Exception, csv.Error) as err:
102 return False, slugify(err)
105@no_type_check
106def parse_ini(path):
107 """Simple ini as config parser returning the COHDA protocol."""
108 config = configparser.ConfigParser()
109 try:
110 config.read(path)
111 return True, ''
112 except (
113 configparser.NoSectionError,
114 configparser.DuplicateSectionError,
115 configparser.DuplicateOptionError,
116 configparser.NoOptionError,
117 configparser.InterpolationDepthError,
118 configparser.InterpolationMissingOptionError,
119 configparser.InterpolationSyntaxError,
120 configparser.InterpolationError,
121 configparser.MissingSectionHeaderError,
122 configparser.ParsingError,
123 ) as err:
124 return False, slugify(err)
127@no_type_check
128def parse_json(path):
129 """Simple json as config parser returning the COHDA protocol."""
130 return parse_generic(path, json.load)
133@no_type_check
134def load_xml(source):
135 """Proxy until implemented."""
136 return NotImplemented
139@no_type_check
140def parse_xml(path):
141 """Simple xml as config parser returning the COHDA protocol."""
142 if not path.stat().st_size:
143 return False, 'ERROR: Empty XML file'
145 xml_tree, message = load_xml(path)
146 if xml_tree:
147 return True, ''
149 return False, slugify(message)
152@no_type_check
153def parse_generic(path, loader, loader_options=None):
154 """Simple generic parser proxy."""
155 if loader_options is None:
156 loader_options = {}
157 with open(path, 'rt', encoding='utf-8') as handle:
158 try:
159 _ = loader(handle, **loader_options)
160 return True, ''
161 except Exception as err:
162 return False, slugify(err)
165@no_type_check
166def process(path, handler, success, failure):
167 """Generic processing of path yields a,ended COHDA protocol."""
168 valid, message = handler(path)
169 if valid:
170 return True, message, success + 1, failure
172 return False, message, success, failure + 1
175def main(argv: dict[str, None] | None = None, abort: bool | None = None, debug: bool | None = None) -> tuple[int, str]:
176 """Drive across the bridge.
177 This function acts as the command line interface backend.
178 There is some duplication to support testability.
179 """
180 init_logger(level=logging.DEBUG if debug else None)
181 forest = argv if argv else sys.argv[1:]
182 if not forest:
183 print('Usage: time-bridge paths-to-files')
184 return 0, 'USAGE'
185 num_trees = len(forest)
186 LOG.debug('Guarded dispatch forest=%s, num_trees=%d', forest, num_trees)
188 LOG.info(
189 'Starting validation visiting a forest with %d tree%s',
190 num_trees,
191 '' if num_trees == 1 else 's',
192 )
193 for tree in forest:
194 for path in visit(tree):
195 LOG.debug(' - path=%s', path)
196 failure = False
197 print(f"{'OK' if not failure else 'FAIL'}")
199 return 0, ''