Coverage for arbejdstimer/arbejdstimer.py: 94.82%
239 statements
« prev ^ index » next coverage.py v7.0.1, created at 2023-01-02 19:01 +0100
« prev ^ index » next coverage.py v7.0.1, created at 2023-01-02 19:01 +0100
1# -*- coding: utf-8 -*-
2# pylint: disable=expression-not-assigned,line-too-long
3"""Working hours (Danish arbejdstimer) or not? API."""
4import copy
5import datetime as dti
6import json
7import os
8import pathlib
9import sys
10from typing import Tuple, Union, no_type_check
12from pydantic.error_wrappers import ValidationError
14import arbejdstimer.api as api
16DEBUG_VAR = 'ARBEJDSTIMER_DEBUG'
17DEBUG = os.getenv(DEBUG_VAR)
19ENCODING = 'utf-8'
20ENCODING_ERRORS_POLICY = 'ignore'
22DEFAULT_CONFIG_NAME = '.arbejdstimer.json'
23CfgType = dict[str, Union[dict[str, str], list[dict[str, Union[str, list[str]]]]]]
24WorkingHoursType = Union[tuple[int, int], tuple[None, None]]
25CmdType = Tuple[str, str, str, bool]
26DATE_FMT = '%Y-%m-%d'
27YEAR_MONTH_FORMAT = '%Y-%m'
28DEFAULT_WORK_HOURS_MARKER = (None, None)
29DEFAULT_WORK_HOURS_CLOSED_INTERVAL = (7, 16)
32@no_type_check
33def year_month_me(date) -> str:
34 """DRY."""
35 return date.strftime(YEAR_MONTH_FORMAT)
38@no_type_check
39def load_config(path_or_str):
40 """DRY."""
41 with open(path_or_str, 'rt', encoding=ENCODING) as handle:
42 return json.load(handle)
45def weekday(date: dti.date) -> int:
46 """Return current weekday."""
47 return date.isoweekday()
50def no_weekend(day_number: int) -> bool:
51 """Return if day number is weekend."""
52 return day_number < 6
55def the_hour() -> int:
56 """Return the hour of day as integer within [0, 23]."""
57 return dti.datetime.now().hour
60@no_type_check
61def days_of_year(day=None) -> list[dti.date]:
62 """Return all days of the year that contains the day."""
63 year = dti.date.today().year if day is None else day.year
64 d_start = dti.date(year, 1, 1)
65 d_end = dti.date(year, 12, 31)
66 return [d_start + dti.timedelta(days=x) for x in range((d_end - d_start).days + 1)]
69@no_type_check
70def workdays(off_days: list[dti.date], days=None) -> list[dti.date]:
71 """Return all workdays of the year that contains the day."""
72 if days is None:
73 days = days_of_year(None)
74 return [cand for cand in days if cand not in off_days and no_weekend(weekday(cand))]
77@no_type_check
78def workdays_of_month(work_days, month) -> list[dti.date]:
79 """Return all workdays of the month."""
80 return [work_day for work_day in work_days if year_month_me(work_day) == month]
83@no_type_check
84def workdays_count_per_month(work_days) -> dict[str, int]:
85 """Return the workday count per month of the year that contains the day."""
86 per = {}
87 for work_day in work_days:
88 month = year_month_me(work_day)
89 if month not in per:
90 per[month] = 0
91 per[month] += 1
92 return per
95@no_type_check
96def cumulative_workdays_count_per_month(work_days) -> dict[str, int]:
97 """Return the cumulative workday count per month of the year that contains the day."""
98 per = workdays_count_per_month(work_days)
99 months = list(per)
100 cum = copy.deepcopy(per)
101 for slot, month in enumerate(months, start=1):
102 cum[month] = sum(per[m] for m in months[:slot])
103 return cum
106@no_type_check
107def workdays_count_of_month_in_between(work_days, month, day, first_month, last_month) -> int:
108 """Return the workday count of month to date for day (incl.) given first and last month."""
109 per = workdays_count_per_month(work_days)
110 if month not in per or month < first_month or month > last_month:
111 return 0
113 wds = workdays_of_month(work_days, month)
114 return sum(1 for d in wds if d.day <= day)
117@no_type_check
118def closed_interval_months(work_days) -> tuple[str, str]:
119 """DRY."""
120 return year_month_me(work_days[0]), year_month_me(work_days[-1])
123@no_type_check
124def workdays_count_of_year_in_between(work_days, month, day, first_month=None, last_month=None) -> int:
125 """Return the workday count of year to date for day (incl.) given first and last month."""
126 per = workdays_count_per_month(work_days)
127 if any(
128 (
129 month not in per,
130 first_month and first_month not in per,
131 last_month and last_month not in per,
132 )
133 ):
134 return 0
136 initial, final = closed_interval_months(work_days)
137 if first_month is None:
138 first_month = initial
139 if last_month is None:
140 last_month = final
142 count = 0
143 for d in work_days:
144 ds = year_month_me(d)
145 if first_month <= ds <= last_month:
146 if ds < month or ds == month and d.day <= day:
147 count += 1
149 return count
152@no_type_check
153def remaining_workdays_count_of_year_in_between(work_days, month, day, first_month=None, last_month=None) -> int:
154 """Return the workday count of year from date for day (incl.) given first and last month."""
155 per = workdays_count_per_month(work_days)
156 if any(
157 (
158 month not in per,
159 first_month and first_month not in per,
160 last_month and last_month not in per,
161 )
162 ):
163 return 0
165 initial, final = closed_interval_months(work_days)
166 if first_month is None:
167 first_month = initial
168 if last_month is None:
169 last_month = final
171 count = 0
172 for d in work_days:
173 ds = year_month_me(d)
174 if first_month <= ds <= last_month:
175 if ds == month and d.day > day or ds > month:
176 count += 1
178 return count
181@no_type_check
182def workday(off_days: list[dti.date], cmd: str, date: str = '', strict: bool = False) -> Tuple[int, str]:
183 """Apply the effective rules to the given date (default today)."""
184 day = dti.datetime.strptime(date, DATE_FMT).date() if date else dti.date.today()
185 if strict:
186 if not off_days:
187 return 2, '- empty date range of configuration'
188 if not (off_days[0].year <= day.year < off_days[-1].year):
189 return 2, '- Day is not within year range of configuration'
190 else:
191 if cmd.startswith('explain'): 191 ↛ 194line 191 didn't jump to line 194, because the condition on line 191 was never false
192 print(f'- Day ({day}) is within date range of configuration')
194 if day not in off_days:
195 if cmd.startswith('explain'):
196 print(f'- Day ({day}) is not a holiday')
197 else:
198 return 1, '- Day is a holiday.'
200 work_day = no_weekend(weekday(day))
201 if work_day:
202 if cmd.startswith('explain'):
203 print(f'- Day ({day}) is not a weekend')
204 else:
205 return 1, '- Day is weekend.'
207 return 0, ''
210@no_type_check
211def apply(
212 off_days: list[dti.date], working_hours: WorkingHoursType, cmd: str, day: str, strict: bool
213) -> Tuple[int, str]:
214 """Apply the effective rules to the current date and time."""
215 working_hours = working_hours if working_hours != (None, None) else DEFAULT_WORK_HOURS_CLOSED_INTERVAL
216 code, message = workday(off_days, cmd, date=str(day), strict=strict)
217 if code:
218 return code, message
219 hour = the_hour()
220 if working_hours[0] <= hour <= working_hours[1]: 220 ↛ 224line 220 didn't jump to line 224, because the condition on line 220 was never false
221 if cmd.startswith('explain'):
222 print(f'- At this hour ({hour}) is work time')
223 else:
224 return 1, f'- No worktime at hour({hour}).'
226 return 0, ''
229@no_type_check
230def load(cfg: CfgType) -> Tuple[int, str, list[dti.date], WorkingHoursType]:
231 """Load the configuration and return error, message and holidays as well as working hours list.
233 The holidays as well as non-default working hours will be ordered.
234 """
235 if not cfg:
236 return 0, 'empty configuration, using default', [], DEFAULT_WORK_HOURS_MARKER
238 try:
239 model = api.Arbejdstimer(**cfg)
240 except ValidationError as err:
241 return 2, str(err), [], (None, None)
243 holidays_date_list = []
244 if model.holidays:
245 holidays = model.dict()['holidays']
246 for nth, holiday in enumerate(holidays, start=1):
247 dates = holiday['at']
248 if len(dates) == 1:
249 holidays_date_list.append(dates[0])
250 elif len(dates) == 2:
251 data = sorted(dates)
252 start, end = [data[n] for n in (0, 1)]
253 current = start
254 holidays_date_list.append(current)
255 while current < end:
256 current += dti.timedelta(days=1)
257 holidays_date_list.append(current)
258 else:
259 for a_date in dates:
260 holidays_date_list.append(a_date)
262 working_hours = DEFAULT_WORK_HOURS_MARKER
263 if model.working_hours:
264 working_hours = tuple(sorted(model.working_hours.dict().get('__root__', [None, None])))
265 return 0, '', sorted(holidays_date_list), working_hours
268@no_type_check
269def workdays_from_config(cfg: CfgType, day=None) -> list[dti.date]:
270 """Ja, ja, ja."""
271 error, _, holidays, _ = load(cfg)
272 return [] if error else workdays(holidays, days=days_of_year(day))
275def verify_request(argv: Union[CmdType, None]) -> Tuple[int, str, CmdType]:
276 """Fail with grace."""
277 err = ('', '', '', False)
278 if not argv or len(argv) != 4:
279 return 2, 'received wrong number of arguments', err
281 command, date, config, strict = argv
283 if command not in ('explain', 'explain_verbatim', 'now'):
284 return 2, 'received unknown command', err
286 if not config:
287 return 2, 'configuration missing', err
289 config_path = pathlib.Path(str(config))
290 if not config_path.is_file():
291 return 1, f'config ({config_path}) is no file', err
292 if not ''.join(config_path.suffixes).lower().endswith('.json'):
293 return 1, 'config has not .json extension', err
295 return 0, '', argv
298def main(argv: Union[CmdType, None] = None) -> int:
299 """Drive the lookup."""
300 error, message, strings = verify_request(argv)
301 if error:
302 print(message, file=sys.stderr)
303 return error
305 command, date, config, strict = strings
307 configuration = load_config(config)
308 error, message, holidays, working_hours = load(configuration)
309 if error:
310 if command.startswith('explain'):
311 print('Configuration file failed to parse (INVALID)')
312 print(message, file=sys.stderr)
313 return error
315 if command.startswith('explain'):
316 print(f'read valid configuration from ({config})')
317 if command == 'explain_verbatim':
318 lines = json.dumps(configuration, indent=2).split('\n')
319 line_count = len(lines)
320 counter_width = len(str(line_count))
321 print(f'configuration has {line_count} line{"" if line_count == 1 else "s"} of (indented) JSON content:')
322 for line, content in enumerate(lines, start=1):
323 print(f' {line:>{counter_width + 1}} | {content}')
324 if command == 'explain_verbatim':
325 if strict:
326 print('detected strict mode (queries outside of year frame from config will fail)')
327 else:
328 print('detected non-strict mode (no constraints on year frame from config)')
330 if command == 'explain':
331 print(f'consider {len(holidays)} holidays:')
332 elif command == 'explain_verbatim':
333 print('effective configuration:')
334 if holidays:
335 print(f'- given {len(holidays)} holidays within [{holidays[0]}, {holidays[-1]}]:')
336 for holiday in holidays:
337 print(f' + {holiday}')
338 else:
339 print(f'- no holidays defined in ({config}):')
340 print('- working hours:')
341 if working_hours != DEFAULT_WORK_HOURS_MARKER:
342 print(f' + [{working_hours[0]}, {working_hours[1]}] (from configuration)')
343 else:
344 effective_range = DEFAULT_WORK_HOURS_CLOSED_INTERVAL
345 print(f' + [{effective_range[0]}, {effective_range[1]}] (application default)')
346 print('evaluation:')
348 if strict:
349 print('detected strict mode (queries outside of year frame from config will fail)')
351 error, message = apply(holidays, working_hours, command, date, strict)
352 if error:
353 if command.startswith('explain'):
354 print(message, file=sys.stdout)
355 return error
357 return 0