docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Complete voice assistant with interruptions and function calling:
{ .api }
import asyncio
import os
from typing import Dict, Any
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextAggregatorPair,
)
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
async def get_weather(location: str, units: str = "celsius") -> Dict[str, Any]:
    """Return a canned weather report for ``location``.

    This is a stub for the example: it performs no network call and always
    reports the same sunny conditions.

    Args:
        location: Name of the place to report on; echoed back unchanged.
        units: ``"celsius"`` (default) or ``"fahrenheit"``, case-insensitive.
            Any other value falls back to ``"celsius"``.

    Returns:
        A dict with ``location``, ``temperature``, ``units`` and ``condition``
        keys. ``temperature`` is expressed in the returned ``units``.
    """
    # In production, call actual weather API
    temperature_celsius = 22

    unit = units.lower()
    if unit == "fahrenheit":
        # 22 C -> 71.6 F, reported as a whole number.
        temperature = round(temperature_celsius * 9 / 5 + 32)
    else:
        unit = "celsius"
        temperature = temperature_celsius

    return {
        "location": location,
        "temperature": temperature,
        "units": unit,
        "condition": "sunny",
    }
async def main():
    """Run the weather voice assistant over a Daily transport.

    Builds a pipeline of transport input, Deepgram STT, user context
    aggregation, an OpenAI LLM with ``get_weather`` registered, assistant
    context aggregation, OpenAI TTS and transport output, then runs it with a
    ``PipelineRunner``. Room details and API keys are read from environment
    variables.
    """
    # Configure VAD
    vad = SileroVADAnalyzer(
        params=VADParams(
            threshold=0.5,
            min_speech_duration_ms=250,
        )
    )

    # Set up transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Weather Assistant",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_enabled=True,
            vad_analyzer=vad,
        ),
    )

    # Configure services
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"),
        model="nova-2",
        interim_results=True,
    )
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4",
        params={"temperature": 0.7},
    )
    tts = OpenAITTSService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
    )

    # Set up context with function
    context = LLMContext(
        messages=[
            {
                "role": "system",
                "content": "You are a helpful weather assistant.",
            }
        ]
    )
    aggregators = LLMContextAggregatorPair(context=context)
    llm.set_context(context)

    # Register function. The keyword is ``function_name`` so this call agrees
    # with the LLM switcher example later in this document.
    llm.register_function(
        function_name="get_weather",
        handler=get_weather,
        description="Get current weather",
        properties={
            "location": {"type": "string", "description": "City name"},
            "units": {"type": "string", "enum": ["celsius", "fahrenheit"]},
        },
        required=["location"],
    )

    # Build pipeline. ``user()`` / ``assistant()`` are called so the pipeline
    # receives the aggregator processors, not the accessor methods.
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            aggregators.user(),
            llm,
            aggregators.assistant(),
            tts,
            transport.output(),
        ]
    )

    # Run
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
        ),
    )
    runner = PipelineRunner(
        name="weather-assistant",
        handle_sigint=True,
        handle_sigterm=True,
    )
    try:
        await runner.run(task)
    finally:
        # Stop the transport even if the run ends with an exception.
        await transport.stop()
if __name__ == "__main__":
asyncio.run(main())Agent with vision capabilities:
{ .api }
from pipecat.services.openai import OpenAILLMService
from pipecat.services.moondream import MoondreamService
from pipecat.frames.frames import ImageRawFrame
from PIL import Image
import io
async def multimodal_agent():
"""Multimodal agent with vision."""
# Vision service
vision = MoondreamService()
# Multimodal LLM
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4-vision-preview",
)
# Process image
image = Image.open("photo.jpg")
image_bytes = io.BytesIO()
image.save(image_bytes, format="JPEG")
# Create image frame
frame = ImageRawFrame(
image=image_bytes.getvalue(),
size=(image.width, image.height),
format="JPEG"
)
# Pipeline with vision
pipeline = Pipeline([
transport.input(),
vision, # Process images
stt, # Process audio
aggregators.user,
llm, # Multimodal LLM
aggregators.assistant,
tts,
transport.output()
])
await task.queue_frame(frame)Bot for phone calls with IVR:
{ .api }
from fastapi import FastAPI, WebSocket
from pipecat.runner.types import WebSocketRunnerArguments
from pipecat.runner.utils import create_transport, parse_telephony_websocket
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.extensions.ivr.ivr_navigator import IVRNavigator, IVRMenu, IVROption
from pipecat.audio.dtmf.types import KeypadEntry
app = FastAPI()


def _twilio_transport_params():
    """Build the FastAPI websocket transport settings for the "twilio" entry."""
    return FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True)


# Define transport params: transport name -> zero-argument factory, so the
# parameters are only constructed when that transport is selected.
transport_params = {"twilio": _twilio_transport_params}
async def telephony_bot(runner_args: WebSocketRunnerArguments):
    """Handle telephony calls.

    Detects the telephony provider from the websocket, creates the matching
    transport, and runs a pipeline that includes an IVR navigator.

    Args:
        runner_args: Runner arguments carrying the accepted websocket.

    NOTE(review): ``stt``, ``llm``, ``aggregators`` and ``tts`` are not defined
    in this snippet; presumably they are built as in the voice assistant
    example - confirm before running.
    """
    # Parse provider
    # NOTE(review): ``create_transport`` may parse the websocket handshake
    # itself; if so, this separate parse would consume those messages first.
    # Verify against the runner utilities before relying on both calls.
    transport_type, _call_data = await parse_telephony_websocket(
        runner_args.websocket
    )
    print(f"Call from {transport_type}")

    # Create transport
    transport = await create_transport(runner_args, transport_params)

    # Configure IVR
    menu = IVRMenu(
        id="main",
        prompt_keywords=["main menu"],
        options=[
            IVROption(digit=KeypadEntry.ONE, description="Sales"),
            IVROption(digit=KeypadEntry.TWO, description="Support"),
        ],
    )
    navigator = IVRNavigator(menu=menu)

    # Build pipeline
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            navigator,  # IVR navigation
            aggregators.user(),
            llm,
            aggregators.assistant(),
            tts,
            transport.output(),
        ]
    )
    task = PipelineTask(pipeline)

    # Run through a PipelineRunner like the other examples. Signal handling is
    # disabled because this coroutine runs inside the web server, which owns
    # process signals.
    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)
@app.websocket("/twilio")
async def twilio_endpoint(websocket: WebSocket):
await websocket.accept()
await telephony_bot(WebSocketRunnerArguments(websocket=websocket))Switch between LLM providers dynamically:
{ .api }
from pipecat.pipeline.llm_switcher import LLMSwitcher
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyManual
from pipecat.services.openai import OpenAILLMService
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.google import GoogleLLMService
from pipecat.frames.frames import ManuallySwitchServiceFrame
async def multi_llm_bot():
"""Bot with multiple LLM options."""
# Create LLM switcher
llm_switcher = LLMSwitcher(
llms=[
OpenAILLMService(api_key="...", model="gpt-4"),
AnthropicLLMService(api_key="...", model="claude-3-5-sonnet-20241022"),
GoogleLLMService(api_key="...", model="gemini-pro"),
],
strategy=ServiceSwitcherStrategyManual(default_index=0)
)
# Register function on all LLMs
llm_switcher.register_function(
function_name="get_weather",
handler=get_weather,
description="Get weather",
properties={"location": {"type": "string"}}
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
aggregators.user,
llm_switcher, # Can switch between LLMs
aggregators.assistant,
tts,
transport.output()
])
# Switch LLM at runtime
await task.queue_frame(ManuallySwitchServiceFrame(service_index=1))Dockerfile:
# Slim Python base image for the bot.
FROM python:3.11-slim
WORKDIR /app

# Install dependencies before copying the bot so this layer is reused when
# only bot.py changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY bot.py .

# Port published by the compose file below.
EXPOSE 7860

# Exec form: python runs as the container's main process and receives the
# stop signal directly.
CMD ["python", "bot.py"]

docker-compose.yml:
version: '3.8'
services:
  bot:
    build: .
    environment:
      - DAILY_API_KEY=${DAILY_API_KEY}
      # Read by the bot examples via os.getenv.
      - DAILY_ROOM_URL=${DAILY_ROOM_URL}
      - DAILY_TOKEN=${DAILY_TOKEN}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DEEPGRAM_API_KEY=${DEEPGRAM_API_KEY}
    ports:
      - "7860:7860"
    restart: unless-stopped
    # SIGTERM is the signal the bot's runner is configured to handle; allow
    # it 30s to shut down before the container is killed.
    stop_signal: SIGTERM
    stop_grace_period: 30s

{ .api }
import asyncio
import os
from loguru import logger
async def production_bot():
    """Production-ready bot with error handling.

    Builds a Daily transport from environment variables, runs the pipeline
    under a ``PipelineRunner`` with SIGINT/SIGTERM handling, and logs start,
    completion, cancellation and failure. Cancellation and errors are
    re-raised after being logged.

    NOTE(review): ``stt``, ``llm``, ``aggregators`` and ``tts`` are not defined
    in this snippet; presumably they are configured as in the voice assistant
    example - confirm before running.
    """
    try:
        # Configure services with environment variables
        transport = DailyTransport(
            room_url=os.getenv("DAILY_ROOM_URL"),
            token=os.getenv("DAILY_TOKEN"),
            bot_name=os.getenv("BOT_NAME", "Assistant"),
            params=DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            ),
        )

        # Build pipeline (failures are handled by the except clauses below)
        pipeline = Pipeline(
            [
                transport.input(),
                stt,
                aggregators.user(),
                llm,
                aggregators.assistant(),
                tts,
                transport.output(),
            ]
        )

        # Run with signal handling
        task = PipelineTask(pipeline)
        runner = PipelineRunner(
            name="production-bot",
            handle_sigint=True,
            handle_sigterm=True,
            force_gc=True,
        )
        logger.info("Bot starting")
        await runner.run(task)
        logger.info("Bot completed")
    except asyncio.CancelledError:
        logger.info("Bot cancelled")
        raise
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception(f"Bot error: {e}")
        raise
    finally:
        logger.info("Cleanup complete")
# Script entry point: run the production bot when executed directly.
if __name__ == "__main__":
    asyncio.run(production_bot())