CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyrealsense2

Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.

Pending
Overview
Eval results
Files

processing.mddocs/

Processing & Filters

Real-time frame processing with built-in filters for noise reduction, alignment, colorization, and format conversion. Supports custom processing blocks and filter chaining for advanced computer vision pipelines.

Capabilities

Processing Framework

Base classes for frame processing and filtering.

class processing_block:
    def __init__(processing_function):
        """
        Create custom processing block.
        
        Args:
            processing_function: Function that processes frames
        """
    
    def start(callback_function):
        """
        Start processing with callback.
        
        Args:
            callback_function: Function called with processed frames
        """
    
    def invoke(frame):
        """
        Process single frame synchronously.
        
        Args:
            frame (frame): Input frame to process
        """
    
    def supports(camera_info) -> bool:
        """
        Check if processing block supports camera info.
        
        Args:
            camera_info (camera_info): Info field to check
            
        Returns:
            bool: True if supported
        """
    
    def get_info(camera_info) -> str:
        """
        Get processing block information.
        
        Args:
            camera_info (camera_info): Info field to retrieve
            
        Returns:
            str: Information value
        """

class filter_interface:
    def process(frame) -> frame:
        """
        Process frame and return result.
        
        Args:
            frame (frame): Input frame
            
        Returns:
            frame: Processed frame
        """

class filter(processing_block, filter_interface):
    def __init__(filter_function, queue_size=1):
        """
        Create filter with custom function.
        
        Args:
            filter_function: Function that processes frames
            queue_size (int): Internal queue size
        """
    
    # Type checking methods for specific filters
    def is_colorizer() -> bool:
        """Check if filter is a colorizer."""
    
    def is_decimation_filter() -> bool:
        """Check if filter is a decimation filter."""
    
    def is_temporal_filter() -> bool:
        """Check if filter is a temporal filter."""
    
    def is_spatial_filter() -> bool:
        """Check if filter is a spatial filter."""
    
    def is_hole_filling_filter() -> bool:
        """Check if filter is a hole filling filter."""
    
    def is_disparity_transform() -> bool:
        """Check if filter is a disparity transform."""
    
    def is_threshold_filter() -> bool:
        """Check if filter is a threshold filter."""
    
    def is_align() -> bool:
        """Check if filter is an align filter."""
    
    def is_pointcloud() -> bool:
        """Check if filter is a pointcloud filter."""
    
    # Type casting methods
    def as_colorizer() -> colorizer:
        """Cast to colorizer filter."""
    
    def as_decimation_filter() -> decimation_filter:
        """Cast to decimation filter."""
    
    def as_temporal_filter() -> temporal_filter:
        """Cast to temporal filter."""
    
    def as_spatial_filter() -> spatial_filter:
        """Cast to spatial filter."""
    
    def as_hole_filling_filter() -> hole_filling_filter:
        """Cast to hole filling filter."""
    
    def as_disparity_transform() -> disparity_transform:
        """Cast to disparity transform."""
    
    def as_threshold_filter() -> threshold_filter:
        """Cast to threshold filter."""
    
    def as_align() -> align:
        """Cast to align filter."""
    
    def as_pointcloud() -> pointcloud:
        """Cast to pointcloud filter."""

class frame_source:
    def allocate_video_frame(profile, original, new_bpp=0, new_width=0, 
                           new_height=0, new_stride=0, frame_type=rs.frame_type.video_frame) -> video_frame:
        """
        Allocate new video frame.
        
        Args:
            profile (stream_profile): Stream profile for new frame
            original (frame): Original frame for reference
            new_bpp (int): New bits per pixel (0 for same as original)
            new_width (int): New width (0 for same as original)
            new_height (int): New height (0 for same as original)  
            new_stride (int): New stride (0 for calculated)
            frame_type (frame_type): Type of frame to allocate
            
        Returns:
            video_frame: Allocated frame
        """
    
    def allocate_motion_frame(profile, original, frame_type=rs.frame_type.motion_frame) -> motion_frame:
        """
        Allocate new motion frame.
        
        Args:
            profile (stream_profile): Stream profile for new frame
            original (frame): Original frame for reference
            frame_type (frame_type): Type of frame to allocate
            
        Returns:
            motion_frame: Allocated frame
        """
    
    def allocate_points(profile, original) -> points:
        """
        Allocate new point cloud frame.
        
        Args:
            profile (stream_profile): Stream profile
            original (frame): Original frame for reference
            
        Returns:
            points: Allocated point cloud
        """
    
    def allocate_composite_frame(frames) -> frameset:
        """
        Allocate composite frame from multiple frames.
        
        Args:
            frames (list[frame]): Frames to combine
            
        Returns:
            frameset: Composite frame
        """
    
    def frame_ready(result_frame):
        """
        Signal that frame is ready for output.
        
        Args:
            result_frame (frame): Processed frame to output
        """

Synchronization

Frame synchronization and buffering for multi-stream processing.

class frame_queue:
    def __init__(capacity=1, keep_frames=False):
        """
        Create frame queue for buffering.
        
        Args:
            capacity (int): Maximum number of frames to buffer
            keep_frames (bool): Whether to keep frame references
        """
    
    def enqueue(frame):
        """
        Add frame to queue.
        
        Args:
            frame (frame): Frame to add
        """
    
    def wait_for_frame(timeout_ms=5000) -> frame:
        """
        Wait for next frame from queue.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            frame: Next available frame
            
        Raises:
            rs.error: If timeout expires
        """
    
    def poll_for_frame() -> frame:
        """
        Get next frame without blocking.
        
        Returns:
            frame: Next frame or None if queue empty
        """
    
    def try_wait_for_frame(timeout_ms=5000) -> tuple[bool, frame]:
        """
        Try to get next frame with timeout.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            tuple: (success, frame)
        """
    
    def capacity() -> int:
        """
        Get queue capacity.
        
        Returns:
            int: Maximum queue size
        """
    
    def size() -> int:
        """
        Get current queue size.
        
        Returns:
            int: Number of frames in queue
        """
    
    def keep_frames() -> bool:
        """
        Check if queue keeps frame references.
        
        Returns:
            bool: True if keeping references
        """

class syncer:
    def __init__(queue_size=1):
        """
        Create frame synchronizer.
        
        Args:
            queue_size (int): Internal queue size
        """
    
    def wait_for_frames(timeout_ms=5000) -> frameset:
        """
        Wait for synchronized frameset.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            frameset: Synchronized frames
        """
    
    def wait_for_frame(timeout_ms=5000) -> frameset:
        """
        Alias for wait_for_frames.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            frameset: Synchronized frames
        """
    
    def poll_for_frames() -> frameset:
        """
        Get synchronized frames without blocking.
        
        Returns:
            frameset: Available synchronized frames or None
        """
    
    def poll_for_frame() -> frameset:
        """
        Alias for poll_for_frames.
        
        Returns:  
            frameset: Available synchronized frames or None
        """
    
    def try_wait_for_frames(timeout_ms=5000) -> tuple[bool, frameset]:
        """
        Try to get synchronized frames with timeout.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            tuple: (success, frameset)
        """
    
    def try_wait_for_frame(timeout_ms=5000) -> tuple[bool, frameset]:
        """
        Alias for try_wait_for_frames.
        
        Args:
            timeout_ms (int): Maximum wait time
            
        Returns:
            tuple: (success, frameset)
        """

Built-in Filters

Point Cloud Generation

class pointcloud(filter):
    def __init__(stream=rs.stream.any, index=0):
        """
        Create point cloud generator.
        
        Args:
            stream (stream): Stream to use for texture mapping
            index (int): Stream index
        """
    
    def calculate(depth_frame) -> points:
        """
        Generate point cloud from depth frame.
        
        Args:
            depth_frame (depth_frame): Input depth data
            
        Returns:
            points: Generated point cloud
        """
    
    def map_to(texture_frame):
        """
        Set texture source for point cloud.
        
        Args:
            texture_frame (video_frame): Frame to use for texture coordinates
        """

Frame Alignment

class align(filter):
    def __init__(align_to_stream):
        """
        Create frame alignment filter.
        
        Args:
            align_to_stream (stream): Target stream to align to
        """
    
    def process(frameset) -> frameset:
        """
        Align frames to target stream.
        
        Args:
            frameset (frameset): Input frames to align
            
        Returns:
            frameset: Aligned frames
        """

Depth Colorization

class colorizer(filter):
    def __init__(color_scheme=0):
        """
        Create depth colorizer.
        
        Args:
            color_scheme (int): Color scheme (0-8)
                0: Jet (blue-red)
                1: Classic (grayscale)
                2: WhiteToBlack
                3: BlackToWhite  
                4: Bio (green-based)
                5: Cold (blue-based)
                6: Warm (red-based)
                7: Quantized
                8: Pattern
        """
    
    def colorize(depth_frame) -> video_frame:
        """
        Colorize depth frame.
        
        Args:
            depth_frame (depth_frame): Input depth data
            
        Returns:
            video_frame: Colorized depth image
        """

Noise Reduction Filters

class decimation_filter(filter):
    def __init__(magnitude=2.0):
        """
        Create decimation filter to reduce resolution.
        
        Args:
            magnitude (float): Decimation factor (1.0-8.0)
        """

class temporal_filter(filter):
    def __init__(smooth_alpha=0.4, smooth_delta=20.0, persistence_control=3):
        """
        Create temporal noise reduction filter.
        
        Args:
            smooth_alpha (float): Alpha factor for smoothing (0.0-1.0)
            smooth_delta (float): Delta threshold for edge-preserving
            persistence_control (int): Persistence control (0-8)
        """

class spatial_filter(filter):
    def __init__(smooth_alpha=0.5, smooth_delta=20.0, magnitude=2.0, hole_fill=0.0):
        """
        Create spatial noise reduction filter.
        
        Args:
            smooth_alpha (float): Alpha factor for smoothing (0.0-1.0)
            smooth_delta (float): Delta threshold for edge-preserving
            magnitude (float): Effect magnitude (1.0-5.0)
            hole_fill (float): Hole filling factor (0.0-5.0)
        """

class hole_filling_filter(filter):
    def __init__(mode=0):
        """
        Create hole filling filter.
        
        Args:
            mode (int): Filling mode
                0: fill_from_left
                1: farest_from_around
                2: nearest_from_around
        """

Format Conversion Filters

class disparity_transform(filter):
    def __init__(transform_to_disparity=True):
        """
        Create depth/disparity transform filter.
        
        Args:
            transform_to_disparity (bool): True for depth->disparity, False for disparity->depth
        """

class threshold_filter(filter):
    def __init__(min_dist=0.15, max_dist=4.0):
        """
        Create depth range threshold filter.
        
        Args:
            min_dist (float): Minimum distance in meters
            max_dist (float): Maximum distance in meters
        """

class units_transform(filter):
    def __init__():
        """Create depth units transformation filter."""

class yuy_decoder(filter):
    def __init__():
        """Create YUY format decoder filter."""

class rotation_filter(filter):
    def __init__(streams=[]):
        """
        Create frame rotation filter.
        
        Args:
            streams (list): Streams to apply rotation to
        """

class hdr_merge(filter):
    def __init__():
        """Create HDR frame merging filter."""

class sequence_id_filter(filter):
    def __init__(sequence_id=0.0):
        """
        Create sequence ID filter.
        
        Args:
            sequence_id (float): Sequence identifier
        """

Usage Examples

Basic Filter Chain

import pyrealsense2 as rs
import numpy as np

# Create pipeline
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

pipeline.start(config)

# Create filter chain
decimation = rs.decimation_filter(2.0)
spatial = rs.spatial_filter()
temporal = rs.temporal_filter()
hole_filling = rs.hole_filling_filter()
colorizer = rs.colorizer()

try:
    for i in range(100):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        
        if not depth_frame:
            continue
            
        # Apply filter chain
        filtered = decimation.process(depth_frame)
        filtered = spatial.process(filtered)
        filtered = temporal.process(filtered)
        filtered = hole_filling.process(filtered)
        
        # Convert to depth frame for distance queries
        depth_filtered = filtered.as_depth_frame()
        
        # Get center distance
        width = depth_filtered.get_width()
        height = depth_filtered.get_height()
        center_dist = depth_filtered.get_distance(width // 2, height // 2)
        
        # Colorize for visualization
        colorized = colorizer.process(filtered)
        color_frame = colorized.as_video_frame()
        
        print(f"Frame {i}: center distance = {center_dist:.3f}m, "
              f"filtered size = {width}x{height}")
        
finally:
    pipeline.stop()

Point Cloud Generation

import pyrealsense2 as rs
import numpy as np

# Configure streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

pipeline.start(config)

# Create point cloud generator
pc = rs.pointcloud()
points = rs.points()

try:
    # Wait for frames with both color and depth
    for i in range(10):
        frames = pipeline.wait_for_frames()
        
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()
        
        if not depth_frame or not color_frame:
            continue
            
        # Map color texture to point cloud
        pc.map_to(color_frame)
        
        # Generate point cloud
        points = pc.calculate(depth_frame)
        
        print(f"Frame {i}: Generated {points.size()} points")
        
        # Get vertices as numpy array
        vertices = np.asanyarray(points.get_vertices())
        print(f"  Vertex data shape: {vertices.shape}")
        
        # Get texture coordinates
        tex_coords = np.asanyarray(points.get_texture_coordinates())
        print(f"  Texture coordinate shape: {tex_coords.shape}")
        
        # Export first point cloud to PLY file
        if i == 0:
            points.export_to_ply("pointcloud.ply", color_frame)
            print("  Exported to pointcloud.ply")
        
finally:
    pipeline.stop()

Frame Alignment

import pyrealsense2 as rs
import numpy as np

# Configure streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

profile = pipeline.start(config)

# Create alignment filter to align depth to color
align_to_color = rs.align(rs.stream.color)

try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        
        # Align frames
        aligned_frames = align_to_color.process(frames)
        
        # Get aligned frames
        aligned_depth = aligned_frames.get_depth_frame()
        color_frame = aligned_frames.get_color_frame()
        
        if not aligned_depth or not color_frame:
            continue
            
        # Convert to numpy arrays
        depth_image = np.asanyarray(aligned_depth.get_data())
        color_image = np.asanyarray(color_frame.get_data())
        
        print(f"Frame {i}:")
        print(f"  Aligned depth: {depth_image.shape}")
        print(f"  Color: {color_image.shape}")
        
        # Now depth and color are pixel-aligned
        # Can directly correlate depth and color at same coordinates
        center_x, center_y = 320, 240
        depth_at_center = aligned_depth.get_distance(center_x, center_y)
        color_at_center = color_image[center_y, center_x]
        
        print(f"  Center pixel: depth={depth_at_center:.3f}m, "
              f"color={color_at_center}")
        
finally:
    pipeline.stop()

Advanced Filter Configuration

import pyrealsense2 as rs

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

pipeline.start(config)

# Create and configure filters
decimation = rs.decimation_filter()
decimation.set_option(rs.option.filter_magnitude, 3.0)

temporal = rs.temporal_filter()
temporal.set_option(rs.option.filter_smooth_alpha, 0.3)
temporal.set_option(rs.option.filter_smooth_delta, 30.0)
temporal.set_option(rs.option.holes_fill, 2)

spatial = rs.spatial_filter()
spatial.set_option(rs.option.filter_smooth_alpha, 0.6)
spatial.set_option(rs.option.filter_smooth_delta, 25.0)
spatial.set_option(rs.option.filter_magnitude, 3.0)
spatial.set_option(rs.option.holes_fill, 2.0)

hole_filling = rs.hole_filling_filter()
hole_filling.set_option(rs.option.holes_fill, 2)  # nearest_from_around

threshold = rs.threshold_filter()
threshold.set_option(rs.option.min_distance, 0.2)
threshold.set_option(rs.option.max_distance, 3.0)

colorizer = rs.colorizer()
colorizer.set_option(rs.option.color_scheme, 2)  # WhiteToBlack

print("Filter configurations:")
print(f"  Decimation magnitude: {decimation.get_option(rs.option.filter_magnitude)}")
print(f"  Temporal alpha: {temporal.get_option(rs.option.filter_smooth_alpha)}")
print(f"  Spatial magnitude: {spatial.get_option(rs.option.filter_magnitude)}")
print(f"  Threshold range: {threshold.get_option(rs.option.min_distance):.1f} - "
      f"{threshold.get_option(rs.option.max_distance):.1f}m")

try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        
        if not depth_frame:
            continue
            
        # Apply comprehensive filter chain
        filtered = decimation.process(depth_frame)
        filtered = threshold.process(filtered)
        filtered = spatial.process(filtered)
        filtered = temporal.process(filtered)
        filtered = hole_filling.process(filtered)
        
        # Colorize result
        colorized = colorizer.process(filtered)
        
        # Get frame info
        depth_filtered = filtered.as_depth_frame()
        color_frame = colorized.as_video_frame()
        
        print(f"Frame {i}: {depth_filtered.get_width()}x{depth_filtered.get_height()} -> "
              f"{color_frame.get_width()}x{color_frame.get_height()}")
        
finally:
    pipeline.stop()

Custom Processing Block

import pyrealsense2 as rs
import numpy as np

def custom_processing_function(frame):
    """Custom processing function that inverts depth values."""
    if frame.is_depth_frame():
        depth_frame = frame.as_depth_frame()
        
        # Get frame data as numpy array
        depth_data = np.asanyarray(depth_frame.get_data())
        
        # Invert depth values (example processing)
        max_depth = depth_data.max()
        if max_depth > 0:
            inverted_data = max_depth - depth_data
            
            # Create new frame with processed data
            # (This is simplified - real implementation would need proper frame allocation)
            print(f"Processed depth frame: inverted {depth_data.shape} depth data")
        
    return frame

# Create custom processing block
custom_processor = rs.processing_block(custom_processing_function)

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

pipeline.start(config)

# Use custom processor with callback
def processed_frame_callback(processed_frame):
    if processed_frame.is_depth_frame():
        depth = processed_frame.as_depth_frame()
        print(f"Received processed frame #{depth.get_frame_number()}")

custom_processor.start(processed_frame_callback)

try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        
        if depth_frame:
            # Send frame to custom processor
            custom_processor.invoke(depth_frame)
            
finally:
    pipeline.stop()

Frame Queue Processing

import pyrealsense2 as rs
import threading
import time

# Create frame queue for buffering
frame_queue = rs.frame_queue(capacity=10, keep_frames=True)

def processing_thread():
    """Background thread for frame processing."""
    processed_count = 0
    
    while True:
        try:
            # Wait for frame with timeout
            frame = frame_queue.wait_for_frame(timeout_ms=1000)
            
            if frame.is_depth_frame():
                depth_frame = frame.as_depth_frame()
                
                # Simulate processing time
                time.sleep(0.01)
                
                processed_count += 1
                print(f"Processed frame #{depth_frame.get_frame_number()} "
                      f"(total: {processed_count})")
                
        except rs.error:
            # Timeout or other error
            break

# Start processing thread
processor = threading.Thread(target=processing_thread)
processor.daemon = True
processor.start()

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

pipeline.start(config)

try:
    # Stream frames into queue
    for i in range(100):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        
        if depth_frame:
            frame_queue.enqueue(depth_frame)
            
            # Print queue status
            if i % 10 == 0:
                print(f"Queue size: {frame_queue.size()}/{frame_queue.capacity()}")
        
        time.sleep(0.03)  # 30fps
        
finally:
    pipeline.stop()
    processor.join(timeout=1.0)

Filter Chain Recommendations

Standard Post-Processing Chain

# Recommended filter order for general use
filters = [
    rs.decimation_filter(2.0),      # Reduce resolution for performance
    rs.threshold_filter(0.15, 4.0), # Remove very close/far points
    rs.spatial_filter(),             # Spatial noise reduction
    rs.temporal_filter(),            # Temporal noise reduction  
    rs.hole_filling_filter()         # Fill holes in depth data
]

# Apply filters in sequence
def apply_filters(frame, filters):
    for filter in filters:
        frame = filter.process(frame)
    return frame

Performance-Oriented Chain

# Optimized for speed
filters = [
    rs.decimation_filter(4.0),       # Aggressive decimation
    rs.threshold_filter(0.2, 2.0),   # Narrow range
    rs.hole_filling_filter(0)        # Simple hole filling
]

Quality-Oriented Chain

# Optimized for quality
filters = [
    rs.decimation_filter(1.0),       # No decimation
    rs.spatial_filter(0.8, 15.0, 5.0, 5.0),  # Strong spatial filtering
    rs.temporal_filter(0.2, 50.0, 5),        # Strong temporal filtering
    rs.hole_filling_filter(2)                # Best hole filling
]

Install with Tessl CLI

npx tessl i tessl/pypi-pyrealsense2

docs

advanced-mode.md

context-device.md

frames.md

index.md

logging.md

options.md

pipeline.md

pointclouds.md

processing.md

recording.md

sensors.md

tile.json