Real-World Scenarios

Pipecat is an open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols. The scenarios below combine those pieces into complete, end-to-end examples.

Voice Assistant Application

Complete voice assistant with interruptions and function calling:

{ .api }
import asyncio
import os
from typing import Dict, Any

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextAggregatorPair,
)
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

async def get_weather(location: str, units: str = "celsius") -> Dict[str, Any]:
    """Get weather for location."""
    # In production, call actual weather API
    return {
        "location": location,
        "temperature": 72,
        "units": units,
        "condition": "sunny"
    }

async def main():
    # Configure VAD
    vad = SileroVADAnalyzer(
        params=VADParams(
            threshold=0.5,
            min_speech_duration_ms=250,
        )
    )
    
    # Set up transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Weather Assistant",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_enabled=True,
            vad_analyzer=vad,
        )
    )
    
    # Configure services
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"),
        model="nova-2",
        interim_results=True,
    )
    
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4",
        params={"temperature": 0.7}
    )
    
    tts = OpenAITTSService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
    )
    
    # Set up context with function
    context = LLMContext(
        messages=[{
            "role": "system",
            "content": "You are a helpful weather assistant."
        }]
    )
    
    aggregators = LLMContextAggregatorPair(context=context)
    llm.set_context(context)
    
    # Register function
    llm.register_function(
        name="get_weather",
        handler=get_weather,
        description="Get current weather",
        properties={
            "location": {"type": "string", "description": "City name"},
            "units": {"type": "string", "enum": ["celsius", "fahrenheit"]}
        },
        required=["location"]
    )
    
    # Build pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        aggregators.user,
        llm,
        aggregators.assistant,
        tts,
        transport.output()
    ])
    
    # Run
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
        )
    )
    
    runner = PipelineRunner(
        name="weather-assistant",
        handle_sigint=True,
        handle_sigterm=True,
    )
    
    try:
        await runner.run(task)
    finally:
        await transport.stop()

if __name__ == "__main__":
    asyncio.run(main())
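
To have the assistant speak first instead of waiting for the caller, a transport event handler can queue a frame once someone joins. A minimal sketch, assuming the Daily transport's on_first_participant_joined event and pipecat's TTSSpeakFrame; it would be registered inside main() after the task is created:

from pipecat.frames.frames import TTSSpeakFrame

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # Speak a greeting as soon as the user joins the room.
    await task.queue_frame(TTSSpeakFrame("Hi! Ask me about the weather anywhere."))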

Multimodal Agent

Agent with vision capabilities:

{ .api }
import io
import os

from PIL import Image

from pipecat.frames.frames import ImageRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.services.moondream import MoondreamService
from pipecat.services.openai import OpenAILLMService

async def multimodal_agent():
    """Multimodal agent with vision."""
    
    # Vision service
    vision = MoondreamService()
    
    # Multimodal LLM
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4-vision-preview",
    )
    
    # Load and JPEG-encode an image
    image = Image.open("photo.jpg")
    image_bytes = io.BytesIO()
    image.save(image_bytes, format="JPEG")
    
    # Create an image frame from the encoded bytes
    frame = ImageRawFrame(
        image=image_bytes.getvalue(),
        size=(image.width, image.height),
        format="JPEG"
    )
    
    # Pipeline with vision. The transport, stt, aggregators, and tts
    # services are configured as in the voice assistant example above.
    pipeline = Pipeline([
        transport.input(),
        vision,              # Process images
        stt,                 # Process audio
        aggregators.user,
        llm,                 # Multimodal LLM
        aggregators.assistant,
        tts,
        transport.output()
    ])
    
    # Queue the image into the pipeline task
    task = PipelineTask(pipeline)
    await task.queue_frame(frame)
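
Large images add latency and cost with vision models. A small pre-processing helper (plain PIL, no pipecat-specific API) can cap the resolution before the frame is built; the size and quality limits here are illustrative:

import io
from PIL import Image

def encode_for_vision(path: str, max_side: int = 1024, quality: int = 85):
    """Downscale and JPEG-encode an image before wrapping it in an ImageRawFrame."""
    image = Image.open(path).convert("RGB")
    image.thumbnail((max_side, max_side))  # preserves aspect ratio
    buffer = io.BytesIO()
    image.save(buffer, format="JPEG", quality=quality)
    return buffer.getvalue(), (image.width, image.height)

# Usage:
#   data, size = encode_for_vision("photo.jpg")
#   frame = ImageRawFrame(image=data, size=size, format="JPEG")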

Telephony Bot

Bot for phone calls with IVR:

{ .api }
from fastapi import FastAPI, WebSocket
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import create_transport, parse_telephony_websocket
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.extensions.ivr.ivr_navigator import IVRNavigator, IVRMenu, IVROption
from pipecat.audio.dtmf.types import KeypadEntry

app = FastAPI()

# Define transport params per telephony provider
transport_params = {
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    )
}

async def telephony_bot(runner_args: WebSocketRunnerArguments):
    """Handle telephony calls."""
    
    # Detect the telephony provider and extract call metadata
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )
    print(f"Call from {transport_type}")
    
    # Create transport for the detected provider
    transport = await create_transport(runner_args, transport_params)
    
    # Configure IVR
    menu = IVRMenu(
        id="main",
        prompt_keywords=["main menu"],
        options=[
            IVROption(digit=KeypadEntry.ONE, description="Sales"),
            IVROption(digit=KeypadEntry.TWO, description="Support"),
        ]
    )
    
    navigator = IVRNavigator(menu=menu)
    
    # Build pipeline. The stt, llm, tts, and aggregators services are
    # configured as in the voice assistant example above.
    pipeline = Pipeline([
        transport.input(),
        stt,
        navigator,           # IVR navigation
        aggregators.user,
        llm,
        aggregators.assistant,
        tts,
        transport.output()
    ])
    
    task = PipelineTask(pipeline)
    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)

@app.websocket("/twilio")
async def twilio_endpoint(websocket: WebSocket):
    await websocket.accept()
    await telephony_bot(WebSocketRunnerArguments(websocket=websocket))
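
For Twilio, the server also needs an HTTP webhook that answers the incoming call and tells Twilio to stream the call audio to the /twilio WebSocket. A sketch using Twilio's <Connect><Stream> TwiML; the route name and public hostname are placeholders to adapt to your deployment:

from fastapi import Response

@app.post("/incoming-call")
async def incoming_call():
    # TwiML that connects the call's audio stream to the WebSocket endpoint above.
    twiml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<Response>"
        '<Connect><Stream url="wss://your-server.example.com/twilio" /></Connect>'
        "</Response>"
    )
    return Response(content=twiml, media_type="application/xml")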

Multi-Provider LLM Switching

Switch between LLM providers dynamically:

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.llm_switcher import LLMSwitcher
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
from pipecat.services.openai import OpenAILLMService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.google import GoogleLLMService
from pipecat.frames.frames import ManuallySwitchServiceFrame

async def multi_llm_bot():
    """Bot with multiple LLM options."""
    
    # Create LLM switcher with a manual switching strategy
    llm_switcher = LLMSwitcher(
        llms=[
            OpenAILLMService(api_key="...", model="gpt-4"),
            AnthropicLLMService(api_key="...", model="claude-3-5-sonnet-20241022"),
            GoogleLLMService(api_key="...", model="gemini-pro"),
        ],
        strategy=ServiceSwitcherStrategyManual(default_index=0)
    )
    
    # Register the same function on all LLMs at once
    llm_switcher.register_function(
        function_name="get_weather",
        handler=get_weather,
        description="Get weather",
        properties={"location": {"type": "string"}}
    )
    
    # Use in pipeline. The transport, stt, tts, and aggregators services
    # are configured as in the voice assistant example above.
    pipeline = Pipeline([
        transport.input(),
        stt,
        aggregators.user,
        llm_switcher,  # Can switch between LLMs
        aggregators.assistant,
        tts,
        transport.output()
    ])
    
    # Switch to the second LLM (Anthropic) at runtime
    task = PipelineTask(pipeline)
    await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))
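
The manual switch frame can be driven by whatever trigger the application already has. A small helper built only on the calls shown above; the helper and its name mapping are hypothetical, not part of pipecat:

# Map friendly names to positions in the llms list above.
PROVIDER_INDEX = {"openai": 0, "anthropic": 1, "google": 2}

async def switch_provider(task, name: str):
    """Queue a manual switch to the provider registered under `name`."""
    await task.queue_frame(
        ManuallySwitchServiceFrame(service_index=PROVIDER_INDEX[name])
    )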

Production Deployment

Docker Deployment

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY bot.py .

EXPOSE 7860

CMD ["python", "bot.py"]
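
The Dockerfile runs bot.py directly and exposes port 7860, so the script has to start its own server when the bot is reached over HTTP/WebSocket (as in the telephony example). A minimal entrypoint sketch, assuming the bot module exposes a FastAPI app; a Daily-only bot that just dials into a room would not need the exposed port:

# End of bot.py: serve the FastAPI app on the port exposed in the Dockerfile.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)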

docker-compose.yml:

version: '3.8'
services:
  bot:
    build: .
    environment:
      - DAILY_API_KEY=${DAILY_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
    ports:
      - "7860:7860"
    restart: unless-stopped
    stop_signal: SIGTERM
    stop_grace_period: 30s

Production Bot Pattern

{ .api }
import asyncio
import os

from loguru import logger

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.transports.daily import DailyTransport, DailyParams

async def production_bot():
    """Production-ready bot with error handling."""
    
    try:
        # Configure services with environment variables
        transport = DailyTransport(
            room_url=os.getenv("DAILY_ROOM_URL"),
            token=os.getenv("DAILY_TOKEN"),
            bot_name=os.getenv("BOT_NAME", "Assistant"),
            params=DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            )
        )
        
        # Build pipeline. The stt, llm, tts, and aggregators services are
        # configured as in the voice assistant example above.
        pipeline = Pipeline([
            transport.input(),
            stt,
            aggregators.user,
            llm,
            aggregators.assistant,
            tts,
            transport.output()
        ])
        
        # Run with signal handling
        task = PipelineTask(pipeline)
        runner = PipelineRunner(
            name="production-bot",
            handle_sigint=True,
            handle_sigterm=True,
            force_gc=True,
        )
        
        logger.info("Bot starting")
        await runner.run(task)
        logger.info("Bot completed")
        
    except asyncio.CancelledError:
        logger.info("Bot cancelled")
        raise
    except Exception as e:
        logger.error(f"Bot error: {e}")
        raise
    finally:
        logger.info("Cleanup complete")

if __name__ == "__main__":
    asyncio.run(production_bot())
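
A process manager normally restarts a crashed container, but an in-process retry loop with backoff can also smooth over transient failures such as brief network loss. A sketch around production_bot; the attempt count and delays are arbitrary:

async def run_with_backoff(max_attempts: int = 5):
    """Restart the bot with exponential backoff after unexpected failures."""
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            await production_bot()
            return  # clean exit, no restart needed
        except asyncio.CancelledError:
            raise  # shutdown requested, do not restart
        except Exception:
            logger.warning(f"Bot crashed (attempt {attempt}/{max_attempts}); retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60.0)

If used, run_with_backoff would replace production_bot in the __main__ guard above.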

See Also

  • Common Patterns - Reusable code patterns
  • Edge Cases - Advanced scenarios
  • Configuration Reference - All configuration options
  • Error Handling - Error patterns and recovery