Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.
—
Session recording and playback functionality for development, testing, and data analysis. Supports compression, provides full control over playback timing, and enables offline processing of RealSense data.
Real-time session recording to .bag files with configurable compression.
class recorder(device):
def __init__(filename, device, enable_compression=False):
"""
Create recording device wrapper.
Args:
filename (str): Output .bag file path
device (device): Device to record from
enable_compression (bool): Enable data compression
"""
def pause():
"""Pause recording (frames still captured but not saved)."""
def resume():
"""Resume recording after pause."""Playback of recorded sessions with timing control and status monitoring.
class playback(device):
def pause():
"""Pause playback."""
def resume():
"""Resume playback after pause."""
def file_name() -> str:
"""
Get the source file name.
Returns:
str: Path to .bag file being played
"""
def get_position() -> int:
"""
Get current playback position.
Returns:
int: Position in nanoseconds from start
"""
def get_duration() -> int:
"""
Get total recording duration.
Returns:
int: Duration in nanoseconds
"""
def seek(time_ns):
"""
Seek to specific time position.
Args:
time_ns (int): Target position in nanoseconds
"""
def is_real_time() -> bool:
"""
Check if playback is in real-time mode.
Returns:
bool: True if playing at original speed
"""
def set_real_time(real_time):
"""
Enable/disable real-time playback.
Args:
real_time (bool): True for original speed, False for as-fast-as-possible
"""
def set_status_changed_callback(callback):
"""
Set callback for playback status changes.
Args:
callback: Function called when playback status changes
"""
def current_status() -> playback_status:
"""
Get current playback status.
Returns:
playback_status: Current status (playing, paused, stopped)
"""Enumeration of playback states.
rs.playback_status.unknown # Status unknown
rs.playback_status.playing # Currently playing
rs.playback_status.paused # Playback paused
rs.playback_status.stopped # Playback stopped/endedimport pyrealsense2 as rs
import time
# Configure streams for recording
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
# Enable recording to file
config.enable_record_to_file("recording.bag")
# Start pipeline with recording
pipeline = rs.pipeline()
profile = pipeline.start(config)
print("Recording started...")
print("Press Ctrl+C to stop recording")
try:
frame_count = 0
start_time = time.time()
while True:
frames = pipeline.wait_for_frames()
frame_count += 1
# Print progress every second
if frame_count % 30 == 0:
elapsed = time.time() - start_time
print(f"Recorded {frame_count} framesets ({elapsed:.1f}s)")
except KeyboardInterrupt:
print("Recording stopped by user")
finally:
pipeline.stop()
elapsed = time.time() - start_time
print(f"Recording completed: {frame_count} framesets in {elapsed:.1f}s")
print("Saved to recording.bag")import pyrealsense2 as rs
import time
# Get physical device
ctx = rs.context()
devices = ctx.query_devices()
if len(devices) == 0:
print("No RealSense devices found")
exit()
physical_device = devices[0]
print(f"Recording from: {physical_device.get_info(rs.camera_info.name)}")
# Create recorder wrapper with compression
recorder_device = rs.recorder("compressed_recording.bag", physical_device, True)
# Configure streams on recorder device
config = rs.config()
config.enable_device_from_file("compressed_recording.bag") # This tells config to use the recorder
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.infrared, 1, 640, 480, rs.format.y8, 30)
# Alternative: directly use the recorder device
pipeline = rs.pipeline()
# Configure without config.enable_record_to_file since we're using recorder device directly
config_direct = rs.config()
config_direct.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config_direct.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
# We need to set the device directly for the pipeline to use our recorder
pipeline.start(config_direct)
print("Recording with compression enabled...")
try:
for i in range(300): # Record for 10 seconds at 30fps
frames = pipeline.wait_for_frames()
if i % 60 == 0: # Print every 2 seconds
print(f"Recording frame {i}/300")
# Demonstrate pause/resume functionality
if i == 150: # Pause halfway through
recorder_device.pause()
print("Recording paused")
time.sleep(1)
recorder_device.resume()
print("Recording resumed")
except Exception as e:
print(f"Recording error: {e}")
finally:
pipeline.stop()
print("Recording completed with compression")import pyrealsense2 as rs
# Configure playback from file
config = rs.config()
config.enable_device_from_file("recording.bag")
# Optionally configure repeat playback
config = rs.config()
config.enable_device_from_file("recording.bag", repeat_playback=True)
pipeline = rs.pipeline()
profile = pipeline.start(config)
# Get playback device for control
device = profile.get_device()
playback_device = device.as_playback()
print(f"Playing: {playback_device.file_name()}")
print(f"Duration: {playback_device.get_duration() / 1e9:.1f} seconds")
frame_count = 0
try:
while True:
frames = pipeline.wait_for_frames()
# Get timing information
timestamp = frames.get_timestamp()
position = playback_device.get_position()
frame_count += 1
if frame_count % 30 == 0:
print(f"Frame {frame_count}: timestamp={timestamp/1000:.3f}s, "
f"position={position/1e9:.3f}s")
# Process frames
depth_frame = frames.get_depth_frame()
color_frame = frames.get_color_frame()
if depth_frame and color_frame:
# Example processing
center_distance = depth_frame.get_distance(320, 240)
if center_distance > 0:
print(f" Center distance: {center_distance:.3f}m")
except rs.error as e:
print(f"Playback finished or error: {e}")
finally:
pipeline.stop()
print(f"Playback completed: {frame_count} frames processed")import pyrealsense2 as rs
import time
# Configure playback
config = rs.config()
config.enable_device_from_file("recording.bag", repeat_playback=False)
pipeline = rs.pipeline()
profile = pipeline.start(config)
device = profile.get_device()
playback = device.as_playback()
print(f"File: {playback.file_name()}")
duration_ns = playback.get_duration()
duration_s = duration_ns / 1e9
print(f"Duration: {duration_s:.1f} seconds")
# Set playback status callback
def status_callback(status):
print(f"Playback status changed to: {status}")
playback.set_status_changed_callback(status_callback)
try:
# Play at normal speed for a few seconds
print("Playing at normal speed...")
playback.set_real_time(True)
for i in range(90): # 3 seconds at 30fps
frames = pipeline.wait_for_frames()
position = playback.get_position()
print(f"Position: {position/1e9:.1f}s", end='\r')
# Pause playback
print("\nPausing playback...")
playback.pause()
time.sleep(2)
# Resume at fast speed
print("Resuming at maximum speed...")
playback.resume()
playback.set_real_time(False) # As fast as possible
# Play for another few seconds
for i in range(60):
frames = pipeline.wait_for_frames()
position = playback.get_position()
print(f"Fast position: {position/1e9:.1f}s", end='\r')
# Seek to middle of recording
middle_time = duration_ns // 2
print(f"\nSeeking to middle ({middle_time/1e9:.1f}s)...")
playback.seek(middle_time)
# Play from middle
playback.set_real_time(True)
for i in range(60):
frames = pipeline.wait_for_frames()
position = playback.get_position()
print(f"Middle position: {position/1e9:.1f}s", end='\r')
# Seek to specific timestamps
timestamps = [duration_ns * 0.1, duration_ns * 0.5, duration_ns * 0.9]
for timestamp in timestamps:
print(f"\nSeeking to {timestamp/1e9:.1f}s...")
playback.seek(timestamp)
# Get a few frames from this position
for j in range(10):
frames = pipeline.wait_for_frames()
frame_timestamp = frames.get_timestamp()
position = playback.get_position()
print(f" Frame timestamp: {frame_timestamp/1000:.3f}s, "
f"position: {position/1e9:.3f}s")
except rs.error as e:
print(f"\nPlayback error: {e}")
finally:
pipeline.stop()
print("Playback control demo completed")import pyrealsense2 as rs
import os
import glob
class PlaybackAnalyzer:
def __init__(self):
self.frame_counts = {}
self.durations = {}
self.stream_info = {}
def analyze_file(self, bag_file):
"""Analyze a single bag file."""
print(f"Analyzing {bag_file}...")
try:
config = rs.config()
config.enable_device_from_file(bag_file, repeat_playback=False)
pipeline = rs.pipeline()
profile = pipeline.start(config)
device = profile.get_device()
playback = device.as_playback()
# Get file info
duration = playback.get_duration()
self.durations[bag_file] = duration / 1e9
# Get stream information
streams = profile.get_streams()
stream_info = []
for stream_profile in streams:
if stream_profile.is_video_stream_profile():
vsp = stream_profile.as_video_stream_profile()
info = f"{vsp.stream_type().name} {vsp.width()}x{vsp.height()} {vsp.format().name}@{vsp.fps()}fps"
else:
info = f"{stream_profile.stream_type().name} {stream_profile.format().name}@{stream_profile.fps()}fps"
stream_info.append(info)
self.stream_info[bag_file] = stream_info
# Count frames by processing entire file
playback.set_real_time(False) # Fast playback
frame_count = 0
while True:
try:
frames = pipeline.wait_for_frames()
frame_count += 1
if frame_count % 100 == 0:
position = playback.get_position()
progress = (position / duration) * 100
print(f" Progress: {progress:.1f}%", end='\r')
except rs.error:
break # End of file
self.frame_counts[bag_file] = frame_count
pipeline.stop()
print(f" Completed: {frame_count} frames, {duration/1e9:.1f}s")
except Exception as e:
print(f" Error analyzing {bag_file}: {e}")
def analyze_directory(self, directory):
"""Analyze all .bag files in a directory."""
bag_files = glob.glob(os.path.join(directory, "*.bag"))
if not bag_files:
print(f"No .bag files found in {directory}")
return
print(f"Found {len(bag_files)} .bag files")
for bag_file in bag_files:
self.analyze_file(bag_file)
print() # Blank line between files
def print_summary(self):
"""Print analysis summary."""
if not self.frame_counts:
print("No files analyzed")
return
print("Analysis Summary:")
print("=" * 80)
total_frames = 0
total_duration = 0
for bag_file in self.frame_counts.keys():
filename = os.path.basename(bag_file)
frame_count = self.frame_counts[bag_file]
duration = self.durations[bag_file]
avg_fps = frame_count / duration if duration > 0 else 0
print(f"\nFile: {filename}")
print(f" Duration: {duration:.1f}s")
print(f" Frames: {frame_count}")
print(f" Average FPS: {avg_fps:.1f}")
print(f" Streams:")
for stream in self.stream_info[bag_file]:
print(f" - {stream}")
total_frames += frame_count
total_duration += duration
print(f"\nTotals:")
print(f" Files: {len(self.frame_counts)}")
print(f" Total duration: {total_duration:.1f}s ({total_duration/60:.1f} minutes)")
print(f" Total frames: {total_frames}")
print(f" Overall average FPS: {total_frames/total_duration:.1f}")
# Usage example
analyzer = PlaybackAnalyzer()
# Analyze single file
if os.path.exists("recording.bag"):
analyzer.analyze_file("recording.bag")
# Analyze directory of recordings
# analyzer.analyze_directory("./recordings")
analyzer.print_summary()import pyrealsense2 as rs
import time
import signal
import sys
class PipelineRecorder:
def __init__(self, output_file):
self.output_file = output_file
self.pipeline = None
self.is_recording = False
self.frame_count = 0
self.start_time = None
def configure_streams(self):
"""Configure streams for recording."""
config = rs.config()
# Enable multiple streams
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.infrared, 1, 640, 480, rs.format.y8, 30)
config.enable_stream(rs.stream.infrared, 2, 640, 480, rs.format.y8, 30)
# Enable recording
config.enable_record_to_file(self.output_file)
return config
def start_recording(self):
"""Start the recording pipeline."""
if self.is_recording:
print("Already recording")
return
try:
config = self.configure_streams()
self.pipeline = rs.pipeline()
profile = self.pipeline.start(config)
self.is_recording = True
self.frame_count = 0
self.start_time = time.time()
# Get device info
device = profile.get_device()
print(f"Started recording from: {device.get_info(rs.camera_info.name)}")
print(f"Output file: {self.output_file}")
# Print stream configuration
streams = profile.get_streams()
print("Recording streams:")
for stream_profile in streams:
if stream_profile.is_video_stream_profile():
vsp = stream_profile.as_video_stream_profile()
print(f" {vsp.stream_type().name}: {vsp.width()}x{vsp.height()} "
f"{vsp.format().name} @ {vsp.fps()}fps")
else:
print(f" {stream_profile.stream_type().name}: "
f"{stream_profile.format().name} @ {stream_profile.fps()}fps")
except Exception as e:
print(f"Failed to start recording: {e}")
self.is_recording = False
def record_frames(self, max_frames=None, max_duration=None):
"""Record frames with optional limits."""
if not self.is_recording:
print("Not recording")
return
print("Recording frames... Press Ctrl+C to stop")
try:
while self.is_recording:
frames = self.pipeline.wait_for_frames()
self.frame_count += 1
# Print progress
if self.frame_count % 30 == 0:
elapsed = time.time() - self.start_time
fps = self.frame_count / elapsed
print(f"Recorded {self.frame_count} framesets "
f"({elapsed:.1f}s, {fps:.1f} fps)")
# Check limits
if max_frames and self.frame_count >= max_frames:
print(f"Reached frame limit: {max_frames}")
break
if max_duration and (time.time() - self.start_time) >= max_duration:
print(f"Reached time limit: {max_duration}s")
break
except KeyboardInterrupt:
print("\nRecording stopped by user")
except Exception as e:
print(f"Recording error: {e}")
def stop_recording(self):
"""Stop recording and cleanup."""
if not self.is_recording:
return
self.is_recording = False
if self.pipeline:
self.pipeline.stop()
self.pipeline = None
elapsed = time.time() - self.start_time if self.start_time else 0
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
print(f"Recording completed:")
print(f" Frames: {self.frame_count}")
print(f" Duration: {elapsed:.1f}s")
print(f" Average FPS: {avg_fps:.1f}")
print(f" File: {self.output_file}")
# Signal handler for clean shutdown
def signal_handler(sig, frame):
print("\nShutdown signal received")
if recorder:
recorder.stop_recording()
sys.exit(0)
# Setup recording
recorder = PipelineRecorder("controlled_recording.bag")
signal.signal(signal.SIGINT, signal_handler)
# Start recording
recorder.start_recording()
# Record for specific duration or frame count
recorder.record_frames(max_duration=30) # Record for 30 seconds
# Stop recording
recorder.stop_recording()import pyrealsense2 as rs
import time
import threading
class SynchronizedProcessor:
def __init__(self):
self.frame_queue = rs.frame_queue(capacity=100)
self.processed_count = 0
self.is_processing = False
def start_processing(self):
"""Start processing thread."""
self.is_processing = True
self.processing_thread = threading.Thread(target=self._processing_loop)
self.processing_thread.start()
def stop_processing(self):
"""Stop processing thread."""
self.is_processing = False
if hasattr(self, 'processing_thread'):
self.processing_thread.join()
def _processing_loop(self):
"""Background processing loop."""
while self.is_processing:
try:
frames = self.frame_queue.wait_for_frame(timeout_ms=100)
self._process_frameset(frames.as_frameset())
self.processed_count += 1
except rs.error:
continue # Timeout, continue loop
def _process_frameset(self, frameset):
"""Process a single frameset."""
# Example processing: just extract frame info
depth = frameset.get_depth_frame()
color = frameset.get_color_frame()
if depth and color:
timestamp = frameset.get_timestamp()
center_distance = depth.get_distance(320, 240)
if self.processed_count % 30 == 0:
print(f"Processed frame at {timestamp/1000:.3f}s, "
f"center distance: {center_distance:.3f}m")
def record_with_processing(duration_seconds=10):
"""Record while simultaneously processing frames."""
print(f"Recording with processing for {duration_seconds} seconds...")
# Setup processor
processor = SynchronizedProcessor()
processor.start_processing()
# Setup recording
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_record_to_file("sync_recording.bag")
pipeline = rs.pipeline()
profile = pipeline.start(config)
try:
start_time = time.time()
frame_count = 0
while time.time() - start_time < duration_seconds:
frames = pipeline.wait_for_frames()
# Send to recorder (automatic via config)
# Send to processor
processor.frame_queue.enqueue(frames)
frame_count += 1
elapsed = time.time() - start_time
print(f"Recording completed: {frame_count} frames in {elapsed:.1f}s")
finally:
pipeline.stop()
processor.stop_processing()
print(f"Processed {processor.processed_count} framesets during recording")
def playback_with_processing(bag_file):
"""Playback with synchronized processing."""
print(f"Playing back {bag_file} with processing...")
# Setup processor
processor = SynchronizedProcessor()
processor.start_processing()
# Setup playback
config = rs.config()
config.enable_device_from_file(bag_file, repeat_playback=False)
pipeline = rs.pipeline()
profile = pipeline.start(config)
# Get playback device
device = profile.get_device()
playback = device.as_playback()
playback.set_real_time(True) # Real-time playback
try:
frame_count = 0
while True:
frames = pipeline.wait_for_frames()
# Send to processor
processor.frame_queue.enqueue(frames)
frame_count += 1
# Print progress
if frame_count % 60 == 0:
position = playback.get_position()
duration = playback.get_duration()
progress = (position / duration) * 100
print(f"Playback progress: {progress:.1f}%")
except rs.error:
print("Playback completed")
finally:
pipeline.stop()
processor.stop_processing()
print(f"Played back {frame_count} frames")
print(f"Processed {processor.processed_count} framesets during playback")
# Example usage
if __name__ == "__main__":
# Record with processing
record_with_processing(duration_seconds=5)
# Wait a moment
time.sleep(1)
# Playback with processing
playback_with_processing("sync_recording.bag")Install with Tessl CLI
npx tessl i tessl/pypi-pyrealsense2