CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-movingpandas

Python library for trajectory and movement data analysis built on pandas and GeoPandas.

Pending
Overview
Eval results
Files

core-data-structures.mddocs/

Core Data Structures

The fundamental classes for representing and managing trajectory data in MovingPandas. These classes provide rich functionality for spatio-temporal analysis, data enhancement, visualization, and conversion operations.

Capabilities

Trajectory Class

The core class for representing individual movement trajectories built on top of GeoPandas GeoDataFrame.

class Trajectory:
    def __init__(self, df, traj_id, traj_id_col=None, obj_id=None, t=None, x=None, y=None, crs="epsg:4326", parent=None):
        """
        Initialize a Trajectory object.
        
        Parameters:
        - df: GeoDataFrame with trajectory data
        - traj_id: Trajectory identifier
        - traj_id_col: Column name for trajectory ID
        - obj_id: Object identifier
        - t: Time column name (default: auto-detected)
        - x: X coordinate column name (default: auto-detected)
        - y: Y coordinate column name (default: auto-detected)
        - crs: Coordinate reference system (default: "epsg:4326")
        - parent: Parent trajectory for segments
        """

Basic Properties and Validation

def size(self):
    """Get number of trajectory points."""

def copy(self):
    """Create a deep copy of the trajectory."""

def is_valid(self):
    """Check if trajectory has valid structure."""

def get_crs(self):
    """Get coordinate reference system."""

def to_crs(self, crs):
    """Transform to different coordinate reference system."""

def is_latlon(self):
    """Check if using latitude/longitude coordinates."""

Temporal Access Methods

def get_start_time(self):
    """Get trajectory start timestamp."""

def get_end_time(self):
    """Get trajectory end timestamp."""

def get_duration(self):
    """Get trajectory duration as timedelta."""

def get_sampling_interval(self):
    """Get average time interval between points."""

Spatial Access Methods

def get_start_location(self):
    """Get starting point geometry."""

def get_end_location(self):
    """Get ending point geometry."""

def get_bbox(self):
    """Get bounding box of trajectory."""

def get_direction(self):
    """Get overall direction of movement."""

def get_length(self, units=UNITS()):
    """
    Get trajectory length.
    
    Parameters:
    - units: UNITS object specifying distance units
    
    Returns:
    Trajectory length as float
    """

Data Retrieval Methods

def get_row_at(self, t, method="nearest"):
    """
    Get trajectory row at specific time.
    
    Parameters:
    - t: Timestamp
    - method: "nearest" or "interpolated"
    
    Returns:
    DataFrame row
    """

def get_position_at(self, t, method="interpolated"):
    """
    Get position at specific time.
    
    Parameters:
    - t: Timestamp
    - method: "interpolated" or "nearest"
    
    Returns:
    Point geometry
    """

def interpolate_position_at(self, t):
    """Interpolate position at specific time."""

Geometric Operations

def to_linestring(self):
    """Convert trajectory to LineString geometry."""

def to_linestringm_wkt(self):
    """Convert to LineStringM WKT format."""

def intersects(self, polygon):
    """Check if trajectory intersects polygon."""

def distance(self, other, units=UNITS()):
    """
    Calculate distance to another trajectory.
    
    Parameters:
    - other: Another Trajectory object
    - units: UNITS object for distance units
    
    Returns:
    Distance as float
    """

def hausdorff_distance(self, other, units=UNITS()):
    """Calculate Hausdorff distance to another trajectory."""

Conversion Methods

def to_point_gdf(self, return_orig_tz=False):
    """Convert to point GeoDataFrame."""

def to_line_gdf(self, columns=None):
    """Convert to line GeoDataFrame."""

def to_traj_gdf(self, wkt=False, agg=False):
    """Convert to trajectory GeoDataFrame."""

def to_mf_json(self, datetime_to_str=True, temporal_columns=None):
    """Convert to Moving Features JSON format."""

Segmentation Methods

def get_segment_between(self, t1, t2):
    """
    Extract trajectory segment between two times.
    
    Parameters:
    - t1: Start time
    - t2: End time
    
    Returns:
    New Trajectory object
    """

def get_linestring_between(self, t1, t2, method="interpolated"):
    """Get LineString between two times."""

Spatial Operations

def clip(self, polygon, point_based=False):
    """Clip trajectory to polygon boundary."""

def intersection(self, feature, point_based=False):
    """Find intersection with geographic feature."""

def get_mcp(self):
    """Get minimum convex polygon (MCP) of trajectory."""

Data Enhancement Methods

def add_speed(self, overwrite=False, name="speed", units=UNITS()):
    """
    Add speed column to trajectory.
    
    Parameters:
    - overwrite: Whether to overwrite existing column
    - name: Column name for speed values
    - units: UNITS object for speed units
    
    Returns:
    Modified Trajectory object
    """

def add_direction(self, overwrite=False, name="direction"):
    """Add direction/heading column."""

def add_distance(self, overwrite=False, name="distance", units=None):
    """Add distance column (distance between consecutive points)."""

def add_acceleration(self, overwrite=False, name="acceleration", units=UNITS()):
    """Add acceleration column."""

def add_timedelta(self, overwrite=False, name="timedelta"):
    """Add time delta column."""

def add_angular_difference(self, overwrite=False, name="angular_difference"):
    """Add angular difference column."""

def add_traj_id(self, overwrite=False):
    """Add trajectory ID column."""

Column Access Methods

def get_min(self, column):
    """Get minimum value in column."""

def get_max(self, column):
    """Get maximum value in column."""

def get_column_names(self):
    """Get list of column names."""

def get_traj_id_col(self):
    """Get trajectory ID column name."""

def get_speed_col(self):
    """Get speed column name."""

def get_distance_col(self):
    """Get distance column name."""

def get_direction_col(self):
    """Get direction column name."""

def get_angular_difference_col(self):
    """Get angular difference column name."""

def get_timedelta_col(self):
    """Get timedelta column name."""

def get_geom_col(self):
    """Get geometry column name."""

Visualization Methods

def plot(self, *args, **kwargs):
    """Create matplotlib plot of trajectory."""

def explore(self, *args, **kwargs):
    """Create interactive Folium map."""

def hvplot(self, *args, **kwargs):
    """Create hvplot visualization."""

def hvplot_pts(self, *args, **kwargs):
    """Create hvplot point visualization."""

Utility Methods

def drop(self, **kwargs):
    """Drop columns or rows from trajectory."""

def apply_offset_seconds(self, column, offset):
    """Apply time offset in seconds to column."""

def apply_offset_minutes(self, column, offset):
    """Apply time offset in minutes to column."""

def is_long_enough(self, min_length, units=UNITS()):
    """Check if trajectory meets minimum length requirement."""

TrajectoryCollection Class

Container class for managing and analyzing multiple trajectories simultaneously.

class TrajectoryCollection:
    def __init__(self, data, traj_id_col=None, obj_id_col=None, t=None, x=None, y=None, crs="epsg:4326", min_length=0, min_duration=None):
        """
        Initialize a TrajectoryCollection.
        
        Parameters:
        - data: GeoDataFrame with multiple trajectory data
        - traj_id_col: Column name for trajectory IDs
        - obj_id_col: Column name for object IDs
        - t: Time column name
        - x: X coordinate column name
        - y: Y coordinate column name
        - crs: Coordinate reference system
        - min_length: Minimum trajectory length filter
        - min_duration: Minimum trajectory duration filter
        """

Collection Management

def copy(self):
    """Create deep copy of collection."""

def drop(self, **kwargs):
    """Drop trajectories from collection."""

def __len__(self):
    """Get number of trajectories in collection."""

def __iter__(self):
    """Iterate over trajectories in collection."""

Data Access

def get_trajectory(self, traj_id):
    """
    Get specific trajectory by ID.
    
    Parameters:
    - traj_id: Trajectory identifier
    
    Returns:
    Trajectory object
    """

def get_trajectories(self, obj_id):
    """Get all trajectories for specific object ID."""

def get_column_names(self):
    """Get list of column names."""

def get_traj_id_col(self):
    """Get trajectory ID column name."""

def get_geom_col(self):
    """Get geometry column name."""

def get_speed_col(self):
    """Get speed column name."""

def get_direction_col(self):
    """Get direction column name."""

Conversion Methods

def to_point_gdf(self):
    """Convert to point GeoDataFrame."""

def to_line_gdf(self, columns=None):
    """Convert to line GeoDataFrame."""

def to_traj_gdf(self, wkt=False, agg=False):
    """Convert to trajectory GeoDataFrame."""

def to_mf_json(self, datetime_to_str=True, temporal_columns=None):
    """Convert to Moving Features JSON format."""

Temporal Operations

def get_locations_at(self, t, with_direction=False):
    """Get locations of all trajectories at specific time."""

def get_start_locations(self, with_direction=False):
    """Get start locations of all trajectories."""

def get_end_locations(self, with_direction=False):
    """Get end locations of all trajectories."""

def get_segments_between(self, t1, t2):
    """Get trajectory segments between two times."""

Spatial Operations

def get_intersecting(self, polygon):
    """Get trajectories that intersect with polygon."""

def intersection(self, feature, point_based=False):
    """Find intersection with geographic feature."""

def clip(self, polygon, point_based=False):
    """Clip all trajectories to polygon boundary."""

Filtering

def filter(self, property_name, property_values):
    """
    Filter trajectories by property values.
    
    Parameters:
    - property_name: Column name to filter on
    - property_values: Values to keep
    
    Returns:
    Filtered TrajectoryCollection
    """

Data Enhancement Methods

def add_speed(self, overwrite=False, name="speed", units=UNITS(), n_processes=1):
    """Add speed column to all trajectories."""

def add_direction(self, overwrite=False, name="direction", n_processes=1):
    """Add direction column to all trajectories."""

def add_angular_difference(self, overwrite=False, name="angular_difference", n_processes=1):
    """Add angular difference column to all trajectories."""

def add_acceleration(self, overwrite=False, name="acceleration", units=UNITS(), n_processes=1):
    """Add acceleration column to all trajectories."""

def add_distance(self, overwrite=False, name="distance", units=None, n_processes=1):
    """Add distance column to all trajectories."""

def add_timedelta(self, overwrite=False, name="timedelta", n_processes=1):
    """Add timedelta column to all trajectories."""

def add_traj_id(self, overwrite=False):
    """Add trajectory ID column to all trajectories."""

Statistics

def get_min(self, column):
    """Get minimum value across all trajectories."""

def get_max(self, column):
    """Get maximum value across all trajectories."""

def get_crs(self):
    """Get coordinate reference system."""

def is_latlon(self):
    """Check if using latitude/longitude coordinates."""

Visualization Methods

def plot(self, *args, **kwargs):
    """Create matplotlib plot of all trajectories."""

def explore(self, *args, **kwargs):
    """Create interactive Folium map."""

def hvplot(self, *args, **kwargs):
    """Create hvplot visualization."""

def hvplot_pts(self, *args, **kwargs):
    """Create hvplot point visualization."""

Usage Examples

Basic Trajectory Creation and Analysis

import movingpandas as mpd
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from datetime import datetime, timedelta

# Create sample data
data = []
base_time = datetime(2023, 1, 1, 12, 0, 0)
for i in range(10):
    data.append({
        'geometry': Point(i * 0.1, i * 0.1),
        't': base_time + timedelta(minutes=i * 5),
        'obj_id': 'A'
    })

gdf = gpd.GeoDataFrame(data, crs='EPSG:4326')

# Create trajectory
traj = mpd.Trajectory(gdf, traj_id='traj_1')

# Analyze trajectory
print(f"Duration: {traj.get_duration()}")
print(f"Length: {traj.get_length()}")

# Add computed columns
traj = traj.add_speed()
traj = traj.add_direction()

TrajectoryCollection Operations

# Create collection from multiple trajectories
collection = mpd.TrajectoryCollection(gdf, traj_id_col='obj_id')

# Filter by spatial bounds
from shapely.geometry import Polygon
bbox = Polygon([(0, 0), (0.5, 0), (0.5, 0.5), (0, 0.5)])
intersecting = collection.get_intersecting(bbox)

# Get locations at specific time
locations = collection.get_locations_at(base_time + timedelta(minutes=20))

Install with Tessl CLI

npx tessl i tessl/pypi-movingpandas

docs

aggregation-analysis.md

core-data-structures.md

index.md

io-utilities.md

trajectory-processing.md

tile.json