Describes: pkg:pypi/pipecat-ai@0.0.x

tessl/pypi-pipecat-ai

tessl install tessl/pypi-pipecat-ai@0.0.0

An open source framework for building real-time voice and multimodal conversational AI agents with support for speech-to-text, text-to-speech, LLMs, and multiple transport protocols

WhatsApp Transport

The WhatsApp transport handles voice calls through the WhatsApp Cloud API. It lets Pipecat bots receive and manage voice calls from WhatsApp users over WebRTC connections.

Overview

The WhatsApp transport consists of two main components:

  • WhatsAppApi: Low-level API client for WhatsApp Cloud API operations
  • WhatsAppClient: High-level client managing WebRTC connections and webhook processing

WhatsAppApi

WhatsAppApi Class

{ .api }
from pipecat.transports.whatsapp.api import WhatsAppApi
import aiohttp

class WhatsAppApi:
    """WhatsApp Cloud API client for handling calls.

    Provides methods to interact with the WhatsApp Cloud API for managing
    voice calls, including answering, rejecting, and terminating calls.

    Parameters:
        BASE_URL: Base URL for WhatsApp Graph API v23.0
        phone_number_id: Your WhatsApp Business phone number ID
        session: aiohttp client session for making HTTP requests
        whatsapp_url: Complete URL for the calls endpoint
        whatsapp_token: Bearer token for API authentication
    """

    BASE_URL = "https://graph.facebook.com/v23.0/"

    def __init__(
        self,
        whatsapp_token: str,
        phone_number_id: str,
        session: aiohttp.ClientSession
    ) -> None:
        """Initialize the WhatsApp API client.

        Args:
            whatsapp_token: WhatsApp access token for authentication
            phone_number_id: Business phone number ID from WhatsApp Business API
            session: aiohttp ClientSession for making HTTP requests
        """
        pass

    def update_whatsapp_token(self, whatsapp_token: str):
        """Update the WhatsApp access token for authentication."""
        pass

    def update_whatsapp_phone_number_id(self, phone_number_id: str):
        """Update the WhatsApp phone number ID for authentication."""
        pass

    async def answer_call_to_whatsapp(
        self,
        call_id: str,
        action: str,
        sdp: str,
        from_: str
    ) -> dict:
        """Answer an incoming WhatsApp call.

        Handles the call answering process, supporting both "pre_accept"
        and "accept" actions as required by the WhatsApp calling workflow.

        Args:
            call_id: Unique identifier for the call (from connect webhook)
            action: Action to perform ("pre_accept" or "accept")
            sdp: Session Description Protocol answer for WebRTC connection
            from_: Caller's phone number (WhatsApp ID format)

        Returns:
            Dict containing the API response with success status and error details

        Note:
            Calls must be pre-accepted before being accepted. The typical flow is:
            1. Receive connect webhook
            2. Call with action="pre_accept"
            3. Call with action="accept"
        """
        pass

    async def reject_call_to_whatsapp(self, call_id: str) -> dict:
        """Reject an incoming WhatsApp call.

        Rejects a call that was received via connect webhook. The caller will
        receive a rejection notification and a terminate webhook will be sent
        with status "REJECTED".

        Args:
            call_id: Unique identifier for the call (from connect webhook)

        Returns:
            Dict containing the API response with success status and error details

        Note:
            This should be called instead of answer_call_to_whatsapp when you
            want to decline the incoming call.
        """
        pass

    async def terminate_call_to_whatsapp(self, call_id: str) -> dict:
        """Terminate an active WhatsApp call.

        Ends an ongoing call that has been previously accepted. Both parties
        will be disconnected and a terminate webhook will be sent with status
        "COMPLETED".

        Args:
            call_id: Unique identifier for the active call

        Returns:
            Dict containing the API response with success status and error details

        Note:
            This should only be called for calls that have been accepted and
            are currently active. For incoming calls that haven't been accepted
            yet, use reject_call_to_whatsapp instead.
        """
        pass
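
The pre-accept and accept ordering described in the `answer_call_to_whatsapp` note can be sketched as follows. This is a minimal sketch, not Pipecat's own implementation: `answer_incoming_call` and `FakeApi` are hypothetical names, the returned dict shape is assumed, and the stand-in object only mimics the method signature documented above so that the example runs without network access.

```python
import asyncio


async def answer_incoming_call(api, call_id: str, sdp_answer: str, caller: str) -> dict:
    """Pre-accept, then accept, a call using a WhatsAppApi-like object."""
    # Step 2 of the documented flow: pre-accept with the SDP answer.
    await api.answer_call_to_whatsapp(call_id, "pre_accept", sdp_answer, caller)
    # Step 3: accept the call.
    return await api.answer_call_to_whatsapp(call_id, "accept", sdp_answer, caller)


class FakeApi:
    """Hypothetical stand-in that records actions instead of calling the Cloud API."""

    def __init__(self) -> None:
        self.actions = []

    async def answer_call_to_whatsapp(self, call_id, action, sdp, from_):
        self.actions.append(action)
        return {"success": True}  # assumed response shape, for illustration only


fake_api = FakeApi()
result = asyncio.run(
    answer_incoming_call(fake_api, "call-123", "v=0", "15550001111")
)
```

In real use, `api` would be a `WhatsAppApi` instance and `sdp_answer` the answer generated for the caller's SDP offer; `WhatsAppClient` is documented as managing this workflow for you.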

WhatsAppClient

WhatsAppClient Class

{ .api }
from pipecat.transports.whatsapp.client import WhatsAppClient
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from typing import Awaitable, Callable, Dict, List, Optional
import aiohttp

class WhatsAppClient:
    """WhatsApp Cloud API client for handling calls and webhook requests.

    This client manages WhatsApp call connections using WebRTC, processes
    webhook events from WhatsApp, and maintains ongoing call state. It
    supports both incoming call handling and call termination through the
    WhatsApp Cloud API.

    Attributes:
        _whatsapp_api: WhatsApp API instance for making API calls
        _ongoing_calls_map: Dictionary mapping call IDs to WebRTC connections
        _ice_servers: List of ICE servers for WebRTC connections
    """

    def __init__(
        self,
        whatsapp_token: str,
        phone_number_id: str,
        session: aiohttp.ClientSession,
        ice_servers: Optional[List[IceServer]] = None,
        whatsapp_secret: Optional[str] = None,
    ) -> None:
        """Initialize the WhatsApp client.

        Args:
            whatsapp_token: WhatsApp API access token
            phone_number_id: WhatsApp phone number ID for the business account
            session: aiohttp session for making HTTP requests
            ice_servers: List of ICE servers for WebRTC connections. If None,
                        defaults to Google's public STUN server
            whatsapp_secret: WhatsApp APP secret for validating webhook requests
        """
        pass

    def update_ice_servers(self, ice_servers: Optional[List[IceServer]] = None):
        """Update the list of ICE servers used for WebRTC connections."""
        pass

    def update_whatsapp_secret(self, whatsapp_secret: Optional[str] = None):
        """Update the WhatsApp APP secret for validating webhook requests."""
        pass

    def update_whatsapp_token(self, whatsapp_token: str):
        """Update the WhatsApp API access token."""
        pass

    def update_whatsapp_phone_number_id(self, phone_number_id: str):
        """Update the WhatsApp phone number ID for authentication."""
        pass

    async def terminate_all_calls(self) -> None:
        """Terminate all ongoing WhatsApp calls.

        This method will:
        1. Send termination requests to WhatsApp API for each ongoing call
        2. Disconnect all WebRTC connections
        3. Clear the ongoing calls map

        All terminations are executed concurrently for efficiency.
        """
        pass

    async def handle_verify_webhook_request(
        self,
        params: Dict[str, str],
        expected_verification_token: str
    ) -> int:
        """Handle a verify webhook request from WhatsApp.

        Args:
            params: Dictionary containing webhook parameters from query string
            expected_verification_token: Expected verification token to validate

        Returns:
            int: The challenge value if verification succeeds

        Raises:
            ValueError: If verification fails due to missing parameters or invalid token
        """
        pass

    async def handle_webhook_request(
        self,
        request: WhatsAppWebhookRequest,
        connection_callback: Optional[Callable[[SmallWebRTCConnection], Awaitable[None]]] = None,
        raw_body: Optional[bytes] = None,
        sha256_signature: Optional[str] = None,
    ) -> bool:
        """Handle a webhook request from WhatsApp.

        This method processes incoming webhook requests and handles both
        connect and terminate events. For connect events, it establishes
        a WebRTC connection and optionally invokes a callback with the
        new connection.

        Args:
            request: The webhook request from WhatsApp containing call events
            connection_callback: Optional callback function to invoke when a new
                               WebRTC connection is established. The callback
                               receives the SmallWebRTCConnection instance.
            raw_body: Optional bytes containing the raw request body for signature validation
            sha256_signature: Optional X-Hub-Signature-256 header value from the request

        Returns:
            bool: True if the webhook request was handled successfully, False otherwise

        Raises:
            ValueError: If the webhook request contains no supported events
            Exception: If connection establishment or API calls fail
        """
        pass
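
The `raw_body` and `sha256_signature` arguments exist so the request can be authenticated. Meta's webhook convention is an HMAC-SHA256 of the raw payload keyed with the app secret, sent in the `X-Hub-Signature-256` header as `sha256=<hex digest>`. The sketch below shows that check on its own, using only the standard library. `is_valid_signature` is a hypothetical helper, and how `WhatsAppClient` performs the check internally is not shown in this document.

```python
import hashlib
import hmac


def is_valid_signature(app_secret: str, raw_body: bytes, sha256_signature: str) -> bool:
    """Check an X-Hub-Signature-256 header value against the raw request body."""
    prefix = "sha256="
    if not sha256_signature.startswith(prefix):
        return False
    expected = hmac.new(app_secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    received = sha256_signature[len(prefix):]
    # compare_digest runs in constant time, so it does not leak how much matched.
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))


secret = "example-app-secret"  # hypothetical value
body = b'{"object": "whatsapp_business_account", "entry": []}'
header = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
```

The digest must be computed over the exact bytes received, which is why the raw body is passed separately from the parsed request.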

Webhook Data Models

WhatsApp Session

{ .api }
from pydantic import BaseModel
from pipecat.transports.whatsapp.api import WhatsAppSession

class WhatsAppSession(BaseModel):
    """WebRTC session information for WhatsApp calls.

    Parameters:
        sdp: Session Description Protocol (SDP) data for WebRTC connection
        sdp_type: Type of SDP (e.g., "offer", "answer")
    """
    sdp: str
    sdp_type: str

WhatsApp Error

{ .api }
from typing import Any, Dict
from pydantic import BaseModel
from pipecat.transports.whatsapp.api import WhatsAppError

class WhatsAppError(BaseModel):
    """Error information from WhatsApp API responses.

    Parameters:
        code: Error code number
        message: Human-readable error message
        href: URL for more information about the error
        error_data: Additional error-specific data
    """
    code: int
    message: str
    href: str
    error_data: Dict[str, Any]

WhatsApp Connect Call

{ .api }
from typing import Optional
from pydantic import BaseModel
from pipecat.transports.whatsapp.api import WhatsAppConnectCall

class WhatsAppConnectCall(BaseModel):
    """Incoming call connection event data.

    Represents a user-initiated call that requires handling. This is sent
    when a WhatsApp user initiates a call to your business number.

    Parameters:
        id: Unique call identifier
        from_: Phone number of the caller (WhatsApp ID format)
        to: Your business phone number that received the call
        event: Always "connect" for incoming calls
        timestamp: ISO 8601 timestamp when the call was initiated
        direction: Optional call direction ("inbound" for user-initiated calls)
        session: WebRTC session data containing SDP offer from the caller
    """
    id: str
    from_: str  # Field alias: "from"
    to: str
    event: str  # "connect"
    timestamp: str
    direction: Optional[str]
    session: WhatsAppSession

WhatsApp Terminate Call

{ .api }
from typing import Optional
from pydantic import BaseModel
from pipecat.transports.whatsapp.api import WhatsAppTerminateCall

class WhatsAppTerminateCall(BaseModel):
    """Call termination event data.

    Represents the end of a call session, whether completed successfully,
    failed, or was rejected by either party.

    Parameters:
        id: Unique call identifier (matches the connect event)
        from_: Phone number of the caller
        to: Your business phone number
        event: Always "terminate" for call end events
        timestamp: ISO 8601 timestamp when the call ended
        direction: Optional call direction
        biz_opaque_callback_data: Optional business-specific callback data
        status: Call completion status ("FAILED", "COMPLETED", "REJECTED")
        start_time: ISO 8601 timestamp when call actually started (after acceptance)
        end_time: ISO 8601 timestamp when call ended
        duration: Call duration in seconds (only for completed calls)
    """
    id: str
    from_: str  # Field alias: "from"
    to: str
    event: str  # "terminate"
    timestamp: str
    direction: Optional[str]
    biz_opaque_callback_data: Optional[str] = None
    status: Optional[str] = None  # "FAILED" or "COMPLETED" or "REJECTED"
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    duration: Optional[int] = None

WhatsApp Webhook Request

{ .api }
from typing import List, Optional, Union
from pydantic import BaseModel
from pipecat.transports.whatsapp.api import (
    WhatsAppWebhookRequest,
    WhatsAppProfile,
    WhatsAppContact,
    WhatsAppMetadata,
    WhatsAppConnectCallValue,
    WhatsAppTerminateCallValue,
    WhatsAppChange,
    WhatsAppEntry
)

class WhatsAppProfile(BaseModel):
    """User profile information.

    Parameters:
        name: Display name of the WhatsApp user
    """
    name: str

class WhatsAppContact(BaseModel):
    """Contact information for a WhatsApp user.

    Parameters:
        profile: User's profile information
        wa_id: WhatsApp ID (phone number in international format without +)
    """
    profile: WhatsAppProfile
    wa_id: str

class WhatsAppMetadata(BaseModel):
    """Business phone number metadata.

    Parameters:
        display_phone_number: Formatted phone number for display
        phone_number_id: WhatsApp Business API phone number ID
    """
    display_phone_number: str
    phone_number_id: str

class WhatsAppConnectCallValue(BaseModel):
    """Webhook payload for incoming call events.

    Parameters:
        messaging_product: Always "whatsapp"
        metadata: Business phone number information
        contacts: List of contact information for involved parties
        calls: List of call connection events
    """
    messaging_product: str
    metadata: WhatsAppMetadata
    contacts: List[WhatsAppContact]
    calls: List[WhatsAppConnectCall]

class WhatsAppTerminateCallValue(BaseModel):
    """Webhook payload for call termination events.

    Parameters:
        messaging_product: Always "whatsapp"
        metadata: Business phone number information
        calls: List of call termination events
        errors: Optional list of errors that occurred during the call
    """
    messaging_product: str
    metadata: WhatsAppMetadata
    calls: List[WhatsAppTerminateCall]
    errors: Optional[List[WhatsAppError]] = None

class WhatsAppChange(BaseModel):
    """Webhook change event wrapper.

    Parameters:
        value: The actual event data (connect or terminate)
        field: Always "calls" for calling webhooks
    """
    value: Union[WhatsAppConnectCallValue, WhatsAppTerminateCallValue]
    field: str

class WhatsAppEntry(BaseModel):
    """Webhook entry containing one or more changes.

    Parameters:
        id: WhatsApp Business Account ID
        changes: List of change events in this webhook delivery
    """
    id: str
    changes: List[WhatsAppChange]

class WhatsAppWebhookRequest(BaseModel):
    """Complete webhook request from WhatsApp.

    This is the top-level structure for all webhook deliveries from
    the WhatsApp Cloud API for calling events.

    Parameters:
        object: Always "whatsapp_business_account"
        entry: List of webhook entries (usually contains one entry)
    """
    object: str
    entry: List[WhatsAppEntry]
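
To make the nesting of these models concrete, the sketch below walks a sample payload as plain dictionaries. The field names follow the models above, but every value is invented, and `iter_calls` is a hypothetical helper rather than part of Pipecat.

```python
import json

# Hypothetical payload: field names follow the models above, values are invented.
SAMPLE_WEBHOOK = json.loads("""
{
  "object": "whatsapp_business_account",
  "entry": [
    {
      "id": "1234567890",
      "changes": [
        {
          "field": "calls",
          "value": {
            "messaging_product": "whatsapp",
            "metadata": {
              "display_phone_number": "15550002222",
              "phone_number_id": "9876543210"
            },
            "calls": [
              {
                "id": "call-123",
                "from": "15550001111",
                "to": "15550002222",
                "event": "terminate",
                "timestamp": "2025-01-01T12:00:30Z",
                "direction": "inbound",
                "status": "COMPLETED",
                "duration": 30
              }
            ]
          }
        }
      ]
    }
  ]
}
""")


def iter_calls(payload: dict):
    """Yield every call dict found under entry -> changes -> value -> calls."""
    for entry in payload.get("entry", []):
        for change in entry.get("changes", []):
            if change.get("field") != "calls":
                continue  # not a calling webhook
            yield from change.get("value", {}).get("calls", [])


events = [
    (call["id"], call["event"], call.get("status"))
    for call in iter_calls(SAMPLE_WEBHOOK)
]
```

Note that the JSON key is `from`, which the models expose as `from_` through a field alias because `from` is a reserved word in Python.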

Usage Example

{ .api }
import asyncio
import json
from typing import Optional

import aiohttp
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient

async def handle_whatsapp_call(connection: SmallWebRTCConnection):
    """Handle an incoming WhatsApp call with a Pipecat pipeline."""
    # Build your pipeline with processors, services, etc.
    pipeline = Pipeline([
        # Your processors here
        # e.g., transport input (built from `connection`), STT, LLM, TTS, transport output
    ])

    # Create the task and run it until the call ends
    task = PipelineTask(pipeline)  # pass params=... here to customize the task
    runner = PipelineRunner()
    await runner.run(task)

async def handle_webhook(
    client: WhatsAppClient,
    raw_request_body: bytes,
    x_hub_signature_header: Optional[str],
) -> bool:
    """Call this from your webhook endpoint handler for each POST request."""
    # 1. Parse incoming webhook
    webhook_data = json.loads(raw_request_body)
    request = WhatsAppWebhookRequest(**webhook_data)

    # 2. Handle the webhook with callback
    return await client.handle_webhook_request(
        request=request,
        connection_callback=handle_whatsapp_call,
        raw_body=raw_request_body,  # For signature validation
        sha256_signature=x_hub_signature_header,
    )

async def main():
    # Initialize client
    async with aiohttp.ClientSession() as session:
        client = WhatsAppClient(
            whatsapp_token="your-whatsapp-token",
            phone_number_id="your-phone-number-id",
            session=session,
            whatsapp_secret="your-app-secret",  # For webhook validation
        )

        # Start your HTTP server here and route webhook POSTs to
        # handle_webhook(client, raw_body, signature_header).

        # To terminate all calls when shutting down:
        await client.terminate_all_calls()

asyncio.run(main())

Webhook Setup

To receive WhatsApp calls, you need to:

  1. Configure WhatsApp Business API: Set up a WhatsApp Business account and phone number
  2. Set up webhook endpoint: Create an HTTP endpoint to receive webhook events
  3. Verify webhook: Handle the verification request from WhatsApp
  4. Process events: Handle connect and terminate events for calls

Webhook Verification Example

{ .api }
from pipecat.transports.whatsapp.client import WhatsAppClient

async def verify_webhook(client: WhatsAppClient, query_params: dict) -> int:
    """Handle webhook verification from WhatsApp."""
    try:
        challenge = await client.handle_verify_webhook_request(
            params=query_params,
            expected_verification_token="your-verification-token"
        )
        return challenge  # Return to WhatsApp
    except ValueError:
        # Verification failed; let the caller turn this into an HTTP error response
        raise
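
For orientation, the checks behind a verification request can be sketched without Pipecat. Meta's verification handshake sends `hub.mode`, `hub.verify_token`, and `hub.challenge` as query parameters and expects the challenge echoed back. The function below mirrors the contract documented for `handle_verify_webhook_request` (an `int` on success, `ValueError` on failure), but `check_verification` is a hypothetical name and this is not the library's implementation.

```python
from typing import Dict


def check_verification(params: Dict[str, str], expected_token: str) -> int:
    """Return the challenge as an int, or raise ValueError if the request is invalid."""
    mode = params.get("hub.mode")
    token = params.get("hub.verify_token")
    challenge = params.get("hub.challenge")
    if mode is None or token is None or challenge is None:
        raise ValueError("Missing verification parameters")
    if mode != "subscribe" or token != expected_token:
        raise ValueError("Verification token mismatch")
    return int(challenge)


challenge = check_verification(
    {
        "hub.mode": "subscribe",
        "hub.verify_token": "my-token",  # hypothetical token
        "hub.challenge": "1158201444",
    },
    expected_token="my-token",
)
```

Your endpoint would return the challenge value in the response body on success and an error status when `ValueError` is raised.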

Integration with SmallWebRTC

The WhatsApp transport uses a SmallWebRTC connection for WebRTC handling. See SmallWebRTC Transport for more details on the underlying WebRTC implementation.

Notes

  • WhatsApp only supports SHA-256 fingerprints in SDP
  • Calls must be pre-accepted before final acceptance
  • Webhook signature validation is recommended for production
  • All calls use WebRTC for audio streaming
  • Default ICE server is Google's public STUN server
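
One way to respect the SHA-256 fingerprint constraint is to strip other `a=fingerprint` lines from an SDP answer before sending it. The sketch below works under that assumption: `keep_sha256_fingerprints` is a hypothetical helper, the fingerprint values are shortened placeholders, and this document does not state how Pipecat itself handles the constraint.

```python
def keep_sha256_fingerprints(sdp: str) -> str:
    """Drop a=fingerprint lines that use a hash other than SHA-256."""
    kept = []
    for line in sdp.splitlines():
        is_fingerprint = line.startswith("a=fingerprint:")
        is_sha256 = line.lower().startswith("a=fingerprint:sha-256 ")
        if is_fingerprint and not is_sha256:
            continue  # skip fingerprints WhatsApp would not accept
        kept.append(line)
    # SDP lines are terminated with CRLF.
    return "\r\n".join(kept) + "\r\n"


sample_sdp = (
    "v=0\r\n"
    "a=fingerprint:sha-384 AA:BB\r\n"
    "a=fingerprint:sha-256 CC:DD\r\n"
    "a=setup:active\r\n"
)
filtered = keep_sha256_fingerprints(sample_sdp)
```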