or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pipecat-ai@0.0.x

docs

audio

dtmf.md, filters-mixers.md, turn-detection.md, vad.md
core-concepts.md, index.md, pipeline.md, runner.md, transports.md, turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

docs/audio/filters-mixers.md

Audio Filters and Mixers

Audio filters process audio to improve quality, while mixers combine multiple audio sources. These components enhance audio quality and enable advanced audio processing in pipelines.

Audio Filters

BaseAudioFilter

{ .api }
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter

class BaseAudioFilter:
    """Base class for audio filters.

    Processes audio to improve quality or modify characteristics.

    Methods:
        filter(audio): Filter audio
        reset(): Reset filter state

    Example:
        filter = SomeAudioFilter()
        filtered_audio = await filter.filter(audio_bytes)
    """

    async def filter(self, audio: bytes) -> bytes:
        """Filter audio.

        Args:
            audio: Input audio bytes

        Returns:
            Filtered audio bytes
        """
        pass

    def reset(self):
        """Reset filter state."""
        pass

KrispFilter

{ .api }
from pipecat.audio.filters.krisp_filter import KrispFilter

class KrispFilter(BaseAudioFilter):
    """Krisp noise suppression filter.

    Professional noise cancellation using Krisp AI.

    Args:
        api_key: Krisp API key

    Example:
        filter = KrispFilter(api_key="your-key")

        # Use with transport
        transport = DailyTransport(
            params=DailyParams(
                audio_in_filter=filter
            )
        )
    """

    def __init__(self, api_key: str):
        pass

RNNoiseFilter

{ .api }
from pipecat.audio.filters.rnnoise_filter import RNNoiseFilter

class RNNoiseFilter(BaseAudioFilter):
    """RNNoise noise suppression filter.

    Open-source noise reduction using RNN.

    Example:
        filter = RNNoiseFilter()
        filtered = await filter.filter(audio)
    """

    def __init__(self):
        pass

NoiseReduceFilter

{ .api }
from pipecat.audio.filters.noisereduce_filter import NoiseReduceFilter

class NoiseReduceFilter(BaseAudioFilter):
    """Noise reduction using noisereduce library.

    Args:
        stationary: Use stationary noise reduction

    Example:
        filter = NoiseReduceFilter(stationary=True)
    """

    def __init__(self, stationary: bool = True):
        pass

KoalaFilter

{ .api }
from pipecat.audio.filters.koala_filter import KoalaFilter

class KoalaFilter(BaseAudioFilter):
    """Koala noise suppression from PicoVoice.

    Real-time noise suppression using PicoVoice's Koala engine.
    Provides high-quality noise reduction with automatic buffering
    for optimal frame processing.

    Args:
        access_key: PicoVoice access key for authentication

    Methods:
        start(sample_rate): Initialize with transport sample rate
        stop(): Clean up resources
        filter(audio): Apply noise suppression
        reset(): Reset filter state

    Example:
        filter = KoalaFilter(access_key="your-picovoice-key")

        # Use with transport
        transport = DailyTransport(
            params=DailyParams(
                audio_in_filter=filter
            )
        )

    Note:
        Requires `pip install pipecat-ai[koala]`
        Sample rate must match Koala's expected rate (typically 16kHz)
    """

    def __init__(self, *, access_key: str):
        pass

AICFilter

{ .api }
from pipecat.audio.filters.aic_filter import AICFilter
from aic import AICModelType

class AICFilter(BaseAudioFilter):
    """AI-Coustics audio enhancement filter.

    Real-time audio enhancement using ai-coustics' AIC SDK.
    Provides advanced speech enhancement, noise suppression,
    and voice gain control.

    Args:
        license_key: ai-coustics license key for authentication
        model_type: Model variant (default: AICModelType.QUAIL_STT)
        enhancement_level: Overall enhancement strength (0.0-1.0, default: 1.0)
        voice_gain: Linear gain for detected speech (0.0-4.0, default: 1.0)
        noise_gate_enable: Deprecated, use create_vad_analyzer() instead

    Methods:
        start(sample_rate): Initialize with transport sample rate
        stop(): Clean up resources
        filter(audio): Apply audio enhancement
        create_vad_analyzer(): Create AIC VAD analyzer
        get_vad_factory(): Get VAD factory function

    Example:
        # Basic usage
        filter = AICFilter(
            license_key="your-aic-key",
            enhancement_level=1.0,
            voice_gain=1.0
        )

        # With VAD
        filter = AICFilter(license_key="your-aic-key")
        vad = filter.create_vad_analyzer(
            lookback_buffer_size=6.0,
            sensitivity=6.0
        )

        transport = DailyTransport(
            params=DailyParams(
                audio_in_filter=filter,
                vad_analyzer=vad
            )
        )

    Note:
        Requires `pip install pipecat-ai[aic]`
        Processes audio in optimal block sizes for efficiency
    """

    def __init__(
        self,
        *,
        license_key: str = "",
        model_type: AICModelType = AICModelType.QUAIL_STT,
        enhancement_level: float = 1.0,
        voice_gain: float = 1.0,
        noise_gate_enable: bool = True
    ):
        pass

KrispVivaFilter

{ .api }
from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter

class KrispVivaFilter(BaseAudioFilter):
    """Krisp VIVA SDK noise reduction filter.

    Advanced noise reduction using Krisp's VIVA SDK with proprietary
    noise suppression algorithms. Requires a valid Krisp model file.

    Args:
        model_path: Path to Krisp model file (.kef extension)
            If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable
        frame_duration: Frame duration in milliseconds (default: 10)
        noise_suppression_level: Suppression level 0-100 (default: 100)

    Methods:
        start(sample_rate): Initialize with transport sample rate
        stop(): Clean up resources
        filter(audio): Apply noise reduction

    Example:
        # Using model path
        filter = KrispVivaFilter(
            model_path="/path/to/model.kef",
            frame_duration=10,
            noise_suppression_level=100
        )

        # Using environment variable
        import os
        os.environ["KRISP_VIVA_FILTER_MODEL_PATH"] = "/path/to/model.kef"
        filter = KrispVivaFilter()

        transport = DailyTransport(
            params=DailyParams(
                audio_in_filter=filter
            )
        )

    Note:
        Requires krisp_audio package
        Model file must have .kef extension
        Buffers audio for optimal frame processing
    """

    def __init__(
        self,
        model_path: str = None,
        frame_duration: int = 10,
        noise_suppression_level: int = 100
    ):
        pass

Audio Resamplers

BaseAudioResampler

{ .api }
from pipecat.audio.resamplers.base_resampler import BaseAudioResampler

class BaseAudioResampler:
    """Base class for audio resampling.

    Converts audio between different sample rates.

    Methods:
        resample(audio): Resample audio

    Example:
        resampler = SomeResampler(
            input_sample_rate=16000,
            output_sample_rate=24000
        )
        resampled = resampler.resample(audio)
    """

    def __init__(self, input_sample_rate: int, output_sample_rate: int):
        pass

    def resample(self, audio: bytes) -> bytes:
        """Resample audio.

        Args:
            audio: Input audio

        Returns:
            Resampled audio
        """
        pass

SoxrResampler

{ .api }
from pipecat.audio.resamplers.soxr_resampler import SoxrResampler

class SoxrResampler(BaseAudioResampler):
    """High-quality SOXR resampler.

    Args:
        input_sample_rate: Input rate (Hz)
        output_sample_rate: Output rate (Hz)
        num_channels: Number of channels

    Example:
        resampler = SoxrResampler(
            input_sample_rate=16000,
            output_sample_rate=24000,
            num_channels=1
        )

        resampled = resampler.resample(audio)
    """

    def __init__(
        self,
        input_sample_rate: int,
        output_sample_rate: int,
        num_channels: int = 1
    ):
        pass

SOXRStreamAudioResampler

{ .api }
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler

class SOXRStreamAudioResampler(BaseAudioResampler):
    """Streaming SOXR resampler with internal history.

    High-quality streaming resampler using SoX ResampleStream library.
    Maintains internal state to avoid clicks at chunk boundaries.
    Ideal for real-time processing and long audio signals.

    Methods:
        resample(audio, in_rate, out_rate): Resample audio stream

    When to use:
        - Real-time processing scenarios
        - Very long audio signals
        - Processing audio in chunks/streams
        - Reusing same resampler configuration (saves initialization overhead)

    Example:
        resampler = SOXRStreamAudioResampler()

        # Process audio stream
        resampled1 = await resampler.resample(
            audio_chunk1,
            in_rate=16000,
            out_rate=24000
        )

        # Internal state prevents clicks between chunks
        resampled2 = await resampler.resample(
            audio_chunk2,
            in_rate=16000,
            out_rate=24000
        )

    Note:
        - Uses VHQ (Very High Quality) resampling
        - Only supports mono audio (1 channel)
        - Input must be 16-bit signed PCM
        - Automatically clears state after 0.2 seconds of inactivity
        - Cannot be reused with different sample rates
    """

    def __init__(self, **kwargs):
        pass

    async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
        """Resample audio with internal state management.

        Args:
            audio: Input audio bytes (16-bit PCM)
            in_rate: Input sample rate (Hz)
            out_rate: Output sample rate (Hz)

        Returns:
            Resampled audio bytes
        """
        pass

ResampyResampler

{ .api }
from pipecat.audio.resamplers.resampy_resampler import ResampyResampler

class ResampyResampler(BaseAudioResampler):
    """Resampy-based audio resampler.

    High-quality resampler using resampy library's Kaiser windowing
    filter. Good balance of quality and performance.

    Methods:
        resample(audio, in_rate, out_rate): Resample audio

    Example:
        resampler = ResampyResampler()

        # Resample audio
        resampled = await resampler.resample(
            audio,
            in_rate=16000,
            out_rate=24000
        )

    Note:
        - Uses Kaiser windowing filter
        - Good performance characteristics
        - Input must be 16-bit signed PCM
        - No internal state between calls
    """

    def __init__(self, **kwargs):
        pass

    async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
        """Resample audio using resampy library.

        Args:
            audio: Input audio bytes (16-bit PCM)
            in_rate: Input sample rate (Hz)
            out_rate: Output sample rate (Hz)

        Returns:
            Resampled audio bytes
        """
        pass

Audio Mixers

BaseAudioMixer

{ .api }
from pipecat.audio.mixers.base_mixer import BaseAudioMixer

class BaseAudioMixer:
    """Base class for audio mixing.

    Combines multiple audio sources.

    Methods:
        mix(audio1, audio2): Mix two audio streams

    Example:
        mixer = SomeMixer(sample_rate=24000)
        mixed = mixer.mix(audio1, audio2)
    """

    def __init__(self, sample_rate: int, num_channels: int = 1):
        pass

    def mix(self, audio1: bytes, audio2: bytes, level1: float = 1.0, level2: float = 1.0) -> bytes:
        """Mix two audio streams.

        Args:
            audio1: First audio stream
            audio2: Second audio stream
            level1: Volume level for audio1 (0.0-1.0)
            level2: Volume level for audio2 (0.0-1.0)

        Returns:
            Mixed audio
        """
        pass

SoundfileMixer

{ .api }
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.frames.frames import MixerUpdateSettingsFrame, MixerEnableFrame

class SoundfileMixer(BaseAudioMixer):
    """File-based audio mixer using soundfile library.

    Mixes incoming audio with audio loaded from files. Supports multiple
    audio formats, runtime configuration changes, and file switching.

    Args:
        sound_files: Mapping of sound names to file paths
        default_sound: Name of default sound to play initially
        volume: Mixing volume level (0.0-1.0, default: 0.4)
        mixing: Whether mixing is initially enabled (default: True)
        loop: Whether to loop audio files (default: True)

    Methods:
        start(sample_rate): Load all sound files
        stop(): Clean up resources
        mix(audio): Mix audio with current sound
        process_frame(frame): Handle control frames

    Control Frames:
        MixerUpdateSettingsFrame: Update mixer settings
            - sound: Change current sound file
            - volume: Change mixing volume
            - loop: Enable/disable looping
        MixerEnableFrame: Enable/disable mixing

    Example:
        # Initialize with multiple sounds
        mixer = SoundfileMixer(
            sound_files={
                "background": "music.wav",
                "alert": "chime.wav",
                "hold": "hold_music.wav"
            },
            default_sound="background",
            volume=0.4,
            loop=True
        )

        # Use with transport
        transport = DailyTransport(
            params=DailyParams(
                audio_out_mixer=mixer
            )
        )

        # Change sound at runtime
        await task.queue_frame(
            MixerUpdateSettingsFrame(settings={
                "sound": "alert",
                "volume": 0.6
            })
        )

        # Disable mixing
        await task.queue_frame(MixerEnableFrame(enable=False))

    Note:
        Requires `pip install pipecat-ai[soundfile]`
        Audio files must be mono and match transport sample rate
        Supports multiple formats via soundfile library
    """

    def __init__(
        self,
        *,
        sound_files: dict[str, str],
        default_sound: str,
        volume: float = 0.4,
        mixing: bool = True,
        loop: bool = True
    ):
        pass

Usage Patterns

Noise Filtering

{ .api }
from pipecat.audio.filters.krisp_filter import KrispFilter
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import InputAudioRawFrame

class AudioFilterProcessor(FrameProcessor):
    """Process audio with filter."""

    def __init__(self, filter: BaseAudioFilter):
        super().__init__()
        self._filter = filter

    async def process_frame(self, frame, direction):
        if isinstance(frame, InputAudioRawFrame):
            # Filter audio
            filtered = await self._filter.filter(frame.audio)

            # Create new frame
            frame = InputAudioRawFrame(
                audio=filtered,
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels
            )

        await self.push_frame(frame, direction)

# Use in pipeline
filter = KrispFilter(api_key="...")
filter_processor = AudioFilterProcessor(filter)

pipeline = Pipeline([
    transport.input(),
    filter_processor,  # Filter noise
    stt,
    # ...
])

Audio Resampling

{ .api }
from pipecat.audio.resamplers.soxr_resampler import SoxrResampler

# Resample between different rates
resampler = SoxrResampler(
    input_sample_rate=16000,
    output_sample_rate=24000
)

# Resample audio
output_audio = resampler.resample(input_audio)

Background Music Mixing

{ .api }
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer

# Initialize with multiple sound files
mixer = SoundfileMixer(
    sound_files={
        "background": "background.wav",
        "alert": "alert.wav"
    },
    default_sound="background",
    volume=0.3,
    loop=True
)

# Mixer automatically loads and mixes with output audio
# Use with transport for automatic mixing
transport = DailyTransport(
    params=DailyParams(
        audio_out_mixer=mixer
    )
)

# Change sound at runtime
await task.queue_frame(
    MixerUpdateSettingsFrame(settings={
        "sound": "alert",
        "volume": 0.5
    })
)

Advanced Filter Usage

{ .api }
from pipecat.audio.filters.koala_filter import KoalaFilter
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.transports.daily import DailyTransport, DailyParams

# Koala noise suppression
koala = KoalaFilter(access_key="your-picovoice-key")

transport = DailyTransport(
    params=DailyParams(
        audio_in_filter=koala,
        audio_in_sample_rate=16000  # Match Koala's rate
    )
)

# AIC enhancement with VAD
aic_filter = AICFilter(
    license_key="your-aic-key",
    enhancement_level=1.0,
    voice_gain=1.2
)

# Create AIC VAD
aic_vad = aic_filter.create_vad_analyzer(
    lookback_buffer_size=6.0,
    sensitivity=6.0
)

transport = DailyTransport(
    params=DailyParams(
        audio_in_filter=aic_filter,
        vad_enabled=True,
        vad_analyzer=aic_vad
    )
)

Streaming Resampler Usage

{ .api }
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler

# Create streaming resampler
resampler = SOXRStreamAudioResampler()

# Process chunks without clicks between boundaries
async def process_audio_stream(audio_chunks):
    """Process streaming audio with resampling."""
    resampled_chunks = []

    for chunk in audio_chunks:
        # Resample maintains internal state
        resampled = await resampler.resample(
            chunk,
            in_rate=16000,
            out_rate=24000
        )
        resampled_chunks.append(resampled)

    return b''.join(resampled_chunks)

# No clicks between chunks due to internal history

Filter Control Frames

{ .api }
from pipecat.frames.frames import FilterEnableFrame
from pipecat.audio.filters.koala_filter import KoalaFilter

# Create filter
filter = KoalaFilter(access_key="key")

# Disable filtering at runtime
await task.queue_frame(FilterEnableFrame(enable=False))

# Re-enable filtering
await task.queue_frame(FilterEnableFrame(enable=True))

# Useful for toggling noise suppression dynamically

Best Practices

Choose Right Filter

{ .api }
# Premium: Krisp VIVA - Advanced noise reduction
filter = KrispVivaFilter(
    model_path="/path/to/model.kef",
    noise_suppression_level=100
)
# Best quality, requires model file

# Premium: AIC - AI-powered enhancement
filter = AICFilter(
    license_key="...",
    enhancement_level=1.0,
    voice_gain=1.2
)
# Speech enhancement + noise reduction + VAD

# Good: Koala - PicoVoice noise suppression
filter = KoalaFilter(access_key="...")
# High quality, API-based, requires key

# Good: Krisp - Professional quality
filter = KrispFilter(api_key="...")
# Cloud-based, high quality

# Good: Open-source for development
filter = RNNoiseFilter()
# Free, good quality, no API key

# Basic: Simple noise reduction
filter = NoiseReduceFilter(stationary=True)
# Basic but effective, no dependencies

Choose Right Resampler

{ .api }
# Best for streaming: SOXRStreamAudioResampler
resampler = SOXRStreamAudioResampler()
# Maintains state, prevents clicks between chunks
# Ideal for real-time processing

# Best for one-shot: SoxrResampler
resampler = SoxrResampler(
    input_sample_rate=16000,
    output_sample_rate=24000
)
# High quality, no state management needed

# Alternative: ResampyResampler
resampler = ResampyResampler()
# Good balance of quality and performance
# Kaiser windowing filter

Resample When Necessary

{ .api }
# Only resample if rates don't match
if input_rate != output_rate:
    resampler = SoxrResampler(input_rate, output_rate)
    audio = resampler.resample(audio)
else:
    # No resampling needed
    pass

Mix Carefully

{ .api }
# Good: Appropriate levels
mixed = mixer.mix(
    voice, music,
    level1=1.0,   # Voice at full volume
    level2=0.2    # Music at 20% (background)
)

# Bad: Music too loud
mixed = mixer.mix(
    voice, music,
    level1=0.5,   # Voice too quiet
    level2=1.0    # Music too loud
)

Interruption Strategies

Interruption strategies determine when users can interrupt bot speech during conversations. These strategies analyze audio and text to make intelligent interruption decisions.

BaseInterruptionStrategy

{ .api }
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy

class BaseInterruptionStrategy:
    """Base class for interruption strategies.

    Strategies decide when the user can interrupt the bot while
    the bot is speaking. Can be based on audio volume, word count,
    or other criteria.

    Methods:
        append_audio(audio, sample_rate): Add audio for analysis
        append_text(text): Add text for analysis
        should_interrupt(): Check if interruption should occur
        reset(): Reset accumulated state

    Example:
        class CustomStrategy(BaseInterruptionStrategy):
            async def should_interrupt(self) -> bool:
                # Custom interruption logic
                return True

            async def reset(self):
                # Reset state
                pass
    """

    async def append_audio(self, audio: bytes, sample_rate: int):
        """Append audio data for analysis.

        Args:
            audio: Raw audio bytes
            sample_rate: Sample rate in Hz
        """
        pass

    async def append_text(self, text: str):
        """Append text data for analysis.

        Args:
            text: Text string
        """
        pass

    async def should_interrupt(self) -> bool:
        """Determine if user should interrupt bot.

        Returns:
            True if interruption should occur
        """
        pass

    async def reset(self):
        """Reset accumulated audio and text."""
        pass

MinWordsInterruptionStrategy

{ .api }
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy

class MinWordsInterruptionStrategy(BaseInterruptionStrategy):
    """Interruption strategy based on minimum word count.

    Triggers interruption when user has spoken at least the
    specified number of words.

    Args:
        min_words: Minimum words required for interruption

    Note:
        Deprecated since 0.0.99. Use
        pipecat.turns.user_start.MinWordsUserTurnStartStrategy
        with PipelineTask's user_turn_strategies parameter instead.

    Example:
        # Old approach (deprecated)
        strategy = MinWordsInterruptionStrategy(min_words=3)

        # Append text as user speaks
        await strategy.append_text("Hello ")
        await strategy.append_text("there ")
        await strategy.append_text("friend")

        # Check if should interrupt
        if await strategy.should_interrupt():
            print("User spoke 3+ words, interrupt bot")
    """

    def __init__(self, *, min_words: int):
        pass

    async def append_text(self, text: str):
        """Add text to word count.

        Args:
            text: Text to add
        """
        pass

    async def should_interrupt(self) -> bool:
        """Check if word count exceeds minimum.

        Returns:
            True if word count >= min_words
        """
        pass

    async def reset(self):
        """Reset accumulated text."""
        pass

Interruption Patterns

Basic Interruption Setup

{ .api }
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams

# Note: This approach is deprecated
# Use user_turn_strategies parameter instead

# Create strategy
strategy = MinWordsInterruptionStrategy(min_words=2)

# Configure pipeline task (old approach)
# See user turn strategies documentation for new approach

Custom Interruption Strategy

{ .api }
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
import time

class VolumeBasedInterruption(BaseInterruptionStrategy):
    """Interrupt based on audio volume threshold."""

    def __init__(self, volume_threshold: float = 0.5, duration_ms: int = 500):
        super().__init__()
        self._threshold = volume_threshold
        self._duration_ms = duration_ms
        self._loud_start = None

    async def append_audio(self, audio: bytes, sample_rate: int):
        """Track audio volume over time."""
        import numpy as np

        # Calculate RMS volume
        audio_np = np.frombuffer(audio, dtype=np.int16)
        rms = np.sqrt(np.mean(audio_np.astype(float) ** 2))
        normalized_volume = rms / 32768.0

        # Track if volume exceeds threshold
        if normalized_volume > self._threshold:
            if self._loud_start is None:
                self._loud_start = time.time()
        else:
            self._loud_start = None

    async def should_interrupt(self) -> bool:
        """Interrupt if volume exceeded threshold for duration."""
        if self._loud_start is None:
            return False

        duration = (time.time() - self._loud_start) * 1000
        return duration >= self._duration_ms

    async def reset(self):
        """Reset volume tracking."""
        self._loud_start = None

# Usage
strategy = VolumeBasedInterruption(
    volume_threshold=0.5,
    duration_ms=500
)

Migration to User Turn Strategies

{ .api }
# Old approach (deprecated)
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy

strategy = MinWordsInterruptionStrategy(min_words=3)

# New approach (recommended)
from pipecat.turns.user_start import MinWordsUserTurnStartStrategy
from pipecat.pipeline.task import PipelineTask, PipelineParams

# Create strategy
turn_strategy = MinWordsUserTurnStartStrategy(min_words=3)

# Use with PipelineTask
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        user_turn_strategies=[turn_strategy]
    )
)

# User turn strategies provide more flexibility and better integration
# with the pipeline's turn-taking system

Interruption Best Practices

Choose Appropriate Threshold

{ .api }
# Good: Require meaningful input
strategy = MinWordsInterruptionStrategy(min_words=3)
# User must say at least 3 words to interrupt

# Bad: Too sensitive
strategy = MinWordsInterruptionStrategy(min_words=1)
# Any single word interrupts (too aggressive)

Combine with VAD

{ .api }
# Interruptions work best with VAD
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

# Configure sensitive VAD for interruptions
vad = SileroVADAnalyzer(
    params=VADParams(
        threshold=0.5,
        min_speech_duration_ms=250
    )
)

# VAD detects speech, strategy determines if it should interrupt
transport = DailyTransport(
    params=DailyParams(
        vad_enabled=True,
        vad_analyzer=vad
    )
)

Handle Interruption Events

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import BotInterruptionFrame, UserStartedSpeakingFrame

class InterruptionHandler(FrameProcessor):
    """Handle bot interruptions gracefully."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, BotInterruptionFrame):
            print("Bot was interrupted, stopping current output")
            # Clean up any ongoing TTS or audio
            await self._cleanup_output()

        elif isinstance(frame, UserStartedSpeakingFrame):
            print("User started speaking")
            # Prepare for potential interruption

        await self.push_frame(frame, direction)

    async def _cleanup_output(self):
        """Clean up interrupted output."""
        # Cancel ongoing audio
        # Clear TTS queue
        # Reset bot state
        pass