or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.mdindex.mdpipeline.mdrunner.mdtransports.mdturns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

runner.mddocs/

Runner Module

The Runner module provides both a development framework and production runtime for deploying Pipecat bots across different transport types (Daily, WebRTC, WebSocket/Telephony). It includes:

  • PipelineRunner: Production-grade task executor with signal handling and lifecycle management
  • Development Runner: Rapid prototyping server with automatic transport configuration
  • Transport Utilities: Factory functions for creating transports with automatic provider detection
  • Deployment Helpers: Configuration utilities for Daily, LiveKit, and telephony services

Imports

{ .api }
# Production pipeline runner
from pipecat.pipeline.runner import PipelineRunner

# Runner argument types
from pipecat.runner.types import (
    RunnerArguments,
    DailyRunnerArguments,
    WebSocketRunnerArguments,
    SmallWebRTCRunnerArguments,
    DialinSettings,
    DailyDialinRequest,
)

# Transport utilities
from pipecat.runner.utils import (
    create_transport,
    parse_telephony_websocket,
    get_transport_client_id,
    maybe_capture_participant_camera,
    maybe_capture_participant_screen,
    smallwebrtc_sdp_munging,
)

# Configuration helpers
from pipecat.runner.daily import configure as configure_daily
from pipecat.runner.livekit import configure as configure_livekit

# Development runner
from pipecat.runner.run import main as runner_main

Capabilities

PipelineRunner

Production-grade pipeline task executor that manages lifecycle, signal handling, and resource cleanup.

class PipelineRunner:
    """Manages the execution of pipeline tasks with lifecycle and signal handling.

    Provides a high-level interface for running pipeline tasks with automatic
    signal handling (SIGINT/SIGTERM), optional garbage collection, and proper
    cleanup of resources.

    Parameters:
        name: Optional name for the runner instance
        handle_sigint: Whether to automatically handle SIGINT signals (default: True)
        handle_sigterm: Whether to automatically handle SIGTERM signals (default: False)
        force_gc: Whether to force garbage collection after task completion (default: False)
        loop: Event loop to use. If None, uses the current running loop
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        handle_sigint: bool = True,
        handle_sigterm: bool = False,
        force_gc: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    )

    async def run(self, task: PipelineTask):
        """Run a pipeline task to completion.

        Parameters:
            task: The pipeline task to execute
        """

    async def stop_when_done(self):
        """Schedule all running tasks to stop when their current processing is complete."""

    async def cancel(self):
        """Cancel all running tasks immediately."""

Usage:

{ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner

async def main():
    # Create your pipeline
    pipeline = Pipeline([
        transport.input(),
        # ... your processors ...
        transport.output(),
    ])

    # Create task
    task = PipelineTask(pipeline)

    # Create runner with signal handling
    runner = PipelineRunner(
        name="my-bot",
        handle_sigint=True,  # Graceful shutdown on Ctrl+C
        handle_sigterm=True,  # Graceful shutdown on termination
    )

    # Run until completion
    await runner.run(task)

# Run with asyncio
asyncio.run(main())

Production Deployment:

{ .api }
from pipecat.pipeline.runner import PipelineRunner
from pipecat.transports.daily.transport import DailyTransport, DailyParams
import asyncio
import os

async def production_bot():
    """Production bot with proper signal handling and cleanup."""

    # Configure transport
    transport = DailyTransport(
        os.getenv("DAILY_ROOM_URL"),
        os.getenv("DAILY_TOKEN"),
        "Production Bot",
        DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
        )
    )

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),
        # ... your processors ...
        transport.output(),
    ])

    # Create task and runner
    task = PipelineTask(pipeline)
    runner = PipelineRunner(
        name="production-bot",
        handle_sigint=True,
        handle_sigterm=True,  # Important for Docker/Kubernetes
        force_gc=True,  # Clean up resources properly
    )

    # Run with automatic signal handling
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(production_bot())

Runner Argument Types

Session argument types passed to bot functions by the development runner, containing transport-specific configuration and connection information.

Base Runner Arguments

@dataclass
class RunnerArguments:
    """Base class for runner session arguments.

    Attributes:
        handle_sigint: Whether to handle SIGINT signals (False by default)
        handle_sigterm: Whether to handle SIGTERM signals (False by default)
        pipeline_idle_timeout_secs: Seconds before pipeline idle timeout (300 by default)
        body: Additional request data (optional, defaults to empty dict)
    """
    handle_sigint: bool
    handle_sigterm: bool
    pipeline_idle_timeout_secs: int
    body: Optional[Any]

Daily Runner Arguments

@dataclass
class DailyRunnerArguments(RunnerArguments):
    """Daily transport session arguments.

    Parameters:
        room_url: Daily room URL to join
        token: Authentication token for the room (optional)
        body: Additional request data (optional)
    """
    room_url: str
    token: Optional[str] = None

Usage:

{ .api }
from pipecat.runner.types import DailyRunnerArguments

# In your bot function
async def run_bot(runner_args: DailyRunnerArguments):
    # Access Daily-specific parameters
    print(f"Joining room: {runner_args.room_url}")

    # Create Daily transport
    from pipecat.transports.daily.transport import DailyTransport, DailyParams

    transport = DailyTransport(
        runner_args.room_url,
        runner_args.token,
        "My Bot",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
        )
    )

WebSocket Runner Arguments

@dataclass
class WebSocketRunnerArguments(RunnerArguments):
    """WebSocket transport session arguments for telephony providers.

    Parameters:
        websocket: FastAPI WebSocket connection for audio streaming
        body: Additional request data including dial-in settings (optional)
    """
    websocket: WebSocket

Usage:

{ .api }
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import parse_telephony_websocket

# In your bot function for telephony
async def run_telephony_bot(runner_args: WebSocketRunnerArguments):
    # Parse telephony provider type and call data
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )

    print(f"Provider: {transport_type}")
    print(f"Call ID: {call_data.get('call_id')}")

    # Access custom parameters from body
    user_id = runner_args.body.get("user_id")

SmallWebRTC Runner Arguments

@dataclass
class SmallWebRTCRunnerArguments(RunnerArguments):
    """Small WebRTC transport session arguments for ESP32 and embedded devices.

    Parameters:
        webrtc_connection: Pre-configured WebRTC peer connection
    """
    webrtc_connection: Any

Usage:

{ .api }
from pipecat.runner.types import SmallWebRTCRunnerArguments

# In your bot function for SmallWebRTC
async def run_webrtc_bot(runner_args: SmallWebRTCRunnerArguments):
    from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

    transport = SmallWebRTCTransport(
        params=params,
        webrtc_connection=runner_args.webrtc_connection,
    )

Telephony Integration Types

Data types for PSTN/SIP telephony integration with Daily.co and Pipecat Cloud.

Dial-in Settings

class DialinSettings(BaseModel):
    """Dial-in settings from Daily/Pipecat Cloud webhook.

    Parameters:
        call_id: Unique identifier for the call (UUID representing sessionId in SIP Network)
        call_domain: Daily domain for the call (UUID representing Daily Domain on SIP Network)
        To: The dialed phone number (optional)
        From: The caller's phone number (optional)
        sip_headers: SIP headers from the call (optional)
    """
    call_id: str
    call_domain: str
    To: Optional[str] = None
    From: Optional[str] = None
    sip_headers: Optional[Dict[str, str]] = None

Daily Dial-in Request

class DailyDialinRequest(BaseModel):
    """Request data for Daily PSTN dial-in calls.

    This structure is passed in runner_args.body for dial-in calls from
    Pipecat Cloud's webhook handler.

    Parameters:
        dialin_settings: Dial-in configuration including call_id, call_domain, To, From
        daily_api_key: Daily API key for pinlessCallUpdate (required for dial-in)
        daily_api_url: Daily API URL (staging or production)
    """
    dialin_settings: DialinSettings
    daily_api_key: str
    daily_api_url: str

Usage:

{ .api }
from pipecat.runner.types import DailyDialinRequest

async def run_bot(runner_args: DailyRunnerArguments):
    # Parse dial-in request if present
    if runner_args.body:
        dialin_request = DailyDialinRequest(**runner_args.body)
        print(f"Dial-in call from: {dialin_request.dialin_settings.From}")
        print(f"To number: {dialin_request.dialin_settings.To}")

Transport Creation Utilities

Factory functions for creating transports from runner arguments with automatic provider detection and configuration.

Create Transport

async def create_transport(
    runner_args: Any,
    transport_params: Dict[str, Callable]
) -> BaseTransport:
    """Create a transport from runner arguments using factory functions.

    Automatically detects transport type from runner_args and creates the
    appropriate transport with the provided parameters. For WebSocket
    telephony transports, automatically detects provider (Twilio, Telnyx,
    Plivo, Exotel) and configures serializers.

    Parameters:
        runner_args: DailyRunnerArguments, WebSocketRunnerArguments, or
                     SmallWebRTCRunnerArguments
        transport_params: Dict mapping transport names to parameter factory functions.
                         Keys: "daily", "webrtc", "twilio", "telnyx", "plivo", "exotel"
                         Values: Functions returning transport parameters when called

    Returns:
        Configured transport instance ready to use

    Raises:
        ValueError: If transport key missing from transport_params or unsupported runner_args type
        ImportError: If required transport dependencies not installed
    """

Usage:

{ .api }
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.audio.vad.silero import SileroVADAnalyzer

# Define transport parameter factories
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
        # add_wav_header and serializer set automatically
    ),
    "telnyx": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        # Serializer configured automatically
    ),
}

# Create transport automatically based on runner_args type
async def run_bot(runner_args):
    transport = await create_transport(runner_args, transport_params)

    # Transport is ready to use
    # - DailyRunnerArguments → DailyTransport
    # - SmallWebRTCRunnerArguments → SmallWebRTCTransport
    # - WebSocketRunnerArguments → FastAPIWebsocketTransport with provider detection

Telephony WebSocket Parsing

Parse telephony provider WebSocket messages to detect provider type and extract call data.

Parse Telephony WebSocket

async def parse_telephony_websocket(
    websocket: WebSocket
) -> tuple[str, dict]:
    """Parse telephony WebSocket messages and return transport type and call data.

    Automatically detects telephony provider (Twilio, Telnyx, Plivo, Exotel) from
    WebSocket message structure and extracts provider-specific call metadata.

    Parameters:
        websocket: FastAPI WebSocket connection from telephony provider

    Returns:
        tuple: (transport_type, call_data)

        transport_type: "twilio", "telnyx", "plivo", "exotel", or "unknown"

        call_data dict contains provider-specific fields:

        Twilio:
            - stream_id: str - Stream identifier
            - call_id: str - Call SID
            - body: dict - Custom parameters from webhook

        Telnyx:
            - stream_id: str - Stream identifier
            - call_control_id: str - Call control ID
            - outbound_encoding: str - Audio encoding format
            - from: str - Caller number
            - to: str - Dialed number

        Plivo:
            - stream_id: str - Stream identifier
            - call_id: str - Call UUID

        Exotel:
            - stream_id: str - Stream SID
            - call_id: str - Call SID
            - account_sid: str - Account SID
            - from: str - Caller number
            - to: str - Dialed number
            - custom_parameters: str - Custom parameters
    """

Usage:

{ .api }
from pipecat.runner.utils import parse_telephony_websocket
from fastapi import WebSocket

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Parse provider and call data
    transport_type, call_data = await parse_telephony_websocket(websocket)

    if transport_type == "twilio":
        print(f"Twilio call: {call_data['call_id']}")
        user_id = call_data["body"].get("user_id")
    elif transport_type == "telnyx":
        print(f"Telnyx call control ID: {call_data['call_control_id']}")
        print(f"From: {call_data['from']} To: {call_data['to']}")
    elif transport_type == "plivo":
        print(f"Plivo stream: {call_data['stream_id']}")

Transport Client Utilities

Helper functions for working with transport client objects across different transport types.

Get Transport Client ID

def get_transport_client_id(
    transport: BaseTransport,
    client: Any
) -> str:
    """Get client identifier from transport-specific client object.

    Extracts the client ID string from transport-specific client objects,
    supporting Daily and SmallWebRTC transports.

    Parameters:
        transport: The transport instance
        client: Transport-specific client object (Daily participant or SmallWebRTC pc_id)

    Returns:
        Client identifier string, empty string if transport not supported
    """

Usage:

{ .api }
from pipecat.runner.utils import get_transport_client_id

# In transport event handler
async def on_participant_joined(transport, participant):
    client_id = get_transport_client_id(transport, participant)
    print(f"Client {client_id} joined")

Video Capture Utilities

Helper functions to capture participant video across different transport types.

Capture Participant Camera

async def maybe_capture_participant_camera(
    transport: BaseTransport,
    client: Any,
    framerate: int = 0
):
    """Capture participant camera video if transport supports it.

    Attempts to capture camera video for Daily and SmallWebRTC transports.
    No-op for unsupported transports.

    Parameters:
        transport: The transport instance
        client: Transport-specific client object
        framerate: Video capture framerate (0 = auto)
    """

Capture Participant Screen

async def maybe_capture_participant_screen(
    transport: BaseTransport,
    client: Any,
    framerate: int = 0
):
    """Capture participant screen video if transport supports it.

    Attempts to capture screen share video for Daily and SmallWebRTC transports.
    No-op for unsupported transports.

    Parameters:
        transport: The transport instance
        client: Transport-specific client object
        framerate: Video capture framerate (0 = auto)
    """

Usage:

{ .api }
from pipecat.runner.utils import (
    maybe_capture_participant_camera,
    maybe_capture_participant_screen
)

# In transport event handler
async def on_first_participant_joined(transport, participant):
    # Capture camera if available
    await maybe_capture_participant_camera(transport, participant, framerate=15)

    # Or capture screen share
    await maybe_capture_participant_screen(transport, participant)

WebRTC SDP Utilities

SDP manipulation utilities for WebRTC compatibility with embedded devices like ESP32.

SmallWebRTC SDP Munging

def smallwebrtc_sdp_munging(
    sdp: str,
    host: Optional[str]
) -> str:
    """Apply SDP modifications for SmallWebRTC compatibility.

    Removes unsupported fingerprint algorithms (sha-384, sha-512) and filters
    ICE candidates for ESP32 and embedded device compatibility.

    Parameters:
        sdp: Original SDP string
        host: Host address for ICE candidate filtering (optional)

    Returns:
        Modified SDP string with compatibility fixes applied
    """

Usage:

{ .api }
from pipecat.runner.utils import smallwebrtc_sdp_munging

# When creating SmallWebRTC offer/answer
async def create_offer():
    # Get SDP from peer connection
    offer = await pc.createOffer()

    # Apply SmallWebRTC compatibility fixes
    modified_sdp = smallwebrtc_sdp_munging(
        offer.sdp,
        host="192.168.1.100"  # Your server IP
    )

    # Use modified SDP
    await pc.setLocalDescription(RTCSessionDescription(
        type=offer.type,
        sdp=modified_sdp
    ))

Daily Configuration Helper

Utility for creating Daily rooms and tokens with support for standard video rooms and SIP-enabled rooms for PSTN calling.

async def configure(
    aiohttp_session: aiohttp.ClientSession,
    *,
    api_key: Optional[str] = None,
    room_exp_duration: Optional[float] = 2.0,
    token_exp_duration: Optional[float] = 2.0,
    sip_caller_phone: Optional[str] = None,
    sip_enable_video: Optional[bool] = False,
    sip_num_endpoints: Optional[int] = 1,
    sip_codecs: Optional[Dict[str, List[str]]] = None,
    room_properties: Optional[DailyRoomProperties] = None,
    token_properties: Optional[DailyMeetingTokenProperties] = None,
) -> DailyRoomConfig:
    """Configure Daily room URL and token with optional SIP capabilities.

    Either uses existing room from DAILY_SAMPLE_ROOM_URL environment variable
    or creates a new temporary room automatically.

    Parameters:
        aiohttp_session: HTTP session for making API requests
        api_key: Daily API key (uses DAILY_API_KEY env var if not provided)
        room_exp_duration: Room expiration time in hours (default: 2.0)
        token_exp_duration: Token expiration time in hours (default: 2.0)
        sip_caller_phone: Phone number for SIP display name. When provided, enables SIP
        sip_enable_video: Whether video is enabled for SIP (default: False)
        sip_num_endpoints: Number of allowed SIP endpoints (default: 1)
        sip_codecs: Audio/video codecs to support (e.g., {"audio": ["OPUS"]})
        room_properties: Custom DailyRoomProperties (overrides other parameters)
        token_properties: Custom DailyMeetingTokenProperties

    Returns:
        DailyRoomConfig with room_url, token, and optional sip_endpoint

    Environment Variables:
        DAILY_API_KEY: Required for creating rooms and tokens
        DAILY_SAMPLE_ROOM_URL: Optional existing room URL (standard mode only)
        DAILY_API_URL: Optional API URL (default: https://api.daily.co/v1)
    """

Usage:

{ .api }
import aiohttp
from pipecat.runner.daily import configure
from pipecat.transports.daily.transport import DailyTransport, DailyParams

async def setup_daily_bot():
    async with aiohttp.ClientSession() as session:
        # Standard room
        room_url, token = await configure(session)

        # SIP-enabled room for phone calls
        sip_config = await configure(
            session,
            sip_caller_phone="+15551234567",
            sip_enable_video=False,
        )
        print(f"Dial into: {sip_config.sip_endpoint}")

        # Custom room properties
        from pipecat.transports.daily.utils import DailyRoomProperties

        custom_config = await configure(
            session,
            room_properties=DailyRoomProperties(
                enable_recording="cloud",
                max_participants=2,
                start_audio_off=False,
            )
        )

        # Create transport
        transport = DailyTransport(
            room_url,
            token,
            "My Bot",
            DailyParams(audio_in_enabled=True, audio_out_enabled=True)
        )

LiveKit Configuration Helper

Utility for generating LiveKit access tokens with proper room permissions for agents.

async def configure() -> tuple[str, str, str]:
    """Configure LiveKit room URL and token from environment.

    Returns:
        Tuple containing (url, token, room_name)

    Environment Variables:
        LIVEKIT_URL: LiveKit server URL (required)
        LIVEKIT_ROOM_NAME: Room name to join (required)
        LIVEKIT_API_KEY: LiveKit API key (required)
        LIVEKIT_API_SECRET: LiveKit API secret (required)

    Raises:
        Exception: If required environment variables are not set
    """

def generate_token(
    room_name: str,
    participant_name: str,
    api_key: str,
    api_secret: str
) -> str:
    """Generate a LiveKit access token for a participant.

    Parameters:
        room_name: Name of the LiveKit room
        participant_name: Name of the participant
        api_key: LiveKit API key
        api_secret: LiveKit API secret

    Returns:
        JWT token string for room access
    """

def generate_token_with_agent(
    room_name: str,
    participant_name: str,
    api_key: str,
    api_secret: str
) -> str:
    """Generate a LiveKit access token for an agent participant.

    Parameters:
        room_name: Name of the LiveKit room
        participant_name: Name of the participant
        api_key: LiveKit API key
        api_secret: LiveKit API secret

    Returns:
        JWT token string with agent grants
    """

Usage:

{ .api }
from pipecat.runner.livekit import configure, generate_token
from pipecat.transports.livekit.transport import LiveKitTransport, LiveKitParams

async def setup_livekit_bot():
    # Configure from environment
    url, token, room_name = await configure()

    # Create transport
    transport = LiveKitTransport(
        url,
        token,
        room_name,
        LiveKitParams(audio_in_enabled=True, audio_out_enabled=True)
    )

    # Generate custom tokens
    agent_token = generate_token_with_agent(
        "my-room",
        "Bot Agent",
        api_key,
        api_secret
    )

    user_token = generate_token(
        "my-room",
        "Test User",
        api_key,
        api_secret
    )

Development Runner

Command-line development server that automatically discovers bot functions and provides FastAPI endpoints for different transport types.

Bot Function Interface:

{ .api }
async def bot(runner_args: RunnerArguments):
    """Entry point for your bot - discovered automatically by the runner.

    Parameters:
        runner_args: Transport-specific arguments (DailyRunnerArguments,
                     WebSocketRunnerArguments, or SmallWebRTCRunnerArguments)
    """
    # Your bot implementation
    pass

Running the Development Server:

# Install runner dependencies
pip install pipecat-ai[runner]

# WebRTC (local development)
python bot.py -t webrtc
# Opens http://localhost:7860/client

# WebRTC for ESP32 devices
python bot.py -t webrtc --esp32 --host 192.168.1.100

# Daily (server mode with web UI)
python bot.py -t daily
# Opens http://localhost:7860

# Daily (direct connection for testing)
python bot.py -d
# Connects directly to Daily room

# Telephony (Twilio)
python bot.py -t twilio -x your-bot.ngrok.io

# Telephony (Telnyx)
python bot.py -t telnyx -x your-bot.ngrok.io

# Daily with PSTN dial-in
python bot.py -t daily --dialin
# Webhook: http://localhost:7860/daily-dialin-webhook

Command-Line Options:

  • --host: Server host address (default: localhost)
  • --port: Server port (default: 7860)
  • -t/--transport: Transport type (daily, webrtc, twilio, telnyx, plivo, exotel)
  • -x/--proxy: Public proxy hostname for telephony webhooks
  • --esp32: Enable ESP32 compatibility mode
  • -d/--direct: Connect directly to Daily without server
  • --dialin: Enable Daily PSTN dial-in webhook
  • -f/--folder: Downloads folder path
  • -v/--verbose: Increase logging verbosity

Example Bot:

{ .api }
# bot.py
from pipecat.runner.types import DailyRunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner

# Define transport parameters
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    )
}

async def bot(runner_args: DailyRunnerArguments):
    """Bot entry point - discovered by runner."""

    # Create transport automatically
    transport = await create_transport(runner_args, transport_params)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),
        # ... your processors ...
        transport.output(),
    ])

    # Run with signal handling from runner_args
    task = PipelineTask(pipeline)
    runner = PipelineRunner(
        handle_sigint=runner_args.handle_sigint,
        handle_sigterm=runner_args.handle_sigterm,
    )
    await runner.run(task)

if __name__ == "__main__":
    from pipecat.runner.run import main
    main()

Deployment Patterns

Production Deployment with Docker

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy bot code
COPY bot.py .

# Expose port for webhooks
EXPOSE 7860

# Run bot with proper signal handling
CMD ["python", "bot.py"]

docker-compose.yml:

version: '3.8'
services:
  bot:
    build: .
    environment:
      - DAILY_API_KEY=${DAILY_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    ports:
      - "7860:7860"
    restart: unless-stopped
    # Ensure proper signal handling
    stop_signal: SIGTERM
    stop_grace_period: 30s

Production Bot with Error Handling:

{ .api }
from pipecat.pipeline.runner import PipelineRunner
from loguru import logger
import asyncio

async def production_bot(runner_args):
    """Production bot with comprehensive error handling."""

    try:
        # Create transport
        transport = await create_transport(runner_args, transport_params)

        # Build pipeline with error handlers
        pipeline = Pipeline([
            transport.input(),
            # ... processors with error handling ...
            transport.output(),
        ])

        # Create task and runner
        task = PipelineTask(pipeline)
        runner = PipelineRunner(
            name="production-bot",
            handle_sigint=True,
            handle_sigterm=True,  # Critical for Docker/K8s
            force_gc=True,
        )

        logger.info("Bot starting")
        await runner.run(task)
        logger.info("Bot completed successfully")

    except asyncio.CancelledError:
        logger.info("Bot cancelled gracefully")
        raise
    except Exception as e:
        logger.error(f"Bot error: {e}")
        raise
    finally:
        # Cleanup resources
        logger.info("Bot cleanup complete")

Kubernetes Deployment

deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pipecat-bot
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pipecat-bot
  template:
    metadata:
      labels:
        app: pipecat-bot
    spec:
      containers:
      - name: bot
        image: your-registry/pipecat-bot:latest
        ports:
        - containerPort: 7860
        env:
        - name: DAILY_API_KEY
          valueFrom:
            secretKeyRef:
              name: bot-secrets
              key: daily-api-key
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: bot-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 7860
          initialDelaySeconds: 30
          periodSeconds: 10
        # Allow graceful shutdown
        terminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: pipecat-bot
spec:
  selector:
    app: pipecat-bot
  ports:
  - port: 80
    targetPort: 7860
  type: LoadBalancer

SIP/PSTN Dial-in Integration

Daily Dial-in Setup:

{ .api }
from pipecat.runner.types import DailyRunnerArguments, DailyDialinRequest
from pipecat.runner.daily import configure
import aiohttp

async def dialin_handler(runner_args: DailyRunnerArguments):
    """Handle incoming PSTN dial-in calls."""

    # Parse dial-in request
    if runner_args.body:
        dialin_request = DailyDialinRequest(**runner_args.body)
        settings = dialin_request.dialin_settings

        logger.info(f"Incoming call from: {settings.From}")
        logger.info(f"To number: {settings.To}")
        logger.info(f"Call ID: {settings.call_id}")

        # Access SIP headers
        if settings.sip_headers:
            custom_data = settings.sip_headers.get("X-Custom-Data")

    # Create transport and run bot
    transport = await create_transport(runner_args, transport_params)
    # ... build and run pipeline ...

# Configure Daily phone number webhook
# POST to: https://your-domain.com/daily-dialin-webhook

Creating SIP-Enabled Rooms:

{ .api }
import aiohttp
from pipecat.runner.daily import configure

async def setup_dialin():
    async with aiohttp.ClientSession() as session:
        # Create SIP-enabled room
        config = await configure(
            session,
            sip_caller_phone="+15551234567",
            sip_enable_video=False,
            sip_num_endpoints=1,
            sip_codecs={"audio": ["OPUS", "PCMU"]},
        )

        print(f"Room URL: {config.room_url}")
        print(f"SIP endpoint: {config.sip_endpoint}")
        print(f"Token: {config.token}")

        # Configure your phone number to forward to sip_endpoint

Telephony Provider Integration

Multi-Provider Telephony Bot:

{ .api }
from pipecat.runner.utils import parse_telephony_websocket, create_transport
from pipecat.runner.types import WebSocketRunnerArguments

async def telephony_bot(runner_args: WebSocketRunnerArguments):
    """Universal telephony bot supporting multiple providers."""

    # Parse provider and call data
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )

    logger.info(f"Call from {transport_type} provider")

    # Handle provider-specific data
    if transport_type == "twilio":
        call_id = call_data["call_id"]
        custom_params = call_data["body"]
        user_id = custom_params.get("user_id")

    elif transport_type == "telnyx":
        call_control_id = call_data["call_control_id"]
        from_number = call_data["from"]
        to_number = call_data["to"]

    elif transport_type == "plivo":
        stream_id = call_data["stream_id"]

    elif transport_type == "exotel":
        account_sid = call_data["account_sid"]

    # Create transport (automatically configured for provider)
    transport = await create_transport(runner_args, transport_params)

    # Build and run pipeline
    pipeline = Pipeline([transport.input(), /* ... */, transport.output()])
    task = PipelineTask(pipeline)
    runner = PipelineRunner(handle_sigint=False)  # No signals in webhook context
    await runner.run(task)

Scaling Considerations

Horizontal Scaling:

{ .api }
# Use stateless bot design for horizontal scaling
async def stateless_bot(runner_args):
    """Stateless bot that can scale horizontally."""

    # Load user context from external store
    user_id = runner_args.body.get("user_id")
    user_context = await load_from_redis(user_id)

    # Build pipeline with loaded context
    # ... your pipeline ...

    # Save state back to external store
    await save_to_redis(user_id, updated_context)

Load Balancing:

# nginx.conf for load balancing
upstream pipecat_bots {
    least_conn;  # Use least connections
    server bot1:7860;
    server bot2:7860;
    server bot3:7860;
}

server {
    listen 80;
    location / {
        proxy_pass http://pipecat_bots;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
}

Resource Management:

{ .api }
from pipecat.pipeline.runner import PipelineRunner

async def resource_managed_bot(runner_args):
    """Bot with explicit resource management."""

    runner = PipelineRunner(
        force_gc=True,  # Force garbage collection
        handle_sigterm=True,  # Handle container termination
    )

    # Use context managers for resources
    async with aiohttp.ClientSession() as session:
        # Your bot logic
        pass

    await runner.run(task)

Complete Usage Examples

Example 1: Multi-Transport Bot with Runner

{ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.runner.types import (
    DailyRunnerArguments,
    WebSocketRunnerArguments,
    SmallWebRTCRunnerArguments,
)
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.audio.vad.silero import SileroVADAnalyzer

# Define transport parameters for all supported transports
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "telnyx": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}

async def run_bot(runner_args):
    """Universal bot function that works with any transport type."""

    # Create transport automatically based on runner_args type
    transport = await create_transport(runner_args, transport_params)

    # Build your pipeline
    pipeline = Pipeline([
        transport.input(),
        # ... your processors ...
        transport.output(),
    ])

    # Create and run task
    task = PipelineTask(pipeline)

    # Run until pipeline completes
    await task.run()

# Use with Daily
daily_args = DailyRunnerArguments(
    room_url="https://example.daily.co/room",
    token="your-token"
)
await run_bot(daily_args)

# Use with WebSocket/Telephony (runner will detect provider)
# Called from FastAPI WebSocket endpoint
@app.websocket("/telephony")
async def telephony_endpoint(websocket: WebSocket):
    await websocket.accept()
    ws_args = WebSocketRunnerArguments(websocket=websocket)
    await run_bot(ws_args)

Example 2: Telephony Bot with Call Data

{ .api }
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import parse_telephony_websocket, create_transport
from fastapi import WebSocket

async def telephony_bot(runner_args: WebSocketRunnerArguments):
    # Parse telephony provider and call data
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )

    # Access provider-specific data
    if transport_type == "twilio":
        call_id = call_data["call_id"]
        stream_id = call_data["stream_id"]
        custom_params = call_data["body"]

        # Use custom parameters from your webhook
        user_id = custom_params.get("user_id")
        bot_personality = custom_params.get("personality", "default")

        print(f"Twilio call {call_id} for user {user_id}")

    elif transport_type == "telnyx":
        print(f"Telnyx call from {call_data['from']} to {call_data['to']}")

    # Create transport (automatically configured for provider)
    transport = await create_transport(runner_args, transport_params)

    # ... build and run pipeline ...

@app.websocket("/twilio")
async def twilio_endpoint(websocket: WebSocket):
    await websocket.accept()
    await telephony_bot(WebSocketRunnerArguments(websocket=websocket))

Example 3: Daily Dial-in Integration

{ .api }
from pipecat.runner.types import DailyRunnerArguments, DailyDialinRequest
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    )
}

async def dialin_bot(runner_args: DailyRunnerArguments):
    """Handle incoming PSTN dial-in calls."""

    # Parse dial-in request from runner_args.body
    if runner_args.body:
        dialin_request = DailyDialinRequest(**runner_args.body)
        settings = dialin_request.dialin_settings

        print(f"Incoming dial-in call")
        print(f"  From: {settings.From}")
        print(f"  To: {settings.To}")
        print(f"  Call ID: {settings.call_id}")

        # Access SIP headers if available
        if settings.sip_headers:
            caller_name = settings.sip_headers.get("X-Caller-Name")
            print(f"  Caller: {caller_name}")

        # Use Daily API credentials for call control
        api_key = dialin_request.daily_api_key
        api_url = dialin_request.daily_api_url

    # Create Daily transport
    transport = await create_transport(runner_args, transport_params)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),
        # ... your processors ...
        transport.output(),
    ])

    # Run pipeline
    task = PipelineTask(pipeline)
    runner = PipelineRunner(
        handle_sigint=runner_args.handle_sigint,
        handle_sigterm=runner_args.handle_sigterm,
    )
    await runner.run(task)

Example 4: Production Bot with Error Handling

{ .api }
from pipecat.runner.types import DailyRunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.daily.transport import DailyParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.audio.vad.silero import SileroVADAnalyzer
from loguru import logger
import asyncio

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    )
}

async def production_bot(runner_args: DailyRunnerArguments):
    """Production-ready bot with comprehensive error handling."""

    try:
        logger.info("Starting production bot")

        # Create transport with error handling
        try:
            transport = await create_transport(runner_args, transport_params)
        except Exception as e:
            logger.error(f"Failed to create transport: {e}")
            raise

        # Register transport event handlers
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            logger.info(f"Participant joined: {participant['id']}")
            await transport.capture_participant_video(participant["id"])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            logger.info(f"Participant left: {participant['id']}, reason: {reason}")

        @transport.event_handler("on_dialin_ready")
        async def on_dialin_ready(transport, cdata):
            logger.info("PSTN call connected and ready")

        # Build pipeline with error handlers
        pipeline = Pipeline([
            transport.input(),
            # ... your processors with error handling ...
            transport.output(),
        ])

        # Create task and runner
        task = PipelineTask(pipeline)
        runner = PipelineRunner(
            name="production-bot",
            handle_sigint=runner_args.handle_sigint,
            handle_sigterm=runner_args.handle_sigterm,
            force_gc=True,  # Clean up resources
        )

        logger.info("Bot ready, starting pipeline")
        await runner.run(task)
        logger.info("Bot completed successfully")

    except asyncio.CancelledError:
        logger.info("Bot cancelled gracefully")
        raise
    except Exception as e:
        logger.error(f"Bot error: {e}", exc_info=True)
        raise
    finally:
        logger.info("Bot cleanup complete")

if __name__ == "__main__":
    # For direct execution
    from pipecat.runner.run import main
    main()

Example 5: Multi-Transport Universal Bot

{ .api }
from pipecat.runner.types import (
    RunnerArguments,
    DailyRunnerArguments,
    WebSocketRunnerArguments,
    SmallWebRTCRunnerArguments,
)
from pipecat.runner.utils import create_transport, parse_telephony_websocket
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.transports.smallwebrtc.params import TransportParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from loguru import logger

# Define transport parameters for all supported transports
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "telnyx": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "plivo": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "exotel": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}

async def universal_bot(runner_args: RunnerArguments):
    """Universal bot that works with any transport type."""

    # Log transport type
    if isinstance(runner_args, DailyRunnerArguments):
        logger.info(f"Starting Daily bot: {runner_args.room_url}")
    elif isinstance(runner_args, SmallWebRTCRunnerArguments):
        logger.info("Starting WebRTC bot")
    elif isinstance(runner_args, WebSocketRunnerArguments):
        # Detect telephony provider
        transport_type, call_data = await parse_telephony_websocket(
            runner_args.websocket
        )
        logger.info(f"Starting {transport_type} telephony bot")

        # Log provider-specific info
        if transport_type == "twilio":
            logger.info(f"Call ID: {call_data['call_id']}")
        elif transport_type == "telnyx":
            logger.info(f"From: {call_data['from']} To: {call_data['to']}")

    # Create transport automatically
    transport = await create_transport(runner_args, transport_params)

    # Build your pipeline (same for all transports)
    pipeline = Pipeline([
        transport.input(),
        # ... your processors work with any transport ...
        transport.output(),
    ])

    # Create and run task
    task = PipelineTask(pipeline)
    runner = PipelineRunner(
        handle_sigint=runner_args.handle_sigint,
        handle_sigterm=runner_args.handle_sigterm,
    )

    await runner.run(task)

# Run with development runner
if __name__ == "__main__":
    from pipecat.runner.run import main
    main()

# Usage:
# python bot.py -t daily          # Daily server mode
# python bot.py -d                # Daily direct mode
# python bot.py -t webrtc         # WebRTC local
# python bot.py -t twilio -x ngrok.io  # Twilio
# python bot.py -t telnyx -x ngrok.io  # Telnyx