Coverage for vastaanottaa/cli.py: 100.00%
93 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:45:13 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-05 19:45:13 +00:00
1"""Command line interface for vastaanottaa."""
3# import argparse
4import base64
5import datetime as dti
6import os
7import pathlib
8import sys
9import time
10from typing import no_type_check
12from cryptography.fernet import Fernet
13from cryptography.hazmat.primitives import hashes
14from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
15from vastaanottaa import APP_ALIAS, APP_ENV, VERSION
# Actions accepted as the first command line parameter.
SEND = 'send'
RECV = 'recv'
ACTIONS = (SEND, RECV)
# Text encoding used for parameters, armor markers, and decrypted output.
ENCODING = 'utf-8'
# Parameters of the PBKDF2-HMAC key derivation.
HASH_ALG = hashes.BLAKE2b
RES_KEY_LEN = 32
SALT_LEN = 16
# Fresh random salt generated once per process at import time.
SALT = os.urandom(SALT_LEN)
N_ITER = 480000

# Marker lines framing the armored message.
ARMOR_MSG_BEGIN = f'---- BEGIN {APP_ENV} MESSAGE ----'
ARMOR_MSG_END = f'---- END {APP_ENV} MESSAGE ----'

ARMOR_MSG_BEGIN_BYTES = ARMOR_MSG_BEGIN.encode(ENCODING)
# Encoded from the END marker (not the BEGIN marker) so that value and length match the closing line.
ARMOR_MSG_END_BYTES = ARMOR_MSG_END.encode(ENCODING)

# Combined size of both marker lines, counting one newline after each.
ARMOR_PADDED_LENGTH = len(ARMOR_MSG_BEGIN_BYTES) + 1 + len(ARMOR_MSG_END_BYTES) + 1

# Marker lines framing the armored salt.
ARMOR_SALT_BEGIN = f'---- BEGIN {APP_ENV} SALT ----'
ARMOR_SALT_END = f'---- END {APP_ENV} SALT ----'

USAGE_INFO = f'usage: {APP_ALIAS} send|recv given text-or-file salt-for-recv'
def _strip_armor(data: bytes) -> bytes:
    """Return data without surrounding whitespace and message armor marker lines."""
    # Derive both markers from the exact text that send prints around the message.
    begin = ARMOR_MSG_BEGIN.encode(ENCODING)
    end = ARMOR_MSG_END.encode(ENCODING)
    payload = data.strip()
    if payload.startswith(begin):
        payload = payload[len(begin) :]
    if payload.endswith(end):
        payload = payload[: -len(end)]
    return payload


@no_type_check
def app(argv=None) -> int:
    """Encrypt (send) or decrypt (recv) a message with a key derived from a given secret.

    Parameters (positional, in argv):
        action: either ``send`` or ``recv`` (case-insensitive, surrounding space ignored).
        given: the secret the key is derived from via PBKDF2-HMAC.
        text-or-file: path of an existing file to read the content from, else the content itself.
        salt: base64 encoded salt; required for recv, optional (with a warning) for send.

    When argv is None the parameters are taken from sys.argv[1:].

    On send, the armored message goes to standard output and the armored salt to standard error.
    On recv, the decrypted text goes to standard output.

    Returns 0 on success (including version and help requests), 2 on usage errors,
    and 1 when the content is empty.

    Exceptions raised while decoding base64 input or decrypting are not caught here.
    """
    argv = sys.argv[1:] if argv is None else argv

    if len(argv) == 1 and argv[0] in ('-V', '--version', 'version'):
        print(f'{APP_ALIAS} v{VERSION}', file=sys.stderr)
        return 0

    if not argv or (len(argv) == 1 and argv[0] in ('-h', '--help', 'help')):
        print(USAGE_INFO, file=sys.stderr)
        return 0

    if len(argv) not in (3, 4):
        print('ERROR: unexpected count of parameters', file=sys.stderr)
        print(USAGE_INFO, file=sys.stderr)
        return 2
    action = argv[0].lower().strip()
    if action not in ACTIONS:
        print('ERROR: unexpected goal/action - use either send or recv', file=sys.stderr)
        print(USAGE_INFO, file=sys.stderr)
        return 2
    given = argv[1].encode(ENCODING)
    src = argv[2]

    # Default to the per-process random salt; an explicit fourth parameter overrides it.
    salt = SALT
    if action == SEND and len(argv) == 4:
        print('WARNING: salt provided as parameter for send', file=sys.stderr)
        salt = base64.decodebytes(argv[3].encode(ENCODING))
    if action == RECV:
        if len(argv) != 4:
            print('ERROR: salt missing for recv', file=sys.stderr)
            print(USAGE_INFO, file=sys.stderr)
            return 2
        salt = base64.decodebytes(argv[3].encode(ENCODING))

    src_path = pathlib.Path(src)
    if src_path.is_file():
        print('INFO: Reading content from file', file=sys.stderr)
        # read_bytes opens and closes the file, so no handle is left dangling.
        src_data = src_path.read_bytes()
    else:
        print('INFO: Loading content from parameter', file=sys.stderr)
        src_data = src.encode(ENCODING)

    if not src_data:
        print('ERROR: afraid of the void - content for message has zero bytes', file=sys.stderr)
        print(USAGE_INFO, file=sys.stderr)
        return 1

    kdf = PBKDF2HMAC(
        algorithm=HASH_ALG(64),
        length=RES_KEY_LEN,
        salt=salt,
        iterations=N_ITER,
    )
    key = base64.urlsafe_b64encode(kdf.derive(given))
    f = Fernet(key)

    if action == SEND:
        ts = int(time.time())
        ts_dt = dti.datetime.fromtimestamp(ts, dti.timezone.utc)
        token = f.encrypt_at_time(src_data, ts)
        # Read the timestamp back from the token as a control value.
        control_ts = f.extract_timestamp(token)
        control_ts_dt = dti.datetime.fromtimestamp(control_ts, dti.timezone.utc)
        print(f'DEBUG: timestamp embedded: {ts} ({ts_dt})', file=sys.stderr)
        print(f'DEBUG: timestamp extracted: {control_ts} ({control_ts_dt})', file=sys.stderr)
        trp = base64.encodebytes(token)
        print(ARMOR_MSG_BEGIN)
        # encodebytes output already ends with a newline.
        print(trp.decode(ENCODING), end='')
        print(ARMOR_MSG_END)
        print(file=sys.stderr)
        print(ARMOR_SALT_BEGIN, file=sys.stderr)
        # Decode to text so the salt is shown verbatim instead of as a bytes repr.
        print(base64.encodebytes(salt).decode(ENCODING), end='', file=sys.stderr)
        print(ARMOR_SALT_END, file=sys.stderr)
        return 0

    rcv = base64.decodebytes(_strip_armor(src_data))
    print(f.decrypt(rcv).decode(ENCODING))

    return 0