Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.
—
Real-time frame processing with built-in filters for noise reduction, alignment, colorization, and format conversion. Supports custom processing blocks and filter chaining for advanced computer vision pipelines.
Base classes for frame processing and filtering.
class processing_block:
    """Generic processing block that wraps a user-supplied frame-processing function."""

    def __init__(processing_function):
        """
        Create custom processing block.
        Args:
            processing_function: Function that processes frames
        """

    def start(callback_function):
        """
        Start processing with callback.
        Args:
            callback_function: Function called with processed frames
        """

    def invoke(frame):
        """
        Process single frame synchronously.
        Args:
            frame (frame): Input frame to process
        """

    def supports(camera_info) -> bool:
        """
        Check if processing block supports camera info.
        Args:
            camera_info (camera_info): Info field to check
        Returns:
            bool: True if supported
        """

    def get_info(camera_info) -> str:
        """
        Get processing block information.
        Args:
            camera_info (camera_info): Info field to retrieve
        Returns:
            str: Information value
        """
class filter_interface:
    """Interface implemented by objects that transform one frame into another."""

    def process(frame) -> frame:
        """
        Process frame and return result.
        Args:
            frame (frame): Input frame
        Returns:
            frame: Processed frame
        """
class filter(processing_block, filter_interface):
    """Base class for the built-in post-processing filters; combines processing with the filter interface."""

    def __init__(filter_function, queue_size=1):
        """
        Create filter with custom function.
        Args:
            filter_function: Function that processes frames
            queue_size (int): Internal queue size
        """

    # Type checking methods for specific filters
    def is_colorizer() -> bool:
        """Check if filter is a colorizer."""

    def is_decimation_filter() -> bool:
        """Check if filter is a decimation filter."""

    def is_temporal_filter() -> bool:
        """Check if filter is a temporal filter."""

    def is_spatial_filter() -> bool:
        """Check if filter is a spatial filter."""

    def is_hole_filling_filter() -> bool:
        """Check if filter is a hole filling filter."""

    def is_disparity_transform() -> bool:
        """Check if filter is a disparity transform."""

    def is_threshold_filter() -> bool:
        """Check if filter is a threshold filter."""

    def is_align() -> bool:
        """Check if filter is an align filter."""

    def is_pointcloud() -> bool:
        """Check if filter is a pointcloud filter."""

    # Type casting methods
    def as_colorizer() -> colorizer:
        """Cast to colorizer filter."""

    def as_decimation_filter() -> decimation_filter:
        """Cast to decimation filter."""

    def as_temporal_filter() -> temporal_filter:
        """Cast to temporal filter."""

    def as_spatial_filter() -> spatial_filter:
        """Cast to spatial filter."""

    def as_hole_filling_filter() -> hole_filling_filter:
        """Cast to hole filling filter."""

    def as_disparity_transform() -> disparity_transform:
        """Cast to disparity transform."""

    def as_threshold_filter() -> threshold_filter:
        """Cast to threshold filter."""

    def as_align() -> align:
        """Cast to align filter."""

    def as_pointcloud() -> pointcloud:
        """Cast to pointcloud filter."""
class frame_source:
    """Frame allocator handed to custom processing blocks for producing output frames."""

    def allocate_video_frame(profile, original, new_bpp=0, new_width=0,
                             new_height=0, new_stride=0, frame_type=rs.frame_type.video_frame) -> video_frame:
        """
        Allocate new video frame.
        Args:
            profile (stream_profile): Stream profile for new frame
            original (frame): Original frame for reference
            new_bpp (int): New bits per pixel (0 for same as original)
            new_width (int): New width (0 for same as original)
            new_height (int): New height (0 for same as original)
            new_stride (int): New stride (0 for calculated)
            frame_type (frame_type): Type of frame to allocate
        Returns:
            video_frame: Allocated frame
        """

    def allocate_motion_frame(profile, original, frame_type=rs.frame_type.motion_frame) -> motion_frame:
        """
        Allocate new motion frame.
        Args:
            profile (stream_profile): Stream profile for new frame
            original (frame): Original frame for reference
            frame_type (frame_type): Type of frame to allocate
        Returns:
            motion_frame: Allocated frame
        """

    def allocate_points(profile, original) -> points:
        """
        Allocate new point cloud frame.
        Args:
            profile (stream_profile): Stream profile
            original (frame): Original frame for reference
        Returns:
            points: Allocated point cloud
        """

    def allocate_composite_frame(frames) -> frameset:
        """
        Allocate composite frame from multiple frames.
        Args:
            frames (list[frame]): Frames to combine
        Returns:
            frameset: Composite frame
        """

    def frame_ready(result_frame):
        """
        Signal that frame is ready for output.
        Args:
            result_frame (frame): Processed frame to output
"""
# Frame synchronization and buffering for multi-stream processing.
class frame_queue:
    """FIFO buffer that decouples frame production from consumption."""

    def __init__(capacity=1, keep_frames=False):
        """
        Create frame queue for buffering.
        Args:
            capacity (int): Maximum number of frames to buffer
            keep_frames (bool): Whether to keep frame references
        """

    def enqueue(frame):
        """
        Add frame to queue.
        Args:
            frame (frame): Frame to add
        """

    def wait_for_frame(timeout_ms=5000) -> frame:
        """
        Wait for next frame from queue.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            frame: Next available frame
        Raises:
            rs.error: If timeout expires
        """

    def poll_for_frame() -> frame:
        """
        Get next frame without blocking.
        Returns:
            frame: Next frame or None if queue empty
        """

    def try_wait_for_frame(timeout_ms=5000) -> tuple[bool, frame]:
        """
        Try to get next frame with timeout.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            tuple: (success, frame)
        """

    def capacity() -> int:
        """
        Get queue capacity.
        Returns:
            int: Maximum queue size
        """

    def size() -> int:
        """
        Get current queue size.
        Returns:
            int: Number of frames in queue
        """

    def keep_frames() -> bool:
        """
        Check if queue keeps frame references.
        Returns:
            bool: True if keeping references
        """
class syncer:
    """Groups frames from multiple streams into time-synchronized framesets."""

    def __init__(queue_size=1):
        """
        Create frame synchronizer.
        Args:
            queue_size (int): Internal queue size
        """

    def wait_for_frames(timeout_ms=5000) -> frameset:
        """
        Wait for synchronized frameset.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            frameset: Synchronized frames
        """

    def wait_for_frame(timeout_ms=5000) -> frameset:
        """
        Alias for wait_for_frames.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            frameset: Synchronized frames
        """

    def poll_for_frames() -> frameset:
        """
        Get synchronized frames without blocking.
        Returns:
            frameset: Available synchronized frames or None
        """

    def poll_for_frame() -> frameset:
        """
        Alias for poll_for_frames.
        Returns:
            frameset: Available synchronized frames or None
        """

    def try_wait_for_frames(timeout_ms=5000) -> tuple[bool, frameset]:
        """
        Try to get synchronized frames with timeout.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            tuple: (success, frameset)
        """

    def try_wait_for_frame(timeout_ms=5000) -> tuple[bool, frameset]:
        """
        Alias for try_wait_for_frames.
        Args:
            timeout_ms (int): Maximum wait time
        Returns:
            tuple: (success, frameset)
"""
class pointcloud(filter):
    """Generates 3D point clouds from depth frames, optionally texture-mapped."""

    def __init__(stream=rs.stream.any, index=0):
        """
        Create point cloud generator.
        Args:
            stream (stream): Stream to use for texture mapping
            index (int): Stream index
        """

    def calculate(depth_frame) -> points:
        """
        Generate point cloud from depth frame.
        Args:
            depth_frame (depth_frame): Input depth data
        Returns:
            points: Generated point cloud
        """

    def map_to(texture_frame):
        """
        Set texture source for point cloud.
        Args:
            texture_frame (video_frame): Frame to use for texture coordinates
"""
class align(filter):
    """Re-projects frames so all streams share the viewpoint of one target stream."""

    def __init__(align_to_stream):
        """
        Create frame alignment filter.
        Args:
            align_to_stream (stream): Target stream to align to
        """

    def process(frameset) -> frameset:
        """
        Align frames to target stream.
        Args:
            frameset (frameset): Input frames to align
        Returns:
            frameset: Aligned frames
"""
class colorizer(filter):
    """Converts depth frames into color-mapped images for visualization."""

    def __init__(color_scheme=0):
        """
        Create depth colorizer.
        Args:
            color_scheme (int): Color scheme (0-8)
                0: Jet (blue-red)
                1: Classic (grayscale)
                2: WhiteToBlack
                3: BlackToWhite
                4: Bio (green-based)
                5: Cold (blue-based)
                6: Warm (red-based)
                7: Quantized
                8: Pattern
        """

    def colorize(depth_frame) -> video_frame:
        """
        Colorize depth frame.
        Args:
            depth_frame (depth_frame): Input depth data
        Returns:
            video_frame: Colorized depth image
"""
class decimation_filter(filter):
    """Reduces depth-frame resolution by the configured decimation factor."""

    def __init__(magnitude=2.0):
        """
        Create decimation filter to reduce resolution.
        Args:
            magnitude (float): Decimation factor (1.0-8.0)
        """
class temporal_filter(filter):
    """Reduces temporal noise by blending depth data across consecutive frames."""

    def __init__(smooth_alpha=0.4, smooth_delta=20.0, persistence_control=3):
        """
        Create temporal noise reduction filter.
        Args:
            smooth_alpha (float): Alpha factor for smoothing (0.0-1.0)
            smooth_delta (float): Delta threshold for edge-preserving
            persistence_control (int): Persistence control (0-8)
        """
class spatial_filter(filter):
    """Edge-preserving spatial smoothing for depth frames."""

    def __init__(smooth_alpha=0.5, smooth_delta=20.0, magnitude=2.0, hole_fill=0.0):
        """
        Create spatial noise reduction filter.
        Args:
            smooth_alpha (float): Alpha factor for smoothing (0.0-1.0)
            smooth_delta (float): Delta threshold for edge-preserving
            magnitude (float): Effect magnitude (1.0-5.0)
            hole_fill (float): Hole filling factor (0.0-5.0)
        """
class hole_filling_filter(filter):
    """Fills missing depth pixels using values from neighboring pixels."""

    def __init__(mode=0):
        """
        Create hole filling filter.
        Args:
            mode (int): Filling mode
                0: fill_from_left
                1: farest_from_around
                2: nearest_from_around
"""
class disparity_transform(filter):
    """Converts frames between depth and disparity representations."""

    def __init__(transform_to_disparity=True):
        """
        Create depth/disparity transform filter.
        Args:
            transform_to_disparity (bool): True for depth->disparity, False for disparity->depth
        """
class threshold_filter(filter):
    """Discards depth values outside a configured distance range."""

    def __init__(min_dist=0.15, max_dist=4.0):
        """
        Create depth range threshold filter.
        Args:
            min_dist (float): Minimum distance in meters
            max_dist (float): Maximum distance in meters
        """
class units_transform(filter):
    """Filter that transforms depth units."""

    def __init__():
        """Create depth units transformation filter."""
class yuy_decoder(filter):
    """Filter that decodes YUY-formatted frames."""

    def __init__():
        """Create YUY format decoder filter."""
class rotation_filter(filter):
    """Filter that rotates frames of the selected streams."""

    def __init__(streams=[]):
        """
        Create frame rotation filter.
        Args:
            streams (list): Streams to apply rotation to
        """
class hdr_merge(filter):
    """Filter that merges frames captured at different exposures into HDR output."""

    def __init__():
        """Create HDR frame merging filter."""
class sequence_id_filter(filter):
    """Filter that passes through only frames with a matching sequence ID."""

    def __init__(sequence_id=0.0):
        """
        Create sequence ID filter.
        Args:
            sequence_id (float): Sequence identifier
"""
# Example 1: depth post-processing — chain decimation, spatial, temporal and
# hole-filling filters, then colorize the result for visualization.
import pyrealsense2 as rs
import numpy as np
# Create pipeline
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
# Create filter chain
decimation = rs.decimation_filter(2.0)
spatial = rs.spatial_filter()
temporal = rs.temporal_filter()
hole_filling = rs.hole_filling_filter()
colorizer = rs.colorizer()
try:
    for i in range(100):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        if not depth_frame:
            continue
        # Apply filter chain (order matters: decimate first for performance)
        filtered = decimation.process(depth_frame)
        filtered = spatial.process(filtered)
        filtered = temporal.process(filtered)
        filtered = hole_filling.process(filtered)
        # Convert to depth frame for distance queries
        depth_filtered = filtered.as_depth_frame()
        # Get center distance (decimation shrinks width/height)
        width = depth_filtered.get_width()
        height = depth_filtered.get_height()
        center_dist = depth_filtered.get_distance(width // 2, height // 2)
        # Colorize for visualization
        colorized = colorizer.process(filtered)
        color_frame = colorized.as_video_frame()
        print(f"Frame {i}: center distance = {center_dist:.3f}m, "
              f"filtered size = {width}x{height}")
finally:
    pipeline.stop()
import pyrealsense2 as rs
import numpy as np
# Example 2: generate a color-textured point cloud and export it to PLY.
# Configure streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)
# Create point cloud generator
pc = rs.pointcloud()
points = rs.points()
try:
    # Wait for frames with both color and depth
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()
        if not depth_frame or not color_frame:
            continue
        # Map color texture to point cloud (must precede calculate)
        pc.map_to(color_frame)
        # Generate point cloud
        points = pc.calculate(depth_frame)
        print(f"Frame {i}: Generated {points.size()} points")
        # Get vertices as numpy array
        vertices = np.asanyarray(points.get_vertices())
        print(f" Vertex data shape: {vertices.shape}")
        # Get texture coordinates
        tex_coords = np.asanyarray(points.get_texture_coordinates())
        print(f" Texture coordinate shape: {tex_coords.shape}")
        # Export first point cloud to PLY file
        if i == 0:
            points.export_to_ply("pointcloud.ply", color_frame)
            print(" Exported to pointcloud.ply")
finally:
    pipeline.stop()
import pyrealsense2 as rs
import numpy as np
# Example 3: align depth to the color stream so pixels correspond 1:1.
# Configure streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
profile = pipeline.start(config)
# Create alignment filter to align depth to color
align_to_color = rs.align(rs.stream.color)
try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        # Align frames
        aligned_frames = align_to_color.process(frames)
        # Get aligned frames
        aligned_depth = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()
        if not aligned_depth or not color_frame:
            continue
        # Convert to numpy arrays
        depth_image = np.asanyarray(aligned_depth.get_data())
        color_image = np.asanyarray(color_frame.get_data())
        print(f"Frame {i}:")
        print(f" Aligned depth: {depth_image.shape}")
        print(f" Color: {color_image.shape}")
        # Now depth and color are pixel-aligned
        # Can directly correlate depth and color at same coordinates
        center_x, center_y = 320, 240
        depth_at_center = aligned_depth.get_distance(center_x, center_y)
        color_at_center = color_image[center_y, center_x]
        print(f" Center pixel: depth={depth_at_center:.3f}m, "
              f"color={color_at_center}")
finally:
    pipeline.stop()
import pyrealsense2 as rs
# Example 4: tuning filter behavior through set_option/get_option.
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
# Create and configure filters
decimation = rs.decimation_filter()
decimation.set_option(rs.option.filter_magnitude, 3.0)
temporal = rs.temporal_filter()
temporal.set_option(rs.option.filter_smooth_alpha, 0.3)
temporal.set_option(rs.option.filter_smooth_delta, 30.0)
temporal.set_option(rs.option.holes_fill, 2)
spatial = rs.spatial_filter()
spatial.set_option(rs.option.filter_smooth_alpha, 0.6)
spatial.set_option(rs.option.filter_smooth_delta, 25.0)
spatial.set_option(rs.option.filter_magnitude, 3.0)
spatial.set_option(rs.option.holes_fill, 2.0)
hole_filling = rs.hole_filling_filter()
hole_filling.set_option(rs.option.holes_fill, 2)  # nearest_from_around
threshold = rs.threshold_filter()
threshold.set_option(rs.option.min_distance, 0.2)
threshold.set_option(rs.option.max_distance, 3.0)
colorizer = rs.colorizer()
colorizer.set_option(rs.option.color_scheme, 2)  # WhiteToBlack
print("Filter configurations:")
print(f" Decimation magnitude: {decimation.get_option(rs.option.filter_magnitude)}")
print(f" Temporal alpha: {temporal.get_option(rs.option.filter_smooth_alpha)}")
print(f" Spatial magnitude: {spatial.get_option(rs.option.filter_magnitude)}")
print(f" Threshold range: {threshold.get_option(rs.option.min_distance):.1f} - "
      f"{threshold.get_option(rs.option.max_distance):.1f}m")
try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        if not depth_frame:
            continue
        # Apply comprehensive filter chain
        filtered = decimation.process(depth_frame)
        filtered = threshold.process(filtered)
        filtered = spatial.process(filtered)
        filtered = temporal.process(filtered)
        filtered = hole_filling.process(filtered)
        # Colorize result
        colorized = colorizer.process(filtered)
        # Get frame info
        depth_filtered = filtered.as_depth_frame()
        color_frame = colorized.as_video_frame()
        print(f"Frame {i}: {depth_filtered.get_width()}x{depth_filtered.get_height()} -> "
              f"{color_frame.get_width()}x{color_frame.get_height()}")
finally:
    pipeline.stop()
import pyrealsense2 as rs
import numpy as np
# Example 5: custom processing block with an asynchronous result callback.
def custom_processing_function(frame):
    """Custom processing function that inverts depth values."""
    if frame.is_depth_frame():
        depth_frame = frame.as_depth_frame()
        # Get frame data as numpy array
        depth_data = np.asanyarray(depth_frame.get_data())
        # Invert depth values (example processing)
        max_depth = depth_data.max()
        if max_depth > 0:
            inverted_data = max_depth - depth_data
            # Create new frame with processed data
            # (This is simplified - real implementation would need proper frame allocation)
            print(f"Processed depth frame: inverted {depth_data.shape} depth data")
    return frame
# Create custom processing block
custom_processor = rs.processing_block(custom_processing_function)
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
# Use custom processor with callback; results arrive here, not from invoke()
def processed_frame_callback(processed_frame):
    if processed_frame.is_depth_frame():
        depth = processed_frame.as_depth_frame()
        print(f"Received processed frame #{depth.get_frame_number()}")
custom_processor.start(processed_frame_callback)
try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        if depth_frame:
            # Send frame to custom processor
            custom_processor.invoke(depth_frame)
finally:
    pipeline.stop()
import pyrealsense2 as rs
import threading
import time
# Example 6: buffer frames through a frame_queue consumed by a worker thread.
# Create frame queue for buffering
frame_queue = rs.frame_queue(capacity=10, keep_frames=True)
def processing_thread():
    """Background thread for frame processing."""
    processed_count = 0
    while True:
        try:
            # Wait for frame with timeout
            frame = frame_queue.wait_for_frame(timeout_ms=1000)
            if frame.is_depth_frame():
                depth_frame = frame.as_depth_frame()
                # Simulate processing time
                time.sleep(0.01)
                processed_count += 1
                print(f"Processed frame #{depth_frame.get_frame_number()} "
                      f"(total: {processed_count})")
        except rs.error:
            # Timeout or other error — exit the worker loop
            break
# Start processing thread
processor = threading.Thread(target=processing_thread)
processor.daemon = True
processor.start()
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
try:
    # Stream frames into queue
    for i in range(100):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        if depth_frame:
            frame_queue.enqueue(depth_frame)
        # Print queue status
        if i % 10 == 0:
            print(f"Queue size: {frame_queue.size()}/{frame_queue.capacity()}")
        time.sleep(0.03)  # 30fps
finally:
    pipeline.stop()
    processor.join(timeout=1.0)
# Recommended filter order for general use
# Balanced filter chain: decimate first, cut range, then denoise and fill.
filters = [
    rs.decimation_filter(2.0),       # Reduce resolution for performance
    rs.threshold_filter(0.15, 4.0),  # Remove very close/far points
    rs.spatial_filter(),             # Spatial noise reduction
    rs.temporal_filter(),            # Temporal noise reduction
    rs.hole_filling_filter()         # Fill holes in depth data
]
# Apply filters in sequence
def apply_filters(frame, filters):
    """Run a frame through each filter in order, feeding each output to the next.

    Args:
        frame: Input frame (any object accepted by the filters' process()).
        filters: Iterable of filter objects exposing a process(frame) method.

    Returns:
        The frame produced by the last filter (the input unchanged if
        *filters* is empty).
    """
    # 'flt' avoids shadowing the builtin 'filter'
    for flt in filters:
        frame = flt.process(frame)
    return frame
# Optimized for speed
filters = [
    rs.decimation_filter(4.0),      # Aggressive decimation
    rs.threshold_filter(0.2, 2.0),  # Narrow range
    rs.hole_filling_filter(0)       # Simple hole filling
]
# Optimized for quality
filters = [
    rs.decimation_filter(1.0),               # No decimation
    rs.spatial_filter(0.8, 15.0, 5.0, 5.0),  # Strong spatial filtering
    rs.temporal_filter(0.2, 50.0, 5),        # Strong temporal filtering
    rs.hole_filling_filter(2)                # Best hole filling
]
# Install with Tessl CLI
npx tessl i tessl/pypi-pyrealsense2