Pipeline

Pipelines chain processors together to create complete frame-processing workflows, handling processor linking, frame routing, task execution, and lifecycle management.
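
To make frame routing concrete, here is a minimal sketch of a custom processor, assuming the FrameProcessor base API (process_frame, push_frame); UppercaseProcessor is a hypothetical name used only for illustration.

{ .api }
from pipecat.frames.frames import Frame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class UppercaseProcessor(FrameProcessor):
    """Rewrites TextFrames and forwards every other frame untouched."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            frame = TextFrame(frame.text.upper())
        # Hand the (possibly rewritten) frame to the next processor in the chain
        await self.push_frame(frame, direction)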

Pipeline Types

Pipeline

{ .api }
from pipecat.pipeline.pipeline import Pipeline

class Pipeline(BasePipeline):
    """Sequential processor chain.

    Frames flow through processors in order. Each processor receives frames
    from the previous processor and passes them to the next.

    Flow: Input -> P1 -> P2 -> P3 -> Output

    Args:
        processors (List[FrameProcessor]): Ordered list of processors
        name (Optional[str]): Pipeline name for debugging
        enable_direct_mode (bool): Enable optimized direct frame passing

    Example:
        pipeline = Pipeline([
            transport.input(),
            stt_service,
            user_aggregator,
            llm_service,
            tts_service,
            transport.output()
        ])
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None,
        enable_direct_mode: bool = False
    ):
        """Initialize pipeline.

        Args:
            processors: Ordered list of processors
            name: Optional pipeline name
            enable_direct_mode: Use optimized frame passing
        """
        pass

    async def run(self):
        """Run the pipeline until completion.

        Starts all processors and processes frames until an EndFrame is received.
        """
        pass

    async def queue_frames(self, frames: List[Frame]):
        """Queue frames for processing.

        Args:
            frames: List of frames to queue
        """
        pass

    def get_processors(self) -> List[FrameProcessor]:
        """Get all processors in the pipeline.

        Returns:
            List of processors in order
        """
        pass

    def get_metrics(self) -> Dict[str, Any]:
        """Get pipeline metrics.

        Returns:
            Dictionary of metrics data
        """
        pass
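
A construction sketch showing all three documented parameters together; the processors, name, and flag values here are illustrative placeholders:

{ .api }
pipeline = Pipeline(
    processors=[transport.input(), stt_service, transport.output()],
    name="minimal-pipeline",     # Surfaces in logs for debugging
    enable_direct_mode=True      # Optimized direct frame passing
)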

ParallelPipeline

{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline

class ParallelPipeline(BasePipeline):
    """Parallel processor execution.

    All processors receive the same input frames simultaneously.
    Outputs from all processors are merged.

    Flow: Input -> [P1, P2, P3] -> Merged Output

    Args:
        processors (List[FrameProcessor]): Processors to run in parallel
        name (Optional[str]): Pipeline name

    Example:
        # All processors receive same frames
        parallel = ParallelPipeline([
            tts_service_1,      # Generate TTS with service 1
            tts_service_2,      # Generate TTS with service 2
            metrics_logger      # Log metrics
        ])

    Use Cases:
    - A/B testing multiple services
    - Parallel processing for speed
    - Monitoring/logging alongside main flow
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None
    ):
        """Initialize parallel pipeline.

        Args:
            processors: Processors to run in parallel
            name: Optional pipeline name
        """
        pass
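
A sketch of the monitoring use case, nesting a ParallelPipeline inside a Pipeline; metrics_logger is a hypothetical passive processor:

{ .api }
pipeline = Pipeline([
    transport.input(),
    ParallelPipeline([
        stt_service,        # Main path: transcription
        metrics_logger      # Side path: observes the same frames
    ]),
    llm_service,
    transport.output()
])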

SyncParallelPipeline

{ .api }
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline

class SyncParallelPipeline(BasePipeline):
    """Synchronized parallel processor execution.

    All processors run in parallel, but the pipeline waits for ALL
    to complete before continuing, ensuring synchronization.

    Flow: Input -> [P1, P2, P3] -> Wait All -> Output

    Args:
        processors (List[FrameProcessor]): Processors to run in parallel
        name (Optional[str]): Pipeline name

    Example:
        # Wait for all to complete
        sync = SyncParallelPipeline([
            metrics_processor,     # Collect metrics
            logger_processor,      # Log frames
            audit_processor        # Audit trail
        ])

    Use Cases:
    - Side effects that must complete
    - Multiple dependent operations
    - Synchronized checkpoints
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None
    ):
        """Initialize synchronized parallel pipeline.

        Args:
            processors: Processors to run in parallel
            name: Optional pipeline name
        """
        pass

Pipeline Components

PipelineSource

{ .api }
from pipecat.pipeline.pipeline import PipelineSource

class PipelineSource(FrameProcessor):
    """Pipeline entry point that forwards frames to an upstream handler.

    This processor acts as the entry point for a pipeline, forwarding
    downstream frames to the next processor and upstream frames to a
    provided upstream handler function.

    Args:
        upstream_push_frame: Coroutine function to handle upstream frames
        **kwargs: Additional arguments passed to FrameProcessor

    Example:
        # Create a source with upstream handler
        async def handle_upstream(frame, direction):
            print(f"Upstream frame: {frame}")

        source = PipelineSource(upstream_push_frame=handle_upstream)

        pipeline = Pipeline([source, processor1, processor2])

        # Feed frames downstream
        await source.queue_frame(TextFrame("Hello"))

    Note: upstream_push_frame is a required parameter.
    """

PipelineSink

{ .api }
from pipecat.pipeline.pipeline import PipelineSink

class PipelineSink(FrameProcessor):
    """Pipeline exit point that forwards frames to a downstream handler.

    This processor acts as the exit point for a pipeline, forwarding
    upstream frames to the previous processor and downstream frames to a
    provided downstream handler function.

    Args:
        downstream_push_frame: Coroutine function to handle downstream frames
        **kwargs: Additional arguments passed to FrameProcessor

    Example:
        # Create a sink with downstream handler
        async def handle_downstream(frame, direction):
            print(f"Downstream frame: {frame}")
            # Process or output the frame

        sink = PipelineSink(downstream_push_frame=handle_downstream)
        pipeline = Pipeline([processor1, processor2, sink])

    Note: downstream_push_frame is a required parameter.
    """
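
Putting the two together, a sketch built from the examples above; my_processor is a placeholder. Frames queued on the source traverse the pipeline and surface in the sink's downstream handler:

{ .api }
from pipecat.frames.frames import TextFrame

async def handle_upstream(frame, direction):
    print(f"Upstream frame: {frame}")

async def handle_downstream(frame, direction):
    print(f"Downstream frame: {frame}")

source = PipelineSource(upstream_push_frame=handle_upstream)
sink = PipelineSink(downstream_push_frame=handle_downstream)
pipeline = Pipeline([source, my_processor, sink])

# Emerges via handle_downstream once my_processor has run
await source.queue_frame(TextFrame("Hello"))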

Task Management

PipelineTask

{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams

class PipelineTask(BasePipelineTask):
    """Pipeline execution task.

    Manages pipeline lifecycle, frame queuing, and execution. Provides
    control over running, stopping, and cancelling pipelines.

    Args:
        pipeline: Pipeline to execute
        params: Task configuration parameters
        clock: Optional clock for timing

    Example:
        # Create task
        task = PipelineTask(
            pipeline=pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True
            )
        )

        # Run task
        await task.run()
    """

    def __init__(
        self,
        pipeline: Pipeline,
        params: Optional[PipelineParams] = None,
        clock: Optional[BaseClock] = None
    ):
        """Initialize pipeline task.

        Args:
            pipeline: Pipeline to execute
            params: Task parameters
            clock: Optional clock
        """
        pass

    async def run(self):
        """Run the pipeline until completion.

        Processes all queued frames and waits for an EndFrame.
        Handles interruptions if enabled.

        Example:
            await task.run()
            # Blocks until pipeline completes
        """
        pass

    async def queue_frame(self, frame: Frame):
        """Queue a single frame for processing.

        Args:
            frame: Frame to queue

        Example:
            await task.queue_frame(TextFrame("Hello"))
        """
        pass

    async def queue_frames(self, frames: List[Frame]):
        """Queue multiple frames for processing.

        Args:
            frames: List of frames to queue

        Example:
            frames = [
                TextFrame("Hello"),
                TextFrame("world"),
                EndFrame()
            ]
            await task.queue_frames(frames)
        """
        pass

    async def cancel(self):
        """Cancel the task.

        Stops all processors and clears queues. Faster than a normal stop.

        Example:
            await task.cancel()
        """
        pass

    def has_finished(self) -> bool:
        """Check if task has finished.

        Returns:
            True if task completed

        Example:
            if task.has_finished():
                print("Task complete")
        """
        pass

    @property
    def id(self) -> str:
        """Get task ID.

        Returns:
            Unique task identifier
        """
        pass

PipelineParams

{ .api }
from pipecat.pipeline.task import PipelineParams

class PipelineParams:
    """Pipeline task configuration.

    Controls pipeline behavior including interruptions, metrics,
    and performance settings.

    Attributes:
        allow_interruptions (bool): Enable user interruptions
        enable_metrics (bool): Collect performance metrics
        enable_usage_metrics (bool): Collect usage metrics (tokens, etc.)
        report_only_initial_ttfb (bool): Report only first TTFB

    Example:
        params = PipelineParams(
            allow_interruptions=True,   # Allow user to interrupt bot
            enable_metrics=True,         # Collect TTFB metrics
            enable_usage_metrics=True    # Track token usage
        )

        task = PipelineTask(pipeline=pipeline, params=params)
    """

    def __init__(
        self,
        allow_interruptions: bool = True,
        enable_metrics: bool = False,
        enable_usage_metrics: bool = False,
        report_only_initial_ttfb: bool = False
    ):
        """Initialize pipeline parameters.

        Args:
            allow_interruptions: Enable interruptions
            enable_metrics: Enable metrics collection
            enable_usage_metrics: Enable usage metrics
            report_only_initial_ttfb: Report only first TTFB
        """
        pass

PipelineRunner

{ .api }
from pipecat.pipeline.runner import PipelineRunner

class PipelineRunner(BaseObject):
    """Manages pipeline execution lifecycle.

    High-level interface for running pipelines with transports.
    Handles setup, execution, and cleanup.

    Example:
        runner = PipelineRunner()

        await runner.run(
            pipeline=pipeline,
            transport=transport
        )
    """

    async def run(
        self,
        pipeline: Pipeline,
        transport: Optional[BaseTransport] = None
    ):
        """Run pipeline with optional transport.

        Args:
            pipeline: Pipeline to run
            transport: Optional transport for I/O
        """
        pass

Service Switching

ServiceSwitcher

{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategy

class ServiceSwitcher(ParallelPipeline):
    """Switch between services at runtime.

    Allows dynamic switching between multiple services (e.g., different TTS
    providers) based on a strategy. Useful for A/B testing, fallback, or
    dynamic selection.

    Generic Type: ServiceSwitcher[StrategyType]

    Args:
        services: List of services to switch between
        strategy: Switching strategy
        name: Optional name

    Example:
        # Manual switching
        from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual

        switcher = ServiceSwitcher(
            services=[tts_1, tts_2, tts_3],
            strategy=ServiceSwitcherStrategyManual(default_index=0)
        )

        # Switch services at runtime
        from pipecat.frames.frames import ManuallySwitchServiceFrame

        # Switch to service at index 1
        await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))
    """

    def __init__(
        self,
        services: List[FrameProcessor],
        strategy: ServiceSwitcherStrategy,
        name: Optional[str] = None
    ):
        """Initialize service switcher.

        Args:
            services: List of services
            strategy: Switching strategy
            name: Optional name
        """
        pass

    def set_service(self, index: int):
        """Set active service by index.

        Args:
            index: Service index
        """
        pass

    def get_current_service(self) -> FrameProcessor:
        """Get currently active service.

        Returns:
            Active service processor
        """
        pass

ServiceSwitcherStrategy

{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategy

class ServiceSwitcherStrategy:
    """Base switching strategy.

    Defines how to select which service to use. Subclass to implement
    custom strategies.

    Example:
        class RoundRobinStrategy(ServiceSwitcherStrategy):
            def __init__(self):
                self._index = 0

            def select(self, services: List) -> int:
                index = self._index
                self._index = (self._index + 1) % len(services)
                return index
    """

    def select(self, services: List[FrameProcessor]) -> int:
        """Select service index.

        Args:
            services: List of available services

        Returns:
            Index of service to use
        """
        pass
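
Wiring the RoundRobinStrategy from the docstring above into a switcher; tts_a and tts_b are placeholder services:

{ .api }
switcher = ServiceSwitcher(
    services=[tts_a, tts_b],
    strategy=RoundRobinStrategy()   # Alternates on each select() call
)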

ServiceSwitcherStrategyManual

{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual

class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
    """Manual service switching.

    Service is selected manually via ManuallySwitchServiceFrame.

    Args:
        default_index: Initial service index

    Example:
        strategy = ServiceSwitcherStrategyManual(default_index=0)

        switcher = ServiceSwitcher(
            services=[service1, service2],
            strategy=strategy
        )

        # Switch via frame
        await task.queue_frame(
            ManuallySwitchServiceFrame(service_index=1)
        )
    """

    def __init__(self, default_index: int = 0):
        """Initialize manual strategy.

        Args:
            default_index: Initial service index
        """
        pass

LLMSwitcher

{ .api }
from pipecat.pipeline.llm_switcher import LLMSwitcher

class LLMSwitcher(ServiceSwitcher):
    """LLM-specific service switcher.

    Extends ServiceSwitcher with LLM-specific functionality:
    - Function registration across all LLMs
    - Direct function registration
    - Out-of-band inference
    - LLM context management

    Args:
        llms: List of LLM services to switch between
        strategy: Switching strategy
        name: Optional name

    Example:
        # Create LLM switcher
        switcher = LLMSwitcher(
            llms=[
                OpenAILLMService(api_key="key1", model="gpt-4"),
                AnthropicLLMService(api_key="key2", model="claude-3-5-sonnet-20241022"),
                GoogleLLMService(api_key="key3", model="gemini-2.0-flash-exp")
            ],
            strategy=ServiceSwitcherStrategyManual(default_index=0)
        )

        # Register function on all LLMs
        switcher.register_function(
            function_name="get_weather",
            handler=get_weather_handler,
            description="Get current weather",
            properties={
                "location": {"type": "string", "description": "City name"}
            }
        )

        # Get active LLM
        current_llm = switcher.active_llm

        # Run inference on specific LLM without switching
        result = await switcher.run_inference(
            llm_index=1,
            messages=[{"role": "user", "content": "Hello"}]
        )
    """

    def __init__(
        self,
        llms: List[LLMService],
        strategy: ServiceSwitcherStrategy,
        name: Optional[str] = None
    ):
        """Initialize LLM switcher.

        Args:
            llms: List of LLM services
            strategy: Switching strategy
            name: Optional name
        """
        pass

    def register_function(
        self,
        function_name: str,
        handler: Callable,
        description: str = "",
        properties: Optional[Dict] = None,
        required: Optional[List[str]] = None
    ):
        """Register function on all LLMs.

        Registers the same function across all LLM services in the switcher.

        Args:
            function_name: Name of the function
            handler: Function implementation
            description: Function description
            properties: Function parameter schema
            required: Required parameters
        """
        pass

    def register_direct_function(
        self,
        function_name: str,
        handler: Callable
    ):
        """Register direct function on all LLMs.

        Args:
            function_name: Name of the function
            handler: Function implementation
        """
        pass

    async def run_inference(
        self,
        llm_index: int,
        messages: List[Dict],
        **kwargs
    ) -> Any:
        """Run inference on specific LLM without switching.

        Allows out-of-band LLM calls without affecting the active service.

        Args:
            llm_index: Index of LLM to use
            messages: Messages to send
            **kwargs: Additional LLM parameters

        Returns:
            LLM response
        """
        pass

    @property
    def llms(self) -> List[LLMService]:
        """Get all LLMs in switcher.

        Returns:
            List of LLM services
        """
        pass

    @property
    def active_llm(self) -> LLMService:
        """Get currently active LLM.

        Returns:
            Active LLM service
        """
        pass

Usage Patterns

Basic Pipeline

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask

# Create simple pipeline
pipeline = Pipeline([
    processor1,
    processor2,
    processor3
])

# Create and run task
task = PipelineTask(pipeline)
await task.run()

Voice Agent Pipeline

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextAggregatorPair
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams

async def create_voice_agent():
    # Setup context
    context = LLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful voice assistant."}
        ]
    )

    # Create services
    stt = DeepgramSTTService(api_key="deepgram-key")
    llm = OpenAILLMService(api_key="openai-key", model="gpt-4")
    llm.set_context(context)
    tts = OpenAITTSService(api_key="openai-key", voice="alloy")

    # Create aggregators
    aggregators = LLMContextAggregatorPair(context=context)

    # Setup transport
    transport = DailyTransport(
        room_url="https://daily.co/room",
        token="daily-token",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True
        )
    )

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),     # Receive audio from user
        stt,                   # Transcribe speech
        aggregators.user,      # Aggregate user messages
        llm,                   # Generate response
        aggregators.assistant, # Aggregate assistant messages
        tts,                   # Synthesize speech
        transport.output()     # Send audio to user
    ])

    # Create task with params
    task = PipelineTask(
        pipeline=pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True
        )
    )

    # Run
    await task.run()

Parallel Processing

{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline

# Process with multiple TTS services simultaneously
parallel_tts = ParallelPipeline([
    elevenlabs_tts,
    openai_tts,
    cartesia_tts
])

# All services receive same text, generate audio in parallel
pipeline = Pipeline([
    llm_service,
    parallel_tts,        # Parallel TTS generation
    # Outputs merged
    transport.output()
])

Service Fallback

{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategy

# Create fallback strategy
class FallbackStrategy(ServiceSwitcherStrategy):
    def __init__(self):
        self._current = 0
        self._num_services = 0

    def select(self, services):
        self._num_services = len(services)
        return self._current

    def on_error(self):
        """Switch to the next service on error."""
        if self._num_services:
            self._current = (self._current + 1) % self._num_services

# Setup switcher
fallback_strategy = FallbackStrategy()
switcher = ServiceSwitcher(
    services=[primary_tts, secondary_tts, tertiary_tts],
    strategy=fallback_strategy
)

# On error, switch to next service
@primary_tts.event_handler("on_connection_error")
async def handle_error():
    fallback_strategy.on_error()

LLM Switching

{ .api }
from pipecat.pipeline.llm_switcher import LLMSwitcher
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
from pipecat.frames.frames import ManuallySwitchServiceFrame
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMService

# Create LLM switcher with multiple providers
llm_switcher = LLMSwitcher(
    llms=[
        OpenAILLMService(api_key="key1", model="gpt-4"),
        AnthropicLLMService(api_key="key2", model="claude-3-5-sonnet-20241022"),
        GoogleLLMService(api_key="key3", model="gemini-2.0-flash-exp")
    ],
    strategy=ServiceSwitcherStrategyManual(default_index=0)
)

# Register functions on all LLMs
llm_switcher.register_function(
    function_name="get_weather",
    handler=get_weather_handler,
    description="Get current weather for a location",
    properties={
        "location": {
            "type": "string",
            "description": "City and state, e.g. San Francisco, CA"
        }
    },
    required=["location"]
)

# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_agg,
    llm_switcher,  # All LLMs have same functions
    tts,
    transport.output()
])

# Switch LLM at runtime
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))

# Or run out-of-band inference
response = await llm_switcher.run_inference(
    llm_index=2,  # Use Gemini
    messages=[{"role": "user", "content": "Quick question"}]
)

Frame Injection

{ .api }
import asyncio

from pipecat.frames.frames import TextFrame, TTSSpeakFrame

# Queue frames into a running pipeline
async def inject_frames():
    # Pipeline is running
    await asyncio.sleep(5)

    # Inject frames
    await task.queue_frames([
        TextFrame("Emergency announcement"),
        TTSSpeakFrame("This is an urgent message")
    ])

# Run injection alongside pipeline
await asyncio.gather(
    task.run(),
    inject_frames()
)

Task Lifecycle Management

{ .api }
import asyncio

async def managed_pipeline():
    # Create task
    task = PipelineTask(pipeline)

    try:
        # Run with timeout
        await asyncio.wait_for(task.run(), timeout=300)  # 5 minute timeout

    except asyncio.TimeoutError:
        print("Pipeline timeout, cancelling...")
        await task.cancel()

    except KeyboardInterrupt:
        print("User interrupt, stopping gracefully...")
        await task.cancel()

    finally:
        if task.has_finished():
            print("Pipeline completed successfully")
        else:
            print("Pipeline did not complete")

Nested Pipelines

{ .api }
# Create sub-pipeline for complex processing
audio_processing_pipeline = Pipeline([
    audio_filter,
    audio_resampler,
    audio_mixer
])

# Use sub-pipeline in main pipeline
main_pipeline = Pipeline([
    transport.input(),
    audio_processing_pipeline,  # Sub-pipeline
    stt_service,
    llm_service,
    tts_service,
    transport.output()
])

Dynamic Pipeline Construction

{ .api }
def build_pipeline(use_vision: bool = False, enable_functions: bool = False):
    """Build pipeline with optional features."""
    processors = [
        transport.input(),
        stt_service,
        user_aggregator
    ]

    # Add vision if enabled
    if use_vision:
        processors.extend([
            vision_service,
            vision_aggregator
        ])

    # Add LLM
    processors.append(llm_service)

    # Add function calling if enabled
    if enable_functions:
        processors.append(function_handler)

    # Complete pipeline
    processors.extend([
        assistant_aggregator,
        tts_service,
        transport.output()
    ])

    return Pipeline(processors)

# Use dynamic pipeline
pipeline = build_pipeline(use_vision=True, enable_functions=True)

Best Practices

Pipeline Organization

{ .api }
# Good: Clear pipeline stages
pipeline = Pipeline([
    # Input stage
    transport.input(),

    # Speech recognition stage
    stt_service,

    # User turn stage
    user_aggregator,

    # LLM stage
    llm_service,

    # Assistant turn stage
    assistant_aggregator,

    # Speech synthesis stage
    tts_service,

    # Output stage
    transport.output()
])

# Bad: Unclear organization
pipeline = Pipeline([
    transport.input(), stt_service, user_aggregator, llm_service,
    assistant_aggregator, tts_service, transport.output()
])

Enable Interruptions

{ .api }
# Good: Enable interruptions for voice agents
task = PipelineTask(
    pipeline=pipeline,
    params=PipelineParams(
        allow_interruptions=True  # User can interrupt bot
    )
)

# Bad: Relying on implicit defaults
task = PipelineTask(pipeline=pipeline)
# Interruption behavior is left to the default; intent is unclear

Handle Lifecycle Properly

{ .api }
# Good: Proper lifecycle handling
async def run_pipeline():
    task = PipelineTask(pipeline)

    try:
        await task.run()
    except Exception as e:
        logger.error(f"Pipeline error: {e}")
        await task.cancel()
    finally:
        # Cleanup
        await transport.stop()

# Bad: No cleanup
async def run_pipeline():
    task = PipelineTask(pipeline)
    await task.run()
    # Resources not cleaned up

Use Metrics for Monitoring

{ .api }
# Good: Enable metrics
task = PipelineTask(
    pipeline=pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True
    )
)

# Monitor metrics
@task.event_handler("on_metrics")
async def handle_metrics(metrics):
    print(f"TTFB: {metrics.ttfb_ms}ms")
    print(f"Tokens: {metrics.tokens_used}")

# Bad: No metrics
task = PipelineTask(pipeline=pipeline)
# No visibility into performance

Name Your Pipelines

{ .api }
# Good: Named pipelines for debugging
pipeline = Pipeline(
    processors=[...],
    name="voice-agent-pipeline"
)

# Easier to debug and monitor
print(f"Running pipeline: {pipeline.name}")

# Bad: Anonymous pipeline
pipeline = Pipeline(processors=[...])