docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Pipelines chain processors together to create complete frame processing workflows. They manage processor linking, frame routing, task execution, and lifecycle management.
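Frames move between processors through the FrameProcessor interface: each processor receives frames in process_frame() and forwards them with push_frame(). As a minimal sketch using Pipecat's FrameProcessor base class (not otherwise documented in this section), a pass-through processor that logs every frame it sees might look like this:
{ .api }
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class LoggingProcessor(FrameProcessor):
    """Pass-through processor that logs every frame it sees."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class handle system frames first
        await super().process_frame(frame, direction)
        print(f"{self.name}: {frame}")
        # Forward the frame to the next processor in the chain
        await self.push_frame(frame, direction)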
{ .api }
from pipecat.pipeline.pipeline import Pipeline
class Pipeline(BasePipeline):
"""Sequential processor chain.
Frames flow through processors in order. Each processor receives frames
from the previous processor and passes them to the next.
Flow: Input -> P1 -> P2 -> P3 -> Output
Args:
processors (List[FrameProcessor]): Ordered list of processors
name (Optional[str]): Pipeline name for debugging
enable_direct_mode (bool): Enable optimized direct frame passing
Example:
pipeline = Pipeline([
transport.input(),
stt_service,
user_aggregator,
llm_service,
tts_service,
transport.output()
])
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None,
enable_direct_mode: bool = False
):
"""Initialize pipeline.
Args:
processors: Ordered list of processors
name: Optional pipeline name
enable_direct_mode: Use optimized frame passing
"""
pass
async def run(self):
"""Run the pipeline until completion.
Starts all processors and processes frames until an EndFrame is received.
"""
pass
async def queue_frames(self, frames: List[Frame]):
"""Queue frames for processing.
Args:
frames: List of frames to queue
"""
pass
def get_processors(self) -> List[FrameProcessor]:
"""Get all processors in the pipeline.
Returns:
List of processors in order
"""
pass
def get_metrics(self) -> Dict[str, Any]:
"""Get pipeline metrics.
Returns:
Dictionary of metrics data
"""
pass
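A brief sketch of the introspection helpers above, assuming get_processors() and get_metrics() behave as documented (the service and transport names are placeholders):
{ .api }
pipeline = Pipeline(
    processors=[transport.input(), stt_service, llm_service, tts_service, transport.output()],
    name="voice-pipeline"
)

# Walk the processor chain in order
for processor in pipeline.get_processors():
    print(f"Processor: {processor}")

# Dump whatever metrics the pipeline has collected so far
for key, value in pipeline.get_metrics().items():
    print(f"{key}: {value}")
{ .api }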
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
class ParallelPipeline(BasePipeline):
"""Parallel processor execution.
All processors receive the same input frames simultaneously.
Outputs from all processors are merged.
Flow: Input -> [P1, P2, P3] -> Merged Output
Args:
processors (List[FrameProcessor]): Processors to run in parallel
name (Optional[str]): Pipeline name
Example:
# All processors receive same frames
parallel = ParallelPipeline([
tts_service_1, # Generate TTS with service 1
tts_service_2, # Generate TTS with service 2
metrics_logger # Log metrics
])
Use Cases:
- A/B testing multiple services
- Parallel processing for speed
- Monitoring/logging alongside main flow
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None
):
"""Initialize parallel pipeline.
Args:
processors: Processors to run in parallel
name: Optional pipeline name
"""
pass
{ .api }
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline
class SyncParallelPipeline(BasePipeline):
"""Synchronized parallel processor execution.
All processors run in parallel, but the pipeline waits for ALL
to complete before continuing. Ensures synchronization.
Flow: Input -> [P1, P2, P3] -> Wait All -> Output
Args:
processors (List[FrameProcessor]): Processors to run in parallel
name (Optional[str]): Pipeline name
Example:
# Wait for all to complete
sync = SyncParallelPipeline([
metrics_processor, # Collect metrics
logger_processor, # Log frames
audit_processor # Audit trail
])
Use Cases:
- Side effects that must complete
- Multiple dependent operations
- Synchronized checkpoints
"""
def __init__(
self,
processors: List[FrameProcessor],
name: Optional[str] = None
):
"""Initialize synchronized parallel pipeline.
Args:
processors: Processors to run in parallel
name: Optional pipeline name
"""
pass
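SyncParallelPipeline is not covered by the usage examples later on this page, so here is a minimal sketch of a synchronized side-effect stage inside a larger pipeline (the processor and service names are placeholders):
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.sync_parallel_pipeline import SyncParallelPipeline

# Side effects that must all finish before speech synthesis starts
checkpoint = SyncParallelPipeline([
    metrics_processor,  # Collect per-turn metrics
    audit_processor     # Write an audit record
])

pipeline = Pipeline([
    transport.input(),
    stt_service,
    llm_service,
    checkpoint,         # Waits for both side effects to complete
    tts_service,
    transport.output()
])
{ .api }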
from pipecat.pipeline.pipeline import PipelineSource
class PipelineSource(FrameProcessor):
"""Pipeline entry point that forwards frames to an upstream handler.
This processor acts as the entry point for a pipeline, forwarding
downstream frames to the next processor and upstream frames to a
provided upstream handler function.
Args:
upstream_push_frame: Coroutine function to handle upstream frames
**kwargs: Additional arguments passed to FrameProcessor
Example:
# Create a source with upstream handler
async def handle_upstream(frame, direction):
print(f"Upstream frame: {frame}")
source = PipelineSource(upstream_push_frame=handle_upstream)
pipeline = Pipeline([source, processor1, processor2])
# Feed frames downstream
await source.queue_frame(TextFrame("Hello"))
Note: upstream_push_frame is a required parameter.
"""
{ .api }
from pipecat.pipeline.pipeline import PipelineSink
class PipelineSink(FrameProcessor):
"""Pipeline exit point that forwards frames to a downstream handler.
This processor acts as the exit point for a pipeline, forwarding
upstream frames to the previous processor and downstream frames to a
provided downstream handler function.
Args:
downstream_push_frame: Coroutine function to handle downstream frames
**kwargs: Additional arguments passed to FrameProcessor
Example:
# Create a sink with downstream handler
async def handle_downstream(frame, direction):
print(f"Downstream frame: {frame}")
# Process or output the frame
sink = PipelineSink(downstream_push_frame=handle_downstream)
pipeline = Pipeline([processor1, processor2, sink])
Note: downstream_push_frame is a required parameter.
"""
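A sketch that combines the two endpoints, assuming the constructor arguments documented above (processor1 and processor2 are placeholders):
{ .api }
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline, PipelineSink, PipelineSource

async def handle_upstream(frame, direction):
    # Frames travelling upstream out of the pipeline (e.g. errors)
    print(f"Upstream: {frame}")

async def handle_downstream(frame, direction):
    # Frames leaving the pipeline downstream (the processed output)
    print(f"Downstream: {frame}")

source = PipelineSource(upstream_push_frame=handle_upstream)
sink = PipelineSink(downstream_push_frame=handle_downstream)
pipeline = Pipeline([source, processor1, processor2, sink])

# Feed a frame into the pipeline through the source
await source.queue_frame(TextFrame("Hello"))
{ .api }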
from pipecat.pipeline.task import PipelineTask, PipelineParams
class PipelineTask(BasePipelineTask):
"""Pipeline execution task.
Manages pipeline lifecycle, frame queuing, and execution. Provides
control over running, stopping, and cancelling pipelines.
Args:
pipeline: Pipeline to execute
params: Task configuration parameters
clock: Optional clock for timing
Example:
# Create task
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True
)
)
# Run task
await task.run()
"""
def __init__(
self,
pipeline: Pipeline,
params: Optional[PipelineParams] = None,
clock: Optional[BaseClock] = None
):
"""Initialize pipeline task.
Args:
pipeline: Pipeline to execute
params: Task parameters
clock: Optional clock
"""
pass
async def run(self):
"""Run the pipeline until completion.
Processes all queued frames and waits for EndFrame.
Handles interruptions if enabled.
Example:
await task.run()
# Blocks until pipeline completes
"""
pass
async def queue_frame(self, frame: Frame):
"""Queue a single frame for processing.
Args:
frame: Frame to queue
Example:
await task.queue_frame(TextFrame("Hello"))
"""
pass
async def queue_frames(self, frames: List[Frame]):
"""Queue multiple frames for processing.
Args:
frames: List of frames to queue
Example:
frames = [
TextFrame("Hello"),
TextFrame("world"),
EndFrame()
]
await task.queue_frames(frames)
"""
pass
async def cancel(self):
"""Cancel the task.
Stops all processors and clears queues. Faster than normal stop.
Example:
await task.cancel()
"""
pass
def has_finished(self) -> bool:
"""Check if task has finished.
Returns:
True if task completed
Example:
if task.has_finished():
print("Task complete")
"""
pass
@property
def id(self) -> str:
"""Get task ID.
Returns:
Unique task identifier
"""
pass
{ .api }
from pipecat.pipeline.task import PipelineParams
class PipelineParams:
"""Pipeline task configuration.
Controls pipeline behavior including interruptions, metrics,
and performance settings.
Attributes:
allow_interruptions (bool): Enable user interruptions
enable_metrics (bool): Collect performance metrics
enable_usage_metrics (bool): Collect usage metrics (tokens, etc.)
report_only_initial_ttfb (bool): Report only the first TTFB (time to first byte) measurement
Example:
params = PipelineParams(
allow_interruptions=True, # Allow user to interrupt bot
enable_metrics=True, # Collect TTFB metrics
enable_usage_metrics=True # Track token usage
)
task = PipelineTask(pipeline=pipeline, params=params)
"""
def __init__(
self,
allow_interruptions: bool = True,
enable_metrics: bool = False,
enable_usage_metrics: bool = False,
report_only_initial_ttfb: bool = False
):
"""Initialize pipeline parameters.
Args:
allow_interruptions: Enable interruptions
enable_metrics: Enable metrics collection
enable_usage_metrics: Enable usage metrics
report_only_initial_ttfb: Report only first TTFB
"""
pass
{ .api }
from pipecat.pipeline.runner import PipelineRunner
class PipelineRunner(BaseObject):
"""Manages pipeline execution lifecycle.
High-level interface for running pipelines with transports.
Handles setup, execution, and cleanup.
Example:
runner = PipelineRunner()
await runner.run(
pipeline=pipeline,
transport=transport
)
"""
async def run(
self,
pipeline: Pipeline,
transport: Optional[BaseTransport] = None
):
"""Run pipeline with optional transport.
Args:
pipeline: Pipeline to run
transport: Optional transport for I/O
"""
pass
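A sketch of a typical entry point, assuming the run() signature documented above (pipeline and transport are built as in the examples below); asyncio.run() drives the whole lifecycle:
{ .api }
import asyncio

from pipecat.pipeline.runner import PipelineRunner

async def main():
    runner = PipelineRunner()
    # Handles setup, frame processing, and cleanup; returns when the
    # pipeline completes or the transport disconnects
    await runner.run(pipeline=pipeline, transport=transport)

if __name__ == "__main__":
    asyncio.run(main())
{ .api }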
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategy
class ServiceSwitcher(ParallelPipeline):
"""Switch between services at runtime.
Allows dynamic switching between multiple services (e.g., different TTS
providers) based on a strategy. Useful for A/B testing, fallback, or
dynamic selection.
Generic Type: ServiceSwitcher[StrategyType]
Args:
services: List of services to switch between
strategy: Switching strategy
name: Optional name
Example:
# Manual switching
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
switcher = ServiceSwitcher(
services=[tts_1, tts_2, tts_3],
strategy=ServiceSwitcherStrategyManual(default_index=0)
)
# Switch services at runtime
from pipecat.frames.frames import ManuallySwitchServiceFrame
# Switch to service at index 1
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))
"""
def __init__(
self,
services: List[FrameProcessor],
strategy: ServiceSwitcherStrategy,
name: Optional[str] = None
):
"""Initialize service switcher.
Args:
services: List of services
strategy: Switching strategy
name: Optional name
"""
pass
def set_service(self, index: int):
"""Set active service by index.
Args:
index: Service index
"""
pass
def get_current_service(self) -> FrameProcessor:
"""Get currently active service.
Returns:
Active service processor
"""
pass
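The accessors above are not exercised by the later examples; a brief sketch of switching programmatically rather than via a frame (the TTS service names are placeholders):
{ .api }
from pipecat.pipeline.service_switcher import (
    ServiceSwitcher,
    ServiceSwitcherStrategyManual,
)

switcher = ServiceSwitcher(
    services=[tts_primary, tts_backup],
    strategy=ServiceSwitcherStrategyManual(default_index=0)
)

print(switcher.get_current_service())  # initially tts_primary

# Switch to the backup service by index, then confirm the change
switcher.set_service(1)
print(switcher.get_current_service())  # now tts_backup
{ .api }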
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategy
class ServiceSwitcherStrategy:
"""Base switching strategy.
Defines how to select which service to use. Subclass to implement
custom strategies.
Example:
class RoundRobinStrategy(ServiceSwitcherStrategy):
    def __init__(self):
        self._index = 0

    def select(self, services: List[FrameProcessor]) -> int:
        # Rotate through the services on each selection
        index = self._index
        self._index = (self._index + 1) % len(services)
        return index
"""
def select(self, services: List[FrameProcessor]) -> int:
"""Select service index.
Args:
services: List of available services
Returns:
Index of service to use
"""
pass
{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
"""Manual service switching.
Service is selected manually via ManuallySwitchServiceFrame.
Args:
default_index: Initial service index
Example:
strategy = ServiceSwitcherStrategyManual(default_index=0)
switcher = ServiceSwitcher(
services=[service1, service2],
strategy=strategy
)
# Switch via frame
await task.queue_frame(
ManuallySwitchServiceFrame(service_index=1)
)
"""
def __init__(self, default_index: int = 0):
"""Initialize manual strategy.
Args:
default_index: Initial service index
"""
pass
{ .api }
from pipecat.pipeline.llm_switcher import LLMSwitcher
class LLMSwitcher(ServiceSwitcher):
"""LLM-specific service switcher.
Extends ServiceSwitcher with LLM-specific functionality:
- Function registration across all LLMs
- Direct function registration
- Out-of-band inference
- LLM context management
Args:
llms: List of LLM services to switch between
strategy: Switching strategy
name: Optional name
Example:
# Create LLM switcher
switcher = LLMSwitcher(
llms=[
OpenAILLMService(api_key="key1", model="gpt-4"),
AnthropicLLMService(api_key="key2", model="claude-3-5-sonnet-20241022"),
GoogleLLMService(api_key="key3", model="gemini-2.0-flash-exp")
],
strategy=ServiceSwitcherStrategyManual(default_index=0)
)
# Register function on all LLMs
switcher.register_function(
function_name="get_weather",
handler=get_weather_handler,
description="Get current weather",
properties={
"location": {"type": "string", "description": "City name"}
}
)
# Get active LLM
current_llm = switcher.active_llm
# Run inference on specific LLM without switching
result = await switcher.run_inference(
llm_index=1,
messages=[{"role": "user", "content": "Hello"}]
)
"""
def __init__(
self,
llms: List[LLMService],
strategy: ServiceSwitcherStrategy,
name: Optional[str] = None
):
"""Initialize LLM switcher.
Args:
llms: List of LLM services
strategy: Switching strategy
name: Optional name
"""
pass
def register_function(
self,
function_name: str,
handler: Callable,
description: str = "",
properties: Optional[Dict] = None,
required: Optional[List[str]] = None
):
"""Register function on all LLMs.
Registers the same function across all LLM services in the switcher.
Args:
function_name: Name of the function
handler: Function implementation
description: Function description
properties: Function parameter schema
required: Required parameters
"""
pass
def register_direct_function(
self,
function_name: str,
handler: Callable
):
"""Register direct function on all LLMs.
Args:
function_name: Name of the function
handler: Function implementation
"""
pass
async def run_inference(
self,
llm_index: int,
messages: List[Dict],
**kwargs
) -> Any:
"""Run inference on specific LLM without switching.
Allows out-of-band LLM calls without affecting the active service.
Args:
llm_index: Index of LLM to use
messages: Messages to send
**kwargs: Additional LLM parameters
Returns:
LLM response
"""
pass
@property
def llms(self) -> List[LLMService]:
"""Get all LLMs in switcher.
Returns:
List of LLM services
"""
pass
@property
def active_llm(self) -> LLMService:
"""Get currently active LLM.
Returns:
Active LLM service
"""
pass
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
# Create simple pipeline
pipeline = Pipeline([
processor1,
processor2,
processor3
])
# Create and run task
task = PipelineTask(pipeline)
await task.run()
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextAggregatorPair
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams
async def create_voice_agent():
# Setup context
context = LLMContext(
messages=[
{"role": "system", "content": "You are a helpful voice assistant."}
]
)
# Create services
stt = DeepgramSTTService(api_key="deepgram-key")
llm = OpenAILLMService(api_key="openai-key", model="gpt-4")
llm.set_context(context)
tts = OpenAITTSService(api_key="openai-key", voice="alloy")
# Create aggregators
aggregators = LLMContextAggregatorPair(context=context)
# Setup transport
transport = DailyTransport(
room_url="https://daily.co/room",
token="daily-token",
params=DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True
)
)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Receive audio from user
stt, # Transcribe speech
aggregators.user, # Aggregate user messages
llm, # Generate response
aggregators.assistant, # Aggregate assistant messages
tts, # Synthesize speech
transport.output() # Send audio to user
])
# Create task with params
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True
)
)
# Run
await task.run()
{ .api }
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
# Process with multiple TTS services simultaneously
parallel_tts = ParallelPipeline([
elevenlabs_tts,
openai_tts,
cartesia_tts
])
# All services receive same text, generate audio in parallel
pipeline = Pipeline([
llm_service,
parallel_tts, # Parallel TTS generation
# Outputs merged
transport.output()
])
{ .api }
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategyManual
# Create fallback strategy
class FallbackStrategy(ServiceSwitcherStrategy):
    def __init__(self):
        self._current = 0
        self._num_services = 1

    def select(self, services):
        # Remember the service count so on_error() can wrap around
        self._num_services = len(services)
        return self._current

    def on_error(self):
        """Switch to the next service on error."""
        self._current = (self._current + 1) % self._num_services
# Setup switcher
fallback_strategy = FallbackStrategy()
switcher = ServiceSwitcher(
services=[primary_tts, secondary_tts, tertiary_tts],
strategy=fallback_strategy
)
# On error, switch to next service
@primary_tts.event_handler("on_connection_error")
async def handle_error(*args):
    # Handler arguments depend on the service's event signature
    fallback_strategy.on_error()
{ .api }
from pipecat.pipeline.llm_switcher import LLMSwitcher
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
# Create LLM switcher with multiple providers
llm_switcher = LLMSwitcher(
llms=[
OpenAILLMService(api_key="key1", model="gpt-4"),
AnthropicLLMService(api_key="key2", model="claude-3-5-sonnet-20241022"),
GoogleLLMService(api_key="key3", model="gemini-2.0-flash-exp")
],
strategy=ServiceSwitcherStrategyManual(default_index=0)
)
# Register functions on all LLMs
llm_switcher.register_function(
function_name="get_weather",
handler=get_weather_handler,
description="Get current weather for a location",
properties={
"location": {
"type": "string",
"description": "City and state, e.g. San Francisco, CA"
}
},
required=["location"]
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
user_agg,
llm_switcher, # All LLMs have same functions
tts,
transport.output()
])
# Switch LLM at runtime
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))
# Or run out-of-band inference
response = await llm_switcher.run_inference(
llm_index=2, # Use Gemini
messages=[{"role": "user", "content": "Quick question"}]
)
{ .api }
# Queue frames into a running pipeline
import asyncio

from pipecat.frames.frames import TextFrame, TTSSpeakFrame

async def inject_frames():
# Pipeline is running
await asyncio.sleep(5)
# Inject frames
await task.queue_frames([
TextFrame("Emergency announcement"),
TTSSpeakFrame("This is an urgent message")
])
# Run injection alongside pipeline
await asyncio.gather(
task.run(),
inject_frames()
)
{ .api }
import asyncio
async def managed_pipeline():
# Create task
task = PipelineTask(pipeline)
try:
# Run with timeout
await asyncio.wait_for(task.run(), timeout=300) # 5 minute timeout
except asyncio.TimeoutError:
print("Pipeline timeout, cancelling...")
await task.cancel()
except KeyboardInterrupt:
print("User interrupt, stopping gracefully...")
await task.cancel()
finally:
if task.has_finished():
print("Pipeline completed successfully")
else:
print("Pipeline did not complete"){ .api }
# Create sub-pipeline for complex processing
audio_processing_pipeline = Pipeline([
audio_filter,
audio_resampler,
audio_mixer
])
# Use sub-pipeline in main pipeline
main_pipeline = Pipeline([
transport.input(),
audio_processing_pipeline, # Sub-pipeline
stt_service,
llm_service,
tts_service,
transport.output()
])
{ .api }
def build_pipeline(use_vision: bool = False, enable_functions: bool = False):
"""Build pipeline with optional features."""
processors = [
transport.input(),
stt_service,
user_aggregator
]
# Add vision if enabled
if use_vision:
processors.extend([
vision_service,
vision_aggregator
])
# Add LLM
processors.append(llm_service)
# Add function calling if enabled
if enable_functions:
processors.append(function_handler)
# Complete pipeline
processors.extend([
assistant_aggregator,
tts_service,
transport.output()
])
return Pipeline(processors)
# Use dynamic pipeline
pipeline = build_pipeline(use_vision=True, enable_functions=True)
{ .api }
# Good: Clear pipeline stages
pipeline = Pipeline([
# Input stage
transport.input(),
# Speech recognition stage
stt_service,
# User turn stage
user_aggregator,
# LLM stage
llm_service,
# Assistant turn stage
assistant_aggregator,
# Speech synthesis stage
tts_service,
# Output stage
transport.output()
])
# Bad: Unclear organization
pipeline = Pipeline([
transport.input(), stt_service, user_aggregator, llm_service,
assistant_aggregator, tts_service, transport.output()
])
{ .api }
# Good: Enable interruptions for voice agents
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
allow_interruptions=True # User can interrupt bot
)
)
# Bad: Interruptions disabled (frustrating UX)
task = PipelineTask(
    pipeline=pipeline,
    params=PipelineParams(allow_interruptions=False)
)
# User must wait for the bot to finish speaking
{ .api }
# Good: Proper lifecycle handling
async def run_pipeline():
task = PipelineTask(pipeline)
try:
await task.run()
except Exception as e:
logger.error(f"Pipeline error: {e}")
await task.cancel()
finally:
# Cleanup
await transport.stop()
# Bad: No cleanup
async def run_pipeline():
task = PipelineTask(pipeline)
await task.run()
# Resources not cleaned up
{ .api }
# Good: Enable metrics
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True
)
)
# Monitor metrics
@task.event_handler("on_metrics")
async def handle_metrics(metrics):
print(f"TTFB: {metrics.ttfb_ms}ms")
print(f"Tokens: {metrics.tokens_used}")
# Bad: No metrics
task = PipelineTask(pipeline=pipeline)
# No visibility into performance
{ .api }
# Good: Named pipelines for debugging
pipeline = Pipeline(
processors=[...],
name="voice-agent-pipeline"
)
# Easier to debug and monitor
print(f"Running pipeline: {pipeline.name}")
# Bad: Anonymous pipeline
pipeline = Pipeline(processors=[...])