or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md audio-processing.md audio-tracks.md data-streaming.md e2ee.md events.md index.md participants.md room.md rpc.md track-publications.md transcription.md types-enums.md utilities.md video-frames-sources.md video-processing.md video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/e2ee.md

End-to-End Encryption

Overview

End-to-end encryption (E2EE) in LiveKit ensures that media is encrypted before transmission and only decrypted by intended recipients. The LiveKit server cannot decrypt E2EE content, providing strong privacy guarantees.

Key concepts:

  • E2EE: End-to-end encryption for media frames
  • Key management: Shared keys or per-participant keys
  • Key ratcheting: Forward secrecy through key rotation
  • Encryption types: GCM (default), CUSTOM, NONE (no encryption)
  • Frame cryptors: Per-participant encryption management
  • Encryption states: Monitor encryption health

Import

from livekit import (
    E2EEOptions,
    E2EEManager,
    KeyProvider,
    KeyProviderOptions,
    FrameCryptor,
    EncryptionType,
    EncryptionState,
)

Constants

DEFAULT_RATCHET_SALT: bytes = b"LKFrameEncryptionKey"
"""Default salt for key ratcheting.

Default value of ``KeyProviderOptions.ratchet_salt``.
Used in key derivation for forward secrecy.
"""

DEFAULT_RATCHET_WINDOW_SIZE: int = 16
"""Default ratchet window size.

Default value of ``KeyProviderOptions.ratchet_window_size``.
Number of old keys to maintain for decryption.
Allows for out-of-order frame delivery.
"""

DEFAULT_FAILURE_TOLERANCE: int = -1
"""Default failure tolerance.

Default value of ``KeyProviderOptions.failure_tolerance``.

A value N >= 0 allows N consecutive decryption failures before giving up.
-1 is a sentinel rather than a count.
NOTE(review): in LiveKit's frame cryptor implementations a negative
tolerance appears to disable failure counting (failures never invalidate
the key) rather than meaning "fail on first error" - confirm against the
SDK before relying on either reading.
"""

Classes

KeyProviderOptions

@dataclass
class KeyProviderOptions:
    """Options for key provider configuration.

    Every field has a default, so ``KeyProviderOptions()`` is valid and
    yields a provider with no shared key set.

    Attributes:
        shared_key: Shared encryption key for all participants
                   Type: Optional[bytes]
                   Default: None
                   Must be exactly 32 bytes for GCM encryption
                   All participants must use same key

        ratchet_salt: Salt for key ratcheting
                     Type: bytes
                     Default: DEFAULT_RATCHET_SALT
                     Used in key derivation

        ratchet_window_size: Number of old keys to keep
                            Type: int
                            Default: DEFAULT_RATCHET_WINDOW_SIZE (16)
                            Larger = more tolerance for out-of-order frames
                            Smaller = less memory usage

        failure_tolerance: Number of consecutive decryption failures allowed
                          Type: int
                          Default: DEFAULT_FAILURE_TOLERANCE (-1)
                          N >= 0: Allow N consecutive failures
                          -1: Sentinel value.
                              NOTE(review): a negative tolerance appears to
                              disable failure counting in LiveKit's frame
                              cryptor rather than "fail immediately" -
                              confirm against the SDK.
    """

    shared_key: Optional[bytes] = None
    ratchet_salt: bytes = DEFAULT_RATCHET_SALT
    ratchet_window_size: int = DEFAULT_RATCHET_WINDOW_SIZE
    failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE

E2EEOptions

@dataclass
class E2EEOptions:
    """Options for end-to-end encryption.

    Attributes:
        key_provider_options: Key provider configuration
                             Type: KeyProviderOptions
                             Required (no default; must be passed explicitly)

        encryption_type: Type of encryption to use
                        Type: proto_e2ee.EncryptionType.ValueType
                        Default: proto_e2ee.EncryptionType.GCM
                        Options:
                        - GCM: AES-GCM (recommended, hardware accelerated)
                        - CUSTOM: Custom encryption implementation
                        - NONE: No encryption (not recommended for E2EE)
    """

    # Required field: declared first because it has no default value.
    key_provider_options: KeyProviderOptions
    encryption_type: proto_e2ee.EncryptionType.ValueType = proto_e2ee.EncryptionType.GCM

KeyProvider

class KeyProvider:
    """Manages encryption keys for E2EE.

    Provides methods for setting, getting, and ratcheting keys.
    Supports both shared keys (all participants) and per-participant keys.

    The export/ratchet methods return raw key material; treat every
    returned value as a secret and never print or log it.
    """

    def __init__(self, room_handle: int, options: KeyProviderOptions):
        """Initialize KeyProvider.

        Args:
            room_handle: Internal room handle
            options: Key provider options

        Note:
            Created automatically by E2EEManager.
            Access via room.e2ee_manager.key_provider.
        """

    @property
    def options(self) -> KeyProviderOptions:
        """Key provider options.

        Returns:
            KeyProviderOptions: Current configuration
        """

    def set_shared_key(self, key: bytes, key_index: int) -> None:
        """Set shared encryption key for all participants.

        Args:
            key: Encryption key
                Type: bytes
                Must be exactly 32 bytes for GCM
            key_index: Key index/slot
                      Type: int
                      Typically 0 for main key

        Raises:
            ValueError: If key length invalid
            RuntimeError: If operation fails

        Example:
            >>> key = b"my-32-byte-encryption-key-here!!"
            >>> key_provider.set_shared_key(key, key_index=0)

        Note:
            All participants must use same key for same index.
            Setting new key triggers key rotation.
        """

    def export_shared_key(self, key_index: int) -> bytes:
        """Export shared encryption key.

        Args:
            key_index: Key index to export
                      Type: int

        Returns:
            bytes: The encryption key

        Raises:
            KeyError: If key index not found
            RuntimeError: If export fails

        Example:
            >>> key = key_provider.export_shared_key(key_index=0)
            >>> # Hand `key` to a secure key-exchange channel.
            >>> # Never print or log the key itself.

        Note:
            Used to share key with new participants.
            Be careful with key handling (don't log or expose).
        """

    def ratchet_shared_key(self, key_index: int) -> bytes:
        """Ratchet shared encryption key to new key.

        Derives new key from current key for forward secrecy.

        Args:
            key_index: Key index to ratchet
                      Type: int

        Returns:
            bytes: The new ratcheted key

        Raises:
            KeyError: If key index not found
            RuntimeError: If ratcheting fails

        Example:
            >>> new_key = key_provider.ratchet_shared_key(key_index=0)
            >>> # `new_key` is secret key material: do not print or log it.

        Note:
            Ratcheting provides forward secrecy.
            Old frames cannot be decrypted with new key.
            All participants must ratchet together.

            Triggers KEY_RATCHETED encryption state event.
        """

    def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None:
        """Set encryption key for specific participant.

        Args:
            participant_identity: Identity of participant
                                 Type: str
            key: Encryption key
                Type: bytes
                Must be exactly 32 bytes for GCM
            key_index: Key index
                      Type: int

        Raises:
            ValueError: If key length invalid
            RuntimeError: If operation fails

        Example:
            >>> key_provider.set_key("user123", participant_key, key_index=0)

        Note:
            Per-participant keys override shared keys.
            Used for selective encryption or key isolation.
        """

    def export_key(self, participant_identity: str, key_index: int) -> bytes:
        """Export encryption key for specific participant.

        Args:
            participant_identity: Identity of participant
            key_index: Key index

        Returns:
            bytes: The encryption key (secret; don't log or expose)

        Raises:
            KeyError: If key not found

        Example:
            >>> key = key_provider.export_key("user123", key_index=0)
        """

    def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
        """Ratchet participant's encryption key.

        Per-participant counterpart of ratchet_shared_key.

        Args:
            participant_identity: Identity of participant
            key_index: Key index

        Returns:
            bytes: New ratcheted key (secret; don't log or expose)

        Example:
            >>> new_key = key_provider.ratchet_key("user123", key_index=0)
        """

FrameCryptor

class FrameCryptor:
    """Manages frame encryption for a participant.

    One FrameCryptor per participant for managing their encryption.

    Read-only properties: participant_identity, key_index, enabled.
    Mutators: set_enabled(), set_key_index().
    """

    def __init__(
        self,
        room_handle: int,
        participant_identity: str,
        key_index: int,
        enabled: bool
    ):
        """Initialize FrameCryptor.

        Args:
            room_handle: Internal room handle
            participant_identity: Identity of participant
            key_index: Key index to use
            enabled: Whether encryption enabled

        Note:
            Created automatically by E2EEManager.
            Obtain instances via E2EEManager.frame_cryptors().
        """

    @property
    def participant_identity(self) -> str:
        """Participant identity.

        Returns:
            str: Identity of participant this cryptor is for
        """

    @property
    def key_index(self) -> int:
        """Current key index.

        Change it with set_key_index().

        Returns:
            int: Active key index
        """

    @property
    def enabled(self) -> bool:
        """Whether encryption is enabled.

        Change it with set_enabled().

        Returns:
            bool: True if encrypting/decrypting, False otherwise
        """

    def set_enabled(self, enabled: bool) -> None:
        """Enable or disable frame encryption.

        Args:
            enabled: Whether to enable encryption
                    Type: bool

        Example:
            >>> cryptor.set_enabled(True)  # Enable encryption
            >>> cryptor.set_enabled(False) # Disable encryption

        Note:
            Disabling removes encryption overhead.
            Only use in trusted environments.
        """

    def set_key_index(self, key_index: int) -> None:
        """Set key index for encryption/decryption.

        Args:
            key_index: Key index to use
                      Type: int

        Example:
            >>> cryptor.set_key_index(1)  # Switch to key at index 1

        Note:
            Key must be set in KeyProvider before use.
        """

E2EEManager

class E2EEManager:
    """Manages end-to-end encryption for a room.

    Central manager for E2EE functionality.

    `key_provider` and `enabled` are properties; `frame_cryptors()` is a
    regular method and must be called.
    """

    def __init__(self, room_handle: int, options: Optional[E2EEOptions]):
        """Initialize E2EEManager.

        Args:
            room_handle: Internal room handle
            options: E2EE options (None if E2EE not enabled)

        Note:
            Created automatically by Room.
            Access via room.e2ee_manager.
        """

    @property
    def key_provider(self) -> Optional[KeyProvider]:
        """Key provider instance.

        Returns:
            Optional[KeyProvider]: Key management interface, or None if
            E2EE not enabled. Always check for None before use.

        Example:
            >>> key_provider = room.e2ee_manager.key_provider
            >>> if key_provider:
            ...     key_provider.set_shared_key(key, 0)
        """

    @property
    def enabled(self) -> bool:
        """Whether E2EE is enabled.

        Returns:
            bool: True if E2EE enabled, False otherwise
        """

    def set_enabled(self, enabled: bool) -> None:
        """Enable or disable end-to-end encryption.

        Args:
            enabled: Whether to enable E2EE
                    Type: bool

        Example:
            >>> room.e2ee_manager.set_enabled(True)

        Note:
            Can toggle E2EE at runtime.
            Affects all tracks.
        """

    def frame_cryptors(self) -> List[FrameCryptor]:
        """Get list of frame cryptors for participants.

        Unlike key_provider and enabled this is a method, not a property.

        Returns:
            List[FrameCryptor]: One cryptor per participant

        Example:
            >>> for cryptor in room.e2ee_manager.frame_cryptors():
            ...     print(f"{cryptor.participant_identity}: enabled={cryptor.enabled}")
        """

Usage Examples

Basic E2EE Setup

from livekit import Room, RoomOptions, E2EEOptions, KeyProviderOptions, EncryptionType

# Create encryption options
# Key must be exactly 32 bytes for GCM
key_provider_options = KeyProviderOptions(
    shared_key=b"my-32-byte-encryption-key-here!!"
)

e2ee_options = E2EEOptions(
    key_provider_options=key_provider_options,
    encryption_type=EncryptionType.GCM
)

# Connect with encryption
options = RoomOptions(encryption=e2ee_options)
await room.connect(url, token, options)

print(f"E2EE enabled: {room.e2ee_manager.enabled}")

Key Management

# Access key provider (None when E2EE is not enabled)
key_provider = room.e2ee_manager.key_provider

if key_provider:
    # Set shared key for all participants
    # The literal below is exactly 32 bytes, as required for GCM
    new_key = b"another-32-byte-key-for-rotation"
    key_provider.set_shared_key(new_key, key_index=0)

    # Ratchet key for forward secrecy
    # Derives new key from current key
    ratcheted_key = key_provider.ratchet_shared_key(key_index=0)
    # Never print or log key material; report only that the ratchet happened
    print(f"Shared key ratcheted ({len(ratcheted_key)} bytes)")

    # Export key (to share with new participants over a secure channel)
    current_key = key_provider.export_shared_key(key_index=0)

    # Per-participant keys (advanced)
    participant_key = b"specific-32-byte-key-for-user-x!"
    key_provider.set_key("participant-identity", participant_key, key_index=0)

Monitor Encryption Status

from livekit import EncryptionState

@room.on("e2ee_state_changed")
def on_e2ee_state_changed(participant, state: EncryptionState):
    """Print a one-line report for each E2EE state change of a participant.

    States that match none of the checks below are ignored silently.
    """
    identity = participant.identity

    # Each check returns as soon as it has reported, so at most one line
    # is printed per event.
    if state == EncryptionState.OK:
        print(f"{identity}: Encryption working correctly")
        return

    if state == EncryptionState.NEW:
        print(f"{identity}: Encryption initialized")
        return

    if state == EncryptionState.MISSING_KEY:
        print(f"{identity}: Missing encryption key!")
        # Action: Set key
        # room.e2ee_manager.key_provider.set_shared_key(key, 0)
        return

    if state == EncryptionState.DECRYPTION_FAILED:
        print(f"{identity}: Decryption failed (wrong key?)")
        # Action: Verify all participants use same key
        return

    if state == EncryptionState.ENCRYPTION_FAILED:
        print(f"{identity}: Encryption failed")
        # Action: Check logs, may need restart
        return

    if state == EncryptionState.KEY_RATCHETED:
        print(f"{identity}: Key ratcheted (forward secrecy)")
        return

    if state == EncryptionState.INTERNAL_ERROR:
        print(f"{identity}: Internal encryption error")

@room.on("participant_encryption_status_changed")
def on_encryption_status(is_encrypted: bool, participant):
    """Report whether a participant is encrypted and warn when it is not."""
    identity = participant.identity

    if is_encrypted:
        status = "encrypted"
    else:
        status = "NOT encrypted"
    print(f"{identity} is {status}")

    # Nothing more to report for an encrypted participant.
    if is_encrypted:
        return
    print("WARNING: Participant without E2EE!")

Complete Example

import asyncio
from livekit import (
    Room,
    RoomOptions,
    E2EEOptions,
    KeyProviderOptions,
    EncryptionType,
    EncryptionState,
)

async def main():
    """Connect to a room with E2EE enabled and exercise the E2EE API.

    Reads the server URL and access token from the LIVEKIT_URL and
    LIVEKIT_TOKEN environment variables, connects with a shared key,
    ratchets that key once, lists the frame cryptors, stays connected
    for 30 seconds and then disconnects.

    Raises:
        KeyError: If LIVEKIT_URL or LIVEKIT_TOKEN is not set.
    """
    # Local import: the example's import block does not bring in `os`.
    import os

    # Connection details are supplied by the environment so the example
    # does not depend on undefined names.
    url = os.environ["LIVEKIT_URL"]
    token = os.environ["LIVEKIT_TOKEN"]

    room = Room()

    # Setup E2EE with 32-byte key
    # (hard-coded for illustration only; see "Use Strong Keys")
    key = b"my-32-byte-encryption-key-1234!!"

    key_provider_options = KeyProviderOptions(
        shared_key=key,
        ratchet_window_size=16,  # Keep last 16 keys
        failure_tolerance=-1      # Default (DEFAULT_FAILURE_TOLERANCE)
    )

    e2ee_options = E2EEOptions(
        key_provider_options=key_provider_options,
        encryption_type=EncryptionType.GCM
    )

    # Monitor encryption events
    @room.on("e2ee_state_changed")
    def on_e2ee_state(participant, state):
        print(f"E2EE state for {participant.identity}: {state}")

    @room.on("participant_encryption_status_changed")
    def on_encryption_status(is_encrypted, participant):
        if not is_encrypted:
            print(f"WARNING: {participant.identity} not encrypted!")

    # Connect with encryption
    options = RoomOptions(encryption=e2ee_options)
    await room.connect(url, token, options)

    try:
        # Access E2EE manager
        e2ee_manager = room.e2ee_manager
        print(f"E2EE enabled: {e2ee_manager.enabled}")

        # Get key provider
        key_provider = e2ee_manager.key_provider
        if key_provider:
            # Ratchet key for forward secrecy
            key_provider.ratchet_shared_key(key_index=0)
            # Never print or log key material; report only the event.
            print("Ratcheted shared key at index 0")

            # List frame cryptors
            cryptors = e2ee_manager.frame_cryptors()
            print(f"Frame cryptors: {len(cryptors)}")
            for cryptor in cryptors:
                print(f"  {cryptor.participant_identity}: "
                      f"enabled={cryptor.enabled}, key_index={cryptor.key_index}")

        # Keep running
        await asyncio.sleep(30)
    finally:
        # Cleanup: disconnect even if something above raised
        await room.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Key Management Patterns

Key Rotation

async def rotate_key_periodically(room: Room, interval_seconds: int = 300):
    """Ratchet the shared encryption key periodically.

    Returns immediately when the room has no key provider (E2EE not
    enabled). Otherwise sleeps for ``interval_seconds`` and ratchets the
    shared key at index 0, repeating until the room reports that it is
    no longer connected.

    Args:
        room: Room instance with E2EE enabled
        interval_seconds: Rotation interval (default: 300s = 5 minutes)
    """
    key_provider = room.e2ee_manager.key_provider
    if not key_provider:
        return

    while room.isconnected():
        await asyncio.sleep(interval_seconds)

        # The room may have disconnected while this coroutine was asleep.
        if not room.isconnected():
            break

        # The ratcheted key is secret material, so the return value is
        # deliberately not printed or logged.
        key_provider.ratchet_shared_key(key_index=0)
        print("Shared key ratcheted (key_index=0)")

# Start rotation task (must run inside a running event loop)
# Keep a reference to the task: the event loop only holds weak references,
# so an unreferenced task may be garbage-collected before it finishes.
rotation_task = asyncio.create_task(rotate_key_periodically(room))

Per-Participant Keys

# Use different keys for different participants
key_provider = room.e2ee_manager.key_provider

# key_provider is None when E2EE is not enabled, so guard before use
if key_provider:
    # Set key for participant 1
    key1 = b"key-for-participant-1-32-bytes!!"
    key_provider.set_key("participant1", key1, key_index=0)

    # Set key for participant 2
    key2 = b"key-for-participant-2-32-bytes!!"
    key_provider.set_key("participant2", key2, key_index=0)

# Note: This is advanced usage
# Typically use shared key for simplicity

Key Derivation

import hashlib

def derive_key_from_password(
    password: str,
    salt: bytes = b"livekit-salt",
    iterations: int = 600_000,
) -> bytes:
    """Derive a 32-byte key from a password with PBKDF2-HMAC-SHA256.

    A single SHA-256 pass is cheap to brute-force, so the key is stretched
    with PBKDF2 instead. The salt is fed to the KDF as raw bytes; it is
    not decoded and re-encoded, which would alter bytes >= 0x80.

    Args:
        password: Password string (encoded as UTF-8)
        salt: Salt for derivation. The default is a fixed, public value;
            pass a unique random salt (shared by all participants) in
            production.
        iterations: PBKDF2 iteration count. Every participant must use
            the same value to arrive at the same key.

    Returns:
        bytes: 32-byte derived key

    Raises:
        ValueError: If iterations is less than 1 (raised by hashlib).
    """
    return hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt,
        iterations,
        dklen=32,
    )

# Usage
password = "my-secure-password-here"
key = derive_key_from_password(password)
key_provider.set_shared_key(key, key_index=0)

Best Practices

1. Use Strong Keys

import secrets

# Good: Generate cryptographically secure key
key = secrets.token_bytes(32)  # 32 bytes = 256 bits

# Bad: Weak key
# key = b"weak" + b"\x00" * 28  # Predictable

2. Never Log or Expose Keys

# DON'T: Log key
# print(f"Key: {key}")
# logger.info(f"Using key: {key.hex()}")

# DO: Log key hash or identifier
import hashlib
key_hash = hashlib.sha256(key).hexdigest()[:8]
print(f"Using key hash: {key_hash}")

3. Handle Missing Keys

@room.on("e2ee_state_changed")
def on_e2ee_state_changed(participant, state):
    """Re-apply the known shared key when a participant reports a missing key."""
    # Only the MISSING_KEY state is handled here.
    if state != EncryptionState.MISSING_KEY:
        return

    print(f"{participant.identity} missing key")

    # Set key (the provider is None when E2EE is not enabled)
    key_provider = room.e2ee_manager.key_provider
    if key_provider:
        key_provider.set_shared_key(known_key, key_index=0)

4. Coordinate Key Changes

# When rotating keys, coordinate with all participants
# Option 1: Use RPC to notify
await local.perform_rpc(
    destination_identity="all",  # Or specific participant
    method="rotate_key",
    payload=json.dumps({"key_index": 0})
)

# Option 2: Use data packet
await local.publish_data(
    json.dumps({"action": "rotate_key", "key_index": 0}),
    reliable=True,
    topic="key-management"
)

# All participants ratchet together
key_provider.ratchet_shared_key(key_index=0)

5. Monitor Encryption Health

import asyncio
from collections import defaultdict

class EncryptionMonitor:
    """Track the latest E2EE state of each participant and report problems."""

    def __init__(self, room: Room):
        """Subscribe to the room's "e2ee_state_changed" event.

        Args:
            room: Room whose encryption state changes are recorded
        """
        self.room = room
        self.states = defaultdict(lambda: EncryptionState.NEW)

        @room.on("e2ee_state_changed")
        def on_state_changed(participant, state):
            # Record the newest state, then re-evaluate everyone.
            self.states[participant.identity] = state
            self.check_health()

    def check_health(self):
        """Print every participant whose last state is not a healthy one."""
        healthy = (
            EncryptionState.NEW,
            EncryptionState.OK,
            EncryptionState.KEY_RATCHETED,
        )
        issues = [
            (identity, state)
            for identity, state in self.states.items()
            if state not in healthy
        ]

        if not issues:
            print("All participants encrypted correctly")
            return

        print("Encryption issues detected:")
        for identity, state in issues:
            print(f"  {identity}: {state}")

# Usage
monitor = EncryptionMonitor(room)

See Also

  • Room and Connection Management - Connecting with E2EE, encryption events
  • Track Publications - Encryption status in publications
  • Participants - Per-participant encryption