docs
tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Core processors provide the fundamental building blocks for frame processing pipelines. This includes the base processor interface, basic processor types, frame filters, and text/audio aggregators.
The foundation for all frame processors.
{ .api }
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
class FrameProcessor(BaseObject):
"""Base class for all frame processors.
Processors are the building blocks of pipelines. They receive frames,
transform/filter/generate frames, and push frames to the next processor.
Key Lifecycle:
1. Construction (__init__)
2. Linking (link)
3. Start (start)
4. Processing (process_frame - repeated)
5. Interruption (interrupt - optional, repeated)
6. Stop/Cancel (stop or cancel)
Attributes:
name (str): Processor name for debugging
id (int): Unique processor identifier
interruptions_enabled (bool): Whether processor can be interrupted
Methods:
process_frame(frame, direction): Process a single frame
push_frame(frame, direction): Push frame to next processor
queue_frame(frame, direction): Queue frame for processing
start(): Initialize processor
stop(): Gracefully shutdown
cancel(): Immediate shutdown
link(processor): Connect to downstream processor
interrupt(): Handle interruption event
register_event_handler(event_name): Register event handler
"""
def __init__(self, name: Optional[str] = None, **kwargs):
"""Initialize processor.
Args:
name: Optional processor name for debugging
**kwargs: Additional configuration passed to subclasses
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process a frame.
Override this method to implement custom processing logic.
Args:
frame: The frame to process
direction: Direction of frame flow (DOWNSTREAM or UPSTREAM)
Example:
async def process_frame(self, frame, direction):
if isinstance(frame, TextFrame):
# Transform text
frame.text = frame.text.upper()
# Always push the frame onward (keeping its original direction)
await self.push_frame(frame, direction)
"""
# Default: pass frame through
await self.push_frame(frame, direction)
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push frame to the next processor.
Args:
frame: Frame to push
direction: Direction of frame flow
"""
pass
async def queue_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Queue frame for processing.
Args:
frame: Frame to queue
direction: Direction of frame flow
"""
pass
async def start(self):
"""Start the processor.
Called when pipeline starts. Use for initialization like:
- Opening connections
- Loading models
- Starting background tasks
Example:
async def start(self):
await super().start()
self._connection = await connect_to_service()
"""
pass
async def stop(self):
"""Stop the processor gracefully.
Called when pipeline stops normally. Use for cleanup like:
- Closing connections
- Saving state
- Stopping background tasks
Example:
async def stop(self):
await self._connection.close()
await super().stop()
"""
pass
async def cancel(self):
"""Cancel the processor immediately.
Called when pipeline is cancelled or errors occur.
Similar to stop() but expects faster cleanup.
Example:
async def cancel(self):
self._cancelled = True
await super().cancel()
"""
pass
def link(self, processor: "FrameProcessor"):
"""Link to downstream processor.
Args:
processor: Downstream processor to link
Example:
processor1.link(processor2)
# processor1 -> processor2
"""
pass
async def interrupt(self):
"""Handle interruption event.
Called when user interrupts (e.g., starts speaking during bot output).
Cancel current operations and clear queues.
Example:
async def interrupt(self):
self._current_operation_cancelled = True
await super().interrupt()
"""
pass
@property
def interruptions_enabled(self) -> bool:
"""Whether processor can be interrupted.
Returns:
True if processor can be interrupted
"""
return True
@property
def name(self) -> str:
"""Processor name."""
pass
@property
def id(self) -> int:
"""Unique processor ID."""
pass
{ .api }
from pipecat.processors.frame_processor import FrameDirection
from enum import Enum
class FrameDirection(Enum):
"""Direction of frame flow through pipeline.
Attributes:
DOWNSTREAM: From input toward output (normal flow)
UPSTREAM: From output toward input (reverse flow)
Example:
# Push downstream (normal)
await self.push_frame(frame, FrameDirection.DOWNSTREAM)
# Push upstream (reverse)
await self.push_frame(frame, FrameDirection.UPSTREAM)
"""
DOWNSTREAM = "downstream"
UPSTREAM = "upstream"

Simple processor implementations for common patterns.
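As a warm-up, here is a minimal hand-written processor in the same style: it drops empty TextFrames and reports them upstream with an ErrorFrame (both frame types come from pipecat.frames.frames; the validation rule itself is illustrative).

from pipecat.frames.frames import ErrorFrame, TextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection

class ValidatingProcessor(FrameProcessor):
    """Drop empty TextFrames and report them upstream."""
    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame) and not frame.text.strip():
            # Errors travel UPSTREAM, toward the pipeline input and task
            await self.push_frame(ErrorFrame("empty text frame"), FrameDirection.UPSTREAM)
            return
        await self.push_frame(frame, direction)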
{ .api }
from pipecat.processors.consumer_processor import ConsumerProcessor
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer
class ConsumerProcessor(FrameProcessor):
"""Frame processor that consumes frames from a ProducerProcessor's queue.
This processor passes through frames normally while also consuming frames
from a ProducerProcessor's queue. When frames are received from the producer
queue, they are optionally transformed and pushed in the specified direction.
Use Case:
- Dual pipeline processing (main + side processing)
- Fan-out patterns from ProducerProcessor
- Branching frame flows
- Parallel processing of filtered frames
Args:
producer: The producer processor to consume frames from
transformer: Function to transform frames before pushing (default: identity)
direction: Direction to push consumed frames (default: DOWNSTREAM)
Example:
from pipecat.processors.consumer_processor import ConsumerProcessor
from pipecat.processors.producer_processor import ProducerProcessor
# Create producer that filters TextFrames
async def text_filter(frame):
return isinstance(frame, TextFrame)
producer = ProducerProcessor(
filter=text_filter,
passthrough=True # Continue main pipeline
)
# Create consumer to process filtered frames
async def uppercase_transform(frame):
if isinstance(frame, TextFrame):
frame.text = frame.text.upper()
return frame
consumer = ConsumerProcessor(
producer=producer,
transformer=uppercase_transform,
direction=FrameDirection.DOWNSTREAM
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
producer, # Filters and distributes TextFrames
llm, # Main pipeline continues
consumer, # Receives filtered frames from producer
tts,
transport.output()
])
"""
def __init__(
self,
*,
producer: ProducerProcessor,
transformer: Callable[[Frame], Awaitable[Frame]] = identity_transformer,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
**kwargs,
):
"""Initialize the consumer processor.
Args:
producer: The producer processor to consume frames from
transformer: Function to transform frames before pushing
direction: Direction to push consumed frames
**kwargs: Additional arguments passed to parent class
"""
pass
{ .api }
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer
async def identity_transformer(frame: Frame):
"""Default transformer that returns the frame unchanged."""
return frame
class ProducerProcessor(FrameProcessor):
"""A processor that filters frames and distributes them to multiple consumers.
This processor receives frames, applies a filter to determine which frames
should be sent to consumers (ConsumerProcessor), optionally transforms those
frames, and distributes them to registered consumer queues. It can also pass
frames through to the next processor in the pipeline.
Use Cases:
- Fan-out patterns (one producer, multiple consumers)
- Frame filtering and distribution
- Parallel processing branches
- Side-channel frame processing
Args:
filter: Async function that determines if a frame should be produced.
Must return True for frames to be sent to consumers
transformer: Async function to transform frames before sending to consumers
passthrough: Whether to pass frames through to the next processor
Example:
from pipecat.processors.producer_processor import ProducerProcessor
from pipecat.processors.consumer_processor import ConsumerProcessor
# Create producer that filters and distributes audio frames
async def audio_filter(frame):
return isinstance(frame, AudioRawFrame)
async def audio_transform(frame):
# Apply audio processing
frame.audio = apply_noise_reduction(frame.audio)
return frame
producer = ProducerProcessor(
filter=audio_filter,
transformer=audio_transform,
passthrough=True # Continue main pipeline
)
# Create consumers
consumer1 = ConsumerProcessor(producer=producer)
consumer2 = ConsumerProcessor(producer=producer)
# Both consumers receive filtered audio frames
# Main pipeline also continues with original frames
"""
def __init__(
self,
*,
filter: Callable[[Frame], Awaitable[bool]],
transformer: Callable[[Frame], Awaitable[Frame]] = identity_transformer,
passthrough: bool = True,
):
"""Initialize producer.
Args:
filter: Async function that determines if a frame should be produced
transformer: Async function to transform frames before sending to consumers
passthrough: Whether to pass frames through to the next processor
"""
pass
def add_consumer(self) -> asyncio.Queue:
"""Add a new consumer and return its associated queue.
Returns:
asyncio.Queue: The queue for the newly added consumer
"""
pass
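ConsumerProcessor calls add_consumer() for you, but the queue can also be drained by hand when you need produced frames outside a pipeline. A sketch, assuming a running event loop and an existing producer instance:

import asyncio

queue = producer.add_consumer()  # every produced frame is copied onto this queue

async def drain():
    while True:
        frame = await queue.get()
        print(f"produced: {frame}")

asyncio.create_task(drain())

{ .api }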
from pipecat.processors.async_generator import AsyncGeneratorProcessor
from pipecat.serializers.base_serializer import FrameSerializer
class AsyncGeneratorProcessor(FrameProcessor):
"""A frame processor that serializes frames and provides them via async generator.
This processor passes frames through unchanged while simultaneously serializing
them and making the serialized data available through an async generator interface.
Useful for streaming frame data to external consumers while maintaining the
normal frame processing pipeline.
Use Cases:
- Streaming frame data to external systems
- Real-time frame monitoring
- Frame data export/logging
- WebSocket frame streaming
Args:
serializer: The frame serializer to use for converting frames to data
Example:
from pipecat.processors.async_generator import AsyncGeneratorProcessor
from pipecat.serializers.protobuf import ProtobufFrameSerializer
# Create processor with serializer
serializer = ProtobufFrameSerializer()
async_gen = AsyncGeneratorProcessor(serializer=serializer)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
async_gen, # Serializes frames for external consumption
llm,
tts,
transport.output()
])
# Consume serialized data externally
async def stream_frames():
async for data in async_gen.generator():
# Send serialized frame data to external system
await websocket.send(data)
# Start streaming task
asyncio.create_task(stream_frames())
"""
def __init__(self, *, serializer: FrameSerializer, **kwargs):
"""Initialize async generator processor.
Args:
serializer: The frame serializer to use for converting frames to data
**kwargs: Additional arguments passed to parent class
"""
pass
async def generator(self) -> AsyncGenerator[Any, None]:
"""Generate serialized frame data asynchronously.
Yields:
Serialized frame data from the internal queue until a termination
signal (None) is received
"""
pass
{ .api }
from pipecat.processors.logger import FrameLogger
class FrameLogger(FrameProcessor):
"""Logs frames for debugging.
Logs all frames passing through for debugging and monitoring.
Args:
name: Logger name
level: Logging level (DEBUG, INFO, etc.)
include_pts: Include presentation timestamp in logs
Example:
logger = FrameLogger(name="pipeline", level="DEBUG")
pipeline = Pipeline([
processor1,
logger, # Log frames between processors
processor2
])
"""
def __init__(
self,
name: Optional[str] = None,
level: str = "DEBUG",
include_pts: bool = False,
**kwargs
):
"""Initialize frame logger.
Args:
name: Logger name
level: Logging level
include_pts: Include timestamp in logs
**kwargs: Additional arguments
"""
pass
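A common debugging pattern is to drop several loggers between stages to see where frames stop flowing. A sketch, where transport, stt, and llm stand in for your own services:

pipeline = Pipeline([
    transport.input(),
    FrameLogger(name="pre-stt"),
    stt,
    FrameLogger(name="post-stt"),
    llm,
    transport.output(),
])

{ .api }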
from pipecat.processors.text_transformer import StatelessTextTransformer
class StatelessTextTransformer(FrameProcessor):
"""Transforms text frames without state.
Applies a transformation function to text frames. Stateless means
each frame is processed independently.
Args:
transform: Function to transform text (str -> str)
Example:
# Uppercase transformer
uppercase = StatelessTextTransformer(
transform=lambda text: text.upper()
)
# Remove punctuation
no_punct = StatelessTextTransformer(
transform=lambda text: text.replace(".", "").replace(",", "")
)
"""
def __init__(self, transform: Callable[[str], str], **kwargs):
"""Initialize text transformer.
Args:
transform: Text transformation function
**kwargs: Additional arguments
"""
pass
async def process_frame(self, frame, direction):
"""Transform text frames.
Args:
frame: Frame to process
direction: Frame direction
"""
pass
{ .api }
from pipecat.processors.idle_frame_processor import IdleFrameProcessor
class IdleFrameProcessor(FrameProcessor):
"""Handles idle timeouts.
Emits frames after periods of inactivity. Useful for detecting
when no frames are flowing.
Args:
timeout_secs: Idle timeout in seconds
callback: Optional callback on idle
Example:
async def on_idle():
print("Pipeline is idle!")
idle = IdleFrameProcessor(
timeout_secs=5.0,
callback=on_idle
)
"""
def __init__(
self,
timeout_secs: float = 5.0,
callback: Optional[Callable] = None,
**kwargs
):
"""Initialize idle processor.
Args:
timeout_secs: Idle timeout in seconds
callback: Callback on idle
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.user_idle_processor import UserIdleProcessor
class UserIdleProcessor(FrameProcessor):
"""Tracks user idle state.
Emits events when user has been idle for a specified duration.
Resets on user input frames.
Args:
timeout_secs: User idle timeout in seconds
Example:
# Emit event if user idle for 30 seconds
user_idle = UserIdleProcessor(timeout_secs=30.0)
@user_idle.event_handler("on_user_idle")
async def handle_idle():
print("User has been idle for 30 seconds")
# Prompt user, end conversation, etc.
"""
def __init__(self, timeout_secs: float = 30.0, **kwargs):
"""Initialize user idle processor.
Args:
timeout_secs: User idle timeout in seconds
**kwargs: Additional arguments
"""
pass
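A typical use is nudging a silent user. A sketch, assuming TTSSpeakFrame from pipecat.frames.frames is synthesized by a TTS service downstream:

from pipecat.frames.frames import TTSSpeakFrame

user_idle = UserIdleProcessor(timeout_secs=30.0)

@user_idle.event_handler("on_user_idle")
async def handle_idle():
    # Hypothetical re-engagement prompt; spoken by the TTS service
    await user_idle.queue_frame(TTSSpeakFrame("Are you still there?"))

Processors that filter or gate frame flow.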
{ .api }
from pipecat.processors.filters.frame_filter import FrameFilter
class FrameFilter(FrameProcessor):
"""Base class for frame filters.
Filters selectively pass or drop frames based on criteria.
Example:
class TextOnlyFilter(FrameFilter):
async def process_frame(self, frame, direction):
if isinstance(frame, TextFrame):
await self.push_frame(frame, direction)
# Other frames are dropped
"""
pass
{ .api }
from pipecat.processors.filters.identity_filter import IdentityFilter
class IdentityFilter(FrameProcessor):
"""Pass-through filter.
Passes all frames unchanged. Useful as a placeholder or for testing.
Example:
filter = IdentityFilter()
# All frames pass through unchanged
"""
pass
{ .api }
from pipecat.processors.filters.null_filter import NullFilter
class NullFilter(FrameProcessor):
"""Drops all frames.
Consumes all frames without passing them downstream.
Useful for testing or temporarily disabling pipeline sections.
Example:
filter = NullFilter()
# All frames are dropped
"""
pass
{ .api }
from pipecat.processors.filters.function_filter import FunctionFilter
class FunctionFilter(FrameProcessor):
"""Custom filtering logic.
Applies a user-defined function to decide whether to pass frames.
Args:
filter_func: Function taking (frame, direction) returning bool
True = pass frame, False = drop frame
Example:
# Pass only TextFrames
text_filter = FunctionFilter(
filter_func=lambda frame, direction: isinstance(frame, TextFrame)
)
# Pass frames with specific metadata
meta_filter = FunctionFilter(
filter_func=lambda frame, direction: frame.metadata.get("priority") == "high"
)
"""
def __init__(self, filter_func: Callable[[Frame, FrameDirection], bool], **kwargs):
"""Initialize function filter.
Args:
filter_func: Filter function returning bool
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter
class WakeCheckFilter(FrameProcessor):
"""Wake word detection filter.
Checks transcribed text for wake words/phrases. Passes frames
only after wake word is detected.
Args:
wake_words: List of wake words/phrases
case_sensitive: Whether matching is case-sensitive
Example:
# Listen for "hey assistant"
wake_filter = WakeCheckFilter(
wake_words=["hey assistant", "hello assistant"],
case_sensitive=False
)
# After wake word detected, all frames pass through
"""
def __init__(
self,
wake_words: List[str],
case_sensitive: bool = False,
**kwargs
):
"""Initialize wake word filter.
Args:
wake_words: List of wake words/phrases
case_sensitive: Case-sensitive matching
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
class WakeNotifierFilter(FrameProcessor):
"""Wake word notification filter.
Similar to WakeCheckFilter but emits notification events
when wake words are detected.
Args:
wake_words: List of wake words/phrases
case_sensitive: Whether matching is case-sensitive
Example:
wake_notifier = WakeNotifierFilter(
wake_words=["hey bot"],
case_sensitive=False
)
@wake_notifier.event_handler("on_wake_word_detected")
async def on_wake(word: str):
print(f"Wake word detected: {word}")
"""
def __init__(
self,
wake_words: List[str],
case_sensitive: bool = False,
**kwargs
):
"""Initialize wake notifier filter.
Args:
wake_words: List of wake words/phrases
case_sensitive: Case-sensitive matching
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.filters.stt_mute_filter import STTMuteFilter, STTMuteStrategy, STTMuteConfig
class STTMuteStrategy(Enum):
"""STT muting strategies.
Attributes:
FIRST_SPEECH: Mute STT after first speech detected
CUSTOM: Custom muting logic via callback
"""
FIRST_SPEECH = "first_speech"
CUSTOM = "custom"
class STTMuteConfig:
"""Configuration for STT mute filter.
Attributes:
strategy (STTMuteStrategy): Muting strategy
custom_mute_callback (Optional[Callable]): Custom callback for muting
"""
def __init__(
self,
strategy: STTMuteStrategy = STTMuteStrategy.FIRST_SPEECH,
custom_mute_callback: Optional[Callable] = None
):
"""Initialize STT mute config.
Args:
strategy: Muting strategy
custom_mute_callback: Custom callback returning bool (should mute)
"""
pass
class STTMuteFilter(FrameProcessor):
"""STT muting with strategies.
Mutes Speech-to-Text based on configurable strategies. Useful for
scenarios where you want to stop listening after certain conditions.
Args:
config: STT mute configuration
Example:
# Mute after first speech
mute_filter = STTMuteFilter(
config=STTMuteConfig(strategy=STTMuteStrategy.FIRST_SPEECH)
)
# Custom muting logic
async def should_mute(frame):
# Mute if user said "goodbye"
if isinstance(frame, TranscriptionFrame):
return "goodbye" in frame.text.lower()
return False
custom_mute = STTMuteFilter(
config=STTMuteConfig(
strategy=STTMuteStrategy.CUSTOM,
custom_mute_callback=should_mute
)
)
"""
def __init__(self, config: Optional[STTMuteConfig] = None, **kwargs):
"""Initialize STT mute filter.
Args:
config: Mute configuration
**kwargs: Additional arguments
"""
pass
def mute(self):
"""Manually mute STT."""
pass
def unmute(self):
"""Manually unmute STT."""
pass

Processors that aggregate text into larger units.
{ .api }
from pipecat.processors.aggregators.simple_text import SimpleTextAggregator
class SimpleTextAggregator(FrameProcessor):
"""Simple text frame aggregator for basic text accumulation.
Accumulates text frames into larger units without complex sentence
detection. Useful for basic text buffering and simple aggregation patterns.
Example:
# Basic text aggregation
aggregator = SimpleTextAggregator()
# Accumulates: "Hello", " ", "world"
# Outputs when threshold met or manually flushed
"""
def __init__(self, **kwargs):
"""Initialize simple text aggregator.
Args:
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.aggregators.sentence import SentenceAggregator
class SentenceAggregator(FrameProcessor):
"""Aggregates text into sentences.
Accumulates text frames and emits complete sentences. Uses punctuation
and other heuristics to detect sentence boundaries.
Args:
aggregation_type: Type of aggregation (SENTENCE, WORD)
include_punctuation: Include punctuation in output
sentence_terminators: List of sentence-ending characters
Example:
# Aggregate into sentences
aggregator = SentenceAggregator()
# Input: "Hello", " world", ".", " How", " are", " you", "?"
# Output: "Hello world.", "How are you?"
"""
def __init__(
self,
aggregation_type: AggregationType = AggregationType.SENTENCE,
include_punctuation: bool = True,
sentence_terminators: List[str] = [".", "!", "?"],
**kwargs
):
"""Initialize sentence aggregator.
Args:
aggregation_type: Aggregation type
include_punctuation: Include punctuation
sentence_terminators: Sentence ending characters
**kwargs: Additional arguments
"""
pass
{ .api }
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.frame_processor import FrameDirection
class GatedAggregator(FrameProcessor):
"""Accumulate frames with custom functions to start and stop accumulation.
Yields gate-opening frame before any accumulated frames, then ensuing frames
until and not including the gate-closed frame. The aggregator maintains an
internal gate state that controls whether frames are passed through immediately
or accumulated for later release.
Gate Behavior:
- When gate is OPEN: Frames pass through immediately, accumulated frames are released
- When gate is CLOSED: Frames are accumulated in buffer
- SystemFrames always pass through regardless of gate state
- Only frames matching the specified direction are gated
Args:
gate_open_fn: Function that returns True when a frame should open the gate
gate_close_fn: Function that returns True when a frame should close the gate
start_open: Whether the gate should start in the open state
direction: The frame direction this aggregator operates on (default: DOWNSTREAM)
Example:
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.frame_processor import FrameDirection
# Gate opens on wake word, closes on goodbye
def should_open(frame):
if isinstance(frame, TranscriptionFrame):
return "hello" in frame.text.lower()
return False
def should_close(frame):
if isinstance(frame, TranscriptionFrame):
return "goodbye" in frame.text.lower()
return False
gated = GatedAggregator(
gate_open_fn=should_open,
gate_close_fn=should_close,
start_open=False,
direction=FrameDirection.DOWNSTREAM
)
# Frames held until "hello" detected, then all released
# Gate closes again when "goodbye" detected
"""
def __init__(
self,
gate_open_fn: Callable[[Frame], bool],
gate_close_fn: Callable[[Frame], bool],
start_open: bool,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
):
"""Initialize gated aggregator.
Args:
gate_open_fn: Function that returns True when a frame should open the gate
gate_close_fn: Function that returns True when a frame should close the gate
start_open: Whether the gate should start in the open state
direction: The frame direction this aggregator operates on
"""
pass
{ .api }
from pipecat.processors.aggregators.user_response import UserResponseAggregator
class UserResponseAggregator(LLMUserAggregator):
"""DEPRECATED: Aggregates user responses into TextFrame objects.
.. deprecated:: 0.0.92
`UserResponseAggregator` is deprecated and will be removed in a future version.
Use LLMUserAggregator directly instead.
This aggregator extends LLMUserAggregator to specifically handle
user input by collecting text responses and outputting them as TextFrame
objects when the aggregation is complete.
Migration:
# Old way (deprecated)
user_agg = UserResponseAggregator()
# New way
from pipecat.processors.aggregators.llm_context import LLMUserAggregator, LLMContext
context = LLMContext()
user_agg = LLMUserAggregator(context=context)
Example:
# DEPRECATED - Do not use in new code
aggregator = UserResponseAggregator()
# Aggregates user speech into complete responses
# Emits TextFrame when aggregation complete
"""
def __init__(self, **kwargs):
"""Initialize the user response aggregator.
.. deprecated:: 0.0.92
Use LLMUserAggregator instead.
Args:
**kwargs: Additional arguments passed to parent LLMUserAggregator
"""
pass
async def push_aggregation(self):
"""Push the aggregated user response as a TextFrame.
Creates a TextFrame from the current aggregation if it contains content,
resets the aggregation state, and pushes the frame downstream.
"""
pass
{ .api }
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator
class VisionImageFrameAggregator(FrameProcessor):
"""DEPRECATED: Aggregates consecutive text and image frames into vision frames.
.. deprecated:: 0.0.85
VisionImageRawFrame has been removed in favor of context frames
(LLMContextFrame or OpenAILLMContextFrame), so this aggregator is not
needed anymore. See the 12* examples for the new recommended pattern.
This aggregator waits for a consecutive TextFrame and an InputImageRawFrame.
After the InputImageRawFrame arrives it will output a VisionImageRawFrame
combining both the text and image data for multimodal processing.
Migration:
Use LLMContext with image content instead:
# Old way (deprecated)
vision_agg = VisionImageFrameAggregator()
# New way
from pipecat.processors.aggregators.llm_context import LLMContext
context = LLMContext()
context.add_message({
"role": "user",
"content": [
{"type": "text", "text": "What's in this image?"},
{"type": "image_url", "image_url": {"url": image_data}}
]
})
Example:
# DEPRECATED - Do not use in new code
vision_agg = VisionImageFrameAggregator()
# Aggregates: TextFrame("Describe this") + InputImageRawFrame(...)
# Output: OpenAILLMContextFrame with combined text and image
"""
def __init__(self):
"""Initialize the vision image frame aggregator.
The aggregator starts with no cached text, waiting for the first
TextFrame to arrive before it can create vision frames.
"""
pass
{ .api }
from pipecat.processors.aggregators.dtmf_aggregator import DTMFAggregator
from pipecat.audio.dtmf.types import KeypadEntry
class DTMFAggregator(FrameProcessor):
"""Aggregates DTMF frames into meaningful sequences for LLM processing.
The aggregator accumulates digits from InputDTMFFrame instances and flushes when:
- Timeout occurs (configurable idle period)
- Termination digit is received (default: '#')
- EndFrame or CancelFrame is received
Emits TranscriptionFrame for compatibility with existing LLM context aggregators.
Frames Consumed:
- InputDTMFFrame: DTMF keypad input
- StartFrame: Begins aggregation task
- EndFrame/CancelFrame: Flushes and stops aggregation
Frames Produced:
- TranscriptionFrame: Contains DTMF sequence with prefix
Example:
from pipecat.processors.aggregators.dtmf_aggregator import DTMFAggregator
from pipecat.audio.dtmf.types import KeypadEntry
# Aggregate DTMF until # pressed or 2 second timeout
dtmf_agg = DTMFAggregator(
timeout=2.0,
termination_digit=KeypadEntry.POUND,
prefix="DTMF: "
)
# Use in phone-based pipeline
pipeline = Pipeline([
transport.input(),
dtmf_agg, # Convert DTMF to transcription
context_aggregator.user(),
llm,
tts,
transport.output()
])
# User presses: 1-2-3-4-#
# Output: TranscriptionFrame("DTMF: 1234")
"""
def __init__(
self,
timeout: float = 2.0,
termination_digit: KeypadEntry = KeypadEntry.POUND,
prefix: str = "DTMF: ",
**kwargs
):
"""Initialize DTMF aggregator.
Args:
timeout: Idle timeout in seconds before flushing (default: 2.0)
termination_digit: Digit that triggers immediate flush (default: KeypadEntry.POUND)
prefix: Prefix added to DTMF sequence in transcription (default: "DTMF: ")
**kwargs: Additional arguments passed to FrameProcessor
"""
pass

Specialized processors for audio stream management.
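The AudioBufferProcessor example below calls save_to_file and analyze_conversation as placeholders. A minimal sketch of the first, assuming raw 16-bit PCM audio and the standard-library wave module:

import wave

def save_to_file(audio: bytes, sample_rate: int = 16000, num_channels: int = 1):
    """Wrap raw 16-bit PCM audio in a WAV container."""
    with wave.open("recording.wav", "wb") as f:
        f.setnchannels(num_channels)
        f.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM
        f.setframerate(sample_rate)
        f.writeframes(audio)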
{ .api }
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
class AudioBufferProcessor(FrameProcessor):
"""Processes and buffers audio frames from both input (user) and output (bot) sources.
This processor manages audio buffering and synchronization, providing both merged and
track-specific audio access through event handlers. It supports various audio configurations
including sample rate conversion and mono/stereo output.
Events:
- on_audio_data: Triggered when buffer_size is reached, providing merged audio
- on_track_audio_data: Triggered when buffer_size is reached, providing separate tracks
- on_user_turn_audio_data: Triggered when user turn has ended, providing that user turn's audio
- on_bot_turn_audio_data: Triggered when bot turn has ended, providing that bot turn's audio
Audio Handling:
- Mono output (num_channels=1): User and bot audio are mixed
- Stereo output (num_channels=2): User audio on left, bot audio on right
- Automatic resampling of incoming audio to match desired sample_rate
- Silence insertion for non-continuous audio streams
- Buffer synchronization between user and bot audio
Args:
sample_rate: Desired output sample rate. If None, uses source rate
num_channels: Number of channels (1 for mono, 2 for stereo). Defaults to 1
buffer_size: Size of buffer before triggering events. 0 for no buffering
enable_turn_audio: Whether turn audio event handlers should be triggered
Example:
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
# Create audio buffer with 16kHz, mono, 16KB buffer
audio_buffer = AudioBufferProcessor(
sample_rate=16000,
num_channels=1,
buffer_size=16384,
enable_turn_audio=True
)
# Handle buffered audio
@audio_buffer.event_handler("on_audio_data")
async def handle_audio(audio: bytes, sample_rate: int, num_channels: int):
# Process merged audio
save_to_file(audio)
# Handle separate tracks
@audio_buffer.event_handler("on_track_audio_data")
async def handle_tracks(user_audio: bytes, bot_audio: bytes, sample_rate: int, num_channels: int):
# Process user and bot audio separately
analyze_conversation(user_audio, bot_audio)
# Handle turn-based audio
@audio_buffer.event_handler("on_user_turn_audio_data")
async def handle_user_turn(audio: bytes, sample_rate: int, num_channels: int):
# Process complete user turn
transcribe_turn(audio)
# Use in pipeline
await audio_buffer.start_recording()
# ... process audio frames ...
await audio_buffer.stop_recording()
"""
def __init__(
self,
*,
sample_rate: Optional[int] = None,
num_channels: int = 1,
buffer_size: int = 0,
user_continuous_stream: Optional[bool] = None,
enable_turn_audio: bool = False,
**kwargs,
):
"""Initialize the audio buffer processor.
Args:
sample_rate: Desired output sample rate. If None, uses source rate
num_channels: Number of channels (1 for mono, 2 for stereo). Defaults to 1
buffer_size: Size of buffer before triggering events. 0 for no buffering
user_continuous_stream: DEPRECATED. No longer has any effect
enable_turn_audio: Whether turn audio event handlers should be triggered
**kwargs: Additional arguments passed to parent class
"""
pass
@property
def sample_rate(self) -> int:
"""Current sample rate of the audio processor."""
pass
@property
def num_channels(self) -> int:
"""Number of channels in the audio output."""
pass
def has_audio(self) -> bool:
"""Check if either user or bot audio buffers contain data."""
pass
def merge_audio_buffers(self) -> bytes:
"""Merge user and bot audio buffers into a single audio stream.
For mono output, audio is mixed. For stereo output, user audio is placed
on the left channel and bot audio on the right channel.
Returns:
Mixed audio data as bytes
"""
pass
async def start_recording(self):
"""Start recording audio from both user and bot.
Initializes recording state and resets audio buffers.
"""
pass
async def stop_recording(self):
"""Stop recording and trigger final audio data handlers.
Calls audio handlers with any remaining buffered audio before stopping.
"""
pass

Processors for collecting and reporting performance metrics.
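Metrics are normally switched on at the task level rather than per processor. A sketch, assuming PipelineTask and PipelineParams from pipecat.pipeline.task:

from pipecat.pipeline.task import PipelineParams, PipelineTask

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,        # TTFB and processing-time metrics
        enable_usage_metrics=True,  # LLM token and TTS character usage
    ),
)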
{ .api }
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.metrics.metrics import LLMTokenUsage
class FrameProcessorMetrics(BaseObject):
"""Metrics collection and reporting for frame processors.
Provides comprehensive metrics tracking for frame processing operations,
including timing measurements, resource usage, and performance analytics.
Supports TTFB tracking, processing duration metrics, and usage statistics
for LLM and TTS operations.
Metrics Types:
- TTFB (Time To First Byte): Time from request to first response
- Processing Duration: Total processing time for operations
- LLM Token Usage: Prompt and completion token counts
- TTS Character Usage: Character count for TTS operations
Example:
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.metrics.metrics import LLMTokenUsage
# Create metrics collector
metrics = FrameProcessorMetrics()
await metrics.setup(task_manager)
# Track TTFB (this example assumes it runs inside a FrameProcessor,
# so frames are pushed with self.push_frame)
await metrics.start_ttfb_metrics(report_only_initial_ttfb=True)
# ... LLM processing ...
ttfb_frame = await metrics.stop_ttfb_metrics()
await self.push_frame(ttfb_frame)
# Track processing time
await metrics.start_processing_metrics()
# ... processing ...
processing_frame = await metrics.stop_processing_metrics()
await self.push_frame(processing_frame)
# Track LLM usage
tokens = LLMTokenUsage(
prompt_tokens=100,
completion_tokens=50,
total_tokens=150
)
usage_frame = await metrics.start_llm_usage_metrics(tokens)
await self.push_frame(usage_frame)
# Track TTS usage
tts_frame = await metrics.start_tts_usage_metrics("Hello world")
await self.push_frame(tts_frame)
"""
def __init__(self):
"""Initialize the frame processor metrics collector.
Sets up internal state for tracking various metrics including TTFB,
processing times, and usage statistics.
"""
pass
async def setup(self, task_manager: BaseTaskManager):
"""Set up the metrics collector with a task manager.
Args:
task_manager: The task manager for handling async operations
"""
pass
@property
def ttfb(self) -> Optional[float]:
"""Get the current TTFB value in seconds.
Returns:
The TTFB value in seconds, or None if not measured
"""
pass
def set_core_metrics_data(self, data: MetricsData):
"""Set the core metrics data for this collector.
Args:
data: The core metrics data containing processor and model information
"""
pass
def set_processor_name(self, name: str):
"""Set the processor name for metrics reporting.
Args:
name: The name of the processor to use in metrics
"""
pass
async def start_ttfb_metrics(self, report_only_initial_ttfb: bool):
"""Start measuring time-to-first-byte (TTFB).
Args:
report_only_initial_ttfb: Whether to report only the first TTFB measurement
"""
pass
async def stop_ttfb_metrics(self) -> Optional[MetricsFrame]:
"""Stop TTFB measurement and generate metrics frame.
Returns:
MetricsFrame containing TTFB data, or None if not measuring
"""
pass
async def start_processing_metrics(self):
"""Start measuring processing time."""
pass
async def stop_processing_metrics(self) -> Optional[MetricsFrame]:
"""Stop processing time measurement and generate metrics frame.
Returns:
MetricsFrame containing processing duration data, or None if not measuring
"""
pass
async def start_llm_usage_metrics(self, tokens: LLMTokenUsage) -> MetricsFrame:
"""Record LLM token usage metrics.
Args:
tokens: Token usage information including prompt and completion tokens
Returns:
MetricsFrame containing LLM usage data
"""
pass
async def start_tts_usage_metrics(self, text: str) -> MetricsFrame:
"""Record TTS character usage metrics.
Args:
text: The text being processed by TTS
Returns:
MetricsFrame containing TTS usage data
"""
pass
{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame, AudioRawFrame
class CustomProcessor(FrameProcessor):
    """Template for custom processors."""

    def __init__(self, config_value: str, **kwargs):
        super().__init__(**kwargs)
        self._config = config_value
        self._state = {}

    async def start(self):
        """Initialize resources."""
        await super().start()
        # Open connections, load models, etc.
        self._state = {}

    async def process_frame(self, frame, direction):
        """Process frames."""
        # 1. Check frame type
        if isinstance(frame, TextFrame):
            # 2. Process frame
            processed_text = self._process_text(frame.text)
            # 3. Create new frame or modify existing
            frame.text = processed_text
            # 4. Optionally emit additional frames
            # await self.push_frame(AnotherFrame(...))
        # 5. Always push frames (unless filtering)
        await self.push_frame(frame, direction)

    async def stop(self):
        """Clean up resources."""
        # Close connections, save state, etc.
        self._state.clear()
        await super().stop()

    async def interrupt(self):
        """Handle interruptions."""
        # Cancel current operations
        await super().interrupt()

    def _process_text(self, text: str) -> str:
        """Internal processing method."""
        return text.upper()
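Wiring the template into a pipeline might look like this, where transport and the config value are illustrative:

processor = CustomProcessor(config_value="demo", name="custom")
pipeline = Pipeline([transport.input(), processor, transport.output()])

{ .api }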
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter

# Chain multiple filters (MyProcessor stands in for your own processor)
pipeline = Pipeline([
    # 1. Wake word filter
    WakeCheckFilter(wake_words=["hey bot"]),
    # 2. Custom filter
    FunctionFilter(
        filter_func=lambda frame, _: not frame.metadata.get("spam")
    ),
    # 3. Processing
    MyProcessor(),
])
{ .api }
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.services.openai import OpenAITTSService

# Aggregate sentences before TTS
pipeline = Pipeline([
    llm_service,           # Produces TextFrames with words
    SentenceAggregator(),  # Aggregates into sentences
    tts_service,           # Synthesizes complete sentences
    transport.output()
])
# Benefits:
# - More natural TTS output
# - Fewer TTS API calls
# - Better prosody
{ .api }
from pipecat.frames.frames import TextFrame
from pipecat.processors.frame_processor import FrameProcessor

class StatefulProcessor(FrameProcessor):
    """Process frames with state."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._counter = 0
        self._history = []

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            # Update state
            self._counter += 1
            self._history.append(frame.text)
            # Keep history bounded
            if len(self._history) > 100:
                self._history.pop(0)
            # Add state to metadata
            frame.metadata["count"] = self._counter
            frame.metadata["history_len"] = len(self._history)
        await self.push_frame(frame, direction)
{ .api }
import asyncio

from pipecat.frames.frames import HeartbeatFrame
from pipecat.processors.frame_processor import FrameProcessor

# Note: AsyncGeneratorProcessor consumes frames and emits serialized data (see above);
# to inject periodic frames, use a processor with a background task instead.
class HeartbeatProcessor(FrameProcessor):
    """Queue a periodic HeartbeatFrame from a background task."""
    async def start(self):
        await super().start()
        self._task = asyncio.create_task(self._beat())
    async def stop(self):
        self._task.cancel()
        await super().stop()
    async def _beat(self):
        while True:
            await asyncio.sleep(5.0)
            await self.queue_frame(HeartbeatFrame())

# Add to pipeline
pipeline = Pipeline([processor1, HeartbeatProcessor(), processor2])
{ .api }
# Good: Always push frames
async def process_frame(self, frame, direction):
    # Process frame
    if isinstance(frame, TextFrame):
        frame.text = frame.text.upper()
    # Push to next processor
    await self.push_frame(frame, direction)

# Bad: Forgetting to push
async def process_frame(self, frame, direction):
    # Process frame
    if isinstance(frame, TextFrame):
        frame.text = frame.text.upper()
    # Frame is lost! Forgot to push
{ .api }
# Good: Specific type checks
async def process_frame(self, frame, direction):
    if isinstance(frame, TextFrame):
        # Handle text
        pass
    elif isinstance(frame, AudioRawFrame):
        # Handle audio
        pass
    # Always push
    await self.push_frame(frame, direction)

# Bad: Generic handling
async def process_frame(self, frame, direction):
    if frame.name.startswith("Text"):  # Fragile!
        pass
{ .api }
class ResourceProcessor(FrameProcessor):
    """Always clean up resources."""
    async def start(self):
        await super().start()
        self._connection = await open_connection()
        self._file = open("data.txt", "w")
    async def _cleanup(self):
        # Clean up in reverse order of acquisition
        if self._file:
            self._file.close()
        if self._connection:
            await self._connection.close()
    async def stop(self):
        await self._cleanup()
        await super().stop()
    async def cancel(self):
        # Same cleanup on cancel, without the graceful stop path
        await self._cleanup()
        await super().cancel()
{ .api }
class MonitoringProcessor(FrameProcessor):
    """Use events for monitoring."""
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._frame_count = 0
        self.register_event_handler("on_frame_processed")
    async def process_frame(self, frame, direction):
        self._frame_count += 1
        # Emit event for monitoring
        await self._call_event_handler("on_frame_processed", self._frame_count)
        await self.push_frame(frame, direction)

# Use events
processor = MonitoringProcessor()

@processor.event_handler("on_frame_processed")
async def log_count(count: int):
    if count % 100 == 0:
        print(f"Processed {count} frames")