Interaction Frames

Interaction frames track user and bot state changes during conversations. They signal when the user or bot starts/stops speaking, handle interruptions, and manage turn-taking in conversational flows.
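
At a glance, a single exchange typically produces a frame sequence like the following. This is a simplified sketch based on the frames documented below; the exact ordering depends on the transport, VAD, and services in use.

{ .api }
# Typical frame sequence for one user turn followed by a bot reply (sketch)
#
# UserStartedSpeakingFrame      user begins speaking (VAD or manual)
# UserSpeakingFrame ...         periodic while the user speaks
# UserStoppedSpeakingFrame      user finishes; STT output follows
# TranscriptionFrame            final transcription of the utterance
# TTSStartedFrame               TTS synthesis of the response begins
# BotStartedSpeakingFrame       bot speech output starts
# BotSpeakingFrame ...          periodic while the bot speaks
# TTSStoppedFrame               TTS synthesis complete
# BotStoppedSpeakingFrame       bot turn ends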

User Speech Frames

Frames signaling user speech activity.

UserStartedSpeakingFrame

{ .api }
from pipecat.frames.frames import UserStartedSpeakingFrame

class UserStartedSpeakingFrame(SystemFrame):
    """User began speaking.

    Emitted when the user starts speaking. This is a SystemFrame
    for immediate processing to enable fast interruption handling.

    Used by:
    - Transport VAD detection
    - Manual user speech tracking
    - Interruption triggers

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by transport when user speech detected
        frame = UserStartedSpeakingFrame()
        await task.queue_frame(frame)
    """
    pass

UserStoppedSpeakingFrame

{ .api }
from pipecat.frames.frames import UserStoppedSpeakingFrame

class UserStoppedSpeakingFrame(SystemFrame):
    """User stopped speaking.

    Emitted when the user stops speaking. Marks the end of a
    user utterance, typically followed by STT processing completion.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by transport after user speech ends
        frame = UserStoppedSpeakingFrame()
        await task.queue_frame(frame)
    """
    pass

UserSpeakingFrame

{ .api }
from pipecat.frames.frames import UserSpeakingFrame

class UserSpeakingFrame(SystemFrame):
    """User is speaking (continuous signal).

    Emitted periodically while the user is speaking. Can be used
    for UI updates or monitoring speech duration.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by transport during active speech
        frame = UserSpeakingFrame()
        await task.queue_frame(frame)
    """
    pass

VADUserStartedSpeakingFrame

{ .api }
from pipecat.frames.frames import VADUserStartedSpeakingFrame

class VADUserStartedSpeakingFrame(SystemFrame):
    """VAD-detected speech start.

    Emitted by Voice Activity Detection when speech is detected.
    More specific than UserStartedSpeakingFrame, explicitly indicating
    VAD as the detection source.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by VAD analyzer
        from pipecat.audio.vad.silero import SileroVADAnalyzer

        vad = SileroVADAnalyzer()
        # VAD emits this frame when speech detected
    """
    pass

VADUserStoppedSpeakingFrame

{ .api }
from pipecat.frames.frames import VADUserStoppedSpeakingFrame

class VADUserStoppedSpeakingFrame(SystemFrame):
    """VAD-detected speech end.

    Emitted by Voice Activity Detection when speech ends.
    Explicitly indicates VAD as the detection source.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by VAD analyzer
        # Triggers STT processing completion
    """
    pass

Emulated User Speech Frames

Frames for testing or programmatic user speech simulation; see the "Emulating User Input" pattern under Usage Patterns for a complete example.

EmulateUserStartedSpeakingFrame

{ .api }
from pipecat.frames.frames import EmulateUserStartedSpeakingFrame

class EmulateUserStartedSpeakingFrame(SystemFrame):
    """Emulated user speech start.

    Simulates user starting to speak without actual audio input.
    Useful for testing interruption handling or programmatic control.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Test interruption behavior
        frame = EmulateUserStartedSpeakingFrame()
        await task.queue_frame(frame)
        # Pipeline treats this like real user speech
    """
    pass

EmulateUserStoppedSpeakingFrame

{ .api }
from pipecat.frames.frames import EmulateUserStoppedSpeakingFrame

class EmulateUserStoppedSpeakingFrame(SystemFrame):
    """Emulated user speech end.

    Simulates user stopping speaking without actual audio input.
    Pairs with EmulateUserStartedSpeakingFrame for testing.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Complete emulated user turn
        await task.queue_frame(EmulateUserStartedSpeakingFrame())
        # ... send text or other input ...
        await task.queue_frame(EmulateUserStoppedSpeakingFrame())
    """
    pass

Bot Speech Frames

Frames signaling bot speech activity.

BotStartedSpeakingFrame

{ .api }
from pipecat.frames.frames import BotStartedSpeakingFrame

class BotStartedSpeakingFrame(SystemFrame):
    """Bot started speaking.

    Emitted when the bot begins speaking (TTS output starts).
    Used for UI updates, analytics, and turn tracking.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by TTS service when synthesis starts
        @tts_service.event_handler("on_tts_started")
        async def handle_tts_start():
            await task.queue_frame(BotStartedSpeakingFrame())
    """
    pass

BotStoppedSpeakingFrame

{ .api }
from pipecat.frames.frames import BotStoppedSpeakingFrame

class BotStoppedSpeakingFrame(SystemFrame):
    """Bot stopped speaking.

    Emitted when the bot finishes speaking (TTS output ends).
    Marks the end of a bot turn in the conversation.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted by TTS service when synthesis completes
        @tts_service.event_handler("on_tts_stopped")
        async def handle_tts_stop():
            await task.queue_frame(BotStoppedSpeakingFrame())
    """
    pass

BotSpeakingFrame

{ .api }
from pipecat.frames.frames import BotSpeakingFrame

class BotSpeakingFrame(SystemFrame):
    """Bot is speaking (continuous signal).

    Emitted periodically while the bot is speaking. Can be used
    for UI updates or monitoring speech duration.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Emitted during TTS playback
        frame = BotSpeakingFrame()
        await task.queue_frame(frame)
    """
    pass

Interruption Frames

Frames for handling conversation interruptions.

InterruptionFrame

{ .api }
from pipecat.frames.frames import InterruptionFrame

class InterruptionFrame(SystemFrame):
    """Base interruption signal.

    Base class for interruption-related frames. Indicates that
    an interruption event has occurred.

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Base class, typically use specific subclasses
        frame = InterruptionFrame()
    """
    pass

StartInterruptionFrame

{ .api }
from pipecat.frames.frames import StartInterruptionFrame

class StartInterruptionFrame(InterruptionFrame):
    """Start interruption handling.

    Signals the start of an interruption. When received, the pipeline
    cancels pending DataFrames and ControlFrames (except UninterruptibleFrame).

    Flow:
    1. User starts speaking (UserStartedSpeakingFrame)
    2. StartInterruptionFrame emitted
    3. Pipeline cancels queued bot output
    4. SystemFrames continue processing

    Inherits from:
        InterruptionFrame: High priority interruption

    Example:
        # Emitted by transport when user interrupts bot
        frame = StartInterruptionFrame()
        await task.queue_frame(frame)
        # All pending bot speech cancelled
    """
    pass

BotInterruptionFrame

{ .api }
from pipecat.frames.frames import BotInterruptionFrame

class BotInterruptionFrame(InterruptionTaskFrame):
    """Bot was interrupted.

    Emitted when the bot's speech is interrupted by the user.
    Allows tracking of interruption events for analytics or
    special handling.

    Inherits from:
        InterruptionTaskFrame: Task-level interruption

    Example:
        # Emitted when bot is interrupted during speech
        @pipeline.event_handler("on_bot_interrupted")
        async def handle_interrupt(frame):
            print("Bot was interrupted!")
            # Log analytics, adjust behavior, etc.
    """
    pass

User Input Request Frames

Frames for requesting specific input from the user.

UserImageRequestFrame

{ .api }
from typing import Any, Optional

from pipecat.frames.frames import UserImageRequestFrame

class UserImageRequestFrame(SystemFrame):
    """Request image from user.

    A frame requesting an image from the given user. The request may include
    text that can later be used to describe the requested image. Used in
    multimodal conversations where vision input is needed.

    Attributes:
        user_id (str): Identifier of the user to request image from
        text (Optional[str]): Optional text associated with the image request
        append_to_context (Optional[bool]): Whether the requested image should be appended to the LLM context
        video_source (Optional[str]): Specific video source to capture from
        function_name (Optional[str]): Name of function that generated this request (if any)
        tool_call_id (Optional[str]): Tool call ID if generated by function call (if any)
        context (Optional[Any]): [DEPRECATED] Optional context for the image request

    Inherits from:
        SystemFrame: High priority, immediate processing

    Example:
        # Simple image request
        frame = UserImageRequestFrame(user_id="user123")
        await task.queue_frame(frame)

        # Image request with descriptive text
        frame = UserImageRequestFrame(
            user_id="user456",
            text="Show me what you see",
            append_to_context=True
        )

        # Image request from specific video source
        frame = UserImageRequestFrame(
            user_id="user789",
            video_source="camera_front",
            append_to_context=True
        )

        # Image request from function call
        frame = UserImageRequestFrame(
            user_id="user123",
            text="Capture current view",
            function_name="capture_image",
            tool_call_id="call_abc"
        )

        # Response comes as UserImageRawFrame
    """

    user_id: str
    text: Optional[str] = None
    append_to_context: Optional[bool] = None
    video_source: Optional[str] = None
    function_name: Optional[str] = None
    tool_call_id: Optional[str] = None
    context: Optional[Any] = None  # Deprecated

TTS Control Frames

Frames for controlling Text-to-Speech behavior.

TTSStartedFrame

{ .api }
from pipecat.frames.frames import TTSStartedFrame

class TTSStartedFrame(ControlFrame):
    """TTS synthesis started.

    Emitted when a TTS service begins synthesizing speech.
    Can be used for tracking, metrics, or synchronization.

    Inherits from:
        ControlFrame: Normal priority, ordered processing

    Example:
        # Emitted by TTS service
        class CustomTTSService(TTSService):
            async def run_tts(self, text):
                await self.push_frame(TTSStartedFrame())
                # Synthesize speech...
                await self.push_frame(TTSStoppedFrame())
    """
    pass

TTSStoppedFrame

{ .api }
from pipecat.frames.frames import TTSStoppedFrame

class TTSStoppedFrame(ControlFrame):
    """TTS synthesis stopped.

    Emitted when a TTS service completes synthesizing speech.
    Marks the end of a TTS operation.

    Inherits from:
        ControlFrame: Normal priority, ordered processing

    Example:
        # Emitted by TTS service after synthesis
        await self.push_frame(TTSStoppedFrame())
    """
    pass
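
These frames can be paired to measure how long synthesis takes. Below is a minimal sketch; TTSDurationMonitor is an illustrative name, not part of the pipecat API.

{ .api }
import time

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TTSStartedFrame, TTSStoppedFrame

class TTSDurationMonitor(FrameProcessor):
    """Illustrative processor that times TTS start/stop spans."""

    def __init__(self):
        super().__init__()
        self._tts_start = None

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, TTSStartedFrame):
            self._tts_start = time.time()
        elif isinstance(frame, TTSStoppedFrame) and self._tts_start is not None:
            print(f"TTS span: {time.time() - self._tts_start:.2f}s")
            self._tts_start = None

        await self.push_frame(frame, direction)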

Usage Patterns

Tracking User Turns

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    TranscriptionFrame
)

class UserTurnTracker(FrameProcessor):
    """Track user conversation turns."""

    def __init__(self):
        super().__init__()
        self._user_speaking = False
        self._current_transcription = []

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, UserStartedSpeakingFrame):
            self._user_speaking = True
            self._current_transcription = []
            print("User started speaking")

        elif isinstance(frame, TranscriptionFrame):
            if self._user_speaking:
                self._current_transcription.append(frame.text)

        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._user_speaking = False
            full_text = " ".join(self._current_transcription)
            print(f"User said: {full_text}")
            self._current_transcription = []

        await self.push_frame(frame, direction)
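
A tracker like this is just another pipeline stage. A minimal sketch of where it might sit, assuming transport, stt, llm, and tts are configured elsewhere:

{ .api }
from pipecat.pipeline.pipeline import Pipeline

# Sketch: transport, stt, llm, and tts are assumed to be set up elsewhere.
pipeline = Pipeline([
    transport.input(),    # emits user speech frames
    stt,                  # emits TranscriptionFrame
    UserTurnTracker(),    # observes turn frames as they flow through
    llm,
    tts,
    transport.output(),   # emits bot speech frames
])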

Handling Interruptions

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    StartInterruptionFrame,
    BotInterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame
)

class InterruptionHandler(FrameProcessor):
    """Handle bot interruptions."""

    def __init__(self):
        super().__init__()
        self._bot_speaking = False
        self._interruption_count = 0

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True

        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False

        elif isinstance(frame, StartInterruptionFrame):
            if self._bot_speaking:
                self._interruption_count += 1
                print(f"Bot interrupted! (Count: {self._interruption_count})")
                # Emit custom interruption frame
                await self.push_frame(BotInterruptionFrame())

        await self.push_frame(frame, direction)

Bot Speech Monitoring

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    TTSAudioRawFrame
)
import time

class BotSpeechMonitor(FrameProcessor):
    """Monitor bot speech duration and audio."""

    def __init__(self):
        super().__init__()
        self._speech_start = None
        self._audio_bytes = 0

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._speech_start = time.time()
            self._audio_bytes = 0
            print("Bot speech started")

        elif isinstance(frame, TTSAudioRawFrame):
            if self._speech_start:
                self._audio_bytes += len(frame.audio)

        elif isinstance(frame, BotStoppedSpeakingFrame):
            if self._speech_start:
                duration = time.time() - self._speech_start
                print(f"Bot speech ended: {duration:.2f}s, {self._audio_bytes} bytes")
                self._speech_start = None

        await self.push_frame(frame, direction)

Emulating User Input

{ .api }
from pipecat.pipeline.task import PipelineTask
from pipecat.frames.frames import (
    EmulateUserStartedSpeakingFrame,
    EmulateUserStoppedSpeakingFrame,
    TranscriptionFrame
)
from pipecat.utils.time import time_now_iso8601

async def emulate_user_input(task: PipelineTask, text: str):
    """Emulate user text input.

    Args:
        task: Pipeline task
        text: User input text
    """
    # Signal user started speaking
    await task.queue_frame(EmulateUserStartedSpeakingFrame())

    # Send text as a final transcription (TranscriptionFrame also requires
    # user_id and timestamp)
    await task.queue_frame(TranscriptionFrame(
        text=text, user_id="test-user", timestamp=time_now_iso8601()
    ))

    # Signal user stopped speaking
    await task.queue_frame(EmulateUserStoppedSpeakingFrame())

# Example usage
async def test_conversation():
    # ... setup pipeline ...
    await emulate_user_input(task, "Hello, how are you?")
    # Pipeline processes as if user spoke

VAD-Based Turn Detection

{ .api }
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.transports.services.daily import DailyTransport, DailyParams

# Configure VAD (VADParams thresholds are in seconds)
vad = SileroVADAnalyzer(
    params=VADParams(
        confidence=0.7,   # minimum speech confidence
        start_secs=0.2,   # speech needed before "started speaking"
        stop_secs=0.8     # silence needed before "stopped speaking"
    )
)

# Configure transport with VAD
transport = DailyTransport(
    room_url="https://daily.co/room",
    token="token",
    bot_name="bot",
    params=DailyParams(
        audio_in_enabled=True,
        vad_enabled=True,
        vad_analyzer=vad
    )
)

# Transport automatically emits:
# - VADUserStartedSpeakingFrame when speech detected
# - VADUserStoppedSpeakingFrame when silence detected

Interruption with Turn Detection

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    UserStartedSpeakingFrame,
    StartInterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame
)

class SmartInterruptionHandler(FrameProcessor):
    """Handle interruptions intelligently."""

    def __init__(self, allow_interruptions: bool = True):
        super().__init__()
        self._bot_speaking = False
        self._allow_interruptions = allow_interruptions

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True

        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False

        elif isinstance(frame, UserStartedSpeakingFrame):
            # Only interrupt if bot is speaking and interruptions allowed
            if self._bot_speaking and self._allow_interruptions:
                print("User interrupted bot, stopping bot speech")
                await self.push_frame(StartInterruptionFrame())

        await self.push_frame(frame, direction)

    def enable_interruptions(self):
        """Enable interruptions."""
        self._allow_interruptions = True

    def disable_interruptions(self):
        """Disable interruptions."""
        self._allow_interruptions = False

Best Practices

Always Handle Interruptions

{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams

# Enable interruptions in pipeline params
task = PipelineTask(
    pipeline=pipeline,
    params=PipelineParams(
        allow_interruptions=True  # Enable interruption handling
    )
)

# Interruption frames automatically handled by pipeline
# DataFrames/ControlFrames cancelled when user speaks
# SystemFrames continue processing

Use Specific Frame Types

{ .api }
from pipecat.frames.frames import (
    SystemFrame,
    UserStartedSpeakingFrame,
    VADUserStartedSpeakingFrame
)

# Good: Use specific frame types for clarity
if isinstance(frame, VADUserStartedSpeakingFrame):
    # VAD-specific handling
    pass
elif isinstance(frame, UserStartedSpeakingFrame):
    # General user speech handling
    pass

# Avoid: Generic handling loses context
if isinstance(frame, SystemFrame):
    # Too broad, can't distinguish types
    pass

Track Turn State

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame
)

class ConversationStateTracker(FrameProcessor):
    """Track conversation state machine."""

    def __init__(self):
        super().__init__()
        self._state = "idle"  # idle, user_speaking, bot_speaking

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        old_state = self._state

        if isinstance(frame, UserStartedSpeakingFrame):
            self._state = "user_speaking"
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._state = "idle"
        elif isinstance(frame, BotStartedSpeakingFrame):
            self._state = "bot_speaking"
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._state = "idle"

        if old_state != self._state:
            print(f"State transition: {old_state} -> {self._state}")

        await self.push_frame(frame, direction)

Log Interruptions for Analysis

{ .api }
import logging

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import BotInterruptionFrame, UserStoppedSpeakingFrame

class InterruptionAnalytics(FrameProcessor):
    """Collect interruption analytics."""

    def __init__(self):
        super().__init__()
        self._logger = logging.getLogger(__name__)
        self._metrics = {
            "user_interruptions": 0,
            "bot_interruptions": 0,
            "total_turns": 0
        }

    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotInterruptionFrame):
            self._metrics["bot_interruptions"] += 1
            self._logger.info(
                f"Bot interrupted (total: {self._metrics['bot_interruptions']})"
            )

        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._metrics["total_turns"] += 1

        await self.push_frame(frame, direction)

    def get_metrics(self):
        """Get interruption metrics."""
        return self._metrics.copy()
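
A typical use is to keep a reference to the processor, run the pipeline, and read the metrics after the session. A minimal sketch, assuming transport, stt, llm, and tts are configured elsewhere:

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams

async def run_with_analytics():
    # Sketch: transport, stt, llm, and tts are assumed to be set up elsewhere.
    analytics = InterruptionAnalytics()
    pipeline = Pipeline([
        transport.input(), stt, llm, tts, analytics, transport.output()
    ])
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

    await PipelineRunner().run(task)

    # Read collected metrics once the session ends
    print(analytics.get_metrics())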