tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities.
RPC provides a request-response communication pattern between participants with typed method handlers. It enables type-safe, asynchronous method invocation across participants in the room.
Key concepts:
from livekit import RpcError, RpcInvocationData, LocalParticipant

@dataclass
class RpcInvocationData:
    """Data passed to RPC method handler.

    Contains information about the RPC call.

    Attributes:
        request_id: Unique request identifier
            Type: str
            Format: UUID-like string
            Used for tracking and logging
        caller_identity: Identity of the calling participant
            Type: str
            Participant who initiated the RPC call
            Use to identify caller
        payload: Request payload
            Type: str
            Typically JSON string
            Contains method parameters
            Max size: 15 KiB
        response_timeout: Timeout in seconds for response
            Type: float
            Time caller is willing to wait
            Handler should respect this
    """

    request_id: str
    caller_identity: str
    payload: str
    response_timeout: float


class RpcError(Exception):
    """Specialized error for RPC methods.

    Raised when RPC call fails or handler raises error.
    """

    def __init__(
        self,
        code: Union[int, "RpcError.ErrorCode"],
        message: str,
        data: Optional[str] = None
    ):
        """Initialize RpcError.

        Args:
            code: Error code
                Type: int | ErrorCode
                Range: 1001-1999 are reserved for built-in errors
                Custom codes: 2000+
            message: Error message
                Type: str
                Human-readable description
            data: Optional additional data
                Type: str | None
                JSON recommended for structured error data
                Default: None

        Example:
            >>> raise RpcError(
            ...     code=RpcError.ErrorCode.APPLICATION_ERROR,
            ...     message="Division by zero",
            ...     data='{"param": "denominator", "value": 0}'
            ... )
        """

    @property
    def code(self) -> int:
        """Error code.

        Returns:
            int: Error code (1001-1999 built-in, 2000+ custom)
        """

    @property
    def message(self) -> str:
        """Error message.

        Returns:
            str: Human-readable error description
        """

    @property
    def data(self) -> Optional[str]:
        """Optional error data.

        Returns:
            str | None: Additional error information (typically JSON)
        """

    class ErrorCode(IntEnum):
        """Built-in RPC error codes.

        Codes 1001-1999 are reserved for SDK errors.
        Codes 2000+ can be used for application-specific errors.
        """

        # Application errors (1500-1599)
        APPLICATION_ERROR = 1500  # Generic application error
        CONNECTION_TIMEOUT = 1501  # Connection lost during call
        RESPONSE_TIMEOUT = 1502  # No response within timeout
        RECIPIENT_DISCONNECTED = 1503  # Recipient left room
        RESPONSE_PAYLOAD_TOO_LARGE = 1504  # Response exceeds size limit
        SEND_FAILED = 1505  # Failed to send request
        # Client errors (1400-1499)
        UNSUPPORTED_METHOD = 1400  # Method not registered
        RECIPIENT_NOT_FOUND = 1401  # Participant not in room
        REQUEST_PAYLOAD_TOO_LARGE = 1402  # Request exceeds size limit
        UNSUPPORTED_SERVER = 1403  # Server doesn't support RPC
        UNSUPPORTED_VERSION = 1404  # Protocol version mismatch

    ErrorMessage: ClassVar[Dict[ErrorCode, str]]
    """Mapping of error codes to default messages.

    Provides standard error messages for each error code.
    """


class LocalParticipant:
    """LocalParticipant RPC-related methods."""

    async def perform_rpc(
        self,
        *,
        destination_identity: str,
        method: str,
        payload: str,
        response_timeout: Optional[float] = None
    ) -> str:
        """Initiate an RPC call to a remote participant.

        Args:
            destination_identity: Identity of destination participant
                Type: str
                Must be connected to room
                Case-sensitive
            method: Name of the method to call
                Type: str
                Must be registered by recipient
                Case-sensitive
            payload: Method payload
                Type: str
                Typically JSON for structured data
                Max size: 15 KiB
            response_timeout: Timeout in seconds for response
                Type: float | None
                Default: None (uses default, typically 10s)
                Range: 1.0 - 60.0 seconds recommended

        Returns:
            str: Response payload from handler
                Typically JSON string

        Raises:
            RpcError: If call fails (see ErrorCode for types)
            RuntimeError: If not connected to room
            ValueError: If parameters invalid

        Example:
            >>> import json
            >>> result = await local.perform_rpc(
            ...     destination_identity="agent-123",
            ...     method="calculate",
            ...     payload=json.dumps({"a": 5, "b": 3}),
            ...     response_timeout=5.0
            ... )
            >>> data = json.loads(result)
            >>> print(data["result"])  # 8
        """

    # "RpcHandler" is quoted below because the alias is defined after this
    # class; an unquoted name would be evaluated when the method is defined.
    def register_rpc_method(
        self,
        method_name: str,
        handler: Optional["RpcHandler"] = None
    ) -> Union["RpcHandler", Callable[["RpcHandler"], "RpcHandler"]]:
        """Register an RPC method handler.

        Can be used as decorator or with callback.

        Args:
            method_name: Name of the RPC method to handle
                Type: str
                Case-sensitive
            handler: Handler function (optional for decorator)
                Type: RpcHandler | None
                Can be sync or async

        Returns:
            Handler function or decorator

        Raises:
            ValueError: If method already registered
            TypeError: If handler not callable

        Example (decorator):
            >>> @local.register_rpc_method("calculate")
            ... async def handle_calculate(data: RpcInvocationData) -> str:
            ...     import json
            ...     params = json.loads(data.payload)
            ...     result = params["a"] + params["b"]
            ...     return json.dumps({"result": result})

        Example (direct):
            >>> async def handler(data):
            ...     return "response"
            >>> local.register_rpc_method("method", handler)
        """

    def unregister_rpc_method(self, method: str) -> None:
        """Unregister a previously registered RPC method.

        Args:
            method: Name of RPC method to unregister
                Type: str

        Raises:
            KeyError: If method not registered

        Example:
            >>> local.unregister_rpc_method("calculate")
        """


RpcHandler = Callable[
    [RpcInvocationData],
    Union[Awaitable[Optional[str]], Optional[str]]
]
"""Type for RPC method handlers.

Handlers can be sync or async functions.
Receive RpcInvocationData and return string response.

Signature:
    - Input: RpcInvocationData
    - Output: str | None (async: Awaitable[str | None])

Examples:
    >>> # Async handler
    >>> async def handler(data: RpcInvocationData) -> str:
    ...     return "response"
    >>>
    >>> # Sync handler
    >>> def handler(data: RpcInvocationData) -> str:
    ...     return "response"
"""


import json
from livekit import LocalParticipant, RpcError, RpcInvocationData

local: LocalParticipant = room.local_participant


@local.register_rpc_method("calculate")
async def handle_calculate(data: RpcInvocationData) -> str:
    """Handle calculate RPC call by adding the ``a`` and ``b`` parameters.

    Args:
        data: RPC invocation data; ``data.payload`` is expected to be a JSON
            object containing the keys ``a`` and ``b``.

    Returns:
        str: JSON response of the form ``{"result": a + b}``

    Raises:
        RpcError: APPLICATION_ERROR when the payload is not valid JSON, is not
            a JSON object, or lacks ``a`` / ``b``.
    """
    # Log call info
    print(f"RPC from {data.caller_identity}")
    print(f"Request ID: {data.request_id}")
    print(f"Timeout: {data.response_timeout}s")

    # Parse payload; surface malformed input as an RPC error instead of
    # letting a raw JSONDecodeError escape the handler.
    try:
        params = json.loads(data.payload)
    except json.JSONDecodeError as exc:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON payload",
        ) from exc

    # Validate: the payload must be an object that carries both operands.
    if not isinstance(params, dict) or "a" not in params or "b" not in params:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing parameters",
            data=json.dumps({"required": ["a", "b"]}),
        )

    # Perform calculation
    result = params["a"] + params["b"]

    # Return response
    return json.dumps({"result": result})


import json
from livekit import RpcError
try:
# Call RPC method
result = await local.perform_rpc(
destination_identity="remote-user",
method="calculate",
payload=json.dumps({"a": 5, "b": 3}),
response_timeout=5.0
)
# Parse response
data = json.loads(result)
print(f"Result: {data['result']}") # 8
except RpcError as e:
# Handle specific errors
if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
print("Participant not in room")
elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
print("Method not implemented")
elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
print("Request timed out")
elif e.code == RpcError.ErrorCode.APPLICATION_ERROR:
print(f"Application error: {e.message}")
if e.data:
error_data = json.loads(e.data)
print(f"Error details: {error_data}")
else:
print(f"RPC error {e.code}: {e.message}")from livekit import RpcError, RpcInvocationData
import json
@local.register_rpc_method("divide")
async def handle_divide(data: RpcInvocationData) -> str:
    """Handle divide RPC with error handling.

    Args:
        data: RPC invocation data; ``data.payload`` is expected to be a JSON
            object with numeric ``a`` (dividend) and ``b`` (divisor).

    Returns:
        str: JSON of the form ``{"success": true, "result": a / b}``

    Raises:
        RpcError: APPLICATION_ERROR for invalid JSON, a non-object payload,
            missing or non-numeric parameters, or a zero divisor.
    """
    # Keep the try body to the one call that can raise JSONDecodeError.
    try:
        params = json.loads(data.payload)
    except json.JSONDecodeError as exc:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON payload",
        ) from exc

    # A JSON array/number/string/null would break the key checks below.
    if not isinstance(params, dict):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Payload must be a JSON object",
        )

    # Validate parameters
    if "a" not in params or "b" not in params:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing required parameters: a, b",
            data=json.dumps({
                "provided": list(params.keys()),
                "required": ["a", "b"],
            }),
        )

    a, b = params["a"], params["b"]

    # bool is a subclass of int, so exclude it explicitly.
    if any(isinstance(v, bool) or not isinstance(v, (int, float)) for v in (a, b)):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Parameters a and b must be numbers",
        )

    # Business logic validation
    if b == 0:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Division by zero",
            data=json.dumps({
                "error_type": "math_error",
                "param": "b",
                "value": 0,
            }),
        )

    # Perform operation and return success response
    return json.dumps({
        "success": True,
        "result": a / b,
    })


import asyncio
import json
from livekit import Room, RpcError, RpcInvocationData
async def main():
    """Run an end-to-end RPC demo: serve three methods and call a remote peer.

    Connects to the room, registers the ``echo``, ``add`` and ``get_status``
    handlers, waits briefly for a remote participant, calls that participant's
    ``echo`` and ``add`` methods, then keeps serving for 30 seconds.

    ``url`` and ``token`` are not defined in this snippet and must be supplied
    by the surrounding application.
    """
    room = Room()
    await room.connect(url, token)
    try:
        local = room.local_participant

        # Register multiple handlers
        @local.register_rpc_method("echo")
        async def echo(data: RpcInvocationData) -> str:
            """Echo handler - returns input."""
            return data.payload

        @local.register_rpc_method("add")
        async def add(data: RpcInvocationData) -> str:
            """Addition handler."""
            params = json.loads(data.payload)
            result = params["a"] + params["b"]
            return json.dumps({"result": result})

        @local.register_rpc_method("get_status")
        async def get_status(data: RpcInvocationData) -> str:
            """Status handler."""
            return json.dumps({
                "status": "healthy",
                "uptime": 3600,
                "version": "1.0.0",
            })

        try:
            # Wait for remote participant
            await asyncio.sleep(2)

            # Call remote methods if participants present
            if room.remote_participants:
                remote = next(iter(room.remote_participants.values()))
                try:
                    # Echo test
                    echo_result = await local.perform_rpc(
                        destination_identity=remote.identity,
                        method="echo",
                        payload="Hello RPC!",
                        response_timeout=5.0,
                    )
                    print(f"Echo: {echo_result}")

                    # Calculation
                    calc_result = await local.perform_rpc(
                        destination_identity=remote.identity,
                        method="add",
                        payload=json.dumps({"a": 10, "b": 20}),
                        response_timeout=5.0,
                    )
                    data = json.loads(calc_result)
                    print(f"10 + 20 = {data['result']}")
                except RpcError as e:
                    print(f"RPC error {e.code}: {e.message}")

            # Keep running
            await asyncio.sleep(30)
        finally:
            # Cleanup: runs even if the demo above raised or was cancelled.
            local.unregister_rpc_method("echo")
            local.unregister_rpc_method("add")
            local.unregister_rpc_method("get_status")
    finally:
        # Always leave the room, including when handler registration failed.
        await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())


import json
# DO: Use JSON for structured parameters
payload = json.dumps(
    dict(
        action="calculate",
        params=dict(a=5, b=3),
        options=dict(precision=2),
    )
)
# DON'T: Use unstructured strings
# payload = "5,3,2"  # Hard to parse reliably


@local.register_rpc_method("process")
async def handle_process(data: RpcInvocationData) -> str:
    """Validate a ``process`` request and acknowledge it.

    The payload must be JSON and contain the ``input`` and ``format`` fields;
    otherwise an APPLICATION_ERROR RpcError is raised.

    Returns:
        str: the JSON document ``{"status": "ok"}``
    """
    # Decode the payload, reporting malformed JSON as an RPC error.
    try:
        request = json.loads(data.payload)
    except json.JSONDecodeError:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON"
        )

    # Collect every required field the caller left out.
    absent = []
    for field in ("input", "format"):
        if field not in request:
            absent.append(field)

    if absent:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message=f"Missing parameters: {', '.join(absent)}"
        )

    # Process...
    return json.dumps({"status": "ok"})


# Short timeout for quick operations
result = await local.perform_rpc(
destination_identity=dest,
method="ping",
payload="{}",
response_timeout=2.0 # 2 seconds
)
# Longer timeout for heavy operations
result = await local.perform_rpc(
destination_identity=dest,
method="process_video",
payload=json.dumps({"video_id": "123"}),
response_timeout=30.0 # 30 seconds
)async def safe_rpc_call(local, dest, method, payload):
"""RPC call with comprehensive error handling."""
try:
return await local.perform_rpc(
destination_identity=dest,
method=method,
payload=payload,
response_timeout=5.0
)
except RpcError as e:
if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
print("Participant left")
elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
print("Method not registered")
elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
print("Timeout")
elif e.code == RpcError.ErrorCode.REQUEST_PAYLOAD_TOO_LARGE:
print("Payload too large")
elif e.code == RpcError.ErrorCode.APPLICATION_ERROR:
print(f"App error: {e.message}")
else:
print(f"Unknown error: {e.code}")
return None# Define custom error codes (2000+)
from enum import IntEnum


class MyErrorCodes(IntEnum):
    """Application-specific RPC error codes.

    Values start at 2000, the range the RPC API leaves open for custom codes.
    Members are ints, so they can be passed wherever an integer error code is
    accepted and compared directly against a received ``code``.
    """

    INVALID_USER = 2000
    PERMISSION_DENIED = 2001
    RESOURCE_NOT_FOUND = 2002
@local.register_rpc_method("admin_action")
async def handle_admin(data: RpcInvocationData) -> str:
    """Serve ``admin_action`` for admin callers and reject everyone else.

    Non-admin callers receive an RpcError with the custom PERMISSION_DENIED
    code and JSON details naming the required role and the caller's role.
    """
    caller = data.caller_identity

    # Happy path first: admins get the acknowledgement.
    if is_admin(caller):
        # Process...
        return json.dumps({"status": "ok"})

    # Check permissions failed: describe the missing role to the caller.
    details = {
        "required_role": "admin",
        "user_role": get_role(caller),
    }
    raise RpcError(
        code=MyErrorCodes.PERMISSION_DENIED,
        message="Admin access required",
        data=json.dumps(details),
    )


from typing import TypedDict
import json
from typing import Literal

# The only operation names the calculate_v2 handler accepts.
Operation = Literal["add", "subtract", "multiply", "divide"]


class CalculateRequest(TypedDict):
    """Type-safe request schema.

    ``operation`` is restricted to the supported operation names so static
    type checkers can reject unsupported values. TypedDict performs no
    runtime validation.
    """

    a: float
    b: float
    operation: Operation


class CalculateResponse(TypedDict):
    """Type-safe response schema.

    ``operation`` echoes the operation name from the request.
    """

    result: float
    operation: Operation
@local.register_rpc_method("calculate_v2")
async def handle_calculate_v2(data: RpcInvocationData) -> str:
    """Type-annotated RPC handler for basic arithmetic.

    Args:
        data: RPC invocation data; ``data.payload`` is expected to be a JSON
            object shaped like ``CalculateRequest``.

    Returns:
        str: JSON shaped like ``CalculateResponse``

    Raises:
        RpcError: APPLICATION_ERROR for invalid JSON, missing fields, an
            unsupported operation, or division by zero.
    """
    import operator

    # The annotation only informs static checkers; json.loads does not
    # validate the payload, so the required fields are checked explicitly.
    try:
        req: CalculateRequest = json.loads(data.payload)
    except json.JSONDecodeError as exc:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Invalid JSON payload",
        ) from exc

    required = ("a", "b", "operation")
    if not isinstance(req, dict) or any(key not in req for key in required):
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Missing parameters",
            data=json.dumps({"required": list(required)}),
        )

    # Validate
    operations = {
        "add": operator.add,
        "subtract": operator.sub,
        "multiply": operator.mul,
        "divide": operator.truediv,
    }
    name = req["operation"]
    # Guard the lookup: a non-string (e.g. a JSON array) is unhashable.
    compute = operations.get(name) if isinstance(name, str) else None
    if compute is None:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message=f"Invalid operation: {req['operation']}"
        )

    # Reject a zero divisor up front instead of signalling it with None.
    if name == "divide" and req["b"] == 0:
        raise RpcError(
            code=RpcError.ErrorCode.APPLICATION_ERROR,
            message="Division by zero"
        )

    # Calculate
    result = compute(req["a"], req["b"])

    # Type-safe response
    response: CalculateResponse = {
        "result": result,
        "operation": name,
    }
    return json.dumps(response)


async def rpc_with_retry(
    local,
    dest: str,
    method: str,
    payload: str,
    max_retries: int = 3
) -> str:
    """RPC call with automatic retry.

    Retries only on RESPONSE_TIMEOUT and CONNECTION_TIMEOUT, sleeping
    ``2 ** attempt`` seconds between attempts (exponential backoff).

    Args:
        local: Object exposing ``perform_rpc`` (the local participant).
        dest: Identity of the destination participant.
        method: Name of the RPC method to call.
        payload: Request payload.
        max_retries: Maximum number of attempts.

    Returns:
        str: Response payload of the first successful attempt.

    Raises:
        RpcError: The original error for non-retryable failures or when the
            last attempt fails; a RESPONSE_TIMEOUT error if ``max_retries``
            is zero or negative (no attempt is made).
    """
    # Imported here because this snippet has no import section of its own.
    import asyncio

    retryable = (
        RpcError.ErrorCode.RESPONSE_TIMEOUT,
        RpcError.ErrorCode.CONNECTION_TIMEOUT,
    )

    for attempt in range(max_retries):
        try:
            return await local.perform_rpc(
                destination_identity=dest,
                method=method,
                payload=payload,
                response_timeout=5.0
            )
        except RpcError as e:
            is_last_attempt = attempt >= max_retries - 1
            # Don't retry other errors, and don't sleep after the final try.
            if e.code not in retryable or is_last_attempt:
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time}s")
            await asyncio.sleep(wait_time)

    # Only reachable when the loop body never ran (max_retries <= 0).
    raise RpcError(
        code=RpcError.ErrorCode.RESPONSE_TIMEOUT,
        message=f"Failed after {max_retries} attempts"
    )