docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
All data in Pipecat flows as Frame objects through processors in a pipeline. This design provides:
{ .api }
from pipecat.frames.frames import Frame, SystemFrame, DataFrame, ControlFrame
class Frame:
    """Root type from which every Pipecat frame derives.

    Each frame carries:

    - A unique ID for tracking
    - A name combining the class name and the instance count
    - A PTS (presentation timestamp) in nanoseconds
    - A metadata dictionary
    - Transport source/destination tracking

    Attributes:
        id (int): Unique frame identifier.
        name (str): Human-readable frame name.
        pts (Optional[int]): Presentation timestamp in nanoseconds.
        metadata (Dict[str, Any]): Arbitrary frame metadata.
        transport_source (Optional[str]): Source transport name.
        transport_destination (Optional[str]): Destination transport name.
    """

    pass
class SystemFrame(Frame):
    """Frame category that is handled with high priority.

    A SystemFrame:

    - Is processed immediately, regardless of queue state
    - Is NOT cancelled by interruptions
    - Is used for time-sensitive events (user speech, errors)

    Examples: InputAudioRawFrame, UserStartedSpeakingFrame, ErrorFrame
    """

    pass
class DataFrame(Frame):
    """Frame category for ordered, normal-priority data.

    A DataFrame:

    - Is processed in order from the queue
    - IS cancelled by interruptions (unless it is an UninterruptibleFrame)
    - Contains actual data (text, audio, images, LLM context)

    Examples: TextFrame, OutputAudioRawFrame, LLMMessagesFrame
    """

    pass
class ControlFrame(Frame):
"""Normal-priority frame for ordered control signals.
ControlFrames:
- Are processed in order like DataFrames
- ARE cancelled by interruptions (unless UninterruptibleFrame)
- Contain control information (settings updates, lifecycle signals)
- Examples: EndFrame, TTSStartedFrame, ServiceUpdateSettingsFrame
"""
pass{ .api }
from pipecat.frames.frames import UninterruptibleFrame
class UninterruptibleFrame:
"""Mixin for frames that must not be interrupted.
Frames with this mixin:
- Are still ordered normally (DataFrame or ControlFrame)
- Are NOT cancelled during interruptions
- Remain in queues and complete processing
- Are used for critical operations (function calls)
Example:
class FunctionCallResultFrame(DataFrame, UninterruptibleFrame):
# This frame will complete even during interruption
pass
"""
pass{ .api }
from pipecat.processors.frame_processor import FrameDirection
class FrameDirection(Enum):
"""Direction of frame flow through pipeline.
Attributes:
DOWNSTREAM: From input toward output (normal flow)
UPSTREAM: From output toward input (reverse flow)
"""
DOWNSTREAM = "downstream"
UPSTREAM = "upstream"Pipelines chain processors together with automatic frame routing.
{ .api }
from pipecat.pipeline.pipeline import Pipeline
class Pipeline:
"""Sequential processor chain.
Frames flow through processors in order. Each processor
can transform, filter, or generate frames.
Args:
processors (List[FrameProcessor]): Ordered list of processors
name (Optional[str]): Pipeline name for debugging
enable_direct_mode (bool): Enable optimized direct frame passing
Example:
pipeline = Pipeline([
processor1,
processor2,
processor3
])
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None,
enable_direct_mode: bool = False
):
pass
async def run(self):
"""Run the pipeline until completion."""
pass
async def queue_frames(self, frames: List[Frame]):
"""Queue frames for processing.
Args:
frames: List of frames to queue
"""
pass
def get_processors(self) -> List[FrameProcessor]:
"""Get all processors in the pipeline.
Returns:
List of processors in order
"""
pass{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
class ParallelPipeline:
"""Parallel processor execution.
All processors receive the same input frames simultaneously.
Outputs from all processors are merged.
Args:
processors (List[FrameProcessor]): Processors to run in parallel
name (Optional[str]): Pipeline name
Example:
# All processors receive same frames
parallel = ParallelPipeline([
tts_service_1,
tts_service_2,
logger
])
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None
):
pass{ .api }
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
class SyncParallelPipeline:
"""Synchronized parallel processor execution.
All processors run in parallel, but the pipeline waits
for ALL to complete before continuing.
Args:
processors (List[FrameProcessor]): Processors to run in parallel
name (Optional[str]): Pipeline name
Example:
# Wait for all to complete
sync = SyncParallelPipeline([
metrics_processor,
logger_processor
])
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None
):
pass{ .api }
from pipecat.processors.frame_processor import FrameProcessor
class FrameProcessor:
"""Base class for all frame processors.
Processors are the building blocks of pipelines. They:
- Receive frames via process_frame()
- Transform/filter/generate frames
- Push frames downstream or upstream
- Support linking to other processors
Key Methods:
process_frame(): Process a single frame
push_frame(): Send frame to next processor
queue_frame(): Queue frame for later processing
start(): Initialize processor
stop(): Gracefully shutdown
cancel(): Immediate shutdown
link(): Connect to downstream processor
interrupt(): Handle interruption
"""
def __init__(self, name: Optional[str] = None, **kwargs):
"""Initialize processor.
Args:
name: Optional processor name
**kwargs: Additional configuration
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame.
Override this method to implement custom processing logic.
Args:
frame: The frame to process
direction: Direction of frame flow (DOWNSTREAM or UPSTREAM)
"""
# Default: pass frame through
await self.push_frame(frame, direction)
async def push_frame(self, frame: Frame, direction: FrameDirection):
"""Push frame to the next processor.
Args:
frame: Frame to push
direction: Direction of frame flow
"""
pass
async def queue_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Queue frame for processing.
Args:
frame: Frame to queue
direction: Direction of frame flow
"""
pass
async def start(self):
"""Start the processor.
Called when pipeline starts. Use for initialization.
"""
pass
async def stop(self):
"""Stop the processor gracefully.
Called when pipeline stops normally.
"""
pass
async def cancel(self):
"""Cancel the processor immediately.
Called when pipeline is cancelled or errors.
"""
pass
def link(self, processor: "FrameProcessor"):
"""Link to downstream processor.
Args:
processor: Downstream processor to link
"""
pass
async def interrupt(self):
"""Handle interruption event.
Called when user interrupts (e.g., starts speaking).
Cancel current operations and clear queues.
"""
pass
@property
def interruptions_enabled(self) -> bool:
"""Whether processor can be interrupted."""
return True
@property
def name(self) -> str:
"""Processor name."""
pass
@property
def id(self) -> int:
"""Unique processor ID."""
pass{ .api }
# Processors go through this lifecycle:
# 1. Construction
processor = MyProcessor(config)
# 2. Linking (if in pipeline)
processor.link(next_processor)
# 3. Start
await processor.start()
# 4. Processing (repeated)
await processor.process_frame(frame, direction)
# 5. Interruption (optional, repeated)
await processor.interrupt()
# 6. Stop/Cancel
await processor.stop() # or cancel()Pipecat uses an event-driven architecture for monitoring and control.
{ .api }
from pipecat.utils.base_object import BaseObject, EventHandler
class BaseObject:
"""Base class with event handler support.
All processors and services inherit from BaseObject.
"""
def register_event_handler(
self,
event_name: str
) -> Callable[[Callable], Callable]:
"""Register an event handler (decorator style).
Args:
event_name: Name of the event to handle
Returns:
Decorator function
Example:
@processor.register_event_handler("on_process")
async def handle_process(event_data):
print(f"Processed: {event_data}")
"""
pass
def event_handler(
self,
event_name: str
) -> Callable[[Callable], Callable]:
"""Alias for register_event_handler."""
pass
async def _emit_event(self, event_name: str, **kwargs):
"""Emit an event to all registered handlers.
Args:
event_name: Name of event to emit
**kwargs: Event data
"""
pass{ .api }
# LLM Service Events


@llm_service.event_handler("on_completion_timeout")
async def handle_timeout():
    """Invoked when an LLM completion times out."""
    pass


@llm_service.event_handler("on_function_call_start")
async def handle_function_start(function_name: str):
    """Invoked when an LLM function call starts.

    Args:
        function_name: Name of function being called.
    """
    pass


@llm_service.event_handler("on_function_call_end")
async def handle_function_end(function_name: str, result: Any):
    """Invoked when an LLM function call completes.

    Args:
        function_name: Name of function that completed.
        result: Function return value.
    """
    pass
# TTS/STT Service Events
@service.event_handler("on_connected")
async def handle_connected():
"""Called when service connects."""
pass
@service.event_handler("on_disconnected")
async def handle_disconnected():
"""Called when service disconnects."""
pass
@service.event_handler("on_connection_error")
async def handle_error(error: Exception):
"""Called on connection error.
Args:
error: The error that occurred
"""
pass{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams
class PipelineParams:
    """Configuration options for a pipeline task.

    Attributes:
        allow_interruptions (bool): Enable user interruptions.
        enable_metrics (bool): Collect metrics.
        enable_usage_metrics (bool): Collect usage metrics.
        report_only_initial_ttfb (bool): Report only first TTFB.
    """

    def __init__(
        self,
        allow_interruptions: bool = True,
        enable_metrics: bool = False,
        enable_usage_metrics: bool = False,
        report_only_initial_ttfb: bool = False,
    ):
        pass
class PipelineTask:
"""Pipeline execution task.
Manages pipeline lifecycle and frame queuing.
Args:
pipeline: Pipeline to execute
params: Task configuration parameters
clock: Optional clock for timing
"""
def __init__(
self,
pipeline: Pipeline,
params: Optional[PipelineParams] = None,
clock: Optional[BaseClock] = None
):
pass
async def run(self):
"""Run the pipeline until completion.
Processes all queued frames and waits for EndFrame.
"""
pass
async def queue_frame(self, frame: Frame):
"""Queue a single frame for processing.
Args:
frame: Frame to queue
"""
pass
async def queue_frames(self, frames: List[Frame]):
"""Queue multiple frames for processing.
Args:
frames: List of frames to queue
"""
pass
async def cancel(self):
"""Cancel the task.
Stops all processors and clears queues.
"""
pass
def has_finished(self) -> bool:
"""Check if task has finished.
Returns:
True if task completed
"""
passInterruptions occur when the user starts speaking during bot output.
{ .api }
from pipecat.frames.frames import (
UserStartedSpeakingFrame,
StartInterruptionFrame,
BotInterruptionFrame
)
# Interruption flow:
# 1. User starts speaking
# -> UserStartedSpeakingFrame (SystemFrame)
# 2. Transport/VAD detects speech
# -> StartInterruptionFrame (SystemFrame)
# 3. Pipeline cancels current output
# -> BotInterruptionFrame (interrupts bot speech)
# 4. DataFrames/ControlFrames are cancelled
# (except UninterruptibleFrame)
# 5. SystemFrames continue processing
# Enable interruptions via VAD:
from pipecat.audio.vad.vad_analyzer import VADParams, SileroVADAnalyzer
vad = SileroVADAnalyzer(
params=VADParams(
threshold=0.5,
min_speech_duration_ms=250
)
)
transport = DailyTransport(
params=DailyParams(
vad_enabled=True,
vad_analyzer=vad
)
)All processing in Pipecat is asynchronous using Python's asyncio.
{ .api }
import asyncio
# Running a pipeline
async def main():
    """Build a pipeline, wrap it in a task, and run the task to completion."""
    task = PipelineTask(Pipeline([...]))
    await task.run()


asyncio.run(main())
# Concurrent operations
async def process_multiple():
    """Run task1, task2 and task3 concurrently.

    Returns:
        list: The value produced by each ``run()`` call, in the order the
        tasks are passed to ``asyncio.gather``.
    """
    # All run concurrently; the results are returned instead of being
    # bound to an unused local and discarded.
    return await asyncio.gather(
        task1.run(),
        task2.run(),
        task3.run(),
    )
# Task management
from pipecat.utils.asyncio import TaskManager
task_manager = TaskManager()
await task_manager.start_task(my_coroutine())
await task_manager.cancel_all()Frames are processed according to their type and priority:
{ .api }
# Processing example:
# Queue: [TextFrame1, TextFrame2, EndFrame]
# Incoming: UserStartedSpeakingFrame (SystemFrame)
# Result:
# 1. UserStartedSpeakingFrame processes immediately
# 2. TextFrame1, TextFrame2 cancelled
# 3. EndFrame processes (ControlFrame, but completes interruption)
# With UninterruptibleFrame:
# Queue: [FunctionCallResultFrame (UninterruptibleFrame), TextFrame]
# Incoming: UserStartedSpeakingFrame
# Result:
# 1. UserStartedSpeakingFrame processes immediately
# 2. FunctionCallResultFrame completes (not cancelled)
# 3. TextFrame cancelled

{ .api }
class WellDesignedProcessor(FrameProcessor):
"""A well-designed processor follows these patterns."""
async def process_frame(self, frame, direction):
# 1. Handle specific frame types
if isinstance(frame, TextFrame):
await self._process_text(frame)
# 2. Always push frames downstream (unless filtering)
await self.push_frame(frame, direction)
async def _process_text(self, frame: TextFrame):
# 3. Do async work properly
result = await self._async_operation(frame.text)
# 4. Generate new frames as needed
new_frame = ProcessedTextFrame(text=result)
await self.push_frame(new_frame)
async def start(self):
# 5. Initialize resources in start()
self._connection = await connect_to_service()
async def stop(self):
# 6. Clean up resources in stop()
await self._connection.close()
async def interrupt(self):
# 7. Handle interruptions gracefully
self._current_operation_cancelled = True
await super().interrupt(){ .api }
from pipecat.frames.frames import ErrorFrame, FatalErrorFrame
class RobustProcessor(FrameProcessor):
"""Handle errors properly."""
async def process_frame(self, frame, direction):
try:
# Process frame
await self._process(frame)
await self.push_frame(frame, direction)
except RecoverableError as e:
# Log and continue
logger.error(f"Recoverable error: {e}")
# Optionally send ErrorFrame
await self.push_frame(ErrorFrame(error=str(e)))
await self.push_frame(frame, direction)
except FatalError as e:
# Fatal error stops pipeline
logger.critical(f"Fatal error: {e}")
await self.push_frame(FatalErrorFrame(error=str(e))){ .api }
class MemoryEfficientProcessor(FrameProcessor):
    """Manage memory efficiently."""

    def __init__(self, **kwargs):
        """Create the buffer and cache that the other methods rely on.

        Args:
            **kwargs: Forwarded unchanged to FrameProcessor.
        """
        super().__init__(**kwargs)
        # Placeholder containers: any type offering clear() fits the pattern.
        self._buffer = bytearray()
        self._cache = {}

    async def process_frame(self, frame, direction):
        """Process audio chunk by chunk and release buffers on EndFrame.

        Args:
            frame: The frame to process.
            direction: Direction of frame flow.
        """
        # 1. Process audio in chunks, not all at once
        if isinstance(frame, AudioRawFrame):
            # Don't accumulate unbounded audio
            await self._process_chunk(frame.audio)

        # 2. Clear large buffers when done
        if isinstance(frame, EndFrame):
            self._buffer.clear()

        await self.push_frame(frame, direction)

    async def stop(self):
        """Release buffered data and cached entries."""
        # 3. Clean up in stop(). The buffer is cleared rather than set to
        #    None so a later EndFrame can still call clear() on it.
        self._buffer.clear()
        self._cache.clear()