tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents, with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Pipecat provides integration processors for popular AI frameworks including Langchain, Strands Agents, and RTVI protocol. These processors allow you to incorporate existing framework code into Pipecat pipelines.
{ .api }
from pipecat.processors.frameworks.langchain import LangchainProcessor
class LangchainProcessor(FrameProcessor):
"""Processor that integrates Langchain runnables with Pipecat's frame pipeline.
This processor takes LLM message frames, extracts the latest user message,
and processes it through a Langchain runnable chain. The response is streamed
back as text frames with appropriate response markers.
Features:
- Seamless integration with Langchain LCEL chains
- Streaming response support
- Session management via participant IDs
- Compatible with any Langchain runnable
Frames Consumed:
- LLMContextFrame: Universal context with messages
- OpenAILLMContextFrame: Legacy OpenAI context
Frames Produced:
- LLMFullResponseStartFrame: Marks response start
- TextFrame: Streaming response tokens
- LLMFullResponseEndFrame: Marks response end
Example:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pipecat.processors.frameworks.langchain import LangchainProcessor
# Create Langchain chain
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "{input}")
])
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm
# Create processor
langchain = LangchainProcessor(
chain=chain,
transcript_key="input"
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
langchain, # Langchain processing
tts,
transport.output()
])
"""
def __init__(self, chain: Runnable, transcript_key: str = "input"):
"""Initialize the Langchain processor.
Args:
chain: The Langchain runnable to use for processing messages
transcript_key: The key to use when passing input to the chain. Defaults to "input"
"""
pass
def set_participant_id(self, participant_id: str):
"""Set the participant ID for session tracking.
Args:
participant_id: The participant ID to use for session configuration
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle LLM message frames.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
pass
{ .api }
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor
class StrandsAgentsProcessor(FrameProcessor):
"""Processor that integrates Strands Agents with Pipecat's frame pipeline.
This processor takes LLM message frames, extracts the latest user message,
and processes it through either a single Strands Agent or a multi-agent Graph.
The response is streamed back as text frames with appropriate response markers.
Features:
- Single agent streaming support
- Multi-agent graph workflows
- Automatic token usage metrics
- TTFB (Time to First Byte) tracking
Supports both single agent streaming and graph-based multi-agent workflows.
Frames Consumed:
- LLMContextFrame: Universal context with messages
- OpenAILLMContextFrame: Legacy OpenAI context
Frames Produced:
- LLMFullResponseStartFrame: Marks response start
- LLMTextFrame: Streaming response tokens
- LLMFullResponseEndFrame: Marks response end
- MetricsFrame: Token usage metrics
Example:
from strands import Agent
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor
# Single agent example
agent = Agent(
name="assistant",
model="anthropic:claude-3-5-sonnet-20241022",
instructions="You are a helpful assistant."
)
processor = StrandsAgentsProcessor(agent=agent)
# Multi-agent graph example
from strands.multiagent.graph import Graph
graph = Graph()
# ... configure graph ...
processor = StrandsAgentsProcessor(
graph=graph,
graph_exit_node="final_agent"
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
processor, # Strands processing
tts,
transport.output()
])
"""
def __init__(
self,
agent: Optional[Agent] = None,
graph: Optional[Graph] = None,
graph_exit_node: Optional[str] = None,
):
"""Initialize the Strands Agents processor.
Args:
agent: The Strands Agent to use for single-agent processing
graph: The Strands multi-agent Graph to use for graph-based processing
graph_exit_node: The exit node name when using graph-based processing
Raises:
AssertionError: If neither agent nor graph is provided, or if graph is
provided without a graph_exit_node
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle LLM message frames.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
pass
def can_generate_metrics(self) -> bool:
"""Check if this service can generate performance metrics.
Returns:
True as this service supports metrics generation
"""
pass
{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessor
class RTVIProcessor(FrameProcessor):
"""RTVI (Real-Time Voice Interface) protocol implementation for Pipecat.
This processor implements the RTVI protocol for real-time voice interactions
between clients and AI agents. It handles bidirectional messaging, action
processing, and protocol-level events.
RTVI Protocol Features:
- Real-time bidirectional messaging
- Action/response pattern
- Client configuration messages
- Bot state notifications
- Audio level monitoring
- Metrics reporting
- Transport message handling
Protocol Version: 1.1.0
Message Types:
- Client messages: Actions and configuration from client
- Server messages: Responses and state updates to client
- Bot messages: Bot state changes (speaking, stopped, etc.)
- Error messages: Protocol and processing errors
Frames Consumed:
- InputTransportMessageFrame: Client messages
- BotStartedSpeakingFrame: Bot speech start
- BotStoppedSpeakingFrame: Bot speech stop
- UserStartedSpeakingFrame: User speech start
- UserStoppedSpeakingFrame: User speech stop
- TranscriptionFrame: Speech transcriptions
- LLMTextFrame: LLM responses
- TTSAudioRawFrame: TTS audio output
- MetricsFrame: Performance metrics
Frames Produced:
- OutputTransportMessageUrgentFrame: Protocol messages to client
- LLMMessagesAppendFrame: Context updates
- LLMConfigureOutputFrame: LLM configuration
Example:
from pipecat.processors.frameworks.rtvi import RTVIProcessor
from pipecat.transports.daily import DailyTransport
# Create RTVI processor
rtvi = RTVIProcessor()
# Register custom message handlers
@rtvi.register_action("custom_action")
async def handle_custom_action(processor, action):
# Handle custom action
return {"status": "success"}
# Use in pipeline
pipeline = Pipeline([
transport.input(),
rtvi, # RTVI protocol handling
stt,
context_aggregator.user(),
llm,
tts,
transport.output()
])
Reference:
RTVI Protocol Specification: https://docs.rtvi.ai
"""
def __init__(
self,
*,
params: Optional["RTVIProcessorParams"] = None,
**kwargs
):
"""Initialize the RTVI processor.
Args:
params: RTVI processor configuration parameters
**kwargs: Additional processor arguments
"""
pass
def register_action(self, action_name: str):
"""Register a custom action handler.
Args:
action_name: Name of the action to handle
Returns:
Decorator function for the action handler
Example:
@rtvi.register_action("get_weather")
async def get_weather(processor, action):
city = action.arguments.get("city")
return {"temperature": 72, "condition": "sunny"}
"""
pass
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and handle RTVI protocol messages.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
pass
{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessorParams
class RTVIProcessorParams(BaseModel):
"""Configuration parameters for RTVI processor.
Parameters:
enable_metrics: Enable automatic metrics reporting. Defaults to True
enable_audio_volume: Enable audio volume level reporting. Defaults to True
enable_transcriptions: Enable transcription messages. Defaults to True
"""
enable_metrics: bool = True
enable_audio_volume: bool = True
enable_transcriptions: bool = True
{ .api }
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
class OutputParams(BaseModel):
"""Output configuration parameters for GStreamer pipeline.
Parameters:
video_width: Width of output video frames in pixels (default: 1280)
video_height: Height of output video frames in pixels (default: 720)
audio_sample_rate: Sample rate for audio output. If None, uses frame sample rate
audio_channels: Number of audio channels for output (default: 1)
clock_sync: Whether to synchronize output with pipeline clock (default: True)
"""
video_width: int = 1280
video_height: int = 720
audio_sample_rate: Optional[int] = None
audio_channels: int = 1
clock_sync: bool = True
class GStreamerPipelineSource(FrameProcessor):
"""A frame processor that uses GStreamer pipelines as media sources.
This processor creates and manages GStreamer pipelines to generate audio and video
output frames. It handles pipeline lifecycle, decoding, format conversion, and
frame generation with configurable output parameters.
Features:
- Audio and video pipeline support
- Custom GStreamer pipeline definitions
- Automatic decoding with decodebin
- Format conversion and resampling
- Clock synchronization
- Frame generation from GStreamer samples
Pipeline Lifecycle:
1. Initialize GStreamer pipeline with description string
2. On StartFrame: Start pipeline playback
3. Generate OutputAudioRawFrame and OutputImageRawFrame
4. On EndFrame: Stop pipeline
5. On CancelFrame: Cancel pipeline
Frames Consumed:
- StartFrame: Starts GStreamer pipeline
- EndFrame: Stops GStreamer pipeline
- CancelFrame: Cancels GStreamer pipeline
Frames Produced:
- OutputAudioRawFrame: Audio samples from pipeline
- OutputImageRawFrame: Video frames from pipeline
Requirements:
- GStreamer 1.0 installed on system
- pipecat-ai[gstreamer] package installed
Example:
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
# Create GStreamer audio source
gstreamer = GStreamerPipelineSource(
pipeline="audiotestsrc wave=sine freq=440 ! audioconvert",
out_params=GStreamerPipelineSource.OutputParams(
audio_sample_rate=16000,
audio_channels=1
)
)
# Create video file source
video_source = GStreamerPipelineSource(
pipeline="filesrc location=video.mp4 ! qtdemux name=demux",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1920,
video_height=1080,
audio_sample_rate=16000,
clock_sync=True
)
)
# RTSP camera source
rtsp_source = GStreamerPipelineSource(
pipeline="rtspsrc location=rtsp://camera:554/stream ! rtph264depay ! h264parse",
out_params=GStreamerPipelineSource.OutputParams(
video_width=1280,
video_height=720
)
)
# Use in pipeline
pipeline = Pipeline([
gstreamer, # GStreamer audio/video source
stt,
llm,
tts,
transport.output()
])
"""
def __init__(
self,
*,
pipeline: str,
out_params: Optional[OutputParams] = None,
**kwargs
):
"""Initialize GStreamer pipeline source.
Args:
pipeline: GStreamer pipeline description string for the source
out_params: Output configuration parameters. If None, uses defaults
**kwargs: Additional processor arguments
"""
pass
{ .api }
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
# Create chain with memory
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}")
])
llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm
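# The wrapper below needs a get_session_history callable; here is a minimal
# in-memory sketch (InMemoryChatMessageHistory ships with recent langchain_core
# releases).
from langchain_core.chat_history import InMemoryChatMessageHistory

store = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    # Keep one history object per session/participant ID
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]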
# Wrap with message history
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history",
)
# Create processor
langchain = LangchainProcessor(
chain=chain_with_history,
transcript_key="input"
)
# Set participant ID for session
langchain.set_participant_id("user_123")
{ .api }
from strands import Agent
from strands.multiagent.graph import Graph
# Create agents
researcher = Agent(
name="researcher",
model="anthropic:claude-3-5-sonnet-20241022",
instructions="Research and gather information."
)
writer = Agent(
name="writer",
model="anthropic:claude-3-5-sonnet-20241022",
instructions="Write content based on research."
)
# Create graph
graph = Graph()
graph.add_edge(researcher, writer)
# Create processor
processor = StrandsAgentsProcessor(
graph=graph,
graph_exit_node="writer"
)
# Use in pipeline
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
processor,
tts,
transport.output()
])
{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessor
rtvi = RTVIProcessor()
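# Alternatively, protocol features can be tuned through RTVIProcessorParams
# (documented above); a sketch using the documented fields:
from pipecat.processors.frameworks.rtvi import RTVIProcessorParams

rtvi_quiet = RTVIProcessor(
    params=RTVIProcessorParams(enable_audio_volume=False)  # disable volume reporting
)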
# Register custom actions
@rtvi.register_action("get_weather")
async def get_weather(processor, action):
"""Get weather for a city."""
city = action.arguments.get("city", "San Francisco")
# Call weather API
return {
"city": city,
"temperature": 72,
"condition": "sunny"
}
@rtvi.register_action("set_reminder")
async def set_reminder(processor, action):
"""Set a reminder."""
time = action.arguments.get("time")
message = action.arguments.get("message")
# Store reminder
return {
"status": "created",
"time": time,
"message": message
}
# Use in pipeline with RTVI-capable transport
pipeline = Pipeline([
daily_transport.input(),
rtvi,
stt,
context_aggregator.user(),
llm,
tts,
daily_transport.output()
])
{ .api }
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource
# Create audio source with effects
gstreamer = GStreamerPipelineSource(
pipeline="""
audiotestsrc wave=sine freq=440 !
audioconvert !
audioresample !
audio/x-raw,format=S16LE,rate=16000,channels=1
"""
)
# Or use file source
gstreamer_file = GStreamerPipelineSource(
pipeline="""
filesrc location=audio.wav !
decodebin !
audioconvert !
audioresample !
audio/x-raw,format=S16LE,rate=16000,channels=1
"""
)
pipeline = Pipeline([
gstreamer,
stt,
llm,
tts,
transport.output()
])
{ .api }
# Good: Use Langchain for existing LCEL chains
if you_have_langchain_chain:
processor = LangchainProcessor(chain=chain)
# Good: Use Strands for multi-agent workflows
if you_need_multiple_agents:
processor = StrandsAgentsProcessor(graph=graph, graph_exit_node="final")
# Good: Use RTVI for real-time voice interfaces
if you_need_rtvi_protocol:
processor = RTVIProcessor()
# Bad: Don't use framework integration if not needed
# Just use LLMService directly for simple cases
{ .api }
from pipecat.processors.frameworks.langchain import LangchainProcessor
langchain = LangchainProcessor(chain=chain)
# Good: Catch framework-specific errors
try:
await langchain.process_frame(context_frame, FrameDirection.DOWNSTREAM)
except Exception as e:
logger.error(f"Langchain error: {e}")
# Handle error appropriately
# Good: Use processor error handling
langchain.on_error = lambda error: handle_error(error)
{ .api }
# Good: Set session identifiers for stateful frameworks
langchain.set_participant_id(participant_id)
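# One way to obtain the participant ID is from a transport event; a sketch
# assuming a Daily transport that emits "on_first_participant_joined":
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    langchain.set_participant_id(participant["id"])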
# Good: Use Langchain's built-in session management
chain_with_history = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history",
)
# Bad: Don't mix session management approaches
# Pick one: framework-native or Pipecat context
{ .api }
# Good: Enable metrics for monitoring
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor
processor = StrandsAgentsProcessor(agent=agent)
if processor.can_generate_metrics():
# Metrics will be automatically collected
pass
# Good: Handle metrics frames
@pipeline.event_handler("on_metrics")
async def handle_metrics(metrics_frame):
if isinstance(metrics_frame.data, LLMUsageMetricsData):
print(f"Tokens: {metrics_frame.data.tokens.total_tokens}")
# Bad: Don't ignore metrics
# They provide valuable performance insights
{ .api }
# Good: Test with mock data
async def test_langchain_integration():
processor = LangchainProcessor(chain=test_chain)
# Send test frame
context = LLMContext([{"role": "user", "content": "Hello"}])
frame = LLMContextFrame(context=context)
result = await processor.process_frame(frame, FrameDirection.DOWNSTREAM)
assert result is not None
# Good: Test error handling
async def test_error_handling():
processor = LangchainProcessor(chain=failing_chain)
try:
await processor.process_frame(bad_frame, FrameDirection.DOWNSTREAM)
except Exception as e:
# Verify error is handled gracefully
assert "error" in str(e).lower()