docs

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Deal with interim transcription results:

{ .api }
from pipecat.frames.frames import (
    TranscriptionFrame,
    InterimTranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameProcessor


class TranscriptionHandler(FrameProcessor):
    """Handle both interim and final transcriptions."""

    def __init__(self):
        super().__init__()
        self._interim_text = ""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, InterimTranscriptionFrame):
            self._interim_text = frame.text
            print(f"Interim: {frame.text}", end='\r')
        elif isinstance(frame, TranscriptionFrame):
            print(f"\nFinal: {frame.text}")
            self._interim_text = ""
        await self.push_frame(frame, direction)
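A minimal wiring sketch, assuming `transport`, `stt`, `llm`, and `tts` services are constructed elsewhere:

{ .api }
from pipecat.pipeline.pipeline import Pipeline

# Sketch only: transport, stt, llm, and tts are assumed to exist.
pipeline = Pipeline([
    transport.input(),
    stt,                     # Emits Interim/TranscriptionFrames
    TranscriptionHandler(),  # Logs interim and final text
    llm,
    tts,
    transport.output(),
])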
Handle multiple audio sources:

{ .api }
from pipecat.frames.frames import AudioRawFrame
from pipecat.processors.frame_processor import FrameProcessor


class MultiStreamMixer(FrameProcessor):
    """Mix multiple audio streams."""

    def __init__(self):
        super().__init__()
        self._streams = {}

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, AudioRawFrame):
            stream_id = frame.transport_source or "default"
            if stream_id not in self._streams:
                self._streams[stream_id] = bytearray()
            self._streams[stream_id].extend(frame.audio)
            # Mix streams once more than one source has been seen
            if len(self._streams) > 1:
                mixed = self._mix_audio(list(self._streams.values()))
                mixed_frame = AudioRawFrame(
                    audio=mixed,
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels,
                )
                await self.push_frame(mixed_frame, direction)
        else:
            await self.push_frame(frame, direction)
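The `_mix_audio` helper is referenced but not defined above. A minimal sketch, assuming same-format 16-bit PCM streams, that averages them with NumPy (truncating to the shortest):

{ .api }
import numpy as np

def _mix_audio(self, streams):
    """Average 16-bit PCM buffers, truncated to the shortest one."""
    shortest = min(len(s) for s in streams) & ~1  # Whole int16 samples
    arrays = [
        np.frombuffer(bytes(s[:shortest]), dtype=np.int16).astype(np.int32)
        for s in streams
    ]
    mixed = (sum(arrays) // len(arrays)).astype(np.int16)
    return mixed.tobytes()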
Handle LLM completion timeouts:

{ .api }
from pipecat.frames.frames import TextFrame

# Event handlers are registered on the LLM service instance; `llm` and
# `task` are assumed to be constructed elsewhere.
@llm.event_handler("on_completion_timeout")
async def handle_timeout(llm):
    """Handle LLM timeout gracefully with a fallback response."""
    print("LLM timed out, using fallback response")
    await task.queue_frame(
        TextFrame("I apologize, I'm taking too long to respond.")
    )
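If the service you use doesn't expose a timeout event, a similar effect can be approximated with plain `asyncio.wait_for` around any awaited call (a generic sketch, not a pipecat API):

{ .api }
import asyncio


async def complete_with_timeout(coro, timeout: float = 30.0):
    """Run a coroutine, returning a canned reply on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return "I apologize, I'm taking too long to respond."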
Retry failed operations:

{ .api }
import asyncio


async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0,
):
    """Retry an async callable with exponential backoff."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay)
            delay *= 2
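For example, a flaky network-bound call can be wrapped in the helper (`flaky_call` here is hypothetical):

{ .api }
import asyncio


async def main():
    async def flaky_call() -> str:
        # Hypothetical operation that may fail transiently
        return "ok"

    # Waits 0.5s, 1s, 2s, 4s between the five attempts
    result = await retry_with_backoff(flaky_call, max_retries=5, initial_delay=0.5)
    print(result)


asyncio.run(main())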
Handle ambiguous voicemail scenarios:

{ .api }
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.frames.frames import EndFrame, TTSSpeakFrame


# `task` is assumed to be the running PipelineTask
async def handle_voicemail():
    """Leave a voicemail message, then hang up."""
    await task.queue_frames([
        TTSSpeakFrame("This is an automated message..."),
        EndFrame(),
    ])


detector = VoicemailDetector(
    beep_threshold=0.75,
    min_beep_duration_ms=200,
    max_beep_duration_ms=1000,
    on_voicemail_detected=handle_voicemail,
)
Handle unclear user responses in IVR:

{ .api }
from pipecat.extensions.ivr.ivr_navigator import IVRNavigator, IVRMenu

menu = IVRMenu(
    id="main",
    prompt_keywords=["main menu", "options", "help"],
    options=[...],
    max_retries=3,
    retry_prompt="I didn't catch that. Please say...",
)

navigator = IVRNavigator(
    menu=menu,
    timeout=10.0,  # Timeout for user response
)
Handle ESP32 and embedded device compatibility:

{ .api }
from aiortc import RTCSessionDescription

from pipecat.runner.utils import smallwebrtc_sdp_munging


async def create_esp32_offer():
    """Create an ESP32-compatible WebRTC offer.

    Assumes `pc` is an existing aiortc RTCPeerConnection.
    """
    offer = await pc.createOffer()
    # Apply SDP compatibility fixes for embedded devices
    modified_sdp = smallwebrtc_sdp_munging(
        offer.sdp,
        host="192.168.1.100",
    )
    await pc.setLocalDescription(RTCSessionDescription(
        type=offer.type,
        sdp=modified_sdp,
    ))
Protect critical operations from interruption:

{ .api }
from typing import Any

from pipecat.frames.frames import UninterruptibleFrame, DataFrame


class CriticalOperationFrame(DataFrame, UninterruptibleFrame):
    """Frame that must complete even during interruption."""

    def __init__(self, operation_data: Any):
        super().__init__()
        self.operation_data = operation_data

# FunctionCallResultFrame is an UninterruptibleFrame by default, which
# ensures function results reach the LLM even if the user interrupts.
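A hypothetical usage sketch, pushing the frame from inside a processor so its payload survives a user interruption:

{ .api }
from pipecat.processors.frame_processor import FrameProcessor


class PaymentProcessor(FrameProcessor):
    """Hypothetical processor that emits an uninterruptible payload."""

    async def finalize_payment(self, payment_id: str):
        # Delivered even if the user interrupts mid-turn
        await self.push_frame(CriticalOperationFrame({"payment_id": payment_id}))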
Change turn strategies at runtime:

{ .api }
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import (
    MinWordsUserTurnStartStrategy,
    VADUserTurnStartStrategy,  # Assumed to live alongside MinWords
)
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy


async def update_conversation_mode(mode: str, turn_processor):
    """Update turn strategies based on conversation mode."""
    if mode == "formal":
        strategies = UserTurnStrategies(
            start=[MinWordsUserTurnStartStrategy(min_words=3)],
            stop=[TranscriptionUserTurnStopStrategy(timeout=1.0)],
        )
    else:  # casual
        strategies = UserTurnStrategies(
            start=[VADUserTurnStartStrategy()],
            stop=[TranscriptionUserTurnStopStrategy(timeout=0.3)],
        )
    # Note: reaches into a private attribute of the turn processor
    await turn_processor._turn_controller.update_strategies(strategies)
Control when LLM processes context:

{ .api }
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
from pipecat.utils.sync.event_notifier import EventNotifier

# Create a concrete notifier (BaseNotifier is abstract)
notifier = EventNotifier()

# Create gated aggregator
gated_context = GatedLLMContextAggregator(
    notifier=notifier,
    start_open=False,  # Hold context until notified
)

# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    gated_context,  # Holds context until notified
    llm,
    tts,
    transport.output(),
])

# Release when the user approves
async def on_user_approval():
    await notifier.notify()  # Context flows to the LLM
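The notifier can be fired from anywhere: an RTVI message, a function call result, or, as in this hypothetical `ApprovalWatcher` sketch, a keyword check on final transcriptions:

{ .api }
from pipecat.frames.frames import TranscriptionFrame
from pipecat.processors.frame_processor import FrameProcessor


class ApprovalWatcher(FrameProcessor):
    """Hypothetical: open the gate when the user says an approval word."""

    def __init__(self, notifier):
        super().__init__()
        self._notifier = notifier

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame) and "approve" in frame.text.lower():
            await self._notifier.notify()
        await self.push_frame(frame, direction)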
Custom logic for when to mute user input:

{ .api }
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
)


class ConditionalMuteStrategy(BaseUserMuteStrategy):
    """Mute based on custom conditions."""

    def __init__(self):
        super().__init__()
        self._important_task_active = False

    async def process_frame(self, frame: Frame) -> bool:
        """Return True if the user should be muted."""
        # Always mute during important tasks
        if self._important_task_active:
            return True
        # Normal bot-speaking mute logic
        if isinstance(frame, BotStartedSpeakingFrame):
            return True
        if isinstance(frame, BotStoppedSpeakingFrame):
            return False
        return self._is_muted
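A hypothetical way to drive the flag, wrapping a critical section so user audio stays muted while it runs:

{ .api }
mute_strategy = ConditionalMuteStrategy()


async def run_critical_section():
    # process_payment() is hypothetical; any must-not-be-interrupted work fits
    mute_strategy._important_task_active = True
    try:
        await process_payment()
    finally:
        mute_strategy._important_task_active = False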
Handle variable-length audio buffers:

{ .api }
from pipecat.frames.frames import AudioRawFrame
from pipecat.processors.frame_processor import FrameProcessor


class AdaptiveAudioBuffer(FrameProcessor):
    """Buffer audio with adaptive sizing."""

    def __init__(self, target_duration_ms: int = 100):
        super().__init__()
        self._target_ms = target_duration_ms
        self._buffer = bytearray()
        self._sample_rate = 16000

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, AudioRawFrame):
            self._buffer.extend(frame.audio)
            self._sample_rate = frame.sample_rate
            # Calculate target buffer size
            bytes_per_sample = 2  # 16-bit audio
            samples_per_ms = self._sample_rate / 1000
            target_size = int(
                self._target_ms * samples_per_ms * bytes_per_sample * frame.num_channels
            )
            # Emit when buffer reaches target
            if len(self._buffer) >= target_size:
                buffered_frame = AudioRawFrame(
                    audio=bytes(self._buffer[:target_size]),
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels,
                )
                await self.push_frame(buffered_frame, direction)
                self._buffer = self._buffer[target_size:]
        else:
            await self.push_frame(frame, direction)
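One gap worth noting: a tail shorter than the target size is never emitted. A hedged sketch that flushes it when the pipeline ends:

{ .api }
from pipecat.frames.frames import AudioRawFrame, EndFrame


class FlushingAudioBuffer(AdaptiveAudioBuffer):
    """Hypothetical subclass: also flush the remaining tail on EndFrame."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, EndFrame) and self._buffer:
            await self.push_frame(AudioRawFrame(
                audio=bytes(self._buffer),
                sample_rate=self._sample_rate,
                num_channels=1,  # Assumption: mono input
            ), direction)
            self._buffer = bytearray()
        await super().process_frame(frame, direction)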
Handle multiple languages dynamically:

{ .api }
from pipecat.frames.frames import STTUpdateSettingsFrame, TTSUpdateSettingsFrame
from pipecat.processors.frame_processor import FrameProcessor


class LanguageSwitcher(FrameProcessor):
    """Switch STT/TTS language based on detection."""

    async def switch_language(self, language: str):
        """Switch STT and TTS to a new language."""
        # Update STT
        await self.push_frame(STTUpdateSettingsFrame(
            settings={"language": language}
        ))
        # Update the TTS voice for the language
        voice_map = {
            "en": "alloy",
            "es": "nova",
            "fr": "shimmer",
        }
        await self.push_frame(TTSUpdateSettingsFrame(
            settings={"voice": voice_map.get(language, "alloy")}
        ))
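A hedged sketch of triggering the switch automatically: `TranscriptionFrame` carries an optional `language` field, so a subclass could switch whenever the detected language changes (treat the enum-to-code conversion as an assumption for your STT service):

{ .api }
from pipecat.frames.frames import TranscriptionFrame


class AutoLanguageSwitcher(LanguageSwitcher):
    """Hypothetical: switch whenever STT reports a new language."""

    def __init__(self):
        super().__init__()
        self._current_language = None

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame) and frame.language:
            if frame.language != self._current_language:
                self._current_language = frame.language
                # Language may be an enum; .value is its code string
                code = getattr(frame.language, "value", frame.language)
                await self.switch_language(code)
        await self.push_frame(frame, direction)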
Inject context updates dynamically:

{ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame


async def inject_context_update(task, new_info: str):
    """Inject new context information into the conversation."""
    await task.queue_frame(
        LLMMessagesAppendFrame(
            messages=[{
                "role": "system",
                "content": f"Additional context: {new_info}",
            }]
        )
    )
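A hypothetical event hook that surfaces external state to the LLM mid-session:

{ .api }
async def on_order_shipped(order_id: str):
    """Hypothetical: called by your order system; `task` is the PipelineTask."""
    await inject_context_update(task, f"Order {order_id} has shipped.")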
Implement custom turn detection logic:

{ .api }
import numpy as np

from pipecat.audio.turn.base_turn_analyzer import (
    BaseTurnAnalyzer,
    EndOfTurnState,
)


class CustomTurnAnalyzer(BaseTurnAnalyzer):
    """Custom turn detection with energy-based analysis."""

    def __init__(self, sample_rate: int = 16000):
        super().__init__(sample_rate=sample_rate)
        self._audio_buffer = []

    def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
        """Accumulate audio for analysis."""
        self._audio_buffer.append(buffer)
        return EndOfTurnState.INCOMPLETE

    async def analyze_end_of_turn(self):
        """Analyze whether the turn is complete."""
        if not self._audio_buffer:
            return EndOfTurnState.INCOMPLETE, None
        # Combine buffered audio into one array of 16-bit samples
        audio = b"".join(self._audio_buffer)
        audio_array = np.frombuffer(audio, dtype=np.int16)
        # Calculate RMS energy
        energy = np.sqrt(np.mean(audio_array.astype(float) ** 2))
        # Low energy suggests the user stopped speaking
        if energy < 100:  # Threshold
            return EndOfTurnState.COMPLETE, None
        return EndOfTurnState.INCOMPLETE, None

    def clear(self):
        """Reset the analyzer."""
        self._audio_buffer.clear()
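To activate the analyzer, pass it to the transport's params; `TransportParams` exposes a `turn_analyzer` field (treat the exact transport wiring as an assumption for your setup):

{ .api }
from pipecat.transports.base_transport import TransportParams

params = TransportParams(
    audio_in_enabled=True,
    turn_analyzer=CustomTurnAnalyzer(sample_rate=16000),
)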