"""Python library for trajectory and movement data analysis built on pandas and GeoPandas.

Advanced algorithms for trajectory generalization, splitting, cleaning, and
smoothing. These specialized classes provide methods for preparing and
analyzing movement data through various processing techniques.
"""


# Classes for reducing trajectory complexity while preserving important
# characteristics.


class TrajectoryGeneralizer:
    def __init__(self, traj):
        """
        Base class for trajectory generalization algorithms.

        Parameters:
        - traj: Trajectory object to generalize
        """

    def generalize(self, tolerance):
        """
        Generalize trajectory using algorithm-specific method.

        Parameters:
        - tolerance: Algorithm-specific tolerance parameter

        Returns:
        Generalized Trajectory object
        """


class DouglasPeuckerGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Classic Douglas-Peucker algorithm using shapely."""

    def generalize(self, tolerance=1.0):
        """
        Generalize using Douglas-Peucker algorithm.

        Parameters:
        - tolerance: Distance threshold in coordinate units

        Returns:
        Generalized Trajectory
        """


class MaxDistanceGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Douglas-Peucker-like algorithm checking distance threshold."""

    def generalize(self, tolerance=1.0):
        """
        Generalize by maximum distance threshold.

        Parameters:
        - tolerance: Maximum distance threshold
        """
class MinDistanceGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Ensures consecutive locations are at least a certain distance apart."""

    def generalize(self, tolerance=1.0):
        """
        Generalize by minimum distance threshold.

        Parameters:
        - tolerance: Minimum distance between consecutive points
        """


class MinTimeDeltaGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Ensures consecutive rows are at least a certain timedelta apart."""

    def generalize(self, tolerance):
        """
        Generalize by minimum time difference.

        Parameters:
        - tolerance: timedelta object specifying minimum time difference
        """
class TopDownTimeRatioGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Spatiotemporal generalization based on Meratnia & de By (2004)."""

    def generalize(self, tolerance=1.0):
        """
        Generalize using top-down time ratio algorithm.

        Parameters:
        - tolerance: Distance threshold for spatiotemporal analysis
        """


# Classes for dividing trajectories into segments based on various criteria.
class TrajectorySplitter:
    def __init__(self, traj):
        """
        Base class for trajectory splitting algorithms.

        Parameters:
        - traj: Trajectory object to split
        """

    def split(self, n_processes=1, **kwargs):
        """
        Split trajectory using algorithm-specific method.

        Parameters:
        - n_processes: Number of processes for parallel computation
        - **kwargs: Algorithm-specific parameters

        Returns:
        TrajectoryCollection with split trajectories
        """


class TemporalSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split trajectories using regular time intervals."""

    def split(self, mode="day", min_length=0):
        """
        Split by temporal intervals.

        Parameters:
        - mode: Time interval ("hour", "day", "month", "year")
        - min_length: Minimum length for resulting segments

        Returns:
        TrajectoryCollection with temporally split trajectories
        """


class ObservationGapSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split whenever there's a gap in observations."""

    def split(self, gap, min_length=0):
        """
        Split on observation gaps.

        Parameters:
        - gap: timedelta object defining gap threshold
        - min_length: Minimum length for resulting segments

        Returns:
        TrajectoryCollection with gap-split trajectories
        """


class SpeedSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split based on speed thresholds and duration."""

    def split(self, speed, duration, min_length=0, max_speed=float('inf')):
        """
        Split based on speed criteria.

        Parameters:
        - speed: Speed threshold
        - duration: Duration threshold
        - min_length: Minimum length for resulting segments
        - max_speed: Maximum speed threshold

        Returns:
        TrajectoryCollection with speed-based splits
        """


class StopSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split at detected stops."""

    def split(self, max_diameter, min_duration, min_length=0):
        """
        Split at stop locations.

        Parameters:
        - max_diameter: Maximum diameter for stop detection
        - min_duration: Minimum duration to qualify as stop
        - min_length: Minimum length for resulting segments

        Returns:
        TrajectoryCollection with stop-based splits
        """


class AngleChangeSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split on heading angle changes."""

    def split(self, min_angle=45, min_speed=0, min_length=0):
        """
        Split on angle changes.

        Parameters:
        - min_angle: Minimum angle change in degrees
        - min_speed: Minimum speed threshold
        - min_length: Minimum length for resulting segments

        Returns:
        TrajectoryCollection with angle-based splits
        """


class ValueChangeSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split on column value changes."""

    def split(self, col_name, min_length=0):
        """
        Split when column values change.

        Parameters:
        - col_name: Column name to monitor for changes
        - min_length: Minimum length for resulting segments

        Returns:
        TrajectoryCollection with value-change splits
        """


# Classes for detecting and removing outliers from trajectory data.
class TrajectoryCleaner:
    def __init__(self, traj):
        """
        Base class for trajectory cleaning algorithms.

        Parameters:
        - traj: Trajectory object to clean
        """

    def clean(self, **kwargs):
        """
        Clean trajectory using algorithm-specific method.

        Returns:
        Cleaned Trajectory object
        """


class IqrCleaner(TrajectoryCleaner):
    def __init__(self, traj):
        """Interquartile range (IQR) based outlier cleaner."""

    def clean(self, columns):
        """
        Clean using IQR method.

        Parameters:
        - columns: Dict mapping column names to alpha values for IQR calculation

        Returns:
        Cleaned Trajectory object
        """


class OutlierCleaner(TrajectoryCleaner):
    def __init__(self, traj):
        """Speed-based outlier cleaner."""

    def clean(self, v_max=None, units=None, alpha=3):
        """
        Clean using speed-based outlier detection.

        Parameters:
        - v_max: Maximum speed threshold
        - units: UNITS object for speed units
        - alpha: Standard deviation multiplier for outlier detection

        Returns:
        Cleaned Trajectory object
        """


# Specialized class for detecting stationary periods in trajectories.
class TrajectoryStopDetector:
    def __init__(self, traj, n_processes=1):
        """
        Detects stops in trajectories based on area size and duration.

        Parameters:
        - traj: Trajectory object to analyze
        - n_processes: Number of processes for parallel computation
        """

    def get_stop_time_ranges(self, max_diameter, min_duration):
        """
        Get time ranges of detected stops.

        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop

        Returns:
        List of (start_time, end_time) tuples
        """

    def get_stop_segments(self, max_diameter, min_duration):
        """
        Get trajectory segments representing stops.

        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop

        Returns:
        TrajectoryCollection with stop segments
        """

    def get_stop_points(self, max_diameter, min_duration):
        """
        Get representative points for detected stops.

        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop

        Returns:
        GeoDataFrame with stop points
        """


# Classes for smoothing noisy trajectory data using filtering techniques.
class TrajectorySmoother:
    def __init__(self, traj):
        """
        Base class for trajectory smoothing algorithms.

        Parameters:
        - traj: Trajectory object to smooth
        """

    def smooth(self, **kwargs):
        """
        Smooth trajectory using algorithm-specific method.

        Returns:
        Smoothed Trajectory object
        """


class KalmanSmootherCV(TrajectorySmoother):
    def __init__(self, traj):
        """
        Kalman Filter with Constant Velocity model.

        Note: Requires Stone Soup dependency (install with pip install stonesoup)
        """

    def smooth(self, process_noise_std=0.5, measurement_noise_std=1):
        """
        Smooth using Kalman filter.

        Parameters:
        - process_noise_std: Process noise standard deviation
        - measurement_noise_std: Measurement noise standard deviation

        Returns:
        Smoothed Trajectory object
        """


# Example usage (see below; examples import movingpandas as mpd).
# ---------------------------------------------------------------------------
# Example usage
# ---------------------------------------------------------------------------
import movingpandas as mpd
import pandas as pd  # pd.Timedelta is used in the examples below

# Create trajectory (assume 'traj' exists)
# traj = mpd.Trajectory(...)

# Douglas-Peucker generalization
generalizer = mpd.DouglasPeuckerGeneralizer(traj)
simplified_traj = generalizer.generalize(tolerance=10.0)  # 10 meter tolerance

# Minimum distance generalization
min_dist_gen = mpd.MinDistanceGeneralizer(traj)
filtered_traj = min_dist_gen.generalize(tolerance=5.0)  # 5 meter minimum distance

# Split by day
temporal_splitter = mpd.TemporalSplitter(traj)
daily_segments = temporal_splitter.split(mode="day", min_length=100)

# Split on stops
stop_splitter = mpd.StopSplitter(traj)
segments = stop_splitter.split(
    max_diameter=50,  # 50 meter diameter
    min_duration=pd.Timedelta("5 minutes"),
    min_length=10,
)

# Split on speed changes
speed_splitter = mpd.SpeedSplitter(traj)
speed_segments = speed_splitter.split(
    speed=2.0,  # 2 m/s threshold
    duration=pd.Timedelta("30 seconds"),
)

# Remove speed-based outliers
cleaner = mpd.OutlierCleaner(traj)
clean_traj = cleaner.clean(v_max=50, alpha=3)  # Max 50 m/s, 3 std devs

# IQR-based cleaning
iqr_cleaner = mpd.IqrCleaner(traj)
clean_traj = iqr_cleaner.clean(columns={'speed': 1.5})  # 1.5 * IQR for speed

# Detect stops
stop_detector = mpd.TrajectoryStopDetector(traj)

# Get stop time ranges
stop_times = stop_detector.get_stop_time_ranges(
    max_diameter=100,  # 100 meter diameter
    min_duration=pd.Timedelta("10 minutes"),
)

# Get stop segments as trajectories
stop_segments = stop_detector.get_stop_segments(
    max_diameter=100,
    min_duration=pd.Timedelta("10 minutes"),
)

# Get stop points
stop_points = stop_detector.get_stop_points(
    max_diameter=100,
    min_duration=pd.Timedelta("10 minutes"),
)

# Kalman filter smoothing (requires Stone Soup)
try:
    smoother = mpd.KalmanSmootherCV(traj)
    smooth_traj = smoother.smooth(
        process_noise_std=0.5,
        measurement_noise_std=1.0,
    )
except ImportError:
    print("Stone Soup package required for Kalman smoothing")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-movingpandas