tessl install tessl/pypi-livekit@1.0.0

Python real-time SDK for LiveKit, providing WebRTC-based video, audio, and data streaming capabilities.
Audio tracks represent audio media streams in a LiveKit room. The SDK provides LocalAudioTrack for publishing local audio and RemoteAudioTrack for receiving audio from remote participants.
Key concepts:
- Track: base class shared by all audio and video tracks (sid, name, kind, stream_state, muted)
- LocalAudioTrack: local audio created from an AudioSource and published to the room
- RemoteAudioTrack: audio received from other participants, consumed via an AudioStream
- AudioSource: pushes audio frames into a local track via capture_frame()
- LocalTrack / RemoteTrack / AudioTrack: union aliases for type hints
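
A minimal publish flow (sketch; assumes an already-connected Room named room, as in the complete example later in this page):

source = AudioSource(sample_rate=48000, num_channels=1)
track = LocalAudioTrack.create_audio_track("microphone", source)
await room.local_participant.publish_track(track)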
from livekit import (
Track,
LocalAudioTrack,
RemoteAudioTrack,
AudioTrack,
LocalTrack,
TrackKind,
StreamState,
AudioSource,
)

class Track:
"""Base class for all tracks (audio and video).
Provides common properties and methods shared by all track types.
Cannot be instantiated directly.
"""
def __init__(self, owned_info: proto_track.OwnedTrack) -> None:
"""Initialize a Track.
Args:
owned_info: Internal track information from FFI
Note:
Tracks are created by the SDK, not directly by the application.
"""@property
def sid(self) -> str:
"""Session ID of the track.
Returns:
str: Unique track session identifier
Format: "TR_" followed by random string
Note:
Track SID is assigned by server.
Unique within room session.
Used to identify tracks in unpublish, subscriptions, etc.
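Example:
>>> await room.local_participant.unpublish_track(track.sid)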
"""
@property
def name(self) -> str:
"""Name of the track.
Returns:
str: Track name (set during creation)
Note:
Track name is metadata (not unique).
Displayed to other participants.
Examples: "microphone", "camera", "screen".
"""
@property
def kind(self) -> proto_track.TrackKind.ValueType:
"""Kind of track (audio or video).
Returns:
TrackKind.KIND_AUDIO or TrackKind.KIND_VIDEO
Note:
Used to determine track type for processing.
Example:
>>> if track.kind == TrackKind.KIND_AUDIO:
... stream = AudioStream(track)
... elif track.kind == TrackKind.KIND_VIDEO:
... stream = VideoStream(track)
"""
@property
def stream_state(self) -> proto_track.StreamState.ValueType:
"""Stream state (active or paused).
Returns:
StreamState.STATE_ACTIVE or StreamState.STATE_PAUSED
Note:
STATE_ACTIVE: Track sending/receiving media.
STATE_PAUSED: Track temporarily paused.
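Example:
>>> if track.stream_state == StreamState.STATE_ACTIVE:
... print("track is active")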
"""
@property
def muted(self) -> bool:
"""Whether the track is muted.
Returns:
bool: True if muted, False otherwise
Note:
Muted tracks:
- Audio: Send silence frames
- Video: Send black frames
- Maintain connection and track state
- Save bandwidth
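Example:
>>> if track.muted:
... print(f"{track.name} is muted")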
"""async def get_stats(self) -> List[proto_stats.RtcStats]:
"""Get statistics for the track.
Returns:
List[proto_stats.RtcStats]: List of RTC statistics
Contains metrics like:
- Bitrate
- Bytes sent/received
- Packets sent/received
- Packet loss
- Jitter
- Codec info
Raises:
Exception: If stats retrieval fails
RuntimeError: If track not active
Example:
>>> stats = await track.get_stats()
>>> for stat in stats:
... print(f"Stat: {stat}")
Note:
Statistics are snapshots at call time.
For monitoring, call periodically.
Stats reset on track republish.
"""class LocalAudioTrack(Track):
"""Represents a local audio track.
Local audio tracks are created from an AudioSource and can be
published to the room for other participants to receive.
"""
def __init__(self, info: proto_track.OwnedTrack) -> None:
"""Initialize a LocalAudioTrack.
Args:
info: Internal track information
Note:
Typically created via create_audio_track() static method
rather than direct instantiation.
"""@staticmethod
def create_audio_track(name: str, source: AudioSource) -> LocalAudioTrack:
"""Create a local audio track from an audio source.
Args:
name: Name for the track
Type: str
Displayed to other participants
Examples: "microphone", "audio", "voice"
source: AudioSource instance providing audio data
Must be already created
Provides audio frames to track
Returns:
LocalAudioTrack: Track ready for publishing
Raises:
ValueError: If name empty or source invalid
RuntimeError: If track creation fails
Example:
>>> from livekit import AudioSource, LocalAudioTrack
>>>
>>> # Create audio source
>>> source = AudioSource(sample_rate=48000, num_channels=1)
>>>
>>> # Create track from source
>>> track = LocalAudioTrack.create_audio_track("microphone", source)
>>>
>>> # Now publish track
>>> await room.local_participant.publish_track(track)
Note:
Track is NOT automatically published.
Must call publish_track() to share with room.
A single source can feed multiple tracks (uncommon).
Track inherits sample rate and channels from source.
"""def mute(self) -> None:
"""Mute the audio track.
Stops sending audio data to remote participants.
The track publication remains active but sends silence.
Returns:
None (synchronous operation)
Example:
>>> track.mute()
>>> print(f"Muted: {track.muted}") # True
Note:
Muting is immediate and synchronous.
Sends silence frames (the audio analogue of a muted video track's black frames).
Connection maintained (no re-negotiation).
Saves bandwidth (silence is compressed efficiently).
Triggers 'track_muted' event for all participants.
Use cases:
- Push-to-talk release
- Privacy (cough, conversation pause)
- Selective communication
"""
def unmute(self) -> None:
"""Unmute the audio track.
Resumes sending audio data to remote participants.
Returns:
None (synchronous operation)
Example:
>>> track.unmute()
>>> print(f"Muted: {track.muted}") # False
Note:
Unmuting is immediate and synchronous.
Resumes sending actual audio immediately.
No delay or buffering.
Triggers 'track_unmuted' event for all participants.
"""class RemoteAudioTrack(Track):
"""Represents a remote audio track.
Remote audio tracks are received from other participants
and can be consumed via AudioStream.
Read-only: remote tracks cannot be muted locally; the publishing participant controls muting.
"""
def __init__(self, info: proto_track.OwnedTrack) -> None:
"""Initialize a RemoteAudioTrack.
Args:
info: Internal track information
Note:
Created automatically by the SDK when subscribing
to remote participants' audio tracks.
Access via track_subscribed event.
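Example:
>>> @room.on("track_subscribed")
>>> def on_track_subscribed(track, publication, participant):
... if track.kind == TrackKind.KIND_AUDIO:
... stream = AudioStream(track)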
"""LocalTrack = Union[LocalVideoTrack, LocalAudioTrack]
"""Union type for local tracks.
Can be either audio or video.
Use for type hints when accepting any local track.
"""
RemoteTrack = Union[RemoteVideoTrack, RemoteAudioTrack]
"""Union type for remote tracks.
Can be either audio or video.
Use for type hints when accepting any remote track.
"""
AudioTrack = Union[LocalAudioTrack, RemoteAudioTrack]
"""Union type for audio tracks.
Can be either local or remote.
Use for type hints when accepting any audio track.
"""from livekit import Room, AudioSource, LocalAudioTrack, TrackPublishOptions
async def publish_audio(room: Room):
"""Publish a local audio track to the room."""
# Create audio source
# sample_rate: 48000 recommended (also 16000, 8000)
# num_channels: 1 (mono) or 2 (stereo)
# queue_size_ms: Buffer size, default 1000ms
source = AudioSource(
sample_rate=48000,
num_channels=1, # Mono
queue_size_ms=1000 # 1 second buffer
)
# Create track from source
track = LocalAudioTrack.create_audio_track("microphone", source)
# Publish track with default options
options = TrackPublishOptions()
publication = await room.local_participant.publish_track(track, options)
print(f"Published audio track: {publication.sid}")
return source, track, publication

from livekit import AudioSource, LocalAudioTrack
# Create stereo audio source
source = AudioSource(
sample_rate=48000,
num_channels=2, # Stereo (left + right)
queue_size_ms=1000
)
track = LocalAudioTrack.create_audio_track("stereo-audio", source)
# Publish
await room.local_participant.publish_track(track)

from livekit import TrackPublishOptions, TrackSource
options = TrackPublishOptions()
options.source = TrackSource.SOURCE_MICROPHONE # Identify as microphone
options.dtx = True # Enable discontinuous transmission (silence detection)
options.red = True # Enable redundant encoding (packet loss recovery)
publication = await room.local_participant.publish_track(track, options)

from livekit import LocalAudioTrack, TrackKind
track: LocalAudioTrack = ...
# Mute audio (synchronous)
track.mute()
print(f"Track muted: {track.muted}") # True
# Unmute audio (synchronous)
track.unmute()
print(f"Track muted: {track.muted}") # False
# Toggle mute
if track.muted:
track.unmute()
else:
track.mute()
# Mute state in event
@room.on("track_muted")
def on_muted(participant, publication):
if publication.kind == TrackKind.KIND_AUDIO:
print(f"{participant.identity} muted audio")from livekit import (
Room,
RemoteParticipant,
RemoteTrackPublication,
Track,
TrackKind,
AudioStream,
)
@room.on("track_subscribed")
def on_track_subscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
"""Handle newly subscribed remote tracks."""
if track.kind == TrackKind.KIND_AUDIO:
print(f"Subscribed to audio from {participant.identity}")
print(f" Track name: {track.name}")
print(f" Track SID: {track.sid}")
print(f" Muted: {track.muted}")
# Create audio stream to receive frames
audio_stream = AudioStream(track)
# Process audio in background task
asyncio.create_task(process_audio_stream(audio_stream))
async def process_audio_stream(stream: AudioStream):
"""Process audio frames from stream."""
try:
async for event in stream:
frame = event.frame
print(f"Received audio: {frame.samples_per_channel} samples, "
f"{frame.duration:.3f}s duration")
# Access audio data
# data: memoryview of int16 samples
data = frame.data
# Process audio (e.g., save to file, analyze, play)
# ...
finally:
await stream.aclose()

from livekit import RemoteParticipant, RemoteTrackPublication, TrackKind
participant: RemoteParticipant = ...
# Subscribe to all audio tracks
for track_sid, publication in participant.track_publications.items():
if isinstance(publication, RemoteTrackPublication):
if publication.kind == TrackKind.KIND_AUDIO:
# Subscribe to track
publication.set_subscribed(True)
# Check if track is available
if publication.track:
print(f"Subscribed to: {publication.track.name}")from livekit import Track
track: Track = ...
# Get track statistics
# Returns list of proto_stats.RtcStats
stats = await track.get_stats()
for stat in stats:
# Access proto_stats.RtcStats fields
# Contains: bitrate, packets, bytes, jitter, packet loss, etc.
print(f"Track stat: {stat}")import asyncio
import numpy as np
from livekit import (
Room,
RoomOptions,
AudioSource,
LocalAudioTrack,
RemoteAudioTrack,
AudioStream,
TrackPublishOptions,
TrackSource,
TrackKind,
RemoteParticipant,
Track,
RemoteTrackPublication,
)
async def main():
room = Room()
# Handle remote audio tracks
@room.on("track_subscribed")
def on_track_subscribed(
track: Track,
publication: RemoteTrackPublication,
participant: RemoteParticipant
):
if track.kind == TrackKind.KIND_AUDIO:
print(f"Audio track from {participant.identity}: {track.name}")
print(f" SID: {track.sid}")
print(f" Muted: {track.muted}")
print(f" Stream state: {track.stream_state}")
# Process the audio
asyncio.create_task(receive_audio(track))
# Handle track mute changes
@room.on("track_muted")
def on_track_muted(participant, publication):
if publication.kind == TrackKind.KIND_AUDIO:
print(f"{participant.identity}'s audio muted")
@room.on("track_unmuted")
def on_track_unmuted(participant, publication):
if publication.kind == TrackKind.KIND_AUDIO:
print(f"{participant.identity}'s audio unmuted")
# Connect to room (url and token come from your application configuration)
await room.connect(url, token, RoomOptions(auto_subscribe=True))
# Create and publish local audio track
source = AudioSource(sample_rate=48000, num_channels=1)
track = LocalAudioTrack.create_audio_track("my-microphone", source)
# Publish with options
options = TrackPublishOptions()
options.source = TrackSource.SOURCE_MICROPHONE
options.dtx = True
options.red = True
publication = await room.local_participant.publish_track(track, options)
print(f"Published audio track: {publication.sid}")
# Generate and capture audio
asyncio.create_task(generate_audio(source))
# Mute/unmute demonstration
await asyncio.sleep(5)
track.mute()
print("Audio muted")
await asyncio.sleep(2)
track.unmute()
print("Audio unmuted")
# Get statistics
stats = await track.get_stats()
print(f"Track stats: {len(stats)} items")
# Keep running
await asyncio.sleep(30)
# Cleanup
await room.local_participant.unpublish_track(track.sid)
await source.aclose()
await room.disconnect()
async def generate_audio(source: AudioSource):
"""Generate audio frames and capture to source."""
from livekit import AudioFrame
sample_rate = 48000
num_channels = 1
samples_per_channel = 480 # 10ms at 48kHz
frame_duration = samples_per_channel / sample_rate
while True:
# Generate audio data (silence or actual audio)
# Create frame with zeroed data
frame = AudioFrame.create(
sample_rate=sample_rate,
num_channels=num_channels,
samples_per_channel=samples_per_channel
)
# Optionally fill with audio data
# data = frame.data # memoryview of int16 samples
# for i in range(len(data)):
# data[i] = generate_sample(i)
# Capture frame to source
await source.capture_frame(frame)
# Wait for next frame
await asyncio.sleep(frame_duration)
async def receive_audio(track: RemoteAudioTrack):
"""Receive and process remote audio."""
# Create audio stream
# sample_rate: Desired output sample rate (resampled if needed)
# num_channels: Desired output channels
audio_stream = AudioStream(
track,
sample_rate=48000,
num_channels=1
)
try:
async for event in audio_stream:
frame = event.frame
print(f"Received audio frame: {frame.samples_per_channel} samples, "
f"{frame.duration:.3f}s duration")
# Access audio data
# data: memoryview of int16 samples
# For mono: [sample0, sample1, sample2, ...]
# For stereo: [L0, R0, L1, R1, L2, R2, ...]
data = frame.data
# Process audio data
# Examples:
# - Save to file
# - Analyze (volume, frequency)
# - Play through speaker
# - Apply effects
# ...
finally:
await audio_stream.aclose()
if __name__ == "__main__":
asyncio.run(main())

from livekit import AudioSource, AudioFrame
source = AudioSource(sample_rate=48000, num_channels=1)
# Create audio frame with data
# For mono: samples_per_channel samples
# For stereo: samples_per_channel * 2 samples (interleaved)
audio_data = bytearray(480 * 1 * 2) # 480 samples * 1 channel * 2 bytes/sample
frame = AudioFrame(
data=audio_data,
sample_rate=48000,
num_channels=1,
samples_per_channel=480
)
# Capture frame to source
await source.capture_frame(frame)
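
To send real audio instead of silence, write int16 samples into frame.data before capturing. A sketch that fills the frame above with a 440 Hz tone (numpy is an assumption here; any writer of int16 samples works, and it assumes frame.data is writable, as with the bytearray-backed frame above):

import numpy as np

t = np.arange(480) / 48000  # sample times for one 10 ms frame at 48 kHz
tone = (0.2 * 32767 * np.sin(2 * np.pi * 440 * t)).astype(np.int16)
np.frombuffer(frame.data, dtype=np.int16)[:] = tone  # overwrite the frame's samples in place
await source.capture_frame(frame)

from livekit import AudioSource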
source = AudioSource(sample_rate=48000, num_channels=1, queue_size_ms=1000)
# Check queued duration
# Returns seconds of audio currently queued
queued = source.queued_duration
print(f"Queued audio: {queued:.3f} seconds")
# Clear queue if needed (discards buffered audio)
source.clear_queue()
# Wait for playout (blocks until queue empty)
await source.wait_for_playout()
print("All audio played out")# Enable DTX to save bandwidth during silence
options = TrackPublishOptions()
options.dtx = True # Stop transmission during silence
# DTX automatically detects silence and stops sending
# Opus codec handles DTX efficiently

# Enable RED for packet loss recovery
options = TrackPublishOptions()
options.red = True # Send redundant audio data
# RED sends older frames alongside new frames
# Recovers from packet loss without retransmission
# Higher bandwidth usage but better quality on poor networks

# Always clean up audio resources
source = None
track = None
try:
source = AudioSource(48000, 1)
track = LocalAudioTrack.create_audio_track("mic", source)
await room.local_participant.publish_track(track)
# ... use track
finally:
if track:
await room.local_participant.unpublish_track(track.sid)
if source:
await source.aclose()

@room.on("track_muted")
def on_muted(participant, publication):
if publication.track and publication.track.kind == TrackKind.KIND_AUDIO:
# Update UI to show muted state
print(f"{participant.identity} muted their audio")
# Display muted icon, etc.

source = AudioSource(48000, 1, queue_size_ms=1000)
async def monitor_queue():
"""Monitor audio queue for issues."""
while True:
queued = source.queued_duration
if queued > 0.5: # More than 500ms queued
print(f"Warning: Audio queue backing up: {queued:.3f}s")
# May indicate:
# - Capturing too fast
# - Network can't keep up
# - Need to clear queue or adjust capture rate
elif queued == 0.0:
print("Warning: Audio queue empty (underrun)")
# May cause audio gaps
await asyncio.sleep(1.0)
asyncio.create_task(monitor_queue())

# Sample rate selection:
# 8000 Hz: Narrowband (phone quality)
# - Lowest bandwidth
# - Voice only
# - Use for: Simple voice communication
source_8k = AudioSource(sample_rate=8000, num_channels=1)
# 16000 Hz: Wideband (VoIP quality)
# - Better than phone
# - Still voice-optimized
# - Use for: VoIP applications
source_16k = AudioSource(sample_rate=16000, num_channels=1)
# 48000 Hz: Full-band (recommended)
# - Best quality
# - Music and voice
# - Use for: Professional applications
source_48k = AudioSource(sample_rate=48000, num_channels=1)
# Recommendation: Use 48000 Hz unless bandwidth is severely limited

@room.on("track_subscription_failed")
def on_track_failed(participant, track_sid, error):
"""Handle track subscription failure."""
print(f"Failed to subscribe to {track_sid}: {error}")
# Common causes:
if "codec" in error.lower():
print("Codec not supported - participant may be using unsupported format")
elif "permission" in error.lower():
print("Permission denied - check token permissions")
elif "network" in error.lower():
print("Network error - check connectivity")import numpy as np
from livekit import AudioFrame, AudioStream
def calculate_audio_level(frame: AudioFrame) -> float:
"""Calculate RMS audio level."""
# Convert memoryview to numpy array
samples = np.frombuffer(frame.data, dtype=np.int16)
# Calculate RMS
rms = np.sqrt(np.mean(samples.astype(np.float32) ** 2))
# Normalize to 0-1 range (int16 max is 32767)
level = rms / 32767.0
return level
# Usage in stream processing
async def process_audio_stream(stream: AudioStream):
async for event in stream:
level = calculate_audio_level(event.frame)
if level > 0.1:
print(f"Speaking detected: {level:.2f}")
else:
print("Silence")class VoiceActivityDetector:
"""Simple voice activity detection."""
def __init__(self, threshold: float = 0.02, window_size: int = 5):
self.threshold = threshold
self.window = []
self.window_size = window_size
def add_frame(self, frame: AudioFrame) -> bool:
"""Check if frame contains voice activity."""
level = calculate_audio_level(frame)
# Add to window
self.window.append(level)
if len(self.window) > self.window_size:
self.window.pop(0)
# Check if average exceeds threshold
avg_level = sum(self.window) / len(self.window)
return avg_level > self.threshold
# Usage
vad = VoiceActivityDetector()
async def process_with_vad(stream: AudioStream):
async for event in stream:
if vad.add_frame(event.frame):
print("Voice detected")
# Process speech
else:
print("Silence detected")
# Skip or handle differently