Distance measures for time series with Dynamic Time Warping as the primary focus
—
Hierarchical clustering algorithms specifically designed for time series data. This module provides multiple clustering strategies, tree representations, and visualization capabilities for discovering patterns and groupings in temporal datasets using DTW-based distance measures.
Core hierarchical clustering implementation that builds cluster hierarchies using DTW distances with customizable distance functions and merging strategies.
class Hierarchical:
    """Hierarchical clustering for time series using DTW distances.

    Builds a hierarchy of clusters by iteratively merging the closest pairs
    of sequences or clusters based on DTW distance measures.
    """

    def __init__(self, dists_fun, dists_options, max_dist=np.inf,
                 merge_hook=None, order_hook=None, show_progress=True):
        """Initialize hierarchical clustering.

        Parameters:
        - dists_fun: function, distance matrix computation function
          (e.g., dtw.distance_matrix)
        - dists_options: dict, options passed to distance function
        - max_dist: float, maximum distance threshold for clustering
        - merge_hook: function, callback called when clusters merge
        - order_hook: function, callback for ordering sequences
        - show_progress: bool, display progress during clustering
        """

    def fit(self, series):
        """Perform hierarchical clustering on time series collection.

        Parameters:
        - series: list/array, collection of time series sequences

        Returns:
        dict: cluster hierarchy with node information and structure

        NOTE(review): the upstream dtaidistance documentation appears to
        describe this dict as mapping each prototype index to the set of
        series indices in its cluster -- confirm the exact value type.
        """


# Abstract base class and concrete implementations for representing and
# manipulating cluster hierarchies.
class BaseTree:
    """Abstract base class for cluster tree representations.

    Provides common interface for different tree implementations
    and visualization capabilities.
    """

    @property
    def maxnode(self):
        """Get maximum node ID in the tree."""

    def get_linkage(self, node):
        """Get linkage information for a specific node.

        Parameters:
        - node: int, node identifier

        Returns:
        tuple: linkage information (left, right, distance, count)
        """

    def plot(self, filename=None, axes=None, **kwargs):
        """Plot hierarchy dendrogram and time series.

        Parameters:
        - filename: str, optional file to save plot
        - axes: matplotlib axes, optional axes for plotting
        - **kwargs: additional plotting parameters

        Returns:
        tuple: (figure, axes)
        """

    def to_dot(self):
        """Generate Graphviz DOT representation of the tree.

        Returns:
        str: DOT format string for visualization with Graphviz
        """
class HierarchicalTree(BaseTree):
    """Wrapper for hierarchical clustering with tree tracking.

    Extends the basic Hierarchical clustering with tree structure
    preservation and visualization capabilities.
    """

    def __init__(self, model=None, **kwargs):
        """Initialize hierarchical tree.

        Parameters:
        - model: Hierarchical, optional pre-configured clustering model
        - **kwargs: parameters passed to Hierarchical constructor if model
          is None
        """

    def fit(self, series, *args, **kwargs):
        """Fit clustering model and build tree structure.

        Parameters:
        - series: list/array, time series collection
        - *args, **kwargs: additional parameters passed to clustering

        Returns:
        self: fitted tree object
        """
class LinkageTree(BaseTree):
    """Fast scipy-based hierarchical clustering.

    Uses scipy's optimized linkage algorithms for improved performance
    on large datasets while maintaining DTW distance compatibility.
    """

    def __init__(self, dists_fun, dists_options, method='complete'):
        """Initialize linkage-based clustering.

        Parameters:
        - dists_fun: function, distance computation function
        - dists_options: dict, distance function options
        - method: str, linkage method ('complete', 'single', 'average',
          'ward')
        """

    def fit(self, series):
        """Perform clustering using scipy linkage.

        Parameters:
        - series: list/array, time series collection

        Returns:
        self: fitted tree object
        """


# Utility functions for creating custom hooks that modify clustering behavior
# through weights and ordering constraints.
class Hooks:
    """Utility class for creating clustering hooks."""

    @staticmethod
    def create_weighthook(weights, series):
        """Create a weight hook for biasing cluster merging.

        Parameters:
        - weights: array-like, weights for each series
        - series: list/array, time series collection

        Returns:
        function: weight hook function
        """

    @staticmethod
    def create_orderhook(weights):
        """Create an order hook for controlling merge sequence.

        Parameters:
        - weights: array-like, ordering weights

        Returns:
        function: order hook function
        """


from dtaidistance import dtw, clustering
import numpy as np

# Create sample time series with different patterns
series = [
    [1, 2, 3, 2, 1],      # Mountain shape
    [1, 3, 2, 3, 1],      # Double peak
    [0, 1, 2, 3, 4],      # Increasing
    [4, 3, 2, 1, 0],      # Decreasing
    [2, 2, 2, 2, 2],      # Constant
    [1, 2, 3, 2, 1, 0]    # Mountain with tail
]

# Set up hierarchical clustering
clusterer = clustering.Hierarchical(
    dists_fun=dtw.distance_matrix,
    dists_options={'window': 3},
    show_progress=True
)

# Perform clustering
cluster_tree = clusterer.fit(series)
print("Clustering completed")
print(f"Cluster tree keys: {list(cluster_tree.keys())}")

# Access cluster information
# NOTE(review): if fit() returns {prototype index: set of member indices},
# as the upstream dtaidistance docs suggest, the 'distance' check below never
# matches and this loop prints nothing -- confirm the value type.
for node_id, node_info in cluster_tree.items():
    if 'distance' in node_info:
        print(f"Node {node_id}: distance={node_info['distance']:.3f}")


from dtaidistance import dtw, clustering
import matplotlib.pyplot as plt
import numpy as np

# Generate synthetic time series clusters
np.random.seed(42)

# Cluster 1: Sine waves
cluster1 = [np.sin(np.linspace(0, 4*np.pi, 50)) + 0.1*np.random.randn(50)
            for _ in range(5)]

# Cluster 2: Cosine waves
cluster2 = [np.cos(np.linspace(0, 3*np.pi, 50)) + 0.1*np.random.randn(50)
            for _ in range(4)]

# Cluster 3: Linear trends
cluster3 = [np.linspace(0, 2, 50) + 0.1*np.random.randn(50)
            for _ in range(3)]

all_series = cluster1 + cluster2 + cluster3

# Build hierarchical tree with visualization
tree = clustering.HierarchicalTree(
    dists_fun=dtw.distance_matrix_fast,
    dists_options={'window': 5},
    show_progress=True
)
tree.fit(all_series)

# Plot dendrogram and time series
fig, axes = tree.plot(filename='cluster_tree.png')
plt.title('Hierarchical Clustering of Time Series')
plt.show()

# Export tree structure to DOT format
dot_representation = tree.to_dot()
print("DOT representation (first 200 chars):")
print(dot_representation[:200] + "...")


from dtaidistance import dtw, clustering
import numpy as np
import time

# Large dataset
np.random.seed(42)
n_series = 100
series_length = 200

# Generate diverse time series patterns
series = []
for i in range(n_series):
    if i < n_series // 3:
        # Sine patterns
        s = np.sin(np.linspace(0, 2*np.pi*np.random.uniform(1, 5), series_length))
    elif i < 2 * n_series // 3:
        # Random walks
        s = np.cumsum(np.random.randn(series_length))
    else:
        # Polynomial trends
        x = np.linspace(0, 1, series_length)
        s = np.random.randn() * x**2 + np.random.randn() * x + np.random.randn()
    series.append(s + 0.1 * np.random.randn(series_length))

# Compare clustering methods
methods = [
    ("Basic Hierarchical", clustering.Hierarchical),
    ("Tree with Tracking", clustering.HierarchicalTree),
    ("Fast Linkage", clustering.LinkageTree)
]

for name, ClusterClass in methods:
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    # Classes are compared by identity; LinkageTree takes `method` instead of
    # `show_progress`.
    if ClusterClass is clustering.LinkageTree:
        clusterer = ClusterClass(
            dists_fun=dtw.distance_matrix_fast,
            dists_options={'window': 10, 'parallel': True},
            method='complete'
        )
    else:
        clusterer = ClusterClass(
            dists_fun=dtw.distance_matrix_fast,
            dists_options={'window': 10, 'parallel': True},
            show_progress=False
        )
    result = clusterer.fit(series)
    elapsed = time.perf_counter() - start_time
    print(f"{name}: {elapsed:.2f}s")


from dtaidistance import dtw, clustering
import numpy as np

# Time series with known importance weights
series = [
    [1, 2, 3, 2, 1],  # Important series
    [2, 3, 4, 3, 2],  # Important series
    [0, 1, 0, 1, 0],  # Less important
    [3, 1, 4, 1, 5],  # Less important
    [1, 1, 1, 1, 1]   # Least important
]

# Define importance weights (higher = more important for clustering)
importance_weights = [1.0, 1.0, 0.5, 0.5, 0.1]

# Create hooks
weight_hook = clustering.Hooks.create_weighthook(importance_weights, series)
order_hook = clustering.Hooks.create_orderhook(importance_weights)

# Clustering with hooks
clusterer = clustering.Hierarchical(
    dists_fun=dtw.distance_matrix,
    dists_options={'window': 2},
    merge_hook=weight_hook,
    order_hook=order_hook,
    show_progress=True
)
weighted_clusters = clusterer.fit(series)
print("Weighted clustering completed")
print("Cluster structure influenced by importance weights")


from dtaidistance import dtw, clustering
import numpy as np

# Create hierarchical data with multiple cluster levels
np.random.seed(42)

# Level 1: Different base patterns
patterns = [
    lambda t: np.sin(2*np.pi*t),            # Sine
    lambda t: np.cos(2*np.pi*t),            # Cosine
    lambda t: np.sign(np.sin(4*np.pi*t)),   # Square wave
    lambda t: 2*t - 1                       # Linear
]

series = []
true_labels = []
t = np.linspace(0, 1, 100)
for pattern_idx, pattern_func in enumerate(patterns):
    for variant in range(3):  # 3 variants per pattern
        # Add noise and slight variations
        noise_level = 0.1 + 0.05 * variant
        s = pattern_func(t) + noise_level * np.random.randn(len(t))
        series.append(s)
        true_labels.append(pattern_idx)

# Perform clustering
tree = clustering.HierarchicalTree(
    dists_fun=dtw.distance_matrix_fast,
    dists_options={'window': 5},
    max_dist=2.0  # Stop at reasonable distance threshold
)
tree.fit(series)

# Analyze cluster structure at different levels
def analyze_clusters_at_distance(tree, max_distance):
    """Extract clusters formed at given distance threshold.

    Placeholder: it only reports the requested threshold and returns an
    empty mapping; `tree` is not inspected yet.

    Parameters:
    - tree: fitted cluster tree (currently unused)
    - max_distance: float, distance threshold to analyze

    Returns:
    dict: clusters found at the threshold (always empty for now)
    """
    clusters = {}
    # Implementation would traverse tree to find clusters
    # This is a simplified example
    print(f"Analyzing clusters at distance threshold: {max_distance}")
    return clusters
# Analyze at different distance thresholds
for threshold in [0.5, 1.0, 1.5, 2.0]:
    clusters = analyze_clusters_at_distance(tree, threshold)
    # Report the actual result instead of a fixed message.
    print(f"Threshold {threshold}: Found {len(clusters)} clusters")


from dtaidistance import dtw, dtw_ndim, clustering
import numpy as np

# Example with multi-dimensional time series
np.random.seed(42)

# Generate 3D time series (e.g., accelerometer data)
n_series = 15
series_length = 80
multidim_series = []
for i in range(n_series):
    # Create 3D patterns
    t = np.linspace(0, 4*np.pi, series_length)
    x = np.sin(t + i*0.5) + 0.1*np.random.randn(series_length)
    y = np.cos(t + i*0.3) + 0.1*np.random.randn(series_length)
    z = np.sin(2*t + i*0.2) + 0.1*np.random.randn(series_length)
    # Stack into multi-dimensional series
    series_3d = np.column_stack([x, y, z])
    multidim_series.append(series_3d)

# Cluster using N-dimensional DTW
clusterer = clustering.Hierarchical(
    dists_fun=dtw_ndim.distance_matrix,
    dists_options={'window': 10, 'parallel': True},
    show_progress=True
)
ndim_clusters = clusterer.fit(multidim_series)
print("Multi-dimensional clustering completed")
print(f"Number of cluster nodes: {len(ndim_clusters)}")


from dtaidistance import dtw, clustering
def cluster_with_threshold(series, threshold=2.0):
    """Cluster series and stop at distance threshold.

    Parameters:
    - series: list/array, collection of time series sequences
    - threshold: float, passed to Hierarchical as max_dist

    Returns:
    The result of Hierarchical.fit(series).
    """
    clusterer = clustering.Hierarchical(
        dists_fun=dtw.distance_matrix_fast,
        dists_options={'window': 5},
        max_dist=threshold,
        show_progress=True
    )
    return clusterer.fit(series)


# Apply threshold-based clustering
series = [[1, 2, 1], [2, 3, 2], [10, 11, 10], [11, 12, 11]]
clusters = cluster_with_threshold(series, threshold=5.0)

This comprehensive clustering module enables sophisticated analysis of time series collections, from basic hierarchical clustering to advanced multi-dimensional pattern discovery with customizable distance measures and clustering strategies.
Install with Tessl CLI
npx tessl i tessl/pypi-dtaidistance