Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.
---
Frame data access and manipulation with support for depth, color, infrared, and motion data. Provides seamless integration with NumPy through Python's buffer protocol and comprehensive metadata access.
Core frame interface providing common functionality for all frame types.
class frame:

    def get_timestamp() -> float:
        """
        Get frame timestamp.
        Returns:
            float: Timestamp in milliseconds
        """

    def get_frame_timestamp_domain() -> timestamp_domain:
        """
        Get timestamp domain (hardware vs system clock).
        Returns:
            timestamp_domain: Timestamp source
        """

    def get_frame_number() -> int:
        """
        Get frame sequence number.
        Returns:
            int: Frame number from start of streaming
        """

    def get_data_size() -> int:
        """
        Get frame data size in bytes.
        Returns:
            int: Data size in bytes
        """

    def get_profile() -> stream_profile:
        """
        Get stream profile that produced this frame.
        Returns:
            stream_profile: Associated stream configuration
        """

    # Properties (read-only)
    @property
    def timestamp(self) -> float:
        """Frame timestamp in milliseconds."""

    @property
    def frame_timestamp_domain(self) -> timestamp_domain:
        """Timestamp domain."""

    @property
    def frame_number(self) -> int:
        """Frame sequence number."""

    @property
    def profile(self) -> stream_profile:
        """Associated stream profile."""

    @property
    def data(self) -> BufData:
        """Frame data with buffer protocol support."""

    def get_data() -> BufData:
        """
        Get frame data with Python buffer protocol support.
        Returns:
            BufData: Frame data accessible as NumPy array
        """

    def get_frame_metadata(metadata_type) -> int:
        """
        Get frame metadata value.
        Args:
            metadata_type (frame_metadata_value): Metadata field to retrieve
        Returns:
            int: Metadata value
        Raises:
            rs.error: If metadata not supported
        """

    def supports_frame_metadata(metadata_type) -> bool:
        """
        Check if frame supports specific metadata.
        Args:
            metadata_type (frame_metadata_value): Metadata field to check
        Returns:
            bool: True if metadata is available
        """

    def keep():
        """
        Extend frame lifetime beyond current scope.
        Used when frame needs to persist after callback returns.
        """

    def swap(other_frame):
        """
        Swap frame contents with another frame.
        Args:
            other_frame (frame): Frame to swap with
        """

    # Type checking methods
    def is_video_frame() -> bool:
        """Check if frame is a video frame."""

    def is_depth_frame() -> bool:
        """Check if frame is a depth frame."""

    def is_disparity_frame() -> bool:
        """Check if frame is a disparity frame."""

    def is_motion_frame() -> bool:
        """Check if frame is a motion frame."""

    def is_pose_frame() -> bool:
        """Check if frame is a pose frame."""

    def is_frameset() -> bool:
        """Check if frame is a frameset (composite frame)."""

    def is_points() -> bool:
        """Check if frame is a point cloud."""

    # Type casting methods
    def as_video_frame() -> video_frame:
        """Cast to video frame."""

    def as_depth_frame() -> depth_frame:
        """Cast to depth frame."""

    def as_disparity_frame() -> disparity_frame:
        """Cast to disparity frame."""

    def as_motion_frame() -> motion_frame:
        """Cast to motion frame."""

    def as_pose_frame() -> pose_frame:
        """Cast to pose frame."""

    def as_frameset() -> frameset:
        """Cast to frameset."""

    def as_points() -> points:
        """Cast to point cloud."""

# Frame containing 2D image data (color, infrared, etc.).
class video_frame(frame):

    def get_width() -> int:
        """
        Get frame width.
        Returns:
            int: Width in pixels
        """

    def get_height() -> int:
        """
        Get frame height.
        Returns:
            int: Height in pixels
        """

    def get_stride_in_bytes() -> int:
        """
        Get row stride in bytes.
        Returns:
            int: Bytes per row including padding
        """

    def get_bits_per_pixel() -> int:
        """
        Get bits per pixel.
        Returns:
            int: Bits per pixel
        """

    def get_bytes_per_pixel() -> int:
        """
        Get bytes per pixel.
        Returns:
            int: Bytes per pixel
        """

    def extract_target_dimensions(target_type) -> list[float]:
        """
        Extract target dimensions for specific use case.
        Args:
            target_type: Target type for dimension extraction
        Returns:
            list[float]: Extracted dimensions
        """

    # Properties
    @property
    def width(self) -> int:
        """Frame width in pixels."""

    @property
    def height(self) -> int:
        """Frame height in pixels."""

    @property
    def stride_in_bytes(self) -> int:
        """Row stride in bytes."""

    @property
    def bits_per_pixel(self) -> int:
        """Bits per pixel."""

    @property
    def bytes_per_pixel(self) -> int:
        """Bytes per pixel."""

# Video frame containing depth data with distance query capabilities.
class depth_frame(video_frame):

    def get_distance(x, y) -> float:
        """
        Get distance at specific pixel coordinate.
        Args:
            x (int): Pixel x-coordinate
            y (int): Pixel y-coordinate
        Returns:
            float: Distance in meters (0 if invalid)
        """

    def get_units() -> float:
        """
        Get depth units scaling factor.
        Returns:
            float: Meters per depth unit
        """

# Depth frame containing disparity data (inverse depth).
class disparity_frame(depth_frame):

    def get_baseline() -> float:
        """
        Get stereo baseline distance.
        Returns:
            float: Baseline in millimeters
        """

# Frame containing IMU sensor data (accelerometer, gyroscope).
class motion_frame(frame):

    def get_motion_data() -> vector:
        """
        Get motion sensor data.
        Returns:
            vector: 3D motion data (acceleration or angular velocity)
        """

    def get_combined_motion_data() -> vector:
        """
        Get combined motion data from multiple sensors.
        Returns:
            vector: Combined motion data
        """

    @property
    def motion_data(self) -> vector:
        """Motion sensor data."""

# Frame containing 6DOF pose data (position and orientation).
class pose_frame(frame):

    def get_pose_data() -> pose_data:
        """
        Get pose data.
        Returns:
            pose_data: 6DOF pose information
        """

    @property
    def pose_data(self) -> pose_data:
        """6DOF pose data."""

# Collection of synchronized frames from multiple streams.
class frameset(frame):

    def size() -> int:
        """
        Get number of frames in set.
        Returns:
            int: Number of frames
        """

    def __len__() -> int:
        """Number of frames in set."""

    def __getitem__(index) -> frame:
        """Get frame by index."""

    def __iter__():
        """Iterate over frames."""

    def first(stream_type, format=rs.format.any) -> frame:
        """
        Get first frame of specified type.
        Args:
            stream_type (stream): Stream type to find
            format (format): Optional format filter
        Returns:
            frame: First matching frame
        Raises:
            rs.error: If no matching frame found
        """

    def first_or_default(stream_type, format=rs.format.any) -> frame:
        """
        Get first frame of specified type or None.
        Args:
            stream_type (stream): Stream type to find
            format (format): Optional format filter
        Returns:
            frame: First matching frame or None
        """

    def get_depth_frame() -> depth_frame:
        """
        Get depth frame from set.
        Returns:
            depth_frame: Depth frame or None if not present
        """

    def get_color_frame() -> video_frame:
        """
        Get color frame from set.
        Returns:
            video_frame: Color frame or None if not present
        """

    def get_infrared_frame(index=0) -> video_frame:
        """
        Get infrared frame from set.
        Args:
            index (int): Infrared stream index
        Returns:
            video_frame: Infrared frame or None if not present
        """

    def get_fisheye_frame(index=0) -> video_frame:
        """
        Get fisheye frame from set.
        Args:
            index (int): Fisheye stream index
        Returns:
            video_frame: Fisheye frame or None if not present
        """

    def get_pose_frame(index=0) -> pose_frame:
        """
        Get pose frame from set.
        Args:
            index (int): Pose stream index
        Returns:
            pose_frame: Pose frame or None if not present
        """

    def foreach(callback_function):
        """
        Apply callback to each frame in set.
        Args:
            callback_function: Function called for each frame
        """

# Frame containing 3D point cloud data.
class points(frame):

    def get_vertices(dims=1) -> BufData:
        """
        Get vertex data as buffer.
        Args:
            dims (int): Dimensionality (1 for flat array, 2 for structured)
        Returns:
            BufData: Vertex coordinates buffer
        """

    def get_texture_coordinates(dims=1) -> BufData:
        """
        Get texture coordinate data as buffer.
        Args:
            dims (int): Dimensionality
        Returns:
            BufData: Texture coordinates buffer
        """

    def export_to_ply(filename, texture_frame):
        """
        Export point cloud to PLY file.
        Args:
            filename (str): Output file path
            texture_frame (video_frame): Frame to use for texture
        """

    def size() -> int:
        """
        Get number of points.
        Returns:
            int: Point count
        """

# Python buffer protocol support for zero-copy NumPy integration.
class BufData:
    """
    Buffer protocol implementation for frame data.
    Enables zero-copy access to frame data as NumPy arrays.
    """

class vector:
    x: float  # X component
    y: float  # Y component
    z: float  # Z component

class vertex:
    x: float  # X coordinate
    y: float  # Y coordinate
    z: float  # Z coordinate

class texture_coordinate:
    u: float  # U texture coordinate
    v: float  # V texture coordinate

class quaternion:
    x: float  # X component
    y: float  # Y component
    z: float  # Z component
    w: float  # W component

class pose_data:
    translation: vector           # Position (x, y, z)
    velocity: vector              # Velocity (x, y, z)
    acceleration: vector          # Acceleration (x, y, z)
    rotation: quaternion          # Orientation quaternion
    angular_velocity: vector      # Angular velocity (x, y, z)
    angular_acceleration: vector  # Angular acceleration (x, y, z)
    tracker_confidence: int       # Tracking confidence level
    mapper_confidence: int        # Mapping confidence level

import pyrealsense2 as rs
import numpy as np

# Example: basic depth + color streaming with NumPy frame access.
# Start pipeline
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)
try:
    for i in range(100):
        frames = pipeline.wait_for_frames()

        # Get individual frames
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()
        if not depth_frame or not color_frame:
            continue

        # Convert to NumPy arrays (zero-copy)
        depth_image = np.asanyarray(depth_frame.get_data())
        color_image = np.asanyarray(color_frame.get_data())

        # Frame information
        print(f"Frame {depth_frame.get_frame_number()}:")
        print(f" Depth: {depth_image.shape} {depth_image.dtype}")
        print(f" Color: {color_image.shape} {color_image.dtype}")
        print(f" Timestamp: {depth_frame.get_timestamp():.1f}ms")

        # Distance at center pixel
        width = depth_frame.get_width()
        height = depth_frame.get_height()
        center_distance = depth_frame.get_distance(width // 2, height // 2)
        print(f" Center distance: {center_distance:.3f}m")
finally:
    pipeline.stop()

import pyrealsense2 as rs
# Example: querying per-frame metadata fields.
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)

# Available metadata types
metadata_types = [
    rs.frame_metadata_value.frame_counter,
    rs.frame_metadata_value.frame_timestamp,
    rs.frame_metadata_value.sensor_timestamp,
    rs.frame_metadata_value.actual_exposure,
    rs.frame_metadata_value.gain_level,
    rs.frame_metadata_value.auto_exposure,
    rs.frame_metadata_value.white_balance,
    rs.frame_metadata_value.time_of_arrival,
]
try:
    frames = pipeline.wait_for_frames()
    depth_frame = frames.get_depth_frame()
    if depth_frame:
        print("Available metadata:")
        for metadata_type in metadata_types:
            # Probe support first: get_frame_metadata raises rs.error
            # for unsupported fields.
            if depth_frame.supports_frame_metadata(metadata_type):
                value = depth_frame.get_frame_metadata(metadata_type)
                print(f" {metadata_type.name}: {value}")
            else:
                print(f" {metadata_type.name}: Not supported")
finally:
    pipeline.stop()

import pyrealsense2 as rs
import queue
import threading

# Example: extending frame lifetime with keep() to process frames
# on a separate thread after the SDK callback returns.

# Frame storage for processing in another thread
frame_queue = queue.Queue()

def frame_callback(frameset):
    depth_frame = frameset.get_depth_frame()
    if depth_frame:
        # Extend frame lifetime beyond callback
        depth_frame.keep()
        frame_queue.put(depth_frame)

def processing_thread():
    while True:
        try:
            frame = frame_queue.get(timeout=1.0)
            # Process frame (frame will be automatically released
            # when it goes out of scope)
            width = frame.get_width()
            height = frame.get_height()
            print(f"Processing frame {frame.get_frame_number()} "
                  f"({width}x{height})")
            frame_queue.task_done()
        except queue.Empty:
            break

# Start processing thread
processor = threading.Thread(target=processing_thread)
processor.start()

# Start streaming with callback
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config, frame_callback)
try:
    import time
    time.sleep(5)  # Stream for 5 seconds
finally:
    pipeline.stop()
    frame_queue.join()  # Wait for processing to complete
    processor.join()

import pyrealsense2 as rs
import numpy as np

# Example: converting raw depth units to meters and analysing the
# depth distribution.
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
profile = pipeline.start(config)

# Get depth sensor and scale
device = profile.get_device()
depth_sensor = device.first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
print(f"Depth scale: {depth_scale} meters per unit")
try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        if not depth_frame:
            continue

        # Convert to numpy and meters
        depth_image = np.asanyarray(depth_frame.get_data())
        depth_meters = depth_image * depth_scale

        # Filter valid depths (exclude zeros)
        valid_depths = depth_meters[depth_meters > 0]
        if len(valid_depths) > 0:
            print(f"Frame {depth_frame.get_frame_number()}:")
            print(f" Valid pixels: {len(valid_depths)}/{depth_image.size}")
            print(f" Depth range: {valid_depths.min():.3f} - {valid_depths.max():.3f}m")
            print(f" Mean depth: {valid_depths.mean():.3f}m")

            # Distance histogram
            hist, bins = np.histogram(valid_depths, bins=10, range=(0, 5))
            print(f" Distance distribution (0-5m):")
            for j in range(len(hist)):
                if hist[j] > 0:
                    print(f" {bins[j]:.1f}-{bins[j+1]:.1f}m: {hist[j]} pixels")
finally:
    pipeline.stop()

import pyrealsense2 as rs
import numpy as np

# Example: collecting and analysing IMU (accelerometer/gyroscope)
# motion frames.

# Configure motion streams
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.accel, rs.format.motion_xyz32f, 250)
config.enable_stream(rs.stream.gyro, rs.format.motion_xyz32f, 400)
try:
    profile = pipeline.start(config)
    accel_data = []
    gyro_data = []
    for i in range(1000):  # Collect 1000 samples
        frames = pipeline.wait_for_frames()

        # Check for motion frames
        for j in range(frames.size()):
            frame = frames[j]
            if frame.is_motion_frame():
                motion_frame = frame.as_motion_frame()
                motion_data = motion_frame.get_motion_data()

                # Get stream type
                profile = motion_frame.get_profile()
                stream_type = profile.stream_type()
                if stream_type == rs.stream.accel:
                    accel_data.append([motion_data.x, motion_data.y, motion_data.z])
                elif stream_type == rs.stream.gyro:
                    gyro_data.append([motion_data.x, motion_data.y, motion_data.z])

    # Analyze motion data
    if accel_data:
        accel_array = np.array(accel_data)
        print(f"Accelerometer data: {len(accel_data)} samples")
        print(f" Mean: {accel_array.mean(axis=0)}")
        print(f" Std: {accel_array.std(axis=0)}")
    if gyro_data:
        gyro_array = np.array(gyro_data)
        print(f"Gyroscope data: {len(gyro_data)} samples")
        print(f" Mean: {gyro_array.mean(axis=0)}")
        print(f" Std: {gyro_array.std(axis=0)}")
except Exception as e:
    print(f"Motion streaming not supported: {e}")
finally:
    pipeline.stop()

import pyrealsense2 as rs
import numpy as np

# Example: accessing individual streams of a synchronized frameset,
# both by typed accessors and by iteration.
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.infrared, 1, 640, 480, rs.format.y8, 30)
config.enable_stream(rs.stream.infrared, 2, 640, 480, rs.format.y8, 30)
pipeline.start(config)
try:
    for i in range(10):
        frames = pipeline.wait_for_frames()
        print(f"Frameset {i}: {frames.size()} frames")
        print(f" Timestamp: {frames.get_timestamp():.1f}ms")

        # Process each frame type
        depth = frames.get_depth_frame()
        color = frames.get_color_frame()
        ir1 = frames.get_infrared_frame(1)
        ir2 = frames.get_infrared_frame(2)
        if depth:
            depth_data = np.asanyarray(depth.get_data())
            print(f" Depth: {depth_data.shape} (frame #{depth.get_frame_number()})")
        if color:
            color_data = np.asanyarray(color.get_data())
            print(f" Color: {color_data.shape} (frame #{color.get_frame_number()})")
        if ir1:
            ir1_data = np.asanyarray(ir1.get_data())
            print(f" IR1: {ir1_data.shape} (frame #{ir1.get_frame_number()})")
        if ir2:
            ir2_data = np.asanyarray(ir2.get_data())
            print(f" IR2: {ir2_data.shape} (frame #{ir2.get_frame_number()})")

        # Alternative: iterate through all frames
        print(" All frames:")
        for j, frame in enumerate(frames):
            profile = frame.get_profile()
            stream_type = profile.stream_type()
            print(f" {j}: {stream_type.name} frame #{frame.get_frame_number()}")
finally:
    pipeline.stop()

# Use frame.keep() to extend lifetime in callbacks.
# Install with Tessl CLI:
npx tessl i tessl/pypi-pyrealsense2