Coverage for aikasilta/bridge.py: 34.46%

119 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 15:10:26 +00:00

1# -*- coding: utf-8 -*- 

2# pylint: disable=c-extension-no-member,expression-not-assigned,line-too-long,logging-fstring-interpolation 

3"""Build a bridge back from the Nineties.""" 

4import configparser 

5import csv 

6import json 

7import logging 

8import pathlib 

9import sys 

10from typing import no_type_check 

11 

12# from atlassian import Bitbucket, Confluence, Jira 

13 

14 

15ENCODING = 'utf-8' 

16 

17APP = 'aikasilta' 

18 

19LOG = logging.getLogger() # Temporary refactoring: module level logger 

20LOG_FOLDER = pathlib.Path('logs') 

21LOG_FILE = f'{APP}.log' 

22LOG_PATH = pathlib.Path(LOG_FOLDER, LOG_FILE) if LOG_FOLDER.is_dir() else pathlib.Path(LOG_FILE) 

23LOG_LEVEL = logging.INFO 

24 

25FAILURE_PATH_REASON = 'Failed validation for path %s with error: %s' 

26 

27 

28@no_type_check 

29def init_logger(name=None, level=None): 

30 """Initialize module level logger""" 

31 global LOG # pylint: disable=global-statement 

32 

33 log_format = { 

34 'format': '%(asctime)s.%(msecs)03d %(levelname)s [%(name)s]: %(message)s', 

35 'datefmt': '%Y-%m-%dT%H:%M:%S', 

36 # 'filename': LOG_PATH, 

37 'level': LOG_LEVEL if level is None else level, 

38 } 

39 logging.basicConfig(**log_format) 

40 LOG = logging.getLogger(APP if name is None else name) 

41 LOG.propagate = True 

42 

43 

44@no_type_check 

45def walk_tree_explicit(base_path): 

46 """Visit the files in the folders below base path.""" 

47 if base_path.is_file(): 

48 yield base_path 

49 else: 

50 for entry in base_path.iterdir(): 

51 if entry.is_dir(): 

52 for file_path in entry.iterdir(): 

53 yield file_path 

54 else: 

55 yield entry 

56 

57 

58@no_type_check 

59def visit(tree_or_file_path): 

60 """Visit tree and yield the leaves.""" 

61 thing = pathlib.Path(tree_or_file_path) 

62 if thing.is_file(): 

63 yield thing 

64 else: 

65 for path in thing.rglob('*'): 

66 yield path 

67 

68 

69@no_type_check 

70def slugify(error): 

71 """Replace newlines by space.""" 

72 return str(error).replace('\n', '') 

73 

74 

75@no_type_check 

76def parse_csv(path): 

77 """Opinionated csv as config parser returning the COHDA protocol.""" 

78 if not path.stat().st_size: 

79 return False, 'ERROR: Empty CSV file' 

80 

81 with open(path, newline='') as handle: 

82 try: 

83 try: 

84 dialect = csv.Sniffer().sniff(handle.read(1024), ',\t; ') 

85 handle.seek(0) 

86 except csv.Error as err: 

87 if 'could not determine delimiter' in str(err).lower(): 

88 dialect = csv.Dialect() 

89 dialect.delimiter = ',' 

90 dialect.quoting = csv.QUOTE_NONE 

91 dialect.strict = True 

92 else: 

93 return False, slugify(err) 

94 try: 

95 reader = csv.reader(handle, dialect) 

96 for _ in reader: 

97 pass 

98 return True, '' 

99 except csv.Error as err: 

100 return False, slugify(err) 

101 except (Exception, csv.Error) as err: 

102 return False, slugify(err) 

103 

104 

105@no_type_check 

106def parse_ini(path): 

107 """Simple ini as config parser returning the COHDA protocol.""" 

108 config = configparser.ConfigParser() 

109 try: 

110 config.read(path) 

111 return True, '' 

112 except ( 

113 configparser.NoSectionError, 

114 configparser.DuplicateSectionError, 

115 configparser.DuplicateOptionError, 

116 configparser.NoOptionError, 

117 configparser.InterpolationDepthError, 

118 configparser.InterpolationMissingOptionError, 

119 configparser.InterpolationSyntaxError, 

120 configparser.InterpolationError, 

121 configparser.MissingSectionHeaderError, 

122 configparser.ParsingError, 

123 ) as err: 

124 return False, slugify(err) 

125 

126 

127@no_type_check 

128def parse_json(path): 

129 """Simple json as config parser returning the COHDA protocol.""" 

130 return parse_generic(path, json.load) 

131 

132 

133@no_type_check 

134def load_xml(source): 

135 """Proxy until implemented.""" 

136 return NotImplemented 

137 

138 

139@no_type_check 

140def parse_xml(path): 

141 """Simple xml as config parser returning the COHDA protocol.""" 

142 if not path.stat().st_size: 

143 return False, 'ERROR: Empty XML file' 

144 

145 xml_tree, message = load_xml(path) 

146 if xml_tree: 

147 return True, '' 

148 

149 return False, slugify(message) 

150 

151 

152@no_type_check 

153def parse_generic(path, loader, loader_options=None): 

154 """Simple generic parser proxy.""" 

155 if loader_options is None: 

156 loader_options = {} 

157 with open(path, 'rt', encoding='utf-8') as handle: 

158 try: 

159 _ = loader(handle, **loader_options) 

160 return True, '' 

161 except Exception as err: 

162 return False, slugify(err) 

163 

164 

165@no_type_check 

166def process(path, handler, success, failure): 

167 """Generic processing of path yields a,ended COHDA protocol.""" 

168 valid, message = handler(path) 

169 if valid: 

170 return True, message, success + 1, failure 

171 

172 return False, message, success, failure + 1 

173 

174 

175def main(argv: dict[str, None] | None = None, abort: bool | None = None, debug: bool | None = None) -> tuple[int, str]: 

176 """Drive across the bridge. 

177 This function acts as the command line interface backend. 

178 There is some duplication to support testability. 

179 """ 

180 init_logger(level=logging.DEBUG if debug else None) 

181 forest = argv if argv else sys.argv[1:] 

182 if not forest: 

183 print('Usage: time-bridge paths-to-files') 

184 return 0, 'USAGE' 

185 num_trees = len(forest) 

186 LOG.debug('Guarded dispatch forest=%s, num_trees=%d', forest, num_trees) 

187 

188 LOG.info( 

189 'Starting validation visiting a forest with %d tree%s', 

190 num_trees, 

191 '' if num_trees == 1 else 's', 

192 ) 

193 for tree in forest: 

194 for path in visit(tree): 

195 LOG.debug(' - path=%s', path) 

196 failure = False 

197 print(f"{'OK' if not failure else 'FAIL'}") 

198 

199 return 0, ''