or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/pipecat-ai@0.0.x

docs

core-concepts.md · index.md · pipeline.md · runner.md · transports.md · turns.md
tile.json

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

docs/transports.md

Transports

Transports provide real-time audio/video communication for Pipecat pipelines. They handle network I/O, media streaming, and integrate with WebRTC, WebSocket, and local audio systems.

Base Transport

BaseTransport

{ .api }
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.turn_detection import BaseTurnAnalyzer
from typing import Optional

class BaseTransport(BaseObject):
    """Abstract base class for all transports.

    Provides the interface for audio/video I/O, connection management,
    and integration with pipelines.

    Key Features:
    - Audio input/output streaming
    - Video input/output (optional)
    - VAD integration
    - Turn detection
    - Message passing

    Methods:
        start(): Start transport
        stop(): Stop transport
        send_audio(audio): Send audio
        send_image(image): Send image/video
        send_message(message): Send data message
        input(): Get input processor
        output(): Get output processor

    The lifecycle and ``send_*`` methods defined here only raise
    ``NotImplementedError``; concrete transports must override them. The
    input/output processors start out as ``None`` and are expected to be
    assigned by the concrete transport.

    Example:
        transport = SomeTransport(
            params=TransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True
            )
        )

        pipeline = Pipeline([
            transport.input(),   # Receive frames
            # ... processors ...
            transport.output()   # Send frames
        ])
    """

    def __init__(self, params: TransportParams, **kwargs):
        """Initialize transport.

        Args:
            params: Transport configuration
            **kwargs: Additional arguments, forwarded to the base class
                initializer
        """
        # Run the base-class initializer so its state is set up, and hand it
        # the extra keyword arguments instead of silently discarding them.
        super().__init__(**kwargs)
        self.params = params
        # Filled in by concrete transports; None until then.
        self._input_processor: Optional[FrameProcessor] = None
        self._output_processor: Optional[FrameProcessor] = None

    async def start(self):
        """Start the transport.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement start()")

    async def stop(self):
        """Stop the transport.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement stop()")

    async def send_audio(self, audio: bytes):
        """Send audio data.

        Args:
            audio: Audio bytes

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement send_audio()")

    async def send_image(self, image: bytes):
        """Send image/video data.

        Args:
            image: Image bytes

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement send_image()")

    async def send_message(self, message: dict):
        """Send data message.

        Args:
            message: Message dictionary

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement send_message()")

    def input(self) -> Optional[FrameProcessor]:
        """Get input processor.

        Returns:
            Processor that receives transport input, or None if the
            transport has not assigned one yet
        """
        return self._input_processor

    def output(self) -> Optional[FrameProcessor]:
        """Get output processor.

        Returns:
            Processor that sends to transport, or None if the transport
            has not assigned one yet
        """
        return self._output_processor

TransportParams

{ .api }
from pipecat.transports.base_transport import TransportParams
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.turn_detection import BaseTurnAnalyzer
from typing import Optional

class TransportParams:
    """Configuration shared by every transport.

    Holds the audio/video I/O switches, the VAD and turn-detection hooks,
    and the media format settings. Each constructor argument is stored
    unchanged on the instance under the same name.

    Attributes:
        audio_in_enabled (bool): Enable audio input. Default: True
        audio_out_enabled (bool): Enable audio output. Default: True
        video_in_enabled (bool): Enable video input. Default: False
        video_out_enabled (bool): Enable video output. Default: False
        vad_enabled (bool): Enable voice activity detection. Default: False
        vad_analyzer (Optional[VADAnalyzer]): VAD analyzer instance. Default: None
        turn_analyzer (Optional[BaseTurnAnalyzer]): Turn detection analyzer. Default: None
        audio_in_sample_rate (int): Input sample rate (Hz). Default: 16000
        audio_out_sample_rate (int): Output sample rate (Hz). Default: 24000
        audio_in_channels (int): Input channel count. Default: 1
        audio_out_channels (int): Output channel count. Default: 1
        audio_out_bitrate (int): Output audio bitrate (bps). Default: 64000
        video_in_width (int): Input video width. Default: 1280
        video_in_height (int): Input video height. Default: 720
        video_in_framerate (int): Input frame rate. Default: 30
        video_out_width (int): Output video width. Default: 1280
        video_out_height (int): Output video height. Default: 720
        video_out_framerate (int): Output frame rate. Default: 30
        video_out_bitrate (int): Output video bitrate (bps). Default: 1000000

    Example:
        params = TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer()
        )

    NOTE(review): the subclasses shown on this page declare their extra
    options as annotated class attributes and pass them as constructor
    keywords, and DailyParams is described as a Pydantic BaseModel. The
    explicit ``__init__`` below would reject those extra keywords --
    confirm which form the library actually uses.
    """

    def __init__(
        self,
        audio_in_enabled: bool = True,
        audio_out_enabled: bool = True,
        video_in_enabled: bool = False,
        video_out_enabled: bool = False,
        vad_enabled: bool = False,
        vad_analyzer: Optional[VADAnalyzer] = None,
        turn_analyzer: Optional[BaseTurnAnalyzer] = None,
        audio_in_sample_rate: int = 16000,
        audio_out_sample_rate: int = 24000,
        audio_in_channels: int = 1,
        audio_out_channels: int = 1,
        audio_out_bitrate: int = 64000,
        video_in_width: int = 1280,
        video_in_height: int = 720,
        video_in_framerate: int = 30,
        video_out_width: int = 1280,
        video_out_height: int = 720,
        video_out_framerate: int = 30,
        video_out_bitrate: int = 1000000
    ):
        """Store the given settings on the instance.

        Args:
            audio_in_enabled: Enable audio input
            audio_out_enabled: Enable audio output
            video_in_enabled: Enable video input
            video_out_enabled: Enable video output
            vad_enabled: Enable VAD
            vad_analyzer: VAD analyzer instance
            turn_analyzer: Turn detection analyzer
            audio_in_sample_rate: Input sample rate (Hz)
            audio_out_sample_rate: Output sample rate (Hz)
            audio_in_channels: Input channel count
            audio_out_channels: Output channel count
            audio_out_bitrate: Output audio bitrate (bps)
            video_in_width: Input video width
            video_in_height: Input video height
            video_in_framerate: Input frame rate
            video_out_width: Output video width
            video_out_height: Output video height
            video_out_framerate: Output frame rate
            video_out_bitrate: Output video bitrate (bps)
        """
        # Which media directions are switched on.
        self.audio_in_enabled, self.audio_out_enabled = audio_in_enabled, audio_out_enabled
        self.video_in_enabled, self.video_out_enabled = video_in_enabled, video_out_enabled

        # Voice activity and turn detection.
        self.vad_enabled = vad_enabled
        self.vad_analyzer = vad_analyzer
        self.turn_analyzer = turn_analyzer

        # Audio format.
        self.audio_in_sample_rate = audio_in_sample_rate
        self.audio_out_sample_rate = audio_out_sample_rate
        self.audio_in_channels, self.audio_out_channels = audio_in_channels, audio_out_channels
        self.audio_out_bitrate = audio_out_bitrate

        # Incoming video format.
        self.video_in_width, self.video_in_height = video_in_width, video_in_height
        self.video_in_framerate = video_in_framerate

        # Outgoing video format.
        self.video_out_width, self.video_out_height = video_out_width, video_out_height
        self.video_out_framerate = video_out_framerate
        self.video_out_bitrate = video_out_bitrate

WebRTC Transports

Daily Transport

{ .api }
from pipecat.transports.daily.transport import DailyTransport, DailyParams
from pipecat.transports.base_transport import BaseTransport
from typing import Optional

class DailyTransport(BaseTransport):
    """Transport that uses Daily.co for WebRTC audio/video communication.

    Args:
        room_url: Daily room URL
        token: Daily meeting token
        bot_name: Bot display name
        params: Transport parameters

    Example:
        transport = DailyTransport(
            room_url="https://your-domain.daily.co/room-name",
            token="your-token",
            bot_name="Assistant",
            params=DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_enabled=True
            )
        )

        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(
        self,
        room_url: str,
        token: Optional[str] = None,
        bot_name: str = "Bot",
        params: Optional[DailyParams] = None,
        **kwargs
    ):
        """Set up the transport for one Daily room.

        Args:
            room_url: Daily room URL
            token: Daily meeting token; optional
            bot_name: Bot display name. Defaults to "Bot"
            params: Transport parameters; a default DailyParams() is used
                when none is supplied
            **kwargs: Additional arguments passed on to BaseTransport
        """
        # Fall back to default parameters when the caller supplies none.
        transport_params = params or DailyParams()
        super().__init__(transport_params, **kwargs)
        self.room_url, self.token, self.bot_name = room_url, token, bot_name


class DailyParams(TransportParams):
    """Daily-specific parameters.

    Extends TransportParams with Daily.co specific options.

    Attributes:
        api_url (str): Daily API base URL. Default: "https://api.daily.co/v1"
        api_key (str): Daily API authentication key. Default: ""
        audio_in_user_tracks (bool): Receive users' audio in separate tracks. Default: True
        dialin_settings (Optional[DailyDialinSettings]): Optional settings for dial-in functionality
        camera_out_enabled (bool): Whether to enable the main camera output track. Default: True
        microphone_out_enabled (bool): Whether to enable the main microphone track. Default: True
        transcription_enabled (bool): Whether to enable speech transcription. Default: False
        transcription_settings (DailyTranscriptionSettings): Configuration for transcription service

    Example:
        params = DailyParams(
            api_key="your-daily-api-key",
            audio_in_user_tracks=True,
            camera_out_enabled=True,
            microphone_out_enabled=True,
            transcription_enabled=True,
            transcription_settings=DailyTranscriptionSettings(
                language="en",
                tier="nova"
            )
        )

    Note: DailyParams is a Pydantic BaseModel, not a dataclass.
    """

LiveKit Transport

{ .api }
from pipecat.transports.livekit import LiveKitTransport, LiveKitParams, LiveKitCallbacks
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.audio.vad.vad_analyzer import SileroVADAnalyzer
from pydantic import BaseModel
from typing import Optional

class LiveKitTransport(BaseTransport):
    """Transport that connects a pipeline to a LiveKit room over WebRTC.

    Covers audio streaming, data messaging, participant management, and
    room event handling for conversational AI applications.

    Args:
        url: LiveKit server URL to connect to (e.g., "wss://your-livekit.com")
        token: Authentication token for the LiveKit room
        room_name: Name of the LiveKit room to join
        params: Configuration parameters for the transport
        input_name: Optional name for the input transport
        output_name: Optional name for the output transport

    Methods:
        participant_id: Get the participant ID for this transport
        get_participants(): Get list of participant IDs in the room
        get_participant_metadata(participant_id): Get metadata for a participant
        set_metadata(metadata): Set metadata for the local participant
        mute_participant(participant_id): Mute a participant's audio tracks
        unmute_participant(participant_id): Unmute a participant's audio tracks
        send_message(message, participant_id): Send message to participants
        send_message_urgent(message, participant_id): Send urgent message

    Example:
        from pipecat.transports.livekit import LiveKitTransport, LiveKitParams

        transport = LiveKitTransport(
            url="wss://your-livekit-server.com",
            token="your-access-token",
            room_name="my-conversation-room",
            params=LiveKitParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer()
            )
        )

        # Register event handlers
        @transport.event_handler("on_participant_connected")
        async def on_participant_connected(participant_id):
            print(f"Participant {participant_id} joined")

        @transport.event_handler("on_data_received")
        async def on_data_received(data, participant_id):
            print(f"Received data from {participant_id}: {data}")

        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(
        self,
        url: str,
        token: str,
        room_name: str,
        params: Optional[LiveKitParams] = None,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None
    ):
        """Record the connection settings for one LiveKit room.

        Args:
            url: LiveKit server URL
            token: Authentication token
            room_name: Room name
            params: Transport parameters; a default LiveKitParams() is used
                when none is supplied
            input_name: Input processor name
            output_name: Output processor name
        """
        # Fall back to default parameters when the caller supplies none.
        transport_params = params or LiveKitParams()
        super().__init__(transport_params)
        self.url, self.token, self.room_name = url, token, room_name
        # The processor names are only stored here.
        self._input_name, self._output_name = input_name, output_name


class LiveKitParams(TransportParams):
    """Parameters for the LiveKit transport.

    Adds no options of its own; every setting is inherited from
    TransportParams.
    """


class LiveKitCallbacks(BaseModel):
    """Callback handlers for LiveKit events.

    Parameters:
        on_connected: Called when connected to the LiveKit room
        on_disconnected: Called when disconnected from the LiveKit room
        on_before_disconnect: Called before disconnecting from the room
        on_participant_connected: Called when a participant joins the room
        on_participant_disconnected: Called when a participant leaves the room
        on_audio_track_subscribed: Called when an audio track is subscribed
        on_audio_track_unsubscribed: Called when an audio track is unsubscribed
        on_video_track_subscribed: Called when a video track is subscribed
        on_video_track_unsubscribed: Called when a video track is unsubscribed
        on_data_received: Called when data is received from a participant
        on_first_participant_joined: Called when the first participant joins

    NOTE(review): none of the callbacks listed above is declared as a field
    in this class body -- confirm the field names and callable signatures
    against the library before relying on them.
    """

WebSocket Transports

WebSocket Server

{ .api }
from pipecat.transports.websocket import (
    WebsocketServerTransport,
    WebsocketServerParams
)

class WebsocketServerTransport(BaseTransport):
    """WebSocket server transport for two-way real-time communication.

    A WebSocket server implementation with separate input and output
    transports, client connection management, and event handling for
    real-time audio and data streaming applications.

    Args:
        params: WebSocket server configuration parameters
        host: Host address to bind the server to. Defaults to "localhost"
        port: Port number to bind the server to. Defaults to 8765
        input_name: Optional name for the input processor
        output_name: Optional name for the output processor

    Example:
        from pipecat.transports.websocket import (
            WebsocketServerTransport,
            WebsocketServerParams
        )
        from pipecat.serializers.protobuf import ProtobufFrameSerializer

        transport = WebsocketServerTransport(
            params=WebsocketServerParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                serializer=ProtobufFrameSerializer(),
                add_wav_header=False,
                session_timeout=300  # 5 minutes
            ),
            host="0.0.0.0",
            port=8765
        )

        # Register event handlers
        @transport.event_handler("on_client_connected")
        async def on_client_connected(websocket):
            print(f"Client connected: {websocket.remote_address}")

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(websocket):
            print(f"Client disconnected: {websocket.remote_address}")

        @transport.event_handler("on_session_timeout")
        async def on_session_timeout(websocket):
            print(f"Session timeout for: {websocket.remote_address}")

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        # Clients connect to ws://host:8765
        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(
        self,
        params: WebsocketServerParams,
        host: str = "localhost",
        port: int = 8765,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None
    ):
        """Record the bind address and processor names.

        Args:
            params: WebSocket server parameters
            host: Host address to bind. Defaults to "localhost"
            port: Port number to bind. Defaults to 8765
            input_name: Input processor name
            output_name: Output processor name
        """
        super().__init__(params)
        self.host, self.port = host, port
        # The processor names are only stored here.
        self._input_name, self._output_name = input_name, output_name


class WebsocketServerParams(TransportParams):
    """Configuration parameters for the WebSocket server transport.

    Adds three options on top of those inherited from TransportParams.

    Parameters:
        add_wav_header (bool): Whether to add WAV headers to audio frames. Default: False
        serializer (Optional[FrameSerializer]): Frame serializer for message
            encoding/decoding. Default: None
        session_timeout (Optional[int]): Timeout in seconds for client sessions.
            None (the default) for no timeout
    """
    # WAV headers are off by default for the server transport.
    add_wav_header: bool = False
    serializer: Optional[FrameSerializer] = None
    # Seconds; None means client sessions never time out.
    session_timeout: Optional[int] = None

WebSocket Client

{ .api }
from pipecat.transports.websocket import (
    WebsocketClientTransport,
    WebsocketClientParams
)

class WebsocketClientTransport(BaseTransport):
    """WebSocket client transport for two-way communication.

    A WebSocket client transport implementation with input and output
    capabilities, connection management, and event handling.

    Args:
        uri: The WebSocket URI to connect to (e.g., "ws://server:8765" or "wss://secure-server:8765")
        params: Optional configuration parameters for the transport

    Example:
        from pipecat.transports.websocket import (
            WebsocketClientTransport,
            WebsocketClientParams
        )
        from pipecat.serializers.protobuf import ProtobufFrameSerializer

        transport = WebsocketClientTransport(
            uri="ws://localhost:8765",
            params=WebsocketClientParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                serializer=ProtobufFrameSerializer(),
                add_wav_header=True,
                additional_headers={
                    "Authorization": "Bearer token123",
                    "X-Custom-Header": "value"
                }
            )
        )

        # Register event handlers
        @transport.event_handler("on_connected")
        async def on_connected(websocket):
            print("Connected to WebSocket server")

        @transport.event_handler("on_disconnected")
        async def on_disconnected(websocket):
            print("Disconnected from WebSocket server")

        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(
        self,
        uri: str,
        params: Optional[WebsocketClientParams] = None
    ):
        """Record the server URI to connect to.

        Args:
            uri: WebSocket URI to connect to
            params: WebSocket client parameters; a default
                WebsocketClientParams() is used when none is supplied
        """
        # Fall back to default parameters when the caller supplies none.
        transport_params = params or WebsocketClientParams()
        super().__init__(transport_params)
        self.uri = uri


class WebsocketClientParams(TransportParams):
    """Configuration parameters for the WebSocket client transport.

    Adds three options on top of those inherited from TransportParams.

    Parameters:
        add_wav_header (bool): Whether to add WAV headers to audio frames. Default: True
        additional_headers (Optional[dict[str, str]]): Additional HTTP headers to
            send with connection. Default: None
        serializer (Optional[FrameSerializer]): Frame serializer for
            encoding/decoding messages. Default: None
    """
    # Unlike the server and FastAPI parameter classes, the default here is True.
    add_wav_header: bool = True
    additional_headers: Optional[dict[str, str]] = None
    serializer: Optional[FrameSerializer] = None

FastAPI WebSocket

{ .api }
from pipecat.transports.websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams
)
from fastapi import WebSocket

class FastAPIWebsocketTransport(BaseTransport):
    """Transport that streams real-time audio/video over a FastAPI WebSocket.

    Two-way WebSocket communication with frame serialization, session
    management, and event handling for client connections and timeouts.

    Args:
        websocket: The FastAPI WebSocket connection
        params: Transport configuration parameters
        input_name: Optional name for the input processor
        output_name: Optional name for the output processor

    Example:
        from fastapi import FastAPI, WebSocket, WebSocketDisconnect
        from pipecat.transports.websocket import (
            FastAPIWebsocketTransport,
            FastAPIWebsocketParams
        )
        from pipecat.serializers.protobuf import ProtobufFrameSerializer

        app = FastAPI()

        @app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            await websocket.accept()

            transport = FastAPIWebsocketTransport(
                websocket=websocket,
                params=FastAPIWebsocketParams(
                    audio_in_enabled=True,
                    audio_out_enabled=True,
                    audio_in_sample_rate=16000,
                    audio_out_sample_rate=24000,
                    serializer=ProtobufFrameSerializer(),
                    add_wav_header=False,
                    session_timeout=300,  # 5 minutes
                    fixed_audio_packet_size=640  # 20ms @ 16kHz PCM16 mono
                )
            )

            # Register event handlers
            @transport.event_handler("on_client_connected")
            async def on_client_connected(ws):
                print("Client connected")

            @transport.event_handler("on_client_disconnected")
            async def on_client_disconnected(ws):
                print("Client disconnected")

            @transport.event_handler("on_session_timeout")
            async def on_session_timeout(ws):
                print("Session timed out")
                await ws.close()

            # Build pipeline
            pipeline = Pipeline([
                transport.input(),
                stt, user_agg, llm, tts,
                transport.output()
            ])

            try:
                task = PipelineTask(pipeline)
                await task.run()
            except WebSocketDisconnect:
                print("WebSocket disconnected")
    """

    def __init__(
        self,
        websocket: WebSocket,
        params: FastAPIWebsocketParams,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None
    ):
        """Wrap a FastAPI WebSocket connection.

        Args:
            websocket: FastAPI WebSocket connection
            params: FastAPI WebSocket parameters
            input_name: Input processor name
            output_name: Output processor name
        """
        super().__init__(params)
        self.websocket = websocket
        # The processor names are only stored here.
        self._input_name, self._output_name = input_name, output_name


class FastAPIWebsocketParams(TransportParams):
    """Configuration parameters for the FastAPI WebSocket transport.

    Adds four options on top of those inherited from TransportParams.

    Parameters:
        add_wav_header (bool): Whether to add WAV headers to audio frames. Default: False
        serializer (Optional[FrameSerializer]): Frame serializer for
            encoding/decoding messages. Default: None
        session_timeout (Optional[int]): Session timeout in seconds, None
            (the default) for no timeout
        fixed_audio_packet_size (Optional[int]): Optional fixed-size packetization
            for raw PCM audio payloads. Useful when the remote WebSocket media
            endpoint requires strict audio framing. For example, 640 bytes for
            20ms @ 16kHz PCM16 mono audio (16000 samples/s * 0.020 s * 2 bytes
            per sample). Default: None
    """
    add_wav_header: bool = False
    serializer: Optional[FrameSerializer] = None
    # Seconds; None means sessions never time out.
    session_timeout: Optional[int] = None
    # Packet size in bytes; None leaves packetization unset.
    fixed_audio_packet_size: Optional[int] = None

Local Transports

Local Audio

{ .api }
from pipecat.transports.local import (
    LocalAudioTransport,
    LocalAudioTransportParams
)

class LocalAudioTransport(BaseTransport):
    """Local audio transport with both input and output capabilities.

    Provides a unified interface for local audio I/O using PyAudio, supporting
    both audio capture and playback through the system's audio devices.
    Useful for local testing and development.

    Args:
        params: Local audio transport configuration parameters

    Example:
        from pipecat.transports.local import (
            LocalAudioTransport,
            LocalAudioTransportParams
        )

        # Use default audio devices
        transport = LocalAudioTransport(
            params=LocalAudioTransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                audio_in_channels=1,
                audio_out_channels=1
            )
        )

        # Use specific audio devices
        transport = LocalAudioTransport(
            params=LocalAudioTransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                input_device_index=1,   # Use device index 1 for input
                output_device_index=2   # Use device index 2 for output
            )
        )

        # Build pipeline for local testing
        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(self, params: LocalAudioTransportParams):
        """Initialize local audio transport.

        Unlike the other transports on this page, ``params`` is required and
        has no default.

        Args:
            params: Local audio transport parameters
        """
        super().__init__(params)
        # No PyAudio handle is created here; the attribute starts out empty.
        self._pyaudio = None


class LocalAudioTransportParams(TransportParams):
    """Configuration parameters for the local audio transport.

    Adds device selection on top of the options inherited from
    TransportParams.

    Parameters:
        input_device_index (Optional[int]): PyAudio device index for audio input.
            If None (the default), uses the default device
        output_device_index (Optional[int]): PyAudio device index for audio output.
            If None (the default), uses the default device
    """
    input_device_index: Optional[int] = None
    output_device_index: Optional[int] = None

SmallWebRTC Transport

SmallWebRTC Transport

{ .api }
from pipecat.transports.smallwebrtc import SmallWebRTCTransport
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection

class SmallWebRTCTransport(BaseTransport):
    """WebRTC transport for real-time communication, built on aiortc.

    Two-way audio and video streaming over WebRTC connections, with support
    for application messaging and connection event handling.

    Args:
        webrtc_connection: The underlying WebRTC connection handler
        params: Transport configuration parameters
        input_name: Optional name for the input processor
        output_name: Optional name for the output processor

    Methods:
        send_image(frame): Send an image frame through the transport
        send_audio(frame): Send an audio frame through the transport
        capture_participant_video(video_source): Capture video from participant
        capture_participant_audio(audio_source): Capture audio from participant

    Example:
        from pipecat.transports.base_transport import TransportParams
        from pipecat.transports.smallwebrtc import SmallWebRTCTransport
        from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection

        # Create WebRTC connection (implementation depends on signaling)
        webrtc_connection = SmallWebRTCConnection(
            pc_id="unique-peer-id",
            # ... connection configuration
        )

        transport = SmallWebRTCTransport(
            webrtc_connection=webrtc_connection,
            params=TransportParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                video_in_enabled=True,
                video_out_enabled=True,
                audio_in_sample_rate=16000,
                audio_out_sample_rate=24000,
                video_out_width=1280,
                video_out_height=720
            )
        )

        # Register event handlers
        @transport.event_handler("on_client_connected")
        async def on_client_connected(connection):
            print(f"Client connected: {connection.pc_id}")

        @transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(connection):
            print(f"Client disconnected: {connection.pc_id}")

        @transport.event_handler("on_app_message")
        async def on_app_message(message, sender):
            print(f"Message from {sender}: {message}")

        # Build pipeline
        pipeline = Pipeline([
            transport.input(),
            stt, user_agg, llm, tts,
            transport.output()
        ])

        task = PipelineTask(pipeline)
        await task.run()
    """

    def __init__(
        self,
        webrtc_connection: SmallWebRTCConnection,
        params: TransportParams,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None
    ):
        """Attach the transport to an existing WebRTC connection.

        Args:
            webrtc_connection: WebRTC connection handler
            params: Transport parameters (plain TransportParams; this
                transport has no parameter subclass of its own here)
            input_name: Input processor name
            output_name: Output processor name
        """
        super().__init__(params)
        self.webrtc_connection = webrtc_connection
        # The processor names are only stored here.
        self._input_name, self._output_name = input_name, output_name

Avatar Transports

HeyGen Transport

{ .api }
from pipecat.transports.heygen import HeyGenTransport, HeyGenParams
from pipecat.services.heygen.api_interactive_avatar import NewSessionRequest
from pipecat.services.heygen.api_liveavatar import LiveAvatarNewSessionRequest
from pipecat.services.heygen.client import ServiceType
import aiohttp

class HeyGenTransport(BaseTransport):
    """Transport that places the Pipecat bot in a HeyGen avatar video call.

    The Pipecat bot joins the same virtual room as the HeyGen Avatar and the
    user. HeyGenClient initiates the conversation via HeyGenApi and obtains
    the room URL that all participants connect to.

    Args:
        session: aiohttp session for making async HTTP requests
        api_key: HeyGen API key for authentication
        params: HeyGen-specific configuration parameters
        input_name: Optional custom name for the input transport
        output_name: Optional custom name for the output transport
        session_request: Configuration for the HeyGen session
        service_type: Service type for the avatar session

    Example:
        from pipecat.transports.heygen import HeyGenTransport, HeyGenParams
        from pipecat.services.heygen.api_interactive_avatar import (
            NewSessionRequest,
            AvatarQuality,
            VoiceSettings
        )
        import aiohttp

        async def main():
            async with aiohttp.ClientSession() as session:
                transport = HeyGenTransport(
                    session=session,
                    api_key="your-heygen-api-key",
                    params=HeyGenParams(
                        audio_in_enabled=True,
                        audio_out_enabled=True
                    ),
                    session_request=NewSessionRequest(
                        quality=AvatarQuality.MEDIUM,
                        avatar_name="your-avatar-id",
                        voice=VoiceSettings(
                            voice_id="your-voice-id"
                        )
                    )
                )

                # Register event handlers
                @transport.event_handler("on_client_connected")
                async def on_client_connected(participant):
                    print(f"Client connected: {participant}")

                @transport.event_handler("on_client_disconnected")
                async def on_client_disconnected(participant):
                    print(f"Client disconnected: {participant}")

                pipeline = Pipeline([
                    transport.input(),
                    stt, user_agg, llm, tts,
                    transport.output()
                ])

                task = PipelineTask(pipeline)
                await task.run()
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        api_key: str,
        params: HeyGenParams = HeyGenParams(),
        input_name: Optional[str] = None,
        output_name: Optional[str] = None,
        session_request: Optional[Union[LiveAvatarNewSessionRequest, NewSessionRequest]] = None,
        service_type: Optional[ServiceType] = None,
    ):
        """Store the HeyGen connection settings on the transport.

        Args:
            session: aiohttp session, kept as ``session``.
            api_key: HeyGen API key, kept as ``api_key``.
            params: HeyGen parameters, forwarded to the base-class
                initializer.
            input_name: Optional input processor name.
            output_name: Optional output processor name.
            session_request: Optional session configuration; None when
                omitted.
            service_type: Optional service type for the avatar; None when
                omitted.
        """
        # NOTE(review): the `params` default is built once, when this `def`
        # is evaluated, so every instance created without `params` receives
        # the same HeyGenParams object.
        super().__init__(params)

        # HTTP session and credentials.
        self.session, self.api_key = session, api_key

        # Avatar session configuration (both may be None).
        self.session_request = session_request
        self.service_type = service_type

        # Optional processor names.
        self._input_name, self._output_name = input_name, output_name


class HeyGenParams(TransportParams):
    """Configuration parameters for the HeyGen transport.

    Subclasses TransportParams and declares both audio directions with a
    default of True.

    Parameters:
        audio_in_enabled (bool): Whether to enable audio input from participants. Default: True
        audio_out_enabled (bool): Whether to enable audio output to participants. Default: True
    """
    # Audio input from participants is on unless explicitly disabled.
    audio_in_enabled: bool = True
    # Audio output to participants is on unless explicitly disabled.
    audio_out_enabled: bool = True

Tavus Transport

{ .api }
from pipecat.transports.tavus import TavusTransport, TavusParams
import aiohttp

class TavusTransport(BaseTransport):
    """Transport that places the Pipecat bot in a Tavus avatar video call.

    The Pipecat bot joins the same virtual room as the Tavus Avatar and the
    user. TavusTransportClient initiates the conversation via TavusApi and
    obtains the room URL that all participants connect to.

    Args:
        bot_name: The name of the Pipecat bot
        session: aiohttp session used for async HTTP requests
        api_key: Tavus API key for authentication
        replica_id: ID of the replica model used for voice generation
        persona_id: ID of the Tavus persona. Defaults to "pipecat-stream"
            to use the Pipecat TTS voice
        params: Optional Tavus-specific configuration parameters
        input_name: Optional name for the input transport
        output_name: Optional name for the output transport

    Methods:
        update_subscriptions(participant_settings, profile_settings): Update subscription settings

    Example:
        from pipecat.transports.tavus import TavusTransport, TavusParams
        import aiohttp

        async def main():
            async with aiohttp.ClientSession() as session:
                transport = TavusTransport(
                    bot_name="MyAssistant",
                    session=session,
                    api_key="your-tavus-api-key",
                    replica_id="your-replica-id",
                    persona_id="pipecat-stream",  # Use Pipecat TTS
                    params=TavusParams(
                        audio_in_enabled=True,
                        audio_out_enabled=True,
                        microphone_out_enabled=False
                    )
                )

                # Register event handlers
                @transport.event_handler("on_client_connected")
                async def on_client_connected(participant):
                    print(f"Client connected: {participant}")

                @transport.event_handler("on_client_disconnected")
                async def on_client_disconnected(participant):
                    print(f"Client disconnected: {participant}")

                pipeline = Pipeline([
                    transport.input(),
                    stt, user_agg, llm, tts,
                    transport.output()
                ])

                task = PipelineTask(pipeline)
                await task.run()
    """

    def __init__(
        self,
        bot_name: str,
        session: aiohttp.ClientSession,
        api_key: str,
        replica_id: str,
        persona_id: str = "pipecat-stream",
        params: TavusParams = TavusParams(),
        input_name: Optional[str] = None,
        output_name: Optional[str] = None,
    ):
        """Store the Tavus connection settings on the transport.

        Args:
            bot_name: Bot name, kept as ``bot_name``.
            session: aiohttp session, kept as ``session``.
            api_key: Tavus API key, kept as ``api_key``.
            replica_id: Replica ID, kept as ``replica_id``.
            persona_id: Persona ID; "pipecat-stream" when omitted.
            params: Tavus parameters, forwarded to the base-class
                initializer.
            input_name: Optional input processor name.
            output_name: Optional output processor name.
        """
        # NOTE(review): the `params` default is built once, when this `def`
        # is evaluated, so every instance created without `params` receives
        # the same TavusParams object.
        super().__init__(params)

        # HTTP session and credentials.
        self.session, self.api_key = session, api_key

        # Identity of the bot and of the Tavus replica/persona it pairs with.
        self.bot_name = bot_name
        self.replica_id, self.persona_id = replica_id, persona_id

        # Optional processor names.
        self._input_name, self._output_name = input_name, output_name


class TavusParams(DailyParams):
    """Configuration parameters for the Tavus transport.

    Extends DailyParams as Tavus uses Daily for WebRTC connectivity.
    Declares both audio directions enabled and the microphone output track
    disabled by default.

    Parameters:
        audio_in_enabled (bool): Whether to enable audio input from participants. Default: True
        audio_out_enabled (bool): Whether to enable audio output to participants. Default: True
        microphone_out_enabled (bool): Whether to enable microphone output track. Default: False
    """
    # Audio input from participants is on unless explicitly disabled.
    audio_in_enabled: bool = True
    # Audio output to participants is on unless explicitly disabled.
    audio_out_enabled: bool = True
    # The microphone output track stays off unless explicitly enabled.
    microphone_out_enabled: bool = False

WhatsApp Transport

WhatsApp voice calling integration using the WhatsApp Cloud API and WebRTC. Enables Pipecat bots to receive and handle voice calls from WhatsApp users.

{ .api }
from pipecat.transports.whatsapp.client import WhatsAppClient
from pipecat.transports.whatsapp.api import WhatsAppApi, WhatsAppWebhookRequest

# WhatsApp client handles webhook processing and call management.
# `aiohttp_session` is a placeholder: this snippet does not create it, so the
# application must supply it.
client = WhatsAppClient(
    whatsapp_token="your-token",
    phone_number_id="your-phone-number-id",
    session=aiohttp_session,
    whatsapp_secret="your-app-secret"  # For webhook validation
)

# Handle incoming call webhook.
# `webhook_request` and `handle_call_function` are placeholders supplied by
# the application. The `await` must run inside a coroutine.
await client.handle_webhook_request(
    request=webhook_request,
    connection_callback=handle_call_function
)

Documentation: WhatsApp Transport →

Usage Patterns

Basic WebRTC Setup

{ .api }
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.vad_analyzer import SileroVADAnalyzer, VADParams

# Configure VAD (the numbers below are example values)
vad = SileroVADAnalyzer(
    params=VADParams(
        threshold=0.5,
        min_speech_duration_ms=250
    )
)

# Create transport
# The room URL and token are placeholders. The analyzer built above is handed
# to the transport through `DailyParams.vad_analyzer`.
transport = DailyTransport(
    room_url="https://your-domain.daily.co/room",
    token="token",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        audio_in_sample_rate=16000,
        audio_out_sample_rate=24000,
        vad_enabled=True,
        vad_analyzer=vad
    )
)

# Build pipeline
# `Pipeline`, `stt`, `user_agg`, `llm` and `tts` are not defined in this
# snippet; they must already exist in the surrounding application code.
pipeline = Pipeline([
    transport.input(),   # Audio from user
    stt,
    user_agg,
    llm,
    tts,
    transport.output()   # Audio to user
])

Local Testing

{ .api }
from pipecat.transports.local import LocalAudioTransport

# Test locally without WebRTC
# `TransportParams` is not imported by this snippet; earlier on this page it
# is imported from `pipecat.transports.base_transport`.
transport = LocalAudioTransport(
    params=TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True
    )
)

# Same pipeline works with local audio
# (`Pipeline`, `stt`, `user_agg`, `llm` and `tts` come from the surrounding
# application code.)
pipeline = Pipeline([
    transport.input(),
    stt, user_agg, llm, tts,
    transport.output()
])

Custom WebSocket Protocol

{ .api }
from pipecat.transports.websocket import WebsocketServerTransport

class CustomWebSocketTransport(WebsocketServerTransport):
    """WebSocket server transport that wraps outgoing messages in a custom envelope."""

    async def send_message(self, message: dict):
        """Wrap ``message`` in the custom envelope and send it via the base class.

        Args:
            message: Payload placed under the ``"data"`` key of the envelope.
        """
        # Imported locally: the snippet's import section does not bring
        # `time` into scope, so the original raised NameError here.
        import time

        custom_format = {
            "type": "custom",
            "data": message,
            "timestamp": time.time(),  # seconds since the epoch, as a float
        }
        await super().send_message(custom_format)

transport = CustomWebSocketTransport(host="0.0.0.0", port=8080)

Best Practices

Match Sample Rates

{ .api }
# Good: Match transport and service sample rates
# (`...` stands for constructor arguments omitted from this example.)
transport = DailyTransport(
    params=DailyParams(
        audio_in_sample_rate=16000,   # For STT
        audio_out_sample_rate=24000   # For TTS
    )
)

stt = DeepgramSTTService(...)  # Expects 16kHz
tts = ElevenLabsTTSService(...)  # Produces 24kHz

# Bad: Mismatched rates require resampling
# This is an alternative to the block above, not a continuation: it rebinds
# `transport` and `stt`.
transport = DailyTransport(
    params=DailyParams(audio_in_sample_rate=48000)  # Mismatch!
)
stt = DeepgramSTTService(...)  # Expects 16kHz
# Requires resampling, adds latency

Enable VAD for Interruptions

{ .api }
# Good: Enable VAD for natural interruptions
transport = DailyTransport(
    params=DailyParams(
        vad_enabled=True,
        vad_analyzer=SileroVADAnalyzer()
    )
)

# `pipeline`, `PipelineTask` and `PipelineParams` are not defined in this
# snippet; they come from the surrounding application code.
task = PipelineTask(
    pipeline,
    params=PipelineParams(allow_interruptions=True)
)

# Bad: No VAD, no interruptions
# This is an alternative to the block above; it rebinds `transport`.
transport = DailyTransport(params=DailyParams(vad_enabled=False))
# User must wait for bot to finish

Handle Transport Errors

{ .api }
@transport.event_handler("on_transport_error")
async def handle_error(error: Exception):
    """Report a transport error on standard output.

    Args:
        error: The exception delivered with the "on_transport_error" event.
    """
    report = f"Transport error: {error}"
    print(report)
    # Implement reconnection or cleanup

Choose the Right Transport

{ .api }
# Each assignment below is an independent alternative (every one rebinds
# `transport`); `...` stands for constructor arguments omitted here.

# Good: WebRTC for production voice agents
transport = DailyTransport(...)  # Low latency, reliable

# Good: WebSocket for custom protocols
transport = WebsocketServerTransport(...)  # Full control

# Good: Local for development/testing
transport = LocalAudioTransport(...)  # Easy setup

# Bad: Local in production
transport = LocalAudioTransport(...)  # Not suitable for remote users