tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities.
Participants represent users in a LiveKit room. The SDK provides LocalParticipant for the current user and RemoteParticipant for other users. Both inherit from the abstract Participant base class.
Key concepts:
from livekit import (
Participant,
LocalParticipant,
RemoteParticipant,
ParticipantKind,
DisconnectReason,
ParticipantTrackPermission,
)

class Participant(ABC):
"""Base class for local and remote participants.
This abstract class defines common properties and behavior
shared by both LocalParticipant and RemoteParticipant.
Cannot be instantiated directly - use LocalParticipant or RemoteParticipant.
"""
def __init__(
self,
owned_info: proto_participant.OwnedParticipant
) -> None:
"""Initialize a Participant.
Args:
owned_info: Internal participant information from FFI
Note:
This constructor is for internal use.
Participants are created by the SDK, not by application code.
"""

@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
"""Track publications associated with this participant.
Returns:
Dictionary mapping track SID (string) to TrackPublication
For LocalParticipant: Maps to LocalTrackPublication
For RemoteParticipant: Maps to RemoteTrackPublication
Note:
This is a read-only mapping.
Track SIDs are unique within a room session.
Publications contain metadata about tracks (name, kind, dimensions, etc.).
Example:
>>> for sid, pub in participant.track_publications.items():
... print(f"{sid}: {pub.name} ({pub.kind})")
"""
@property
def sid(self) -> str:
"""Session ID of the participant.
Returns:
str: Unique session identifier assigned by server
Note:
Session ID is unique per room session.
Changes if participant rejoins room.
Different from identity (which persists across sessions).
Format: "PA_" followed by random string.
"""
@property
def name(self) -> str:
"""Display name of the participant.
Returns:
str: Participant's display name (may be empty)
Note:
Display name can be updated via set_name() (local participant only).
Not required to be unique (unlike identity).
May be empty string if not set.
Typically shown in UI.
"""
@property
def identity(self) -> str:
"""Identity of the participant.
Returns:
str: Unique participant identity
Note:
Identity is set via JWT token and cannot be changed.
Guaranteed to be unique within a room.
Persists across reconnections and sessions.
Use this as the primary identifier, not name.
Format: Arbitrary string (typically username, user ID, etc.).
"""
@property
def metadata(self) -> str:
"""Metadata associated with the participant.
Returns:
str: JSON string or arbitrary metadata (may be empty)
Note:
Metadata can be updated via set_metadata() (local participant only).
Typically JSON for structured data.
Empty string if not set.
Broadcast to all participants when changed.
Max recommended size: 4KB.
"""
@property
def attributes(self) -> dict[str, str]:
"""Custom attributes associated with the participant.
Returns:
dict[str, str]: Dictionary of key-value attributes
Note:
Attributes can be updated via set_attributes() (local participant only).
More structured than metadata field.
Keys and values are strings.
Updates made via set_attributes() are merged into the existing attributes, not replaced wholesale.
To remove an attribute, set its value to an empty string.
Example:
>>> attrs = participant.attributes
>>> print(f"Role: {attrs.get('role', 'guest')}")
>>> print(f"Team: {attrs.get('team', 'none')}")
"""
@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
"""Participant's kind (type).
Returns:
ParticipantKind enum value:
- STANDARD (0): Regular participant (human user)
- INGRESS (1): Ingress participant (RTMP, WHIP stream)
- EGRESS (2): Egress participant (recording, streaming out)
- SIP (3): SIP participant (phone call)
- AGENT (4): AI agent participant
Note:
Participant kind is set by server and cannot be changed.
Useful for distinguishing between human users and bots/streams.
Most participants are STANDARD.
"""
@property
def disconnect_reason(
self
) -> Optional[proto_participant.DisconnectReason.ValueType]:
"""Reason for disconnection if participant has disconnected.
Returns:
DisconnectReason enum value or None if not disconnected
Possible values:
- CLIENT_INITIATED (1): User disconnected voluntarily
- DUPLICATE_IDENTITY (2): Rejoined from another device
- SERVER_SHUTDOWN (3): Server shutting down
- PARTICIPANT_REMOVED (4): Kicked by moderator
- ROOM_DELETED (5): Room was deleted
- etc.
Note:
Only available after participant has disconnected.
Returns None if participant is still connected.
Useful for understanding why participant left.
"""

class LocalParticipant(Participant):
"""Represents the local participant in a room.
The LocalParticipant provides methods to publish tracks,
send data, perform RPC calls, and update local metadata.
Accessible via room.local_participant.
"""
def __init__(
self,
room_queue: BroadcastQueue[proto_ffi.FfiEvent],
owned_info: proto_participant.OwnedParticipant
) -> None:
"""Initialize a LocalParticipant.
Args:
room_queue: Internal event queue
owned_info: Internal participant information
Note:
This constructor is for internal use.
Access local participant via room.local_participant.
"""

@property
def track_publications(self) -> Mapping[str, LocalTrackPublication]:
"""Dictionary of local track publications.
Returns:
Mapping[str, LocalTrackPublication]: Maps track SID to publication
Note:
Read-only mapping.
Updated automatically when tracks are published/unpublished.
Contains metadata for all published tracks.
Example:
>>> for sid, pub in local.track_publications.items():
... print(f"{pub.name}: subscribed={pub.subscribed}")
... await pub.wait_for_subscription()
"""

async def publish_track(
self,
track: LocalTrack,
options: TrackPublishOptions = TrackPublishOptions()
) -> LocalTrackPublication:
"""Publish a local track to the room.
Args:
track: The local track to publish (LocalAudioTrack or LocalVideoTrack)
Track must be created from AudioSource or VideoSource
options: Publishing options (encoding, simulcast, etc.)
Default: TrackPublishOptions() with default settings
Returns:
LocalTrackPublication: Publication for the published track
Contains metadata and can be used to wait for subscription
Raises:
RuntimeError: If not connected to room
ValueError: If track is invalid or already published
Exception: If publishing fails (network error, permission denied)
Example:
>>> # Audio track
>>> source = AudioSource(48000, 1)
>>> track = LocalAudioTrack.create_audio_track("mic", source)
>>> options = TrackPublishOptions()
>>> options.source = TrackSource.SOURCE_MICROPHONE
>>> publication = await local.publish_track(track, options)
>>> print(f"Published: {publication.sid}")
>>>
>>> # Video track with encoding
>>> source = VideoSource(1920, 1080)
>>> track = LocalVideoTrack.create_video_track("camera", source)
>>> options = TrackPublishOptions()
>>> options.video_codec = VideoCodec.VP8
>>> options.simulcast = True
>>> encoding = VideoEncoding()
>>> encoding.max_bitrate = 2_000_000
>>> encoding.max_framerate = 30.0
>>> options.video_encoding = encoding
>>> publication = await local.publish_track(track, options)
Note:
Track must be created before publishing.
Publishing is async and may take a few hundred ms.
Track will be available to other participants after publishing.
If auto_subscribe=True, others will automatically subscribe.
Track SID is assigned by server after publishing.
Use publication.wait_for_subscription() to confirm delivery.
Performance:
Initial publish: ~100-500ms depending on network
Video encoding setup may take longer first time
"""
async def unpublish_track(self, track_sid: str) -> None:
"""Unpublish a track from the room.
Args:
track_sid: Session ID of the track to unpublish
Format: "TR_" followed by random string
Get from publication.sid or track.sid
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
ValueError: If track SID is invalid or track not published
KeyError: If track SID not found in publications
Example:
>>> # Unpublish specific track
>>> await local.unpublish_track("TR_XXXXX")
>>>
>>> # Unpublish all tracks
>>> for sid in list(local.track_publications.keys()):
... await local.unpublish_track(sid)
Note:
Unpublishing is immediate.
Other participants will receive track_unsubscribed event.
Track streams will end for subscribers.
After unpublishing, track and source are still valid
and can be republished if needed.
Remember to clean up source if no longer needed:
>>> await source.aclose()
"""

async def publish_data(
self,
payload: Union[bytes, str],
*,
reliable: bool = True,
destination_identities: List[str] = [],
topic: str = ""
) -> None:
"""Publish arbitrary data to the room.
Args:
payload: Data to publish
Type: bytes or str (str will be encoded as UTF-8)
Max size: ~256KB (SCTP message limit)
reliable: Whether to send reliably or lossy
Type: bool
Default: True (reliable delivery)
True: TCP-like guaranteed delivery, ordered (KIND_RELIABLE)
False: UDP-like best-effort delivery, lower latency (KIND_LOSSY)
destination_identities: List of participant identities to send to
Type: List[str]
Default: [] (broadcast to all)
Empty list sends to all participants
Specify identities for targeted delivery
topic: Optional topic for categorizing data
Type: str
Default: "" (no topic)
Max length: 256 characters
Used for filtering on receiver side
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
ValueError: If payload exceeds size limit or destinations invalid
Exception: If send fails (network error)
Example:
>>> # Send to all participants (reliable)
>>> await local.publish_data(b"Hello everyone", reliable=True)
>>>
>>> # Send text to specific participant (lossy for low latency)
>>> await local.publish_data(
... "Quick update",
... reliable=False,
... destination_identities=["user123"]
... )
>>>
>>> # Send with topic
>>> import json
>>> message = json.dumps({"type": "chat", "text": "Hello"})
>>> await local.publish_data(
... message,
... topic="chat"
... )
>>>
>>> # Binary data
>>> image_bytes = load_image()
>>> await local.publish_data(
... image_bytes,
... reliable=True,
... topic="images"
... )
Note:
Reliable delivery (TCP-like):
- Guaranteed arrival and order
- Retransmission on packet loss
- Higher latency on poor networks
- Use for critical data (commands, state updates)
Lossy delivery (UDP-like):
- Best-effort, may drop packets
- Lower latency
- No retransmission
- Use for non-critical data (position updates, ephemeral state)
For large data (>256KB), use data streaming instead:
>>> writer = await local.stream_bytes(name="file.pdf")
>>> await writer.write(chunk1)
>>> await writer.write(chunk2)
>>> await writer.aclose()
"""
async def publish_dtmf(self, *, code: int, digit: str) -> None:
"""Publish SIP DTMF message.
Args:
code: DTMF code
Type: int
Range: 0-15
0-9: Digits 0-9
10: * (star)
11: # (pound)
12-15: A-D (extended DTMF)
digit: DTMF digit
Type: str
Single character: "0"-"9", "*", "#", "A"-"D"
Must match code
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
ValueError: If code/digit invalid or mismatched
Example:
>>> # Send digit 1
>>> await local.publish_dtmf(code=1, digit="1")
>>>
>>> # Send star
>>> await local.publish_dtmf(code=10, digit="*")
>>>
>>> # Send pound
>>> await local.publish_dtmf(code=11, digit="#")
Note:
Used for SIP integration (phone calls).
DTMF signals are received by SIP gateway.
Useful for phone menu navigation, PIN entry, etc.
"""
async def publish_transcription(
self,
transcription: Transcription
) -> None:
"""Publish transcription data to the room.
Args:
transcription: Transcription data to publish
Contains participant_identity, track_sid, and segments
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
ValueError: If transcription data is invalid
Example:
>>> from livekit import Transcription, TranscriptionSegment
>>>
>>> # Create transcription
>>> transcription = Transcription(
... participant_identity=local.identity,
... track_sid="TR_XXXXX",
... segments=[
... TranscriptionSegment(
... id="seg1",
... text="Hello world",
... start_time=0, # Milliseconds
... end_time=1000, # Milliseconds
... language="en", # BCP 47 language code
... final=True # True for final, False for interim
... )
... ]
... )
>>>
>>> # Publish
>>> await local.publish_transcription(transcription)
Note:
Transcription is typically generated by AI transcription service.
Can publish interim results (final=False) for low latency.
Final transcriptions (final=True) are more accurate.
Other participants receive via 'transcription_received' event.
"""

async def stream_text(
self,
*,
destination_identities: Optional[List[str]] = None,
topic: str = "",
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
reply_to_id: str | None = None,
total_size: int | None = None,
sender_identity: str | None = None
) -> TextStreamWriter:
"""Create a text stream writer for streaming text chunks.
Args:
destination_identities: List of participant identities to send to
Default: None (broadcast to all)
topic: Topic for the stream
Default: "" (no topic)
Used for routing on receiver side
attributes: Key-value attributes for the stream
Type: Dict[str, str] | None
Default: None
Metadata about the stream
stream_id: Custom stream ID
Type: str | None
Default: None (auto-generated)
Useful for tracking or replying
reply_to_id: ID of stream this is replying to
Type: str | None
Default: None
For threaded conversations
total_size: Total size in bytes if known
Type: int | None
Default: None
For progress tracking
sender_identity: Override sender identity
Type: str | None
Default: None (uses local participant identity)
Advanced use only
Returns:
TextStreamWriter: Writer for writing text chunks
Raises:
RuntimeError: If not connected to room
ValueError: If parameters invalid
Example:
>>> # Simple text stream
>>> writer = await local.stream_text(topic="chat")
>>> await writer.write("Hello ")
>>> await writer.write("world!")
>>> await writer.aclose()
>>>
>>> # Stream with metadata
>>> writer = await local.stream_text(
... topic="chat",
... attributes={"priority": "high", "sender_name": "Alice"}
... )
>>> await writer.write("Important message")
>>> await writer.aclose()
>>>
>>> # Reply to previous stream
>>> writer = await local.stream_text(
... topic="chat",
... reply_to_id="previous_stream_id"
... )
>>> await writer.write("Thanks for the message!")
>>> await writer.aclose()
Note:
Text is automatically encoded as UTF-8.
Writer must be closed with aclose() to signal end of stream.
Chunks are sent as they're written (not buffered).
For complete messages, use send_text() convenience method:
>>> info = await local.send_text("Quick message", topic="chat")
"""
async def send_text(
self,
text: str,
*,
destination_identities: Optional[List[str]] = None,
topic: str = "",
attributes: Optional[Dict[str, str]] = None,
reply_to_id: str | None = None
) -> TextStreamInfo:
"""Send a complete text message in one call.
Args:
text: Complete text message to send
Type: str
Encoded as UTF-8
destination_identities: List of participant identities to send to
Default: None (broadcast to all)
topic: Topic for the message
Default: ""
attributes: Key-value attributes
Default: None
reply_to_id: ID of stream this is replying to
Default: None
Returns:
TextStreamInfo: Information about the sent stream
Contains: stream_id, mime_type, topic, timestamp, size, attributes
Raises:
RuntimeError: If not connected to room
ValueError: If parameters invalid
Example:
>>> # Simple message
>>> info = await local.send_text("Hello everyone!")
>>> print(f"Sent stream: {info.stream_id}")
>>>
>>> # Message to specific participant
>>> info = await local.send_text(
... "Private message",
... destination_identities=["user123"],
... topic="private-chat"
... )
>>>
>>> # Message with attributes
>>> info = await local.send_text(
... "Announcement",
... topic="announcements",
... attributes={"priority": "high"}
... )
Note:
Convenience method for short messages.
For long messages or streaming, use stream_text().
Message is sent immediately (not chunked).
"""
async def stream_bytes(
self,
name: str,
*,
total_size: int | None = None,
mime_type: str = "application/octet-stream",
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
destination_identities: Optional[List[str]] = None,
topic: str = ""
) -> ByteStreamWriter:
"""Create a byte stream writer for streaming byte chunks.
Args:
name: Name of the byte stream (e.g., filename)
Type: str
Required
Displayed to receiver
total_size: Total size in bytes if known
Type: int | None
Default: None
For progress tracking
mime_type: MIME type of the data
Type: str
Default: "application/octet-stream"
Examples: "image/jpeg", "application/pdf", "video/mp4"
attributes: Key-value attributes for the stream
Default: None
stream_id: Custom stream ID
Default: None (auto-generated)
destination_identities: List of participant identities to send to
Default: None (broadcast to all)
topic: Topic for the stream
Default: ""
Returns:
ByteStreamWriter: Writer for writing byte chunks
Raises:
RuntimeError: If not connected to room
ValueError: If parameters invalid
Example:
>>> # Stream file in chunks
>>> writer = await local.stream_bytes(
... name="document.pdf",
... mime_type="application/pdf",
... total_size=102400 # 100 KB
... )
>>>
>>> # Read and send in chunks
>>> with open("document.pdf", "rb") as f:
... while chunk := f.read(16000):
... await writer.write(chunk)
>>>
>>> await writer.aclose()
>>>
>>> # Stream image
>>> writer = await local.stream_bytes(
... name="photo.jpg",
... mime_type="image/jpeg",
... topic="images"
... )
>>> await writer.write(image_data)
>>> await writer.aclose()
Note:
Chunks are sent as written (streaming, not buffered).
total_size enables progress tracking on receiver.
Writer must be closed to signal end of stream.
Chunk size recommendation: 15KB for optimal performance.
Smaller chunks: Higher overhead
Larger chunks: Higher latency
For complete files, use send_file() convenience method:
>>> info = await local.send_file("/path/to/file.pdf")
"""
async def send_file(
self,
file_path: str,
*,
topic: str = "",
destination_identities: Optional[List[str]] = None,
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None
) -> ByteStreamInfo:
"""Send a file from the filesystem.
Args:
file_path: Path to the file to send
Type: str
Must exist and be readable
topic: Topic for the file transfer
Default: ""
destination_identities: List of participant identities to send to
Default: None (broadcast to all)
attributes: Key-value attributes for the transfer
Default: None
stream_id: Custom stream ID
Default: None (auto-generated)
Returns:
ByteStreamInfo: Information about the sent stream
Contains: stream_id, name (filename), mime_type, size, etc.
Raises:
RuntimeError: If not connected to room
FileNotFoundError: If file doesn't exist
PermissionError: If file not readable
ValueError: If parameters invalid
Example:
>>> # Send file
>>> info = await local.send_file(
... "/path/to/document.pdf",
... topic="file-sharing"
... )
>>> print(f"Sent file: {info.name}, size: {info.size}")
>>>
>>> # Send to specific participant
>>> info = await local.send_file(
... "/path/to/image.jpg",
... destination_identities=["user123"],
... topic="private-files"
... )
>>>
>>> # Send with attributes
>>> info = await local.send_file(
... "/path/to/data.csv",
... topic="data-transfer",
... attributes={"description": "Sales report Q4"}
... )
Note:
Convenience method that:
1. Opens file
2. Determines MIME type from extension
3. Streams file in chunks
4. Closes stream automatically
For large files (>100MB), consider showing progress:
>>> # Use stream_bytes() instead for progress tracking
>>> import os
>>> file_size = os.path.getsize(file_path)
>>> writer = await local.stream_bytes(
... name=os.path.basename(file_path),
... total_size=file_size
... )
>>> with open(file_path, "rb") as f:
... while chunk := f.read(16000):
... await writer.write(chunk)
... progress = (f.tell() / file_size) * 100
... print(f"Progress: {progress:.1f}%")
>>> await writer.aclose()
"""

async def perform_rpc(
self,
*,
destination_identity: str,
method: str,
payload: str,
response_timeout: Optional[float] = None
) -> str:
"""Initiate an RPC call to a remote participant.
Args:
destination_identity: Identity of the destination participant
Type: str
Must be connected to room
method: Name of the method to call
Type: str
Must be registered by recipient
payload: Method payload
Type: str
Typically JSON for structured data
Max size: ~256KB
response_timeout: Timeout in seconds for receiving response
Type: float | None
Default: None (uses default timeout, typically 10s)
Minimum: 1.0 second
Maximum: 60.0 seconds recommended
Returns:
str: Response payload string from the remote handler
Typically JSON string
Raises:
RpcError: If RPC call fails
Common error codes:
- RECIPIENT_NOT_FOUND: Participant not in room
- UNSUPPORTED_METHOD: Method not registered
- RESPONSE_TIMEOUT: No response within timeout
- CONNECTION_TIMEOUT: Connection lost during call
- APPLICATION_ERROR: Handler raised error
- REQUEST_PAYLOAD_TOO_LARGE: Payload exceeds limit
- RESPONSE_PAYLOAD_TOO_LARGE: Response exceeds limit
RuntimeError: If not connected to room
Example:
>>> # Simple RPC call
>>> import json
>>>
>>> try:
... result = await local.perform_rpc(
... destination_identity="agent-123",
... method="calculate",
... payload=json.dumps({"a": 5, "b": 3}),
... response_timeout=5.0
... )
... data = json.loads(result)
... print(f"Result: {data}") # {"result": 8}
... except RpcError as e:
... print(f"RPC failed: {e.code} - {e.message}")
>>>
>>> # Error handling
>>> try:
... result = await local.perform_rpc(
... destination_identity="user-123",
... method="get_status",
... payload="{}"
... )
... except RpcError as e:
... if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
... print("Participant not in room")
... elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
... print("Method not implemented by participant")
... elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
... print("Participant didn't respond in time")
... else:
... print(f"Unknown error: {e.code}")
Note:
RPC is a request-response pattern (not fire-and-forget).
The awaiting caller is suspended until a response is received or the timeout expires.
Payload should be JSON for type safety:
>>> # Good: Structured data
>>> payload = json.dumps({
... "action": "calculate",
... "params": {"a": 5, "b": 3}
... })
>>>
>>> # Bad: Unstructured string
>>> payload = "5+3" # Handler can't reliably parse
For fire-and-forget, use publish_data():
>>> await local.publish_data("command", topic="commands")
"""
def register_rpc_method(
self,
method_name: str,
handler: Optional[RpcHandler] = None
) -> Union[RpcHandler, Callable[[RpcHandler], RpcHandler]]:
"""Register an RPC method handler.
Can be used as a decorator or with a callback function.
Args:
method_name: Name of the RPC method to handle
Type: str
Case-sensitive
Should be descriptive (e.g., "calculate", "get_status")
handler: Handler function (optional for decorator usage)
Type: RpcHandler | None
Signature: async def handler(data: RpcInvocationData) -> str
Can be sync or async function
Returns:
Handler function or decorator
Raises:
ValueError: If method already registered
TypeError: If handler not callable
Example:
>>> # Decorator usage
>>> @local.register_rpc_method("calculate")
... async def handle_calculate(data: RpcInvocationData) -> str:
... import json
...
... print(f"Called by: {data.caller_identity}")
... print(f"Request ID: {data.request_id}")
... print(f"Timeout: {data.response_timeout}")
...
... # Parse payload
... params = json.loads(data.payload)
...
... # Perform calculation
... result = params["a"] + params["b"]
...
... # Return response
... return json.dumps({"result": result})
>>>
>>> # Direct registration
>>> async def my_handler(data: RpcInvocationData) -> str:
... return json.dumps({"status": "ok"})
>>> local.register_rpc_method("status", my_handler)
>>>
>>> # Error handling in handler
>>> @local.register_rpc_method("divide")
... async def handle_divide(data: RpcInvocationData) -> str:
... import json
... from livekit import RpcError
...
... params = json.loads(data.payload)
... if params["b"] == 0:
... raise RpcError(
... code=RpcError.ErrorCode.APPLICATION_ERROR,
... message="Division by zero",
... data=json.dumps({"param": "b", "value": 0})
... )
...
... result = params["a"] / params["b"]
... return json.dumps({"result": result})
Note:
Handler can be sync or async function.
Handler returns a string (typically JSON); the RpcHandler type also permits returning None.
Handler exceptions are caught and returned as RpcError.
Handler receives RpcInvocationData with:
- caller_identity: Who called (string)
- payload: Request payload (string)
- request_id: Unique request ID (string)
- response_timeout: Timeout in seconds (float)
Multiple methods can be registered:
>>> @local.register_rpc_method("method1")
... async def handler1(data):
... return "response1"
>>>
>>> @local.register_rpc_method("method2")
... async def handler2(data):
... return "response2"
"""
def unregister_rpc_method(self, method: str) -> None:
"""Unregister a previously registered RPC method.
Args:
method: Name of the RPC method to unregister
Type: str
Must be previously registered
Returns:
None
Raises:
KeyError: If method not registered
Example:
>>> # Register method
>>> @local.register_rpc_method("calculate")
... async def handler(data):
... return "result"
>>>
>>> # Later, unregister
>>> local.unregister_rpc_method("calculate")
Note:
Unregistering takes effect immediately.
Future calls to this method will fail with UNSUPPORTED_METHOD.
Calls in progress are not affected.
"""

def set_track_subscription_permissions(
self,
*,
allow_all_participants: bool,
participant_permissions: Optional[List[ParticipantTrackPermission]] = None
) -> None:
"""Set track subscription permissions.
Control which participants can subscribe to the local participant's tracks.
Args:
allow_all_participants: Whether to allow all participants to subscribe
Type: bool
True: All participants can subscribe
False: Only specified participants can subscribe
participant_permissions: Per-participant track permissions
Type: List[ParticipantTrackPermission] | None
Default: None
Each permission specifies:
- participant_identity: Participant ID
- allow_all: Allow all tracks
- allowed_track_sids: Specific track SIDs
Returns:
None
Raises:
RuntimeError: If not connected to room
ValueError: If permissions invalid
Example:
>>> from livekit import ParticipantTrackPermission
>>>
>>> # Allow only specific participants to subscribe
>>> local.set_track_subscription_permissions(
... allow_all_participants=False,
... participant_permissions=[
... # Allow all tracks for user123
... ParticipantTrackPermission(
... participant_identity="user123",
... allow_all=True,
... allowed_track_sids=[]
... ),
... # Allow specific tracks for user456
... ParticipantTrackPermission(
... participant_identity="user456",
... allow_all=False,
... allowed_track_sids=["TR_XXXXX", "TR_YYYYY"]
... )
... ]
... )
>>>
>>> # Allow all participants
>>> local.set_track_subscription_permissions(
... allow_all_participants=True
... )
Note:
Permissions are enforced server-side.
Changes take effect immediately.
Existing subscriptions are not affected (not revoked).
Only prevents new subscriptions.
Use cases:
- Private conversations
- Selective screen sharing
- Role-based access control
- Presenter-only video
"""

async def set_metadata(self, metadata: str) -> None:
"""Set metadata for the local participant.
Requires canUpdateOwnMetadata permission in token.
Args:
metadata: New metadata
Type: str
Typically JSON string
Max recommended size: 4KB
Empty string to clear
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
PermissionError: If token lacks canUpdateOwnMetadata permission
ValueError: If metadata exceeds size limit
Example:
>>> import json
>>>
>>> # Set structured metadata
>>> await local.set_metadata(json.dumps({
... "status": "available",
... "mood": "happy",
... "last_active": "2024-01-01T00:00:00Z"
... }))
>>>
>>> # Update status
>>> metadata = json.loads(local.metadata) if local.metadata else {}
>>> metadata["status"] = "away"
>>> await local.set_metadata(json.dumps(metadata))
>>>
>>> # Clear metadata
>>> await local.set_metadata("")
Note:
Metadata is broadcast to all participants.
Triggers 'participant_metadata_changed' event for all.
Updates replace entire metadata (not merged).
For structured updates, use attributes instead:
>>> await local.set_attributes({"status": "away"})
Metadata vs Attributes:
- Metadata: Single string, less structured
- Attributes: Key-value pairs, more structured
"""
async def set_name(self, name: str) -> None:
"""Set display name for the local participant.
Requires canUpdateOwnMetadata permission in token.
Args:
name: New display name
Type: str
Max recommended length: 256 characters
Can be empty string
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
PermissionError: If token lacks canUpdateOwnMetadata permission
ValueError: If name exceeds length limit
Example:
>>> # Set name
>>> await local.set_name("John Doe")
>>>
>>> # Update name
>>> await local.set_name("John Doe (Host)")
>>>
>>> # Clear name
>>> await local.set_name("")
Note:
Name is broadcast to all participants.
Triggers 'participant_name_changed' event for all.
Name is separate from identity (identity never changes).
Identity vs Name:
- Identity: Unique, immutable, from token
- Name: Non-unique, mutable, display-friendly
"""
async def set_attributes(self, attributes: dict[str, str]) -> None:
"""Set custom attributes for the local participant.
Requires canUpdateOwnMetadata permission in token.
Args:
attributes: Dictionary of key-value attributes
Type: dict[str, str]
Keys and values must be strings
Attributes are merged (not replaced)
To remove attribute, set to empty string
Returns:
None (awaitable)
Raises:
RuntimeError: If not connected to room
PermissionError: If token lacks canUpdateOwnMetadata permission
TypeError: If keys/values not strings
ValueError: If attributes exceed size limit
Example:
>>> # Set attributes
>>> await local.set_attributes({
... "role": "moderator",
... "team": "red",
... "level": "5"
... })
>>>
>>> # Update single attribute (others preserved)
>>> await local.set_attributes({
... "status": "speaking"
... })
>>> # Now has: role, team, level, status
>>>
>>> # Remove attribute
>>> await local.set_attributes({
... "status": "" # Removes status
... })
Note:
Attributes are broadcast to all participants.
Triggers 'participant_attributes_changed' event.
Only changed attributes are included in event.
Attributes are merged, not replaced:
>>> # First call
>>> await local.set_attributes({"a": "1", "b": "2"})
>>> # Second call
>>> await local.set_attributes({"b": "3", "c": "4"})
>>> # Result: {"a": "1", "b": "3", "c": "4"}
To clear all attributes:
>>> await local.set_attributes({
... key: "" for key in local.attributes.keys()
... })
"""

class RemoteParticipant(Participant):
"""Represents a remote participant in a room.
RemoteParticipant provides access to track publications
from other participants in the room.
All properties are read-only.
Cannot publish tracks or send data (use LocalParticipant for that).
"""
def __init__(
self,
owned_info: proto_participant.OwnedParticipant
) -> None:
"""Initialize a RemoteParticipant.
Args:
owned_info: Internal participant information
Note:
This constructor is for internal use.
Remote participants are created automatically by SDK.
Access via room.remote_participants.
"""

@property
def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
"""Dictionary of remote track publications.
Returns:
Mapping[str, RemoteTrackPublication]: Maps track SID to publication
Note:
Read-only mapping.
Contains all tracks published by this participant.
Use publication.set_subscribed(True) to subscribe.
Example:
>>> for sid, pub in participant.track_publications.items():
... print(f"{pub.name}: subscribed={pub.subscribed}")
... if not pub.subscribed:
... pub.set_subscribed(True)
"""RpcHandler = Callable[
[RpcInvocationData],
Union[Awaitable[Optional[str]], Optional[str]]
]
"""Type for RPC method handlers.
Handlers can be sync or async functions that receive RpcInvocationData
and return an optional string response.
Handler signature:
- Input: RpcInvocationData
- caller_identity: str
- payload: str
- request_id: str
- response_timeout: float
- Output: str | None (for async: Awaitable[str | None])
Example:
>>> async def handler(data: RpcInvocationData) -> str:
... # Process request
... return "response"
>>>
>>> def sync_handler(data: RpcInvocationData) -> str:
... return "response"
"""import asyncio
import json
from livekit import (
Room,
LocalParticipant,
RemoteParticipant,
RpcInvocationData,
RpcError,
AudioSource,
LocalAudioTrack,
TrackPublishOptions,
ParticipantKind,
)
async def main():
    """Walk through the participant API from connect to disconnect.

    Steps, in order: register room event handlers, connect, update the local
    participant's metadata/name/attributes, publish an audio track, register
    an RPC handler, publish data, call a remote RPC method, stream text, send
    a file, restrict track subscriptions, wait, then clean up.

    The server URL and access token are read from the ``LIVEKIT_URL`` and
    ``LIVEKIT_TOKEN`` environment variables.

    Raises:
        KeyError: If ``LIVEKIT_URL`` or ``LIVEKIT_TOKEN`` is not set.
    """
    # Function-scope imports keep this example entry point self-contained.
    import os

    from livekit import ParticipantTrackPermission

    url = os.environ["LIVEKIT_URL"]
    token = os.environ["LIVEKIT_TOKEN"]

    room = Room()

    # Handle new participants
    @room.on("participant_connected")
    def on_participant_connected(participant: RemoteParticipant):
        print(f"Participant joined: {participant.identity}")
        print(f" Name: {participant.name}")
        print(f" Metadata: {participant.metadata}")
        print(f" Kind: {participant.kind}")
        print(f" Attributes: {participant.attributes}")
        # Access their tracks
        for track_sid, publication in participant.track_publications.items():
            print(f" Track: {publication.name} ({track_sid})")

    # Handle participant metadata changes
    @room.on("participant_metadata_changed")
    def on_metadata_changed(
        participant,
        old_metadata: str,
        new_metadata: str
    ):
        print(f"{participant.identity} metadata changed:")
        print(f" Old: {old_metadata}")
        print(f" New: {new_metadata}")

    # Connect to room
    await room.connect(url, token)

    # Everything after a successful connect runs under try/finally so the
    # room is disconnected even when one of the steps below raises.
    try:
        # Access local participant
        local: LocalParticipant = room.local_participant
        print(f"Local identity: {local.identity}")

        # Update local metadata
        await local.set_metadata(json.dumps({
            "status": "active",
            "version": "1.0"
        }))
        await local.set_name("My Display Name")
        await local.set_attributes({
            "role": "presenter",
            "team": "blue"
        })

        # Publish audio track
        source = AudioSource(sample_rate=48000, num_channels=1)
        track = LocalAudioTrack.create_audio_track("microphone", source)
        publication = await local.publish_track(track, TrackPublishOptions())
        print(f"Published track: {publication.sid}")

        # Register RPC handler
        @local.register_rpc_method("get_status")
        async def handle_get_status(data: RpcInvocationData) -> str:
            print(f"RPC call from {data.caller_identity}")
            return json.dumps({
                "status": "healthy",
                "uptime": 3600
            })

        # Send data to all participants
        await local.publish_data(
            json.dumps({"type": "announcement", "text": "Hello everyone"}),
            topic="announcements"
        )

        # The room may be empty; every step that targets a specific peer is
        # guarded on this value so it is never referenced while unbound.
        first_participant = next(iter(room.remote_participants.values()), None)

        if first_participant is not None:
            # Send data to specific participant
            await local.publish_data(
                "Private message",
                destination_identities=[first_participant.identity],
                topic="private"
            )

            # Perform RPC call
            try:
                response = await local.perform_rpc(
                    destination_identity=first_participant.identity,
                    method="echo",
                    payload="test message",
                    response_timeout=5.0
                )
                print(f"RPC response: {response}")
            except RpcError as e:
                print(f"RPC failed: {e.code} - {e.message}")

        # Stream text
        writer = await local.stream_text(topic="chat")
        await writer.write("Hello ")
        await writer.write("from ")
        await writer.write("stream!")
        await writer.aclose()

        # Send file
        file_info = await local.send_file(
            "/path/to/document.pdf",
            topic="file-sharing"
        )
        print(f"Sent file: {file_info.name}")

        # Set track subscription permissions (needs a peer to grant access to)
        if first_participant is not None:
            local.set_track_subscription_permissions(
                allow_all_participants=False,
                participant_permissions=[
                    ParticipantTrackPermission(
                        participant_identity=first_participant.identity,
                        allow_all=True,
                        allowed_track_sids=[]
                    )
                ]
            )

        # Keep running
        await asyncio.sleep(60)

        # Cleanup
        await local.unpublish_track(track.sid)
    finally:
        await room.disconnect()
if __name__ == "__main__":
    asyncio.run(main())


# DO: Use identity for identification
participant_map = {p.identity: p for p in room.remote_participants.values()}
target = participant_map.get("user123")  # None when that identity is absent

# DON'T: Use name for identification (not unique)
# (indexing [0] on the filtered list also raises IndexError when nobody matches)
# target = [p for p in room.remote_participants.values() if p.name == "John"][0]


import json
# DO: Use JSON for structured metadata
await local.set_metadata(json.dumps({
"status": "available",
"preferences": {"theme": "dark"},
"timestamp": "2024-01-01T00:00:00Z"
}))
# Later, parse safely
try:
metadata = json.loads(participant.metadata) if participant.metadata else {}
status = metadata.get("status", "unknown")
except json.JSONDecodeError:
status = "unknown"
# DON'T: Use unstructured strings
# await local.set_metadata("status:available,theme:dark")# DO: Use attributes for frequently changing properties
await local.set_attributes({"status": "speaking"})
await local.set_attributes({"status": "idle"}) # Only updates status
# DON'T: Use metadata for frequently changing properties
# (requires parsing and reconstructing entire JSON each time)


async def safe_rpc_call(local, dest, method, payload):
    """Perform an RPC call and report failures instead of raising.

    Args:
        local: Object exposing an awaitable ``perform_rpc`` method
            (the local participant in the surrounding examples).
        dest: Identity of the participant that should handle the call.
        method: Name of the RPC method to invoke.
        payload: String payload passed to the remote handler.

    Returns:
        The value returned by ``perform_rpc`` on success, or ``None`` when
        the call fails with ``RpcError``.

    Note:
        The response timeout is fixed at 5.0 seconds. Exceptions other than
        ``RpcError`` are not caught and propagate to the caller.
    """
    try:
        return await local.perform_rpc(
            destination_identity=dest,
            method=method,
            payload=payload,
            response_timeout=5.0
        )
    except RpcError as e:
        # Map the well-known error codes to a short message; anything else
        # is printed with its raw code and message.
        if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
            print("Participant left room")
        elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
            print("Participant didn't respond")
        elif e.code == RpcError.ErrorCode.UNSUPPORTED_METHOD:
            print("Method not implemented")
        else:
            print(f"RPC error: {e.code} - {e.message}")
    return None


# For critical data: Use reliable delivery
await local.publish_data(
json.dumps({"command": "start_recording"}),
reliable=True,
topic="commands"
)
# For frequent updates: Use lossy delivery
await local.publish_data(
json.dumps({"x": 100, "y": 200}),
reliable=False,
topic="cursor-position"
)
# For large data: Use streaming
writer = await local.stream_bytes(name="file.pdf")
# ... write chunks
await writer.aclose()