tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols.
Transports provide real-time audio/video communication for Pipecat pipelines. They handle network I/O and media streaming, and integrate with WebRTC, WebSocket, and local audio systems.
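The wiring pattern is the same for every transport: transport.input() feeds received frames into the head of a pipeline and transport.output() sends frames from the tail back to the user. Below is a minimal, hedged sketch using the local audio transport documented later in this section; the Pipeline and PipelineTask module paths follow upstream Pipecat and are assumptions here.
{ .api }
import asyncio

from pipecat.pipeline.pipeline import Pipeline  # assumed module path
from pipecat.pipeline.task import PipelineTask  # assumed module path
from pipecat.transports.local import (
    LocalAudioTransport,
    LocalAudioTransportParams
)

async def main():
    # Any transport works here; local audio keeps the sketch self-contained.
    transport = LocalAudioTransport(
        params=LocalAudioTransportParams(
            audio_in_enabled=True,   # capture from the default microphone
            audio_out_enabled=True   # play back on the default speakers
        )
    )
    pipeline = Pipeline([
        transport.input(),   # frames received from the user
        # ... STT, LLM, TTS processors go here ...
        transport.output()   # frames sent back to the user
    ])
    task = PipelineTask(pipeline)
    await task.run()

asyncio.run(main())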
{ .api }
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.turn_detection import BaseTurnAnalyzer
from pipecat.utils.base_object import BaseObject
from typing import Optional
class BaseTransport(BaseObject):
"""Abstract base class for all transports.
Provides interface for audio/video I/O, connection management,
and integration with pipelines.
Key Features:
- Audio input/output streaming
- Video input/output (optional)
- VAD integration
- Turn detection
- Message passing
Methods:
start(): Start transport
stop(): Stop transport
send_audio(audio): Send audio
send_image(image): Send image/video
send_message(message): Send data message
input(): Get input processor
output(): Get output processor
Example:
transport = SomeTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True
)
)
pipeline = Pipeline([
transport.input(), # Receive frames
# ... processors ...
transport.output() # Send frames
])
"""
def __init__(self, params: TransportParams, **kwargs):
"""Initialize transport.
Args:
params: Transport configuration
**kwargs: Additional arguments
"""
self.params = params
self._input_processor = None
self._output_processor = None
async def start(self):
"""Start the transport."""
raise NotImplementedError("Subclasses must implement start()")
async def stop(self):
"""Stop the transport."""
raise NotImplementedError("Subclasses must implement stop()")
async def send_audio(self, audio: bytes):
"""Send audio data.
Args:
audio: Audio bytes
"""
raise NotImplementedError("Subclasses must implement send_audio()")
async def send_image(self, image: bytes):
"""Send image/video data.
Args:
image: Image bytes
"""
raise NotImplementedError("Subclasses must implement send_image()")
async def send_message(self, message: dict):
"""Send data message.
Args:
message: Message dictionary
"""
raise NotImplementedError("Subclasses must implement send_message()")
def input(self) -> FrameProcessor:
"""Get input processor.
Returns:
Processor that receives transport input
"""
return self._input_processor
def output(self) -> FrameProcessor:
"""Get output processor.
Returns:
Processor that sends to transport
"""
        return self._output_processor
{ .api }
from pipecat.transports.base_transport import TransportParams
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.turn_detection import BaseTurnAnalyzer
from typing import Optional
class TransportParams:
"""Transport configuration parameters.
Configures audio/video I/O, VAD, turn detection, and other
transport features.
Attributes:
audio_in_enabled (bool): Enable audio input
audio_out_enabled (bool): Enable audio output
video_in_enabled (bool): Enable video input
video_out_enabled (bool): Enable video output
vad_enabled (bool): Enable voice activity detection
vad_analyzer (Optional[VADAnalyzer]): VAD analyzer instance
turn_analyzer (Optional[BaseTurnAnalyzer]): Turn detection analyzer
audio_in_sample_rate (int): Input sample rate (Hz)
audio_out_sample_rate (int): Output sample rate (Hz)
audio_in_channels (int): Input channel count
audio_out_channels (int): Output channel count
audio_out_bitrate (int): Output bitrate (bps)
video_in_width (int): Input video width
video_in_height (int): Input video height
video_in_framerate (int): Input frame rate
video_out_width (int): Output video width
video_out_height (int): Output video height
video_out_framerate (int): Output frame rate
video_out_bitrate (int): Output video bitrate (bps)
Example:
params = TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
"""
def __init__(
self,
audio_in_enabled: bool = True,
audio_out_enabled: bool = True,
video_in_enabled: bool = False,
video_out_enabled: bool = False,
vad_enabled: bool = False,
vad_analyzer: Optional[VADAnalyzer] = None,
turn_analyzer: Optional[BaseTurnAnalyzer] = None,
audio_in_sample_rate: int = 16000,
audio_out_sample_rate: int = 24000,
audio_in_channels: int = 1,
audio_out_channels: int = 1,
audio_out_bitrate: int = 64000,
video_in_width: int = 1280,
video_in_height: int = 720,
video_in_framerate: int = 30,
video_out_width: int = 1280,
video_out_height: int = 720,
video_out_framerate: int = 30,
video_out_bitrate: int = 1000000
):
"""Initialize transport parameters.
Args:
audio_in_enabled: Enable audio input
audio_out_enabled: Enable audio output
video_in_enabled: Enable video input
video_out_enabled: Enable video output
vad_enabled: Enable VAD
vad_analyzer: VAD analyzer instance
turn_analyzer: Turn detection analyzer
audio_in_sample_rate: Input sample rate
audio_out_sample_rate: Output sample rate
audio_in_channels: Input channels
audio_out_channels: Output channels
audio_out_bitrate: Output bitrate
video_in_width: Input width
video_in_height: Input height
video_in_framerate: Input framerate
video_out_width: Output width
video_out_height: Output height
video_out_framerate: Output framerate
video_out_bitrate: Output bitrate
"""
self.audio_in_enabled = audio_in_enabled
self.audio_out_enabled = audio_out_enabled
self.video_in_enabled = video_in_enabled
self.video_out_enabled = video_out_enabled
self.vad_enabled = vad_enabled
self.vad_analyzer = vad_analyzer
self.turn_analyzer = turn_analyzer
self.audio_in_sample_rate = audio_in_sample_rate
self.audio_out_sample_rate = audio_out_sample_rate
self.audio_in_channels = audio_in_channels
self.audio_out_channels = audio_out_channels
self.audio_out_bitrate = audio_out_bitrate
self.video_in_width = video_in_width
self.video_in_height = video_in_height
self.video_in_framerate = video_in_framerate
self.video_out_width = video_out_width
self.video_out_height = video_out_height
self.video_out_framerate = video_out_framerate
        self.video_out_bitrate = video_out_bitrate
{ .api }
from pipecat.transports.daily.transport import DailyTransport, DailyParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from typing import Optional
class DailyTransport(BaseTransport):
"""Daily.co WebRTC transport.
Integrates with Daily.co for WebRTC audio/video communication.
Args:
room_url: Daily room URL
token: Daily meeting token
bot_name: Bot display name
params: Transport parameters
Example:
transport = DailyTransport(
room_url="https://your-domain.daily.co/room-name",
token="your-token",
bot_name="Assistant",
params=DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_enabled=True
)
)
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
room_url: str,
token: Optional[str] = None,
bot_name: str = "Bot",
params: Optional[DailyParams] = None,
**kwargs
):
"""Initialize Daily transport.
Args:
room_url: Daily room URL
token: Daily meeting token
bot_name: Bot display name
params: Transport parameters
**kwargs: Additional arguments
"""
super().__init__(params or DailyParams(), **kwargs)
self.room_url = room_url
self.token = token
self.bot_name = bot_name
class DailyParams(TransportParams):
"""Daily-specific parameters.
Extends TransportParams with Daily.co specific options.
Attributes:
api_url (str): Daily API base URL. Default: "https://api.daily.co/v1"
api_key (str): Daily API authentication key. Default: ""
audio_in_user_tracks (bool): Receive users' audio in separate tracks. Default: True
dialin_settings (Optional[DailyDialinSettings]): Optional settings for dial-in functionality
camera_out_enabled (bool): Whether to enable the main camera output track. Default: True
microphone_out_enabled (bool): Whether to enable the main microphone track. Default: True
transcription_enabled (bool): Whether to enable speech transcription. Default: False
transcription_settings (DailyTranscriptionSettings): Configuration for transcription service
Example:
params = DailyParams(
api_key="your-daily-api-key",
audio_in_user_tracks=True,
camera_out_enabled=True,
microphone_out_enabled=True,
transcription_enabled=True,
transcription_settings=DailyTranscriptionSettings(
language="en",
tier="nova"
)
)
    Note: DailyParams is a Pydantic BaseModel, not a dataclass.
    """
{ .api }
from pipecat.transports.livekit import LiveKitTransport, LiveKitParams, LiveKitCallbacks
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pydantic import BaseModel
from typing import Optional
class LiveKitTransport(BaseTransport):
"""LiveKit WebRTC transport.
Provides comprehensive LiveKit integration including audio streaming, data
messaging, participant management, and room event handling for conversational
AI applications.
Args:
url: LiveKit server URL to connect to (e.g., "wss://your-livekit.com")
token: Authentication token for the LiveKit room
room_name: Name of the LiveKit room to join
params: Configuration parameters for the transport
input_name: Optional name for the input transport
output_name: Optional name for the output transport
Methods:
participant_id: Get the participant ID for this transport
get_participants(): Get list of participant IDs in the room
get_participant_metadata(participant_id): Get metadata for a participant
set_metadata(metadata): Set metadata for the local participant
mute_participant(participant_id): Mute a participant's audio tracks
unmute_participant(participant_id): Unmute a participant's audio tracks
send_message(message, participant_id): Send message to participants
send_message_urgent(message, participant_id): Send urgent message
Example:
from pipecat.transports.livekit import LiveKitTransport, LiveKitParams
transport = LiveKitTransport(
url="wss://your-livekit-server.com",
token="your-access-token",
room_name="my-conversation-room",
params=LiveKitParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
# Register event handlers
@transport.event_handler("on_participant_connected")
async def on_participant_connected(participant_id):
print(f"Participant {participant_id} joined")
@transport.event_handler("on_data_received")
async def on_data_received(data, participant_id):
print(f"Received data from {participant_id}: {data}")
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
url: str,
token: str,
room_name: str,
params: Optional[LiveKitParams] = None,
input_name: Optional[str] = None,
output_name: Optional[str] = None
):
"""Initialize LiveKit transport.
Args:
url: LiveKit server URL
token: Authentication token
room_name: Room name
params: Transport parameters
input_name: Input processor name
output_name: Output processor name
"""
super().__init__(params or LiveKitParams())
self.url = url
self.token = token
self.room_name = room_name
self._input_name = input_name
self._output_name = output_name
class LiveKitParams(TransportParams):
"""Configuration parameters for LiveKit transport.
Inherits all parameters from TransportParams without additional configuration.
"""
pass
class LiveKitCallbacks(BaseModel):
"""Callback handlers for LiveKit events.
Parameters:
on_connected: Called when connected to the LiveKit room
on_disconnected: Called when disconnected from the LiveKit room
on_before_disconnect: Called before disconnecting from the room
on_participant_connected: Called when a participant joins the room
on_participant_disconnected: Called when a participant leaves the room
on_audio_track_subscribed: Called when an audio track is subscribed
on_audio_track_unsubscribed: Called when an audio track is unsubscribed
on_video_track_subscribed: Called when a video track is subscribed
on_video_track_unsubscribed: Called when a video track is unsubscribed
on_data_received: Called when data is received from a participant
on_first_participant_joined: Called when the first participant joins
"""
    pass
{ .api }
from pipecat.transports.websocket import (
    WebsocketServerTransport,
    WebsocketServerParams
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.serializers.base_serializer import FrameSerializer
from typing import Optional
class WebsocketServerTransport(BaseTransport):
"""WebSocket server transport for bidirectional real-time communication.
Provides a complete WebSocket server implementation with separate input and
output transports, client connection management, and event handling for
real-time audio and data streaming applications.
Args:
params: WebSocket server configuration parameters
host: Host address to bind the server to. Defaults to "localhost"
port: Port number to bind the server to. Defaults to 8765
input_name: Optional name for the input processor
output_name: Optional name for the output processor
Example:
from pipecat.transports.websocket import (
WebsocketServerTransport,
WebsocketServerParams
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
serializer=ProtobufFrameSerializer(),
add_wav_header=False,
session_timeout=300 # 5 minutes
),
host="0.0.0.0",
port=8765
)
# Register event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(websocket):
print(f"Client connected: {websocket.remote_address}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(websocket):
print(f"Client disconnected: {websocket.remote_address}")
@transport.event_handler("on_session_timeout")
async def on_session_timeout(websocket):
print(f"Session timeout for: {websocket.remote_address}")
# Build pipeline
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
# Clients connect to ws://host:8765
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
params: WebsocketServerParams,
host: str = "localhost",
port: int = 8765,
input_name: Optional[str] = None,
output_name: Optional[str] = None
):
"""Initialize WebSocket server transport.
Args:
params: WebSocket server parameters
host: Host address to bind
port: Port number to bind
input_name: Input processor name
output_name: Output processor name
"""
super().__init__(params)
self.host = host
self.port = port
self._input_name = input_name
self._output_name = output_name
class WebsocketServerParams(TransportParams):
"""Configuration parameters for WebSocket server transport.
Parameters:
add_wav_header (bool): Whether to add WAV headers to audio frames. Default: False
serializer (Optional[FrameSerializer]): Frame serializer for message encoding/decoding
session_timeout (Optional[int]): Timeout in seconds for client sessions. None for no timeout
"""
add_wav_header: bool = False
serializer: Optional[FrameSerializer] = None
    session_timeout: Optional[int] = None
{ .api }
from pipecat.transports.websocket import (
    WebsocketClientTransport,
    WebsocketClientParams
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.serializers.base_serializer import FrameSerializer
from typing import Optional
class WebsocketClientTransport(BaseTransport):
"""WebSocket client transport for bidirectional communication.
Provides a complete WebSocket client transport implementation with
input and output capabilities, connection management, and event handling.
Args:
uri: The WebSocket URI to connect to (e.g., "ws://server:8765" or "wss://secure-server:8765")
params: Optional configuration parameters for the transport
Example:
from pipecat.transports.websocket import (
WebsocketClientTransport,
WebsocketClientParams
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
transport = WebsocketClientTransport(
uri="ws://localhost:8765",
params=WebsocketClientParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
serializer=ProtobufFrameSerializer(),
add_wav_header=True,
additional_headers={
"Authorization": "Bearer token123",
"X-Custom-Header": "value"
}
)
)
# Register event handlers
@transport.event_handler("on_connected")
async def on_connected(websocket):
print("Connected to WebSocket server")
@transport.event_handler("on_disconnected")
async def on_disconnected(websocket):
print("Disconnected from WebSocket server")
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
uri: str,
params: Optional[WebsocketClientParams] = None
):
"""Initialize WebSocket client transport.
Args:
uri: WebSocket URI to connect to
params: WebSocket client parameters
"""
super().__init__(params or WebsocketClientParams())
self.uri = uri
class WebsocketClientParams(TransportParams):
"""Configuration parameters for WebSocket client transport.
Parameters:
add_wav_header (bool): Whether to add WAV headers to audio frames. Default: True
additional_headers (Optional[dict[str, str]]): Additional HTTP headers to send with connection
serializer (Optional[FrameSerializer]): Frame serializer for encoding/decoding messages
"""
add_wav_header: bool = True
additional_headers: Optional[dict[str, str]] = None
    serializer: Optional[FrameSerializer] = None
{ .api }
from pipecat.transports.websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.serializers.base_serializer import FrameSerializer
from typing import Optional
from fastapi import WebSocket
class FastAPIWebsocketTransport(BaseTransport):
"""FastAPI WebSocket transport for real-time audio/video streaming.
Provides bidirectional WebSocket communication with frame serialization,
session management, and event handling for client connections and timeouts.
Args:
websocket: The FastAPI WebSocket connection
params: Transport configuration parameters
input_name: Optional name for the input processor
output_name: Optional name for the output processor
Example:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pipecat.transports.websocket import (
FastAPIWebsocketTransport,
FastAPIWebsocketParams
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
transport = FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
serializer=ProtobufFrameSerializer(),
add_wav_header=False,
session_timeout=300, # 5 minutes
fixed_audio_packet_size=640 # 20ms @ 16kHz PCM16 mono
)
)
# Register event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(ws):
print("Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(ws):
print("Client disconnected")
@transport.event_handler("on_session_timeout")
async def on_session_timeout(ws):
print("Session timed out")
await ws.close()
# Build pipeline
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
try:
task = PipelineTask(pipeline)
await task.run()
except WebSocketDisconnect:
print("WebSocket disconnected")
"""
def __init__(
self,
websocket: WebSocket,
params: FastAPIWebsocketParams,
input_name: Optional[str] = None,
output_name: Optional[str] = None
):
"""Initialize FastAPI WebSocket transport.
Args:
websocket: FastAPI WebSocket connection
params: FastAPI WebSocket parameters
input_name: Input processor name
output_name: Output processor name
"""
super().__init__(params)
self.websocket = websocket
self._input_name = input_name
self._output_name = output_name
class FastAPIWebsocketParams(TransportParams):
"""Configuration parameters for FastAPI WebSocket transport.
Parameters:
add_wav_header (bool): Whether to add WAV headers to audio frames. Default: False
serializer (Optional[FrameSerializer]): Frame serializer for encoding/decoding messages
session_timeout (Optional[int]): Session timeout in seconds, None for no timeout
fixed_audio_packet_size (Optional[int]): Optional fixed-size packetization for raw PCM audio payloads.
Useful when the remote WebSocket media endpoint requires strict audio framing.
For example, 640 bytes for 20ms @ 16kHz PCM16 mono audio
"""
add_wav_header: bool = False
serializer: Optional[FrameSerializer] = None
session_timeout: Optional[int] = None
    fixed_audio_packet_size: Optional[int] = None
{ .api }
from pipecat.transports.local import (
    LocalAudioTransport,
    LocalAudioTransportParams
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from typing import Optional
class LocalAudioTransport(BaseTransport):
"""Complete local audio transport with input and output capabilities.
Provides a unified interface for local audio I/O using PyAudio, supporting
both audio capture and playback through the system's audio devices.
Useful for local testing and development.
Args:
params: Local audio transport configuration parameters
Example:
from pipecat.transports.local import (
LocalAudioTransport,
LocalAudioTransportParams
)
# Use default audio devices
transport = LocalAudioTransport(
params=LocalAudioTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
audio_in_channels=1,
audio_out_channels=1
)
)
# Use specific audio devices
transport = LocalAudioTransport(
params=LocalAudioTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
input_device_index=1, # Use device index 1 for input
output_device_index=2 # Use device index 2 for output
)
)
# Build pipeline for local testing
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(self, params: LocalAudioTransportParams):
"""Initialize local audio transport.
Args:
params: Local audio transport parameters
"""
super().__init__(params)
self._pyaudio = None
class LocalAudioTransportParams(TransportParams):
"""Configuration parameters for local audio transport.
Parameters:
input_device_index (Optional[int]): PyAudio device index for audio input. If None, uses default
output_device_index (Optional[int]): PyAudio device index for audio output. If None, uses default
"""
input_device_index: Optional[int] = None
    output_device_index: Optional[int] = None
{ .api }
from pipecat.transports.smallwebrtc import SmallWebRTCTransport
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.base_transport import BaseTransport, TransportParams
from typing import Optional
class SmallWebRTCTransport(BaseTransport):
"""WebRTC transport implementation for real-time communication.
Provides bidirectional audio and video streaming over WebRTC connections
with support for application messaging and connection event handling using aiortc.
Args:
webrtc_connection: The underlying WebRTC connection handler
params: Transport configuration parameters
input_name: Optional name for the input processor
output_name: Optional name for the output processor
Methods:
send_image(frame): Send an image frame through the transport
send_audio(frame): Send an audio frame through the transport
capture_participant_video(video_source): Capture video from participant
capture_participant_audio(audio_source): Capture audio from participant
Example:
from pipecat.transports.smallwebrtc import SmallWebRTCTransport
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
# Create WebRTC connection (implementation depends on signaling)
webrtc_connection = SmallWebRTCConnection(
pc_id="unique-peer-id",
# ... connection configuration
)
transport = SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
video_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
video_out_width=1280,
video_out_height=720
)
)
# Register event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(connection):
print(f"Client connected: {connection.pc_id}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(connection):
print(f"Client disconnected: {connection.pc_id}")
@transport.event_handler("on_app_message")
async def on_app_message(message, sender):
print(f"Message from {sender}: {message}")
# Build pipeline
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
webrtc_connection: SmallWebRTCConnection,
params: TransportParams,
input_name: Optional[str] = None,
output_name: Optional[str] = None
):
"""Initialize SmallWebRTC transport.
Args:
webrtc_connection: WebRTC connection handler
params: Transport parameters
input_name: Input processor name
output_name: Output processor name
"""
super().__init__(params)
self.webrtc_connection = webrtc_connection
self._input_name = input_name
        self._output_name = output_name
{ .api }
from pipecat.transports.heygen import HeyGenTransport, HeyGenParams
from pipecat.services.heygen.api_interactive_avatar import NewSessionRequest
from pipecat.services.heygen.api_liveavatar import LiveAvatarNewSessionRequest
from pipecat.services.heygen.client import ServiceType
from pipecat.transports.base_transport import BaseTransport, TransportParams
from typing import Optional, Union
import aiohttp
class HeyGenTransport(BaseTransport):
"""Transport implementation for HeyGen video calls.
When used, the Pipecat bot joins the same virtual room as the HeyGen Avatar and the user.
This is achieved by using HeyGenClient, which initiates the conversation via
HeyGenApi and obtains a room URL that all participants connect to.
Args:
session: aiohttp session for making async HTTP requests
api_key: HeyGen API key for authentication
params: HeyGen-specific configuration parameters
input_name: Optional custom name for the input transport
output_name: Optional custom name for the output transport
session_request: Configuration for the HeyGen session
service_type: Service type for the avatar session
Example:
from pipecat.transports.heygen import HeyGenTransport, HeyGenParams
from pipecat.services.heygen.api_interactive_avatar import (
NewSessionRequest,
AvatarQuality,
VoiceSettings
)
import aiohttp
async def main():
async with aiohttp.ClientSession() as session:
transport = HeyGenTransport(
session=session,
api_key="your-heygen-api-key",
params=HeyGenParams(
audio_in_enabled=True,
audio_out_enabled=True
),
session_request=NewSessionRequest(
quality=AvatarQuality.MEDIUM,
avatar_name="your-avatar-id",
voice=VoiceSettings(
voice_id="your-voice-id"
)
)
)
# Register event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(participant):
print(f"Client connected: {participant}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(participant):
print(f"Client disconnected: {participant}")
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
session: aiohttp.ClientSession,
api_key: str,
params: HeyGenParams = HeyGenParams(),
input_name: Optional[str] = None,
output_name: Optional[str] = None,
session_request: Optional[Union[LiveAvatarNewSessionRequest, NewSessionRequest]] = None,
service_type: Optional[ServiceType] = None
):
"""Initialize HeyGen transport.
Args:
session: aiohttp session
api_key: HeyGen API key
params: HeyGen parameters
input_name: Input processor name
output_name: Output processor name
session_request: Session configuration
service_type: Service type for avatar
"""
super().__init__(params)
self.session = session
self.api_key = api_key
self._input_name = input_name
self._output_name = output_name
self.session_request = session_request
self.service_type = service_type
class HeyGenParams(TransportParams):
"""Configuration parameters for the HeyGen transport.
Parameters:
audio_in_enabled (bool): Whether to enable audio input from participants. Default: True
audio_out_enabled (bool): Whether to enable audio output to participants. Default: True
"""
audio_in_enabled: bool = True
    audio_out_enabled: bool = True
{ .api }
from pipecat.transports.tavus import TavusTransport, TavusParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.base_transport import BaseTransport
from typing import Optional
import aiohttp
class TavusTransport(BaseTransport):
"""Transport implementation for Tavus video calls.
When used, the Pipecat bot joins the same virtual room as the Tavus Avatar and the user.
This is achieved by using TavusTransportClient, which initiates the conversation via
TavusApi and obtains a room URL that all participants connect to.
Args:
bot_name: The name of the Pipecat bot
session: aiohttp session used for async HTTP requests
api_key: Tavus API key for authentication
replica_id: ID of the replica model used for voice generation
persona_id: ID of the Tavus persona. Defaults to "pipecat-stream"
to use the Pipecat TTS voice
params: Optional Tavus-specific configuration parameters
input_name: Optional name for the input transport
output_name: Optional name for the output transport
Methods:
update_subscriptions(participant_settings, profile_settings): Update subscription settings
Example:
from pipecat.transports.tavus import TavusTransport, TavusParams
import aiohttp
async def main():
async with aiohttp.ClientSession() as session:
transport = TavusTransport(
bot_name="MyAssistant",
session=session,
api_key="your-tavus-api-key",
replica_id="your-replica-id",
persona_id="pipecat-stream", # Use Pipecat TTS
params=TavusParams(
audio_in_enabled=True,
audio_out_enabled=True,
microphone_out_enabled=False
)
)
# Register event handlers
@transport.event_handler("on_client_connected")
async def on_client_connected(participant):
print(f"Client connected: {participant}")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(participant):
print(f"Client disconnected: {participant}")
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
task = PipelineTask(pipeline)
await task.run()
"""
def __init__(
self,
bot_name: str,
session: aiohttp.ClientSession,
api_key: str,
replica_id: str,
persona_id: str = "pipecat-stream",
params: TavusParams = TavusParams(),
input_name: Optional[str] = None,
output_name: Optional[str] = None
):
"""Initialize Tavus transport.
Args:
bot_name: Bot name
session: aiohttp session
api_key: Tavus API key
replica_id: Replica ID
persona_id: Persona ID
params: Tavus parameters
input_name: Input processor name
output_name: Output processor name
"""
super().__init__(params)
self.bot_name = bot_name
self.session = session
self.api_key = api_key
self.replica_id = replica_id
self.persona_id = persona_id
self._input_name = input_name
self._output_name = output_name
class TavusParams(DailyParams):
"""Configuration parameters for the Tavus transport.
Extends DailyParams as Tavus uses Daily for WebRTC connectivity.
Parameters:
audio_in_enabled (bool): Whether to enable audio input from participants. Default: True
audio_out_enabled (bool): Whether to enable audio output to participants. Default: True
microphone_out_enabled (bool): Whether to enable microphone output track. Default: False
"""
audio_in_enabled: bool = True
audio_out_enabled: bool = True
    microphone_out_enabled: bool = False

WhatsApp voice calling integration uses the WhatsApp Cloud API and WebRTC, enabling Pipecat bots to receive and handle voice calls from WhatsApp users.
{ .api }
from pipecat.transports.whatsapp.client import WhatsAppClient
from pipecat.transports.whatsapp.api import WhatsAppApi, WhatsAppWebhookRequest
# WhatsApp client handles webhook processing and call management
client = WhatsAppClient(
whatsapp_token="your-token",
phone_number_id="your-phone-number-id",
session=aiohttp_session,
whatsapp_secret="your-app-secret" # For webhook validation
)
# Handle incoming call webhook
await client.handle_webhook_request(
request=webhook_request,
connection_callback=handle_call_function
)

Documentation: WhatsApp Transport →
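A hedged sketch of serving the client behind a FastAPI webhook endpoint. The endpoint path, payload parsing, and handle_call() callback are illustrative assumptions, not part of the documented API.
{ .api }
import aiohttp
from fastapi import FastAPI, Request

from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient

app = FastAPI()

async def handle_call(connection):
    # Hypothetical callback: build the transport/pipeline for this call here.
    print(f"Incoming WhatsApp call: {connection}")

@app.post("/whatsapp")  # assumed webhook path
async def whatsapp_webhook(request: Request):
    payload = await request.json()
    # One session per request for brevity; reuse a long-lived session in production.
    async with aiohttp.ClientSession() as session:
        client = WhatsAppClient(
            whatsapp_token="your-token",
            phone_number_id="your-phone-number-id",
            session=session,
            whatsapp_secret="your-app-secret"
        )
        await client.handle_webhook_request(
            request=WhatsAppWebhookRequest(**payload),  # assumes a Pydantic model
            connection_callback=handle_call
        )
    return {"status": "ok"}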
{ .api }
from pipecat.transports.daily import DailyTransport, DailyParams
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
# Configure VAD
vad = SileroVADAnalyzer(
params=VADParams(
threshold=0.5,
min_speech_duration_ms=250
)
)
# Create transport
transport = DailyTransport(
room_url="https://your-domain.daily.co/room",
token="token",
params=DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_enabled=True,
vad_analyzer=vad
)
)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from user
stt,
user_agg,
llm,
tts,
transport.output() # Audio to user
])
{ .api }
from pipecat.transports.local import LocalAudioTransport
from pipecat.transports.base_transport import TransportParams
# Test locally without WebRTC
transport = LocalAudioTransport(
params=TransportParams(
audio_in_enabled=True,
audio_out_enabled=True
)
)
# Same pipeline works with local audio
pipeline = Pipeline([
transport.input(),
stt, user_agg, llm, tts,
transport.output()
])
{ .api }
import time

from pipecat.transports.websocket import (
    WebsocketServerTransport,
    WebsocketServerParams
)
class CustomWebSocketTransport(WebsocketServerTransport):
"""Custom WebSocket protocol."""
async def send_message(self, message: dict):
"""Custom message format."""
custom_format = {
"type": "custom",
"data": message,
"timestamp": time.time()
}
await super().send_message(custom_format)
transport = CustomWebSocketTransport(
    params=WebsocketServerParams(),
    host="0.0.0.0",
    port=8080
)
{ .api }
# Good: Match transport and service sample rates
transport = DailyTransport(
params=DailyParams(
audio_in_sample_rate=16000, # For STT
audio_out_sample_rate=24000 # For TTS
)
)
stt = DeepgramSTTService(...) # Expects 16kHz
tts = ElevenLabsTTSService(...) # Produces 24kHz
# Bad: Mismatched rates require resampling
transport = DailyTransport(
params=DailyParams(audio_in_sample_rate=48000) # Mismatch!
)
stt = DeepgramSTTService(...) # Expects 16kHz
# Requires resampling, adds latency
{ .api }
# Good: Enable VAD for natural interruptions
transport = DailyTransport(
params=DailyParams(
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer()
)
)
task = PipelineTask(
pipeline,
params=PipelineParams(allow_interruptions=True)
)
# Bad: No VAD, no interruptions
transport = DailyTransport(params=DailyParams(vad_enabled=False))
# User must wait for bot to finish
{ .api }
@transport.event_handler("on_transport_error")
async def handle_error(error: Exception):
print(f"Transport error: {error}")
    # Implement reconnection or cleanup
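Expanding on the handler above, here is a hedged sketch of one recovery strategy. It assumes transport and task are in scope and that start()/stop() behave as described in the BaseTransport interface; the backoff policy and retry count are illustrative.
{ .api }
import asyncio

MAX_RETRIES = 3

@transport.event_handler("on_transport_error")
async def handle_error(error: Exception):
    print(f"Transport error: {error}")
    for attempt in range(1, MAX_RETRIES + 1):
        await asyncio.sleep(2 ** attempt)  # exponential backoff: 2s, 4s, 8s
        try:
            await transport.stop()   # tear down the failed connection
            await transport.start()  # try to reconnect
            return
        except Exception as retry_error:
            print(f"Reconnect attempt {attempt} failed: {retry_error}")
    await task.cancel()  # give up so the application can exit cleanly
{ .api }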
# Good: WebRTC for production voice agents
transport = DailyTransport(...) # Low latency, reliable
# Good: WebSocket for custom protocols
transport = WebsocketServerTransport(...) # Full control
# Good: Local for development/testing
transport = LocalAudioTransport(...) # Easy setup
# Bad: Local in production
transport = LocalAudioTransport(...) # Not suitable for remote users
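A hypothetical factory that encodes this guidance; create_transport() and its kind values are illustrative helpers, not part of the library.
{ .api }
import os

from pipecat.transports.base_transport import BaseTransport

def create_transport(kind: str) -> BaseTransport:
    """Pick a transport per environment (hypothetical helper)."""
    if kind == "daily":
        # Production: low-latency WebRTC via Daily
        from pipecat.transports.daily.transport import DailyTransport, DailyParams
        return DailyTransport(
            room_url=os.environ["DAILY_ROOM_URL"],
            token=os.environ.get("DAILY_TOKEN"),
            params=DailyParams(audio_in_enabled=True, audio_out_enabled=True)
        )
    if kind == "websocket":
        # Custom protocols: full control over framing and serialization
        from pipecat.transports.websocket import (
            WebsocketServerTransport,
            WebsocketServerParams
        )
        return WebsocketServerTransport(
            params=WebsocketServerParams(audio_in_enabled=True, audio_out_enabled=True),
            host="0.0.0.0",
            port=8765
        )
    # Development and testing: system audio devices, no network
    from pipecat.transports.local import (
        LocalAudioTransport,
        LocalAudioTransportParams
    )
    return LocalAudioTransport(
        params=LocalAudioTransportParams(audio_in_enabled=True, audio_out_enabled=True)
    )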