CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyrealsense2

Python bindings for Intel RealSense SDK 2.0 providing access to depth and color cameras for computer vision applications.

Pending
Overview
Eval results
Files

docs/recording.md

Recording & Playback

Session recording and playback functionality for development, testing, and data analysis. Supports compression, provides full control over playback timing, and enables offline processing of RealSense data.

Capabilities

Recording

Real-time session recording to .bag files with configurable compression.

class recorder(device):
    """
    API summary of the recording device wrapper.

    Wraps an existing device so that its data is written to a .bag file.
    Signatures are shown in reference form (no ``self``) and bodies are
    omitted; this describes the binding rather than being runnable code.
    """
    def __init__(filename: str, device: "device", enable_compression: bool = False) -> None:
        """
        Create recording device wrapper.
        
        Args:
            filename (str): Output .bag file path
            device (device): Device to record from
            enable_compression (bool): Enable data compression
        """
    
    def pause() -> None:
        """Pause recording (frames still captured but not saved)."""
    
    def resume() -> None:
        """Resume recording after pause."""

Playback

Playback of recorded sessions with timing control and status monitoring.

class playback(device):
    """
    API summary of the playback device obtained via ``device.as_playback()``.

    Signatures are shown in reference form (no ``self``) and bodies are
    omitted; this describes the binding rather than being runnable code.
    """
    def pause() -> None:
        """Pause playback."""
    
    def resume() -> None:
        """Resume playback after pause."""
    
    def file_name() -> str:
        """
        Get the source file name.
        
        Returns:
            str: Path to .bag file being played
        """
    
    def get_position() -> int:
        """
        Get current playback position.
        
        Returns:
            int: Position in nanoseconds from start
        """
    
    def get_duration() -> "datetime.timedelta":
        """
        Get total recording duration.
        
        Returns:
            datetime.timedelta: Total length of the recording

        NOTE(review): the upstream pyrealsense2 reference lists the return
        type as ``datetime.timedelta`` rather than an ``int`` of nanoseconds;
        confirm against the installed version. Use ``.total_seconds()`` to
        obtain a float number of seconds.
        """
    
    def seek(time_ns: "datetime.timedelta") -> None:
        """
        Seek to specific time position.
        
        Args:
            time_ns (datetime.timedelta): Target position measured from the
                start of the recording

        NOTE(review): the upstream pyrealsense2 reference lists this argument
        as a ``datetime.timedelta`` (named ``time``), not an integer number of
        nanoseconds; confirm against the installed version.
        """
    
    def is_real_time() -> bool:
        """
        Check if playback is in real-time mode.
        
        Returns:
            bool: True if playing at original speed
        """
    
    def set_real_time(real_time: bool) -> None:
        """
        Enable/disable real-time playback.
        
        Args:
            real_time (bool): True for original speed, False for as-fast-as-possible
        """
    
    def set_status_changed_callback(callback) -> None:
        """
        Set callback for playback status changes.
        
        Args:
            callback: Function called with the new status whenever the
                playback status changes
        """
    
    def current_status() -> playback_status:
        """
        Get current playback status.
        
        Returns:
            playback_status: Current status (unknown, playing, paused, stopped)
        """

Playback Status

Enumeration of playback states.

# Members of the rs.playback_status enumeration, as listed by this reference.
rs.playback_status.unknown      # Status unknown
rs.playback_status.playing      # Currently playing
rs.playback_status.paused       # Playback paused
rs.playback_status.stopped      # Playback stopped/ended

Usage Examples

Basic Recording

import pyrealsense2 as rs
import time

# Streams to capture, one (stream, width, height, format, fps) tuple each
stream_settings = (
    (rs.stream.depth, 640, 480, rs.format.z16, 30),
    (rs.stream.color, 640, 480, rs.format.bgr8, 30),
)

config = rs.config()
for stream_args in stream_settings:
    config.enable_stream(*stream_args)

# Everything the pipeline streams is also written to this .bag file
config.enable_record_to_file("recording.bag")

# Starting the pipeline starts the recording as well
pipeline = rs.pipeline()
profile = pipeline.start(config)

print("Recording started...")
print("Press Ctrl+C to stop recording")

try:
    frame_count = 0
    start_time = time.time()

    while True:
        frames = pipeline.wait_for_frames()
        frame_count += 1

        # Report only on every 30th frameset (about once a second at 30fps)
        if frame_count % 30:
            continue

        elapsed = time.time() - start_time
        print(f"Recorded {frame_count} framesets ({elapsed:.1f}s)")

except KeyboardInterrupt:
    print("Recording stopped by user")

finally:
    # Always release the camera, then summarize what was captured
    pipeline.stop()
    elapsed = time.time() - start_time
    print(f"Recording completed: {frame_count} framesets in {elapsed:.1f}s")
    print("Saved to recording.bag")

Advanced Recording with Device Wrapper

import pyrealsense2 as rs
import time

# Recording parameters
OUTPUT_FILE = "compressed_recording.bag"
REQUESTED_STREAMS = (
    # (stream type, width, height, pixel format, fps)
    (rs.stream.depth, 640, 480, rs.format.z16, 30),
    (rs.stream.color, 640, 480, rs.format.bgr8, 30),
)
TOTAL_DEPTH_FRAMES = 300  # Roughly 10 seconds at 30fps
PAUSE_AT_FRAME = 150      # Demonstrate pause/resume halfway through


def find_video_profile(sensor, stream, width, height, pixel_format, fps):
    """Return the sensor's video profile matching the request, or None."""
    for candidate in sensor.get_stream_profiles():
        if not candidate.is_video_stream_profile():
            continue
        if candidate.stream_type() != stream:
            continue
        if candidate.format() != pixel_format or candidate.fps() != fps:
            continue
        video = candidate.as_video_stream_profile()
        if video.width() == width and video.height() == height:
            return candidate
    return None


# Get physical device
ctx = rs.context()
devices = ctx.query_devices()

if len(devices) == 0:
    print("No RealSense devices found")
    exit()

physical_device = devices[0]
print(f"Recording from: {physical_device.get_info(rs.camera_info.name)}")

# Create recorder wrapper with compression.
# Only data streamed through the recorder's own sensors is written to the
# file, so the sensors are opened on recorder_device rather than starting a
# separate pipeline on the physical device.
recorder_device = rs.recorder(OUTPUT_FILE, physical_device, True)

# All streaming sensors deliver their frames into one shared queue
frame_queue = rs.frame_queue(50)
opened_sensors = []     # sensors that need close()
streaming_sensors = []  # sensors that need stop()

try:
    # Work out which sensor provides which requested profile before opening
    # anything, so a missing profile is reported without touching the device.
    plan = []
    for sensor in recorder_device.query_sensors():
        matches = [find_video_profile(sensor, *request) for request in REQUESTED_STREAMS]
        matches = [match for match in matches if match is not None]
        if matches:
            plan.append((sensor, matches))

    if sum(len(matches) for _, matches in plan) != len(REQUESTED_STREAMS):
        raise RuntimeError("Device does not offer every requested stream profile")

    for sensor, matches in plan:
        sensor.open(matches)
        opened_sensors.append(sensor)
        sensor.start(frame_queue)
        streaming_sensors.append(sensor)

    print("Recording with compression enabled...")

    # Progress is counted in depth frames; color frames share the queue
    depth_frames = 0
    while depth_frames < TOTAL_DEPTH_FRAMES:
        frame = frame_queue.wait_for_frame()
        if frame.get_profile().stream_type() != rs.stream.depth:
            continue

        if depth_frames % 60 == 0:  # Print every 2 seconds
            print(f"Recording frame {depth_frames}/{TOTAL_DEPTH_FRAMES}")

        # Demonstrate pause/resume functionality
        if depth_frames == PAUSE_AT_FRAME:
            recorder_device.pause()
            print("Recording paused")
            time.sleep(1)
            recorder_device.resume()
            print("Recording resumed")

        depth_frames += 1

except Exception as e:
    print(f"Recording error: {e}")

finally:
    # Stop streaming before closing, mirroring the open/start order
    for sensor in streaming_sensors:
        sensor.stop()
    for sensor in opened_sensors:
        sensor.close()
    print("Recording completed with compression")

Basic Playback

import pyrealsense2 as rs

# Configure playback from file.
# repeat_playback=False stops at the end of the recording so the loop below
# terminates; pass repeat_playback=True instead to loop the file forever.
config = rs.config()
config.enable_device_from_file("recording.bag", repeat_playback=False)

pipeline = rs.pipeline()
profile = pipeline.start(config)

# Get playback device for control
device = profile.get_device()
playback_device = device.as_playback()

print(f"Playing: {playback_device.file_name()}")
# get_duration() returns a datetime.timedelta, so convert it explicitly
print(f"Duration: {playback_device.get_duration().total_seconds():.1f} seconds")

frame_count = 0

try:
    while True:
        frames = pipeline.wait_for_frames()
        frame_count += 1

        # Report once every 30 framesets to keep the output readable
        if frame_count % 30 != 0:
            continue

        # Frame timestamps are in milliseconds, playback position in nanoseconds
        timestamp = frames.get_timestamp()
        position = playback_device.get_position()
        print(f"Frame {frame_count}: timestamp={timestamp/1000:.3f}s, "
              f"position={position/1e9:.3f}s")

        # Process frames
        depth_frame = frames.get_depth_frame()
        color_frame = frames.get_color_frame()

        if depth_frame and color_frame:
            # Sample the image centre, whatever resolution was recorded
            center_x = depth_frame.get_width() // 2
            center_y = depth_frame.get_height() // 2
            center_distance = depth_frame.get_distance(center_x, center_y)
            if center_distance > 0:
                print(f"  Center distance: {center_distance:.3f}m")

except rs.error as e:
    print(f"Playback finished or error: {e}")

finally:
    pipeline.stop()
    print(f"Playback completed: {frame_count} frames processed")

Playback Control and Seeking

import pyrealsense2 as rs
import time

# Configure playback
config = rs.config()
config.enable_device_from_file("recording.bag", repeat_playback=False)

pipeline = rs.pipeline()
profile = pipeline.start(config)

device = profile.get_device()
playback = device.as_playback()

print(f"File: {playback.file_name()}")
# get_duration() returns a datetime.timedelta; seek() takes one as well,
# so the duration is kept as a timedelta and only converted for display.
duration = playback.get_duration()
duration_s = duration.total_seconds()
print(f"Duration: {duration_s:.1f} seconds")

# Set playback status callback
def status_callback(status):
    """Report every playback status transition."""
    print(f"Playback status changed to: {status}")

playback.set_status_changed_callback(status_callback)

try:
    # Play at normal speed for a few seconds
    print("Playing at normal speed...")
    playback.set_real_time(True)

    for i in range(90):  # 3 seconds at 30fps
        frames = pipeline.wait_for_frames()
        position = playback.get_position()  # nanoseconds from start
        print(f"Position: {position/1e9:.1f}s", end='\r')

    # Pause playback
    print("\nPausing playback...")
    playback.pause()
    time.sleep(2)

    # Resume at fast speed
    print("Resuming at maximum speed...")
    playback.resume()
    playback.set_real_time(False)  # As fast as possible

    # Play for another few seconds
    for i in range(60):
        frames = pipeline.wait_for_frames()
        position = playback.get_position()
        print(f"Fast position: {position/1e9:.1f}s", end='\r')

    # Seek to middle of recording (timedelta / int stays a timedelta)
    middle_time = duration / 2
    print(f"\nSeeking to middle ({middle_time.total_seconds():.1f}s)...")
    playback.seek(middle_time)

    # Play from middle
    playback.set_real_time(True)
    for i in range(60):
        frames = pipeline.wait_for_frames()
        position = playback.get_position()
        print(f"Middle position: {position/1e9:.1f}s", end='\r')

    # Seek to specific positions, expressed as fractions of the recording
    seek_targets = [duration * fraction for fraction in (0.1, 0.5, 0.9)]

    for target in seek_targets:
        print(f"\nSeeking to {target.total_seconds():.1f}s...")
        playback.seek(target)

        # Get a few frames from this position
        for j in range(10):
            frames = pipeline.wait_for_frames()
            frame_timestamp = frames.get_timestamp()  # milliseconds
            position = playback.get_position()
            print(f"  Frame timestamp: {frame_timestamp/1000:.3f}s, "
                  f"position: {position/1e9:.3f}s")

except rs.error as e:
    print(f"\nPlayback error: {e}")

finally:
    pipeline.stop()
    print("Playback control demo completed")

Multi-File Playback Analysis

import pyrealsense2 as rs
import os
import glob

class PlaybackAnalyzer:
    """Collect frame counts, durations and stream layouts of .bag recordings.

    Results are stored per file path in ``frame_counts``, ``durations``
    (seconds) and ``stream_info`` (list of human-readable stream descriptions).
    """

    def __init__(self):
        self.frame_counts = {}
        self.durations = {}
        self.stream_info = {}

    @staticmethod
    def _duration_seconds(duration):
        """Convert a playback duration to float seconds.

        Accepts a ``datetime.timedelta`` (what the binding returns) as well
        as a plain number of nanoseconds.
        """
        if hasattr(duration, "total_seconds"):
            return duration.total_seconds()
        return duration / 1e9

    @staticmethod
    def _describe_stream(stream_profile):
        """Return a one-line description of a stream profile."""
        if stream_profile.is_video_stream_profile():
            vsp = stream_profile.as_video_stream_profile()
            return (f"{vsp.stream_type().name} {vsp.width()}x{vsp.height()} "
                    f"{vsp.format().name}@{vsp.fps()}fps")
        return (f"{stream_profile.stream_type().name} "
                f"{stream_profile.format().name}@{stream_profile.fps()}fps")

    def analyze_file(self, bag_file):
        """Analyze a single bag file.

        Plays the file back as fast as possible and records its duration,
        stream layout and number of framesets. Errors are reported on stdout
        rather than raised, so a bad file does not abort a directory scan.

        Args:
            bag_file (str): Path of the .bag file to analyze.
        """
        print(f"Analyzing {bag_file}...")

        pipeline = None
        started = False
        try:
            config = rs.config()
            config.enable_device_from_file(bag_file, repeat_playback=False)

            pipeline = rs.pipeline()
            profile = pipeline.start(config)
            started = True

            playback = profile.get_device().as_playback()

            # Get file info
            duration_s = self._duration_seconds(playback.get_duration())
            self.durations[bag_file] = duration_s

            # Get stream information
            self.stream_info[bag_file] = [
                self._describe_stream(stream_profile)
                for stream_profile in profile.get_streams()
            ]

            # Count frames by processing entire file
            playback.set_real_time(False)  # Fast playback
            frame_count = 0

            while True:
                try:
                    pipeline.wait_for_frames()
                except rs.error:
                    break  # End of file

                frame_count += 1

                # Skip the progress line for zero-length files
                if frame_count % 100 == 0 and duration_s > 0:
                    position_s = playback.get_position() / 1e9
                    progress = (position_s / duration_s) * 100
                    print(f"  Progress: {progress:.1f}%", end='\r')

            self.frame_counts[bag_file] = frame_count

            print(f"  Completed: {frame_count} frames, {duration_s:.1f}s")

        except Exception as e:
            print(f"  Error analyzing {bag_file}: {e}")

        finally:
            # Release the file even when the analysis failed part-way through
            if started:
                pipeline.stop()

    def analyze_directory(self, directory):
        """Analyze all .bag files in a directory.

        Args:
            directory (str): Directory searched (non-recursively) for *.bag.
        """
        # Sorted so that the report order does not depend on the file system
        bag_files = sorted(glob.glob(os.path.join(directory, "*.bag")))

        if not bag_files:
            print(f"No .bag files found in {directory}")
            return

        print(f"Found {len(bag_files)} .bag files")

        for bag_file in bag_files:
            self.analyze_file(bag_file)
            print()  # Blank line between files

    def print_summary(self):
        """Print per-file statistics followed by overall totals."""
        if not self.frame_counts:
            print("No files analyzed")
            return

        print("Analysis Summary:")
        print("=" * 80)

        total_frames = 0
        total_duration = 0

        for bag_file, frame_count in self.frame_counts.items():
            filename = os.path.basename(bag_file)
            duration = self.durations[bag_file]
            avg_fps = frame_count / duration if duration > 0 else 0

            print(f"\nFile: {filename}")
            print(f"  Duration: {duration:.1f}s")
            print(f"  Frames: {frame_count}")
            print(f"  Average FPS: {avg_fps:.1f}")
            print("  Streams:")
            for stream in self.stream_info[bag_file]:
                print(f"    - {stream}")

            total_frames += frame_count
            total_duration += duration

        # Guard against empty recordings, as done for the per-file average
        overall_fps = total_frames / total_duration if total_duration > 0 else 0

        print("\nTotals:")
        print(f"  Files: {len(self.frame_counts)}")
        print(f"  Total duration: {total_duration:.1f}s ({total_duration/60:.1f} minutes)")
        print(f"  Total frames: {total_frames}")
        print(f"  Overall average FPS: {overall_fps:.1f}")

# Example driver for PlaybackAnalyzer
analyzer = PlaybackAnalyzer()

# A single recording is analyzed only when it is actually present
single_recording = "recording.bag"
if os.path.exists(single_recording):
    analyzer.analyze_file(single_recording)

# To scan a whole folder of recordings instead:
# analyzer.analyze_directory("./recordings")

analyzer.print_summary()

Recording with Pipeline Control

import pyrealsense2 as rs
import time
import signal
import sys

class PipelineRecorder:
    """Record a RealSense pipeline to a .bag file with start/stop control.

    Typical use: ``start_recording()``, then ``record_frames(...)``, then
    ``stop_recording()``. Progress and errors are reported on stdout.
    """

    def __init__(self, output_file):
        """Store the target path; no device is touched until recording starts."""
        self.output_file = output_file
        self.pipeline = None
        self.is_recording = False
        self.frame_count = 0
        self.start_time = None

    def configure_streams(self):
        """Return a config with depth, color and two infrared streams that records to the output file."""
        # Argument tuples passed straight to enable_stream; the infrared
        # entries carry a stream index as their second element.
        stream_requests = (
            (rs.stream.depth, 640, 480, rs.format.z16, 30),
            (rs.stream.color, 640, 480, rs.format.bgr8, 30),
            (rs.stream.infrared, 1, 640, 480, rs.format.y8, 30),
            (rs.stream.infrared, 2, 640, 480, rs.format.y8, 30),
        )

        cfg = rs.config()
        for request in stream_requests:
            cfg.enable_stream(*request)

        # Everything streamed is also written to the .bag file
        cfg.enable_record_to_file(self.output_file)
        return cfg

    def start_recording(self):
        """Start the pipeline and print the device and stream configuration."""
        if self.is_recording:
            print("Already recording")
            return

        try:
            cfg = self.configure_streams()
            self.pipeline = rs.pipeline()
            active_profile = self.pipeline.start(cfg)

            # Recording is live from here on; reset the counters
            self.is_recording = True
            self.frame_count = 0
            self.start_time = time.time()

            source = active_profile.get_device()
            print(f"Started recording from: {source.get_info(rs.camera_info.name)}")
            print(f"Output file: {self.output_file}")

            active_streams = active_profile.get_streams()
            print("Recording streams:")
            for sp in active_streams:
                if not sp.is_video_stream_profile():
                    # Non-video streams have no resolution to report
                    print(f"  {sp.stream_type().name}: "
                          f"{sp.format().name} @ {sp.fps()}fps")
                    continue

                video = sp.as_video_stream_profile()
                print(f"  {video.stream_type().name}: {video.width()}x{video.height()} "
                      f"{video.format().name} @ {video.fps()}fps")

        except Exception as e:
            print(f"Failed to start recording: {e}")
            self.is_recording = False

    def record_frames(self, max_frames=None, max_duration=None):
        """Pull framesets until a limit is hit, recording stops, or Ctrl+C.

        Args:
            max_frames: Stop after this many framesets (falsy means no limit).
            max_duration: Stop after this many seconds (falsy means no limit).
        """
        if not self.is_recording:
            print("Not recording")
            return

        print("Recording frames... Press Ctrl+C to stop")

        try:
            while self.is_recording:
                self.pipeline.wait_for_frames()
                self.frame_count += 1

                # Progress line every 30 framesets
                if self.frame_count % 30 == 0:
                    seconds = time.time() - self.start_time
                    rate = self.frame_count / seconds
                    print(f"Recorded {self.frame_count} framesets "
                          f"({seconds:.1f}s, {rate:.1f} fps)")

                # Frame limit is checked before the time limit
                if max_frames and self.frame_count >= max_frames:
                    print(f"Reached frame limit: {max_frames}")
                    break

                if max_duration and (time.time() - self.start_time) >= max_duration:
                    print(f"Reached time limit: {max_duration}s")
                    break

        except KeyboardInterrupt:
            print("\nRecording stopped by user")
        except Exception as e:
            print(f"Recording error: {e}")

    def stop_recording(self):
        """Stop the pipeline (if any) and print a summary; no-op when idle."""
        if not self.is_recording:
            return

        self.is_recording = False

        if self.pipeline:
            self.pipeline.stop()
            self.pipeline = None

        # Without a start time there is nothing to measure against
        if self.start_time:
            elapsed = time.time() - self.start_time
        else:
            elapsed = 0

        avg_fps = 0
        if elapsed > 0:
            avg_fps = self.frame_count / elapsed

        print(f"Recording completed:")
        print(f"  Frames: {self.frame_count}")
        print(f"  Duration: {elapsed:.1f}s")
        print(f"  Average FPS: {avg_fps:.1f}")
        print(f"  File: {self.output_file}")

# SIGINT handler: finish the recording cleanly, then exit
def signal_handler(sig, frame):
    """Stop the module-level recorder (if there is one) and exit with status 0."""
    print("\nShutdown signal received")
    active = recorder
    if active:
        active.stop_recording()
    raise SystemExit(0)

# Create the recorder first so the SIGINT handler has something to stop
recorder = PipelineRecorder("controlled_recording.bag")
signal.signal(signal.SIGINT, signal_handler)

# Open the camera and begin writing to the file
recorder.start_recording()

# Capture for 30 seconds (max_frames could be given instead of, or with, max_duration)
recording_seconds = 30
recorder.record_frames(max_duration=recording_seconds)

# Release the camera and print the summary
recorder.stop_recording()

Synchronized Recording and Playback

import pyrealsense2 as rs
import time
import threading

class SynchronizedProcessor:
    """Process framesets on a background thread fed through a frame queue.

    Producers enqueue framesets into ``frame_queue``; the worker thread pulls
    them out, processes them and counts them in ``processed_count``.
    """

    def __init__(self):
        """Create the queue and reset the counters; the worker is not started."""
        self.frame_queue = rs.frame_queue(capacity=100)
        self.processed_count = 0
        self.is_processing = False

    def start_processing(self):
        """Start processing thread."""
        self.is_processing = True
        worker = threading.Thread(target=self._processing_loop)
        self.processing_thread = worker
        worker.start()

    def stop_processing(self):
        """Stop processing thread."""
        self.is_processing = False
        # Nothing to wait for when the worker was never started
        if not hasattr(self, 'processing_thread'):
            return
        self.processing_thread.join()

    def _processing_loop(self):
        """Worker body: drain the queue until is_processing is cleared."""
        while self.is_processing:
            try:
                # Short timeout so the loop re-checks the stop flag regularly
                queued = self.frame_queue.wait_for_frame(timeout_ms=100)
                self._process_frameset(queued.as_frameset())
                self.processed_count += 1
            except rs.error:
                # Raised on timeout; just poll again
                continue

    def _process_frameset(self, frameset):
        """Read depth at pixel (320, 240) and report on every 30th frameset."""
        depth = frameset.get_depth_frame()
        color = frameset.get_color_frame()

        # Both frames are required for a frameset to count as usable
        if not (depth and color):
            return

        timestamp = frameset.get_timestamp()
        center_distance = depth.get_distance(320, 240)

        if self.processed_count % 30 != 0:
            return

        print(f"Processed frame at {timestamp/1000:.3f}s, "
              f"center distance: {center_distance:.3f}m")

def record_with_processing(duration_seconds=10):
    """Record while simultaneously processing frames.

    Streams depth and color into "sync_recording.bag" for the given time and
    hands every frameset to a background SynchronizedProcessor as well.

    Args:
        duration_seconds: How long to record, in seconds.
    """
    print(f"Recording with processing for {duration_seconds} seconds...")

    # Setup recording
    config = rs.config()
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
    config.enable_record_to_file("sync_recording.bag")

    pipeline = rs.pipeline()

    # Setup processor
    processor = SynchronizedProcessor()
    processor.start_processing()

    try:
        # Started inside the try so that a failure to open the camera still
        # shuts the worker thread down instead of leaving it running.
        pipeline.start(config)

        try:
            start_time = time.time()
            frame_count = 0

            while time.time() - start_time < duration_seconds:
                frames = pipeline.wait_for_frames()

                # Recording happens automatically via the config;
                # the processor gets its own reference to the frameset.
                processor.frame_queue.enqueue(frames)

                frame_count += 1

            elapsed = time.time() - start_time
            print(f"Recording completed: {frame_count} frames in {elapsed:.1f}s")

        finally:
            # Only reached once the pipeline actually started
            pipeline.stop()

    finally:
        processor.stop_processing()

        print(f"Processed {processor.processed_count} framesets during recording")

def playback_with_processing(bag_file):
    """Playback with synchronized processing.

    Plays the recording once in real time and hands every frameset to a
    background SynchronizedProcessor.

    Args:
        bag_file: Path of the .bag file to play back.
    """
    print(f"Playing back {bag_file} with processing...")

    # Setup playback
    config = rs.config()
    config.enable_device_from_file(bag_file, repeat_playback=False)

    pipeline = rs.pipeline()

    # Setup processor
    processor = SynchronizedProcessor()
    processor.start_processing()

    frame_count = 0

    try:
        # Started inside the try so that a failure to open the file still
        # shuts the worker thread down instead of leaving it running.
        profile = pipeline.start(config)

        try:
            # Get playback device
            playback = profile.get_device().as_playback()
            playback.set_real_time(True)  # Real-time playback

            # get_duration() returns a datetime.timedelta; it does not change
            # during playback, so it is read once up front.
            duration_s = playback.get_duration().total_seconds()

            try:
                while True:
                    frames = pipeline.wait_for_frames()

                    # Send to processor
                    processor.frame_queue.enqueue(frames)

                    frame_count += 1

                    # Print progress (skipped for zero-length recordings)
                    if frame_count % 60 == 0 and duration_s > 0:
                        position_s = playback.get_position() / 1e9
                        progress = (position_s / duration_s) * 100
                        print(f"Playback progress: {progress:.1f}%")

            except rs.error:
                print("Playback completed")

        finally:
            # Only reached once the pipeline actually started
            pipeline.stop()

    finally:
        processor.stop_processing()

        print(f"Played back {frame_count} frames")
        print(f"Processed {processor.processed_count} framesets during playback")

# Script entry point: record a short clip, then replay it
if __name__ == "__main__":
    clip_seconds = 5
    clip_file = "sync_recording.bag"

    # Capture with live processing
    record_with_processing(duration_seconds=clip_seconds)

    # Brief pause between recording and playback
    time.sleep(1)

    # Replay the clip through the same processing path
    playback_with_processing(clip_file)

File Format and Compression

.bag File Format

  • ROS bag-compatible format
  • Contains timestamped frame data
  • Includes device metadata and stream configurations
  • Supports multiple streams synchronized by timestamp

Compression Options

  • Lossless compression available for reduced file size
  • Trade-off between file size and processing overhead
  • Useful for long-duration recordings or storage constraints

File Size Considerations

  • Uncompressed: ~150MB per minute for depth+color at 640x480@30fps
  • Compressed: ~50-80MB per minute depending on scene content
  • Add ~25MB per minute for each additional infrared stream

Use Cases

Development and Testing

  • Record test scenarios for reproducible debugging
  • Validate algorithm performance across different datasets
  • Compare processing results between different approaches

Data Collection

  • Gather training data for machine learning models
  • Document camera performance in various conditions
  • Create benchmark datasets for evaluation

Offline Processing

  • Process recorded data without live camera
  • Apply different filter settings to same source data
  • Generate processed results for analysis and comparison

Install with Tessl CLI

npx tessl i tessl/pypi-pyrealsense2

docs

advanced-mode.md

context-device.md

frames.md

index.md

logging.md

options.md

pipeline.md

pointclouds.md

processing.md

recording.md

sensors.md

tile.json