tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities
The LiveKit SDK uses an event-driven architecture based on the EventEmitter class. The Room class inherits from EventEmitter and provides comprehensive event handling for all room, participant, and track events.
Key concepts:
on(), once(), off()

from livekit import EventEmitter

class EventEmitter[T_contra]:
"""Event emitter for managing event listeners.
Generic type T_contra represents the event types.
For Room, this is the union of all event type strings.
Thread-safe when used with asyncio event loop.
"""
def __init__(self) -> None:
"""Initialize an EventEmitter.
Creates empty handler registry.
No events are registered initially.
"""
def emit(self, event: T_contra, *args) -> None:
"""Trigger all callbacks for an event.
Args:
event: Event to emit (string)
*args: Arguments to pass to callbacks
Number and types depend on event
Note:
Calls all registered handlers in order.
Handler exceptions are caught and logged.
One handler exception doesn't stop others.
Example (internal use):
>>> # SDK calls emit internally
>>> # emitter.emit("participant_connected", participant)
"""
def on(
self,
event: T_contra,
callback: Optional[Callable] = None
) -> Callable:
"""Register a callback for an event (can be used as decorator).
Args:
event: Event to listen for (string)
Must be valid event type for emitter
callback: Callback function (optional for decorator)
Must be synchronous function
Signature depends on event type
Returns:
Callback or decorator function
Raises:
ValueError: If callback is async function
Use synchronous functions only
For async operations, use asyncio.create_task()
Example (decorator):
>>> @room.on("participant_connected")
... def on_participant(participant):
... print(f"Joined: {participant.identity}")
... # For async operations:
... asyncio.create_task(async_handler(participant))
Example (direct call):
>>> def handler(participant):
... print(f"Joined: {participant.identity}")
>>> room.on("participant_connected", handler)
Note:
Handlers MUST be synchronous functions.
Multiple handlers can be registered for same event.
Handlers are called in registration order.
Handler is not automatically unregistered.
"""
def once(
self,
event: T_contra,
callback: Optional[Callable] = None
) -> Callable:
"""Register a callback to be called only once (can be used as decorator).
Args:
event: Event to listen for (string)
callback: Callback function (optional for decorator)
Returns:
Callback or decorator function
Raises:
ValueError: If callback is async function
Example:
>>> @room.once("connected")
... def on_connected():
... print("Connected!") # Only called once
Note:
Handler is automatically unregistered after first call.
Useful for one-time setup or initialization.
"""
def off(self, event: T_contra, callback: Callable) -> None:
"""Unregister a callback from an event.
Args:
event: Event to stop listening for (string)
callback: The exact callback function to remove
Must be same reference as registered
Returns:
None
Raises:
ValueError: If callback not registered for event
(silently ignored in some implementations)
Example:
>>> def handler(participant):
... print("Handler")
>>>
>>> # Register
>>> room.on("participant_connected", handler)
>>>
>>> # Later, unregister
>>> room.off("participant_connected", handler)
Note:
Must pass exact same function reference.
Inline (anonymous) lambdas cannot be unregistered, because no reference to them is kept:
>>> # BAD: Cannot unregister
>>> room.on("event", lambda x: print(x))
>>>
>>> # GOOD: Can unregister
>>> handler = lambda x: print(x)
>>> room.on("event", handler)
>>> room.off("event", handler)
"""

All Room events are documented in detail in Room and Connection Management.
The Room class emits events in the following categories:
Events related to room connection state:
# Connection lifecycle events
"connected" # Successfully connected to room
"disconnected" # Disconnected from room (with reason)
"reconnecting" # Attempting to reconnect
"reconnected" # Successfully reconnected
"connection_state_changed" # Connection state changed (with new state)

Events related to participant join/leave:
# Participant lifecycle events
"participant_connected" # Remote participant joined
"participant_disconnected" # Remote participant left

Events related to track publishing and subscription:
# Local track events
"local_track_published" # Local track published
"local_track_unpublished" # Local track unpublished
"local_track_subscribed" # Local track subscribed by someone
# Remote track events
"track_published" # Remote participant published track
"track_unpublished" # Remote participant unpublished track
"track_subscribed" # Successfully subscribed to remote track
"track_unsubscribed" # Unsubscribed from remote track
"track_subscription_failed" # Track subscription failed (with error)
# Track state events
"track_muted" # Track muted
"track_unmuted" # Track unmuted

Events related to metadata changes:
# Metadata change events
"room_metadata_changed" # Room metadata changed
"participant_metadata_changed" # Participant metadata changed
"participant_name_changed" # Participant name changed
"participant_attributes_changed" # Participant attributes changed

Events related to data communication:
# Data communication events
"data_received" # Data packet received
"sip_dtmf_received" # SIP DTMF signal received
"transcription_received" # Transcription data received

Events related to connection and audio quality:
# Quality monitoring events
"active_speakers_changed" # Active speakers list changed
"connection_quality_changed" # Participant connection quality changed

Events related to end-to-end encryption:
# E2EE events
"e2ee_state_changed" # Encryption state changed
"participant_encryption_status_changed" # Participant encryption status changed

Miscellaneous events:
# Miscellaneous events
"room_updated" # Room properties updated
"moved" # Participant moved to different room
"token_refreshed" # Access token refreshed

from livekit import Room, RemoteParticipant
room = Room()
@room.on("participant_connected")
def on_participant(participant: RemoteParticipant):
"""Handle participant connection.
Args:
participant: The participant who joined
"""
print(f"Participant joined: {participant.identity}")
print(f"Name: {participant.name}")
print(f"Metadata: {participant.metadata}")

@room.once("connected")
def on_connected():
"""Called only once when first connected.
Automatically unregistered after first call.
"""
print("Connected!") # Only printed once

def my_handler(participant):
"""Example handler function."""
print(f"Participant: {participant.identity}")
# Register
room.on("participant_connected", my_handler)
# Later, unregister
room.off("participant_connected", my_handler)

# Multiple handlers for same event
@room.on("participant_connected")
def handler1(participant):
print(f"Handler 1: {participant.identity}")
@room.on("participant_connected")
def handler2(participant):
print(f"Handler 2: {participant.identity}")
# Both handlers will be called in registration order
# Output:
# Handler 1: user123
# Handler 2: user123

import asyncio
from livekit import Room, Track, AudioStream
room = Room()
@room.on("track_subscribed")
def on_track(track, publication, participant):
"""Handle track subscription.
Event handlers MUST be synchronous.
For async operations, use asyncio.create_task().
"""
print(f"Track: {track.name}")
# CORRECT: Use create_task for async operations
asyncio.create_task(process_track(track))
async def process_track(track: Track):
"""Async function to process track.
Called from event handler via create_task().
"""
stream = AudioStream(track)
try:
async for event in stream:
# Process frames
print(f"Frame: {event.frame.samples_per_channel}")
finally:
await stream.aclose()

@room.on("participant_connected")
def on_participant(participant):
"""Handler with error handling."""
try:
# Handler logic
print(f"Participant: {participant.identity}")
# Some operation that might fail
if not participant.metadata:
raise ValueError("Missing metadata")
except Exception as e:
# Exceptions are caught and logged by SDK
# But good practice to handle explicitly
print(f"Error in handler: {e}")

class EventHandler:
"""Class-based event handler pattern."""
def __init__(self, room: Room):
self.room = room
self._register_handlers()
def _register_handlers(self):
"""Register all event handlers."""
self.room.on("participant_connected", self.on_participant_connected)
self.room.on("participant_disconnected", self.on_participant_disconnected)
self.room.on("track_subscribed", self.on_track_subscribed)
def on_participant_connected(self, participant):
"""Handle participant connection."""
print(f"Joined: {participant.identity}")
# Can access self and other methods
self.log_event("join", participant.identity)
def on_participant_disconnected(self, participant):
"""Handle participant disconnection."""
print(f"Left: {participant.identity}")
self.log_event("leave", participant.identity)
def on_track_subscribed(self, track, publication, participant):
"""Handle track subscription."""
print(f"Track from {participant.identity}: {track.name}")
def log_event(self, event_type: str, identity: str):
"""Helper method for logging."""
print(f"Event: {event_type}, Participant: {identity}")
def unregister_handlers(self):
"""Cleanup handlers when done."""
self.room.off("participant_connected", self.on_participant_connected)
self.room.off("participant_disconnected", self.on_participant_disconnected)
self.room.off("track_subscribed", self.on_track_subscribed)

import asyncio
from livekit import Room, RemoteParticipant, Track, AudioStream, TrackKind
async def main(url=None, token=None):
    """Connect to a room, log its events for 30 seconds, then disconnect.

    Args:
        url: LiveKit server URL. When omitted, the ``LIVEKIT_URL``
            environment variable is used (a convention of this example).
        token: Access token. When omitted, the ``LIVEKIT_TOKEN``
            environment variable is used (a convention of this example).

    Raises:
        ValueError: If no URL or token was passed or found in the
            environment.
    """
    import os  # local import keeps the example self-contained

    url = url or os.environ.get("LIVEKIT_URL")
    token = token or os.environ.get("LIVEKIT_TOKEN")
    if not url or not token:
        raise ValueError(
            "A server URL and access token are required: pass them to "
            "main() or set LIVEKIT_URL and LIVEKIT_TOKEN"
        )

    room = Room()

    # The event loop only keeps weak references to tasks, so hold a strong
    # reference until each one finishes or it may be garbage-collected
    # mid-execution.
    background_tasks = set()

    def spawn(coro):
        """Schedule ``coro`` as a task and keep it alive until it is done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Connection events
    @room.on("connected")
    def on_connected():
        """Handle successful connection."""
        print("Connected to room")
        print(f"Room name: {room.name}")

    @room.on("disconnected")
    def on_disconnected(reason):
        """Handle disconnection."""
        print(f"Disconnected: {reason}")

    # Participant events
    @room.on("participant_connected")
    def on_participant_joined(participant: RemoteParticipant):
        """Handle participant joining."""
        print(f"Joined: {participant.identity}")
        print(f" Name: {participant.name}")
        print(f" Kind: {participant.kind}")

    @room.on("participant_disconnected")
    def on_participant_left(participant: RemoteParticipant):
        """Handle participant leaving."""
        print(f"Left: {participant.identity}")

    # Track events
    @room.on("track_subscribed")
    def on_track(track: Track, publication, participant: RemoteParticipant):
        """Handle track subscription."""
        print(f"Track from {participant.identity}: {track.name}")
        # Handlers are synchronous, so stream processing is offloaded to a task.
        if track.kind == TrackKind.KIND_AUDIO:
            spawn(process_audio(track))
        elif track.kind == TrackKind.KIND_VIDEO:
            spawn(process_video(track))

    # Data events
    @room.on("data_received")
    def on_data(packet):
        """Handle data packet."""
        sender = packet.participant.identity if packet.participant else "server"
        try:
            message = packet.data.decode('utf-8')
            print(f"Data from {sender}: {message}")
        except UnicodeDecodeError:
            print(f"Binary data from {sender}: {len(packet.data)} bytes")

    # Quality events
    @room.on("active_speakers_changed")
    def on_speakers(speakers):
        """Handle active speakers change."""
        if speakers:
            identities = [s.identity for s in speakers]
            print(f"Active speakers: {', '.join(identities)}")
        else:
            print("No active speakers")

    # Connect
    await room.connect(url, token)
    try:
        # Keep running
        await asyncio.sleep(30)
    finally:
        # Disconnect even if the wait is cancelled or raises.
        await room.disconnect()
async def process_audio(track: Track):
    """Read frames from an audio track and print each frame's sample count.

    Args:
        track: The audio track to wrap in an ``AudioStream``.
    """
    audio_stream = AudioStream(track)
    try:
        async for audio_event in audio_stream:
            frame = audio_event.frame
            print(f"Audio: {frame.samples_per_channel} samples")
    finally:
        # Close the stream whether iteration finished or raised.
        await audio_stream.aclose()
async def process_video(track: Track):
    """Read frames from a video track and print each frame's dimensions.

    Args:
        track: The video track to wrap in a ``VideoStream``.
    """
    from livekit import VideoStream

    video_stream = VideoStream(track)
    try:
        async for video_event in video_stream:
            frame = video_event.frame
            print(f"Video: {frame.width}x{frame.height}")
    finally:
        # Close the stream whether iteration finished or raised.
        await video_stream.aclose()
asyncio.run(main())

# DO: Synchronous handler with async task
@room.on("track_subscribed")
def on_track(track, publication, participant):
asyncio.create_task(process_track_async(track))
async def process_track_async(track):
# Async operations here
pass
# DON'T: Async event handler
# @room.on("track_subscribed")
# async def on_track(track, publication, participant):
# await process_track(track) # Will raise error

@room.on("participant_connected")
def on_participant(participant):
try:
# Handler logic
process_participant(participant)
except Exception as e:
# Log error, don't let it crash
print(f"Error processing participant: {e}")

# DO: Use once for initialization
@room.once("connected")
def on_connected():
print("Initial connection setup")
# Runs only on first connection
setup_tracks()
# DON'T: Use on for one-time setup
# @room.on("connected")
# def on_connected():
# setup_tracks() # Runs on every reconnection too

class TemporaryHandler:
def __init__(self, room):
self.room = room
self.handler = lambda p: print(f"Temp: {p.identity}")
room.on("participant_connected", self.handler)
def cleanup(self):
# Unregister when done
self.room.off("participant_connected", self.handler)

@room.on("participant_connected")
def on_participant(participant):
# DO: Quick, non-blocking operation
print(f"Joined: {participant.identity}")
# DON'T: Block event loop
# time.sleep(5) # Blocks all event processing
# DO: Offload heavy work to task
asyncio.create_task(heavy_processing(participant))
async def heavy_processing(participant):
# Heavy operations here
await asyncio.sleep(5)
# Process...

from enum import Enum
class RoomState(Enum):
    """Connection states tracked by the state-machine example."""

    # Not connected (initial state, and after a disconnect)
    DISCONNECTED = "disconnected"
    # connect() has been called but has not completed
    CONNECTING = "connecting"
    # Connected to the room
    CONNECTED = "connected"
    # Connection lost; a reconnect attempt is in progress
    RECONNECTING = "reconnecting"
class StatefulRoom:
    """Wrap a ``Room`` and mirror its connection lifecycle in ``self.state``.

    ``state`` is a ``RoomState`` that is updated from the room's
    connection events.
    """

    def __init__(self):
        """Create the room, start in DISCONNECTED and register handlers."""
        self.room = Room()
        self.state = RoomState.DISCONNECTED
        self._setup_handlers()

    def _setup_handlers(self):
        """Register the handlers that keep ``self.state`` in sync."""

        @self.room.on("connected")
        def on_connected():
            self.state = RoomState.CONNECTED
            print("State: CONNECTED")

        @self.room.on("reconnecting")
        def on_reconnecting():
            self.state = RoomState.RECONNECTING
            print("State: RECONNECTING")

        # Without this handler the state would stay RECONNECTING forever
        # after a successful reconnect.
        @self.room.on("reconnected")
        def on_reconnected():
            self.state = RoomState.CONNECTED
            print("State: CONNECTED")

        @self.room.on("disconnected")
        def on_disconnected(reason):
            self.state = RoomState.DISCONNECTED
            print("State: DISCONNECTED")

    async def connect(self, url, token):
        """Connect the wrapped room, tracking the attempt in ``self.state``.

        Args:
            url: Server URL passed through to ``Room.connect``.
            token: Access token passed through to ``Room.connect``.

        Raises:
            Whatever ``Room.connect`` raises; the state is reset to
            DISCONNECTED before the exception propagates.
        """
        self.state = RoomState.CONNECTING
        try:
            await self.room.connect(url, token)
        except BaseException:
            # A failed or cancelled attempt must not leave the state stuck
            # at CONNECTING.
            self.state = RoomState.DISCONNECTED
            raise


from collections import defaultdict
from datetime import datetime
class EventAggregator:
    """Collect selected room events, with timestamps, for later inspection."""

    def __init__(self, room):
        """Start recording events emitted by ``room``.

        Args:
            room: Object exposing ``on(event, callback)``.
        """
        self.room = room
        # event name -> list of {"timestamp": ..., "args": ...} records
        self.events = defaultdict(list)
        self._register_handlers()

    def _register_handlers(self):
        """Register handlers to collect all events."""
        events_to_track = [
            "participant_connected",
            "participant_disconnected",
            "track_subscribed",
            "track_unsubscribed",
        ]
        for event in events_to_track:
            # The factory binds each handler to its own event name; a closure
            # defined directly in this loop would only see the last name.
            handler = self._create_handler(event)
            self.room.on(event, handler)

    def _create_handler(self, event_name):
        """Create a handler that records every emission of ``event_name``."""

        def handler(*args):
            self.events[event_name].append({
                "timestamp": datetime.now(),
                "args": args
            })

        return handler

    def get_event_count(self, event_name):
        """Return how many times ``event_name`` has been recorded.

        Uses ``.get`` rather than indexing so that asking about an event that
        was never seen does not insert an empty entry into ``self.events``.
        """
        return len(self.events.get(event_name, ()))

    def get_all_events(self):
        """Return a plain-dict shallow copy of all collected events."""
        return dict(self.events)


class ConditionalHandler:
    """Handle ``participant_connected`` only while enabled."""

    def __init__(self, room):
        """Register the gated handler on ``room``; handling starts enabled.

        Args:
            room: Object exposing ``on(event)`` usable as a decorator.
        """
        self.room = room
        self.enabled = True

        @room.on("participant_connected")
        def on_participant(participant):
            # The flag is checked on every event, so enable()/disable()
            # take effect without re-registering the handler.
            if self.enabled:
                self.handle_participant(participant)

    def handle_participant(self, participant):
        """Conditionally handle participant."""
        print(f"Handling: {participant.identity}")

    def enable(self):
        """Enable event handling."""
        self.enabled = True

    def disable(self):
        """Disable event handling."""
        self.enabled = False