Framework Integration Processors

Pipecat provides integration processors for popular AI frameworks, including Langchain, Strands Agents, and the RTVI protocol, as well as a GStreamer media source. These processors let you incorporate existing framework code and media pipelines into Pipecat pipelines.

Langchain Integration

LangchainProcessor

{ .api }
from pipecat.processors.frameworks.langchain import LangchainProcessor

class LangchainProcessor(FrameProcessor):
    """Processor that integrates Langchain runnables with Pipecat's frame pipeline.

    This processor takes LLM message frames, extracts the latest user message,
    and processes it through a Langchain runnable chain. The response is streamed
    back as text frames with appropriate response markers.

    Features:
        - Seamless integration with Langchain LCEL chains
        - Streaming response support
        - Session management via participant IDs
        - Compatible with any Langchain runnable

    Frames Consumed:
        - LLMContextFrame: Universal context with messages
        - OpenAILLMContextFrame: Legacy OpenAI context

    Frames Produced:
        - LLMFullResponseStartFrame: Marks response start
        - TextFrame: Streaming response tokens
        - LLMFullResponseEndFrame: Marks response end

    Example:
        from langchain_core.prompts import ChatPromptTemplate
        from langchain_openai import ChatOpenAI
        from pipecat.processors.frameworks.langchain import LangchainProcessor

        # Create Langchain chain
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant."),
            ("human", "{input}")
        ])
        llm = ChatOpenAI(model="gpt-4")
        chain = prompt | llm

        # Create processor
        langchain = LangchainProcessor(
            chain=chain,
            transcript_key="input"
        )

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),
            langchain,  # Langchain processing
            tts,
            transport.output()
        ])
    """

    def __init__(self, chain: Runnable, transcript_key: str = "input"):
        """Initialize the Langchain processor.

        Args:
            chain: The Langchain runnable to use for processing messages
            transcript_key: The key to use when passing input to the chain. Defaults to "input"
        """
        pass

    def set_participant_id(self, participant_id: str):
        """Set the participant ID for session tracking.

        Args:
            participant_id: The participant ID to use for session configuration
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle LLM message frames.

        Args:
            frame: The incoming frame to process
            direction: The direction of frame flow in the pipeline
        """
        pass
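
In practice the participant ID is usually captured from a transport event once the remote user joins, so that Langchain session history is keyed per user. A minimal sketch, assuming a transport that emits an on_first_participant_joined event whose participant dict contains an "id" key:

{ .api }
# Hypothetical wiring: key Langchain session history by the joining participant.
langchain = LangchainProcessor(chain=chain, transcript_key="input")

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    langchain.set_participant_id(participant["id"])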

Strands Agents Integration

StrandsAgentsProcessor

{ .api }
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor

class StrandsAgentsProcessor(FrameProcessor):
    """Processor that integrates Strands Agents with Pipecat's frame pipeline.

    This processor takes LLM message frames, extracts the latest user message,
    and processes it through either a single Strands Agent or a multi-agent Graph.
    The response is streamed back as text frames with appropriate response markers.

    Features:
        - Single agent streaming support
        - Multi-agent graph workflows
        - Automatic token usage metrics
        - TTFB (Time to First Byte) tracking

    Supports both single agent streaming and graph-based multi-agent workflows.

    Frames Consumed:
        - LLMContextFrame: Universal context with messages
        - OpenAILLMContextFrame: Legacy OpenAI context

    Frames Produced:
        - LLMFullResponseStartFrame: Marks response start
        - LLMTextFrame: Streaming response tokens
        - LLMFullResponseEndFrame: Marks response end
        - MetricsFrame: Token usage metrics

    Example:
        from strands import Agent
        from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor

        # Single agent example
        agent = Agent(
            name="assistant",
            model="anthropic:claude-3-5-sonnet-20241022",
            instructions="You are a helpful assistant."
        )

        processor = StrandsAgentsProcessor(agent=agent)

        # Multi-agent graph example
        from strands.multiagent.graph import Graph

        graph = Graph()
        # ... configure graph ...

        processor = StrandsAgentsProcessor(
            graph=graph,
            graph_exit_node="final_agent"
        )

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            stt,
            context_aggregator.user(),
            processor,  # Strands processing
            tts,
            transport.output()
        ])
    """

    def __init__(
        self,
        agent: Optional[Agent] = None,
        graph: Optional[Graph] = None,
        graph_exit_node: Optional[str] = None,
    ):
        """Initialize the Strands Agents processor.

        Args:
            agent: The Strands Agent to use for single-agent processing
            graph: The Strands multi-agent Graph to use for graph-based processing
            graph_exit_node: The exit node name when using graph-based processing

        Raises:
            AssertionError: If neither agent nor graph is provided, or if graph is
                          provided without a graph_exit_node
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle LLM message frames.

        Args:
            frame: The incoming frame to process
            direction: The direction of frame flow in the pipeline
        """
        pass

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate performance metrics.

        Returns:
            True as this service supports metrics generation
        """
        pass

RTVI Protocol Integration

RTVIProcessor

{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessor

class RTVIProcessor(FrameProcessor):
    """RTVI (Real-Time Voice Interface) protocol implementation for Pipecat.

    This processor implements the RTVI protocol for real-time voice interactions
    between clients and AI agents. It handles bidirectional messaging, action
    processing, and protocol-level events.

    RTVI Protocol Features:
        - Real-time bidirectional messaging
        - Action/response pattern
        - Client configuration messages
        - Bot state notifications
        - Audio level monitoring
        - Metrics reporting
        - Transport message handling

    Protocol Version: 1.1.0

    Message Types:
        - Client messages: Actions and configuration from client
        - Server messages: Responses and state updates to client
        - Bot messages: Bot state changes (speaking, stopped, etc.)
        - Error messages: Protocol and processing errors

    Frames Consumed:
        - InputTransportMessageFrame: Client messages
        - BotStartedSpeakingFrame: Bot speech start
        - BotStoppedSpeakingFrame: Bot speech stop
        - UserStartedSpeakingFrame: User speech start
        - UserStoppedSpeakingFrame: User speech stop
        - TranscriptionFrame: Speech transcriptions
        - LLMTextFrame: LLM responses
        - TTSAudioRawFrame: TTS audio output
        - MetricsFrame: Performance metrics

    Frames Produced:
        - OutputTransportMessageUrgentFrame: Protocol messages to client
        - LLMMessagesAppendFrame: Context updates
        - LLMConfigureOutputFrame: LLM configuration

    Example:
        from pipecat.processors.frameworks.rtvi import RTVIProcessor
        from pipecat.transports.daily import DailyTransport

        # Create RTVI processor
        rtvi = RTVIProcessor()

        # Register custom message handlers
        @rtvi.register_action("custom_action")
        async def handle_custom_action(processor, action):
            # Handle custom action
            return {"status": "success"}

        # Use in pipeline
        pipeline = Pipeline([
            transport.input(),
            rtvi,  # RTVI protocol handling
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output()
        ])

    Reference:
        RTVI Protocol Specification: https://docs.rtvi.ai
    """

    def __init__(
        self,
        *,
        params: Optional["RTVIProcessorParams"] = None,
        **kwargs
    ):
        """Initialize the RTVI processor.

        Args:
            params: RTVI processor configuration parameters
            **kwargs: Additional processor arguments
        """
        pass

    def register_action(self, action_name: str):
        """Register a custom action handler.

        Args:
            action_name: Name of the action to handle

        Returns:
            Decorator function for the action handler

        Example:
            @rtvi.register_action("get_weather")
            async def get_weather(processor, action):
                city = action.arguments.get("city")
                return {"temperature": 72, "condition": "sunny"}
        """
        pass

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle RTVI protocol messages.

        Args:
            frame: The incoming frame to process
            direction: The direction of frame flow in the pipeline
        """
        pass

RTVIProcessorParams

{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessorParams

class RTVIProcessorParams(BaseModel):
    """Configuration parameters for RTVI processor.

    Parameters:
        enable_metrics: Enable automatic metrics reporting. Defaults to True
        enable_audio_volume: Enable audio volume level reporting. Defaults to True
        enable_transcriptions: Enable transcription messages. Defaults to True
    """

    enable_metrics: bool = True
    enable_audio_volume: bool = True
    enable_transcriptions: bool = True
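
These parameters are passed to the processor at construction time. A short sketch using the fields documented above (values chosen purely for illustration):

{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessor, RTVIProcessorParams

# Keep metrics and transcriptions, but turn off audio volume reporting.
rtvi = RTVIProcessor(
    params=RTVIProcessorParams(
        enable_metrics=True,
        enable_audio_volume=False,
        enable_transcriptions=True,
    )
)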

GStreamer Integration

GStreamerPipelineSource

{ .api }
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource

class OutputParams(BaseModel):
    """Output configuration parameters for GStreamer pipeline.

    Parameters:
        video_width: Width of output video frames in pixels (default: 1280)
        video_height: Height of output video frames in pixels (default: 720)
        audio_sample_rate: Sample rate for audio output. If None, uses frame sample rate
        audio_channels: Number of audio channels for output (default: 1)
        clock_sync: Whether to synchronize output with pipeline clock (default: True)
    """

    video_width: int = 1280
    video_height: int = 720
    audio_sample_rate: Optional[int] = None
    audio_channels: int = 1
    clock_sync: bool = True


class GStreamerPipelineSource(FrameProcessor):
    """A frame processor that uses GStreamer pipelines as media sources.

    This processor creates and manages GStreamer pipelines to generate audio and video
    output frames. It handles pipeline lifecycle, decoding, format conversion, and
    frame generation with configurable output parameters.

    Features:
        - Audio and video pipeline support
        - Custom GStreamer pipeline definitions
        - Automatic decoding with decodebin
        - Format conversion and resampling
        - Clock synchronization
        - Frame generation from GStreamer samples

    Pipeline Lifecycle:
        1. Initialize GStreamer pipeline with description string
        2. On StartFrame: Start pipeline playback
        3. Generate OutputAudioRawFrame and OutputImageRawFrame
        4. On EndFrame: Stop pipeline
        5. On CancelFrame: Cancel pipeline

    Frames Consumed:
        - StartFrame: Starts GStreamer pipeline
        - EndFrame: Stops GStreamer pipeline
        - CancelFrame: Cancels GStreamer pipeline

    Frames Produced:
        - OutputAudioRawFrame: Audio samples from pipeline
        - OutputImageRawFrame: Video frames from pipeline

    Requirements:
        - GStreamer 1.0 installed on system
        - pipecat-ai[gstreamer] package installed

    Example:
        from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource

        # Create GStreamer audio source
        gstreamer = GStreamerPipelineSource(
            pipeline="audiotestsrc wave=sine freq=440 ! audioconvert",
            out_params=GStreamerPipelineSource.OutputParams(
                audio_sample_rate=16000,
                audio_channels=1
            )
        )

        # Create video file source
        video_source = GStreamerPipelineSource(
            pipeline="filesrc location=video.mp4 ! qtdemux name=demux",
            out_params=GStreamerPipelineSource.OutputParams(
                video_width=1920,
                video_height=1080,
                audio_sample_rate=16000,
                clock_sync=True
            )
        )

        # RTSP camera source
        rtsp_source = GStreamerPipelineSource(
            pipeline="rtspsrc location=rtsp://camera:554/stream ! rtph264depay ! h264parse",
            out_params=GStreamerPipelineSource.OutputParams(
                video_width=1280,
                video_height=720
            )
        )

        # Use in pipeline
        pipeline = Pipeline([
            gstreamer,  # GStreamer audio/video source
            stt,
            llm,
            tts,
            transport.output()
        ])
    """

    def __init__(
        self,
        *,
        pipeline: str,
        out_params: Optional[OutputParams] = None,
        **kwargs
    ):
        """Initialize GStreamer pipeline source.

        Args:
            pipeline: GStreamer pipeline description string for the source
            out_params: Output configuration parameters. If None, uses defaults
            **kwargs: Additional processor arguments
        """
        pass

Usage Patterns

Langchain with Memory

{ .api }
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

# Create chain with memory
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

llm = ChatOpenAI(model="gpt-4")
chain = prompt | llm

# Wrap with message history
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

# Create processor
langchain = LangchainProcessor(
    chain=chain_with_history,
    transcript_key="input"
)

# Set participant ID for session
langchain.set_participant_id("user_123")
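
The chain above relies on a get_session_history callable that maps a session ID to a chat history store. A minimal in-memory sketch (the store and helper are illustrative, not part of Pipecat):

{ .api }
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory

# Illustrative per-session store; the session ID comes from the participant ID
# that LangchainProcessor forwards to RunnableWithMessageHistory.
_session_store: dict[str, BaseChatMessageHistory] = {}

def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in _session_store:
        _session_store[session_id] = InMemoryChatMessageHistory()
    return _session_store[session_id]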

Strands Multi-Agent Workflow

{ .api }
from strands import Agent
from strands.multiagent.graph import Graph

# Create agents
researcher = Agent(
    name="researcher",
    model="anthropic:claude-3-5-sonnet-20241022",
    instructions="Research and gather information."
)

writer = Agent(
    name="writer",
    model="anthropic:claude-3-5-sonnet-20241022",
    instructions="Write content based on research."
)

# Create graph
graph = Graph()
graph.add_edge(researcher, writer)

# Create processor
processor = StrandsAgentsProcessor(
    graph=graph,
    graph_exit_node="writer"
)

# Use in pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    processor,
    tts,
    transport.output()
])

RTVI with Custom Actions

{ .api }
from pipecat.processors.frameworks.rtvi import RTVIProcessor

rtvi = RTVIProcessor()

# Register custom actions
@rtvi.register_action("get_weather")
async def get_weather(processor, action):
    """Get weather for a city."""
    city = action.arguments.get("city", "San Francisco")
    # Call weather API
    return {
        "city": city,
        "temperature": 72,
        "condition": "sunny"
    }

@rtvi.register_action("set_reminder")
async def set_reminder(processor, action):
    """Set a reminder."""
    time = action.arguments.get("time")
    message = action.arguments.get("message")
    # Store reminder
    return {
        "status": "created",
        "time": time,
        "message": message
    }

# Use in pipeline with RTVI-capable transport
pipeline = Pipeline([
    daily_transport.input(),
    rtvi,
    stt,
    context_aggregator.user(),
    llm,
    tts,
    daily_transport.output()
])

GStreamer Audio Processing

{ .api }
from pipecat.processors.gstreamer.pipeline_source import GStreamerPipelineSource

# Create audio source with effects
gstreamer = GStreamerPipelineSource(
    pipeline="""
        audiotestsrc wave=sine freq=440 !
        audioconvert !
        audioresample !
        audio/x-raw,format=S16LE,rate=16000,channels=1
    """
)

# Or use file source
gstreamer_file = GStreamerPipelineSource(
    pipeline="""
        filesrc location=audio.wav !
        decodebin !
        audioconvert !
        audioresample !
        audio/x-raw,format=S16LE,rate=16000,channels=1
    """
)

pipeline = Pipeline([
    gstreamer,
    stt,
    llm,
    tts,
    transport.output()
])

Best Practices

Choose the Right Integration

{ .api }
# Good: Use Langchain for existing LCEL chains
if you_have_langchain_chain:
    processor = LangchainProcessor(chain=chain)

# Good: Use Strands for multi-agent workflows
if you_need_multiple_agents:
    processor = StrandsAgentsProcessor(graph=graph, graph_exit_node="final")

# Good: Use RTVI for real-time voice interfaces
if you_need_rtvi_protocol:
    processor = RTVIProcessor()

# Bad: Don't use framework integration if not needed
# Just use LLMService directly for simple cases
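
For the simple case, a plain LLM service can sit in the pipeline directly with no framework processor. A rough sketch, assuming an OpenAI LLM service importable from pipecat.services.openai.llm (adjust to your installed version):

{ .api }
from pipecat.services.openai.llm import OpenAILLMService

# Simple case: no framework integration, just a plain LLM service.
llm = OpenAILLMService(model="gpt-4o")

pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    tts,
    transport.output()
])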

Handle Framework-Specific Errors

{ .api }
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.frameworks.langchain import LangchainProcessor

langchain = LangchainProcessor(chain=chain)

# Good: Catch framework-specific errors
try:
    await langchain.process_frame(context_frame, FrameDirection.DOWNSTREAM)
except Exception as e:
    logger.error(f"Langchain error: {e}")
    # Handle error appropriately

# Good: Use processor error handling
langchain.on_error = lambda error: handle_error(error)
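
Langchain runnables also support retry wrappers that can reduce the need for manual exception handling. A sketch assuming langchain_core's Runnable.with_retry helper:

{ .api }
# Hypothetical resilience wrapper: retry transient chain failures before
# the error ever reaches the Pipecat pipeline.
resilient_chain = chain.with_retry(stop_after_attempt=3)

langchain = LangchainProcessor(chain=resilient_chain)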

Manage Session State

{ .api }
# Good: Set session identifiers for stateful frameworks
langchain.set_participant_id(participant_id)

# Good: Use Langchain's built-in session management
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

# Bad: Don't mix session management approaches
# Pick one: framework-native or Pipecat context

Monitor Metrics

{ .api }
# Good: Enable metrics for monitoring
from pipecat.processors.frameworks.strands_agents import StrandsAgentsProcessor

processor = StrandsAgentsProcessor(agent=agent)

if processor.can_generate_metrics():
    # Metrics will be automatically collected
    pass

# Good: Handle metrics frames
@pipeline.event_handler("on_metrics")
async def handle_metrics(metrics_frame):
    if isinstance(metrics_frame.data, LLMUsageMetricsData):
        print(f"Tokens: {metrics_frame.data.tokens.total_tokens}")

# Bad: Don't ignore metrics
# They provide valuable performance insights
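
To observe token usage inside the pipeline itself, a small pass-through processor can inspect MetricsFrame instances. A sketch assuming MetricsFrame.data is a list of metrics data objects and LLMUsageMetricsData wraps a token-usage value:

{ .api }
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.metrics.metrics import LLMUsageMetricsData
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class MetricsLogger(FrameProcessor):
    """Illustrative pass-through processor that logs LLM token usage."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            for data in frame.data:
                if isinstance(data, LLMUsageMetricsData):
                    print(f"Tokens: {data.value.total_tokens}")
        await self.push_frame(frame, direction)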

Test Framework Integrations

{ .api }
from pipecat.processors.frame_processor import FrameDirection

# Good: Test with mock data
async def test_langchain_integration():
    processor = LangchainProcessor(chain=test_chain)

    # Send test frame
    context = LLMContext([{"role": "user", "content": "Hello"}])
    frame = LLMContextFrame(context=context)

    # process_frame() pushes response frames downstream rather than returning them
    await processor.process_frame(frame, FrameDirection.DOWNSTREAM)

# Good: Test error handling
async def test_error_handling():
    processor = LangchainProcessor(chain=failing_chain)

    try:
        await processor.process_frame(bad_frame, FrameDirection.DOWNSTREAM)
    except Exception as e:
        # Verify error is handled gracefully
        assert "error" in str(e).lower()