Coverage for gelee/gelee.py: 83.33%

166 statements  

coverage.py v7.4.1, created at 2024-02-04 18:20:23 +00:00

1  # -*- coding: utf-8 -*-
2  # pylint: disable=c-extension-no-member,expression-not-assigned,line-too-long,logging-fstring-interpolation
3  """Do the lint."""
4  import configparser
5  import csv
6  import json
7  import logging
8  import pathlib
9  import sys
10  from typing import Any, Tuple, Union, no_type_check
11
12  import toml
13  from lxml import etree  # type: ignore
14  from yaml import load as load_yaml
15
16  try:
17      from yaml import CLoader as LoaderYaml
18  except ImportError:
19      from yaml import Loader as LoaderYaml  # type: ignore
20
21  ENCODING = 'utf-8'
22
23  APP = 'gelee'
24
25  LOG = logging.getLogger()  # Temporary refactoring: module level logger
26  LOG_FOLDER = pathlib.Path('logs')
27  LOG_FILE = f'{APP}.log'
28  LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE)
29  LOG_LEVEL = logging.INFO
30
31  FAILURE_PATH_REASON = 'Failed validation for path %s with error: %s'
32
33
34  @no_type_check
35  def init_logger(name=None, level=None):
36      """Initialize module level logger"""
37      global LOG  # pylint: disable=global-statement
38
39      log_format = {
40          'format': '%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s',
41          'datefmt': '%Y-%m-%dT%H:%M:%S',
42          # 'filename': LOG_PATH,
43          'level': LOG_LEVEL if level is None else level,
44      }
45      logging.basicConfig(**log_format)
46      LOG = logging.getLogger(APP if name is None else name)
47      LOG.propagate = True
48
49
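A minimal usage sketch (illustrative, not part of the measured module): init_logger configures the root logging format and rebinds the module-level LOG, and the rest of the module then logs through LOG.

    # hypothetical demo call, not part of gelee/gelee.py
    init_logger(name='gelee-demo', level=logging.DEBUG)
    LOG.debug('logger is ready')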

50  def load_xml(document_path: pathlib.Path) -> Tuple[None, str]:
51      """
52      Parse the document at path (to ensure it is well-formed XML) to obtain an ElementTree object.
53      Return value is an ordered pair of Union(None, ElementTree object) and a message string
54      """
55      try:
56          doc = etree.parse(str(document_path), etree.XMLParser(encoding=ENCODING))
57      except IOError as err:
58          return None, f'file {document_path} failed with IO error {err}'
59      except etree.XMLSyntaxError as err:
60          return (
61              None,
62              f'parsing from {document_path} failed with XMLSyntaxError error {err}',
63          )
64
65      return doc, f'well-formed xml tree from {document_path}'
66
67
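Illustrative sketch (not part of the measured module): the (tree, message) pair returned by load_xml is meant to be unpacked and branched on, as parse_xml below does.

    # 'feed.xml' is a hypothetical file name
    tree, message = load_xml(pathlib.Path('feed.xml'))
    if tree is None:
        print(f'not well-formed: {message}')
    else:
        print(message)  # e.g. 'well-formed xml tree from feed.xml'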

68  @no_type_check
69  def walk_tree_explicit(base_path):
70      """Visit the files in the folders below base path."""
71      if base_path.is_file():
72          yield base_path
73      else:
74          for entry in base_path.iterdir():
75              if entry.is_dir():
76                  for file_path in entry.iterdir():
77                      yield file_path
78              else:
79                  yield entry
80
81
82  @no_type_check
83  def visit(tree_or_file_path):
84      """Visit tree and yield the leaves."""
85      thing = pathlib.Path(tree_or_file_path)
86      if thing.is_file():
87          yield thing
88      else:
89          for path in thing.rglob('*'):
90              yield path
91
92
93  @no_type_check
94  def slugify(error) -> str:
95      """Remove newlines so the error message fits on one line."""
96      return str(error).replace('\n', '')
97
98
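Illustrative sketch (not part of the measured module): visit yields a file path as-is and otherwise every path below a folder, which is how main() turns each command line argument into candidate files.

    # 'configs' is a hypothetical folder
    for leaf in visit('configs'):
        if leaf.is_file():
            print(leaf)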

99  def parse_csv(path: pathlib.Path) -> Tuple[bool, str]:
100      """Opinionated csv as config parser returning the COHDA protocol."""
101      if not path.stat().st_size:
102          return False, 'ERROR: Empty CSV file'
103
104      with open(path, newline='') as handle:
105          try:
106              try:
107                  dialect = csv.Sniffer().sniff(handle.read(1024), ',\t; ')
108                  handle.seek(0)
109              except csv.Error as err:
110                  if 'could not determine delimiter' in str(err).lower():
111                      dialect = csv.Dialect()  # type: ignore
112                      dialect.delimiter = ','
113                      dialect.quoting = csv.QUOTE_NONE
114                      dialect.strict = True
115                  else:
116                      return False, slugify(err)
117              try:
118                  reader = csv.reader(handle, dialect)
119                  for _ in reader:
120                      pass
121                  return True, ''
122              except csv.Error as err:
123                  return False, slugify(err)
124          except (Exception, csv.Error) as err:
125              return False, slugify(err)
126
127
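Illustrative sketch (not part of the measured module): every parse_* function answers with the COHDA protocol pair (ok, message), True with an empty message on success and False with a slugified error otherwise.

    # 'table.csv' is a hypothetical file
    ok, message = parse_csv(pathlib.Path('table.csv'))
    if not ok:
        print(f'CSV failed: {message}')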

128  def parse_ini(path: pathlib.Path) -> Tuple[bool, str]:
129      """Simple ini as config parser returning the COHDA protocol."""
130      config = configparser.ConfigParser()
131      try:
132          config.read(path)
133          return True, ''
134      except (
135          configparser.NoSectionError,
136          configparser.DuplicateSectionError,
137          configparser.DuplicateOptionError,
138          configparser.NoOptionError,
139          configparser.InterpolationDepthError,
140          configparser.InterpolationMissingOptionError,
141          configparser.InterpolationSyntaxError,
142          configparser.InterpolationError,
143          configparser.MissingSectionHeaderError,
144          configparser.ParsingError,
145      ) as err:
146          return False, slugify(err)
147
148
149  @no_type_check
150  def parse_json(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
151      """Simple json as config parser returning the COHDA protocol."""
152      return parse_generic(path, json.load)
153
154
155  @no_type_check
156  def parse_toml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
157      """Simple toml as config parser returning the COHDA protocol."""
158      return parse_generic(path, toml.load)
159
160
161  def parse_xml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
162      """Simple xml as config parser returning the COHDA protocol."""
163      if not path.stat().st_size:
164          return False, 'ERROR: Empty XML file'
165
166      xml_tree, message = load_xml(path)
167      if xml_tree:    [167 ↛ 170] line 167 didn't jump to line 170, because the condition on line 167 was never false
168          return True, ''
169
170      return False, slugify(message)
171
172
173  @no_type_check
174  def parse_yaml(path: pathlib.Path) -> Union[Any, Tuple[bool, str]]:
175      """Simple yaml as config parser returning the COHDA protocol."""
176      return parse_generic(path, load_yaml, {'Loader': LoaderYaml})
177
178
179  @no_type_check
180  def parse_generic(path: pathlib.Path, loader, loader_options=None) -> Tuple[bool, str]:
181      """Simple generic parser proxy."""
182      if loader_options is None:
183          loader_options = {}
184      with open(path, 'rt', encoding=ENCODING) as handle:
185          try:
186              _ = loader(handle, **loader_options)
187              return True, ''
188          except Exception as err:
189              return False, slugify(err)
190
191
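Illustrative sketch (not part of the measured module): the format-specific parsers are thin wrappers, since parse_generic works with any loader callable that accepts an open text handle, plus optional keyword options for that loader.

    # hypothetical paths; the loaders shown are the ones already imported above
    ok, message = parse_generic(pathlib.Path('settings.json'), json.load)
    ok, message = parse_generic(pathlib.Path('pipeline.yml'), load_yaml, {'Loader': LoaderYaml})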

192  @no_type_check
193  def process(path, handler, success, failure):
194      """Generic processing of path yields amended COHDA protocol."""
195      valid, message = handler(path)
196      if valid:
197          return True, message, success + 1, failure
198
199      return False, message, success, failure + 1
200
201
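Illustrative sketch (not part of the measured module): process widens the COHDA pair with the updated success and failure counters, which is how main() keeps its per-format tallies.

    # 'table.csv' is a hypothetical path; counters start at zero as in main()
    csvs, failures = 0, 0
    valid, message, csvs, failures = process(pathlib.Path('table.csv'), parse_csv, csvs, failures)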

202  @no_type_check
203  def main(argv=None, abort=False, debug=None):
204      """Drive the validator.
205      This function acts as the command line interface backend.
206      There is some duplication to support testability.
207      """
208      init_logger(level=logging.DEBUG if debug else None)
209      forest = argv if argv else sys.argv[1:]
210      if not forest:    [210 ↛ 211] line 210 didn't jump to line 211, because the condition on line 210 was never true
211          print('Usage: gelee paths-to-files')
212          return 0, 'USAGE'
213      num_trees = len(forest)
214      LOG.debug('Guarded dispatch forest=%s, num_trees=%d', forest, num_trees)
215
216      LOG.info(
217          'Starting validation visiting a forest with %d tree%s',
218          num_trees,
219          '' if num_trees == 1 else 's',
220      )
221      total, folders, ignored, csvs, inis, jsons, tomls, xmls, yamls = (
222          0,
223          0,
224          0,
225          0,
226          0,
227          0,
228          0,
229          0,
230          0,
231      )
232      failures = 0
233      for tree in forest:
234          for path in visit(tree):
235              LOG.debug(' - path=%s, total=%d', path, total)
236              total += 1
237              if not path.is_file():
238                  folders += 1
239                  continue
240
241              final_suffix = '' if not path.suffixes else path.suffixes[-1].lower()
242
243              if final_suffix == '.csv':
244                  valid, message, csvs, failures = process(path, parse_csv, csvs, failures)
245              elif final_suffix == '.ini':
246                  valid, message, inis, failures = process(path, parse_ini, inis, failures)
247              elif final_suffix in ('.geojson', '.json'):
248                  valid, message, jsons, failures = process(path, parse_json, jsons, failures)
249              elif final_suffix == '.toml':
250                  valid, message, tomls, failures = process(path, parse_toml, tomls, failures)
251              elif final_suffix == '.xml':
252                  valid, message, xmls, failures = process(path, parse_xml, xmls, failures)
253              elif final_suffix in ('.yaml', '.yml'):
254                  valid, message, yamls, failures = process(path, parse_yaml, yamls, failures)
255              else:
256                  ignored += 1
257                  continue
258
259              if not valid:
260                  LOG.error(FAILURE_PATH_REASON, path, message)
261                  if abort:
262                      return 1, message
263
264      success = 'Successfully validated'
265      pairs = (
266          (csvs, 'CSV'),
267          (inis, 'INI'),
268          (jsons, 'JSON'),
269          (tomls, 'TOML'),
270          (xmls, 'XML'),
271          (yamls, 'YAML'),
272      )
273      for count, kind in pairs:
274          if count:
275              LOG.info(
276                  '- %s %d total %s file%s.',
277                  success,
278                  count,
279                  kind,
280                  '' if count == 1 else 's',
281              )
282
283      configs = csvs + inis + jsons + tomls + xmls + yamls
284      LOG.info(  # TODO remove f-strings also here
285          f"Finished validation of {configs} configuration file{'' if configs == 1 else 's'}"
286          f" with {failures} failure{'' if failures == 1 else 's'}"
287          f" visiting {total} path{'' if total == 1 else 's'}"
288          f" (ignored {ignored} non-config file{'' if ignored == 1 else 's'}"
289          f" in {folders} folder{'' if folders == 1 else 's'})"
290      )
291      print(f"{'OK' if not failures else 'FAIL'}")
292
293      return 0, ''
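Illustrative sketch (not part of the measured module): main() can also be driven programmatically and returns a (code, message) pair; with no paths it prints the usage hint and returns (0, 'USAGE').

    # hypothetical arguments mirroring a command line such as: gelee configs/ service.toml
    code, message = main(['configs/', 'service.toml'], abort=False, debug=True)
    print(code, message)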