Coverage for kiertotie/update.py: 0.00% (144 statements)


1"""Prepare entry and gone transactions from comparing local hierarchy with proxy data.""" 

2 

3import datetime as dti 

4import pathlib 

5import random 

6from typing import Union 

7 

8from kiertotie import ( 

9 BASE_URL, 

10 DASH, 

11 EASING, 

12 ENCODING, 

13 ESP, 

14 HTTP_404_BYTES_TOKEN, 

15 HTTP_404_BYTES_TOKEN_LENGTH, 

16 HTTP_404_FILE, 

17 HTTP_404_SIZE_BYTES, 

18 NL, 

19 RATE, 

20 SP, 

21 TS_FORMAT, 

22 URL_ENC_SP, 

23 EntryType, 

24 load, 

25 log, 

26) 

27 

28DEFAULT_SCRIPT = 'update.sh' 

29 

30 

31def shell(path: Union[str, pathlib.Path], commands: list[str]) -> None: 

32 """Dump the commands into a shell script at path.""" 

33 with open(path, 'wt', encoding=ENCODING) as handle: 

34 handle.write(NL.join(commands)) 

35 

36 
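
# Illustrative usage sketch (an assumption, not part of the module): `shell` only joins
# the command lines with NL and writes them out; it does not set the executable bit, so
# the result would be run e.g. via `bash update.sh`. The file name below is hypothetical:
#
#     shell('demo.sh', ['#! /usr/bin/env bash', 'echo hello', ''])
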

def assess_files(
    upstreams: list[EntryType],
    anchor: pathlib.Path,
    root_folder: pathlib.Path,
    commands: list[str],
    verbose: bool = False,
) -> list[EntryType]:
    """Compare upstream file entries against the local hierarchy.

    Returns the entries that need a transfer; shared helper to keep process() DRY.
    """
    updates = []
    for n, entry in enumerate(upstreams, start=1):
        entry_path = entry['path']
        if entry_path == '.':
            continue
        path = pathlib.Path(str(entry_path))

        if not (anchor / root_folder / path.parent).is_dir():
            if verbose:
                commands.append(f'# - New file {root_folder}/{path} in new folder')
            updates.append(entry)
            continue

        if not (anchor / root_folder / path).is_file():
            if verbose:
                commands.append(f'# - New file {root_folder}/{path} in existing folder')
            updates.append(entry)
            continue

        stat_found = (anchor / root_folder / path).stat()
        size_bytes_found = stat_found.st_size
        log.debug(f'local path ({root_folder / path}) pointing to {size_bytes_found} bytes is interesting ...')
        if verbose:
            commands.append(
                f'# ... Local path {root_folder / path} pointing to {size_bytes_found} bytes is interesting ...'
            )
        sampled_bytes = b''
        removed_http_404 = False
        if size_bytes_found == HTTP_404_SIZE_BYTES:
            with open(anchor / root_folder / path, 'rb') as raw_reader:
                sampled_bytes = raw_reader.read(HTTP_404_BYTES_TOKEN_LENGTH)
                commands.append(
                    f'# ... ... Read initial {HTTP_404_BYTES_TOKEN_LENGTH} bytes'
                    f' from {root_folder / path} being {sampled_bytes}'  # type: ignore
                )
        if sampled_bytes and sampled_bytes == HTTP_404_BYTES_TOKEN:
            text_content = ''
            with open(anchor / root_folder / path, 'rt', encoding=ENCODING) as text_reader:
                text_content = text_reader.read()
            if text_content == HTTP_404_FILE:
                log.warning(
                    f'detected HTTP/404 response file ({root_folder / path})'
                    ' in local hierarchy and added removal command'
                )
                commands.append(f'echo Removing HTTP/404 response file {root_folder / path} from local hierarchy:')
                commands.append(f'rm -f {anchor / root_folder / path}')
                size_bytes_found = 0
                removed_http_404 = True
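
        # Note (a reading of the constants, not stated by the module itself): HTTP_404_SIZE_BYTES,
        # HTTP_404_BYTES_TOKEN, and HTTP_404_FILE appear to describe the error page the upstream
        # server returns for missing paths; a local file is only treated as such a page when size,
        # leading bytes, and full text all match, so same-sized genuine files survive the check.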

        size_bytes_upstream = entry['size']
        if size_bytes_found == size_bytes_upstream:
            if path.name not in ('timestamp.tx', 'timestamp.txt', 'md5sums.txt'):
                if verbose:
                    commands.append(f'# - Skipping same size file {root_folder}/{path} in existing folder')
                continue
            commands.append(f'# - Overwriting same size file {root_folder}/{path} in existing folder')
        elif removed_http_404:
            commands.append(
                f'# - Will replace HTTP/404 response file {root_folder}/{path} with {size_bytes_upstream} bytes'
                f' from upstream in existing folder'
            )
        else:
            if verbose:
                commands.append(
                    f'# - Different size file {root_folder}/{path} with {size_bytes_found}'
                    f' instead of {size_bytes_upstream} bytes upstream in existing folder'
                )
        updates.append(entry)

    return updates
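
# Illustrative sketch (an assumption, not from the module): the EntryType records consumed
# by assess_files are mappings carrying at least 'path' and 'size' (folder records also
# carry 'timestamp'). A hypothetical call, collecting the commentary lines into cmds:
#
#     cmds: list[str] = []
#     todo = assess_files(
#         [{'path': 'docs/readme.txt', 'size': 1024}],  # hypothetical upstream entry
#         anchor=pathlib.Path.cwd(),
#         root_folder=pathlib.Path('archive'),  # hypothetical root folder name
#         commands=cmds,
#         verbose=True,
#     )
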

def process(
    proxy_data_path: Union[str, pathlib.Path],
    anchor_path: Union[str, pathlib.Path, None] = None,
    script_path: Union[str, pathlib.Path, None] = None,
    verbose: bool = False,
) -> int:
    """Generate folder tree below current working directory according to proxy data."""
    anchor = pathlib.Path.cwd() if anchor_path is None else pathlib.Path(anchor_path)
    log.debug(f'assuming anchor as ({anchor}) in process update')

    store_path = pathlib.Path(proxy_data_path)
    log.debug(f'loading proxy data from ({store_path}) in process update')
    repo = load(store_path)

    root_folder_str = store_path.name.split(DASH)[1]
    if root_folder_str == 'development':
        root_folder_str += '_releases'
    root_folder = pathlib.Path(root_folder_str)
    log.debug(f'assuming root folder as ({root_folder}) below anchor ({anchor}) in process update')
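
    # Illustrative sketch (an assumption, not from the module): with DASH == '-', a proxy
    # data file named like 'proxy-archive-2024.json' would yield root_folder_str 'archive',
    # and 'proxy-development-2024.json' would map to 'development_releases':
    #
    #     'proxy-development-2024.json'.split('-')[1]  # -> 'development'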

    script_path = pathlib.Path(DEFAULT_SCRIPT) if script_path is None else pathlib.Path(script_path)
    log.debug(f'creating shell script at ({script_path})')

    actions = ['#! /usr/bin/env bash']
    actions.append(f'# Derived root folder to be ({root_folder}) below anchor ({anchor})')
    actions.append(f'echo "Initializing the tree below root folder with random waits between 1 and {EASING} secs"')

    present = set()
    actions.append(f'# Inventorying storage folders below {root_folder}')
    shared_root_str = f'{root_folder}/'
    for path in root_folder.rglob('*'):
        if not path.is_dir():
            continue
        path_str = str(path)[len(shared_root_str) :]
        if path_str:
            present.add(path_str)
    actions.append(f'# * found {len(present)} storage folders below {root_folder}')

    possibly_gone = set(present)
    upstream_folders = [
        f for f in reversed(sorted(f['path'] for f in repo['tree']['folders'])) if f != '.'  # type: ignore
    ]
    upstream_folder_born = {
        f['path']: dti.datetime.strptime(f['timestamp'], TS_FORMAT).replace(tzinfo=dti.timezone.utc)  # type: ignore
        for f in repo['tree']['folders']  # type: ignore
    }
    folder_count = repo['count_folders']
    maybe_enter = set(upstream_folders)
    actions.append('# Subtracting folders present upstream from gone and updating the enter section')
    for local_name in upstream_folders:
        possibly_gone.discard(local_name)  # type: ignore
        if local_name in present:
            maybe_enter.discard(local_name)
    gone_count = len(possibly_gone)
    actions.append(f"# * found {gone_count} gone storage folder{'' if gone_count == 1 else 's'} below {anchor}:")
    if verbose:
        for f in sorted(possibly_gone):
            actions.append(f'# - {root_folder}/{f}')
    enter_count = len(maybe_enter)
    actions.append(f"# * found {enter_count} enter folder{'' if enter_count == 1 else 's'} below {anchor}:")
    if verbose:
        for f in sorted(maybe_enter):  # type: ignore
            actions.append(f'# + {root_folder}/{f} from {upstream_folder_born[f]}')
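
    # Worked mini-example (illustration, not from the module): the gone/enter split is plain
    # set arithmetic. With present == {'a', 'b'} and upstream folders ['b', 'c'], the loop
    # leaves possibly_gone == {'a'} (local only) and maybe_enter == {'c'} (upstream only).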

    candidate_count = repo['count_files']
    actions.append(
        f'# Detected {candidate_count} candidate {"entry" if candidate_count == 1 else "entries"}'
        f' from upstream across {folder_count} folders below {anchor / root_folder}'
    )

    updates = assess_files(
        repo['tree']['files'],  # type: ignore
        anchor=anchor,
        root_folder=root_folder,
        commands=actions,
        verbose=verbose,
    )

    transfers = len(updates)
    size_files_bytes = sum(entry['size'] for entry in updates)  # type: ignore
    bytes_cum = 0
    for n, entry in enumerate(updates, start=1):
        entry_path = entry['path']
        if entry_path == '.':
            continue
        path = pathlib.Path(str(entry_path))
        path_str = str(path)
        size_bytes_upstream = int(entry['size'])
        secs_est = int(size_bytes_upstream / RATE)
        secs_est_disp = 'less than a second' if secs_est < 1 else f'approx. {secs_est} seconds'
        bytes_cum += size_bytes_upstream
        nap = random.randint(1, EASING)  # nosec B311
        if 'timestamp' in path_str or 'md5sum' in path_str:
            actions.append(f'echo not sleeping before transferring file {n} of {transfers}')
        else:
            actions.append(f'echo sleeping for {nap} secs before transferring file {n} of {transfers}')
            actions.append(f'sleep {nap}')
        actions.append(f'mkdir -p "{anchor}/{root_folder}/{path.parent}" || exit 1')
        actions.append(f'cd "{anchor}/{root_folder}/{path.parent}" || exit 1')
        actions.append('pwd')
        actions.append(
            f'echo started the transfer {n} of {transfers} requesting {size_bytes_upstream} bytes'
            f' assuming {secs_est_disp} at "$(date +"%Y-%m-%d %H:%M:%S +00:00")"'
        )
        if SP not in path_str:
            actions.append(f"echo curl -kORLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path}'")
            actions.append(f"curl -kORLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path}'")
        else:
            path_url_enc = path_str.replace(SP, URL_ENC_SP)
            path_local = path.name.replace(SP, ESP)
            actions.append(
                f"echo curl -kRLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path_url_enc}' -o '{path_local}'"
            )
            actions.append(f"curl -kRLs --limit-rate 2000k '{BASE_URL}{root_folder}/{path_url_enc}' -o '{path_local}'")
        actions.append(
            f'echo transfer is complete {n} of {transfers} for cum. {bytes_cum} of'
            f' tot. {size_files_bytes} bytes at "$(date +"%Y-%m-%d %H:%M:%S +00:00")"'
        )
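
    # Illustrative sketch (hypothetical values, not from the module): for a file 'a b.txt'
    # below root folder 'archive' and anchor '/work', assuming SP == ' ', URL_ENC_SP == '%20',
    # ESP an escaped space ('\ '), and BASE_URL == 'https://mirror.example/', the loop above
    # would emit script lines like:
    #
    #     sleep 7
    #     mkdir -p "/work/archive/." || exit 1
    #     cd "/work/archive/." || exit 1
    #     curl -kRLs --limit-rate 2000k 'https://mirror.example/archive/a%20b.txt' -o 'a\ b.txt'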

    actions.append('echo OK')
    actions.append('')  # Final newline at end of fetch script

    shell(script_path, actions)
    log.debug(f'created shell script with {len(actions) - 1} lines at ({script_path}) from process update')

    return 0