docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Audio filters process audio to improve quality, while mixers combine multiple audio sources. These components enhance audio quality and enable advanced audio processing in pipelines.
{ .api }
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
class BaseAudioFilter:
"""Base class for audio filters.
Processes audio to improve quality or modify characteristics.
Methods:
filter(audio): Filter audio
reset(): Reset filter state
Example:
filter = SomeAudioFilter()
filtered_audio = await filter.filter(audio_bytes)
"""
async def filter(self, audio: bytes) -> bytes:
"""Filter audio.
Args:
audio: Input audio bytes
Returns:
Filtered audio bytes
"""
pass
def reset(self):
"""Reset filter state."""
pass{ .api }
from pipecat.audio.filters.krisp_filter import KrispFilter
class KrispFilter(BaseAudioFilter):
"""Krisp noise suppression filter.
Professional noise cancellation using Krisp AI.
Args:
api_key: Krisp API key
Example:
filter = KrispFilter(api_key="your-key")
# Use with transport
transport = DailyTransport(
params=DailyParams(
audio_in_filter=filter
)
)
"""
def __init__(self, api_key: str):
pass{ .api }
from pipecat.audio.filters.rnnoise_filter import RNNoiseFilter
class RNNoiseFilter(BaseAudioFilter):
"""RNNoise noise suppression filter.
Open-source noise reduction using RNN.
Example:
filter = RNNoiseFilter()
filtered = await filter.filter(audio)
"""
def __init__(self):
pass{ .api }
from pipecat.audio.filters.noisereduce_filter import NoiseReduceFilter
class NoiseReduceFilter(BaseAudioFilter):
"""Noise reduction using noisereduce library.
Args:
stationary: Use stationary noise reduction
Example:
filter = NoiseReduceFilter(stationary=True)
"""
def __init__(self, stationary: bool = True):
pass{ .api }
from pipecat.audio.filters.koala_filter import KoalaFilter
class KoalaFilter(BaseAudioFilter):
"""Koala noise suppression from PicoVoice.
Real-time noise suppression using PicoVoice's Koala engine.
Provides high-quality noise reduction with automatic buffering
for optimal frame processing.
Args:
access_key: PicoVoice access key for authentication
Methods:
start(sample_rate): Initialize with transport sample rate
stop(): Clean up resources
filter(audio): Apply noise suppression
reset(): Reset filter state
Example:
filter = KoalaFilter(access_key="your-picovoice-key")
# Use with transport
transport = DailyTransport(
params=DailyParams(
audio_in_filter=filter
)
)
Note:
Requires `pip install pipecat-ai[koala]`
Sample rate must match Koala's expected rate (typically 16kHz)
"""
def __init__(self, *, access_key: str):
pass{ .api }
from pipecat.audio.filters.aic_filter import AICFilter
from aic import AICModelType
class AICFilter(BaseAudioFilter):
"""AI-Coustics audio enhancement filter.
Real-time audio enhancement using ai-coustics' AIC SDK.
Provides advanced speech enhancement, noise suppression,
and voice gain control.
Args:
license_key: ai-coustics license key for authentication
model_type: Model variant (default: AICModelType.QUAIL_STT)
enhancement_level: Overall enhancement strength (0.0-1.0, default: 1.0)
voice_gain: Linear gain for detected speech (0.0-4.0, default: 1.0)
noise_gate_enable: Deprecated, use create_vad_analyzer() instead
Methods:
start(sample_rate): Initialize with transport sample rate
stop(): Clean up resources
filter(audio): Apply audio enhancement
create_vad_analyzer(): Create AIC VAD analyzer
get_vad_factory(): Get VAD factory function
Example:
# Basic usage
filter = AICFilter(
license_key="your-aic-key",
enhancement_level=1.0,
voice_gain=1.0
)
# With VAD
filter = AICFilter(license_key="your-aic-key")
vad = filter.create_vad_analyzer(
lookback_buffer_size=6.0,
sensitivity=6.0
)
transport = DailyTransport(
params=DailyParams(
audio_in_filter=filter,
vad_analyzer=vad
)
)
Note:
Requires `pip install pipecat-ai[aic]`
Processes audio in optimal block sizes for efficiency
"""
def __init__(
self,
*,
license_key: str = "",
model_type: AICModelType = AICModelType.QUAIL_STT,
enhancement_level: float = 1.0,
voice_gain: float = 1.0,
noise_gate_enable: bool = True
):
pass{ .api }
from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter
class KrispVivaFilter(BaseAudioFilter):
"""Krisp VIVA SDK noise reduction filter.
Advanced noise reduction using Krisp's VIVA SDK with proprietary
noise suppression algorithms. Requires a valid Krisp model file.
Args:
model_path: Path to Krisp model file (.kef extension)
If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable
frame_duration: Frame duration in milliseconds (default: 10)
noise_suppression_level: Suppression level 0-100 (default: 100)
Methods:
start(sample_rate): Initialize with transport sample rate
stop(): Clean up resources
filter(audio): Apply noise reduction
Example:
# Using model path
filter = KrispVivaFilter(
model_path="/path/to/model.kef",
frame_duration=10,
noise_suppression_level=100
)
# Using environment variable
import os
os.environ["KRISP_VIVA_FILTER_MODEL_PATH"] = "/path/to/model.kef"
filter = KrispVivaFilter()
transport = DailyTransport(
params=DailyParams(
audio_in_filter=filter
)
)
Note:
Requires krisp_audio package
Model file must have .kef extension
Buffers audio for optimal frame processing
"""
def __init__(
self,
model_path: str = None,
frame_duration: int = 10,
noise_suppression_level: int = 100
):
pass{ .api }
from pipecat.audio.resamplers.base_resampler import BaseAudioResampler
class BaseAudioResampler:
"""Base class for audio resampling.
Converts audio between different sample rates.
Methods:
resample(audio): Resample audio
Example:
resampler = SomeResampler(
input_sample_rate=16000,
output_sample_rate=24000
)
resampled = resampler.resample(audio)
"""
def __init__(self, input_sample_rate: int, output_sample_rate: int):
pass
def resample(self, audio: bytes) -> bytes:
"""Resample audio.
Args:
audio: Input audio
Returns:
Resampled audio
"""
pass{ .api }
from pipecat.audio.resamplers.soxr_resampler import SoxrResampler
class SoxrResampler(BaseAudioResampler):
"""High-quality SOXR resampler.
Args:
input_sample_rate: Input rate (Hz)
output_sample_rate: Output rate (Hz)
num_channels: Number of channels
Example:
resampler = SoxrResampler(
input_sample_rate=16000,
output_sample_rate=24000,
num_channels=1
)
resampled = resampler.resample(audio)
"""
def __init__(
self,
input_sample_rate: int,
output_sample_rate: int,
num_channels: int = 1
):
pass{ .api }
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler
class SOXRStreamAudioResampler(BaseAudioResampler):
"""Streaming SOXR resampler with internal history.
High-quality streaming resampler using SoX ResampleStream library.
Maintains internal state to avoid clicks at chunk boundaries.
Ideal for real-time processing and long audio signals.
Methods:
resample(audio, in_rate, out_rate): Resample audio stream
When to use:
- Real-time processing scenarios
- Very long audio signals
- Processing audio in chunks/streams
- Reusing same resampler configuration (saves initialization overhead)
Example:
resampler = SOXRStreamAudioResampler()
# Process audio stream
resampled1 = await resampler.resample(
audio_chunk1,
in_rate=16000,
out_rate=24000
)
# Internal state prevents clicks between chunks
resampled2 = await resampler.resample(
audio_chunk2,
in_rate=16000,
out_rate=24000
)
Note:
- Uses VHQ (Very High Quality) resampling
- Only supports mono audio (1 channel)
- Input must be 16-bit signed PCM
- Automatically clears state after 0.2 seconds of inactivity
- Cannot be reused with different sample rates
"""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""Resample audio with internal state management.
Args:
audio: Input audio bytes (16-bit PCM)
in_rate: Input sample rate (Hz)
out_rate: Output sample rate (Hz)
Returns:
Resampled audio bytes
"""
pass{ .api }
from pipecat.audio.resamplers.resampy_resampler import ResampyResampler
class ResampyResampler(BaseAudioResampler):
"""Resampy-based audio resampler.
High-quality resampler using resampy library's Kaiser windowing
filter. Good balance of quality and performance.
Methods:
resample(audio, in_rate, out_rate): Resample audio
Example:
resampler = ResampyResampler()
# Resample audio
resampled = await resampler.resample(
audio,
in_rate=16000,
out_rate=24000
)
Note:
- Uses Kaiser windowing filter
- Good performance characteristics
- Input must be 16-bit signed PCM
- No internal state between calls
"""
def __init__(self, **kwargs):
pass
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""Resample audio using resampy library.
Args:
audio: Input audio bytes (16-bit PCM)
in_rate: Input sample rate (Hz)
out_rate: Output sample rate (Hz)
Returns:
Resampled audio bytes
"""
pass{ .api }
from pipecat.audio.mixers.base_mixer import BaseAudioMixer
class BaseAudioMixer:
"""Base class for audio mixing.
Combines multiple audio sources.
Methods:
mix(audio1, audio2): Mix two audio streams
Example:
mixer = SomeMixer(sample_rate=24000)
mixed = mixer.mix(audio1, audio2)
"""
def __init__(self, sample_rate: int, num_channels: int = 1):
pass
def mix(self, audio1: bytes, audio2: bytes, level1: float = 1.0, level2: float = 1.0) -> bytes:
"""Mix two audio streams.
Args:
audio1: First audio stream
audio2: Second audio stream
level1: Volume level for audio1 (0.0-1.0)
level2: Volume level for audio2 (0.0-1.0)
Returns:
Mixed audio
"""
pass{ .api }
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.frames.frames import MixerUpdateSettingsFrame, MixerEnableFrame
class SoundfileMixer(BaseAudioMixer):
"""File-based audio mixer using soundfile library.
Mixes incoming audio with audio loaded from files. Supports multiple
audio formats, runtime configuration changes, and file switching.
Args:
sound_files: Mapping of sound names to file paths
default_sound: Name of default sound to play initially
volume: Mixing volume level (0.0-1.0, default: 0.4)
mixing: Whether mixing is initially enabled (default: True)
loop: Whether to loop audio files (default: True)
Methods:
start(sample_rate): Load all sound files
stop(): Clean up resources
mix(audio): Mix audio with current sound
process_frame(frame): Handle control frames
Control Frames:
MixerUpdateSettingsFrame: Update mixer settings
- sound: Change current sound file
- volume: Change mixing volume
- loop: Enable/disable looping
MixerEnableFrame: Enable/disable mixing
Example:
# Initialize with multiple sounds
mixer = SoundfileMixer(
sound_files={
"background": "music.wav",
"alert": "chime.wav",
"hold": "hold_music.wav"
},
default_sound="background",
volume=0.4,
loop=True
)
# Use with transport
transport = DailyTransport(
params=DailyParams(
audio_out_mixer=mixer
)
)
# Change sound at runtime
await task.queue_frame(
MixerUpdateSettingsFrame(settings={
"sound": "alert",
"volume": 0.6
})
)
# Disable mixing
await task.queue_frame(MixerEnableFrame(enable=False))
Note:
Requires `pip install pipecat-ai[soundfile]`
Audio files must be mono and match transport sample rate
Supports multiple formats via soundfile library
"""
def __init__(
self,
*,
sound_files: dict[str, str],
default_sound: str,
volume: float = 0.4,
mixing: bool = True,
loop: bool = True
):
pass{ .api }
from pipecat.audio.filters.krisp_filter import KrispFilter
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import InputAudioRawFrame
class AudioFilterProcessor(FrameProcessor):
"""Process audio with filter."""
def __init__(self, filter: BaseAudioFilter):
super().__init__()
self._filter = filter
async def process_frame(self, frame, direction):
if isinstance(frame, InputAudioRawFrame):
# Filter audio
filtered = await self._filter.filter(frame.audio)
# Create new frame
frame = InputAudioRawFrame(
audio=filtered,
sample_rate=frame.sample_rate,
num_channels=frame.num_channels
)
await self.push_frame(frame, direction)
# Use in pipeline
filter = KrispFilter(api_key="...")
filter_processor = AudioFilterProcessor(filter)
pipeline = Pipeline([
transport.input(),
filter_processor, # Filter noise
stt,
# ...
]){ .api }
from pipecat.audio.resamplers.soxr_resampler import SoxrResampler
# Resample between different rates
resampler = SoxrResampler(
input_sample_rate=16000,
output_sample_rate=24000
)
# Resample audio
output_audio = resampler.resample(input_audio){ .api }
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
# Initialize with multiple sound files
mixer = SoundfileMixer(
sound_files={
"background": "background.wav",
"alert": "alert.wav"
},
default_sound="background",
volume=0.3,
loop=True
)
# Mixer automatically loads and mixes with output audio
# Use with transport for automatic mixing
transport = DailyTransport(
params=DailyParams(
audio_out_mixer=mixer
)
)
# Change sound at runtime
await task.queue_frame(
MixerUpdateSettingsFrame(settings={
"sound": "alert",
"volume": 0.5
})
){ .api }
from pipecat.audio.filters.koala_filter import KoalaFilter
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.transports.daily import DailyTransport, DailyParams
# Koala noise suppression
koala = KoalaFilter(access_key="your-picovoice-key")
transport = DailyTransport(
params=DailyParams(
audio_in_filter=koala,
audio_in_sample_rate=16000 # Match Koala's rate
)
)
# AIC enhancement with VAD
aic_filter = AICFilter(
license_key="your-aic-key",
enhancement_level=1.0,
voice_gain=1.2
)
# Create AIC VAD
aic_vad = aic_filter.create_vad_analyzer(
lookback_buffer_size=6.0,
sensitivity=6.0
)
transport = DailyTransport(
params=DailyParams(
audio_in_filter=aic_filter,
vad_enabled=True,
vad_analyzer=aic_vad
)
){ .api }
from pipecat.audio.resamplers.soxr_stream_resampler import SOXRStreamAudioResampler
# Create streaming resampler
resampler = SOXRStreamAudioResampler()
# Process chunks without clicks between boundaries
async def process_audio_stream(audio_chunks):
"""Process streaming audio with resampling."""
resampled_chunks = []
for chunk in audio_chunks:
# Resample maintains internal state
resampled = await resampler.resample(
chunk,
in_rate=16000,
out_rate=24000
)
resampled_chunks.append(resampled)
return b''.join(resampled_chunks)
# No clicks between chunks due to internal history{ .api }
from pipecat.frames.frames import FilterEnableFrame
from pipecat.audio.filters.koala_filter import KoalaFilter
# Create filter
filter = KoalaFilter(access_key="key")
# Disable filtering at runtime
await task.queue_frame(FilterEnableFrame(enable=False))
# Re-enable filtering
await task.queue_frame(FilterEnableFrame(enable=True))
# Useful for toggling noise suppression dynamically{ .api }
# Premium: Krisp VIVA - Advanced noise reduction
filter = KrispVivaFilter(
model_path="/path/to/model.kef",
noise_suppression_level=100
)
# Best quality, requires model file
# Premium: AIC - AI-powered enhancement
filter = AICFilter(
license_key="...",
enhancement_level=1.0,
voice_gain=1.2
)
# Speech enhancement + noise reduction + VAD
# Good: Koala - PicoVoice noise suppression
filter = KoalaFilter(access_key="...")
# High quality, API-based, requires key
# Good: Krisp - Professional quality
filter = KrispFilter(api_key="...")
# Cloud-based, high quality
# Good: Open-source for development
filter = RNNoiseFilter()
# Free, good quality, no API key
# Basic: Simple noise reduction
filter = NoiseReduceFilter(stationary=True)
# Basic but effective, no dependencies{ .api }
# Best for streaming: SOXRStreamAudioResampler
resampler = SOXRStreamAudioResampler()
# Maintains state, prevents clicks between chunks
# Ideal for real-time processing
# Best for one-shot: SoxrResampler
resampler = SoxrResampler(
input_sample_rate=16000,
output_sample_rate=24000
)
# High quality, no state management needed
# Alternative: ResampyResampler
resampler = ResampyResampler()
# Good balance of quality and performance
# Kaiser windowing filter{ .api }
# Only resample if rates don't match
if input_rate != output_rate:
resampler = SoxrResampler(input_rate, output_rate)
audio = resampler.resample(audio)
else:
# No resampling needed
pass{ .api }
# Good: Appropriate levels
mixed = mixer.mix(
voice, music,
level1=1.0, # Voice at full volume
level2=0.2 # Music at 20% (background)
)
# Bad: Music too loud
mixed = mixer.mix(
voice, music,
level1=0.5, # Voice too quiet
level2=1.0 # Music too loud
)Interruption strategies determine when users can interrupt bot speech during conversations. These strategies analyze audio and text to make intelligent interruption decisions.
{ .api }
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
class BaseInterruptionStrategy:
"""Base class for interruption strategies.
Strategies decide when the user can interrupt the bot while
the bot is speaking. Can be based on audio volume, word count,
or other criteria.
Methods:
append_audio(audio, sample_rate): Add audio for analysis
append_text(text): Add text for analysis
should_interrupt(): Check if interruption should occur
reset(): Reset accumulated state
Example:
class CustomStrategy(BaseInterruptionStrategy):
async def should_interrupt(self) -> bool:
# Custom interruption logic
return True
async def reset(self):
# Reset state
pass
"""
async def append_audio(self, audio: bytes, sample_rate: int):
"""Append audio data for analysis.
Args:
audio: Raw audio bytes
sample_rate: Sample rate in Hz
"""
pass
async def append_text(self, text: str):
"""Append text data for analysis.
Args:
text: Text string
"""
pass
async def should_interrupt(self) -> bool:
"""Determine if user should interrupt bot.
Returns:
True if interruption should occur
"""
pass
async def reset(self):
"""Reset accumulated audio and text."""
pass{ .api }
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy
class MinWordsInterruptionStrategy(BaseInterruptionStrategy):
"""Interruption strategy based on minimum word count.
Triggers interruption when user has spoken at least the
specified number of words.
Args:
min_words: Minimum words required for interruption
Note:
Deprecated since 0.0.99. Use
pipecat.turns.user_start.MinWordsUserTurnStartStrategy
with PipelineTask's user_turn_strategies parameter instead.
Example:
# Old approach (deprecated)
strategy = MinWordsInterruptionStrategy(min_words=3)
# Append text as user speaks
await strategy.append_text("Hello ")
await strategy.append_text("there ")
await strategy.append_text("friend")
# Check if should interrupt
if await strategy.should_interrupt():
print("User spoke 3+ words, interrupt bot")
"""
def __init__(self, *, min_words: int):
pass
async def append_text(self, text: str):
"""Add text to word count.
Args:
text: Text to add
"""
pass
async def should_interrupt(self) -> bool:
"""Check if word count exceeds minimum.
Returns:
True if word count >= min_words
"""
pass
async def reset(self):
"""Reset accumulated text."""
pass{ .api }
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask, PipelineParams
# Note: This approach is deprecated
# Use user_turn_strategies parameter instead
# Create strategy
strategy = MinWordsInterruptionStrategy(min_words=2)
# Configure pipeline task (old approach)
# See user turn strategies documentation for new approach{ .api }
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
import time
class VolumeBasedInterruption(BaseInterruptionStrategy):
"""Interrupt based on audio volume threshold."""
def __init__(self, volume_threshold: float = 0.5, duration_ms: int = 500):
super().__init__()
self._threshold = volume_threshold
self._duration_ms = duration_ms
self._loud_start = None
async def append_audio(self, audio: bytes, sample_rate: int):
"""Track audio volume over time."""
import numpy as np
# Calculate RMS volume
audio_np = np.frombuffer(audio, dtype=np.int16)
rms = np.sqrt(np.mean(audio_np.astype(float) ** 2))
normalized_volume = rms / 32768.0
# Track if volume exceeds threshold
if normalized_volume > self._threshold:
if self._loud_start is None:
self._loud_start = time.time()
else:
self._loud_start = None
async def should_interrupt(self) -> bool:
"""Interrupt if volume exceeded threshold for duration."""
if self._loud_start is None:
return False
duration = (time.time() - self._loud_start) * 1000
return duration >= self._duration_ms
async def reset(self):
"""Reset volume tracking."""
self._loud_start = None
# Usage
strategy = VolumeBasedInterruption(
volume_threshold=0.5,
duration_ms=500
){ .api }
# Old approach (deprecated)
from pipecat.audio.interruptions.min_words_interruption_strategy import MinWordsInterruptionStrategy
strategy = MinWordsInterruptionStrategy(min_words=3)
# New approach (recommended)
from pipecat.turns.user_start import MinWordsUserTurnStartStrategy
from pipecat.pipeline.task import PipelineTask, PipelineParams
# Create strategy
turn_strategy = MinWordsUserTurnStartStrategy(min_words=3)
# Use with PipelineTask
task = PipelineTask(
pipeline,
params=PipelineParams(
user_turn_strategies=[turn_strategy]
)
)
# User turn strategies provide more flexibility and better integration
# with the pipeline's turn-taking system{ .api }
# Good: Require meaningful input
strategy = MinWordsInterruptionStrategy(min_words=3)
# User must say at least 3 words to interrupt
# Bad: Too sensitive
strategy = MinWordsInterruptionStrategy(min_words=1)
# Any single word interrupts (too aggressive){ .api }
# Interruptions work best with VAD
from pipecat.audio.vad.vad_analyzer import SileroVADAnalyzer, VADParams
# Configure sensitive VAD for interruptions
vad = SileroVADAnalyzer(
params=VADParams(
threshold=0.5,
min_speech_duration_ms=250
)
)
# VAD detects speech, strategy determines if it should interrupt
transport = DailyTransport(
params=DailyParams(
vad_enabled=True,
vad_analyzer=vad
)
){ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import BotInterruptionFrame, UserStartedSpeakingFrame
class InterruptionHandler(FrameProcessor):
    """Handle bot interruptions gracefully."""

    async def process_frame(self, frame, direction):
        # FrameProcessor subclasses must invoke the base implementation so
        # system frames (StartFrame, etc.) are handled correctly.
        await super().process_frame(frame, direction)
        if isinstance(frame, BotInterruptionFrame):
            print("Bot was interrupted, stopping current output")
            # Clean up any ongoing TTS or audio
            await self._cleanup_output()
        elif isinstance(frame, UserStartedSpeakingFrame):
            print("User started speaking")
            # Prepare for potential interruption
        await self.push_frame(frame, direction)

    async def _cleanup_output(self):
        """Clean up interrupted output."""
        # Cancel ongoing audio
        # Clear TTS queue
        # Reset bot state
        pass