An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols


Quick Start Guide

Installation

Basic Installation

# Core framework only
pip install pipecat-ai

# With common AI services
pip install "pipecat-ai[openai,anthropic,deepgram,daily]"

# With all services
pip install "pipecat-ai[all]"

Requirements

  • Python: >= 3.10 (3.12 recommended for best async performance)
  • OS: Linux, macOS, Windows
  • License: BSD-2-Clause
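
To confirm the interpreter and package line up with these requirements, a quick stdlib-only check:

{ .api }
import sys
from importlib.metadata import version

assert sys.version_info >= (3, 10), "pipecat-ai requires Python 3.10+"
print("pipecat-ai", version("pipecat-ai"))  # Raises PackageNotFoundError if not installed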

Your First Pipeline

Step 1: Simple Text Pipeline

{ .api }
import asyncio
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame, EndFrame

class PrintProcessor(FrameProcessor):
    """Prints text frames to the console."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Custom processors must forward every frame to the base class first
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            print(f"Received: {frame.text}")
        await self.push_frame(frame, direction)

async def main():
    # Create pipeline
    pipeline = Pipeline([PrintProcessor()])
    
    # Create task
    task = PipelineTask(pipeline)
    
    # Queue frames
    await task.queue_frame(TextFrame("Hello, Pipecat!"))
    await task.queue_frame(TextFrame("This is my first pipeline."))
    await task.queue_frame(EndFrame())  # Signals completion
    
    # Run until EndFrame
    await task.run()
    print("Pipeline complete!")

if __name__ == "__main__":
    asyncio.run(main())

What's happening:

  1. Create a processor that handles frames
  2. Build a pipeline with the processor
  3. Create a task to execute the pipeline
  4. Queue frames for processing
  5. Run the pipeline until EndFrame
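
Processors can also transform frames instead of just observing them. A minimal sketch, reusing only the classes already imported above:

{ .api }
class UppercaseProcessor(FrameProcessor):
    """Rewrites text frames to uppercase before passing them on."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, TextFrame):
            frame = TextFrame(frame.text.upper())  # Swap in a new frame
        await self.push_frame(frame, direction)

# Processors run in pipeline order: transform first, then print
pipeline = Pipeline([UppercaseProcessor(), PrintProcessor()])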

Step 2: Voice Agent with Daily

{ .api }
import asyncio
import os
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextAggregatorPair,
)
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

async def voice_agent():
    """Complete voice agent example."""
    
    # 1. Configure VAD for interruption detection
    vad = SileroVADAnalyzer(
        params=VADParams(
            confidence=0.5,   # Minimum speech probability to trigger
            start_secs=0.25,  # How long speech must last before it counts
        )
    )
    
    # 2. Set up transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Assistant",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=16000,   # For STT
            audio_out_sample_rate=24000,  # For TTS
            vad_enabled=True,
            vad_analyzer=vad,
        )
    )
    
    # 3. Configure AI services
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"),
        model="nova-2",
        interim_results=True,
    )
    
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4",
        params=OpenAILLMService.InputParams(temperature=0.7),
    )
    
    tts = OpenAITTSService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
    )
    
    # 4. Set up LLM context
    context = LLMContext(
        messages=[{
            "role": "system",
            "content": "You are a helpful voice assistant. Keep responses concise."
        }]
    )
    
    # The aggregator pair keeps the shared context in sync as frames flow
    aggregators = LLMContextAggregatorPair(context=context)
    
    # 5. Build pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        aggregators.user(),       # Adds user transcripts to the context
        llm,
        aggregators.assistant(),  # Adds assistant responses to the context
        tts,
        transport.output()
    ])
    
    # 6. Run with production settings
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
        )
    )
    
    runner = PipelineRunner(
        name="voice-agent",
        handle_sigint=True,
        handle_sigterm=True,
    )
    
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(voice_agent())

What's happening:

  1. Configure VAD for detecting when user speaks
  2. Set up Daily transport for WebRTC audio
  3. Configure STT, LLM, and TTS services
  4. Create LLM context with system message
  5. Build pipeline: audio in → STT → LLM → TTS → audio out
  6. Run with production runner (handles signals)
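
The example reads four environment variables; failing fast with a clear message beats a cryptic connection error mid-call. A small guard, stdlib only, run before voice_agent():

{ .api }
import os
import sys

REQUIRED_ENV = ["DAILY_ROOM_URL", "DAILY_TOKEN", "DEEPGRAM_API_KEY", "OPENAI_API_KEY"]

missing = [name for name in REQUIRED_ENV if not os.getenv(name)]
if missing:
    sys.exit(f"Missing environment variables: {', '.join(missing)}")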

Step 3: Local Testing

{ .api }
from pipecat.transports.local import LocalAudioTransport, LocalAudioTransportParams

# Replace Daily transport with local audio
transport = LocalAudioTransport(
    params=LocalAudioTransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    )
)

# Use same pipeline structure
pipeline = Pipeline([
    transport.input(),
    stt, aggregators.user(), llm, aggregators.assistant(), tts,
    transport.output()
])

What's happening:

  • Uses your computer's microphone and speakers
  • No WebRTC or network required
  • Perfect for local development
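
A handy pattern is switching transports with an environment flag so the same agent runs on your machine or in a Daily room. A sketch, assuming the imports from Steps 2 and 3 (the PIPECAT_LOCAL flag name is made up for this example, and the DailyParams are trimmed for brevity):

{ .api }
import os

def make_transport():
    """Pick local audio for development, Daily for deployment."""
    if os.getenv("PIPECAT_LOCAL"):  # Hypothetical flag for this sketch
        return LocalAudioTransport(
            params=LocalAudioTransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
            )
        )
    return DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="Assistant",
        params=DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    )

transport = make_transport()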

Common Workflows

Add Function Calling

{ .api }
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.services.llm_service import FunctionCallParams

async def get_weather(params: FunctionCallParams):
    """Look up weather and hand the result back to the LLM."""
    location = params.arguments["location"]
    await params.result_callback({"temp": 72, "condition": "sunny"})

# Describe the tool so the model knows how to call it
weather_schema = FunctionSchema(
    name="get_weather",
    description="Get current weather",
    properties={"location": {"type": "string", "description": "City name"}},
    required=["location"],
)

# Advertise the tool when building the context:
#   LLMContext(messages=..., tools=ToolsSchema(standard_tools=[weather_schema]))
llm.register_function("get_weather", get_weather)

Enable Turn Management

{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy

strategies = UserTurnStrategies(
    start=[VADUserTurnStartStrategy()],
    stop=[TranscriptionUserTurnStopStrategy(timeout=0.7)]
)

turn_processor = UserTurnProcessor(
    user_turn_strategies=strategies,
    user_turn_stop_timeout=5.0
)

# Add to pipeline after STT
pipeline = Pipeline([
    transport.input(),
    stt,
    turn_processor,  # Manages turn-taking
    aggregators.user,
    llm,
    # ... rest
])

Add Metrics and Monitoring

{ .api }
from pipecat.observers.loggers import (
    MetricsLogObserver,
    LLMLogObserver,
)

# Observers attach to the task and log activity as frames flow through
task = PipelineTask(
    pipeline,
    observers=[MetricsLogObserver(), LLMLogObserver()],
)

Handle Errors

{ .api }
try:
    await task.run()
except Exception as e:
    print(f"Pipeline error: {e}")
    await task.cancel()
finally:
    await transport.stop()
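
A failed task generally shouldn't be reused; for automatic recovery, rebuild the pipeline and retry with backoff. A sketch, assuming a build_pipeline() factory of your own that returns a fresh pipeline and transport each attempt:

{ .api }
import asyncio

async def run_with_retries(max_attempts: int = 3):
    for attempt in range(1, max_attempts + 1):
        pipeline, transport = build_pipeline()  # Hypothetical factory
        task = PipelineTask(pipeline)
        try:
            await task.run()
            return  # Clean shutdown (EndFrame reached)
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            await transport.stop()
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    raise RuntimeError("Pipeline failed after all retries")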

Next Steps

  1. Learn Core Concepts: Core Concepts →
  2. Explore Examples: Real-World Scenarios →
  3. Browse Services: Services →
  4. Deploy: Runner & Deployment →
  5. Advanced Features: Turn Management →

Common Issues

Import Errors

# Missing dependencies
pip install "pipecat-ai[openai]"  # Add provider extras

Audio Sample Rate Mismatch

# Match transport and service rates
params = DailyParams(
    audio_in_sample_rate=16000,   # For STT
    audio_out_sample_rate=24000,  # For TTS
)

VAD Not Working

# Adjust sensitivity
vad = SileroVADAnalyzer(
    params=VADParams(confidence=0.3)  # Lower = more sensitive
)

See: Error Handling Reference →