tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
LLM processors handle context management and message aggregation for Large Language Model integrations. They provide a universal abstraction for managing conversation context across different LLM providers.
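For orientation, the snippet below previews the core container documented next: an LLMContext can be created up front and then read and mutated through the methods listed on this page. This is a minimal sketch; the aggregators and pipeline wiring that normally drive these calls are covered in the sections that follow.

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext

# Provider-agnostic conversation state
context = LLMContext(
    messages=[{"role": "system", "content": "You are a helpful voice assistant."}],
    settings={"temperature": 0.7},
)

# Aggregators (documented below) normally do this for you; the methods can
# also be called directly.
context.add_message({"role": "user", "content": "Hello!"})
context.update_settings({"max_tokens": 1000})

print(len(context.get_messages()))  # 2: system + user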
Universal context container for LLM conversations.
{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext
class LLMContext:
"""Universal LLM context container.
Manages conversation messages, tools/functions, and settings
in a provider-agnostic format. Supports all major LLM providers
through adapters.
Attributes:
messages (List[Dict]): Conversation messages in universal format
tools (List[Dict]): Available tools/functions
settings (Dict[str, Any]): LLM settings (temperature, etc.)
Message Format:
{
"role": "system" | "user" | "assistant" | "tool",
"content": str | List[Dict], # Text or multimodal content
"name": Optional[str], # Speaker name
"tool_calls": Optional[List], # Function calls
"tool_call_id": Optional[str] # Tool response ID
}
Example:
context = LLMContext(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"},
{"role": "assistant", "content": "Hi! How can I help?"}
],
tools=[
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
}
}
}
}
],
settings={
"temperature": 0.7,
"max_tokens": 1000
}
)
"""
def __init__(
self,
messages: Optional[List[Dict]] = None,
tools: Optional[List[Dict]] = None,
settings: Optional[Dict[str, Any]] = None
):
"""Initialize LLM context.
Args:
messages: List of conversation messages
tools: List of available tools/functions
settings: LLM generation settings
"""
pass
def add_message(self, message: Dict):
"""Add message to context.
Args:
message: Message dictionary
"""
pass
def get_messages(self) -> List[Dict]:
"""Get all messages.
Returns:
List of messages
"""
pass
def set_tools(self, tools: List[Dict]):
"""Set available tools.
Args:
tools: List of tool definitions
"""
pass
def get_tools(self) -> List[Dict]:
"""Get available tools.
Returns:
List of tools
"""
pass
def update_settings(self, settings: Dict[str, Any]):
"""Update LLM settings.
Args:
settings: Settings to update
"""
pass
def get_settings(self) -> Dict[str, Any]:
"""Get LLM settings.
Returns:
Settings dictionary
"""
pass

Processors that aggregate messages into LLM context.
{ .api }
from pipecat.processors.aggregators.llm_context import LLMContextAggregator
class LLMContextAggregator(FrameProcessor):
"""Universal context aggregation.
Base class for aggregating conversation turns into LLM context.
Handles message accumulation, turn detection, and context updates.
Subclasses:
- LLMUserAggregator: User turn aggregation
- LLMAssistantAggregator: Assistant turn aggregation
Example:
# Base class, typically use specific subclasses
aggregator = LLMContextAggregator()
"""
def __init__(self, context: Optional[LLMContext] = None, **kwargs):
"""Initialize context aggregator.
Args:
context: LLM context to aggregate into
**kwargs: Additional arguments
"""
pass
async def process_frame(self, frame, direction):
"""Aggregate frames into context.
Args:
frame: Frame to process
direction: Frame direction
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_context import LLMUserAggregator, LLMUserAggregatorParams
class LLMUserAggregatorParams:
"""User aggregator configuration.
Attributes:
accumulate_text_parts (bool): Accumulate text before adding to context
expect_stripped_words (bool): Expect whitespace-stripped word frames
custom_accumulate_callback (Optional[Callable]): Custom accumulation logic
custom_append_callback (Optional[Callable]): Custom append logic
Example:
params = LLMUserAggregatorParams(
accumulate_text_parts=True,
expect_stripped_words=False
)
"""
def __init__(
self,
accumulate_text_parts: bool = True,
expect_stripped_words: bool = False,
custom_accumulate_callback: Optional[Callable] = None,
custom_append_callback: Optional[Callable] = None
):
"""Initialize user aggregator params.
Args:
accumulate_text_parts: Accumulate text parts
expect_stripped_words: Words are whitespace-stripped
custom_accumulate_callback: Custom accumulation
custom_append_callback: Custom append logic
"""
pass
class LLMUserAggregator(LLMContextAggregator):
"""User turn aggregation.
Aggregates user input (transcriptions, text) into complete user messages.
Handles turn detection, message formatting, and context updates.
Flow:
1. Receives TranscriptionFrame, TextFrame, or InputTextRawFrame
2. Accumulates into complete user message
3. On turn end (UserStoppedSpeakingFrame), adds to context
4. Emits LLMMessagesAppendFrame
Args:
context: LLM context
params: Aggregator parameters
Example:
# Basic usage
user_agg = LLMUserAggregator(context=context)
# With custom parameters
user_agg = LLMUserAggregator(
context=context,
params=LLMUserAggregatorParams(
accumulate_text_parts=True
)
)
# In pipeline
pipeline = Pipeline([
transport.input(),
stt_service,
user_agg, # Aggregates user speech
llm_service,
tts_service,
transport.output()
])
"""
def __init__(
self,
context: Optional[LLMContext] = None,
params: Optional[LLMUserAggregatorParams] = None,
**kwargs
):
"""Initialize user aggregator.
Args:
context: LLM context
params: Aggregator parameters
**kwargs: Additional arguments
"""
pass
async def process_frame(self, frame, direction):
"""Aggregate user frames.
Processes:
- TranscriptionFrame: STT output
- TextFrame: Direct text input
- InputTextRawFrame: Raw text input
- UserStoppedSpeakingFrame: Triggers message append
Args:
frame: Frame to process
direction: Frame direction
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_context import LLMAssistantAggregator, LLMAssistantAggregatorParams
class LLMAssistantAggregatorParams:
"""Assistant aggregator configuration.
Attributes:
accumulate_text_parts (bool): Accumulate text before adding to context
expect_stripped_words (bool): Expect whitespace-stripped word frames
custom_accumulate_callback (Optional[Callable]): Custom accumulation logic
custom_append_callback (Optional[Callable]): Custom append logic
Example:
params = LLMAssistantAggregatorParams(
accumulate_text_parts=True
)
"""
def __init__(
self,
accumulate_text_parts: bool = True,
expect_stripped_words: bool = False,
custom_accumulate_callback: Optional[Callable] = None,
custom_append_callback: Optional[Callable] = None
):
"""Initialize assistant aggregator params.
Args:
accumulate_text_parts: Accumulate text parts
expect_stripped_words: Words are whitespace-stripped
custom_accumulate_callback: Custom accumulation
custom_append_callback: Custom append logic
"""
pass
class LLMAssistantAggregator(LLMContextAggregator):
"""Assistant turn aggregation.
Aggregates assistant output (LLM generated text, function calls) into
complete assistant messages. Maintains context for multi-turn conversations.
Flow:
1. Receives LLMTextFrame, FunctionCallResultFrame
2. Accumulates into complete assistant message
3. On completion, adds to context
4. Emits LLMMessagesAppendFrame
Args:
context: LLM context
params: Aggregator parameters
Example:
# Basic usage
assistant_agg = LLMAssistantAggregator(context=context)
# In pipeline
pipeline = Pipeline([
transport.input(),
stt_service,
user_agg,
llm_service,
assistant_agg, # Aggregates LLM output
tts_service,
transport.output()
])
"""
def __init__(
self,
context: Optional[LLMContext] = None,
params: Optional[LLMAssistantAggregatorParams] = None,
**kwargs
):
"""Initialize assistant aggregator.
Args:
context: LLM context
params: Aggregator parameters
**kwargs: Additional arguments
"""
pass
async def process_frame(self, frame, direction):
"""Aggregate assistant frames.
Processes:
- LLMTextFrame: LLM text output
- FunctionCallResultFrame: Function call results
- LLMFullResponseEndFrame: Triggers message append
Args:
frame: Frame to process
direction: Frame direction
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair
class LLMContextAggregatorPair:
"""Pair of user/assistant aggregators.
Convenience class that creates and manages both user and assistant
aggregators with shared context.
Attributes:
user: User aggregator
assistant: Assistant aggregator
context: Shared LLM context
Example:
# Create aggregator pair
context = LLMContext()
aggregators = LLMContextAggregatorPair(context=context)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt_service,
aggregators.user, # User aggregation
llm_service,
aggregators.assistant, # Assistant aggregation
tts_service,
transport.output()
])
# Access shared context
print(aggregators.context.get_messages())
"""
def __init__(
self,
context: Optional[LLMContext] = None,
user_params: Optional[LLMUserAggregatorParams] = None,
assistant_params: Optional[LLMAssistantAggregatorParams] = None
):
"""Initialize aggregator pair.
Args:
context: Shared LLM context
user_params: User aggregator parameters
assistant_params: Assistant aggregator parameters
"""
pass
@property
def user(self) -> LLMUserAggregator:
"""Get user aggregator."""
pass
@property
def assistant(self) -> LLMAssistantAggregator:
"""Get assistant aggregator."""
pass
@property
def context(self) -> LLMContext:
"""Get shared context."""
pass

Processors for aggregating complete LLM responses.
{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMFullResponseAggregator
class LLMFullResponseAggregator(FrameProcessor):
"""Aggregates full LLM response.
Accumulates all LLM output frames (text, function calls) into a
complete response. Emits aggregated response when complete.
Useful for:
- Getting complete LLM response text
- Processing response before TTS
- Analytics and logging
Example:
aggregator = LLMFullResponseAggregator()
@aggregator.event_handler("on_full_response")
async def handle_response(response: str):
print(f"Complete LLM response: {response}")
pipeline = Pipeline([
user_agg,
llm_service,
aggregator, # Captures full response
tts_service,
transport.output()
])
"""
def __init__(self, **kwargs):
"""Initialize full response aggregator.
Args:
**kwargs: Additional arguments
"""
pass
async def process_frame(self, frame, direction):
"""Aggregate response frames.
Processes:
- LLMFullResponseStartFrame: Start aggregation
- LLMTextFrame: Accumulate text
- LLMFullResponseEndFrame: Emit complete response
Args:
frame: Frame to process
direction: Frame direction
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
from pipecat.utils.text.base_text_aggregator import BaseTextAggregator
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator
class LLMTextProcessor(FrameProcessor):
"""A processor for handling or manipulating LLM text frames before they are processed further.
This processor will convert LLMTextFrames into AggregatedTextFrames based on the configured
text aggregator. Using the customizable aggregator, it provides functionality to handle or
manipulate LLM text frames before they are sent to other components such as TTS services or
context aggregators. It can be used to pre-aggregate and categorize, modify, or filter direct
output tokens from the LLM.
Processing Flow:
1. Receives LLMTextFrame from LLM service
2. Passes text through text aggregator
3. Emits AggregatedTextFrame when aggregation complete
4. Handles interruptions by resetting aggregator
5. Flushes remaining text on LLMFullResponseEndFrame
Frames Consumed:
- LLMTextFrame: Raw LLM text output
- LLMFullResponseEndFrame: Triggers flush
- InterruptionFrame: Resets aggregation
- EndFrame: Triggers flush
Frames Produced:
- AggregatedTextFrame: Aggregated text with type annotation
Args:
text_aggregator: An optional text aggregator to use for processing LLM text frames.
By default, a SimpleTextAggregator aggregating by sentence will be used
Example:
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator
# Use default sentence aggregation
processor = LLMTextProcessor()
# Custom aggregation strategy
custom_aggregator = SimpleTextAggregator(
aggregation_type="word",
min_words=5
)
processor = LLMTextProcessor(text_aggregator=custom_aggregator)
# Use in pipeline
pipeline = Pipeline([
user_aggregator,
llm_service, # Produces LLMTextFrame
processor, # Converts to AggregatedTextFrame
tts_service,
transport.output()
])
# With custom transformation
class EmojiTextAggregator(BaseTextAggregator):
async def aggregate(self, text: str):
# Add emojis to text
enhanced = text.replace("!", " 🎉!")
yield Aggregation(text=enhanced, type="enhanced")
emoji_processor = LLMTextProcessor(
text_aggregator=EmojiTextAggregator()
)
"""
def __init__(
self,
*,
text_aggregator: Optional[BaseTextAggregator] = None,
**kwargs
):
"""Initialize LLM text processor.
Args:
text_aggregator: An optional text aggregator to use for processing LLM text frames.
By default, a SimpleTextAggregator aggregating by sentence will be used
**kwargs: Additional arguments passed to parent class
"""
pass
async def reset(self):
"""Reset the internal state of the text processor and its aggregator."""
pass

Processors for controlled LLM context flow based on external conditions.
{ .api }
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
from pipecat.utils.sync.base_notifier import BaseNotifier
class GatedLLMContextAggregator(FrameProcessor):
"""Aggregator that gates LLM context frames until notified.
This aggregator captures LLM context frames and holds them until a notifier
signals that they can be released. This is useful for controlling the flow
of context frames based on external conditions or timing.
Use Cases:
- Wait for user confirmation before processing context
- Coordinate context updates with external events
- Implement manual approval flows
- Synchronize context with other pipeline stages
Frames Consumed:
- LLMContextFrame: Universal context frames (held until notified)
- OpenAILLMContextFrame: Legacy OpenAI context frames (held until notified)
- StartFrame: Begins gate task handler
- EndFrame/CancelFrame: Stops gate task handler
Frames Produced:
- LLMContextFrame/OpenAILLMContextFrame: Released when notifier signals
Args:
notifier: The notifier that controls when frames are released
start_open: If True, the first context frame passes through immediately
Example:
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
from pipecat.utils.sync.base_notifier import BaseNotifier
# Create notifier for manual control
notifier = BaseNotifier()
# Create gated aggregator
gated_context = GatedLLMContextAggregator(
notifier=notifier,
start_open=False
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
user_aggregator,
gated_context, # Holds context until notified
llm,
tts,
transport.output()
])
# Manually release context when ready
async def on_user_approval():
await notifier.notify() # Releases held context frame
# With automatic notification
@user_interface.event_handler("on_confirm")
async def handle_confirm():
await notifier.notify()
"""
def __init__(
self,
*,
notifier: BaseNotifier,
start_open: bool = False,
**kwargs
):
"""Initialize the gated context aggregator.
Args:
notifier: The notifier that controls when frames are released
start_open: If True, the first context frame passes through immediately
**kwargs: Additional arguments passed to the parent FrameProcessor
"""
pass

{ .api }
from pipecat.processors.aggregators.gated_open_ai_llm_context import GatedOpenAILLMContextAggregator
class GatedOpenAILLMContextAggregator(GatedLLMContextAggregator):
"""DEPRECATED: OpenAI-specific gated context aggregator.
Use GatedLLMContextAggregator instead for universal provider support.
This class is maintained for backward compatibility but should not be
used in new code.
Example:
# Deprecated
gated = GatedOpenAILLMContextAggregator(notifier=notifier)
# Use instead
from pipecat.processors.aggregators.gated_llm_context import GatedLLMContextAggregator
gated = GatedLLMContextAggregator(notifier=notifier)
"""
pass

These aggregators are deprecated but still supported for backward compatibility.
{ .api }
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
class OpenAILLMContext:
"""DEPRECATED: OpenAI-specific context.
Use LLMContext instead for universal provider support.
Legacy context format for OpenAI chat completions.
Migrating to LLMContext is recommended.
Example:
# Deprecated
context = OpenAILLMContext(messages=[...])
# Use instead
from pipecat.processors.aggregators.llm_context import LLMContext
context = LLMContext(messages=[...])
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMUserResponseAggregator
class LLMUserResponseAggregator(FrameProcessor):
"""DEPRECATED: OpenAI-specific user aggregation.
Use LLMUserAggregator instead for universal provider support.
Example:
# Deprecated
user_agg = LLMUserResponseAggregator(messages=messages)
# Use instead
from pipecat.processors.aggregators.llm_context import LLMUserAggregator
user_agg = LLMUserAggregator(context=context)
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_response_aggregator import LLMAssistantResponseAggregator
class LLMAssistantResponseAggregator(FrameProcessor):
"""DEPRECATED: OpenAI-specific assistant aggregation.
Use LLMAssistantAggregator instead for universal provider support.
Example:
# Deprecated
assistant_agg = LLMAssistantResponseAggregator(messages=messages)
# Use instead
from pipecat.processors.aggregators.llm_context import LLMAssistantAggregator
assistant_agg = LLMAssistantAggregator(context=context)
"""
pass

{ .api }
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextAggregatorPair
)
from pipecat.services.openai import OpenAILLMService
from pipecat.pipeline.pipeline import Pipeline
# Create context with system message
context = LLMContext(
messages=[
{
"role": "system",
"content": "You are a helpful voice assistant."
}
],
settings={
"temperature": 0.7,
"max_tokens": 1000
}
)
# Create aggregator pair
aggregators = LLMContextAggregatorPair(context=context)
# Create LLM service with context
llm_service = OpenAILLMService(api_key="key", model="gpt-4")
llm_service.set_context(context)
# Build pipeline
pipeline = Pipeline([
transport.input(),
stt_service,
aggregators.user, # User messages added to context
llm_service, # Uses context for generation
aggregators.assistant, # Assistant messages added to context
tts_service,
transport.output()
])

{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext
# Define function tools
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name"
},
"units": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["location"]
}
}
}
]
# Create context with tools
context = LLMContext(
messages=[
{"role": "system", "content": "You are a weather assistant."}
],
tools=tools
)
# Register function handler
async def get_weather(location: str, units: str = "celsius") -> dict:
return {"temp": 72, "units": units, "condition": "sunny"}
llm_service.register_function("get_weather", get_weather)
llm_service.set_context(context)
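Tool calls travel through the same universal message format. The sketch below is a hypothetical illustration of how a completed get_weather exchange could be recorded in the context; the Message Format section above only names the tool_calls and tool_call_id fields, so the exact shape of each tool_calls entry (OpenAI-style id/type/function) is an assumption here.

{ .api }
# Hypothetical sketch: recording a completed tool-call exchange in the context.
# The entry shape inside "tool_calls" follows the OpenAI-style convention and
# is an assumption; only the field names come from the Message Format above.
context.add_message({
    "role": "assistant",
    "content": "",
    "tool_calls": [
        {
            "id": "call_1",  # example ID
            "type": "function",
            "function": {
                "name": "get_weather",
                "arguments": '{"location": "San Francisco", "units": "fahrenheit"}'
            }
        }
    ]
})
# Tool result turn, linked back to the call via tool_call_id
context.add_message({
    "role": "tool",
    "tool_call_id": "call_1",
    "content": '{"temp": 72, "units": "fahrenheit", "condition": "sunny"}'
})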
{ .api }
from pipecat.processors.aggregators.llm_context import LLMContext
# Context automatically maintains conversation history
context = LLMContext(
messages=[
{"role": "system", "content": "You are helpful."}
]
)
# After user says "What's the weather?"
# Context becomes:
# [
# {"role": "system", "content": "You are helpful."},
# {"role": "user", "content": "What's the weather?"},
# {"role": "assistant", "content": "I need your location..."}
# ]
# After user says "San Francisco"
# Context becomes:
# [
# {"role": "system", "content": "You are helpful."},
# {"role": "user", "content": "What's the weather?"},
# {"role": "assistant", "content": "I need your location..."},
# {"role": "user", "content": "San Francisco"},
# {"role": "assistant", "content": "In SF it's 65°F and sunny."}
# ]
# Access conversation history
messages = context.get_messages()
print(f"Conversation has {len(messages)} messages"){ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame, LLMSetToolsFrame
# Add message programmatically
append_frame = LLMMessagesAppendFrame(
messages=[
{
"role": "system",
"content": "Additional context: User is in a hurry."
}
]
)
await task.queue_frame(append_frame)
# Update tools dynamically
new_tools = [
{
"type": "function",
"function": {
"name": "book_ride",
"description": "Book a ride",
"parameters": {...}
}
}
]
tools_frame = LLMSetToolsFrame(tools=new_tools)
await task.queue_frame(tools_frame)

{ .api }
from datetime import datetime

from pipecat.frames.frames import TranscriptionFrame
from pipecat.processors.aggregators.llm_context import (
LLMUserAggregator,
LLMUserAggregatorParams
)
# Custom accumulation callback
async def custom_accumulate(aggregator, frame):
"""Custom logic for accumulating frames."""
if isinstance(frame, TranscriptionFrame):
# Filter profanity
text = frame.text.replace("bad_word", "***")
aggregator._accumulate_text(text)
# Custom append callback
async def custom_append(aggregator):
"""Custom logic for appending to context."""
# Add timestamp to user messages
message = {
"role": "user",
"content": aggregator._accumulated_text,
"timestamp": datetime.now().isoformat()
}
aggregator._context.add_message(message)
# Create aggregator with custom callbacks
user_agg = LLMUserAggregator(
context=context,
params=LLMUserAggregatorParams(
custom_accumulate_callback=custom_accumulate,
custom_append_callback=custom_append
)
)

{ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameProcessor
class ContextMonitor(FrameProcessor):
"""Monitor context updates."""
def __init__(self, context: LLMContext, **kwargs):
super().__init__(**kwargs)
self._context = context
async def process_frame(self, frame, direction):
if isinstance(frame, LLMMessagesAppendFrame):
messages = self._context.get_messages()
print(f"Context updated: {len(messages)} messages")
# Log last message
if messages:
last = messages[-1]
print(f" Role: {last['role']}")
print(f" Content: {last['content'][:100]}...")
await self.push_frame(frame, direction)
# Add to pipeline
monitor = ContextMonitor(context=context)
pipeline = Pipeline([
user_agg,
monitor, # Monitor context changes
llm_service,
assistant_agg,
tts_service,
transport.output()
])

{ .api }
# Good: Universal context works with all providers
from pipecat.processors.aggregators.llm_context import LLMContext
context = LLMContext(messages=[...])
# Works with OpenAI
openai_llm = OpenAILLMService(api_key="key")
openai_llm.set_context(context)
# Works with Anthropic
anthropic_llm = AnthropicLLMService(api_key="key")
anthropic_llm.set_context(context)
# Bad: Provider-specific context
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
openai_context = OpenAILLMContext(messages=[...])
# Only works with OpenAI

{ .api }
# Good: Clear system message defines behavior
context = LLMContext(
messages=[
{
"role": "system",
"content": "You are a helpful voice assistant. Keep responses concise and natural for speech."
}
]
)
# Bad: No system message
context = LLMContext(messages=[])
# LLM behavior may be unpredictable

{ .api }
# Good: Aggregator pair ensures shared context
from pipecat.processors.aggregators.llm_context import LLMContextAggregatorPair
aggregators = LLMContextAggregatorPair(context=context)
pipeline = Pipeline([
aggregators.user,
llm_service,
aggregators.assistant
])
# Bad: Separate aggregators with different contexts
user_agg = LLMUserAggregator(context=context1)
assistant_agg = LLMAssistantAggregator(context=context2)
# Contexts not synchronized!

{ .api }
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameProcessor
class ContextTrimmer(FrameProcessor):
"""Trim context to prevent token limits."""
def __init__(self, context: LLMContext, max_messages: int = 20, **kwargs):
super().__init__(**kwargs)
self._context = context
self._max_messages = max_messages
async def process_frame(self, frame, direction):
if isinstance(frame, LLMMessagesAppendFrame):
messages = self._context.get_messages()
# Keep system message + recent messages
if len(messages) > self._max_messages:
system_msgs = [m for m in messages if m["role"] == "system"]
other_msgs = [m for m in messages if m["role"] != "system"]
trimmed = system_msgs + other_msgs[-(self._max_messages - len(system_msgs)):]
# Update context
self._context.messages = trimmed
print(f"Trimmed context: {len(messages)} -> {len(trimmed)} messages")
await self.push_frame(frame, direction)

{ .api }
import pytest

from pipecat.frames.frames import (
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators.llm_context import LLMContext, LLMUserAggregator
from pipecat.processors.frame_processor import FrameDirection

# Requires the pytest-asyncio plugin to run async tests
@pytest.mark.asyncio
async def test_user_aggregation():
"""Test user message aggregation."""
context = LLMContext()
user_agg = LLMUserAggregator(context=context)
# Start user speech
await user_agg.process_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
# Send transcription
await user_agg.process_frame(
TranscriptionFrame(text="Hello world"),
FrameDirection.DOWNSTREAM
)
# End user speech
await user_agg.process_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
# Verify context
messages = context.get_messages()
assert len(messages) == 1
assert messages[0]["role"] == "user"
assert messages[0]["content"] == "Hello world"