tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Interaction frames track user and bot state changes during conversations. They signal when the user or bot starts or stops speaking, drive interruption handling, and support turn-taking in conversational flows.
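As a quick orientation, the sketch below shows the usual pattern for consuming these frames: subclass FrameProcessor, check frame types, and always push the frame onward. The TurnCallbacks class and its callback argument are illustrative helpers, not part of the Pipecat API.
{ .api }
from typing import Awaitable, Callable, Optional
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame
)
class TurnCallbacks(FrameProcessor):
    """Invoke a callback on user turn boundaries (illustrative helper)."""
    def __init__(self, on_user_turn_end: Optional[Callable[[], Awaitable[None]]] = None):
        super().__init__()
        self._on_user_turn_end = on_user_turn_end
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, UserStartedSpeakingFrame):
            print("User turn started")
        elif isinstance(frame, UserStoppedSpeakingFrame):
            print("User turn ended")
            if self._on_user_turn_end:
                await self._on_user_turn_end()
        elif isinstance(frame, BotStartedSpeakingFrame):
            print("Bot turn started")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            print("Bot turn ended")
        # Always forward the frame so the rest of the pipeline sees it
        await self.push_frame(frame, direction)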
Frames signaling user speech activity.
{ .api }
from pipecat.frames.frames import UserStartedSpeakingFrame
class UserStartedSpeakingFrame(SystemFrame):
"""User began speaking.
Emitted when the user starts speaking. This is a SystemFrame
for immediate processing to enable fast interruption handling.
Used by:
- Transport VAD detection
- Manual user speech tracking
- Interruption triggers
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by transport when user speech detected
frame = UserStartedSpeakingFrame()
await task.queue_frame(frame)
"""
pass
{ .api }
from pipecat.frames.frames import UserStoppedSpeakingFrame
class UserStoppedSpeakingFrame(SystemFrame):
"""User stopped speaking.
Emitted when the user stops speaking. Marks the end of a
user utterance, typically followed by STT processing completion.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by transport after user speech ends
frame = UserStoppedSpeakingFrame()
await task.queue_frame(frame)
"""
pass
{ .api }
from pipecat.frames.frames import UserSpeakingFrame
class UserSpeakingFrame(SystemFrame):
"""User is speaking (continuous signal).
Emitted periodically while the user is speaking. Can be used
for UI updates or monitoring speech duration.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by transport during active speech
frame = UserSpeakingFrame()
await task.queue_frame(frame)
"""
pass
{ .api }
from pipecat.frames.frames import VADUserStartedSpeakingFrame
class VADUserStartedSpeakingFrame(SystemFrame):
"""VAD-detected speech start.
Emitted by Voice Activity Detection when speech is detected.
More specific than UserStartedSpeakingFrame, explicitly indicating
VAD as the detection source.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by VAD analyzer
from pipecat.audio.vad.silero import SileroVADAnalyzer
vad = SileroVADAnalyzer()
# VAD emits this frame when speech detected
"""
pass
{ .api }
from pipecat.frames.frames import VADUserStoppedSpeakingFrame
class VADUserStoppedSpeakingFrame(SystemFrame):
"""VAD-detected speech end.
Emitted by Voice Activity Detection when speech ends.
Explicitly indicates VAD as the detection source.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by VAD analyzer
# Triggers STT processing completion
"""
pass
Frames for testing or programmatic user speech simulation.
{ .api }
from pipecat.frames.frames import EmulateUserStartedSpeakingFrame
class EmulateUserStartedSpeakingFrame(SystemFrame):
"""Emulated user speech start.
Simulates user starting to speak without actual audio input.
Useful for testing interruption handling or programmatic control.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Test interruption behavior
frame = EmulateUserStartedSpeakingFrame()
await task.queue_frame(frame)
# Pipeline treats this like real user speech
"""
pass
{ .api }
from pipecat.frames.frames import EmulateUserStoppedSpeakingFrame
class EmulateUserStoppedSpeakingFrame(SystemFrame):
"""Emulated user speech end.
Simulates user stopping speaking without actual audio input.
Pairs with EmulateUserStartedSpeakingFrame for testing.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Complete emulated user turn
await task.queue_frame(EmulateUserStartedSpeakingFrame())
# ... send text or other input ...
await task.queue_frame(EmulateUserStoppedSpeakingFrame())
"""
pass
Frames signaling bot speech activity.
{ .api }
from pipecat.frames.frames import BotStartedSpeakingFrame
class BotStartedSpeakingFrame(SystemFrame):
"""Bot started speaking.
Emitted when the bot begins speaking (TTS output starts).
Used for UI updates, analytics, and turn tracking.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by TTS service when synthesis starts
@tts_service.event_handler("on_tts_started")
async def handle_tts_start():
await task.queue_frame(BotStartedSpeakingFrame())
"""
pass
{ .api }
from pipecat.frames.frames import BotStoppedSpeakingFrame
class BotStoppedSpeakingFrame(SystemFrame):
"""Bot stopped speaking.
Emitted when the bot finishes speaking (TTS output ends).
Marks the end of a bot turn in the conversation.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted by TTS service when synthesis completes
@tts_service.event_handler("on_tts_stopped")
async def handle_tts_stop():
await task.queue_frame(BotStoppedSpeakingFrame())
"""
pass
{ .api }
from pipecat.frames.frames import BotSpeakingFrame
class BotSpeakingFrame(SystemFrame):
"""Bot is speaking (continuous signal).
Emitted periodically while the bot is speaking. Can be used
for UI updates or monitoring speech duration.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Emitted during TTS playback
frame = BotSpeakingFrame()
await task.queue_frame(frame)
"""
pass
Frames for handling conversation interruptions.
{ .api }
from pipecat.frames.frames import InterruptionFrame
class InterruptionFrame(SystemFrame):
"""Base interruption signal.
Base class for interruption-related frames. Indicates that
an interruption event has occurred.
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Base class, typically use specific subclasses
frame = InterruptionFrame()
"""
pass
{ .api }
from pipecat.frames.frames import StartInterruptionFrame
class StartInterruptionFrame(InterruptionFrame):
"""Start interruption handling.
Signals the start of an interruption. When received, the pipeline
cancels pending DataFrames and ControlFrames (except UninterruptibleFrame).
Flow:
1. User starts speaking (UserStartedSpeakingFrame)
2. StartInterruptionFrame emitted
3. Pipeline cancels queued bot output
4. SystemFrames continue processing
Inherits from:
InterruptionFrame: High priority interruption
Example:
# Emitted by transport when user interrupts bot
frame = StartInterruptionFrame()
await task.queue_frame(frame)
# All pending bot speech cancelled
"""
pass
{ .api }
from pipecat.frames.frames import BotInterruptionFrame
class BotInterruptionFrame(InterruptionTaskFrame):
"""Bot was interrupted.
Emitted when the bot's speech is interrupted by the user.
Allows tracking of interruption events for analytics or
special handling.
Inherits from:
InterruptionTaskFrame: Task-level interruption
Example:
# Emitted when bot is interrupted during speech
@pipeline.event_handler("on_bot_interrupted")
async def handle_interrupt(frame):
print("Bot was interrupted!")
# Log analytics, adjust behavior, etc.
"""
pass
Frames for requesting specific input from the user.
{ .api }
from typing import Any, Optional
from pipecat.frames.frames import UserImageRequestFrame
class UserImageRequestFrame(SystemFrame):
"""Request image from user.
A frame to request an image from the given user. The request may include text
that can later be used to describe the requested image. Used in multimodal
conversations where vision input is needed.
Attributes:
user_id (str): Identifier of the user to request image from
text (Optional[str]): Optional text associated with the image request
append_to_context (Optional[bool]): Whether the requested image should be appended to the LLM context
video_source (Optional[str]): Specific video source to capture from
function_name (Optional[str]): Name of function that generated this request (if any)
tool_call_id (Optional[str]): Tool call ID if generated by function call (if any)
context (Optional[Any]): [DEPRECATED] Optional context for the image request
Inherits from:
SystemFrame: High priority, immediate processing
Example:
# Simple image request
frame = UserImageRequestFrame(user_id="user123")
await task.queue_frame(frame)
# Image request with descriptive text
frame = UserImageRequestFrame(
user_id="user456",
text="Show me what you see",
append_to_context=True
)
# Image request from specific video source
frame = UserImageRequestFrame(
user_id="user789",
video_source="camera_front",
append_to_context=True
)
# Image request from function call
frame = UserImageRequestFrame(
user_id="user123",
text="Capture current view",
function_name="capture_image",
tool_call_id="call_abc"
)
# Response comes as UserImageRawFrame
"""
user_id: str
text: Optional[str] = None
append_to_context: Optional[bool] = None
video_source: Optional[str] = None
function_name: Optional[str] = None
tool_call_id: Optional[str] = None
context: Optional[Any] = None  # Deprecated
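The response to a UserImageRequestFrame arrives as a UserImageRawFrame. Below is a hedged sketch of a processor that logs those responses; it assumes the frame exposes user_id, size, and raw image bytes the way Pipecat's other raw image frames do.
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import UserImageRawFrame
class UserImageLogger(FrameProcessor):
    """Log incoming user images (illustrative sketch)."""
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, UserImageRawFrame):
            # Assumes the frame carries user_id, size, and raw image bytes
            width, height = frame.size
            print(f"Got image from {frame.user_id}: {width}x{height}, {len(frame.image)} bytes")
        await self.push_frame(frame, direction)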
Frames for controlling Text-to-Speech behavior.
{ .api }
from pipecat.frames.frames import TTSStartedFrame
class TTSStartedFrame(ControlFrame):
"""TTS synthesis started.
Emitted when a TTS service begins synthesizing speech.
Can be used for tracking, metrics, or synchronization.
Inherits from:
ControlFrame: Normal priority, ordered processing
Example:
# Emitted by TTS service
class CustomTTSService(TTSService):
async def run_tts(self, text):
await self.push_frame(TTSStartedFrame())
# Synthesize speech...
await self.push_frame(TTSStoppedFrame())
"""
pass
{ .api }
from pipecat.frames.frames import TTSStoppedFrame
class TTSStoppedFrame(ControlFrame):
"""TTS synthesis stopped.
Emitted when a TTS service completes synthesizing speech.
Marks the end of a TTS operation.
Inherits from:
ControlFrame: Normal priority, ordered processing
Example:
# Emitted by TTS service after synthesis
await self.push_frame(TTSStoppedFrame())
"""
pass
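TTSStartedFrame and TTSStoppedFrame bracket each synthesis, so they pair naturally with TTSAudioRawFrame for simple metrics. The sketch below measures time to first audio chunk after synthesis starts; it is an illustrative example following the same processor pattern used elsewhere on this page.
{ .api }
import time
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TTSStartedFrame, TTSStoppedFrame, TTSAudioRawFrame
class TTSLatencyMonitor(FrameProcessor):
    """Measure time from TTS start to first audio chunk (illustrative sketch)."""
    def __init__(self):
        super().__init__()
        self._tts_started_at = None
        self._first_audio_seen = False
    async def process_frame(self, frame, direction):
        await super().process_frame(frame, direction)
        if isinstance(frame, TTSStartedFrame):
            self._tts_started_at = time.time()
            self._first_audio_seen = False
        elif isinstance(frame, TTSAudioRawFrame):
            if self._tts_started_at and not self._first_audio_seen:
                latency = time.time() - self._tts_started_at
                print(f"Time to first TTS audio: {latency * 1000:.0f} ms")
                self._first_audio_seen = True
        elif isinstance(frame, TTSStoppedFrame):
            self._tts_started_at = None
        await self.push_frame(frame, direction)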
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
TranscriptionFrame
)
class UserTurnTracker(FrameProcessor):
"""Track user conversation turns."""
def __init__(self):
super().__init__()
self._user_speaking = False
self._current_transcription = []
async def process_frame(self, frame, direction):
if isinstance(frame, UserStartedSpeakingFrame):
self._user_speaking = True
self._current_transcription = []
print("User started speaking")
elif isinstance(frame, TranscriptionFrame):
if self._user_speaking:
self._current_transcription.append(frame.text)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._user_speaking = False
full_text = " ".join(self._current_transcription)
print(f"User said: {full_text}")
self._current_transcription = []
await self.push_frame(frame, direction)
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
StartInterruptionFrame,
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame
)
class InterruptionHandler(FrameProcessor):
"""Handle bot interruptions."""
def __init__(self):
super().__init__()
self._bot_speaking = False
self._interruption_count = 0
async def process_frame(self, frame, direction):
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_speaking = True
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_speaking = False
elif isinstance(frame, StartInterruptionFrame):
if self._bot_speaking:
self._interruption_count += 1
print(f"Bot interrupted! (Count: {self._interruption_count})")
# Emit custom interruption frame
await self.push_frame(BotInterruptionFrame())
await self.push_frame(frame, direction)
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
TTSAudioRawFrame
)
import time
class BotSpeechMonitor(FrameProcessor):
"""Monitor bot speech duration and audio."""
def __init__(self):
super().__init__()
self._speech_start = None
self._audio_bytes = 0
async def process_frame(self, frame, direction):
if isinstance(frame, BotStartedSpeakingFrame):
self._speech_start = time.time()
self._audio_bytes = 0
print("Bot speech started")
elif isinstance(frame, TTSAudioRawFrame):
if self._speech_start:
self._audio_bytes += len(frame.audio)
elif isinstance(frame, BotStoppedSpeakingFrame):
if self._speech_start:
duration = time.time() - self._speech_start
print(f"Bot speech ended: {duration:.2f}s, {self._audio_bytes} bytes")
self._speech_start = None
await self.push_frame(frame, direction)
{ .api }
from pipecat.pipeline.task import PipelineTask
from pipecat.frames.frames import (
EmulateUserStartedSpeakingFrame,
EmulateUserStoppedSpeakingFrame,
TranscriptionFrame
)
async def emulate_user_input(task: PipelineTask, text: str):
"""Emulate user text input.
Args:
task: Pipeline task
text: User input text
"""
# Signal user started speaking
await task.queue_frame(EmulateUserStartedSpeakingFrame())
# Send text as a transcription (user_id and timestamp are placeholder values)
await task.queue_frame(TranscriptionFrame(text=text, user_id="test-user", timestamp=""))
# Signal user stopped speaking
await task.queue_frame(EmulateUserStoppedSpeakingFrame())
# Example usage
async def test_conversation():
# ... setup pipeline ...
await emulate_user_input(task, "Hello, how are you?")
# Pipeline processes as if user spoke
{ .api }
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.transports.daily import DailyTransport, DailyParams
# Configure VAD
vad = SileroVADAnalyzer(
params=VADParams(
confidence=0.5,  # speech probability threshold
start_secs=0.25,  # how long speech must last before "started"
stop_secs=0.5  # how long silence must last before "stopped"
)
)
# Configure transport with VAD
transport = DailyTransport(
room_url="https://daily.co/room",
token="token",
params=DailyParams(
audio_in_enabled=True,
vad_enabled=True,
vad_analyzer=vad
)
)
# Transport automatically emits:
# - VADUserStartedSpeakingFrame when speech detected
# - VADUserStoppedSpeakingFrame when silence detected
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
UserStartedSpeakingFrame,
StartInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame
)
class SmartInterruptionHandler(FrameProcessor):
"""Handle interruptions intelligently."""
def __init__(self, allow_interruptions: bool = True):
super().__init__()
self._bot_speaking = False
self._allow_interruptions = allow_interruptions
async def process_frame(self, frame, direction):
if isinstance(frame, BotStartedSpeakingFrame):
self._bot_speaking = True
elif isinstance(frame, BotStoppedSpeakingFrame):
self._bot_speaking = False
elif isinstance(frame, UserStartedSpeakingFrame):
# Only interrupt if bot is speaking and interruptions allowed
if self._bot_speaking and self._allow_interruptions:
print("User interrupted bot, stopping bot speech")
await self.push_frame(StartInterruptionFrame())
await self.push_frame(frame, direction)
def enable_interruptions(self):
"""Enable interruptions."""
self._allow_interruptions = True
def disable_interruptions(self):
"""Disable interruptions."""
self._allow_interruptions = False
{ .api }
from pipecat.pipeline.task import PipelineTask, PipelineParams
# Enable interruptions in pipeline params
task = PipelineTask(
pipeline=pipeline,
params=PipelineParams(
allow_interruptions=True # Enable interruption handling
)
)
# Interruption frames automatically handled by pipeline
# DataFrames/ControlFrames cancelled when user speaks
# SystemFrames continue processing
{ .api }
# Good: Use specific frame types for clarity
if isinstance(frame, VADUserStartedSpeakingFrame):
# VAD-specific handling
pass
elif isinstance(frame, UserStartedSpeakingFrame):
# General user speech handling
pass
# Avoid: Generic handling loses context
if isinstance(frame, SystemFrame):
# Too broad, can't distinguish types
pass
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame
)
class ConversationStateTracker(FrameProcessor):
"""Track conversation state machine."""
def __init__(self):
super().__init__()
self._state = "idle" # idle, user_speaking, bot_speaking
async def process_frame(self, frame, direction):
old_state = self._state
if isinstance(frame, UserStartedSpeakingFrame):
self._state = "user_speaking"
elif isinstance(frame, UserStoppedSpeakingFrame):
self._state = "idle"
elif isinstance(frame, BotStartedSpeakingFrame):
self._state = "bot_speaking"
elif isinstance(frame, BotStoppedSpeakingFrame):
self._state = "idle"
if old_state != self._state:
print(f"State transition: {old_state} -> {self._state}")
await self.push_frame(frame, direction)
{ .api }
import logging
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import BotInterruptionFrame, UserStoppedSpeakingFrame
class InterruptionAnalytics(FrameProcessor):
"""Collect interruption analytics."""
def __init__(self):
super().__init__()
self._logger = logging.getLogger(__name__)
self._metrics = {
"user_interruptions": 0,
"bot_interruptions": 0,
"total_turns": 0
}
async def process_frame(self, frame, direction):
if isinstance(frame, BotInterruptionFrame):
self._metrics["bot_interruptions"] += 1
self._logger.info(
f"Bot interrupted (total: {self._metrics['bot_interruptions']})"
)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._metrics["total_turns"] += 1
await self.push_frame(frame, direction)
def get_metrics(self):
"""Get interruption metrics."""
return self._metrics.copy()
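Processors like the trackers above only see frames that flow through them, so place them in the pipeline between the stages they should observe. A minimal usage sketch is shown below; transport, stt, llm, and tts stand in for whatever services your application configures.
{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams
async def run_with_analytics(transport, stt, llm, tts):
    """Run a pipeline with the InterruptionAnalytics processor in the frame path."""
    analytics = InterruptionAnalytics()
    pipeline = Pipeline([
        transport.input(),
        stt,
        analytics,  # observes interaction frames flowing downstream
        llm,
        tts,
        transport.output(),
    ])
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    runner = PipelineRunner()
    await runner.run(task)
    # After the run, read the collected interruption metrics
    print(analytics.get_metrics())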