or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.mdindex.mdpipeline.mdrunner.mdtransports.mdturns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

core-concepts.mddocs/

Core Concepts

Frame-Based Architecture

All data in Pipecat flows as Frame objects through processors in a pipeline. This design provides:

  • Unified data model: All data types (audio, text, images, control signals) are frames
  • Priority-based processing: Three frame types with different priorities
  • Interruptibility: Fine-grained control over what can be interrupted
  • Bidirectional flow: Frames can flow downstream (toward output) or upstream (toward input)

Frame Hierarchy

{ .api }
from pipecat.frames.frames import Frame, SystemFrame, DataFrame, ControlFrame

class Frame:
    """Base frame class for all frames.

    All frames inherit from Frame and receive:
    - Unique ID for tracking
    - Name combining class name and instance count
    - PTS (presentation timestamp) in nanoseconds
    - Metadata dictionary
    - Transport source/destination tracking

    Attributes:
        id (int): Unique frame identifier
        name (str): Human-readable frame name
        pts (Optional[int]): Presentation timestamp in nanoseconds
        metadata (Dict[str, Any]): Arbitrary frame metadata
        transport_source (Optional[str]): Source transport name
        transport_destination (Optional[str]): Destination transport name
    """
    pass

class SystemFrame(Frame):
    """High-priority frame for immediate processing.

    SystemFrames:
    - Are processed immediately regardless of queue state
    - Are NOT cancelled by interruptions
    - Are used for time-sensitive events (user speech, errors)
    - Examples: InputAudioRawFrame, UserStartedSpeakingFrame, ErrorFrame
    """
    pass

class DataFrame(Frame):
    """Normal-priority frame for ordered data processing.

    DataFrames:
    - Are processed in order from the queue
    - ARE cancelled by interruptions (unless UninterruptibleFrame)
    - Contain actual data (text, audio, images, LLM context)
    - Examples: TextFrame, OutputAudioRawFrame, LLMMessagesFrame
    """
    pass

class ControlFrame(Frame):
    """Normal-priority frame for ordered control signals.

    ControlFrames:
    - Are processed in order like DataFrames
    - ARE cancelled by interruptions (unless UninterruptibleFrame)
    - Contain control information (settings updates, lifecycle signals)
    - Examples: EndFrame, TTSStartedFrame, ServiceUpdateSettingsFrame
    """
    pass

Uninterruptible Frames

{ .api }
from pipecat.frames.frames import UninterruptibleFrame

class UninterruptibleFrame:
    """Mixin for frames that must not be interrupted.

    Frames with this mixin:
    - Are still ordered normally (DataFrame or ControlFrame)
    - Are NOT cancelled during interruptions
    - Remain in queues and complete processing
    - Are used for critical operations (function calls)

    Example:
        class FunctionCallResultFrame(DataFrame, UninterruptibleFrame):
            # This frame will complete even during interruption
            pass
    """
    pass

Frame Direction

{ .api }
from pipecat.processors.frame_processor import FrameDirection

class FrameDirection(Enum):
    """Direction of frame flow through pipeline.

    Attributes:
        DOWNSTREAM: From input toward output (normal flow)
        UPSTREAM: From output toward input (reverse flow)
    """
    DOWNSTREAM = "downstream"
    UPSTREAM = "upstream"

Pipeline Pattern

Pipelines chain processors together with automatic frame routing.

Sequential Pipeline

{ .api }
from pipecat.pipeline.pipeline import Pipeline

class Pipeline:
    """Sequential processor chain.

    Frames flow through processors in order. Each processor
    can transform, filter, or generate frames.

    Args:
        processors (List[FrameProcessor]): Ordered list of processors
        name (Optional[str]): Pipeline name for debugging
        enable_direct_mode (bool): Enable optimized direct frame passing

    Example:
        pipeline = Pipeline([
            processor1,
            processor2,
            processor3
        ])
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None,
        enable_direct_mode: bool = False
    ):
        pass

    async def run(self):
        """Run the pipeline until completion."""
        pass

    async def queue_frames(self, frames: List[Frame]):
        """Queue frames for processing.

        Args:
            frames: List of frames to queue
        """
        pass

    def get_processors(self) -> List[FrameProcessor]:
        """Get all processors in the pipeline.

        Returns:
            List of processors in order
        """
        pass

Parallel Pipeline

{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline

class ParallelPipeline:
    """Parallel processor execution.

    All processors receive the same input frames simultaneously.
    Outputs from all processors are merged.

    Args:
        processors (List[FrameProcessor]): Processors to run in parallel
        name (Optional[str]): Pipeline name

    Example:
        # All processors receive same frames
        parallel = ParallelPipeline([
            tts_service_1,
            tts_service_2,
            logger
        ])
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None
    ):
        pass

Synchronized Parallel Pipeline

{ .api }
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline

class SyncParallelPipeline:
    """Synchronized parallel processor execution.

    All processors run in parallel, but the pipeline waits
    for ALL to complete before continuing.

    Args:
        processors (List[FrameProcessor]): Processors to run in parallel
        name (Optional[str]): Pipeline name

    Example:
        # Wait for all to complete
        sync = SyncParallelPipeline([
            metrics_processor,
            logger_processor
        ])
    """

    def __init__(
        self,
        processors: List[FrameProcessor],
        name: Optional[str] = None
    ):
        pass

Processor Lifecycle

Processor Interface

{ .api }
from pipecat.processors.frame_processor import FrameProcessor

class FrameProcessor:
    """Base class for all frame processors.

    Processors are the building blocks of pipelines. They:
    - Receive frames via process_frame()
    - Transform/filter/generate frames
    - Push frames downstream or upstream
    - Support linking to other processors

    Key Methods:
        process_frame(): Process a single frame
        push_frame(): Send frame to next processor
        queue_frame(): Queue frame for later processing
        start(): Initialize processor
        stop(): Gracefully shutdown
        cancel(): Immediate shutdown
        link(): Connect to downstream processor
        interrupt(): Handle interruption
    """

    def __init__(self, name: Optional[str] = None, **kwargs):
        """Initialize processor.

        Args:
            name: Optional processor name
            **kwargs: Additional configuration
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process a frame.

        Override this method to implement custom processing logic.

        Args:
            frame: The frame to process
            direction: Direction of frame flow (DOWNSTREAM or UPSTREAM)
        """
        # Default: pass frame through
        await self.push_frame(frame, direction)

    async def push_frame(self, frame: Frame, direction: FrameDirection):
        """Push frame to the next processor.

        Args:
            frame: Frame to push
            direction: Direction of frame flow
        """
        pass

    async def queue_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Queue frame for processing.

        Args:
            frame: Frame to queue
            direction: Direction of frame flow
        """
        pass

    async def start(self):
        """Start the processor.

        Called when pipeline starts. Use for initialization.
        """
        pass

    async def stop(self):
        """Stop the processor gracefully.

        Called when pipeline stops normally.
        """
        pass

    async def cancel(self):
        """Cancel the processor immediately.

        Called when pipeline is cancelled or errors.
        """
        pass

    def link(self, processor: "FrameProcessor"):
        """Link to downstream processor.

        Args:
            processor: Downstream processor to link
        """
        pass

    async def interrupt(self):
        """Handle interruption event.

        Called when user interrupts (e.g., starts speaking).
        Cancel current operations and clear queues.
        """
        pass

    @property
    def interruptions_enabled(self) -> bool:
        """Whether processor can be interrupted."""
        return True

    @property
    def name(self) -> str:
        """Processor name."""
        pass

    @property
    def id(self) -> int:
        """Unique processor ID."""
        pass

Lifecycle Hooks

{ .api }
# Processors go through this lifecycle:

# 1. Construction
processor = MyProcessor(config)

# 2. Linking (if in pipeline)
processor.link(next_processor)

# 3. Start
await processor.start()

# 4. Processing (repeated)
await processor.process_frame(frame, direction)

# 5. Interruption (optional, repeated)
await processor.interrupt()

# 6. Stop/Cancel
await processor.stop()  # or cancel()

Event-Driven Model

Pipecat uses an event-driven architecture for monitoring and control.

Event Handlers

{ .api }
from pipecat.utils.base_object import BaseObject, EventHandler

class BaseObject:
    """Base class with event handler support.

    All processors and services inherit from BaseObject.
    """

    def register_event_handler(
        self,
        event_name: str
    ) -> Callable[[Callable], Callable]:
        """Register an event handler (decorator style).

        Args:
            event_name: Name of the event to handle

        Returns:
            Decorator function

        Example:
            @processor.register_event_handler("on_process")
            async def handle_process(event_data):
                print(f"Processed: {event_data}")
        """
        pass

    def event_handler(
        self,
        event_name: str
    ) -> Callable[[Callable], Callable]:
        """Alias for register_event_handler."""
        pass

    async def _emit_event(self, event_name: str, **kwargs):
        """Emit an event to all registered handlers.

        Args:
            event_name: Name of event to emit
            **kwargs: Event data
        """
        pass

Common Events

{ .api }
# LLM Service Events
@llm_service.event_handler("on_completion_timeout")
async def handle_timeout():
    """Called when LLM completion times out."""
    pass

@llm_service.event_handler("on_function_call_start")
async def handle_function_start(function_name: str):
    """Called when LLM function call starts.

    Args:
        function_name: Name of function being called
    """
    pass

@llm_service.event_handler("on_function_call_end")
async def handle_function_end(function_name: str, result: Any):
    """Called when LLM function call completes.

    Args:
        function_name: Name of function that completed
        result: Function return value
    """
    pass

# TTS/STT Service Events
@service.event_handler("on_connected")
async def handle_connected():
    """Called when service connects."""
    pass

@service.event_handler("on_disconnected")
async def handle_disconnected():
    """Called when service disconnects."""
    pass

@service.event_handler("on_connection_error")
async def handle_error(error: Exception):
    """Called on connection error.

    Args:
        error: The error that occurred
    """
    pass

Task Management

Pipeline Tasks

{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams

class PipelineParams:
    """Pipeline task configuration.

    Attributes:
        allow_interruptions (bool): Enable user interruptions
        enable_metrics (bool): Collect metrics
        enable_usage_metrics (bool): Collect usage metrics
        report_only_initial_ttfb (bool): Report only first TTFB
    """

    def __init__(
        self,
        allow_interruptions: bool = True,
        enable_metrics: bool = False,
        enable_usage_metrics: bool = False,
        report_only_initial_ttfb: bool = False
    ):
        pass


class PipelineTask:
    """Pipeline execution task.

    Manages pipeline lifecycle and frame queuing.

    Args:
        pipeline: Pipeline to execute
        params: Task configuration parameters
        clock: Optional clock for timing
    """

    def __init__(
        self,
        pipeline: Pipeline,
        params: Optional[PipelineParams] = None,
        clock: Optional[BaseClock] = None
    ):
        pass

    async def run(self):
        """Run the pipeline until completion.

        Processes all queued frames and waits for EndFrame.
        """
        pass

    async def queue_frame(self, frame: Frame):
        """Queue a single frame for processing.

        Args:
            frame: Frame to queue
        """
        pass

    async def queue_frames(self, frames: List[Frame]):
        """Queue multiple frames for processing.

        Args:
            frames: List of frames to queue
        """
        pass

    async def cancel(self):
        """Cancel the task.

        Stops all processors and clears queues.
        """
        pass

    def has_finished(self) -> bool:
        """Check if task has finished.

        Returns:
            True if task completed
        """
        pass

Interruption Handling

Interruptions occur when the user starts speaking during bot output.

{ .api }
from pipecat.frames.frames import (
    UserStartedSpeakingFrame,
    StartInterruptionFrame,
    BotInterruptionFrame
)

# Interruption flow:
# 1. User starts speaking
#    -> UserStartedSpeakingFrame (SystemFrame)

# 2. Transport/VAD detects speech
#    -> StartInterruptionFrame (SystemFrame)

# 3. Pipeline cancels current output
#    -> BotInterruptionFrame (interrupts bot speech)

# 4. DataFrames/ControlFrames are cancelled
#    (except UninterruptibleFrame)

# 5. SystemFrames continue processing

# Enable interruptions via VAD:
from pipecat.audio.vad.vad_analyzer import VADParams, SileroVADAnalyzer

vad = SileroVADAnalyzer(
    params=VADParams(
        threshold=0.5,
        min_speech_duration_ms=250
    )
)

transport = DailyTransport(
    params=DailyParams(
        vad_enabled=True,
        vad_analyzer=vad
    )
)

Asynchronous Processing

All processing in Pipecat is asynchronous using Python's asyncio.

{ .api }
import asyncio

# Running a pipeline
async def main():
    pipeline = Pipeline([...])
    task = PipelineTask(pipeline)
    await task.run()

asyncio.run(main())

# Concurrent operations
async def process_multiple():
    # All run concurrently
    results = await asyncio.gather(
        task1.run(),
        task2.run(),
        task3.run()
    )

# Task management
from pipecat.utils.asyncio import TaskManager

task_manager = TaskManager()
await task_manager.start_task(my_coroutine())
await task_manager.cancel_all()

Frame Processing Order

Frames are processed according to their type and priority:

  1. SystemFrames: Processed immediately in order received
  2. DataFrames: Processed from queue in order
  3. ControlFrames: Processed from queue in order
  4. Interruptions: Cancel queued DataFrames/ControlFrames (except UninterruptibleFrame)
{ .api }
# Processing example:
# Queue: [TextFrame1, TextFrame2, EndFrame]
# Incoming: UserStartedSpeakingFrame (SystemFrame)

# Result:
# 1. UserStartedSpeakingFrame processes immediately
# 2. TextFrame1, TextFrame2 cancelled
# 3. EndFrame processes (ControlFrame, but completes interruption)

# With UninterruptibleFrame:
# Queue: [FunctionCallResultFrame (UninterruptibleFrame), TextFrame]
# Incoming: UserStartedSpeakingFrame

# Result:
# 1. UserStartedSpeakingFrame processes immediately
# 2. FunctionCallResultFrame completes (not cancelled)
# 3. TextFrame cancelled

Best Practices

Processor Design

{ .api }
class WellDesignedProcessor(FrameProcessor):
    """A well-designed processor follows these patterns."""

    async def process_frame(self, frame, direction):
        # 1. Handle specific frame types
        if isinstance(frame, TextFrame):
            await self._process_text(frame)

        # 2. Always push frames downstream (unless filtering)
        await self.push_frame(frame, direction)

    async def _process_text(self, frame: TextFrame):
        # 3. Do async work properly
        result = await self._async_operation(frame.text)

        # 4. Generate new frames as needed
        new_frame = ProcessedTextFrame(text=result)
        await self.push_frame(new_frame)

    async def start(self):
        # 5. Initialize resources in start()
        self._connection = await connect_to_service()

    async def stop(self):
        # 6. Clean up resources in stop()
        await self._connection.close()

    async def interrupt(self):
        # 7. Handle interruptions gracefully
        self._current_operation_cancelled = True
        await super().interrupt()

Error Handling

{ .api }
from pipecat.frames.frames import ErrorFrame, FatalErrorFrame

class RobustProcessor(FrameProcessor):
    """Handle errors properly."""

    async def process_frame(self, frame, direction):
        try:
            # Process frame
            await self._process(frame)
            await self.push_frame(frame, direction)

        except RecoverableError as e:
            # Log and continue
            logger.error(f"Recoverable error: {e}")
            # Optionally send ErrorFrame
            await self.push_frame(ErrorFrame(error=str(e)))
            await self.push_frame(frame, direction)

        except FatalError as e:
            # Fatal error stops pipeline
            logger.critical(f"Fatal error: {e}")
            await self.push_frame(FatalErrorFrame(error=str(e)))

Memory Management

{ .api }
class MemoryEfficientProcessor(FrameProcessor):
    """Manage memory efficiently."""

    async def process_frame(self, frame, direction):
        # 1. Process audio in chunks, not all at once
        if isinstance(frame, AudioRawFrame):
            # Don't accumulate unbounded audio
            await self._process_chunk(frame.audio)

        # 2. Clear large buffers when done
        if isinstance(frame, EndFrame):
            self._buffer.clear()

        await self.push_frame(frame, direction)

    async def stop(self):
        # 3. Clean up in stop()
        self._buffer = None
        self._cache.clear()