# pipecat-ai

Install via tessl: `tessl install tessl/pypi-pipecat-ai@0.0.0`

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
# Core framework only
pip install pipecat-ai
# With common AI services
pip install "pipecat-ai[openai,anthropic,deepgram,daily]"
# With all services
pip install "pipecat-ai[all]"

{ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame, EndFrame
class PrintProcessor(FrameProcessor):
    """Prints the text of every TextFrame that flows through the pipeline.

    All frames (text or otherwise) are forwarded downstream unchanged.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Required by pipecat: let the base class process system frames
        # (StartFrame, cancellation, etc.) before any custom logic runs.
        # Omitting this call breaks pipeline startup in recent pipecat versions.
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            print(f"Received: {frame.text}")
        # Forward every frame so downstream processors still see it.
        await self.push_frame(frame, direction)
async def main():
    """Build a one-processor pipeline, queue two text frames, and run it."""
    printer = PrintProcessor()
    pipeline = Pipeline([printer])
    task = PipelineTask(pipeline)

    # Queue all work up front; the EndFrame tells the task when to stop.
    for frame in (
        TextFrame("Hello, Pipecat!"),
        TextFrame("This is my first pipeline."),
    ):
        await task.queue_frame(frame)
    await task.queue_frame(EndFrame())  # Signals completion

    # Blocks until the EndFrame has been processed.
    await task.run()
    print("Pipeline complete!")
if __name__ == "__main__":
    asyncio.run(main())

What's happening:
{ .api }
import asyncio
import os
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextAggregatorPair,
)
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
async def voice_agent():
    """Run a complete STT -> LLM -> TTS voice agent over a Daily room."""
    # 1. Voice-activity detection drives interruption handling.
    analyzer = SileroVADAnalyzer(
        params=VADParams(
            threshold=0.5,
            min_speech_duration_ms=250,
        )
    )

    # 2. Transport: joins the Daily room named by environment variables.
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Assistant",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=16000,  # For STT
            audio_out_sample_rate=24000,  # For TTS
            vad_enabled=True,
            vad_analyzer=analyzer,
        )
    )

    # 3. AI services: speech-to-text, language model, text-to-speech.
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"),
        model="nova-2",
        interim_results=True,
    )
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4",
        params={"temperature": 0.7}
    )
    tts = OpenAITTSService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
    )

    # 4. Conversation context seeded with the system prompt.
    context = LLMContext(
        messages=[{
            "role": "system",
            "content": "You are a helpful voice assistant. Keep responses concise."
        }]
    )
    aggregators = LLMContextAggregatorPair(context=context)
    llm.set_context(context)

    # 5. Frames flow top-to-bottom: audio in -> text -> LLM -> speech out.
    pipeline = Pipeline([
        transport.input(),
        stt,
        aggregators.user,
        llm,
        aggregators.assistant,
        tts,
        transport.output()
    ])

    # 6. Production settings: interruptible, with metrics enabled.
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
        )
    )
    runner = PipelineRunner(
        name="voice-agent",
        handle_sigint=True,
        handle_sigterm=True,
    )
    await runner.run(task)
if __name__ == "__main__":
    asyncio.run(voice_agent())

What's happening:
{ .api }
from pipecat.transports.local import LocalAudioTransport, LocalAudioTransportParams
# Replace Daily transport with local audio
transport = LocalAudioTransport(
params=LocalAudioTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
)
)
# Use same pipeline structure
pipeline = Pipeline([
transport.input(),
stt, aggregators.user, llm, aggregators.assistant, tts,
transport.output()
])

What's happening:
{ .api }
async def get_weather(location: str) -> dict:
    """Return current weather for *location* (static demo data).

    A production handler would query a real weather API; this stub always
    reports temp 72 and sunny regardless of the location given.
    """
    conditions = {"temp": 72, "condition": "sunny"}
    return conditions
llm.register_function(
name="get_weather",
handler=get_weather,
description="Get current weather",
properties={"location": {"type": "string", "description": "City name"}},
required=["location"]
)

{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.7)]
)
turn_processor = UserTurnProcessor(
user_turn_strategies=strategies,
user_turn_stop_timeout=5.0
)
# Add to pipeline after STT
pipeline = Pipeline([
transport.input(),
stt,
turn_processor, # Manages turn-taking
aggregators.user,
llm,
# ... rest
])

{ .api }
from pipecat.pipeline.task_observer import TaskObserver
from pipecat.observers.loggers import (
MetricsLogObserver,
LLMLogObserver,
)
# Observer hub: forwards pipeline-task events to each registered logger.
observer = TaskObserver()
observer.add_observer(MetricsLogObserver())  # logs pipeline metrics
observer.add_observer(LLMLogObserver(log_tokens=True))  # logs LLM activity, incl. token counts
task = PipelineTask(pipeline, observer=observer)

{ .api }
# Top-level error boundary: report the failure, cancel the task,
# and always release the transport afterwards.
try:
    await task.run()
except Exception as e:
    print(f"Pipeline error: {e}")
    await task.cancel()
finally:
    await transport.stop()

# Missing dependencies
pip install "pipecat-ai[openai]"  # Add provider extras

# Match transport and service rates
params = DailyParams(
    audio_in_sample_rate=16000,  # For STT
    audio_out_sample_rate=24000,  # For TTS
)

# Adjust sensitivity
# Lower the VAD threshold so quieter speech triggers detection.
vad = SileroVADAnalyzer(
    params=VADParams(threshold=0.3)  # Lower = more sensitive
)