Coverage for prosessilouhinta/prosessilouhinta.py: 88.29%

206 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00

1"""Process mining (Finnish prosessilouhinta) from eventlogs. API.""" 

2 

3import datetime as dti 

4import json 

5import os 

6import pathlib 

7import sys 

8from typing import Any, Iterator, List, Optional, Tuple, Union 

9 

10import prosessilouhinta.cpa as cpa 

11 

# Name of the environment variable consulted for the debug switch.
DEBUG_VAR = 'PROSESSILOUHINTA_DEBUG'
# Raw value of that variable; None when it is not set in the environment.
DEBUG = os.getenv(DEBUG_VAR)

ENCODING = 'utf-8'
ENCODING_ERRORS_POLICY = 'ignore'
# Lines starting with this token are skipped when parsing the CSV eventlog.
CSV_HEAD_TOKEN = '#'  # nosec B105
CSV_SEP = ','

STDIN, STDOUT = 'STDIN', 'STDOUT'
# Symbolic stream name -> standard stream object.
DISPATCH = {
    STDIN: sys.stdin,
    STDOUT: sys.stdout,
}

# The aliases below use builtin generics throughout (the original mixed
# typing.List / typing.Tuple with builtin dict[...] and list[...]).

# Case id -> sequence of (activity, user, timestamp) events.
EventLog = dict[str, list[tuple[str, str, dti.datetime]]]
# Activity -> count.
Activity = dict[str, int]
# Outer key -> inner key -> count.
Flow = dict[str, dict[str, int]]
# Activity -> following activity -> observed time differences.
TimeDifference = dict[str, dict[str, list[dti.timedelta]]]
# Same shape as TimeDifference with the differences as float seconds.
TimeDifferenceFloats = dict[str, dict[str, list[float]]]
# Activity -> following activity -> average time difference.
AverageTimeDifference = dict[str, dict[str, dti.timedelta]]
# Same shape as AverageTimeDifference with the averages as float seconds.
AverageTimeDifferenceFloats = dict[str, dict[str, float]]
# User -> activities performed by that user.
UserActivity = dict[str, list[str]]

34 

35 

def activity_counts(events: EventLog) -> Activity:
    """Calculate the activity counts A from eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping of activity to the number of events carrying that activity,
        summed over all cases. Keys appear in order of first occurrence.
    """
    A: Activity = {}  # noqa
    for trace in events.values():
        for event in trace:
            activity = event[0]
            A[activity] = A.get(activity, 0) + 1

    return A

47 

48 

def control_flow(events: EventLog) -> Flow:
    """Calculate the control flow from eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping ``F[ai][aj]`` holding how often activity ``aj`` directly
        follows activity ``ai`` within the same case. Cases with fewer than
        two events contribute nothing.
    """
    F: Flow = {}  # noqa
    for trace in events.values():
        # Pair every event with its direct successor inside the same case.
        for current, following in zip(trace, trace[1:]):
            ai, aj = current[0], following[0]
            successors = F.setdefault(ai, {})
            successors[aj] = successors.get(aj, 0) + 1

    return F

63 

64 

def time_differences(events: EventLog) -> TimeDifference:
    """Calculate time differences D from eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping ``D[ai][aj]`` listing, for every direct succession of
        activity ``ai`` by activity ``aj`` within a case, the successor's
        timestamp minus the predecessor's timestamp.
    """
    D: TimeDifference = {}  # noqa
    for trace in events.values():
        # Pair every event with its direct successor inside the same case.
        for (ai, _, ti), (aj, _, tj) in zip(trace, trace[1:]):
            D.setdefault(ai, {}).setdefault(aj, []).append(tj - ti)

    return D

79 

80 

def time_differences_as_float(D: TimeDifference) -> TimeDifferenceFloats:  # noqa
    """Express every time difference in D as a float number of seconds.

    The nesting (activity -> following activity -> list) is kept as is; each
    timedelta is replaced by its ``total_seconds()`` value.
    """
    return {
        source: {
            target: [span.total_seconds() for span in spans]
            for target, spans in targets.items()
        }
        for source, targets in D.items()
    }

90 

91 

def average_time_differences(D: TimeDifference) -> AverageTimeDifference:  # noqa
    """Average the time differences from D per case transitions.

    Args:
        D: Mapping ``D[ai][aj]`` of observed time differences per transition.

    Returns:
        Mapping ``AD[ai][aj]`` of the mean time difference per transition,
        with the sub-second part removed. Outer and inner keys are inserted
        in sorted order. Transitions without any sample are left out, since
        no average can be computed for them.
    """
    AD: AverageTimeDifference = {}  # noqa
    for ai in sorted(D):
        AD[ai] = {}
        for aj in sorted(D[ai]):
            deltas = D[ai][aj]
            if not deltas:
                # An empty sample list would otherwise raise ZeroDivisionError.
                continue
            avg_td = sum(deltas, dti.timedelta(0)) / len(deltas)
            # Drop the microseconds component to report whole seconds only.
            AD[ai][aj] = avg_td - dti.timedelta(microseconds=avg_td.microseconds)

    return AD

105 

106 

def average_time_differences_as_float(AD: AverageTimeDifference) -> AverageTimeDifferenceFloats:  # noqa
    """Express every average time difference in AD as a float number of seconds.

    The nesting (activity -> following activity) is kept as is; each
    timedelta is replaced by its ``total_seconds()`` value.
    """
    return {
        source: {target: mean.total_seconds() for target, mean in targets.items()}
        for source, targets in AD.items()
    }

116 

117 

def user_activities(events: EventLog) -> UserActivity:
    """Calculate the set of activities UA performed by each user from the eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping of user to the sorted list of distinct activities that user
        performed. Users appear in order of first occurrence.
    """
    # Collect into sets so that de-duplication does not rescan a list per event.
    seen: dict[str, set[str]] = {}
    for trace in events.values():
        for event in trace:
            seen.setdefault(event[1], set()).add(event[0])

    return {user: sorted(activities) for user, activities in seen.items()}

134 

135 

def work_distribution(events: EventLog) -> Flow:
    """Calculate the count of activities UAC performed by each user from the eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping ``UAC[user][activity]`` holding how many events record that
        user performing that activity, summed over all cases.
    """
    UAC: Flow = {}  # noqa
    for trace in events.values():
        for event in trace:
            activity, user = event[0], event[1]
            per_user = UAC.setdefault(user, {})
            per_user[activity] = per_user.get(activity, 0) + 1

    return UAC

150 

151 

def working_together(events: EventLog) -> Flow:
    """Calculate the working together matrix W from eventlog.

    Args:
        events: Mapping of case id to that case's sequence of
            ``(activity, user, timestamp)`` events.

    Returns:
        Mapping ``W[ui][uj]`` holding the number of cases in which both users
        took part. Each pair is stored once, under the user that sorts first
        (``ui < uj``); cases with a single user contribute nothing.
    """
    W: Flow = {}  # noqa
    for trace in events.values():
        # Distinct users of the case, ordered so every pair has one canonical form.
        users = sorted({event[1] for event in trace})
        for position, ui in enumerate(users):
            for uj in users[position + 1:]:
                partners = W.setdefault(ui, {})
                partners[uj] = partners.get(uj, 0) + 1

    return W

172 

173 

def parse_eventlog_csv(source: Union[pathlib.Path, Iterator[str]]) -> Union[EventLog, Any]:
    """Parse the eventlog lines into a map of case id to its events.

    Every line is stripped; empty lines and lines starting with
    CSV_HEAD_TOKEN are skipped. The first four CSV_SEP separated fields of
    each remaining line are taken as case id, task, user, and timestamp
    (format ``%Y-%m-%d %H:%M:%S``). Events are stored per case id as
    ``(task, user, timestamp)`` tuples in reading order.

    A line that cannot be parsed is printed and the ValueError is re-raised.
    """
    eventlog: EventLog = {}
    for raw in reader(source):
        record = raw.strip()
        if not record or record.startswith(CSV_HEAD_TOKEN):
            continue
        try:
            caseid, task, user, ts_text = record.split(CSV_SEP)[:4]
            timestamp = dti.datetime.strptime(ts_text, '%Y-%m-%d %H:%M:%S')
        except ValueError:  # Raised by too few fields as well as by a malformed timestamp
            print(record)
            raise
        eventlog.setdefault(caseid, []).append((task, user, timestamp))
    return eventlog

192 

193 

def reader(source: Union[pathlib.Path, Iterator[str]]) -> Iterator[str]:
    """Yield the lines of source, opening it first when it is a path.

    A ``pathlib.Path`` is opened as text with the module ENCODING and closed
    once its lines are exhausted; anything else is iterated as given.
    """
    if not isinstance(source, pathlib.Path):
        yield from source
        return

    with open(source, 'rt', encoding=ENCODING) as handle:
        yield from handle

203 

204 

def verify_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate the extract request and fail with grace.

    Expects exactly four arguments: command, input path, output path, and
    dry-run flag. Returns ``(code, message, strings)``: code 0 with an empty
    message and the given arguments on success, otherwise a non-zero code,
    the reason, and ``['']``.
    """
    failure_payload = ['']

    if not argv or len(argv) != 4:
        return 2, 'received wrong number of arguments', failure_payload

    command, inp, out, _ = argv  # the dry-run flag is not validated here

    if command not in ('extract',):
        return 2, 'received unknown command', failure_payload

    # An empty input / output name stands for the standard streams: no file check.
    if inp and not pathlib.Path(str(inp)).is_file():
        return 1, 'source is no file', failure_payload

    if out and pathlib.Path(str(out)).is_file():
        return 1, 'target file exists', failure_payload

    return 0, '', argv

224 

225 

def main(argv: Union[List[str], None] = None) -> int:
    """Drive the extraction.

    Validates the request, then either reports the resources a dry run would
    use (on standard error) or parses the eventlog and dumps the analysis
    report as JSON to the output file, or to standard output when no output
    name was given. Returns the error code of the validation or 0.
    """
    error, message, strings = verify_request(argv)
    if error:
        print(message, file=sys.stderr)
        return error

    _, inp, out, dryrun = strings

    if dryrun:
        inp_disp = f'"{inp}"' if inp else 'STDIN'
        out_disp = f'"{out}"' if out else 'STDOUT'
        print('dryrun requested\n# ---', file=sys.stderr)
        print('* resources used:', file=sys.stderr)
        print(f' - input from: {inp_disp}', file=sys.stderr)
        print(f' - output to: {out_disp}', file=sys.stderr)
        return 0

    # An empty input name selects standard input.
    source = reader(pathlib.Path(inp)) if inp else sys.stdin
    eventlog = parse_eventlog_csv(source)

    # The raw time differences feed two report entries, so compute them once.
    D = time_differences(eventlog)  # noqa
    report = {
        'activity_counts': activity_counts(eventlog),
        'average_time_differences': average_time_differences_as_float(average_time_differences(D)),
        'control_flow': control_flow(eventlog),
        'time_differences': time_differences_as_float(D),
        'user_activities': user_activities(eventlog),
        'work_distribution': work_distribution(eventlog),
        'working_together': working_together(eventlog),
    }

    if out:
        with open(pathlib.Path(out), 'wt', encoding=ENCODING) as handle:
            json.dump(report, handle)
    else:
        json.dump(report, sys.stdout)

    return 0

263 

264 

def verify_cpa_request(argv: Optional[List[str]]) -> Tuple[int, str, List[str]]:
    """Validate the CPA request and fail with grace.

    Expects exactly two arguments: command and input path. Returns
    ``(code, message, strings)``: code 0 with an empty message and the given
    arguments on success, otherwise a non-zero code, the reason, and ``['']``.
    """
    failure_payload = ['']

    if not argv or len(argv) != 2:
        return 2, 'received wrong number of arguments', failure_payload

    command, inp = argv

    if command not in ('cpa',):
        return 2, 'received unknown command', failure_payload

    # An empty input name is accepted without a file check.
    if inp and not pathlib.Path(str(inp)).is_file():
        return 1, 'source is no file', failure_payload

    return 0, '', argv

280 

281 

def cpa_dia(argv: Union[List[str], None] = None) -> int:
    """Drive the CPA diagramming.

    Validates the request, reads the project name from the JSON input,
    loads the network from the same file into a ``cpa.Node``, and prints its
    activity-on-node text dump to standard output. Returns the error code of
    the validation or 0.
    """
    error, message, strings = verify_cpa_request(argv)
    if error:
        print(message, file=sys.stderr)
        return error

    _, inp = strings

    # Use the module-wide ENCODING like the other file accesses in this module.
    with open(inp, 'rt', encoding=ENCODING) as handle:
        peek = json.load(handle)  # TODO not elegant and plausible use case to not state the name ...

    p = cpa.Node(peek.get('name', 'no-name-found-for-project - check your data'))
    p.load_network(str(inp))
    print(p.aon_diagram_text_dump())

    return 0