or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi · pkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md, audio-processing.md, audio-tracks.md, data-streaming.md, e2ee.md, events.md, index.md, participants.md, room.md, rpc.md, track-publications.md, transcription.md, types-enums.md, utilities.md, video-frames-sources.md, video-processing.md, video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/rpc.md

RPC (Remote Procedure Call)

Overview

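RPC provides a request-response communication pattern between participants: one participant registers a named method handler, and another invokes that method by the recipient's identity and asynchronously awaits the response. Payloads and responses are plain strings, so any structure or type checking (for example via JSON and typed schemas) is the application's responsibility.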

Key concepts:

  • RPC: Request-response pattern (not fire-and-forget)
  • Method registration: Participants register named handlers
  • Invocation: Call methods on remote participants by identity
  • Timeout: Configurable timeout for response
  • Error handling: Typed error codes for different failure scenarios
  • Payload: String-based (JSON recommended for structured data)

Import

from livekit import RpcError, RpcInvocationData, LocalParticipant

Classes

RpcInvocationData

@dataclass
class RpcInvocationData:
    """Describe a single incoming RPC invocation, as passed to a method handler.

    Attributes:
        request_id: Unique identifier of this request (UUID-like string);
            useful for tracking and logging.
        caller_identity: Identity of the participant that initiated the
            call; use it to identify the caller.
        payload: Request payload string carrying the method parameters,
            typically JSON-encoded.
            Max size: 15 KiB per the LiveKit RPC guide -- NOTE(review):
            confirm for the SDK version in use.
        response_timeout: Time in seconds the caller is willing to wait for
            a response; the handler should respect it.
    """

    request_id: str
    caller_identity: str
    payload: str
    response_timeout: float

RpcError

class RpcError(Exception):
    """Specialized error for RPC methods.

    Raised when an RPC call fails or when a handler reports a failure.
    Handlers raise it to send a structured error back to the caller;
    callers catch it around ``perform_rpc`` and branch on ``code``.
    """

    def __init__(
        self,
        code: Union[int, "RpcError.ErrorCode"],
        message: str,
        data: Optional[str] = None
    ):
        """Initialize RpcError.

        Args:
            code: Error code, either a plain int or an ``ErrorCode`` member.
                Codes 1001-1999 are reserved for built-in errors; use
                2000+ for custom, application-defined codes.
            message: Human-readable description of the failure.
            data: Optional additional data as a string; JSON is
                recommended for structured error details. Defaults to
                None.

        Example:
            >>> raise RpcError(
            ...     code=RpcError.ErrorCode.APPLICATION_ERROR,
            ...     message="Division by zero",
            ...     data='{"param": "denominator", "value": 0}'
            ... )
        """

    @property
    def code(self) -> int:
        """Error code.

        Returns:
            int: Error code (1001-1999 built-in, 2000+ custom)
        """

    @property
    def message(self) -> str:
        """Error message.

        Returns:
            str: Human-readable error description
        """

    @property
    def data(self) -> Optional[str]:
        """Optional error data.

        Returns:
            str | None: Additional error information (typically JSON)
        """

    class ErrorCode(IntEnum):
        """Built-in RPC error codes.

        Codes 1001-1999 are reserved for SDK errors.
        Codes 2000+ can be used for application-specific errors.
        """

        # 1500-series: the call was attempted but did not complete
        APPLICATION_ERROR = 1500            # Generic application error
        CONNECTION_TIMEOUT = 1501           # Connection lost during call
        RESPONSE_TIMEOUT = 1502             # No response within timeout
        RECIPIENT_DISCONNECTED = 1503       # Recipient left room
        RESPONSE_PAYLOAD_TOO_LARGE = 1504   # Response exceeds size limit
        SEND_FAILED = 1505                  # Failed to send request

        # 1400-series: the request itself could not be serviced
        UNSUPPORTED_METHOD = 1400           # Method not registered
        RECIPIENT_NOT_FOUND = 1401          # Participant not in room
        REQUEST_PAYLOAD_TOO_LARGE = 1402    # Request exceeds size limit
        UNSUPPORTED_SERVER = 1403           # Server doesn't support RPC
        UNSUPPORTED_VERSION = 1404          # Protocol version mismatch

    ErrorMessage: ClassVar[Dict[ErrorCode, str]]
    """Mapping of error codes to default messages.
    
    Provides standard error messages for each error code.
    """

LocalParticipant RPC Methods

class LocalParticipant:
    """LocalParticipant RPC-related methods."""

    async def perform_rpc(
        self,
        *,
        destination_identity: str,
        method: str,
        payload: str,
        response_timeout: Optional[float] = None
    ) -> str:
        """Initiate an RPC call to a remote participant and await its response.

        All arguments are keyword-only.

        Args:
            destination_identity: Identity of the destination participant.
                Must be connected to the room; case-sensitive.
            method: Name of the method to call. Must be registered by the
                recipient; case-sensitive.
            payload: Method payload string, typically JSON for structured
                data.
                Max size: 15 KiB per the LiveKit RPC guide -- NOTE(review):
                confirm for the SDK version in use.
            response_timeout: Timeout in seconds for the response.
                None (the default) uses the SDK default, typically 10s.
                1.0 - 60.0 seconds is the recommended range.

        Returns:
            str: Response payload from the handler, typically a JSON string.

        Raises:
            RpcError: If call fails (see ErrorCode for types)
            RuntimeError: If not connected to room
            ValueError: If parameters invalid

        Example:
            >>> import json
            >>> result = await local.perform_rpc(
            ...     destination_identity="agent-123",
            ...     method="calculate",
            ...     payload=json.dumps({"a": 5, "b": 3}),
            ...     response_timeout=5.0
            ... )
            >>> data = json.loads(result)
            >>> print(data["result"])  # 8
        """

    def register_rpc_method(
        self,
        method_name: str,
        handler: Optional[RpcHandler] = None
    ) -> Union[RpcHandler, Callable[[RpcHandler], RpcHandler]]:
        """Register an RPC method handler.

        Works both as a decorator (omit ``handler``) and as a direct call
        (pass ``handler``).

        Args:
            method_name: Name of the RPC method to handle; case-sensitive.
            handler: Handler function. Omit it to use this method as a
                decorator. Can be sync or async.

        Returns:
            The handler itself when ``handler`` is given, otherwise a
            decorator that registers the function it is applied to.

        Raises:
            ValueError: If method already registered
            TypeError: If handler not callable

        Example (decorator):
            >>> @local.register_rpc_method("calculate")
            ... async def handle_calculate(data: RpcInvocationData) -> str:
            ...     import json
            ...     params = json.loads(data.payload)
            ...     result = params["a"] + params["b"]
            ...     return json.dumps({"result": result})

        Example (direct):
            >>> async def handler(data):
            ...     return "response"
            >>> local.register_rpc_method("method", handler)
        """

    def unregister_rpc_method(self, method: str) -> None:
        """Unregister a previously registered RPC method.

        Args:
            method: Name of the RPC method to unregister.

        Raises:
            KeyError: If method not registered

        Example:
            >>> local.unregister_rpc_method("calculate")
        """

Type Aliases

# A handler takes one RpcInvocationData argument and produces an optional
# string response, either directly or through an awaitable.
RpcHandler = Callable[
    [RpcInvocationData],
    Union[Awaitable[Optional[str]], Optional[str]]
]
"""Type for RPC method handlers.

Handlers can be sync or async functions.
Receive RpcInvocationData and return string response.

Signature:
- Input: RpcInvocationData
- Output: str | None (async: Awaitable[str | None])

Examples:
    >>> # Async handler
    >>> async def handler(data: RpcInvocationData) -> str:
    ...     return "response"
    >>> 
    >>> # Sync handler
    >>> def handler(data: RpcInvocationData) -> str:
    ...     return "response"
"""

Usage Examples

Register Handler

import json
from livekit import LocalParticipant, RpcError, RpcInvocationData

local: LocalParticipant = room.local_participant

@local.register_rpc_method("calculate")
async def handle_calculate(data: RpcInvocationData) -> str:
    """Handle the "calculate" RPC by adding parameters ``a`` and ``b``.

    Args:
        data: RPC invocation data; ``data.payload`` is expected to be a
            JSON object containing the keys "a" and "b".

    Returns:
        str: JSON response of the form ``{"result": a + b}``.

    Raises:
        RpcError: With APPLICATION_ERROR when the payload is not a JSON
            object or does not contain both "a" and "b".
    """
    # Log call info
    print(f"RPC from {data.caller_identity}")
    print(f"Request ID: {data.request_id}")
    print(f"Timeout: {data.response_timeout}s")

    # Parse payload
    params = json.loads(data.payload)

    # Validate. Valid JSON can also be a list, number or string, so check
    # the container type before testing for keys.
    if not isinstance(params, dict) or "a" not in params or "b" not in params:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing parameters",
            data=json.dumps({"required": ["a", "b"]})
        )

    # Perform calculation
    result = params["a"] + params["b"]

    # Return response
    return json.dumps({"result": result})

Call RPC Method

import json
from livekit import RpcError

try:
    # Call RPC method
    result = await local.perform_rpc(
        destination_identity="remote-user",
        method="calculate",
        payload=json.dumps({"a": 5, "b": 3}),
        response_timeout=5.0
    )
    
    # Parse response
    data = json.loads(result)
    print(f"Result: {data['result']}")  # 8
    
except RpcError as e:
    # Handle specific errors
    if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
        print("Participant not in room")
    elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
        print("Method not implemented")
    elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
        print("Request timed out")
    elif e.code == RpcError.ErrorCode.APPLICATION_ERROR:
        print(f"Application error: {e.message}")
        if e.data:
            error_data = json.loads(e.data)
            print(f"Error details: {error_data}")
    else:
        print(f"RPC error {e.code}: {e.message}")

Error Handling in Handler

from livekit import RpcError, RpcInvocationData
import json

@local.register_rpc_method("divide")
async def handle_divide(data: RpcInvocationData) -> str:
    """Handle the "divide" RPC with explicit validation at every step.

    Args:
        data: RPC invocation data; ``data.payload`` must be a JSON object
            with numeric values under the keys "a" and "b".

    Returns:
        str: JSON response ``{"success": true, "result": a / b}``.

    Raises:
        RpcError: With APPLICATION_ERROR when the payload is not valid
            JSON, is not a JSON object, lacks "a" or "b", holds
            non-numeric values, or when "b" is zero.
    """
    # Only decoding can raise JSONDecodeError, so keep the try block narrow
    try:
        params = json.loads(data.payload)
    except json.JSONDecodeError as exc:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON payload"
        ) from exc

    # Valid JSON may still be a list, number or string
    if not isinstance(params, dict):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Payload must be a JSON object"
        )

    # Validate parameters
    if "a" not in params or "b" not in params:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing required parameters: a, b",
            data=json.dumps({
                "provided": list(params.keys()),
                "required": ["a", "b"]
            })
        )

    a, b = params["a"], params["b"]

    # Report bad operand types to the caller instead of leaking a TypeError
    if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Parameters a and b must be numbers"
        )

    # Business logic validation
    if b == 0:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Division by zero",
            data=json.dumps({
                "error_type": "math_error",
                "param": "b",
                "value": 0
            })
        )

    # Perform operation and return success response
    return json.dumps({
        "success": True,
        "result": a / b
    })

Complete Example

import asyncio
import json
from livekit import Room, RpcError, RpcInvocationData

async def main():
    """Connect to a room, serve three RPC methods, and call a remote peer.

    Registers the "echo", "add" and "get_status" handlers on the local
    participant, then (if any remote participant is present) calls "echo"
    and "add" on the first one. Handlers are unregistered and the room is
    disconnected on the way out, even when an error or cancellation
    interrupts the session.

    ``url`` and ``token`` are not defined in this snippet; they are
    presumably supplied by the surrounding application.
    """
    room = Room()
    await room.connect(url, token)

    local = room.local_participant

    # Register multiple handlers
    @local.register_rpc_method("echo")
    async def echo(data: RpcInvocationData) -> str:
        """Echo handler - returns input."""
        return data.payload

    @local.register_rpc_method("add")
    async def add(data: RpcInvocationData) -> str:
        """Addition handler."""
        params = json.loads(data.payload)
        result = params["a"] + params["b"]
        return json.dumps({"result": result})

    @local.register_rpc_method("get_status")
    async def get_status(data: RpcInvocationData) -> str:
        """Status handler."""
        return json.dumps({
            "status": "healthy",
            "uptime": 3600,
            "version": "1.0.0"
        })

    try:
        # Wait for remote participant
        await asyncio.sleep(2)

        # Call remote methods if participants present
        if room.remote_participants:
            remote = next(iter(room.remote_participants.values()))

            try:
                # Echo test
                echo_result = await local.perform_rpc(
                    destination_identity=remote.identity,
                    method="echo",
                    payload="Hello RPC!",
                    response_timeout=5.0
                )
                print(f"Echo: {echo_result}")

                # Calculation
                calc_result = await local.perform_rpc(
                    destination_identity=remote.identity,
                    method="add",
                    payload=json.dumps({"a": 10, "b": 20}),
                    response_timeout=5.0
                )
                data = json.loads(calc_result)
                print(f"10 + 20 = {data['result']}")

            except RpcError as e:
                print(f"RPC error {e.code}: {e.message}")

        # Keep running
        await asyncio.sleep(30)
    finally:
        # Cleanup runs on every exit path: non-RPC exceptions (for example a
        # malformed response) and task cancellation would otherwise skip it.
        local.unregister_rpc_method("echo")
        local.unregister_rpc_method("add")
        local.unregister_rpc_method("get_status")
        await room.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Use JSON for Structured Data

import json

# DO: Encode structured parameters as a single JSON document
payload = json.dumps(
    dict(
        action="calculate",
        params=dict(a=5, b=3),
        options=dict(precision=2),
    )
)

# DON'T: Use unstructured strings
# payload = "5,3,2"  # Hard to parse reliably

2. Validate Input

@local.register_rpc_method("process")
async def handle_process(data: RpcInvocationData) -> str:
    """Handle the "process" RPC, validating the payload before any work.

    Args:
        data: RPC invocation data; ``data.payload`` must be a JSON object
            containing the fields "input" and "format".

    Returns:
        str: JSON response ``{"status": "ok"}``.

    Raises:
        RpcError: With APPLICATION_ERROR when the payload is not valid
            JSON, is not a JSON object, or lacks a required field.
    """
    # Parse and validate
    try:
        params = json.loads(data.payload)
    except json.JSONDecodeError as exc:
        # Chain the cause so local logs keep the decoder's position details
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON"
        ) from exc

    # Valid JSON can be a list, number or string; the field check below
    # only makes sense for an object.
    if not isinstance(params, dict):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Payload must be a JSON object"
        )

    # Validate required fields
    required = ["input", "format"]
    missing = [f for f in required if f not in params]
    if missing:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message=f"Missing parameters: {', '.join(missing)}"
        )

    # Process...
    return json.dumps({"status": "ok"})

3. Set Appropriate Timeouts

# Fragment: uses `await`, so it must run inside an async function; `local`
# and `dest` are expected to be defined by the surrounding code.

# Short timeout for quick operations
result = await local.perform_rpc(
    destination_identity=dest,
    method="ping",
    payload="{}",
    response_timeout=2.0  # 2 seconds
)

# Longer timeout for heavy operations
# (note: this rebinds `result`, replacing the ping response above)
result = await local.perform_rpc(
    destination_identity=dest,
    method="process_video",
    payload=json.dumps({"video_id": "123"}),
    response_timeout=30.0  # 30 seconds
)

4. Handle All Error Cases

async def safe_rpc_call(local, dest, method, payload):
    """Perform an RPC call and report any RpcError instead of raising it.

    Args:
        local: Object exposing ``perform_rpc`` (the local participant).
        dest: Identity of the destination participant.
        method: Name of the remote method to call.
        payload: Request payload string.

    Returns:
        The response payload on success, or None when the call failed
        with an RpcError (a short description is printed in that case).
    """
    try:
        response = await local.perform_rpc(
            destination_identity=dest,
            method=method,
            payload=payload,
            response_timeout=5.0
        )
    except RpcError as err:
        codes = RpcError.ErrorCode
        # Failures whose report is a fixed string
        fixed_reports = {
            codes.RECIPIENT_NOT_FOUND: "Participant left",
            codes.UNSUPPORTED_METHOD: "Method not registered",
            codes.RESPONSE_TIMEOUT: "Timeout",
            codes.REQUEST_PAYLOAD_TOO_LARGE: "Payload too large",
        }
        if err.code in fixed_reports:
            print(fixed_reports[err.code])
        elif err.code == codes.APPLICATION_ERROR:
            print(f"App error: {err.message}")
        else:
            print(f"Unknown error: {err.code}")
        return None
    return response

5. Use Custom Error Codes

from enum import IntEnum, unique


# Define custom error codes (2000+)
@unique
class MyErrorCodes(IntEnum):
    """Application-specific RPC error codes.

    Members are ints, so they can be passed directly as the ``code`` of an
    RpcError and compared against the integer ``code`` of a caught error.
    ``@unique`` rejects accidental reuse of a numeric value. Values start
    at 2000, outside the range reserved for built-in errors.
    """

    INVALID_USER = 2000
    PERMISSION_DENIED = 2001
    RESOURCE_NOT_FOUND = 2002

@local.register_rpc_method("admin_action")
async def handle_admin(data: RpcInvocationData) -> str:
    """Handle the "admin_action" RPC, which only admins may invoke.

    Args:
        data: RPC invocation data; ``data.caller_identity`` is checked
            with ``is_admin``.

    Returns:
        str: JSON response ``{"status": "ok"}`` for admin callers.

    Raises:
        RpcError: With the custom code MyErrorCodes.PERMISSION_DENIED when
            the caller is not an admin; ``data`` carries the required role
            and the caller's role as JSON.
    """
    caller = data.caller_identity

    # Authorized callers take the success path straight away
    if is_admin(caller):
        # Process...
        return json.dumps({"status": "ok"})

    # Everyone else gets a structured denial with the role mismatch
    denial_details = json.dumps({
        "required_role": "admin",
        "user_role": get_role(caller)
    })
    raise RpcError(
        code=MyErrorCodes.PERMISSION_DENIED,
        message="Admin access required",
        data=denial_details
    )

Advanced Patterns

Request-Response Schema

from typing import TypedDict
import json

class CalculateRequest(TypedDict):
    """Expected shape of the JSON request for the "calculate_v2" method.

    TypedDict annotations are checked by static type checkers only;
    nothing is validated at runtime when a payload is parsed.
    """
    a: float  # left operand
    b: float  # right operand
    operation: str  # "add", "subtract", "multiply" or "divide"

class CalculateResponse(TypedDict):
    """Shape of the JSON response returned by the "calculate_v2" method.

    As with any TypedDict, the annotations are for static type checkers
    and are not enforced at runtime.
    """
    result: float  # outcome of applying the operation to a and b
    operation: str  # operation name, echoed back from the request

@local.register_rpc_method("calculate_v2")
async def handle_calculate_v2(data: RpcInvocationData) -> str:
    """Handle the "calculate_v2" RPC using the typed request/response schema.

    Args:
        data: RPC invocation data; ``data.payload`` must be a JSON object
            matching CalculateRequest ("a", "b" and "operation").

    Returns:
        str: JSON document matching CalculateResponse.

    Raises:
        RpcError: With APPLICATION_ERROR when the payload is not a JSON
            object, lacks a required field, names an unknown operation,
            or asks for division by zero.
    """
    import operator

    operations = {
        "add": operator.add,
        "subtract": operator.sub,
        "multiply": operator.mul,
        "divide": operator.truediv,
    }

    # The annotation only informs static type checkers; json.loads performs
    # no schema validation, so the shape is checked explicitly below.
    req: CalculateRequest = json.loads(data.payload)

    required = ("a", "b", "operation")
    if not isinstance(req, dict) or any(key not in req for key in required):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing parameters",
            data=json.dumps({"required": list(required)})
        )

    # Validate. The isinstance guard keeps unhashable values (for example a
    # JSON list) from raising TypeError in the dict lookup.
    op_name = req["operation"]
    apply = operations.get(op_name) if isinstance(op_name, str) else None
    if apply is None:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message=f"Invalid operation: {op_name}"
        )

    # Reject division by zero up front rather than signalling it through a
    # None result.
    if op_name == "divide" and req["b"] == 0:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Division by zero"
        )

    # Calculate
    result = apply(req["a"], req["b"])

    # Type-safe response
    response: CalculateResponse = {
        "result": result,
        "operation": op_name
    }

    return json.dumps(response)

RPC with Retries

async def rpc_with_retry(
    local,
    dest: str,
    method: str,
    payload: str,
    max_retries: int = 3,
    response_timeout: float = 5.0
) -> str:
    """Perform an RPC call, retrying timeouts with exponential backoff.

    Args:
        local: Object exposing ``perform_rpc`` (the local participant).
        dest: Identity of the destination participant.
        method: Name of the remote method to call.
        payload: Request payload string.
        max_retries: Total number of attempts, including the first one.
        response_timeout: Per-attempt response timeout in seconds.

    Returns:
        str: Response payload of the first successful attempt.

    Raises:
        RpcError: The original error when it is not a response or
            connection timeout, or when the last attempt also times out.
            When ``max_retries`` is less than 1 no attempt is made and a
            RESPONSE_TIMEOUT error is raised.
    """
    # Imported here so the function does not depend on a module-level import
    import asyncio

    for attempt in range(max_retries):
        try:
            return await local.perform_rpc(
                destination_identity=dest,
                method=method,
                payload=payload,
                response_timeout=response_timeout
            )
        except RpcError as e:
            # Retry on timeout or connection issues only
            retryable = e.code in (
                RpcError.ErrorCode.RESPONSE_TIMEOUT,
                RpcError.ErrorCode.CONNECTION_TIMEOUT,
            )
            attempts_left = attempt < max_retries - 1
            if not (retryable and attempts_left):
                # Other errors, and the final timeout, go to the caller
                raise

            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time}s")
            await asyncio.sleep(wait_time)

    # Only reachable when max_retries < 1, i.e. the loop body never ran
    raise RpcError(
        code=RpcError.ErrorCode.RESPONSE_TIMEOUT,
        message=f"Failed after {max_retries} attempts"
    )

See Also

  • Participants - Participant RPC methods
  • Data Streaming - Alternative communication patterns