CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pyrealsense2

Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.

Pending
Overview
Eval results
Files

docs/pipeline.md

Pipeline API

High-level streaming interface that simplifies RealSense camera usage by providing automatic device selection, stream configuration, and synchronized frame delivery. The Pipeline API is the recommended entry point for most applications.

Capabilities

Pipeline

Main streaming interface that manages device lifecycle and provides coherent framesets.

class pipeline:
    """
    Main streaming interface that manages device lifecycle and provides coherent framesets.

    NOTE(review): this is a signature listing, not an implementation. `start`
    appears once per documented call form; read as plain Python, each later
    `def start` would rebind the name, so these are presumably overloads
    resolved by the native binding - confirm against the pyrealsense2
    reference. The signatures also omit `self`, presumably as documentation
    shorthand.
    """
    def __init__(context=None):
        """
        Create a pipeline for streaming.

        Args:
            context (context, optional): Custom context for device management.
                Defaults to None.
        """

    def start() -> pipeline_profile:
        """
        Start streaming with the default configuration.

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def start(config) -> pipeline_profile:
        """
        Start streaming with the specified configuration.

        Args:
            config (config): Stream configuration

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def start(callback_function) -> pipeline_profile:
        """
        Start streaming with the default configuration and a frame callback.

        Args:
            callback_function: Function called for each frameset

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def start(config, callback_function) -> pipeline_profile:
        """
        Start streaming with the specified configuration and a frame callback.

        Args:
            config (config): Stream configuration
            callback_function: Function called for each frameset

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def start(frame_queue) -> pipeline_profile:
        """
        Start streaming into a frame queue.

        Args:
            frame_queue (frame_queue): Queue for frame buffering

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def start(config, frame_queue) -> pipeline_profile:
        """
        Start streaming with the specified configuration into a frame queue.

        Args:
            config (config): Stream configuration
            frame_queue (frame_queue): Queue for frame buffering

        Returns:
            pipeline_profile: Active pipeline configuration
        """

    def stop():
        """Stop streaming and release resources."""

    def wait_for_frames(timeout_ms=5000) -> frameset:
        """
        Wait for the next set of coherent frames.

        Args:
            timeout_ms (int): Maximum wait time in milliseconds (default 5000)

        Returns:
            frameset: Synchronized frames from all active streams

        Raises:
            rs.error: If timeout expires or device error occurs
        """

    def poll_for_frames() -> frameset:
        """
        Try to get the next frameset without blocking.

        Returns:
            frameset: Available frames or None if no frames ready
        """

    def try_wait_for_frames(timeout_ms=5000) -> tuple[bool, frameset]:
        """
        Try to wait for frames with a timeout.

        Args:
            timeout_ms (int): Maximum wait time in milliseconds (default 5000)

        Returns:
            tuple: (success, frameset) - success indicates if frames were obtained
        """

    def get_active_profile() -> pipeline_profile:
        """
        Get the currently active pipeline configuration.

        Returns:
            pipeline_profile: Current configuration
        """

    def set_device(device):
        """
        Set a specific device for the pipeline to use.

        Args:
            device (device): Device to use for streaming
        """

Configuration

Stream and device configuration for pipeline.

class config:
    """
    Stream and device configuration for pipeline.

    NOTE(review): this is a signature listing, not an implementation.
    `enable_stream` appears once per documented call form; read as plain
    Python, each later `def enable_stream` would rebind the name, so these
    are presumably overloads resolved by the native binding - confirm
    against the pyrealsense2 reference.
    """
    def enable_stream(stream_type):
        """
        Enable a stream with default parameters.

        Args:
            stream_type (stream): Stream type to enable
        """

    def enable_stream(stream_type, stream_index):
        """
        Enable a specific stream instance.

        Args:
            stream_type (stream): Stream type
            stream_index (int): Stream index for multi-instance streams
        """

    def enable_stream(stream_type, format, framerate):
        """
        Enable a stream with format and framerate.

        Args:
            stream_type (stream): Stream type
            format (format): Data format
            framerate (int): Frames per second
        """

    def enable_stream(stream_type, width, height, format, framerate):
        """
        Enable a video stream with full parameters.

        Args:
            stream_type (stream): Stream type
            width (int): Frame width in pixels
            height (int): Frame height in pixels
            format (format): Data format
            framerate (int): Frames per second
        """

    def enable_stream(stream_type, stream_index, width, height, format, framerate):
        """
        Enable a specific video stream instance with full parameters.

        Args:
            stream_type (stream): Stream type
            stream_index (int): Stream index for multi-instance streams
            width (int): Frame width in pixels
            height (int): Frame height in pixels
            format (format): Data format
            framerate (int): Frames per second
        """

    def enable_all_streams():
        """Enable all available streams with default parameters."""

    def disable_stream(stream_type, index=-1):
        """
        Disable a specific stream.

        Args:
            stream_type (stream): Stream type to disable
            index (int): Stream index (default -1, meaning all instances)
        """

    def disable_all_streams():
        """Disable all previously enabled streams."""

    def enable_device(serial_number):
        """
        Specify the device to use by serial number.

        Args:
            serial_number (str): Device serial number
        """

    def enable_device_from_file(filename, repeat_playback=True):
        """
        Enable playback from a recorded file.

        Args:
            filename (str): Path to recorded .bag file
            repeat_playback (bool): Loop playback when file ends (default True)
        """

    def enable_record_to_file(filename):
        """
        Enable recording to a file.

        Args:
            filename (str): Output .bag file path
        """

    def resolve(pipeline) -> pipeline_profile:
        """
        Resolve the configuration against a pipeline.

        Args:
            pipeline (pipeline): Pipeline to resolve against

        Returns:
            pipeline_profile: Resolved configuration

        Raises:
            rs.error: If configuration cannot be resolved
        """

    def can_resolve(pipeline) -> bool:
        """
        Check whether the configuration can be resolved.

        Args:
            pipeline (pipeline): Pipeline to check against

        Returns:
            bool: True if configuration is valid
        """

Pipeline Profile

Active pipeline configuration and device information.

class pipeline_profile:
    """
    Active pipeline configuration and device information.

    NOTE(review): this is a signature listing, not an implementation; the
    signatures omit `self`, presumably as documentation shorthand.
    """
    def get_streams() -> list[stream_profile]:
        """
        Get all active stream profiles.

        Returns:
            list[stream_profile]: Active stream configurations
        """

    def get_stream(stream_type, stream_index=-1) -> stream_profile:
        """
        Get a specific stream profile.

        Args:
            stream_type (stream): Stream type
            stream_index (int): Stream index (default -1, meaning any)

        Returns:
            stream_profile: Stream configuration

        Raises:
            rs.error: If stream not found
        """

    def get_device() -> device:
        """
        Get the device used by this pipeline.

        Returns:
            device: Associated device
        """

Usage Examples

Basic Depth Streaming

import pyrealsense2 as rs
import numpy as np

FRAME_BUDGET = 100  # number of framesets to pull before shutting down

# Request a single 640x480 z16 depth stream at 30 frames per second
depth_config = rs.config()
depth_config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

# Bring the pipeline up with that configuration
camera = rs.pipeline()
profile = camera.start(depth_config)

try:
    for _ in range(FRAME_BUDGET):
        depth_frame = camera.wait_for_frames().get_depth_frame()

        # Only framesets that actually carry a depth frame are reported
        if depth_frame:
            # Expose the frame buffer as a numpy array
            depth_data = np.asanyarray(depth_frame.get_data())

            print(f"Frame {depth_frame.get_frame_number()}: "
                  f"Mean depth = {np.mean(depth_data)} units")
finally:
    # Stop streaming whether or not the loop completed
    camera.stop()

Multi-Stream Configuration

import pyrealsense2 as rs

STREAM_SIZE = (640, 480)  # width, height shared by every requested stream
STREAM_FPS = 30

streamer = rs.pipeline()
stream_config = rs.config()

# Depth, color, and infrared streams 1 and 2 at one resolution and frame rate
stream_config.enable_stream(rs.stream.depth, *STREAM_SIZE, rs.format.z16, STREAM_FPS)
stream_config.enable_stream(rs.stream.color, *STREAM_SIZE, rs.format.bgr8, STREAM_FPS)
for ir_index in (1, 2):
    stream_config.enable_stream(
        rs.stream.infrared, ir_index, *STREAM_SIZE, rs.format.y8, STREAM_FPS
    )

active_profile = streamer.start(stream_config)

# Report which device the pipeline ended up using
device = active_profile.get_device()
print(f"Using device: {device.get_info(rs.camera_info.name)}")

try:
    while True:
        frames = streamer.wait_for_frames()

        requested = (
            frames.get_depth_frame(),
            frames.get_color_frame(),
            frames.get_infrared_frame(1),
            frames.get_infrared_frame(2),
        )

        # Announce only framesets in which every requested stream is present
        if all(requested):
            print(f"Got complete frameset at {frames.get_timestamp()}")
finally:
    streamer.stop()

Asynchronous Processing with Callback

import threading
import time  # required by time.sleep() below; the original snippet omitted it

import pyrealsense2 as rs

frame_count = 0
frame_lock = threading.Lock()


def frame_callback(frames):
    """
    Count delivered frames and log progress once every 30 callbacks.

    Args:
        frames: Frame object handed to the callback by the pipeline.
            NOTE(review): the binding is understood to deliver a generic
            frame here, so it is converted with `as_frameset()` before the
            depth accessor is used - confirm against the pyrealsense2
            reference.
    """
    global frame_count
    # The counter is updated under the lock because the callback is not
    # invoked from the main thread's control flow.
    with frame_lock:
        frame_count += 1
        if frame_count % 30 == 0:  # Print every second at 30fps
            frameset = frames.as_frameset() if frames.is_frameset() else None
            depth = frameset.get_depth_frame() if frameset else None
            if depth:
                print(f"Processed {frame_count} framesets")


pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

# Start with callback
pipeline.start(config, frame_callback)

try:
    # Let it run for 10 seconds
    time.sleep(10)
finally:
    pipeline.stop()

Recording Session

import pyrealsense2 as rs

FRAMES_TO_RECORD = 300  # Record 10 seconds at 30fps

recorder = rs.pipeline()
record_config = rs.config()

# Streams to capture, plus the file they are recorded to
record_config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
record_config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
record_config.enable_record_to_file("session.bag")

# Starting the pipeline begins the recording
recorder.start(record_config)

try:
    for i in range(FRAMES_TO_RECORD):
        # The frameset itself is not inspected; waiting for it paces the loop
        frames = recorder.wait_for_frames()
        print(f"Recording frame {i}")
finally:
    # Stop before reporting, so the message is printed after streaming ends
    recorder.stop()
    print("Recording saved to session.bag")

Playback from File

import pyrealsense2 as rs

# Configure playback; repeat_playback=False makes the file play only once
config = rs.config()
config.enable_device_from_file("session.bag", repeat_playback=False)

pipeline = rs.pipeline()
profile = pipeline.start(config)

# Get playback device
device = profile.get_device()
playback = device.as_playback()

print(f"Playing back {playback.file_name()}")
# The Python binding is understood to return the duration as a
# datetime.timedelta, which cannot be formatted with ":.1f"; convert it to
# seconds explicitly instead of dividing a raw nanosecond count.
print(f"Duration: {playback.get_duration().total_seconds():.1f} seconds")

try:
    while True:
        frames = pipeline.wait_for_frames()
        depth = frames.get_depth_frame()

        if depth:
            # The timestamp is divided by 1000 to print it in seconds
            timestamp = frames.get_timestamp()
            print(f"Playback timestamp: {timestamp / 1000:.3f}s")

# NOTE(review): confirm that the binding exposes `rs.error`; end-of-file
# timeouts from wait_for_frames() are commonly reported as RuntimeError.
except rs.error:
    print("Playback finished")
finally:
    pipeline.stop()

Common Patterns

Error Handling

import pyrealsense2 as rs

pipe = rs.pipeline()
cfg = rs.config()

try:
    # Ask for a depth mode the connected device may be unable to deliver
    cfg.enable_stream(rs.stream.depth, 1920, 1080, rs.format.z16, 60)  # May not be supported

    # Start with the request only when it resolves; otherwise use defaults
    if not cfg.can_resolve(pipe):
        print("Configuration not supported, using defaults")
        profile = pipe.start()
    else:
        profile = pipe.start(cfg)

except rs.error as e:
    print(f"RealSense error: {e}")
    # Fallback: drop every requested stream and ask for default depth only
    cfg.disable_all_streams()
    cfg.enable_stream(rs.stream.depth)  # Use defaults
    profile = pipe.start(cfg)

Device Selection

import pyrealsense2 as rs

# Discover available devices
ctx = rs.context()
devices = ctx.query_devices()

if len(devices) == 0:
    print("No RealSense devices found")
    # `exit()` is a convenience helper injected by the `site` module and is
    # not guaranteed to exist (e.g. under `python -S`); raising SystemExit
    # ends the script the same way with exit status 0.
    raise SystemExit

# Use the first discovered device, identified by its serial number
device = devices[0]
serial = device.get_info(rs.camera_info.serial_number)

# Bind the configuration to that device and request a default depth stream
config = rs.config()
config.enable_device(serial)
config.enable_stream(rs.stream.depth)

pipeline = rs.pipeline()
pipeline.start(config)

Install with Tessl CLI

npx tessl i tessl/pypi-pyrealsense2

docs

advanced-mode.md

context-device.md

frames.md

index.md

logging.md

options.md

pipeline.md

pointclouds.md

processing.md

recording.md

sensors.md

tile.json