or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md
audio-processing.md
audio-tracks.md
data-streaming.md
e2ee.md
events.md
index.md
participants.md
room.md
rpc.md
track-publications.md
transcription.md
types-enums.md
utilities.md
video-frames-sources.md
video-processing.md
video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/events.md

Event System

Overview

The LiveKit SDK uses an event-driven architecture based on the EventEmitter class. The Room class inherits from EventEmitter and provides comprehensive event handling for all room, participant, and track events.

Key concepts:

  • EventEmitter: Base class providing event registration and management
  • Event handlers: Synchronous functions (not async) called when events occur
  • Event types: String-based event names (e.g., "participant_connected")
  • Handler registration: Three methods: on(), once(), off()
  • Multiple handlers: Multiple handlers can be registered for the same event
  • Execution order: Handlers are called in registration order

Import

from livekit import EventEmitter

EventEmitter

class EventEmitter[T_contra]:
    """Event emitter for managing event listeners.
    
    Generic type T_contra represents the event types.
    For Room, this is the union of all event type strings.
    
    Thread-safe when used with asyncio event loop.
    """
    
    def __init__(self) -> None:
        """Initialize an EventEmitter.
        
        Creates empty handler registry.
        No events are registered initially.
        """
    
    def emit(self, event: T_contra, *args) -> None:
        """Trigger all callbacks for an event.
        
        Args:
            event: Event to emit (string)
            *args: Arguments to pass to callbacks
                  Number and types depend on event
        
        Note:
            Calls all registered handlers in order.
            Handler exceptions are caught and logged.
            One handler exception doesn't stop others.
            
        Example (internal use):
            >>> # SDK calls emit internally
            >>> # emitter.emit("participant_connected", participant)
        """
    
    def on(
        self,
        event: T_contra,
        callback: Optional[Callable] = None
    ) -> Callable:
        """Register a callback for an event (can be used as decorator).
        
        Args:
            event: Event to listen for (string)
                  Must be valid event type for emitter
            callback: Callback function (optional for decorator)
                     Must be synchronous function
                     Signature depends on event type
        
        Returns:
            The callback when one is passed directly; otherwise a
            decorator that registers the function it is applied to
        
        Raises:
            ValueError: If callback is async function
                       Use synchronous functions only
                       For async operations, use asyncio.create_task()
        
        Example (decorator):
            >>> @room.on("participant_connected")
            ... def on_participant(participant):
            ...     print(f"Joined: {participant.identity}")
            ...     # For async operations:
            ...     asyncio.create_task(async_handler(participant))
        
        Example (direct call):
            >>> def handler(participant):
            ...     print(f"Joined: {participant.identity}")
            >>> room.on("participant_connected", handler)
        
        Note:
            Handlers MUST be synchronous functions.
            Multiple handlers can be registered for same event.
            Handlers are called in registration order.
            Handler stays registered until removed with off().
        """
    
    def once(
        self,
        event: T_contra,
        callback: Optional[Callable] = None
    ) -> Callable:
        """Register a callback to be called only once (can be used as decorator).
        
        Args:
            event: Event to listen for (string)
            callback: Callback function (optional for decorator)
        
        Returns:
            The callback when one is passed directly; otherwise a
            decorator that registers the function it is applied to
        
        Raises:
            ValueError: If callback is async function
        
        Example:
            >>> @room.once("connected")
            ... def on_connected():
            ...     print("Connected!") # Only called once
        
        Note:
            Handler is automatically unregistered after first call.
            Useful for one-time setup or initialization.
        """
    
    def off(self, event: T_contra, callback: Callable) -> None:
        """Unregister a callback from an event.
        
        Args:
            event: Event to stop listening for (string)
            callback: The exact callback function to remove
                     Must be same reference as registered
        
        Returns:
            None
        
        Raises:
            ValueError: Possibly, if callback is not registered for event.
                       NOTE(review): this page also states the case may be
                       silently ignored; confirm against the installed SDK
                       before relying on either behavior.
        
        Example:
            >>> def handler(participant):
            ...     print("Handler")
            >>> 
            >>> # Register
            >>> room.on("participant_connected", handler)
            >>> 
            >>> # Later, unregister
            >>> room.off("participant_connected", handler)
        
        Note:
            Must pass exact same function reference.
            A lambda written inline in the on() call cannot be
            unregistered, because no reference to it is kept.
            Bind the lambda (or function) to a name first:
            
            >>> # BAD: Cannot unregister (no reference to the lambda)
            >>> room.on("event", lambda x: print(x))
            >>> 
            >>> # GOOD: Can unregister (same object passed to on and off)
            >>> handler = lambda x: print(x)
            >>> room.on("event", handler)
            >>> room.off("event", handler)
        """

Room Events

All Room events are documented in detail in Room and Connection Management.

The Room class emits events in the following categories:

Connection Events

Events related to room connection state:

# Connection lifecycle events
"connected"                    # Successfully connected to room
"disconnected"                 # Disconnected from room (with reason)
"reconnecting"                 # Attempting to reconnect
"reconnected"                  # Successfully reconnected
"connection_state_changed"     # Connection state changed (with new state)

Participant Events

Events related to participant join/leave:

# Participant lifecycle events
"participant_connected"        # Remote participant joined
"participant_disconnected"     # Remote participant left

Track Events

Events related to track publishing and subscription:

# Local track events
"local_track_published"        # Local track published
"local_track_unpublished"      # Local track unpublished
"local_track_subscribed"       # Local track subscribed by someone

# Remote track events
"track_published"              # Remote participant published track
"track_unpublished"            # Remote participant unpublished track
"track_subscribed"             # Successfully subscribed to remote track
"track_unsubscribed"           # Unsubscribed from remote track
"track_subscription_failed"    # Track subscription failed (with error)

# Track state events
"track_muted"                  # Track muted
"track_unmuted"                # Track unmuted

Metadata Events

Events related to metadata changes:

# Metadata change events
"room_metadata_changed"              # Room metadata changed
"participant_metadata_changed"       # Participant metadata changed
"participant_name_changed"           # Participant name changed
"participant_attributes_changed"     # Participant attributes changed

Data Events

Events related to data communication:

# Data communication events
"data_received"                # Data packet received
"sip_dtmf_received"           # SIP DTMF signal received
"transcription_received"      # Transcription data received

Quality Events

Events related to connection and audio quality:

# Quality monitoring events
"active_speakers_changed"          # Active speakers list changed
"connection_quality_changed"       # Participant connection quality changed

Encryption Events

Events related to end-to-end encryption:

# E2EE events
"e2ee_state_changed"                    # Encryption state changed
"participant_encryption_status_changed" # Participant encryption status changed

Other Events

Miscellaneous events:

# Miscellaneous events
"room_updated"        # Room properties updated
"moved"               # Participant moved to different room
"token_refreshed"     # Access token refreshed

Usage Examples

Register Event Handler

from livekit import Room, RemoteParticipant

room = Room()

@room.on("participant_connected")
def on_participant(participant: RemoteParticipant):
    """Print the identity, name, and metadata of a participant who joined.
    
    Registered with the decorator form of ``room.on``, so it stays
    attached to "participant_connected" until removed with ``room.off``.
    
    Args:
        participant: The participant who joined
    """
    print(f"Participant joined: {participant.identity}")
    print(f"Name: {participant.name}")
    print(f"Metadata: {participant.metadata}")

One-Time Handler

@room.once("connected")
def on_connected():
    """Announce the first "connected" event.
    
    Registered with ``room.once``, so the handler is removed
    automatically after its first call.
    """
    print("Connected!")  # Only printed once

Unregister Handler

def my_handler(participant):
    """Print the identity of the participant passed to the handler.

    Defined as a named function so the same reference can be handed to
    both ``room.on`` and ``room.off``.
    """
    print(f"Participant: {participant.identity}")

# Register (direct-call form: the callback is passed explicitly)
room.on("participant_connected", my_handler)

# Later, unregister: off() needs the exact function object given to on()
room.off("participant_connected", my_handler)

Multiple Handlers

# Multiple handlers for same event
@room.on("participant_connected")
def handler1(participant):
    """First handler registered for "participant_connected"."""
    print(f"Handler 1: {participant.identity}")

@room.on("participant_connected")
def handler2(participant):
    """Second handler registered for the same event."""
    print(f"Handler 2: {participant.identity}")

# Both handlers will be called in registration order
# Output (for a participant whose identity is "user123"):
# Handler 1: user123
# Handler 2: user123

Async Operations in Handlers

import asyncio
from livekit import Room, Track, AudioStream

room = Room()

# Strong references to in-flight tasks. The event loop itself keeps only
# weak references, so a task that nothing else holds may be
# garbage-collected before it finishes.
_background_tasks = set()


@room.on("track_subscribed")
def on_track(track, publication, participant):
    """Handle track subscription.

    Event handlers MUST be synchronous.
    For async operations, use asyncio.create_task().

    Args:
        track: The track that was subscribed to
        publication: Publication the track belongs to (unused here)
        participant: Participant that published the track (unused here)
    """
    print(f"Track: {track.name}")

    # CORRECT: Use create_task for async operations, and keep a reference
    # to the task until it completes.
    task = asyncio.create_task(process_track(track))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

async def process_track(track: Track):
    """Read frames from ``track`` through an AudioStream and print each size.

    Intended to be scheduled from a synchronous event handler with
    ``asyncio.create_task()``. The stream is closed on every exit path.
    """
    audio_stream = AudioStream(track)
    try:
        async for frame_event in audio_stream:
            # Report the per-channel sample count of each received frame
            samples = frame_event.frame.samples_per_channel
            print(f"Frame: {samples}")
    finally:
        await audio_stream.aclose()

Error Handling in Handlers

@room.on("participant_connected")
def on_participant(participant):
    """Handler that reports its own failures.

    Everything in the ``try`` body is covered by the ``except`` clause,
    including the ValueError raised when the participant has no metadata,
    so the handler prints the error instead of letting it escape.
    """
    try:
        # Handler logic
        print(f"Participant: {participant.identity}")
        
        # Some operation that might fail
        if not participant.metadata:
            raise ValueError("Missing metadata")
            
    except Exception as e:
        # The emitter also catches and logs exceptions that escape a
        # handler, but handling them here keeps the failure explicit.
        print(f"Error in handler: {e}")

Handler Patterns

class EventHandler:
    """Group related room callbacks as bound methods of a single object."""

    # (event name, method name) pairs used for both registration and cleanup
    _BINDINGS = (
        ("participant_connected", "on_participant_connected"),
        ("participant_disconnected", "on_participant_disconnected"),
        ("track_subscribed", "on_track_subscribed"),
    )

    def __init__(self, room: Room):
        self.room = room
        self._register_handlers()

    def _register_handlers(self):
        """Attach every bound method listed in ``_BINDINGS`` to the room."""
        for event_name, method_name in self._BINDINGS:
            self.room.on(event_name, getattr(self, method_name))

    def on_participant_connected(self, participant):
        """Report and log a participant joining."""
        identity = participant.identity
        print(f"Joined: {identity}")
        # Bound methods can reach other helpers on the instance
        self.log_event("join", participant.identity)

    def on_participant_disconnected(self, participant):
        """Report and log a participant leaving."""
        identity = participant.identity
        print(f"Left: {identity}")
        self.log_event("leave", participant.identity)

    def on_track_subscribed(self, track, publication, participant):
        """Report which participant a newly subscribed track came from."""
        print(f"Track from {participant.identity}: {track.name}")

    def log_event(self, event_type: str, identity: str):
        """Print one log line for an event/participant pair."""
        print(f"Event: {event_type}, Participant: {identity}")

    def unregister_handlers(self):
        """Detach every handler that ``_register_handlers`` attached."""
        for event_name, method_name in self._BINDINGS:
            self.room.off(event_name, getattr(self, method_name))

Complete Example

import asyncio
from livekit import Room, RemoteParticipant, Track, AudioStream, TrackKind

async def main(url=None, token=None):
    """Join a room, log its events for 30 seconds, then disconnect.

    Args:
        url: LiveKit server URL. Falls back to the ``LIVEKIT_URL``
            environment variable when omitted.
        token: Access token for the room. Falls back to the
            ``LIVEKIT_TOKEN`` environment variable when omitted.

    Raises:
        RuntimeError: If the URL or token is neither passed in nor set in
            the environment.
    """
    import os  # local import keeps the example self-contained

    url = url or os.environ.get("LIVEKIT_URL")
    token = token or os.environ.get("LIVEKIT_TOKEN")
    if not url or not token:
        raise RuntimeError(
            "Pass url and token to main() or set LIVEKIT_URL and LIVEKIT_TOKEN"
        )

    room = Room()

    # The event loop keeps only weak references to tasks; hold strong ones
    # here so stream-processing tasks are not garbage-collected mid-run.
    background_tasks = set()

    def spawn(coro):
        """Schedule ``coro`` and keep a reference until it completes."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Connection events
    @room.on("connected")
    def on_connected():
        """Handle successful connection."""
        print("Connected to room")
        print(f"Room name: {room.name}")

    @room.on("disconnected")
    def on_disconnected(reason):
        """Handle disconnection."""
        print(f"Disconnected: {reason}")

    # Participant events
    @room.on("participant_connected")
    def on_participant_joined(participant: RemoteParticipant):
        """Handle participant joining."""
        print(f"Joined: {participant.identity}")
        print(f"  Name: {participant.name}")
        print(f"  Kind: {participant.kind}")

    @room.on("participant_disconnected")
    def on_participant_left(participant: RemoteParticipant):
        """Handle participant leaving."""
        print(f"Left: {participant.identity}")

    # Track events
    @room.on("track_subscribed")
    def on_track(track: Track, publication, participant: RemoteParticipant):
        """Handle track subscription."""
        print(f"Track from {participant.identity}: {track.name}")

        # Process based on track type
        if track.kind == TrackKind.KIND_AUDIO:
            spawn(process_audio(track))
        elif track.kind == TrackKind.KIND_VIDEO:
            spawn(process_video(track))

    # Data events
    @room.on("data_received")
    def on_data(packet):
        """Handle data packet."""
        sender = packet.participant.identity if packet.participant else "server"
        try:
            message = packet.data.decode('utf-8')
            print(f"Data from {sender}: {message}")
        except UnicodeDecodeError:
            # Payload is not UTF-8 text; report its size instead
            print(f"Binary data from {sender}: {len(packet.data)} bytes")

    # Quality events
    @room.on("active_speakers_changed")
    def on_speakers(speakers):
        """Handle active speakers change."""
        if speakers:
            identities = [s.identity for s in speakers]
            print(f"Active speakers: {', '.join(identities)}")
        else:
            print("No active speakers")

    # Connect
    await room.connect(url, token)

    try:
        # Keep running
        await asyncio.sleep(30)
    finally:
        # Disconnect even if the wait is cancelled or interrupted
        await room.disconnect()

async def process_audio(track: Track):
    """Read audio frames from ``track`` and print each frame's sample count.

    The AudioStream is closed on every exit path.
    """
    audio_stream = AudioStream(track)
    try:
        async for frame_event in audio_stream:
            # Report the per-channel sample count of the received frame
            print(f"Audio: {frame_event.frame.samples_per_channel} samples")
    finally:
        await audio_stream.aclose()

async def process_video(track: Track):
    """Read video frames from ``track`` and print each frame's dimensions.

    The VideoStream is closed on every exit path.
    """
    from livekit import VideoStream

    video_stream = VideoStream(track)
    try:
        async for frame_event in video_stream:
            # Report the resolution of the received frame
            video_frame = frame_event.frame
            width, height = video_frame.width, video_frame.height
            print(f"Video: {width}x{height}")
    finally:
        await video_stream.aclose()

# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Use Synchronous Event Handlers

# DO: Synchronous handler with async task
# Strong references keep scheduled tasks alive until they finish; the
# event loop itself holds only weak references to them.
_background_tasks = set()


@room.on("track_subscribed")
def on_track(track, publication, participant):
    """Synchronous handler that hands the real work to an asyncio task."""
    task = asyncio.create_task(process_track_async(track))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

async def process_track_async(track):
    """Placeholder coroutine for the asynchronous track processing."""
    # Async operations here
    pass

# DON'T: Async event handler
# @room.on("track_subscribed")  # Raises ValueError: async callbacks are rejected
# async def on_track(track, publication, participant):
#     await process_track(track)

2. Handle Exceptions Gracefully

@room.on("participant_connected")
def on_participant(participant):
    """Run the participant logic and report any failure instead of raising.

    ``process_participant`` is not defined in this snippet; it stands in
    for application code.
    """
    try:
        # Handler logic
        process_participant(participant)
    except Exception as e:
        # Log error, don't let it crash
        print(f"Error processing participant: {e}")

3. Use Once for One-Time Setup

# DO: Use once for initialization
@room.once("connected")
def on_connected():
    """One-time setup, run on the first "connected" event only.

    ``setup_tracks`` is not defined in this snippet; it stands in for
    application code.
    """
    print("Initial connection setup")
    # Registered with once(), so this runs only on the first connection
    setup_tracks()

# DON'T: Use on for one-time setup
# @room.on("connected")
# def on_connected():
#     setup_tracks()  # Runs on every reconnection too

4. Unregister Handlers When Done

class TemporaryHandler:
    """Attach a short-lived "participant_connected" callback to a room."""

    def __init__(self, room):
        def announce(p):
            print(f"Temp: {p.identity}")

        self.room = room
        # Keep the callback on the instance so cleanup() can pass the very
        # same object to off().
        self.handler = announce
        self.room.on("participant_connected", self.handler)

    def cleanup(self):
        """Detach the callback registered in ``__init__``."""
        self.room.off("participant_connected", self.handler)

5. Don't Block Event Loop

# Strong references keep scheduled tasks alive until they finish; the
# event loop itself holds only weak references to them.
_background_tasks = set()


@room.on("participant_connected")
def on_participant(participant):
    """Do the cheap work inline and offload the slow work to a task."""
    # DO: Quick, non-blocking operation
    print(f"Joined: {participant.identity}")

    # DON'T: Block event loop
    # time.sleep(5)  # Blocks all event processing

    # DO: Offload heavy work to task (and keep a reference to it)
    task = asyncio.create_task(heavy_processing(participant))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

async def heavy_processing(participant):
    """Stand-in for slow work; awaits instead of blocking the loop."""
    # Heavy operations here
    await asyncio.sleep(5)
    # Process...

Common Patterns

Pattern 1: State Machine

from enum import Enum

class RoomState(Enum):
    """Connection states tracked by StatefulRoom."""

    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    RECONNECTING = "reconnecting"

class StatefulRoom:
    """Wrap a Room and mirror its connection lifecycle in ``self.state``."""

    def __init__(self):
        self.room = Room()
        self.state = RoomState.DISCONNECTED
        self._setup_handlers()

    def _setup_handlers(self):
        """Register one handler per connection lifecycle event."""
        @self.room.on("connected")
        def on_connected():
            self.state = RoomState.CONNECTED
            print("State: CONNECTED")

        @self.room.on("reconnecting")
        def on_reconnecting():
            self.state = RoomState.RECONNECTING
            print("State: RECONNECTING")

        # Without this handler the state would stay RECONNECTING forever
        # after a successful reconnect.
        @self.room.on("reconnected")
        def on_reconnected():
            self.state = RoomState.CONNECTED
            print("State: CONNECTED")

        @self.room.on("disconnected")
        def on_disconnected(reason):
            self.state = RoomState.DISCONNECTED
            print("State: DISCONNECTED")

    async def connect(self, url, token):
        """Connect the wrapped room, tracking the attempt in ``self.state``.

        Args:
            url: Server URL passed through to ``room.connect``
            token: Access token passed through to ``room.connect``

        Raises:
            Whatever ``room.connect`` raises; the state is reset to
            DISCONNECTED first so it does not stay stuck at CONNECTING.
        """
        self.state = RoomState.CONNECTING
        try:
            await self.room.connect(url, token)
        except BaseException:
            # Covers cancellation as well as ordinary connection errors
            self.state = RoomState.DISCONNECTED
            raise

Pattern 2: Event Aggregation

from collections import defaultdict
from datetime import datetime

class EventAggregator:
    """Record every occurrence of a fixed set of room events."""

    def __init__(self, room):
        self.room = room
        # event name -> list of {"timestamp", "args"} records
        self.events = defaultdict(list)
        self._register_handlers()

    def _register_handlers(self):
        """Register handlers to collect all events."""
        events_to_track = [
            "participant_connected",
            "participant_disconnected",
            "track_subscribed",
            "track_unsubscribed",
        ]

        for event in events_to_track:
            # Build each handler in its own call so it captures its own
            # event name rather than the loop variable.
            self.room.on(event, self._create_handler(event))

    def _create_handler(self, event_name):
        """Create handler that logs event.

        Args:
            event_name: Name under which occurrences are recorded

        Returns:
            A callback accepting any positional arguments.
        """
        def handler(*args):
            self.events[event_name].append({
                "timestamp": datetime.now(),
                "args": args,
            })
        return handler

    def get_event_count(self, event_name):
        """Get count of specific event.

        Returns 0 for an event that has not occurred, without adding an
        entry for it.
        """
        # .get() rather than indexing: indexing a defaultdict inserts an
        # empty list, which would then show up in get_all_events().
        return len(self.events.get(event_name, ()))

    def get_all_events(self):
        """Get all collected events as a plain dict (shallow copy)."""
        return dict(self.events)

Pattern 3: Conditional Event Handling

class ConditionalHandler:
    """Room handler that can be switched on and off without unregistering."""

    def __init__(self, room):
        self.room = room
        self.enabled = True

        def on_participant(participant):
            # Guard clause: ignore the event while handling is disabled
            if not self.enabled:
                return
            self.handle_participant(participant)

        room.on("participant_connected")(on_participant)

    def handle_participant(self, participant):
        """Process a participant; reached only while handling is enabled."""
        identity = participant.identity
        print(f"Handling: {identity}")

    def enable(self):
        """Turn event handling on."""
        self.enabled = True

    def disable(self):
        """Turn event handling off."""
        self.enabled = False

See Also