tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities
The Room class is the central entry point for LiveKit RTC functionality. It manages WebRTC connections, signaling, participant lifecycle, and serves as the event hub for all room-level activities.
Key characteristics:
EventEmitter inheritance

from livekit import (
Room,
RoomOptions,
RtcConfiguration,
ConnectError,
DataPacket,
SipDTMF,
RtcStats,
)

class Room(EventEmitter[EventTypes]):
"""Main class representing a LiveKit room.
The Room manages the WebRTC connection, tracks participants,
and provides event-driven access to room state changes.
Inherits from EventEmitter to provide event registration via:
- on(event, callback): Register event handler
- once(event, callback): Register one-time handler
- off(event, callback): Unregister handler
"""
def __init__(
self,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
"""Initialize a Room instance.
Args:
loop: Event loop to use. Defaults to current event loop.
If None, uses asyncio.get_event_loop()
Returns:
Room instance
Raises:
RuntimeError: If no event loop is available
Example:
>>> room = Room() # Uses current event loop
>>> room = Room(loop=my_loop) # Uses custom loop
Note:
The room can be reused after disconnection by calling connect() again.
Event handlers remain registered across reconnections.
"""
@property
async def sid(self) -> str:
"""Session ID of the room (async property).
Returns:
str: Unique session identifier assigned by the server
Raises:
RuntimeError: If room is not connected
Note:
This is an async property and MUST be awaited:
>>> session_id = await room.sid
Session ID is only available after successful connection.
The session ID is unique per connection attempt.
"""
@property
def local_participant(self) -> LocalParticipant:
"""The local participant in the room.
Returns:
LocalParticipant: The local participant instance representing this client
Note:
Available immediately after Room creation, even before connection.
The LocalParticipant object persists across reconnections.
Use this to publish tracks, send data, and perform RPC calls.
"""
@property
def connection_state(self) -> ConnectionState.ValueType:
"""Current connection state of the room.
Returns:
ConnectionState.ValueType: One of:
- CONN_DISCONNECTED (0): Not connected
- CONN_CONNECTED (1): Fully connected
- CONN_RECONNECTING (2): Reconnecting after connection loss
Note:
Connection state changes trigger 'connection_state_changed' event.
State transitions:
- Initial: CONN_DISCONNECTED
- After connect(): CONN_CONNECTED
- On network failure: CONN_RECONNECTING
- After recovery: CONN_CONNECTED
- After disconnect(): CONN_DISCONNECTED
"""
@property
def remote_participants(self) -> Mapping[str, RemoteParticipant]:
"""Dictionary of remote participants by identity.
Returns:
Mapping[str, RemoteParticipant]: Read-only mapping of participant
identities (strings) to RemoteParticipant objects
Note:
This mapping updates automatically as participants join/leave.
Do NOT modify this dictionary directly.
To iterate safely, use .items(), .values(), or .keys().
Participant identities are unique within a room.
Example:
>>> for identity, participant in room.remote_participants.items():
... print(f"{identity}: {participant.name}")
"""
@property
def name(self) -> str:
"""Name of the room.
Returns:
str: Room name as defined on the server
Raises:
AttributeError: If accessed before connection (may return empty string)
Note:
Available after successful connection.
Room name is determined by the JWT token used to connect.
Room name persists across reconnections.
"""
@property
def metadata(self) -> str:
"""Metadata associated with the room.
Returns:
str: Room metadata (typically JSON string)
Note:
Room metadata can be updated by server-side code.
Metadata changes trigger 'room_metadata_changed' event.
Empty string if no metadata is set.
Client cannot directly update room metadata.
"""
@property
def e2ee_manager(self) -> E2EEManager:
"""End-to-end encryption manager for the room.
Returns:
E2EEManager: The E2EE manager instance
Note:
Available even if E2EE is not enabled.
Check e2ee_manager.enabled to determine if E2EE is active.
Use e2ee_manager.key_provider to manage encryption keys.
"""
@property
def num_participants(self) -> int:
"""Number of participants in the room (eventually consistent).
Returns:
int: Total participant count including local participant
Note:
This count is eventually consistent with the server.
May be slightly out of sync during participant join/leave.
For exact count, use len(remote_participants) + 1.
Includes all participant types (standard, agents, SIP, etc.).
"""
@property
def num_publishers(self) -> int:
"""Number of publishers in the room (eventually consistent).
Returns:
int: Count of participants currently publishing tracks
Note:
This count is eventually consistent with the server.
Publisher count may change as participants publish/unpublish tracks.
Does not distinguish between audio, video, or data publishers.
"""
@property
def creation_time(self) -> datetime.datetime:
"""Time when the room was created.
Returns:
datetime.datetime: Room creation timestamp in UTC
Raises:
AttributeError: If accessed before connection
Note:
Available after successful connection.
Represents server-side room creation time, not client connection time.
Useful for tracking room age or session duration.
"""
@property
def is_recording(self) -> bool:
"""Whether the room is actively being recorded.
Returns:
bool: True if recording is active, False otherwise
Note:
Recording status is managed server-side via API or dashboard.
Status changes trigger 'room_updated' event.
Recording state persists across participant joins/leaves.
"""
@property
def departure_timeout(self) -> float:
"""Time in seconds to hold room open after last participant leaves.
Returns:
float: Timeout in seconds (0 for immediate deletion)
Note:
This is a server-side configuration value.
Default is typically 30 seconds.
Cannot be changed from client side.
After timeout, room is automatically deleted.
"""
@property
def empty_timeout(self) -> float:
"""Time in seconds to keep room open if no participants join.
Returns:
float: Timeout in seconds (0 for no timeout)
Note:
This is a server-side configuration value.
Applies when room is created but no one joins.
Default is typically 300 seconds (5 minutes).
Cannot be changed from client side.
"""

async def connect(
self,
url: str,
token: str,
options: RoomOptions = RoomOptions()
) -> None:
"""Connect to a LiveKit room.
Args:
url: WebSocket URL of the LiveKit server.
Must use wss:// protocol for secure connections.
Format: "wss://your-server.com" or "wss://your-server.com:port"
No trailing slash.
token: Access token for authentication.
JWT token generated using server-side SDK.
Token encodes: room name, participant identity, permissions, expiry.
Token format: "eyJhbGc..." (base64-encoded JWT)
options: Room connection options including:
- auto_subscribe: Automatically subscribe to tracks (default: True)
- dynacast: Enable dynamic casting (default: False)
- encryption: E2EE options (default: None)
- rtc_config: WebRTC configuration (default: None)
Returns:
None (awaitable)
Raises:
ConnectError: If connection fails due to:
- Invalid URL format (e.g., missing wss:// protocol)
- Network connectivity issues (server unreachable, DNS failure)
- Invalid or expired token (authentication failure)
- Permission denied (token lacks required permissions)
- Room not found (room name in token doesn't match)
- Server unavailable or overloaded
- WebSocket handshake failure
RuntimeError: If already connected (must disconnect first)
ValueError: If url or token is empty or None
Example:
>>> room = Room()
>>> await room.connect(
... "wss://my-server.livekit.cloud",
... "eyJhbGc...",
... RoomOptions(auto_subscribe=True)
... )
Note:
Awaiting this coroutine does not return until the connection is established or fails.
Connection events are emitted during connection process:
- 'connected' when successfully connected
- 'disconnected' if connection fails
Connection timeout is typically 15 seconds.
Token should be generated server-side to avoid exposing secrets.
Token payload includes: roomName, participantName, permissions, expiry.
Error handling:
>>> try:
... await room.connect(url, token)
... except ConnectError as e:
... print(f"Failed: {e.message}")
... # Check: token validity, network, server status
"""
async def disconnect(self) -> None:
"""Disconnect from the room.
Cleanly closes the connection and cleans up resources.
This unpublishes all local tracks and unsubscribes from remote tracks.
Args:
None
Returns:
None (awaitable)
Raises:
None - This method never raises exceptions
Side effects:
- Unpublishes all local tracks
- Closes all track publications
- Unsubscribes from all remote tracks
- Closes WebSocket connection
- Emits 'disconnected' event with reason CLIENT_INITIATED
Example:
>>> await room.disconnect()
Note:
This method is idempotent - safe to call multiple times.
After disconnection, the Room instance can be reused.
A 'disconnected' event is emitted with reason CLIENT_INITIATED.
All event handlers remain registered after disconnect.
Disconnect is graceful - allows cleanup to complete.
For immediate disconnect, there is no force option (not needed).
Cleanup order:
1. Unpublish local tracks
2. Close publications
3. Unsubscribe from remote tracks
4. Close WebSocket
5. Emit disconnected event
"""
def isconnected(self) -> bool:
"""Check if currently connected to the room.
Args:
None
Returns:
bool: True if connected, False otherwise
Note:
Returns True only when connection_state == CONN_CONNECTED.
Returns False during CONN_RECONNECTING state.
Returns False before connection or after disconnection.
This is a synchronous method (call it with parentheses), not a property.
Does not perform any I/O or network check.
Example:
>>> if room.isconnected():
... print("Connected!")
... else:
... print("Not connected")
Related:
Use connection_state property for more detailed status.
"""

async def get_rtc_stats(self) -> RtcStats:
"""Get RTC statistics for the room session.
Args:
None
Returns:
RtcStats: Object containing publisher and subscriber statistics
- publisher_stats: List[proto_stats.RtcStats] for published tracks
- subscriber_stats: List[proto_stats.RtcStats] for subscribed tracks
Each stat includes:
- Bitrate (bits per second)
- Bytes sent/received
- Packets sent/received
- Packet loss
- Jitter (for audio)
- Round-trip time (RTT)
- Codec information
Raises:
RuntimeError: If room is not connected
Example:
>>> stats = await room.get_rtc_stats()
>>> print(f"Publisher stats: {len(stats.publisher_stats)}")
>>> for pub_stat in stats.publisher_stats:
... print(f"Publisher: {pub_stat}")
>>> for sub_stat in stats.subscriber_stats:
... print(f"Subscriber: {sub_stat}")
Note:
Statistics are snapshots at the time of the call.
For continuous monitoring, call periodically (e.g., every 1-5 seconds).
Statistics include metrics from WebRTC RTCPeerConnection.
Stats are reset on reconnection.
Useful for:
- Monitoring network quality
- Debugging connection issues
- Tracking bandwidth usage
- Detecting packet loss
Performance:
This is a relatively expensive operation.
Don't call too frequently (recommended: max once per second).
"""

def register_byte_stream_handler(
self,
topic: str,
handler: ByteStreamHandler
) -> None:
"""Register a handler for incoming byte streams with a specific topic.
Args:
topic: The topic to handle (case-sensitive string)
Topic is used to route streams to appropriate handlers.
Empty string matches streams with no topic.
Max length: 256 characters (recommended: keep short)
handler: Callback function receiving (ByteStreamReader, sender_identity: str)
Handler signature:
async def handler(reader: ByteStreamReader, sender: str) -> None
Handler is called on a background task for each incoming stream.
Returns:
None
Raises:
ValueError: If handler for topic already exists
Use unregister_byte_stream_handler first to replace
TypeError: If handler is not callable
Example:
>>> async def handle_file(reader: ByteStreamReader, sender: str):
... print(f"Receiving {reader.info.name} from {sender}")
... print(f"Size: {reader.info.size} bytes")
... print(f"MIME: {reader.info.mime_type}")
...
... data = bytearray()
... async for chunk in reader:
... data.extend(chunk)
... print(f"Progress: {len(data)}/{reader.info.size}")
...
... print(f"Complete: {len(data)} bytes")
>>>
>>> room.register_byte_stream_handler("files", handle_file)
Note:
Handler is called on a background task for each incoming stream.
Handler should not block for long periods.
Multiple topics can have different handlers.
Topic must match exactly (case-sensitive).
Handler runs concurrently with other handlers.
If handler raises exception, it is logged but doesn't affect other streams.
Related:
- unregister_byte_stream_handler(): Remove handler
- LocalParticipant.stream_bytes(): Send bytes stream
- LocalParticipant.send_file(): Send file
"""
def unregister_byte_stream_handler(self, topic: str) -> None:
"""Unregister a byte stream handler for a topic.
Args:
topic: The topic to unregister (case-sensitive)
Returns:
None
Raises:
KeyError: If no handler registered for topic
Example:
>>> room.unregister_byte_stream_handler("files")
Note:
Unregistering does not affect currently active streams.
Active streams will continue to their registered handler.
Safe to call even if no streams are active.
Related:
- register_byte_stream_handler(): Register handler
"""
def register_text_stream_handler(
self,
topic: str,
handler: TextStreamHandler
) -> None:
"""Register a handler for incoming text streams with a specific topic.
Args:
topic: The topic to handle (case-sensitive string)
Empty string matches streams with no topic.
handler: Callback function receiving (TextStreamReader, sender_identity: str)
Handler signature:
async def handler(reader: TextStreamReader, sender: str) -> None
Returns:
None
Raises:
ValueError: If handler for topic already exists
TypeError: If handler is not callable
Example:
>>> async def handle_chat(reader: TextStreamReader, sender: str):
... print(f"Chat from {sender}:")
... # Read all at once
... text = await reader.read_all()
... print(text)
...
... # Or stream chunks
... # async for chunk in reader:
... # print(chunk, end='')
>>>
>>> room.register_text_stream_handler("chat", handle_chat)
Note:
Handler is called on a background task for each incoming stream.
Handler should not block for long periods.
Text streams automatically decode UTF-8.
For large text streams, consider using async iteration
instead of read_all() to avoid memory issues.
"""
def unregister_text_stream_handler(self, topic: str) -> None:
"""Unregister a text stream handler for a topic.
Args:
topic: The topic to unregister (case-sensitive)
Returns:
None
Raises:
KeyError: If no handler registered for topic
Example:
>>> room.unregister_text_stream_handler("chat")
"""

def on(
self,
event: EventTypes,
callback: Optional[Callable] = None
) -> Callable:
"""Register an event handler for a specific event type.
Can be used as a decorator or with a callback function.
Args:
event: The event name to listen for (string).
See "Event Types" section for complete list.
callback: The function to call when the event occurs (optional for decorator usage)
Callback signature depends on event type.
MUST be synchronous function (not async).
Returns:
Callable: The registered callback function (for decorator usage)
Raises:
ValueError: If callback is an async function (use sync functions only)
Example (decorator):
>>> @room.on("participant_connected")
... def on_participant_connected(participant: RemoteParticipant):
... print(f"Participant joined: {participant.identity}")
... # For async operations, use asyncio.create_task()
... asyncio.create_task(handle_participant_async(participant))
>>>
Example (direct call):
>>> def handler(participant):
... print(f"Connected: {participant.identity}")
>>> room.on("participant_connected", handler)
Note:
Event handlers MUST be synchronous functions.
For async operations in handlers, use asyncio.create_task().
Multiple handlers can be registered for same event.
Handlers are called in registration order.
Handler exceptions are caught and logged but don't stop other handlers.
Common pattern:
>>> @room.on("track_subscribed")
... def on_track(track, publication, participant):
... if track.kind == TrackKind.KIND_AUDIO:
... stream = AudioStream(track)
... asyncio.create_task(process_stream(stream))
"""

@dataclass
class RoomOptions:
"""Options for configuring room connection.
Attributes:
auto_subscribe: Automatically subscribe to tracks when participants join.
Type: bool
Default: True
When True, all published tracks are automatically subscribed.
When False, manual subscription required via set_subscribed().
Recommendation: Set False for selective subscription or bandwidth control.
dynacast: Enable dynamic casting for adaptive bitrate.
Type: bool
Default: False
When True, enables Dynacast for optimal quality selection.
Requires server-side Dynacast support (LiveKit Cloud has it).
Dynacast dynamically adjusts simulcast layers based on receivers.
Recommendation: Enable for better bandwidth utilization.
e2ee: (Deprecated) End-to-end encryption options.
Type: E2EEOptions | None
Default: None
DEPRECATED: Use 'encryption' parameter instead.
Kept for backward compatibility.
encryption: Options for end-to-end encryption.
Type: E2EEOptions | None
Default: None
When set, enables end-to-end encryption for all tracks.
Requires all participants to use same encryption key.
See E2EEOptions for key management details.
rtc_config: WebRTC-related configuration.
Type: RtcConfiguration | None
Default: None (uses server defaults)
Override for custom ICE server configuration.
Typically only needed for custom TURN servers.
"""
auto_subscribe: bool = True
dynacast: bool = False
e2ee: E2EEOptions | None = None # Deprecated, use encryption
encryption: E2EEOptions | None = None
rtc_config: RtcConfiguration | None = None

Example:
from livekit import RoomOptions, E2EEOptions, KeyProviderOptions
options = RoomOptions(
auto_subscribe=True, # Subscribe to all tracks automatically
dynacast=True, # Enable dynamic casting
encryption=E2EEOptions(
key_provider_options=KeyProviderOptions(
shared_key=b"my-encryption-key-32-bytes-long!"
)
)
)
await room.connect(url, token, options)

@dataclass
class RtcConfiguration:
"""Configuration for WebRTC connection.
Attributes:
ice_transport_type: Specifies the type of ICE transport to use.
Type: IceTransportType.ValueType
Default: IceTransportType.TRANSPORT_ALL
Options:
- TRANSPORT_ALL (2): Allow all connection types (fastest, default)
Uses host, srflx, and relay candidates.
- TRANSPORT_RELAY (0): Force TURN relay (most compatible)
Only uses relay candidates. More reliable through firewalls.
Higher latency and server costs.
- TRANSPORT_NOHOST (1): Exclude host candidates
Uses srflx and relay. Prevents direct P2P connections.
Recommendation: Use TRANSPORT_ALL unless behind strict firewall.
continual_gathering_policy: Policy for continual gathering of ICE candidates.
Type: ContinualGatheringPolicy.ValueType
Default: ContinualGatheringPolicy.GATHER_CONTINUALLY
Options:
- GATHER_CONTINUALLY (1): Keep gathering candidates (default)
Better for changing networks (mobile, roaming).
Adapts to network changes during session.
- GATHER_ONCE (0): Gather candidates only once
Faster initial connect. Less CPU usage.
Won't adapt to network changes.
Recommendation: Use GATHER_CONTINUALLY for mobile or unstable networks.
ice_servers: List of ICE servers for STUN/TURN.
Type: list[proto_room.IceServer]
Default: [] (empty list uses server-provided ICE servers)
When empty, LiveKit server provides default STUN/TURN servers.
Override to use custom STUN/TURN infrastructure.
IceServer format:
- urls: List of server URLs (stun:, turn:, turns:)
- username: Optional username for TURN authentication
- password: Optional password for TURN authentication
"""
ice_transport_type: proto_room.IceTransportType.ValueType = (
proto_room.IceTransportType.TRANSPORT_ALL
)
continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = (
proto_room.ContinualGatheringPolicy.GATHER_CONTINUALLY
)
ice_servers: list[proto_room.IceServer] = field(default_factory=list)

Example:
from livekit import (
RtcConfiguration,
RoomOptions,
IceTransportType,
ContinualGatheringPolicy,
IceServer,
)
# Use custom TURN server
ice_servers = [
IceServer(
urls=["turn:turn.example.com:3478"],
username="user",
password="pass"
)
]
rtc_config = RtcConfiguration(
ice_transport_type=IceTransportType.TRANSPORT_RELAY, # Force TURN
continual_gathering_policy=ContinualGatheringPolicy.GATHER_ONCE,
ice_servers=ice_servers
)
options = RoomOptions(rtc_config=rtc_config)
await room.connect(url, token, options)

@dataclass
class DataPacket:
"""Represents a data packet received from a participant.
Attributes:
data: The payload of the data packet.
Type: bytes
Raw binary data sent by participant.
No size limit (but large packets may fail).
kind: Type of the data packet.
Type: DataPacketKind.ValueType
Options:
- KIND_RELIABLE (1): TCP-like guaranteed delivery, ordered
Uses SCTP reliable data channel.
Guaranteed arrival and order.
Higher latency on packet loss.
- KIND_LOSSY (0): UDP-like best-effort delivery, lower latency
Uses SCTP unreliable data channel.
May drop packets under congestion.
Lower latency.
participant: Participant who sent the data.
Type: RemoteParticipant | None
None indicates server-originated message (from server SDK).
Use participant.identity to identify sender.
topic: Topic associated with the data packet.
Type: str | None
Optional categorization for filtering.
Empty string or None if no topic specified.
Max length: 256 characters (recommended: keep short).
"""
data: bytes
kind: proto_room.DataPacketKind.ValueType
participant: RemoteParticipant | None
topic: str | None

Example:
@room.on("data_received")
def on_data_received(packet: DataPacket):
sender = packet.participant.identity if packet.participant else "server"
print(f"Received from {sender}: {packet.data.decode()}")
print(f"Topic: {packet.topic}, Kind: {packet.kind}")
# Check if reliable or lossy
if packet.kind == DataPacketKind.KIND_RELIABLE:
print("Reliable delivery")
else:
print("Lossy delivery")

@dataclass
class SipDTMF:
"""Represents a SIP DTMF signal received.
Used for telephony integration (SIP trunking).
Attributes:
code: DTMF code corresponding to the digit.
Type: int
Values:
- 0-9: Digits 0-9
- 10: * (star)
- 11: # (pound)
- 12-15: A-D (extended DTMF, rarely used)
digit: DTMF digit sent.
Type: str
Single character: "0"-"9", "*", "#", "A"-"D"
String representation of the code.
participant: Participant who sent the DTMF.
Type: RemoteParticipant | None
None when sent by server SDK (SIP gateway).
"""
code: int
digit: str
participant: RemoteParticipant | None

Example:
@room.on("sip_dtmf_received")
def on_dtmf(dtmf: SipDTMF):
sender = dtmf.participant.identity if dtmf.participant else "server"
print(f"DTMF from {sender}: {dtmf.digit} (code: {dtmf.code})")
# Handle phone menu
if dtmf.digit == "1":
print("Option 1 selected")
elif dtmf.digit == "2":
print("Option 2 selected")
elif dtmf.digit == "#":
print("Confirm selected")

@dataclass
class RtcStats:
"""Contains RTC statistics for a room.
Attributes:
publisher_stats: List of statistics for publishers (outbound tracks).
Type: list[proto_stats.RtcStats]
Each item is a proto_stats.RtcStats protobuf message.
Contains metrics like:
- Bitrate (bits/second)
- Bytes sent
- Packets sent
- Frames encoded (video)
- RTT (round-trip time)
- Codec information
subscriber_stats: List of statistics for subscribers (inbound tracks).
Type: list[proto_stats.RtcStats]
Each item is a proto_stats.RtcStats protobuf message.
Contains metrics like:
- Bitrate (bits/second)
- Bytes received
- Packets received
- Packet loss
- Jitter
- Frames decoded (video)
- Codec information
"""
publisher_stats: list[proto_stats.RtcStats]
subscriber_stats: list[proto_stats.RtcStats]

Example:
stats = await room.get_rtc_stats()
print(f"Publishing {len(stats.publisher_stats)} tracks")
for pub_stat in stats.publisher_stats:
# Access proto_stats.RtcStats fields
print(f"Publisher stat: {pub_stat}")
# Common fields: bytes_sent, packets_sent, bitrate, codec
print(f"Subscribing to {len(stats.subscriber_stats)} tracks")
for sub_stat in stats.subscriber_stats:
# Access proto_stats.RtcStats fields
print(f"Subscriber stat: {sub_stat}")
# Common fields: bytes_received, packets_received, packet_loss, jitter, bitrate

class ConnectError(Exception):
"""Exception raised when room connection fails.
Attributes:
message: The error message describing the failure reason.
Type: str
Common reasons include:
- "Invalid token": Token format invalid or signature mismatch
- "Token expired": Token expiry time (exp claim) has passed
- "Room not found": Room name in token doesn't exist
- "Permission denied": Token lacks required permissions
- "Network error": DNS failure, connection refused, timeout
- "Server unavailable": Server down or overloaded
- "WebSocket handshake failed": Protocol mismatch or firewall
"""
def __init__(self, message: str) -> None:
"""Initialize with error message.
Args:
message: The error message
"""
self.message = message

Example:
try:
await room.connect(url, token)
except ConnectError as e:
print(f"Failed to connect: {e.message}")
# Handle specific errors
if "token" in e.message.lower():
print("Check token validity and expiry")
elif "network" in e.message.lower():
print("Check network connectivity")
elif "server" in e.message.lower():
print("Server may be down or overloaded")
# Retry with exponential backoff
# ...

The Room class supports the following event types (all event names are strings):
@room.on("connected")
def on_connected():
"""Emitted when successfully connected to the room.
Called after WebRTC connection is fully established.
At this point:
- room.isconnected() returns True
- Local participant info is available
- Can start publishing tracks
- Remote participants may already be present
Note:
This is the signal to start publishing tracks.
Room properties (name, metadata, etc.) are now available.
"""
@room.on("disconnected")
def on_disconnected(reason: DisconnectReason):
"""Emitted when disconnected from the room.
Args:
reason: The reason for disconnection (DisconnectReason enum value)
Common reasons:
- CLIENT_INITIATED (1): disconnect() was called
- DUPLICATE_IDENTITY (2): Another client joined with same identity
- SERVER_SHUTDOWN (3): Server is shutting down
- ROOM_DELETED (5): Room was deleted via API
- PARTICIPANT_REMOVED (4): Participant was removed by server
- ROOM_CLOSED (10): Room was closed by host
- MIGRATION (8): Participant moved to another server
Note:
This event is always emitted, even for expected disconnections.
Check reason to determine if reconnection should be attempted.
Reasons CLIENT_INITIATED and ROOM_CLOSED typically don't need reconnection.
Other reasons may warrant reconnection attempts.
After this event:
- room.isconnected() returns False
- All tracks are unsubscribed
- Local tracks are unpublished
- Remote participants cleared
"""
@room.on("reconnecting")
def on_reconnecting():
"""Emitted when attempting to reconnect to the room.
Triggered when connection is lost and SDK begins reconnection attempts.
During reconnection:
- room.isconnected() returns False
- room.connection_state == ConnectionState.CONN_RECONNECTING
- Track subscriptions and publications are preserved
- SDK automatically retries connection
Note:
SDK handles reconnection automatically.
No action needed from application.
If reconnection succeeds, 'reconnected' event is emitted.
If reconnection fails, 'disconnected' event is emitted.
Typical causes:
- Network disruption
- Server restart
- ICE connection failure
"""
@room.on("reconnected")
def on_reconnected():
"""Emitted when successfully reconnected to the room.
Triggered after successful reconnection following connection loss.
After reconnection:
- All tracks and subscriptions are restored automatically
- No need to republish tracks or resubscribe
- Room state is synchronized with server
- Connection quality may be temporarily degraded
Note:
This is informational - no action required.
Tracks resume automatically.
May want to log or notify user of restored connection.
"""
@room.on("connection_state_changed")
def on_connection_state_changed(state: ConnectionState):
"""Emitted when connection state changes.
Args:
state: The new connection state (ConnectionState enum value)
- CONN_DISCONNECTED (0): Not connected
- CONN_CONNECTED (1): Fully connected
- CONN_RECONNECTING (2): Attempting to reconnect
Note:
This is a more fine-grained event than connected/disconnected.
Useful for UI state management.
State transitions:
- DISCONNECTED -> CONNECTED: Initial connection
- CONNECTED -> RECONNECTING: Connection lost
- RECONNECTING -> CONNECTED: Reconnection successful
- RECONNECTING -> DISCONNECTED: Reconnection failed
- CONNECTED -> DISCONNECTED: Deliberate disconnect
"""
@room.on("participant_connected")
def on_participant_connected(participant: RemoteParticipant):
"""Emitted when a remote participant joins the room.
Args:
participant: The participant who joined
Available properties:
- participant.identity: Unique participant identity (string)
- participant.name: Display name (string)
- participant.metadata: Custom metadata (string, typically JSON)
- participant.track_publications: Published tracks (dict)
- participant.kind: Participant type (ParticipantKind enum)
- participant.attributes: Key-value attributes (dict)
Note:
This event is emitted before any tracks are subscribed.
Track subscriptions happen after this event.
If auto_subscribe=True, subscriptions will happen automatically.
If auto_subscribe=False, use set_subscribed() to subscribe manually.
participant.track_publications may already contain publications
if participant was publishing when they joined.
Example:
>>> @room.on("participant_connected")
... def on_participant_connected(participant):
... print(f"Joined: {participant.identity} ({participant.name})")
... print(f"Metadata: {participant.metadata}")
... print(f"Type: {participant.kind}")
... for sid, pub in participant.track_publications.items():
... print(f" Track: {pub.name}")
"""
@room.on("participant_disconnected")
def on_participant_disconnected(participant: RemoteParticipant):
"""Emitted when a remote participant leaves the room.
Args:
participant: The participant who left
Properties available:
- participant.identity: Identity (string)
- participant.disconnect_reason: Reason for leaving (if available)
Note:
All track subscriptions are automatically cleaned up.
participant object remains valid but will not receive updates.
Track streams will end and emit completion.
participant.disconnect_reason may be None or one of:
- CLIENT_INITIATED: User left voluntarily
- DUPLICATE_IDENTITY: Rejoined from another device
- PARTICIPANT_REMOVED: Kicked by moderator
- etc.
Example:
>>> @room.on("participant_disconnected")
... def on_participant_disconnected(participant):
... print(f"Left: {participant.identity}")
... if participant.disconnect_reason:
... print(f"Reason: {participant.disconnect_reason}")
"""
@room.on("local_track_published")
def on_local_track_published(
publication: LocalTrackPublication,
track: Track
):
"""Emitted when a local track is published.
Args:
publication: The track publication containing metadata
Properties:
- publication.sid: Track session ID (string)
- publication.name: Track name (string)
- publication.kind: KIND_AUDIO or KIND_VIDEO
- publication.source: SOURCE_CAMERA, SOURCE_MICROPHONE, etc.
- publication.mime_type: Codec MIME type (string)
track: The published track (LocalAudioTrack or LocalVideoTrack)
Properties:
- track.sid: Track session ID (string)
- track.name: Track name (string)
- track.kind: KIND_AUDIO or KIND_VIDEO
- track.muted: Whether track is muted (bool)
Note:
Publication contains metadata about the track.
Track is the actual media stream.
Use publication.wait_for_subscription() to know when
at least one participant has subscribed.
Track SID is assigned by server and unique per session.
Example:
>>> @room.on("local_track_published")
... def on_local_track_published(publication, track):
... print(f"Published: {publication.name} ({publication.sid})")
... print(f"MIME type: {publication.mime_type}")
... asyncio.create_task(wait_for_sub(publication))
...
... async def wait_for_sub(pub):
... await pub.wait_for_subscription()
... print("Track is being received!")
"""
@room.on("local_track_unpublished")
def on_local_track_unpublished(publication: LocalTrackPublication):
"""Emitted when a local track is unpublished.
Args:
publication: The unpublished track publication
publication.track may be None at this point
Note:
Track is no longer available to other participants.
Resources should be cleaned up (sources, streams).
Track unpublish is immediate - no grace period.
Subscribers will receive track_unsubscribed event.
Example:
>>> @room.on("local_track_unpublished")
... def on_local_track_unpublished(publication):
... print(f"Unpublished: {publication.name}")
... # Clean up associated resources
"""
@room.on("local_track_subscribed")
def on_local_track_subscribed(track: Track):
"""Emitted when a local track has been subscribed to by a remote participant.
Args:
track: The subscribed local track
track.sid: Track session ID (string)
track.name: Track name (string)
Note:
This confirms at least one participant is receiving the track.
Useful for confirming track delivery before important operations.
Emitted when the first remote participant subscribes to the track;
it is not emitted again for each additional subscriber.
Example:
>>> @room.on("local_track_subscribed")
... def on_local_track_subscribed(track):
... print(f"Track {track.name} is being received")
"""
@room.on("track_published")
def on_track_published(
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Emitted when a remote participant publishes a track.
Args:
publication: The published track publication
Properties:
- publication.sid: Track SID (string)
- publication.name: Track name (string)
- publication.kind: KIND_AUDIO or KIND_VIDEO
- publication.source: SOURCE_CAMERA, SOURCE_MICROPHONE, etc.
- publication.mime_type: Codec MIME type
- publication.width: Video width (0 for audio)
- publication.height: Video height (0 for audio)
- publication.simulcasted: Whether simulcast enabled (bool)
participant: The participant who published
Note:
Track is not yet subscribed at this point.
If auto_subscribe=True, subscription happens automatically.
If auto_subscribe=False, call publication.set_subscribed(True).
track_subscribed event will fire when subscription completes.
Example:
>>> @room.on("track_published")
... def on_track_published(publication, participant):
... print(f"{participant.identity} published: {publication.name}")
... print(f" Kind: {publication.kind}")
... print(f" Source: {publication.source}")
... if publication.kind == TrackKind.KIND_VIDEO:
... print(f" Resolution: {publication.width}x{publication.height}")
...
... # Manual subscription if auto_subscribe=False
... # publication.set_subscribed(True)
"""
@room.on("track_unpublished")
def on_track_unpublished(
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Emitted when a remote participant unpublishes a track.
Args:
publication: The unpublished track publication
publication.track will be None
participant: The participant who unpublished
Note:
Subscription is automatically cleaned up.
Track streams will end (async iterators complete).
No action needed for cleanup.
Example:
>>> @room.on("track_unpublished")
... def on_track_unpublished(publication, participant):
... print(f"{participant.identity} unpublished: {publication.name}")
"""
@room.on("track_subscribed")
def on_track_subscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Emitted when successfully subscribed to a remote track.
Args:
track: The subscribed track (RemoteAudioTrack or RemoteVideoTrack)
Properties:
- track.sid: Track session ID (string)
- track.kind: KIND_AUDIO or KIND_VIDEO
- track.name: Track name (string)
- track.muted: Whether muted (bool)
- track.stream_state: STATE_ACTIVE or STATE_PAUSED
publication: The track publication
participant: The participant who owns the track
Note:
Track is now ready to receive media.
Create AudioStream or VideoStream to consume media.
This is the primary event for starting media processing.
Subscription may happen automatically (auto_subscribe=True)
or manually (set_subscribed(True)).
Example:
>>> @room.on("track_subscribed")
... def on_track_subscribed(track, publication, participant):
... print(f"Subscribed to {track.name} from {participant.identity}")
...
... if track.kind == TrackKind.KIND_AUDIO:
... stream = AudioStream(track)
... asyncio.create_task(process_audio(stream))
... elif track.kind == TrackKind.KIND_VIDEO:
... stream = VideoStream(track)
... asyncio.create_task(process_video(stream))
"""
@room.on("track_unsubscribed")
def on_track_unsubscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Emitted when unsubscribed from a remote track.
Args:
track: The unsubscribed track
publication: The track publication
participant: The participant who owns the track
Note:
Media streams will stop.
Clean up any resources (streams, processors, etc.).
Happens automatically when participant leaves or unpublishes.
Can also happen when manually calling set_subscribed(False).
Example:
>>> @room.on("track_unsubscribed")
... def on_track_unsubscribed(track, publication, participant):
... print(f"Unsubscribed from {track.name}")
... # Clean up associated resources
"""
@room.on("track_subscription_failed")
def on_track_subscription_failed(
participant: RemoteParticipant,
track_sid: str,
error: str
):
"""Emitted when track subscription fails.
Args:
participant: The participant whose track failed to subscribe
track_sid: Session ID of the track (string)
error: Error message describing the failure
Common errors:
- "Track not found": Track was unpublished
- "Codec not supported": Client doesn't support codec
- "Permission denied": Token lacks subscribe permission
- "Network error": Connection failed
- "Timeout": Subscription took too long
Note:
Subscription will not be retried automatically.
May need to check codec support or permissions.
Can manually retry with set_subscribed(True).
Example:
>>> @room.on("track_subscription_failed")
... def on_track_subscription_failed(participant, track_sid, error):
... print(f"Failed to subscribe to {track_sid}: {error}")
...
... if "codec" in error.lower():
... print("Codec not supported by this client")
... elif "permission" in error.lower():
... print("Check token permissions")
"""
@room.on("track_muted")
def on_track_muted(
participant: Participant,
publication: TrackPublication
):
"""Emitted when a track is muted.
Args:
participant: The participant who owns the track
Can be LocalParticipant or RemoteParticipant
publication: The muted track publication
publication.muted will be True
Note:
Muted tracks still maintain connection but send silence/black frames.
For audio: Silence frames are sent (not actual audio).
For video: Black frames are sent (not actual video).
Muting saves bandwidth while maintaining track state.
Muted state is signaled to all participants.
UI should reflect muted state.
Example:
>>> @room.on("track_muted")
... def on_track_muted(participant, publication):
... print(f"{participant.identity} muted {publication.name}")
... # Update UI to show muted icon
"""
@room.on("track_unmuted")
def on_track_unmuted(
participant: Participant,
publication: TrackPublication
):
"""Emitted when a track is unmuted.
Args:
participant: The participant who owns the track
publication: The unmuted track publication
publication.muted will be False
Note:
Track resumes sending media.
No need to resubscribe.
Media starts flowing immediately.
Example:
>>> @room.on("track_unmuted")
... def on_track_unmuted(participant, publication):
... print(f"{participant.identity} unmuted {publication.name}")
... # Update UI to remove muted icon
"""
@room.on("room_metadata_changed")
def on_room_metadata_changed(old_metadata: str, new_metadata: str):
"""Emitted when room metadata changes.
Args:
old_metadata: Previous metadata (string, typically JSON)
new_metadata: New metadata (string, typically JSON)
Note:
Room metadata is managed server-side via API or SDK.
Use for storing room-level state (e.g., recording status, session info).
Metadata is broadcast to all participants.
Typical use cases:
- Recording status
- Room settings
- Session information
- Custom room properties
Example:
>>> @room.on("room_metadata_changed")
... def on_room_metadata_changed(old_metadata, new_metadata):
... import json
... try:
... old_data = json.loads(old_metadata) if old_metadata else {}
... new_data = json.loads(new_metadata) if new_metadata else {}
... print(f"Room metadata updated: {new_data}")
... except json.JSONDecodeError:
... print(f"Non-JSON metadata: {new_metadata}")
"""
@room.on("participant_metadata_changed")
def on_participant_metadata_changed(
participant: Participant,
old_metadata: str,
new_metadata: str
):
"""Emitted when participant metadata changes.
Args:
participant: The participant whose metadata changed
old_metadata: Previous metadata (string)
new_metadata: New metadata (string)
Note:
Local participant can update own metadata via set_metadata().
Server can update any participant's metadata via API.
Metadata changes are broadcast to all participants.
Typical use cases:
- User status (online, away, busy)
- User preferences
- Custom user properties
- Rich presence information
Example:
>>> @room.on("participant_metadata_changed")
... def on_participant_metadata_changed(participant, old_metadata, new_metadata):
... import json
... try:
... data = json.loads(new_metadata) if new_metadata else {}
... print(f"{participant.identity} metadata: {data}")
... if "status" in data:
... print(f"Status: {data['status']}")
... except json.JSONDecodeError:
... print(f"Non-JSON metadata: {new_metadata}")
"""
@room.on("participant_name_changed")
def on_participant_name_changed(
participant: Participant,
old_name: str,
new_name: str
):
"""Emitted when participant name changes.
Args:
participant: The participant whose name changed
old_name: Previous display name (string)
new_name: New display name (string)
Note:
Local participant can update own name via set_name().
Display name is separate from identity (identity never changes).
Name changes are broadcast to all participants.
Identity: Unique, immutable identifier
Name: Display name, mutable, not necessarily unique
Example:
>>> @room.on("participant_name_changed")
... def on_participant_name_changed(participant, old_name, new_name):
... print(f"{participant.identity} changed name:")
... print(f" From: {old_name}")
... print(f" To: {new_name}")
"""
@room.on("participant_attributes_changed")
def on_participant_attributes_changed(
changed_attributes: dict,
participant: Participant
):
"""Emitted when participant attributes change.
Args:
changed_attributes: Dictionary of changed attributes (key-value pairs)
Only includes attributes that changed
Keys and values are strings
participant: The participant whose attributes changed
Note:
Attributes are key-value pairs for storing arbitrary metadata.
Local participant can update via set_attributes().
More granular than metadata field.
Attributes are merged, not replaced.
To remove an attribute, set it to empty string.
Typical use cases:
- Feature flags
- User roles
- Custom properties
- Per-user settings
Example:
>>> @room.on("participant_attributes_changed")
... def on_participant_attributes_changed(changed_attributes, participant):
... print(f"{participant.identity} attributes changed:")
... for key, value in changed_attributes.items():
... print(f" {key}: {value}")
...
... # Access all attributes
... all_attrs = participant.attributes
... print(f"All attributes: {all_attrs}")
"""
@room.on("data_received")
def on_data_received(packet: DataPacket):
"""Emitted when data is received from a participant.
Args:
packet: The received data packet
packet.data: bytes payload (raw binary data)
packet.kind: KIND_RELIABLE or KIND_LOSSY
packet.participant: Sender (None for server-sent data)
packet.topic: Optional topic string (may be None)
Note:
Reliable packets guarantee delivery and ordering (TCP-like).
Uses SCTP reliable data channel.
Higher latency on packet loss (waits for retransmission).
Lossy packets prioritize low latency (UDP-like).
Uses SCTP unreliable data channel.
May drop packets under congestion.
Lower latency.
For large data, consider using data streaming instead.
Data packets are limited by SCTP message size (~256KB typical).
Example:
>>> @room.on("data_received")
... def on_data_received(packet):
... sender = packet.participant.identity if packet.participant else "server"
...
... try:
... # Try to decode as string
... message = packet.data.decode('utf-8')
... print(f"Message from {sender}: {message}")
... except UnicodeDecodeError:
... # Binary data
... print(f"Binary data from {sender}: {len(packet.data)} bytes")
...
... print(f"Topic: {packet.topic}")
... print(f"Reliable: {packet.kind == DataPacketKind.KIND_RELIABLE}")
"""
@room.on("sip_dtmf_received")
def on_sip_dtmf_received(dtmf: SipDTMF):
"""Emitted when a SIP DTMF signal is received.
Args:
dtmf: The received DTMF signal
dtmf.code: DTMF code (0-15)
dtmf.digit: Digit character ("0"-"9", "*", "#", "A"-"D")
dtmf.participant: Sender (None for server-sent)
Note:
Used for phone system integration (SIP trunking).
Codes 0-9 map to digits 0-9.
Code 10 = *, Code 11 = #.
Codes 12-15 = A-D (extended DTMF, rarely used).
DTMF is typically used for:
- Phone menu navigation
- Authentication (PIN entry)
- Conference controls (mute, unmute)
- Custom phone interactions
Example:
>>> @room.on("sip_dtmf_received")
... def on_sip_dtmf_received(dtmf):
... sender = dtmf.participant.identity if dtmf.participant else "server"
... print(f"DTMF from {sender}: {dtmf.digit} (code: {dtmf.code})")
...
... # Handle phone menu
... if dtmf.digit == "1":
... print("Option 1 selected")
... elif dtmf.digit == "2":
... print("Option 2 selected")
... elif dtmf.digit == "*":
... print("Previous menu")
... elif dtmf.digit == "#":
... print("Confirm")
"""
@room.on("transcription_received")
def on_transcription_received(
segments: list[TranscriptionSegment],
participant: Participant,
publication: TrackPublication
):
"""Emitted when transcription data is received.
Args:
segments: List of transcription segments
Each segment contains:
- id: Segment identifier (string)
- text: Transcribed text (string)
- start_time: Start time in milliseconds (int)
- end_time: End time in milliseconds (int)
- language: Language code (e.g., "en", "es") (string)
- final: Whether this is final transcription vs interim (bool)
participant: The participant whose track is transcribed
publication: The track publication being transcribed
Note:
Transcription is performed server-side or by agents.
Interim segments may be updated with final segments.
Use segment.id to match interim and final segments.
Interim segments (final=False):
- Sent during speech
- May change as more context is available
- Lower latency
Final segments (final=True):
- Sent after speech pause
- Won't change
- Higher accuracy
Example:
>>> @room.on("transcription_received")
... def on_transcription_received(segments, participant, publication):
... print(f"Transcription from {participant.identity}:")
... for segment in segments:
... status = "FINAL" if segment.final else "interim"
... print(f"[{segment.start_time}-{segment.end_time}ms] ({status})")
... print(f" {segment.text}")
... print(f" Language: {segment.language}")
"""
@room.on("active_speakers_changed")
def on_active_speakers_changed(speakers: list[Participant]):
"""Emitted when the list of active speakers changes.
Args:
speakers: List of currently active speakers
Ordered by audio level (loudest first)
Empty list means no active speakers
Contains both LocalParticipant and RemoteParticipant
Note:
Updated frequently based on audio levels.
Useful for highlighting speaking participants in UI.
Speaker detection has built-in debouncing.
Active speaker detection:
- Based on audio level (not just presence of audio)
- Threshold prevents noise triggering
- Smoothed to prevent rapid changes
Update frequency: Typically every 1-2 seconds when changes occur.
Example:
>>> @room.on("active_speakers_changed")
... def on_active_speakers_changed(speakers):
... if speakers:
... print(f"Active speakers ({len(speakers)}):")
... for i, speaker in enumerate(speakers, 1):
... print(f" {i}. {speaker.identity} (loudest first)")
... else:
... print("No active speakers")
"""
@room.on("connection_quality_changed")
def on_connection_quality_changed(
participant: Participant,
quality: ConnectionQuality
):
"""Emitted when a participant's connection quality changes.
Args:
participant: The participant whose quality changed
quality: The new connection quality
- QUALITY_EXCELLENT (2): Excellent connection
Low packet loss, low jitter, low RTT
- QUALITY_GOOD (1): Good connection
Acceptable packet loss, jitter, RTT
- QUALITY_POOR (0): Poor connection
May experience issues (stuttering, freezing)
High packet loss, jitter, or RTT
- QUALITY_LOST (3): Connection lost
No connectivity
Note:
Based on packet loss, jitter, and other network metrics.
Useful for displaying connection status in UI.
Quality can fluctuate frequently on mobile networks.
Quality metrics:
- EXCELLENT: <3% packet loss, <30ms jitter
- GOOD: <10% packet loss, <100ms jitter
- POOR: >10% packet loss or >100ms jitter
- LOST: No packets received recently
Example:
>>> @room.on("connection_quality_changed")
... def on_connection_quality_changed(participant, quality):
... identity = participant.identity
...
... if quality == ConnectionQuality.QUALITY_EXCELLENT:
... print(f"{identity}: Excellent connection")
... elif quality == ConnectionQuality.QUALITY_GOOD:
... print(f"{identity}: Good connection")
... elif quality == ConnectionQuality.QUALITY_POOR:
... print(f"{identity}: Poor connection (may have issues)")
... elif quality == ConnectionQuality.QUALITY_LOST:
... print(f"{identity}: Connection lost")
"""
@room.on("e2ee_state_changed")
def on_e2ee_state_changed(
participant: Participant,
state: EncryptionState
):
"""Emitted when encryption state changes for a participant.
Args:
participant: The participant whose encryption state changed
state: The new encryption state
- NEW (0): Encryption initialized
- OK (1): Encryption working correctly
- ENCRYPTION_FAILED (2): Failed to encrypt outbound frames
- DECRYPTION_FAILED (3): Failed to decrypt inbound frames
- MISSING_KEY (4): Encryption key not available
- KEY_RATCHETED (5): Key was ratcheted (rotated)
- INTERNAL_ERROR (6): Internal encryption error
Note:
Monitor this event to detect encryption issues.
MISSING_KEY: Usually means key needs to be set or shared.
Solution: Call set_shared_key() or set_key() with correct key.
DECRYPTION_FAILED: May indicate wrong key or corrupted data.
Solution: Verify all participants use same key.
ENCRYPTION_FAILED: Usually indicates internal error.
Solution: Check logs, may need to restart.
KEY_RATCHETED: Informational, key was rotated for forward secrecy.
No action needed.
Example:
>>> @room.on("e2ee_state_changed")
... def on_e2ee_state_changed(participant, state):
... identity = participant.identity
...
... if state == EncryptionState.OK:
... print(f"{identity}: Encryption OK")
... elif state == EncryptionState.MISSING_KEY:
... print(f"{identity}: Missing encryption key!")
... # Set key: room.e2ee_manager.key_provider.set_shared_key(key, 0)
... elif state == EncryptionState.DECRYPTION_FAILED:
... print(f"{identity}: Decryption failed (wrong key?)")
... elif state == EncryptionState.KEY_RATCHETED:
... print(f"{identity}: Key ratcheted (forward secrecy)")
"""
@room.on("participant_encryption_status_changed")
def on_participant_encryption_status_changed(
is_encrypted: bool,
participant: Participant
):
"""Emitted when participant encryption status changes.
Args:
is_encrypted: Whether the participant is now encrypted
True: Participant's frames are encrypted
False: Participant's frames are not encrypted
participant: The participant whose status changed
Note:
All participants should have same encryption status.
Mixed encryption states may indicate configuration issues.
Use to verify E2EE is properly configured.
Encryption status reflects whether E2EE is active for participant.
Different from encryption state (which indicates errors).
Example:
>>> @room.on("participant_encryption_status_changed")
... def on_participant_encryption_status_changed(is_encrypted, participant):
... identity = participant.identity
... status = "encrypted" if is_encrypted else "not encrypted"
... print(f"{identity} is now {status}")
...
... # Verify all participants have E2EE
... if not is_encrypted:
... print("WARNING: Participant without E2EE detected!")
"""
@room.on("room_updated")
def on_room_updated():
"""Emitted when room properties are updated.
Triggered when server-side room properties change, such as:
- is_recording status
- num_participants
- num_publishers
- metadata (triggers separate event too)
Note:
Check room properties after this event for latest values.
This is a catch-all event for room property updates.
Example:
>>> @room.on("room_updated")
... def on_room_updated():
... print(f"Room updated:")
... print(f" Recording: {room.is_recording}")
... print(f" Participants: {room.num_participants}")
... print(f" Publishers: {room.num_publishers}")
"""
@room.on("moved")
def on_moved():
"""Emitted when the participant is moved to a different room.
Triggered during server-side room migration or region changes.
Note:
Migration is transparent - no action needed from application.
All tracks and subscriptions are preserved.
Connection may briefly reconnect during migration.
This is an advanced server feature for load balancing.
Example:
>>> @room.on("moved")
... def on_moved():
... print("Room migrated (transparent)")
"""
@room.on("token_refreshed")
def on_token_refreshed():
"""Emitted when the access token is refreshed.
LiveKit server automatically refreshes tokens before expiration.
Note:
No action needed - token refresh is automatic.
Useful for logging or monitoring token lifecycle.
Token refresh happens before expiry to prevent disconnection.
Example:
>>> @room.on("token_refreshed")
... def on_token_refreshed():
... print("Access token refreshed automatically")
"""
import asyncio
from livekit import (
Room,
RoomOptions,
RtcConfiguration,
ConnectError,
RemoteParticipant,
Track,
RemoteTrackPublication,
TrackKind,
ConnectionState,
DisconnectReason,
)
async def main():
# Create room
room = Room()
# Set up event handlers
@room.on("connected")
def on_connected():
print(f"Connected to room: {room.name}")
# Note: room.sid is async, so we need to create a task
asyncio.create_task(print_sid())
async def print_sid():
sid = await room.sid
print(f"Session ID: {sid}")
@room.on("participant_connected")
def on_participant_connected(participant: RemoteParticipant):
print(f"Participant joined: {participant.identity}")
print(f" Name: {participant.name}")
print(f" Kind: {participant.kind}")
print(f" Metadata: {participant.metadata}")
@room.on("participant_disconnected")
def on_participant_disconnected(participant: RemoteParticipant):
reason_name = participant.disconnect_reason or "unknown"
print(f"Participant left: {participant.identity} (reason: {reason_name})")
@room.on("track_subscribed")
def on_track_subscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
print(f"Subscribed to track: {track.name} from {participant.identity}")
if track.kind == TrackKind.KIND_AUDIO:
print(" -> Audio track")
elif track.kind == TrackKind.KIND_VIDEO:
print(" -> Video track")
@room.on("connection_state_changed")
def on_connection_state_changed(state: ConnectionState):
if state == ConnectionState.CONN_DISCONNECTED:
print("Disconnected")
elif state == ConnectionState.CONN_CONNECTED:
print("Connected")
elif state == ConnectionState.CONN_RECONNECTING:
print("Reconnecting...")
@room.on("disconnected")
def on_disconnected(reason: DisconnectReason):
print(f"Disconnected from room: {reason}")
# Configure connection options
options = RoomOptions(
auto_subscribe=True,
dynacast=True,
)
# Connect
try:
await room.connect(
url="wss://your-server.livekit.cloud",
token="your-access-token",
options=options
)
except ConnectError as e:
print(f"Failed to connect: {e.message}")
return
# Wait while connected
while room.isconnected():
await asyncio.sleep(1)
# Print stats periodically
if room.isconnected():
stats = await room.get_rtc_stats()
print(f"Publishers: {len(stats.publisher_stats)}, "
f"Subscribers: {len(stats.subscriber_stats)}")
# Disconnect
await room.disconnect()
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from livekit import Room, ConnectError
async def connect_with_retry(url: str, token: str, max_retries: int = 3):
"""Connect with exponential backoff retry."""
room = Room()
for attempt in range(max_retries):
try:
await room.connect(url, token)
return room
except ConnectError as e:
print(f"Attempt {attempt + 1} failed: {e.message}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s, ... (no wait after the final attempt)
print(f"Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
else:
raise
# DO: Use synchronous handlers with asyncio.create_task()
@room.on("track_subscribed")
def on_track(track, publication, participant):
print(f"Track: {track.name}")
asyncio.create_task(process_track_async(track))
async def process_track_async(track):
# Async operations here
stream = AudioStream(track)
async for event in stream:
# Process frames
pass
# DON'T: Use async handlers directly
# @room.on("track_subscribed")
# async def on_track(track, publication, participant): # This will raise an error
# await process_track(track)
async def run_session(url: str, token: str):
"""Run session with proper cleanup."""
room = None
sources = []
try:
room = Room()
await room.connect(url, token)
# Create resources
audio_source = AudioSource(48000, 1)
sources.append(audio_source)
# Use resources
# ...
finally:
# Cleanup
for source in sources:
await source.aclose()
if room and room.isconnected():
await room.disconnect()
@room.on("connection_state_changed")
def on_connection_state_changed(state):
if state == ConnectionState.CONN_RECONNECTING:
print("Connection lost, attempting reconnection...")
# Optionally notify user
elif state == ConnectionState.CONN_CONNECTED:
print("Connection restored")
# Resume operations
@room.on("disconnected")
def on_disconnected(reason):
# Determine if reconnection is appropriate
if reason in [DisconnectReason.CLIENT_INITIATED, DisconnectReason.ROOM_CLOSED]:
print("Normal disconnect, no reconnection")
else:
print(f"Unexpected disconnect: {reason}")
# Consider reconnection logic