CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyrealsense2

Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.

Pending
Overview
Eval results
Files

docs/frames.md

Frame Handling

Frame data access and manipulation with support for depth, color, infrared, and motion data. Provides seamless integration with NumPy through Python's buffer protocol and comprehensive metadata access.

Capabilities

Base Frame

Core frame interface providing common functionality for all frame types.

class frame:
    """
    Core frame interface providing common functionality for all frame types.

    Groups four kinds of members: timing/identity accessors, raw data and
    metadata access, lifetime helpers (keep/swap), and is_*/as_* pairs for
    checking and casting to the specialised frame types.
    """

    def get_timestamp() -> float:
        """
        Get frame timestamp.

        Returns:
            float: Timestamp in milliseconds
        """

    def get_frame_timestamp_domain() -> timestamp_domain:
        """
        Get timestamp domain (hardware vs system clock).

        Returns:
            timestamp_domain: Clock source the timestamp was taken from
        """

    def get_frame_number() -> int:
        """
        Get frame sequence number.

        Returns:
            int: Frame number from start of streaming
        """

    def get_data_size() -> int:
        """
        Get frame data size in bytes.

        Returns:
            int: Data size in bytes
        """

    def get_profile() -> stream_profile:
        """
        Get stream profile that produced this frame.

        Returns:
            stream_profile: Associated stream configuration
        """

    # Properties (read-only)
    @property
    def timestamp(self) -> float:
        """Frame timestamp in milliseconds."""

    @property
    def frame_timestamp_domain(self) -> timestamp_domain:
        """Timestamp domain (hardware vs system clock)."""

    @property
    def frame_number(self) -> int:
        """Frame sequence number."""

    @property
    def profile(self) -> stream_profile:
        """Associated stream profile."""

    @property
    def data(self) -> BufData:
        """Frame data with buffer protocol support."""

    def get_data() -> BufData:
        """
        Get frame data with Python buffer protocol support.

        Typical use is wrapping the result without copying, e.g.
        ``np.asanyarray(frame.get_data())``.

        Returns:
            BufData: Frame data accessible as NumPy array
        """

    def get_frame_metadata(metadata_type) -> int:
        """
        Get frame metadata value.

        Call supports_frame_metadata() first to avoid the error raised
        for fields the frame does not carry.

        Args:
            metadata_type (frame_metadata_value): Metadata field to retrieve

        Returns:
            int: Metadata value

        Raises:
            rs.error: If metadata not supported
        """

    def supports_frame_metadata(metadata_type) -> bool:
        """
        Check if frame supports specific metadata.

        Args:
            metadata_type (frame_metadata_value): Metadata field to check

        Returns:
            bool: True if metadata is available
        """

    def keep():
        """
        Extend frame lifetime beyond current scope.

        Used when frame needs to persist after callback returns, for
        example before handing the frame to another thread through a queue.
        """

    def swap(other_frame):
        """
        Swap frame contents with another frame.

        Args:
            other_frame (frame): Frame to swap with
        """

    # Type checking methods
    def is_video_frame() -> bool:
        """Check if frame is a video frame."""

    def is_depth_frame() -> bool:
        """Check if frame is a depth frame."""

    def is_disparity_frame() -> bool:
        """Check if frame is a disparity frame."""

    def is_motion_frame() -> bool:
        """Check if frame is a motion frame."""

    def is_pose_frame() -> bool:
        """Check if frame is a pose frame."""

    def is_frameset() -> bool:
        """Check if frame is a frameset (composite frame)."""

    def is_points() -> bool:
        """Check if frame is a point cloud."""

    # Type casting methods
    # Pair each cast with the matching is_* check, e.g. test
    # is_motion_frame() before calling as_motion_frame().
    def as_video_frame() -> video_frame:
        """Cast to video frame."""

    def as_depth_frame() -> depth_frame:
        """Cast to depth frame."""

    def as_disparity_frame() -> disparity_frame:
        """Cast to disparity frame."""

    def as_motion_frame() -> motion_frame:
        """Cast to motion frame."""

    def as_pose_frame() -> pose_frame:
        """Cast to pose frame."""

    def as_frameset() -> frameset:
        """Cast to frameset."""

    def as_points() -> points:
        """Cast to point cloud."""

Video Frame

Frame containing 2D image data (color, infrared, etc.).

class video_frame(frame):
    """
    Frame containing 2D image data (color, infrared, etc.).

    Adds image geometry accessors (dimensions, stride, pixel size) to the
    base frame interface; each getter also has a read-only property form.
    """

    def get_width() -> int:
        """
        Get frame width.

        Returns:
            int: Width in pixels
        """

    def get_height() -> int:
        """
        Get frame height.

        Returns:
            int: Height in pixels
        """

    def get_stride_in_bytes() -> int:
        """
        Get row stride in bytes.

        Returns:
            int: Bytes per row including padding
        """

    def get_bits_per_pixel() -> int:
        """
        Get bits per pixel.

        Returns:
            int: Bits per pixel
        """

    def get_bytes_per_pixel() -> int:
        """
        Get bytes per pixel.

        Returns:
            int: Bytes per pixel
        """

    def extract_target_dimensions(target_type) -> list[float]:
        """
        Extract target dimensions for specific use case.

        Args:
            target_type: Target type for dimension extraction
                (NOTE(review): accepted values are not listed here — confirm
                against the SDK reference)

        Returns:
            list[float]: Extracted dimensions
        """

    # Properties
    @property
    def width(self) -> int:
        """Frame width in pixels."""

    @property
    def height(self) -> int:
        """Frame height in pixels."""

    @property
    def stride_in_bytes(self) -> int:
        """Row stride in bytes."""

    @property
    def bits_per_pixel(self) -> int:
        """Bits per pixel."""

    @property
    def bytes_per_pixel(self) -> int:
        """Bytes per pixel."""

Depth Frame

Video frame containing depth data with distance query capabilities.

class depth_frame(video_frame):
    """Video frame containing depth data with distance query capabilities."""

    def get_distance(x, y) -> float:
        """
        Get distance at specific pixel coordinate.

        Args:
            x (int): Pixel x-coordinate
            y (int): Pixel y-coordinate

        Returns:
            float: Distance in meters (0 if invalid)
        """

    def get_units() -> float:
        """
        Get depth units scaling factor.

        Multiplying a raw depth value by this factor converts it to meters.

        Returns:
            float: Meters per depth unit
        """

Disparity Frame

Depth frame containing disparity data (inverse depth).

class disparity_frame(depth_frame):
    """Depth frame containing disparity data (inverse depth)."""

    def get_baseline() -> float:
        """
        Get stereo baseline distance.

        Returns:
            float: Baseline in millimeters
        """

Motion Frame

Frame containing IMU sensor data (accelerometer, gyroscope).

class motion_frame(frame):
    """Frame containing IMU sensor data (accelerometer, gyroscope)."""

    def get_motion_data() -> vector:
        """
        Get motion sensor data.

        Whether the sample is acceleration or angular velocity depends on
        the stream that produced the frame; check the stream type of
        get_profile() to tell them apart.

        Returns:
            vector: 3D motion data (acceleration or angular velocity)
        """

    def get_combined_motion_data() -> vector:
        """
        Get combined motion data from multiple sensors.

        Returns:
            vector: Combined motion data
        """

    @property
    def motion_data(self) -> vector:
        """Motion sensor data."""

Pose Frame

Frame containing 6DOF pose data (position and orientation).

class pose_frame(frame):
    """Frame containing 6DOF pose data (position and orientation)."""

    def get_pose_data() -> pose_data:
        """
        Get pose data.

        Returns:
            pose_data: 6DOF pose information
        """

    @property
    def pose_data(self) -> pose_data:
        """6DOF pose data."""

Frameset

Collection of synchronized frames from multiple streams.

class frameset(frame):
    """
    Collection of synchronized frames from multiple streams.

    Supports len(), indexing and iteration over the contained frames, plus
    typed lookups for the common stream types.

    NOTE(review): the lookups below are documented as returning None when
    the stream is absent; the binding may instead return an empty frame
    that evaluates as false — confirm. Testing the result with
    ``if not frame:`` covers both cases.
    """

    def size() -> int:
        """
        Get number of frames in set.

        Returns:
            int: Number of frames
        """

    def __len__() -> int:
        """Number of frames in set."""

    def __getitem__(index) -> frame:
        """Get frame by index."""

    def __iter__():
        """Iterate over frames."""

    def first(stream_type, format=rs.format.any) -> frame:
        """
        Get first frame of specified type.

        Args:
            stream_type (stream): Stream type to find
            format (format): Optional format filter

        Returns:
            frame: First matching frame

        Raises:
            rs.error: If no matching frame found
        """

    def first_or_default(stream_type, format=rs.format.any) -> frame:
        """
        Get first frame of specified type or None.

        Non-raising counterpart of first().

        Args:
            stream_type (stream): Stream type to find
            format (format): Optional format filter

        Returns:
            frame: First matching frame or None
        """

    def get_depth_frame() -> depth_frame:
        """
        Get depth frame from set.

        Returns:
            depth_frame: Depth frame or None if not present
        """

    def get_color_frame() -> video_frame:
        """
        Get color frame from set.

        Returns:
            video_frame: Color frame or None if not present
        """

    def get_infrared_frame(index=0) -> video_frame:
        """
        Get infrared frame from set.

        Args:
            index (int): Infrared stream index

        Returns:
            video_frame: Infrared frame or None if not present
        """

    def get_fisheye_frame(index=0) -> video_frame:
        """
        Get fisheye frame from set.

        Args:
            index (int): Fisheye stream index

        Returns:
            video_frame: Fisheye frame or None if not present
        """

    def get_pose_frame(index=0) -> pose_frame:
        """
        Get pose frame from set.

        Args:
            index (int): Pose stream index

        Returns:
            pose_frame: Pose frame or None if not present
        """

    def foreach(callback_function):
        """
        Apply callback to each frame in set.

        Args:
            callback_function: Function called for each frame
        """

Point Cloud Frame

Frame containing 3D point cloud data.

class points(frame):
    """Frame containing 3D point cloud data."""

    def get_vertices(dims=1) -> BufData:
        """
        Get vertex data as buffer.

        Args:
            dims (int): Dimensionality (1 for flat array, 2 for structured)

        Returns:
            BufData: Vertex coordinates buffer
        """

    def get_texture_coordinates(dims=1) -> BufData:
        """
        Get texture coordinate data as buffer.

        Args:
            dims (int): Dimensionality
                (presumably the same meaning as in get_vertices — confirm)

        Returns:
            BufData: Texture coordinates buffer
        """

    def export_to_ply(filename, texture_frame):
        """
        Export point cloud to PLY file.

        Args:
            filename (str): Output file path
            texture_frame (video_frame): Frame to use for texture
        """

    def size() -> int:
        """
        Get number of points.

        Returns:
            int: Point count
        """

Buffer Data

Python buffer protocol support for zero-copy NumPy integration.

class BufData:
    """
    Buffer protocol implementation for frame data.

    Enables zero-copy access to frame data as NumPy arrays, typically via
    ``np.asanyarray(frame.get_data())``.
    """

Data Types

3D Vector

class vector:
    """Three-component vector, used for motion samples and pose fields."""

    x: float  # X component
    y: float  # Y component
    z: float  # Z component

3D Vertex

class vertex:
    """Single 3D point with x, y and z coordinates."""

    x: float  # X coordinate
    y: float  # Y coordinate
    z: float  # Z coordinate

Texture Coordinate

class texture_coordinate:
    """Two-component (u, v) texture coordinate."""

    u: float  # U texture coordinate
    v: float  # V texture coordinate

Quaternion

class quaternion:
    """Four-component quaternion; pose_data uses it for orientation."""

    x: float  # X component
    y: float  # Y component
    z: float  # Z component
    w: float  # W component

Pose Data

class pose_data:
    """6DOF pose sample: position, orientation, their derivatives and confidence levels."""

    translation: vector      # Position (x, y, z)
    velocity: vector         # Velocity (x, y, z)
    acceleration: vector     # Acceleration (x, y, z)
    rotation: quaternion     # Orientation quaternion
    angular_velocity: vector # Angular velocity (x, y, z)
    angular_acceleration: vector # Angular acceleration (x, y, z)
    tracker_confidence: int  # Tracking confidence level
    mapper_confidence: int   # Mapping confidence level

Usage Examples

Basic Frame Processing

import pyrealsense2 as rs
import numpy as np

# Build the pipeline and request 640x480 @ 30 FPS depth and color streams.
pipeline = rs.pipeline()
config = rs.config()
for stream, pixel_format in (
    (rs.stream.depth, rs.format.z16),
    (rs.stream.color, rs.format.bgr8),
):
    config.enable_stream(stream, 640, 480, pixel_format, 30)

pipeline.start(config)

try:
    for _ in range(100):
        frameset = pipeline.wait_for_frames()

        depth = frameset.get_depth_frame()
        color = frameset.get_color_frame()

        # Skip framesets that are missing either stream.
        if not (depth and color):
            continue

        # Wrap the frame buffers as NumPy arrays without copying.
        depth_image = np.asanyarray(depth.get_data())
        color_image = np.asanyarray(color.get_data())

        # Per-frame summary.
        print(f"Frame {depth.get_frame_number()}:")
        print(f"  Depth: {depth_image.shape} {depth_image.dtype}")
        print(f"  Color: {color_image.shape} {color_image.dtype}")
        print(f"  Timestamp: {depth.get_timestamp():.1f}ms")

        # Query the distance at the image centre.
        center_x = depth.get_width() // 2
        center_y = depth.get_height() // 2
        center_distance = depth.get_distance(center_x, center_y)
        print(f"  Center distance: {center_distance:.3f}m")

finally:
    # Always release the device, even if processing failed.
    pipeline.stop()

Frame Metadata Access

import pyrealsense2 as rs

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

pipeline.start(config)

# Metadata fields to probe on the depth frame.
metadata_fields = (
    rs.frame_metadata_value.frame_counter,
    rs.frame_metadata_value.frame_timestamp,
    rs.frame_metadata_value.sensor_timestamp,
    rs.frame_metadata_value.actual_exposure,
    rs.frame_metadata_value.gain_level,
    rs.frame_metadata_value.auto_exposure,
    rs.frame_metadata_value.white_balance,
    rs.frame_metadata_value.time_of_arrival,
)

try:
    depth = pipeline.wait_for_frames().get_depth_frame()

    if depth:
        print("Available metadata:")
        for field in metadata_fields:
            # Only query fields the frame reports as supported; asking for
            # an unsupported field raises.
            if depth.supports_frame_metadata(field):
                shown = depth.get_frame_metadata(field)
            else:
                shown = "Not supported"
            print(f"  {field.name}: {shown}")

finally:
    pipeline.stop()

Frame Lifetime Management

import pyrealsense2 as rs
import queue
import threading
import time

# Frame storage for processing in another thread
frame_queue = queue.Queue()

# Set only after streaming has stopped and the queue has drained, so the
# worker can tell "no frame yet" apart from "finished".
stop_event = threading.Event()


def frame_callback(frameset):
    """
    Pipeline callback: queue the depth frame for the worker thread.

    Args:
        frameset: Frame delivered by the pipeline. It arrives typed as a
            generic frame, so it is cast with as_frameset() before the
            frameset-only accessors are used.
    """
    if not frameset.is_frameset():
        return

    depth_frame = frameset.as_frameset().get_depth_frame()
    if depth_frame:
        # Extend frame lifetime beyond callback
        depth_frame.keep()
        frame_queue.put(depth_frame)


def processing_thread():
    """
    Consume queued frames until stop_event is set and the queue is empty.

    An idle timeout alone does not end the loop: the first frame may take
    longer than the timeout to arrive, and exiting early would leave later
    frames unprocessed and frame_queue.join() blocked forever.
    """
    while True:
        try:
            frame = frame_queue.get(timeout=1.0)
        except queue.Empty:
            if stop_event.is_set():
                break
            continue

        try:
            # Process frame (frame will be automatically released
            # when it goes out of scope)
            width = frame.get_width()
            height = frame.get_height()
            print(f"Processing frame {frame.get_frame_number()} "
                  f"({width}x{height})")
        finally:
            # Balance every get() so frame_queue.join() can return.
            frame_queue.task_done()


# Start processing thread
processor = threading.Thread(target=processing_thread)
processor.start()

# Start streaming with callback
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

try:
    pipeline.start(config, frame_callback)
    try:
        time.sleep(5)  # Stream for 5 seconds
    finally:
        pipeline.stop()
        frame_queue.join()  # Wait for processing to complete
finally:
    # Runs even if start() failed, so the worker thread never outlives
    # the script.
    stop_event.set()
    processor.join()

Depth Analysis

import pyrealsense2 as rs
import numpy as np

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

profile = pipeline.start(config)

# Look up the factor that converts raw depth values to meters.
device = profile.get_device()
depth_scale = device.first_depth_sensor().get_depth_scale()

print(f"Depth scale: {depth_scale} meters per unit")

try:
    for _ in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()

        if not depth_frame:
            continue

        # Raw sensor values -> meters.
        raw_depth = np.asanyarray(depth_frame.get_data())
        meters = raw_depth * depth_scale

        # Zero marks an invalid pixel; keep only real measurements.
        valid = meters[meters > 0]
        if valid.size == 0:
            continue

        print(f"Frame {depth_frame.get_frame_number()}:")
        print(f"  Valid pixels: {valid.size}/{raw_depth.size}")
        print(f"  Depth range: {valid.min():.3f} - {valid.max():.3f}m")
        print(f"  Mean depth: {valid.mean():.3f}m")

        # Ten equal-width bins over 0-5 m; empty bins are not printed.
        counts, edges = np.histogram(valid, bins=10, range=(0, 5))
        print(f"  Distance distribution (0-5m):")
        for count, low, high in zip(counts, edges[:-1], edges[1:]):
            if count > 0:
                print(f"    {low:.1f}-{high:.1f}m: {count} pixels")

finally:
    pipeline.stop()

Motion Data Processing

import pyrealsense2 as rs
import numpy as np

# Configure motion streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.accel, rs.format.motion_xyz32f, 250)
config.enable_stream(rs.stream.gyro, rs.format.motion_xyz32f, 400)

# Tracks whether start() succeeded. Stopping a pipeline that never started
# raises, which would mask the original failure reported below.
started = False

try:
    pipeline.start(config)
    started = True

    accel_data = []
    gyro_data = []

    for _ in range(1000):  # Collect 1000 framesets
        frames = pipeline.wait_for_frames()

        # Check for motion frames
        for frame in frames:
            if not frame.is_motion_frame():
                continue

            motion_data = frame.as_motion_frame().get_motion_data()
            sample = [motion_data.x, motion_data.y, motion_data.z]

            # The producing stream tells accelerometer and gyroscope
            # samples apart.
            stream_type = frame.get_profile().stream_type()

            if stream_type == rs.stream.accel:
                accel_data.append(sample)
            elif stream_type == rs.stream.gyro:
                gyro_data.append(sample)

    # Analyze motion data
    if accel_data:
        accel_array = np.array(accel_data)
        print(f"Accelerometer data: {len(accel_data)} samples")
        print(f"  Mean: {accel_array.mean(axis=0)}")
        print(f"  Std:  {accel_array.std(axis=0)}")

    if gyro_data:
        gyro_array = np.array(gyro_data)
        print(f"Gyroscope data: {len(gyro_data)} samples")
        print(f"  Mean: {gyro_array.mean(axis=0)}")
        print(f"  Std:  {gyro_array.std(axis=0)}")

except Exception as e:
    print(f"Motion streaming not supported: {e}")

finally:
    if started:
        pipeline.stop()

Frameset Processing

import pyrealsense2 as rs
import numpy as np

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
# Both infrared imagers, addressed by stream index 1 and 2.
for ir_index in (1, 2):
    config.enable_stream(rs.stream.infrared, ir_index, 640, 480, rs.format.y8, 30)

pipeline.start(config)

try:
    for i in range(10):
        frames = pipeline.wait_for_frames()

        print(f"Frameset {i}: {frames.size()} frames")
        print(f"  Timestamp: {frames.get_timestamp():.1f}ms")

        # Fetch every typed frame up front, then report the ones present.
        labelled_frames = (
            ("Depth", frames.get_depth_frame()),
            ("Color", frames.get_color_frame()),
            ("IR1", frames.get_infrared_frame(1)),
            ("IR2", frames.get_infrared_frame(2)),
        )
        for label, typed_frame in labelled_frames:
            if typed_frame:
                pixels = np.asanyarray(typed_frame.get_data())
                print(f"  {label}: {pixels.shape} (frame #{typed_frame.get_frame_number()})")

        # Alternative: walk the frameset generically.
        print("  All frames:")
        for j, frame in enumerate(frames):
            stream_type = frame.get_profile().stream_type()
            print(f"    {j}: {stream_type.name} frame #{frame.get_frame_number()}")

finally:
    pipeline.stop()

Integration Notes

  • NumPy Integration: All frames support Python's buffer protocol for zero-copy NumPy array creation
  • Memory Management: Frames are reference-counted; use frame.keep() to extend lifetime in callbacks
  • Thread Safety: Frame callbacks execute on separate threads; synchronization may be required
  • Data Types: Frame data types depend on stream format (uint8, uint16, float32, etc.)
  • Performance: Buffer protocol access is zero-copy and very efficient for image processing

Install with Tessl CLI

npx tessl i tessl/pypi-pyrealsense2

docs

advanced-mode.md

context-device.md

frames.md

index.md

logging.md

options.md

pipeline.md

pointclouds.md

processing.md

recording.md

sensors.md

tile.json