Coverage for attribuutit/vpf.py: 87.59%

105 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 15:52:25 +00:00

1"""Parsers for Vector Product Format (VPF) artifacts conforming to MIL-STD-2407.""" 

2 

3from itertools import takewhile 

4from typing import Any, Dict, Generator, Tuple, Union, no_type_check 

5 

# Presumably the '-' dash marks an empty/not-applicable VPF field; not referenced in this chunk — TODO confirm against callers.
EMPTY_FIELD_PLACEHOLDER = '-'
# A column field length of '*' declares a variable-length field (see Table.parse_columns).
VAR_LENGTH_INDICATOR = '*'
# Sentinel numeric length substituted for '*' so 'field_length' is always an int.
VAR_LENGTH = 9876543210

9 

10 

class ByteOrder:
    """Type to mix in VPF specific byte order detection."""

    byte_orders = ('big', 'little')
    big_endian = 'M'
    bo_indicators = {
        'L': 'little',
        big_endian: 'big',
    }
    bo_keys = tuple(bo_indicators.keys())
    bo_indicator_index = 4
    head_off_expl = bo_indicator_index + 1  # Index of first header byte with byte order info present

    @classmethod
    @no_type_check
    def detect(cls, seq) -> Tuple[str, bool, int]:
        """Detect the endianness of the artifact and return byte order, declaration method, and next offset.

        The byte at ``bo_indicator_index`` is 'L' or 'M' when the byte order is
        declared explicitly; any other byte means the declaration is implicit
        and the default ('L', little endian) applies, shifting the header data
        start one byte to the left.

        Returns a triple of (byte order name, whether it was explicit, offset
        of the first header byte after the byte order information).
        """
        # Hoist the indicator lookup - the original evaluated chr(seq[...]) twice.
        indicator_char = chr(seq[cls.bo_indicator_index])
        byte_order_explicit = indicator_char in cls.bo_keys
        order_indicator = indicator_char if byte_order_explicit else cls.bo_keys[0]
        # Map the indicator through bo_indicators instead of re-deriving the name.
        byte_order = cls.bo_indicators[order_indicator]
        header_byte_offset = cls.head_off_expl if byte_order_explicit else cls.head_off_expl - 1
        return byte_order, byte_order_explicit, header_byte_offset

33 

34 

class HeaderLength:
    """Type to mix in VPF specific extraction of header length in bytes."""

    # The header length occupies the first four bytes of the artifact.
    header_length_range = slice(0, 4)

    @classmethod
    @no_type_check
    def extract(cls, byte_order: str, seq) -> int:
        """Extract the length of the header for the artifact as encoded in the length field matching endianness."""
        length_field = seq[cls.header_length_range]
        return int.from_bytes(length_field, byteorder=byte_order)  # noqa

45 

46 

@no_type_check
class Table:
    """Type to parse a VPF table like artifact - eg. a Database Header Table (dht)."""

    semi_chr = ';'  # Segment delimiter inside the VPF table header
    key_types = {
        'P': 'Primary key',
        'U': 'Unique key',
        'N': 'Non-unique key',
    }
    # NOTE(review): class-level mutable attribute; harmless in practice because
    # bootstrap_table() rebinds self.table per instance, but do not mutate it
    # at class scope.
    table: Dict[str, Union[int, object, str]] = {}
    ERROR = -1  # Sentinel returned by the parse_* methods on failure

    @no_type_check
    def __init__(self, label: str, byte_stream: Generator[bytes, Any, None]) -> None:
        """Parse the table data from the stream of bytes.

        On any parse failure the partial result remains in self.table with
        table['error'] set True and table['error_detail'] describing the cause;
        construction itself never raises for malformed segments.
        """
        self.label = label
        self._seq = list(byte_stream)  # Eager consumption of the stream (for now)

        self.bootstrap_table()

        # Header layout after the byte order info: table description, then
        # narrative table name, then the column specs - each ';' terminated.
        next_segment_start = self.parse_table_description(self.table['header_byte_offset'] + 1)
        if next_segment_start == self.ERROR:
            return

        next_segment_start = self.parse_narrative_table_name(next_segment_start)
        if next_segment_start == self.ERROR:
            return

        self.table['columns'] = {}
        _ = self.parse_columns(next_segment_start)  # hand over ... the start of next segment or self.ERROR
        return

    @no_type_check
    def bootstrap_table(self) -> None:
        """Detect byte order (endianness) from and determine header length of artifact."""
        byte_order, byte_order_explicit, header_byte_offset = ByteOrder.detect(self._seq)
        self.table = {
            'error': False,
            'error_detail': '',
            'byte_order': byte_order,
            'byte_order_explicit': byte_order_explicit,
            'header_length': HeaderLength.extract(byte_order, self._seq),
            'header_byte_offset': header_byte_offset,
        }

    @no_type_check
    def parse_table_description(self, next_segment_start: int) -> int:
        """Parse the name of the table description and return next segment start offset.

        Returns self.ERROR (and records details in self.table) when the field
        is empty, which also covers a missing ';' delimiter.
        """
        rem_seq = self._seq[next_segment_start:]
        try:
            table_description = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != self.semi_chr, rem_seq))
            self.table['table_description'] = table_description
            if not table_description:
                raise ValueError('empty field or failed delimiter detection')
        except ValueError as err:
            self.table['error'] = True
            self.table['error_detail'] = f'failing to parse table description with {err}'
            return self.ERROR

        # Fix: advance relative to the given start offset. The original used
        # self.table['header_byte_offset'] + 1, which only matched the one
        # existing call site and would be wrong for any other start offset.
        return next_segment_start + len(table_description) + 1

    @no_type_check
    def parse_narrative_table_name(self, next_segment_start: int) -> int:
        """Parse the name of the narrative table and return next segment start offset.

        An empty name is accepted here (unlike the table description).
        """
        rem_seq = self._seq[next_segment_start:]
        try:
            narrative_table_name = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != self.semi_chr, rem_seq))
            self.table['narrative_table_name'] = narrative_table_name
        except ValueError as err:
            self.table['error'] = True
            self.table['error_detail'] = f'failing to parse narrative table name with {err}'
            return self.ERROR

        return next_segment_start + len(narrative_table_name) + 1

    @no_type_check
    def parse_columns(self, next_segment_start: int) -> int:
        """Parse all available column specs and return next segment start offset.

        Each column spec has the shape ``name=type,length,key,desc,vdt,tix,ntn:``;
        the whole column segment ends at a ';'. Results are stored per column
        name under self.table['columns']; self.ERROR is returned on the first
        malformed spec or unknown key type code.
        """
        eq_chr = '='
        comma_chr = ','
        colon_chr = ':'
        col_rank = 0
        rem_seq = self._seq[next_segment_start:]
        while rem_seq and chr(rem_seq[0]) != self.semi_chr:  # noqa
            col_rank += 1
            column_spec = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != colon_chr, rem_seq))
            try:
                col_name, spec = column_spec.split(eq_chr, 1)
            except ValueError as err:
                self.table['error'] = True
                self.table['error_detail'] = f'failing to parse column with {err}'
                return self.ERROR

            try:
                # Exactly seven comma separated fields expected (trailing comma tolerated).
                (
                    field_type,
                    field_length,
                    key_type,
                    column_textual_description,
                    optional_value_description_table_name,
                    optional_thematic_index_name,
                    optional_column_narrative_table_name,
                ) = spec.rstrip(comma_chr).split(comma_chr)
                self.table['columns'][col_name] = {
                    'column_rank': col_rank,
                    'column_name': col_name,
                    'field_type': field_type,
                    'field_length': int(field_length) if field_length != VAR_LENGTH_INDICATOR else VAR_LENGTH,
                    'key_type': key_type,
                    'column_textual_description': column_textual_description,
                    'optional_value_description_table_name': optional_value_description_table_name,
                    'optional_thematic_index_name': optional_thematic_index_name,
                    'optional_column_narrative_table_name': optional_column_narrative_table_name,
                }
            except ValueError as err:
                self.table['error'] = True
                self.table['error_detail'] = f'failing to parse {col_name} column spec with {err}'
                return self.ERROR
            that_key_type = self.table['columns'][col_name]['key_type']
            if that_key_type not in self.key_types:
                self.table['error'] = True
                self.table['error_detail'] = (
                    f'key type error in {col_name} column spec with unknown code {that_key_type}'
                )
                return self.ERROR

            # Skip past this spec and its ':' terminator.
            next_segment_start += len(column_spec) + 1
            rem_seq = self._seq[next_segment_start:]

        return next_segment_start