R-Tree spatial index for Python GIS
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
This page covers advanced spatial indexing capabilities that go beyond basic CRUD operations: high-performance bulk operations, custom storage implementations, temporal indexing, and set operations between indexes.
High-performance bulk queries using NumPy arrays for processing multiple spatial queries efficiently.
def intersection_v(self, mins, maxs):
    """
    Run a bulk intersection query for several bounding boxes at once.

    Parameters:
    - mins (array-like): Minimum coordinates for each query box
    - maxs (array-like): Maximum coordinates for each query box

    Returns:
    tuple: (ids, counts) where:
    - ids: Flattened array of all intersecting item IDs
    - counts: Array of intersection counts for each query box
    """
def nearest_v(
    self,
    mins,
    maxs,
    *,
    num_results=1,
    max_dists=None,
    strict=False,
    return_max_dists=False,
):
    """
    Run a bulk k-nearest-neighbor query for several bounding boxes at once.

    Parameters:
    - mins (array-like): Minimum coordinates for each query box
    - maxs (array-like): Maximum coordinates for each query box
    - num_results (int): Number of nearest neighbors to find per query
    - max_dists (array-like, optional): Maximum search distances
    - strict (bool): Strict distance checking
    - return_max_dists (bool): Return maximum distances found

    Returns:
    tuple: (ids, counts), or (ids, counts, distances) if return_max_dists=True
    """


# Usage example:
import numpy as np
from rtree import index

# Create index with sample data: a 10 x 10 grid of unit squares
idx = index.Index()
for i in range(100):
    x, y = i % 10, i // 10
    idx.insert(i, (x, y, x + 1, y + 1))

# Bulk intersection queries: row k of query_mins/query_maxs is query box k
query_mins = np.array([[0, 0], [2, 2], [5, 5]])
query_maxs = np.array([[1, 1], [3, 3], [6, 6]])
ids, counts = idx.intersection_v(query_mins, query_maxs)
print(f"Found {len(ids)} total intersections")
print(f"Per-query counts: {counts}")

# Bulk nearest neighbor queries
nearest_ids, nearest_counts = idx.nearest_v(query_mins, query_maxs, num_results=3)
print(f"Found {len(nearest_ids)} nearest neighbors")

# Implement custom storage backends for specialized use cases like ZODB integration or custom persistence layers.
# NOTE(review): rtree's own class reference lists storage callbacks such as
# loadByteArray / storeByteArray / deleteByteArray; confirm the method names
# below against it before relying on this interface.
class ICustomStorage:
    """
    Interface for custom storage implementations.
    Must implement all abstract methods for data persistence.
    """

    def create(self, name): ...

    def destroy(self, name): ...

    def open(self, name): ...

    def close(self): ...

    def flush(self): ...

    def read(self, page, length): ...

    def write(self, page, data): ...

    def delete(self, page): ...
class CustomStorageBase(ICustomStorage):
    """
    Base class for custom storage with default implementations.
    Provides basic structure that can be extended for specific storage needs.
    """
class CustomStorage(ICustomStorage):
    """
    Template custom storage implementation.
    Example implementation showing the storage interface pattern.
    """


# Usage example:
from rtree import index


# NOTE(review): rtree's class reference describes storage callbacks named
# loadByteArray / storeByteArray / deleteByteArray; confirm that the
# create/write/read overrides below are actually invoked by the library.
class MemoryStorage(index.CustomStorageBase):
    """Keep pages in a plain dict keyed by page id."""

    def __init__(self):
        self._data = {}
        self._next_page = 0

    def create(self, name):
        # Start from an empty page table.
        self._data = {}
        return 0

    def write(self, page, data):
        self._data[page] = data
        return len(data)

    def read(self, page, length):
        # Unknown pages read back as empty bytes; `length` is not consulted.
        return self._data.get(page, b'')


# Use custom storage
storage = MemoryStorage()
idx = index.Index(storage=storage)

# Support for Time-Parameterized R-Trees (TPR-Tree) that handle moving objects with temporal queries.
class Property:
    """Index property relevant to TPR-Tree indexes (excerpt)."""

    tpr_horizon: float
    """
    Time horizon for TPR-Tree temporal queries.
    Sets the time window for trajectory prediction.
    """


# TPR-Tree indexes use specialized coordinate formats that include velocity information:
from rtree import index

# Configure for TPR-Tree
p = index.Property()
p.type = index.RT_TPRTree
p.tpr_horizon = 100.0
tpr_idx = index.Index(properties=p)

# Insert moving object: ((position_bounds), (velocity_bounds), time)
tpr_idx.insert(0, ((0, 0, 1, 1), (0.1, 0.1, 0.1, 0.1), 0.0))

# Perform set operations between spatial indexes to combine or compare their contents.
def __and__(self, other: Index) -> Index:
    """
    Spatial intersection of two indexes.

    The result holds one entry for each pair of entries (one from each
    index) whose bounding boxes intersect; it is not a set intersection
    by item ID.

    Parameters:
    - other (Index): Another spatial index

    Returns:
    Index: New index built from the intersecting pairs
    """
def __or__(self, other: Index) -> Index:
    """
    Union of two indexes - every entry of both indexes.

    Entries that appear in both indexes are not merged; each one is
    inserted into the result.

    Parameters:
    - other (Index): Another spatial index

    Returns:
    Index: New index containing items from both indexes
    """


# Usage example:
from rtree import index

# Create two indexes
idx1 = index.Index()
idx1.insert(0, (0, 0, 1, 1))
idx1.insert(1, (1, 1, 2, 2))
idx2 = index.Index()
idx2.insert(1, (1, 1, 2, 2))  # Same id and box as an entry of idx1
idx2.insert(2, (2, 2, 3, 3))

# Set operations
intersection_idx = idx1 & idx2  # One entry per pair of intersecting boxes
union_idx = idx1 | idx2  # Every entry of both indexes
# Boxes that only touch at a corner count as intersecting, so three pairs match.
print(len(intersection_idx))  # 3
# The shared entry is inserted once from each index.
print(len(union_idx))  # 4

# Configure specialized index behaviors and performance characteristics.
class Property:
    """Index tuning properties (excerpt)."""

    near_minimum_overlap_factor: int
    """
    Factor for near minimum overlap splitting in R*-Tree.
    """

    split_distribution_factor: float
    """
    Distribution factor for node splitting algorithms.
    """

    reinsert_factor: float
    """
    Reinsertion factor for R*-Tree forced reinsertion.
    """

    tight_mbr: bool
    """
    Use tight minimum bounding rectangles.
    """

    buffering_capacity: int
    """
    Buffer capacity for batched operations.
    """

    point_pool_capacity: int
    """
    Capacity of the point object pool.
    """

    region_pool_capacity: int
    """
    Capacity of the region object pool.
    """


# Load large datasets efficiently using stream-based bulk loading.
def __init__(self, stream, **kwargs):
    """
    Create index from a stream of data for efficient bulk loading.

    Parameters:
    - stream (iterable): Stream of (id, coordinates, object) tuples

    Stream format:
    - Regular R-Tree: (id, coordinates, object)
    - TPR-Tree: (id, ((pos_coords), (vel_coords), time), object)
    """


# Usage example:
from rtree import index


def data_stream():
    """Generator that yields spatial data."""
    for i in range(10000):
        # Lay the items out as unit squares on a 100 x 100 grid.
        x, y = i % 100, i // 100
        yield (i, (x, y, x + 1, y + 1), f"Object {i}")


# Bulk load from stream - more efficient than individual inserts
idx = index.Index(data_stream())
print(f"Bulk loaded {len(idx)} items")

# Control object serialization for stored data and index persistence.
def dumps(self, obj: object) -> bytes:
    """
    Serialize an object for storage in the index.

    Parameters:
    - obj (object): Object to serialize

    Returns:
    bytes: Serialized object data

    Note:
    Default implementation uses pickle. Override for custom serialization.
    """
def loads(self, string: bytes) -> object:
    """
    Deserialize an object from index storage.

    Parameters:
    - string (bytes): Serialized object data

    Returns:
    object: Deserialized object

    Note:
    Default implementation uses pickle. Override for custom deserialization.
    Unpickling can execute arbitrary code, so only open index data that
    comes from a trusted source when the default is in use.
    """
def __getstate__(self) -> dict[str, Any]:
    """
    Get state for pickle serialization of the index itself.

    Returns:
    dict: Index state for serialization
    """
def __setstate__(self, state: dict[str, Any]) -> None:
    """
    Restore index state from pickle deserialization.

    Parameters:
    - state (dict): Index state from serialization
    """


# Usage example with custom serialization:
import json
from rtree import index


class JSONIndex(index.Index):
    """Index that stores attached objects as UTF-8 encoded JSON."""

    def dumps(self, obj):
        return json.dumps(obj).encode('utf-8')

    def loads(self, string):
        # Parameter name matches the documented loads(self, string) hook.
        return json.loads(string.decode('utf-8'))


# Use custom serialization
json_idx = JSONIndex()
json_idx.insert(0, (0, 0, 1, 1), obj={"name": "Building", "floors": 5})

# Objects are serialized as JSON instead of pickle
for item in json_idx.intersection((0, 0, 1, 1), objects=True):
    print(item.object)  # {'name': 'Building', 'floors': 5}

# Access detailed information about index leaf nodes for advanced analysis.
def leaves(self):
    """
    Get information about leaf nodes in the index.

    Returns:
    List with one (node_id, child_item_ids, node_bounds) tuple per leaf:
    - Node IDs
    - Child item IDs
    - Node bounding boxes
    """


# Usage example:
from rtree import index

idx = index.Index()
for i in range(10):
    idx.insert(i, (i, i, i + 1, i + 1))

# Examine leaf structure
leaf_info = list(idx.leaves())
print(f"Index has {len(leaf_info)} leaf nodes")

# Control query result pagination and limits.
@property
def result_limit(self) -> int:
    """
    Get/set the maximum number of results returned by queries.

    Returns:
    int: Current result limit
    """


@result_limit.setter
def result_limit(self, value: int) -> None:
    """Set the maximum number of results returned by queries."""
@property
def result_offset(self) -> int:
    """
    Get/set the offset for query results (pagination).

    Returns:
    int: Current result offset
    """


@result_offset.setter
def result_offset(self, value: int) -> None:
    """Set the offset applied to query results."""


# Usage example:
from rtree import index

idx = index.Index()
for i in range(100):
    idx.insert(i, (0, 0, 10, 10))  # All overlap

# Paginate results
idx.result_limit = 10
idx.result_offset = 20
hits = list(idx.intersection((0, 0, 10, 10)))
print(f"Got {len(hits)} results (page 3)")  # Results 20-29

# Alternative object-oriented interface that automatically manages object-coordinate associations.
class RtreeContainer:
    """
    Object-oriented spatial container that automatically manages coordinate relationships.

    Unlike the Index class which requires separate ID and coordinate management,
    RtreeContainer directly stores objects and automatically handles coordinate associations.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Create a new spatial container.

        Parameters:
        Same as Index constructor (properties, storage, etc.)
        """

    def insert(self, obj: object, coordinates: Any) -> None:
        """
        Insert an object with its spatial coordinates.

        Parameters:
        - obj (object): Object to store
        - coordinates (sequence): Spatial coordinates
        """

    def delete(self, obj: object, coordinates: Any) -> None:
        """
        Delete an object from the container.

        Parameters:
        - obj (object): Object to remove
        - coordinates (sequence): Object's coordinates
        """

    def intersection(self, coordinates: Any, bbox: bool = False) -> Iterator[object | Item]:
        """
        Find objects that intersect with coordinates.

        Parameters:
        - coordinates (sequence): Query bounds
        - bbox (bool): Return Item objects with bounding boxes

        Returns:
        Iterator of objects or Item objects
        """

    def nearest(
        self, coordinates: Any, num_results: int = 1, bbox: bool = True
    ) -> Iterator[object | Item]:
        """
        Find nearest objects to coordinates.

        Parameters:
        - coordinates (sequence): Query point
        - num_results (int): Number of results
        - bbox (bool): Return Item objects with bounding boxes
          (defaults to True here, unlike intersection())

        Returns:
        Iterator of objects or Item objects
        """

    def __contains__(self, obj: object) -> bool:
        """Check if object is in container."""

    def __len__(self) -> int:
        """Get number of objects in container."""

    def __iter__(self) -> Iterator[object]:
        """Iterate over all objects in container."""


# Usage example:
from rtree import index

# Create object-oriented container
container = index.RtreeContainer()

# Insert objects directly
buildings = [
    {"name": "City Hall", "type": "government"},
    {"name": "Library", "type": "public"},
    {"name": "Mall", "type": "commercial"},
]
container.insert(buildings[0], (100, 100, 150, 150))
container.insert(buildings[1], (200, 200, 250, 250))
container.insert(buildings[2], (120, 120, 180, 180))

# Query for objects
for building in container.intersection((110, 110, 160, 160)):
    print(f"Found: {building['name']}")

# Check membership
print(buildings[0] in container)  # True

# Iterate over all objects
for building in container:
    print(building['name'])

# Install with Tessl CLI
#     npx tessl i tessl/pypi-rtree