tessl install tessl/pypi-pipecat-ai@0.0.0
An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
The Turn Management system provides sophisticated control over conversational turn-taking in voice and multimodal AI agents. It manages when users can speak, when interruptions occur, and how the agent responds to user activity patterns through configurable strategies.
{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_turn_strategies import (
UserTurnStrategies,
ExternalUserTurnStrategies,
)
# User start strategies
from pipecat.turns.user_start import (
BaseUserTurnStartStrategy,
VADUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
MinWordsUserTurnStartStrategy,
ExternalUserTurnStartStrategy,
)
# User stop strategies
from pipecat.turns.user_stop import (
BaseUserTurnStopStrategy,
TranscriptionUserTurnStopStrategy,
TurnAnalyzerUserTurnStopStrategy,
ExternalUserTurnStopStrategy,
)
# User mute strategies
from pipecat.turns.user_mute import (
BaseUserMuteStrategy,
AlwaysUserMuteStrategy,
FirstSpeechUserMuteStrategy,
MuteUntilFirstBotCompleteUserMuteStrategy,
FunctionCallUserMuteStrategy,
)
# Turn analyzers
from pipecat.audio.turn.base_turn_analyzer import (
BaseTurnAnalyzer,
BaseTurnParams,
EndOfTurnState,
)
from pipecat.audio.turn.smart_turn import (
SmartTurnParams,
LocalSmartTurnAnalyzer, # Deprecated - use V3
LocalSmartTurnAnalyzerV3,
)
Main frame processor for managing the complete user turn lifecycle with configurable strategies.
class UserTurnProcessor(FrameProcessor):
"""Frame processor for user turn management.
Manages when user turns start and stop using configured strategies,
and optionally detects user idle state.
Parameters:
user_turn_strategies: Strategies for detecting turn start/stop (default: UserTurnStrategies())
user_turn_stop_timeout: Seconds to auto-stop turn if no activity (default: 5.0)
user_idle_timeout: Seconds before user considered idle, None to disable (default: None)
Events:
on_user_turn_started: Emitted when user starts speaking
on_user_turn_stopped: Emitted when user stops speaking
on_user_turn_stop_timeout: Emitted if no stop strategy triggers
on_user_turn_idle: Emitted when user idle (if enabled)
"""
def __init__(
self,
*,
user_turn_strategies: Optional[UserTurnStrategies] = None,
user_turn_stop_timeout: float = 5.0,
user_idle_timeout: Optional[float] = None,
**kwargs,
)
Usage:
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
# Create with default strategies
turn_processor = UserTurnProcessor()
# Or customize strategies
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.7)]
)
turn_processor = UserTurnProcessor(
user_turn_strategies=strategies,
user_turn_stop_timeout=5.0,
user_idle_timeout=30.0 # Detect idle after 30 seconds
)
# Add to pipeline
pipeline = Pipeline([
transport.input(),
turn_processor, # Manages turn-taking
llm_user_aggregator,
# ... rest of pipeline
])
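The processor's events can also be observed directly for logging or custom behavior. A minimal sketch, assuming the handler signatures used in the debugging example later on this page (exact signatures may vary by pipecat version):
{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor

turn_processor = UserTurnProcessor()

@turn_processor.event_handler("on_user_turn_started")
async def on_turn_started(processor, strategy):
    # Fired when any configured start strategy triggers
    print(f"Turn started by {strategy.__class__.__name__}")

@turn_processor.event_handler("on_user_turn_stopped")
async def on_turn_stopped(processor, strategy):
    # Fired when a stop strategy detects the end of the user's turn
    print(f"Turn stopped by {strategy.__class__.__name__}")

@turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_stop_timeout(processor):
    # Fired if no stop strategy triggered within user_turn_stop_timeout seconds
    print("Turn stopped by timeout")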
Containers for combining user turn start and stop strategies.
@dataclass
class UserTurnStrategies:
"""Container for user turn start and stop strategies.
If no strategies specified, uses defaults:
start: [VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy]
stop: [TranscriptionUserTurnStopStrategy]
Attributes:
start: List of strategies detecting when user starts speaking
stop: List of strategies detecting when user stops speaking
"""
start: Optional[List[BaseUserTurnStartStrategy]] = None
stop: Optional[List[BaseUserTurnStopStrategy]] = None
@dataclass
class ExternalUserTurnStrategies(UserTurnStrategies):
"""Convenience default for external turn control.
Preconfigures strategies for external processors to control turns.
User aggregator does not push UserStartedSpeakingFrame or
UserStoppedSpeakingFrame - these must come from external sources.
Defaults:
start: [ExternalUserTurnStartStrategy()]
stop: [ExternalUserTurnStopStrategy()]
"""Usage:
{ .api }
# Default strategies (VAD + transcription based)
strategies = UserTurnStrategies()
# Custom strategies
strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
MinWordsUserTurnStartStrategy(min_words=2)
],
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=my_analyzer,
timeout=0.5
)
]
)
# External control
external_strategies = ExternalUserTurnStrategies()
Strategies for detecting when user turns begin.
class BaseUserTurnStartStrategy(BaseObject):
"""Base class for user turn start detection strategies.
Parameters:
enable_interruptions: If True, emit interruption frame when user turn starts (default: True)
enable_user_speaking_frames: If True, emit UserStartedSpeakingFrame (default: True).
Disable if another component already generates these.
Events:
on_user_turn_started: User turn has started (emits UserTurnStartedParams)
on_push_frame: Strategy wants to push a frame
on_broadcast_frame: Strategy wants to broadcast a frame
"""
def __init__(
self,
*,
enable_interruptions: bool = True,
enable_user_speaking_frames: bool = True,
**kwargs,
)
class VADUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""VAD-based user turn start detection.
Triggers user turn immediately when VAD detects voice activity.
Simplest and most responsive strategy for interrupt-based conversations.
Triggered by: VADUserStartedSpeakingFrame
"""class TranscriptionUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""Transcription-based user turn start detection.
Signals user turn start when transcription received while bot is speaking.
Useful as fallback when VAD fails but STT produces transcriptions.
Parameters:
use_interim: Use interim transcription frames for earlier detection (default: True)
Triggered by: TranscriptionFrame or InterimTranscriptionFrame (if use_interim=True)
"""
def __init__(self, *, use_interim: bool = True, **kwargs)
class MinWordsUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""Word-count based user turn start detection.
Signals user turn start once user has spoken minimum number of words.
Reduces false positives from single words or accidental speech.
Parameters:
min_words: Minimum spoken words required to trigger start (required)
use_interim: Consider interim transcriptions for earlier detection (default: True)
Triggered by: TranscriptionFrame or InterimTranscriptionFrame when word count >= min_words
Note: If bot is not speaking, requires only 1 word to trigger.
If bot is speaking, requires min_words words.
"""
def __init__(
self,
*,
min_words: int,
use_interim: bool = True,
**kwargs
)
Usage:
{ .api }
# Prevent interruptions on single words
min_words_strategy = MinWordsUserTurnStartStrategy(
min_words=2,
use_interim=True
)
class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""External user turn start control.
Turn start controlled by external processor that emits
UserStartedSpeakingFrame. No internal detection performed.
Defaults:
enable_interruptions=False
enable_user_speaking_frames=False
Triggered by: UserStartedSpeakingFrame (from external source)
"""Strategies for detecting when user turns end.
class BaseUserTurnStopStrategy(BaseObject):
"""Base class for user turn stop detection strategies.
Parameters:
enable_user_speaking_frames: If True, emit UserStoppedSpeakingFrame (default: True).
Disable if another component already generates these.
Events:
on_user_turn_stopped: User stopped speaking (emits UserTurnStoppedParams)
on_push_frame: Strategy wants to push a frame
on_broadcast_frame: Strategy wants to broadcast a frame
"""
def __init__(self, *, enable_user_speaking_frames: bool = True, **kwargs)
class TranscriptionUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""Transcription-based user turn stop detection.
Assumes user stops speaking once transcription received. Handles
multiple or delayed transcription frames gracefully with timeout.
Parameters:
timeout: Short delay (seconds) for consecutive/delayed transcriptions (default: 0.5)
Triggered by: User VAD stops AND no interim results AND transcription text exists
(after timeout)
"""
def __init__(self, *, timeout: float = 0.5, **kwargs)
class TurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""ML-based user turn stop detection using turn analyzer.
Uses turn detection model combining audio, VAD, and transcription
for more accurate turn end detection.
Parameters:
turn_analyzer: Turn detection analyzer instance (required)
timeout: Short delay (seconds) for frame timing (default: 0.5)
Triggered by: Transcription text received AND turn analyzer marks turn complete
Most sophisticated strategy. Requires BaseTurnAnalyzer instance.
Best for applications requiring high turn detection accuracy.
"""
def __init__(
self,
*,
turn_analyzer: BaseTurnAnalyzer,
timeout: float = 0.5,
**kwargs
)
Usage:
{ .api }
from pipecat.audio.turn.smart_turn import LocalSmartTurnAnalyzerV3, SmartTurnParams
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
# Create turn analyzer
analyzer = LocalSmartTurnAnalyzerV3(
sample_rate=16000,
params=SmartTurnParams(
stop_secs=3.0,
pre_speech_ms=500,
max_duration_secs=8
)
)
# Use with strategy
stop_strategy = TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=analyzer,
timeout=0.5
)
class ExternalUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""External user turn stop control.
Turn stop controlled by external processor that emits
UserStoppedSpeakingFrame. Relies on external source for detection.
Parameters:
timeout: Short delay (seconds) for transcription handling (default: 0.5)
Defaults:
enable_user_speaking_frames=False
Triggered by: External UserStoppedSpeakingFrame AND no interim results AND text exists
"""
def __init__(self, *, timeout: float = 0.5, **kwargs)
Strategies for deciding when user input should be suppressed.
class BaseUserMuteStrategy(BaseObject):
"""Base class for strategies deciding if user should be muted.
Returns:
bool: True if user should be muted, False if unmuted
"""
async def process_frame(self, frame: Frame) -> bool:
"""Process frame and return mute state."""class AlwaysUserMuteStrategy(BaseUserMuteStrategy):
"""Always mutes user while bot is speaking.
Most restrictive mute strategy. No interruptions allowed during bot speech.
Returns:
True (muted) if bot is speaking
False (unmuted) if bot is not speaking
"""class FirstSpeechUserMuteStrategy(BaseUserMuteStrategy):
"""Mutes user only during bot's first speech.
Allows user input before bot starts and after bot finishes first turn.
Useful for protecting bot's initial response from interruption.
Returns:
True (muted) only if bot is speaking AND first speech not yet complete
False (unmuted) otherwise
"""class MuteUntilFirstBotCompleteUserMuteStrategy(BaseUserMuteStrategy):
"""Mutes user from start until bot completes first speech.
More restrictive than FirstSpeechUserMuteStrategy - blocks user input
even before bot starts speaking. Ensures bot delivers complete first
response without any interruption.
Returns:
True (muted) until first bot speech completes
False (unmuted) after first bot speech completes
"""class FunctionCallUserMuteStrategy(BaseUserMuteStrategy):
"""Mutes user while function call is executing.
Ensures user input doesn't interfere with ongoing function execution.
Tracks multiple concurrent function calls.
Returns:
True (muted) if any function calls in progress
False (unmuted) if no function calls in progress
Frames tracked:
- FunctionCallsStartedFrame: Adds tool_call_ids to in-progress set
- FunctionCallCancelFrame: Removes tool_call_id
- FunctionCallResultFrame: Removes tool_call_id
"""Usage:
{ .api }
from pipecat.turns.user_mute import FunctionCallUserMuteStrategy
# Mute user during function execution
mute_strategy = FunctionCallUserMuteStrategy()
# Use with LLMUserAggregator
aggregator = LLMUserAggregator(
# ... other params ...
user_mute_strategy=mute_strategy
)
Detects when the user has been idle (not speaking) for a configured timeout.
class UserIdleController(BaseObject):
"""Controller for user idle detection.
Monitors user activity and triggers event when user idle for timeout.
Only monitors after first conversation activity. Does not trigger while
bot is speaking or function calls are in progress.
Parameters:
user_idle_timeout: Timeout (seconds) before considering user idle (required)
Events:
on_user_turn_idle: User has been idle for configured timeout
Frames processed:
- UserStartedSpeakingFrame/BotSpeakingFrame: Starts monitoring on first activity
- UserSpeakingFrame/BotSpeakingFrame: Reset idle timer (continuous frames)
- FunctionCallsStartedFrame: Marks function call in progress (prevents idle trigger)
- FunctionCallResultFrame: Marks function call complete
"""
def __init__(self, *, user_idle_timeout: float)
async def setup(self, task_manager: BaseTaskManager):
"""Initialize controller with task manager and start idle monitoring."""
async def cleanup(self):
"""Cleanup controller and cancel idle task."""
async def process_frame(self, frame: Frame):
"""Process frame to track user activity state."""Usage:
{ .api }
from pipecat.frames.frames import TextFrame
# Idle controller typically used via UserTurnProcessor
turn_processor = UserTurnProcessor(
user_idle_timeout=30.0 # Trigger after 30 seconds of inactivity
)
@turn_processor.event_handler("on_user_turn_idle")
async def on_idle(processor):
print("User has been idle, sending reminder...")
# Push reminder prompt to pipeline
await processor.push_frame(TextFrame("Are you still there?"))
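Idle handling often escalates rather than repeating the same reminder. A minimal sketch, assuming a simple counter kept alongside the handler; whether a plain TextFrame reaches your TTS depends on where the processor sits in your pipeline:
{ .api }
from pipecat.frames.frames import TextFrame
from pipecat.turns.user_turn_processor import UserTurnProcessor

turn_processor = UserTurnProcessor(user_idle_timeout=20.0)
idle_count = {"n": 0}

@turn_processor.event_handler("on_user_turn_idle")
async def on_user_idle(processor):
    idle_count["n"] += 1
    if idle_count["n"] == 1:
        await processor.push_frame(TextFrame("Are you still there?"))
    elif idle_count["n"] == 2:
        await processor.push_frame(TextFrame("I'll wrap up soon if there's nothing else."))
    # After repeated idle periods, consider ending the session from your PipelineTask.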
Turn analyzers use ML models to detect end-of-turn with higher accuracy than simple silence-based detection.
class BaseTurnAnalyzer(ABC):
"""Abstract base class for analyzing user end of turn.
Provides the interface for turn detection analyzers that combine
audio, VAD, and transcription data to determine when user has
finished speaking.
Parameters:
sample_rate: Optional initial sample rate for audio processing
"""
def __init__(self, *, sample_rate: Optional[int] = None)
@property
def sample_rate(self) -> int:
"""Returns the current sample rate."""
def set_sample_rate(self, sample_rate: int):
"""Sets the sample rate for audio processing."""
@property
@abstractmethod
def speech_triggered(self) -> bool:
"""Returns True if speech has been detected and analysis is active."""
@property
@abstractmethod
def params(self) -> BaseTurnParams:
"""Get current turn analyzer parameters."""
@abstractmethod
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
"""Appends audio data for analysis.
Args:
buffer: Raw audio data bytes
is_speech: Whether audio contains detected speech
Returns:
EndOfTurnState.COMPLETE or EndOfTurnState.INCOMPLETE
"""
@abstractmethod
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
"""Analyzes if end of turn has occurred.
Returns:
Tuple of (state, metrics) where metrics contains model prediction data
"""
def update_vad_start_secs(self, vad_start_secs: float):
"""Update VAD start trigger time for buffer management."""
@abstractmethod
def clear(self):
"""Reset analyzer to initial state."""
async def cleanup(self):
"""Cleanup analyzer resources."""class EndOfTurnState(Enum):
"""State enumeration for end-of-turn analysis results.
Attributes:
COMPLETE: User has finished their turn and stopped speaking
INCOMPLETE: User is still speaking or may continue speaking
"""
COMPLETE = 1
INCOMPLETE = 2
class SmartTurnParams(BaseTurnParams):
"""Configuration parameters for smart turn analysis.
Parameters:
stop_secs: Maximum silence duration in seconds before ending turn (default: 3)
pre_speech_ms: Milliseconds of audio to include before speech starts (default: 500)
max_duration_secs: Maximum duration in seconds for audio segments (default: 8)
"""
stop_secs: float = 3
pre_speech_ms: float = 500
max_duration_secs: float = 8
class LocalSmartTurnAnalyzerV3(BaseTurnAnalyzer):
"""Local turn analyzer using smart-turn-v3 ONNX model.
Provides end-of-turn detection using locally-stored ONNX model,
enabling offline operation without network dependencies. Uses
Whisper feature extraction with binary classification.
Most efficient and recommended analyzer for production use.
Parameters:
sample_rate: Optional sample rate for audio processing
smart_turn_model_path: Path to ONNX model file. If None, uses bundled smart-turn-v3.2-cpu model
cpu_count: Number of CPUs to use for inference (default: 1)
params: SmartTurnParams configuration
"""
def __init__(
self,
*,
sample_rate: Optional[int] = None,
smart_turn_model_path: Optional[str] = None,
cpu_count: int = 1,
params: Optional[SmartTurnParams] = None,
)
def _predict_endpoint(self, audio_array: np.ndarray) -> Dict[str, Any]:
"""Predict end-of-turn using local ONNX model.
Returns:
Dict with 'prediction' (0 or 1) and 'probability' (0.0 to 1.0)
"""Usage:
{ .api }
from pipecat.audio.turn.smart_turn import LocalSmartTurnAnalyzerV3, SmartTurnParams
# Create with default bundled model
analyzer = LocalSmartTurnAnalyzerV3(
sample_rate=16000,
cpu_count=2,
params=SmartTurnParams(
stop_secs=3.0, # Max silence before timeout
pre_speech_ms=500, # Pre-speech buffer
max_duration_secs=8 # Max segment duration
)
)
# Or with custom model path
analyzer = LocalSmartTurnAnalyzerV3(
smart_turn_model_path="/path/to/custom-model.onnx",
sample_rate=16000
)
# Use with turn stop strategy
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
stop_strategy = TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=analyzer,
timeout=0.5
)
class LocalSmartTurnAnalyzer(BaseTurnAnalyzer):
"""Local turn analyzer using PyTorch Wav2Vec2-BERT model.
.. deprecated:: 0.98.0
LocalSmartTurnAnalyzer is deprecated. Use LocalSmartTurnAnalyzerV3 instead.
Parameters:
smart_turn_model_path: Path to PyTorch model directory. If empty, uses default HuggingFace model
sample_rate: Optional sample rate for audio processing
params: SmartTurnParams configuration
"""
def __init__(
self,
*,
smart_turn_model_path: str,
sample_rate: Optional[int] = None,
params: Optional[SmartTurnParams] = None,
)
Low-level controller for managing turn lifecycle. Usually accessed via UserTurnProcessor.
class UserTurnController(BaseObject):
"""Controller for managing user turn lifecycle.
Manages user turn state (active/inactive), handles start and stop
strategies, and emits events when user turns begin, end, or timeout.
Parameters:
user_turn_strategies: Configured strategies for starting and stopping turns (required)
user_turn_stop_timeout: Seconds to auto-stop turn if no activity (default: 5.0)
Events:
on_user_turn_started: Emitted when user turn starts
on_user_turn_stopped: Emitted when user turn stops
on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout
on_push_frame: Emitted when strategy wants to push frame
on_broadcast_frame: Emitted when strategy wants to broadcast frame
"""
def __init__(
self,
*,
user_turn_strategies: UserTurnStrategies,
user_turn_stop_timeout: float = 5.0,
)
async def setup(self, task_manager: BaseTaskManager):
"""Initialize controller with task manager."""
async def cleanup(self):
"""Cleanup controller."""
async def update_strategies(self, strategies: UserTurnStrategies):
"""Replace current strategies with new ones."""
async def process_frame(self, frame: Frame):
"""Process incoming frame to detect user turn start or stop."""{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.turns.user_turn_processor import UserTurnProcessor
# Create turn processor with default strategies
turn_processor = UserTurnProcessor()
# Add to pipeline
pipeline = Pipeline([
transport.input(),
turn_processor, # Manages turn-taking
llm_user_aggregator,
llm,
tts,
transport.output(),
])
Custom start and stop strategies:
{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import (
VADUserTurnStartStrategy,
MinWordsUserTurnStartStrategy,
)
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
# Configure custom strategies
strategies = UserTurnStrategies(
start=[
# Start on VAD OR after 2 words
VADUserTurnStartStrategy(),
MinWordsUserTurnStartStrategy(min_words=2, use_interim=True)
],
stop=[
# Wait longer for transcription completion
TranscriptionUserTurnStopStrategy(timeout=0.7)
]
)
turn_processor = UserTurnProcessor(
user_turn_strategies=strategies,
user_turn_stop_timeout=5.0
)
Smart turn analysis for stop detection:
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.audio.turn.smart_turn import LocalSmartTurnAnalyzerV3, SmartTurnParams
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
# Create smart turn analyzer with custom parameters
analyzer = LocalSmartTurnAnalyzerV3(
sample_rate=16000,
cpu_count=2,
params=SmartTurnParams(
stop_secs=3.0,
pre_speech_ms=500,
max_duration_secs=8
)
)
# Use analyzer for stop detection
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=analyzer,
timeout=0.5
)]
)
turn_processor = UserTurnProcessor(user_turn_strategies=strategies)
# Add to pipeline
pipeline = Pipeline([
transport.input(),
turn_processor,
llm_user_aggregator,
llm,
tts,
transport.output(),
])
User idle detection:
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.frames.frames import TextFrame
from pipecat.turns.user_turn_processor import UserTurnProcessor
# Enable idle detection
turn_processor = UserTurnProcessor(
user_idle_timeout=30.0 # 30 seconds
)
# Handle idle event
@turn_processor.event_handler("on_user_turn_idle")
async def on_user_idle(processor):
"""Send reminder when user is idle."""
await processor.push_frame(
TextFrame("I'm still here if you need anything!")
)
# Add to pipeline
pipeline = Pipeline([
transport.input(),
turn_processor,
llm_user_aggregator,
# ... rest of pipeline
])
External turn control:
{ .api }
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies
from pipecat.turns.user_turn_processor import UserTurnProcessor
# Use external control (from service or custom processor)
external_strategies = ExternalUserTurnStrategies()
turn_processor = UserTurnProcessor(
user_turn_strategies=external_strategies
)
# Your external processor must emit:
# - UserStartedSpeakingFrame when user starts
# - UserStoppedSpeakingFrame when user stops
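A sketch of such an external source, assuming the application itself signals turn boundaries (for example from a push-to-talk button). The processor name and the button_pressed/button_released hooks are illustrative; the frames come from pipecat.frames.frames:
{ .api }
from pipecat.frames.frames import UserStartedSpeakingFrame, UserStoppedSpeakingFrame
from pipecat.processors.frame_processor import FrameProcessor


class PushToTalkTurnSource(FrameProcessor):
    """Illustrative processor that turns app-level events into speaking frames."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        # Pass all other frames through unchanged
        await self.push_frame(frame, direction)

    async def button_pressed(self):
        # Application hook: the user started talking
        await self.push_frame(UserStartedSpeakingFrame())

    async def button_released(self):
        # Application hook: the user finished talking
        await self.push_frame(UserStoppedSpeakingFrame())
Place a source like this upstream of UserTurnProcessor so the frames reach the external strategies.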
Muting the user during function calls:
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import (
LLMUserAggregator,
LLMAssistantAggregator,
LLMContextAggregatorPair,
)
from pipecat.turns.user_mute import FunctionCallUserMuteStrategy
# Create aggregators with mute strategy
user_aggregator = LLMUserAggregator(
context=context,
user_mute_strategy=FunctionCallUserMuteStrategy()
)
assistant_aggregator = LLMAssistantAggregator(context=context)
aggregator_pair = LLMContextAggregatorPair(
user=user_aggregator,
assistant=assistant_aggregator
)
# User will be muted during function execution
pipeline = Pipeline([
transport.input(),
aggregator_pair.user(), # Applies mute strategy
llm,
aggregator_pair.assistant(),
tts,
transport.output(),
])
Dynamic strategy updates:
{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy, MinWordsUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
turn_processor = UserTurnProcessor()
# Later, update strategies dynamically
async def update_conversation_mode(mode: str):
if mode == "formal":
# Require more words for formal mode
strategies = UserTurnStrategies(
start=[MinWordsUserTurnStartStrategy(min_words=3)],
stop=[TranscriptionUserTurnStopStrategy(timeout=1.0)]
)
elif mode == "casual":
# Quick interruptions for casual mode
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.3)]
)
# Update turn processor strategies
await turn_processor._turn_controller.update_strategies(strategies)  # Note: accesses a private attribute
Per-service strategy selection:
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy, TranscriptionUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
# Configure for services with good VAD (Deepgram, Daily)
vad_based_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.5)]
)
# Configure for services without VAD (OpenAI, Azure)
transcription_based_strategies = UserTurnStrategies(
start=[TranscriptionUserTurnStartStrategy(use_interim=True)],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.7)]
)
# Choose based on your STT service
turn_processor = UserTurnProcessor(
user_turn_strategies=vad_based_strategies # or transcription_based_strategies
)
Smart turn metrics:
{ .api }
from pipecat.audio.turn.smart_turn import LocalSmartTurnAnalyzerV3, SmartTurnParams
from pipecat.frames.frames import MetricsFrame
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
# Create analyzer
analyzer = LocalSmartTurnAnalyzerV3(
sample_rate=16000,
params=SmartTurnParams(
stop_secs=3.0,
pre_speech_ms=500,
max_duration_secs=8
)
)
# Use with turn strategy
strategies = UserTurnStrategies(
stop=[TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=analyzer,
timeout=0.5
)]
)
turn_processor = UserTurnProcessor(user_turn_strategies=strategies)
# Metrics are automatically emitted as MetricsFrame
# containing SmartTurnMetricsData with:
# - is_complete: bool
# - probability: float
# - inference_time_ms: float
# - server_total_time_ms: float
# - e2e_processing_time_ms: float
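To observe these values yourself, a small processor can inspect MetricsFrame as it flows through the pipeline. A minimal sketch, assuming SmartTurnMetricsData is exported from pipecat.metrics.metrics (verify the import path against your pipecat version):
{ .api }
from loguru import logger

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import SmartTurnMetricsData
from pipecat.processors.frame_processor import FrameProcessor


class TurnMetricsLogger(FrameProcessor):
    """Logs smart turn predictions carried by MetricsFrame."""

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            for data in frame.data:
                if isinstance(data, SmartTurnMetricsData):
                    logger.debug(
                        f"Smart turn: complete={data.is_complete} "
                        f"probability={data.probability:.2f} "
                        f"inference={data.inference_time_ms:.1f}ms"
                    )
        await self.push_frame(frame, direction)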
Understanding how frames flow through the turn management system helps debug and optimize turn detection.
VAD-based start detection:
InputAudioRawFrame
-> VAD Service
-> VADUserStartedSpeakingFrame
-> VADUserTurnStartStrategy
-> UserTurnController
-> UserTurnProcessor
-> UserStartedSpeakingFrame (broadcast)
-> InterruptionTaskFrame (if interruptions enabled)
Transcription-based start detection:
InputAudioRawFrame
-> STT Service
-> InterimTranscriptionFrame or TranscriptionFrame
-> TranscriptionUserTurnStartStrategy
-> UserTurnController
-> UserTurnProcessor
-> UserStartedSpeakingFrame (broadcast)
-> InterruptionTaskFrame (if interruptions enabled)
Min words start detection:
InputAudioRawFrame
-> STT Service
-> TranscriptionFrame
-> MinWordsUserTurnStartStrategy (counts words)
-> (triggers when word count >= min_words)
-> UserTurnController
-> UserTurnProcessor
-> UserStartedSpeakingFrame (broadcast)
-> InterruptionTaskFrame
Transcription-based stop detection:
VADUserStoppedSpeakingFrame
-> TranscriptionUserTurnStopStrategy
-> (waits for final transcription + timeout)
TranscriptionFrame
-> TranscriptionUserTurnStopStrategy
-> (triggers after timeout with no more transcriptions)
-> UserTurnController
-> UserTurnProcessor
-> UserStoppedSpeakingFrame (broadcast)
Turn analyzer-based stop detection:
InputAudioRawFrame
-> TurnAnalyzerUserTurnStopStrategy
-> BaseTurnAnalyzer.append_audio()
-> (accumulates audio)
VADUserStoppedSpeakingFrame
-> TurnAnalyzerUserTurnStopStrategy
-> BaseTurnAnalyzer.analyze_end_of_turn()
-> ML model prediction
-> MetricsFrame (with prediction data)
TranscriptionFrame
-> TurnAnalyzerUserTurnStopStrategy
-> (triggers if analyzer marked complete)
-> UserTurnController
-> UserTurnProcessor
-> UserStoppedSpeakingFrame (broadcast)
Mute strategies process frames to determine user mute state:
Frame
-> LLMUserAggregator
-> UserMuteStrategy.process_frame()
-> returns bool (muted/unmuted)
-> (if muted) user frames suppressed
-> (if unmuted) user frames pass through
Relevant frames for mute strategies:
- BotStartedSpeakingFrame / BotStoppedSpeakingFrame: track bot speaking state
- FunctionCallsStartedFrame / FunctionCallResultFrame: track function execution
- FunctionCallCancelFrame: cancel function tracking
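Writing a custom mute strategy only requires tracking the frames above and returning a boolean, per the process_frame contract shown earlier. A minimal sketch that roughly mirrors AlwaysUserMuteStrategy (the class name is illustrative, and the real built-ins may use internal hooks rather than overriding process_frame directly):
{ .api }
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.user_mute import BaseUserMuteStrategy


class BotSpeakingUserMuteStrategy(BaseUserMuteStrategy):
    """Illustrative strategy: mute the user whenever the bot is speaking."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False
        # True means the user's frames should be suppressed
        return self._bot_speaking
Pass it to LLMUserAggregator via user_mute_strategy, exactly as in the function-call muting example above.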
Turn Analyzer Performance:
Strategy Performance Impact:
Common issues and solutions:
- Turn starts too early (false positives)
- Turn doesn't start (missed detection)
- Turn stops too early
- Turn doesn't stop (hangs)
- Interruptions don't work
- User input is always muted
Debugging tools:
{ .api }
from loguru import logger
from pipecat.processors.frame_processor import FrameProcessor
# Enable debug logging for turn management
logger.add("turn_debug.log", level="DEBUG", filter=lambda record: "turn" in record["name"].lower())
# Log strategy triggers
@turn_processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
logger.debug(f"Turn started by {strategy.__class__.__name__}")
@turn_processor.event_handler("on_user_turn_stopped")
async def on_stopped(processor, strategy):
logger.debug(f"Turn stopped by {strategy.__class__.__name__}")
# Monitor frame flow
class DebugProcessor(FrameProcessor):
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if "Speaking" in frame.__class__.__name__:
            logger.debug(f"Frame: {frame.__class__.__name__}")
        await self.push_frame(frame, direction)
Verification checklist:
Different STT and VAD services have different characteristics. Here are recommended configurations for common services:
Deepgram provides excellent VAD; use it for turn start detection:
{ .api }
import os
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
# Deepgram with VAD
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
vad_enabled=True,
interim_results=True,
)
# Use VAD for start, transcription for stop
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.5)]
)
turn_processor = UserTurnProcessor(user_turn_strategies=strategies)
Daily provides VAD through transport:
{ .api }
from pipecat.transports.services.daily import DailyTransport, DailyParams
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Daily transport with VAD
transport = DailyTransport(
room_url=room_url,
token=token,
bot_name="Assistant",
params=DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
)
# Use VAD from transport
strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.5)]
)
OpenAI Realtime API manages turns externally:
{ .api }
import os
from pipecat.services.openai import OpenAIRealtimeSTTService
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies
# OpenAI Realtime handles turns
stt = OpenAIRealtimeSTTService(api_key=os.getenv("OPENAI_API_KEY"))
# Use external strategies
strategies = ExternalUserTurnStrategies()
turn_processor = UserTurnProcessor(user_turn_strategies=strategies)
Azure Speech without VAD requires transcription-based detection:
{ .api }
import os
from pipecat.services.azure import AzureSTTService
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
# Azure STT
stt = AzureSTTService(
api_key=os.getenv("AZURE_API_KEY"),
region=os.getenv("AZURE_REGION"),
)
# Use transcription for both start and stop
strategies = UserTurnStrategies(
start=[TranscriptionUserTurnStartStrategy(use_interim=True)],
stop=[TranscriptionUserTurnStopStrategy(timeout=0.7)]
)
turn_processor = UserTurnProcessor(user_turn_strategies=strategies)
For production applications requiring the highest accuracy:
{ .api }
import os
from pipecat.audio.turn.smart_turn import LocalSmartTurnAnalyzerV3, SmartTurnParams
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_start import VADUserTurnStartStrategy, MinWordsUserTurnStartStrategy
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
# STT with VAD
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
vad_enabled=True,
interim_results=True,
)
# ML-based turn analyzer
analyzer = LocalSmartTurnAnalyzerV3(
sample_rate=16000,
cpu_count=2,
params=SmartTurnParams(
stop_secs=3.0,
pre_speech_ms=500,
max_duration_secs=8
)
)
# Dual start detection + ML stop detection
strategies = UserTurnStrategies(
start=[
VADUserTurnStartStrategy(),
MinWordsUserTurnStartStrategy(min_words=2)
],
stop=[TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=analyzer,
timeout=0.5
)]
)
turn_processor = UserTurnProcessor(
user_turn_strategies=strategies,
user_turn_stop_timeout=5.0,
user_idle_timeout=30.0
)