Coverage for roter/api.py: 8.18%

95 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-06-24 19:21:34 +00:00

1"""General API for rotate and combine tables (Danish: Roter og kombiner borde).""" 

2 

3import argparse 

4import datetime as dti 

5import os 

6import pathlib 

7import sys 

8from typing import Generator, Iterable, Union, no_type_check 

9 

10from roter import ( 

11 APP_ALIAS, 

12 COMMA, 

13 ENCODING, 

14 TS_FORMAT, 

15 VERSION_INFO, 

16 log, 

17) 

18 

19PathLike = Union[str, pathlib.Path] 

20 

21ENCODING_ERRORS_POLICY = 'ignore' 

22NL = '\n' 

23 

24 

25def load_markdown_line_stream(resource: PathLike, encoding: str = ENCODING) -> Generator[str, None, None]: 

26 """Load the markdown resource to harvest from.""" 

27 with open(resource, 'rt', encoding=encoding) as handle: 

28 return (line.strip() for line in handle.readlines()) 

29 

30 

31def dump_markdown(lines: Iterable[str], resource: PathLike, encoding: str = ENCODING) -> None: 

32 """Dump the markdown lines into a file.""" 

33 with open(resource, 'wt', encoding=encoding) as handle: 

34 handle.write(NL.join(lines)) 

35 

36 

37def main(options: argparse.Namespace) -> int: 

38 """Visit the folder tree below root and yield the taxonomy.""" 

39 log.info(f'Turning and combining tables') 

40 soft = {} 

41 for path in options.paths: 

42 log.info(f'parsing tables in {path}') 

43 if not path.is_file(): 

44 log.warning(f'ignoring non-existing file ({path})') 

45 

46 for slot, line in enumerate(load_markdown_line_stream(path), start=1): 

47 try: 

48 cells = line.strip('|').strip().split('|') 

49 key_text = cells[options.child_column_pos - 1].strip() 

50 parent_set_text = cells[options.parents_column_pos - 1].strip() 

51 key = key_text.strip() 

52 if options.child.lower() in key.lower() or '---' in key: 

53 continue 

54 parent_set = sorted(set(entry.strip() for entry in parent_set_text.strip().split('<br>'))) 

55 if key in soft: 

56 log.debug(f'key {key} is a duplicate on line {slot} (with parents {parent_set})') 

57 soft[key] = parent_set 

58 except (IndexError, ValueError): 

59 pass 

60 

61 parents = sorted(set(parent for k, v in soft.items() for parent in v)) 

62 

63 upstream = {} 

64 for parent in parents: 

65 upstream[parent] = [] 

66 for downstream, upstreams in soft.items(): 

67 if parent in upstreams: 

68 upstream[parent].append(downstream) 

69 upstream[parent].sort() 

70 

71 with open(options.out_path, 'rt', encoding=ENCODING) as handle: 

72 lines = [line.strip() for line in handle.readlines()] 

73 

74 inject_regions = { 

75 'combined': { 

76 'begin': -1, 

77 'end': -1, 

78 }, 

79 'inverted': { 

80 'begin': -1, 

81 'end': -1, 

82 } 

83 } 

84 for slot, line in enumerate(lines): 

85 if line.startswith(options.markers_combined[0]): 

86 inject_regions['combined']['begin'] = slot 

87 log.debug(f'found line begin offset inject region for combined at line {slot + 1}') 

88 if line.startswith(options.markers_combined[1]): 

89 inject_regions['combined']['end'] = slot 

90 log.debug(f'found line end offset inject region for combined at line {slot + 1}') 

91 if line.startswith(options.markers_inverted[0]): 

92 inject_regions['inverted']['begin'] = slot 

93 log.debug(f'found line begin offset inject region for inverted at line {slot + 1}') 

94 if line.startswith(options.markers_inverted[1]): 

95 inject_regions['inverted']['end'] = slot 

96 log.debug(f'found line end offset inject region for inverted at line {slot + 1}') 

97 

98 if options.concat_only or not options.invert_only: 

99 if inject_regions['combined']['begin'] == -1 or inject_regions['combined']['begin'] >= inject_regions['combined']['end']: 

100 log.error(f'combined region is invalid ({inject_regions["combined"]})') 

101 return 1 

102 

103 if options.invert_only or not options.concat_only: 

104 if inject_regions['inverted']['begin'] == -1 or inject_regions['inverted']['begin'] >= inject_regions['inverted']['end']: 

105 log.error(f'inverted region is invalid ({inject_regions["inverted"]})') 

106 return 1 

107 

108 out_lines_combined = [ 

109 f'| {options.child} | {options.parents} |', 

110 '|:-------|:--------------------|', 

111 ] 

112 for child in sorted(soft): 

113 parents = soft[child] 

114 out_lines_combined.append(f'| {child} | {" <br>".join(parents)} |') 

115 

116 log.info('## Combined') 

117 for line in out_lines_combined: 

118 log.info(line) 

119 

120 if options.concat_only or not options.invert_only: 

121 lines[inject_regions['combined']['begin']] += NL + NL.join(out_lines_combined) 

122 

123 out_lines_inverted = [ 

124 f'| {options.parent} | {options.children} |', 

125 '|:-------|:--------------------|', 

126 ] 

127 for parent, children in upstream.items(): 

128 out_lines_inverted.append(f'| {parent} | {" <br>".join(children)} |') 

129 

130 log.info('## Inverted') 

131 for line in out_lines_inverted: 

132 log.info(line) 

133 

134 if options.invert_only or not options.concat_only: 

135 lines[inject_regions['inverted']['begin']] += NL + NL.join(out_lines_inverted) 

136 

137 for slot in range(inject_regions['inverted']['begin'] + 1, inject_regions['inverted']['end']-3): 

138 del lines[slot] 

139 

140 lines.append('') 

141 

142 if options.concat_only or not options.invert_only: 

143 for slot in range(inject_regions['combined']['begin'] + 1, inject_regions['combined']['end']-2): 

144 del lines[slot] 

145 

146 dump_markdown(lines, options.out_path) 

147 

148 

149 log.info('Done.') 

150 

151 return 0