tessl install tessl/pypi-livekit@1.0.0Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities
End-to-end encryption (E2EE) in LiveKit ensures that media is encrypted before transmission and only decrypted by intended recipients. The LiveKit server cannot decrypt E2EE content, providing strong privacy guarantees.
Key concepts:
from livekit import (
E2EEOptions,
E2EEManager,
KeyProvider,
KeyProviderOptions,
FrameCryptor,
EncryptionType,
EncryptionState,
)DEFAULT_RATCHET_SALT: bytes = b"LKFrameEncryptionKey"
"""Default salt for key ratcheting.
Used in key derivation for forward secrecy.
"""
DEFAULT_RATCHET_WINDOW_SIZE: int = 16
"""Default ratchet window size.
Number of old keys to maintain for decryption.
Allows for out-of-order frame delivery.
"""
DEFAULT_FAILURE_TOLERANCE: int = -1
"""Default failure tolerance.
-1 means no tolerance (fail on first error).
Positive value allows N consecutive failures before giving up.
"""@dataclass
class KeyProviderOptions:
"""Options for key provider configuration.
Attributes:
shared_key: Shared encryption key for all participants
Type: bytes | None
Default: None
Must be exactly 32 bytes for GCM encryption
All participants must use same key
ratchet_salt: Salt for key ratcheting
Type: bytes
Default: DEFAULT_RATCHET_SALT
Used in key derivation
ratchet_window_size: Number of old keys to keep
Type: int
Default: DEFAULT_RATCHET_WINDOW_SIZE (16)
Larger = more tolerance for out-of-order frames
Smaller = less memory usage
failure_tolerance: Number of consecutive decryption failures allowed
Type: int
Default: DEFAULT_FAILURE_TOLERANCE (-1)
-1: No tolerance (fail immediately)
N: Allow N consecutive failures
"""
shared_key: Optional[bytes] = None
ratchet_salt: bytes = DEFAULT_RATCHET_SALT
ratchet_window_size: int = DEFAULT_RATCHET_WINDOW_SIZE
failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE@dataclass
class E2EEOptions:
"""Options for end-to-end encryption.
Attributes:
key_provider_options: Key provider configuration
Type: KeyProviderOptions
Required
encryption_type: Type of encryption to use
Type: EncryptionType.ValueType
Default: EncryptionType.GCM
Options:
- GCM: AES-GCM (recommended, hardware accelerated)
- CUSTOM: Custom encryption implementation
- NONE: No encryption (not recommended for E2EE)
"""
key_provider_options: KeyProviderOptions
encryption_type: proto_e2ee.EncryptionType.ValueType = proto_e2ee.EncryptionType.GCMclass KeyProvider:
"""Manages encryption keys for E2EE.
Provides methods for setting, getting, and ratcheting keys.
Supports both shared keys (all participants) and per-participant keys.
"""
def __init__(self, room_handle: int, options: KeyProviderOptions):
"""Initialize KeyProvider.
Args:
room_handle: Internal room handle
options: Key provider options
Note:
Created automatically by E2EEManager.
Access via room.e2ee_manager.key_provider.
"""
@property
def options(self) -> KeyProviderOptions:
"""Key provider options.
Returns:
KeyProviderOptions: Current configuration
"""
def set_shared_key(self, key: bytes, key_index: int) -> None:
"""Set shared encryption key for all participants.
Args:
key: Encryption key
Type: bytes
Must be exactly 32 bytes for GCM
key_index: Key index/slot
Type: int
Typically 0 for main key
Raises:
ValueError: If key length invalid
RuntimeError: If operation fails
Example:
>>> key = b"my-32-byte-encryption-key-here!!"
>>> key_provider.set_shared_key(key, key_index=0)
Note:
All participants must use same key for same index.
Setting new key triggers key rotation.
"""
def export_shared_key(self, key_index: int) -> bytes:
"""Export shared encryption key.
Args:
key_index: Key index to export
Type: int
Returns:
bytes: The encryption key
Raises:
KeyError: If key index not found
RuntimeError: If export fails
Example:
>>> key = key_provider.export_shared_key(key_index=0)
>>> print(f"Key: {key.hex()}")
Note:
Used to share key with new participants.
Be careful with key handling (don't log or expose).
"""
def ratchet_shared_key(self, key_index: int) -> bytes:
"""Ratchet shared encryption key to new key.
Derives new key from current key for forward secrecy.
Args:
key_index: Key index to ratchet
Type: int
Returns:
bytes: The new ratcheted key
Raises:
KeyError: If key index not found
RuntimeError: If ratcheting fails
Example:
>>> new_key = key_provider.ratchet_shared_key(key_index=0)
>>> print(f"New key: {new_key.hex()}")
Note:
Ratcheting provides forward secrecy.
Old frames cannot be decrypted with new key.
All participants must ratchet together.
Triggers KEY_RATCHETED encryption state event.
"""
def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None:
"""Set encryption key for specific participant.
Args:
participant_identity: Identity of participant
Type: str
key: Encryption key
Type: bytes
Must be 32 bytes for GCM
key_index: Key index
Type: int
Raises:
ValueError: If key length invalid
RuntimeError: If operation fails
Example:
>>> key_provider.set_key("user123", participant_key, key_index=0)
Note:
Per-participant keys override shared keys.
Used for selective encryption or key isolation.
"""
def export_key(self, participant_identity: str, key_index: int) -> bytes:
"""Export encryption key for specific participant.
Args:
participant_identity: Identity of participant
key_index: Key index
Returns:
bytes: The encryption key
Raises:
KeyError: If key not found
Example:
>>> key = key_provider.export_key("user123", key_index=0)
"""
def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
"""Ratchet participant's encryption key.
Args:
participant_identity: Identity of participant
key_index: Key index
Returns:
bytes: New ratcheted key
Example:
>>> new_key = key_provider.ratchet_key("user123", key_index=0)
"""class FrameCryptor:
"""Manages frame encryption for a participant.
One FrameCryptor per participant for managing their encryption.
"""
def __init__(
self,
room_handle: int,
participant_identity: str,
key_index: int,
enabled: bool
):
"""Initialize FrameCryptor.
Args:
room_handle: Internal room handle
participant_identity: Identity of participant
key_index: Key index to use
enabled: Whether encryption enabled
Note:
Created automatically by E2EEManager.
"""
@property
def participant_identity(self) -> str:
"""Participant identity.
Returns:
str: Identity of participant this cryptor is for
"""
@property
def key_index(self) -> int:
"""Current key index.
Returns:
int: Active key index
"""
@property
def enabled(self) -> bool:
"""Whether encryption is enabled.
Returns:
bool: True if encrypting/decrypting, False otherwise
"""
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable frame encryption.
Args:
enabled: Whether to enable encryption
Type: bool
Example:
>>> cryptor.set_enabled(True) # Enable encryption
>>> cryptor.set_enabled(False) # Disable encryption
Note:
Disabling removes encryption overhead.
Only use in trusted environments.
"""
def set_key_index(self, key_index: int) -> None:
"""Set key index for encryption/decryption.
Args:
key_index: Key index to use
Type: int
Example:
>>> cryptor.set_key_index(1) # Switch to key at index 1
Note:
Key must be set in KeyProvider before use.
"""class E2EEManager:
"""Manages end-to-end encryption for a room.
Central manager for E2EE functionality.
"""
def __init__(self, room_handle: int, options: Optional[E2EEOptions]):
"""Initialize E2EEManager.
Args:
room_handle: Internal room handle
options: E2EE options (None if E2EE not enabled)
Note:
Created automatically by Room.
Access via room.e2ee_manager.
"""
@property
def key_provider(self) -> Optional[KeyProvider]:
"""Key provider instance.
Returns:
KeyProvider: Key management interface
None: If E2EE not enabled
Example:
>>> if room.e2ee_manager.key_provider:
... key_provider = room.e2ee_manager.key_provider
... key_provider.set_shared_key(key, 0)
"""
@property
def enabled(self) -> bool:
"""Whether E2EE is enabled.
Returns:
bool: True if E2EE enabled, False otherwise
"""
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable end-to-end encryption.
Args:
enabled: Whether to enable E2EE
Type: bool
Example:
>>> room.e2ee_manager.set_enabled(True)
Note:
Can toggle E2EE at runtime.
Affects all tracks.
"""
def frame_cryptors(self) -> List[FrameCryptor]:
"""Get list of frame cryptors for participants.
Returns:
List[FrameCryptor]: One cryptor per participant
Example:
>>> for cryptor in room.e2ee_manager.frame_cryptors():
... print(f"{cryptor.participant_identity}: enabled={cryptor.enabled}")
"""from livekit import Room, RoomOptions, E2EEOptions, KeyProviderOptions, EncryptionType
# Create encryption options
# Key must be exactly 32 bytes for GCM
key_provider_options = KeyProviderOptions(
shared_key=b"my-32-byte-encryption-key-here!!"
)
e2ee_options = E2EEOptions(
key_provider_options=key_provider_options,
encryption_type=EncryptionType.GCM
)
# Connect with encryption
options = RoomOptions(encryption=e2ee_options)
await room.connect(url, token, options)
print(f"E2EE enabled: {room.e2ee_manager.enabled}")# Access key provider
key_provider = room.e2ee_manager.key_provider
if key_provider:
# Set shared key for all participants
new_key = b"another-32-byte-key-for-rotation!"
key_provider.set_shared_key(new_key, key_index=0)
# Ratchet key for forward secrecy
# Derives new key from current key
ratcheted_key = key_provider.ratchet_shared_key(key_index=0)
print(f"Ratcheted key: {ratcheted_key.hex()}")
# Export key (to share with new participants)
current_key = key_provider.export_shared_key(key_index=0)
# Per-participant keys (advanced)
participant_key = b"specific-32-byte-key-for-user-x!"
key_provider.set_key("participant-identity", participant_key, key_index=0)from livekit import EncryptionState
@room.on("e2ee_state_changed")
def on_e2ee_state_changed(participant, state: EncryptionState):
"""Handle encryption state changes."""
identity = participant.identity
if state == EncryptionState.OK:
print(f"{identity}: Encryption working correctly")
elif state == EncryptionState.NEW:
print(f"{identity}: Encryption initialized")
elif state == EncryptionState.MISSING_KEY:
print(f"{identity}: Missing encryption key!")
# Action: Set key
# room.e2ee_manager.key_provider.set_shared_key(key, 0)
elif state == EncryptionState.DECRYPTION_FAILED:
print(f"{identity}: Decryption failed (wrong key?)")
# Action: Verify all participants use same key
elif state == EncryptionState.ENCRYPTION_FAILED:
print(f"{identity}: Encryption failed")
# Action: Check logs, may need restart
elif state == EncryptionState.KEY_RATCHETED:
print(f"{identity}: Key ratcheted (forward secrecy)")
elif state == EncryptionState.INTERNAL_ERROR:
print(f"{identity}: Internal encryption error")
@room.on("participant_encryption_status_changed")
def on_encryption_status(is_encrypted: bool, participant):
"""Handle encryption status changes."""
identity = participant.identity
status = "encrypted" if is_encrypted else "NOT encrypted"
print(f"{identity} is {status}")
if not is_encrypted:
print("WARNING: Participant without E2EE!")import asyncio
from livekit import (
Room,
RoomOptions,
E2EEOptions,
KeyProviderOptions,
EncryptionType,
EncryptionState,
)
async def main():
room = Room()
# Setup E2EE with 32-byte key
key = b"my-32-byte-encryption-key-1234!!"
key_provider_options = KeyProviderOptions(
shared_key=key,
ratchet_window_size=16, # Keep last 16 keys
failure_tolerance=-1 # No tolerance (fail immediately)
)
e2ee_options = E2EEOptions(
key_provider_options=key_provider_options,
encryption_type=EncryptionType.GCM
)
# Monitor encryption events
@room.on("e2ee_state_changed")
def on_e2ee_state(participant, state):
print(f"E2EE state for {participant.identity}: {state}")
@room.on("participant_encryption_status_changed")
def on_encryption_status(is_encrypted, participant):
if not is_encrypted:
print(f"WARNING: {participant.identity} not encrypted!")
# Connect with encryption
options = RoomOptions(encryption=e2ee_options)
await room.connect(url, token, options)
# Access E2EE manager
e2ee_manager = room.e2ee_manager
print(f"E2EE enabled: {e2ee_manager.enabled}")
# Get key provider
key_provider = e2ee_manager.key_provider
if key_provider:
# Ratchet key for forward secrecy
new_key = key_provider.ratchet_shared_key(key_index=0)
print(f"Ratcheted to new key: {new_key.hex()}")
# List frame cryptors
cryptors = e2ee_manager.frame_cryptors()
print(f"Frame cryptors: {len(cryptors)}")
for cryptor in cryptors:
print(f" {cryptor.participant_identity}: "
f"enabled={cryptor.enabled}, key_index={cryptor.key_index}")
# Keep running
await asyncio.sleep(30)
# Cleanup
await room.disconnect()
if __name__ == "__main__":
asyncio.run(main())async def rotate_key_periodically(room: Room, interval_seconds: int = 300):
"""Rotate encryption key periodically.
Args:
room: Room instance with E2EE enabled
interval_seconds: Rotation interval (default: 300s = 5 minutes)
"""
key_provider = room.e2ee_manager.key_provider
if not key_provider:
return
while room.isconnected():
await asyncio.sleep(interval_seconds)
if room.isconnected():
# Ratchet key
new_key = key_provider.ratchet_shared_key(key_index=0)
print(f"Key rotated: {new_key.hex()}")
# Start rotation task
asyncio.create_task(rotate_key_periodically(room))# Use different keys for different participants
key_provider = room.e2ee_manager.key_provider
# Set key for participant 1
key1 = b"key-for-participant-1-32-bytes!!"
key_provider.set_key("participant1", key1, key_index=0)
# Set key for participant 2
key2 = b"key-for-participant-2-32-bytes!!"
key_provider.set_key("participant2", key2, key_index=0)
# Note: This is advanced usage
# Typically use shared key for simplicityimport hashlib
def derive_key_from_password(password: str, salt: bytes = b"livekit-salt") -> bytes:
"""Derive 32-byte key from password.
Args:
password: Password string
salt: Salt for derivation
Returns:
bytes: 32-byte derived key
Note:
Use strong password and unique salt.
Consider using PBKDF2 or similar for production.
"""
# Simple derivation (use proper KDF for production)
key_material = (password + salt.decode('latin1')).encode('utf-8')
key = hashlib.sha256(key_material).digest()
return key
# Usage
password = "my-secure-password-here"
key = derive_key_from_password(password)
key_provider.set_shared_key(key, key_index=0)import secrets
# Good: Generate cryptographically secure key
key = secrets.token_bytes(32) # 32 bytes = 256 bits
# Bad: Weak key
# key = b"weak" + b"\x00" * 28 # Predictable# DON'T: Log key
# print(f"Key: {key}")
# logger.info(f"Using key: {key.hex()}")
# DO: Log key hash or identifier
import hashlib
key_hash = hashlib.sha256(key).hexdigest()[:8]
print(f"Using key hash: {key_hash}")@room.on("e2ee_state_changed")
def on_e2ee_state_changed(participant, state):
if state == EncryptionState.MISSING_KEY:
print(f"{participant.identity} missing key")
# Set key
key_provider = room.e2ee_manager.key_provider
if key_provider:
key_provider.set_shared_key(known_key, key_index=0)# When rotating keys, coordinate with all participants
# Option 1: Use RPC to notify
await local.perform_rpc(
destination_identity="all", # Or specific participant
method="rotate_key",
payload=json.dumps({"key_index": 0})
)
# Option 2: Use data packet
await local.publish_data(
json.dumps({"action": "rotate_key", "key_index": 0}),
reliable=True,
topic="key-management"
)
# All participants ratchet together
key_provider.ratchet_shared_key(key_index=0)import asyncio
from collections import defaultdict
class EncryptionMonitor:
"""Monitor E2EE health."""
def __init__(self, room: Room):
self.room = room
self.states = defaultdict(lambda: EncryptionState.NEW)
@room.on("e2ee_state_changed")
def on_state_changed(participant, state):
self.states[participant.identity] = state
self.check_health()
def check_health(self):
"""Check overall encryption health."""
issues = []
for identity, state in self.states.items():
if state not in [EncryptionState.NEW, EncryptionState.OK, EncryptionState.KEY_RATCHETED]:
issues.append((identity, state))
if issues:
print("Encryption issues detected:")
for identity, state in issues:
print(f" {identity}: {state}")
else:
print("All participants encrypted correctly")
# Usage
monitor = EncryptionMonitor(room)