Python library for trajectory and movement data analysis built on pandas and GeoPandas.
—
The fundamental classes for representing and managing trajectory data in MovingPandas. These classes provide rich functionality for spatio-temporal analysis, data enhancement, visualization, and conversion operations.
The core class for representing individual movement trajectories built on top of GeoPandas GeoDataFrame.
class Trajectory:
def __init__(self, df, traj_id, traj_id_col=None, obj_id=None, t=None, x=None, y=None, crs="epsg:4326", parent=None):
"""
Initialize a Trajectory object.
Parameters:
- df: GeoDataFrame with trajectory data
- traj_id: Trajectory identifier
- traj_id_col: Column name for trajectory ID
- obj_id: Object identifier
- t: Time column name (default: auto-detected)
- x: X coordinate column name (default: auto-detected)
- y: Y coordinate column name (default: auto-detected)
- crs: Coordinate reference system (default: "epsg:4326")
- parent: Parent trajectory for segments
"""def size(self):
"""Get number of trajectory points."""
def copy(self):
"""Create a deep copy of the trajectory."""
def is_valid(self):
"""Check if trajectory has valid structure."""
def get_crs(self):
"""Get coordinate reference system."""
def to_crs(self, crs):
"""Transform to different coordinate reference system."""
def is_latlon(self):
"""Check if using latitude/longitude coordinates."""def get_start_time(self):
"""Get trajectory start timestamp."""
def get_end_time(self):
"""Get trajectory end timestamp."""
def get_duration(self):
"""Get trajectory duration as timedelta."""
def get_sampling_interval(self):
"""Get average time interval between points."""def get_start_location(self):
"""Get starting point geometry."""
def get_end_location(self):
"""Get ending point geometry."""
def get_bbox(self):
"""Get bounding box of trajectory."""
def get_direction(self):
"""Get overall direction of movement."""
def get_length(self, units=UNITS()):
"""
Get trajectory length.
Parameters:
- units: UNITS object specifying distance units
Returns:
Trajectory length as float
"""def get_row_at(self, t, method="nearest"):
"""
Get trajectory row at specific time.
Parameters:
- t: Timestamp
- method: "nearest" or "interpolated"
Returns:
DataFrame row
"""
def get_position_at(self, t, method="interpolated"):
"""
Get position at specific time.
Parameters:
- t: Timestamp
- method: "interpolated" or "nearest"
Returns:
Point geometry
"""
def interpolate_position_at(self, t):
"""Interpolate position at specific time."""def to_linestring(self):
"""Convert trajectory to LineString geometry."""
def to_linestringm_wkt(self):
"""Convert to LineStringM WKT format."""
def intersects(self, polygon):
"""Check if trajectory intersects polygon."""
def distance(self, other, units=UNITS()):
"""
Calculate distance to another trajectory.
Parameters:
- other: Another Trajectory object
- units: UNITS object for distance units
Returns:
Distance as float
"""
def hausdorff_distance(self, other, units=UNITS()):
"""Calculate Hausdorff distance to another trajectory."""def to_point_gdf(self, return_orig_tz=False):
"""Convert to point GeoDataFrame."""
def to_line_gdf(self, columns=None):
"""Convert to line GeoDataFrame."""
def to_traj_gdf(self, wkt=False, agg=False):
"""Convert to trajectory GeoDataFrame."""
def to_mf_json(self, datetime_to_str=True, temporal_columns=None):
"""Convert to Moving Features JSON format."""def get_segment_between(self, t1, t2):
"""
Extract trajectory segment between two times.
Parameters:
- t1: Start time
- t2: End time
Returns:
New Trajectory object
"""
def get_linestring_between(self, t1, t2, method="interpolated"):
"""Get LineString between two times."""def clip(self, polygon, point_based=False):
"""Clip trajectory to polygon boundary."""
def intersection(self, feature, point_based=False):
"""Find intersection with geographic feature."""
def get_mcp(self):
"""Get minimum convex polygon (MCP) of trajectory."""def add_speed(self, overwrite=False, name="speed", units=UNITS()):
"""
Add speed column to trajectory.
Parameters:
- overwrite: Whether to overwrite existing column
- name: Column name for speed values
- units: UNITS object for speed units
Returns:
Modified Trajectory object
"""
def add_direction(self, overwrite=False, name="direction"):
"""Add direction/heading column."""
def add_distance(self, overwrite=False, name="distance", units=None):
"""Add distance column (distance between consecutive points)."""
def add_acceleration(self, overwrite=False, name="acceleration", units=UNITS()):
"""Add acceleration column."""
def add_timedelta(self, overwrite=False, name="timedelta"):
"""Add time delta column."""
def add_angular_difference(self, overwrite=False, name="angular_difference"):
"""Add angular difference column."""
def add_traj_id(self, overwrite=False):
"""Add trajectory ID column."""def get_min(self, column):
"""Get minimum value in column."""
def get_max(self, column):
"""Get maximum value in column."""
def get_column_names(self):
"""Get list of column names."""
def get_traj_id_col(self):
"""Get trajectory ID column name."""
def get_speed_col(self):
"""Get speed column name."""
def get_distance_col(self):
"""Get distance column name."""
def get_direction_col(self):
"""Get direction column name."""
def get_angular_difference_col(self):
"""Get angular difference column name."""
def get_timedelta_col(self):
"""Get timedelta column name."""
def get_geom_col(self):
"""Get geometry column name."""def plot(self, *args, **kwargs):
"""Create matplotlib plot of trajectory."""
def explore(self, *args, **kwargs):
"""Create interactive Folium map."""
def hvplot(self, *args, **kwargs):
"""Create hvplot visualization."""
def hvplot_pts(self, *args, **kwargs):
"""Create hvplot point visualization."""def drop(self, **kwargs):
"""Drop columns or rows from trajectory."""
def apply_offset_seconds(self, column, offset):
"""Apply time offset in seconds to column."""
def apply_offset_minutes(self, column, offset):
"""Apply time offset in minutes to column."""
def is_long_enough(self, min_length, units=UNITS()):
"""Check if trajectory meets minimum length requirement."""Container class for managing and analyzing multiple trajectories simultaneously.
class TrajectoryCollection:
def __init__(self, data, traj_id_col=None, obj_id_col=None, t=None, x=None, y=None, crs="epsg:4326", min_length=0, min_duration=None):
"""
Initialize a TrajectoryCollection.
Parameters:
- data: GeoDataFrame with multiple trajectory data
- traj_id_col: Column name for trajectory IDs
- obj_id_col: Column name for object IDs
- t: Time column name
- x: X coordinate column name
- y: Y coordinate column name
- crs: Coordinate reference system
- min_length: Minimum trajectory length filter
- min_duration: Minimum trajectory duration filter
"""def copy(self):
"""Create deep copy of collection."""
def drop(self, **kwargs):
"""Drop trajectories from collection."""
def __len__(self):
"""Get number of trajectories in collection."""
def __iter__(self):
"""Iterate over trajectories in collection."""def get_trajectory(self, traj_id):
"""
Get specific trajectory by ID.
Parameters:
- traj_id: Trajectory identifier
Returns:
Trajectory object
"""
def get_trajectories(self, obj_id):
"""Get all trajectories for specific object ID."""
def get_column_names(self):
"""Get list of column names."""
def get_traj_id_col(self):
"""Get trajectory ID column name."""
def get_geom_col(self):
"""Get geometry column name."""
def get_speed_col(self):
"""Get speed column name."""
def get_direction_col(self):
"""Get direction column name."""def to_point_gdf(self):
"""Convert to point GeoDataFrame."""
def to_line_gdf(self, columns=None):
"""Convert to line GeoDataFrame."""
def to_traj_gdf(self, wkt=False, agg=False):
"""Convert to trajectory GeoDataFrame."""
def to_mf_json(self, datetime_to_str=True, temporal_columns=None):
"""Convert to Moving Features JSON format."""def get_locations_at(self, t, with_direction=False):
"""Get locations of all trajectories at specific time."""
def get_start_locations(self, with_direction=False):
"""Get start locations of all trajectories."""
def get_end_locations(self, with_direction=False):
"""Get end locations of all trajectories."""
def get_segments_between(self, t1, t2):
"""Get trajectory segments between two times."""def get_intersecting(self, polygon):
"""Get trajectories that intersect with polygon."""
def intersection(self, feature, point_based=False):
"""Find intersection with geographic feature."""
def clip(self, polygon, point_based=False):
"""Clip all trajectories to polygon boundary."""def filter(self, property_name, property_values):
"""
Filter trajectories by property values.
Parameters:
- property_name: Column name to filter on
- property_values: Values to keep
Returns:
Filtered TrajectoryCollection
"""def add_speed(self, overwrite=False, name="speed", units=UNITS(), n_processes=1):
"""Add speed column to all trajectories."""
def add_direction(self, overwrite=False, name="direction", n_processes=1):
"""Add direction column to all trajectories."""
def add_angular_difference(self, overwrite=False, name="angular_difference", n_processes=1):
"""Add angular difference column to all trajectories."""
def add_acceleration(self, overwrite=False, name="acceleration", units=UNITS(), n_processes=1):
"""Add acceleration column to all trajectories."""
def add_distance(self, overwrite=False, name="distance", units=None, n_processes=1):
"""Add distance column to all trajectories."""
def add_timedelta(self, overwrite=False, name="timedelta", n_processes=1):
"""Add timedelta column to all trajectories."""
def add_traj_id(self, overwrite=False):
"""Add trajectory ID column to all trajectories."""def get_min(self, column):
"""Get minimum value across all trajectories."""
def get_max(self, column):
"""Get maximum value across all trajectories."""
def get_crs(self):
"""Get coordinate reference system."""
def is_latlon(self):
"""Check if using latitude/longitude coordinates."""def plot(self, *args, **kwargs):
"""Create matplotlib plot of all trajectories."""
def explore(self, *args, **kwargs):
"""Create interactive Folium map."""
def hvplot(self, *args, **kwargs):
"""Create hvplot visualization."""
def hvplot_pts(self, *args, **kwargs):
"""Create hvplot point visualization."""import movingpandas as mpd
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from datetime import datetime, timedelta
# Create sample data
data = []
base_time = datetime(2023, 1, 1, 12, 0, 0)
for i in range(10):
data.append({
'geometry': Point(i * 0.1, i * 0.1),
't': base_time + timedelta(minutes=i * 5),
'obj_id': 'A'
})
gdf = gpd.GeoDataFrame(data, crs='EPSG:4326')
# Create trajectory
traj = mpd.Trajectory(gdf, traj_id='traj_1')
# Analyze trajectory
print(f"Duration: {traj.get_duration()}")
print(f"Length: {traj.get_length()}")
# Add computed columns
traj = traj.add_speed()
traj = traj.add_direction()# Create collection from multiple trajectories
collection = mpd.TrajectoryCollection(gdf, traj_id_col='obj_id')
# Filter by spatial bounds
from shapely.geometry import Polygon
bbox = Polygon([(0, 0), (0.5, 0), (0.5, 0.5), (0, 0.5)])
intersecting = collection.get_intersecting(bbox)
# Get locations at specific time
locations = collection.get_locations_at(base_time + timedelta(minutes=20))Install with Tessl CLI
npx tessl i tessl/pypi-movingpandas