Coverage for kiertotie/update.py: 0.00%
144 statements
coverage.py v7.4.1, created at 2024-02-04 18:44:05 +00:00
1"""Prepare entry and gone transactions from comparing local hierarchy with proxy data."""
3import datetime as dti
4import pathlib
5import random
6from typing import Union
8from kiertotie import (
9 BASE_URL,
10 DASH,
11 EASING,
12 ENCODING,
13 ESP,
14 HTTP_404_BYTES_TOKEN,
15 HTTP_404_BYTES_TOKEN_LENGTH,
16 HTTP_404_FILE,
17 HTTP_404_SIZE_BYTES,
18 NL,
19 RATE,
20 SP,
21 TS_FORMAT,
22 URL_ENC_SP,
23 EntryType,
24 load,
25 log,
26)
28DEFAULT_SCRIPT = 'update.sh'
31def shell(path: Union[str, pathlib.Path], commands: list[str]) -> None:
32 """Dump the commands into a shell script at path."""
33 with open(path, 'wt', encoding=ENCODING) as handle:
34 handle.write(NL.join(commands))
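
# NOTE: Minimal usage sketch for shell(); the script name and command list below are
# illustrative assumptions only and do not come from this module:
#
#     shell('demo.sh', ['#! /usr/bin/env bash', 'echo hello'])
#
# This would write the two commands joined by NL into demo.sh (no executable bit is set).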


def assess_files(
    upstreams: list[EntryType],
    anchor: pathlib.Path,
    root_folder: pathlib.Path,
    commands: list[str],
    verbose: bool = False,
) -> list[EntryType]:
    """Compare upstream entries against the local hierarchy and return entries that need a transfer (kept separate to stay DRY)."""
    updates = []
    for n, entry in enumerate(upstreams, start=1):
        entry_path = entry['path']
        if entry_path == '.':
            continue
        path = pathlib.Path(str(entry_path))

        if not (anchor / root_folder / path.parent).is_dir():
            if verbose:
                commands.append(f'# - New file {root_folder}/{path} in new folder')
            updates.append(entry)
            continue

        if not (anchor / root_folder / path).is_file():
            if verbose:
                commands.append(f'# - New file {root_folder}/{path} in existing folder')
            updates.append(entry)
            continue

        stat_found = (anchor / root_folder / path).stat()
        size_bytes_found = stat_found.st_size
        log.debug(f'local path ({root_folder / path}) pointing to {size_bytes_found} bytes is interesting ...')
        if verbose:
            commands.append(
                f'# ... Local path {root_folder / path} pointing to {size_bytes_found} bytes is interesting ...'
            )
        sampled_bytes = b''
        removed_http_404 = False
        if size_bytes_found == HTTP_404_SIZE_BYTES:
            with open(anchor / root_folder / path, 'rb') as raw_reader:
                sampled_bytes = raw_reader.read(HTTP_404_BYTES_TOKEN_LENGTH)
            commands.append(
                f'# ... ... Read initial {HTTP_404_BYTES_TOKEN_LENGTH} bytes'
                f' from {root_folder / path} being {sampled_bytes}'  # type: ignore
            )
        if sampled_bytes and sampled_bytes == HTTP_404_BYTES_TOKEN:
            text_content = ''
            with open(anchor / root_folder / path, 'rt', encoding=ENCODING) as text_reader:
                text_content = text_reader.read()
            if text_content == HTTP_404_FILE:
                log.warning(
                    f'detected HTTP/404 response file ({root_folder / path})'
                    ' in local hierarchy and added removal command'
                )
                commands.append(f'echo Removing HTTP/404 response file {root_folder / path} from local hierarchy:')
                commands.append(f'rm -f {anchor / root_folder / path}')
                size_bytes_found = 0
                removed_http_404 = True

        size_bytes_upstream = entry['size']
        if size_bytes_found == size_bytes_upstream:
            if path.name not in ('timestamp.tx', 'timestamp.txt', 'md5sums.txt'):
                if verbose:
                    commands.append(f'# - Skipping same size file {root_folder}/{path} in existing folder')
                continue
            commands.append(f'# - Overwriting same size file {root_folder}/{path} in existing folder')
        elif removed_http_404:
            commands.append(
                f'# - Will replace HTTP/404 response file {root_folder}/{path} with {size_bytes_upstream} bytes'
                f' from upstream in existing folder'
            )
        else:
            if verbose:
                commands.append(
                    f'# - Different size file {root_folder}/{path} with {size_bytes_found}'
                    f' instead of {size_bytes_upstream} bytes upstream in existing folder'
                )
        updates.append(entry)

    return updates
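
# NOTE: Hypothetical sketch of calling assess_files() directly; the anchor and root
# folder values are assumptions for illustration, process() below wires in the real
# ones derived from the proxy data:
#
#     commands: list[str] = []
#     updates = assess_files(
#         repo['tree']['files'],
#         anchor=pathlib.Path.cwd(),
#         root_folder=pathlib.Path('stable'),
#         commands=commands,
#         verbose=True,
#     )
#
# Afterwards commands holds commentary and removal lines while updates holds the
# entries that still need a transfer.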


def process(
    proxy_data_path: Union[str, pathlib.Path],
    anchor_path: Union[str, pathlib.Path, None] = None,
    script_path: Union[str, pathlib.Path, None] = None,
    verbose: bool = False,
) -> int:
    """Generate the update shell script for the folder tree below the anchor according to proxy data."""
    anchor = pathlib.Path.cwd() if anchor_path is None else pathlib.Path(anchor_path)
    log.debug(f'assuming anchor as ({anchor}) in process update')

    store_path = pathlib.Path(proxy_data_path)
    log.debug(f'loading proxy data from ({store_path}) in process update')
    repo = load(store_path)

    root_folder_str = store_path.name.split(DASH)[1]
    if root_folder_str == 'development':
        root_folder_str += '_releases'
    root_folder = pathlib.Path(root_folder_str)
    log.debug(f'assuming root folder as ({root_folder}) below anchor ({anchor}) in process update')

    script_path = pathlib.Path(DEFAULT_SCRIPT) if script_path is None else pathlib.Path(script_path)
    log.debug(f'creating shell script at ({script_path})')

    actions = ['#! /usr/bin/env bash']
    actions.append(f'# Derived root folder to be ({root_folder}) below anchor ({anchor})')
    actions.append(f'echo "Initializing the tree below root folder with random waits between 1 and {EASING} secs"')

    present = set()
    actions.append(f'# Inventorying storage folders below {root_folder}')
    shared_root_str = f'{root_folder}/'
    for path in root_folder.rglob('*'):
        if not path.is_dir():
            continue
        path_str = str(path)[len(shared_root_str) :]
        if path_str:
            present.add(path_str)
    actions.append(f'# * found {len(present)} storage folders below {root_folder}')

    possibly_gone = set(f for f in present)
    upstream_folders = [
        f for f in reversed(sorted(f['path'] for f in repo['tree']['folders'])) if f != '.'  # type: ignore
    ]
    upstream_folder_born = {
        f['path']: dti.datetime.strptime(f['timestamp'], TS_FORMAT).replace(tzinfo=dti.timezone.utc)  # type: ignore
        for f in repo['tree']['folders']  # type: ignore
    }
    folder_count = repo['count_folders']
    maybe_enter = set(upstream_folders)
    actions.append('# Subtracting folders present upstream from the gone set and updating the enter section')
    for local_name in upstream_folders:
        possibly_gone.discard(local_name)  # type: ignore
        if local_name in present:
            maybe_enter.discard(local_name)
    gone_count = len(possibly_gone)
    actions.append(f"# * found {gone_count} gone storage folder{'' if gone_count == 1 else 's'} below {anchor}:")
    if verbose:
        for f in sorted(possibly_gone):
            actions.append(f'# - {root_folder}/{f}')
    enter_count = len(maybe_enter)
    actions.append(f"# * found {enter_count} enter folder{'' if enter_count == 1 else 's'} below {anchor}:")
    if verbose:
        for f in sorted(maybe_enter):  # type: ignore
            actions.append(f'# + {root_folder}/{f} from {upstream_folder_born[f]}')

    candidate_count = repo['count_files']
    actions.append(
        f'# Detected {candidate_count} candidate entr{"y" if candidate_count == 1 else "ies"}'
        f' from upstream across {folder_count} folders below {anchor / root_folder}'
    )

    updates = assess_files(
        repo['tree']['files'],  # type: ignore
        anchor=anchor,
        root_folder=root_folder,
        commands=actions,
        verbose=verbose,
    )

    transfers = len(updates)
    size_files_bytes = sum(entry['size'] for entry in updates)  # type: ignore
    bytes_cum = 0
    for n, entry in enumerate(updates, start=1):
        entry_path = entry['path']
        if entry_path == '.':
            continue
        path = pathlib.Path(str(entry_path))
        path_str = str(path)
        size_bytes_upstream = int(entry['size'])
        secs_est = int(size_bytes_upstream / RATE)
        secs_est_disp = 'less than a second' if secs_est < 1 else f'approx. {secs_est} seconds'
        bytes_cum += size_bytes_upstream
        nap = random.randint(1, EASING)  # nosec B311
        if 'timestamp' in path_str or 'md5sum' in path_str:
            actions.append(f'echo not sleeping before transferring file {n} of {transfers}')
        else:
            actions.append(f'echo sleeping for {nap} secs before transferring file {n} of {transfers}')
            actions.append(f'sleep {nap}')
        actions.append(f'mkdir -p "{anchor}/{root_folder}/{path.parent}" || exit 1')
        actions.append(f'cd "{anchor}/{root_folder}/{path.parent}" || exit 1')
        actions.append('pwd')
        actions.append(
            f'echo started the transfer {n} of {transfers} requesting {size_bytes_upstream} bytes'
            f' assuming {secs_est_disp} at "$(date +"%Y-%m-%d %H:%M:%S +00:00")"'
        )
        if SP not in path_str:
            actions.append(f"echo curl -kORLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path}'")
            actions.append(f"curl -kORLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path}'")
        else:
            path_url_enc = path_str.replace(SP, URL_ENC_SP)
            path_local = f'{str(path.name).replace(SP, ESP)}'
            actions.append(
                f"echo curl -kRLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path_url_enc}' -o '{path_local}'"
            )
            actions.append(f"curl -kRLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path_url_enc}' -o '{path_local}'")
        actions.append(
            f'echo transfer is complete {n} of {transfers} for cum. {bytes_cum} of'
            f' tot. {size_files_bytes} bytes at "$(date +"%Y-%m-%d %H:%M:%S +00:00")"'
        )

    actions.append('echo OK')
    actions.append('')  # Final newline at end of fetch script

    shell(script_path, actions)
    log.debug(f'created shell script with {len(actions) - 1} lines at ({script_path}) from process update')

    return 0
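
# NOTE: Hypothetical invocation sketch; the proxy data file name below is an assumption
# chosen only for illustration (process() derives the root folder from the second
# DASH-separated token of that name):
#
#     from kiertotie.update import process
#
#     process('proxy-stable-snapshot.json', anchor_path='.', script_path='update.sh', verbose=True)
#
# This writes update.sh below the given anchor path and returns 0.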