0
# Sequence Alignment
1
2
Global sequence alignment algorithms like Needleman-Wunsch for optimal alignment of time series with gap penalties. This module provides alternative approaches to DTW for sequence comparison and alignment tasks, particularly useful when explicit gap handling and global alignment constraints are needed.
3
4
## Capabilities
5
6
### Global Sequence Alignment
7
8
Compute optimal global alignment between two sequences using the Needleman-Wunsch algorithm, which handles gaps explicitly and ensures end-to-end alignment.
9
10
```python { .api }
11
def needleman_wunsch(s1, s2, window=None, max_dist=None, max_step=None,
12
max_length_diff=None, psi=None):
13
"""
14
Global sequence alignment using Needleman-Wunsch algorithm.
15
16
Computes optimal global alignment between two sequences with explicit
17
gap handling, ensuring alignment from beginning to end of both sequences.
18
Unlike DTW, which allows flexible warping, this enforces global alignment
19
with gap penalties.
20
21
Parameters:
22
- s1, s2: array-like, input sequences to align
23
- window: int, alignment window constraint (similar to DTW Sakoe-Chiba band)
24
- max_dist: float, early stopping threshold
25
- max_step: float, maximum step size constraint
26
- max_length_diff: int, maximum allowed length difference
27
- psi: int, psi relaxation parameter for boundary conditions
28
29
Returns:
30
tuple: (alignment_score, alignment_matrix)
31
- alignment_score: float, optimal alignment score
32
- alignment_matrix: 2D array, alignment scoring matrix
33
"""
34
```
35
36
### Optimal Alignment Extraction
37
38
Extract the optimal alignment path and create aligned sequences with explicit gap symbols from alignment matrices.
39
40
```python { .api }
41
def best_alignment(paths, s1=None, s2=None, gap="-", order=None):
42
"""
43
Compute optimal alignment from alignment paths matrix.
44
45
Traces back through the alignment matrix to extract the optimal
46
alignment path and creates aligned sequences with gap symbols.
47
48
Parameters:
49
- paths: 2D array, alignment paths matrix from needleman_wunsch()
50
- s1, s2: array-like, original sequences (optional, for creating aligned output)
51
- gap: str/value, symbol to use for gaps in aligned sequences
52
- order: str, ordering preference for path selection ('first', 'last', None)
53
54
Returns:
55
tuple: (path, aligned_s1, aligned_s2)
56
- path: list, sequence of alignment operations
57
- aligned_s1: list, first sequence with gaps inserted
58
- aligned_s2: list, second sequence with gaps inserted
59
"""
60
```
61
62
## Usage Examples
63
64
### Basic Global Alignment
65
66
```python
67
from dtaidistance.alignment import needleman_wunsch, best_alignment
68
import numpy as np
69
70
# Create two sequences with different lengths and patterns
71
s1 = [1, 2, 3, 4, 5, 4, 3, 2, 1]
72
s2 = [2, 3, 4, 5, 4, 3, 2]
73
74
print(f"Sequence 1 length: {len(s1)}")
75
print(f"Sequence 2 length: {len(s2)}")
76
print(f"Sequence 1: {s1}")
77
print(f"Sequence 2: {s2}")
78
79
# Perform global alignment
80
alignment_score, alignment_matrix = needleman_wunsch(s1, s2)
81
82
print(f"\\nAlignment score: {alignment_score:.3f}")
83
print(f"Alignment matrix shape: {alignment_matrix.shape}")
84
85
# Extract optimal alignment
86
path, aligned_s1, aligned_s2 = best_alignment(alignment_matrix, s1, s2, gap="-")
87
88
print(f"\\nOptimal alignment path length: {len(path)}")
89
print(f"Aligned sequence 1: {aligned_s1}")
90
print(f"Aligned sequence 2: {aligned_s2}")
91
92
# Analyze alignment
93
print("\\nAlignment analysis:")
94
gaps_s1 = sum(1 for x in aligned_s1 if x == "-")
95
gaps_s2 = sum(1 for x in aligned_s2 if x == "-")
96
matches = sum(1 for x, y in zip(aligned_s1, aligned_s2) if x == y and x != "-")
97
98
print(f"Gaps in sequence 1: {gaps_s1}")
99
print(f"Gaps in sequence 2: {gaps_s2}")
100
print(f"Exact matches: {matches}")
101
print(f"Alignment length: {len(aligned_s1)}")
102
```
103
104
### Alignment vs DTW Comparison
105
106
```python
107
from dtaidistance.alignment import needleman_wunsch, best_alignment
108
from dtaidistance import dtw
109
import numpy as np
110
import matplotlib.pyplot as plt
111
112
# Create test sequences with clear alignment challenges
113
np.random.seed(42)
114
115
# Sequence 1: Base pattern with inserted elements
116
base_pattern = [1, 2, 3, 4, 3, 2, 1]
117
s1 = base_pattern.copy()
118
s1.insert(3, 3.5) # Insert element
119
s1.insert(6, 2.5) # Insert another element
120
121
# Sequence 2: Base pattern with deleted elements
122
s2 = base_pattern.copy()
123
s2.pop(2) # Remove element
124
s2.pop(4) # Remove another element
125
126
print("Original sequences:")
127
print(f"Sequence 1 (with insertions): {s1}")
128
print(f"Sequence 2 (with deletions): {s2}")
129
130
# Global alignment
131
alignment_score, alignment_matrix = needleman_wunsch(s1, s2)
132
path, aligned_s1, aligned_s2 = best_alignment(alignment_matrix, s1, s2, gap="-")
133
134
# DTW alignment
135
dtw_distance, dtw_paths = dtw.warping_paths(s1, s2)
136
dtw_path = dtw.best_path(dtw_paths)
137
138
print(f"\\nAlignment Results:")
139
print(f"Global alignment score: {alignment_score:.3f}")
140
print(f"DTW distance: {dtw_distance:.3f}")
141
142
print(f"\\nGlobal alignment result:")
143
print(f"Aligned S1: {aligned_s1}")
144
print(f"Aligned S2: {aligned_s2}")
145
146
# Visualize both approaches
147
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))
148
149
# Original sequences
150
ax1.plot(s1, 'bo-', label='Sequence 1', markersize=8, linewidth=2)
151
ax1.plot(s2, 'ro-', label='Sequence 2', markersize=8, linewidth=2)
152
ax1.set_title('Original Sequences')
153
ax1.legend()
154
ax1.grid(True)
155
156
# Global alignment matrix
157
im2 = ax2.imshow(alignment_matrix, cmap='viridis', origin='lower')
158
ax2.set_title('Global Alignment Matrix')
159
ax2.set_xlabel('Sequence 2 Index')
160
ax2.set_ylabel('Sequence 1 Index')
161
plt.colorbar(im2, ax=ax2)
162
163
# DTW warping paths
164
im3 = ax3.imshow(dtw_paths, cmap='viridis', origin='lower')
165
ax3.set_title('DTW Warping Paths Matrix')
166
ax3.set_xlabel('Sequence 2 Index')
167
ax3.set_ylabel('Sequence 1 Index')
168
plt.colorbar(im3, ax=ax3)
169
170
if dtw_path:
171
path_i, path_j = zip(*dtw_path)
172
ax3.plot(path_j, path_i, 'white', linewidth=2)
173
174
# Alignment comparison
175
alignment_positions = []
176
for i, (a1, a2) in enumerate(zip(aligned_s1, aligned_s2)):
177
if a1 != "-" and a2 != "-":
178
alignment_positions.append(i)
179
180
ax4.plot(range(len(aligned_s1)), [x if x != "-" else None for x in aligned_s1],
181
'bo-', label='Aligned S1', markersize=6)
182
ax4.plot(range(len(aligned_s2)), [x if x != "-" else None for x in aligned_s2],
183
'ro-', label='Aligned S2', markersize=6)
184
185
# Mark gaps
186
for i, (a1, a2) in enumerate(zip(aligned_s1, aligned_s2)):
187
if a1 == "-":
188
ax4.axvline(x=i, color='blue', alpha=0.3, linestyle='--')
189
if a2 == "-":
190
ax4.axvline(x=i, color='red', alpha=0.3, linestyle='--')
191
192
ax4.set_title('Global Alignment Result (dashed lines = gaps)')
193
ax4.legend()
194
ax4.grid(True)
195
196
plt.tight_layout()
197
plt.show()
198
```
199
200
### Time Series Alignment with Constraints
201
202
```python
203
from dtaidistance.alignment import needleman_wunsch, best_alignment
204
import numpy as np
205
import matplotlib.pyplot as plt
206
207
# Generate time series with temporal shifts and noise
208
np.random.seed(42)
209
210
def create_shifted_signal(base_signal, shift=0, noise_level=0.1):
211
"""Create a shifted and noisy version of a base signal."""
212
shifted = np.roll(base_signal, shift)
213
noisy = shifted + noise_level * np.random.randn(len(shifted))
214
return noisy
215
216
# Base signal: combination of sine wave and trend
217
t = np.linspace(0, 4*np.pi, 50)
218
base_signal = np.sin(t) + 0.1 * t
219
220
# Create variations
221
original_signal = base_signal + 0.05 * np.random.randn(len(base_signal))
222
shifted_signal = create_shifted_signal(base_signal, shift=5, noise_level=0.1)
223
compressed_signal = base_signal[::2] + 0.05 * np.random.randn(len(base_signal)//2)
224
225
signals = [original_signal, shifted_signal, compressed_signal]
226
signal_names = ['Original', 'Shifted', 'Compressed']
227
228
print("Signal lengths:")
229
for name, signal in zip(signal_names, signals):
230
print(f"{name}: {len(signal)} points")
231
232
# Perform pairwise alignments with different constraints
233
constraint_sets = [
234
{'window': None}, # No constraint
235
{'window': 10}, # Moderate constraint
236
{'max_length_diff': 20} # Length difference constraint
237
]
238
239
constraint_names = ['Unconstrained', 'Window=10', 'MaxLenDiff=20']
240
241
# Compare alignments
242
fig, axes = plt.subplots(len(constraint_sets), 2, figsize=(14, 12))
243
244
for constraint_idx, (constraints, constraint_name) in enumerate(zip(constraint_sets, constraint_names)):
245
# Align original vs shifted signal
246
score, matrix = needleman_wunsch(original_signal, shifted_signal, **constraints)
247
path, aligned_orig, aligned_shift = best_alignment(matrix, original_signal, shifted_signal, gap=np.nan)
248
249
# Plot alignment matrix
250
ax_matrix = axes[constraint_idx, 0]
251
im = ax_matrix.imshow(matrix, cmap='viridis', origin='lower')
252
ax_matrix.set_title(f'{constraint_name} - Alignment Matrix\\nScore: {score:.2f}')
253
ax_matrix.set_xlabel('Shifted Signal Index')
254
ax_matrix.set_ylabel('Original Signal Index')
255
256
# Plot aligned sequences
257
ax_seqs = axes[constraint_idx, 1]
258
259
# Create x-axis for aligned sequences
260
x_aligned = range(len(aligned_orig))
261
262
# Plot with gap handling
263
orig_values = [val if not np.isnan(val) else None for val in aligned_orig]
264
shift_values = [val if not np.isnan(val) else None for val in aligned_shift]
265
266
ax_seqs.plot(x_aligned, orig_values, 'b-o', label='Original (aligned)',
267
markersize=3, linewidth=1.5, alpha=0.8)
268
ax_seqs.plot(x_aligned, shift_values, 'r-s', label='Shifted (aligned)',
269
markersize=3, linewidth=1.5, alpha=0.8)
270
271
# Mark gaps
272
for i, (orig_val, shift_val) in enumerate(zip(aligned_orig, aligned_shift)):
273
if np.isnan(orig_val):
274
ax_seqs.axvline(x=i, color='blue', alpha=0.2, linestyle='--')
275
if np.isnan(shift_val):
276
ax_seqs.axvline(x=i, color='red', alpha=0.2, linestyle='--')
277
278
ax_seqs.set_title(f'{constraint_name} - Aligned Sequences')
279
ax_seqs.legend()
280
ax_seqs.grid(True)
281
282
plt.tight_layout()
283
plt.show()
284
```
285
286
### Multiple Sequence Alignment Analysis
287
288
```python
289
from dtaidistance.alignment import needleman_wunsch, best_alignment
290
from dtaidistance import dtw
291
import numpy as np
292
import matplotlib.pyplot as plt
293
294
# Create a family of related sequences
295
np.random.seed(42)
296
297
def create_sequence_variant(base_seq, variant_type, intensity=1.0):
298
"""Create different variants of a base sequence."""
299
seq = base_seq.copy()
300
301
if variant_type == 'stretched':
302
# Stretch sequence by interpolation
303
x_old = np.arange(len(seq))
304
x_new = np.linspace(0, len(seq)-1, int(len(seq) * (1 + 0.3 * intensity)))
305
seq_stretched = np.interp(x_new, x_old, seq)
306
return seq_stretched
307
308
elif variant_type == 'compressed':
309
# Compress sequence
310
step = int(1 + intensity)
311
return seq[::step]
312
313
elif variant_type == 'noisy':
314
# Add noise
315
noise = intensity * 0.2 * np.random.randn(len(seq))
316
return seq + noise
317
318
elif variant_type == 'shifted':
319
# Shift values
320
return seq + intensity * 0.5
321
322
return seq
323
324
# Base sequence
325
t = np.linspace(0, 2*np.pi, 30)
326
base_sequence = np.sin(t) + 0.5 * np.cos(2*t)
327
328
# Create variants
329
variants = {
330
'Original': base_sequence,
331
'Stretched': create_sequence_variant(base_sequence, 'stretched', 1.0),
332
'Compressed': create_sequence_variant(base_sequence, 'compressed', 1),
333
'Noisy': create_sequence_variant(base_sequence, 'noisy', 1.0),
334
'Shifted': create_sequence_variant(base_sequence, 'shifted', 1.0)
335
}
336
337
variant_names = list(variants.keys())
338
variant_sequences = list(variants.values())
339
340
print("Sequence variants:")
341
for name, seq in variants.items():
342
print(f"{name}: length {len(seq)}")
343
344
# Compute pairwise alignment scores and DTW distances
345
n_variants = len(variants)
346
alignment_scores = np.zeros((n_variants, n_variants))
347
dtw_distances = np.zeros((n_variants, n_variants))
348
349
for i in range(n_variants):
350
for j in range(n_variants):
351
if i != j:
352
# Global alignment
353
score, _ = needleman_wunsch(variant_sequences[i], variant_sequences[j])
354
alignment_scores[i, j] = score
355
356
# DTW distance
357
dtw_dist = dtw.distance(variant_sequences[i], variant_sequences[j])
358
dtw_distances[i, j] = dtw_dist
359
360
print("\\nAlignment vs DTW Comparison:")
361
print("Alignment scores (lower = more similar):")
362
print(" ", " ".join(f"{name[:6]:>8}" for name in variant_names))
363
for i, name in enumerate(variant_names):
364
row_str = f"{name[:8]:>8}: "
365
for j in range(n_variants):
366
if i == j:
367
row_str += " 0.00 "
368
else:
369
row_str += f"{alignment_scores[i, j]:8.2f} "
370
print(row_str)
371
372
print("\\nDTW distances:")
373
print(" ", " ".join(f"{name[:6]:>8}" for name in variant_names))
374
for i, name in enumerate(variant_names):
375
row_str = f"{name[:8]:>8}: "
376
for j in range(n_variants):
377
if i == j:
378
row_str += " 0.00 "
379
else:
380
row_str += f"{dtw_distances[i, j]:8.2f} "
381
print(row_str)
382
383
# Visualize sequences and similarity matrices
384
fig = plt.figure(figsize=(16, 12))
385
386
# Plot all variant sequences
387
ax1 = plt.subplot(2, 3, (1, 3))
388
colors = ['blue', 'red', 'green', 'orange', 'purple']
389
for i, (name, seq) in enumerate(variants.items()):
390
ax1.plot(seq, color=colors[i], label=name, linewidth=2, alpha=0.8)
391
392
ax1.set_title('Sequence Variants')
393
ax1.legend()
394
ax1.grid(True)
395
ax1.set_xlabel('Time Point')
396
ax1.set_ylabel('Value')
397
398
# Plot alignment scores matrix
399
ax2 = plt.subplot(2, 3, 4)
400
im2 = ax2.imshow(alignment_scores, cmap='viridis')
401
ax2.set_title('Alignment Scores')
402
ax2.set_xticks(range(n_variants))
403
ax2.set_yticks(range(n_variants))
404
ax2.set_xticklabels([name[:6] for name in variant_names], rotation=45)
405
ax2.set_yticklabels([name[:6] for name in variant_names])
406
plt.colorbar(im2, ax=ax2)
407
408
# Plot DTW distances matrix
409
ax3 = plt.subplot(2, 3, 5)
410
im3 = ax3.imshow(dtw_distances, cmap='hot')
411
ax3.set_title('DTW Distances')
412
ax3.set_xticks(range(n_variants))
413
ax3.set_yticks(range(n_variants))
414
ax3.set_xticklabels([name[:6] for name in variant_names], rotation=45)
415
ax3.set_yticklabels([name[:6] for name in variant_names])
416
plt.colorbar(im3, ax=ax3)
417
418
# Detailed alignment example
419
ax4 = plt.subplot(2, 3, 6)
420
# Align original vs stretched for detailed view
421
score, matrix = needleman_wunsch(variants['Original'], variants['Stretched'])
422
path, aligned_orig, aligned_stretch = best_alignment(matrix, variants['Original'], variants['Stretched'], gap=np.nan)
423
424
x_aligned = range(len(aligned_orig))
425
orig_vals = [val if not np.isnan(val) else None for val in aligned_orig]
426
stretch_vals = [val if not np.isnan(val) else None for val in aligned_stretch]
427
428
ax4.plot(x_aligned, orig_vals, 'b-o', label='Original', markersize=4, linewidth=1.5)
429
ax4.plot(x_aligned, stretch_vals, 'r-s', label='Stretched', markersize=4, linewidth=1.5)
430
ax4.set_title(f'Detailed Alignment: Original vs Stretched\\nScore: {score:.2f}')
431
ax4.legend()
432
ax4.grid(True)
433
434
plt.tight_layout()
435
plt.show()
436
```
437
438
### Alignment Quality Assessment
439
440
```python
441
from dtaidistance.alignment import needleman_wunsch, best_alignment
442
import numpy as np
443
444
def assess_alignment_quality(s1, s2, aligned_s1, aligned_s2, gap_symbol=None):
445
"""Assess the quality of a sequence alignment."""
446
if gap_symbol is None:
447
gap_symbol = "-"
448
449
# Basic statistics
450
total_positions = len(aligned_s1)
451
452
# Count matches, mismatches, and gaps
453
matches = 0
454
mismatches = 0
455
gaps_s1 = 0
456
gaps_s2 = 0
457
458
for a1, a2 in zip(aligned_s1, aligned_s2):
459
if a1 == gap_symbol:
460
gaps_s1 += 1
461
elif a2 == gap_symbol:
462
gaps_s2 += 1
463
elif isinstance(a1, (int, float)) and isinstance(a2, (int, float)):
464
if abs(a1 - a2) < 1e-10: # Exact match
465
matches += 1
466
else:
467
mismatches += 1
468
469
# Calculate percentages
470
match_percentage = (matches / total_positions) * 100
471
gap_percentage = ((gaps_s1 + gaps_s2) / total_positions) * 100
472
473
# Calculate alignment efficiency
474
original_length = len(s1) + len(s2)
475
alignment_length = total_positions
476
efficiency = (original_length / alignment_length) * 100
477
478
return {
479
'total_positions': total_positions,
480
'matches': matches,
481
'mismatches': mismatches,
482
'gaps_s1': gaps_s1,
483
'gaps_s2': gaps_s2,
484
'match_percentage': match_percentage,
485
'gap_percentage': gap_percentage,
486
'efficiency': efficiency
487
}
488
489
# Test alignment quality on different sequence pairs
490
test_cases = [
491
{
492
'name': 'Identical sequences',
493
's1': [1, 2, 3, 4, 5],
494
's2': [1, 2, 3, 4, 5]
495
},
496
{
497
'name': 'One insertion',
498
's1': [1, 2, 3, 4, 5],
499
's2': [1, 2, 2.5, 3, 4, 5]
500
},
501
{
502
'name': 'One deletion',
503
's1': [1, 2, 3, 4, 5],
504
's2': [1, 2, 4, 5]
505
},
506
{
507
'name': 'Multiple gaps',
508
's1': [1, 2, 3, 4, 5, 6, 7, 8],
509
's2': [1, 3, 5, 7]
510
},
511
{
512
'name': 'Different lengths, similar pattern',
513
's1': [1, 2, 3, 4, 3, 2, 1],
514
's2': [1, 2, 4, 2, 1]
515
}
516
]
517
518
print("Alignment Quality Assessment:")
519
print("=" * 60)
520
521
for test_case in test_cases:
522
s1, s2 = test_case['s1'], test_case['s2']
523
name = test_case['name']
524
525
# Perform alignment
526
score, matrix = needleman_wunsch(s1, s2)
527
path, aligned_s1, aligned_s2 = best_alignment(matrix, s1, s2, gap="-")
528
529
# Assess quality
530
quality = assess_alignment_quality(s1, s2, aligned_s1, aligned_s2, gap_symbol="-")
531
532
print(f"\\nTest Case: {name}")
533
print(f"Original lengths: S1={len(s1)}, S2={len(s2)}")
534
print(f"Alignment score: {score:.3f}")
535
print(f"Alignment length: {quality['total_positions']}")
536
print(f"Matches: {quality['matches']} ({quality['match_percentage']:.1f}%)")
537
print(f"Gaps: S1={quality['gaps_s1']}, S2={quality['gaps_s2']} (total {quality['gap_percentage']:.1f}%)")
538
print(f"Efficiency: {quality['efficiency']:.1f}%")
539
print(f"Aligned S1: {aligned_s1}")
540
print(f"Aligned S2: {aligned_s2}")
541
```
542
543
The sequence alignment module provides complementary capabilities to DTW by enforcing global alignment constraints and explicit gap handling, making it suitable for applications where end-to-end alignment and gap identification are important requirements.