An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

docs/processors/core-processors.md

Core Processors

Core processors provide the fundamental building blocks for frame processing pipelines. This includes the base processor interface, basic processor types, frame filters, and text/audio aggregators.
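
As a quick orientation, the sketch below chains several of the processors documented in this section into one pipeline. It is a sketch only: stt, llm, tts, and transport are assumed to be configured elsewhere, and the imports follow the module paths given in the sections below.

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.sentence import SentenceAggregator
from pipecat.processors.logger import FrameLogger

# stt, llm, tts, and transport are assumed to be configured elsewhere
pipeline = Pipeline([
    transport.input(),
    stt,
    FrameLogger(name="post-stt"),  # inspect frames between stages
    llm,
    SentenceAggregator(),          # batch streamed words into sentences
    tts,
    transport.output(),
])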

Base Processor

The foundation for all frame processors.

FrameProcessor

{ .api }
from typing import Optional

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.utils.base_object import BaseObject

class FrameProcessor(BaseObject):
    """Base class for all frame processors.

    Processors are the building blocks of pipelines. They receive frames,
    transform/filter/generate frames, and push frames to the next processor.

    Key Lifecycle:
    1. Construction (__init__)
    2. Linking (link)
    3. Start (start)
    4. Processing (process_frame - repeated)
    5. Interruption (interrupt - optional, repeated)
    6. Stop/Cancel (stop or cancel)

    Attributes:
        name (str): Processor name for debugging
        id (int): Unique processor identifier
        interruptions_enabled (bool): Whether processor can be interrupted

    Methods:
        process_frame(frame, direction): Process a single frame
        push_frame(frame, direction): Push frame to next processor
        queue_frame(frame, direction): Queue frame for processing
        start(): Initialize processor
        stop(): Gracefully shutdown
        cancel(): Immediate shutdown
        link(processor): Connect to downstream processor
        interrupt(): Handle interruption event
        register_event_handler(event_name): Register event handler
    """

    def __init__(self, name: Optional[str] = None, **kwargs):
        """Initialize processor.

        Args:
            name: Optional processor name for debugging
            **kwargs: Additional configuration passed to subclasses
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process a frame.

        Override this method to implement custom processing logic.

        Args:
            frame: The frame to process
            direction: Direction of frame flow (DOWNSTREAM or UPSTREAM)

        Example:
            async def process_frame(self, frame, direction):
                if isinstance(frame, TextFrame):
                    # Transform text
                    frame.text = frame.text.upper()
                # Always push frames downstream
                await self.push_frame(frame, direction)
        """
        # Default: pass frame through
        await self.push_frame(frame, direction)

    async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push frame to the next processor.

        Args:
            frame: Frame to push
            direction: Direction of frame flow
        """
        pass

    async def queue_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Queue frame for processing.

        Args:
            frame: Frame to queue
            direction: Direction of frame flow
        """
        pass

    async def start(self):
        """Start the processor.

        Called when pipeline starts. Use for initialization like:
        - Opening connections
        - Loading models
        - Starting background tasks

        Example:
            async def start(self):
                await super().start()
                self._connection = await connect_to_service()
        """
        pass

    async def stop(self):
        """Stop the processor gracefully.

        Called when pipeline stops normally. Use for cleanup like:
        - Closing connections
        - Saving state
        - Stopping background tasks

        Example:
            async def stop(self):
                await self._connection.close()
                await super().stop()
        """
        pass

    async def cancel(self):
        """Cancel the processor immediately.

        Called when pipeline is cancelled or errors occur.
        Similar to stop() but expects faster cleanup.

        Example:
            async def cancel(self):
                self._cancelled = True
                await super().cancel()
        """
        pass

    def link(self, processor: "FrameProcessor"):
        """Link to downstream processor.

        Args:
            processor: Downstream processor to link

        Example:
            processor1.link(processor2)
            # processor1 -> processor2
        """
        pass

    async def interrupt(self):
        """Handle interruption event.

        Called when user interrupts (e.g., starts speaking during bot output).
        Cancel current operations and clear queues.

        Example:
            async def interrupt(self):
                self._current_operation_cancelled = True
                await super().interrupt()
        """
        pass

    @property
    def interruptions_enabled(self) -> bool:
        """Whether processor can be interrupted.

        Returns:
            True if processor can be interrupted
        """
        return True

    @property
    def name(self) -> str:
        """Processor name."""
        pass

    @property
    def id(self) -> int:
        """Unique processor ID."""
        pass

FrameDirection

{ .api }
from pipecat.processors.frame_processor import FrameDirection
from enum import Enum

class FrameDirection(Enum):
    """Direction of frame flow through pipeline.

    Attributes:
        DOWNSTREAM: From input toward output (normal flow)
        UPSTREAM: From output toward input (reverse flow)

    Example:
        # Push downstream (normal)
        await self.push_frame(frame, FrameDirection.DOWNSTREAM)

        # Push upstream (reverse)
        await self.push_frame(frame, FrameDirection.UPSTREAM)
    """
    DOWNSTREAM = "downstream"
    UPSTREAM = "upstream"
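
Pushing upstream is typically used to signal back toward the input side of the pipeline, most commonly to report errors. A minimal sketch, assuming the ErrorFrame and TextFrame types from pipecat.frames.frames and hypothetical validation logic:

{ .api }
from pipecat.frames.frames import ErrorFrame, TextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection

class ValidatingProcessor(FrameProcessor):
    """Hypothetical processor that reports bad input upstream."""

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame) and not frame.text.strip():
            # Report the problem back toward the input side
            await self.push_frame(ErrorFrame("empty text frame"), FrameDirection.UPSTREAM)
            return  # drop the bad frame
        await self.push_frame(frame, direction)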

Basic Processors

Simple processor implementations for common patterns.

ConsumerProcessor

{ .api }
from typing import Awaitable, Callable

from pipecat.frames.frames import Frame
from pipecat.processors.consumer_processor import ConsumerProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer

class ConsumerProcessor(FrameProcessor):
    """Frame processor that consumes frames from a ProducerProcessor's queue.

    This processor passes through frames normally while also consuming frames
    from a ProducerProcessor's queue. When frames are received from the producer
    queue, they are optionally transformed and pushed in the specified direction.

    Use Case:
        - Dual pipeline processing (main + side processing)
        - Fan-out patterns from ProducerProcessor
        - Branching frame flows
        - Parallel processing of filtered frames

    Args:
        producer: The producer processor to consume frames from
        transformer: Function to transform frames before pushing (default: identity)
        direction: Direction to push consumed frames (default: DOWNSTREAM)

    Example:
        from pipecat.processors.consumer_processor import ConsumerProcessor
        from pipecat.processors.producer_processor import ProducerProcessor

        # Create producer that filters TextFrames
        async def text_filter(frame):
            return isinstance(frame, TextFrame)

        producer = ProducerProcessor(
            filter=text_filter,
            passthrough=True  # Continue main pipeline
        )

        # Create consumer to process filtered frames
        async def uppercase_transform(frame):
            if isinstance(frame, TextFrame):
                frame.text = frame.text.upper()
            return frame

        consumer = ConsumerProcessor(
            producer=producer,
            transformer=uppercase_transform,
            direction=FrameDirection.DOWNSTREAM
        )

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            producer,    # Filters and distributes TextFrames
            llm,         # Main pipeline continues
            consumer,    # Receives filtered frames from producer
            tts,
            transport.output()
        ])
    """

    def __init__(
        self,
        *,
        producer: ProducerProcessor,
        transformer: Callable[[Frame], Awaitable[Frame]] = identity_transformer,
        direction: FrameDirection = FrameDirection.DOWNSTREAM,
        **kwargs,
    ):
        """Initialize the consumer processor.

        Args:
            producer: The producer processor to consume frames from
            transformer: Function to transform frames before pushing
            direction: Direction to push consumed frames
            **kwargs: Additional arguments passed to parent class
        """
        pass

ProducerProcessor

{ .api }
import asyncio
from typing import Awaitable, Callable

from pipecat.frames.frames import Frame
from pipecat.processors.producer_processor import ProducerProcessor, identity_transformer

async def identity_transformer(frame: Frame):
    """Default transformer that returns the frame unchanged."""
    return frame


class ProducerProcessor(FrameProcessor):
    """A processor that filters frames and distributes them to multiple consumers.

    This processor receives frames, applies a filter to determine which frames
    should be sent to consumers (ConsumerProcessor), optionally transforms those
    frames, and distributes them to registered consumer queues. It can also pass
    frames through to the next processor in the pipeline.

    Use Cases:
        - Fan-out patterns (one producer, multiple consumers)
        - Frame filtering and distribution
        - Parallel processing branches
        - Side-channel frame processing

    Args:
        filter: Async function that determines if a frame should be produced.
               Must return True for frames to be sent to consumers
        transformer: Async function to transform frames before sending to consumers
        passthrough: Whether to pass frames through to the next processor

    Example:
        from pipecat.processors.producer_processor import ProducerProcessor
        from pipecat.processors.consumer_processor import ConsumerProcessor

        # Create producer that filters and distributes audio frames
        async def audio_filter(frame):
            return isinstance(frame, AudioRawFrame)

        async def audio_transform(frame):
            # Apply audio processing
            frame.audio = apply_noise_reduction(frame.audio)
            return frame

        producer = ProducerProcessor(
            filter=audio_filter,
            transformer=audio_transform,
            passthrough=True  # Continue main pipeline
        )

        # Create consumers
        consumer1 = ConsumerProcessor(producer=producer)
        consumer2 = ConsumerProcessor(producer=producer)

        # Both consumers receive filtered audio frames
        # Main pipeline also continues with original frames
    """

    def __init__(
        self,
        *,
        filter: Callable[[Frame], Awaitable[bool]],
        transformer: Callable[[Frame], Awaitable[Frame]] = identity_transformer,
        passthrough: bool = True,
    ):
        """Initialize producer.

        Args:
            filter: Async function that determines if a frame should be produced
            transformer: Async function to transform frames before sending to consumers
            passthrough: Whether to pass frames through to the next processor
        """
        pass

    def add_consumer(self) -> asyncio.Queue:
        """Add a new consumer and return its associated queue.

        Returns:
            asyncio.Queue: The queue for the newly added consumer
        """
        pass
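
Since add_consumer() returns a plain asyncio.Queue, produced frames can also be drained directly, without a ConsumerProcessor. A hedged sketch (the drain loop and handle_frame are hypothetical):

{ .api }
import asyncio

# Read a producer's queue directly instead of wiring up a ConsumerProcessor
queue = producer.add_consumer()

async def drain():
    while True:
        frame = await queue.get()
        await handle_frame(frame)  # hypothetical handler

task = asyncio.create_task(drain())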

AsyncGeneratorProcessor

{ .api }
from typing import Any, AsyncGenerator

from pipecat.processors.async_generator import AsyncGeneratorProcessor
from pipecat.serializers.base_serializer import FrameSerializer

class AsyncGeneratorProcessor(FrameProcessor):
    """A frame processor that serializes frames and provides them via async generator.

    This processor passes frames through unchanged while simultaneously serializing
    them and making the serialized data available through an async generator interface.
    Useful for streaming frame data to external consumers while maintaining the
    normal frame processing pipeline.

    Use Cases:
        - Streaming frame data to external systems
        - Real-time frame monitoring
        - Frame data export/logging
        - WebSocket frame streaming

    Args:
        serializer: The frame serializer to use for converting frames to data

    Example:
        from pipecat.processors.async_generator import AsyncGeneratorProcessor
        from pipecat.serializers.protobuf import ProtobufFrameSerializer

        # Create processor with serializer
        serializer = ProtobufFrameSerializer()
        async_gen = AsyncGeneratorProcessor(serializer=serializer)

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            stt,
            async_gen,  # Serializes frames for external consumption
            llm,
            tts,
            transport.output()
        ])

        # Consume serialized data externally
        async def stream_frames():
            async for data in async_gen.generator():
                # Send serialized frame data to external system
                await websocket.send(data)

        # Start streaming task
        asyncio.create_task(stream_frames())
    """

    def __init__(self, *, serializer: FrameSerializer, **kwargs):
        """Initialize async generator processor.

        Args:
            serializer: The frame serializer to use for converting frames to data
            **kwargs: Additional arguments passed to parent class
        """
        pass

    async def generator(self) -> AsyncGenerator[Any, None]:
        """Generate serialized frame data asynchronously.

        Yields:
            Serialized frame data from the internal queue until a termination
            signal (None) is received
        """
        pass

FrameLogger

{ .api }
from pipecat.processors.logger import FrameLogger

class FrameLogger(FrameProcessor):
    """Logs frames for debugging.

    Logs all frames passing through for debugging and monitoring.

    Args:
        name: Logger name
        level: Logging level (DEBUG, INFO, etc.)
        include_pts: Include presentation timestamp in logs

    Example:
        logger = FrameLogger(name="pipeline", level="DEBUG")
        pipeline = Pipeline([
            processor1,
            logger,  # Log frames between processors
            processor2
        ])
    """

    def __init__(
        self,
        name: Optional[str] = None,
        level: str = "DEBUG",
        include_pts: bool = False,
        **kwargs
    ):
        """Initialize frame logger.

        Args:
            name: Logger name
            level: Logging level
            include_pts: Include timestamp in logs
            **kwargs: Additional arguments
        """
        pass

StatelessTextTransformer

{ .api }
from typing import Callable

from pipecat.processors.text_transformer import StatelessTextTransformer

class StatelessTextTransformer(FrameProcessor):
    """Transforms text frames without state.

    Applies a transformation function to text frames. Stateless means
    each frame is processed independently.

    Args:
        transform: Function to transform text (str -> str)

    Example:
        # Uppercase transformer
        uppercase = StatelessTextTransformer(
            transform=lambda text: text.upper()
        )

        # Remove punctuation
        no_punct = StatelessTextTransformer(
            transform=lambda text: text.replace(".", "").replace(",", "")
        )
    """

    def __init__(self, transform: Callable[[str], str], **kwargs):
        """Initialize text transformer.

        Args:
            transform: Text transformation function
            **kwargs: Additional arguments
        """
        pass

    async def process_frame(self, frame, direction):
        """Transform text frames.

        Args:
            frame: Frame to process
            direction: Frame direction
        """
        pass

IdleFrameProcessor

{ .api }
from typing import Callable, Optional

from pipecat.processors.idle_frame_processor import IdleFrameProcessor

class IdleFrameProcessor(FrameProcessor):
    """Handles idle timeouts.

    Emits frames after periods of inactivity. Useful for detecting
    when no frames are flowing.

    Args:
        timeout_secs: Idle timeout in seconds
        callback: Optional callback on idle

    Example:
        async def on_idle():
            print("Pipeline is idle!")

        idle = IdleFrameProcessor(
            timeout_secs=5.0,
            callback=on_idle
        )
    """

    def __init__(
        self,
        timeout_secs: float = 5.0,
        callback: Optional[Callable] = None,
        **kwargs
    ):
        """Initialize idle processor.

        Args:
            timeout_secs: Idle timeout in seconds
            callback: Callback on idle
            **kwargs: Additional arguments
        """
        pass

UserIdleProcessor

{ .api }
from pipecat.processors.user_idle_processor import UserIdleProcessor

class UserIdleProcessor(FrameProcessor):
    """Tracks user idle state.

    Emits events when user has been idle for a specified duration.
    Resets on user input frames.

    Args:
        timeout_secs: User idle timeout in seconds

    Example:
        # Emit event if user idle for 30 seconds
        user_idle = UserIdleProcessor(timeout_secs=30.0)

        @user_idle.event_handler("on_user_idle")
        async def handle_idle():
            print("User has been idle for 30 seconds")
            # Prompt user, end conversation, etc.
    """

    def __init__(self, timeout_secs: float = 30.0, **kwargs):
        """Initialize user idle processor.

        Args:
            timeout_secs: User idle timeout in seconds
            **kwargs: Additional arguments
        """
        pass

Frame Filters

Processors that filter or gate frame flow.

FrameFilter

{ .api }
from pipecat.processors.filters.frame_filter import FrameFilter

class FrameFilter(FrameProcessor):
    """Base class for frame filters.

    Filters selectively pass or drop frames based on criteria.

    Example:
        class TextOnlyFilter(FrameFilter):
            async def process_frame(self, frame, direction):
                if isinstance(frame, TextFrame):
                    await self.push_frame(frame, direction)
                # Other frames are dropped
    """
    pass

IdentityFilter

{ .api }
from pipecat.processors.filters.identity_filter import IdentityFilter

class IdentityFilter(FrameProcessor):
    """Pass-through filter.

    Passes all frames unchanged. Useful as a placeholder or for testing.

    Example:
        filter = IdentityFilter()
        # All frames pass through unchanged
    """
    pass

NullFilter

{ .api }
from pipecat.processors.filters.null_filter import NullFilter

class NullFilter(FrameProcessor):
    """Drops all frames.

    Consumes all frames without passing them downstream.
    Useful for testing or temporarily disabling pipeline sections.

    Example:
        filter = NullFilter()
        # All frames are dropped
    """
    pass

FunctionFilter

{ .api }
from typing import Callable

from pipecat.frames.frames import Frame
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection

class FunctionFilter(FrameProcessor):
    """Custom filtering logic.

    Applies a user-defined function to decide whether to pass frames.

    Args:
        filter_func: Function taking (frame, direction) returning bool
                     True = pass frame, False = drop frame

    Example:
        # Pass only TextFrames
        text_filter = FunctionFilter(
            filter_func=lambda frame, direction: isinstance(frame, TextFrame)
        )

        # Pass frames with specific metadata
        meta_filter = FunctionFilter(
            filter_func=lambda frame, direction: frame.metadata.get("priority") == "high"
        )
    """

    def __init__(self, filter_func: Callable[[Frame, FrameDirection], bool], **kwargs):
        """Initialize function filter.

        Args:
            filter_func: Filter function returning bool
            **kwargs: Additional arguments
        """
        pass

WakeCheckFilter

{ .api }
from typing import List

from pipecat.processors.filters.wake_check_filter import WakeCheckFilter

class WakeCheckFilter(FrameProcessor):
    """Wake word detection filter.

    Checks transcribed text for wake words/phrases. Passes frames
    only after wake word is detected.

    Args:
        wake_words: List of wake words/phrases
        case_sensitive: Whether matching is case-sensitive

    Example:
        # Listen for "hey assistant"
        wake_filter = WakeCheckFilter(
            wake_words=["hey assistant", "hello assistant"],
            case_sensitive=False
        )

        # After wake word detected, all frames pass through
    """

    def __init__(
        self,
        wake_words: List[str],
        case_sensitive: bool = False,
        **kwargs
    ):
        """Initialize wake word filter.

        Args:
            wake_words: List of wake words/phrases
            case_sensitive: Case-sensitive matching
            **kwargs: Additional arguments
        """
        pass

WakeNotifierFilter

{ .api }
from typing import List

from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter

class WakeNotifierFilter(FrameProcessor):
    """Wake word notification filter.

    Similar to WakeCheckFilter but emits notification events
    when wake words are detected.

    Args:
        wake_words: List of wake words/phrases
        case_sensitive: Whether matching is case-sensitive

    Example:
        wake_notifier = WakeNotifierFilter(
            wake_words=["hey bot"],
            case_sensitive=False
        )

        @wake_notifier.event_handler("on_wake_word_detected")
        async def on_wake(word: str):
            print(f"Wake word detected: {word}")
    """

    def __init__(
        self,
        wake_words: List[str],
        case_sensitive: bool = False,
        **kwargs
    ):
        """Initialize wake notifier filter.

        Args:
            wake_words: List of wake words/phrases
            case_sensitive: Case-sensitive matching
            **kwargs: Additional arguments
        """
        pass

STTMuteFilter

{ .api }
from enum import Enum
from typing import Callable, Optional

from pipecat.processors.filters.stt_mute_filter import STTMuteFilter, STTMuteStrategy, STTMuteConfig

class STTMuteStrategy(Enum):
    """STT muting strategies.

    Attributes:
        FIRST_SPEECH: Mute STT after first speech detected
        CUSTOM: Custom muting logic via callback
    """
    FIRST_SPEECH = "first_speech"
    CUSTOM = "custom"


class STTMuteConfig:
    """Configuration for STT mute filter.

    Attributes:
        strategy (STTMuteStrategy): Muting strategy
        custom_mute_callback (Optional[Callable]): Custom callback for muting
    """

    def __init__(
        self,
        strategy: STTMuteStrategy = STTMuteStrategy.FIRST_SPEECH,
        custom_mute_callback: Optional[Callable] = None
    ):
        """Initialize STT mute config.

        Args:
            strategy: Muting strategy
            custom_mute_callback: Custom callback returning bool (should mute)
        """
        pass


class STTMuteFilter(FrameProcessor):
    """STT muting with strategies.

    Mutes Speech-to-Text based on configurable strategies. Useful for
    scenarios where you want to stop listening after certain conditions.

    Args:
        config: STT mute configuration

    Example:
        # Mute after first speech
        mute_filter = STTMuteFilter(
            config=STTMuteConfig(strategy=STTMuteStrategy.FIRST_SPEECH)
        )

        # Custom muting logic
        async def should_mute(frame):
            # Mute if user said "goodbye"
            if isinstance(frame, TranscriptionFrame):
                return "goodbye" in frame.text.lower()
            return False

        custom_mute = STTMuteFilter(
            config=STTMuteConfig(
                strategy=STTMuteStrategy.CUSTOM,
                custom_mute_callback=should_mute
            )
        )
    """

    def __init__(self, config: Optional[STTMuteConfig] = None, **kwargs):
        """Initialize STT mute filter.

        Args:
            config: Mute configuration
            **kwargs: Additional arguments
        """
        pass

    def mute(self):
        """Manually mute STT."""
        pass

    def unmute(self):
        """Manually unmute STT."""
        pass
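
A placement sketch: the filter sits ahead of the STT service so that muting gates what STT hears. The stt, llm, tts, transport, and context_aggregator objects are assumed to be configured elsewhere:

{ .api }
pipeline = Pipeline([
    transport.input(),
    mute_filter,   # gates frames reaching STT while muted
    stt,
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
])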

Text Aggregators

Processors that aggregate text into larger units.

SimpleTextAggregator

{ .api }
from pipecat.processors.aggregators.simple_text import SimpleTextAggregator

class SimpleTextAggregator(FrameProcessor):
    """Simple text frame aggregator for basic text accumulation.

    Accumulates text frames into larger units without complex sentence
    detection. Useful for basic text buffering and simple aggregation patterns.

    Example:
        # Basic text aggregation
        aggregator = SimpleTextAggregator()

        # Accumulates: "Hello", " ", "world"
        # Outputs when threshold met or manually flushed
    """

    def __init__(self, **kwargs):
        """Initialize simple text aggregator.

        Args:
            **kwargs: Additional arguments
        """
        pass

SentenceAggregator

{ .api }
from typing import List

from pipecat.processors.aggregators.sentence import SentenceAggregator

class SentenceAggregator(FrameProcessor):
    """Aggregates text into sentences.

    Accumulates text frames and emits complete sentences. Uses punctuation
    and other heuristics to detect sentence boundaries.

    Args:
        aggregation_type: Type of aggregation (SENTENCE, WORD)
        include_punctuation: Include punctuation in output
        sentence_terminators: List of sentence-ending characters

    Example:
        # Aggregate into sentences
        aggregator = SentenceAggregator()

        # Input: "Hello", " world", ".", " How", " are", " you", "?"
        # Output: "Hello world.", "How are you?"
    """

    def __init__(
        self,
        aggregation_type: AggregationType = AggregationType.SENTENCE,
        include_punctuation: bool = True,
        sentence_terminators: List[str] = [".", "!", "?"],
        **kwargs
    ):
        """Initialize sentence aggregator.

        Args:
            aggregation_type: Aggregation type
            include_punctuation: Include punctuation
            sentence_terminators: Sentence ending characters
            **kwargs: Additional arguments
        """
        pass

GatedAggregator

{ .api }
from typing import Callable

from pipecat.frames.frames import Frame
from pipecat.processors.aggregators.gated import GatedAggregator
from pipecat.processors.frame_processor import FrameDirection

class GatedAggregator(FrameProcessor):
    """Accumulate frames with custom functions to start and stop accumulation.

    Yields gate-opening frame before any accumulated frames, then ensuing frames
    until and not including the gate-closed frame. The aggregator maintains an
    internal gate state that controls whether frames are passed through immediately
    or accumulated for later release.

    Gate Behavior:
    - When gate is OPEN: Frames pass through immediately, accumulated frames are released
    - When gate is CLOSED: Frames are accumulated in buffer
    - SystemFrames always pass through regardless of gate state
    - Only frames matching the specified direction are gated

    Args:
        gate_open_fn: Function that returns True when a frame should open the gate
        gate_close_fn: Function that returns True when a frame should close the gate
        start_open: Whether the gate should start in the open state
        direction: The frame direction this aggregator operates on (default: DOWNSTREAM)

    Example:
        from pipecat.processors.aggregators.gated import GatedAggregator
        from pipecat.processors.frame_processor import FrameDirection

        # Gate opens on wake word, closes on goodbye
        def should_open(frame):
            if isinstance(frame, TranscriptionFrame):
                return "hello" in frame.text.lower()
            return False

        def should_close(frame):
            if isinstance(frame, TranscriptionFrame):
                return "goodbye" in frame.text.lower()
            return False

        gated = GatedAggregator(
            gate_open_fn=should_open,
            gate_close_fn=should_close,
            start_open=False,
            direction=FrameDirection.DOWNSTREAM
        )

        # Frames held until "hello" detected, then all released
        # Gate closes again when "goodbye" detected
    """

    def __init__(
        self,
        gate_open_fn: Callable[[Frame], bool],
        gate_close_fn: Callable[[Frame], bool],
        start_open: bool,
        direction: FrameDirection = FrameDirection.DOWNSTREAM,
    ):
        """Initialize gated aggregator.

        Args:
            gate_open_fn: Function that returns True when a frame should open the gate
            gate_close_fn: Function that returns True when a frame should close the gate
            start_open: Whether the gate should start in the open state
            direction: The frame direction this aggregator operates on
        """
        pass

UserResponseAggregator

{ .api }
from pipecat.processors.aggregators.llm_context import LLMUserAggregator
from pipecat.processors.aggregators.user_response import UserResponseAggregator

class UserResponseAggregator(LLMUserAggregator):
    """DEPRECATED: Aggregates user responses into TextFrame objects.

    .. deprecated:: 0.0.92
        `UserResponseAggregator` is deprecated and will be removed in a future version.
        Use LLMUserAggregator directly instead.

    This aggregator extends LLMUserAggregator to specifically handle
    user input by collecting text responses and outputting them as TextFrame
    objects when the aggregation is complete.

    Migration:
        # Old way (deprecated)
        user_agg = UserResponseAggregator()

        # New way
        from pipecat.processors.aggregators.llm_context import LLMUserAggregator, LLMContext

        context = LLMContext()
        user_agg = LLMUserAggregator(context=context)

    Example:
        # DEPRECATED - Do not use in new code
        aggregator = UserResponseAggregator()

        # Aggregates user speech into complete responses
        # Emits TextFrame when aggregation complete
    """

    def __init__(self, **kwargs):
        """Initialize the user response aggregator.

        .. deprecated:: 0.0.92
            Use LLMUserAggregator instead.

        Args:
            **kwargs: Additional arguments passed to parent LLMUserAggregator
        """
        pass

    async def push_aggregation(self):
        """Push the aggregated user response as a TextFrame.

        Creates a TextFrame from the current aggregation if it contains content,
        resets the aggregation state, and pushes the frame downstream.
        """
        pass

VisionImageFrameAggregator

{ .api }
from pipecat.processors.aggregators.vision_image_frame import VisionImageFrameAggregator

class VisionImageFrameAggregator(FrameProcessor):
    """DEPRECATED: Aggregates consecutive text and image frames into vision frames.

    .. deprecated:: 0.0.85
        VisionImageRawFrame has been removed in favor of context frames
        (LLMContextFrame or OpenAILLMContextFrame), so this aggregator is not
        needed anymore. See the 12* examples for the new recommended pattern.

    This aggregator waits for a consecutive TextFrame and an InputImageRawFrame.
    After the InputImageRawFrame arrives it will output a VisionImageRawFrame
    combining both the text and image data for multimodal processing.

    Migration:
        Use LLMContext with image content instead:

        # Old way (deprecated)
        vision_agg = VisionImageFrameAggregator()

        # New way
        from pipecat.processors.aggregators.llm_context import LLMContext

        context = LLMContext()
        context.add_message({
            "role": "user",
            "content": [
                {"type": "text", "text": "What's in this image?"},
                {"type": "image_url", "image_url": {"url": image_data}}
            ]
        })

    Example:
        # DEPRECATED - Do not use in new code
        vision_agg = VisionImageFrameAggregator()

        # Aggregates: TextFrame("Describe this") + InputImageRawFrame(...)
        # Output: OpenAILLMContextFrame with combined text and image
    """

    def __init__(self):
        """Initialize the vision image frame aggregator.

        The aggregator starts with no cached text, waiting for the first
        TextFrame to arrive before it can create vision frames.
        """
        pass

DTMFAggregator

{ .api }
from pipecat.processors.aggregators.dtmf_aggregator import DTMFAggregator
from pipecat.audio.dtmf.types import KeypadEntry

class DTMFAggregator(FrameProcessor):
    """Aggregates DTMF frames into meaningful sequences for LLM processing.

    The aggregator accumulates digits from InputDTMFFrame instances and flushes when:
    - Timeout occurs (configurable idle period)
    - Termination digit is received (default: '#')
    - EndFrame or CancelFrame is received

    Emits TranscriptionFrame for compatibility with existing LLM context aggregators.

    Frames Consumed:
        - InputDTMFFrame: DTMF keypad input
        - StartFrame: Begins aggregation task
        - EndFrame/CancelFrame: Flushes and stops aggregation

    Frames Produced:
        - TranscriptionFrame: Contains DTMF sequence with prefix

    Example:
        from pipecat.processors.aggregators.dtmf_aggregator import DTMFAggregator
        from pipecat.audio.dtmf.types import KeypadEntry

        # Aggregate DTMF until # pressed or 2 second timeout
        dtmf_agg = DTMFAggregator(
            timeout=2.0,
            termination_digit=KeypadEntry.POUND,
            prefix="DTMF: "
        )

        # Use in phone-based pipeline
        pipeline = Pipeline([
            transport.input(),
            dtmf_agg,              # Convert DTMF to transcription
            context_aggregator.user(),
            llm,
            tts,
            transport.output()
        ])

        # User presses: 1-2-3-4-#
        # Output: TranscriptionFrame("DTMF: 1234")
    """

    def __init__(
        self,
        timeout: float = 2.0,
        termination_digit: KeypadEntry = KeypadEntry.POUND,
        prefix: str = "DTMF: ",
        **kwargs
    ):
        """Initialize DTMF aggregator.

        Args:
            timeout: Idle timeout in seconds before flushing (default: 2.0)
            termination_digit: Digit that triggers immediate flush (default: KeypadEntry.POUND)
            prefix: Prefix added to DTMF sequence in transcription (default: "DTMF: ")
            **kwargs: Additional arguments passed to FrameProcessor
        """
        pass

Audio Processors

Specialized processors for audio stream management.

AudioBufferProcessor

{ .api }
from typing import Optional

from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor

class AudioBufferProcessor(FrameProcessor):
    """Processes and buffers audio frames from both input (user) and output (bot) sources.

    This processor manages audio buffering and synchronization, providing both merged and
    track-specific audio access through event handlers. It supports various audio configurations
    including sample rate conversion and mono/stereo output.

    Events:
        - on_audio_data: Triggered when buffer_size is reached, providing merged audio
        - on_track_audio_data: Triggered when buffer_size is reached, providing separate tracks
        - on_user_turn_audio_data: Triggered when user turn has ended, providing that user turn's audio
        - on_bot_turn_audio_data: Triggered when bot turn has ended, providing that bot turn's audio

    Audio Handling:
        - Mono output (num_channels=1): User and bot audio are mixed
        - Stereo output (num_channels=2): User audio on left, bot audio on right
        - Automatic resampling of incoming audio to match desired sample_rate
        - Silence insertion for non-continuous audio streams
        - Buffer synchronization between user and bot audio

    Args:
        sample_rate: Desired output sample rate. If None, uses source rate
        num_channels: Number of channels (1 for mono, 2 for stereo). Defaults to 1
        buffer_size: Size of buffer before triggering events. 0 for no buffering
        enable_turn_audio: Whether turn audio event handlers should be triggered

    Example:
        from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor

        # Create audio buffer with 16kHz, mono, 16KB buffer
        audio_buffer = AudioBufferProcessor(
            sample_rate=16000,
            num_channels=1,
            buffer_size=16384,
            enable_turn_audio=True
        )

        # Handle buffered audio
        @audio_buffer.event_handler("on_audio_data")
        async def handle_audio(audio: bytes, sample_rate: int, num_channels: int):
            # Process merged audio
            save_to_file(audio)

        # Handle separate tracks
        @audio_buffer.event_handler("on_track_audio_data")
        async def handle_tracks(user_audio: bytes, bot_audio: bytes, sample_rate: int, num_channels: int):
            # Process user and bot audio separately
            analyze_conversation(user_audio, bot_audio)

        # Handle turn-based audio
        @audio_buffer.event_handler("on_user_turn_audio_data")
        async def handle_user_turn(audio: bytes, sample_rate: int, num_channels: int):
            # Process complete user turn
            transcribe_turn(audio)

        # Use in pipeline
        await audio_buffer.start_recording()
        # ... process audio frames ...
        await audio_buffer.stop_recording()
    """

    def __init__(
        self,
        *,
        sample_rate: Optional[int] = None,
        num_channels: int = 1,
        buffer_size: int = 0,
        user_continuous_stream: Optional[bool] = None,
        enable_turn_audio: bool = False,
        **kwargs,
    ):
        """Initialize the audio buffer processor.

        Args:
            sample_rate: Desired output sample rate. If None, uses source rate
            num_channels: Number of channels (1 for mono, 2 for stereo). Defaults to 1
            buffer_size: Size of buffer before triggering events. 0 for no buffering
            user_continuous_stream: DEPRECATED. No longer has any effect
            enable_turn_audio: Whether turn audio event handlers should be triggered
            **kwargs: Additional arguments passed to parent class
        """
        pass

    @property
    def sample_rate(self) -> int:
        """Current sample rate of the audio processor."""
        pass

    @property
    def num_channels(self) -> int:
        """Number of channels in the audio output."""
        pass

    def has_audio(self) -> bool:
        """Check if either user or bot audio buffers contain data."""
        pass

    def merge_audio_buffers(self) -> bytes:
        """Merge user and bot audio buffers into a single audio stream.

        For mono output, audio is mixed. For stereo output, user audio is placed
        on the left channel and bot audio on the right channel.

        Returns:
            Mixed audio data as bytes
        """
        pass

    async def start_recording(self):
        """Start recording audio from both user and bot.

        Initializes recording state and resets audio buffers.
        """
        pass

    async def stop_recording(self):
        """Stop recording and trigger final audio data handlers.

        Calls audio handlers with any remaining buffered audio before stopping.
        """
        pass
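
For completeness, here is one possible implementation of the save_to_file placeholder used in the example above, writing the merged buffer as a WAV file with the standard library. It assumes 16-bit PCM samples, which is what pipecat audio frames carry:

{ .api }
import wave

def save_to_file(audio: bytes, sample_rate: int = 16000, num_channels: int = 1):
    """Write raw PCM audio to a WAV file (assumes 16-bit samples)."""
    with wave.open("conversation.wav", "wb") as f:
        f.setnchannels(num_channels)
        f.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM
        f.setframerate(sample_rate)
        f.writeframes(audio)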

Metrics Processors

Processors for collecting and reporting performance metrics.

FrameProcessorMetrics

{ .api }
from typing import Optional

from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import LLMTokenUsage, MetricsData
from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
from pipecat.utils.base_object import BaseObject

class FrameProcessorMetrics(BaseObject):
    """Metrics collection and reporting for frame processors.

    Provides comprehensive metrics tracking for frame processing operations,
    including timing measurements, resource usage, and performance analytics.
    Supports TTFB tracking, processing duration metrics, and usage statistics
    for LLM and TTS operations.

    Metrics Types:
        - TTFB (Time To First Byte): Time from request to first response
        - Processing Duration: Total processing time for operations
        - LLM Token Usage: Prompt and completion token counts
        - TTS Character Usage: Character count for TTS operations

    Example:
        from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics
        from pipecat.metrics.metrics import LLMTokenUsage

        # Create metrics collector
        metrics = FrameProcessorMetrics()
        await metrics.setup(task_manager)

        # Track TTFB
        await metrics.start_ttfb_metrics(report_only_initial_ttfb=True)
        # ... LLM processing ...
        ttfb_frame = await metrics.stop_ttfb_metrics()
        await push_frame(ttfb_frame)

        # Track processing time
        await metrics.start_processing_metrics()
        # ... processing ...
        processing_frame = await metrics.stop_processing_metrics()
        await push_frame(processing_frame)

        # Track LLM usage
        tokens = LLMTokenUsage(
            prompt_tokens=100,
            completion_tokens=50,
            total_tokens=150
        )
        usage_frame = await metrics.start_llm_usage_metrics(tokens)
        await push_frame(usage_frame)

        # Track TTS usage
        tts_frame = await metrics.start_tts_usage_metrics("Hello world")
        await push_frame(tts_frame)
    """

    def __init__(self):
        """Initialize the frame processor metrics collector.

        Sets up internal state for tracking various metrics including TTFB,
        processing times, and usage statistics.
        """
        pass

    async def setup(self, task_manager: BaseTaskManager):
        """Set up the metrics collector with a task manager.

        Args:
            task_manager: The task manager for handling async operations
        """
        pass

    @property
    def ttfb(self) -> Optional[float]:
        """Get the current TTFB value in seconds.

        Returns:
            The TTFB value in seconds, or None if not measured
        """
        pass

    def set_core_metrics_data(self, data: MetricsData):
        """Set the core metrics data for this collector.

        Args:
            data: The core metrics data containing processor and model information
        """
        pass

    def set_processor_name(self, name: str):
        """Set the processor name for metrics reporting.

        Args:
            name: The name of the processor to use in metrics
        """
        pass

    async def start_ttfb_metrics(self, report_only_initial_ttfb: bool):
        """Start measuring time-to-first-byte (TTFB).

        Args:
            report_only_initial_ttfb: Whether to report only the first TTFB measurement
        """
        pass

    async def stop_ttfb_metrics(self) -> Optional[MetricsFrame]:
        """Stop TTFB measurement and generate metrics frame.

        Returns:
            MetricsFrame containing TTFB data, or None if not measuring
        """
        pass

    async def start_processing_metrics(self):
        """Start measuring processing time."""
        pass

    async def stop_processing_metrics(self) -> Optional[MetricsFrame]:
        """Stop processing time measurement and generate metrics frame.

        Returns:
            MetricsFrame containing processing duration data, or None if not measuring
        """
        pass

    async def start_llm_usage_metrics(self, tokens: LLMTokenUsage) -> MetricsFrame:
        """Record LLM token usage metrics.

        Args:
            tokens: Token usage information including prompt and completion tokens

        Returns:
            MetricsFrame containing LLM usage data
        """
        pass

    async def start_tts_usage_metrics(self, text: str) -> MetricsFrame:
        """Record TTS character usage metrics.

        Args:
            text: The text being processed by TTS

        Returns:
            MetricsFrame containing TTS usage data
        """
        pass

Usage Patterns

Custom Processor Pattern

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import TextFrame, AudioRawFrame

class CustomProcessor(FrameProcessor):
    """Template for custom processors."""

    def __init__(self, config_value: str, **kwargs):
        super().__init__(**kwargs)
        self._config = config_value
        self._state = {}

    async def start(self):
        """Initialize resources."""
        await super().start()
        # Open connections, load models, etc.
        self._state = {}

    async def process_frame(self, frame, direction):
        """Process frames."""
        # 1. Check frame type
        if isinstance(frame, TextFrame):
            # 2. Process frame
            processed_text = self._process_text(frame.text)

            # 3. Create new frame or modify existing
            frame.text = processed_text

            # 4. Optionally emit additional frames
            # await self.push_frame(AnotherFrame(...))

        # 5. Always push frames (unless filtering)
        await self.push_frame(frame, direction)

    async def stop(self):
        """Clean up resources."""
        # Close connections, save state, etc.
        self._state.clear()
        await super().stop()

    async def interrupt(self):
        """Handle interruptions."""
        # Cancel current operations
        await super().interrupt()

    def _process_text(self, text: str) -> str:
        """Internal processing method."""
        return text.upper()

Filter Pipeline Pattern

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.filters.wake_check_filter import WakeCheckFilter

# Chain multiple filters
pipeline = Pipeline([
    # 1. Wake word filter
    WakeCheckFilter(wake_words=["hey bot"]),

    # 2. Custom filter
    FunctionFilter(
        filter_func=lambda frame, _: not frame.metadata.get("spam")
    ),

    # 3. Processing
    MyProcessor(),
])

Text Aggregation Pattern

{ .api }
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.sentence import SentenceAggregator

# Aggregate sentences before TTS
pipeline = Pipeline([
    llm_service,              # Produces TextFrames with words
    SentenceAggregator(),     # Aggregates into sentences
    tts_service,              # Synthesizes complete sentences
    transport.output()
])

# Benefits:
# - More natural TTS output
# - Fewer TTS API calls
# - Better prosody

Stateful Processing Pattern

{ .api }
class StatefulProcessor(FrameProcessor):
    """Process frames with state."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._counter = 0
        self._history = []

    async def process_frame(self, frame, direction):
        if isinstance(frame, TextFrame):
            # Update state
            self._counter += 1
            self._history.append(frame.text)

            # Keep history bounded
            if len(self._history) > 100:
                self._history.pop(0)

            # Add state to metadata
            frame.metadata["count"] = self._counter
            frame.metadata["history_len"] = len(self._history)

        await self.push_frame(frame, direction)

Frame Generation Pattern

{ .api }
# Note: AsyncGeneratorProcessor exposes serialized frames for external
# consumption (see above); it does not accept a generator. To inject
# frames periodically, use a custom processor with a background task.
import asyncio

from pipecat.frames.frames import HeartbeatFrame
from pipecat.processors.frame_processor import FrameProcessor

class HeartbeatProcessor(FrameProcessor):
    """Queues a HeartbeatFrame every 5 seconds."""

    async def start(self):
        await super().start()
        self._task = asyncio.create_task(self._beat())

    async def _beat(self):
        while True:
            await asyncio.sleep(5.0)
            await self.queue_frame(HeartbeatFrame())

    async def stop(self):
        self._task.cancel()
        await super().stop()

# Add to pipeline
pipeline = Pipeline([processor1, HeartbeatProcessor(), processor2])

Best Practices

Always Call push_frame

{ .api }
# Good: Always push frames
async def process_frame(self, frame, direction):
    # Process frame
    if isinstance(frame, TextFrame):
        frame.text = frame.text.upper()
    # Push to next processor
    await self.push_frame(frame, direction)

# Bad: Forgetting to push
async def process_frame(self, frame, direction):
    # Process frame
    if isinstance(frame, TextFrame):
        frame.text = frame.text.upper()
    # Frame is lost! Forgot to push

Use Type Checking

{ .api }
# Good: Specific type checks
async def process_frame(self, frame, direction):
    if isinstance(frame, TextFrame):
        # Handle text
        pass
    elif isinstance(frame, AudioRawFrame):
        # Handle audio
        pass
    # Always push
    await self.push_frame(frame, direction)

# Bad: Generic handling
async def process_frame(self, frame, direction):
    if frame.name.startswith("Text"):  # Fragile!
        pass

Clean Up Resources

{ .api }
class ResourceProcessor(FrameProcessor):
    """Always clean up resources."""

    async def start(self):
        await super().start()
        self._connection = await open_connection()
        self._file = open("data.txt", "w")

    async def _cleanup(self):
        # Clean up in reverse order of acquisition
        if self._file:
            self._file.close()
        if self._connection:
            await self._connection.close()

    async def stop(self):
        await self._cleanup()
        await super().stop()

    async def cancel(self):
        # Same cleanup on cancel, without re-running the stop lifecycle
        await self._cleanup()
        await super().cancel()

Use Event Handlers for Side Effects

{ .api }
class MonitoringProcessor(FrameProcessor):
    """Use events for monitoring."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._frame_count = 0
        # Declare the event so handlers can be registered for it
        self._register_event_handler("on_frame_processed")

    async def process_frame(self, frame, direction):
        self._frame_count += 1

        # Notify registered handlers; side effects stay out of the frame path
        await self._call_event_handler("on_frame_processed", self._frame_count)

        await self.push_frame(frame, direction)

# Use events
processor = MonitoringProcessor()

@processor.event_handler("on_frame_processed")
async def log_count(count: int):
    if count % 100 == 0:
        print(f"Processed {count} frames")