CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-movingpandas

Python library for trajectory and movement data analysis built on pandas and GeoPandas.

Pending
Overview
Eval results
Files

trajectory-processing.mddocs/

Trajectory Processing

Advanced algorithms for trajectory generalization, splitting, cleaning, and smoothing. These specialized classes provide methods for preparing and analyzing movement data through various processing techniques.

Capabilities

Trajectory Generalization

Classes for reducing trajectory complexity while preserving important characteristics.

TrajectoryGeneralizer Base Class

class TrajectoryGeneralizer:
    def __init__(self, traj):
        """
        Base class for trajectory generalization algorithms.
        
        Parameters:
        - traj: Trajectory object to generalize
        """
    
    def generalize(self, tolerance):
        """
        Generalize trajectory using algorithm-specific method.
        
        Parameters:
        - tolerance: Algorithm-specific tolerance parameter
        
        Returns:
        Generalized Trajectory object
        """

Douglas-Peucker Generalization

class DouglasPeuckerGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Classic Douglas-Peucker algorithm using shapely."""
    
    def generalize(self, tolerance=1.0):
        """
        Generalize using Douglas-Peucker algorithm.
        
        Parameters:
        - tolerance: Distance threshold in coordinate units
        
        Returns:
        Generalized Trajectory
        """

Distance-Based Generalization

class MaxDistanceGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Douglas-Peucker-like algorithm checking distance threshold."""
    
    def generalize(self, tolerance=1.0):
        """
        Generalize by maximum distance threshold.
        
        Parameters:
        - tolerance: Maximum distance threshold
        """

class MinDistanceGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Ensures consecutive locations are at least a certain distance apart."""
    
    def generalize(self, tolerance=1.0):
        """
        Generalize by minimum distance threshold.
        
        Parameters:
        - tolerance: Minimum distance between consecutive points
        """

Time-Based Generalization

class MinTimeDeltaGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Ensures consecutive rows are at least a certain timedelta apart."""
    
    def generalize(self, tolerance):
        """
        Generalize by minimum time difference.
        
        Parameters:
        - tolerance: timedelta object specifying minimum time difference
        """

class TopDownTimeRatioGeneralizer(TrajectoryGeneralizer):
    def __init__(self, traj):
        """Spatiotemporal generalization based on Meratnia & de By (2004)."""
    
    def generalize(self, tolerance=1.0):
        """
        Generalize using top-down time ratio algorithm.
        
        Parameters:
        - tolerance: Distance threshold for spatiotemporal analysis
        """

Trajectory Splitting

Classes for dividing trajectories into segments based on various criteria.

TrajectorySplitter Base Class

class TrajectorySplitter:
    def __init__(self, traj):
        """
        Base class for trajectory splitting algorithms.
        
        Parameters:
        - traj: Trajectory object to split
        """
    
    def split(self, n_processes=1, **kwargs):
        """
        Split trajectory using algorithm-specific method.
        
        Parameters:
        - n_processes: Number of processes for parallel computation
        - **kwargs: Algorithm-specific parameters
        
        Returns:
        TrajectoryCollection with split trajectories
        """

Temporal Splitting

class TemporalSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split trajectories using regular time intervals."""
    
    def split(self, mode="day", min_length=0):
        """
        Split by temporal intervals.
        
        Parameters:
        - mode: Time interval ("hour", "day", "month", "year")
        - min_length: Minimum length for resulting segments
        
        Returns:
        TrajectoryCollection with temporally split trajectories
        """

Gap-Based Splitting

class ObservationGapSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split whenever there's a gap in observations."""
    
    def split(self, gap, min_length=0):
        """
        Split on observation gaps.
        
        Parameters:
        - gap: timedelta object defining gap threshold
        - min_length: Minimum length for resulting segments
        
        Returns:
        TrajectoryCollection with gap-split trajectories
        """

Speed-Based Splitting

class SpeedSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split based on speed thresholds and duration."""
    
    def split(self, speed, duration, min_length=0, max_speed=float('inf')):
        """
        Split based on speed criteria.
        
        Parameters:
        - speed: Speed threshold
        - duration: Duration threshold
        - min_length: Minimum length for resulting segments
        - max_speed: Maximum speed threshold
        
        Returns:
        TrajectoryCollection with speed-based splits
        """

Stop-Based Splitting

class StopSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split at detected stops."""
    
    def split(self, max_diameter, min_duration, min_length=0):
        """
        Split at stop locations.
        
        Parameters:
        - max_diameter: Maximum diameter for stop detection
        - min_duration: Minimum duration to qualify as stop
        - min_length: Minimum length for resulting segments
        
        Returns:
        TrajectoryCollection with stop-based splits
        """

Directional Splitting

class AngleChangeSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split on heading angle changes."""
    
    def split(self, min_angle=45, min_speed=0, min_length=0):
        """
        Split on angle changes.
        
        Parameters:
        - min_angle: Minimum angle change in degrees
        - min_speed: Minimum speed threshold
        - min_length: Minimum length for resulting segments
        
        Returns:
        TrajectoryCollection with angle-based splits
        """

Value-Based Splitting

class ValueChangeSplitter(TrajectorySplitter):
    def __init__(self, traj):
        """Split on column value changes."""
    
    def split(self, col_name, min_length=0):
        """
        Split when column values change.
        
        Parameters:
        - col_name: Column name to monitor for changes
        - min_length: Minimum length for resulting segments
        
        Returns:
        TrajectoryCollection with value-change splits
        """

Trajectory Cleaning

Classes for detecting and removing outliers from trajectory data.

TrajectoryCleaner Base Class

class TrajectoryCleaner:
    def __init__(self, traj):
        """
        Base class for trajectory cleaning algorithms.
        
        Parameters:
        - traj: Trajectory object to clean
        """
    
    def clean(self, **kwargs):
        """
        Clean trajectory using algorithm-specific method.
        
        Returns:
        Cleaned Trajectory object
        """

Statistical Outlier Removal

class IqrCleaner(TrajectoryCleaner):
    def __init__(self, traj):
        """Interquartile range (IQR) based outlier cleaner."""
    
    def clean(self, columns):
        """
        Clean using IQR method.
        
        Parameters:
        - columns: Dict mapping column names to alpha values for IQR calculation
        
        Returns:
        Cleaned Trajectory object
        """

Speed-Based Outlier Removal

class OutlierCleaner(TrajectoryCleaner):
    def __init__(self, traj):
        """Speed-based outlier cleaner."""
    
    def clean(self, v_max=None, units=None, alpha=3):
        """
        Clean using speed-based outlier detection.
        
        Parameters:
        - v_max: Maximum speed threshold
        - units: UNITS object for speed units
        - alpha: Standard deviation multiplier for outlier detection
        
        Returns:
        Cleaned Trajectory object
        """

Stop Detection

Specialized class for detecting stationary periods in trajectories.

class TrajectoryStopDetector:
    def __init__(self, traj, n_processes=1):
        """
        Detects stops in trajectories based on area size and duration.
        
        Parameters:
        - traj: Trajectory object to analyze
        - n_processes: Number of processes for parallel computation
        """
    
    def get_stop_time_ranges(self, max_diameter, min_duration):
        """
        Get time ranges of detected stops.
        
        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop
        
        Returns:
        List of (start_time, end_time) tuples
        """
    
    def get_stop_segments(self, max_diameter, min_duration):
        """
        Get trajectory segments representing stops.
        
        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop
        
        Returns:
        TrajectoryCollection with stop segments
        """
    
    def get_stop_points(self, max_diameter, min_duration):
        """
        Get representative points for detected stops.
        
        Parameters:
        - max_diameter: Maximum diameter for stop area
        - min_duration: Minimum duration to qualify as stop
        
        Returns:
        GeoDataFrame with stop points
        """

Trajectory Smoothing

Classes for smoothing noisy trajectory data using filtering techniques.

TrajectorySmoother Base Class

class TrajectorySmoother:
    def __init__(self, traj):
        """
        Base class for trajectory smoothing algorithms.
        
        Parameters:
        - traj: Trajectory object to smooth
        """
    
    def smooth(self, **kwargs):
        """
        Smooth trajectory using algorithm-specific method.
        
        Returns:
        Smoothed Trajectory object
        """

Kalman Filter Smoothing

class KalmanSmootherCV(TrajectorySmoother):
    def __init__(self, traj):
        """
        Kalman Filter with Constant Velocity model.
        
        Note: Requires Stone Soup dependency (install with pip install stonesoup)
        """
    
    def smooth(self, process_noise_std=0.5, measurement_noise_std=1):
        """
        Smooth using Kalman filter.
        
        Parameters:
        - process_noise_std: Process noise standard deviation
        - measurement_noise_std: Measurement noise standard deviation
        
        Returns:
        Smoothed Trajectory object
        """

Usage Examples

Trajectory Generalization

import movingpandas as mpd

# Create trajectory (assume 'traj' exists)
# traj = mpd.Trajectory(...)

# Douglas-Peucker generalization
generalizer = mpd.DouglasPeuckerGeneralizer(traj)
simplified_traj = generalizer.generalize(tolerance=10.0)  # 10 meter tolerance

# Minimum distance generalization
min_dist_gen = mpd.MinDistanceGeneralizer(traj)
filtered_traj = min_dist_gen.generalize(tolerance=5.0)  # 5 meter minimum distance

Trajectory Splitting

# Split by day
temporal_splitter = mpd.TemporalSplitter(traj)
daily_segments = temporal_splitter.split(mode="day", min_length=100)

# Split on stops
stop_splitter = mpd.StopSplitter(traj)
segments = stop_splitter.split(
    max_diameter=50,  # 50 meter diameter
    min_duration=pd.Timedelta("5 minutes"),
    min_length=10
)

# Split on speed changes
speed_splitter = mpd.SpeedSplitter(traj)
speed_segments = speed_splitter.split(
    speed=2.0,  # 2 m/s threshold
    duration=pd.Timedelta("30 seconds")
)

Trajectory Cleaning

# Remove speed-based outliers
cleaner = mpd.OutlierCleaner(traj)
clean_traj = cleaner.clean(v_max=50, alpha=3)  # Max 50 m/s, 3 std devs

# IQR-based cleaning
iqr_cleaner = mpd.IqrCleaner(traj)
clean_traj = iqr_cleaner.clean(columns={'speed': 1.5})  # 1.5 * IQR for speed

Stop Detection

# Detect stops
stop_detector = mpd.TrajectoryStopDetector(traj)

# Get stop time ranges
stop_times = stop_detector.get_stop_time_ranges(
    max_diameter=100,  # 100 meter diameter
    min_duration=pd.Timedelta("10 minutes")
)

# Get stop segments as trajectories
stop_segments = stop_detector.get_stop_segments(
    max_diameter=100,
    min_duration=pd.Timedelta("10 minutes")
)

# Get stop points
stop_points = stop_detector.get_stop_points(
    max_diameter=100,
    min_duration=pd.Timedelta("10 minutes")
)

Trajectory Smoothing

# Kalman filter smoothing (requires Stone Soup)
try:
    smoother = mpd.KalmanSmootherCV(traj)
    smooth_traj = smoother.smooth(
        process_noise_std=0.5,
        measurement_noise_std=1.0
    )
except ImportError:
    print("Stone Soup package required for Kalman smoothing")

Install with Tessl CLI

npx tessl i tessl/pypi-movingpandas

docs

aggregation-analysis.md

core-data-structures.md

index.md

io-utilities.md

trajectory-processing.md

tile.json