docs
tessl install tessl/pypi-pipecat-ai@0.0.0An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols
The Runner module provides both a development framework and production runtime for deploying Pipecat bots across different transport types (Daily, WebRTC, WebSocket/Telephony). It includes:
{ .api }
# Production pipeline runner
from pipecat.pipeline.runner import PipelineRunner
# Runner argument types
from pipecat.runner.types import (
RunnerArguments,
DailyRunnerArguments,
WebSocketRunnerArguments,
SmallWebRTCRunnerArguments,
DialinSettings,
DailyDialinRequest,
)
# Transport utilities
from pipecat.runner.utils import (
create_transport,
parse_telephony_websocket,
get_transport_client_id,
maybe_capture_participant_camera,
maybe_capture_participant_screen,
smallwebrtc_sdp_munging,
)
# Configuration helpers
from pipecat.runner.daily import configure as configure_daily
from pipecat.runner.livekit import configure as configure_livekit
# Development runner
from pipecat.runner.run import main as runner_mainProduction-grade pipeline task executor that manages lifecycle, signal handling, and resource cleanup.
class PipelineRunner:
"""Manages the execution of pipeline tasks with lifecycle and signal handling.
Provides a high-level interface for running pipeline tasks with automatic
signal handling (SIGINT/SIGTERM), optional garbage collection, and proper
cleanup of resources.
Parameters:
name: Optional name for the runner instance
handle_sigint: Whether to automatically handle SIGINT signals (default: True)
handle_sigterm: Whether to automatically handle SIGTERM signals (default: False)
force_gc: Whether to force garbage collection after task completion (default: False)
loop: Event loop to use. If None, uses the current running loop
"""
def __init__(
self,
*,
name: Optional[str] = None,
handle_sigint: bool = True,
handle_sigterm: bool = False,
force_gc: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
)
async def run(self, task: PipelineTask):
"""Run a pipeline task to completion.
Parameters:
task: The pipeline task to execute
"""
async def stop_when_done(self):
"""Schedule all running tasks to stop when their current processing is complete."""
async def cancel(self):
"""Cancel all running tasks immediately."""Usage:
{ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
async def main():
# Create your pipeline
pipeline = Pipeline([
transport.input(),
# ... your processors ...
transport.output(),
])
# Create task
task = PipelineTask(pipeline)
# Create runner with signal handling
runner = PipelineRunner(
name="my-bot",
handle_sigint=True, # Graceful shutdown on Ctrl+C
handle_sigterm=True, # Graceful shutdown on termination
)
# Run until completion
await runner.run(task)
# Run with asyncio
asyncio.run(main())Production Deployment:
{ .api }
from pipecat.pipeline.runner import PipelineRunner
from pipecat.transports.daily.transport import DailyTransport, DailyParams
import asyncio
import os
async def production_bot():
"""Production bot with proper signal handling and cleanup."""
# Configure transport
transport = DailyTransport(
os.getenv("DAILY_ROOM_URL"),
os.getenv("DAILY_TOKEN"),
"Production Bot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
)
)
# Build pipeline
pipeline = Pipeline([
transport.input(),
# ... your processors ...
transport.output(),
])
# Create task and runner
task = PipelineTask(pipeline)
runner = PipelineRunner(
name="production-bot",
handle_sigint=True,
handle_sigterm=True, # Important for Docker/Kubernetes
force_gc=True, # Clean up resources properly
)
# Run with automatic signal handling
await runner.run(task)
if __name__ == "__main__":
asyncio.run(production_bot())Session argument types passed to bot functions by the development runner, containing transport-specific configuration and connection information.
@dataclass
class RunnerArguments:
"""Base class for runner session arguments.
Attributes:
handle_sigint: Whether to handle SIGINT signals (False by default)
handle_sigterm: Whether to handle SIGTERM signals (False by default)
pipeline_idle_timeout_secs: Seconds before pipeline idle timeout (300 by default)
body: Additional request data (optional, defaults to empty dict)
"""
handle_sigint: bool
handle_sigterm: bool
pipeline_idle_timeout_secs: int
body: Optional[Any]@dataclass
class DailyRunnerArguments(RunnerArguments):
"""Daily transport session arguments.
Parameters:
room_url: Daily room URL to join
token: Authentication token for the room (optional)
body: Additional request data (optional)
"""
room_url: str
token: Optional[str] = NoneUsage:
{ .api }
from pipecat.runner.types import DailyRunnerArguments
# In your bot function
async def run_bot(runner_args: DailyRunnerArguments):
# Access Daily-specific parameters
print(f"Joining room: {runner_args.room_url}")
# Create Daily transport
from pipecat.transports.daily.transport import DailyTransport, DailyParams
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"My Bot",
params=DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
)
)@dataclass
class WebSocketRunnerArguments(RunnerArguments):
"""WebSocket transport session arguments for telephony providers.
Parameters:
websocket: FastAPI WebSocket connection for audio streaming
body: Additional request data including dial-in settings (optional)
"""
websocket: WebSocketUsage:
{ .api }
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
# In your bot function for telephony
async def run_telephony_bot(runner_args: WebSocketRunnerArguments):
# Parse telephony provider type and call data
transport_type, call_data = await parse_telephony_websocket(
runner_args.websocket
)
print(f"Provider: {transport_type}")
print(f"Call ID: {call_data.get('call_id')}")
# Access custom parameters from body
user_id = runner_args.body.get("user_id")@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
"""Small WebRTC transport session arguments for ESP32 and embedded devices.
Parameters:
webrtc_connection: Pre-configured WebRTC peer connection
"""
webrtc_connection: AnyUsage:
{ .api }
from pipecat.runner.types import SmallWebRTCRunnerArguments
# In your bot function for SmallWebRTC
async def run_webrtc_bot(runner_args: SmallWebRTCRunnerArguments):
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
transport = SmallWebRTCTransport(
params=params,
webrtc_connection=runner_args.webrtc_connection,
)Data types for PSTN/SIP telephony integration with Daily.co and Pipecat Cloud.
class DialinSettings(BaseModel):
"""Dial-in settings from Daily/Pipecat Cloud webhook.
Parameters:
call_id: Unique identifier for the call (UUID representing sessionId in SIP Network)
call_domain: Daily domain for the call (UUID representing Daily Domain on SIP Network)
To: The dialed phone number (optional)
From: The caller's phone number (optional)
sip_headers: SIP headers from the call (optional)
"""
call_id: str
call_domain: str
To: Optional[str] = None
From: Optional[str] = None
sip_headers: Optional[Dict[str, str]] = Noneclass DailyDialinRequest(BaseModel):
"""Request data for Daily PSTN dial-in calls.
This structure is passed in runner_args.body for dial-in calls from
Pipecat Cloud's webhook handler.
Parameters:
dialin_settings: Dial-in configuration including call_id, call_domain, To, From
daily_api_key: Daily API key for pinlessCallUpdate (required for dial-in)
daily_api_url: Daily API URL (staging or production)
"""
dialin_settings: DialinSettings
daily_api_key: str
daily_api_url: strUsage:
{ .api }
from pipecat.runner.types import DailyDialinRequest
async def run_bot(runner_args: DailyRunnerArguments):
# Parse dial-in request if present
if runner_args.body:
dialin_request = DailyDialinRequest(**runner_args.body)
print(f"Dial-in call from: {dialin_request.dialin_settings.From}")
print(f"To number: {dialin_request.dialin_settings.To}")Factory functions for creating transports from runner arguments with automatic provider detection and configuration.
async def create_transport(
runner_args: Any,
transport_params: Dict[str, Callable]
) -> BaseTransport:
"""Create a transport from runner arguments using factory functions.
Automatically detects transport type from runner_args and creates the
appropriate transport with the provided parameters. For WebSocket
telephony transports, automatically detects provider (Twilio, Telnyx,
Plivo, Exotel) and configures serializers.
Parameters:
runner_args: DailyRunnerArguments, WebSocketRunnerArguments, or
SmallWebRTCRunnerArguments
transport_params: Dict mapping transport names to parameter factory functions.
Keys: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel"
Values: Functions returning transport parameters when called
Returns:
Configured transport instance ready to use
Raises:
ValueError: If transport key missing from transport_params or unsupported runner_args type
ImportError: If required transport dependencies not installed
"""Usage:
{ .api }
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Define transport parameter factories
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer set automatically
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
# Serializer configured automatically
),
}
# Create transport automatically based on runner_args type
async def run_bot(runner_args):
transport = await create_transport(runner_args, transport_params)
# Transport is ready to use
# - DailyRunnerArguments → DailyTransport
# - SmallWebRTCRunnerArguments → SmallWebRTCTransport
# - WebSocketRunnerArguments → FastAPIWebsocketTransport with provider detectionParse telephony provider WebSocket messages to detect provider type and extract call data.
async def parse_telephony_websocket(
websocket: WebSocket
) -> tuple[str, dict]:
"""Parse telephony WebSocket messages and return transport type and call data.
Automatically detects telephony provider (Twilio, Telnyx, Plivo, Exotel) from
WebSocket message structure and extracts provider-specific call metadata.
Parameters:
websocket: FastAPI WebSocket connection from telephony provider
Returns:
tuple: (transport_type, call_data)
transport_type: "twilio", "telnyx", "plivo", "exotel", or "unknown"
call_data dict contains provider-specific fields:
Twilio:
- stream_id: str - Stream identifier
- call_id: str - Call SID
- body: dict - Custom parameters from webhook
Telnyx:
- stream_id: str - Stream identifier
- call_control_id: str - Call control ID
- outbound_encoding: str - Audio encoding format
- from: str - Caller number
- to: str - Dialed number
Plivo:
- stream_id: str - Stream identifier
- call_id: str - Call UUID
Exotel:
- stream_id: str - Stream SID
- call_id: str - Call SID
- account_sid: str - Account SID
- from: str - Caller number
- to: str - Dialed number
- custom_parameters: str - Custom parameters
"""Usage:
{ .api }
from pipecat.runner.utils import parse_telephony_websocket
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Parse provider and call data
transport_type, call_data = await parse_telephony_websocket(websocket)
if transport_type == "twilio":
print(f"Twilio call: {call_data['call_id']}")
user_id = call_data["body"].get("user_id")
elif transport_type == "telnyx":
print(f"Telnyx call control ID: {call_data['call_control_id']}")
print(f"From: {call_data['from']} To: {call_data['to']}")
elif transport_type == "plivo":
print(f"Plivo stream: {call_data['stream_id']}")Helper functions for working with transport client objects across different transport types.
def get_transport_client_id(
transport: BaseTransport,
client: Any
) -> str:
"""Get client identifier from transport-specific client object.
Extracts the client ID string from transport-specific client objects,
supporting Daily and SmallWebRTC transports.
Parameters:
transport: The transport instance
client: Transport-specific client object (Daily participant or SmallWebRTC pc_id)
Returns:
Client identifier string, empty string if transport not supported
"""Usage:
{ .api }
from pipecat.runner.utils import get_transport_client_id
# In transport event handler
async def on_participant_joined(transport, participant):
client_id = get_transport_client_id(transport, participant)
print(f"Client {client_id} joined")Helper functions to capture participant video across different transport types.
async def maybe_capture_participant_camera(
transport: BaseTransport,
client: Any,
framerate: int = 0
):
"""Capture participant camera video if transport supports it.
Attempts to capture camera video for Daily and SmallWebRTC transports.
No-op for unsupported transports.
Parameters:
transport: The transport instance
client: Transport-specific client object
framerate: Video capture framerate (0 = auto)
"""async def maybe_capture_participant_screen(
transport: BaseTransport,
client: Any,
framerate: int = 0
):
"""Capture participant screen video if transport supports it.
Attempts to capture screen share video for Daily and SmallWebRTC transports.
No-op for unsupported transports.
Parameters:
transport: The transport instance
client: Transport-specific client object
framerate: Video capture framerate (0 = auto)
"""Usage:
{ .api }
from pipecat.runner.utils import (
maybe_capture_participant_camera,
maybe_capture_participant_screen
)
# In transport event handler
async def on_first_participant_joined(transport, participant):
# Capture camera if available
await maybe_capture_participant_camera(transport, participant, framerate=15)
# Or capture screen share
await maybe_capture_participant_screen(transport, participant)SDP manipulation utilities for WebRTC compatibility with embedded devices like ESP32.
def smallwebrtc_sdp_munging(
sdp: str,
host: Optional[str]
) -> str:
"""Apply SDP modifications for SmallWebRTC compatibility.
Removes unsupported fingerprint algorithms (sha-384, sha-512) and filters
ICE candidates for ESP32 and embedded device compatibility.
Parameters:
sdp: Original SDP string
host: Host address for ICE candidate filtering (optional)
Returns:
Modified SDP string with compatibility fixes applied
"""Usage:
{ .api }
from pipecat.runner.utils import smallwebrtc_sdp_munging
# When creating SmallWebRTC offer/answer
async def create_offer():
# Get SDP from peer connection
offer = await pc.createOffer()
# Apply SmallWebRTC compatibility fixes
modified_sdp = smallwebrtc_sdp_munging(
offer.sdp,
host="192.168.1.100" # Your server IP
)
# Use modified SDP
await pc.setLocalDescription(RTCSessionDescription(
type=offer.type,
sdp=modified_sdp
))Utility for creating Daily rooms and tokens with support for standard video rooms and SIP-enabled rooms for PSTN calling.
async def configure(
aiohttp_session: aiohttp.ClientSession,
*,
api_key: Optional[str] = None,
room_exp_duration: Optional[float] = 2.0,
token_exp_duration: Optional[float] = 2.0,
sip_caller_phone: Optional[str] = None,
sip_enable_video: Optional[bool] = False,
sip_num_endpoints: Optional[int] = 1,
sip_codecs: Optional[Dict[str, List[str]]] = None,
room_properties: Optional[DailyRoomProperties] = None,
token_properties: Optional[DailyMeetingTokenProperties] = None,
) -> DailyRoomConfig:
"""Configure Daily room URL and token with optional SIP capabilities.
Either uses existing room from DAILY_SAMPLE_ROOM_URL environment variable
or creates a new temporary room automatically.
Parameters:
aiohttp_session: HTTP session for making API requests
api_key: Daily API key (uses DAILY_API_KEY env var if not provided)
room_exp_duration: Room expiration time in hours (default: 2.0)
token_exp_duration: Token expiration time in hours (default: 2.0)
sip_caller_phone: Phone number for SIP display name. When provided, enables SIP
sip_enable_video: Whether video is enabled for SIP (default: False)
sip_num_endpoints: Number of allowed SIP endpoints (default: 1)
sip_codecs: Audio/video codecs to support (e.g., {"audio": ["OPUS"]})
room_properties: Custom DailyRoomProperties (overrides other parameters)
token_properties: Custom DailyMeetingTokenProperties
Returns:
DailyRoomConfig with room_url, token, and optional sip_endpoint
Environment Variables:
DAILY_API_KEY: Required for creating rooms and tokens
DAILY_SAMPLE_ROOM_URL: Optional existing room URL (standard mode only)
DAILY_API_URL: Optional API URL (default: https://api.daily.co/v1)
"""Usage:
{ .api }
import aiohttp
from pipecat.runner.daily import configure
from pipecat.transports.daily.transport import DailyTransport, DailyParams
async def setup_daily_bot():
async with aiohttp.ClientSession() as session:
# Standard room
room_url, token = await configure(session)
# SIP-enabled room for phone calls
sip_config = await configure(
session,
sip_caller_phone="+15551234567",
sip_enable_video=False,
)
print(f"Dial into: {sip_config.sip_endpoint}")
# Custom room properties
from pipecat.transports.daily.utils import DailyRoomProperties
custom_config = await configure(
session,
room_properties=DailyRoomProperties(
enable_recording="cloud",
max_participants=2,
start_audio_off=False,
)
)
# Create transport
transport = DailyTransport(
room_url,
token,
"My Bot",
DailyParams(audio_in_enabled=True, audio_out_enabled=True)
)Utility for generating LiveKit access tokens with proper room permissions for agents.
async def configure() -> tuple[str, str, str]:
"""Configure LiveKit room URL and token from environment.
Returns:
Tuple containing (url, token, room_name)
Environment Variables:
LIVEKIT_URL: LiveKit server URL (required)
LIVEKIT_ROOM_NAME: Room name to join (required)
LIVEKIT_API_KEY: LiveKit API key (required)
LIVEKIT_API_SECRET: LiveKit API secret (required)
Raises:
Exception: If required environment variables are not set
"""
def generate_token(
room_name: str,
participant_name: str,
api_key: str,
api_secret: str
) -> str:
"""Generate a LiveKit access token for a participant.
Parameters:
room_name: Name of the LiveKit room
participant_name: Name of the participant
api_key: LiveKit API key
api_secret: LiveKit API secret
Returns:
JWT token string for room access
"""
def generate_token_with_agent(
room_name: str,
participant_name: str,
api_key: str,
api_secret: str
) -> str:
"""Generate a LiveKit access token for an agent participant.
Parameters:
room_name: Name of the LiveKit room
participant_name: Name of the participant
api_key: LiveKit API key
api_secret: LiveKit API secret
Returns:
JWT token string with agent grants
"""Usage:
{ .api }
from pipecat.runner.livekit import configure, generate_token
from pipecat.transports.livekit.transport import LiveKitTransport, LiveKitParams
async def setup_livekit_bot():
# Configure from environment
url, token, room_name = await configure()
# Create transport
transport = LiveKitTransport(
url,
token,
room_name,
LiveKitParams(audio_in_enabled=True, audio_out_enabled=True)
)
# Generate custom tokens
agent_token = generate_token_with_agent(
"my-room",
"Bot Agent",
api_key,
api_secret
)
user_token = generate_token(
"my-room",
"Test User",
api_key,
api_secret
)Command-line development server that automatically discovers bot functions and provides FastAPI endpoints for different transport types.
Bot Function Interface:
{ .api }
async def bot(runner_args: RunnerArguments):
"""Entry point for your bot - discovered automatically by the runner.
Parameters:
runner_args: Transport-specific arguments (DailyRunnerArguments,
WebSocketRunnerArguments, or SmallWebRTCRunnerArguments)
"""
# Your bot implementation
passRunning the Development Server:
# Install runner dependencies
pip install pipecat-ai[runner]
# WebRTC (local development)
python bot.py -t webrtc
# Opens http://localhost:7860/client
# WebRTC for ESP32 devices
python bot.py -t webrtc --esp32 --host 192.168.1.100
# Daily (server mode with web UI)
python bot.py -t daily
# Opens http://localhost:7860
# Daily (direct connection for testing)
python bot.py -d
# Connects directly to Daily room
# Telephony (Twilio)
python bot.py -t twilio -x your-bot.ngrok.io
# Telephony (Telnyx)
python bot.py -t telnyx -x your-bot.ngrok.io
# Daily with PSTN dial-in
python bot.py -t daily --dialin
# Webhook: http://localhost:7860/daily-dialin-webhookCommand-Line Options:
--host: Server host address (default: localhost)--port: Server port (default: 7860)-t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)-x/--proxy: Public proxy hostname for telephony webhooks--esp32: Enable ESP32 compatibility mode-d/--direct: Connect directly to Daily without server--dialin: Enable Daily PSTN dial-in webhook-f/--folder: Downloads folder path-v/--verbose: Increase logging verbosityExample Bot:
{ .api }
# bot.py
from pipecat.runner.types import DailyRunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
# Define transport parameters
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
)
}
async def bot(runner_args: DailyRunnerArguments):
"""Bot entry point - discovered by runner."""
# Create transport automatically
transport = await create_transport(runner_args, transport_params)
# Build pipeline
pipeline = Pipeline([
transport.input(),
# ... your processors ...
transport.output(),
])
# Run with signal handling from runner_args
task = PipelineTask(pipeline)
runner = PipelineRunner(
handle_sigint=runner_args.handle_sigint,
handle_sigterm=runner_args.handle_sigterm,
)
await runner.run(task)
if __name__ == "__main__":
from pipecat.runner.run import main
main()Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy bot code
COPY bot.py .
# Expose port for webhooks
EXPOSE 7860
# Run bot with proper signal handling
CMD ["python", "bot.py"]docker-compose.yml:
version: '3.8'
services:
bot:
build: .
environment:
- DAILY_API_KEY=${DAILY_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
ports:
- "7860:7860"
restart: unless-stopped
# Ensure proper signal handling
stop_signal: SIGTERM
stop_grace_period: 30sProduction Bot with Error Handling:
{ .api }
from pipecat.pipeline.runner import PipelineRunner
from loguru import logger
import asyncio
async def production_bot(runner_args):
"""Production bot with comprehensive error handling."""
try:
# Create transport
transport = await create_transport(runner_args, transport_params)
# Build pipeline with error handlers
pipeline = Pipeline([
transport.input(),
# ... processors with error handling ...
transport.output(),
])
# Create task and runner
task = PipelineTask(pipeline)
runner = PipelineRunner(
name="production-bot",
handle_sigint=True,
handle_sigterm=True, # Critical for Docker/K8s
force_gc=True,
)
logger.info("Bot starting")
await runner.run(task)
logger.info("Bot completed successfully")
except asyncio.CancelledError:
logger.info("Bot cancelled gracefully")
raise
except Exception as e:
logger.error(f"Bot error: {e}")
raise
finally:
# Cleanup resources
logger.info("Bot cleanup complete")deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: pipecat-bot
spec:
replicas: 3
selector:
matchLabels:
app: pipecat-bot
template:
metadata:
labels:
app: pipecat-bot
spec:
containers:
- name: bot
image: your-registry/pipecat-bot:latest
ports:
- containerPort: 7860
env:
- name: DAILY_API_KEY
valueFrom:
secretKeyRef:
name: bot-secrets
key: daily-api-key
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: bot-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 7860
initialDelaySeconds: 30
periodSeconds: 10
# Allow graceful shutdown
terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: pipecat-bot
spec:
selector:
app: pipecat-bot
ports:
- port: 80
targetPort: 7860
type: LoadBalancerDaily Dial-in Setup:
{ .api }
from pipecat.runner.types import DailyRunnerArguments, DailyDialinRequest
from pipecat.runner.daily import configure
import aiohttp
async def dialin_handler(runner_args: DailyRunnerArguments):
"""Handle incoming PSTN dial-in calls."""
# Parse dial-in request
if runner_args.body:
dialin_request = DailyDialinRequest(**runner_args.body)
settings = dialin_request.dialin_settings
logger.info(f"Incoming call from: {settings.From}")
logger.info(f"To number: {settings.To}")
logger.info(f"Call ID: {settings.call_id}")
# Access SIP headers
if settings.sip_headers:
custom_data = settings.sip_headers.get("X-Custom-Data")
# Create transport and run bot
transport = await create_transport(runner_args, transport_params)
# ... build and run pipeline ...
# Configure Daily phone number webhook
# POST to: https://your-domain.com/daily-dialin-webhookCreating SIP-Enabled Rooms:
{ .api }
import aiohttp
from pipecat.runner.daily import configure
async def setup_dialin():
async with aiohttp.ClientSession() as session:
# Create SIP-enabled room
config = await configure(
session,
sip_caller_phone="+15551234567",
sip_enable_video=False,
sip_num_endpoints=1,
sip_codecs={"audio": ["OPUS", "PCMU"]},
)
print(f"Room URL: {config.room_url}")
print(f"SIP endpoint: {config.sip_endpoint}")
print(f"Token: {config.token}")
# Configure your phone number to forward to sip_endpointMulti-Provider Telephony Bot:
{ .api }
from pipecat.runner.utils import parse_telephony_websocket, create_transport
from pipecat.runner.types import WebSocketRunnerArguments
async def telephony_bot(runner_args: WebSocketRunnerArguments):
"""Universal telephony bot supporting multiple providers."""
# Parse provider and call data
transport_type, call_data = await parse_telephony_websocket(
runner_args.websocket
)
logger.info(f"Call from {transport_type} provider")
# Handle provider-specific data
if transport_type == "twilio":
call_id = call_data["call_id"]
custom_params = call_data["body"]
user_id = custom_params.get("user_id")
elif transport_type == "telnyx":
call_control_id = call_data["call_control_id"]
from_number = call_data["from"]
to_number = call_data["to"]
elif transport_type == "plivo":
stream_id = call_data["stream_id"]
elif transport_type == "exotel":
account_sid = call_data["account_sid"]
# Create transport (automatically configured for provider)
transport = await create_transport(runner_args, transport_params)
# Build and run pipeline
pipeline = Pipeline([transport.input(), /* ... */, transport.output()])
task = PipelineTask(pipeline)
runner = PipelineRunner(handle_sigint=False) # No signals in webhook context
await runner.run(task)Horizontal Scaling:
{ .api }
# Use stateless bot design for horizontal scaling
async def stateless_bot(runner_args):
"""Stateless bot that can scale horizontally."""
# Load user context from external store
user_id = runner_args.body.get("user_id")
user_context = await load_from_redis(user_id)
# Build pipeline with loaded context
# ... your pipeline ...
# Save state back to external store
await save_to_redis(user_id, updated_context)Load Balancing:
# nginx.conf for load balancing
upstream pipecat_bots {
least_conn; # Use least connections
server bot1:7860;
server bot2:7860;
server bot3:7860;
}
server {
listen 80;
location / {
proxy_pass http://pipecat_bots;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
}Resource Management:
{ .api }
from pipecat.pipeline.runner import PipelineRunner
async def resource_managed_bot(runner_args):
"""Bot with explicit resource management."""
runner = PipelineRunner(
force_gc=True, # Force garbage collection
handle_sigterm=True, # Handle container termination
)
# Use context managers for resources
async with aiohttp.ClientSession() as session:
# Your bot logic
pass
await runner.run(task){ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import (
DailyRunnerArguments,
WebSocketRunnerArguments,
SmallWebRTCRunnerArguments,
)
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Define transport parameters for all supported transports
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(runner_args):
"""Universal bot function that works with any transport type."""
# Create transport automatically based on runner_args type
transport = await create_transport(runner_args, transport_params)
# Build your pipeline
pipeline = Pipeline([
transport.input(),
# ... your processors ...
transport.output(),
])
# Create and run task
task = PipelineTask(pipeline)
# Run until pipeline completes
await task.run()
# Use with Daily
daily_args = DailyRunnerArguments(
room_url="https://example.daily.co/room",
token="your-token"
)
await run_bot(daily_args)
# Use with WebSocket/Telephony (runner will detect provider)
# Called from FastAPI WebSocket endpoint
@app.websocket("/telephony")
async def telephony_endpoint(websocket: WebSocket):
await websocket.accept()
ws_args = WebSocketRunnerArguments(websocket=websocket)
await run_bot(ws_args){ .api }
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import parse_telephony_websocket, create_transport
from fastapi import WebSocket
async def telephony_bot(runner_args: WebSocketRunnerArguments):
# Parse telephony provider and call data
transport_type, call_data = await parse_telephony_websocket(
runner_args.websocket
)
# Access provider-specific data
if transport_type == "twilio":
call_id = call_data["call_id"]
stream_id = call_data["stream_id"]
custom_params = call_data["body"]
# Use custom parameters from your webhook
user_id = custom_params.get("user_id")
bot_personality = custom_params.get("personality", "default")
print(f"Twilio call {call_id} for user {user_id}")
elif transport_type == "telnyx":
print(f"Telnyx call from {call_data['from']} to {call_data['to']}")
# Create transport (automatically configured for provider)
transport = await create_transport(runner_args, transport_params)
# ... build and run pipeline ...
@app.websocket("/twilio")
async def twilio_endpoint(websocket: WebSocket):
await websocket.accept()
await telephony_bot(WebSocketRunnerArguments(websocket=websocket)){ .api }
from pipecat.runner.types import DailyRunnerArguments, DailyDialinRequest
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
)
}
async def dialin_bot(runner_args: DailyRunnerArguments):
"""Handle incoming PSTN dial-in calls."""
# Parse dial-in request from runner_args.body
if runner_args.body:
dialin_request = DailyDialinRequest(**runner_args.body)
settings = dialin_request.dialin_settings
print(f"Incoming dial-in call")
print(f" From: {settings.From}")
print(f" To: {settings.To}")
print(f" Call ID: {settings.call_id}")
# Access SIP headers if available
if settings.sip_headers:
caller_name = settings.sip_headers.get("X-Caller-Name")
print(f" Caller: {caller_name}")
# Use Daily API credentials for call control
api_key = dialin_request.daily_api_key
api_url = dialin_request.daily_api_url
# Create Daily transport
transport = await create_transport(runner_args, transport_params)
# Build pipeline
pipeline = Pipeline([
transport.input(),
# ... your processors ...
transport.output(),
])
# Run pipeline
task = PipelineTask(pipeline)
runner = PipelineRunner(
handle_sigint=runner_args.handle_sigint,
handle_sigterm=runner_args.handle_sigterm,
)
await runner.run(task){ .api }
from pipecat.runner.types import DailyRunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.audio.vad.silero import SileroVADAnalyzer
from loguru import logger
import asyncio
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
}
async def production_bot(runner_args: DailyRunnerArguments):
"""Production-ready bot with comprehensive error handling."""
try:
logger.info("Starting production bot")
# Create transport with error handling
try:
transport = await create_transport(runner_args, transport_params)
except Exception as e:
logger.error(f"Failed to create transport: {e}")
raise
# Register transport event handlers
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.info(f"Participant joined: {participant['id']}")
await transport.capture_participant_video(participant["id"])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
logger.info(f"Participant left: {participant['id']}, reason: {reason}")
@transport.event_handler("on_dialin_ready")
async def on_dialin_ready(transport, cdata):
logger.info("PSTN call connected and ready")
# Build pipeline with error handlers
pipeline = Pipeline([
transport.input(),
# ... your processors with error handling ...
transport.output(),
])
# Create task and runner
task = PipelineTask(pipeline)
runner = PipelineRunner(
name="production-bot",
handle_sigint=runner_args.handle_sigint,
handle_sigterm=runner_args.handle_sigterm,
force_gc=True, # Clean up resources
)
logger.info("Bot ready, starting pipeline")
await runner.run(task)
logger.info("Bot completed successfully")
except asyncio.CancelledError:
logger.info("Bot cancelled gracefully")
raise
except Exception as e:
logger.error(f"Bot error: {e}", exc_info=True)
raise
finally:
logger.info("Bot cleanup complete")
if __name__ == "__main__":
# For direct execution
from pipecat.runner.run import main
main(){ .api }
from pipecat.runner.types import (
RunnerArguments,
DailyRunnerArguments,
WebSocketRunnerArguments,
SmallWebRTCRunnerArguments,
)
from pipecat.runner.utils import create_transport, parse_telephony_websocket
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.transports.smallwebrtc.params import TransportParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from loguru import logger
# Define transport parameters for all supported transports
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"plivo": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"exotel": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def universal_bot(runner_args: RunnerArguments):
"""Universal bot that works with any transport type."""
# Log transport type
if isinstance(runner_args, DailyRunnerArguments):
logger.info(f"Starting Daily bot: {runner_args.room_url}")
elif isinstance(runner_args, SmallWebRTCRunnerArguments):
logger.info("Starting WebRTC bot")
elif isinstance(runner_args, WebSocketRunnerArguments):
# Detect telephony provider
transport_type, call_data = await parse_telephony_websocket(
runner_args.websocket
)
logger.info(f"Starting {transport_type} telephony bot")
# Log provider-specific info
if transport_type == "twilio":
logger.info(f"Call ID: {call_data['call_id']}")
elif transport_type == "telnyx":
logger.info(f"From: {call_data['from']} To: {call_data['to']}")
# Create transport automatically
transport = await create_transport(runner_args, transport_params)
# Build your pipeline (same for all transports)
pipeline = Pipeline([
transport.input(),
# ... your processors work with any transport ...
transport.output(),
])
# Create and run task
task = PipelineTask(pipeline)
runner = PipelineRunner(
handle_sigint=runner_args.handle_sigint,
handle_sigterm=runner_args.handle_sigterm,
)
await runner.run(task)
# Run with development runner
if __name__ == "__main__":
from pipecat.runner.run import main
main()
# Usage:
# python bot.py -t daily # Daily server mode
# python bot.py -d # Daily direct mode
# python bot.py -t webrtc # WebRTC local
# python bot.py -t twilio -x ngrok.io # Twilio
# python bot.py -t telnyx -x ngrok.io # Telnyx