Distance measures for time series with Dynamic Time Warping as the primary focus
—
DTW algorithms optimized for multi-dimensional time series where each time point contains multiple features. Uses Euclidean distance for point-wise comparisons and supports the same constraint and optimization options as 1D DTW, enabling analysis of complex temporal data like sensor readings, motion capture, or multi-variate signals.
Compute DTW distance between multi-dimensional time series using Euclidean distance between corresponding feature vectors at each time point.
def distance(s1, s2, window=None, max_dist=None, max_step=None,
             max_length_diff=None, penalty=None, psi=None, use_c=False):
    """DTW distance for N-dimensional sequences using Euclidean distance.

    Each time point in the sequences is treated as a feature vector, and
    the local distance between time points is computed as the Euclidean
    distance between the corresponding feature vectors.

    Parameters:
    - s1, s2: array-like, N-dimensional sequences of shape (length, features)
    - window: int, warping window constraint
    - max_dist: float, early stopping threshold
    - max_step: float, maximum step size
    - max_length_diff: int, maximum length difference
    - penalty: float, penalty for compression/expansion
    - psi: int, psi relaxation parameter
    - use_c: bool, use C implementation if available

    Returns:
        float: DTW distance between multi-dimensional sequences
    """
# Compute warping paths for multi-dimensional sequences, providing the same
# path analysis capabilities as 1D DTW but for complex feature spaces.
def warping_paths(s1, s2, window=None, max_dist=None, max_step=None,
                  max_length_diff=None, penalty=None, psi=None):
    """Warping paths for N-dimensional sequences.

    Computes the full accumulated cost matrix for multi-dimensional DTW,
    where local distances are Euclidean distances between feature vectors.

    Parameters:
    - s1, s2: array-like, N-dimensional sequences of shape (length, features)
    - window: int, warping window constraint
    - max_dist: float, early stopping threshold
    - max_step: float, maximum step size
    - max_length_diff: int, maximum length difference
    - penalty: float, penalty for compression/expansion
    - psi: int, psi relaxation parameter

    Returns:
        tuple: (distance, paths_matrix)
        - distance: float, optimal DTW distance
        - paths_matrix: 2D array, accumulated cost matrix
    """
# Efficient computation of distance matrices for collections of
# multi-dimensional time series with parallel processing support.
def distance_matrix(s, max_dist=None, max_length_diff=None, window=None,
                    max_step=None, penalty=None, psi=None, block=None,
                    parallel=False, use_c=False, show_progress=False):
    """Distance matrix for N-dimensional sequences.

    Computes pairwise DTW distances between all multi-dimensional sequences
    in a collection, using Euclidean distance for local comparisons.

    Parameters:
    - s: list/array, collection of N-dimensional sequences
    - max_dist: float, early stopping threshold
    - max_length_diff: int, maximum length difference
    - window: int, warping window constraint
    - max_step: float, maximum step size
    - penalty: float, penalty for compression/expansion
    - psi: int, psi relaxation parameter
    - block: tuple, memory blocking configuration
    - parallel: bool, enable parallel computation
    - use_c: bool, use C implementation
    - show_progress: bool, display progress bar

    Returns:
        array: distance matrix of shape (n, n) where n is number of sequences
    """
from dtaidistance import dtw_ndim
import numpy as np

# Create 3D time series (e.g., accelerometer data: x, y, z)
np.random.seed(42)

# Sequence 1: 50 time points with 3 features each
t = np.linspace(0, 4*np.pi, 50)
s1 = np.column_stack([
    np.sin(t) + 0.1*np.random.randn(50),    # X component
    np.cos(t) + 0.1*np.random.randn(50),    # Y component
    np.sin(2*t) + 0.1*np.random.randn(50),  # Z component
])

# Sequence 2: 45 time points with the same 3 features (different timing)
t2 = np.linspace(0, 4*np.pi, 45)
s2 = np.column_stack([
    np.sin(t2 * 1.1) + 0.1*np.random.randn(45),
    np.cos(t2 * 1.1) + 0.1*np.random.randn(45),
    np.sin(2*t2 * 1.1) + 0.1*np.random.randn(45),
])

print(f"Sequence 1 shape: {s1.shape}")
print(f"Sequence 2 shape: {s2.shape}")

# Compute multi-dimensional DTW distance
distance = dtw_ndim.distance(s1, s2)
print(f"Multi-dimensional DTW distance: {distance:.3f}")

# Compare with 1D DTW applied to each component independently
from dtaidistance import dtw
distances_1d = []
for i in range(3):
    dist_1d = dtw.distance(s1[:, i], s2[:, i])
    distances_1d.append(dist_1d)
    print(f"1D DTW distance for component {i}: {dist_1d:.3f}")

print(f"Sum of 1D distances: {sum(distances_1d):.3f}")
print(f"Multi-dimensional distance: {distance:.3f}")
from dtaidistance import dtw_ndim
import numpy as np
import matplotlib.pyplot as plt
def create_motion_sequence(motion_type, length=100, noise_level=0.05):
    """Create synthetic 3D motion-capture data.

    Parameters:
    - motion_type: str, one of 'walking', 'running', 'jumping'
    - length: int, number of time points
    - noise_level: float, std-dev of the additive Gaussian noise

    Returns:
        ndarray of shape (length, 3) with columns (x, y, z)

    Raises:
        ValueError: if motion_type is not one of the supported types
    """
    t = np.linspace(0, 2*np.pi, length)
    if motion_type == 'walking':
        # Simulate walking motion (periodic)
        x = 0.5 * np.sin(4*t) + noise_level * np.random.randn(length)
        y = 0.3 * np.sin(8*t) + noise_level * np.random.randn(length)
        z = 0.8 + 0.2 * np.cos(4*t) + noise_level * np.random.randn(length)
    elif motion_type == 'running':
        # Simulate running motion (faster, more variation)
        x = 0.8 * np.sin(6*t) + noise_level * np.random.randn(length)
        y = 0.5 * np.sin(12*t) + noise_level * np.random.randn(length)
        z = 1.0 + 0.4 * np.cos(6*t) + noise_level * np.random.randn(length)
    elif motion_type == 'jumping':
        # Simulate jumping motion (sporadic vertical movement)
        x = 0.1 * np.sin(2*t) + noise_level * np.random.randn(length)
        y = 0.1 * np.cos(2*t) + noise_level * np.random.randn(length)
        z = 1.0 + np.maximum(0, 0.8 * np.sin(3*t)) + noise_level * np.random.randn(length)
    else:
        # Fail fast: the original fell through to a NameError on undefined x/y/z
        raise ValueError(f"Unknown motion_type: {motion_type!r}")
    return np.column_stack([x, y, z])
# Generate motion sequences
np.random.seed(42)
walking1 = create_motion_sequence('walking', 80)
walking2 = create_motion_sequence('walking', 75)
running1 = create_motion_sequence('running', 60)
jumping1 = create_motion_sequence('jumping', 70)

motions = [walking1, walking2, running1, jumping1]
motion_labels = ['Walking 1', 'Walking 2', 'Running', 'Jumping']

# Pairwise multi-dimensional DTW distances between all motions
distances = dtw_ndim.distance_matrix(motions, parallel=True)

print("Motion similarity matrix:")
print(" ", " ".join(f"{label:>8}" for label in motion_labels))
for i, label in enumerate(motion_labels):
    # One fixed-width cell per column, each followed by a space
    row = f"{label:>8}: " + "".join(
        f"{distances[i, j]:8.2f} " for j in range(len(motion_labels))
    )
    print(row)

# Visualize each motion as a 3D trajectory
fig, axes = plt.subplots(2, 2, figsize=(12, 10), subplot_kw={'projection': '3d'})
axes = axes.flatten()
for ax, motion, label in zip(axes, motions, motion_labels):
    ax.plot(motion[:, 0], motion[:, 1], motion[:, 2], linewidth=2)
    ax.set_title(f'{label} (3D Motion)')
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
plt.tight_layout()
plt.show()
from dtaidistance import dtw_ndim
import numpy as np
import matplotlib.pyplot as plt
# Simulate multi-sensor time series data
def generate_sensor_data(pattern_type, length=120, n_sensors=5):
    """Generate synthetic multi-sensor data.

    Parameters:
    - pattern_type: str, one of 'normal', 'anomaly', 'drift'
    - length: int, number of time points per sensor
    - n_sensors: int, number of sensor channels

    Returns:
        ndarray of shape (length, n_sensors), one column per sensor

    Raises:
        ValueError: if pattern_type is not one of the supported patterns
    """
    t = np.linspace(0, 10, length)
    sensors = []
    for sensor_id in range(n_sensors):
        if pattern_type == 'normal':
            # Normal operation pattern: phase-shifted sine per sensor
            signal = np.sin(0.5*t + sensor_id*0.2) + 0.1*np.random.randn(length)
        elif pattern_type == 'anomaly':
            # Anomalous pattern: same base signal with random spikes injected
            signal = np.sin(0.5*t + sensor_id*0.2) + 0.1*np.random.randn(length)
            spike_indices = np.random.choice(length, size=5, replace=False)
            signal[spike_indices] += 2.0 * np.random.randn(5)
        elif pattern_type == 'drift':
            # Pattern with a slow per-sensor linear drift
            drift = 0.02 * sensor_id * t
            signal = np.sin(0.5*t + sensor_id*0.2) + drift + 0.1*np.random.randn(length)
        else:
            # Fail fast: the original fell through to a NameError on `signal`
            raise ValueError(f"Unknown pattern_type: {pattern_type!r}")
        sensors.append(signal)
    return np.array(sensors).T  # Shape: (time_points, sensors)
# Generate different sensor patterns
np.random.seed(42)
normal1 = generate_sensor_data('normal', 100, 4)
normal2 = generate_sensor_data('normal', 95, 4)
anomaly1 = generate_sensor_data('anomaly', 100, 4)
drift1 = generate_sensor_data('drift', 105, 4)

sensor_data = [normal1, normal2, anomaly1, drift1]
data_labels = ['Normal 1', 'Normal 2', 'Anomaly', 'Drift']

# Analyze sensor data similarities
print("Sensor data analysis:")
for i, data in enumerate(sensor_data):
    print(f"{data_labels[i]}: shape {data.shape}")

# Compute DTW distances with constraints suitable for sensor data
distances = dtw_ndim.distance_matrix(
    sensor_data,
    window=10,        # Reasonable temporal constraint
    max_dist=100.0,   # Early stopping for very different patterns
    parallel=True
)

# Fixed: the original used "\\n", which printed a literal backslash-n
# instead of the intended blank line.
print("\nSensor data similarity matrix:")
print(" ", " ".join(f"{label:>8}" for label in data_labels))
for i, label in enumerate(data_labels):
    row_str = f"{label:>8}: "
    for j in range(len(data_labels)):
        row_str += f"{distances[i, j]:8.2f} "
    print(row_str)

# Visualize sensor readings, one subplot per dataset
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()
for i, (data, label) in enumerate(zip(sensor_data, data_labels)):
    ax = axes[i]
    for sensor_idx in range(data.shape[1]):
        ax.plot(data[:, sensor_idx], label=f'Sensor {sensor_idx+1}', linewidth=1.5)
    ax.set_title(f'{label} - Multi-Sensor Data')
    ax.set_xlabel('Time')
    ax.set_ylabel('Sensor Value')
    ax.legend()
    ax.grid(True)
plt.tight_layout()
plt.show()
from dtaidistance import dtw_ndim, dtw
import numpy as np
import matplotlib.pyplot as plt
def analyze_dimensionality_effects():
    """Analyze how dimensionality affects DTW distance calculation."""
    np.random.seed(42)
    base_length = 50

    # Base 1D signal from which every feature is derived
    t = np.linspace(0, 4*np.pi, base_length)
    base_signal = np.sin(t)

    # Build a pair of similar sequences for each feature count
    dimensions = [1, 2, 3, 5, 10, 20]
    sequences_per_dim = []
    for n_dim in dimensions:
        seq1_features = []
        seq2_features = []
        for _ in range(n_dim):
            # Each dimension is the base signal plus independent noise
            seq1_features.append(base_signal + 0.1 * np.random.randn(base_length))
            seq2_features.append(base_signal + 0.15 * np.random.randn(base_length))
        seq1 = np.column_stack(seq1_features) if n_dim > 1 else np.array(seq1_features[0])
        seq2 = np.column_stack(seq2_features) if n_dim > 1 else np.array(seq2_features[0])
        sequences_per_dim.append((seq1, seq2))

    # Compare DTW distances across the dimensionalities
    distances = []
    for n_dim, (seq1, seq2) in zip(dimensions, sequences_per_dim):
        # Scalar sequences go through 1D DTW; everything else through dtw_ndim
        dist = dtw.distance(seq1, seq2) if n_dim == 1 else dtw_ndim.distance(seq1, seq2)
        distances.append(dist)
        print(f"Dimensionality {n_dim:2d}: DTW distance = {dist:.3f}")

    # Plot dimensionality vs distance
    plt.figure(figsize=(10, 6))
    plt.plot(dimensions, distances, 'bo-', linewidth=2, markersize=8)
    plt.xlabel('Number of Dimensions')
    plt.ylabel('DTW Distance')
    plt.title('DTW Distance vs Dimensionality')
    plt.grid(True)
    plt.show()

analyze_dimensionality_effects()
from dtaidistance import dtw_ndim, clustering
import numpy as np
import matplotlib.pyplot as plt
# Generate multi-dimensional time series clusters
np.random.seed(42)
def create_multidim_cluster(cluster_type, n_sequences=5, length=60, n_features=3):
    """Create a cluster of similar multi-dimensional sequences.

    Parameters:
    - cluster_type: str, one of 'sine', 'cosine', 'linear'
    - n_sequences: int, number of sequences in the cluster
    - length: int, time points per sequence
    - n_features: int, features per time point

    Returns:
        list of ndarrays, each of shape (length, n_features)

    Raises:
        ValueError: if cluster_type is not one of the supported types
    """
    sequences = []
    for seq_idx in range(n_sequences):
        t = np.linspace(0, 4*np.pi, length)
        features = []
        for feature_idx in range(n_features):
            if cluster_type == 'sine':
                # Sine-based cluster, frequency varies per sequence
                base_freq = 1.0 + 0.1 * seq_idx
                signal = np.sin(base_freq * t + feature_idx * 0.5) + 0.1 * np.random.randn(length)
            elif cluster_type == 'cosine':
                # Cosine-based cluster, slightly higher base frequency
                base_freq = 1.2 + 0.1 * seq_idx
                signal = np.cos(base_freq * t + feature_idx * 0.3) + 0.1 * np.random.randn(length)
            elif cluster_type == 'linear':
                # Linear trend cluster, slope varies per sequence/feature
                slope = 0.5 + 0.1 * seq_idx + 0.05 * feature_idx
                signal = slope * t + 0.2 * np.random.randn(length)
            else:
                # Fail fast: the original fell through to a NameError on `signal`
                raise ValueError(f"Unknown cluster_type: {cluster_type!r}")
            features.append(signal)
        sequences.append(np.column_stack(features))
    return sequences
# Create three clusters of multi-dimensional sequences
cluster1 = create_multidim_cluster('sine', n_sequences=4, n_features=3)
cluster2 = create_multidim_cluster('cosine', n_sequences=4, n_features=3)
cluster3 = create_multidim_cluster('linear', n_sequences=3, n_features=3)

all_sequences = cluster1 + cluster2 + cluster3
true_labels = [0]*4 + [1]*4 + [2]*3

print(f"Created {len(all_sequences)} multi-dimensional sequences")
print(f"Sequence shapes: {[seq.shape for seq in all_sequences[:3]]}...")

# Hierarchical clustering driven by multi-dimensional DTW distances
clusterer = clustering.Hierarchical(
    dists_fun=dtw_ndim.distance_matrix,
    dists_options={'window': 10, 'parallel': True},
    show_progress=True
)
cluster_result = clusterer.fit(all_sequences)
print(f"Clustering completed with {len(cluster_result)} nodes")

# Visualize the first three sequences from each cluster
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
cluster_sizes = [4, 4, 3]
for cluster_idx in range(3):
    start_idx = sum(cluster_sizes[:cluster_idx])
    end_idx = start_idx + cluster_sizes[cluster_idx]
    for seq_idx in range(3):  # Show first 3 sequences from each cluster
        if start_idx + seq_idx < end_idx:
            ax = axes[cluster_idx, seq_idx]
            sequence = all_sequences[start_idx + seq_idx]
            for feature_idx in range(sequence.shape[1]):
                ax.plot(sequence[:, feature_idx],
                        label=f'Feature {feature_idx+1}', linewidth=1.5)
            ax.set_title(f'Cluster {cluster_idx+1}, Sequence {seq_idx+1}')
            ax.legend()
            ax.grid(True)
plt.tight_layout()
plt.show()
print("Multi-dimensional clustering analysis completed")
# Multi-dimensional DTW requires more memory due to:
The N-dimensional DTW module extends all the powerful capabilities of standard DTW to complex multi-feature temporal data, enabling sophisticated analysis of sensor arrays, motion capture, financial indicators, and other multi-variate time series.
Install with Tessl CLI
npx tessl i tessl/pypi-dtaidistance