Coverage for attribuutit/vpf.py: 87.59%
105 statements
coverage.py v7.4.1, created at 2024-02-04 15:52:25 +00:00

1"""Parsers for Vector Product Format (VPF) artifacts conforming to MIL-STD-2407."""
3from itertools import takewhile
4from typing import Any, Dict, Generator, Tuple, Union, no_type_check
6EMPTY_FIELD_PLACEHOLDER = '-'
7VAR_LENGTH_INDICATOR = '*'
8VAR_LENGTH = 9876543210
11class ByteOrder:
12 """Type to mix in VPF specific byte order detection."""
14 byte_orders = ('big', 'little')
15 big_endian = 'M'
16 bo_indicators = {
17 'L': 'little',
18 big_endian: 'big',
19 }
20 bo_keys = tuple(bo_indicators.keys())
21 bo_indicator_index = 4
22 head_off_expl = bo_indicator_index + 1 # Index of first header byte with byte order info present
24 @classmethod
25 @no_type_check
26 def detect(cls, seq) -> Tuple[str, bool, int]:
27 """Detect the endianness of the artifact and return byte order, declaration method, and next offset."""
28 byte_order_explicit = True if chr(seq[cls.bo_indicator_index]) in cls.bo_keys else False
29 order_indicator = chr(seq[cls.bo_indicator_index]) if byte_order_explicit else cls.bo_keys[0]
30 byte_order = cls.byte_orders[0] if order_indicator == cls.big_endian else cls.byte_orders[1]
31 header_byte_offset = cls.head_off_expl if byte_order_explicit else cls.head_off_expl - 1
32 return byte_order, byte_order_explicit, header_byte_offset
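
# Illustration only (not part of the measured module): for a header whose fifth byte
# is the explicit indicator 'L', e.g. seq = list(b'\x18\x00\x00\x00L'), the call
# ByteOrder.detect(seq) returns ('little', True, 5); without an 'L'/'M' indicator at
# index 4 it falls back to ('little', False, 4).
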
class HeaderLength:
    """Type to mix in VPF-specific extraction of the header length in bytes."""

    header_length_range = slice(0, 4)

    @classmethod
    @no_type_check
    def extract(cls, byte_order: str, seq) -> int:
        """Extract the length of the header for the artifact as encoded in the length field matching endianness."""
        return int.from_bytes(seq[cls.header_length_range], byteorder=byte_order)  # noqa
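
# Illustration only (not part of the measured module): with a little-endian length
# field such as seq = list(b'\x18\x00\x00\x00'), HeaderLength.extract('little', seq)
# yields 24.
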
@no_type_check
class Table:
    """Type to parse a VPF table-like artifact, e.g. a Database Header Table (dht)."""

    semi_chr = ';'
    key_types = {
        'P': 'Primary key',
        'U': 'Unique key',
        'N': 'Non-unique key',
    }
    table: Dict[str, Union[int, object, str]] = {}
    ERROR = -1

    @no_type_check
    def __init__(self, label: str, byte_stream: Generator[bytes, Any, None]) -> None:
        """Parse the table data from the stream of bytes."""
        self.label = label
        self._seq = list(byte_stream)  # Eager consumption of the stream (for now)

        self.bootstrap_table()

        next_segment_start = self.parse_table_description(self.table['header_byte_offset'] + 1)
        if next_segment_start == self.ERROR:
            return

        next_segment_start = self.parse_narrative_table_name(next_segment_start)
        if next_segment_start == self.ERROR:  # coverage: partial branch (this condition was never true)
            return

        self.table['columns'] = {}
        _ = self.parse_columns(next_segment_start)  # hand over ... start of next segment or self.ERROR
        return
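
    # Note (added comment, not in the original module): __init__ walks the header
    # segments in the order the byte stream encodes them, roughly: header length
    # field, optional byte order indicator, table description terminated by ';',
    # narrative table name terminated by ';', column specs terminated by ';'.
    # Results and any error detail end up in self.table.
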
    @no_type_check
    def bootstrap_table(self) -> None:
        """Detect the byte order (endianness) of the artifact and determine its header length."""
        byte_order, byte_order_explicit, header_byte_offset = ByteOrder.detect(self._seq)
        self.table = {
            'error': False,
            'error_detail': '',
            'byte_order': byte_order,
            'byte_order_explicit': byte_order_explicit,
            'header_length': HeaderLength.extract(byte_order, self._seq),
            'header_byte_offset': header_byte_offset,
        }
    @no_type_check
    def parse_table_description(self, next_segment_start: int) -> int:
        """Parse the table description and return next segment start offset."""
        rem_seq = self._seq[next_segment_start:]
        try:
            table_description = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != self.semi_chr, rem_seq))
            self.table['table_description'] = table_description
            if not table_description:
                raise ValueError('empty field or failed delimiter detection')
        except ValueError as err:
            self.table['error'] = True
            self.table['error_detail'] = f'failing to parse table description with {err}'
            return self.ERROR

        return self.table['header_byte_offset'] + 1 + len(table_description) + 1

    @no_type_check
    def parse_narrative_table_name(self, next_segment_start: int) -> int:
        """Parse the name of the narrative table and return next segment start offset."""
        rem_seq = self._seq[next_segment_start:]
        try:
            narrative_table_name = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != self.semi_chr, rem_seq))
            self.table['narrative_table_name'] = narrative_table_name
        except ValueError as err:
            self.table['error'] = True
            self.table['error_detail'] = f'failing to parse narrative table name with {err}'
            return self.ERROR

        return next_segment_start + len(narrative_table_name) + 1
    @no_type_check
    def parse_columns(self, next_segment_start: int) -> int:
        """Parse all available column specs and return next segment start offset."""
        eq_chr = '='
        comma_chr = ','
        colon_chr = ':'
        col_rank = 0
        rem_seq = self._seq[next_segment_start:]
        while rem_seq and chr(rem_seq[0]) != self.semi_chr:  # noqa
            col_rank += 1
            column_spec = ''.join(chr(c) for c in takewhile(lambda x: chr(x) != colon_chr, rem_seq))
            try:
                col_name, spec = column_spec.split(eq_chr, 1)
            except ValueError as err:
                self.table['error'] = True
                self.table['error_detail'] = f'failing to parse column with {err}'
                return self.ERROR

            try:
                (
                    field_type,
                    field_length,
                    key_type,
                    column_textual_description,
                    optional_value_description_table_name,
                    optional_thematic_index_name,
                    optional_column_narrative_table_name,
                ) = spec.rstrip(comma_chr).split(comma_chr)
                self.table['columns'][col_name] = {
                    'column_rank': col_rank,
                    'column_name': col_name,
                    'field_type': field_type,
                    'field_length': int(field_length) if field_length != VAR_LENGTH_INDICATOR else VAR_LENGTH,
                    'key_type': key_type,
                    'column_textual_description': column_textual_description,
                    'optional_value_description_table_name': optional_value_description_table_name,
                    'optional_thematic_index_name': optional_thematic_index_name,
                    'optional_column_narrative_table_name': optional_column_narrative_table_name,
                }
            except ValueError as err:
                self.table['error'] = True
                self.table['error_detail'] = f'failing to parse {col_name} column spec with {err}'
                return self.ERROR
            that_key_type = self.table['columns'][col_name]['key_type']
            if that_key_type not in self.key_types:  # coverage: partial branch (this condition was never true)
                self.table['error'] = True
                self.table['error_detail'] = (
                    f'key type error in {col_name} column spec with unknown code {that_key_type}'
                )
                return self.ERROR

            next_segment_start += len(column_spec) + 1
            rem_seq = self._seq[next_segment_start:]

        return next_segment_start
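
For orientation, a minimal usage sketch; it is not part of the measured module, and the artifact path and the byte_stream helper below are assumptions:

from pathlib import Path

def byte_stream(path):
    """Yield the artifact's bytes one by one (helper assumed for this sketch)."""
    yield from Path(path).read_bytes()

table = Table('dht', byte_stream('some_database/dht'))  # hypothetical path to a Database Header Table
if not table.table['error']:
    print(table.table['table_description'])
    for name, spec in table.table['columns'].items():
        print(name, spec['field_type'], spec['field_length'])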