Coverage for prosessilouhinta/prosessilouhinta.py: 88.29%
206 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00
1"""Process mining (Finnish prosessilouhinta) from eventlogs. API."""
3import datetime as dti
4import json
5import os
6import pathlib
7import sys
8from typing import Any, Iterator, List, Optional, Tuple, Union
10import prosessilouhinta.cpa as cpa
# Name of the environment variable read for the debug switch, and its raw
# value (None when the variable is not set).
DEBUG_VAR = 'PROSESSILOUHINTA_DEBUG'
DEBUG = os.environ.get(DEBUG_VAR)

# Text encoding used when files are opened by this module.
ENCODING = 'utf-8'
# NOTE(review): not referenced in the visible part of the file - confirm use.
ENCODING_ERRORS_POLICY = 'ignore'

# CSV conventions: lines starting with the head token are skipped by the
# parser, fields are separated by the separator.
CSV_HEAD_TOKEN = '#'  # nosec B105
CSV_SEP = ','

# Symbolic names for the standard streams and their stream objects.
STDIN = 'STDIN'
STDOUT = 'STDOUT'
DISPATCH = {STDIN: sys.stdin, STDOUT: sys.stdout}

# One event as stored by the parser: (task, user, timestamp).
_Event = Tuple[str, str, dti.datetime]

# Case id -> ordered list of events of that case.
EventLog = dict[str, List[_Event]]
# Activity -> number of occurrences.
Activity = dict[str, int]
# Two level counting matrix (row key -> column key -> count).
Flow = dict[str, dict[str, int]]
# Activity -> following activity -> observed time differences.
TimeDifference = dict[str, dict[str, List[dti.timedelta]]]
TimeDifferenceFloats = dict[str, dict[str, List[float]]]
# Activity -> following activity -> averaged time difference.
AverageTimeDifference = dict[str, dict[str, dti.timedelta]]
AverageTimeDifferenceFloats = dict[str, dict[str, float]]
# User -> list of activities performed by that user.
UserActivity = dict[str, list[str]]
def activity_counts(events: EventLog) -> Activity:
    """Count how often every activity occurs across all cases of the eventlog.

    The activity is read from the first slot of each event entry. The result
    maps the activity to its number of occurrences, in order of first sighting.
    """
    tally: Activity = {}
    for trace in events.values():
        for entry in trace:
            name = entry[0]
            tally[name] = tally.get(name, 0) + 1

    return tally
def control_flow(events: EventLog) -> Flow:
    """Count the direct successions between activities within each case.

    For every pair of neighbouring events of a case the counter at
    [activity][following activity] is incremented.
    """
    flow: Flow = {}
    for trace in events.values():
        # Pair every event with its direct successor inside the same case.
        for current, following in zip(trace, trace[1:]):
            src = current[0]
            dst = following[0]
            successors = flow.setdefault(src, {})
            successors[dst] = successors.get(dst, 0) + 1

    return flow
def time_differences(events: EventLog) -> TimeDifference:
    """Collect the time differences between neighbouring events of each case.

    For every pair of consecutive events the difference (later timestamp minus
    earlier timestamp) is appended to the list at [activity][following activity].
    """
    spans: TimeDifference = {}
    for trace in events.values():
        for (src, _, started), (dst, _, followed) in zip(trace, trace[1:]):
            spans.setdefault(src, {}).setdefault(dst, []).append(followed - started)

    return spans
def time_differences_as_float(D: TimeDifference) -> TimeDifferenceFloats:  # noqa
    """Return a copy of D with every time difference replaced by its total seconds."""
    return {
        src: {dst: [span.total_seconds() for span in spans] for dst, spans in row.items()}
        for src, row in D.items()
    }
def average_time_differences(D: TimeDifference) -> AverageTimeDifference:  # noqa
    """Average the time differences of D for every transition.

    Both key levels of the result are filled in sorted order. The microseconds
    part of each average is dropped, leaving only days and seconds.
    """
    averaged: AverageTimeDifference = {}
    for src in sorted(D):
        row = {}
        for dst in sorted(D[src]):
            spans = D[src][dst]
            mean = sum(spans, dti.timedelta(0)) / len(spans)
            # Rebuilding from days and seconds discards the microseconds part.
            row[dst] = dti.timedelta(days=mean.days, seconds=mean.seconds)
        averaged[src] = row

    return averaged
def average_time_differences_as_float(AD: AverageTimeDifference) -> AverageTimeDifferenceFloats:  # noqa
    """Return a copy of AD with every averaged time difference as total seconds."""
    return {src: {dst: span.total_seconds() for dst, span in row.items()} for src, row in AD.items()}
def user_activities(events: EventLog) -> UserActivity:
    """Determine for every user the sorted list of distinct activities performed.

    The user is read from the second slot and the activity from the first slot
    of each event entry. Users appear in order of first sighting.
    """
    seen = {}
    for trace in events.values():
        for entry in trace:
            seen.setdefault(entry[1], set()).add(entry[0])

    return {user: sorted(names) for user, names in seen.items()}
def work_distribution(events: EventLog) -> Flow:
    """Count per user how often each activity was performed.

    The result is indexed as [user][activity] and holds the number of events
    of the eventlog carrying that combination.
    """
    workload: Flow = {}
    for trace in events.values():
        for entry in trace:
            name, user = entry[0], entry[1]
            per_user = workload.setdefault(user, {})
            per_user[name] = per_user.get(name, 0) + 1

    return workload
def working_together(events: EventLog) -> Flow:
    """Count for every pair of users the number of cases both took part in.

    Per case the distinct users are sorted, and every pair (earlier, later) in
    that order increments the counter at [earlier][later].
    """
    pairings: Flow = {}
    for trace in events.values():
        participants = sorted({entry[1] for entry in trace})
        for position, first in enumerate(participants):
            # Only users sorting after the current one, so each pair counts once.
            for second in participants[position + 1:]:
                partners = pairings.setdefault(first, {})
                partners[second] = partners.get(second, 0) + 1

    return pairings
def parse_eventlog_csv(source: Union[pathlib.Path, Iterator[str]]) -> Union[EventLog, Any]:
    """Parse the lines of source into an eventlog keyed by case id.

    Blank lines and lines starting with the CSV head token are skipped. Of the
    remaining lines the first four separated fields are taken as case id, task,
    user and timestamp text ('%Y-%m-%d %H:%M:%S'). Each case id maps to the
    list of its (task, user, timestamp) events in order of appearance.

    A line with fewer than four fields or an unparsable timestamp is printed
    and the ValueError is re-raised.
    """
    eventlog: EventLog = {}
    for raw in reader(source):
        record = raw.strip()
        if record.startswith(CSV_HEAD_TOKEN) or not record:
            continue
        try:
            # Both the unpacking and the timestamp parsing may raise ValueError.
            caseid, task, user, ts_text = record.split(CSV_SEP)[:4]
            timestamp = dti.datetime.strptime(ts_text, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            print(record)
            raise
        eventlog.setdefault(caseid, []).append((task, user, timestamp))
    return eventlog
def reader(source: Union[pathlib.Path, Iterator[str]]) -> Iterator[str]:
    """Yield the lines of source one by one.

    A pathlib.Path is opened as text with the module encoding and closed once
    the lines are exhausted; any other source is iterated as given.
    """
    if not isinstance(source, pathlib.Path):
        yield from source
        return

    with open(source, 'rt', encoding=ENCODING) as stream:
        yield from stream
def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate the extract request and report problems instead of raising.

    Returns a triple (error code, message, arguments). On success the code is 0,
    the message is empty and the arguments are argv itself; on failure the
    arguments are [''].

    Code 2 signals a wrong argument count or an unknown command, code 1 a
    non-empty source that is no file or a non-empty target that already exists.
    """
    if not argv or len(argv) != 4:
        return 2, 'received wrong number of arguments', ['']

    verb, source_path, target_path, _ = argv

    if verb != 'extract':
        return 2, 'received unknown command', ['']

    # Empty source or target values are accepted without a file system check.
    if source_path and not pathlib.Path(str(source_path)).is_file():
        return 1, 'source is no file', ['']

    if target_path and pathlib.Path(str(target_path)).is_file():
        return 1, 'target file exists', ['']

    return 0, '', argv
def main(argv: Union[List[str], None] = None) -> int:
    """Drive the extraction of the process mining report.

    The request is verified first; on failure the message goes to standard
    error and the error code is returned. A truthy dryrun value only reports
    the input and output that would be used. Otherwise the eventlog is parsed,
    the report is assembled and dumped as JSON to the target file or, without
    a target, to standard output. Returns 0 on success.
    """
    status, reason, arguments = verify_request(argv)
    if status:
        print(reason, file=sys.stderr)
        return status

    _, inp, out, dryrun = arguments
    if inp:
        source = reader(pathlib.Path(inp))
    else:
        source = sys.stdin

    if dryrun:
        inp_disp = f'"{inp}"' if inp else 'STDIN'
        out_disp = f'"{out}"' if out else 'STDOUT'
        notices = (
            'dryrun requested\n# ---',
            '* resources used:',
            f' - input from: {inp_disp}',
            f' - output to: {out_disp}',
        )
        for notice in notices:
            print(notice, file=sys.stderr)
        return 0

    eventlog = parse_eventlog_csv(source)
    # Computed once because two report entries are derived from it.
    spans = time_differences(eventlog)
    report = {
        'activity_counts': activity_counts(eventlog),
        'average_time_differences': average_time_differences_as_float(average_time_differences(spans)),
        'control_flow': control_flow(eventlog),
        'time_differences': time_differences_as_float(spans),
        'user_activities': user_activities(eventlog),
        'work_distribution': work_distribution(eventlog),
        'working_together': working_together(eventlog),
    }

    if out:
        with open(pathlib.Path(out), 'wt', encoding=ENCODING) as handle:
            json.dump(report, handle)
    else:
        json.dump(report, sys.stdout)

    return 0
def verify_cpa_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate the CPA request and report problems instead of raising.

    Returns a triple (error code, message, arguments). On success the code is 0,
    the message is empty and the arguments are argv itself; on failure the
    arguments are [''].

    Code 2 signals a wrong argument count or an unknown command, code 1 a
    source that is empty or no existing file.
    """
    if not argv or len(argv) != 2:
        return 2, 'received wrong number of arguments', ['']

    command, inp = argv

    if command not in ('cpa',):
        return 2, 'received unknown command', ['']

    # The CPA driver opens the source path unconditionally (there is no STDIN
    # fallback), so an empty path has to be rejected here instead of passing
    # verification and failing later with a traceback.
    if not inp or not pathlib.Path(str(inp)).is_file():
        return 1, 'source is no file', ['']

    return 0, '', argv
def cpa_dia(argv: Union[List[str], None] = None) -> int:
    """Drive the CPA diagramming.

    The request is verified first; on failure the message goes to standard
    error and the error code is returned. Otherwise the JSON source is read
    once to look up the project name, a node with that name loads the network
    from the same path, and its text diagram is printed. Returns 0 on success.
    """
    status, reason, arguments = verify_cpa_request(argv)
    if status:
        print(reason, file=sys.stderr)
        return status

    _, network_path = arguments

    # TODO not elegant and plausible use case to not state the name ...
    with open(network_path, 'rt', encoding='utf-8') as handle:
        document = json.load(handle)
    project_name = document.get('name', 'no-name-found-for-project - check your data')

    project = cpa.Node(project_name)
    project.load_network(str(network_path))
    print(project.aon_diagram_text_dump())

    return 0