```
tessl install tessl/pypi-pipecat-ai@0.0.0
```

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Image and video frames carry visual data through the pipeline.
```python { .api }
from pipecat.frames.frames import ImageRawFrame
from typing import Tuple

class ImageRawFrame:
    """Raw image container.

    Carries image data with size and format information.

    Attributes:
        image (bytes): Raw image data
        size (Tuple[int, int]): Image dimensions (width, height)
        format (str): Image format (e.g., "RGB", "RGBA", "JPEG", "PNG")
        id (int): Frame identifier
        name (str): Frame name
        pts (Optional[int]): Presentation timestamp
        metadata (Dict[str, Any]): Frame metadata
    """

    def __init__(
        self,
        image: bytes,
        size: Tuple[int, int],
        format: str
    ):
        """Initialize image frame.

        Args:
            image: Raw image bytes
            size: Image dimensions (width, height)
            format: Image format string
        """
        pass
```
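As a usage sketch, an `ImageRawFrame` can be built from any raw pixel buffer. The example below wraps a Pillow image; the file path is hypothetical:

```python
from PIL import Image

from pipecat.frames.frames import ImageRawFrame

# Load a picture with Pillow (path is hypothetical)
img = Image.open("photo.jpg").convert("RGB")

# Wrap the raw pixel buffer in a frame
frame = ImageRawFrame(
    image=img.tobytes(),
    size=img.size,    # (width, height)
    format=img.mode   # "RGB"
)
```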
```python { .api }
from pipecat.frames.frames import DataFrame, ImageRawFrame, OutputImageRawFrame

class OutputImageRawFrame(DataFrame, ImageRawFrame):
    """Image output to transport.

    Image data intended for output through the transport.

    Example:
        frame = OutputImageRawFrame(
            image=image_bytes,
            size=(1280, 720),
            format="RGB"
        )
    """
    pass
```
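A minimal sketch of sending an output image through a running pipeline, in the same way the `SpriteFrame` example later on this page does; `task` is assumed to be an existing `PipelineTask` and `image_bytes` a raw 1280x720 RGB buffer:

```python
from pipecat.frames.frames import OutputImageRawFrame

# Queue an image for the transport to display
frame = OutputImageRawFrame(
    image=image_bytes,
    size=(1280, 720),
    format="RGB"
)
await task.queue_frame(frame)
```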
```python { .api }
from pipecat.frames.frames import ImageRawFrame, InputImageRawFrame, SystemFrame

class InputImageRawFrame(SystemFrame, ImageRawFrame):
    """Image input from transport.

    Image received from the transport. As a SystemFrame, it is
    processed immediately rather than queued.

    Example:
        # Received from transport
        frame = InputImageRawFrame(
            image=camera_frame,
            size=(640, 480),
            format="RGB"
        )
    """
    pass
```
```python { .api }
from pipecat.frames.frames import InputImageRawFrame, UserImageRawFrame, UserImageRequestFrame
from typing import Optional

class UserImageRawFrame(InputImageRawFrame):
    """Raw image input frame associated with a specific user.

    An image associated with a user, potentially in response to an image request.
    Used for vision model processing and multimodal conversations.

    Attributes:
        image (bytes): Raw image data (from ImageRawFrame)
        size (Tuple[int, int]): Image dimensions (from ImageRawFrame)
        format (Optional[str]): Image format (from ImageRawFrame)
        user_id (str): Identifier of the user who provided this image
        text (Optional[str]): Optional text associated with this image
        append_to_context (Optional[bool]): Whether the image should be appended to the LLM context
        request (Optional[UserImageRequestFrame]): The original image request frame if this is a response

    Example:
        # User image for vision processing
        frame = UserImageRawFrame(
            image=user_photo,
            size=(1024, 768),
            format="RGB",
            user_id="user123"
        )

        # User image with descriptive text
        frame = UserImageRawFrame(
            image=screenshot,
            size=(1920, 1080),
            format="RGB",
            user_id="user456",
            text="What's in this image?",
            append_to_context=True
        )

        # Response to an image request
        frame = UserImageRawFrame(
            image=captured_image,
            size=(640, 480),
            format="RGB",
            user_id="user789",
            request=original_request_frame
        )
    """
    user_id: str = ""
    text: Optional[str] = None
    append_to_context: Optional[bool] = None
    request: Optional[UserImageRequestFrame] = None
```
```python { .api }
from pipecat.frames.frames import AssistantImageRawFrame, OutputImageRawFrame
from typing import Optional

class AssistantImageRawFrame(OutputImageRawFrame):
    """Assistant-generated image frame.

    Frame containing an image generated by the assistant. Contains both the raw frame
    for display (superclass functionality) and the original image data, which
    can be used directly in LLM contexts.

    Attributes:
        image (bytes): Raw image bytes (from ImageRawFrame)
        size (Tuple[int, int]): Image dimensions (from ImageRawFrame)
        format (Optional[str]): Image format (from ImageRawFrame)
        original_data (Optional[bytes]): The original image data, which can be used directly
            in an LLM context message without further encoding
        original_mime_type (Optional[str]): The MIME type of the original image data

    Example:
        # Generated image with original data for LLM context
        frame = AssistantImageRawFrame(
            image=display_image_bytes,
            size=(1024, 1024),
            format="RGB",
            original_data=original_jpeg_bytes,
            original_mime_type="image/jpeg"
        )

        # Simple generated image
        frame = AssistantImageRawFrame(
            image=generated_image,
            size=(512, 512),
            format="PNG"
        )
    """
    original_data: Optional[bytes] = None
    original_mime_type: Optional[str] = None
```
```python { .api }
from pipecat.frames.frames import OutputImageRawFrame, URLImageRawFrame
from typing import Optional

class URLImageRawFrame(OutputImageRawFrame):
    """Image with associated URL.

    An output image with an associated URL. These images are usually generated
    by third-party services that provide a URL to download the image.

    Since this extends OutputImageRawFrame -> ImageRawFrame, it inherits the
    required image data fields and adds an optional URL field.

    Attributes:
        image (bytes): Raw image data (required, from ImageRawFrame)
        size (Tuple[int, int]): Image dimensions (required, from ImageRawFrame)
        format (Optional[str]): Image format (from ImageRawFrame)
        url (Optional[str]): URL where the image can be downloaded from

    Example:
        # Create with image data and URL
        frame = URLImageRawFrame(
            image=image_bytes,
            size=(512, 512),
            format="RGB",
            url="https://example.com/image.png"
        )

    Note: Despite having a URL, the image bytes are still required, as this
    frame is used for output and the transport needs the actual image data.
    """
    url: Optional[str] = None
```
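Since the image bytes are required even when a URL is present, a service wrapper typically downloads the image first. A minimal sketch, assuming `aiohttp` is available; the URL and helper name are hypothetical:

```python
import io

import aiohttp
from PIL import Image

from pipecat.frames.frames import URLImageRawFrame

async def frame_from_url(url: str) -> URLImageRawFrame:
    # Download the generated image
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.read()
    # Decode to raw RGB pixels so the transport can display it
    img = Image.open(io.BytesIO(data)).convert("RGB")
    return URLImageRawFrame(
        image=img.tobytes(),
        size=img.size,
        format="RGB",
        url=url
    )
```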
```python { .api }
from pipecat.frames.frames import DataFrame, OutputImageRawFrame, SpriteFrame
from typing import List

class SpriteFrame(DataFrame):
    """Animated sprite frame containing multiple images.

    An animated sprite that will be shown by the transport if the transport's
    camera is enabled. Plays at the framerate specified in the transport's
    `camera_out_framerate` constructor parameter.

    Attributes:
        images (List[OutputImageRawFrame]): List of image frames that make up the sprite animation

    Example:
        # Create animated sprite from multiple frames
        frames = [
            OutputImageRawFrame(
                image=frame1_bytes,
                size=(512, 512),
                format="RGB"
            ),
            OutputImageRawFrame(
                image=frame2_bytes,
                size=(512, 512),
                format="RGB"
            ),
            OutputImageRawFrame(
                image=frame3_bytes,
                size=(512, 512),
                format="RGB"
            )
        ]
        sprite = SpriteFrame(images=frames)
        await task.queue_frame(sprite)

        # Animated avatar with transport
        from pipecat.transports.daily import DailyTransport
        transport = DailyTransport(
            room_url="...",
            camera_out_enabled=True,
            camera_out_framerate=30  # Play sprite at 30 fps
        )
    """
    images: List[OutputImageRawFrame]
```
```python { .api }
from pipecat.services.image_service import ImageGenService
from pipecat.frames.frames import AssistantImageRawFrame

class ImageGenerator(ImageGenService):
    """Generate images from text prompts."""

    async def run_image_gen(self, prompt: str) -> AssistantImageRawFrame:
        """Generate image from prompt.

        Args:
            prompt: Text prompt for image generation

        Returns:
            Generated image frame
        """
        # Generate the image with a backend-specific helper
        # (placeholder; not part of the pipecat API)
        image_bytes, size, image_format = await self._generate(prompt)
        return AssistantImageRawFrame(
            image=image_bytes,
            size=size,
            format=image_format
        )
```
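An image generation service usually sits between the text source and the output transport. A minimal pipeline sketch; the `transport` and `generator` variables are assumptions, and service placement may vary by application:

```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask

# `generator` is an ImageGenService subclass such as ImageGenerator above;
# `transport` is any configured transport (e.g., DailyTransport)
pipeline = Pipeline([
    transport.input(),
    generator,           # turns text prompts into AssistantImageRawFrame
    transport.output(),
])
task = PipelineTask(pipeline)
```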
```python { .api }
from pipecat.services.vision_service import VisionService
from pipecat.frames.frames import UserImageRawFrame, VisionTextFrame

class VisionProcessor(VisionService):
    """Process images with a vision model."""

    async def run_vision(self, frame: UserImageRawFrame) -> VisionTextFrame:
        """Analyze image with vision model.

        Args:
            frame: User image to analyze

        Returns:
            Text description of the image
        """
        # Analyze the image with a model-specific helper
        # (placeholder; not part of the pipecat API)
        description = await self._analyze_image(frame.image)
        return VisionTextFrame(text=description)
```
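For a quick check outside a pipeline, the service method can be called directly. A sketch; it assumes `user_photo` holds raw RGB bytes, that `_analyze_image` has been implemented, and that the service constructor needs no extra arguments:

```python
from pipecat.frames.frames import UserImageRawFrame

vision = VisionProcessor()
frame = UserImageRawFrame(
    image=user_photo,
    size=(1024, 768),
    format="RGB",
    user_id="user123"
)
result = await vision.run_vision(frame)
print(result.text)
```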
```python { .api }
import io

from PIL import Image

from pipecat.frames.frames import ImageRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class ImageConverter(FrameProcessor):
    """Convert raw images to JPEG."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        # frombytes only decodes raw pixel modes such as "RGB"/"RGBA";
        # already-encoded frames ("JPEG", "PNG") pass through unchanged
        if isinstance(frame, ImageRawFrame) and frame.format in ("RGB", "RGBA"):
            # Convert to a PIL Image
            img = Image.frombytes(frame.format, frame.size, frame.image)
            # JPEG has no alpha channel, so normalize to RGB
            if frame.format != "RGB":
                img = img.convert("RGB")
            # Re-encode as JPEG bytes
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG")
            # Create the converted frame
            frame = ImageRawFrame(
                image=buffer.getvalue(),
                size=img.size,
                format="JPEG"
            )
        await self.push_frame(frame, direction)
```
```python { .api }
import io
from typing import Tuple

from PIL import Image

from pipecat.frames.frames import ImageRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class ImageResizer(FrameProcessor):
    """Resize images to target dimensions."""

    def __init__(self, target_size: Tuple[int, int] = (640, 480)):
        super().__init__()
        self._target_size = target_size

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        # frombytes only decodes raw pixel modes such as "RGB"/"RGBA"
        if isinstance(frame, ImageRawFrame) and frame.format in ("RGB", "RGBA"):
            # Resize the image
            img = Image.frombytes(frame.format, frame.size, frame.image)
            img = img.resize(self._target_size, Image.Resampling.LANCZOS)
            # Re-encode the resized image as PNG
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            frame = ImageRawFrame(
                image=buffer.getvalue(),
                size=self._target_size,
                format="PNG"
            )
        await self.push_frame(frame, direction)
```
```python { .api }
from pipecat.frames.frames import InputImageRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class VideoFrameExtractor(FrameProcessor):
    """Extract frames from video at intervals."""

    def __init__(self, frame_interval_ms: int = 1000):
        super().__init__()
        self._interval_ms = frame_interval_ms
        self._last_frame_time = 0

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, InputImageRawFrame):
            # pts is a presentation timestamp in nanoseconds
            current_time = frame.pts or 0
            # Extract a frame once per interval (ms -> ns)
            if current_time - self._last_frame_time >= self._interval_ms * 1_000_000:
                # Handle the sampled frame (placeholder helper)
                await self._process_video_frame(frame)
                self._last_frame_time = current_time
        await self.push_frame(frame, direction)
```
```python { .api }
from pipecat.processors.aggregators.vision import VisionImageFrameAggregator

class VisionImageFrameAggregator(FrameProcessor):
    """Aggregate vision image frames.

    Collects image frames and associated text for vision
    model processing.

    Args:
        max_images (int): Maximum images to aggregate
    """

    def __init__(self, max_images: int = 10):
        """Initialize aggregator.

        Args:
            max_images: Maximum number of images to buffer
        """
        pass

    async def process_frame(self, frame, direction):
        """Aggregate image frames.

        Args:
            frame: Frame to process
            direction: Frame direction
        """
        pass
```
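In practice the aggregator sits ahead of a vision service so that text and image frames arrive together. A minimal placement sketch; the surrounding processors are assumptions:

```python
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.vision import VisionImageFrameAggregator

# `transport` and `vision_service` are assumed to exist;
# `vision_service` could be the VisionProcessor sketched above
pipeline = Pipeline([
    transport.input(),
    VisionImageFrameAggregator(),
    vision_service,
    transport.output(),
])
```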
```python { .api }
import asyncio

from pipecat.frames.frames import SpriteFrame
from pipecat.processors.frame_processor import FrameProcessor

class SpriteAnimator(FrameProcessor):
    """Animate sprite frames."""

    def __init__(self, framerate: int = 30):
        super().__init__()
        self._frame_duration = 1.0 / framerate

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, SpriteFrame):
            # Play the animation by pushing each image frame in sequence;
            # frame.images already contains OutputImageRawFrame objects
            for image_frame in frame.images:
                await self.push_frame(image_frame, direction)
                # Wait one frame duration between images
                await asyncio.sleep(self._frame_duration)
        else:
            await self.push_frame(frame, direction)
```
```python { .api }
from pipecat.frames.frames import SystemFrame, UserImageRequestFrame

class UserImageRequestFrame(SystemFrame):
    """User image request.

    Signals that an image is being requested from a specific user
    (e.g., a camera capture request). The resulting image arrives as a
    UserImageRawFrame whose `request` attribute references this frame.

    Attributes:
        user_id (str): Identifier of the user the image is requested from

    Example:
        # Request a camera frame from a user
        frame = UserImageRequestFrame(user_id="user123")
    """
    user_id: str = ""
```
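Tying the request and response frames together: a processor can issue a request and later match the incoming UserImageRawFrame via its `request` attribute. A sketch, assuming the processor runs in a pipeline whose transport fulfills image requests:

```python
from pipecat.frames.frames import UserImageRawFrame, UserImageRequestFrame
from pipecat.processors.frame_processor import FrameProcessor

class ImageRequester(FrameProcessor):
    """Request a user image and handle the response."""

    async def request_image(self, user_id: str):
        # Ask the transport for a camera frame from this user
        await self.push_frame(UserImageRequestFrame(user_id=user_id))

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, UserImageRawFrame) and frame.request is not None:
            # This image answers one of our earlier requests
            print(f"Got requested image from {frame.user_id}")
        await self.push_frame(frame, direction)
```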
```python { .api }
from pipecat.frames.frames import ImageRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class EfficientImageProcessor(FrameProcessor):
    """Downscale oversized images before further processing."""

    def __init__(self, max_image_size: int = 1920 * 1080):
        super().__init__()
        self._max_size = max_image_size

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, ImageRawFrame):
            # Check the pixel count against the budget
            width, height = frame.size
            if width * height > self._max_size:
                # Scale both dimensions so the pixel count fits the budget
                scale = (self._max_size / (width * height)) ** 0.5
                new_size = (int(width * scale), int(height * scale))
                # Resize with a placeholder helper
                frame = await self._resize_image(frame, new_size)
        await self.push_frame(frame, direction)
```
```python { .api }
from pipecat.frames.frames import ImageRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class ImageFormatValidator(FrameProcessor):
    """Validate and normalize image formats."""

    SUPPORTED_FORMATS = ["RGB", "RGBA", "JPEG", "PNG"]

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, ImageRawFrame):
            if frame.format not in self.SUPPORTED_FORMATS:
                # Normalize with a placeholder helper
                frame = await self._convert_format(frame, "RGB")
        await self.push_frame(frame, direction)
```