CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-picamera2

The libcamera-based Python interface to Raspberry Pi cameras, based on the original Picamera library

Pending
Overview
Eval results
Files

advanced.mddocs/

Advanced Features

Advanced functionality including mode switching, autofocus control, frame dropping, device-specific AI sensor integration, and platform-specific optimizations. These features enable sophisticated camera applications and integration with specialized hardware.

Capabilities

Mode Switching

Dynamic configuration changes without stopping the camera system.

def switch_mode(
    self,
    camera_config: CameraConfiguration,
    wait: bool = True,
    signal_function: callable = None
):
    """
    Switch camera configuration dynamically.
    
    Parameters:
    - camera_config: CameraConfiguration, new configuration to apply
    - wait: bool, wait for switch completion
    - signal_function: callable, completion callback
    
    Returns:
    Job object if wait=False, None if wait=True
    """

def switch_mode_and_drop_frames(
    self,
    camera_config: CameraConfiguration,
    num_frames: int,
    wait: bool = True,
    signal_function: callable = None
):
    """
    Switch mode and drop initial frames for settling.
    
    Parameters:
    - camera_config: CameraConfiguration, new configuration
    - num_frames: int, number of frames to drop after switch
    - wait: bool, wait for completion
    - signal_function: callable, completion callback
    """

Combined Mode Switch and Capture

Atomic operations that switch mode and immediately capture.

def switch_mode_and_capture_file(
    self,
    camera_config: CameraConfiguration,
    file_output: str,
    name: str = "main",
    format: str = None,
    wait: bool = True,
    signal_function: callable = None,
    exif_data: dict = None
):
    """
    Switch mode and capture file atomically.
    
    Parameters:
    - camera_config: CameraConfiguration, new configuration
    - file_output: str, output file path
    - name: str, stream name to capture
    - format: str, output format
    - wait: bool, wait for completion
    - signal_function: callable, completion callback
    - exif_data: dict, EXIF metadata
    """

def switch_mode_and_capture_array(
    self,
    camera_config: CameraConfiguration,
    name: str = "main",
    wait: bool = True,
    signal_function: callable = None,
    delay: float = None
) -> np.ndarray:
    """
    Switch mode and capture array atomically.
    
    Parameters:
    - camera_config: CameraConfiguration, new configuration
    - name: str, stream name to capture
    - wait: bool, wait for completion
    - signal_function: callable, completion callback
    - delay: float, delay before capture
    
    Returns:
    np.ndarray: Captured image array
    """

def switch_mode_and_capture_request(
    self,
    camera_config: CameraConfiguration,
    wait: bool = True,
    signal_function: callable = None,
    delay: float = None
) -> CompletedRequest:
    """
    Switch mode and capture complete request atomically.
    
    Parameters:
    - camera_config: CameraConfiguration, new configuration
    - wait: bool, wait for completion
    - signal_function: callable, completion callback
    - delay: float, delay before capture
    
    Returns:
    CompletedRequest: Complete request with all streams
    """

Frame Management

Control frame dropping and camera timing.

def drop_frames(
    self,
    num_frames: int,
    wait: bool = True,
    signal_function: callable = None
):
    """
    Drop specified number of frames from stream.
    
    Parameters:
    - num_frames: int, number of frames to drop
    - wait: bool, wait for completion
    - signal_function: callable, completion callback
    
    Useful for allowing camera settings to settle after changes.
    """

Autofocus Control

Advanced autofocus operations and control.

def autofocus_cycle(
    self,
    wait: bool = True,
    signal_function: callable = None
):
    """
    Trigger complete autofocus cycle.
    
    Parameters:
    - wait: bool, wait for focus completion
    - signal_function: callable, completion callback
    
    Performs full autofocus scan and locks focus at optimal position.
    """

Asynchronous Job Management

Control and synchronization of asynchronous operations.

def wait(self, job, timeout: float = None):
    """
    Wait for job completion.
    
    Parameters:
    - job: Job object from async operation
    - timeout: float, timeout in seconds (None = infinite)
    
    Returns:
    Job result
    
    Raises:
    - TimeoutError: If timeout exceeded
    """

def dispatch_functions(
    self,
    functions: list[callable],
    wait: bool = True,
    signal_function: callable = None,
    immediate: bool = False
):
    """
    Dispatch list of functions for execution.
    
    Parameters:
    - functions: list of callable functions
    - wait: bool, wait for all functions to complete
    - signal_function: callable, completion callback
    - immediate: bool, execute immediately vs queued
    """

class Job:
    """Asynchronous job handle."""
    
    def execute(self):
        """Execute the job."""
    
    def signal(self):
        """Signal job completion."""
    
    def wait(self, timeout: float = None):
        """
        Wait for job completion.
        
        Parameters:
        - timeout: float, timeout in seconds
        
        Returns:
        Job result
        """
    
    def result(self):
        """Get job result (non-blocking)."""
    
    @property
    def finished(self) -> bool:
        """Whether job is finished."""

Device-Specific Integration

Integration with specialized camera sensors and AI accelerators.

# IMX500 AI Sensor
class IMX500:
    """IMX500 AI sensor integration."""
    
    def __init__(self, picam2: Picamera2):
        """Initialize IMX500 with camera instance."""
    
    def set_model(self, model_path: str):
        """Load AI model for on-sensor processing."""
    
    def get_network_intrinsics(self) -> NetworkIntrinsics:
        """Get network intrinsics for model."""

class NetworkIntrinsics:
    """Neural network intrinsics for IMX500."""
    
    input_width: int
    input_height: int
    input_channels: int
    output_tensors: list

# Postprocessing functions for common AI models
def postprocess_efficientdet_lite0_detection(
    outputs: list,
    network_intrinsics: NetworkIntrinsics,
    threshold: float = 0.5
) -> list:
    """
    Postprocess EfficientDet-Lite0 detection outputs.
    
    Parameters:
    - outputs: list, raw model outputs
    - network_intrinsics: NetworkIntrinsics, model metadata
    - threshold: float, confidence threshold
    
    Returns:
    list: Detected objects with bounding boxes and scores
    """

def postprocess_yolov5_detection(
    outputs: list,
    network_intrinsics: NetworkIntrinsics,
    threshold: float = 0.5
) -> list:
    """Postprocess YOLOv5 detection outputs."""

def postprocess_yolov8_detection(
    outputs: list,
    network_intrinsics: NetworkIntrinsics,
    threshold: float = 0.5
) -> list:
    """Postprocess YOLOv8 detection outputs."""

# Hailo AI Accelerator
class Hailo:
    """Hailo AI accelerator integration."""
    
    def __init__(self):
        """Initialize Hailo accelerator."""
    
    def load_model(self, model_path: str):
        """Load model onto Hailo accelerator."""
    
    def run_inference(self, input_data: np.ndarray) -> list:
        """Run inference on accelerator."""

Platform-Specific Features

Platform detection and optimization.

class Platform(Enum):
    """Platform types."""
    VC4 = "vc4"      # Raspberry Pi 4 and earlier
    PISP = "pisp"    # Raspberry Pi 5 and newer

def get_platform() -> Platform:
    """
    Detect current platform.
    
    Returns:
    Platform: Current platform type
    """

# Platform-specific tuning and configuration
def load_tuning_file(tuning_file: str, dir: str = None) -> dict:
    """
    Load camera tuning file.
    
    Parameters:
    - tuning_file: str, tuning file name
    - dir: str, directory to search (None = default paths)
    
    Returns:
    dict: Tuning parameters
    """

def find_tuning_algo(tuning: dict, name: str) -> dict:
    """
    Find algorithm parameters in tuning data.
    
    Parameters:
    - tuning: dict, tuning parameters
    - name: str, algorithm name
    
    Returns:
    dict: Algorithm parameters
    """

Memory Management

Advanced memory allocation and management.

class MappedArray:
    """Context manager for memory-mapped array access."""
    
    def __init__(
        self,
        request: CompletedRequest,
        stream: str,
        reshape: bool = True,
        write: bool = False
    ):
        """
        Initialize mapped array.
        
        Parameters:
        - request: CompletedRequest, source request
        - stream: str, stream name
        - reshape: bool, reshape to image dimensions
        - write: bool, allow write access
        """
    
    def __enter__(self) -> 'MappedArray':
        """Enter context and map buffer."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context and unmap buffer."""
    
    @property
    def array(self) -> np.ndarray:
        """Access to mapped numpy array."""

# Allocator system for advanced memory management
class LibcameraAllocator:
    """Default libcamera buffer allocator."""

class DmaAllocator:
    """DMA buffer allocator for zero-copy operations."""

class PersistentAllocator:
    """Persistent buffer allocator for reduced allocation overhead."""

Usage Examples

Dynamic Mode Switching

from picamera2 import Picamera2
import time

picam2 = Picamera2()

# Start with preview configuration
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()

# Preview mode active
time.sleep(2)

# Switch to high-resolution still mode
still_config = picam2.create_still_configuration(
    main={"size": (4056, 3040), "format": "RGB888"}
)

picam2.switch_mode(still_config)
picam2.capture_file("high_res.jpg")

# Switch back to preview
picam2.switch_mode(preview_config)

picam2.close()

Atomic Mode Switch and Capture

from picamera2 import Picamera2

picam2 = Picamera2()
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()

# Create still configuration
still_config = picam2.create_still_configuration(
    main={"size": (2592, 1944), "format": "RGB888"}
)

# Atomic switch and capture - no intermediate state
picam2.switch_mode_and_capture_file(
    still_config, 
    "atomic_capture.jpg"
)

# Camera automatically returns to previous mode
picam2.close()

Frame Dropping for Stability

from picamera2 import Picamera2

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Change camera settings
picam2.set_controls({
    "ExposureTime": 50000,  # Long exposure
    "AnalogueGain": 4.0
})

# Drop frames to allow settings to settle
picam2.drop_frames(5)

# Now capture with settled settings
picam2.capture_file("settled_image.jpg")

picam2.close()

Autofocus Control

from picamera2 import Picamera2
import time

picam2 = Picamera2()
picam2.configure(picam2.create_still_configuration())
picam2.start()

# Enable autofocus
picam2.set_controls({"AfMode": 1})  # Auto focus mode

# Trigger focus cycle
picam2.autofocus_cycle()

# Capture with optimal focus
picam2.capture_file("focused_image.jpg")

# Manual focus sweep
picam2.set_controls({"AfMode": 0})  # Manual mode
focus_positions = [0.5, 1.0, 2.0, 5.0, 10.0]

for pos in focus_positions:
    picam2.set_controls({"LensPosition": pos})
    picam2.drop_frames(3)  # Allow focus to settle
    picam2.capture_file(f"focus_{pos:.1f}.jpg")

picam2.close()

Asynchronous Operations

from picamera2 import Picamera2
import time

def capture_complete(job):
    print(f"Capture {job.result()} completed")

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Start multiple async captures
jobs = []
for i in range(5):
    job = picam2.capture_file(
        f"async_{i}.jpg", 
        wait=False,
        signal_function=capture_complete
    )
    jobs.append(job)
    time.sleep(0.2)

# Wait for all to complete
for job in jobs:
    job.wait()

print("All captures completed")
picam2.close()

IMX500 AI Integration

from picamera2 import Picamera2
from picamera2.devices.imx500 import IMX500, postprocess_yolov5_detection
import numpy as np

picam2 = Picamera2()

# Configure for AI processing
config = picam2.create_preview_configuration(
    main={"size": (640, 640), "format": "RGB888"}
)
picam2.configure(config)

# Initialize IMX500
imx500 = IMX500(picam2)
imx500.set_model("yolov5_model.rpk")

picam2.start()

while True:
    # Capture frame
    request = picam2.capture_request()
    
    # Get AI inference results
    outputs = imx500.get_outputs()
    intrinsics = imx500.get_network_intrinsics()
    
    # Postprocess detections
    detections = postprocess_yolov5_detection(
        outputs, intrinsics, threshold=0.5
    )
    
    # Process detections
    for detection in detections:
        bbox = detection['bbox']
        score = detection['score']
        class_id = detection['class_id']
        print(f"Detected class {class_id} at {bbox} with score {score}")
    
    request.release()
    
    if len(detections) > 0:
        break

picam2.close()

Platform-Specific Optimization

from picamera2 import Picamera2
from picamera2.platform import Platform, get_platform

picam2 = Picamera2()

# Configure based on platform capabilities
platform = get_platform()

if platform == Platform.PISP:
    # Raspberry Pi 5 - can use higher resolutions and frame rates
    config = picam2.create_video_configuration(
        main={"size": (1920, 1080), "format": "YUV420"}
    )
    # Use higher buffer count for better performance
    config.buffer_count = 4
else:
    # Raspberry Pi 4 and earlier
    config = picam2.create_video_configuration(
        main={"size": (1280, 720), "format": "YUV420"}
    )
    config.buffer_count = 2

picam2.configure(config)
picam2.start()

print(f"Running on {platform.value} platform")
print(f"Configuration: {config.main.size} @ {config.buffer_count} buffers")

picam2.close()

Memory-Mapped Access

from picamera2 import Picamera2, MappedArray
import numpy as np

picam2 = Picamera2()
config = picam2.create_preview_configuration(
    main={"format": "YUV420", "size": (640, 480)}
)
picam2.configure(config)
picam2.start()

# Capture request
request = picam2.capture_request()

# Zero-copy access to buffer data
with MappedArray(request, "main") as mapped:
    # Direct access to camera buffer
    yuv_data = mapped.array
    
    # Process Y channel (luminance) in-place
    y_channel = yuv_data[:480, :]  # Y plane
    y_channel[y_channel < 50] = 0  # Threshold dark pixels
    
    # Changes are made directly to camera buffer

# Save processed result
request.save("main", "processed.jpg")
request.release()

picam2.close()

Custom Function Dispatch

from picamera2 import Picamera2

def custom_processing():
    print("Custom processing started")
    time.sleep(1)
    print("Custom processing completed")
    return "processing_result"

def another_function():
    print("Another function executed")
    return "another_result"

picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Dispatch multiple functions
functions = [custom_processing, another_function]
picam2.dispatch_functions(functions, wait=True)

print("All functions completed")
picam2.close()

Install with Tessl CLI

npx tessl i tessl/pypi-picamera2

docs

advanced.md

capture.md

configuration.md

controls.md

core-operations.md

index.md

preview.md

recording.md

tile.json