Coverage for limitys/align.py: 0.00%

301 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-04 20:06:37 +00:00

1from typing import no_type_check 

2 

3__all__ = ['Alignment', 'Hirschberg', 'Needleman', 'SegmentAlignment'] 

4 

5 

6@no_type_check 

7class Alignment(object): 

8 SCORE_UNIFORM = 1 

9 SCORE_PROPORTION = 2 

10 

11 @no_type_check 

12 def __init__(self): 

13 self.seq_a = None 

14 self.seq_b = None 

15 self.len_a = None 

16 self.len_b = None 

17 self.score_null = 5 

18 self.score_sub = -100 

19 self.score_del = -3 

20 self.score_ins = -3 

21 self.separator = '|' 

22 self.mode = Alignment.SCORE_UNIFORM 

23 

24 @no_type_check 

25 def set_score(self, score_null=None, score_sub=None, score_del=None, score_ins=None): 

26 if score_null is not None: 

27 self.score_null = score_null 

28 if score_sub is not None: 

29 self.score_sub = score_sub 

30 if score_del is not None: 

31 self.score_del = score_del 

32 if score_ins is not None: 

33 self.score_ins = score_ins 

34 

35 @no_type_check 

36 def match(self, a, b): 

37 if a == b and self.mode == Alignment.SCORE_UNIFORM: 

38 return self.score_null 

39 elif self.mode == Alignment.SCORE_UNIFORM: 

40 return self.score_sub 

41 elif a == b: 

42 return self.score_null * len(a) 

43 else: 

44 return self.score_sub * len(a) 

45 

46 @no_type_check 

47 def delete(self, a): 

48 """Deleted elements are on seqa.""" 

49 if self.mode == Alignment.SCORE_UNIFORM: 

50 return self.score_del 

51 return self.score_del * len(a) 

52 

53 @no_type_check 

54 def insert(self, a): 

55 """Inserted elements are on seqb.""" 

56 if self.mode == Alignment.SCORE_UNIFORM: 

57 return self.score_ins 

58 return self.score_ins * len(a) 

59 

60 @no_type_check 

61 def score(self, aligned_seq_a, aligned_seq_b): 

62 score = 0 

63 for a, b in zip(aligned_seq_a, aligned_seq_b): 

64 if a == b: 

65 score += self.score_null 

66 else: 

67 if a == self.separator: 

68 score += self.score_ins 

69 elif b == self.separator: 

70 score += self.score_del 

71 else: 

72 score += self.score_sub 

73 return score 

74 

75 @no_type_check 

76 def map_alignment(self, aligned_seq_a, aligned_seq_b): 

77 map_b2a = [] 

78 idx = 0 

79 for x, y in zip(aligned_seq_a, aligned_seq_b): 

80 if x == y: 

81 map_b2a.append(idx) 

82 idx += 1 

83 elif x == self.separator: 

84 map_b2a.append(idx) 

85 elif y == self.separator: 

86 idx += 1 

87 continue 

88 return map_b2a 

89 

90 

91@no_type_check 

92class Needleman(Alignment): 

93 @no_type_check 

94 def __init__(self, *args): 

95 super(Needleman, self).__init__() 

96 self.semi_global = False 

97 self.matrix = None 

98 

99 @no_type_check 

100 def init_matrix(self): 

101 rows = self.len_a + 1 

102 cols = self.len_b + 1 

103 self.matrix = [[0] * cols for _ in range(rows)] 

104 

105 @no_type_check 

106 def compute_matrix(self): 

107 seq_a = self.seq_a 

108 seq_b = self.seq_b 

109 len_a = self.len_a 

110 len_b = self.len_b 

111 

112 if not self.semi_global: 

113 for i in range(1, len_a + 1): 

114 self.matrix[i][0] = self.delete(seq_a[i - 1]) + self.matrix[i - 1][0] 

115 for i in range(1, len_b + 1): 

116 self.matrix[0][i] = self.insert(seq_b[i - 1]) + self.matrix[0][i - 1] 

117 

118 for i in range(1, len_a + 1): 

119 for j in range(1, len_b + 1): 

120 """ 

121 Note that rows = len_a+1, cols = len_b+1 

122 """ 

123 

124 score_sub = self.matrix[i - 1][j - 1] + self.match(seq_a[i - 1], seq_b[j - 1]) 

125 score_del = self.matrix[i - 1][j] + self.delete(seq_a[i - 1]) 

126 score_ins = self.matrix[i][j - 1] + self.insert(seq_b[j - 1]) 

127 self.matrix[i][j] = max(score_sub, score_del, score_ins) 

128 

129 @no_type_check 

130 def backtrack(self): 

131 aligned_seq_a, aligned_seq_b = [], [] 

132 seq_a, seq_b = self.seq_a, self.seq_b 

133 

134 if self.semi_global: 

135 last_col_max, _ = max(enumerate([row[-1] for row in self.matrix]), key=lambda a: a[1]) 

136 last_row_max, _ = max(enumerate([col for col in self.matrix[-1]]), key=lambda a: a[1]) 

137 

138 if self.len_a < self.len_b: 

139 i, j = self.len_a, last_row_max 

140 aligned_seq_a = [self.separator] * (self.len_b - last_row_max) 

141 aligned_seq_b = seq_b[last_row_max:] 

142 else: 

143 i, j = last_col_max, self.len_b 

144 aligned_seq_a = seq_a[last_col_max:] 

145 aligned_seq_b = [self.separator] * (self.len_a - last_col_max) 

146 else: 

147 i, j = self.len_a, self.len_b 

148 

149 mat = self.matrix 

150 

151 while i > 0 or j > 0: 

152 if self.semi_global and (i == 0 or j == 0): 

153 if i == 0 and j > 0: 

154 aligned_seq_a = [self.separator] * j + aligned_seq_a 

155 aligned_seq_b = seq_b[:j] + aligned_seq_b 

156 elif i > 0 and j == 0: 

157 aligned_seq_a = seq_a[:i] + aligned_seq_a 

158 aligned_seq_b = [self.separator] * i + aligned_seq_b 

159 break 

160 

161 if j > 0 and mat[i][j] == mat[i][j - 1] + self.insert(seq_b[j - 1]): 

162 aligned_seq_a.insert(0, self.separator * len(seq_b[j - 1])) 

163 aligned_seq_b.insert(0, seq_b[j - 1]) 

164 j -= 1 

165 

166 elif i > 0 and mat[i][j] == mat[i - 1][j] + self.delete(seq_a[i - 1]): 

167 aligned_seq_a.insert(0, seq_a[i - 1]) 

168 aligned_seq_b.insert(0, self.separator * len(seq_a[i - 1])) 

169 i -= 1 

170 

171 elif i > 0 and j > 0 and mat[i][j] == mat[i - 1][j - 1] + self.match(seq_a[i - 1], seq_b[j - 1]): 

172 aligned_seq_a.insert(0, seq_a[i - 1]) 

173 aligned_seq_b.insert(0, seq_b[j - 1]) 

174 i -= 1 

175 j -= 1 

176 

177 else: 

178 print(seq_a) 

179 print(seq_b) 

180 print(aligned_seq_a) 

181 print(aligned_seq_b) 

182 raise Exception('backtrack error', i, j, seq_a[i - 2 : i + 1], seq_b[j - 2 : j + 1]) 

183 

184 return aligned_seq_a, aligned_seq_b 

185 

186 @no_type_check 

187 def align(self, seq_a, seq_b, semi_global=True, mode=None): 

188 self.seq_a = seq_a 

189 self.seq_b = seq_b 

190 self.len_a = len(self.seq_a) 

191 self.len_b = len(self.seq_b) 

192 

193 self.semi_global = semi_global 

194 

195 if mode is not None: 

196 self.mode = mode 

197 self.init_matrix() 

198 self.compute_matrix() 

199 return self.backtrack() 

200 

201 

202@no_type_check 

203class Hirschberg(Alignment): 

204 @no_type_check 

205 def __init__(self): 

206 super(Hirschberg, self).__init__() 

207 self.needleman = Needleman() 

208 

209 @no_type_check 

210 def last_row(self, seqa, seqb): 

211 lena = len(seqa) 

212 lenb = len(seqb) 

213 pre_row = [0] * (lenb + 1) 

214 cur_row = [0] * (lenb + 1) 

215 

216 for j in range(1, lenb + 1): 

217 pre_row[j] = pre_row[j - 1] + self.insert(seqb[j - 1]) 

218 

219 for i in range(1, lena + 1): 

220 cur_row[0] = self.delete(seqa[i - 1]) + pre_row[0] 

221 for j in range(1, lenb + 1): 

222 score_sub = pre_row[j - 1] + self.match(seqa[i - 1], seqb[j - 1]) 

223 score_del = pre_row[j] + self.delete(seqa[i - 1]) 

224 score_ins = cur_row[j - 1] + self.insert(seqb[j - 1]) 

225 cur_row[j] = max(score_sub, score_del, score_ins) 

226 

227 pre_row = cur_row 

228 cur_row = [0] * (lenb + 1) 

229 

230 return pre_row 

231 

232 @no_type_check 

233 def align_rec(self, seq_a, seq_b): 

234 aligned_a, aligned_b = [], [] 

235 len_a, len_b = len(seq_a), len(seq_b) 

236 

237 if len_a == 0: 

238 for i in range(len_b): 

239 aligned_a.append(self.separator * len(seq_b[i])) 

240 aligned_b.append(seq_b[i]) 

241 elif len_b == 0: 

242 for i in range(len_a): 

243 aligned_a.append(seq_a[i]) 

244 aligned_b.append(self.separator * len(seq_a[i])) 

245 

246 elif len(seq_a) == 1: 

247 aligned_a, aligned_b = self.needleman.align(seq_a, seq_b) 

248 

249 else: 

250 mid_a = int(len_a / 2) 

251 

252 rowleft = self.last_row(seq_a[:mid_a], seq_b) 

253 rowright = self.last_row(seq_a[mid_a:][::-1], seq_b[::-1]) 

254 

255 rowright.reverse() 

256 

257 row = [left + right for left, right in zip(rowleft, rowright)] 

258 maxidx, _ = max(enumerate(row), key=lambda a: a[1]) 

259 

260 mid_b = maxidx 

261 

262 aligned_a_left, aligned_b_left = self.align_rec(seq_a[:mid_a], seq_b[:mid_b]) 

263 aligned_a_right, aligned_b_right = self.align_rec(seq_a[mid_a:], seq_b[mid_b:]) 

264 aligned_a = aligned_a_left + aligned_a_right 

265 aligned_b = aligned_b_left + aligned_b_right 

266 

267 return aligned_a, aligned_b 

268 

269 @no_type_check 

270 def align(self, seq_a, seq_b, mode=None): 

271 self.seq_a = seq_a 

272 self.seq_b = seq_b 

273 self.len_a = len(self.seq_a) 

274 self.len_b = len(self.seq_b) 

275 if mode is not None: 

276 self.mode = mode 

277 return self.align_rec(self.seq_a, self.seq_b) 

278 

279 

280@no_type_check 

281class SegmentAlignment(Alignment): 

282 step = 50 

283 

284 @no_type_check 

285 def __init__(self): 

286 super(SegmentAlignment, self).__init__() 

287 

288 @no_type_check 

289 @classmethod 

290 def skip_same(cls, seq_a, seq_b, curr_a, curr_b, aligned_seq_a, aligned_seq_b): 

291 while True: 

292 if seq_a[curr_a] == seq_b[curr_b]: 

293 aligned_seq_a.append(seq_a[curr_a]) 

294 aligned_seq_b.append(seq_b[curr_b]) 

295 curr_a += 1 

296 curr_b += 1 

297 else: 

298 break 

299 

300 if curr_a >= len(seq_a) or curr_b >= len(seq_b): 

301 break 

302 

303 return curr_a, curr_b 

304 

305 @no_type_check 

306 @classmethod 

307 def align(cls, seq_left, seq_right, segment_half=False, base_alignment='Needleman', semi_global=True): 

308 if len(seq_left) < len(seq_right): 

309 seq_a, seq_b = seq_left, seq_right 

310 else: 

311 seq_b, seq_a = seq_left, seq_right 

312 

313 len_a = len(seq_a) 

314 len_b = len(seq_b) 

315 

316 diff = cls.step 

317 

318 curr_a = 0 

319 curr_b = 0 

320 is_needleman = False 

321 

322 if base_alignment == 'Hirschberg': 

323 aligner = Hirschberg() 

324 elif base_alignment == 'Needleman': 

325 aligner = Needleman() 

326 is_needleman = True 

327 else: 

328 aligner = None 

329 

330 align = aligner.align 

331 

332 aligned_a = [] 

333 aligned_b = [] 

334 

335 insert_length = 0 

336 while curr_a < len_a and curr_b < len_b: 

337 if not (is_needleman and semi_global): 

338 curr_a, curr_b = cls.skip_same(seq_a, seq_b, curr_a, curr_b, aligned_a, aligned_b) 

339 

340 sub_seq_a = seq_a[curr_a : curr_a + cls.step] 

341 sub_seq_b = seq_b[curr_b : curr_b + cls.step + diff] 

342 

343 if is_needleman: 

344 aligned_sub_a, aligned_sub_b = align(sub_seq_a, sub_seq_b, semi_global=semi_global) 

345 else: 

346 aligned_sub_a, aligned_sub_b = align(sub_seq_a, sub_seq_b) 

347 

348 if segment_half: 

349 half_aligned_sub_a = [] 

350 char_len = 0 

351 

352 for char in aligned_sub_a: 

353 half_aligned_sub_a.append(char) 

354 if char != '|': 

355 char_len += 1 

356 if char_len >= cls.step / 2: 

357 break 

358 

359 aligned_sub_a = half_aligned_sub_a 

360 aligned_sub_b = aligned_sub_b[: len(aligned_sub_a)] 

361 

362 incre_a = int(cls.step / 2) 

363 incre_b = sum([1 for char in aligned_sub_b if char != '|']) 

364 aligned_a += aligned_sub_a 

365 aligned_b += aligned_sub_b 

366 else: 

367 insert_length = 0 

368 for char in aligned_sub_a[::-1]: 

369 if char == '|': 

370 insert_length += 1 

371 else: 

372 break 

373 

374 incre_a = len(sub_seq_a) 

375 incre_b = len(sub_seq_b) - insert_length 

376 aligned_a += aligned_sub_a[: len(aligned_sub_a) - insert_length] 

377 aligned_b += aligned_sub_b[: len(aligned_sub_b) - insert_length] 

378 

379 curr_a += incre_a 

380 curr_b += incre_b 

381 

382 diff = cls.step 

383 

384 if curr_b < len_b: 

385 aligned_a += ['|'] * (len_b - curr_b) 

386 aligned_b += seq_b[curr_b:] 

387 else: 

388 aligned_a += seq_a[curr_a:] 

389 aligned_b += ['|'] * (len_a - curr_a) 

390 if len(seq_left) < len(seq_right): 

391 return aligned_a, aligned_b 

392 else: 

393 return aligned_b, aligned_a