tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Pipecat provides vision services for analyzing images and image generation services for creating images from text prompts. These services enable multimodal conversational experiences.
{ .api }
from pipecat.services.vision_service import VisionService
class VisionService(AIService):
"""Base class for vision services.
Provides common functionality for vision services that process images and
generate textual responses. Handles image frame processing and integrates
with the AI service infrastructure for metrics and lifecycle management.
Key Features:
- Image analysis and description
- Visual question answering
- Object detection and recognition
- Automatic metrics tracking
- Frame processing integration
Frames Consumed:
- UserImageRawFrame: Images with optional text prompts
Frames Produced:
- VisionFullResponseStartFrame: Marks vision response start
- VisionTextFrame: Vision analysis text
- VisionFullResponseEndFrame: Marks vision response end
Example:
class CustomVisionService(VisionService):
async def run_vision(self, frame: UserImageRawFrame):
# Analyze image
description = await analyze_image(frame.image)
yield VisionFullResponseStartFrame()
yield VisionTextFrame(text=description)
yield VisionFullResponseEndFrame()
"""
def __init__(self, **kwargs):
"""Initialize the vision service.
Args:
**kwargs: Additional arguments passed to the parent AIService
"""
pass
@abstractmethod
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Process the given vision image and generate results.
This method must be implemented by subclasses to provide actual computer
vision functionality such as image description, object detection, or
visual question answering.
Args:
frame: The image frame to process
Yields:
Frame: Frames containing the vision analysis results, typically VisionTextFrame
objects with descriptions or answers
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames, handling vision image frames for analysis.
Automatically processes UserImageRawFrame objects by calling run_vision
and handles metrics tracking. Other frames are passed through unchanged.
Args:
frame: The frame to process
direction: The direction of frame processing
"""
pass
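For orientation, here is the docstring example expanded into a self-contained sketch. The describe_image helper is a hypothetical stand-in for your own model call, and the frame classes are assumed to be importable from pipecat.frames.frames:

from typing import AsyncGenerator
from pipecat.frames.frames import (
    Frame,
    UserImageRawFrame,
    VisionFullResponseEndFrame,
    VisionFullResponseStartFrame,
    VisionTextFrame,
)
from pipecat.services.vision_service import VisionService

async def describe_image(image: bytes) -> str:
    # Hypothetical helper: call your own vision model here
    return "a placeholder description"

class CustomVisionService(VisionService):
    async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
        # Bracket the text so downstream processors can detect a complete response
        yield VisionFullResponseStartFrame()
        yield VisionTextFrame(text=await describe_image(frame.image))
        yield VisionFullResponseEndFrame()
{ .api }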
from pipecat.services.moondream import MoondreamService
class MoondreamService(VisionService):
"""Moondream vision-language model service.
Provides image analysis and description generation using the Moondream
vision-language model. Supports various hardware acceleration options
including CUDA, MPS, and Intel XPU.
Features:
- Local model execution (no API calls)
- Hardware acceleration support
- Visual question answering
- Image description generation
- Efficient model loading
Hardware Support:
- CUDA (NVIDIA GPUs)
- MPS (Apple Silicon)
- Intel XPU
- CPU fallback
Example:
from pipecat.services.moondream import MoondreamService
vision = MoondreamService(
model="vikhyatk/moondream2",
revision="2025-01-09",
use_cpu=False # Use hardware acceleration if available
)
# Create image frame
from pipecat.frames.frames import UserImageRawFrame
image_frame = UserImageRawFrame(
image=image_bytes,
size=(width, height),
format="RGB",
text="What's in this image?" # Optional question
)
# Process through pipeline
pipeline = Pipeline([
vision, # Analyzes image
tts,
transport.output()
])
"""
def __init__(
self,
*,
model: str = "vikhyatk/moondream2",
revision: str = "2025-01-09",
use_cpu: bool = False,
**kwargs
):
"""Initialize the Moondream service.
Args:
model: Hugging Face model identifier for the Moondream model
revision: Specific model revision to use
use_cpu: Whether to force CPU usage instead of hardware acceleration
**kwargs: Additional arguments passed to the parent VisionService
"""
pass
async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
"""Analyze an image and generate a description.
Args:
frame: The image frame to process
Yields:
VisionFullResponseStartFrame: Marks response start
VisionTextFrame: Description or answer text
VisionFullResponseEndFrame: Marks response end
"""
pass
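Because the service can fall back to CPU, one explicit pattern is to derive use_cpu from what PyTorch reports at startup. A minimal sketch (Moondream may already auto-detect acceleration; this just makes the choice visible):

import torch
from pipecat.services.moondream import MoondreamService

# Force CPU only when PyTorch sees no CUDA or MPS accelerator
has_accelerator = torch.cuda.is_available() or torch.backends.mps.is_available()
vision = MoondreamService(use_cpu=not has_accelerator)
{ .api }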
from pipecat.services.image_service import ImageGenService
class ImageGenService(AIService):
"""Base class for image generation services.
Processes TextFrames by using their content as prompts for image generation.
Subclasses must implement the run_image_gen method to provide actual image
generation functionality using their specific AI service.
Key Features:
- Text-to-image generation
- Prompt processing
- Image frame creation
- Automatic metrics tracking
Frames Consumed:
- TextFrame: Text prompts for image generation
Frames Produced:
- ImageRawFrame: Generated images
- URLImageRawFrame: Images with URLs
- ErrorFrame: Generation errors
Example:
class CustomImageGenService(ImageGenService):
async def run_image_gen(self, prompt: str):
# Generate image from prompt
image = await generate_image(prompt)
yield ImageRawFrame(
image=image_bytes,
size=(width, height),
format="PNG"
)
"""
def __init__(self, **kwargs):
"""Initialize the image generation service.
Args:
**kwargs: Additional arguments passed to the parent AIService
"""
pass
@abstractmethod
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
"""Generate an image from a text prompt.
This method must be implemented by subclasses to provide actual image
generation functionality using their specific AI service.
Args:
prompt: The text prompt to generate an image from
Yields:
Frame: Frames containing the generated image (typically ImageRawFrame
or URLImageRawFrame)
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames for image generation.
TextFrames are used as prompts for image generation, while other frames
are passed through unchanged.
Args:
frame: The frame to process
direction: The direction of frame processing
"""
pass
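As with the vision base class, a subclass only needs to implement the abstract generator. A minimal sketch, where render_image is a hypothetical stand-in for your image model and ImageRawFrame is assumed to be importable from pipecat.frames.frames:

from typing import AsyncGenerator
from pipecat.frames.frames import Frame, ImageRawFrame
from pipecat.services.image_service import ImageGenService

async def render_image(prompt: str) -> bytes:
    # Hypothetical helper: call your image model and return encoded bytes
    return b""

class CustomImageGenService(ImageGenService):
    async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
        image_bytes = await render_image(prompt)
        yield ImageRawFrame(image=image_bytes, size=(1024, 1024), format="PNG")
{ .api }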
from pipecat.services.openai import OpenAIImageGenService
class OpenAIImageGenService(ImageGenService):
"""OpenAI DALL-E image generation service.
Provides image generation capabilities using OpenAI's DALL-E models.
Supports various image sizes and can generate images from text prompts
with configurable quality and style parameters.
Supported Models:
- dall-e-3: Latest DALL-E model with improved quality
- dall-e-2: Previous generation model
Supported Sizes (DALL-E 3):
- 1024x1024: Square format
- 1792x1024: Landscape format
- 1024x1792: Portrait format
Example:
from pipecat.services.openai import OpenAIImageGenService
import aiohttp
async with aiohttp.ClientSession() as session:
image_gen = OpenAIImageGenService(
api_key="sk-...",
aiohttp_session=session,
image_size="1024x1024",
model="dall-e-3"
)
# Generate image from text
pipeline = Pipeline([
llm, # Generates image description
image_gen, # Creates image
transport.output()
])
"""
def __init__(
self,
*,
api_key: str,
base_url: Optional[str] = None,
aiohttp_session: aiohttp.ClientSession,
image_size: Literal["256x256", "512x512", "1024x1024", "1792x1024", "1024x1792"],
model: str = "dall-e-3",
):
"""Initialize the OpenAI image generation service.
Args:
api_key: OpenAI API key for authentication
base_url: Custom base URL for OpenAI API. If None, uses default
aiohttp_session: HTTP session for downloading generated images
image_size: Target size for generated images
model: DALL-E model to use for generation. Defaults to "dall-e-3"
"""
pass
async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
"""Generate an image from a text prompt using OpenAI's DALL-E.
Args:
prompt: Text description of the image to generate
Yields:
URLImageRawFrame: Frame containing the generated image data and URL
"""
pass
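Outside a pipeline, run_image_gen can be driven directly. A sketch, assuming URLImageRawFrame is importable from pipecat.frames.frames and exposes the url attribute used elsewhere in these docs:

import asyncio
import aiohttp
from pipecat.frames.frames import URLImageRawFrame
from pipecat.services.openai import OpenAIImageGenService

async def main():
    async with aiohttp.ClientSession() as session:
        image_gen = OpenAIImageGenService(
            api_key="sk-...",
            aiohttp_session=session,
            image_size="1024x1024",
            model="dall-e-3",
        )
        async for frame in image_gen.run_image_gen("A watercolor lighthouse at dusk"):
            if isinstance(frame, URLImageRawFrame):
                print(f"Image URL: {frame.url}")

asyncio.run(main())
{ .api }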
from pipecat.services.azure import AzureImageGenService
class AzureImageGenService(ImageGenService):
"""Azure OpenAI DALL-E image generation service.
Provides image generation using Azure's OpenAI DALL-E deployment.
Example:
import aiohttp
async with aiohttp.ClientSession() as session:
image_gen = AzureImageGenService(
api_key="...",
endpoint="https://your-resource.openai.azure.com",
aiohttp_session=session,
image_size="1024x1024",
model="dall-e-3"
)
"""
def __init__(
self,
*,
api_key: str,
endpoint: str,
aiohttp_session: aiohttp.ClientSession,
image_size: str,
model: str = "dall-e-3",
**kwargs
):
"""Initialize the Azure image generation service.
Args:
api_key: Azure OpenAI API key
endpoint: Azure OpenAI endpoint URL
aiohttp_session: HTTP session for downloading images
image_size: Target image size
model: DALL-E model name
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.services.google import GoogleImageGenService
class GoogleImageGenService(ImageGenService):
"""Google Imagen image generation service.
Provides image generation using Google's Imagen model.
Example:
import aiohttp
async with aiohttp.ClientSession() as session:
image_gen = GoogleImageGenService(
credentials="path/to/credentials.json",
aiohttp_session=session,
image_size="1024x1024"
)
"""
def __init__(
self,
*,
credentials: Union[str, Dict],
aiohttp_session: aiohttp.ClientSession,
image_size: str,
**kwargs
):
"""Initialize the Google image generation service.
Args:
credentials: Path to credentials JSON or credentials dict
aiohttp_session: HTTP session for downloading images
image_size: Target image size
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.services.fal import FalImageGenService
class FalImageGenService(ImageGenService):
"""Fal AI image generation service.
Provides image generation using Fal AI's models including Flux and SDXL.
Supported Models:
- fal-ai/flux/dev: Flux development model
- fal-ai/flux-pro: Flux professional model
- fal-ai/stable-diffusion-xl: SDXL model
Example:
import aiohttp
async with aiohttp.ClientSession() as session:
image_gen = FalImageGenService(
api_key="...",
aiohttp_session=session,
image_size="landscape_4_3",
model="fal-ai/flux/dev"
)
"""
def __init__(
self,
*,
api_key: str,
aiohttp_session: aiohttp.ClientSession,
image_size: str,
model: str = "fal-ai/flux/dev",
**kwargs
):
"""Initialize the Fal image generation service.
Args:
api_key: Fal AI API key
aiohttp_session: HTTP session for downloading images
image_size: Target image size (e.g., "landscape_4_3", "square")
model: Fal AI model identifier
**kwargs: Additional arguments
"""
pass
Learn more: Fal AI Documentation
{ .api }
from pipecat.services.together import TogetherImageGenService
class TogetherImageGenService(ImageGenService):
"""Together AI image generation service.
Fast image generation using Together AI's infrastructure with
support for Stable Diffusion and other open-source models.
Supported Models:
- stabilityai/stable-diffusion-xl-base-1.0
- stabilityai/stable-diffusion-2-1
- prompthero/openjourney
- And many more open-source models
Example:
import aiohttp
async with aiohttp.ClientSession() as session:
image_gen = TogetherImageGenService(
api_key="your-api-key",
aiohttp_session=session,
model="stabilityai/stable-diffusion-xl-base-1.0",
image_size="1024x1024"
)
# Generate image from text
async for frame in image_gen.run_image_gen("A beautiful sunset"):
if isinstance(frame, URLImageRawFrame):
print(f"Generated: {frame.url}")
"""
def __init__(
self,
*,
api_key: str,
aiohttp_session: aiohttp.ClientSession,
model: str,
image_size: str = "1024x1024",
**kwargs
):
"""Initialize the Together AI image generation service.
Args:
api_key: Together AI API key
aiohttp_session: HTTP session for downloading images
model: Model identifier
image_size: Target image size
**kwargs: Additional arguments
"""
pass
Learn more: Together AI Documentation
{ .api }
from pipecat.services.mcp_service import MCPService
class MCPService:
"""Model Context Protocol (MCP) service integration.
Provides integration with MCP servers for extended context and capabilities.
MCP allows AI models to access external tools, resources, and context through
a standardized protocol.
Features:
- Tool invocation through MCP
- Resource access
- Context management
- Multi-server support
Example:
from pipecat.services.mcp_service import MCPService
mcp = MCPService(
server_url="http://localhost:8080",
api_key="..."
)
# Use with LLM for extended capabilities
llm.register_mcp_service(mcp)
"""
def __init__(
self,
server_url: str,
api_key: Optional[str] = None,
**kwargs
):
"""Initialize the MCP service.
Args:
server_url: URL of the MCP server
api_key: Optional API key for authentication
**kwargs: Additional configuration
"""
pass
{ .api }
from pipecat.services.tavus import TavusService
class TavusService(AIService):
"""Tavus AI avatar service.
Generate AI-powered video avatars for conversational applications.
Supports real-time avatar generation and video synthesis.
Args:
api_key: Tavus API key
persona_id: Tavus persona/avatar identifier
params: Avatar configuration parameters
Example:
tavus = TavusService(
api_key="your-api-key",
persona_id="persona-123",
params={
"voice_id": "voice-456",
"video_quality": "high"
}
)
"""
def __init__(
self,
api_key: str,
persona_id: str,
params: Optional[Dict] = None,
**kwargs
):
pass
{ .api }
from pipecat.services.heygen import HeyGenService
class HeyGenService(AIService):
"""HeyGen AI avatar service.
Create realistic AI avatars with synchronized speech and gestures
for video content and real-time interactions.
Args:
api_key: HeyGen API key
avatar_id: HeyGen avatar identifier
voice_id: Voice identifier for the avatar
params: Avatar configuration parameters
Example:
heygen = HeyGenService(
api_key="your-api-key",
avatar_id="avatar-123",
voice_id="voice-456",
params={
"quality": "high",
"background": "office"
}
)
"""
def __init__(
self,
api_key: str,
avatar_id: str,
voice_id: Optional[str] = None,
params: Optional[Dict] = None,
**kwargs
):
pass
{ .api }
from pipecat.services.simli import SimliService
class SimliService(AIService):
"""Simli AI avatar service.
Real-time AI avatar generation with lip-sync and facial animations
for interactive conversational experiences.
Args:
api_key: Simli API key
face_id: Face/avatar identifier
params: Avatar configuration parameters
Example:
simli = SimliService(
api_key="your-api-key",
face_id="face-123",
params={
"quality": "hd",
"fps": 30
}
)
"""
def __init__(
self,
api_key: str,
face_id: str,
params: Optional[Dict] = None,
**kwargs
):
pass
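The avatar services above follow a common pattern: they sit downstream of TTS and turn synthesized speech into synchronized video. A hedged wiring sketch using SimliService (the transport, stt, llm, and tts objects are assumed to be configured as in earlier examples, and the exact placement may vary by service):

from pipecat.pipeline.pipeline import Pipeline

simli = SimliService(api_key="your-api-key", face_id="face-123")

pipeline = Pipeline([
    transport.input(),   # User audio in
    stt,                 # Speech to text
    llm,                 # Generate a response
    tts,                 # Text to speech
    simli,               # Animate the avatar from the synthesized speech
    transport.output(),  # Video and audio out
])
{ .api }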
from pipecat.services.asyncai import AsyncAIService
class AsyncAIService(AIService):
"""AsyncAI service integration.
Asynchronous AI service for various AI processing tasks.
Args:
api_key: AsyncAI API key
params: Service configuration parameters
Example:
async_ai = AsyncAIService(
api_key="your-api-key",
params={"model": "default"}
)
"""
def __init__(
self,
api_key: str,
params: Optional[Dict] = None,
**kwargs
):
pass
{ .api }
from pipecat.services.hathora import HathoraService
class HathoraService(AIService):
"""Hathora service integration.
Game backend and multiplayer infrastructure service for real-time
applications and AI-powered gaming experiences.
Args:
api_key: Hathora API key
app_id: Hathora application identifier
params: Service configuration parameters
Example:
hathora = HathoraService(
api_key="your-api-key",
app_id="app-123",
params={"region": "us-west"}
)
"""
def __init__(
self,
api_key: str,
app_id: str,
params: Optional[Dict] = None,
**kwargs
):
pass
{ .api }
from pipecat.services.mem0 import Mem0Service
class Mem0Service(AIService):
"""Mem0 memory service integration.
Persistent memory and context management for AI applications.
Provides long-term memory storage and retrieval for conversational AI.
Args:
api_key: Mem0 API key
user_id: User identifier for memory isolation
params: Memory configuration parameters
Example:
mem0 = Mem0Service(
api_key="your-api-key",
user_id="user-123",
params={
"memory_type": "conversational",
"max_memories": 1000
}
)
"""
def __init__(
self,
api_key: str,
user_id: Optional[str] = None,
params: Optional[Dict] = None,
**kwargs
):
pass
{ .api }
from pipecat.services.moondream import MoondreamService
from pipecat.frames.frames import UserImageRawFrame, VisionTextFrame
from PIL import Image
import io
# Initialize vision service
vision = MoondreamService()
# Process user-uploaded image
async def process_user_image(image_bytes: bytes, question: str):
# Create image frame. Note: image.format is the file format (e.g. "JPEG");
# the frame expects the pixel format, so convert and pass image.mode.
image = Image.open(io.BytesIO(image_bytes))
if image.mode != "RGB":
    image = image.convert("RGB")
frame = UserImageRawFrame(
    image=image.tobytes(),
    size=image.size,
    format=image.mode,
    text=question  # "What's in this image?"
)
# Process through pipeline
async for result_frame in vision.run_vision(frame):
if isinstance(result_frame, VisionTextFrame):
print(f"Vision: {result_frame.text}")
# Use in pipeline
pipeline = Pipeline([
transport.input(), # Receives images
vision, # Analyzes images
tts, # Speaks descriptions
transport.output()
])
{ .api }
from pipecat.services.openai import OpenAILLMService, OpenAIImageGenService
import aiohttp
# Initialize services
llm = OpenAILLMService(api_key="...")
async with aiohttp.ClientSession() as session:
image_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1024x1024",
model="dall-e-3"
)
# LLM generates image descriptions, image_gen creates images
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
llm, # Responds with image description
image_gen, # Generates image from description
transport.output()
])
# Example conversation:
# User: "Create an image of a sunset over mountains"
# LLM: "A beautiful sunset over snow-capped mountains"
# ImageGen: Creates and sends image
{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.services.moondream import MoondreamService
from pipecat.services.openai import OpenAILLMService
vision = MoondreamService()
llm = OpenAILLMService(api_key="...")
# Create branches for different input types
pipeline = Pipeline([
transport.input(),
ParallelPipeline(
# Text branch
[stt, context_aggregator.user(), llm],
# Image branch
[vision]
),
tts,
transport.output()
])
# Handles both text questions and image analysis
{ .api }
import asyncio
import aiohttp
from loguru import logger
from pipecat.frames.frames import ErrorFrame, URLImageRawFrame
from pipecat.services.openai import OpenAIImageGenService
async with aiohttp.ClientSession() as session:
image_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1024x1024"
)
async def generate_with_retry(prompt: str, max_retries: int = 3):
for attempt in range(max_retries):
try:
async for frame in image_gen.run_image_gen(prompt):
if isinstance(frame, URLImageRawFrame):
return frame
elif isinstance(frame, ErrorFrame):
logger.error(f"Generation error: {frame.error}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
raise
{ .api }
from pipecat.services.moondream import MoondreamService
from pipecat.frames.frames import UserImageRawFrame, VisionTextFrame
vision = MoondreamService()
# Ask specific questions about images
questions = [
"How many people are in this image?",
"What colors are dominant?",
"What is the mood of this scene?"
]
for question in questions:
frame = UserImageRawFrame(
image=image_bytes,
size=image_size,
format="RGB",
text=question
)
async for result in vision.run_vision(frame):
if isinstance(result, VisionTextFrame):
print(f"Q: {question}")
print(f"A: {result.text}\n"){ .api }
import aiohttp
from pipecat.services.openai import OpenAIImageGenService
async with aiohttp.ClientSession() as session:
# Square images
square_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1024x1024",
model="dall-e-3"
)
# Landscape images
landscape_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1792x1024",
model="dall-e-3"
)
# Portrait images
portrait_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1024x1792",
model="dall-e-3"
)
# Select based on context
def get_image_gen(orientation: str):
if orientation == "landscape":
return landscape_gen
elif orientation == "portrait":
return portrait_gen
else:
return square_gen
{ .api }
# Good: Load model once and reuse
vision = MoondreamService(use_cpu=False) # Use GPU if available
# Bad: Creating new service for each request
# Don't do this:
async def analyze_image(image):
vision = MoondreamService() # Reloads model every time
...
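One way to enforce the load-once rule is a lazy accessor, so the model is constructed on first use and shared afterwards. A minimal sketch:

from functools import lru_cache
from pipecat.services.moondream import MoondreamService

@lru_cache(maxsize=1)
def get_vision_service() -> MoondreamService:
    # Constructed once per process; later calls return the same instance
    return MoondreamService(use_cpu=False)
{ .api }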
import io
from PIL import Image
from pipecat.frames.frames import UserImageRawFrame
# Good: Convert to correct format
def prepare_image(image_data: bytes) -> UserImageRawFrame:
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
return UserImageRawFrame(
image=image.tobytes(),
size=image.size,
format=image.mode
)
# Bad: Assuming format
# Don't send RGBA or other formats without conversion
{ .api }
# Good: Use appropriate image sizes
image_gen = OpenAIImageGenService(
api_key="...",
aiohttp_session=session,
image_size="1024x1024", # Cheaper than 1792x1024
model="dall-e-3"
)
# Good: Cache generated images
generated_images = {}
async def get_or_generate_image(prompt: str):
if prompt in generated_images:
return generated_images[prompt]
async for frame in image_gen.run_image_gen(prompt):
if isinstance(frame, URLImageRawFrame):
generated_images[prompt] = frame
return frame
{ .api }
# Good: Handle vision errors
async def safe_vision_analysis(image_frame: UserImageRawFrame):
try:
async for frame in vision.run_vision(image_frame):
if isinstance(frame, ErrorFrame):
logger.error(f"Vision error: {frame.error}")
return "Unable to analyze image"
elif isinstance(frame, VisionTextFrame):
return frame.text
except Exception as e:
logger.error(f"Vision exception: {e}")
return "Image analysis failed"
# Bad: Ignoring errors
# Don't let vision errors crash the pipeline
{ .api }
# Good: Provide context in questions
frame = UserImageRawFrame(
image=image_bytes,
size=image_size,
format="RGB",
text="Looking at this product image, what are its key features?"
)
# Better than generic:
# text="Describe this image" # Too vague
# Good: Chain vision with LLM
async def vision_with_followup(image_frame):
# Get vision description
description = await get_vision_description(image_frame)
# Use in LLM context
context.add_message({
"role": "system",
"content": f"Image description: {description}"
})
# LLM can now reason about the image
{ .api }
from pipecat.frames.frames import ErrorFrame, URLImageRawFrame
# Good: Track generation metrics
generation_stats = {
"total": 0,
"successful": 0,
"failed": 0
}
async def monitored_generation(prompt: str):
generation_stats["total"] += 1
async for frame in image_gen.run_image_gen(prompt):
if isinstance(frame, URLImageRawFrame):
generation_stats["successful"] += 1
logger.info(f"Generated: {frame.url}")
yield frame
elif isinstance(frame, ErrorFrame):
generation_stats["failed"] += 1
logger.error(f"Failed: {frame.error}")
yield frame
# Log stats periodically
logger.info(f"Generation stats: {generation_stats}")