CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyrealsense2

Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.

Pending
Overview
Eval results
Files

context-device.mddocs/

Context & Device Management

Device discovery, enumeration, and management capabilities for fine-grained control over RealSense devices. Provides low-level access to device features, hotplug detection, and multi-device scenarios.

Capabilities

Context

Central hub for device discovery and management.

class context:
    """
    Central hub for device discovery and management.

    Signatures in this listing are written the way they are called on an
    instance: the implicit ``self`` is omitted everywhere except on the two
    properties.
    """
    def __init__(json_settings=None):
        """
        Create a context for device management.
        
        Args:
            json_settings (str, optional): JSON configuration string
        """
    
    # Overload 1 of 2: no filter, every connected device is returned.
    def query_devices() -> device_list:
        """
        Query all connected RealSense devices.
        
        Returns:
            device_list: Collection of available devices
        """
    
    # Overload 2 of 2: same name, restricted to a single product line.
    def query_devices(product_line) -> device_list:
        """
        Query devices from specific product line.
        
        Args:
            product_line (product_line): Device family filter
            
        Returns:
            device_list: Filtered device collection
        """
    
    def query_all_sensors() -> list[sensor]:
        """
        Get all sensors from all devices.
        
        Returns:
            list[sensor]: All available sensors
        """
    
    # Property form of the device query (documented as "all connected devices").
    @property
    def devices(self) -> device_list:
        """All connected devices."""
    
    # Property form of the sensor query (documented as "all sensors from all devices").
    @property  
    def sensors(self) -> list[sensor]:
        """All sensors from all devices."""
    
    def load_device(filename) -> device:
        """
        Load recorded device from file.
        
        Args:
            filename (str): Path to .bag file
            
        Returns:
            device: Playback device
        """
    
    def unload_device(filename):
        """
        Unload previously loaded device.
        
        Args:
            filename (str): Path to .bag file
        """
    
    def unload_tracking_module():
        """Unload tracking module if loaded."""
    
    def get_sensor_parent(sensor) -> device:
        """
        Get device that owns a sensor.
        
        Args:
            sensor (sensor): Sensor instance
            
        Returns:
            device: Parent device
        """
    
    def set_devices_changed_callback(callback_function):
        """
        Set callback for device connect/disconnect events.
        
        Args:
            callback_function: Called when devices change. The hotplug
                example on this page passes a one-argument function and
                calls ``was_added``, ``was_removed`` and ``get_new_devices``
                on that argument.
        """

Device List

Collection of discovered devices.

class device_list:
    """
    Collection of discovered devices.

    Supports ``len()``, integer indexing and iteration; ``size()`` is
    documented with the same meaning as ``len()``.
    """
    def __len__() -> int:
        """Number of devices in list."""
    
    def __getitem__(index) -> device:
        """Get device by index."""
    
    def __iter__():
        """Iterate over devices."""
    
    # Method spelling of __len__; both are documented as the device count.
    def size() -> int: 
        """Number of devices in list."""
    
    def contains(device) -> bool:
        """
        Check if device is in list.
        
        Args:
            device (device): Device to check
            
        Returns:
            bool: True if device is present
        """
    
    # NOTE(review): behaviour of front()/back() on an empty list is not
    # documented here; confirm before calling them unguarded.
    def front() -> device:
        """Get first device."""
    
    def back() -> device:
        """Get last device."""

Base Device

Core device interface with common functionality.

class device:
    """
    Core device interface with common functionality.

    Groups sensor lookup, information queries, reset/connection state, and
    the ``is_*`` / ``as_*`` pairs used to reach the specialized device types.
    Signatures omit the implicit ``self`` except on the property.
    """
    def query_sensors() -> list[sensor]:
        """
        Get all sensors on this device.
        
        Returns:
            list[sensor]: Device sensors
        """
    
    @property
    def sensors(self) -> list[sensor]:
        """All sensors on this device."""
    
    def first_depth_sensor() -> depth_sensor:
        """
        Get first depth sensor.
        
        Returns:
            depth_sensor: First depth sensor found
            
        Raises:
            rs.error: If no depth sensor available
        """
    
    # NOTE(review): only first_depth_sensor() documents a Raises section. The
    # capabilities example on this page wraps the other first_*_sensor() calls
    # in try/except as well, so they presumably fail the same way when no such
    # sensor exists; confirm against the binding.
    def first_roi_sensor() -> roi_sensor:
        """
        Get first ROI sensor.
        
        Returns:
            roi_sensor: First ROI sensor found
        """
    
    def first_pose_sensor() -> pose_sensor:
        """
        Get first pose sensor.
        
        Returns:
            pose_sensor: First pose sensor found
        """
    
    def first_color_sensor() -> color_sensor:
        """
        Get first color sensor.
        
        Returns:
            color_sensor: First color sensor found  
        """
    
    def first_motion_sensor() -> motion_sensor:
        """
        Get first motion sensor.
        
        Returns:
            motion_sensor: First motion sensor found
        """
    
    def first_fisheye_sensor() -> fisheye_sensor:
        """
        Get first fisheye sensor.
        
        Returns:
            fisheye_sensor: First fisheye sensor found
        """
    
    # supports() is the guard for get_info(): get_info() is documented to
    # raise when the requested field is not available.
    def supports(camera_info) -> bool:
        """
        Check if device supports specific information field.
        
        Args:
            camera_info (camera_info): Information field to check
            
        Returns:
            bool: True if information is available
        """
    
    def get_info(camera_info) -> str:
        """
        Get device information string.
        
        Args:
            camera_info (camera_info): Information field to retrieve
            
        Returns:
            str: Information value
            
        Raises:
            rs.error: If information not supported
        """
    
    def hardware_reset():
        """Perform hardware reset of device."""
    
    def is_connected() -> bool:
        """
        Check if device is currently connected.
        
        Returns:
            bool: True if device is connected
        """
    
    def is_metadata_enabled() -> bool:
        """
        Check if metadata is enabled for frames.
        
        Returns:
            bool: True if metadata is enabled
        """
    
    # Type checking methods
    # Each is_X() below has a matching as_X() cast further down.
    def is_updatable() -> bool:
        """Check if device supports firmware updates."""
    
    def is_update_device() -> bool:
        """Check if device is in update mode."""
    
    def is_auto_calibrated_device() -> bool:
        """Check if device supports auto-calibration."""
    
    def is_playback() -> bool:
        """Check if device is a playback device."""
    
    def is_recorder() -> bool:
        """Check if device is a recorder."""
        
    # Type casting methods  
    # The firmware example on this page calls is_updatable() before
    # as_updatable(); follow the same check-then-cast order for the others.
    def as_updatable() -> updatable:
        """Cast to updatable device."""
    
    def as_update_device() -> update_device:
        """Cast to update device."""
    
    def as_auto_calibrated_device() -> auto_calibrated_device:
        """Cast to auto-calibrated device."""
    
    def as_playback() -> playback:
        """Cast to playback device."""
    
    def as_recorder() -> recorder:
        """Cast to recorder device."""

Specialized Device Types

Updatable Device

class updatable(device):
    """
    Device whose firmware can be backed up and updated.

    Reached from a base ``device`` via ``is_updatable()`` / ``as_updatable()``.
    Methods that appear twice are overloads: the second form adds a
    ``progress_callback`` argument.
    """
    def enter_update_state():
        """Enter firmware update mode."""
    
    # Overload 1 of 2: no progress reporting.
    def create_flash_backup() -> bytes:
        """
        Create backup of device firmware.
        
        Returns:
            bytes: Firmware backup data
        """
    
    # Overload 2 of 2: reports progress through the callback.
    def create_flash_backup(progress_callback) -> bytes:
        """
        Create firmware backup with progress callback.
        
        Args:
            progress_callback: Function called with progress updates
            
        Returns:
            bytes: Firmware backup data
        """
    
    # Overload 1 of 2: no progress reporting.
    def update_unsigned(fw_image, update_mode=rs.update_mode.UPDATE):
        """
        Update firmware with unsigned image.
        
        Args:
            fw_image (bytes): Firmware image data
            update_mode (update_mode): Update mode flags
                (defaults to ``rs.update_mode.UPDATE``)
        """
    
    # Overload 2 of 2: reports progress through the callback.
    def update_unsigned(fw_image, progress_callback, update_mode=rs.update_mode.UPDATE):
        """
        Update firmware with progress callback.
        
        Args:
            fw_image (bytes): Firmware image data
            progress_callback: Function called with progress updates
            update_mode (update_mode): Update mode flags
                (defaults to ``rs.update_mode.UPDATE``)
        """
    
    def check_firmware_compatibility(fw_image) -> bool:
        """
        Check if firmware image is compatible.
        
        Args:
            fw_image (bytes): Firmware image to check
            
        Returns:
            bool: True if compatible
        """

Update Device

class update_device(device):
    """
    Device that is in update mode and can receive a firmware image.

    Reached from a base ``device`` via ``is_update_device()`` /
    ``as_update_device()``. ``update`` is listed twice because it is
    overloaded with and without a progress callback.
    """
    # Overload 1 of 2: no progress reporting.
    def update(fw_image):
        """
        Apply firmware update.
        
        Args:
            fw_image (bytes): Firmware image data
        """
    
    # Overload 2 of 2: reports progress through the callback.
    def update(fw_image, progress_callback):
        """
        Apply firmware update with progress.
        
        Args:
            fw_image (bytes): Firmware image data  
            progress_callback: Function called with progress updates
        """

Auto-Calibrated Device

class auto_calibrated_device(device):
    """
    Device that supports auto-calibration.

    Reached from a base ``device`` via ``is_auto_calibrated_device()`` /
    ``as_auto_calibrated_device()``. Every calibration call returns a pair
    of (calibration table, pair of floats); the meaning of the two floats
    differs per method and is given in each Returns section.
    """
    def write_calibration():
        """Write calibration data to device."""
    
    # Overload 1 of 2: no progress reporting.
    def run_on_chip_calibration(json_content, timeout_ms) -> tuple[bytes, tuple[float, float]]:
        """
        Run on-chip calibration.
        
        Args:
            json_content (str): Calibration configuration JSON
            timeout_ms (int): Operation timeout
            
        Returns:
            tuple: (calibration_table, (health, accuracy))
        """
    
    # Overload 2 of 2: reports progress through the callback.
    def run_on_chip_calibration(json_content, progress_callback, timeout_ms) -> tuple[bytes, tuple[float, float]]:
        """
        Run on-chip calibration with progress.
        
        Args:
            json_content (str): Calibration configuration JSON
            progress_callback: Function called with progress updates
            timeout_ms (int): Operation timeout
            
        Returns:
            tuple: (calibration_table, (health, accuracy))
        """
    
    def run_tare_calibration(ground_truth_mm, json_content, timeout_ms) -> tuple[bytes, tuple[float, float]]:
        """
        Run tare calibration.
        
        Args:
            ground_truth_mm (float): Ground truth distance in mm
            json_content (str): Calibration configuration JSON
            timeout_ms (int): Operation timeout
            
        Returns:
            tuple: (calibration_table, (health1, health2))
        """
    
    def process_calibration_frame(frame, timeout_ms) -> tuple[bytes, tuple[float, float]]:
        """
        Process single calibration frame.
        
        Args:
            frame (frame): Calibration frame
            timeout_ms (int): Operation timeout
            
        Returns:
            tuple: (calibration_table, (health1, health2))
        """

Device Calibration

Device with calibration trigger capabilities.

class device_calibration(device):
    """Device with calibration trigger capabilities."""
    def trigger_device_calibration(calibration_type) -> float:
        """
        Trigger device calibration process.
        
        Args:
            calibration_type (calibration_type): Type of calibration to perform
            
        Returns:
            float: Calibration result value
        """
    
    # Same method name as calibration_change_device.register_calibration_change_callback.
    def register_calibration_change_callback(callback_function):
        """
        Register callback for calibration changes.
        
        Args:
            callback_function: Function called when calibration changes
        """

Calibration Change Device

Device that can notify about calibration changes.

class calibration_change_device(device):
    """Device that can notify about calibration changes."""
    def register_calibration_change_callback(callback_function):
        """
        Register callback for calibration state changes.
        
        Args:
            callback_function: Function called when calibration state changes
        """

Debug Protocol

Low-level device communication interface.

class debug_protocol:
    """
    Low-level device communication interface.

    The two methods are meant to be used together: ``build_command``
    produces a packet and ``send_and_receive_raw_data`` transmits one.
    """
    def build_command(opcode, param1=0, param2=0, param3=0, param4=0, data=None) -> bytes:
        """
        Build debug command packet.
        
        Args:
            opcode (int): Operation code
            param1 (int): First command parameter (defaults to 0)
            param2 (int): Second command parameter (defaults to 0)
            param3 (int): Third command parameter (defaults to 0)
            param4 (int): Fourth command parameter (defaults to 0)
            data (bytes, optional): Additional data payload
            
        Returns:
            bytes: Command packet
        """
    
    def send_and_receive_raw_data(command) -> tuple[int, bytes]:
        """
        Send raw command and receive response.
        
        Args:
            command (bytes): Command packet
            
        Returns:
            tuple: (status_code, response_data)
        """

Event Information

Device hotplug event data.

class event_information:
    """
    Device hotplug event data.

    This is the object handed to the function registered with
    ``context.set_devices_changed_callback`` in the hotplug example on this
    page.
    """
    # NOTE(review): get_new_devices() is documented as listing only newly
    # added devices, so a removed device will presumably never appear there.
    # Pass was_removed() a device handle obtained before the event; confirm.
    def was_removed(device) -> bool:
        """
        Check if device was removed.
        
        Args:
            device (device): Device to check
            
        Returns:
            bool: True if device was removed
        """
    
    def was_added(device) -> bool:
        """
        Check if device was added.
        
        Args:
            device (device): Device to check
            
        Returns:
            bool: True if device was added
        """
    
    def get_new_devices() -> list[device]:
        """
        Get list of newly added devices.
        
        Returns:
            list[device]: New devices
        """

Usage Examples

Device Discovery

import pyrealsense2 as rs


def info_or_na(source, field):
    """Return the info string for *field*, or ``"N/A"`` when it is unsupported.

    ``get_info`` raises for a field the device or sensor does not expose, so
    ``supports`` is checked first. Without the guard, one missing field would
    abort the whole listing.

    Args:
        source: A device or sensor (anything with ``supports``/``get_info``).
        field: The ``rs.camera_info`` field to read.

    Returns:
        str: The reported value, or ``"N/A"``.
    """
    if source.supports(field):
        return source.get_info(field)
    return "N/A"


# Create context and discover devices
ctx = rs.context()
devices = ctx.query_devices()

print(f"Found {len(devices)} RealSense device(s)")

for i, device in enumerate(devices):
    print(f"\nDevice {i}:")
    print(f"  Name: {info_or_na(device, rs.camera_info.name)}")
    print(f"  Serial: {info_or_na(device, rs.camera_info.serial_number)}")
    print(f"  Firmware: {info_or_na(device, rs.camera_info.firmware_version)}")
    print(f"  Product Line: {info_or_na(device, rs.camera_info.product_line)}")

    # List sensors; unnamed sensors are skipped rather than shown as "N/A"
    sensors = device.query_sensors()
    print(f"  Sensors: {len(sensors)}")
    for j, sensor in enumerate(sensors):
        if sensor.supports(rs.camera_info.name):
            print(f"    {j}: {sensor.get_info(rs.camera_info.name)}")

Device Information

import sys

import pyrealsense2 as rs

ctx = rs.context()
devices = ctx.query_devices()

if len(devices) == 0:
    # sys.exit() instead of the interactive-only exit() helper: it is always
    # available, prints the message to stderr and returns a non-zero status.
    sys.exit("No devices found")

device = devices[0]

# Information fields to probe; not every device exposes all of them
info_types = [
    rs.camera_info.name,
    rs.camera_info.serial_number,
    rs.camera_info.firmware_version,
    rs.camera_info.physical_port,
    rs.camera_info.debug_op_code,
    rs.camera_info.advanced_mode,
    rs.camera_info.product_id,
    rs.camera_info.camera_locked,
    rs.camera_info.usb_type_descriptor,
    rs.camera_info.product_line,
]

print("Device Information:")
for info_type in info_types:
    # get_info() raises for unsupported fields, so check first
    if device.supports(info_type):
        value = device.get_info(info_type)
        print(f"  {info_type.name}: {value}")

Hotplug Detection

import pyrealsense2 as rs
import time

ctx = rs.context()

# Serial number -> device handle for every device currently believed to be
# connected. The serial is cached when the device is first seen because an
# unplugged device can no longer be asked for it.
connected = {}


def device_changed_callback(info):
    """Report devices that were unplugged or plugged in since the last event.

    Removals are detected by asking ``info.was_removed`` about every device
    already tracked in ``connected``; ``info.get_new_devices()`` only lists
    additions, so removed devices never show up there.

    Args:
        info: Hotplug event data passed in by the context.
    """
    # Iterate over a snapshot because entries are deleted inside the loop.
    for serial, device in list(connected.items()):
        if info.was_removed(device):
            del connected[serial]
            print(f"Device disconnected: {serial}")

    for device in info.get_new_devices():
        serial = device.get_info(rs.camera_info.serial_number)
        connected[serial] = device
        print(f"Device connected: {serial}")


# Seed with the devices present at start-up so their removal is reported too.
for device in ctx.query_devices():
    connected[device.get_info(rs.camera_info.serial_number)] = device

ctx.set_devices_changed_callback(device_changed_callback)

print("Monitoring device connections. Press Ctrl+C to stop.")
try:
    # The callback does the work; the main thread only keeps the process alive.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping device monitoring")

Multi-Device Setup

import sys

import pyrealsense2 as rs

ctx = rs.context()
devices = ctx.query_devices()

if len(devices) < 2:
    # sys.exit() instead of the interactive-only exit() helper; the message
    # goes to stderr and the process returns a non-zero status.
    sys.exit("Need at least 2 devices for this example")

# Build one (pipeline, config) pair per device, bound by serial number
setups = []
for i, device in enumerate(devices[:2]):  # Use first 2 devices
    serial = device.get_info(rs.camera_info.serial_number)

    config = rs.config()
    config.enable_device(serial)
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

    setups.append((rs.pipeline(ctx), config))

    print(f"Configured device {i}: {serial}")

# Only pipelines that actually started are recorded, so the cleanup below
# never calls stop() on one that failed to start.
started = []
try:
    # Starting inside the try block guarantees that an error on the second
    # device still stops the first one.
    for i, (pipeline, config) in enumerate(setups):
        pipeline.start(config)
        started.append(pipeline)
        print(f"Started pipeline {i}")

    for _ in range(100):
        for i, pipeline in enumerate(started):
            # Block until the next frameset arrives. A non-blocking poll here
            # would burn through all 100 iterations before the cameras have
            # produced their first frame.
            frames = pipeline.wait_for_frames()
            depth = frames.get_depth_frame()
            if depth:
                print(f"Device {i} frame {depth.get_frame_number()}")

finally:
    for pipeline in started:
        pipeline.stop()

Device Capabilities Detection

import pyrealsense2 as rs


def has_sensor(probe):
    """Return True if calling *probe* (a ``first_*_sensor`` method) succeeds.

    Only ``RuntimeError`` is treated as "sensor not present", which is how
    pyrealsense2 reports a failed lookup. A bare ``except`` would also swallow
    ``KeyboardInterrupt`` and hide genuine programming errors.

    Args:
        probe: Zero-argument callable that returns a sensor or raises.

    Returns:
        bool: True when the sensor exists on the device.
    """
    try:
        probe()
    except RuntimeError:
        return False
    return True


ctx = rs.context()
devices = ctx.query_devices()

for device in devices:
    serial = device.get_info(rs.camera_info.serial_number)
    print(f"\nDevice {serial} capabilities:")

    # Check device types
    capabilities = []
    if device.is_updatable():
        capabilities.append("Firmware Update")
    if device.is_auto_calibrated_device():
        capabilities.append("Auto-Calibration")

    print(f"  Device Types: {', '.join(capabilities) if capabilities else 'Basic'}")

    # Check sensor types: one (label, lookup) pair per sensor kind, probed in
    # the order they should be listed
    sensor_probes = (
        ("Depth", device.first_depth_sensor),
        ("Color", device.first_color_sensor),
        ("Motion", device.first_motion_sensor),
        ("Pose", device.first_pose_sensor),
    )
    sensor_types = [label for label, probe in sensor_probes if has_sensor(probe)]

    print(f"  Sensor Types: {', '.join(sensor_types)}")

    # List stream capabilities
    sensors = device.query_sensors()
    for i, sensor in enumerate(sensors):
        if sensor.supports(rs.camera_info.name):
            sensor_name = sensor.get_info(rs.camera_info.name)
            profiles = sensor.get_stream_profiles()
            print(f"  Sensor {i} ({sensor_name}): {len(profiles)} stream profiles")

Firmware Update Example

import sys

import pyrealsense2 as rs

ctx = rs.context()
devices = ctx.query_devices()

# Find updatable device
updatable_device = None
for device in devices:
    if device.is_updatable():
        updatable_device = device.as_updatable()
        break

# Compare against None explicitly: "not found" is the sentinel set above and
# should not depend on how a device object evaluates in a boolean context.
if updatable_device is None:
    sys.exit("No updatable devices found")

print(f"Found updatable device: {updatable_device.get_info(rs.camera_info.serial_number)}")

# Read firmware file. This try block covers only the read, so a failure while
# writing the backup later is not misreported as a missing firmware file.
try:
    with open("firmware.bin", "rb") as f:
        # bytearray rather than bytes: the binding takes the image as a
        # sequence of byte values.
        fw_image = bytearray(f.read())
except FileNotFoundError:
    sys.exit("Firmware file 'firmware.bin' not found")
except OSError as e:
    sys.exit(f"Error: {e}")

try:
    print("Checking firmware compatibility...")
    if updatable_device.check_firmware_compatibility(fw_image):
        print("Firmware is compatible")

        # Create backup first
        print("Creating firmware backup...")
        backup = updatable_device.create_flash_backup()

        with open("firmware_backup.bin", "wb") as f:
            # bytes() accepts both a bytes object and a list of byte values,
            # whichever the binding returns.
            f.write(bytes(backup))
        print("Backup saved")

        # Update firmware (commented out for safety)
        # print("Updating firmware...")
        # updatable_device.update_unsigned(fw_image)
        # print("Firmware update complete")

    else:
        print("Firmware is not compatible with this device")

except Exception as e:
    # Top-level boundary of the example: report device or I/O errors and stop
    print(f"Error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-pyrealsense2

docs

advanced-mode.md

context-device.md

frames.md

index.md

logging.md

options.md

pipeline.md

pointclouds.md

processing.md

recording.md

sensors.md

tile.json