CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dtaidistance

Distance measures for time series with Dynamic Time Warping as the primary focus

Pending
Overview
Eval results
Files

alignment.mddocs/

Sequence Alignment

Global sequence alignment algorithms like Needleman-Wunsch for optimal alignment of time series with gap penalties. This module provides alternative approaches to DTW for sequence comparison and alignment tasks, particularly useful when explicit gap handling and global alignment constraints are needed.

Capabilities

Global Sequence Alignment

Compute optimal global alignment between two sequences using the Needleman-Wunsch algorithm, which handles gaps explicitly and ensures end-to-end alignment.

def needleman_wunsch(s1, s2, window=None, max_dist=None, max_step=None,
                     max_length_diff=None, psi=None):
    """
    Global sequence alignment using Needleman-Wunsch algorithm.
    
    Computes optimal global alignment between two sequences with explicit
    gap handling, ensuring alignment from beginning to end of both sequences.
    Unlike DTW, which allows flexible warping, this enforces global alignment
    with gap penalties.
    
    Parameters:
    - s1, s2: array-like, input sequences to align
    - window: int, alignment window constraint (similar to DTW Sakoe-Chiba band)
    - max_dist: float, early stopping threshold
    - max_step: float, maximum step size constraint
    - max_length_diff: int, maximum allowed length difference
    - psi: int, psi relaxation parameter for boundary conditions
    
    Returns:
    tuple: (alignment_score, alignment_matrix)
        - alignment_score: float, optimal alignment score
        - alignment_matrix: 2D array, alignment scoring matrix
    """

Optimal Alignment Extraction

Extract the optimal alignment path and create aligned sequences with explicit gap symbols from alignment matrices.

def best_alignment(paths, s1=None, s2=None, gap="-", order=None):
    """
    Compute optimal alignment from alignment paths matrix.
    
    Traces back through the alignment matrix to extract the optimal
    alignment path and creates aligned sequences with gap symbols.
    
    Parameters:
    - paths: 2D array, alignment paths matrix from needleman_wunsch()
    - s1, s2: array-like, original sequences (optional, for creating aligned output)
    - gap: str/value, symbol to use for gaps in aligned sequences
    - order: str, ordering preference for path selection ('first', 'last', None)
    
    Returns:
    tuple: (path, aligned_s1, aligned_s2)
        - path: list, sequence of alignment operations
        - aligned_s1: list, first sequence with gaps inserted
        - aligned_s2: list, second sequence with gaps inserted
    """

Usage Examples

Basic Global Alignment

from dtaidistance.alignment import needleman_wunsch, best_alignment
import numpy as np

# Create two sequences with different lengths and patterns
s1 = [1, 2, 3, 4, 5, 4, 3, 2, 1]
s2 = [2, 3, 4, 5, 4, 3, 2]

print(f"Sequence 1 length: {len(s1)}")
print(f"Sequence 2 length: {len(s2)}")
print(f"Sequence 1: {s1}")
print(f"Sequence 2: {s2}")

# Perform global alignment
alignment_score, alignment_matrix = needleman_wunsch(s1, s2)

print(f"\\nAlignment score: {alignment_score:.3f}")
print(f"Alignment matrix shape: {alignment_matrix.shape}")

# Extract optimal alignment
path, aligned_s1, aligned_s2 = best_alignment(alignment_matrix, s1, s2, gap="-")

print(f"\\nOptimal alignment path length: {len(path)}")
print(f"Aligned sequence 1: {aligned_s1}")
print(f"Aligned sequence 2: {aligned_s2}")

# Analyze alignment
print("\\nAlignment analysis:")
gaps_s1 = sum(1 for x in aligned_s1 if x == "-")
gaps_s2 = sum(1 for x in aligned_s2 if x == "-")
matches = sum(1 for x, y in zip(aligned_s1, aligned_s2) if x == y and x != "-")

print(f"Gaps in sequence 1: {gaps_s1}")
print(f"Gaps in sequence 2: {gaps_s2}")
print(f"Exact matches: {matches}")
print(f"Alignment length: {len(aligned_s1)}")

Alignment vs DTW Comparison

from dtaidistance.alignment import needleman_wunsch, best_alignment
from dtaidistance import dtw
import numpy as np
import matplotlib.pyplot as plt

# Create test sequences with clear alignment challenges
np.random.seed(42)

# Sequence 1: Base pattern with inserted elements
base_pattern = [1, 2, 3, 4, 3, 2, 1]
s1 = base_pattern.copy()
s1.insert(3, 3.5)  # Insert element
s1.insert(6, 2.5)  # Insert another element

# Sequence 2: Base pattern with deleted elements  
s2 = base_pattern.copy()
s2.pop(2)  # Remove element
s2.pop(4)  # Remove another element

print("Original sequences:")
print(f"Sequence 1 (with insertions): {s1}")
print(f"Sequence 2 (with deletions): {s2}")

# Global alignment
alignment_score, alignment_matrix = needleman_wunsch(s1, s2)
path, aligned_s1, aligned_s2 = best_alignment(alignment_matrix, s1, s2, gap="-")

# DTW alignment  
dtw_distance, dtw_paths = dtw.warping_paths(s1, s2)
dtw_path = dtw.best_path(dtw_paths)

print(f"\\nAlignment Results:")
print(f"Global alignment score: {alignment_score:.3f}")
print(f"DTW distance: {dtw_distance:.3f}")

print(f"\\nGlobal alignment result:")
print(f"Aligned S1: {aligned_s1}")
print(f"Aligned S2: {aligned_s2}")

# Visualize both approaches
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(14, 10))

# Original sequences
ax1.plot(s1, 'bo-', label='Sequence 1', markersize=8, linewidth=2)
ax1.plot(s2, 'ro-', label='Sequence 2', markersize=8, linewidth=2)
ax1.set_title('Original Sequences')
ax1.legend()
ax1.grid(True)

# Global alignment matrix
im2 = ax2.imshow(alignment_matrix, cmap='viridis', origin='lower')
ax2.set_title('Global Alignment Matrix')
ax2.set_xlabel('Sequence 2 Index')
ax2.set_ylabel('Sequence 1 Index')
plt.colorbar(im2, ax=ax2)

# DTW warping paths
im3 = ax3.imshow(dtw_paths, cmap='viridis', origin='lower')
ax3.set_title('DTW Warping Paths Matrix')
ax3.set_xlabel('Sequence 2 Index')
ax3.set_ylabel('Sequence 1 Index')
plt.colorbar(im3, ax=ax3)

if dtw_path:
    path_i, path_j = zip(*dtw_path)
    ax3.plot(path_j, path_i, 'white', linewidth=2)

# Alignment comparison
alignment_positions = []
for i, (a1, a2) in enumerate(zip(aligned_s1, aligned_s2)):
    if a1 != "-" and a2 != "-":
        alignment_positions.append(i)

ax4.plot(range(len(aligned_s1)), [x if x != "-" else None for x in aligned_s1], 
         'bo-', label='Aligned S1', markersize=6)
ax4.plot(range(len(aligned_s2)), [x if x != "-" else None for x in aligned_s2], 
         'ro-', label='Aligned S2', markersize=6)

# Mark gaps
for i, (a1, a2) in enumerate(zip(aligned_s1, aligned_s2)):
    if a1 == "-":
        ax4.axvline(x=i, color='blue', alpha=0.3, linestyle='--')
    if a2 == "-":
        ax4.axvline(x=i, color='red', alpha=0.3, linestyle='--')

ax4.set_title('Global Alignment Result (dashed lines = gaps)')
ax4.legend()
ax4.grid(True)

plt.tight_layout()
plt.show()

Time Series Alignment with Constraints

from dtaidistance.alignment import needleman_wunsch, best_alignment
import numpy as np
import matplotlib.pyplot as plt

# Generate time series with temporal shifts and noise
np.random.seed(42)

def create_shifted_signal(base_signal, shift=0, noise_level=0.1):
    """Create a shifted and noisy version of a base signal."""
    shifted = np.roll(base_signal, shift)
    noisy = shifted + noise_level * np.random.randn(len(shifted))
    return noisy

# Base signal: combination of sine wave and trend
t = np.linspace(0, 4*np.pi, 50)
base_signal = np.sin(t) + 0.1 * t

# Create variations
original_signal = base_signal + 0.05 * np.random.randn(len(base_signal))
shifted_signal = create_shifted_signal(base_signal, shift=5, noise_level=0.1)
compressed_signal = base_signal[::2] + 0.05 * np.random.randn(len(base_signal)//2)

signals = [original_signal, shifted_signal, compressed_signal]
signal_names = ['Original', 'Shifted', 'Compressed']

print("Signal lengths:")
for name, signal in zip(signal_names, signals):
    print(f"{name}: {len(signal)} points")

# Perform pairwise alignments with different constraints
constraint_sets = [
    {'window': None},           # No constraint
    {'window': 10},             # Moderate constraint
    {'max_length_diff': 20}     # Length difference constraint
]

constraint_names = ['Unconstrained', 'Window=10', 'MaxLenDiff=20']

# Compare alignments
fig, axes = plt.subplots(len(constraint_sets), 2, figsize=(14, 12))

for constraint_idx, (constraints, constraint_name) in enumerate(zip(constraint_sets, constraint_names)):
    # Align original vs shifted signal
    score, matrix = needleman_wunsch(original_signal, shifted_signal, **constraints)
    path, aligned_orig, aligned_shift = best_alignment(matrix, original_signal, shifted_signal, gap=np.nan)
    
    # Plot alignment matrix
    ax_matrix = axes[constraint_idx, 0]
    im = ax_matrix.imshow(matrix, cmap='viridis', origin='lower')
    ax_matrix.set_title(f'{constraint_name} - Alignment Matrix\\nScore: {score:.2f}')
    ax_matrix.set_xlabel('Shifted Signal Index')
    ax_matrix.set_ylabel('Original Signal Index')
    
    # Plot aligned sequences
    ax_seqs = axes[constraint_idx, 1]
    
    # Create x-axis for aligned sequences
    x_aligned = range(len(aligned_orig))
    
    # Plot with gap handling
    orig_values = [val if not np.isnan(val) else None for val in aligned_orig]
    shift_values = [val if not np.isnan(val) else None for val in aligned_shift]
    
    ax_seqs.plot(x_aligned, orig_values, 'b-o', label='Original (aligned)', 
                markersize=3, linewidth=1.5, alpha=0.8)
    ax_seqs.plot(x_aligned, shift_values, 'r-s', label='Shifted (aligned)', 
                markersize=3, linewidth=1.5, alpha=0.8)
    
    # Mark gaps
    for i, (orig_val, shift_val) in enumerate(zip(aligned_orig, aligned_shift)):
        if np.isnan(orig_val):
            ax_seqs.axvline(x=i, color='blue', alpha=0.2, linestyle='--')
        if np.isnan(shift_val):
            ax_seqs.axvline(x=i, color='red', alpha=0.2, linestyle='--')
    
    ax_seqs.set_title(f'{constraint_name} - Aligned Sequences')
    ax_seqs.legend()
    ax_seqs.grid(True)

plt.tight_layout()
plt.show()

Multiple Sequence Alignment Analysis

from dtaidistance.alignment import needleman_wunsch, best_alignment
from dtaidistance import dtw
import numpy as np
import matplotlib.pyplot as plt

# Create a family of related sequences
np.random.seed(42)

def create_sequence_variant(base_seq, variant_type, intensity=1.0):
    """Create different variants of a base sequence."""
    seq = base_seq.copy()
    
    if variant_type == 'stretched':
        # Stretch sequence by interpolation
        x_old = np.arange(len(seq))
        x_new = np.linspace(0, len(seq)-1, int(len(seq) * (1 + 0.3 * intensity)))
        seq_stretched = np.interp(x_new, x_old, seq)
        return seq_stretched
    
    elif variant_type == 'compressed':
        # Compress sequence
        step = int(1 + intensity)
        return seq[::step]
    
    elif variant_type == 'noisy':
        # Add noise
        noise = intensity * 0.2 * np.random.randn(len(seq))
        return seq + noise
    
    elif variant_type == 'shifted':
        # Shift values
        return seq + intensity * 0.5
        
    return seq

# Base sequence
t = np.linspace(0, 2*np.pi, 30)
base_sequence = np.sin(t) + 0.5 * np.cos(2*t)

# Create variants
variants = {
    'Original': base_sequence,
    'Stretched': create_sequence_variant(base_sequence, 'stretched', 1.0),
    'Compressed': create_sequence_variant(base_sequence, 'compressed', 1),
    'Noisy': create_sequence_variant(base_sequence, 'noisy', 1.0),
    'Shifted': create_sequence_variant(base_sequence, 'shifted', 1.0)
}

variant_names = list(variants.keys())
variant_sequences = list(variants.values())

print("Sequence variants:")
for name, seq in variants.items():
    print(f"{name}: length {len(seq)}")

# Compute pairwise alignment scores and DTW distances
n_variants = len(variants)
alignment_scores = np.zeros((n_variants, n_variants))
dtw_distances = np.zeros((n_variants, n_variants))

for i in range(n_variants):
    for j in range(n_variants):
        if i != j:
            # Global alignment
            score, _ = needleman_wunsch(variant_sequences[i], variant_sequences[j])
            alignment_scores[i, j] = score
            
            # DTW distance
            dtw_dist = dtw.distance(variant_sequences[i], variant_sequences[j])
            dtw_distances[i, j] = dtw_dist

print("\\nAlignment vs DTW Comparison:")
print("Alignment scores (lower = more similar):")
print("      ", "  ".join(f"{name[:6]:>8}" for name in variant_names))
for i, name in enumerate(variant_names):
    row_str = f"{name[:8]:>8}: "
    for j in range(n_variants):
        if i == j:
            row_str += "    0.00  "
        else:
            row_str += f"{alignment_scores[i, j]:8.2f}  "
    print(row_str)

print("\\nDTW distances:")
print("      ", "  ".join(f"{name[:6]:>8}" for name in variant_names))
for i, name in enumerate(variant_names):
    row_str = f"{name[:8]:>8}: "
    for j in range(n_variants):
        if i == j:
            row_str += "    0.00  "
        else:
            row_str += f"{dtw_distances[i, j]:8.2f}  "
    print(row_str)

# Visualize sequences and similarity matrices
fig = plt.figure(figsize=(16, 12))

# Plot all variant sequences
ax1 = plt.subplot(2, 3, (1, 3))
colors = ['blue', 'red', 'green', 'orange', 'purple']
for i, (name, seq) in enumerate(variants.items()):
    ax1.plot(seq, color=colors[i], label=name, linewidth=2, alpha=0.8)

ax1.set_title('Sequence Variants')
ax1.legend()
ax1.grid(True)
ax1.set_xlabel('Time Point')
ax1.set_ylabel('Value')

# Plot alignment scores matrix
ax2 = plt.subplot(2, 3, 4)
im2 = ax2.imshow(alignment_scores, cmap='viridis')
ax2.set_title('Alignment Scores')
ax2.set_xticks(range(n_variants))
ax2.set_yticks(range(n_variants))
ax2.set_xticklabels([name[:6] for name in variant_names], rotation=45)
ax2.set_yticklabels([name[:6] for name in variant_names])
plt.colorbar(im2, ax=ax2)

# Plot DTW distances matrix
ax3 = plt.subplot(2, 3, 5)
im3 = ax3.imshow(dtw_distances, cmap='hot')
ax3.set_title('DTW Distances')
ax3.set_xticks(range(n_variants))
ax3.set_yticks(range(n_variants))
ax3.set_xticklabels([name[:6] for name in variant_names], rotation=45)
ax3.set_yticklabels([name[:6] for name in variant_names])
plt.colorbar(im3, ax=ax3)

# Detailed alignment example
ax4 = plt.subplot(2, 3, 6)
# Align original vs stretched for detailed view
score, matrix = needleman_wunsch(variants['Original'], variants['Stretched'])
path, aligned_orig, aligned_stretch = best_alignment(matrix, variants['Original'], variants['Stretched'], gap=np.nan)

x_aligned = range(len(aligned_orig))
orig_vals = [val if not np.isnan(val) else None for val in aligned_orig]
stretch_vals = [val if not np.isnan(val) else None for val in aligned_stretch]

ax4.plot(x_aligned, orig_vals, 'b-o', label='Original', markersize=4, linewidth=1.5)
ax4.plot(x_aligned, stretch_vals, 'r-s', label='Stretched', markersize=4, linewidth=1.5)
ax4.set_title(f'Detailed Alignment: Original vs Stretched\\nScore: {score:.2f}')
ax4.legend()
ax4.grid(True)

plt.tight_layout()
plt.show()

Alignment Quality Assessment

from dtaidistance.alignment import needleman_wunsch, best_alignment
import numpy as np

def assess_alignment_quality(s1, s2, aligned_s1, aligned_s2, gap_symbol=None):
    """Assess the quality of a sequence alignment."""
    if gap_symbol is None:
        gap_symbol = "-"
    
    # Basic statistics
    total_positions = len(aligned_s1)
    
    # Count matches, mismatches, and gaps
    matches = 0
    mismatches = 0  
    gaps_s1 = 0
    gaps_s2 = 0
    
    for a1, a2 in zip(aligned_s1, aligned_s2):
        if a1 == gap_symbol:
            gaps_s1 += 1
        elif a2 == gap_symbol:
            gaps_s2 += 1
        elif isinstance(a1, (int, float)) and isinstance(a2, (int, float)):
            if abs(a1 - a2) < 1e-10:  # Exact match
                matches += 1
            else:
                mismatches += 1
    
    # Calculate percentages
    match_percentage = (matches / total_positions) * 100
    gap_percentage = ((gaps_s1 + gaps_s2) / total_positions) * 100
    
    # Calculate alignment efficiency
    original_length = len(s1) + len(s2)
    alignment_length = total_positions
    efficiency = (original_length / alignment_length) * 100
    
    return {
        'total_positions': total_positions,
        'matches': matches,
        'mismatches': mismatches, 
        'gaps_s1': gaps_s1,
        'gaps_s2': gaps_s2,
        'match_percentage': match_percentage,
        'gap_percentage': gap_percentage,
        'efficiency': efficiency
    }

# Test alignment quality on different sequence pairs
test_cases = [
    {
        'name': 'Identical sequences',
        's1': [1, 2, 3, 4, 5],
        's2': [1, 2, 3, 4, 5]
    },
    {
        'name': 'One insertion',
        's1': [1, 2, 3, 4, 5],
        's2': [1, 2, 2.5, 3, 4, 5]
    },
    {
        'name': 'One deletion',
        's1': [1, 2, 3, 4, 5],
        's2': [1, 2, 4, 5]
    },
    {
        'name': 'Multiple gaps',
        's1': [1, 2, 3, 4, 5, 6, 7, 8],
        's2': [1, 3, 5, 7]
    },
    {
        'name': 'Different lengths, similar pattern',
        's1': [1, 2, 3, 4, 3, 2, 1],
        's2': [1, 2, 4, 2, 1]
    }
]

print("Alignment Quality Assessment:")
print("=" * 60)

for test_case in test_cases:
    s1, s2 = test_case['s1'], test_case['s2']
    name = test_case['name']
    
    # Perform alignment
    score, matrix = needleman_wunsch(s1, s2)
    path, aligned_s1, aligned_s2 = best_alignment(matrix, s1, s2, gap="-")
    
    # Assess quality
    quality = assess_alignment_quality(s1, s2, aligned_s1, aligned_s2, gap_symbol="-")
    
    print(f"\\nTest Case: {name}")
    print(f"Original lengths: S1={len(s1)}, S2={len(s2)}")
    print(f"Alignment score: {score:.3f}")
    print(f"Alignment length: {quality['total_positions']}")
    print(f"Matches: {quality['matches']} ({quality['match_percentage']:.1f}%)")
    print(f"Gaps: S1={quality['gaps_s1']}, S2={quality['gaps_s2']} (total {quality['gap_percentage']:.1f}%)")
    print(f"Efficiency: {quality['efficiency']:.1f}%")
    print(f"Aligned S1: {aligned_s1}")
    print(f"Aligned S2: {aligned_s2}")

The sequence alignment module provides complementary capabilities to DTW by enforcing global alignment constraints and explicit gap handling, making it suitable for applications where end-to-end alignment and gap identification are important requirements.

Install with Tessl CLI

npx tessl i tessl/pypi-dtaidistance

docs

alignment.md

clustering.md

core-dtw.md

distance-matrices.md

index.md

ndim-dtw.md

visualization.md

warping-paths.md

weighted-dtw.md

tile.json