Coverage for vastaanottaa/cli.py: 100.00%

93 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-05 19:45:13 +00:00

1"""Command line interface for vastaanottaa.""" 

2 

3# import argparse 

4import base64 

5import datetime as dti 

6import os 

7import pathlib 

8import sys 

9import time 

10from typing import no_type_check 

11 

12from cryptography.fernet import Fernet 

13from cryptography.hazmat.primitives import hashes 

14from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC 

15from vastaanottaa import APP_ALIAS, APP_ENV, VERSION 

16 

17SEND = 'send' 

18RECV = 'recv' 

19ACTIONS = (SEND, RECV) 

20ENCODING = 'utf-8' 

21HASH_ALG = hashes.BLAKE2b 

22RES_KEY_LEN = 32 

23SALT_LEN = 16 

24SALT = os.urandom(SALT_LEN) 

25N_ITER = 480000 

26 

27ARMOR_MSG_BEGIN = f'---- BEGIN {APP_ENV} MESSAGE ----' 

28ARMOR_MSG_END = f'---- END {APP_ENV} MESSAGE ----' 

29 

30ARMOR_MSG_BEGIN_BYTES = ARMOR_MSG_BEGIN.encode(ENCODING) 

31ARMOR_MSG_END_BYTES = ARMOR_MSG_BEGIN.encode(ENCODING) 

32 

33ARMOR_PADDED_LENGTH = len(ARMOR_MSG_BEGIN_BYTES) + 1 + len(ARMOR_MSG_END_BYTES) + 1 

34 

35ARMOR_SALT_BEGIN = f'---- BEGIN {APP_ENV} SALT ----' 

36ARMOR_SALT_END = f'---- END {APP_ENV} SALT ----' 

37 

38USAGE_INFO = f'usage: {APP_ALIAS} send|recv given text-or-file salt-for-recv' 

39 

40 

41@no_type_check 

42def app(argv=None) -> int: 

43 """Do the thing.""" 

44 argv = sys.argv[1:] if argv is None else argv 

45 

46 if len(argv) == 1 and argv[0] in ('-V', '--version', 'version'): 

47 print(f'{APP_ALIAS} v{VERSION}', file=sys.stderr) 

48 return 0 

49 

50 if not argv or len(argv) == 1 and argv[0] in ('-h', '--help', 'help'): 

51 print(USAGE_INFO, file=sys.stderr) 

52 return 0 

53 

54 if len(argv) not in (3, 4): 

55 print('ERROR: unexpected count of parameters', file=sys.stderr) 

56 print(USAGE_INFO, file=sys.stderr) 

57 return 2 

58 action = argv[0].lower().strip() 

59 if action not in ACTIONS: 

60 print('ERROR: unexpected goal/action - use either send or recv', file=sys.stderr) 

61 print(USAGE_INFO, file=sys.stderr) 

62 return 2 

63 given = argv[1].encode(ENCODING) 

64 src = argv[2] 

65 

66 salt = SALT 

67 if action == SEND and len(argv) == 4: 

68 print('WARNING: salt provided as parameter for send', file=sys.stderr) 

69 salt = base64.decodebytes(argv[3].encode(ENCODING)) 

70 if action == RECV: 

71 if len(argv) != 4: 

72 print('ERROR: salt missing for recv', file=sys.stderr) 

73 print(USAGE_INFO, file=sys.stderr) 

74 return 2 

75 salt = base64.decodebytes(argv[3].encode(ENCODING)) 

76 

77 src_path = pathlib.Path(src) 

78 if src_path.is_file(): 

79 print('INFO: Reading content from file', file=sys.stderr) 

80 src_data = src_path.open('rb').read() 

81 else: 

82 print('INFO: Loading content from parameter', file=sys.stderr) 

83 src_data = src.encode(ENCODING) 

84 

85 if not src_data: # type: ignore 

86 print('ERROR: afraid of the void - content for message has zero bytes', file=sys.stderr) 

87 print(USAGE_INFO, file=sys.stderr) 

88 return 1 

89 

90 kdf = PBKDF2HMAC( 

91 algorithm=HASH_ALG(64), 

92 length=RES_KEY_LEN, 

93 salt=salt, 

94 iterations=N_ITER, 

95 ) 

96 key = base64.urlsafe_b64encode(kdf.derive(given)) 

97 f = Fernet(key) 

98 ts = int(time.time()) 

99 ts_dt = dti.datetime.fromtimestamp(ts, dti.timezone.utc) 

100 

101 if action == SEND: 

102 token = f.encrypt_at_time(src_data, ts) # type: ignore 

103 control_ts = f.extract_timestamp(token) 

104 control_ts_dt = dti.datetime.fromtimestamp(control_ts, dti.timezone.utc) 

105 print(f'DEBUG: timestamp embedded: {control_ts} ({ts_dt})', file=sys.stderr) 

106 print(f'DEBUG: timestamp extracted: {ts} ({control_ts_dt})', file=sys.stderr) 

107 trp = base64.encodebytes(token) 

108 print(ARMOR_MSG_BEGIN) 

109 print(trp.decode(ENCODING), end='') 

110 print(ARMOR_MSG_END) 

111 print(file=sys.stderr) 

112 print(ARMOR_SALT_BEGIN, file=sys.stderr) 

113 print(base64.encodebytes(salt), file=sys.stderr) 

114 print(ARMOR_SALT_END, file=sys.stderr) 

115 return 0 

116 

117 if src_data.startswith(ARMOR_MSG_BEGIN_BYTES) and len(src_data) > ARMOR_PADDED_LENGTH: # type: ignore 

118 src_data = src_data[len(ARMOR_MSG_BEGIN_BYTES) + 1 : -len(ARMOR_MSG_END_BYTES) + 1] 

119 

120 rcv = base64.decodebytes(src_data) 

121 print(f.decrypt(rcv).decode(ENCODING)) 

122 

123 return 0