Coverage for prosessilouhinta/cpa.py: 84.45%

325 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00

1"""Naive Critical Path Analysis (CPA) implementation.""" 

2 

3import json 

4import pathlib 

5from typing import no_type_check # Self when 3.11 is lowest bound 

6 

7ActOnNodeMap = dict[str, str | int | float | None] 

8ActOnNodeTup = tuple[str, str, str, str, str, str, str, str, str] 

9ENCODING = 'utf-8' 

10 

11 

12@no_type_check 

13def cyclic(digraph) -> bool: 

14 """Determine if the directed graph has a cycle per a stack of iterators. 

15 

16 The dgraph structure maps vertices to iterables of neighbouring vertices. 

17 

18 # Examples: 

19 

20 >>> cyclic({}) 

21 False 

22 >>> cyclic({'a': ('a',)}) 

23 True 

24 >>> cyclic({'a': ('z',), 'z': ('b',), 'b': ('y', 'z')}) 

25 True 

26 >>> cyclic({'a': ('z',), 'z': ('b',), 'b': ('y', 'c')}) 

27 False 

28 """ 

29 visited = set() 

30 path = [] 

31 paths = set(path) 

32 sit = [iter(digraph)] 

33 while sit: 

34 for vertex in sit[-1]: 

35 if vertex in paths: 

36 return True 

37 elif vertex not in visited: 

38 visited.add(vertex) 

39 path.append(vertex) 

40 paths.add(vertex) 

41 sit.append(iter(digraph.get(vertex, tuple()))) 

42 break 

43 else: 

44 paths.discard(path.pop() if path else None) 

45 sit.pop() 

46 return False 

47 

48 

49@no_type_check 

50class Node: 

51 """Represents a task with linked nodes in an act precedence network.""" 

52 

53 @no_type_check 

54 def __init__(self, name, duration=None, lag=0) -> None: 

55 """Initialize the model.""" 

56 self.parent: Node | None = None 

57 self.name: str = name # model.NAME shall be unique 

58 self.description: str | None = None 

59 self.duration: float | None = duration # model.DUR in agreed time units 

60 self.lag: int | float | None = lag # preceding.finished plus in agreed time units 

61 self.drag: int | float | None = None # model.DRAG in agreed time units 

62 self._es: int | float | None = None # model.ES 

63 self.ef: int | float | None = None # model.EF 

64 self.ls: int | float | None = None # model.LS 

65 self.lf: int | float | None = None # model.LF 

66 self._free_float: int | float | None = None # act can delay without change start of any other act 

67 self._total_float: int | float | None = None # act can delay without increase of overall project dur 

68 self.nodes: list[Node] = [] 

69 self.name_to_node: dict[str, Node] = {} 

70 self.to_nodes: set[Node] = set() 

71 self.incoming_nodes: set[Node] = set() 

72 self.forward_pending: set[Node] = set() 

73 self.backward_pending: list[Node] = [] 

74 self._critical_path = None 

75 self.exit_node: Node | None = None 

76 

77 @no_type_check 

78 def lookup_node(self, name: str): 

79 return self.name_to_node[name] 

80 

81 @no_type_check 

82 def get_or_create_node(self, name, **kwargs): 

83 try: 

84 return self.lookup_node(name=name) 

85 except KeyError: 

86 n = Node(name=name, **kwargs) 

87 self.add(n) 

88 return n 

89 

90 @property 

91 def es(self) -> int | float | None: 

92 return self._es 

93 

94 @es.setter 

95 def es(self, v: int | float) -> None: 

96 self._es = v 

97 if self.parent: 

98 self.parent.forward_pending.add(self) 

99 

100 def __repr__(self) -> str: 

101 return str(self.name) 

102 

103 def __hash__(self) -> int: 

104 return hash(self.name) 

105 

106 @no_type_check 

107 def __eq__(self, other) -> bool: 

108 if not isinstance(other, Node): 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true

109 return False 

110 return self.name == other.name 

111 

112 @no_type_check 

113 def __ne__(self, other) -> bool: 

114 return not self.__eq__(other) 

115 

116 @no_type_check 

117 def add(self, node): 

118 """Includes the given node as a child node.""" 

119 if not isinstance(node, Node): 119 ↛ 120line 119 didn't jump to line 120, because the condition on line 119 was never true

120 raise ValueError(f'tried to add non-Node instance {type(node).__name__}') 

121 if node.duration is None: 121 ↛ 122line 121 didn't jump to line 122, because the condition on line 121 was never true

122 raise ValueError('unspecified duration') 

123 if node in self.nodes: 123 ↛ 124line 123 didn't jump to line 124, because the condition on line 123 was never true

124 return 

125 self.nodes.append(node) 

126 self.name_to_node[node.name] = node 

127 node.parent = self 

128 self.forward_pending.add(node) 

129 self._critical_path = None 

130 return node 

131 

132 @no_type_check 

133 def link(self, from_node, to_node=None) -> 'Node': 

134 """Directed link of two child nodes in the graph.""" 

135 if not isinstance(from_node, Node): 135 ↛ 136line 135 didn't jump to line 136, because the condition on line 135 was never true

136 from_node = self.name_to_node[from_node] 

137 if not isinstance(from_node, Node): 137 ↛ 138line 137 didn't jump to line 138, because the condition on line 137 was never true

138 raise ValueError(f'tried to link from non-Node instance {type(from_node).__name__}') 

139 if to_node is not None: 139 ↛ 147line 139 didn't jump to line 147, because the condition on line 139 was never false

140 if not isinstance(to_node, Node): 140 ↛ 141line 140 didn't jump to line 141, because the condition on line 140 was never true

141 to_node = self.name_to_node[to_node] 

142 if not isinstance(to_node, Node): 142 ↛ 143line 142 didn't jump to line 143, because the condition on line 142 was never true

143 raise ValueError(f'tried to link to non-Node instance {type(to_node).__name__}') 

144 from_node.to_nodes.add(to_node) 

145 to_node.incoming_nodes.add(from_node) 

146 else: 

147 self.to_nodes.add(from_node) 

148 from_node.incoming_nodes.add(self) 

149 return self 

150 

151 @no_type_check 

152 def load_network(self, file_path: str) -> None: 

153 """Load the network from JSON file with matching top level name value.""" 

154 if not file_path.strip(): 154 ↛ 155line 154 didn't jump to line 155, because the condition on line 154 was never true

155 raise ValueError('cannot load from empty file path string') 

156 fp = pathlib.Path(file_path) 

157 if not fp.is_file(): 157 ↛ 158line 157 didn't jump to line 158, because the condition on line 157 was never true

158 raise ValueError('cannot load from non existing file') 

159 if not fp.stat().st_size: 159 ↛ 160line 159 didn't jump to line 160, because the condition on line 159 was never true

160 raise ValueError('cannot load from empty file') 

161 with open(fp, 'rt', encoding=ENCODING) as handle: 

162 network = json.load(handle) 

163 if not network: 163 ↛ 164line 163 didn't jump to line 164, because the condition on line 163 was never true

164 raise ValueError('cannot load from empty network') 

165 name = network.get('name', 'not-existing-name-key') 

166 if name != self.name: 166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true

167 raise ValueError(f'cannot load network with name ({name}) into ({self.name})') 

168 x_nodes = {} 

169 for x_key, x_node in network.get('nodes', {}).items(): 

170 duration = x_node.get('duration', None) 

171 lag = x_node.get('lag', 0) 

172 x_nodes[x_key] = self.add(Node(x_key, duration=duration, lag=lag)) 

173 for edge in network.get('edges', []): 

174 if len(edge) < 2: 174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true

175 if not edge: 

176 raise ValueError('cannot build an empty edge') 

177 raise ValueError(f'cannot build directed edge from ({edge[0]}) without a target') 

178 if len(edge) == 2: 

179 if any(edg not in x_nodes for edg in edge): 179 ↛ 180line 179 didn't jump to line 180, because the condition on line 179 was never true

180 raise ValueError(f'cannot build edge with nodes not in {x_nodes}') 

181 self.link(x_nodes[edge[0]], x_nodes[edge[1]]) 

182 else: 

183 raise NotImplementedError('Compressed edges notation not yet implemented.') 

184 self.update_all() 

185 

186 @property 

187 @no_type_check 

188 def first_nodes(self): 

189 """Child nodes that have no in-bound dependencies.""" 

190 first = set(self.nodes) 

191 for node in self.nodes: 

192 first.difference_update(node.to_nodes) 

193 return first 

194 

195 @property 

196 @no_type_check 

197 def last_nodes(self): 

198 """Child nodes that have to out-bound dependencies.""" 

199 return [node for node in self.nodes if not node.to_nodes] 

200 

201 @no_type_check 

202 def update_forward(self) -> None: 

203 """Updates forward timing calculations for the current node assuming min(es) is set.""" 

204 changed = False 

205 if self.es is not None and self.duration is not None: 

206 self.ef = self.es + self.duration 

207 changed = True 

208 

209 if changed: 

210 for to_node in self.to_nodes: 

211 if to_node == self: 211 ↛ 212line 211 didn't jump to line 212, because the condition on line 211 was never true

212 continue 

213 add_some_lag = self.ef + to_node.lag 

214 to_node.es = add_some_lag if to_node.es is None else max(to_node.es, add_some_lag) 

215 

216 if self.parent: 216 ↛ 210line 216 didn't jump to line 210, because the condition on line 216 was never false

217 self.parent.forward_pending.add(to_node) 

218 

219 if self.parent: 219 ↛ exitline 219 didn't return from function 'update_forward', because the condition on line 219 was never false

220 self.parent.backward_pending.append(self) 

221 

222 @no_type_check 

223 def update_backward(self) -> None: 

224 """Updates backward timing calculations for the current node.""" 

225 if self.lf is None: 

226 if self.to_nodes: 

227 self.lf = min(target.ls for target in self.to_nodes) 

228 else: 

229 self.lf = self.ef 

230 if self.lf is None: 230 ↛ 231line 230 didn't jump to line 231, because the condition on line 230 was never true

231 raise ValueError('No latest finish time found') 

232 self.ls = self.lf - self.duration 

233 

234 @no_type_check 

235 def add_exit(self) -> None: 

236 """Links all leaf nodes to a common exit node.""" 

237 if self.exit_node is None: 

238 self.exit_node = Node('COMMON_EXIT', duration=0) 

239 self.add(self.exit_node) 

240 for node in self.nodes: 

241 if node is self.exit_node: 

242 continue 

243 if not node.to_nodes: 

244 self.link(from_node=node, to_node=self.exit_node) 

245 

246 @no_type_check 

247 def update_all(self) -> None: 

248 """Updates timing calculations for all children nodes.""" 

249 if not self.is_acyclic(): 249 ↛ 250line 249 didn't jump to line 250, because the condition on line 249 was never true

250 raise TypeError('Network contains cycles') 

251 

252 for node in list(self.forward_pending.intersection(self.first_nodes)): 

253 node.es = self.lag + node.lag 

254 node.update_forward() 

255 self.forward_pending.remove(node) 

256 

257 forward_priors = set() 

258 while self.forward_pending: 

259 q = set(self.forward_pending) 

260 self.forward_pending.clear() 

261 while q: 

262 node = q.pop() 

263 if node in forward_priors: 263 ↛ 264line 263 didn't jump to line 264, because the condition on line 263 was never true

264 continue 

265 node.update_forward() 

266 

267 backward_priors = set() 

268 while self.backward_pending: 

269 node = self.backward_pending.pop() 

270 if node in backward_priors: 270 ↛ 271line 270 didn't jump to line 271, because the condition on line 270 was never true

271 continue 

272 node.update_backward() 

273 

274 duration, path, priors = self.get_critical_path(as_item=True) 

275 self._critical_path = duration, path, priors 

276 self.duration = duration 

277 self.es = path[0].es 

278 self.ls = path[0].ls 

279 self.ef = path[-1].ef 

280 self.lf = path[-1].lf 

281 

282 @no_type_check 

283 def get_critical_path(self, as_item: bool = False): 

284 """Finds the longest path in among the child nodes.""" 

285 if self._critical_path is not None: 

286 return self._critical_path[1] 

287 longest = None 

288 q = [(_.duration, [_], set([_])) for _ in self.first_nodes] 

289 while q: 

290 item = length, path, priors = q.pop(0) 

291 if longest is None: 

292 longest = item 

293 else: 

294 try: 

295 longest = max(longest, item) 

296 except TypeError: 

297 longest = longest 

298 for to_node in path[-1].to_nodes: 

299 if to_node in priors: 299 ↛ 300line 299 didn't jump to line 300, because the condition on line 299 was never true

300 continue 

301 q.append((length + to_node.duration, path + [to_node], priors.union([to_node]))) 

302 if longest is None: 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true

303 return 

304 elif as_item: 304 ↛ 307line 304 didn't jump to line 307, because the condition on line 304 was never false

305 return longest 

306 else: 

307 return longest[1] 

308 

309 @no_type_check 

310 def activity(self, node) -> ActOnNodeMap: 

311 """Provide the activity-on-node element of given node as JSON serializable dict.""" 

312 return { 

313 'duration': node.duration, 

314 'earliest_start': node.es, 

315 'earliest_finish': node.ef, 

316 'name': node.name, 

317 'latest_start': node.ls, 

318 'latest_finish': node.lf, 

319 'drag': node.drag, 

320 } 

321 

322 @no_type_check 

323 def activity_on_node(self) -> ActOnNodeMap: 

324 """Provide the activity-on-node element as JSON serializable dict.""" 

325 return self.activity(self) 

326 

327 @no_type_check 

328 def activity_on_node_diagram_data(self) -> list[ActOnNodeMap]: 

329 """Provide the activity-on-node diagram as JSON serializable list of dicts.""" 

330 cp = self.get_critical_path() 

331 return [self.activity(node) for node in cp] if cp else [] 

332 

333 @staticmethod 

334 @no_type_check 

335 def aon_strings(aon: ActOnNodeMap) -> ActOnNodeTup: 

336 """Provide the activity-on-node element from map data as 9-tuple of strings. 

337 

338 Example: 

339 

340 cf. example of activity_on_node_strings method 

341 """ 

342 duration = aon['duration'] 

343 earliest_start = aon['earliest_start'] 

344 earliest_finish = aon['earliest_finish'] 

345 name = aon['name'] 

346 latest_start = aon['latest_start'] 

347 latest_finish = aon['latest_finish'] 

348 drag = aon['drag'] 

349 

350 lk_width = max(len(f'{lk}=') for lk in ('ES', 'EF', 'LS', 'LF')) 

351 sp, hr, vr = ' ', '-', '|' 

352 bd_width = len(vr) 

353 max_width, lc_max_width, cc_max_width, rc_max_width = 0, 0, 0, 0 

354 lc_max_width = max(len(str(earliest_start)), len(str(latest_start))) + lk_width 

355 cc_max_width = len(str(name)) 

356 rc_max_width = max(len(str(earliest_finish)), len(str(latest_finish))) + lk_width 

357 max_width = sum((lc_max_width, cc_max_width, rc_max_width)) + 2 * bd_width # inner borders 

358 

359 section_sep = f'+{hr * max_width}+' 

360 

361 dur_disp = f'DUR={duration}' 

362 dur_sect = f'{vr}{dur_disp.center(max_width, sp)}{vr}' 

363 es_disp = f'ES={earliest_start}' 

364 ef_disp = f'EF={earliest_finish}' 

365 es_ef_sects = ( 

366 f'{vr}{es_disp.center(lc_max_width, sp)}' 

367 f'{vr}{sp.center(cc_max_width, sp)}' 

368 f'{vr}{ef_disp.center(rc_max_width, sp)}{vr}' 

369 ) 

370 name_sect = ( 

371 f'{vr}{hr.center(lc_max_width, hr)}' 

372 f'{vr}{str(name).center(cc_max_width, sp)}' 

373 f'{vr}{hr.center(rc_max_width, hr)}{vr}' 

374 ) 

375 ls_disp = f'LS={latest_start}' 

376 lf_disp = f'LF={latest_finish}' 

377 ls_lf_sects = ( 

378 f'{vr}{ls_disp.center(lc_max_width, sp)}' 

379 f'{vr}{sp.center(cc_max_width, sp)}' 

380 f'{vr}{lf_disp.center(rc_max_width, sp)}{vr}' 

381 ) 

382 drg_disp = f'DRAG={drag if drag is not None else "n/a"}' 

383 drg_sect = f'{vr}{drg_disp.center(max_width, sp)}{vr}' 

384 

385 return ( 

386 section_sep, 

387 dur_sect, 

388 section_sep, 

389 es_ef_sects, 

390 name_sect, 

391 ls_lf_sects, 

392 section_sep, 

393 drg_sect, 

394 section_sep, 

395 ) 

396 

397 def activity_on_node_strings(self) -> ActOnNodeTup: 

398 """Provide the activity-on-node element as 9-tuple of strings for e.g. linking along the critical path. 

399 

400 Example: 

401 

402 +-----------------------+ # section_sep 

403 | DUR=31 | # dur_sect 

404 +-----------------------+ # section_sep 

405 |ES=8308| |EF=8339| # es_ef_sects 

406 |-------| 4696 |-------| # name_sect 

407 |LS=8308| |LF=8339| # ls_lf_sects 

408 +-----------------------+ # section_sep 

409 | DRAG=None | # drg_sect 

410 +-----------------------+ # section_sep 

411 """ 

412 lk_width = max(len(f'{lk}=') for lk in ('ES', 'EF', 'LS', 'LF')) 

413 sp, hr, vr = ' ', '-', '|' 

414 bd_width = len(vr) 

415 max_width, lc_max_width, cc_max_width, rc_max_width = 0, 0, 0, 0 

416 lc_max_width = max(len(str(self.es)), len(str(self.ls))) + lk_width 

417 cc_max_width = len(str(self.name)) 

418 rc_max_width = max(len(str(self.ef)), len(str(self.lf))) + lk_width 

419 max_width = sum((lc_max_width, cc_max_width, rc_max_width)) + 2 * bd_width # inner borders 

420 

421 section_sep = f'+{hr * max_width}+' 

422 

423 dur_disp = f'DUR={self.duration}' 

424 dur_sect = f'{vr}{dur_disp.center(max_width, sp)}{vr}' 

425 es_disp = f'ES={self.es}' 

426 ef_disp = f'EF={self.ef}' 

427 es_ef_sects = ( 

428 f'{vr}{es_disp.center(lc_max_width, sp)}' 

429 f'{vr}{sp.center(cc_max_width, sp)}' 

430 f'{vr}{ef_disp.center(rc_max_width, sp)}{vr}' 

431 ) 

432 name_sect = ( 

433 f'{vr}{hr.center(lc_max_width, hr)}' 

434 f'{vr}{str(self.name).center(cc_max_width, sp)}' 

435 f'{vr}{hr.center(rc_max_width, hr)}{vr}' 

436 ) 

437 ls_disp = f'LS={self.ls}' 

438 lf_disp = f'LF={self.lf}' 

439 ls_lf_sects = ( 

440 f'{vr}{ls_disp.center(lc_max_width, sp)}' 

441 f'{vr}{sp.center(cc_max_width, sp)}' 

442 f'{vr}{lf_disp.center(rc_max_width, sp)}{vr}' 

443 ) 

444 drg_disp = f'DRAG={self.drag if self.drag is not None else "n/a"}' 

445 drg_sect = f'{vr}{drg_disp.center(max_width, sp)}{vr}' 

446 

447 return ( 

448 section_sep, 

449 dur_sect, 

450 section_sep, 

451 es_ef_sects, 

452 name_sect, 

453 ls_lf_sects, 

454 section_sep, 

455 drg_sect, 

456 section_sep, 

457 ) 

458 

459 @staticmethod 

460 def aon_strings_with_link(aon_strings: ActOnNodeTup) -> ActOnNodeTup: 

461 """DRY.""" 

462 arrow = ' => ' 

463 spacer = ' ' * len(arrow) 

464 link: ActOnNodeTup = (spacer, spacer, spacer, spacer, arrow, spacer, spacer, spacer, spacer) 

465 return tuple(f'{rec}{spc}' for rec, spc in zip(aon_strings, link)) # type: ignore 

466 

467 @staticmethod 

468 def aon_diagram_text(aons: list[ActOnNodeMap]) -> str: 

469 """Build a text representation for a path with activity-on-nodes linked directionally.""" 

470 dia = [] 

471 for aon in aons[:-1]: 

472 dia.append(Node.aon_strings_with_link(Node.aon_strings(aon))) 

473 dia.append(Node.aon_strings(aons[-1])) 

474 return '\n'.join(''.join(row) for row in zip(*dia)) + '\n' 

475 

476 def aon_diagram_text_dump(self) -> str: 

477 """Build a text representation for the critical path with activity-on-nodes linked directionally.""" 

478 return self.aon_diagram_text(self.activity_on_node_diagram_data()) 

479 

480 def activity_on_node_strings_with_link(self) -> ActOnNodeTup: 

481 """DRY.""" 

482 return self.aon_strings_with_link(self.activity_on_node_strings()) 

483 

484 def activity_on_node_text(self) -> str: 

485 """Provide the activity-on-node element as string with trailing newline for e.g. a textual diagram.""" 

486 return '\n'.join(self.activity_on_node_strings()) + '\n' 

487 

488 def is_acyclic(self) -> bool: 

489 g = dict((node.name, tuple(child.name for child in node.to_nodes)) for node in self.nodes) 

490 return not cyclic(g)