Utility Helpers

This document covers various utility modules in Pipecat including clocks, synchronization primitives, turn management, extensions, observers, transcriptions, and the runner module.

Clocks

BaseClock

{ .api }
from pipecat.clocks.base_clock import BaseClock

class BaseClock(ABC):
    """Abstract base class for clock implementations.

    Provides a common interface for timing operations used in Pipecat
    for synchronization, scheduling, and time-based processing.

    Methods:
        get_time(): Get the current time value
        start(): Start or initialize the clock
    """

    @abstractmethod
    def get_time(self) -> int:
        """Get the current time value.

        Returns:
            The current time as an integer value. The specific unit and
            reference point depend on the concrete implementation
        """
        pass

    @abstractmethod
    def start(self):
        """Start or initialize the clock.

        Performs any necessary initialization or starts the timing mechanism.
        This method should be called before using get_time()
        """
        pass
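
For testing or simulation, BaseClock makes it easy to substitute a deterministic implementation. The following is a hypothetical sketch (ManualClock is not part of Pipecat) that satisfies the same interface:

{ .api }
from pipecat.clocks.base_clock import BaseClock

class ManualClock(BaseClock):
    """Hypothetical clock that only advances when told to (useful in tests)."""

    def __init__(self):
        self._started = False
        self._time_ns = 0

    def start(self):
        # Mark the clock as started; elapsed time begins at zero.
        self._started = True
        self._time_ns = 0

    def get_time(self) -> int:
        # Mirror SystemClock's contract: return 0 before start().
        return self._time_ns if self._started else 0

    def advance(self, delta_ns: int):
        # Test-only helper to move time forward deterministically.
        self._time_ns += delta_ns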

SystemClock

{ .api }
from pipecat.clocks.system_clock import SystemClock

class SystemClock(BaseClock):
    """A monotonic clock implementation using system time.

    Provides high-precision timing using the system's monotonic clock,
    which is not affected by system clock adjustments and is suitable
    for measuring elapsed time in real-time applications.

    Example:
        clock = SystemClock()
        clock.start()  # Start the clock

        # Later...
        elapsed_ns = clock.get_time()  # Get elapsed nanoseconds
    """

    def __init__(self):
        """Initialize the system clock.

        The clock starts in an uninitialized state and must be started
        explicitly using the start() method before time measurement begins
        """
        pass

    def get_time(self) -> int:
        """Get the elapsed time since the clock was started.

        Returns:
            The elapsed time in nanoseconds since start() was called.
            Returns 0 if the clock has not been started yet
        """
        pass

    def start(self):
        """Start the clock and begin time measurement.

        Records the current monotonic time as the reference point
        for all subsequent get_time() calls
        """
        pass

Synchronization Primitives

BaseNotifier

{ .api }
from pipecat.utils.sync.base_notifier import BaseNotifier

class BaseNotifier(ABC):
    """Abstract base class for notification mechanisms.

    Provides a standard interface for implementing notification and waiting
    patterns used for event coordination and signaling between components
    in the Pipecat framework.

    Methods:
        notify(): Send a notification signal
        wait(): Wait for a notification signal
    """

    @abstractmethod
    async def notify(self):
        """Send a notification signal.

        Implementations should trigger any waiting coroutines or processes
        that are blocked on this notifier
        """
        pass

    @abstractmethod
    async def wait(self):
        """Wait for a notification signal.

        Implementations should block until a notification is received
        from the corresponding notify() call
        """
        pass
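
How signals are delivered is up to the implementation. EventNotifier (next) coalesces signals, so several notify() calls before a wait() wake the waiter only once. Where every signal must be observed, a queue-backed variant is one alternative; this is a hypothetical sketch, not a Pipecat class:

{ .api }
import asyncio

from pipecat.utils.sync.base_notifier import BaseNotifier

class QueueNotifier(BaseNotifier):
    """Hypothetical notifier that delivers one wake-up per notify() call."""

    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()

    async def notify(self):
        # Each notification is queued, so none are coalesced.
        await self._queue.put(None)

    async def wait(self):
        # Consume exactly one pending notification, blocking if none.
        await self._queue.get()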

EventNotifier

{ .api }
from pipecat.utils.sync.event_notifier import EventNotifier

class EventNotifier(BaseNotifier):
    """Event-based notifier using asyncio.Event for task synchronization.

    Provides a simple notification mechanism where one task can signal
    an event and other tasks can wait for that event to occur. The event
    is automatically cleared after each wait operation.

    Example:
        notifier = EventNotifier()

        # In one task
        await notifier.wait()  # Blocks until notified

        # In another task
        await notifier.notify()  # Wakes up waiting task
    """

    def __init__(self):
        """Initialize the event notifier.

        Creates an internal asyncio.Event for managing notifications
        """
        pass

    async def notify(self):
        """Signal the event to notify waiting tasks.

        Sets the internal event, causing any tasks waiting on this
        notifier to be awakened
        """
        pass

    async def wait(self):
        """Wait for the event to be signaled.

        Blocks until another task calls notify(). Automatically clears
        the event after being awakened so subsequent calls will wait
        for the next notification
        """
        pass

Turn Management

UserTurnController

{ .api }
from pipecat.turns.user_turn_controller import UserTurnController

class UserTurnController(BaseObject):
    """Controller for managing user turn lifecycle.

    This class manages user turn state (active/inactive), handles start and stop
    strategies, and emits events when user turns begin, end, or timeout occurs.

    Event Handlers:
        on_user_turn_started: Emitted when a user turn starts
        on_user_turn_stopped: Emitted when a user turn stops
        on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout
        on_push_frame: Emitted when a strategy wants to push a frame
        on_broadcast_frame: Emitted when a strategy wants to broadcast a frame

    Example:
        from pipecat.turns.user_turn_strategies import UserTurnStrategies

        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(...),
            user_turn_stop_timeout=5.0
        )

        @controller.event_handler("on_user_turn_started")
        async def on_started(controller, strategy, params):
            print("User turn started")

        @controller.event_handler("on_user_turn_stopped")
        async def on_stopped(controller, strategy, params):
            print("User turn stopped")
    """

    def __init__(
        self,
        *,
        user_turn_strategies: UserTurnStrategies,
        user_turn_stop_timeout: float = 5.0,
    ):
        """Initialize the user turn controller.

        Args:
            user_turn_strategies: Configured strategies for starting and stopping user turns
            user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
                if no activity is detected
        """
        pass

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the controller with the given task manager.

        Args:
            task_manager: The task manager to be associated with this instance
        """
        pass

    async def cleanup(self):
        """Cleanup the controller."""
        pass

    async def update_strategies(self, strategies: UserTurnStrategies):
        """Replace the current strategies with the given ones.

        Args:
            strategies: The new user turn strategies the controller should use
        """
        pass

    async def process_frame(self, frame: Frame):
        """Process an incoming frame to detect user turn start or stop.

        The frame is passed to the configured user turn strategies, which are
        responsible for deciding when a user turn starts or stops and emitting
        the corresponding events.

        Args:
            frame: The frame to be processed
        """
        pass
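
When used outside UserTurnProcessor, the controller's lifecycle is driven manually: set it up with a task manager, feed it frames, and clean it up when done. A minimal sketch, assuming a task manager, configured strategies, and a frame source already exist in the surrounding application:

{ .api }
from pipecat.turns.user_turn_controller import UserTurnController

controller = UserTurnController(
    user_turn_strategies=strategies,  # assumed: a UserTurnStrategies instance
    user_turn_stop_timeout=5.0,
)
await controller.setup(task_manager)  # assumed: a BaseTaskManager instance
try:
    async for frame in frames:  # assumed: an async source of Frame objects
        # The configured strategies inspect each frame and emit turn events.
        await controller.process_frame(frame)
finally:
    await controller.cleanup()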

UserTurnProcessor

{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor

class UserTurnProcessor(FrameProcessor):
    """Frame processor for managing the user turn lifecycle.

    This processor uses a turn controller to determine when a user turn starts
    or stops. The actual frames emitted (e.g., UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame) or interruptions depend on the configured
    strategies.

    Event Handlers:
        on_user_turn_started: Emitted when a user turn starts
        on_user_turn_stopped: Emitted when a user turn stops
        on_user_turn_stop_timeout: Emitted if no stop strategy triggers before timeout
        on_user_turn_idle: Emitted when the user has been idle for the configured timeout

    Example:
        processor = UserTurnProcessor(
            user_turn_strategies=UserTurnStrategies(...),
            user_turn_stop_timeout=5.0,
            user_idle_timeout=10.0
        )

        @processor.event_handler("on_user_turn_started")
        async def on_started(processor, strategy):
            print("User started speaking")

        pipeline = Pipeline([
            transport.input(),
            processor,
            llm,
            transport.output()
        ])
    """

    def __init__(
        self,
        *,
        user_turn_strategies: Optional[UserTurnStrategies] = None,
        user_turn_stop_timeout: float = 5.0,
        user_idle_timeout: Optional[float] = None,
        **kwargs,
    ):
        """Initialize the user turn processor.

        Args:
            user_turn_strategies: Configured strategies for starting and stopping user turns
            user_turn_stop_timeout: Timeout in seconds to automatically stop a user turn
                if no activity is detected
            user_idle_timeout: Optional timeout in seconds for detecting user idle state.
                If set, the processor will emit an `on_user_turn_idle` event when the user
                has been idle (not speaking) for this duration. Set to None to disable
                idle detection
            **kwargs: Additional keyword arguments
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process an incoming frame to detect user turn start or stop.

        The frame is passed to the user turn controller, which is responsible
        for deciding when a user turn starts or stops and emitting the
        corresponding events.

        Args:
            frame: The frame to be processed
            direction: The direction of the incoming frame
        """
        pass

UserIdleController

{ .api }
from pipecat.turns.user_idle_controller import UserIdleController

class UserIdleController(BaseObject):
    """Controller for managing user idle detection.

    This class monitors user activity and triggers an event when the user has been
    idle (not speaking) for a configured timeout period. It only starts monitoring
    after the first conversation activity and does not trigger while the bot is
    speaking or function calls are in progress.

    Event Handlers:
        on_user_turn_idle: Emitted when the user has been idle for the timeout period

    Example:
        controller = UserIdleController(user_idle_timeout=10.0)

        @controller.event_handler("on_user_turn_idle")
        async def on_idle(controller):
            print("User has been idle - sending reminder")
            await pipeline.queue_frame(TTSSpeakFrame("Are you still there?"))
    """

    def __init__(
        self,
        *,
        user_idle_timeout: float,
    ):
        """Initialize the user idle controller.

        Args:
            user_idle_timeout: Timeout in seconds before considering the user idle
        """
        pass

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the controller with the given task manager.

        Args:
            task_manager: The task manager to be associated with this instance
        """
        pass

    async def cleanup(self):
        """Cleanup the controller."""
        pass

    async def process_frame(self, frame: Frame):
        """Process an incoming frame to track user activity state.

        Args:
            frame: The frame to be processed
        """
        pass

Extensions

IVRNavigator

{ .api }
from pipecat.extensions.ivr import IVRNavigator, IVRStatus

class IVRNavigator(Pipeline):
    """Pipeline for automated IVR system navigation.

    Orchestrates LLM-based IVR navigation by combining an LLM service with
    IVR processing capabilities. It begins by classifying input as either a
    human conversation or an IVR system.

    Navigation Behavior:
        - Detects conversation vs IVR systems automatically
        - Navigates IVR menus using DTMF tones and verbal responses
        - Provides event hooks for mode classification and status changes
        - Developers control conversation handling via on_conversation_detected event

    Event Handlers:
        on_conversation_detected: Triggered when conversation (human) is detected
        on_ivr_status_changed: Triggered when IVR status changes (detected, completed, stuck)

    Example:
        from pipecat.services.openai import OpenAILLMService

        llm = OpenAILLMService(api_key="...")

        navigator = IVRNavigator(
            llm=llm,
            ivr_prompt="Navigate to billing department",
            ivr_vad_params=VADParams(stop_secs=2.0)
        )

        @navigator.event_handler("on_conversation_detected")
        async def on_conversation(processor, conversation_history):
            print("Human detected, switching to conversation mode")
            # Switch to your conversation pipeline

        @navigator.event_handler("on_ivr_status_changed")
        async def on_status(processor, status):
            if status == IVRStatus.COMPLETED:
                print("IVR navigation completed")
            elif status == IVRStatus.STUCK:
                print("IVR navigation stuck, need assistance")

        pipeline = Pipeline([
            transport.input(),
            stt,
            navigator,
            transport.output()
        ])
    """

    CLASSIFIER_PROMPT = """..."""  # See source for full prompt
    IVR_NAVIGATION_BASE = """..."""  # See source for full navigation instructions

    def __init__(
        self,
        *,
        llm: LLMService,
        ivr_prompt: str,
        ivr_vad_params: Optional[VADParams] = None,
    ):
        """Initialize the IVR navigator.

        Args:
            llm: LLM service for text generation and decision making
            ivr_prompt: Navigation goal prompt integrated with IVR navigation instructions
            ivr_vad_params: VAD parameters for IVR navigation mode. If None, defaults to VADParams(stop_secs=2.0)
        """
        pass

    def add_event_handler(self, event_name: str, handler):
        """Add event handler for IVR navigation events.

        Args:
            event_name: Event name ("on_conversation_detected", "on_ivr_status_changed")
            handler: Async function called when event occurs
        """
        pass

IVRStatus

{ .api }
from pipecat.extensions.ivr import IVRStatus

class IVRStatus(Enum):
    """Enumeration of IVR navigation status values.

    These statuses are used to communicate the current state of IVR navigation
    between the LLM and the IVR processing system.
    """

    DETECTED = "detected"    # IVR system detected
    COMPLETED = "completed"  # Navigation successfully completed
    STUCK = "stuck"          # Navigation stuck, needs assistance
    WAIT = "wait"            # Waiting for more information

VoicemailDetector

{ .api }
from pipecat.extensions.voicemail import VoicemailDetector

class VoicemailDetector(ParallelPipeline):
    """Parallel pipeline for detecting voicemail vs. live conversation in outbound calls.

    This detector uses a parallel pipeline architecture to perform real-time
    classification of outbound phone calls without interrupting the conversation
    flow. It determines whether a human answered the phone or if the call went
    to a voicemail system.

    Architecture:
        - Conversation branch: Empty pipeline that allows normal frame flow
        - Classification branch: Contains the LLM classifier and decision logic

    A gate mechanism controls when classification runs, and a separate TTS
    gate holds bot speech until classification is complete.

    Event Handlers:
        on_conversation_detected: Triggered when a human conversation is detected
        on_voicemail_detected: Triggered when voicemail is detected after configured delay

    Example:
        from pipecat.services.openai import OpenAILLMService

        classification_llm = OpenAILLMService(api_key="...")
        detector = VoicemailDetector(
            llm=classification_llm,
            voicemail_response_delay=2.0
        )

        @detector.event_handler("on_voicemail_detected")
        async def handle_voicemail(processor):
            await processor.push_frame(
                TTSSpeakFrame("Please leave a message after the beep.")
            )

        @detector.event_handler("on_conversation_detected")
        async def handle_conversation(processor):
            print("Human answered!")

        pipeline = Pipeline([
            transport.input(),
            stt,
            detector.detector(),          # Classification
            context_aggregator.user(),
            llm,
            tts,
            detector.gate(),              # TTS gating
            transport.output(),
            context_aggregator.assistant(),
        ])
    """

    CLASSIFIER_RESPONSE_INSTRUCTION = 'Respond with ONLY "CONVERSATION" if a person answered, or "VOICEMAIL" if it\'s voicemail/recording.'
    DEFAULT_SYSTEM_PROMPT = """..."""  # See source for full prompt

    def __init__(
        self,
        *,
        llm: LLMService,
        voicemail_response_delay: float = 2.0,
        custom_system_prompt: Optional[str] = None,
    ):
        """Initialize the voicemail detector with classification and buffering components.

        Args:
            llm: LLM service used for voicemail vs conversation classification.
                Should be fast and reliable for real-time classification
            voicemail_response_delay: Delay in seconds after user stops speaking
                before triggering the voicemail event handler. This allows voicemail
                responses to be played back after a short delay. Default is 2.0 seconds
            custom_system_prompt: Optional custom system prompt for classification. If None,
                uses the default prompt. Custom prompts should instruct the LLM to respond
                with exactly "CONVERSATION" or "VOICEMAIL" for proper detection functionality
        """
        pass

    def detector(self) -> "VoicemailDetector":
        """Get the detector pipeline for placement after STT in the main pipeline.

        Returns:
            The VoicemailDetector instance itself (which is a ParallelPipeline)
        """
        pass

    def gate(self) -> TTSGate:
        """Get the gate processor for placement after TTS in the main pipeline.

        Returns:
            The TTSGate processor instance
        """
        pass

Observers

BaseObserver

{ .api }
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class BaseObserver(BaseObject):
    """Base class for pipeline frame observers.

    Observers can view all frames that flow through the pipeline without
    needing to inject processors into the pipeline structure. This enables
    non-intrusive monitoring capabilities such as frame logging, debugging,
    performance analysis, and analytics collection.

    Methods:
        on_process_frame(data): Handle frame processing events
        on_push_frame(data): Handle frame push events

    Example:
        class LoggingObserver(BaseObserver):
            async def on_push_frame(self, data: FramePushed):
                print(f"Frame pushed: {type(data.frame).__name__}")

            async def on_process_frame(self, data: FrameProcessed):
                print(f"Frame processed: {type(data.frame).__name__}")

        observer = LoggingObserver()
        pipeline_params = PipelineParams(observers=[observer])
    """

    async def on_process_frame(self, data: FrameProcessed):
        """Handle the event when a frame is being processed by a processor.

        Args:
            data: The event data containing details about the frame processing
        """
        pass

    async def on_push_frame(self, data: FramePushed):
        """Handle the event when a frame is pushed from one processor to another.

        Args:
            data: The event data containing details about the frame transfer
        """
        pass

FramePushed

{ .api }
from pipecat.observers.base_observer import FramePushed

@dataclass
class FramePushed:
    """Event data for frame transfers between processors in the pipeline.

    Parameters:
        source: The processor sending the frame
        destination: The processor receiving the frame
        frame: The frame being transferred
        direction: The direction of the transfer (e.g., downstream or upstream)
        timestamp: The time when the frame was pushed, based on the pipeline clock
    """

    source: FrameProcessor
    destination: FrameProcessor
    frame: Frame
    direction: FrameDirection
    timestamp: int

FrameProcessed

{ .api }
from pipecat.observers.base_observer import FrameProcessed

@dataclass
class FrameProcessed:
    """Event data for frame processing in the pipeline.

    Parameters:
        processor: The processor processing the frame
        frame: The frame being processed
        direction: The direction of the frame (e.g., downstream or upstream)
        timestamp: The time when the frame was processed, based on the pipeline clock
    """

    processor: FrameProcessor
    frame: Frame
    direction: FrameDirection
    timestamp: int
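
Because both events carry pipeline-clock timestamps in nanoseconds, an observer can correlate them to derive timing metrics. A hypothetical sketch (it assumes each frame exposes a unique id attribute, as Pipecat frames do):

{ .api }
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed

class LatencyObserver(BaseObserver):
    """Hypothetical observer that measures push-to-process latency per frame."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._pushed_at = {}

    async def on_push_frame(self, data: FramePushed):
        # Record when the frame was handed to its destination processor.
        self._pushed_at[data.frame.id] = data.timestamp

    async def on_process_frame(self, data: FrameProcessed):
        pushed = self._pushed_at.pop(data.frame.id, None)
        if pushed is not None:
            # Timestamps are nanoseconds from the pipeline clock.
            latency_ms = (data.timestamp - pushed) / 1_000_000
            print(f"{type(data.frame).__name__} waited {latency_ms:.2f} ms")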

TurnTrackingObserver

{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

class TurnTrackingObserver(BaseObserver):
    """Observer that tracks conversation turns in a pipeline.

    This observer monitors the flow of conversation by tracking when turns
    start and end based on user and bot speaking patterns. It handles
    interruptions, timeouts, and maintains turn state throughout the pipeline.

    Turn Tracking Logic:
        - The first turn starts immediately when the pipeline starts (StartFrame)
        - Subsequent turns start when the user starts speaking
        - A turn ends when the bot stops speaking and either:
          - The user starts speaking again
          - A timeout period elapses with no more bot speech

    Event Handlers:
        on_turn_started: Emitted when a new turn begins
        on_turn_ended: Emitted when a turn completes or is interrupted

    Example:
        observer = TurnTrackingObserver(
            max_frames=100,
            turn_end_timeout_secs=2.5
        )

        @observer.event_handler("on_turn_started")
        async def on_turn_started(observer, turn_count):
            print(f"Turn {turn_count} started")

        @observer.event_handler("on_turn_ended")
        async def on_turn_ended(observer, turn_count, duration, was_interrupted):
            status = "interrupted" if was_interrupted else "completed"
            print(f"Turn {turn_count} {status} after {duration:.2f}s")

        pipeline_params = PipelineParams(observers=[observer])
    """

    def __init__(self, max_frames=100, turn_end_timeout_secs=2.5, **kwargs):
        """Initialize the turn tracking observer.

        Args:
            max_frames: Maximum number of frame IDs to keep in history for
                duplicate detection. Defaults to 100
            turn_end_timeout_secs: Timeout in seconds after bot stops speaking
                before automatically ending the turn. Defaults to 2.5
            **kwargs: Additional arguments passed to the parent observer
        """
        pass

    async def on_push_frame(self, data: FramePushed):
        """Process frame events for turn tracking.

        Args:
            data: Frame push event data containing the frame and metadata
        """
        pass

Transcriptions

Language

{ .api }
from pipecat.transcriptions.language import Language

class Language(Enum):
    """Language codes for speech transcription and translation.

    Supports 100+ language codes in ISO 639-1/639-3 format.

    Common Languages:
        EN - English
        ES - Spanish
        FR - French
        DE - German
        IT - Italian
        PT - Portuguese
        NL - Dutch
        ZH - Chinese
        JA - Japanese
        KO - Korean
        AR - Arabic
        RU - Russian
        HI - Hindi

    Example:
        from pipecat.transcriptions.language import Language

        # Set STT language
        stt = DeepgramSTTService(
            api_key="...",
            params=DeepgramSTTParams(language=Language.ES)
        )

        # In TranscriptionFrame
        frame = TranscriptionFrame(
            text="Hello",
            language=Language.EN,
            ...
        )
    """

    EN = "en"       # English
    ES = "es"       # Spanish
    FR = "fr"       # French
    DE = "de"       # German
    IT = "it"       # Italian
    PT = "pt"       # Portuguese
    NL = "nl"       # Dutch
    ZH = "zh"       # Chinese
    JA = "ja"       # Japanese
    KO = "ko"       # Korean
    # ... 90+ more languages
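
Because Language is a standard Enum with string values, codes coming from external APIs can be converted with the usual enum lookups:

{ .api }
from pipecat.transcriptions.language import Language

# Standard Enum lookups: by value or by member name.
assert Language("es") is Language.ES
assert Language["FR"] is Language.FR
print(Language.EN.value)  # "en"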

Runner Module

PipelineRunner

{ .api }
from pipecat.pipeline.runner import PipelineRunner

class PipelineRunner(BaseObject):
    """Pipeline execution manager.

    Manages the complete lifecycle of pipeline execution including setup,
    execution, and cleanup. Provides a high-level interface for running
    Pipecat bots.

    Example:
        runner = PipelineRunner()

        async def run_bot():
            task = PipelineTask(pipeline)
            await runner.run(task)
    """
    pass
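
A fuller sketch of the common wiring, pairing the runner with a PipelineTask (import paths follow the upstream pipecat-ai package layout; pipeline contents omitted):

{ .api }
import asyncio

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

async def run_bot(processors):
    # Build the pipeline, wrap it in a task, then hand it to the runner.
    pipeline = Pipeline(processors)
    task = PipelineTask(pipeline)
    runner = PipelineRunner()
    await runner.run(task)  # Returns when the pipeline finishes

# asyncio.run(run_bot([...]))  # processors assumed to be built elsewhere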

Development Runner Functions

{ .api }
from pipecat.runner.run import configure, run, main

async def configure() -> Dict[str, Any]:
    """Configure bot instance.

    Returns a configuration dictionary for bot initialization.

    Returns:
        Configuration dictionary
    """
    pass

async def run(transport, **kwargs):
    """Run bot with transport.

    Args:
        transport: Transport instance (Daily, WebSocket, etc.)
        **kwargs: Additional configuration options

    Example:
        from pipecat.runner.run import run
        from pipecat.transports.daily import DailyTransport

        transport = DailyTransport(room_url, token, "Bot")
        await run(transport)
    """
    pass

def main():
    """Main entry point for development runner.

    Discovers and executes the bot() function in your script.

    Example:
        async def bot(runner_args):
            transport = DailyTransport(
                runner_args.room_url,
                runner_args.token,
                "Bot"
            )
            # Your bot logic here

        if __name__ == "__main__":
            from pipecat.runner.run import main
            main()

    Command Line:
        # WebRTC
        python bot.py -t webrtc

        # Daily
        python bot.py -t daily

        # Telephony
        python bot.py -t twilio -x your_username.ngrok.io
    """
    pass

Usage Patterns

Clock Usage

{ .api }
from pipecat.clocks.system_clock import SystemClock

# Create and start clock
clock = SystemClock()
clock.start()

# Use in pipeline
pipeline_params = PipelineParams(clock=clock)

# Measure elapsed time
elapsed_ns = clock.get_time()
elapsed_ms = elapsed_ns / 1_000_000

Notifier Usage

{ .api }
import asyncio

from pipecat.utils.sync.event_notifier import EventNotifier

# Create notifier for coordination
gate_notifier = EventNotifier()

# Task 1: Wait for signal
async def waiter():
    await gate_notifier.wait()
    print("Notified!")

# Task 2: Send signal
async def notifier():
    await asyncio.sleep(1)
    await gate_notifier.notify()

# Run both tasks
await asyncio.gather(waiter(), notifier())

Turn Management Usage

{ .api }
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies

# Create turn processor
processor = UserTurnProcessor(
    user_turn_strategies=UserTurnStrategies(...),
    user_turn_stop_timeout=5.0,
    user_idle_timeout=10.0
)

# Handle events
@processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
    print("User started speaking")
    # Optionally interrupt bot

@processor.event_handler("on_user_turn_stopped")
async def on_stopped(processor, strategy):
    print("User stopped speaking")
    # Process user input

@processor.event_handler("on_user_turn_idle")
async def on_idle(processor):
    print("User idle - sending prompt")
    await processor.push_frame(TTSSpeakFrame("Are you still there?"))

# Add to pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    processor,
    llm,
    tts,
    transport.output()
])

Observer Usage

{ .api }
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

# Create observer
observer = TurnTrackingObserver(
    max_frames=100,
    turn_end_timeout_secs=2.5
)

# Handle events
@observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_count):
    print(f"Turn {turn_count} started")

@observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_count, duration, was_interrupted):
    status = "interrupted" if was_interrupted else "completed"
    print(f"Turn {turn_count} {status} after {duration:.2f}s")

# Use in pipeline
pipeline_params = PipelineParams(observers=[observer])
task = PipelineTask(pipeline, params=pipeline_params)

IVR Navigation Usage

{ .api }
from pipecat.extensions.ivr import IVRNavigator, IVRStatus
from pipecat.services.openai import OpenAILLMService

llm = OpenAILLMService(api_key="...")

navigator = IVRNavigator(
    llm=llm,
    ivr_prompt="Navigate to billing department"
)

@navigator.event_handler("on_conversation_detected")
async def on_conversation(processor, conversation_history):
    print("Human detected!")
    # Switch to conversation mode

@navigator.event_handler("on_ivr_status_changed")
async def on_status(processor, status):
    if status == IVRStatus.COMPLETED:
        print("Navigation completed")
    elif status == IVRStatus.STUCK:
        print("Navigation stuck")

pipeline = Pipeline([
    transport.input(),
    stt,
    navigator,
    transport.output()
])

Voicemail Detection Usage

{ .api }
from pipecat.extensions.voicemail import VoicemailDetector
from pipecat.services.openai import OpenAILLMService

classification_llm = OpenAILLMService(api_key="...")
detector = VoicemailDetector(
    llm=classification_llm,
    voicemail_response_delay=2.0
)

@detector.event_handler("on_voicemail_detected")
async def handle_voicemail(processor):
    await processor.push_frame(
        TTSSpeakFrame("Please leave a message after the beep.")
    )

@detector.event_handler("on_conversation_detected")
async def handle_conversation(processor):
    print("Human answered!")

pipeline = Pipeline([
    transport.input(),
    stt,
    detector.detector(),          # Place after STT
    context_aggregator.user(),
    llm,
    tts,
    detector.gate(),              # Place after TTS
    transport.output(),
    context_aggregator.assistant(),
])

Best Practices

Use System Clock for Timing

{ .api }
# Good: Use SystemClock for precise timing
from pipecat.clocks.system_clock import SystemClock

clock = SystemClock()
clock.start()
pipeline_params = PipelineParams(clock=clock)

# Bad: Using time.time() directly (wall-clock time is not monotonic and
# shifts with system clock adjustments)
import time
start = time.time()  # Don't use for pipeline timing

Coordinate with Notifiers

{ .api }
# Good: Use notifiers for clean coordination
gate_notifier = EventNotifier()

async def process_when_ready():
    await gate_notifier.wait()
    # Process only after notification

async def signal_ready():
    # Do preparation
    await gate_notifier.notify()

# Bad: Using global flags and polling
ready = False
async def poll_until_ready():
    while not ready:  # Wasteful polling
        await asyncio.sleep(0.1)

Handle Turn Events Properly

{ .api }
# Good: Use turn events for state management
@processor.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
    # Clear previous state
    context.clear_partial_responses()
    # Handle interruption if needed

# Good: Use idle timeout for engagement
@processor.event_handler("on_user_turn_idle")
async def on_idle(processor):
    await processor.push_frame(
        TTSSpeakFrame("I'm here if you need anything else.")
    )

# Bad: Ignoring turn events (missing important state changes)
# Don't ignore these events - they're crucial for conversation flow

Use Observers for Monitoring

{ .api }
# Good: Use observers for non-intrusive monitoring
observer = TurnTrackingObserver()

@observer.event_handler("on_turn_ended")
async def log_turn(observer, turn_count, duration, was_interrupted):
    # Log metrics without affecting pipeline
    metrics.log("turn_duration", duration)

pipeline_params = PipelineParams(observers=[observer])

# Bad: Injecting logging processors into pipeline
# Don't add processors just for logging - use observers

Validate Extension Prompts

{ .api }
# Good: Use provided prompts or include required instructions
detector = VoicemailDetector(llm=llm)  # Uses default prompt

# Or with custom prompt
custom_prompt = """
Your custom logic here.
""" + VoicemailDetector.CLASSIFIER_RESPONSE_INSTRUCTION

detector = VoicemailDetector(
    llm=llm,
    custom_system_prompt=custom_prompt
)

# Bad: Custom prompt without response instructions
# This will cause detection to fail
detector = VoicemailDetector(
    llm=llm,
    custom_system_prompt="Classify this call."  # Missing "CONVERSATION"/"VOICEMAIL"
)