or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/livekit@1.0.x
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

index.md · docs/

LiveKit RTC Python SDK

Package Information

  • Package Name: livekit
  • Version: 1.0.23
  • PyPI: pkg:pypi/livekit@1.0.23
  • Installation: pip install livekit
  • License: Apache License 2.0
  • Official Documentation: https://docs.livekit.io/home/client/connect/
  • Python Version Requirement: Python 3.8 or higher
  • Key Dependencies: asyncio-based, requires async/await support

Overview

The LiveKit RTC Python SDK is a comprehensive real-time communication library that enables developers to build WebRTC-based applications with support for audio, video, data channels, end-to-end encryption, and advanced media processing capabilities. Built on top of LiveKit's infrastructure, this SDK provides a high-level, asyncio-based API for Python applications.

Important: This SDK is fully asynchronous and requires an async runtime environment. All I/O operations must be awaited, and event handlers are synchronous functions (use asyncio.create_task() for async operations within handlers).

Core Imports

# Main room and connection
from livekit import (
    Room,
    RoomOptions,
    RtcConfiguration,
    ConnectError,
    DataPacket,
    SipDTMF,
    RtcStats,
)

# Participants
from livekit import (
    Participant,
    LocalParticipant,
    RemoteParticipant,
    ParticipantKind,
    DisconnectReason,
)

# Tracks
from livekit import (
    Track,
    LocalTrack,
    RemoteTrack,
    LocalAudioTrack,
    LocalVideoTrack,
    RemoteAudioTrack,
    RemoteVideoTrack,
    AudioTrack,
    VideoTrack,
)

# Track Publications
from livekit import (
    TrackPublication,
    LocalTrackPublication,
    RemoteTrackPublication,
    TrackPublishOptions,
)

# Audio components
from livekit import (
    AudioFrame,
    AudioSource,
    AudioStream,
    AudioFrameEvent,
    AudioMixer,
    AudioResampler,
    AudioResamplerQuality,
    AudioProcessingModule,
    NoiseCancellationOptions,
    AudioFilter,
)

# Video components
from livekit import (
    VideoFrame,
    VideoSource,
    VideoStream,
    VideoFrameEvent,
)

# End-to-end encryption
from livekit import (
    E2EEManager,
    E2EEOptions,
    KeyProvider,
    KeyProviderOptions,
    FrameCryptor,
)

# Data streaming
from livekit import (
    DataPacket,
    DataPacketKind,
    TextStreamInfo,
    ByteStreamInfo,
    TextStreamReader,
    TextStreamWriter,
    ByteStreamReader,
    ByteStreamWriter,
)

# RPC
from livekit import RpcError, RpcInvocationData

# Transcription
from livekit import Transcription, TranscriptionSegment

# Types and enums
from livekit import (
    ConnectionState,
    ConnectionQuality,
    TrackKind,
    TrackSource,
    StreamState,
    VideoBufferType,
    VideoRotation,
    VideoCodec,
    VideoEncoding,
    EncryptionType,
    EncryptionState,
    IceTransportType,
    ContinualGatheringPolicy,
    IceServer,
    ParticipantTrackPermission,
)

# Utilities
from livekit import (
    EventEmitter,
    AVSynchronizer,
    FrameProcessor,
    MediaDevices,
    combine_audio_frames,
    stats,
)
from livekit.rtc.utils import sine_wave_generator

# Version
from livekit import __version__

Basic Usage Example

import asyncio
from livekit import rtc

async def main():
    # Create a room instance
    room = rtc.Room()

    # Set up event handlers
    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        print(f"Participant connected: {participant.identity}")

    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant
    ):
        print(f"Track subscribed: {track.sid}")

        if track.kind == rtc.TrackKind.KIND_AUDIO:
            # Create audio stream to receive frames
            audio_stream = rtc.AudioStream(track)
            asyncio.create_task(process_audio(audio_stream))

    # Connect to the room
    # Note: Must use wss:// protocol for secure WebSocket connection
    # Token must be generated server-side using LiveKit server SDK
    await room.connect(
        url="wss://your-livekit-server.com",
        token="your-access-token"
    )

    print(f"Connected to room: {room.name}")

    # Publish a local audio track
    source = rtc.AudioSource(sample_rate=48000, num_channels=1)
    track = rtc.LocalAudioTrack.create_audio_track("microphone", source)
    options = rtc.TrackPublishOptions()
    await room.local_participant.publish_track(track, options)

    # Keep the connection alive
    await asyncio.sleep(60)

    # Disconnect
    await room.disconnect()

async def process_audio(stream: rtc.AudioStream):
    async for event in stream:
        frame: rtc.AudioFrame = event.frame
        # Process audio frame
        print(f"Received audio frame: {frame.samples_per_channel} samples")

if __name__ == "__main__":
    asyncio.run(main())

Architecture Overview

The LiveKit RTC SDK is organized around several key architectural components:

1. Room Connection Management

The Room class is the central entry point that manages WebRTC connections, signaling, and participant lifecycle. It inherits from EventEmitter to provide comprehensive event handling.

  • Connection Lifecycle: CONN_DISCONNECTED → CONN_CONNECTED → CONN_RECONNECTING (on failure) → CONN_CONNECTED (on recovery)
  • Auto-reconnection: Built-in automatic reconnection on network failures (configurable)
  • Event-driven: All state changes and data delivery happen through events
  • Thread-safe: All operations run on the provided asyncio event loop

2. Participants

  • LocalParticipant: Represents the local user with capabilities to publish tracks, send data, and perform RPC calls
    • Can update own metadata, name, and attributes (requires permissions)
    • Can control which remote participants can subscribe to tracks
    • Has RPC method registration for receiving calls
  • RemoteParticipant: Represents other users in the room with track subscriptions
    • Read-only properties (identity, metadata, tracks)
    • Can control subscription to their published tracks

3. Tracks and Publications

  • Tracks: Media streams (audio/video) with Local and Remote variants
    • LocalAudioTrack/LocalVideoTrack: Created from AudioSource/VideoSource for publishing
    • RemoteAudioTrack/RemoteVideoTrack: Received from remote participants for consumption
    • Tracks have mute/unmute capabilities and statistics access
  • Publications: Metadata about published tracks with subscription management
    • Contains encoding parameters, dimensions, MIME types
    • Local publications can wait for subscription confirmation
    • Remote publications can be subscribed/unsubscribed

4. Media Processing Pipeline

  • Sources: AudioSource and VideoSource for capturing/generating media
    • Sources have internal queues for buffering
    • AudioSource supports playout waiting and queue clearing
    • VideoSource captures frames with timestamps and rotation
  • Frames: AudioFrame and VideoFrame for raw media data
    • AudioFrame: int16 PCM samples, interleaved by channel
    • VideoFrame: Various pixel formats (RGBA, I420, etc.)
    • Both support format conversion
  • Streams: AudioStream and VideoStream for consuming media asynchronously
    • Async iterators yielding frame events
    • Configurable capacity for backpressure management
    • Can specify desired output format
  • Processors: Audio processing (APM, resampler, mixer, filters)
    • AudioProcessingModule: Echo cancellation, noise suppression, AGC
    • AudioResampler: Sample rate conversion
    • AudioMixer: Mix multiple audio streams
    • Custom FrameProcessor: Implement custom processing logic

5. Data Communication

  • Data Packets: Simple byte/string messages with reliable/lossy delivery
    • KIND_RELIABLE: TCP-like guaranteed delivery, ordered
    • KIND_LOSSY: UDP-like best-effort delivery, lower latency
    • Optional topic for categorization
    • Can target specific participants or broadcast to all
  • Data Streams: Chunked streaming for large files or continuous data
    • TextStreamWriter/Reader for text data
    • ByteStreamWriter/Reader for binary data (files, images, etc.)
    • Support for progress tracking, attributes, reply chains
  • RPC: Request-response pattern for method invocation between participants
    • Type-safe with JSON payloads (recommended)
    • Timeout support
    • Error codes for different failure scenarios
    • Method handlers can be sync or async

6. Security

  • E2EE: End-to-end encryption with key management and frame cryptors
    • Shared key or per-participant keys
    • Key ratcheting for forward secrecy
    • GCM or custom encryption types
    • Encryption state monitoring

Capabilities

Room and Connection Management

Core functionality for connecting to LiveKit rooms, managing connection state, and handling room-level events.

from livekit import Room, RoomOptions, RtcConfiguration

# Create room with options
room = Room()
options = RoomOptions(
    auto_subscribe=True,  # Automatically subscribe to all published tracks
    dynacast=True,        # Enable dynacast (dynamic simulcast): pauses unused video layers to save bandwidth
)

# Connect to room
# url: Must start with wss:// (secure WebSocket)
# token: JWT token generated server-side with room name, participant identity, and permissions
# options: Optional configuration for room behavior
await room.connect(url="wss://...", token="...", options=options)

# Check connection state
if room.isconnected():
    print(f"Connected to {room.name}")
    print(f"Session ID: {await room.sid}")  # Note: sid is an async property
    print(f"Participants: {room.num_participants}")

# Get RTC statistics
# Returns publisher and subscriber statistics for all tracks
stats = await room.get_rtc_stats()

See: Room and Connection Management for complete details.

Participants

Managing local and remote participants, including metadata, attributes, and participant-level operations.

from livekit import LocalParticipant, RemoteParticipant

# Access local participant
local = room.local_participant
print(f"Local identity: {local.identity}")

# Update metadata (requires canUpdateOwnMetadata permission)
# metadata: Typically JSON string for structured data
await local.set_metadata('{"status": "available"}')
await local.set_name("New Name")
await local.set_attributes({"role": "moderator"})

# Access remote participants
# remote_participants is a read-only mapping of identity -> RemoteParticipant
for identity, participant in room.remote_participants.items():
    print(f"{participant.name} ({participant.kind})")
    # ParticipantKind: STANDARD, INGRESS, EGRESS, SIP, AGENT

See: Participants for complete details.

Audio Tracks

Creating, publishing, and managing audio tracks with muting capabilities.

from livekit import AudioSource, LocalAudioTrack, TrackPublishOptions

# Create audio source and track
# sample_rate: Typically 48000 (recommended), 16000, or 8000
# num_channels: 1 (mono) or 2 (stereo)
# queue_size_ms: Internal buffer size, default 1000ms
source = AudioSource(sample_rate=48000, num_channels=2)
track = LocalAudioTrack.create_audio_track("my-audio", source)

# Publish track
options = TrackPublishOptions()
publication = await room.local_participant.publish_track(track, options)

# Mute/unmute (synchronous operations)
track.mute()   # Sends silence, maintains connection
track.unmute() # Resumes sending audio

# Unpublish
# track_sid: String identifier from publication.sid or track.sid
await room.local_participant.unpublish_track(track.sid)

See: Audio Tracks for complete details.

Video Tracks

Creating, publishing, and managing video tracks with muting capabilities.

from livekit import VideoSource, LocalVideoTrack, VideoFrame, VideoBufferType

# Create video source
# width, height: Video resolution in pixels
source = VideoSource(width=1920, height=1080)

# Capture frames
# VideoBufferType: RGBA, I420, NV12, etc.
# data: Raw pixel data matching the specified format and dimensions
frame = VideoFrame(
    width=1920,
    height=1080,
    type=VideoBufferType.RGBA,
    data=frame_data  # bytearray, bytes, or memoryview
)
source.capture_frame(frame)

# Create and publish track
track = LocalVideoTrack.create_video_track("camera", source)
publication = await room.local_participant.publish_track(track)

See: Video Tracks for complete details.

Track Publications

Managing track publications, subscriptions, and waiting for subscriber confirmation.

from livekit import RemoteTrackPublication

# Subscribe to remote track
for pub in participant.track_publications.values():
    if isinstance(pub, RemoteTrackPublication):
        pub.set_subscribed(True)

        # Access the track once subscribed
        # Note: track may be None if subscription hasn't completed yet
        if pub.track:
            print(f"Track: {pub.track.name}")

# Wait for local track subscription
# Blocks until at least one remote participant has subscribed
await local_publication.wait_for_subscription()

See: Track Publications for complete details.

Audio Processing

WebRTC audio processing module with echo cancellation, noise suppression, and gain control.

from livekit import AudioProcessingModule

# Create APM with features
apm = AudioProcessingModule(
    echo_cancellation=True,    # Acoustic echo cancellation (AEC)
    noise_suppression=True,    # Noise suppression (NS)
    high_pass_filter=True,     # High-pass filter (HPF) to remove low-frequency noise
    auto_gain_control=True,    # Automatic gain control (AGC)
)

# Process audio frames (MUST be exactly 10ms duration)
# For 48kHz: 480 samples per channel
# For 16kHz: 160 samples per channel
# Processes in-place, modifying the frame
apm.process_stream(audio_frame)

# For echo cancellation, process reverse stream (speaker output)
apm.process_reverse_stream(playback_frame)

# Set stream delay for better echo cancellation
# delay_ms: Time difference between capture and playout in milliseconds
apm.set_stream_delay_ms(50)

See: Audio Processing for complete details.

Video Processing

Video frame format conversion and manipulation.

from livekit import VideoFrame, VideoBufferType

# Convert video format
# Common conversions: RGBA <-> I420, NV12 <-> I420
frame_i420 = frame_rgba.convert(VideoBufferType.I420)

# Flip vertically (useful for camera APIs that invert)
frame_flipped = frame_rgba.convert(VideoBufferType.RGBA, flip_y=True)

# Access plane data for YUV formats
# I420 has 3 planes: Y (luminance), U (chrominance), V (chrominance)
y_plane = frame_i420.get_plane(0)
u_plane = frame_i420.get_plane(1)
v_plane = frame_i420.get_plane(2)

See: Video Processing for complete details.

Audio Frames and Sources

Low-level audio frame handling and source management for audio capture and playback.

from livekit import AudioFrame, AudioSource

# Create audio frame
# sample_rate: Samples per second (e.g., 48000, 16000, 8000)
# num_channels: 1 (mono) or 2 (stereo)
# samples_per_channel: Number of samples per channel in this frame
frame = AudioFrame.create(
    sample_rate=48000,
    num_channels=2,
    samples_per_channel=480  # 10ms at 48kHz
)

# Access frame data
# data: memoryview of int16 samples, interleaved by channel
# For stereo: [L0, R0, L1, R1, L2, R2, ...]
data = frame.data
print(f"Duration: {frame.duration}s")

# Capture to source
source = AudioSource(sample_rate=48000, num_channels=2)
await source.capture_frame(frame)

# Wait for playout (blocks until all queued audio is played)
await source.wait_for_playout()

# Export to WAV
# Returns WAV file as bytes (includes header)
wav_bytes = frame.to_wav_bytes()

See: Audio Frames and Sources for complete details.

Video Frames and Sources

Low-level video frame handling and source management for video capture.

from livekit import VideoFrame, VideoSource, VideoBufferType, VideoRotation

# Create video frame
# data: Raw pixel data
#   - RGBA: width * height * 4 bytes
#   - I420: width * height * 1.5 bytes (planar)
frame = VideoFrame(
    width=640,
    height=480,
    type=VideoBufferType.RGBA,
    data=pixel_data  # bytearray, bytes, or memoryview
)

# Capture to source
source = VideoSource(width=640, height=480)
# timestamp_us: Microsecond timestamp for frame timing (0 for automatic)
# rotation: VideoRotation enum for frame orientation
source.capture_frame(frame, timestamp_us=0, rotation=VideoRotation.VIDEO_ROTATION_0)

# Access frame properties
print(f"Size: {frame.width}x{frame.height}")
print(f"Format: {frame.type}")

See: Video Frames and Sources for complete details.

End-to-End Encryption

Secure room communication with key management and participant-specific encryption.

from livekit import E2EEOptions, KeyProviderOptions, EncryptionType

# Configure E2EE
# shared_key: Must be exactly 32 bytes for GCM encryption
key_provider_options = KeyProviderOptions(
    shared_key=b"my-32-byte-encryption-key-here!!"
)
e2ee_options = E2EEOptions(
    key_provider_options=key_provider_options,
    encryption_type=EncryptionType.GCM,  # GCM (default) or CUSTOM
)

# Connect with encryption
options = RoomOptions(encryption=e2ee_options)
await room.connect(url, token, options)

# Manage keys
key_provider = room.e2ee_manager.key_provider
# Set new key (all participants must use same key)
key_provider.set_shared_key(new_key, key_index=0)
# Ratchet key for forward secrecy (derives new key from current)
ratcheted_key = key_provider.ratchet_shared_key(key_index=0)

See: End-to-End Encryption for complete details.

Data Streaming

High-throughput data streaming for files, text, and binary data with chunked delivery.

from livekit import LocalParticipant

local: LocalParticipant = room.local_participant

# Stream text (chunked delivery)
# topic: Optional categorization string for filtering on receiver
writer = await local.stream_text(topic="chat")
await writer.write("Hello, ")
await writer.write("world!")
await writer.aclose()  # Must close to signal end of stream

# Send complete text (convenience method)
info = await local.send_text("Quick message", topic="chat")

# Stream bytes (for large files or progressive data)
# name: Descriptive name (e.g., filename)
# mime_type: Content type (e.g., "application/pdf", "image/jpeg")
# total_size: Optional total size in bytes for progress tracking
writer = await local.stream_bytes(
    name="document.pdf",
    mime_type="application/pdf",
    total_size=1024000
)
await writer.write(chunk1)
await writer.write(chunk2)
await writer.aclose()

# Send file (convenience method, reads and streams file)
info = await local.send_file("/path/to/file.txt")

# Register stream handlers (room-level)
async def handle_text_stream(reader: TextStreamReader, sender_identity: str):
    # Read all chunks at once
    text = await reader.read_all()
    print(f"Received: {text}")

room.register_text_stream_handler("chat", handle_text_stream)

See: Data Streaming for complete details.

RPC (Remote Procedure Call)

Request-response communication pattern between participants with typed method handlers.

from livekit import RpcError, RpcInvocationData

local: LocalParticipant = room.local_participant

# Register RPC method handler
# method_name: String identifier for the RPC method
# Handler receives RpcInvocationData with caller_identity, payload, request_id, response_timeout
@local.register_rpc_method("calculate")
async def handle_calculate(data: RpcInvocationData) -> str:
    import json
    params = json.loads(data.payload)
    result = params["a"] + params["b"]
    return json.dumps({"result": result})

# Call RPC method on remote participant
# destination_identity: Identity of target participant
# method: Method name registered by recipient
# payload: String payload (JSON recommended for structured data)
# response_timeout: Optional timeout in seconds (None for default)
result = await local.perform_rpc(
    destination_identity="remote-user",
    method="calculate",
    payload='{"a": 5, "b": 3}',
    response_timeout=5.0
)
print(f"Result: {result}")  # {"result": 8}

# Handle errors
try:
    result = await local.perform_rpc(...)
except RpcError as e:
    print(f"RPC failed: {e.code} - {e.message}")
    # Error codes: UNSUPPORTED_METHOD, RECIPIENT_NOT_FOUND, RESPONSE_TIMEOUT, etc.
    if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
        print("Participant not found")

See: RPC for complete details.

Transcription

Receiving and publishing transcription data for audio tracks.

from livekit import Transcription, TranscriptionSegment

# Receive transcriptions
@room.on("transcription_received")
def on_transcription(
    segments: list[TranscriptionSegment],
    participant: Participant,
    publication: TrackPublication
):
    for segment in segments:
        print(f"[{segment.start_time}-{segment.end_time}ms] {segment.text}")
        print(f"Language: {segment.language}, Final: {segment.final}")
        # final: False for interim results, True for final transcription

# Publish transcription
# participant_identity: Identity of participant whose audio is transcribed
# track_sid: SID of the audio track being transcribed
transcription = Transcription(
    participant_identity=local.identity,
    track_sid="track-sid",
    segments=[
        TranscriptionSegment(
            id="seg1",              # Unique segment identifier
            text="Hello world",     # Transcribed text
            start_time=0,           # Start time in milliseconds
            end_time=1000,          # End time in milliseconds
            language="en",          # Language code (BCP 47)
            final=True              # Whether this is final or interim
        )
    ]
)
await local.publish_transcription(transcription)

See: Transcription for complete details.

Event System

Comprehensive event handling for all room, participant, and track events.

from livekit import Room, EventEmitter

room = Room()

# Register event handler (can be used as decorator or direct call)
@room.on("participant_connected")
def on_participant_connected(participant: RemoteParticipant):
    print(f"Participant joined: {participant.identity}")

# One-time event handler (automatically unregisters after first call)
@room.once("connected")
def on_connected():
    print("Connected to room!")

# Unregister handler
def my_handler(participant):
    print("Handler")

room.on("participant_connected", my_handler)
room.off("participant_connected", my_handler)

# Available events
# Connection: connected, disconnected, reconnecting, reconnected, connection_state_changed
# Participants: participant_connected, participant_disconnected
# Tracks: local_track_published, local_track_unpublished, local_track_subscribed,
#         track_published, track_unpublished, track_subscribed, track_unsubscribed,
#         track_subscription_failed
# Track state: track_muted, track_unmuted
# Metadata: room_metadata_changed, participant_metadata_changed, participant_name_changed,
#           participant_attributes_changed
# Data: data_received, sip_dtmf_received, transcription_received
# Quality: connection_quality_changed, active_speakers_changed
# E2EE: e2ee_state_changed, participant_encryption_status_changed
# Other: room_updated, moved, token_refreshed

See: Event System for complete details.

Utilities

Helper classes and functions for audio/video processing, synchronization, and device management.

from livekit import (
    combine_audio_frames,
    AudioMixer,
    AudioResampler,
    AudioResamplerQuality,
    AVSynchronizer,
    MediaDevices,
)

# Combine audio frames (must have same sample rate and channel count)
combined = combine_audio_frames([frame1, frame2, frame3])

# Mix audio streams (combines multiple streams into one)
mixer = AudioMixer(sample_rate=48000, num_channels=1)
mixer.add_stream(audio_stream1)
mixer.add_stream(audio_stream2)
async for mixed_frame in mixer:
    # Process mixed audio
    pass

# Resample audio (convert sample rate)
resampler = AudioResampler(
    input_rate=44100,
    output_rate=48000,
    quality=AudioResamplerQuality.HIGH  # QUICK, LOW, MEDIUM, HIGH, VERY_HIGH
)
# Returns list of resampled frames (may be 0, 1, or multiple)
resampled_frames = resampler.push(input_frame)

# Synchronize A/V (maintains timing between audio and video)
synchronizer = AVSynchronizer(
    audio_source=audio_source,
    video_source=video_source,
    video_fps=30.0  # Target frame rate
)
await synchronizer.push(video_frame)
await synchronizer.push(audio_frame)

# Media devices (requires sounddevice package: pip install sounddevice)
devices = MediaDevices()
input_devices = devices.list_input_devices()
# Open input with built-in audio processing
capture = devices.open_input(enable_aec=True)

See: Utilities for complete details.

Types and Enums

Comprehensive type definitions and enumerations used throughout the SDK.

from livekit import (
    # Connection
    ConnectionState,
    ConnectionQuality,

    # Tracks
    TrackKind,
    TrackSource,
    StreamState,

    # Video
    VideoBufferType,
    VideoRotation,
    VideoCodec,

    # Encryption
    EncryptionType,
    EncryptionState,

    # Network
    IceTransportType,
    ContinualGatheringPolicy,
    DataPacketKind,

    # Participants
    ParticipantKind,
    DisconnectReason,
)

# Example usage
if room.connection_state == ConnectionState.CONN_CONNECTED:
    print("Connected!")

if track.kind == TrackKind.KIND_AUDIO:
    print("Audio track")

if participant.kind == ParticipantKind.AGENT:
    print("AI agent participant")

See: Types and Enums for complete details.

Advanced Features

Custom Frame Processing

Implement custom frame processors for audio or video:

from livekit import FrameProcessor, AudioFrame

class CustomAudioProcessor(FrameProcessor[AudioFrame]):
    """Custom audio processor example.
    
    Processors can modify frames in-place or create new frames.
    Must implement enabled property, _process, and _close methods.
    """
    
    def __init__(self):
        self._enabled = True

    @property
    def enabled(self) -> bool:
        """Whether processor is enabled (can be set to toggle processing)."""
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool):
        self._enabled = value

    def _process(self, frame: AudioFrame) -> AudioFrame:
        """Process a single audio frame.
        
        Args:
            frame: Input audio frame
            
        Returns:
            Processed audio frame (can be same instance or new)
        """
        if not self._enabled:
            return frame
        # Custom processing logic
        # Example: Apply gain, filtering, etc.
        return frame

    def _close(self):
        """Cleanup resources when processor is closed.
        
        Called when stream is closed or processor is removed.
        Free any allocated resources here.
        """
        pass

Jupyter Notebook Integration

Display LiveKit rooms in Jupyter notebooks or Google Colab.

from livekit.rtc.jupyter import display_room, room_html

def display_room(
    url: str,
    token: str,
    *,
    width: str = "100%",
    height: str = "110px"
) -> None:
    """Display a LiveKit room in a Jupyter notebook or Google Colab.

    Args:
        url: The LiveKit room URL (wss://...)
        token: The LiveKit join token (JWT)
        width: Width of the display (default: "100%")
        height: Height of the display (default: "110px")

    Important:
        The rendered HTML will include the provided url and token in plain text.
        Avoid using sensitive tokens in public notebooks.
        
    Raises:
        ImportError: If IPython is not available
    """

def room_html(
    url: str,
    token: str,
    *,
    width: str,
    height: str
) -> HTML:
    """Generate HTML for embedding a LiveKit room.

    Args:
        url: The LiveKit room URL (wss://...)
        token: The LiveKit join token (JWT)
        width: Width of the iframe
        height: Height of the iframe

    Returns:
        IPython.core.display.HTML object

    Important:
        The returned HTML contains the provided url and token directly.
        Avoid using sensitive tokens in public notebooks.
        
    Raises:
        ImportError: If IPython is not available
    """

Example usage:

from livekit.rtc.jupyter import display_room

# Display room in notebook
display_room(
    url="wss://your-server.com",
    token="your-token",
    width="100%",
    height="600px"
)

Statistics Access

Access detailed RTC statistics:

from livekit import stats

# Get room stats
# Returns RtcStats with publisher_stats and subscriber_stats lists
rtc_stats = await room.get_rtc_stats()
for pub_stat in rtc_stats.publisher_stats:
    # Access stats_pb2 fields (protobuf messages)
    # Contains: bitrate, bytes sent, packets sent, etc.
    print(f"Publisher stats: {pub_stat}")

for sub_stat in rtc_stats.subscriber_stats:
    # Access stats_pb2 fields (protobuf messages)
    # Contains: bitrate, bytes received, packets received, jitter, packet loss, etc.
    print(f"Subscriber stats: {sub_stat}")

# Get track-specific stats
# Returns list of RTC stats for the specific track
track_stats = await track.get_stats()
for stat in track_stats:
    print(f"Track stat: {stat}")

Error Handling

The SDK provides specific exception types for different failure scenarios:

from livekit import ConnectError, RpcError

# Connection errors
try:
    await room.connect(url, token)
except ConnectError as e:
    print(f"Connection failed: {e.message}")
    # Common causes:
    # - Invalid URL format (must start with wss://)
    # - Invalid or expired token
    # - Network connectivity issues
    # - Server not reachable
    # - Permission denied (check token permissions)

# RPC errors
try:
    result = await local.perform_rpc(
        destination_identity="remote-user",
        method="some_method",
        payload="data"
    )
except RpcError as e:
    print(f"RPC error {e.code}: {e.message}")
    # Error codes (RpcError.ErrorCode enum):
    # - RECIPIENT_NOT_FOUND: Participant not in room
    # - UNSUPPORTED_METHOD: Method not registered
    # - RESPONSE_TIMEOUT: No response within timeout
    # - CONNECTION_TIMEOUT: Connection lost during call
    # - APPLICATION_ERROR: Handler raised error
    # - REQUEST_PAYLOAD_TOO_LARGE: Payload exceeds limit
    # - RESPONSE_PAYLOAD_TOO_LARGE: Response exceeds limit
    if e.code == RpcError.ErrorCode.RECIPIENT_NOT_FOUND:
        print("Participant not found")
    elif e.code == RpcError.ErrorCode.RESPONSE_TIMEOUT:
        print("Request timed out")

For method-specific errors, SDK methods may raise standard Python exceptions or return error information through return values. Consult the specific method documentation for error handling details.

Common exception types:

  • ConnectError: Connection failures
  • RpcError: RPC call failures
  • RuntimeError: General runtime errors (e.g., invalid state, invalid parameters)
  • ValueError: Invalid argument values
  • TypeError: Type mismatches

Thread Safety and Async

The SDK is built on asyncio and all async methods must be called from an async context:

import asyncio

# All async operations require event loop
async def main():
    room = Room()
    await room.connect(url, token)
    # ... async operations
    await room.disconnect()

# Run with asyncio
# asyncio.run() creates a new event loop, runs the coroutine, and closes the loop
asyncio.run(main())

# Custom event loop (advanced usage)
# Pass loop to Room if you manage your own loop
loop = asyncio.new_event_loop()
room = Room(loop=loop)
# ... use room

Important async patterns:

# Event handlers are SYNCHRONOUS functions
@room.on("participant_connected")
def on_participant_connected(participant):
    # This runs synchronously
    print(f"Connected: {participant.identity}")
    
    # For async operations in handlers, use asyncio.create_task()
    asyncio.create_task(async_operation(participant))

async def async_operation(participant):
    # This can await
    await some_async_function()

# Don't await in event handlers - this will raise ValueError
@room.on("participant_connected")
async def bad_handler(participant):  # DON'T DO THIS
    await something()  # Will raise error

Resource Cleanup

Always clean up resources when done:

# Close sources (releases internal resources)
await audio_source.aclose()
await video_source.aclose()

# Close streams (stops receiving frames)
await audio_stream.aclose()
await video_stream.aclose()

# Close synchronizer (stops A/V sync)
await synchronizer.aclose()

# Close mixer (stops mixing)
await mixer.aclose()

# Disconnect room (unpublishes tracks, closes connections)
await room.disconnect()

# Close media devices (stops capture/playback)
await capture.aclose()
await player.aclose()

Best-practice cleanup with try/finally (most SDK resources do not yet support context managers):

# Note: Most resources don't support context managers yet
# Use try/finally for cleanup

audio_source = None
room = None
try:
    room = Room()
    await room.connect(url, token)
    
    audio_source = AudioSource(48000, 1)  # 48 kHz sample rate, mono
    track = LocalAudioTrack.create_audio_track("mic", audio_source)
    await room.local_participant.publish_track(track)
    
    # ... do work
    
finally:
    if audio_source:
        await audio_source.aclose()
    if room:
        await room.disconnect()

Common Patterns and Best Practices

Pattern 1: Robust Connection with Retry

import asyncio
from livekit import Room, ConnectError

async def connect_with_retry(url: str, token: str, max_retries: int = 3):
    """Connect to room with automatic retry on failure."""
    room = Room()
    
    for attempt in range(max_retries):
        try:
            await room.connect(url, token)
            return room
        except ConnectError as e:
            print(f"Connection attempt {attempt + 1} failed: {e.message}")
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise

Pattern 2: Track Subscription with Error Handling

@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
    """Handle track subscription with proper error handling."""
    try:
        if track.kind == TrackKind.KIND_AUDIO:
            # Create stream with error handling
            stream = AudioStream(track)
            asyncio.create_task(process_audio_safe(stream))
        elif track.kind == TrackKind.KIND_VIDEO:
            stream = VideoStream(track)
            asyncio.create_task(process_video_safe(stream))
    except Exception as e:
        print(f"Error subscribing to track: {e}")

async def process_audio_safe(stream):
    """Process audio stream with exception handling."""
    try:
        async for event in stream:
            # Process frame
            pass
    except Exception as e:
        print(f"Error processing audio: {e}")
    finally:
        await stream.aclose()

Pattern 3: Graceful Shutdown

import signal
import asyncio

class LiveKitClient:
    def __init__(self):
        self.room = None
        self.running = True
        
    async def run(self, url: str, token: str):
        """Run client with graceful shutdown handling."""
        self.room = Room()
        
        # Setup signal handlers
        # (note: loop.add_signal_handler is only available on Unix)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self.shutdown()))
        
        try:
            await self.room.connect(url, token)
            
            # Poll until shutdown is requested (an asyncio.Event would
            # avoid the up-to-1-second shutdown latency of polling)
            while self.running:
                await asyncio.sleep(1)
                
        finally:
            await self.cleanup()
            
    async def shutdown(self):
        """Initiate shutdown."""
        print("Shutting down...")
        self.running = False
        
    async def cleanup(self):
        """Cleanup all resources."""
        if self.room:
            await self.room.disconnect()

Complete Reference

For detailed information about each capability area, see the following documentation: