Distance measures for time series, with Dynamic Time Warping (DTW) as the primary focus.

Efficient computation of distance matrices for multiple time series, supporting parallel processing, memory optimization through blocking, and various output formats for large-scale time series analysis. Distance matrices are essential for clustering, similarity search, and comparative analysis of time series datasets.

Compute pairwise DTW distances between all sequences in a collection, with support for both full matrices and condensed formats.
def distance_matrix(s, max_dist=None, max_length_diff=None, window=None,
                    max_step=None, penalty=None, psi=None, block=None,
                    compact=False, parallel=False, use_c=False,
                    use_nogil=False, show_progress=False):
    """
    Compute distance matrix for all sequence pairs in a collection.

    Parameters:
    - s: list/array/SeriesContainer, collection of time series sequences
    - max_dist: float, early stopping threshold for individual distances
    - max_length_diff: int, maximum length difference between sequences;
      pairs differing by more are skipped
    - window: int, warping window constraint (Sakoe-Chiba band)
    - max_step: float, maximum step size constraint
    - psi: int, psi relaxation parameter
    - penalty: float, penalty for compression/expansion
    - block: tuple, memory blocking configuration (start_idx, end_idx)
    - compact: bool, return condensed array instead of full matrix
    - parallel: bool, enable parallel computation
    - use_c: bool, use C implementation
    - use_nogil: bool, use GIL-free C implementation for better parallelization
    - show_progress: bool, display progress bar

    Returns:
    array: distance matrix of shape (n, n) or condensed array of
    length n*(n-1)/2 when compact=True
    """
def distance_matrix_fast(s, max_dist=None, max_length_diff=None, window=None,
                         max_step=None, penalty=None, psi=None, block=None,
                         compact=False, parallel=True):
    """
    Fast C version of distance matrix computation.

    Automatically uses the optimized C implementation, with parallel
    processing enabled by default for better performance on multi-core
    systems.

    Parameters:
    Same as distance_matrix(), but with parallel=True by default.

    Returns:
    array: distance matrix, or condensed array when compact=True
    """
def distance_matrix_python(s, block=None, show_progress=False,
                           max_length_diff=None, dist_opts=None):
    """
    Pure Python distance matrix implementation.

    Provides a reference implementation that doesn't require C extensions,
    useful for debugging or when C extensions are unavailable.

    Parameters:
    - s: list/array, collection of sequences
    - block: tuple, memory blocking configuration
    - show_progress: bool, display progress bar
    - max_length_diff: int, maximum length difference
    - dist_opts: dict, options passed to the underlying distance function

    Returns:
    array: condensed distance array
    """

# Get pre-configured distance matrix functions with specific settings for
# repeated use with consistent parameters.
def distance_matrix_func(use_c=False, use_nogil=False, parallel=False,
                         show_progress=False):
    """
    Return a configured distance matrix function.

    Creates a partially configured distance matrix function with specified
    performance and display options, useful for repeated computations with
    the same settings.

    Parameters:
    - use_c: bool, use C implementation
    - use_nogil: bool, use GIL-free implementation
    - parallel: bool, enable parallel processing
    - show_progress: bool, show progress bar

    Returns:
    function: configured distance matrix function
    """

# Convert between different distance matrix representations for
# compatibility with various analysis tools.
def distances_array_to_matrix(dists, nb_series, block=None):
    """
    Convert condensed distance array to full symmetric matrix.

    Transforms the condensed array format (n*(n-1)/2 elements) to a full
    symmetric matrix format (n x n) with zeros on the diagonal.

    Parameters:
    - dists: array, condensed distance array
    - nb_series: int, number of original sequences
    - block: tuple, optional blocking for memory efficiency

    Returns:
    array: full symmetric distance matrix of shape (nb_series, nb_series)
    """
def distance_array_index(a, b, nb_series):
    """
    Get index in condensed distance array for sequence pair (a, b).

    Computes the position in the condensed array where the distance
    between sequences a and b is stored.

    Parameters:
    - a, b: int, sequence indices (where a < b)
    - nb_series: int, total number of sequences

    Returns:
    int: index in condensed distance array
    """

from dtaidistance import dtw
import numpy as np

# A small collection of variable-length time series.
series = [
    [1, 2, 3, 2, 1],
    [0, 1, 2, 3, 2, 1, 0],
    [1, 3, 2, 1],
    [2, 1, 0, 1, 2, 3, 2],
]

# Full (n x n) pairwise DTW distance matrix.
distances = dtw.distance_matrix(series)
print("Distance matrix shape:", distances.shape)
print("Distance matrix:")
print(distances)

# Condensed form: n*(n-1)/2 entries, more memory efficient than the
# full matrix.
distances_condensed = dtw.distance_matrix(series, compact=True)
print("Condensed array length:", len(distances_condensed))
print("Condensed distances:", distances_condensed)
from dtaidistance import dtw
import numpy as np
import time

# Generate a larger benchmark dataset: 50 random-walk series of length 100,
# seeded for reproducibility.
np.random.seed(42)
series = [np.cumsum(np.random.randn(100)) for _ in range(50)]

# Compare the available implementations on identical input.
methods = [
    ("Python", lambda: dtw.distance_matrix_python(series)),
    ("C sequential", lambda: dtw.distance_matrix(series, use_c=True, parallel=False)),
    ("C parallel", lambda: dtw.distance_matrix_fast(series)),
    ("C nogil parallel", lambda: dtw.distance_matrix(series, use_c=True,
                                                     use_nogil=True, parallel=True))
]
for name, method in methods:
    start = time.time()
    result = method()
    elapsed = time.time() - start
    # NOTE(review): distance_matrix_python is documented above to return a
    # condensed array, so `result.shape` may differ from the (n, n) matrices
    # returned by the other methods — confirm before comparing shapes.
    print(f"{name}: {elapsed:.2f}s, shape: {result.shape}")
from dtaidistance import dtw
import numpy as np

# Large dataset that might not fit in memory all at once.
large_series = [np.random.randn(200) for _ in range(100)]

# Process in blocks of rows/columns to bound peak memory usage.
block_size = 25
n_series = len(large_series)

# Accumulate the full symmetric distance matrix block by block.
distances = np.zeros((n_series, n_series))
for i in range(0, n_series, block_size):
    end_i = min(i + block_size, n_series)  # invariant in j: hoisted out of inner loop
    block_series = large_series[i:end_i]
    for j in range(i, n_series, block_size):
        end_j = min(j + block_size, n_series)
        if i == j:
            # Diagonal block: all pairwise distances within the block.
            # NOTE(review): depending on the library version, the full-matrix
            # output may leave non-computed (lower-triangle) entries as inf —
            # confirm before relying on symmetry of this sub-matrix.
            block_distances = dtw.distance_matrix(block_series, compact=False)
            distances[i:end_i, i:end_i] = block_distances
        else:
            # Off-diagonal block: cross distances between the two blocks.
            # (No bounds check needed: y < end_j <= n_series by construction.)
            for x in range(len(block_series)):
                for y in range(j, end_j):
                    dist = dtw.distance(block_series[x], large_series[y])
                    distances[i + x, y] = dist
                    distances[y, i + x] = dist  # Symmetric
print(f"Computed {n_series}x{n_series} distance matrix in blocks")
from dtaidistance import dtw
# A handful of short series with clearly different shapes.
series = [
    [1, 2, 3, 4, 5],           # Increasing trend
    [5, 4, 3, 2, 1],           # Decreasing trend
    [1, 3, 2, 4, 1],           # Oscillating
    [2, 2, 2, 2, 2],           # Constant
    [1, 2, 3, 4, 5, 6, 7, 8],  # Longer increasing
]

# DTW options: Sakoe-Chiba band, early stopping, length-difference
# filter, and a warping penalty.
constraints = dict(
    window=3,
    max_dist=10.0,
    max_length_diff=4,
    penalty=0.1,
)

distances = dtw.distance_matrix(series, **constraints, show_progress=True)
print("Constrained distance matrix:")
print(distances)

# Report every pairwise distance (upper triangle only).
n = len(series)
for i in range(n):
    for j in range(i + 1, n):
        print(f"Distance between series {i} and {j}: {distances[i, j]:.2f}")
from dtaidistance import dtw, clustering
import numpy as np

# Reproducible sample data: three groups of similar time series.
np.random.seed(42)
n_series = 20
series_length = 50
# Cluster 1: noisy sine waves.
cluster1 = [np.sin(np.linspace(0, 4*np.pi, series_length)) +
            0.1*np.random.randn(series_length) for _ in range(7)]
# Cluster 2: noisy cosine waves.
cluster2 = [np.cos(np.linspace(0, 3*np.pi, series_length)) +
            0.1*np.random.randn(series_length) for _ in range(6)]
# Cluster 3: noisy linear ramps.
cluster3 = [np.linspace(0, 1, series_length) +
            0.1*np.random.randn(series_length) for _ in range(7)]
all_series = cluster1 + cluster2 + cluster3

# Pairwise DTW distances using the parallel C implementation.
distances = dtw.distance_matrix(all_series, use_c=True, parallel=True)

# Hierarchical clustering driven by the fast distance-matrix function.
clusterer = clustering.Hierarchical(
    dists_fun=dtw.distance_matrix_fast,
    dists_options={},
    max_dist=np.inf
)
cluster_tree = clusterer.fit(all_series)
print("Clustering completed")
# NOTE(review): presumably fit() returns a tree/mapping whose length is a
# meaningful cluster count — verify against the clustering module's docs.
print(f"Number of clusters found: {len(cluster_tree)}")
from dtaidistance import dtw
import numpy as np

series = [[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 3, 2]]
n_series = len(series)

# Condensed (upper-triangle) distances, then the equivalent full matrix.
condensed = dtw.distance_matrix(series, compact=True)
print("Condensed array:", condensed)
full_matrix = dtw.distances_array_to_matrix(condensed, n_series)
print("Full matrix:")
print(full_matrix)

# Cross-check: every pair must agree between the two representations.
for i in range(n_series):
    for j in range(i + 1, n_series):
        idx = dtw.distance_array_index(i, j, n_series)
        condensed_dist = condensed[idx]
        matrix_dist = full_matrix[i, j]
        print(f"Distance ({i},{j}): condensed={condensed_dist:.3f}, "
              f"matrix={matrix_dist:.3f}, match={abs(condensed_dist - matrix_dist) < 1e-10}")
from dtaidistance import dtw
from dtaidistance.util import SeriesContainer
import numpy as np

# Raw data as plain Python lists (a deliberately unoptimized format).
raw_series = [list(np.random.randn(100)) for _ in range(30)]

# Wrap in a SeriesContainer so the C code gets an efficient view of the data.
series_container = SeriesContainer(raw_series)

# Pre-configure an optimized distance-matrix function for repeated use.
fast_distance_matrix = dtw.distance_matrix_func(
    use_c=True,
    use_nogil=True,
    parallel=True,
    show_progress=True
)

# Compute with constraints that keep the computation cheap.
distances = fast_distance_matrix(
    series_container,
    window=10,       # Reasonable window constraint
    max_dist=50.0,   # Early stopping
    compact=True     # Memory efficient format
)
print(f"Computed {len(raw_series)}x{len(raw_series)} distances")
print(f"Result shape: {distances.shape if hasattr(distances, 'shape') else len(distances)}")
# Performance tips:
# - compact=True for memory efficiency
# - parallel=True for multi-core systems
# - window constraints to reduce computation
# - max_dist for early stopping in similarity search
# Install with Tessl CLI
npx tessl i tessl/pypi-dtaidistance