Coverage for omforme/ 85.42%
32 statements
« prev ^ index » next v7.4.1, created at 2024-02-04 21:04:04 +00:00
« prev ^ index » next v7.4.1, created at 2024-02-04 21:04:04 +00:00
1"""Provide the class Omforme for transformations on a generator.
3Use case example:
5from omforme.omforme import Omforme
7Given a stream of lines e.g. in gen
9gen = (x for x in ('a', 'b', 'c', 'c', 'c', 'd', '1', '2', '3', 'e'))
11with defining a playbook of:
13playbook = (('b', 'ignore', lambda: None), ('d', 'collect', list()), ('e', 'return', lambda: None),)
15when calling
17transform = Omforme(playbook)(gen)
19then transform should become:
22 ('b', 'ignore', <function <lambda> at 0x...>),
23 ('d', 'collect', ['1', '2', '3']),
24 ('e', 'return', <function <lambda> at 0x...>)
29import argparse
30from typing import no_type_check
32from omforme import DEBUG, ENCODING, ENCODING_ERRORS_POLICY, log
35class Omforme:
36 """Provide a generator borrowing consumer that returns the transform applying playbook to the stream.
38 The playbook is a tuple of (trigger, action, function) triplets.
40 Actions are epected to be within ('ignore', 'collect', 'return')
42 Example:
44 >>> # from omforme.omforme import Omforme
45 >>> gen = (x for x in ('a', 'b', 'c', 'c', 'c', 'd', '1', '2', '3', 'e'))
46 >>> playbook = (('b', 'ignore', lambda: None), ('d', 'collect', list()), ('e', 'return', lambda: None),)
47 >>> transform = Omforme(playbook)(gen) # doctest: +ELLIPSIS
48 0 3 a b ignore <function <lambda> at 0x...> d
49 0 3 b b ignore <function <lambda> at 0x...> d
50 0 3 c b ignore <function <lambda> at 0x...> d
51 0 3 c b ignore <function <lambda> at 0x...> d
52 0 3 c b ignore <function <lambda> at 0x...> d
53 1 3 1 d collect ['1'] e
54 1 3 2 d collect ['1', '2'] e
55 1 3 3 d collect ['1', '2', '3'] e
56 1 3 e d collect ['1', '2', '3'] e
57 >>> transform[1]
58 ('d', 'collect', ['1', '2', '3'])
59 >>>
61 """
63 @no_type_check
64 def __init__(self, playbook):
65 """Later alligator."""
66 self.playbook = playbook
68 @no_type_check
69 def __call__(self, gen):
70 """Apply transform to generator stream of events."""
71 phase = 0
72 trigger, what, where_to = self.playbook[phase]
73 peek = self.playbook[phase + 1][0]
74 stop = len(self.playbook)
75 for data in gen:
76 if data == peek:
77 if phase + 2 < stop:
78 phase += 1
79 trigger, what, where_to = self.playbook[phase]
80 peek = self.playbook[phase + 1][0]
81 continue
82 if what == 'return': 82 ↛ 83line 82 didn't jump to line 83, because the condition on line 82 was never true
83 break
84 if what == 'collect' and data != peek:
85 where_to.append(data)
86 print(phase, stop, data, trigger, what, where_to, peek)
88 return self.playbook
91def main(options: argparse.Namespace) -> int:
93 return 0
96if __name__ == '__main__': 96 ↛ 97line 96 didn't jump to line 97, because the condition on line 96 was never true
97 import doctest
99 doctest.testmod(verbose=True, optionflags=doctest.ELLIPSIS)