Coverage for versioalueet/api.py: 98.92%

193 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-08 06:11:21 +00:00

1"""Provide the class VersionRanges for operations on version ranges 

2 

3Use case example: 

4 

5>>> version_ranges = VersionRanges('vers:pypi/|1.2.3|||||') 

6>>> assert version_ranges.normalize() == 'vers:pypi/1.2.3' 

7""" 

8 

9import argparse 

10from typing import Union 

11from urllib.parse import unquote 

12 

13import versioalueet.env as env 

14from versioalueet import log 

15 

16ASTERISK = '*' 

17COLON = ':' 

18PIPE = '|' 

19SLASH = '/' 

20PERCENT = '%' 

21 

22NOT = '!' 

23EQ = '=' 

24NE = NOT + EQ 

25GT = '>' 

26GE = GT + EQ 

27LT = '<' 

28LE = LT + EQ 

29 

30VCPairsType = list[tuple[str, str]] 

31ModelType = dict[str, Union[str, list[str], VCPairsType]] 

32 

33 

34def fail(message: str, model: Union[ModelType, None] = None, debug: bool = False) -> bool: 

35 """DRY. 

36 

37 Usage examples: 

38 

39 >>> model = {'received': 'no:thing'} 

40 >>> fail('some problem', model=model) 

41 True 

42 

43 >>> model = {'received': 'no:thing'} 

44 >>> fail('some problem', model=model, debug=True) 

45 True 

46 """ 

47 if debug and model: 

48 log.debug('Model = %s' % (model,)) 

49 log.error(message) 

50 return True 

51 

52 

53def _parse_uri_scheme(version_range: str, model: ModelType) -> tuple[bool, str]: 

54 """The URI-scheme must be vers. 

55 

56 Usage examples: 

57 

58 >>> _parse_uri_scheme('VERS:WRONG/YES', model={'received': 'VERS:WRONG/YES', 'uri-scheme': 'vers'}) 

59 (True, '') 

60 """ 

61 if not version_range.startswith(f'vers{COLON}'): 

62 model['error'] = 'version range must start with the URI scheme vers' 

63 return fail(message=model['error'], model=model), '' # type: ignore 

64 

65 model['uri-scheme'] = 'vers' 

66 return False, version_range[5:] 

67 

68 

69def _parse_version_scheme(scheme_and_vcs: str, model: ModelType) -> tuple[bool, str]: 

70 """The <versioning-scheme> must be lowercase and a slash must separate from version constraints. 

71 

72 Usage examples: 

73 

74 >>> model = {'received': 'vers:pypi/*', 'uri-scheme': 'vers', 'versioning-scheme': 'pypi'} 

75 >>> _parse_version_scheme('pypi/*', model=model) 

76 (False, '*') 

77 

78 >>> model = {'received': 'vers:pypi/', 'uri-scheme': 'vers', 'versioning-scheme': 'pypi'} 

79 >>> _parse_version_scheme('pypi/', model=model) 

80 (True, '') 

81 """ 

82 if SLASH not in scheme_and_vcs: 

83 model['error'] = 'version range must provide <versioning-scheme> followed by a slash (/)' 

84 return fail(message=model['error'], model=model), '' # type: ignore 

85 

86 versioning_scheme, vc_string = scheme_and_vcs.split(SLASH, 1) 

87 model['versioning-scheme'] = versioning_scheme 

88 

89 if not versioning_scheme: 

90 model['error'] = 'version system must be non empty' 

91 return fail(message=model['error'], model=model), '' # type: ignore 

92 

93 if not versioning_scheme.lower() == versioning_scheme: 

94 model['error'] = 'version system must be lower case' 

95 return fail(message=model['error'], model=model), '' # type: ignore 

96 

97 if not vc_string: 

98 model['error'] = 'version constraints must be non empty' 

99 return fail(message=model['error'], model=model), '' # type: ignore 

100 

101 return False, vc_string 

102 

103 

104def _split_version_constraints(vc_string: str, model: ModelType) -> tuple[bool, list[str]]: 

105 """Split real version constraints. 

106 

107 Usage examples: 

108 

109 >>> _split_version_constraints('13', {}) 

110 (False, ['13']) 

111 

112 >>> _split_version_constraints('|||||42||', {}) 

113 (False, ['42']) 

114 

115 >>> _split_version_constraints('|1|2|3|=4||>=6', {}) 

116 (False, ['1', '2', '3', '=4', '>=6']) 

117 """ 

118 if PIPE in vc_string: 

119 vc_unframed = vc_string.strip(PIPE) 

120 if vc_unframed.startswith(ASTERISK): 

121 version_constraints = [ASTERISK] 

122 if vc_unframed != ASTERISK: 

123 model['error'] = 'if present, asterisk (%s) must be the only version constraint' % (ASTERISK,) 

124 return fail(message=model['error'], model=model), [] # type: ignore 

125 

126 version_constraints = [vc for vc in vc_string.split(PIPE) if vc] 

127 return False, version_constraints 

128 

129 

130def _parse_version_constraint_pairs(version_constraints: list[str], model: ModelType) -> tuple[bool, VCPairsType]: 

131 """Parse the constraints into pairs of versions and comparators. 

132 

133 Implementer notes: 

134 

135 - the version constraints items contain no spaces and no pipes 

136 

137 Usage examples: 

138 

139 >>> _parse_version_constraint_pairs(['1', '3', '=4', '2', '>=6'], model={}) 

140 (False, [('1', '='), ('2', '='), ('3', '='), ('4', '='), ('6', '>=')]) 

141 

142 >>> model = {} 

143 >>> _parse_version_constraint_pairs(['1', '', '2'], model=model) 

144 (True, []) 

145 >>> assert 'empty' in model.get('error', '') 

146 """ 

147 vc_pairs: VCPairsType = [] 

148 for cv in version_constraints: 

149 comparator, version = '', '' 

150 if cv.startswith(GE): 

151 comparator, version = GE, cv[2:] 

152 elif cv.startswith(LE): 

153 comparator, version = LE, cv[2:] 

154 elif cv.startswith(NE): 

155 comparator, version = NE, cv[2:] 

156 elif cv.startswith(LT): 

157 comparator, version = LT, cv[1:] 

158 elif cv.startswith(GT): 

159 comparator, version = GT, cv[1:] 

160 elif cv.startswith(EQ): 

161 comparator, version = EQ, cv[1:] 

162 else: 

163 comparator, version = EQ, cv 

164 

165 if not version: 

166 model['error'] = 'empty version detected' 

167 return fail(message=model['error'], model=model), [] # type: ignore 

168 

169 if PERCENT in version: 

170 version = unquote(version) 

171 

172 vc_pairs.append((version, comparator)) 

173 

174 vc_pairs.sort() 

175 model['version-constraint-pairs'] = vc_pairs 

176 

177 versions = [version for version, _ in vc_pairs] 

178 

179 if sorted(list(set(versions))) != versions: 

180 model['error'] = 'versions must be unique across all version constraints' 

181 return fail(message=model['error'], model=model), [] # type: ignore 

182 

183 return False, vc_pairs 

184 

185 

186def _squeeze_ranges(vc_pairs_to_squeeze: VCPairsType) -> VCPairsType: 

187 """Squeeze any redundant version constraint pair occurrences. 

188 

189 Examples: 

190 

191 >>> to_squeeze = [('v0', GT), ('v1', GE), ('v2', EQ), ('v3', LT), ('v4', EQ), ('v5', LT), ('v6', GT)] 

192 >>> collected = _squeeze_ranges(to_squeeze) 

193 >>> collected 

194 [('v0', '>'), ('v5', '<'), ('v6', '>')] 

195 """ 

196 collector: VCPairsType = [] 

197 ignore_slot = -1 

198 prev_cmp = 'irrelevant' 

199 vc_pairs_to_squeeze.append(vc_pairs_to_squeeze[-1]) 

200 for in_slot, (curr_ver, curr_cmp) in enumerate(vc_pairs_to_squeeze[:-1]): 

201 next_slot = in_slot + 1 

202 if ignore_slot == in_slot: 

203 continue 

204 

205 next_ver, next_cmp = vc_pairs_to_squeeze[next_slot] 

206 if curr_cmp in (GE, GT) and next_cmp in (EQ, GE, GT): 

207 sel_ver, sel_cmp, ignore_slot = curr_ver, curr_cmp, next_slot 

208 elif curr_cmp in (LT, LE, EQ) and next_cmp in (LE, LT): 

209 sel_ver, sel_cmp, ignore_slot = next_ver, next_cmp, in_slot 

210 else: 

211 sel_ver, sel_cmp = curr_ver, curr_cmp 

212 

213 if not in_slot: 

214 collector.append((sel_ver, sel_cmp)) 

215 prev_cmp = sel_cmp 

216 continue 

217 

218 if prev_cmp in (GE, GT) and sel_cmp in (EQ, GE, GT): 

219 continue 

220 

221 if prev_cmp in (LT, LE, EQ) and sel_cmp in (LE, LT) and collector: 

222 collector.pop() 

223 

224 collector.append((sel_ver, sel_cmp)) 

225 prev_cmp = sel_cmp 

226 

227 return collector 

228 

229 

230def _optimize_version_constraints(vc_pairs: VCPairsType, model: ModelType) -> VCPairsType: 

231 """Validate and optimize the version constraints parsed from string. 

232 

233 Examples: 

234 

235 >>> received = 'vers:golang/>v0|>=v1|v2|<v3|v4|<v5|>=v6' 

236 >>> split_up = [('v0', GT), ('v1', GE), ('v2', EQ), ('v3', LT), ('v4', EQ), ('v5', LT), ('v6', GT)] 

237 >>> pairs = _optimize_version_constraints(vc_pairs=split_up, model={}) 

238 >>> pairs 

239 [('v0', '>'), ('v5', '<'), ('v6', '>')] 

240 """ 

241 vc_unequal_pairs: VCPairsType = [(v, c) for v, c in vc_pairs if c == NE] 

242 vc_other_pairs: VCPairsType = [(v, c) for v, c in vc_pairs if c != NE] 

243 model['vc-unequal-pairs'] = vc_unequal_pairs 

244 model['vc-other-pairs'] = vc_other_pairs 

245 model['version-constraint-pairs'] = vc_pairs 

246 

247 if len(vc_pairs) == 1: 

248 return vc_pairs 

249 

250 if len(vc_other_pairs) < 2: 

251 return vc_pairs 

252 

253 vc_pairs = list(set(_squeeze_ranges(vc_other_pairs))) 

254 vc_pairs.extend(vc_unequal_pairs) 

255 vc_pairs.sort() 

256 model['version-constraint-pairs'] = vc_pairs 

257 

258 return vc_pairs 

259 

260 

261class VersionRanges: 

262 """Provide operations on version ranges. 

263 

264 Usage examples: 

265 

266 >>> lc_url_encoded = '1%3e2%3c3%3d4%215%2a6%7c7' 

267 >>> uc_url_encoded = '1%3E2%3C3%3D4%215%2A6%7C7' 

268 >>> version_decoded = '1>2<3=4!5*6|7' 

269 >>> triplicated = '|'.join((lc_url_encoded, uc_url_encoded, version_decoded)) 

270 >>> version_ranges = VersionRanges(f'vers:pypi/{triplicated}') 

271 >>> assert 'unique' in version_ranges.model.get('error', '') 

272 

273 """ 

274 

275 def __init__(self, version_range: str) -> None: 

276 """Later alligator. 

277 

278 Usage examples: 

279 

280 >>> hidden_emopty_version = 'vers:pypi/|1.2.3|>||||' 

281 >>> version_ranges = VersionRanges(hidden_emopty_version) 

282 >>> assert 'empty version detected' in version_ranges.model.get('error', '') 

283 """ 

284 self.failed, self.model = self.parse(''.join(version_range.split())) 

285 

286 def normalize(self, version_range: Union[str, None] = None) -> str: 

287 """Normalize version range. 

288 

289 Usage examples: 

290 

291 >>> a_version = 'vers:pypi/|1.2.3||||' 

292 >>> version_ranges = VersionRanges(a_version) 

293 >>> assert not version_ranges.model.get('error', '') 

294 >>> version_ranges.normalize() 

295 'vers:pypi/1.2.3' 

296 

297 >>> hidden_emopty_version = 'vers:pypi/|1.2.3|>||||' 

298 >>> vr = VersionRanges('vers:abc/42') 

299 >>> vr.normalize(hidden_emopty_version) 

300 'ERROR:<empty version detected>' 

301 """ 

302 if version_range is not None: 

303 self.failed, self.model = self.parse(''.join(version_range.split())) 

304 if error := self.model.get('error', ''): 

305 return 'ERROR:<' + error + '>' # type: ignore 

306 return self.version_range # type:ignore 

307 

308 def __eq__(self, other: object) -> bool: 

309 """We define equality per the version ranges.""" 

310 if not isinstance(other, VersionRanges): 

311 return NotImplemented 

312 return self.version_range == other.version_range 

313 

314 def __hash__(self) -> int: 

315 """We define our identity per the version range.""" 

316 return hash(self.version_range) 

317 

318 def __repr__(self) -> str: 

319 """The version ranges string wrapped in constructor. 

320 

321 Usage examples: 

322 

323 >>> maybe_43 = 'vers:pypi/<44|>42' 

324 >>> version_ranges = VersionRanges(maybe_43) 

325 >>> assert "VersionRanges('vers:pypi/>42|<44')" == repr(version_ranges) 

326 """ 

327 return f"{self.__class__.__name__}('{self.version_range}')" 

328 

329 def __str__(self) -> str: 

330 """The version ranges string is what we are. 

331 

332 Usage examples: 

333 

334 >>> maybe_43 = 'vers:pypi/<44|>42' 

335 >>> version_ranges = VersionRanges(maybe_43) 

336 >>> assert 'vers:pypi/>42|<44' == str(version_ranges) 

337 """ 

338 return self.version_range # type: ignore 

339 

340 def parse(self, version_range: str) -> tuple[bool, ModelType]: 

341 """Poor person parser for bootstrap.""" 

342 model: ModelType = { 

343 'received': version_range, 

344 } 

345 

346 failed, scheme_and_vcs = _parse_uri_scheme(version_range, model) 

347 if failed: 

348 return failed, model 

349 

350 failed, vc_string = _parse_version_scheme(scheme_and_vcs, model) 

351 if failed: 

352 return failed, model 

353 

354 failed, version_constraints = _split_version_constraints(vc_string, model) 

355 if failed: 

356 return failed, model 

357 

358 failed, vc_pairs = _parse_version_constraint_pairs(version_constraints, model) 

359 if failed: 

360 return failed, model 

361 

362 vc_pairs = _optimize_version_constraints(vc_pairs, model) 

363 

364 model['version-constraints'] = [f'{c}{v}' for v, c in vc_pairs] 

365 vcs_compressed = PIPE.join(f'{c}{v}' if c != EQ else v for v, c in vc_pairs) 

366 model['version-constraints-string-compressed'] = vcs_compressed 

367 model['version-range'] = 'vers' + COLON + model['versioning-scheme'] + SLASH + vcs_compressed # type: ignore 

368 

369 self.versioning_scheme = model['versioning-scheme'] 

370 self.version_constraints = model['version-constraints'] 

371 self.version_range = model['version-range'] 

372 

373 return failed, model 

374 

375 

376def main(options: argparse.Namespace) -> int: 

377 if options.debug: 

378 for line in env.report(options, format='text').split('\n'): # type: ignore 

379 log.debug(line) 

380 if options.versions: 

381 log.warning('version inclusion assessment requested, but not implemented yet') 

382 log.warning("details: requested versions were ('%s')" % ("', '".join(options.versions),)) 

383 non_space_versions = [v.strip() for v in options.versions] 

384 non_empty_versions = [v for v in non_space_versions if v] 

385 if non_space_versions != non_empty_versions: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true

386 log.error('received empty or space only version identifiers for inclusion test') 

387 return 2 

388 version_ranges = VersionRanges(options.version_ranges) 

389 if not version_ranges.failed: 

390 if options.debug: 

391 log.debug('model: [') 

392 for k, v in version_ranges.model.items(): 

393 if isinstance(v, str): 

394 log.debug("- %s: '%s'" % (k, str(v))) 

395 else: 

396 log.debug('- %s: %s' % (k, str(v))) 

397 log.debug(']') 

398 print(version_ranges) 

399 return 0 

400 

401 return 1