Coverage for prosessilouhinta/cpa.py: 84.45%
325 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 21:51:33 +00:00
1"""Naive Critical Path Analysis (CPA) implementation."""
3import json
4import pathlib
5from typing import no_type_check # Self when 3.11 is lowest bound
7ActOnNodeMap = dict[str, str | int | float | None]
8ActOnNodeTup = tuple[str, str, str, str, str, str, str, str, str]
9ENCODING = 'utf-8'
12@no_type_check
13def cyclic(digraph) -> bool:
14 """Determine if the directed graph has a cycle per a stack of iterators.
16 The dgraph structure maps vertices to iterables of neighbouring vertices.
18 # Examples:
20 >>> cyclic({})
21 False
22 >>> cyclic({'a': ('a',)})
23 True
24 >>> cyclic({'a': ('z',), 'z': ('b',), 'b': ('y', 'z')})
25 True
26 >>> cyclic({'a': ('z',), 'z': ('b',), 'b': ('y', 'c')})
27 False
28 """
29 visited = set()
30 path = []
31 paths = set(path)
32 sit = [iter(digraph)]
33 while sit:
34 for vertex in sit[-1]:
35 if vertex in paths:
36 return True
37 elif vertex not in visited:
38 visited.add(vertex)
39 path.append(vertex)
40 paths.add(vertex)
41 sit.append(iter(digraph.get(vertex, tuple())))
42 break
43 else:
44 paths.discard(path.pop() if path else None)
45 sit.pop()
46 return False
49@no_type_check
50class Node:
51 """Represents a task with linked nodes in an act precedence network."""
53 @no_type_check
54 def __init__(self, name, duration=None, lag=0) -> None:
55 """Initialize the model."""
56 self.parent: Node | None = None
57 self.name: str = name # model.NAME shall be unique
58 self.description: str | None = None
59 self.duration: float | None = duration # model.DUR in agreed time units
60 self.lag: int | float | None = lag # preceding.finished plus in agreed time units
61 self.drag: int | float | None = None # model.DRAG in agreed time units
62 self._es: int | float | None = None # model.ES
63 self.ef: int | float | None = None # model.EF
64 self.ls: int | float | None = None # model.LS
65 self.lf: int | float | None = None # model.LF
66 self._free_float: int | float | None = None # act can delay without change start of any other act
67 self._total_float: int | float | None = None # act can delay without increase of overall project dur
68 self.nodes: list[Node] = []
69 self.name_to_node: dict[str, Node] = {}
70 self.to_nodes: set[Node] = set()
71 self.incoming_nodes: set[Node] = set()
72 self.forward_pending: set[Node] = set()
73 self.backward_pending: list[Node] = []
74 self._critical_path = None
75 self.exit_node: Node | None = None
77 @no_type_check
78 def lookup_node(self, name: str):
79 return self.name_to_node[name]
81 @no_type_check
82 def get_or_create_node(self, name, **kwargs):
83 try:
84 return self.lookup_node(name=name)
85 except KeyError:
86 n = Node(name=name, **kwargs)
87 self.add(n)
88 return n
90 @property
91 def es(self) -> int | float | None:
92 return self._es
94 @es.setter
95 def es(self, v: int | float) -> None:
96 self._es = v
97 if self.parent:
98 self.parent.forward_pending.add(self)
100 def __repr__(self) -> str:
101 return str(self.name)
103 def __hash__(self) -> int:
104 return hash(self.name)
106 @no_type_check
107 def __eq__(self, other) -> bool:
108 if not isinstance(other, Node): 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true
109 return False
110 return self.name == other.name
112 @no_type_check
113 def __ne__(self, other) -> bool:
114 return not self.__eq__(other)
116 @no_type_check
117 def add(self, node):
118 """Includes the given node as a child node."""
119 if not isinstance(node, Node): 119 ↛ 120line 119 didn't jump to line 120, because the condition on line 119 was never true
120 raise ValueError(f'tried to add non-Node instance {type(node).__name__}')
121 if node.duration is None: 121 ↛ 122line 121 didn't jump to line 122, because the condition on line 121 was never true
122 raise ValueError('unspecified duration')
123 if node in self.nodes: 123 ↛ 124line 123 didn't jump to line 124, because the condition on line 123 was never true
124 return
125 self.nodes.append(node)
126 self.name_to_node[node.name] = node
127 node.parent = self
128 self.forward_pending.add(node)
129 self._critical_path = None
130 return node
132 @no_type_check
133 def link(self, from_node, to_node=None) -> 'Node':
134 """Directed link of two child nodes in the graph."""
135 if not isinstance(from_node, Node): 135 ↛ 136line 135 didn't jump to line 136, because the condition on line 135 was never true
136 from_node = self.name_to_node[from_node]
137 if not isinstance(from_node, Node): 137 ↛ 138line 137 didn't jump to line 138, because the condition on line 137 was never true
138 raise ValueError(f'tried to link from non-Node instance {type(from_node).__name__}')
139 if to_node is not None: 139 ↛ 147line 139 didn't jump to line 147, because the condition on line 139 was never false
140 if not isinstance(to_node, Node): 140 ↛ 141line 140 didn't jump to line 141, because the condition on line 140 was never true
141 to_node = self.name_to_node[to_node]
142 if not isinstance(to_node, Node): 142 ↛ 143line 142 didn't jump to line 143, because the condition on line 142 was never true
143 raise ValueError(f'tried to link to non-Node instance {type(to_node).__name__}')
144 from_node.to_nodes.add(to_node)
145 to_node.incoming_nodes.add(from_node)
146 else:
147 self.to_nodes.add(from_node)
148 from_node.incoming_nodes.add(self)
149 return self
151 @no_type_check
152 def load_network(self, file_path: str) -> None:
153 """Load the network from JSON file with matching top level name value."""
154 if not file_path.strip(): 154 ↛ 155line 154 didn't jump to line 155, because the condition on line 154 was never true
155 raise ValueError('cannot load from empty file path string')
156 fp = pathlib.Path(file_path)
157 if not fp.is_file(): 157 ↛ 158line 157 didn't jump to line 158, because the condition on line 157 was never true
158 raise ValueError('cannot load from non existing file')
159 if not fp.stat().st_size: 159 ↛ 160line 159 didn't jump to line 160, because the condition on line 159 was never true
160 raise ValueError('cannot load from empty file')
161 with open(fp, 'rt', encoding=ENCODING) as handle:
162 network = json.load(handle)
163 if not network: 163 ↛ 164line 163 didn't jump to line 164, because the condition on line 163 was never true
164 raise ValueError('cannot load from empty network')
165 name = network.get('name', 'not-existing-name-key')
166 if name != self.name: 166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true
167 raise ValueError(f'cannot load network with name ({name}) into ({self.name})')
168 x_nodes = {}
169 for x_key, x_node in network.get('nodes', {}).items():
170 duration = x_node.get('duration', None)
171 lag = x_node.get('lag', 0)
172 x_nodes[x_key] = self.add(Node(x_key, duration=duration, lag=lag))
173 for edge in network.get('edges', []):
174 if len(edge) < 2: 174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true
175 if not edge:
176 raise ValueError('cannot build an empty edge')
177 raise ValueError(f'cannot build directed edge from ({edge[0]}) without a target')
178 if len(edge) == 2:
179 if any(edg not in x_nodes for edg in edge): 179 ↛ 180line 179 didn't jump to line 180, because the condition on line 179 was never true
180 raise ValueError(f'cannot build edge with nodes not in {x_nodes}')
181 self.link(x_nodes[edge[0]], x_nodes[edge[1]])
182 else:
183 raise NotImplementedError('Compressed edges notation not yet implemented.')
184 self.update_all()
186 @property
187 @no_type_check
188 def first_nodes(self):
189 """Child nodes that have no in-bound dependencies."""
190 first = set(self.nodes)
191 for node in self.nodes:
192 first.difference_update(node.to_nodes)
193 return first
195 @property
196 @no_type_check
197 def last_nodes(self):
198 """Child nodes that have to out-bound dependencies."""
199 return [node for node in self.nodes if not node.to_nodes]
201 @no_type_check
202 def update_forward(self) -> None:
203 """Updates forward timing calculations for the current node assuming min(es) is set."""
204 changed = False
205 if self.es is not None and self.duration is not None:
206 self.ef = self.es + self.duration
207 changed = True
209 if changed:
210 for to_node in self.to_nodes:
211 if to_node == self: 211 ↛ 212line 211 didn't jump to line 212, because the condition on line 211 was never true
212 continue
213 add_some_lag = self.ef + to_node.lag
214 to_node.es = add_some_lag if to_node.es is None else max(to_node.es, add_some_lag)
216 if self.parent: 216 ↛ 210line 216 didn't jump to line 210, because the condition on line 216 was never false
217 self.parent.forward_pending.add(to_node)
219 if self.parent: 219 ↛ exitline 219 didn't return from function 'update_forward', because the condition on line 219 was never false
220 self.parent.backward_pending.append(self)
222 @no_type_check
223 def update_backward(self) -> None:
224 """Updates backward timing calculations for the current node."""
225 if self.lf is None:
226 if self.to_nodes:
227 self.lf = min(target.ls for target in self.to_nodes)
228 else:
229 self.lf = self.ef
230 if self.lf is None: 230 ↛ 231line 230 didn't jump to line 231, because the condition on line 230 was never true
231 raise ValueError('No latest finish time found')
232 self.ls = self.lf - self.duration
234 @no_type_check
235 def add_exit(self) -> None:
236 """Links all leaf nodes to a common exit node."""
237 if self.exit_node is None:
238 self.exit_node = Node('COMMON_EXIT', duration=0)
239 self.add(self.exit_node)
240 for node in self.nodes:
241 if node is self.exit_node:
242 continue
243 if not node.to_nodes:
244 self.link(from_node=node, to_node=self.exit_node)
246 @no_type_check
247 def update_all(self) -> None:
248 """Updates timing calculations for all children nodes."""
249 if not self.is_acyclic(): 249 ↛ 250line 249 didn't jump to line 250, because the condition on line 249 was never true
250 raise TypeError('Network contains cycles')
252 for node in list(self.forward_pending.intersection(self.first_nodes)):
253 node.es = self.lag + node.lag
254 node.update_forward()
255 self.forward_pending.remove(node)
257 forward_priors = set()
258 while self.forward_pending:
259 q = set(self.forward_pending)
260 self.forward_pending.clear()
261 while q:
262 node = q.pop()
263 if node in forward_priors: 263 ↛ 264line 263 didn't jump to line 264, because the condition on line 263 was never true
264 continue
265 node.update_forward()
267 backward_priors = set()
268 while self.backward_pending:
269 node = self.backward_pending.pop()
270 if node in backward_priors: 270 ↛ 271line 270 didn't jump to line 271, because the condition on line 270 was never true
271 continue
272 node.update_backward()
274 duration, path, priors = self.get_critical_path(as_item=True)
275 self._critical_path = duration, path, priors
276 self.duration = duration
277 self.es = path[0].es
278 self.ls = path[0].ls
279 self.ef = path[-1].ef
280 self.lf = path[-1].lf
282 @no_type_check
283 def get_critical_path(self, as_item: bool = False):
284 """Finds the longest path in among the child nodes."""
285 if self._critical_path is not None:
286 return self._critical_path[1]
287 longest = None
288 q = [(_.duration, [_], set([_])) for _ in self.first_nodes]
289 while q:
290 item = length, path, priors = q.pop(0)
291 if longest is None:
292 longest = item
293 else:
294 try:
295 longest = max(longest, item)
296 except TypeError:
297 longest = longest
298 for to_node in path[-1].to_nodes:
299 if to_node in priors: 299 ↛ 300line 299 didn't jump to line 300, because the condition on line 299 was never true
300 continue
301 q.append((length + to_node.duration, path + [to_node], priors.union([to_node])))
302 if longest is None: 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true
303 return
304 elif as_item: 304 ↛ 307line 304 didn't jump to line 307, because the condition on line 304 was never false
305 return longest
306 else:
307 return longest[1]
309 @no_type_check
310 def activity(self, node) -> ActOnNodeMap:
311 """Provide the activity-on-node element of given node as JSON serializable dict."""
312 return {
313 'duration': node.duration,
314 'earliest_start': node.es,
315 'earliest_finish': node.ef,
316 'name': node.name,
317 'latest_start': node.ls,
318 'latest_finish': node.lf,
319 'drag': node.drag,
320 }
322 @no_type_check
323 def activity_on_node(self) -> ActOnNodeMap:
324 """Provide the activity-on-node element as JSON serializable dict."""
325 return self.activity(self)
327 @no_type_check
328 def activity_on_node_diagram_data(self) -> list[ActOnNodeMap]:
329 """Provide the activity-on-node diagram as JSON serializable list of dicts."""
330 cp = self.get_critical_path()
331 return [self.activity(node) for node in cp] if cp else []
333 @staticmethod
334 @no_type_check
335 def aon_strings(aon: ActOnNodeMap) -> ActOnNodeTup:
336 """Provide the activity-on-node element from map data as 9-tuple of strings.
338 Example:
340 cf. example of activity_on_node_strings method
341 """
342 duration = aon['duration']
343 earliest_start = aon['earliest_start']
344 earliest_finish = aon['earliest_finish']
345 name = aon['name']
346 latest_start = aon['latest_start']
347 latest_finish = aon['latest_finish']
348 drag = aon['drag']
350 lk_width = max(len(f'{lk}=') for lk in ('ES', 'EF', 'LS', 'LF'))
351 sp, hr, vr = ' ', '-', '|'
352 bd_width = len(vr)
353 max_width, lc_max_width, cc_max_width, rc_max_width = 0, 0, 0, 0
354 lc_max_width = max(len(str(earliest_start)), len(str(latest_start))) + lk_width
355 cc_max_width = len(str(name))
356 rc_max_width = max(len(str(earliest_finish)), len(str(latest_finish))) + lk_width
357 max_width = sum((lc_max_width, cc_max_width, rc_max_width)) + 2 * bd_width # inner borders
359 section_sep = f'+{hr * max_width}+'
361 dur_disp = f'DUR={duration}'
362 dur_sect = f'{vr}{dur_disp.center(max_width, sp)}{vr}'
363 es_disp = f'ES={earliest_start}'
364 ef_disp = f'EF={earliest_finish}'
365 es_ef_sects = (
366 f'{vr}{es_disp.center(lc_max_width, sp)}'
367 f'{vr}{sp.center(cc_max_width, sp)}'
368 f'{vr}{ef_disp.center(rc_max_width, sp)}{vr}'
369 )
370 name_sect = (
371 f'{vr}{hr.center(lc_max_width, hr)}'
372 f'{vr}{str(name).center(cc_max_width, sp)}'
373 f'{vr}{hr.center(rc_max_width, hr)}{vr}'
374 )
375 ls_disp = f'LS={latest_start}'
376 lf_disp = f'LF={latest_finish}'
377 ls_lf_sects = (
378 f'{vr}{ls_disp.center(lc_max_width, sp)}'
379 f'{vr}{sp.center(cc_max_width, sp)}'
380 f'{vr}{lf_disp.center(rc_max_width, sp)}{vr}'
381 )
382 drg_disp = f'DRAG={drag if drag is not None else "n/a"}'
383 drg_sect = f'{vr}{drg_disp.center(max_width, sp)}{vr}'
385 return (
386 section_sep,
387 dur_sect,
388 section_sep,
389 es_ef_sects,
390 name_sect,
391 ls_lf_sects,
392 section_sep,
393 drg_sect,
394 section_sep,
395 )
397 def activity_on_node_strings(self) -> ActOnNodeTup:
398 """Provide the activity-on-node element as 9-tuple of strings for e.g. linking along the critical path.
400 Example:
402 +-----------------------+ # section_sep
403 | DUR=31 | # dur_sect
404 +-----------------------+ # section_sep
405 |ES=8308| |EF=8339| # es_ef_sects
406 |-------| 4696 |-------| # name_sect
407 |LS=8308| |LF=8339| # ls_lf_sects
408 +-----------------------+ # section_sep
409 | DRAG=None | # drg_sect
410 +-----------------------+ # section_sep
411 """
412 lk_width = max(len(f'{lk}=') for lk in ('ES', 'EF', 'LS', 'LF'))
413 sp, hr, vr = ' ', '-', '|'
414 bd_width = len(vr)
415 max_width, lc_max_width, cc_max_width, rc_max_width = 0, 0, 0, 0
416 lc_max_width = max(len(str(self.es)), len(str(self.ls))) + lk_width
417 cc_max_width = len(str(self.name))
418 rc_max_width = max(len(str(self.ef)), len(str(self.lf))) + lk_width
419 max_width = sum((lc_max_width, cc_max_width, rc_max_width)) + 2 * bd_width # inner borders
421 section_sep = f'+{hr * max_width}+'
423 dur_disp = f'DUR={self.duration}'
424 dur_sect = f'{vr}{dur_disp.center(max_width, sp)}{vr}'
425 es_disp = f'ES={self.es}'
426 ef_disp = f'EF={self.ef}'
427 es_ef_sects = (
428 f'{vr}{es_disp.center(lc_max_width, sp)}'
429 f'{vr}{sp.center(cc_max_width, sp)}'
430 f'{vr}{ef_disp.center(rc_max_width, sp)}{vr}'
431 )
432 name_sect = (
433 f'{vr}{hr.center(lc_max_width, hr)}'
434 f'{vr}{str(self.name).center(cc_max_width, sp)}'
435 f'{vr}{hr.center(rc_max_width, hr)}{vr}'
436 )
437 ls_disp = f'LS={self.ls}'
438 lf_disp = f'LF={self.lf}'
439 ls_lf_sects = (
440 f'{vr}{ls_disp.center(lc_max_width, sp)}'
441 f'{vr}{sp.center(cc_max_width, sp)}'
442 f'{vr}{lf_disp.center(rc_max_width, sp)}{vr}'
443 )
444 drg_disp = f'DRAG={self.drag if self.drag is not None else "n/a"}'
445 drg_sect = f'{vr}{drg_disp.center(max_width, sp)}{vr}'
447 return (
448 section_sep,
449 dur_sect,
450 section_sep,
451 es_ef_sects,
452 name_sect,
453 ls_lf_sects,
454 section_sep,
455 drg_sect,
456 section_sep,
457 )
459 @staticmethod
460 def aon_strings_with_link(aon_strings: ActOnNodeTup) -> ActOnNodeTup:
461 """DRY."""
462 arrow = ' => '
463 spacer = ' ' * len(arrow)
464 link: ActOnNodeTup = (spacer, spacer, spacer, spacer, arrow, spacer, spacer, spacer, spacer)
465 return tuple(f'{rec}{spc}' for rec, spc in zip(aon_strings, link)) # type: ignore
467 @staticmethod
468 def aon_diagram_text(aons: list[ActOnNodeMap]) -> str:
469 """Build a text representation for a path with activity-on-nodes linked directionally."""
470 dia = []
471 for aon in aons[:-1]:
472 dia.append(Node.aon_strings_with_link(Node.aon_strings(aon)))
473 dia.append(Node.aon_strings(aons[-1]))
474 return '\n'.join(''.join(row) for row in zip(*dia)) + '\n'
476 def aon_diagram_text_dump(self) -> str:
477 """Build a text representation for the critical path with activity-on-nodes linked directionally."""
478 return self.aon_diagram_text(self.activity_on_node_diagram_data())
480 def activity_on_node_strings_with_link(self) -> ActOnNodeTup:
481 """DRY."""
482 return self.aon_strings_with_link(self.activity_on_node_strings())
484 def activity_on_node_text(self) -> str:
485 """Provide the activity-on-node element as string with trailing newline for e.g. a textual diagram."""
486 return '\n'.join(self.activity_on_node_strings()) + '\n'
488 def is_acyclic(self) -> bool:
489 g = dict((node.name, tuple(child.name for child in node.to_nodes)) for node in self.nodes)
490 return not cyclic(g)