The libcamera-based Python interface to Raspberry Pi cameras, based on the original Picamera library
—
Advanced functionality including mode switching, autofocus control, frame dropping, device-specific AI sensor integration, and platform-specific optimizations. These features enable sophisticated camera applications and integration with specialized hardware.
Dynamic configuration changes without stopping the camera system.
def switch_mode(
self,
camera_config: CameraConfiguration,
wait: bool = True,
signal_function: callable = None
):
"""
Switch camera configuration dynamically.
Parameters:
- camera_config: CameraConfiguration, new configuration to apply
- wait: bool, wait for switch completion
- signal_function: callable, completion callback
Returns:
Job object if wait=False, None if wait=True
"""
def switch_mode_and_drop_frames(
self,
camera_config: CameraConfiguration,
num_frames: int,
wait: bool = True,
signal_function: callable = None
):
"""
Switch mode and drop initial frames for settling.
Parameters:
- camera_config: CameraConfiguration, new configuration
- num_frames: int, number of frames to drop after switch
- wait: bool, wait for completion
- signal_function: callable, completion callback
"""Atomic operations that switch mode and immediately capture.
def switch_mode_and_capture_file(
self,
camera_config: CameraConfiguration,
file_output: str,
name: str = "main",
format: str = None,
wait: bool = True,
signal_function: callable = None,
exif_data: dict = None
):
"""
Switch mode and capture file atomically.
Parameters:
- camera_config: CameraConfiguration, new configuration
- file_output: str, output file path
- name: str, stream name to capture
- format: str, output format
- wait: bool, wait for completion
- signal_function: callable, completion callback
- exif_data: dict, EXIF metadata
"""
def switch_mode_and_capture_array(
self,
camera_config: CameraConfiguration,
name: str = "main",
wait: bool = True,
signal_function: callable = None,
delay: float = None
) -> np.ndarray:
"""
Switch mode and capture array atomically.
Parameters:
- camera_config: CameraConfiguration, new configuration
- name: str, stream name to capture
- wait: bool, wait for completion
- signal_function: callable, completion callback
- delay: float, delay before capture
Returns:
np.ndarray: Captured image array
"""
def switch_mode_and_capture_request(
self,
camera_config: CameraConfiguration,
wait: bool = True,
signal_function: callable = None,
delay: float = None
) -> CompletedRequest:
"""
Switch mode and capture complete request atomically.
Parameters:
- camera_config: CameraConfiguration, new configuration
- wait: bool, wait for completion
- signal_function: callable, completion callback
- delay: float, delay before capture
Returns:
CompletedRequest: Complete request with all streams
"""Control frame dropping and camera timing.
def drop_frames(
self,
num_frames: int,
wait: bool = True,
signal_function: callable = None
):
"""
Drop specified number of frames from stream.
Parameters:
- num_frames: int, number of frames to drop
- wait: bool, wait for completion
- signal_function: callable, completion callback
Useful for allowing camera settings to settle after changes.
"""Advanced autofocus operations and control.
def autofocus_cycle(
self,
wait: bool = True,
signal_function: callable = None
):
"""
Trigger complete autofocus cycle.
Parameters:
- wait: bool, wait for focus completion
- signal_function: callable, completion callback
Performs full autofocus scan and locks focus at optimal position.
"""Control and synchronization of asynchronous operations.
def wait(self, job, timeout: float = None):
"""
Wait for job completion.
Parameters:
- job: Job object from async operation
- timeout: float, timeout in seconds (None = infinite)
Returns:
Job result
Raises:
- TimeoutError: If timeout exceeded
"""
def dispatch_functions(
self,
functions: list[callable],
wait: bool = True,
signal_function: callable = None,
immediate: bool = False
):
"""
Dispatch list of functions for execution.
Parameters:
- functions: list of callable functions
- wait: bool, wait for all functions to complete
- signal_function: callable, completion callback
- immediate: bool, execute immediately vs queued
"""
class Job:
"""Asynchronous job handle."""
def execute(self):
"""Execute the job."""
def signal(self):
"""Signal job completion."""
def wait(self, timeout: float = None):
"""
Wait for job completion.
Parameters:
- timeout: float, timeout in seconds
Returns:
Job result
"""
def result(self):
"""Get job result (non-blocking)."""
@property
def finished(self) -> bool:
"""Whether job is finished."""Integration with specialized camera sensors and AI accelerators.
# IMX500 AI Sensor
class IMX500:
"""IMX500 AI sensor integration."""
def __init__(self, picam2: Picamera2):
"""Initialize IMX500 with camera instance."""
def set_model(self, model_path: str):
"""Load AI model for on-sensor processing."""
def get_network_intrinsics(self) -> NetworkIntrinsics:
"""Get network intrinsics for model."""
class NetworkIntrinsics:
"""Neural network intrinsics for IMX500."""
input_width: int
input_height: int
input_channels: int
output_tensors: list
# Postprocessing functions for common AI models
def postprocess_efficientdet_lite0_detection(
outputs: list,
network_intrinsics: NetworkIntrinsics,
threshold: float = 0.5
) -> list:
"""
Postprocess EfficientDet-Lite0 detection outputs.
Parameters:
- outputs: list, raw model outputs
- network_intrinsics: NetworkIntrinsics, model metadata
- threshold: float, confidence threshold
Returns:
list: Detected objects with bounding boxes and scores
"""
def postprocess_yolov5_detection(
outputs: list,
network_intrinsics: NetworkIntrinsics,
threshold: float = 0.5
) -> list:
"""Postprocess YOLOv5 detection outputs."""
def postprocess_yolov8_detection(
outputs: list,
network_intrinsics: NetworkIntrinsics,
threshold: float = 0.5
) -> list:
"""Postprocess YOLOv8 detection outputs."""
# Hailo AI Accelerator
class Hailo:
"""Hailo AI accelerator integration."""
def __init__(self):
"""Initialize Hailo accelerator."""
def load_model(self, model_path: str):
"""Load model onto Hailo accelerator."""
def run_inference(self, input_data: np.ndarray) -> list:
"""Run inference on accelerator."""Platform detection and optimization.
class Platform(Enum):
"""Platform types."""
VC4 = "vc4" # Raspberry Pi 4 and earlier
PISP = "pisp" # Raspberry Pi 5 and newer
def get_platform() -> Platform:
"""
Detect current platform.
Returns:
Platform: Current platform type
"""
# Platform-specific tuning and configuration
def load_tuning_file(tuning_file: str, dir: str = None) -> dict:
"""
Load camera tuning file.
Parameters:
- tuning_file: str, tuning file name
- dir: str, directory to search (None = default paths)
Returns:
dict: Tuning parameters
"""
def find_tuning_algo(tuning: dict, name: str) -> dict:
"""
Find algorithm parameters in tuning data.
Parameters:
- tuning: dict, tuning parameters
- name: str, algorithm name
Returns:
dict: Algorithm parameters
"""Advanced memory allocation and management.
class MappedArray:
"""Context manager for memory-mapped array access."""
def __init__(
self,
request: CompletedRequest,
stream: str,
reshape: bool = True,
write: bool = False
):
"""
Initialize mapped array.
Parameters:
- request: CompletedRequest, source request
- stream: str, stream name
- reshape: bool, reshape to image dimensions
- write: bool, allow write access
"""
def __enter__(self) -> 'MappedArray':
"""Enter context and map buffer."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context and unmap buffer."""
@property
def array(self) -> np.ndarray:
"""Access to mapped numpy array."""
# Allocator system for advanced memory management
class LibcameraAllocator:
"""Default libcamera buffer allocator."""
class DmaAllocator:
"""DMA buffer allocator for zero-copy operations."""
class PersistentAllocator:
"""Persistent buffer allocator for reduced allocation overhead."""from picamera2 import Picamera2
import time

picam2 = Picamera2()

# Begin in a lightweight preview mode.
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()

# Let the preview run for a couple of seconds.
time.sleep(2)

# Reconfigure on the fly for a full-resolution still capture.
still_config = picam2.create_still_configuration(
    main={"size": (4056, 3040), "format": "RGB888"}
)
picam2.switch_mode(still_config)
picam2.capture_file("high_res.jpg")

# Return to the original preview configuration.
picam2.switch_mode(preview_config)
picam2.close()

from picamera2 import Picamera2
picam2 = Picamera2()
preview_config = picam2.create_preview_configuration()
picam2.configure(preview_config)
picam2.start()

# Build the still-mode configuration up front.
still_config = picam2.create_still_configuration(
    main={"size": (2592, 1944), "format": "RGB888"}
)

# One atomic call: switch to still mode, capture, and revert -
# there is no window where the camera sits in an intermediate state.
picam2.switch_mode_and_capture_file(still_config, "atomic_capture.jpg")

# The camera is back in the previous (preview) mode at this point.
picam2.close()

from picamera2 import Picamera2
picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Apply a long exposure with high analogue gain.
picam2.set_controls({"ExposureTime": 50000, "AnalogueGain": 4.0})

# Discard a few frames so the new controls take effect before capturing.
picam2.drop_frames(5)
picam2.capture_file("settled_image.jpg")
picam2.close()

from picamera2 import Picamera2
import time

picam2 = Picamera2()
picam2.configure(picam2.create_still_configuration())
picam2.start()

# Autofocus: run one full AF scan, then capture at the locked position.
picam2.set_controls({"AfMode": 1})  # Auto focus mode
picam2.autofocus_cycle()
picam2.capture_file("focused_image.jpg")

# Manual sweep across a fixed set of lens positions.
picam2.set_controls({"AfMode": 0})  # Manual mode
for lens_position in [0.5, 1.0, 2.0, 5.0, 10.0]:
    picam2.set_controls({"LensPosition": lens_position})
    picam2.drop_frames(3)  # let the lens settle before capturing
    picam2.capture_file(f"focus_{lens_position:.1f}.jpg")
picam2.close()

from picamera2 import Picamera2
import time


def capture_complete(job):
    """Completion callback: report the finished capture's result."""
    print(f"Capture {job.result()} completed")


picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Kick off five captures without blocking, keeping each job handle.
jobs = []
for index in range(5):
    jobs.append(
        picam2.capture_file(
            f"async_{index}.jpg",
            wait=False,
            signal_function=capture_complete,
        )
    )
    time.sleep(0.2)

# Block until every outstanding capture has finished.
for pending in jobs:
    pending.wait()
print("All captures completed")
picam2.close()

from picamera2 import Picamera2
from picamera2.devices.imx500 import IMX500, postprocess_yolov5_detection
import numpy as np

picam2 = Picamera2()

# 640x640 RGB matches the YOLOv5 network input size.
config = picam2.create_preview_configuration(
    main={"size": (640, 640), "format": "RGB888"}
)
picam2.configure(config)

# Attach the on-sensor AI accelerator and load the model firmware.
imx500 = IMX500(picam2)
imx500.set_model("yolov5_model.rpk")

picam2.start()
while True:
    # Grab a frame; inference runs on the sensor alongside capture.
    request = picam2.capture_request()
    outputs = imx500.get_outputs()
    intrinsics = imx500.get_network_intrinsics()

    # Decode raw tensors into a list of detection dicts.
    detections = postprocess_yolov5_detection(outputs, intrinsics, threshold=0.5)
    for det in detections:
        print(f"Detected class {det['class_id']} at {det['bbox']} with score {det['score']}")

    request.release()
    # Stop once at least one object has been detected.
    if detections:
        break
picam2.close()

from picamera2 import Picamera2
from picamera2.platform import Platform, get_platform

picam2 = Picamera2()

# Pick a video configuration suited to the detected ISP generation.
platform = get_platform()
if platform == Platform.PISP:
    # Raspberry Pi 5 ISP: 1080p with a deeper buffer queue.
    config = picam2.create_video_configuration(
        main={"size": (1920, 1080), "format": "YUV420"}
    )
    config.buffer_count = 4
else:
    # VC4 (Pi 4 and earlier): keep resolution and buffering modest.
    config = picam2.create_video_configuration(
        main={"size": (1280, 720), "format": "YUV420"}
    )
    config.buffer_count = 2

picam2.configure(config)
picam2.start()
print(f"Running on {platform.value} platform")
print(f"Configuration: {config.main.size} @ {config.buffer_count} buffers")
picam2.close()

from picamera2 import Picamera2, MappedArray
import numpy as np

picam2 = Picamera2()
config = picam2.create_preview_configuration(
    main={"format": "YUV420", "size": (640, 480)}
)
picam2.configure(config)
picam2.start()

request = picam2.capture_request()

# Map the buffer for zero-copy, in-place access to the raw YUV data.
with MappedArray(request, "main") as mapped:
    frame = mapped.array
    # The Y (luminance) plane occupies the first 480 rows of the YUV420 buffer.
    luma = frame[:480, :]
    luma[luma < 50] = 0  # threshold dark pixels directly in the camera buffer

# Save the result with the in-place edits applied.
request.save("main", "processed.jpg")
request.release()
picam2.close()

from picamera2 import Picamera2
import time  # fix: time.sleep() was used below without time being imported


def custom_processing():
    """Example workload dispatched onto the camera's event loop."""
    print("Custom processing started")
    time.sleep(1)
    print("Custom processing completed")
    return "processing_result"


def another_function():
    """Second example function, showing multi-function dispatch."""
    print("Another function executed")
    return "another_result"


picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration())
picam2.start()

# Queue both functions for execution and block until all have run.
functions = [custom_processing, another_function]
picam2.dispatch_functions(functions, wait=True)
print("All functions completed")
picam2.close()

Install with Tessl CLI
npx tessl i tessl/pypi-picamera2