Coverage for ajallaan/api.py: 0.00%
133 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 15:19:53 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 15:19:53 +00:00
1"""In due time (Finnish: ajallaan) - reporting on worklog entries of some ticket system - api"""
3import argparse
4import datetime as dti
5import json
6import logging
7import operator
8import pathlib
9import sys
10from typing import no_type_check
12from ajallaan import (
13 API_TOKEN,
14 API_USER,
15 DEBUG,
16 ENCODING,
17 QUIET,
18 VERBOSE,
19 WORKLOG_AUTHOR,
20 log,
21)
# Author key compared against the 'authorUserKey' field of worklog entries.
USER_KEY = 'u14826'
# Placeholder shown wherever no title is available for an epic.
EPH_TITLE = 'EPIC-TITLE-MISSING-IN-DATA'
# Workday counts per month in calendar order (January first).
_WORKDAY_COUNTS = (21, 20, 23, 18, 20, 20, 21, 21, 21, 22, 21, 15)
# Maps the zero-padded month number ('01' .. '12') to its count of workdays.
workdays = {f'{month:02d}': count for month, count in enumerate(_WORKDAY_COUNTS, start=1)}
@no_type_check
def behind_the_moon():
    """Spike content in here or empty (goal).

    Reads the path of a monthly worklog export (JSON) from ``sys.argv[1]`` and prints
    a report to standard output: the worklog entries authored by ``USER_KEY`` grouped
    by project and issue, total hours per epic, hours per day with a running delta
    against the contract hours per workday, and a warning for entries starting outside
    of the expected hour window.

    The file name is expected to follow the pattern
    ``worklog-monthly-{year}-{mm}-to-date-{to_date}.json`` (year fixed to 2023 for now);
    any leading directory components of the path are ignored.

    Raises:
        IndexError: if no path was given on the command line.
        KeyError: if the month derived from the file name is no key of ``workdays``
            or the data lacks one of the accessed fields.
    """
    year_wds = sum(workdays.values())
    first_hour = 4
    last_hour = 18
    # Contract hours per workday as used for the monthly target and the daily deltas.
    contract_hours_per_day = 8.6667
    source = sys.argv[1]
    year = 2023
    # f'worklog-monthly-{year}-{mm}-to-date-{to_date}.json'
    # assume 2023 for now, ...
    # Only the final path component carries the month; str.lstrip('./') would strip
    # characters (not a prefix) and break on any path with a directory part.
    file_name = pathlib.Path(source).name
    mm = file_name.replace(f'worklog-monthly-{year}-', '', 1).split('-', 1)[0]

    mm_wds = workdays[mm]

    def my_time(epocms: int) -> str:
        """Convert EPOC timestamp (milliseconds) to local ISO string."""
        return dti.datetime.fromtimestamp(epocms // 1000).strftime('%Y-%m-%d %H:%M:%S')

    def my_duration(seconds: int) -> float:
        """Convert number of seconds to fractional hours."""
        return seconds / 3600.0

    # Context manager ensures the file handle is closed after parsing.
    with open(source, 'rt', encoding=ENCODING) as handle:
        worklog = json.load(handle)
    print(len(worklog['projects']))

    pi_counts = [[i['key'] for i in p['issues']] for p in worklog['projects']]
    print(pi_counts)

    my_hours = 0
    my_wls = {}  # project key -> issue key -> list of (start, hours, comment)
    issue_titles = {}
    epic_of = {}
    for project in worklog['projects']:
        p_key = project['key']
        my_wls[p_key] = {}
        for issue in project['issues']:
            i_key = issue['key']
            epic_of[i_key] = issue.get('epicKey', 'MISSING-EPIC')
            issue_titles[i_key] = issue['summary']
            my_wls[p_key][i_key] = []
            for entry in issue['workLogs']:
                if entry['authorUserKey'] == USER_KEY:
                    these_hours = my_duration(entry['timeSpent'])
                    my_hours += these_hours
                    my_wls[p_key][i_key].append(
                        (
                            my_time(entry['workStart']),
                            these_hours,
                            entry['comment'],
                        )
                    )

    daily = {}  # date (YYYY-MM-DD) -> accumulated hours
    suspicious_start = []
    epic_totals = {v: 0.0 for v in epic_of.values()}
    for p_key, issues in my_wls.items():
        print(f'{p_key}:')
        for i_key, entries in issues.items():
            th_issue = sum(duration for (_, duration, _) in entries)
            epic_totals[epic_of[i_key]] += th_issue
            if th_issue > 0:
                print(f'- {i_key} - ({issue_titles[i_key]}) - total hours = {round(th_issue, 2) :.2f}:')
                for start, duration, comment in entries:
                    print(f' + {start}: {round(duration, 2) :.2f} hours - {comment}')
            # NOTE(review): whether this accumulation belongs under the th_issue guard
            # above is to be confirmed; it only matters for zero-duration entries.
            for start, duration, _ in entries:
                date_slot = start[:10]
                daily[date_slot] = daily.get(date_slot, 0.0) + duration
                hour_num = int(start[11:13])
                if not (first_hour <= hour_num <= last_hour):
                    suspicious_start.append((i_key, issue_titles[i_key], start))

    print(f'Total: {round(my_hours, 2) :.2f} hours')

    print('Hours per Epic:')
    # Largest totals first.
    epic_totals = dict(reversed(sorted(epic_totals.items(), key=operator.itemgetter(1))))
    for epic_key, th_epic in epic_totals.items():
        if th_epic > 0 and epic_key != 'MISSING-EPIC':
            title = issue_titles.get(epic_key, EPH_TITLE)
            print(f'- {epic_key} - ({title}) - total hours = {round(th_epic, 2) :.2f}:')

    if 'MISSING-EPIC' in epic_totals:
        print('Hours without Epic:')
        th_wo_epic = epic_totals['MISSING-EPIC']
        if th_wo_epic > 0:
            print(f'- MISSING-EPIC - {EPH_TITLE} - total hours = {round(th_wo_epic, 2) :.2f}:')

    print(f'Workdays of {year}-{mm}: {mm_wds}')
    print(f'Workdays of full {year}: {year_wds}')
    print(f'Hours of contract for {year}-{mm}: {round(mm_wds * contract_hours_per_day, 2) :.2f}')

    print('Hours per workday:')
    daily = dict(sorted(daily.items(), key=operator.itemgetter(0)))
    delta_mm = 0  # running balance against the contract hours per workday
    for date_slot, hours in daily.items():
        if hours > 0:
            delta = hours - contract_hours_per_day
            delta_mm += delta
            print(
                f'- {date_slot} total hours = {round(hours, 2) :.2f}'
                f' -> {round(delta_mm, 2) :+.2f} ({round(delta, 2) :+.2f})'
            )

    print(f'Monthly contract balance is {round(my_hours, 2) :.2f} -> Delta = {round(delta_mm, 2) :+.2f}')

    if suspicious_start:
        # NOTE(review): the message states UTC while my_time converts to local time - confirm.
        print(
            f'WARNING: {len(suspicious_start)} worklog entries found with'
            f' suspicious start times outside of [{first_hour}, {last_hour}] UTC:'
        )
        for key, title, start in suspicious_start:
            print(f'- {start}: {key} - {title}')
@no_type_check
def process(options: argparse.Namespace):
    """Process the command line request.

    Derives the log level from the debug / verbose / quiet options (falling back to the
    package defaults), then either enters local mode (no coherent credentials from the
    options or the package defaults) and loads the JSON file at ``options.in_path``, or
    prepares the remote mode using the api user and token.

    Returns:
        0 on success,
        1 if no options were given or the local input is missing, empty, or no valid JSON,
        2 if credentials and a local input path were given at the same time.
    """
    if not options:
        log.error('no data given to process')
        return 1

    debug = options.debug if options.debug else DEBUG
    verbose = options.verbose if options.verbose else VERBOSE
    quiet = options.quiet if options.quiet else QUIET

    if quiet:
        # Quiet wins over debug and verbose.
        debug = verbose = not quiet

    if debug:
        log.setLevel(logging.DEBUG)
        log.debug('log level debug requested')
    elif verbose:
        log.setLevel(logging.INFO)
    elif quiet:
        log.setLevel(logging.ERROR)

    if not (options.user and options.token or API_USER and API_TOKEN):
        log.info('entering local mode as no coherent credentials found')
        if options.in_path is None:
            # pathlib.Path(None) would raise a TypeError instead of reporting the problem.
            log.error('no input path given for local mode')
            return 1
        in_path = pathlib.Path(options.in_path)
        if not in_path.is_file() or not in_path.stat().st_size:
            log.error(f'given path ({in_path}) is either no file or has no content')
            return 1

        try:
            with in_path.open(encoding=ENCODING) as handle:
                data = json.load(handle)
        except (RuntimeError, ValueError) as err:
            # json.JSONDecodeError and UnicodeDecodeError derive from ValueError;
            # RuntimeError is kept for backward compatibility (covers RecursionError).
            log.error(f'parsing path ({in_path}) as json failed with error ({err})')
            return 1
        log.warning(
            f'would process local data of {sys.getsizeof(data)} bytes in host memory' ' for worker inherent in data'
        )
        return 0

    if options.in_path:
        log.error('why use api user and token when local input is given - confusion leads to error')
        return 2

    if not options.worker and not WORKLOG_AUTHOR:
        # Report the user that is actually assigned (option first, package default second).
        api_user = options.user or API_USER
        log.info(f'setting worker equal to api user ({api_user})')
        options.worker = api_user

    log.warning('would fetch the worklog data using the given coordinates and credentials')
    return 0