or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md, index.md, pipeline.md, runner.md, transports.md, turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open-source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.

docs/processors/llm-processors.md

LLM Processors

LLM processors handle context management and message aggregation for Large Language Model integrations. They provide a universal abstraction for managing conversation context across different LLM providers.

LLM Context

Universal context container for LLM conversations.

LLMContext

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext

class LLMContext:
    """Provider-agnostic container for an LLM conversation.

    Keeps the conversation messages, the tool/function definitions and the
    generation settings together in one universal format; provider adapters
    translate it for each major LLM provider. This listing documents
    signatures only; method bodies are omitted.

    Attributes:
        messages (List[Dict]): Conversation messages in universal format
        tools (List[Dict]): Tool/function definitions offered to the model
        settings (Dict[str, Any]): Generation settings (temperature, etc.)

    Message Format:
        {
            "role": "system" | "user" | "assistant" | "tool",
            "content": str | List[Dict],  # Plain text or multimodal parts
            "name": Optional[str],         # Speaker name
            "tool_calls": Optional[List],  # Function calls
            "tool_call_id": Optional[str]  # Tool response ID
        }

    Example:
        context = LLMContext(
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Hello!"},
                {"role": "assistant", "content": "Hi! How can I help?"}
            ],
            tools=[
                {
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "description": "Get weather for a location",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "location": {"type": "string"}
                            }
                        }
                    }
                }
            ],
            settings={
                "temperature": 0.7,
                "max_tokens": 1000
            }
        )
    """

    def __init__(
        self,
        messages: Optional[List[Dict]] = None,
        tools: Optional[List[Dict]] = None,
        settings: Optional[Dict[str, Any]] = None
    ):
        """Create a context, optionally pre-populated.

        Args:
            messages: Initial conversation messages
            tools: Tool/function definitions available to the model
            settings: LLM generation settings
        """
        ...

    def add_message(self, message: Dict):
        """Append one message to the conversation.

        Args:
            message: Message dictionary in the universal format
        """
        ...

    def get_messages(self) -> List[Dict]:
        """Return the conversation so far.

        Returns:
            List of messages
        """
        ...

    def set_tools(self, tools: List[Dict]):
        """Set the tools available to the model.

        Args:
            tools: List of tool definitions
        """
        ...

    def get_tools(self) -> List[Dict]:
        """Return the tools available to the model.

        Returns:
            List of tools
        """
        ...

    def update_settings(self, settings: Dict[str, Any]):
        """Update the LLM generation settings.

        Args:
            settings: Settings to update
        """
        ...

    def get_settings(self) -> Dict[str, Any]:
        """Return the LLM generation settings.

        Returns:
            Settings dictionary
        """
        ...

LLM Context Aggregators

Processors that aggregate messages into LLM context.

LLMContextAggregator

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContextAggregator

class LLMContextAggregator(FrameProcessor):
    """Base processor that folds conversation turns into an LLM context.

    Covers the concerns shared by both directions of a conversation:
    message accumulation, turn detection and context updates. This listing
    documents signatures only; method bodies are omitted.

    Subclasses:
        - LLMUserAggregator: User turn aggregation
        - LLMAssistantAggregator: Assistant turn aggregation

    Example:
        # Base class; pipelines normally use one of the subclasses
        aggregator = LLMContextAggregator()
    """

    def __init__(self, context: Optional[LLMContext] = None, **kwargs):
        """Create the aggregator.

        Args:
            context: LLM context that aggregated turns are written to
            **kwargs: Additional arguments
        """
        ...

    async def process_frame(self, frame, direction):
        """Fold an incoming frame into the context.

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        ...

LLMUserAggregator

{ .api }
from pipecat.processors.aggregators.llm_context import LLMUserAggregator, LLMUserAggregatorParams

class LLMUserAggregatorParams:
    """Configuration options for the user aggregator.

    This listing documents the signature only; the body is omitted.

    Attributes:
        accumulate_text_parts (bool): Buffer text before adding it to context
        expect_stripped_words (bool): Word frames arrive whitespace-stripped
        custom_accumulate_callback (Optional[Callable]): Replacement accumulation logic
        custom_append_callback (Optional[Callable]): Replacement append logic

    Example:
        params = LLMUserAggregatorParams(
            accumulate_text_parts=True,
            expect_stripped_words=False
        )
    """

    def __init__(
        self,
        accumulate_text_parts: bool = True,
        expect_stripped_words: bool = False,
        custom_accumulate_callback: Optional[Callable] = None,
        custom_append_callback: Optional[Callable] = None
    ):
        """Create the user aggregator parameters.

        Args:
            accumulate_text_parts: Buffer text parts before appending
            expect_stripped_words: Words are whitespace-stripped
            custom_accumulate_callback: Custom accumulation logic
            custom_append_callback: Custom append logic
        """
        ...


class LLMUserAggregator(LLMContextAggregator):
    """Aggregator for the user side of the conversation.

    Collects user input (transcriptions, text) into complete user messages
    and takes care of turn detection, message formatting and context
    updates. This listing documents signatures only; method bodies are
    omitted.

    Flow:
    1. Receives TranscriptionFrame, TextFrame, or InputTextRawFrame
    2. Accumulates the input into one complete user message
    3. On turn end (UserStoppedSpeakingFrame), adds the message to context
    4. Emits LLMMessagesAppendFrame

    Args:
        context: LLM context
        params: Aggregator parameters

    Example:
        # Basic usage
        user_agg = LLMUserAggregator(context=context)

        # With custom parameters
        user_agg = LLMUserAggregator(
            context=context,
            params=LLMUserAggregatorParams(
                accumulate_text_parts=True
            )
        )

        # In pipeline
        pipeline = Pipeline([
            transport.input(),
            stt_service,
            user_agg,              # Aggregates user speech
            llm_service,
            tts_service,
            transport.output()
        ])
    """

    def __init__(
        self,
        context: Optional[LLMContext] = None,
        params: Optional[LLMUserAggregatorParams] = None,
        **kwargs
    ):
        """Create the user aggregator.

        Args:
            context: LLM context user messages are added to
            params: Aggregator parameters
            **kwargs: Additional arguments
        """
        ...

    async def process_frame(self, frame, direction):
        """Fold user-side frames into the pending user message.

        Processes:
        - TranscriptionFrame: STT output
        - TextFrame: Direct text input
        - InputTextRawFrame: Raw text input
        - UserStoppedSpeakingFrame: Triggers message append

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        ...

LLMAssistantAggregator

{ .api }
from pipecat.processors.aggregators.llm_context import LLMAssistantAggregator, LLMAssistantAggregatorParams

class LLMAssistantAggregatorParams:
    """Configuration options for the assistant aggregator.

    This listing documents the signature only; the body is omitted.

    Attributes:
        accumulate_text_parts (bool): Buffer text before adding it to context
        expect_stripped_words (bool): Word frames arrive whitespace-stripped
        custom_accumulate_callback (Optional[Callable]): Replacement accumulation logic
        custom_append_callback (Optional[Callable]): Replacement append logic

    Example:
        params = LLMAssistantAggregatorParams(
            accumulate_text_parts=True
        )
    """

    def __init__(
        self,
        accumulate_text_parts: bool = True,
        expect_stripped_words: bool = False,
        custom_accumulate_callback: Optional[Callable] = None,
        custom_append_callback: Optional[Callable] = None
    ):
        """Create the assistant aggregator parameters.

        Args:
            accumulate_text_parts: Buffer text parts before appending
            expect_stripped_words: Words are whitespace-stripped
            custom_accumulate_callback: Custom accumulation logic
            custom_append_callback: Custom append logic
        """
        ...


class LLMAssistantAggregator(LLMContextAggregator):
    """Aggregator for the assistant side of the conversation.

    Collects assistant output (LLM-generated text, function calls) into
    complete assistant messages so the context stays usable across
    multi-turn conversations. This listing documents signatures only;
    method bodies are omitted.

    Flow:
    1. Receives LLMTextFrame, FunctionCallResultFrame
    2. Accumulates the output into one complete assistant message
    3. On completion, adds the message to context
    4. Emits LLMMessagesAppendFrame

    Args:
        context: LLM context
        params: Aggregator parameters

    Example:
        # Basic usage
        assistant_agg = LLMAssistantAggregator(context=context)

        # In pipeline
        pipeline = Pipeline([
            transport.input(),
            stt_service,
            user_agg,
            llm_service,
            assistant_agg,         # Aggregates LLM output
            tts_service,
            transport.output()
        ])
    """

    def __init__(
        self,
        context: Optional[LLMContext] = None,
        params: Optional[LLMAssistantAggregatorParams] = None,
        **kwargs
    ):
        """Create the assistant aggregator.

        Args:
            context: LLM context assistant messages are added to
            params: Aggregator parameters
            **kwargs: Additional arguments
        """
        ...

    async def process_frame(self, frame, direction):
        """Fold assistant-side frames into the pending assistant message.

        Processes:
        - LLMTextFrame: LLM text output
        - FunctionCallResultFrame: Function call results
        - LLMFullResponseEndFrame: Triggers message append

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        ...

LLMContextAggregatorPair

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair

class LLMContextAggregatorPair:
    """Matched user and assistant aggregators over one shared context.

    Convenience wrapper that creates and manages both aggregators so they
    are guaranteed to work on the same context. This listing documents
    signatures only; method bodies are omitted.

    Attributes:
        user: User aggregator
        assistant: Assistant aggregator
        context: Shared LLM context

    Example:
        # Create aggregator pair
        context = LLMContext()
        aggregators = LLMContextAggregatorPair(context=context)

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            stt_service,
            aggregators.user,       # User aggregation
            llm_service,
            aggregators.assistant,  # Assistant aggregation
            tts_service,
            transport.output()
        ])

        # Access shared context
        print(aggregators.context.get_messages())
    """

    def __init__(
        self,
        context: Optional[LLMContext] = None,
        user_params: Optional[LLMUserAggregatorParams] = None,
        assistant_params: Optional[LLMAssistantAggregatorParams] = None
    ):
        """Create the aggregator pair.

        Args:
            context: LLM context shared by both aggregators
            user_params: User aggregator parameters
            assistant_params: Assistant aggregator parameters
        """
        ...

    @property
    def user(self) -> LLMUserAggregator:
        """The user-side aggregator."""
        ...

    @property
    def assistant(self) -> LLMAssistantAggregator:
        """The assistant-side aggregator."""
        ...

    @property
    def context(self) -> LLMContext:
        """The context shared by both aggregators."""
        ...

LLM Response Aggregators

Processors for aggregating complete LLM responses.

LLMFullResponseAggregator

{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMFullResponseAggregator

class LLMFullResponseAggregator(FrameProcessor):
    """Collects one whole LLM response before handing it on.

    Accumulates every LLM output frame (text, function calls) of a response
    and emits the aggregated result once the response is complete. This
    listing documents signatures only; method bodies are omitted.

    Useful for:
    - Getting the complete LLM response text
    - Processing a response before TTS
    - Analytics and logging

    Example:
        aggregator = LLMFullResponseAggregator()

        @aggregator.event_handler("on_full_response")
        async def handle_response(response: str):
            print(f"Complete LLM response: {response}")

        pipeline = Pipeline([
            user_agg,
            llm_service,
            aggregator,            # Captures full response
            tts_service,
            transport.output()
        ])
    """

    def __init__(self, **kwargs):
        """Create the full-response aggregator.

        Args:
            **kwargs: Additional arguments
        """
        ...

    async def process_frame(self, frame, direction):
        """Fold response frames into the pending response.

        Processes:
        - LLMFullResponseStartFrame: Start aggregation
        - LLMTextFrame: Accumulate text
        - LLMFullResponseEndFrame: Emit complete response

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        ...

LLMTextProcessor

{ .api }
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator

class LLMTextProcessor(FrameProcessor):
    """Processor that reshapes LLM text frames before later stages see them.

    Converts LLMTextFrames into AggregatedTextFrames according to the
    configured text aggregator. Because the aggregator is pluggable, raw LLM
    output tokens can be pre-aggregated and categorized, modified, or
    filtered before they reach components such as TTS services or context
    aggregators. This listing documents signatures only; method bodies are
    omitted.

    Processing Flow:
        1. Receives LLMTextFrame from LLM service
        2. Passes the text through the text aggregator
        3. Emits AggregatedTextFrame when an aggregation completes
        4. Handles interruptions by resetting the aggregator
        5. Flushes remaining text on LLMFullResponseEndFrame

    Frames Consumed:
        - LLMTextFrame: Raw LLM text output
        - LLMFullResponseEndFrame: Triggers flush
        - InterruptionFrame: Resets aggregation
        - EndFrame: Triggers flush

    Frames Produced:
        - AggregatedTextFrame: Aggregated text with type annotation

    Args:
        text_aggregator: Optional text aggregator used to process LLM text frames.
                        Defaults to a SimpleTextAggregator that aggregates by sentence

    Example:
        from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
        from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator

        # Use default sentence aggregation
        processor = LLMTextProcessor()

        # Custom aggregation strategy
        custom_aggregator = SimpleTextAggregator(
            aggregation_type="word",
            min_words=5
        )
        processor = LLMTextProcessor(text_aggregator=custom_aggregator)

        # Use in pipeline
        pipeline = Pipeline([
            user_aggregator,
            llm_service,           # Produces LLMTextFrame
            processor,             # Converts to AggregatedTextFrame
            tts_service,
            transport.output()
        ])

        # With custom transformation
        class EmojiTextAggregator(BaseTextAggregator):
            async def aggregate(self, text: str):
                # Add emojis to text
                enhanced = text.replace("!", " 🎉!")
                yield Aggregation(text=enhanced, type="enhanced")

        emoji_processor = LLMTextProcessor(
            text_aggregator=EmojiTextAggregator()
        )
    """

    def __init__(
        self,
        *,
        text_aggregator: Optional[BaseTextAggregator] = None,
        **kwargs
    ):
        """Create the LLM text processor.

        Args:
            text_aggregator: Optional text aggregator used to process LLM text frames.
                            Defaults to a SimpleTextAggregator that aggregates by sentence
            **kwargs: Additional arguments passed to the parent class
        """
        ...

    async def reset(self):
        """Reset the processor's internal state together with its aggregator."""
        ...

Gated Context Aggregators

Processors for controlled LLM context flow based on external conditions.

GatedLLMContextAggregator

{ .api }
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
from pipecat.utils.sync.base_notifier import BaseNotifier

class GatedLLMContextAggregator(FrameProcessor):
    """Aggregator that gates LLM context frames until notified.

    Captures LLM context frames and holds them until a notifier signals that
    they can be released, which lets external conditions or timing control
    when context reaches the LLM. This listing documents signatures only;
    method bodies are omitted.

    Use Cases:
        - Wait for user confirmation before processing context
        - Coordinate context updates with external events
        - Implement manual approval flows
        - Synchronize context with other pipeline stages

    Frames Consumed:
        - LLMContextFrame: Universal context frames (held until notified)
        - OpenAILLMContextFrame: Legacy OpenAI context frames (held until notified)
        - StartFrame: Begins gate task handler
        - EndFrame/CancelFrame: Stops gate task handler

    Frames Produced:
        - LLMContextFrame/OpenAILLMContextFrame: Released when notifier signals

    Args:
        notifier: The notifier that controls when frames are released
        start_open: If True, the first context frame passes through immediately

    Example:
        from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
        from pipecat.utils.sync.event_notifier import EventNotifier

        # Create a concrete notifier for manual control. BaseNotifier is only
        # the interface the gate is typed against, so instantiate an
        # implementation such as EventNotifier instead.
        notifier = EventNotifier()

        # Create gated aggregator
        gated_context = GatedLLMContextAggregator(
            notifier=notifier,
            start_open=False
        )

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            stt,
            user_aggregator,
            gated_context,         # Holds context until notified
            llm,
            tts,
            transport.output()
        ])

        # Manually release context when ready
        async def on_user_approval():
            await notifier.notify()  # Releases held context frame

        # With automatic notification
        @user_interface.event_handler("on_confirm")
        async def handle_confirm():
            await notifier.notify()
    """

    def __init__(
        self,
        *,
        notifier: BaseNotifier,
        start_open: bool = False,
        **kwargs
    ):
        """Initialize the gated context aggregator.

        Args:
            notifier: The notifier that controls when frames are released
            start_open: If True, the first context frame passes through immediately
            **kwargs: Additional arguments passed to the parent FrameProcessor
        """
        ...

GatedOpenAILLMContextAggregator

{ .api }
from pipecat.processors.aggregators.gated_open_ai_llm_context import GatedOpenAILLMContextAggregator

class GatedOpenAILLMContextAggregator(GatedLLMContextAggregator):
    """DEPRECATED: OpenAI-specific gated context aggregator.

    Kept only for backward compatibility; new code should use
    GatedLLMContextAggregator, which supports every provider.

    Example:
        # Deprecated
        gated = GatedOpenAILLMContextAggregator(notifier=notifier)

        # Use instead
        from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
        gated = GatedLLMContextAggregator(notifier=notifier)
    """
    ...

Legacy Aggregators (OpenAI-specific)

These aggregators are deprecated but still supported for backward compatibility.

OpenAILLMContext

{ .api }
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

class OpenAILLMContext:
    """DEPRECATED: OpenAI-specific context.

    Legacy context format for OpenAI chat completions. Migrating to
    LLMContext is recommended, since it supports every provider.

    Example:
        # Deprecated
        context = OpenAILLMContext(messages=[...])

        # Use instead
        from pipecat.processors.aggregators.llm_context import LLMContext
        context = LLMContext(messages=[...])
    """
    ...

LLMUserResponseAggregator

{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMUserResponseAggregator

class LLMUserResponseAggregator(FrameProcessor):
    """DEPRECATED: OpenAI-specific user aggregation.

    New code should use LLMUserAggregator, which supports every provider.

    Example:
        # Deprecated
        user_agg = LLMUserResponseAggregator(messages=messages)

        # Use instead
        from pipecat.processors.aggregators.llm_context import LLMUserAggregator
        user_agg = LLMUserAggregator(context=context)
    """
    ...

LLMAssistantResponseAggregator

{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMAssistantResponseAggregator

class LLMAssistantResponseAggregator(FrameProcessor):
    """DEPRECATED: OpenAI-specific assistant aggregation.

    New code should use LLMAssistantAggregator, which supports every provider.

    Example:
        # Deprecated
        assistant_agg = LLMAssistantResponseAggregator(messages=messages)

        # Use instead
        from pipecat.processors.aggregators.llm_context import LLMAssistantAggregator
        assistant_agg = LLMAssistantAggregator(context=context)
    """
    ...

Usage Patterns

Basic Context Management

{ .api }
from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextAggregatorPair
)
from pipecat.services.openai import OpenAILLMService
from pipecat.pipeline.pipeline import Pipeline

# Seed the conversation with a system prompt and generation settings
system_prompt = {
    "role": "system",
    "content": "You are a helpful voice assistant.",
}
context = LLMContext(
    messages=[system_prompt],
    settings={"temperature": 0.7, "max_tokens": 1000},
)

# One pair hands the same context to both aggregators
aggregators = LLMContextAggregatorPair(context=context)

# LLM service bound to that context
llm_service = OpenAILLMService(api_key="key", model="gpt-4")
llm_service.set_context(context)

# Assemble the pipeline stage by stage
stages = [
    transport.input(),
    stt_service,
    aggregators.user,       # User messages added to context
    llm_service,            # Uses context for generation
    aggregators.assistant,  # Assistant messages added to context
    tts_service,
    transport.output(),
]
pipeline = Pipeline(stages)

Function Calling Context

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext

# JSON schema describing the arguments get_weather accepts
weather_parameters = {
    "type": "object",
    "properties": {
        "location": {"type": "string", "description": "City name"},
        "units": {"type": "string", "enum": ["celsius", "fahrenheit"]},
    },
    "required": ["location"],
}

# Tool definitions offered to the model
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": weather_parameters,
        },
    }
]

# Context carrying both the system prompt and the tools
system_prompt = {"role": "system", "content": "You are a weather assistant."}
context = LLMContext(messages=[system_prompt], tools=tools)

# Register function handler
async def get_weather(location: str, units: str = "celsius") -> dict:
    """Return a canned weather report whose temperature matches its units.

    The reading is sample data for the example (the same sunny day expressed
    in both scales), so ``location`` does not influence the result.

    Args:
        location: City name requested by the model.
        units: "celsius" or "fahrenheit"; any other value falls back to
            "celsius" so the reported units always describe the reading.

    Returns:
        Dict with "temp", "units" and "condition" keys.
    """
    sample_readings = {"celsius": 22, "fahrenheit": 72}
    if units not in sample_readings:
        units = "celsius"
    return {"temp": sample_readings[units], "units": units, "condition": "sunny"}

# Register the handler under the same name the tool definition uses
# ("get_weather"), then hand the tool-enabled context to the service.
llm_service.register_function("get_weather", get_weather)
llm_service.set_context(context)

Multi-Turn Conversation

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext

# The context accumulates the conversation history turn by turn
system_prompt = {"role": "system", "content": "You are helpful."}
context = LLMContext(messages=[system_prompt])

# Once the user has said "What's the weather?" and the assistant has
# answered, the context holds:
# [
#   {"role": "system", "content": "You are helpful."},
#   {"role": "user", "content": "What's the weather?"},
#   {"role": "assistant", "content": "I need your location..."}
# ]

# Once the user has said "San Francisco" and the assistant has answered,
# the context holds:
# [
#   {"role": "system", "content": "You are helpful."},
#   {"role": "user", "content": "What's the weather?"},
#   {"role": "assistant", "content": "I need your location..."},
#   {"role": "user", "content": "San Francisco"},
#   {"role": "assistant", "content": "In SF it's 65°F and sunny."}
# ]

# Read the history back from the context
messages = context.get_messages()
print(f"Conversation has {len(messages)} messages")

Dynamic Context Updates

{ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame, LLMSetToolsFrame

# Add message programmatically
append_frame = LLMMessagesAppendFrame(
    messages=[
        {
            "role": "system",
            "content": "Additional context: User is in a hurry."
        }
    ]
)
await task.queue_frame(append_frame)

# Update tools dynamically
new_tools = [
    {
        "type": "function",
        "function": {
            "name": "book_ride",
            "description": "Book a ride",
            "parameters": {...}
        }
    }
]
tools_frame = LLMSetToolsFrame(tools=new_tools)
await task.queue_frame(tools_frame)

Custom Aggregation Logic

{ .api }
from pipecat.processors.aggregators.llm_context import (
    LLMUserAggregator,
    LLMUserAggregatorParams
)

# Custom accumulation callback
async def custom_accumulate(aggregator, frame):
    """Buffer transcription text with the placeholder profanity masked.

    Frames that are not TranscriptionFrame instances are ignored.

    Args:
        aggregator: Aggregator whose text buffer receives the text
        frame: Incoming frame
    """
    if not isinstance(frame, TranscriptionFrame):
        return

    # Mask the profanity token before the text reaches the buffer
    censored = frame.text.replace("bad_word", "***")
    aggregator._accumulate_text(censored)

# Custom append callback
async def custom_append(aggregator):
    """Append the accumulated user text to the context with a timestamp.

    Args:
        aggregator: Aggregator providing ``_accumulated_text`` and the
            ``_context`` the message is added to
    """
    # Imported here so the snippet is self-contained.
    from datetime import datetime, timezone

    # Timezone-aware UTC timestamp, so the value is unambiguous across hosts.
    # NOTE(review): "timestamp" is not part of the documented message format;
    # confirm the target provider tolerates extra keys.
    message = {
        "role": "user",
        "content": aggregator._accumulated_text,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    aggregator._context.add_message(message)

# Wire both callbacks into the aggregator through its params object
callback_params = LLMUserAggregatorParams(
    custom_accumulate_callback=custom_accumulate,
    custom_append_callback=custom_append,
)
user_agg = LLMUserAggregator(context=context, params=callback_params)

Monitoring Context State

{ .api }
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import LLMMessagesAppendFrame

class ContextMonitor(FrameProcessor):
    """Pass-through processor that logs the context when messages are appended.

    Every frame is forwarded unchanged; LLMMessagesAppendFrame additionally
    triggers a printout of the message count and a preview of the last
    message.

    Args:
        context: Context to inspect
        **kwargs: Additional arguments passed to FrameProcessor
    """

    def __init__(self, context: LLMContext, **kwargs):
        super().__init__(**kwargs)
        self._context = context

    async def process_frame(self, frame, direction):
        """Log context state on append frames, then forward the frame.

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMMessagesAppendFrame):
            messages = self._context.get_messages()
            print(f"Context updated: {len(messages)} messages")

            # Log last message
            if messages:
                last = messages[-1]
                print(f"  Role: {last['role']}")
                # Content may be text, a list of multimodal parts, or absent
                # (e.g. a tool-call message), so stringify before truncating.
                preview = str(last.get("content") or "")[:100]
                print(f"  Content: {preview}...")

        await self.push_frame(frame, direction)

# Place the monitor directly after the user aggregator
monitor = ContextMonitor(context=context)
monitored_stages = [
    user_agg,
    monitor,               # Monitor context changes
    llm_service,
    assistant_agg,
    tts_service,
    transport.output(),
]
pipeline = Pipeline(monitored_stages)

Best Practices

Use Universal LLMContext

{ .api }
# Good: one universal context, reusable across providers
from pipecat.processors.aggregators.llm_context import LLMContext

context = LLMContext(messages=[...])

# OpenAI and Anthropic services both accept the same context object
openai_llm = OpenAILLMService(api_key="key")
anthropic_llm = AnthropicLLMService(api_key="key")
openai_llm.set_context(context)
anthropic_llm.set_context(context)

# Bad: a provider-specific context
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

openai_context = OpenAILLMContext(messages=[...])
# Only works with OpenAI

Always Set System Message

{ .api }
# Good: a system prompt pins down how the assistant should behave
voice_prompt = {
    "role": "system",
    "content": "You are a helpful voice assistant. Keep responses concise and natural for speech.",
}
context = LLMContext(messages=[voice_prompt])

# Bad: an empty history leaves nothing to steer the model
context = LLMContext(messages=[])
# LLM behavior may be unpredictable

Use Aggregator Pairs

{ .api }
# Good: both aggregators come from one pair, so they share a context
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair

aggregators = LLMContextAggregatorPair(context=context)
paired_stages = [aggregators.user, llm_service, aggregators.assistant]
pipeline = Pipeline(paired_stages)

# Bad: independently built aggregators pointing at different contexts
user_agg = LLMUserAggregator(context=context1)
assistant_agg = LLMAssistantAggregator(context=context2)
# Contexts not synchronized!

Handle Long Conversations

{ .api }
from pipecat.processors.frame_processor import FrameProcessor

class ContextTrimmer(FrameProcessor):
    """Cap the conversation history so the context stays under token limits.

    Whenever messages are appended and the history exceeds ``max_messages``,
    the context is rewritten to hold every system message followed by the
    most recent non-system messages that still fit. Every frame is forwarded
    unchanged.

    Args:
        context: Context whose messages are trimmed
        max_messages: Maximum number of messages to keep
        **kwargs: Additional arguments passed to FrameProcessor
    """

    def __init__(self, context: LLMContext, max_messages: int = 20, **kwargs):
        super().__init__(**kwargs)
        self._context = context
        self._max_messages = max_messages

    async def process_frame(self, frame, direction):
        """Trim the context on append frames, then forward the frame.

        Args:
            frame: Frame to process
            direction: Direction the frame is travelling
        """
        # Custom processors must let the base class handle the frame first.
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMMessagesAppendFrame):
            messages = self._context.get_messages()

            if len(messages) > self._max_messages:
                trimmed = self._trim(messages)

                # Update context
                self._context.messages = trimmed
                print(f"Trimmed context: {len(messages)} -> {len(trimmed)} messages")

        await self.push_frame(frame, direction)

    def _trim(self, messages):
        """Return the system messages plus the newest messages that fit."""
        system_msgs = []
        other_msgs = []
        for message in messages:
            bucket = system_msgs if message["role"] == "system" else other_msgs
            bucket.append(message)

        # Slots left for non-system messages. A slice of [-0:] would keep the
        # whole list and a negative budget would slice from the wrong end, so
        # a non-positive budget keeps the system messages only.
        budget = self._max_messages - len(system_msgs)
        recent = other_msgs[-budget:] if budget > 0 else []

        # NOTE(review): cutting purely by count can separate a tool call from
        # its tool result; confirm the target provider tolerates that.
        return system_msgs + recent

Test Context State

{ .api }
import pytest

async def test_user_aggregation():
    """Check that one complete user turn becomes a single user message.

    NOTE(review): as a coroutine test this presumably relies on an asyncio
    pytest plugin (marker or auto mode) to be executed - confirm the project
    configuration.
    """
    ctx = LLMContext()
    aggregator = LLMUserAggregator(context=ctx)
    downstream = FrameDirection.DOWNSTREAM

    # Turn begins
    await aggregator.process_frame(UserStartedSpeakingFrame(), downstream)

    # STT delivers the transcription
    await aggregator.process_frame(TranscriptionFrame(text="Hello world"), downstream)

    # Turn ends
    await aggregator.process_frame(UserStoppedSpeakingFrame(), downstream)

    # Exactly one user message carrying the transcribed text is expected
    history = ctx.get_messages()
    assert len(history) == 1
    only_message = history[0]
    assert only_message["role"] == "user"
    assert only_message["content"] == "Hello world"