Coverage for limitys/align.py: 0.00%
301 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:06:37 +00:00
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-04 20:06:37 +00:00
1from typing import no_type_check
3__all__ = ['Alignment', 'Hirschberg', 'Needleman', 'SegmentAlignment']
6@no_type_check
7class Alignment(object):
8 SCORE_UNIFORM = 1
9 SCORE_PROPORTION = 2
11 @no_type_check
12 def __init__(self):
13 self.seq_a = None
14 self.seq_b = None
15 self.len_a = None
16 self.len_b = None
17 self.score_null = 5
18 self.score_sub = -100
19 self.score_del = -3
20 self.score_ins = -3
21 self.separator = '|'
22 self.mode = Alignment.SCORE_UNIFORM
24 @no_type_check
25 def set_score(self, score_null=None, score_sub=None, score_del=None, score_ins=None):
26 if score_null is not None:
27 self.score_null = score_null
28 if score_sub is not None:
29 self.score_sub = score_sub
30 if score_del is not None:
31 self.score_del = score_del
32 if score_ins is not None:
33 self.score_ins = score_ins
35 @no_type_check
36 def match(self, a, b):
37 if a == b and self.mode == Alignment.SCORE_UNIFORM:
38 return self.score_null
39 elif self.mode == Alignment.SCORE_UNIFORM:
40 return self.score_sub
41 elif a == b:
42 return self.score_null * len(a)
43 else:
44 return self.score_sub * len(a)
46 @no_type_check
47 def delete(self, a):
48 """Deleted elements are on seqa."""
49 if self.mode == Alignment.SCORE_UNIFORM:
50 return self.score_del
51 return self.score_del * len(a)
53 @no_type_check
54 def insert(self, a):
55 """Inserted elements are on seqb."""
56 if self.mode == Alignment.SCORE_UNIFORM:
57 return self.score_ins
58 return self.score_ins * len(a)
60 @no_type_check
61 def score(self, aligned_seq_a, aligned_seq_b):
62 score = 0
63 for a, b in zip(aligned_seq_a, aligned_seq_b):
64 if a == b:
65 score += self.score_null
66 else:
67 if a == self.separator:
68 score += self.score_ins
69 elif b == self.separator:
70 score += self.score_del
71 else:
72 score += self.score_sub
73 return score
75 @no_type_check
76 def map_alignment(self, aligned_seq_a, aligned_seq_b):
77 map_b2a = []
78 idx = 0
79 for x, y in zip(aligned_seq_a, aligned_seq_b):
80 if x == y:
81 map_b2a.append(idx)
82 idx += 1
83 elif x == self.separator:
84 map_b2a.append(idx)
85 elif y == self.separator:
86 idx += 1
87 continue
88 return map_b2a
91@no_type_check
92class Needleman(Alignment):
93 @no_type_check
94 def __init__(self, *args):
95 super(Needleman, self).__init__()
96 self.semi_global = False
97 self.matrix = None
99 @no_type_check
100 def init_matrix(self):
101 rows = self.len_a + 1
102 cols = self.len_b + 1
103 self.matrix = [[0] * cols for _ in range(rows)]
105 @no_type_check
106 def compute_matrix(self):
107 seq_a = self.seq_a
108 seq_b = self.seq_b
109 len_a = self.len_a
110 len_b = self.len_b
112 if not self.semi_global:
113 for i in range(1, len_a + 1):
114 self.matrix[i][0] = self.delete(seq_a[i - 1]) + self.matrix[i - 1][0]
115 for i in range(1, len_b + 1):
116 self.matrix[0][i] = self.insert(seq_b[i - 1]) + self.matrix[0][i - 1]
118 for i in range(1, len_a + 1):
119 for j in range(1, len_b + 1):
120 """
121 Note that rows = len_a+1, cols = len_b+1
122 """
124 score_sub = self.matrix[i - 1][j - 1] + self.match(seq_a[i - 1], seq_b[j - 1])
125 score_del = self.matrix[i - 1][j] + self.delete(seq_a[i - 1])
126 score_ins = self.matrix[i][j - 1] + self.insert(seq_b[j - 1])
127 self.matrix[i][j] = max(score_sub, score_del, score_ins)
129 @no_type_check
130 def backtrack(self):
131 aligned_seq_a, aligned_seq_b = [], []
132 seq_a, seq_b = self.seq_a, self.seq_b
134 if self.semi_global:
135 last_col_max, _ = max(enumerate([row[-1] for row in self.matrix]), key=lambda a: a[1])
136 last_row_max, _ = max(enumerate([col for col in self.matrix[-1]]), key=lambda a: a[1])
138 if self.len_a < self.len_b:
139 i, j = self.len_a, last_row_max
140 aligned_seq_a = [self.separator] * (self.len_b - last_row_max)
141 aligned_seq_b = seq_b[last_row_max:]
142 else:
143 i, j = last_col_max, self.len_b
144 aligned_seq_a = seq_a[last_col_max:]
145 aligned_seq_b = [self.separator] * (self.len_a - last_col_max)
146 else:
147 i, j = self.len_a, self.len_b
149 mat = self.matrix
151 while i > 0 or j > 0:
152 if self.semi_global and (i == 0 or j == 0):
153 if i == 0 and j > 0:
154 aligned_seq_a = [self.separator] * j + aligned_seq_a
155 aligned_seq_b = seq_b[:j] + aligned_seq_b
156 elif i > 0 and j == 0:
157 aligned_seq_a = seq_a[:i] + aligned_seq_a
158 aligned_seq_b = [self.separator] * i + aligned_seq_b
159 break
161 if j > 0 and mat[i][j] == mat[i][j - 1] + self.insert(seq_b[j - 1]):
162 aligned_seq_a.insert(0, self.separator * len(seq_b[j - 1]))
163 aligned_seq_b.insert(0, seq_b[j - 1])
164 j -= 1
166 elif i > 0 and mat[i][j] == mat[i - 1][j] + self.delete(seq_a[i - 1]):
167 aligned_seq_a.insert(0, seq_a[i - 1])
168 aligned_seq_b.insert(0, self.separator * len(seq_a[i - 1]))
169 i -= 1
171 elif i > 0 and j > 0 and mat[i][j] == mat[i - 1][j - 1] + self.match(seq_a[i - 1], seq_b[j - 1]):
172 aligned_seq_a.insert(0, seq_a[i - 1])
173 aligned_seq_b.insert(0, seq_b[j - 1])
174 i -= 1
175 j -= 1
177 else:
178 print(seq_a)
179 print(seq_b)
180 print(aligned_seq_a)
181 print(aligned_seq_b)
182 raise Exception('backtrack error', i, j, seq_a[i - 2 : i + 1], seq_b[j - 2 : j + 1])
184 return aligned_seq_a, aligned_seq_b
186 @no_type_check
187 def align(self, seq_a, seq_b, semi_global=True, mode=None):
188 self.seq_a = seq_a
189 self.seq_b = seq_b
190 self.len_a = len(self.seq_a)
191 self.len_b = len(self.seq_b)
193 self.semi_global = semi_global
195 if mode is not None:
196 self.mode = mode
197 self.init_matrix()
198 self.compute_matrix()
199 return self.backtrack()
202@no_type_check
203class Hirschberg(Alignment):
204 @no_type_check
205 def __init__(self):
206 super(Hirschberg, self).__init__()
207 self.needleman = Needleman()
209 @no_type_check
210 def last_row(self, seqa, seqb):
211 lena = len(seqa)
212 lenb = len(seqb)
213 pre_row = [0] * (lenb + 1)
214 cur_row = [0] * (lenb + 1)
216 for j in range(1, lenb + 1):
217 pre_row[j] = pre_row[j - 1] + self.insert(seqb[j - 1])
219 for i in range(1, lena + 1):
220 cur_row[0] = self.delete(seqa[i - 1]) + pre_row[0]
221 for j in range(1, lenb + 1):
222 score_sub = pre_row[j - 1] + self.match(seqa[i - 1], seqb[j - 1])
223 score_del = pre_row[j] + self.delete(seqa[i - 1])
224 score_ins = cur_row[j - 1] + self.insert(seqb[j - 1])
225 cur_row[j] = max(score_sub, score_del, score_ins)
227 pre_row = cur_row
228 cur_row = [0] * (lenb + 1)
230 return pre_row
232 @no_type_check
233 def align_rec(self, seq_a, seq_b):
234 aligned_a, aligned_b = [], []
235 len_a, len_b = len(seq_a), len(seq_b)
237 if len_a == 0:
238 for i in range(len_b):
239 aligned_a.append(self.separator * len(seq_b[i]))
240 aligned_b.append(seq_b[i])
241 elif len_b == 0:
242 for i in range(len_a):
243 aligned_a.append(seq_a[i])
244 aligned_b.append(self.separator * len(seq_a[i]))
246 elif len(seq_a) == 1:
247 aligned_a, aligned_b = self.needleman.align(seq_a, seq_b)
249 else:
250 mid_a = int(len_a / 2)
252 rowleft = self.last_row(seq_a[:mid_a], seq_b)
253 rowright = self.last_row(seq_a[mid_a:][::-1], seq_b[::-1])
255 rowright.reverse()
257 row = [left + right for left, right in zip(rowleft, rowright)]
258 maxidx, _ = max(enumerate(row), key=lambda a: a[1])
260 mid_b = maxidx
262 aligned_a_left, aligned_b_left = self.align_rec(seq_a[:mid_a], seq_b[:mid_b])
263 aligned_a_right, aligned_b_right = self.align_rec(seq_a[mid_a:], seq_b[mid_b:])
264 aligned_a = aligned_a_left + aligned_a_right
265 aligned_b = aligned_b_left + aligned_b_right
267 return aligned_a, aligned_b
269 @no_type_check
270 def align(self, seq_a, seq_b, mode=None):
271 self.seq_a = seq_a
272 self.seq_b = seq_b
273 self.len_a = len(self.seq_a)
274 self.len_b = len(self.seq_b)
275 if mode is not None:
276 self.mode = mode
277 return self.align_rec(self.seq_a, self.seq_b)
280@no_type_check
281class SegmentAlignment(Alignment):
282 step = 50
284 @no_type_check
285 def __init__(self):
286 super(SegmentAlignment, self).__init__()
288 @no_type_check
289 @classmethod
290 def skip_same(cls, seq_a, seq_b, curr_a, curr_b, aligned_seq_a, aligned_seq_b):
291 while True:
292 if seq_a[curr_a] == seq_b[curr_b]:
293 aligned_seq_a.append(seq_a[curr_a])
294 aligned_seq_b.append(seq_b[curr_b])
295 curr_a += 1
296 curr_b += 1
297 else:
298 break
300 if curr_a >= len(seq_a) or curr_b >= len(seq_b):
301 break
303 return curr_a, curr_b
305 @no_type_check
306 @classmethod
307 def align(cls, seq_left, seq_right, segment_half=False, base_alignment='Needleman', semi_global=True):
308 if len(seq_left) < len(seq_right):
309 seq_a, seq_b = seq_left, seq_right
310 else:
311 seq_b, seq_a = seq_left, seq_right
313 len_a = len(seq_a)
314 len_b = len(seq_b)
316 diff = cls.step
318 curr_a = 0
319 curr_b = 0
320 is_needleman = False
322 if base_alignment == 'Hirschberg':
323 aligner = Hirschberg()
324 elif base_alignment == 'Needleman':
325 aligner = Needleman()
326 is_needleman = True
327 else:
328 aligner = None
330 align = aligner.align
332 aligned_a = []
333 aligned_b = []
335 insert_length = 0
336 while curr_a < len_a and curr_b < len_b:
337 if not (is_needleman and semi_global):
338 curr_a, curr_b = cls.skip_same(seq_a, seq_b, curr_a, curr_b, aligned_a, aligned_b)
340 sub_seq_a = seq_a[curr_a : curr_a + cls.step]
341 sub_seq_b = seq_b[curr_b : curr_b + cls.step + diff]
343 if is_needleman:
344 aligned_sub_a, aligned_sub_b = align(sub_seq_a, sub_seq_b, semi_global=semi_global)
345 else:
346 aligned_sub_a, aligned_sub_b = align(sub_seq_a, sub_seq_b)
348 if segment_half:
349 half_aligned_sub_a = []
350 char_len = 0
352 for char in aligned_sub_a:
353 half_aligned_sub_a.append(char)
354 if char != '|':
355 char_len += 1
356 if char_len >= cls.step / 2:
357 break
359 aligned_sub_a = half_aligned_sub_a
360 aligned_sub_b = aligned_sub_b[: len(aligned_sub_a)]
362 incre_a = int(cls.step / 2)
363 incre_b = sum([1 for char in aligned_sub_b if char != '|'])
364 aligned_a += aligned_sub_a
365 aligned_b += aligned_sub_b
366 else:
367 insert_length = 0
368 for char in aligned_sub_a[::-1]:
369 if char == '|':
370 insert_length += 1
371 else:
372 break
374 incre_a = len(sub_seq_a)
375 incre_b = len(sub_seq_b) - insert_length
376 aligned_a += aligned_sub_a[: len(aligned_sub_a) - insert_length]
377 aligned_b += aligned_sub_b[: len(aligned_sub_b) - insert_length]
379 curr_a += incre_a
380 curr_b += incre_b
382 diff = cls.step
384 if curr_b < len_b:
385 aligned_a += ['|'] * (len_b - curr_b)
386 aligned_b += seq_b[curr_b:]
387 else:
388 aligned_a += seq_a[curr_a:]
389 aligned_b += ['|'] * (len_a - curr_a)
390 if len(seq_left) < len(seq_right):
391 return aligned_a, aligned_b
392 else:
393 return aligned_b, aligned_a