Coverage for ajallaan/api.py: 0.00%

133 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 15:19:53 +00:00

1"""In due time (Finnish: ajallaan) - reporting on worklog entries of some ticket system - api""" 

2 

3import argparse 

4import datetime as dti 

5import json 

6import logging 

7import operator 

8import pathlib 

9import sys 

10from typing import no_type_check 

11 

12from ajallaan import ( 

13 API_TOKEN, 

14 API_USER, 

15 DEBUG, 

16 ENCODING, 

17 QUIET, 

18 VERBOSE, 

19 WORKLOG_AUTHOR, 

20 log, 

21) 

22 

# Author key that behind_the_moon() matches against each work log entry's 'authorUserKey'.
USER_KEY = 'u14826'
# Placeholder title shown for an epic whose key has no entry among the issue titles.
EPH_TITLE = 'EPIC-TITLE-MISSING-IN-DATA'
# Number of workdays per month, keyed by the zero-padded month number ('01' .. '12').
# NOTE(review): presumably the counts for 2023 (the only year the spike assumes) - confirm.
workdays = dict(
    zip(
        (f'{month:02d}' for month in range(1, 13)),
        (21, 20, 23, 18, 20, 20, 21, 21, 21, 22, 21, 15),
    )
)

39 

40 

@no_type_check
def behind_the_moon():
    """Spike content in here or empty (goal).

    Report on the worklog export named by the first command line argument.

    The file name is expected to follow the pattern
    ``worklog-monthly-{year}-{mm}-to-date-{to_date}.json`` (the year is fixed to
    2023 for now); the month is taken from the final path component, so the
    export may live in any folder. Only work log entries whose
    ``authorUserKey`` equals ``USER_KEY`` are taken into account.

    Printed to standard output, in this order: the number of projects, the
    issue keys per project, the entries per issue, the total hours, the hours
    per epic, the workday and contract figures for the month, the running
    balance per day, and finally a warning listing entries that start outside
    of the hours ``first_hour`` .. ``last_hour``.

    Raises:
        IndexError: if no command line argument was given.
        KeyError: if no known month can be derived from the file name or the
            data lacks one of the keys read below.
        OSError: if the file cannot be opened.
    """
    year_wds = sum(workdays.values())
    first_hour = 4
    last_hour = 18
    hours_per_workday = 8.6667  # contract hours per workday used for all balances
    source = sys.argv[1]
    year = 2023
    # f'worklog-monthly-{year}-{mm}-to-date-{to_date}.json'
    # assume 2023 for now, ...
    # Inspect the file name only: stripping a './' prefix by characters breaks on any folder part.
    mm = pathlib.Path(source).name.replace(f'worklog-monthly-{year}-', '', 1).split('-', 1)[0]

    mm_wds = workdays[mm]

    def my_time(epocms: int) -> str:
        """Convert EPOC timestamp (milliseconds) to local ISO string."""
        return dti.datetime.fromtimestamp(epocms // 1000).strftime('%Y-%m-%d %H:%M:%S')

    def my_duration(seconds: int) -> float:
        """Convert number of seconds to fractional hours."""
        return seconds / 3600.0

    # Context manager ensures the handle is closed even when parsing fails.
    with open(source, 'rt', encoding=ENCODING) as handle:
        worklog = json.load(handle)
    print(len(worklog['projects']))

    pi_counts = [[i['key'] for i in p['issues']] for p in worklog['projects']]
    print(pi_counts)

    # Collect the matching entries as (start, hours, comment) per project and issue.
    my_hours = 0
    my_wls = {}
    issue_titles = {}
    epic_of = {}
    for project in worklog['projects']:
        p_key = project['key']
        my_wls[p_key] = {}
        for issue in project['issues']:
            i_key = issue['key']
            epic_of[i_key] = issue.get('epicKey', 'MISSING-EPIC')
            issue_titles[i_key] = issue['summary']
            my_wls[p_key][i_key] = []
            for entry in issue['workLogs']:
                if entry['authorUserKey'] == USER_KEY:
                    these_hours = my_duration(entry['timeSpent'])
                    my_hours += these_hours
                    my_wls[p_key][i_key].append(
                        (
                            my_time(entry['workStart']),
                            these_hours,
                            entry['comment'],
                        )
                    )

    # Aggregate per epic and per day while printing the per issue details.
    daily = {}
    suspicious_start = []
    epic_totals = dict.fromkeys(epic_of.values(), 0.0)
    for p_key, issues in my_wls.items():
        print(f'{p_key}:')
        for i_key, entries in issues.items():
            th_issue = sum(duration for (_, duration, _) in entries)
            epic_totals[epic_of[i_key]] += th_issue
            if th_issue > 0:
                print(f'- {i_key} - ({issue_titles[i_key]}) - total hours = {round(th_issue, 2) :.2f}:')
                for start, duration, comment in entries:
                    print(f'  + {start}: {round(duration, 2) :.2f} hours - {comment}')
            for start, duration, _ in entries:
                date_slot = start[:10]  # 'YYYY-mm-dd' part of the start string
                daily[date_slot] = daily.get(date_slot, 0.0) + duration
                hour_num = int(start[11:13])  # 'HH' part of the start string
                if not (first_hour <= hour_num <= last_hour):
                    suspicious_start.append((i_key, issue_titles[i_key], start))

    print(f'Total: {round(my_hours, 2) :.2f} hours')

    print('Hours per Epic:')
    # Descending by hours; reversing the ascending sort keeps the established order among ties.
    epic_totals = dict(reversed(sorted(epic_totals.items(), key=operator.itemgetter(1))))
    for epic_key, th_epic in epic_totals.items():
        if th_epic > 0 and epic_key != 'MISSING-EPIC':
            title = issue_titles.get(epic_key, EPH_TITLE)
            print(f'- {epic_key} - ({title}) - total hours = {round(th_epic, 2) :.2f}:')

    if 'MISSING-EPIC' in epic_totals:
        print('Hours without Epic:')
        th_wo_epic = epic_totals['MISSING-EPIC']
        if th_wo_epic > 0:
            print(f'- MISSING-EPIC - {EPH_TITLE} - total hours = {round(th_wo_epic, 2) :.2f}:')

    print(f'Workdays of {year}-{mm}: {mm_wds}')
    print(f'Workdays of full {year}: {year_wds}')
    print(f'Hours of contract for {year}-{mm}: {round(mm_wds * hours_per_workday, 2) :.2f}')

    print('Hours per workday:')
    daily = dict(sorted(daily.items(), key=operator.itemgetter(0)))
    delta_mm = 0
    for date_slot, hours in daily.items():
        if hours > 0:
            delta = hours - hours_per_workday
            delta_mm += delta
            print(
                f'- {date_slot} total hours = {round(hours, 2) :.2f}'
                f' -> {round(delta_mm, 2) :+.2f} ({round(delta, 2) :+.2f})'
            )

    print(f'Monthly contract balance is {round(my_hours, 2) :.2f} -> Delta = {round(delta_mm, 2) :+.2f}')

    if suspicious_start:
        # NOTE(review): the text claims UTC, but my_time() converts to local time - confirm intent.
        print(
            f'WARNING: {len(suspicious_start)} worklog entries found with'
            f' suspicious start times outside of [{first_hour}, {last_hour}] UTC:'
        )
        for key, title, start in suspicious_start:
            print(f'- {start}: {key} - {title}')

155 

156 

@no_type_check
def process(options: argparse.Namespace):
    """Process the command line request.

    The log level is derived from the debug, verbose, and quiet switches (each
    falling back to the package level default), where quiet overrides the
    other two.

    Without a coherent pair of user and token (neither from the options nor
    from the package level defaults) the local mode is entered and the file at
    ``options.in_path`` is parsed as JSON. Otherwise the remote mode is
    prepared, which rejects an additionally given local input path.

    Returns:
        0 on success, 1 if the options are missing or the local input is
        missing, empty, unreadable, or not valid JSON, and 2 if credentials
        and a local input path are both given.
    """
    if not options:
        log.error('no data given to process')
        return 1

    debug = options.debug if options.debug else DEBUG
    verbose = options.verbose if options.verbose else VERBOSE
    quiet = options.quiet if options.quiet else QUIET

    if quiet:
        # Quiet wins: switch off both of the more talkative levels.
        debug = verbose = not quiet

    if debug:
        log.setLevel(logging.DEBUG)
        log.debug('log level debug requested')
    elif verbose:
        log.setLevel(logging.INFO)
    elif quiet:
        log.setLevel(logging.ERROR)

    if not (options.user and options.token or API_USER and API_TOKEN):
        log.info('entering local mode as no coherent credentials found')
        if options.in_path is None:
            # Fail with a return code instead of a TypeError from the path constructor.
            log.error('no path to local input given')
            return 1
        in_path = pathlib.Path(options.in_path)
        if not in_path.is_file() or not in_path.stat().st_size:
            log.error(f'given path ({in_path}) is either no file or has no content')
            return 1

        try:
            with in_path.open('rt', encoding=ENCODING) as handle:
                data = json.load(handle)
        except (OSError, RuntimeError, ValueError) as err:
            # Invalid JSON and undecodable bytes are reported as ValueError subclasses.
            log.error(f'parsing path ({in_path}) as json failed with error ({err})')
            return 1
        log.warning(
            f'would process local data of {sys.getsizeof(data)} bytes in host memory' ' for worker inherent in data'
        )
        return 0

    if options.in_path:
        log.error('why use api user and token when local input is given - confusion leads to error')
        return 2

    if not options.worker and not WORKLOG_AUTHOR:
        # Assign first so the message reports the worker actually used.
        options.worker = options.user or API_USER
        log.info(f'setting worker equal to api user ({options.worker})')

    log.warning('would fetch the worklog data using the given coordinates and credentials')
    return 0