Describes: pkg:pypi/livekit@1.0.x

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

Participants

Overview

Participants represent users in a LiveKit room. The SDK provides LocalParticipant for the current user and RemoteParticipant for other users. Both inherit from the abstract Participant base class.

Key concepts:

  • LocalParticipant: Represents the current client with capabilities to publish tracks, send data, and perform RPC calls
  • RemoteParticipant: Represents other clients in the room (read-only properties)
  • Identity: Unique, immutable identifier for each participant (set via token)
  • Name: Mutable display name (can be changed via set_name())
  • Metadata: Arbitrary JSON string for custom properties
  • Attributes: Key-value pairs for structured metadata
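
Because metadata is an arbitrary string that is only conventionally JSON, and may be empty, code that reads it usually needs a defensive parse. The following is a minimal sketch under the assumption that the application stores a JSON object in metadata; `parse_metadata` is a hypothetical helper, not part of the SDK.

```python
import json
from typing import Any, Dict


def parse_metadata(raw: str) -> Dict[str, Any]:
    """Parse a participant's metadata string into a dict.

    Returns an empty dict when the string is empty, is not valid JSON,
    or decodes to something other than a JSON object.
    """
    if not raw:
        return {}
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    return value if isinstance(value, dict) else {}
```

A typical call would be `parse_metadata(participant.metadata).get("role", "guest")`.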

Import

from livekit.rtc import (
    Participant,
    LocalParticipant,
    RemoteParticipant,
    ParticipantKind,
    DisconnectReason,
    ParticipantTrackPermission,
)

Classes

Participant (Abstract Base Class)

class Participant(ABC):
    """Base class for local and remote participants.

    This abstract class defines common properties and behavior
    shared by both LocalParticipant and RemoteParticipant.
    
    Cannot be instantiated directly - use LocalParticipant or RemoteParticipant.
    """

    def __init__(
        self,
        owned_info: proto_participant.OwnedParticipant
    ) -> None:
        """Initialize a Participant.

        Args:
            owned_info: Internal participant information from FFI
            
        Note:
            This constructor is for internal use.
            Participants are created by the SDK, not by application code.
        """

Properties

@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
    """Track publications associated with this participant.

    Returns:
        Dictionary mapping track SID (string) to TrackPublication
        For LocalParticipant: Maps to LocalTrackPublication
        For RemoteParticipant: Maps to RemoteTrackPublication
        
    Note:
        This is a read-only mapping.
        Track SIDs are unique within a room session.
        Publications contain metadata about tracks (name, kind, dimensions, etc.).
        
    Example:
        >>> for sid, pub in participant.track_publications.items():
        ...     print(f"{sid}: {pub.name} ({pub.kind})")
    """

@property
def sid(self) -> str:
    """Session ID of the participant.

    Returns:
        str: Unique session identifier assigned by server
        
    Note:
        Session ID is unique per room session.
        Changes if participant rejoins room.
        Different from identity (which persists across sessions).
        Format: "PA_" followed by random string.
    """

@property
def name(self) -> str:
    """Display name of the participant.

    Returns:
        str: Participant's display name (may be empty)
        
    Note:
        Display name can be updated via set_name() (local participant only).
        Not required to be unique (unlike identity).
        May be empty string if not set.
        Typically shown in UI.
    """

@property
def identity(self) -> str:
    """Identity of the participant.

    Returns:
        str: Unique participant identity
        
    Note:
        Identity is set via JWT token and cannot be changed.
        Guaranteed to be unique within a room.
        Persists across reconnections and sessions.
        Use this as the primary identifier, not name.
        Format: Arbitrary string (typically username, user ID, etc.).
    """

@property
def metadata(self) -> str:
    """Metadata associated with the participant.

    Returns:
        str: JSON string or arbitrary metadata (may be empty)
        
    Note:
        Metadata can be updated via set_metadata() (local participant only).
        Typically JSON for structured data.
        Empty string if not set.
        Broadcast to all participants when changed.
        Max recommended size: 4KB.
    """

@property
def attributes(self) -> dict[str, str]:
    """Custom attributes associated with the participant.

    Returns:
        dict[str, str]: Dictionary of key-value attributes
        
    Note:
        Attributes can be updated via set_attributes() (local participant only).
        More structured than metadata field.
        Keys and values are strings.
        Attributes are merged, not replaced.
        To remove attribute, set to empty string.
        
    Example:
        >>> attrs = participant.attributes
        >>> print(f"Role: {attrs.get('role', 'guest')}")
        >>> print(f"Team: {attrs.get('team', 'none')}")
    """
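
The merge rule in the note above (updates are merged into the existing attributes, and an empty string removes a key) can be modelled locally. This is only a sketch of the documented behavior for reasoning and testing; `merge_attributes` is a hypothetical helper and not how the SDK or server implements it.

```python
from typing import Dict


def merge_attributes(current: Dict[str, str], updates: Dict[str, str]) -> Dict[str, str]:
    """Return the attributes that result from applying `updates` to `current`.

    Keys in `updates` overwrite existing keys; a key whose new value is the
    empty string is removed. `current` itself is left unmodified.
    """
    merged = dict(current)
    for key, value in updates.items():
        if value == "":
            merged.pop(key, None)  # empty string means "delete this attribute"
        else:
            merged[key] = value
    return merged
```

For example, applying `{"role": "host", "team": ""}` to `{"role": "guest", "team": "a"}` leaves only the `role` key, now set to `"host"`.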

@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
    """Participant's kind (type).

    Returns:
        ParticipantKind enum value:
        - STANDARD (0): Regular participant (human user)
        - INGRESS (1): Ingress participant (RTMP, WHIP stream)
        - EGRESS (2): Egress participant (recording, streaming out)
        - SIP (3): SIP participant (phone call)
        - AGENT (4): AI agent participant
        
    Note:
        Participant kind is set by server and cannot be changed.
        Useful for distinguishing between human users and bots/streams.
        Most participants are STANDARD.
    """
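
When logging or branching on `kind`, a readable name is often more convenient than the raw integer. The sketch below mirrors the numeric values listed above in a local `IntEnum`; `Kind` and `describe_kind` are illustrative names, not SDK symbols, and the mapping is only as accurate as the list above.

```python
from enum import IntEnum


class Kind(IntEnum):
    """Local mirror of the values listed above (not the SDK enum)."""

    STANDARD = 0
    INGRESS = 1
    EGRESS = 2
    SIP = 3
    AGENT = 4


def describe_kind(value: int) -> str:
    """Return a readable name for a participant kind value."""
    try:
        return Kind(value).name
    except ValueError:
        # Unknown values can appear if the server is newer than this table.
        return f"UNKNOWN({value})"


def is_human(value: int) -> bool:
    """Treat STANDARD and SIP participants as people, the rest as automation."""
    return value in (Kind.STANDARD, Kind.SIP)
```

Whether SIP callers count as "human" is an application decision; adjust `is_human` to taste.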

@property
def disconnect_reason(
    self
) -> Optional[proto_participant.DisconnectReason.ValueType]:
    """Reason for disconnection if participant has disconnected.

    Returns:
        DisconnectReason enum value or None if not disconnected
        Possible values:
        - CLIENT_INITIATED (1): User disconnected voluntarily
        - DUPLICATE_IDENTITY (2): Rejoined from another device
        - SERVER_SHUTDOWN (3): Server shutting down
        - PARTICIPANT_REMOVED (4): Kicked by moderator
        - ROOM_DELETED (5): Room was deleted
        - Other values are defined in the DisconnectReason enum
        
    Note:
        Only available after participant has disconnected.
        Returns None if participant is still connected.
        Useful for understanding why participant left.
    """

LocalParticipant

class LocalParticipant(Participant):
    """Represents the local participant in a room.

    The LocalParticipant provides methods to publish tracks,
    send data, perform RPC calls, and update local metadata.
    
    Accessible via room.local_participant.
    """

    def __init__(
        self,
        room_queue: BroadcastQueue[proto_ffi.FfiEvent],
        owned_info: proto_participant.OwnedParticipant
    ) -> None:
        """Initialize a LocalParticipant.

        Args:
            room_queue: Internal event queue
            owned_info: Internal participant information
            
        Note:
            This constructor is for internal use.
            Access local participant via room.local_participant.
        """

Properties

@property
def track_publications(self) -> Mapping[str, LocalTrackPublication]:
    """Dictionary of local track publications.

    Returns:
        Mapping[str, LocalTrackPublication]: Maps track SID to publication
        
    Note:
        Read-only mapping.
        Updated automatically when tracks are published/unpublished.
        Contains metadata for all published tracks.
        
    Example:
        >>> for sid, pub in local.track_publications.items():
        ...     print(f"{pub.name}: sid={pub.sid}")
        ...     await pub.wait_for_subscription()
    """

Track Publishing Methods

async def publish_track(
    self,
    track: LocalTrack,
    options: TrackPublishOptions = TrackPublishOptions()
) -> LocalTrackPublication:
    """Publish a local track to the room.

    Args:
        track: The local track to publish (LocalAudioTrack or LocalVideoTrack)
              Track must be created from AudioSource or VideoSource
        options: Publishing options (encoding, simulcast, etc.)
                Default: TrackPublishOptions() with default settings

    Returns:
        LocalTrackPublication: Publication for the published track
        Contains metadata and can be used to wait for subscription
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If track is invalid or already published
        Exception: If publishing fails (network error, permission denied)

    Example:
        >>> # Audio track
        >>> source = AudioSource(48000, 1)
        >>> track = LocalAudioTrack.create_audio_track("mic", source)
        >>> options = TrackPublishOptions()
        >>> options.source = TrackSource.SOURCE_MICROPHONE
        >>> publication = await local.publish_track(track, options)
        >>> print(f"Published: {publication.sid}")
        >>> 
        >>> # Video track with encoding
        >>> source = VideoSource(1920, 1080)
        >>> track = LocalVideoTrack.create_video_track("camera", source)
        >>> options = TrackPublishOptions()
        >>> options.video_codec = VideoCodec.VP8
        >>> options.simulcast = True
        >>> encoding = VideoEncoding()
        >>> encoding.max_bitrate = 2_000_000
        >>> encoding.max_framerate = 30.0
        >>> options.video_encoding = encoding
        >>> publication = await local.publish_track(track, options)
        
    Note:
        Track must be created before publishing.
        Publishing is async and may take a few hundred ms.
        Track will be available to other participants after publishing.
        Participants that connected with auto_subscribe=True subscribe automatically.
        
        Track SID is assigned by server after publishing.
        Use publication.wait_for_subscription() to confirm delivery.
        
    Performance:
        Initial publish: ~100-500ms depending on network
        Video encoding setup may take longer first time
    """

async def unpublish_track(self, track_sid: str) -> None:
    """Unpublish a track from the room.

    Args:
        track_sid: Session ID of the track to unpublish
                  Format: "TR_" followed by random string
                  Get from publication.sid or track.sid

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If track SID is invalid or track not published
        KeyError: If track SID not found in publications

    Example:
        >>> # Unpublish specific track
        >>> await local.unpublish_track("TR_XXXXX")
        >>> 
        >>> # Unpublish all tracks
        >>> for sid in list(local.track_publications.keys()):
        ...     await local.unpublish_track(sid)
        
    Note:
        Unpublishing is immediate.
        Other participants receive a track_unpublished event and,
        if they were subscribed, a track_unsubscribed event.
        Track streams will end for subscribers.
        
        After unpublishing, track and source are still valid
        and can be republished if needed.
        
        Remember to clean up source if no longer needed:
        >>> await source.aclose()
    """

Data Publishing Methods

async def publish_data(
    self,
    payload: Union[bytes, str],
    *,
    reliable: bool = True,
    destination_identities: List[str] = [],
    topic: str = ""
) -> None:
    """Publish arbitrary data to the room.

    Args:
        payload: Data to publish
                Type: bytes or str (str will be encoded as UTF-8)
                Max size: ~256KB (SCTP message limit)
        reliable: Whether to send reliably or lossy
                 Type: bool
                 Default: True (reliable delivery)
                 True: TCP-like guaranteed delivery, ordered (KIND_RELIABLE)
                 False: UDP-like best-effort delivery, lower latency (KIND_LOSSY)
        destination_identities: List of participant identities to send to
                               Type: List[str]
                               Default: [] (broadcast to all)
                               Empty list sends to all participants
                               Specify identities for targeted delivery
        topic: Optional topic for categorizing data
              Type: str
              Default: "" (no topic)
              Max length: 256 characters
              Used for filtering on receiver side

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If payload exceeds size limit or destinations invalid
        Exception: If send fails (network error)

    Example:
        >>> # Send to all participants (reliable)
        >>> await local.publish_data(b"Hello everyone", reliable=True)
        >>> 
        >>> # Send text to specific participant (lossy for low latency)
        >>> await local.publish_data(
        ...     "Quick update",
        ...     reliable=False,
        ...     destination_identities=["user123"]
        ... )
        >>> 
        >>> # Send with topic
        >>> import json
        >>> message = json.dumps({"type": "chat", "text": "Hello"})
        >>> await local.publish_data(
        ...     message,
        ...     topic="chat"
        ... )
        >>> 
        >>> # Binary data
        >>> image_bytes = load_image()
        >>> await local.publish_data(
        ...     image_bytes,
        ...     reliable=True,
        ...     topic="images"
        ... )
        
    Note:
        Reliable delivery (TCP-like):
        - Guaranteed arrival and order
        - Retransmission on packet loss
        - Higher latency on poor networks
        - Use for critical data (commands, state updates)
        
        Lossy delivery (UDP-like):
        - Best-effort, may drop packets
        - Lower latency
        - No retransmission
        - Use for non-critical data (position updates, ephemeral state)
        
        For large data (>256KB), use data streaming instead:
        >>> writer = await local.stream_bytes(name="file.pdf")
        >>> await writer.write(chunk1)
        >>> await writer.write(chunk2)
        >>> await writer.aclose()
    """
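
Since `publish_data()` takes raw bytes or a string and the receiver filters by topic, applications commonly wrap messages in a small JSON envelope and check the size before sending. The sketch below assumes the ~256KB figure quoted above, interpreted as 256 * 1024 bytes; `encode_message` and `decode_message` are hypothetical helpers, not SDK functions.

```python
import json
from typing import Any, Tuple

# Assumption: the ~256KB limit quoted above, taken as 256 * 1024 bytes.
MAX_PAYLOAD_BYTES = 256 * 1024


def encode_message(kind: str, body: Any, max_bytes: int = MAX_PAYLOAD_BYTES) -> bytes:
    """Serialize a message as compact UTF-8 JSON and enforce a size budget."""
    text = json.dumps({"type": kind, "body": body}, separators=(",", ":"))
    payload = text.encode("utf-8")
    if len(payload) > max_bytes:
        raise ValueError(f"payload is {len(payload)} bytes, limit is {max_bytes}")
    return payload


def decode_message(payload: bytes) -> Tuple[str, Any]:
    """Inverse of encode_message: return (type, body)."""
    data = json.loads(payload.decode("utf-8"))
    return data["type"], data["body"]
```

The sender would then call `await local.publish_data(encode_message("chat", {"text": "Hello"}), topic="chat")`, and anything that fails the size check is a candidate for `stream_bytes()` instead.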

async def publish_dtmf(self, *, code: int, digit: str) -> None:
    """Publish SIP DTMF message.

    Args:
        code: DTMF code
              Type: int
              Range: 0-15
              0-9: Digits 0-9
              10: * (star)
              11: # (pound)
              12-15: A-D (extended DTMF)
        digit: DTMF digit
               Type: str
               Single character: "0"-"9", "*", "#", "A"-"D"
               Must match code

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If code/digit invalid or mismatched

    Example:
        >>> # Send digit 1
        >>> await local.publish_dtmf(code=1, digit="1")
        >>> 
        >>> # Send star
        >>> await local.publish_dtmf(code=10, digit="*")
        >>> 
        >>> # Send pound
        >>> await local.publish_dtmf(code=11, digit="#")
        
    Note:
        Used for SIP integration (phone calls).
        DTMF signals are received by SIP gateway.
        Useful for phone menu navigation, PIN entry, etc.
    """
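
Because `code` and `digit` must agree, it can help to derive the code from the character rather than pass both by hand. This sketch encodes the table given above (0-9, then `*` = 10, `#` = 11, A-D = 12-15); `dtmf_code` is a hypothetical helper name.

```python
# Position in this string equals the DTMF code from the table above.
DTMF_DIGITS = "0123456789*#ABCD"


def dtmf_code(digit: str) -> int:
    """Return the DTMF code (0-15) for a single DTMF character."""
    if len(digit) != 1 or digit not in DTMF_DIGITS:
        raise ValueError(f"not a DTMF digit: {digit!r}")
    return DTMF_DIGITS.index(digit)
```

Entering a PIN then becomes a loop such as `for d in "1234#": await local.publish_dtmf(code=dtmf_code(d), digit=d)`.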

async def publish_transcription(
    self,
    transcription: Transcription
) -> None:
    """Publish transcription data to the room.

    Args:
        transcription: Transcription data to publish
                      Contains participant_identity, track_sid, and segments

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If transcription data is invalid

    Example:
        >>> from livekit.rtc import Transcription, TranscriptionSegment
        >>> 
        >>> # Create transcription
        >>> transcription = Transcription(
        ...     participant_identity=local.identity,
        ...     track_sid="TR_XXXXX",
        ...     segments=[
        ...         TranscriptionSegment(
        ...             id="seg1",
        ...             text="Hello world",
        ...             start_time=0,     # Milliseconds
        ...             end_time=1000,    # Milliseconds
        ...             language="en",    # BCP 47 language code
        ...             final=True        # True for final, False for interim
        ...         )
        ...     ]
        ... )
        >>> 
        >>> # Publish
        >>> await local.publish_transcription(transcription)
        
    Note:
        Transcription is typically generated by AI transcription service.
        Can publish interim results (final=False) for low latency.
        Final transcriptions (final=True) are more accurate.
        Other participants receive via 'transcription_received' event.
    """

Data Streaming Methods

async def stream_text(
    self,
    *,
    destination_identities: Optional[List[str]] = None,
    topic: str = "",
    attributes: Optional[Dict[str, str]] = None,
    stream_id: str | None = None,
    reply_to_id: str | None = None,
    total_size: int | None = None,
    sender_identity: str | None = None
) -> TextStreamWriter:
    """Create a text stream writer for streaming text chunks.

    Args:
        destination_identities: List of participant identities to send to
                               Default: None (broadcast to all)
        topic: Topic for the stream
              Default: "" (no topic)
              Used for routing on receiver side
        attributes: Key-value attributes for the stream
                   Type: Dict[str, str] | None
                   Default: None
                   Metadata about the stream
        stream_id: Custom stream ID
                  Type: str | None
                  Default: None (auto-generated)
                  Useful for tracking or replying
        reply_to_id: ID of stream this is replying to
                    Type: str | None
                    Default: None
                    For threaded conversations
        total_size: Total size in bytes if known
                   Type: int | None
                   Default: None
                   For progress tracking
        sender_identity: Override sender identity
                        Type: str | None
                        Default: None (uses local participant identity)
                        Advanced use only

    Returns:
        TextStreamWriter: Writer for writing text chunks
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If parameters invalid

    Example:
        >>> # Simple text stream
        >>> writer = await local.stream_text(topic="chat")
        >>> await writer.write("Hello ")
        >>> await writer.write("world!")
        >>> await writer.aclose()
        >>> 
        >>> # Stream with metadata
        >>> writer = await local.stream_text(
        ...     topic="chat",
        ...     attributes={"priority": "high", "sender_name": "Alice"}
        ... )
        >>> await writer.write("Important message")
        >>> await writer.aclose()
        >>> 
        >>> # Reply to previous stream
        >>> writer = await local.stream_text(
        ...     topic="chat",
        ...     reply_to_id="previous_stream_id"
        ... )
        >>> await writer.write("Thanks for the message!")
        >>> await writer.aclose()
        
    Note:
        Text is automatically encoded as UTF-8.
        Writer must be closed with aclose() to signal end of stream.
        Chunks are sent as they're written (not buffered).
        
        For complete messages, use send_text() convenience method:
        >>> info = await local.send_text("Quick message", topic="chat")
    """
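
Since the writer must always be closed to signal end of stream, a `try`/`finally` wrapper is a reasonable way to forward chunks from an async source (for example, incremental model output). The sketch below is duck-typed: it assumes only the async `write()` and `aclose()` methods used in the examples above. `pipe_text`, `RecordingWriter`, `tokens`, and `demo_pipe` are illustrative names, and `RecordingWriter` is a stand-in so the code runs without a room connection.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def pipe_text(chunks: AsyncIterator[str], writer: Any) -> int:
    """Write each chunk to `writer`, closing it even if the source fails."""
    written = 0
    try:
        async for chunk in chunks:
            await writer.write(chunk)
            written += 1
    finally:
        # Always signal end-of-stream, whether or not an error occurred.
        await writer.aclose()
    return written


class RecordingWriter:
    """Stand-in for a text stream writer; records what it is given."""

    def __init__(self) -> None:
        self.chunks: List[str] = []
        self.closed = False

    async def write(self, text: str) -> None:
        self.chunks.append(text)

    async def aclose(self) -> None:
        self.closed = True


async def tokens() -> AsyncIterator[str]:
    for token in ("Hello ", "world!"):
        yield token


async def demo_pipe() -> RecordingWriter:
    writer = RecordingWriter()
    await pipe_text(tokens(), writer)
    return writer


if __name__ == "__main__":
    print(asyncio.run(demo_pipe()).chunks)
```

With a real room, the writer would come from `await local.stream_text(topic="chat")` in place of `RecordingWriter()`.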

async def send_text(
    self,
    text: str,
    *,
    destination_identities: Optional[List[str]] = None,
    topic: str = "",
    attributes: Optional[Dict[str, str]] = None,
    reply_to_id: str | None = None
) -> TextStreamInfo:
    """Send a complete text message in one call.

    Args:
        text: Complete text message to send
             Type: str
             Encoded as UTF-8
        destination_identities: List of participant identities to send to
                               Default: None (broadcast to all)
        topic: Topic for the message
              Default: ""
        attributes: Key-value attributes
                   Default: None
        reply_to_id: ID of stream this is replying to
                    Default: None

    Returns:
        TextStreamInfo: Information about the sent stream
        Contains: stream_id, mime_type, topic, timestamp, size, attributes
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If parameters invalid

    Example:
        >>> # Simple message
        >>> info = await local.send_text("Hello everyone!")
        >>> print(f"Sent stream: {info.stream_id}")
        >>> 
        >>> # Message to specific participant
        >>> info = await local.send_text(
        ...     "Private message",
        ...     destination_identities=["user123"],
        ...     topic="private-chat"
        ... )
        >>> 
        >>> # Message with attributes
        >>> info = await local.send_text(
        ...     "Announcement",
        ...     topic="announcements",
        ...     attributes={"priority": "high"}
        ... )
        
    Note:
        Convenience method for short messages.
        For long messages or streaming, use stream_text().
        Message is sent immediately (not chunked).
    """

async def stream_bytes(
    self,
    name: str,
    *,
    total_size: int | None = None,
    mime_type: str = "application/octet-stream",
    attributes: Optional[Dict[str, str]] = None,
    stream_id: str | None = None,
    destination_identities: Optional[List[str]] = None,
    topic: str = ""
) -> ByteStreamWriter:
    """Create a byte stream writer for streaming byte chunks.

    Args:
        name: Name of the byte stream (e.g., filename)
             Type: str
             Required
             Displayed to receiver
        total_size: Total size in bytes if known
                   Type: int | None
                   Default: None
                   For progress tracking
        mime_type: MIME type of the data
                  Type: str
                  Default: "application/octet-stream"
                  Examples: "image/jpeg", "application/pdf", "video/mp4"
        attributes: Key-value attributes for the stream
                   Default: None
        stream_id: Custom stream ID
                  Default: None (auto-generated)
        destination_identities: List of participant identities to send to
                               Default: None (broadcast to all)
        topic: Topic for the stream
              Default: ""

    Returns:
        ByteStreamWriter: Writer for writing byte chunks
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If parameters invalid

    Example:
        >>> # Stream file in chunks
        >>> writer = await local.stream_bytes(
        ...     name="document.pdf",
        ...     mime_type="application/pdf",
        ...     total_size=102400  # 100 KB
        ... )
        >>> 
        >>> # Read and send in chunks
        >>> with open("document.pdf", "rb") as f:
        ...     while chunk := f.read(15_000):  # matches the 15KB recommendation
        ...         await writer.write(chunk)
        >>> 
        >>> await writer.aclose()
        >>> 
        >>> # Stream image
        >>> writer = await local.stream_bytes(
        ...     name="photo.jpg",
        ...     mime_type="image/jpeg",
        ...     topic="images"
        ... )
        >>> await writer.write(image_data)
        >>> await writer.aclose()
        
    Note:
        Chunks are sent as written (streaming, not buffered).
        total_size enables progress tracking on receiver.
        Writer must be closed to signal end of stream.
        
        Chunk size recommendation: 15KB for optimal performance.
        Smaller chunks: Higher overhead
        Larger chunks: Higher latency
        
        For complete files, use send_file() convenience method:
        >>> info = await local.send_file("/path/to/file.pdf")
    """
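
The chunking advice above can be packaged as a small generator that reads fixed-size chunks and reports how much has been sent. This is a sketch that takes the 15KB recommendation as 15,000 bytes; `iter_chunks` and `CHUNK_SIZE` are hypothetical names, and an in-memory `io.BytesIO` stands in for a real file.

```python
import io
from typing import BinaryIO, Iterator, Tuple

CHUNK_SIZE = 15_000  # assumption: the 15KB recommendation, in bytes


def iter_chunks(
    stream: BinaryIO, total_size: int, chunk_size: int = CHUNK_SIZE
) -> Iterator[Tuple[bytes, float]]:
    """Yield (chunk, fraction_sent) pairs read from a binary stream."""
    sent = 0
    while chunk := stream.read(chunk_size):
        sent += len(chunk)
        # Guard against division by zero for empty or unknown sizes.
        fraction = sent / total_size if total_size else 1.0
        yield chunk, fraction


# Demonstration with 40,000 zero bytes held in memory.
data = bytes(40_000)
chunks = list(iter_chunks(io.BytesIO(data), len(data)))
```

Each yielded chunk would be passed to `await writer.write(chunk)`, with the fraction driving a progress indicator.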

async def send_file(
    self,
    file_path: str,
    *,
    topic: str = "",
    destination_identities: Optional[List[str]] = None,
    attributes: Optional[Dict[str, str]] = None,
    stream_id: str | None = None
) -> ByteStreamInfo:
    """Send a file from the filesystem.

    Args:
        file_path: Path to the file to send
                  Type: str
                  Must exist and be readable
        topic: Topic for the file transfer
              Default: ""
        destination_identities: List of participant identities to send to
                               Default: None (broadcast to all)
        attributes: Key-value attributes for the transfer
                   Default: None
        stream_id: Custom stream ID
                  Default: None (auto-generated)

    Returns:
        ByteStreamInfo: Information about the sent stream
        Contains: stream_id, name (filename), mime_type, size, etc.
        
    Raises:
        RuntimeError: If not connected to room
        FileNotFoundError: If file doesn't exist
        PermissionError: If file not readable
        ValueError: If parameters invalid

    Example:
        >>> # Send file
        >>> info = await local.send_file(
        ...     "/path/to/document.pdf",
        ...     topic="file-sharing"
        ... )
        >>> print(f"Sent file: {info.name}, size: {info.size}")
        >>> 
        >>> # Send to specific participant
        >>> info = await local.send_file(
        ...     "/path/to/image.jpg",
        ...     destination_identities=["user123"],
        ...     topic="private-files"
        ... )
        >>> 
        >>> # Send with attributes
        >>> info = await local.send_file(
        ...     "/path/to/data.csv",
        ...     topic="data-transfer",
        ...     attributes={"description": "Sales report Q4"}
        ... )
        
    Note:
        Convenience method that:
        1. Opens file
        2. Determines MIME type from extension
        3. Streams file in chunks
        4. Closes stream automatically
        
        For large files (>100MB), consider showing progress:
        >>> # Use stream_bytes() instead for progress tracking
        >>> file_size = os.path.getsize(file_path)
        >>> writer = await local.stream_bytes(
        ...     name=os.path.basename(file_path),
        ...     total_size=file_size
        ... )
        >>> with open(file_path, "rb") as f:
        ...     while chunk := f.read(15_000):  # matches the 15KB recommendation
        ...         await writer.write(chunk)
        ...         progress = (f.tell() / file_size) * 100
        ...         print(f"Progress: {progress:.1f}%")
        >>> await writer.aclose()
    """
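
When replacing `send_file()` with a manual `stream_bytes()` call, the name and MIME type have to be supplied by the caller. One way to derive them from a path is Python's standard `mimetypes` module, sketched below. How the SDK itself determines the MIME type is not specified here, so treat this as an assumption; `describe_file` is a hypothetical helper.

```python
import mimetypes
import os
from typing import Dict


def describe_file(path: str) -> Dict[str, str]:
    """Derive a stream name and MIME type from a file path.

    Unknown extensions fall back to "application/octet-stream",
    which is also the stream_bytes() default.
    """
    mime_type, _encoding = mimetypes.guess_type(path)
    return {
        "name": os.path.basename(path),
        "mime_type": mime_type or "application/octet-stream",
    }
```

The result can be unpacked directly: `await local.stream_bytes(**describe_file(path), total_size=os.path.getsize(path))`.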

RPC Methods

async def perform_rpc(
    self,
    *,
    destination_identity: str,
    method: str,
    payload: str,
    response_timeout: Optional[float] = None
) -> str:
    """Initiate an RPC call to a remote participant.

    Args:
        destination_identity: Identity of the destination participant
                             Type: str
                             Must be connected to room
        method: Name of the method to call
               Type: str
               Must be registered by recipient
        payload: Method payload
                Type: str
                Typically JSON for structured data
                Max size: ~256KB
        response_timeout: Timeout in seconds for receiving response
                         Type: float | None
                         Default: None (uses default timeout, typically 10s)
                         Minimum: 1.0 second
                         Maximum: 60.0 seconds recommended

    Returns:
        str: Response payload string from the remote handler
        Typically JSON string
        
    Raises:
        RpcError: If RPC call fails
                 Common error codes:
                 - RECIPIENT_NOT_FOUND: Participant not in room
                 - UNSUPPORTED_METHOD: Method not registered
                 - RESPONSE_TIMEOUT: No response within timeout
                 - CONNECTION_TIMEOUT: Connection lost during call
                 - APPLICATION_ERROR: Handler raised error
                 - REQUEST_PAYLOAD_TOO_LARGE: Payload exceeds limit
                 - RESPONSE_PAYLOAD_TOO_LARGE: Response exceeds limit
        RuntimeError: If not connected to room

    Example:
        >>> # Simple RPC call
        >>> import json
        >>> 
        >>> try:
        ...     result = await local.perform_rpc(
        ...         destination_identity="agent-123",
        ...         method="calculate",
        ...         payload=json.dumps({"a": 5, "b": 3}),
        ...         response_timeout=5.0
        ...     )
        ...     data = json.loads(result)
        ...     print(f"Result: {data}")  # {"result": 8}
        ... except RpcError as e:
        ...     print(f"RPC failed: {e.code} - {e.message}")
        >>> 
        >>> # Error handling
        >>> try:
        ...     result = await local.perform_rpc(
        ...         destination_identity="user-123",
        ...         method="get_status",
        ...         payload="{}"
        ...     )
        ... except RpcError as e:
        ...     if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
        ...         print("Participant not in room")
        ...     elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
        ...         print("Method not implemented by participant")
        ...     elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
        ...         print("Participant didn't respond in time")
        ...     else:
        ...         print(f"Unknown error: {e.code}")
        
    Note:
        RPC is request-response pattern (not fire-and-forget).
        The awaiting coroutine is suspended until a response arrives or the
        timeout elapses; the event loop itself is not blocked.
        Payload should be JSON for type safety:
        
        >>> # Good: Structured data
        >>> payload = json.dumps({
        ...     "action": "calculate",
        ...     "params": {"a": 5, "b": 3}
        ... })
        >>> 
        >>> # Bad: Unstructured string
        >>> payload = "5+3"  # Handler can't reliably parse
        
        For fire-and-forget, use publish_data():
        >>> await local.publish_data("command", topic="commands")
    """
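
Given that payloads and responses are strings that are typically JSON, a thin wrapper can keep the encoding and decoding in one place. The sketch below is duck-typed and assumes only the `perform_rpc()` keyword arguments documented above. `call_json`, `EchoSumParticipant`, and `demo_rpc` are illustrative names, and the stand-in participant answers locally so the code runs without a room.

```python
import asyncio
import json
from typing import Any, Optional


async def call_json(
    local: Any,
    destination_identity: str,
    method: str,
    params: Any,
    response_timeout: Optional[float] = None,
) -> Any:
    """JSON-encode `params`, perform the RPC, and JSON-decode the reply."""
    reply = await local.perform_rpc(
        destination_identity=destination_identity,
        method=method,
        payload=json.dumps(params),
        response_timeout=response_timeout,
    )
    return json.loads(reply)


class EchoSumParticipant:
    """Stand-in that answers a "calculate" call locally."""

    async def perform_rpc(
        self, *, destination_identity, method, payload, response_timeout=None
    ) -> str:
        args = json.loads(payload)
        return json.dumps({"result": args["a"] + args["b"]})


async def demo_rpc() -> Any:
    return await call_json(
        EchoSumParticipant(), "agent-123", "calculate", {"a": 5, "b": 3}
    )


if __name__ == "__main__":
    print(asyncio.run(demo_rpc()))
```

Errors are deliberately not caught here, so an `RpcError` raised by a real `perform_rpc()` propagates to the caller, which can then branch on `e.code` as shown in the examples above.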

def register_rpc_method(
    self,
    method_name: str,
    handler: Optional[RpcHandler] = None
) -> Union[RpcHandler, Callable[[RpcHandler], RpcHandler]]:
    """Register an RPC method handler.

    Can be used as a decorator or with a callback function.

    Args:
        method_name: Name of the RPC method to handle
                    Type: str
                    Case-sensitive
                    Should be descriptive (e.g., "calculate", "get_status")
        handler: Handler function (optional for decorator usage)
                Type: RpcHandler | None
                Signature: async def handler(data: RpcInvocationData) -> str
                Can be sync or async function

    Returns:
        Handler function or decorator
        
    Raises:
        ValueError: If method already registered
        TypeError: If handler not callable

    Example:
        >>> # Decorator usage
        >>> @local.register_rpc_method("calculate")
        ... async def handle_calculate(data: RpcInvocationData) -> str:
        ...     import json
        ...     
        ...     print(f"Called by: {data.caller_identity}")
        ...     print(f"Request ID: {data.request_id}")
        ...     print(f"Timeout: {data.response_timeout}")
        ...     
        ...     # Parse payload
        ...     params = json.loads(data.payload)
        ...     
        ...     # Perform calculation
        ...     result = params["a"] + params["b"]
        ...     
        ...     # Return response
        ...     return json.dumps({"result": result})
        >>> 
        >>> # Direct registration
        >>> async def my_handler(data: RpcInvocationData) -> str:
        ...     return json.dumps({"status": "ok"})
        >>> local.register_rpc_method("status", my_handler)
        >>> 
        >>> # Error handling in handler
        >>> @local.register_rpc_method("divide")
        ... async def handle_divide(data: RpcInvocationData) -> str:
        ...     import json
        ...     from livekit import RpcError
        ...     
        ...     params = json.loads(data.payload)
        ...     if params["b"] == 0:
        ...         raise RpcError(
        ...             code=RpcError.ErrorCode.APPLICATION_ERROR,
        ...             message="Division by zero",
        ...             data=json.dumps({"param": "b", "value": 0})
        ...         )
        ...     
        ...     result = params["a"] / params["b"]
        ...     return json.dumps({"result": result})
        
    Note:
        Handler can be a sync or async function.
        Handler should return a string (typically JSON); the RpcHandler alias also permits None.
        Handler exceptions are caught and returned to the caller as RpcError.
        
        Handler receives RpcInvocationData with:
        - caller_identity: Who called (string)
        - payload: Request payload (string)
        - request_id: Unique request ID (string)
        - response_timeout: Timeout in seconds (float)
        
        Multiple methods can be registered:
        >>> @local.register_rpc_method("method1")
        ... async def handler1(data):
        ...     return "response1"
        >>> 
        >>> @local.register_rpc_method("method2")
        ... async def handler2(data):
        ...     return "response2"
    """

def unregister_rpc_method(self, method: str) -> None:
    """Unregister a previously registered RPC method.

    Args:
        method: Name of the RPC method to unregister
               Type: str
               Must be previously registered

    Returns:
        None
        
    Raises:
        KeyError: If method not registered

    Example:
        >>> # Register method
        >>> @local.register_rpc_method("calculate")
        ... async def handler(data):
        ...     return "result"
        >>> 
        >>> # Later, unregister
        >>> local.unregister_rpc_method("calculate")
        
    Note:
        Unregistering takes effect immediately.
        Future calls to this method will fail with UNSUPPORTED_METHOD.
        Calls in progress are not affected.
    """

Track Subscription Management

def set_track_subscription_permissions(
    self,
    *,
    allow_all_participants: bool,
    participant_permissions: Optional[List[ParticipantTrackPermission]] = None
) -> None:
    """Set track subscription permissions.

    Control which participants can subscribe to the local participant's tracks.

    Args:
        allow_all_participants: Whether to allow all participants to subscribe
                               Type: bool
                               True: All participants can subscribe
                               False: Only specified participants can subscribe
        participant_permissions: Per-participant track permissions
                                Type: List[ParticipantTrackPermission] | None
                                Default: None
                                Each permission specifies:
                                - participant_identity: Participant ID
                                - allow_all: Allow all tracks
                                - allowed_track_sids: Specific track SIDs

    Returns:
        None
        
    Raises:
        RuntimeError: If not connected to room
        ValueError: If permissions invalid

    Example:
        >>> from livekit import ParticipantTrackPermission
        >>> 
        >>> # Allow only specific participants to subscribe
        >>> local.set_track_subscription_permissions(
        ...     allow_all_participants=False,
        ...     participant_permissions=[
        ...         # Allow all tracks for user123
        ...         ParticipantTrackPermission(
        ...             participant_identity="user123",
        ...             allow_all=True,
        ...             allowed_track_sids=[]
        ...         ),
        ...         # Allow specific tracks for user456
        ...         ParticipantTrackPermission(
        ...             participant_identity="user456",
        ...             allow_all=False,
        ...             allowed_track_sids=["TR_XXXXX", "TR_YYYYY"]
        ...         )
        ...     ]
        ... )
        >>> 
        >>> # Allow all participants
        >>> local.set_track_subscription_permissions(
        ...     allow_all_participants=True
        ... )
        
    Note:
        Permissions are enforced server-side.
        Changes take effect immediately.
        Existing subscriptions are not affected (not revoked).
        Only prevents new subscriptions.
        
        Use cases:
        - Private conversations
        - Selective screen sharing
        - Role-based access control
        - Presenter-only video
    """

Metadata Methods

async def set_metadata(self, metadata: str) -> None:
    """Set metadata for the local participant.

    Requires canUpdateOwnMetadata permission in token.

    Args:
        metadata: New metadata
                 Type: str
                 Typically JSON string
                 Max recommended size: 4KB
                 Empty string to clear

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        PermissionError: If token lacks canUpdateOwnMetadata permission
        ValueError: If metadata exceeds size limit

    Example:
        >>> import json
        >>> 
        >>> # Set structured metadata
        >>> await local.set_metadata(json.dumps({
        ...     "status": "available",
        ...     "mood": "happy",
        ...     "last_active": "2024-01-01T00:00:00Z"
        ... }))
        >>> 
        >>> # Update status
        >>> metadata = json.loads(local.metadata) if local.metadata else {}
        >>> metadata["status"] = "away"
        >>> await local.set_metadata(json.dumps(metadata))
        >>> 
        >>> # Clear metadata
        >>> await local.set_metadata("")
        
    Note:
        Metadata is broadcast to all participants.
        Triggers 'participant_metadata_changed' event for all.
        Updates replace entire metadata (not merged).
        
        For structured updates, use attributes instead:
        >>> await local.set_attributes({"status": "away"})
        
        Metadata vs Attributes:
        - Metadata: Single string, less structured
        - Attributes: Key-value pairs, more structured
    """

async def set_name(self, name: str) -> None:
    """Set display name for the local participant.

    Requires canUpdateOwnMetadata permission in token.

    Args:
        name: New display name
             Type: str
             Max recommended length: 256 characters
             Can be empty string

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        PermissionError: If token lacks canUpdateOwnMetadata permission
        ValueError: If name exceeds length limit

    Example:
        >>> # Set name
        >>> await local.set_name("John Doe")
        >>> 
        >>> # Update name
        >>> await local.set_name("John Doe (Host)")
        >>> 
        >>> # Clear name
        >>> await local.set_name("")
        
    Note:
        Name is broadcast to all participants.
        Triggers 'participant_name_changed' event for all.
        Name is separate from identity (identity never changes).
        
        Identity vs Name:
        - Identity: Unique, immutable, from token
        - Name: Non-unique, mutable, display-friendly
    """

async def set_attributes(self, attributes: dict[str, str]) -> None:
    """Set custom attributes for the local participant.

    Requires canUpdateOwnMetadata permission in token.

    Args:
        attributes: Dictionary of key-value attributes
                   Type: dict[str, str]
                   Keys and values must be strings
                   Attributes are merged (not replaced)
                   To remove attribute, set to empty string

    Returns:
        None (awaitable)
        
    Raises:
        RuntimeError: If not connected to room
        PermissionError: If token lacks canUpdateOwnMetadata permission
        TypeError: If keys/values not strings
        ValueError: If attributes exceed size limit

    Example:
        >>> # Set attributes
        >>> await local.set_attributes({
        ...     "role": "moderator",
        ...     "team": "red",
        ...     "level": "5"
        ... })
        >>> 
        >>> # Update single attribute (others preserved)
        >>> await local.set_attributes({
        ...     "status": "speaking"
        ... })
        >>> # Now has: role, team, level, status
        >>> 
        >>> # Remove attribute
        >>> await local.set_attributes({
        ...     "status": ""  # Removes status
        ... })
        
    Note:
        Attributes are broadcast to all participants.
        Triggers 'participant_attributes_changed' event.
        Only changed attributes are included in event.
        
        Attributes are merged, not replaced:
        >>> # First call
        >>> await local.set_attributes({"a": "1", "b": "2"})
        >>> # Second call
        >>> await local.set_attributes({"b": "3", "c": "4"})
        >>> # Result: {"a": "1", "b": "3", "c": "4"}
        
        To clear all attributes:
        >>> await local.set_attributes({
        ...     key: "" for key in local.attributes.keys()
        ... })
    """

RemoteParticipant

class RemoteParticipant(Participant):
    """Represents a remote participant in a room.

    RemoteParticipant provides access to track publications
    from other participants in the room.
    
    All properties are read-only.
    Cannot publish tracks or send data (use LocalParticipant for that).
    """

    def __init__(
        self,
        owned_info: proto_participant.OwnedParticipant
    ) -> None:
        """Initialize a RemoteParticipant.

        Args:
            owned_info: Internal participant information
            
        Note:
            This constructor is for internal use.
            Remote participants are created automatically by SDK.
            Access via room.remote_participants.
        """

Properties

@property
def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
    """Dictionary of remote track publications.

    Returns:
        Mapping[str, RemoteTrackPublication]: Maps track SID to publication
        
    Note:
        Read-only mapping.
        Contains all tracks published by this participant.
        Use publication.set_subscribed(True) to subscribe.
        
    Example:
        >>> for sid, pub in participant.track_publications.items():
        ...     print(f"{pub.name}: subscribed={pub.subscribed}")
        ...     if not pub.subscribed:
        ...         pub.set_subscribed(True)
    """

Type Aliases

RpcHandler = Callable[
    [RpcInvocationData],
    Union[Awaitable[Optional[str]], Optional[str]]
]
"""Type for RPC method handlers.

Handlers can be sync or async functions that receive RpcInvocationData
and return an optional string response.

Handler signature:
- Input: RpcInvocationData
  - caller_identity: str
  - payload: str
  - request_id: str
  - response_timeout: float
- Output: str | None (for async: Awaitable[str | None])

Example:
    >>> async def handler(data: RpcInvocationData) -> str:
    ...     # Process request
    ...     return "response"
    >>> 
    >>> def sync_handler(data: RpcInvocationData) -> str:
    ...     return "response"
"""

Complete Example

import asyncio
import json
from livekit import (
    Room,
    LocalParticipant,
    RemoteParticipant,
    RpcInvocationData,
    RpcError,
    AudioSource,
    LocalAudioTrack,
    TrackPublishOptions,
    ParticipantKind,
)

async def main():
    room = Room()

    # Handle new participants
    @room.on("participant_connected")
    def on_participant_connected(participant: RemoteParticipant):
        print(f"Participant joined: {participant.identity}")
        print(f"  Name: {participant.name}")
        print(f"  Metadata: {participant.metadata}")
        print(f"  Kind: {participant.kind}")
        print(f"  Attributes: {participant.attributes}")

        # Access their tracks
        for track_sid, publication in participant.track_publications.items():
            print(f"  Track: {publication.name} ({track_sid})")

    # Handle participant metadata changes
    @room.on("participant_metadata_changed")
    def on_metadata_changed(
        participant,
        old_metadata: str,
        new_metadata: str
    ):
        print(f"{participant.identity} metadata changed:")
        print(f"  Old: {old_metadata}")
        print(f"  New: {new_metadata}")

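    # Connect to room. LIVEKIT_URL and LIVEKIT_TOKEN are assumed environment
    # variable names; substitute your own server URL and access token.
    import os
    await room.connect(os.environ["LIVEKIT_URL"], os.environ["LIVEKIT_TOKEN"])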

    # Access local participant
    local: LocalParticipant = room.local_participant
    print(f"Local identity: {local.identity}")

    # Update local metadata
    await local.set_metadata(json.dumps({
        "status": "active",
        "version": "1.0"
    }))

    await local.set_name("My Display Name")

    await local.set_attributes({
        "role": "presenter",
        "team": "blue"
    })

    # Publish audio track
    source = AudioSource(sample_rate=48000, num_channels=1)
    track = LocalAudioTrack.create_audio_track("microphone", source)
    publication = await local.publish_track(track, TrackPublishOptions())

    print(f"Published track: {publication.sid}")

    # Register RPC handler
    @local.register_rpc_method("get_status")
    async def handle_get_status(data: RpcInvocationData) -> str:
        print(f"RPC call from {data.caller_identity}")
        return json.dumps({
            "status": "healthy",
            "uptime": 3600
        })

    # Send data to all participants
    await local.publish_data(
        json.dumps({"type": "announcement", "text": "Hello everyone"}),
        topic="announcements"
    )

    # Send data to specific participant
    if len(room.remote_participants) > 0:
        first_participant = next(iter(room.remote_participants.values()))
        await local.publish_data(
            "Private message",
            destination_identities=[first_participant.identity],
            topic="private"
        )

        # Perform RPC call
        try:
            response = await local.perform_rpc(
                destination_identity=first_participant.identity,
                method="echo",
                payload="test message",
                response_timeout=5.0
            )
            print(f"RPC response: {response}")
        except RpcError as e:
            print(f"RPC failed: {e.code} - {e.message}")

    # Stream text
    writer = await local.stream_text(topic="chat")
    await writer.write("Hello ")
    await writer.write("from ")
    await writer.write("stream!")
    await writer.aclose()

    # Send file
    file_info = await local.send_file(
        "/path/to/document.pdf",
        topic="file-sharing"
    )
    print(f"Sent file: {file_info.name}")

    # Set track subscription permissions (only if someone else is in the room)
    from livekit import ParticipantTrackPermission
    if room.remote_participants:
        allowed = next(iter(room.remote_participants.values()))
        local.set_track_subscription_permissions(
            allow_all_participants=False,
            participant_permissions=[
                ParticipantTrackPermission(
                    participant_identity=allowed.identity,
                    allow_all=True,
                    allowed_track_sids=[]
                )
            ]
        )

    # Keep running
    await asyncio.sleep(60)

    # Cleanup
    await local.unpublish_track(track.sid)
    await room.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Participant Identity vs Name

# DO: Use identity for identification
participant_map = {p.identity: p for p in room.remote_participants.values()}
target = participant_map.get("user123")

# DON'T: Use name for identification (not unique)
# target = [p for p in room.remote_participants.values() if p.name == "John"][0]

2. Metadata Structure

import json

# DO: Use JSON for structured metadata
await local.set_metadata(json.dumps({
    "status": "available",
    "preferences": {"theme": "dark"},
    "timestamp": "2024-01-01T00:00:00Z"
}))

# Later, parse safely
try:
    metadata = json.loads(participant.metadata) if participant.metadata else {}
    status = metadata.get("status", "unknown")
except json.JSONDecodeError:
    status = "unknown"

# DON'T: Use unstructured strings
# await local.set_metadata("status:available,theme:dark")
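
The safe-parsing pattern above can be wrapped in a small reusable helper. The sketch below uses a hypothetical function name, `parse_metadata`, and also covers a case the inline snippet does not: metadata that is valid JSON but not an object (for example a list), where calling `.get()` would fail.

```python
import json
from typing import Any, Dict, Optional


def parse_metadata(raw: Optional[str]) -> Dict[str, Any]:
    """Parse participant metadata, falling back to an empty dict."""
    if not raw:
        return {}
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        # Unstructured strings such as "status:available" end up here.
        return {}
    # Valid JSON that is not an object (list, number, ...) is also rejected.
    return value if isinstance(value, dict) else {}


print(parse_metadata('{"status": "away"}').get("status", "unknown"))  # away
print(parse_metadata("status:available,theme:dark"))                  # {}
print(parse_metadata("[1, 2]"))                                       # {}
```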

3. Attributes for Dynamic Properties

# DO: Use attributes for frequently changing properties
await local.set_attributes({"status": "speaking"})
await local.set_attributes({"status": "idle"})  # Only updates status

# DON'T: Use metadata for frequently changing properties
# (requires parsing and reconstructing entire JSON each time)

4. RPC Error Handling

from livekit import RpcError

async def safe_rpc_call(local, dest, method, payload):
    """RPC call with comprehensive error handling."""
    try:
        return await local.perform_rpc(
            destination_identity=dest,
            method=method,
            payload=payload,
            response_timeout=5.0
        )
    except RpcError as e:
        if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
            print("Participant left room")
        elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
            print("Participant didn't respond")
        elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
            print("Method not implemented")
        else:
            print(f"RPC error: {e.code} - {e.message}")
        return None

5. Data Publishing Patterns

import json

# For critical data: Use reliable delivery
await local.publish_data(
    json.dumps({"command": "start_recording"}),
    reliable=True,
    topic="commands"
)

# For frequent updates: Use lossy delivery
await local.publish_data(
    json.dumps({"x": 100, "y": 200}),
    reliable=False,
    topic="cursor-position"
)

# For large data: Use streaming
writer = await local.stream_bytes(name="file.pdf")
# ... write chunks
await writer.aclose()
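
For the streaming case, the payload is typically split into fixed-size pieces before writing. The following sketch shows one way to do that. `iter_chunks`, `send_in_chunks`, and `RecordingWriter` are hypothetical names, the writer is a stand-in that only assumes async `write()` and `aclose()` methods, and the chunk size is arbitrary rather than an SDK requirement.

```python
import asyncio
from typing import Iterator, List


def iter_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of `data`, each at most `chunk_size` bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


class RecordingWriter:
    """Hypothetical stand-in for a stream writer; it only records calls."""

    def __init__(self) -> None:
        self.chunks: List[bytes] = []
        self.closed = False

    async def write(self, chunk: bytes) -> None:
        self.chunks.append(chunk)

    async def aclose(self) -> None:
        self.closed = True


async def send_in_chunks(writer, data: bytes, chunk_size: int) -> None:
    """Write `data` piece by piece and always close the writer afterwards."""
    try:
        for chunk in iter_chunks(data, chunk_size):
            await writer.write(chunk)
    finally:
        await writer.aclose()


writer = RecordingWriter()
asyncio.run(send_in_chunks(writer, b"abcdefghij", chunk_size=4))
print(writer.chunks)  # [b'abcd', b'efgh', b'ij']
```

Closing the writer in a `finally` block means the stream is closed even if a write raises partway through.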

See Also

  • Room and Connection Management - Room lifecycle and events
  • Audio Tracks - Publishing and managing audio tracks
  • Video Tracks - Publishing and managing video tracks
  • Track Publications - Track publication details
  • Data Streaming - Streaming data between participants
  • RPC - Remote procedure calls in detail
  • Transcription - Transcription support