docs
tessl install tessl/pypi-pipecat-ai@0.0.0
An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
This document covers various utility modules in Pipecat including clocks, synchronization primitives, turn management, extensions, observers, transcriptions, and the runner module.
{ .api }
from pipecat.clocks.base_clock import BaseClock
class BaseClock(ABC):
"""Abstract base class for clock implementations.
Provides a common interface for timing operations used in Pipecat
for synchronization, scheduling, and time-based processing.
Methods:
get_time(): Get the current time value
start(): Start or initialize the clock
"""
@abstractmethod
def get_time(self) -> int:
"""Get the current time value.
Returns:
The current time as an integer value. The specific unit and
reference point depend on the concrete implementation
"""
pass
@abstractmethod
def start(self):
"""Start or initialize the clock.
Performs any necessary initialization or starts the timing mechanism.
This method should be called before using get_time()
"""
pass{ .api }
from pipecat.clocks.system_clock import SystemClock
class SystemClock(BaseClock):
"""A monotonic clock implementation using system time.
Provides high-precision timing using the system's monotonic clock,
which is not affected by system clock adjustments and is suitable
for measuring elapsed time in real-time applications.
Example:
clock = SystemClock()
clock.start() # Start the clock
# Later...
elapsed_ns = clock.get_time() # Get elapsed nanoseconds
"""
def __init__(self):
"""Initialize the system clock.
The clock starts in an uninitialized state and must be started
explicitly using the start() method before time measurement begins
"""
pass
def get_time(self) -> int:
"""Get the elapsed time since the clock was started.
Returns:
The elapsed time in nanoseconds since start() was called.
Returns 0 if the clock has not been started yet
"""
pass
def start(self):
"""Start the clock and begin time measurement.
Records the current monotonic time as the reference point
for all subsequent get_time() calls
"""
pass{ .api }
from pipecat.utils.sync.base_notifier import BaseNotifier
class BaseNotifier(ABC):
"""Abstract base class for notification mechanisms.
Provides a standard interface for implementing notification and waiting
patterns used for event coordination and signaling between components
in the Pipecat framework.
Methods:
notify(): Send a notification signal
wait(): Wait for a notification signal
"""
@abstractmethod
async def notify(self):
"""Send a notification signal.
Implementations should trigger any waiting coroutines or processes
that are blocked on this notifier
"""
pass
@abstractmethod
async def wait(self):
"""Wait for a notification signal.
Implementations should block until a notification is received
from the corresponding notify() call
"""
pass{ .api }
from pipecat.utils.sync.event_notifier import EventNotifier
class EventNotifier(BaseNotifier):
"""Event-based notifier using asyncio.Event for task synchronization.
Provides a simple notification mechanism where one task can signal
an event and other tasks can wait for that event to occur. The event
is automatically cleared after each wait operation.
Example:
notifier = EventNotifier()
# In one task
await notifier.wait() # Blocks until notified
# In another task
await notifier.notify() # Wakes up waiting task
"""
def __init__(self):
"""Initialize the event notifier.
Creates an internal asyncio.Event for managing notifications
"""
pass
async def notify(self):
"""Signal the event to notify waiting tasks.
Sets the internal event, causing any tasks waiting on this
notifier to be awakened
"""
pass
async def wait(self):
"""Wait for the event to be signaled.
Blocks until another task calls notify(). Automatically clears
the event after being awakened so subsequent calls will wait
for the next notification
"""
pass{ .api }
from pipecat.turns.user_turn_controller import UserTurnController
class UserTurnController(BaseObject):
"""Controller for managing user turn lifecycle.
This class manages user turn state (active/inactive), handles start and stop
strategies, and emits events when user turns begin, end, or timeout occurs.
Event Handlers:
on_user_turn_started: Emitted when a user turn starts
on_user_turn_stopped: Emitted when a user turn stops
on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout
on_push_frame: Emitted when a strategy wants to push a frame
on_broadcast_frame: Emitted when a strategy wants to broadcast a frame
Example:
from pipecat.turns.user_turn_strategies import UserTurnStrategies
controller = UserTurnController(
user_turn_strategies=UserTurnStrategies(...),
user_turn_stop_timeout=5.0
)
@controller.event_handler("on_user_turn_started")
async def on_started(controller, strategy, params):
print("User turn started")
@controller.event_handler("on_user_turn_stopped")
async def on_stopped(controller, strategy, params):
print("User turn stopped")
"""
def __init__(
self,
*,
user_turn_strategies: UserTurnStrategies,
user_turn_stop_timeout: float = 5.0,
):
"""Initialize the user turn controller.
Args:
user_turn_strategies: Configured strategies for starting and stopping user turns
user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
if no activity is detected
"""
pass
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance
"""
pass
async def cleanup(self):
"""Cleanup the controller."""
pass
async def update_strategies(self, strategies: UserTurnStrategies):
"""Replace the current strategies with the given ones.
Args:
strategies: The new user turn strategies the controller should use
"""
pass
async def process_frame(self, frame: Frame):
"""Process an incoming frame to detect user turn start or stop.
The frame is passed to the configured user turn strategies, which are
responsible for deciding when a user turn starts or stops and emitting
the corresponding events.
Args:
frame: The frame to be processed
"""
pass{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
class UserTurnProcessor(FrameProcessor):
"""Frame processor for managing the user turn lifecycle.
This processor uses a turn controller to determine when a user turn starts
or stops. The actual frames emitted (e.g., UserStartedSpeakingFrame,
UserStoppedSpeakingFrame) or interruptions depend on the configured
strategies.
Event Handlers:
on_user_turn_started: Emitted when a user turn starts
on_user_turn_stopped: Emitted when a user turn stops
on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout
on_user_turn_idle: Emitted when the user has been idle for the configured timeout
Example:
processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(...),
user_turn_stop_timeout=5.0,
user_idle_timeout=10.0
)
@processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
print("User started speaking")
pipeline = Pipeline([
transport.input(),
processor,
llm,
transport.output()
])
"""
def __init__(
self,
*,
user_turn_strategies: Optional[UserTurnStrategies] = None,
user_turn_stop_timeout: float = 5.0,
user_idle_timeout: Optional[float] = None,
**kwargs,
):
"""Initialize the user turn processor.
Args:
user_turn_strategies: Configured strategies for starting and stopping user turns
user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
if no activity is detected
user_idle_timeout: Optional timeout in seconds for detecting user idle state.
If set, the processor will emit an `on_user_turn_idle` event when the user
has been idle (not speaking) for this duration. Set to None to disable
idle detection
**kwargs: Additional keyword arguments
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process an incoming frame to detect user turn start or stop.
The frame is passed to the user turn controller, which is responsible for
deciding when a user turn starts or stops and emitting the corresponding
events.
Args:
frame: The frame to be processed
direction: The direction of the incoming frame
"""
pass{ .api }
from pipecat.turns.user_idle_controller import UserIdleController
class UserIdleController(BaseObject):
"""Controller for managing user idle detection.
This class monitors user activity and triggers an event when the user has been
idle (not speaking) for a configured timeout period. It only starts monitoring
after the first conversation activity and does not trigger while the bot is
speaking or function calls are in progress.
Event Handlers:
on_user_turn_idle: Emitted when the user has been idle for the timeout period
Example:
controller = UserIdleController(user_idle_timeout=10.0)
@controller.event_handler("on_user_turn_idle")
async def on_idle(controller):
print("User has been idle - sending reminder")
await pipeline.queue_frame(TTSSpeakFrame("Are you still there?"))
"""
def __init__(
self,
*,
user_idle_timeout: float,
):
"""Initialize the user idle controller.
Args:
user_idle_timeout: Timeout in seconds before considering the user idle
"""
pass
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance
"""
pass
async def cleanup(self):
"""Cleanup the controller."""
pass
async def process_frame(self, frame: Frame):
"""Process an incoming frame to track user activity state.
Args:
frame: The frame to be processed
"""
pass{ .api }
from pipecat.extensions.ivr import IVRNavigator, IVRStatus
class IVRNavigator(Pipeline):
"""Pipeline for automated IVR system navigation.
Orchestrates LLM-based IVR navigation by combining an LLM service with
IVR processing capabilities. Navigation starts with mode classification, which
classifies the input as either a conversation or an IVR system.
Navigation Behavior:
- Detects conversation vs IVR systems automatically
- Navigates IVR menus using DTMF tones and verbal responses
- Provides event hooks for mode classification and status changes
- Developers control conversation handling via on_conversation_detected event
Event Handlers:
on_conversation_detected: Triggered when conversation (human) is detected
on_ivr_status_changed: Triggered when IVR status changes (detected, completed, stuck)
Example:
from pipecat.services.openai import OpenAILLMService
llm = OpenAILLMService(api_key="...")
navigator = IVRNavigator(
llm=llm,
ivr_prompt="Navigate to billing department",
ivr_vad_params=VADParams(stop_secs=2.0)
)
@navigator.event_handler("on_conversation_detected")
async def on_conversation(processor, conversation_history):
print("Human detected, switching to conversation mode")
# Switch to your conversation pipeline
@navigator.event_handler("on_ivr_status_changed")
async def on_status(processor, status):
if status == IVRStatus.COMPLETED:
print("IVR navigation completed")
elif status == IVRStatus.STUCK:
print("IVR navigation stuck, need assistance")
pipeline = Pipeline([
transport.input(),
stt,
navigator,
transport.output()
])
"""
CLASSIFIER_PROMPT = """...""" # See source for full prompt
IVR_NAVIGATION_BASE = """...""" # See source for full navigation instructions
def __init__(
self,
*,
llm: LLMService,
ivr_prompt: str,
ivr_vad_params: Optional[VADParams] = None,
):
"""Initialize the IVR navigator.
Args:
llm: LLM service for text generation and decision making
ivr_prompt: Navigation goal prompt integrated with IVR navigation instructions
ivr_vad_params: VAD parameters for IVR navigation mode. If None, defaults to VADParams(stop_secs=2.0)
"""
pass
def add_event_handler(self, event_name: str, handler):
"""Add event handler for IVR navigation events.
Args:
event_name: Event name ("on_conversation_detected", "on_ivr_status_changed")
handler: Async function called when event occurs
"""
pass{ .api }
from pipecat.extensions.ivr import IVRStatus
class IVRStatus(Enum):
"""Enumeration of IVR navigation status values.
These statuses are used to communicate the current state of IVR navigation
between the LLM and the IVR processing system.
"""
DETECTED = "detected" # IVR system detected
COMPLETED = "completed" # Navigation successfully completed
STUCK = "stuck" # Navigation stuck, needs assistance
WAIT = "wait" # Waiting for more information{ .api }
from pipecat.extensions.voicemail import VoicemailDetector
class VoicemailDetector(ParallelPipeline):
"""Parallel pipeline for detecting voicemail vs. live conversation in outbound calls.
This detector uses a parallel pipeline architecture to perform real-time
classification of outbound phone calls without interrupting the conversation
flow. It determines whether a human answered the phone or if the call went
to a voicemail system.
Architecture:
- Conversation branch: Empty pipeline that allows normal frame flow
- Classification branch: Contains the LLM classifier and decision logic
The system uses a gate mechanism to control when classification runs and
a gating system to prevent TTS output until classification is complete.
Event Handlers:
on_conversation_detected: Triggered when a human conversation is detected
on_voicemail_detected: Triggered when voicemail is detected after configured delay
Example:
from pipecat.services.openai import OpenAILLMService
classification_llm = OpenAILLMService(api_key="...")
detector = VoicemailDetector(
llm=classification_llm,
voicemail_response_delay=2.0
)
@detector.event_handler("on_voicemail_detected")
async def handle_voicemail(processor):
await processor.push_frame(
TTSSpeakFrame("Please leave a message after the beep.")
)
@detector.event_handler("on_conversation_detected")
async def handle_conversation(processor):
print("Human answered!")
pipeline = Pipeline([
transport.input(),
stt,
detector.detector(), # Classification
context_aggregator.user(),
llm,
tts,
detector.gate(), # TTS gating
transport.output(),
context_aggregator.assistant(),
])
"""
CLASSIFIER_RESPONSE_INSTRUCTION = 'Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it\'s voicemail/recording.'
DEFAULT_SYSTEM_PROMPT = """...""" # See source for full prompt
def __init__(
self,
*,
llm: LLMService,
voicemail_response_delay: float = 2.0,
custom_system_prompt: Optional[str] = None,
):
"""Initialize the voicemail detector with classification and buffering components.
Args:
llm: LLM service used for voicemail vs conversation classification.
Should be fast and reliable for real-time classification
voicemail_response_delay: Delay in seconds after user stops speaking
before triggering the voicemail event handler. This allows voicemail
responses to be played back after a short delay. Default is 2.0 seconds
custom_system_prompt: Optional custom system prompt for classification. If None,
uses the default prompt. Custom prompts should instruct the LLM to respond
with exactly "CONVERSATION" or "VOICEMAIL" for proper detection functionality
"""
pass
def detector(self) -> "VoicemailDetector":
"""Get the detector pipeline for placement after STT in the main pipeline.
Returns:
The VoicemailDetector instance itself (which is a ParallelPipeline)
"""
pass
def gate(self) -> TTSGate:
"""Get the gate processor for placement after TTS in the main pipeline.
Returns:
The TTSGate processor instance
"""
pass{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
class BaseObserver(BaseObject):
"""Base class for pipeline frame observers.
Observers can view all frames that flow through the pipeline without
needing to inject processors into the pipeline structure. This enables
non-intrusive monitoring capabilities such as frame logging, debugging,
performance analysis, and analytics collection.
Methods:
on_process_frame(data): Handle frame processing events
on_push_frame(data): Handle frame push events
Example:
class LoggingObserver(BaseObserver):
async def on_push_frame(self, data: FramePushed):
print(f"Frame pushed: {type(data.frame).__name__}")
async def on_process_frame(self, data: FrameProcessed):
print(f"Frame processed: {type(data.frame).__name__}")
observer = LoggingObserver()
pipeline_params = PipelineParams(observer=observer)
"""
async def on_process_frame(self, data: FrameProcessed):
"""Handle the event when a frame is being processed by a processor.
Args:
data: The event data containing details about the frame processing
"""
pass
async def on_push_frame(self, data: FramePushed):
"""Handle the event when a frame is pushed from one processor to another.
Args:
data: The event data containing details about the frame transfer
"""
pass{ .api }
from pipecat.observers.base_observer import FramePushed
@dataclass
class FramePushed:
"""Event data for frame transfers between processors in the pipeline.
Parameters:
source: The processor sending the frame
destination: The processor receiving the frame
frame: The frame being transferred
direction: The direction of the transfer (e.g., downstream or upstream)
timestamp: The time when the frame was pushed, based on the pipeline clock
"""
source: FrameProcessor
destination: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int{ .api }
from pipecat.observers.base_observer import FrameProcessed
@dataclass
class FrameProcessed:
"""Event data for frame processing in the pipeline.
Parameters:
processor: The processor processing the frame
frame: The frame being processed
direction: The direction of the frame (e.g., downstream or upstream)
timestamp: The time when the frame was processed, based on the pipeline clock
"""
processor: FrameProcessor
frame: Frame
direction: FrameDirection
timestamp: int{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
class TurnTrackingObserver(BaseObserver):
"""Observer that tracks conversation turns in a pipeline.
This observer monitors the flow of conversation by tracking when turns
start and end based on user and bot speaking patterns. It handles
interruptions, timeouts, and maintains turn state throughout the pipeline.
Turn Tracking Logic:
- The first turn starts immediately when the pipeline starts (StartFrame)
- Subsequent turns start when the user starts speaking
- A turn ends when the bot stops speaking and either:
- The user starts speaking again
- A timeout period elapses with no more bot speech
Event Handlers:
on_turn_started: Emitted when a new turn begins
on_turn_ended: Emitted when a turn completes or is interrupted
Example:
observer = TurnTrackingObserver(
max_frames=100,
turn_end_timeout_secs=2.5
)
@observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_count):
print(f"Turn {turn_count} started")
@observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_count, duration, was_interrupted):
status = "interrupted" if was_interrupted else "completed"
print(f"Turn {turn_count} {status} after {duration:.2f}s")
pipeline_params = PipelineParams(observer=observer)
"""
def __init__(self, max_frames=100, turn_end_timeout_secs=2.5, **kwargs):
"""Initialize the turn tracking observer.
Args:
max_frames: Maximum number of frame IDs to keep in history for
duplicate detection. Defaults to 100
turn_end_timeout_secs: Timeout in seconds after bot stops speaking
before automatically ending the turn. Defaults to 2.5
**kwargs: Additional arguments passed to the parent observer
"""
pass
async def on_push_frame(self, data: FramePushed):
"""Process frame events for turn tracking.
Args:
data: Frame push event data containing the frame and metadata
"""
pass{ .api }
from pipecat.transcriptions.language import Language
class Language(Enum):
"""Language codes for speech transcription and translation.
Supports 100+ language codes in ISO 639-1/639-3 format.
Common Languages:
EN - English
ES - Spanish
FR - French
DE - German
IT - Italian
PT - Portuguese
NL - Dutch
ZH - Chinese
JA - Japanese
KO - Korean
AR - Arabic
RU - Russian
HI - Hindi
Example:
from pipecat.transcriptions.language import Language
# Set STT language
stt = DeepgramSTTService(
api_key="...",
params=DeepgramSTTParams(language=Language.ES)
)
# In TranscriptionFrame
frame = TranscriptionFrame(
text="Hello",
language=Language.EN,
...
)
"""
EN = "en" # English
ES = "es" # Spanish
FR = "fr" # French
DE = "de" # German
IT = "it" # Italian
PT = "pt" # Portuguese
NL = "nl" # Dutch
ZH = "zh" # Chinese
JA = "ja" # Japanese
KO = "ko" # Korean
# ... 90+ more languages{ .api }
from pipecat.runner import PipelineRunner
class PipelineRunner(BaseObject):
"""Pipeline execution manager.
Manages the complete lifecycle of pipeline execution including setup,
execution, and cleanup. Provides a high-level interface for running
Pipecat bots.
Example:
runner = PipelineRunner()
async def run_bot():
async with runner.run_pipeline(pipeline) as task:
await task.wait_for_completion()
"""
pass{ .api }
from pipecat.runner.run import configure, run, main
async def configure() -> Dict[str, Any]:
"""Configure bot instance.
Returns a configuration dictionary for bot initialization.
Returns:
Configuration dictionary
"""
pass
async def run(transport, **kwargs):
"""Run bot with transport.
Args:
transport: Transport instance (Daily, WebSocket, etc.)
**kwargs: Additional configuration options
Example:
from pipecat.runner.run import run
from pipecat.transports.daily import DailyTransport
transport = DailyTransport(room_url, token, "Bot")
await run(transport)
"""
pass
def main():
"""Main entry point for development runner.
Discovers and executes the bot() function in your script.
Example:
async def bot(runner_args):
transport = DailyTransport(
runner_args.room_url,
runner_args.token,
"Bot"
)
# Your bot logic here
if __name__ == "__main__":
from pipecat.runner.run import main
main()
Command Line:
# WebRTC
python bot.py -t webrtc
# Daily
python bot.py -t daily
# Telephony
python bot.py -t twilio -x your_username.ngrok.io
"""
pass{ .api }
from pipecat.clocks.system_clock import SystemClock
# Create and start clock
clock = SystemClock()
clock.start()
# Use in pipeline
pipeline_params = PipelineParams(clock=clock)
# Measure elapsed time
elapsed_ns = clock.get_time()
elapsed_ms = elapsed_ns / 1_000_000{ .api }
from pipecat.utils.sync.event_notifier import EventNotifier
# Create notifier for coordination
gate_notifier = EventNotifier()
# Task 1: Wait for signal
async def waiter():
await gate_notifier.wait()
print("Notified!")
# Task 2: Send signal
async def notifier():
await asyncio.sleep(1)
await gate_notifier.notify()
# Run both tasks
await asyncio.gather(waiter(), notifier()){ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
# Create turn processor
processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(...),
user_turn_stop_timeout=5.0,
user_idle_timeout=10.0
)
# Handle events
@processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
print("User started speaking")
# Optionally interrupt bot
@processor.event_handler("on_user_turn_stopped")
async def on_stopped(processor, strategy):
print("User stopped speaking")
# Process user input
@processor.event_handler("on_user_turn_idle")
async def on_idle(processor):
print("User idle - sending prompt")
await processor.push_frame(TTSSpeakFrame("Are you still there?"))
# Add to pipeline
pipeline = Pipeline([
transport.input(),
stt,
processor,
llm,
tts,
transport.output()
]){ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
# Create observer
observer = TurnTrackingObserver(
max_frames=100,
turn_end_timeout_secs=2.5
)
# Handle events
@observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_count):
print(f"Turn {turn_count} started")
@observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_count, duration, was_interrupted):
status = "interrupted" if was_interrupted else "completed"
print(f"Turn {turn_count} {status} after {duration:.2f}s")
# Use in pipeline
pipeline_params = PipelineParams(observer=observer)
task = PipelineTask(pipeline, params=pipeline_params){ .api }
from pipecat.extensions.ivr import IVRNavigator, IVRStatus
from pipecat.services.openai import OpenAILLMService
llm = OpenAILLMService(api_key="...")
navigator = IVRNavigator(
llm=llm,
ivr_prompt="Navigate to billing department"
)
@navigator.event_handler("on_conversation_detected")
async def on_conversation(processor, conversation_history):
print("Human detected!")
# Switch to conversation mode
@navigator.event_handler("on_ivr_status_changed")
async def on_status(processor, status):
if status == IVRStatus.COMPLETED:
print("Navigation completed")
elif status == IVRStatus.STUCK:
print("Navigation stuck")
pipeline = Pipeline([
transport.input(),
stt,
navigator,
transport.output()
]){ .api }
from pipecat.extensions.voicemail import VoicemailDetector
from pipecat.services.openai import OpenAILLMService
classification_llm = OpenAILLMService(api_key="...")
detector = VoicemailDetector(
llm=classification_llm,
voicemail_response_delay=2.0
)
@detector.event_handler("on_voicemail_detected")
async def handle_voicemail(processor):
await processor.push_frame(
TTSSpeakFrame("Please leave a message after the beep.")
)
@detector.event_handler("on_conversation_detected")
async def handle_conversation(processor):
print("Human answered!")
pipeline = Pipeline([
transport.input(),
stt,
detector.detector(), # Place after STT
context_aggregator.user(),
llm,
tts,
detector.gate(), # Place after TTS
transport.output(),
context_aggregator.assistant(),
]){ .api }
# Good: Use SystemClock for precise timing
from pipecat.clocks.system_clock import SystemClock
clock = SystemClock()
clock.start()
pipeline_params = PipelineParams(clock=clock)
# Bad: Using time.time() directly (wall-clock time is not monotonic and can jump when the system clock is adjusted)
import time
start = time.time() # Don't use for pipeline timing{ .api }
# Good: Use notifiers for clean coordination
gate_notifier = EventNotifier()
async def process_when_ready():
await gate_notifier.wait()
# Process only after notification
async def signal_ready():
# Do preparation
await gate_notifier.notify()
# Bad: Using global flags and polling
ready = False
async def poll_until_ready():
while not ready: # Wasteful polling
await asyncio.sleep(0.1){ .api }
# Good: Use turn events for state management
@processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
# Clear previous state
context.clear_partial_responses()
# Handle interruption if needed
# Good: Use idle timeout for engagement
@processor.event_handler("on_user_turn_idle")
async def on_idle(processor):
await processor.push_frame(
TTSSpeakFrame("I'm here if you need anything else.")
)
# Bad: Ignoring turn events (missing important state changes)
# Don't ignore these events - they're crucial for conversation flow{ .api }
# Good: Use observers for non-intrusive monitoring
observer = TurnTrackingObserver()
@observer.event_handler("on_turn_ended")
async def log_turn(observer, turn_count, duration, was_interrupted):
# Log metrics without affecting pipeline
metrics.log("turn_duration", duration)
pipeline_params = PipelineParams(observer=observer)
# Bad: Injecting logging processors into pipeline
# Don't add processors just for logging - use observers{ .api }
# Good: Use provided prompts or include required instructions
detector = VoicemailDetector(llm=llm) # Uses default prompt
# Or with custom prompt
custom_prompt = """
Your custom logic here.
""" + VoicemailDetector.CLASSIFIER_RESPONSE_INSTRUCTION
detector = VoicemailDetector(
llm=llm,
custom_system_prompt=custom_prompt
)
# Bad: Custom prompt without response instructions
# This will cause detection to fail
detector = VoicemailDetector(
llm=llm,
custom_system_prompt="Classify this call." # Missing "CONVERSATION"/"VOICEMAIL"
)