Vision and Image Generation Services

Pipecat provides vision services for analyzing images and image generation services for creating images from text prompts. These services enable multimodal conversational experiences.
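
As a quick orientation before the reference below, here is a minimal sketch of how a vision service fits into a pipeline. The service and frame names are the ones documented on this page; transport and tts are placeholder objects, and the wiring is illustrative rather than prescriptive.

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.moondream import MoondreamService

# Vision services consume UserImageRawFrame (image plus optional question)
# and emit VisionTextFrame responses; image generation services consume
# TextFrame prompts and emit ImageRawFrame / URLImageRawFrame results.
vision = MoondreamService()

pipeline = Pipeline([
    transport.input(),   # delivers user images as UserImageRawFrame
    vision,              # emits VisionTextFrame with the description
    tts,                 # any TTS service speaks the description
    transport.output(),
])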

Vision Services

VisionService (Base Class)

{ .api }
from pipecat.services.vision_service import VisionService

class VisionService(AIService):
    """Base class for vision services.

    Provides common functionality for vision services that process images and
    generate textual responses. Handles image frame processing and integrates
    with the AI service infrastructure for metrics and lifecycle management.

    Key Features:
        - Image analysis and description
        - Visual question answering
        - Object detection and recognition
        - Automatic metrics tracking
        - Frame processing integration

    Frames Consumed:
        - UserImageRawFrame: Images with optional text prompts

    Frames Produced:
        - VisionFullResponseStartFrame: Marks vision response start
        - VisionTextFrame: Vision analysis text
        - VisionFullResponseEndFrame: Marks vision response end

    Example:
        class CustomVisionService(VisionService):
            async def run_vision(self, frame: UserImageRawFrame):
                # Analyze image
                description = await analyze_image(frame.image)
                yield VisionFullResponseStartFrame()
                yield VisionTextFrame(text=description)
                yield VisionFullResponseEndFrame()
    """

    def __init__(self, **kwargs):
        """Initialize the vision service.

        Args:
            **kwargs: Additional arguments passed to the parent AIService
        """
        pass

    @abstractmethod
    async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
        """Process the given vision image and generate results.

        This method must be implemented by subclasses to provide actual computer
        vision functionality such as image description, object detection, or
        visual question answering.

        Args:
            frame: The image frame to process

        Yields:
            Frame: Frames containing the vision analysis results, typically
                VisionTextFrame objects with descriptions or answers
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames, handling vision image frames for analysis.

        Automatically processes UserImageRawFrame objects by calling run_vision
        and handles metrics tracking. Other frames are passed through unchanged.

        Args:
            frame: The frame to process
            direction: The direction of frame processing
        """
        pass

MoondreamService

{ .api }
from pipecat.services.moondream import MoondreamService

class MoondreamService(VisionService):
    """Moondream vision-language model service.

    Provides image analysis and description generation using the Moondream
    vision-language model. Supports various hardware acceleration options
    including CUDA, MPS, and Intel XPU.

    Features:
        - Local model execution (no API calls)
        - Hardware acceleration support
        - Visual question answering
        - Image description generation
        - Efficient model loading

    Hardware Support:
        - CUDA (NVIDIA GPUs)
        - MPS (Apple Silicon)
        - Intel XPU
        - CPU fallback

    Example:
        from pipecat.services.moondream import MoondreamService

        vision = MoondreamService(
            model="vikhyatk/moondream2",
            revision="2025-01-09",
            use_cpu=False  # Use hardware acceleration if available
        )

        # Create image frame
        from pipecat.frames.frames import UserImageRawFrame

        image_frame = UserImageRawFrame(
            image=image_bytes,
            size=(width, height),
            format="RGB",
            text="What's in this image?"  # Optional question
        )

        # Process through pipeline
        pipeline = Pipeline([
            vision,  # Analyzes image
            tts,
            transport.output()
        ])
    """

    def __init__(
        self,
        *,
        model: str = "vikhyatk/moondream2",
        revision: str = "2025-01-09",
        use_cpu: bool = False,
        **kwargs
    ):
        """Initialize the Moondream service.

        Args:
            model: Hugging Face model identifier for the Moondream model
            revision: Specific model revision to use
            use_cpu: Whether to force CPU usage instead of hardware acceleration
            **kwargs: Additional arguments passed to the parent VisionService
        """
        pass

    async def run_vision(self, frame: UserImageRawFrame) -> AsyncGenerator[Frame, None]:
        """Analyze an image and generate a description.

        Args:
            frame: The image frame to process

        Yields:
            VisionFullResponseStartFrame: Marks response start
            VisionTextFrame: Description or answer text
            VisionFullResponseEndFrame: Marks response end
        """
        pass

Image Generation Services

ImageGenService (Base Class)

{ .api }
from pipecat.services.image_service import ImageGenService

class ImageGenService(AIService):
    """Base class for image generation services.

    Processes TextFrames by using their content as prompts for image generation.
    Subclasses must implement the run_image_gen method to provide actual image
    generation functionality using their specific AI service.

    Key Features:
        - Text-to-image generation
        - Prompt processing
        - Image frame creation
        - Automatic metrics tracking

    Frames Consumed:
        - TextFrame: Text prompts for image generation

    Frames Produced:
        - ImageRawFrame: Generated images
        - URLImageRawFrame: Images with URLs
        - ErrorFrame: Generation errors

    Example:
        class CustomImageGenService(ImageGenService):
            async def run_image_gen(self, prompt: str):
                # Generate image from prompt
                image = await generate_image(prompt)
                yield ImageRawFrame(
                    image=image_bytes,
                    size=(width, height),
                    format="PNG"
                )
    """

    def __init__(self, **kwargs):
        """Initialize the image generation service.

        Args:
            **kwargs: Additional arguments passed to the parent AIService
        """
        pass

    @abstractmethod
    async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
        """Generate an image from a text prompt.

        This method must be implemented by subclasses to provide actual image
        generation functionality using their specific AI service.

        Args:
            prompt: The text prompt to generate an image from

        Yields:
            Frame: Frames containing the generated image (typically ImageRawFrame
                or URLImageRawFrame)
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process frames for image generation.

        TextFrames are used as prompts for image generation, while other frames
        are passed through unchanged.

        Args:
            frame: The frame to process
            direction: The direction of frame processing
        """
        pass

OpenAIImageGenService

{ .api }
from pipecat.services.openai import OpenAIImageGenService

class OpenAIImageGenService(ImageGenService):
    """OpenAI DALL-E image generation service.

    Provides image generation capabilities using OpenAI's DALL-E models.
    Supports various image sizes and can generate images from text prompts
    with configurable quality and style parameters.

    Supported Models:
        - dall-e-3: Latest DALL-E model with improved quality
        - dall-e-2: Previous generation model

    Supported Sizes (DALL-E 3):
        - 1024x1024: Square format
        - 1792x1024: Landscape format
        - 1024x1792: Portrait format

    Example:
        from pipecat.services.openai import OpenAIImageGenService
        import aiohttp

        async with aiohttp.ClientSession() as session:
            image_gen = OpenAIImageGenService(
                api_key="sk-...",
                aiohttp_session=session,
                image_size="1024x1024",
                model="dall-e-3"
            )

            # Generate image from text
            pipeline = Pipeline([
                llm,  # Generates image description
                image_gen,  # Creates image
                transport.output()
            ])
    """

    def __init__(
        self,
        *,
        api_key: str,
        base_url: Optional[str] = None,
        aiohttp_session: aiohttp.ClientSession,
        image_size: Literal["256x256", "512x512", "1024x1024", "1792x1024", "1024x1792"],
        model: str = "dall-e-3",
    ):
        """Initialize the OpenAI image generation service.

        Args:
            api_key: OpenAI API key for authentication
            base_url: Custom base URL for OpenAI API. If None, uses default
            aiohttp_session: HTTP session for downloading generated images
            image_size: Target size for generated images
            model: DALL-E model to use for generation. Defaults to "dall-e-3"
        """
        pass

    async def run_image_gen(self, prompt: str) -> AsyncGenerator[Frame, None]:
        """Generate an image from a text prompt using OpenAI's DALL-E.

        Args:
            prompt: Text description of the image to generate

        Yields:
            URLImageRawFrame: Frame containing the generated image data and URL
        """
        pass

AzureImageGenService

{ .api }
from pipecat.services.azure import AzureImageGenService

class AzureImageGenService(ImageGenService):
    """Azure OpenAI DALL-E image generation service.

    Provides image generation using Azure's OpenAI DALL-E deployment.

    Example:
        import aiohttp

        async with aiohttp.ClientSession() as session:
            image_gen = AzureImageGenService(
                api_key="...",
                endpoint="https://your-resource.openai.azure.com",
                aiohttp_session=session,
                image_size="1024x1024",
                model="dall-e-3"
            )
    """

    def __init__(
        self,
        *,
        api_key: str,
        endpoint: str,
        aiohttp_session: aiohttp.ClientSession,
        image_size: str,
        model: str = "dall-e-3",
        **kwargs
    ):
        """Initialize the Azure image generation service.

        Args:
            api_key: Azure OpenAI API key
            endpoint: Azure OpenAI endpoint URL
            aiohttp_session: HTTP session for downloading images
            image_size: Target image size
            model: DALL-E model name
            **kwargs: Additional arguments
        """
        pass

GoogleImageGenService

{ .api }
from pipecat.services.google import GoogleImageGenService

class GoogleImageGenService(ImageGenService):
    """Google Imagen image generation service.

    Provides image generation using Google's Imagen model.

    Example:
        import aiohttp

        async with aiohttp.ClientSession() as session:
            image_gen = GoogleImageGenService(
                credentials="path/to/credentials.json",
                aiohttp_session=session,
                image_size="1024x1024"
            )
    """

    def __init__(
        self,
        *,
        credentials: Union[str, Dict],
        aiohttp_session: aiohttp.ClientSession,
        image_size: str,
        **kwargs
    ):
        """Initialize the Google image generation service.

        Args:
            credentials: Path to credentials JSON or credentials dict
            aiohttp_session: HTTP session for downloading images
            image_size: Target image size
            **kwargs: Additional arguments
        """
        pass

FalImageGenService

{ .api }
from pipecat.services.fal import FalImageGenService

class FalImageGenService(ImageGenService):
    """Fal AI image generation service.

    Provides image generation using Fal AI's models including Flux and SDXL.

    Supported Models:
        - fal-ai/flux/dev: Flux development model
        - fal-ai/flux-pro: Flux professional model
        - fal-ai/stable-diffusion-xl: SDXL model

    Example:
        import aiohttp

        async with aiohttp.ClientSession() as session:
            image_gen = FalImageGenService(
                api_key="...",
                aiohttp_session=session,
                image_size="landscape_4_3",
                model="fal-ai/flux/dev"
            )
    """

    def __init__(
        self,
        *,
        api_key: str,
        aiohttp_session: aiohttp.ClientSession,
        image_size: str,
        model: str = "fal-ai/flux/dev",
        **kwargs
    ):
        """Initialize the Fal image generation service.

        Args:
            api_key: Fal AI API key
            aiohttp_session: HTTP session for downloading images
            image_size: Target image size (e.g., "landscape_4_3", "square")
            model: Fal AI model identifier
            **kwargs: Additional arguments
        """
        pass

Learn more: Fal AI Documentation

TogetherImageGenService

{ .api }
from pipecat.services.together import TogetherImageGenService

class TogetherImageGenService(ImageGenService):
    """Together AI image generation service.

    Fast image generation using Together AI's infrastructure with
    support for Stable Diffusion and other open-source models.

    Supported Models:
        - stabilityai/stable-diffusion-xl-base-1.0
        - stabilityai/stable-diffusion-2-1
        - prompthero/openjourney
        - And many more open-source models

    Example:
        import aiohttp

        async with aiohttp.ClientSession() as session:
            image_gen = TogetherImageGenService(
                api_key="your-api-key",
                aiohttp_session=session,
                model="stabilityai/stable-diffusion-xl-base-1.0",
                image_size="1024x1024"
            )

            # Generate image from text
            async for frame in image_gen.run_image_gen("A beautiful sunset"):
                if isinstance(frame, URLImageRawFrame):
                    print(f"Generated: {frame.url}")
    """

    def __init__(
        self,
        *,
        api_key: str,
        aiohttp_session: aiohttp.ClientSession,
        model: str,
        image_size: str = "1024x1024",
        **kwargs
    ):
        """Initialize the Together AI image generation service.

        Args:
            api_key: Together AI API key
            aiohttp_session: HTTP session for downloading images
            model: Model identifier
            image_size: Target image size
            **kwargs: Additional arguments
        """
        pass

Learn more: Together AI Documentation

MCP Service

MCPService

{ .api }
from pipecat.services.mcp_service import MCPService

class MCPService:
    """Model Context Protocol (MCP) service integration.

    Provides integration with MCP servers for extended context and capabilities.
    MCP allows AI models to access external tools, resources, and context through
    a standardized protocol.

    Features:
        - Tool invocation through MCP
        - Resource access
        - Context management
        - Multi-server support

    Example:
        from pipecat.services.mcp_service import MCPService

        mcp = MCPService(
            server_url="http://localhost:8080",
            api_key="..."
        )

        # Use with LLM for extended capabilities
        llm.register_mcp_service(mcp)
    """

    def __init__(
        self,
        server_url: str,
        api_key: Optional[str] = None,
        **kwargs
    ):
        """Initialize the MCP service.

        Args:
            server_url: URL of the MCP server
            api_key: Optional API key for authentication
            **kwargs: Additional configuration
        """
        pass

Avatar and Video Services

TavusService

{ .api }
from pipecat.services.tavus import TavusService

class TavusService(AIService):
    """Tavus AI avatar service.

    Generate AI-powered video avatars for conversational applications.
    Supports real-time avatar generation and video synthesis.

    Args:
        api_key: Tavus API key
        persona_id: Tavus persona/avatar identifier
        params: Avatar configuration parameters

    Example:
        tavus = TavusService(
            api_key="your-api-key",
            persona_id="persona-123",
            params={
                "voice_id": "voice-456",
                "video_quality": "high"
            }
        )
    """

    def __init__(
        self,
        api_key: str,
        persona_id: str,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass

HeyGenService

{ .api }
from pipecat.services.heygen import HeyGenService

class HeyGenService(AIService):
    """HeyGen AI avatar service.

    Create realistic AI avatars with synchronized speech and gestures
    for video content and real-time interactions.

    Args:
        api_key: HeyGen API key
        avatar_id: HeyGen avatar identifier
        voice_id: Voice identifier for the avatar
        params: Avatar configuration parameters

    Example:
        heygen = HeyGenService(
            api_key="your-api-key",
            avatar_id="avatar-123",
            voice_id="voice-456",
            params={
                "quality": "high",
                "background": "office"
            }
        )
    """

    def __init__(
        self,
        api_key: str,
        avatar_id: str,
        voice_id: Optional[str] = None,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass

SimliService

{ .api }
from pipecat.services.simli import SimliService

class SimliService(AIService):
    """Simli AI avatar service.

    Real-time AI avatar generation with lip-sync and facial animations
    for interactive conversational experiences.

    Args:
        api_key: Simli API key
        face_id: Face/avatar identifier
        params: Avatar configuration parameters

    Example:
        simli = SimliService(
            api_key="your-api-key",
            face_id="face-123",
            params={
                "quality": "hd",
                "fps": 30
            }
        )
    """

    def __init__(
        self,
        api_key: str,
        face_id: str,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass
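
Avatar services are typically placed downstream of TTS so the avatar can be driven by the synthesized speech. A minimal sketch, assuming the TavusService configuration shown in its reference entry; stt, llm, tts, and transport are placeholder services, and the exact placement may vary by provider.

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.tavus import TavusService

avatar = TavusService(
    api_key="your-api-key",
    persona_id="persona-123",
)

pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    tts,      # synthesizes speech audio
    avatar,   # renders the talking avatar from that audio
    transport.output(),
])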

Additional Services

AsyncAIService

{ .api }
from pipecat.services.asyncai import AsyncAIService

class AsyncAIService(AIService):
    """AsyncAI service integration.

    Asynchronous AI service for various AI processing tasks.

    Args:
        api_key: AsyncAI API key
        params: Service configuration parameters

    Example:
        async_ai = AsyncAIService(
            api_key="your-api-key",
            params={"model": "default"}
        )
    """

    def __init__(
        self,
        api_key: str,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass

HathoraService

{ .api }
from pipecat.services.hathora import HathoraService

class HathoraService(AIService):
    """Hathora service integration.

    Game backend and multiplayer infrastructure service for real-time
    applications and AI-powered gaming experiences.

    Args:
        api_key: Hathora API key
        app_id: Hathora application identifier
        params: Service configuration parameters

    Example:
        hathora = HathoraService(
            api_key="your-api-key",
            app_id="app-123",
            params={"region": "us-west"}
        )
    """

    def __init__(
        self,
        api_key: str,
        app_id: str,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass

Mem0Service

{ .api }
from pipecat.services.mem0 import Mem0Service

class Mem0Service(AIService):
    """Mem0 memory service integration.

    Persistent memory and context management for AI applications.
    Provides long-term memory storage and retrieval for conversational AI.

    Args:
        api_key: Mem0 API key
        user_id: User identifier for memory isolation
        params: Memory configuration parameters

    Example:
        mem0 = Mem0Service(
            api_key="your-api-key",
            user_id="user-123",
            params={
                "memory_type": "conversational",
                "max_memories": 1000
            }
        )
    """

    def __init__(
        self,
        api_key: str,
        user_id: Optional[str] = None,
        params: Optional[Dict] = None,
        **kwargs
    ):
        pass

Usage Patterns

Vision with User Images

{ .api }
from pipecat.services.moondream import MoondreamService
from pipecat.frames.frames import UserImageRawFrame, VisionTextFrame
from PIL import Image
import io

# Initialize vision service
vision = MoondreamService()

# Process user-uploaded image
async def process_user_image(image_bytes: bytes, question: str):
    # Create image frame
    image = Image.open(io.BytesIO(image_bytes))
    if image.mode != "RGB":
        image = image.convert("RGB")
    frame = UserImageRawFrame(
        image=image.tobytes(),
        size=image.size,
        format=image.mode,
        text=question  # "What's in this image?"
    )

    # Process through pipeline
    async for result_frame in vision.run_vision(frame):
        if isinstance(result_frame, VisionTextFrame):
            print(f"Vision: {result_frame.text}")

# Use in pipeline
pipeline = Pipeline([
    transport.input(),  # Receives images
    vision,  # Analyzes images
    tts,     # Speaks descriptions
    transport.output()
])

Image Generation from LLM

{ .api }
from pipecat.services.openai import OpenAILLMService, OpenAIImageGenService
import aiohttp

# Initialize services
llm = OpenAILLMService(api_key="...")

async with aiohttp.ClientSession() as session:
    image_gen = OpenAIImageGenService(
        api_key="...",
        aiohttp_session=session,
        image_size="1024x1024",
        model="dall-e-3"
    )

    # LLM generates image descriptions, image_gen creates images
    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,  # Responds with image description
        image_gen,  # Generates image from description
        transport.output()
    ])

# Example conversation:
# User: "Create an image of a sunset over mountains"
# LLM: "A beautiful sunset over snow-capped mountains"
# ImageGen: Creates and sends image

Multi-Modal Conversation

{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.services.moondream import MoondreamService
from pipecat.services.openai import OpenAILLMService

vision = MoondreamService()
llm = OpenAILLMService(api_key="...")

# Create branches for different input types
pipeline = Pipeline([
    transport.input(),
    ParallelPipeline(
        # Text branch
        [stt, context_aggregator.user(), llm],
        # Image branch
        [vision]
    ),
    tts,
    transport.output()
])

# Handles both text questions and image analysis

Image Generation with Retry

{ .api }
import asyncio
import aiohttp
from loguru import logger
from pipecat.frames.frames import ErrorFrame, URLImageRawFrame
from pipecat.services.openai import OpenAIImageGenService

async with aiohttp.ClientSession() as session:
    image_gen = OpenAIImageGenService(
        api_key="...",
        aiohttp_session=session,
        image_size="1024x1024"
    )

    async def generate_with_retry(prompt: str, max_retries: int = 3):
        for attempt in range(max_retries):
            try:
                async for frame in image_gen.run_image_gen(prompt):
                    if isinstance(frame, URLImageRawFrame):
                        return frame
                    elif isinstance(frame, ErrorFrame):
                        logger.error(f"Generation error: {frame.error}")
                        if attempt < max_retries - 1:
                            await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    raise

Vision Question Answering

{ .api }
from pipecat.services.moondream import MoondreamService
from pipecat.frames.frames import UserImageRawFrame, VisionTextFrame

vision = MoondreamService()

# Ask specific questions about images
questions = [
    "How many people are in this image?",
    "What colors are dominant?",
    "What is the mood of this scene?"
]

for question in questions:
    frame = UserImageRawFrame(
        image=image_bytes,
        size=image_size,
        format="RGB",
        text=question
    )

    async for result in vision.run_vision(frame):
        if isinstance(result, VisionTextFrame):
            print(f"Q: {question}")
            print(f"A: {result.text}\n")

Image Generation Size Selection

{ .api }
import aiohttp
from pipecat.services.openai import OpenAIImageGenService

async with aiohttp.ClientSession() as session:
    # Square images
    square_gen = OpenAIImageGenService(
        api_key="...",
        aiohttp_session=session,
        image_size="1024x1024",
        model="dall-e-3"
    )

    # Landscape images
    landscape_gen = OpenAIImageGenService(
        api_key="...",
        aiohttp_session=session,
        image_size="1792x1024",
        model="dall-e-3"
    )

    # Portrait images
    portrait_gen = OpenAIImageGenService(
        api_key="...",
        aiohttp_session=session,
        image_size="1024x1792",
        model="dall-e-3"
    )

    # Select based on context
    def get_image_gen(orientation: str):
        if orientation == "landscape":
            return landscape_gen
        elif orientation == "portrait":
            return portrait_gen
        else:
            return square_gen

Best Practices

Optimize Vision Model Loading

{ .api }
# Good: Load model once and reuse
vision = MoondreamService(use_cpu=False)  # Use GPU if available

# Bad: Creating new service for each request
# Don't do this:
async def analyze_image(image):
    vision = MoondreamService()  # Reloads model every time
    ...

Handle Image Formats Properly

{ .api }
from PIL import Image
import io

# Good: Convert to correct format
def prepare_image(image_data: bytes) -> UserImageRawFrame:
    image = Image.open(io.BytesIO(image_data))
    if image.mode != "RGB":
        image = image.convert("RGB")

    return UserImageRawFrame(
        image=image.tobytes(),
        size=image.size,
        format=image.mode
    )

# Bad: Assuming format
# Don't send RGBA or other formats without conversion

Manage Image Generation Costs

{ .api }
# Good: Use appropriate image sizes
image_gen = OpenAIImageGenService(
    api_key="...",
    aiohttp_session=session,
    image_size="1024x1024",  # Cheaper than 1792x1024
    model="dall-e-3"
)

# Good: Cache generated images
generated_images = {}

async def get_or_generate_image(prompt: str):
    if prompt in generated_images:
        return generated_images[prompt]

    async for frame in image_gen.run_image_gen(prompt):
        if isinstance(frame, URLImageRawFrame):
            generated_images[prompt] = frame
            return frame

Handle Vision Errors Gracefully

{ .api }
# Good: Handle vision errors
async def safe_vision_analysis(image_frame: UserImageRawFrame):
    try:
        async for frame in vision.run_vision(image_frame):
            if isinstance(frame, ErrorFrame):
                logger.error(f"Vision error: {frame.error}")
                return "Unable to analyze image"
            elif isinstance(frame, VisionTextFrame):
                return frame.text
    except Exception as e:
        logger.error(f"Vision exception: {e}")
        return "Image analysis failed"

# Bad: Ignoring errors
# Don't let vision errors crash the pipeline

Use Vision with Context

{ .api }
# Good: Provide context in questions
frame = UserImageRawFrame(
    image=image_bytes,
    size=image_size,
    format="RGB",
    text="Looking at this product image, what are its key features?"
)

# Better than generic:
# text="Describe this image"  # Too vague

# Good: Chain vision with LLM
async def vision_with_followup(image_frame):
    # Get vision description
    description = await get_vision_description(image_frame)

    # Use in LLM context
    context.add_message({
        "role": "system",
        "content": f"Image description: {description}"
    })

    # LLM can now reason about the image
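
For completeness, get_vision_description above is a hypothetical helper; one way to write it against the VisionService interface documented earlier is sketched below. Here vision is an already-constructed vision service such as MoondreamService, and the frame import paths follow the ones used elsewhere on this page.

{ .api }
from pipecat.frames.frames import ErrorFrame, UserImageRawFrame, VisionTextFrame

async def get_vision_description(image_frame: UserImageRawFrame) -> str:
    """Collect the full vision response into a single string."""
    parts = []
    async for frame in vision.run_vision(image_frame):
        if isinstance(frame, ErrorFrame):
            return "Unable to analyze image"
        if isinstance(frame, VisionTextFrame):
            parts.append(frame.text)
    return " ".join(parts)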

Monitor Image Generation

{ .api }
from pipecat.frames.frames import ErrorFrame, URLImageRawFrame

# Good: Track generation metrics
generation_stats = {
    "total": 0,
    "successful": 0,
    "failed": 0
}

async def monitored_generation(prompt: str):
    generation_stats["total"] += 1

    async for frame in image_gen.run_image_gen(prompt):
        if isinstance(frame, URLImageRawFrame):
            generation_stats["successful"] += 1
            logger.info(f"Generated: {frame.url}")
            yield frame
        elif isinstance(frame, ErrorFrame):
            generation_stats["failed"] += 1
            logger.error(f"Failed: {frame.error}")
            yield frame

# Log stats periodically
logger.info(f"Generation stats: {generation_stats}")