Edge Cases and Advanced Scenarios

Handling Partial Transcriptions

Deal with interim transcription results:

{ .api }
from pipecat.frames.frames import (
    TranscriptionFrame,
    InterimTranscriptionFrame
)
from pipecat.processors.frame_processor import FrameProcessor

class TranscriptionHandler(FrameProcessor):
    """Handle both interim and final transcriptions."""

    def __init__(self):
        super().__init__()
        self._interim_text = ""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, InterimTranscriptionFrame):
            # Interim text may still be revised; overwrite the current line
            self._interim_text = frame.text
            print(f"Interim: {frame.text}", end='\r')

        elif isinstance(frame, TranscriptionFrame):
            print(f"\nFinal: {frame.text}")
            self._interim_text = ""

        await self.push_frame(frame, direction)
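
For placement, a minimal sketch, assuming transport, stt, llm, and tts come from your existing setup: the handler sits directly after the STT service so it sees transcription frames as they arrive.

{ .api }
from pipecat.pipeline.pipeline import Pipeline

pipeline = Pipeline([
    transport.input(),
    stt,
    TranscriptionHandler(),  # observes interim/final transcriptions
    llm,
    tts,
    transport.output()
])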

Multiple Concurrent Streams

Handle multiple audio sources:

{ .api }
import numpy as np

from pipecat.frames.frames import AudioRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class MultiStreamMixer(FrameProcessor):
    """Mix multiple audio streams."""

    def __init__(self):
        super().__init__()
        self._streams = {}

    def _mix_audio(self, buffers):
        """Naive mix: truncate to the shortest buffer and average samples."""
        length = min(len(b) for b in buffers) & ~1  # align to 16-bit samples
        arrays = [np.frombuffer(bytes(b[:length]), dtype=np.int16) for b in buffers]
        mixed = np.mean(arrays, axis=0).astype(np.int16)
        return mixed.tobytes()

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, AudioRawFrame):
            # Key buffers by source; frames without one share "default"
            stream_id = getattr(frame, "transport_source", None) or "default"

            if stream_id not in self._streams:
                self._streams[stream_id] = bytearray()

            self._streams[stream_id].extend(frame.audio)

            # Mix and emit once audio from more than one source is buffered
            if len(self._streams) > 1:
                mixed = self._mix_audio(list(self._streams.values()))
                mixed_frame = AudioRawFrame(
                    audio=mixed,
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels
                )
                await self.push_frame(mixed_frame, direction)
        else:
            await self.push_frame(frame, direction)
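
Averaging avoids integer overflow when summing int16 samples, at the cost of lowering each stream's level; a production mixer would also drain consumed bytes from each buffer and align streams by timestamp.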

Timeout Handling

Handle LLM completion timeouts:

{ .api }
from pipecat.frames.frames import TTSSpeakFrame

# Register a handler on an existing LLM service instance. Here `llm` and
# `task` come from your pipeline setup; the timeout duration itself is
# configured on the service.
@llm.event_handler("on_completion_timeout")
async def handle_timeout(service):
    """Fall back to a canned response when the LLM times out."""
    print("LLM timed out, using fallback response")
    await task.queue_frame(
        TTSSpeakFrame("I apologize, I'm taking too long to respond.")
    )

Retry with Exponential Backoff

Retry failed operations:

{ .api }
import asyncio

async def retry_with_backoff(
    func,
    max_retries: int = 3,
    initial_delay: float = 1.0
):
    """Retry an async callable with exponential backoff."""
    delay = initial_delay

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            # Give up and re-raise once the final attempt fails
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay)
            delay *= 2
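
A usage sketch; fetch_completion is a hypothetical coroutine standing in for any flaky async operation:

{ .api }
result = await retry_with_backoff(
    lambda: fetch_completion(prompt),  # hypothetical async operation
    max_retries=5,
    initial_delay=0.5
)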

Voicemail Detection Edge Cases

Handle ambiguous voicemail scenarios:

{ .api }
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.frames.frames import EndFrame, TTSSpeakFrame

async def handle_voicemail():
    """Leave a voicemail message (`task` comes from your pipeline setup)."""
    await task.queue_frames([
        TTSSpeakFrame("This is an automated message..."),
        EndFrame()
    ])

detector = VoicemailDetector(
    beep_threshold=0.75,
    min_beep_duration_ms=200,
    max_beep_duration_ms=1000,
    on_voicemail_detected=handle_voicemail
)

IVR Navigation with Ambiguous Input

Handle unclear user responses in IVR:

{ .api }
from pipecat.extensions.ivr.ivr_navigator import IVRNavigator, IVRMenu

menu = IVRMenu(
    id="main",
    prompt_keywords=["main menu", "options", "help"],
    options=[...],
    max_retries=3,
    retry_prompt="I didn't catch that. Please say..."
)

navigator = IVRNavigator(
    menu=menu,
    timeout=10.0,  # Timeout for user response
)

WebRTC SDP Compatibility

Handle ESP32 and embedded device compatibility:

{ .api }
from aiortc import RTCPeerConnection, RTCSessionDescription

from pipecat.runner.utils import smallwebrtc_sdp_munging

async def create_esp32_offer(pc: RTCPeerConnection):
    """Create an ESP32-compatible WebRTC offer."""
    offer = await pc.createOffer()

    # Rewrite the SDP for embedded clients that can't parse the full offer
    modified_sdp = smallwebrtc_sdp_munging(
        offer.sdp,
        host="192.168.1.100"
    )

    await pc.setLocalDescription(RTCSessionDescription(
        sdp=modified_sdp,
        type=offer.type
    ))

UninterruptibleFrame Usage

Protect critical operations from interruption:

{ .api }
from dataclasses import dataclass
from typing import Any

from pipecat.frames.frames import DataFrame, UninterruptibleFrame

@dataclass
class CriticalOperationFrame(DataFrame, UninterruptibleFrame):
    """Frame that must complete even during an interruption."""

    operation_data: Any = None

# FunctionCallResultFrame is an UninterruptibleFrame by default, which
# ensures function results reach the LLM even if the user interrupts.
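
A quick usage sketch; task comes from your pipeline setup and the payload is illustrative:

{ .api }
# Queue a critical frame; it survives user interruptions
await task.queue_frame(CriticalOperationFrame(operation_data={"order_id": 42}))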

Dynamic Strategy Updates

Change turn strategies at runtime:

{ .api }
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import (
    MinWordsUserTurnStartStrategy,
    VADUserTurnStartStrategy
)
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy  # path assumed to mirror user_start

async def update_conversation_mode(mode: str, turn_processor):
    """Update turn strategies based on conversation mode."""
    if mode == "formal":
        strategies = UserTurnStrategies(
            start=[MinWordsUserTurnStartStrategy(min_words=3)],
            stop=[TranscriptionUserTurnStopStrategy(timeout=1.0)]
        )
    else:  # casual
        strategies = UserTurnStrategies(
            start=[VADUserTurnStartStrategy()],
            stop=[TranscriptionUserTurnStopStrategy(timeout=0.3)]
        )

    # _turn_controller is a private attribute, so this relies on internal
    # API that may change between releases
    await turn_processor._turn_controller.update_strategies(strategies)
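
For example, to tighten turn-taking when the conversation becomes formal (turn_processor comes from your pipeline setup):

{ .api }
await update_conversation_mode("formal", turn_processor)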

Gated Context Flow

Control when LLM processes context:

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
from pipecat.utils.sync.event_notifier import EventNotifier

# Create a concrete notifier (BaseNotifier is the abstract interface)
notifier = EventNotifier()

# Create gated aggregator
gated_context = GatedLLMContextAggregator(
    notifier=notifier,
    start_open=False  # Hold context until notified
)

# Use in pipeline (transport, stt, user_aggregator, llm, tts from setup)
pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    gated_context,  # Holds context until notified
    llm,
    tts,
    transport.output()
])

# Release when the user approves
async def on_user_approval():
    await notifier.notify()  # Context flows to the LLM

Custom Mute Strategies

Custom logic for when to mute user input:

{ .api }
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame
)
from pipecat.turns.user_mute import BaseUserMuteStrategy

class ConditionalMuteStrategy(BaseUserMuteStrategy):
    """Mute based on custom conditions."""

    def __init__(self):
        super().__init__()
        self._important_task_active = False
        self._is_muted = False

    async def process_frame(self, frame: Frame) -> bool:
        """Return True if the user should be muted."""
        # Always mute while an important task is running
        if self._important_task_active:
            return True

        # Standard bot-speaking mute logic; record the state so later
        # frames keep returning the same answer
        if isinstance(frame, BotStartedSpeakingFrame):
            self._is_muted = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._is_muted = False

        return self._is_muted
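
The flag can then be flipped around a critical section; process_payment is a hypothetical stand-in:

{ .api }
mute_strategy = ConditionalMuteStrategy()

async def run_payment_flow():
    mute_strategy._important_task_active = True
    try:
        await process_payment()  # hypothetical critical operation
    finally:
        mute_strategy._important_task_active = False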

Audio Buffer Management

Handle variable-length audio buffers:

{ .api }
from pipecat.frames.frames import AudioRawFrame
from pipecat.processors.frame_processor import FrameProcessor

class AdaptiveAudioBuffer(FrameProcessor):
    """Buffer audio with adaptive sizing."""

    def __init__(self, target_duration_ms: int = 100):
        super().__init__()
        self._target_ms = target_duration_ms
        self._buffer = bytearray()
        self._sample_rate = 16000

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, AudioRawFrame):
            self._buffer.extend(frame.audio)
            self._sample_rate = frame.sample_rate

            # Target buffer size in bytes for the configured duration
            bytes_per_sample = 2  # 16-bit audio
            samples_per_ms = self._sample_rate / 1000
            target_size = int(
                self._target_ms * samples_per_ms * bytes_per_sample * frame.num_channels
            )

            # Emit fixed-size chunks while the buffer holds enough audio
            while len(self._buffer) >= target_size:
                buffered_frame = AudioRawFrame(
                    audio=bytes(self._buffer[:target_size]),
                    sample_rate=frame.sample_rate,
                    num_channels=frame.num_channels
                )
                await self.push_frame(buffered_frame, direction)
                self._buffer = self._buffer[target_size:]
        else:
            await self.push_frame(frame, direction)
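
At the defaults (16 kHz, mono, 16-bit), the 100 ms target works out to 16 samples/ms × 100 ms × 2 bytes = 3,200 bytes per emitted frame.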

Multi-Language Support

Handle multiple languages dynamically:

{ .api }
from pipecat.frames.frames import STTUpdateSettingsFrame, TTSUpdateSettingsFrame
from pipecat.processors.frame_processor import FrameProcessor

class LanguageSwitcher(FrameProcessor):
    """Switch STT/TTS language based on detection."""

    async def switch_language(self, language: str):
        """Switch to a new language."""
        # Update STT
        await self.push_frame(STTUpdateSettingsFrame(
            settings={"language": language}
        ))

        # Update the TTS voice for the language
        voice_map = {
            "en": "alloy",
            "es": "nova",
            "fr": "shimmer"
        }

        await self.push_frame(TTSUpdateSettingsFrame(
            settings={"voice": voice_map.get(language, "alloy")}
        ))
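
One way to trigger the switch, assuming your STT service populates the language field on TranscriptionFrame (not all services do):

{ .api }
from pipecat.frames.frames import TranscriptionFrame

class AutoLanguageSwitcher(LanguageSwitcher):
    """Switch whenever a transcription carries a new language."""

    def __init__(self):
        super().__init__()
        self._current_language = None

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame) and frame.language:
            lang = str(frame.language)
            if lang != self._current_language:
                self._current_language = lang
                await self.switch_language(lang)

        await self.push_frame(frame, direction)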

Context Injection

Inject context updates dynamically:

{ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame

async def inject_context_update(task, new_info: str):
    """Inject new context information."""
    await task.queue_frame(
        LLMMessagesAppendFrame(
            messages=[{
                "role": "system",
                "content": f"Additional context: {new_info}"
            }]
        )
    )
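
Recent pipecat releases also accept a run_llm=True flag on LLMMessagesAppendFrame to trigger an immediate response to the injected context; check the frame definition in your installed version.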

Custom Turn Analyzer

Implement custom turn detection logic:

{ .api }
from pipecat.audio.turn.base_turn_analyzer import (
    BaseTurnAnalyzer,
    EndOfTurnState
)
import numpy as np

class CustomTurnAnalyzer(BaseTurnAnalyzer):
    """Custom turn detection with energy-based analysis."""
    
    def __init__(self, sample_rate: int = 16000):
        super().__init__(sample_rate=sample_rate)
        self._audio_buffer = []
    
    def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
        """Accumulate audio for analysis."""
        self._audio_buffer.append(buffer)
        return EndOfTurnState.INCOMPLETE
    
    async def analyze_end_of_turn(self):
        """Analyze if turn is complete."""
        if not self._audio_buffer:
            return EndOfTurnState.INCOMPLETE, None
        
        # Combine audio
        audio = b''.join(self._audio_buffer)
        audio_array = np.frombuffer(audio, dtype=np.int16)
        
        # Calculate energy
        energy = np.sqrt(np.mean(audio_array.astype(float) ** 2))
        
        # Check if energy dropped (user stopped)
        if energy < 100:  # Threshold
            return EndOfTurnState.COMPLETE, None
        
        return EndOfTurnState.INCOMPLETE, None
    
    def clear(self):
        """Reset analyzer."""
        self._audio_buffer.clear()
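
To wire the analyzer in, pass it through the transport params; a sketch, with field names as in recent pipecat versions:

{ .api }
from pipecat.transports.base_transport import TransportParams

params = TransportParams(
    audio_in_enabled=True,
    turn_analyzer=CustomTurnAnalyzer(sample_rate=16000)
)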

See Also

  • Real-World Scenarios - Complete examples
  • Common Patterns - Standard patterns
  • Configuration Reference - All options
  • Error Handling - Error recovery