Audio Tracks

Overview

Audio tracks represent audio media streams in a LiveKit room. The SDK provides LocalAudioTrack for publishing local audio and RemoteAudioTrack for receiving audio from remote participants.

Key concepts (see the sketch after this list):

  • LocalAudioTrack: Created from AudioSource for publishing audio
  • RemoteAudioTrack: Received from remote participants for consumption
  • AudioSource: Provides audio frames to LocalAudioTrack
  • AudioStream: Consumes audio frames from RemoteAudioTrack
  • Mute/Unmute: Control audio transmission without unpublishing
  • Sample rate: Typically 48000 Hz (recommended); 16000 and 8000 Hz are also supported
  • Channels: 1 (mono) or 2 (stereo)
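
The way these pieces fit together can be sketched in a few lines (a minimal sketch, assuming room already exists from the connection step; remote_track would normally arrive via the track_subscribed event):

# Publish side: an AudioSource feeds a LocalAudioTrack
source = AudioSource(sample_rate=48000, num_channels=1)
track = LocalAudioTrack.create_audio_track("microphone", source)
await room.local_participant.publish_track(track)

# Consume side: an AudioStream drains a RemoteAudioTrack
stream = AudioStream(remote_track)  # remote_track from track_subscribed
async for event in stream:
    frame = event.frame  # raw PCM audio frame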

Import

from livekit import (
    Track,
    LocalAudioTrack,
    RemoteAudioTrack,
    AudioTrack,
    LocalTrack,
    TrackKind,
    StreamState,
    AudioSource,
)

Classes

Track (Base Class)

class Track:
    """Base class for all tracks (audio and video).

    Provides common properties and methods shared by all track types.
    Cannot be instantiated directly.
    """

    def __init__(self, owned_info: proto_track.OwnedTrack) -> None:
        """Initialize a Track.

        Args:
            owned_info: Internal track information from FFI
            
        Note:
            Tracks are created by the SDK, not directly by the application.
        """

Properties

@property
def sid(self) -> str:
    """Session ID of the track.

    Returns:
        str: Unique track session identifier
             Format: "TR_" followed by random string
             
    Note:
        Track SID is assigned by server.
        Unique within room session.
        Used to identify tracks when unpublishing, subscribing, etc.
    """

@property
def name(self) -> str:
    """Name of the track.

    Returns:
        str: Track name (set during creation)
        
    Note:
        Track name is metadata (not unique).
        Displayed to other participants.
        Examples: "microphone", "camera", "screen".
    """

@property
def kind(self) -> proto_track.TrackKind.ValueType:
    """Kind of track (audio or video).

    Returns:
        TrackKind.KIND_AUDIO or TrackKind.KIND_VIDEO
        
    Note:
        Used to determine track type for processing.
        
    Example:
        >>> if track.kind == TrackKind.KIND_AUDIO:
        ...     stream = AudioStream(track)
        ... elif track.kind == TrackKind.KIND_VIDEO:
        ...     stream = VideoStream(track)
    """

@property
def stream_state(self) -> proto_track.StreamState.ValueType:
    """Stream state (active or paused).

    Returns:
        StreamState.STATE_ACTIVE or StreamState.STATE_PAUSED
        
    Note:
        STATE_ACTIVE: Track sending/receiving media.
        STATE_PAUSED: Track temporarily paused.
    """

@property
def muted(self) -> bool:
    """Whether the track is muted.

    Returns:
        bool: True if muted, False otherwise
        
    Note:
        Muted tracks:
        - Audio: Send silence frames
        - Video: Send black frames
        - Maintain connection and track state
        - Save bandwidth
    """

Methods

async def get_stats(self) -> List[proto_stats.RtcStats]:
    """Get statistics for the track.

    Returns:
        List[proto_stats.RtcStats]: List of RTC statistics
        Contains metrics like:
        - Bitrate
        - Bytes sent/received
        - Packets sent/received
        - Packet loss
        - Jitter
        - Codec info

    Raises:
        Exception: If stats retrieval fails
        RuntimeError: If track not active

    Example:
        >>> stats = await track.get_stats()
        >>> for stat in stats:
        ...     print(f"Stat: {stat}")
        
    Note:
        Statistics are snapshots at call time.
        For monitoring, call periodically.
        Stats reset on track republish.
    """

LocalAudioTrack

class LocalAudioTrack(Track):
    """Represents a local audio track.

    Local audio tracks are created from an AudioSource and can be
    published to the room for other participants to receive.
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a LocalAudioTrack.

        Args:
            info: Internal track information

        Note:
            Typically created via create_audio_track() static method
            rather than direct instantiation.
        """

Static Methods

@staticmethod
def create_audio_track(name: str, source: AudioSource) -> LocalAudioTrack:
    """Create a local audio track from an audio source.

    Args:
        name: Name for the track
             Type: str
             Displayed to other participants
             Examples: "microphone", "audio", "voice"
        source: AudioSource instance providing audio data
               Must be already created
               Provides audio frames to track

    Returns:
        LocalAudioTrack: Track ready for publishing
        
    Raises:
        ValueError: If name empty or source invalid
        RuntimeError: If track creation fails

    Example:
        >>> from livekit import AudioSource, LocalAudioTrack
        >>> 
        >>> # Create audio source
        >>> source = AudioSource(sample_rate=48000, num_channels=1)
        >>> 
        >>> # Create track from source
        >>> track = LocalAudioTrack.create_audio_track("microphone", source)
        >>> 
        >>> # Now publish track
        >>> await room.local_participant.publish_track(track)
        
    Note:
        Track is NOT automatically published.
        Must call publish_track() to share with room.
        
        One source can have multiple tracks (not common).
        Track inherits sample rate and channels from source.
    """

Methods

def mute(self) -> None:
    """Mute the audio track.

    Stops sending audio data to remote participants.
    The track publication remains active but sends silence.
    
    Returns:
        None (synchronous operation)

    Example:
        >>> track.mute()
        >>> print(f"Muted: {track.muted}")  # True
        
    Note:
        Muting is immediate and synchronous.
        Sends silence frames (video tracks send black frames instead).
        Connection maintained (no re-negotiation).
        Saves bandwidth (silence is compressed efficiently).
        
        Triggers 'track_muted' event for all participants.
        
        Use cases:
        - Push-to-talk release
        - Privacy (cough, conversation pause)
        - Selective communication
    """

def unmute(self) -> None:
    """Unmute the audio track.

    Resumes sending audio data to remote participants.
    
    Returns:
        None (synchronous operation)

    Example:
        >>> track.unmute()
        >>> print(f"Muted: {track.muted}")  # False
        
    Note:
        Unmuting is immediate and synchronous.
        Resumes sending actual audio immediately.
        No delay or buffering.
        
        Triggers 'track_unmuted' event for all participants.
    """

RemoteAudioTrack

class RemoteAudioTrack(Track):
    """Represents a remote audio track.

    Remote audio tracks are received from other participants
    and can be consumed via AudioStream.
    
    Read-only: a remote track cannot be muted locally (the publishing
    participant controls muting).
    """

    def __init__(self, info: proto_track.OwnedTrack) -> None:
        """Initialize a RemoteAudioTrack.

        Args:
            info: Internal track information

        Note:
            Created automatically by the SDK when subscribing
            to remote participants' audio tracks.
            Access via track_subscribed event.
        """

Type Aliases

LocalTrack = Union[LocalVideoTrack, LocalAudioTrack]
"""Union type for local tracks.

Can be either audio or video.
Use for type hints when accepting any local track.
"""

RemoteTrack = Union[RemoteVideoTrack, RemoteAudioTrack]
"""Union type for remote tracks.

Can be either audio or video.
Use for type hints when accepting any remote track.
"""

AudioTrack = Union[LocalAudioTrack, RemoteAudioTrack]
"""Union type for audio tracks.

Can be either local or remote.
Use for type hints when accepting any audio track.
"""

Creating and Publishing Audio Tracks

Basic Audio Track Publishing

from livekit import Room, AudioSource, LocalAudioTrack, TrackPublishOptions

async def publish_audio(room: Room):
    """Publish a local audio track to the room."""
    # Create audio source
    # sample_rate: 48000 recommended (also 16000, 8000)
    # num_channels: 1 (mono) or 2 (stereo)
    # queue_size_ms: Buffer size, default 1000ms
    source = AudioSource(
        sample_rate=48000,
        num_channels=1,  # Mono
        queue_size_ms=1000  # 1 second buffer
    )

    # Create track from source
    track = LocalAudioTrack.create_audio_track("microphone", source)

    # Publish track with default options
    options = TrackPublishOptions()
    publication = await room.local_participant.publish_track(track, options)

    print(f"Published audio track: {publication.sid}")

    return source, track, publication

Stereo Audio Track

from livekit import AudioSource, LocalAudioTrack

# Create stereo audio source
source = AudioSource(
    sample_rate=48000,
    num_channels=2,  # Stereo (left + right)
    queue_size_ms=1000
)

track = LocalAudioTrack.create_audio_track("stereo-audio", source)

# Publish
await room.local_participant.publish_track(track)

Publishing with Options

from livekit import TrackPublishOptions, TrackSource

options = TrackPublishOptions()
options.source = TrackSource.SOURCE_MICROPHONE  # Identify as microphone
options.dtx = True   # Enable discontinuous transmission (silence detection)
options.red = True   # Enable redundant encoding (packet loss recovery)

publication = await room.local_participant.publish_track(track, options)

Muting and Unmuting

from livekit import LocalAudioTrack, TrackKind

track: LocalAudioTrack = ...

# Mute audio (synchronous)
track.mute()
print(f"Track muted: {track.muted}")  # True

# Unmute audio (synchronous)
track.unmute()
print(f"Track muted: {track.muted}")  # False

# Toggle mute
if track.muted:
    track.unmute()
else:
    track.mute()

# Mute state in event
@room.on("track_muted")
def on_muted(participant, publication):
    if publication.kind == TrackKind.KIND_AUDIO:
        print(f"{participant.identity} muted audio")

Receiving Remote Audio

Subscribe to Remote Audio

import asyncio

from livekit import (
    Room,
    RemoteParticipant,
    RemoteTrackPublication,
    Track,
    TrackKind,
    AudioStream,
)

@room.on("track_subscribed")
def on_track_subscribed(
    track: Track,
    publication: RemoteTrackPublication,
    participant: RemoteParticipant
):
    """Handle newly subscribed remote tracks."""
    if track.kind == TrackKind.KIND_AUDIO:
        print(f"Subscribed to audio from {participant.identity}")
        print(f"  Track name: {track.name}")
        print(f"  Track SID: {track.sid}")
        print(f"  Muted: {track.muted}")

        # Create audio stream to receive frames
        audio_stream = AudioStream(track)

        # Process audio in background task
        asyncio.create_task(process_audio_stream(audio_stream))

async def process_audio_stream(stream: AudioStream):
    """Process audio frames from stream."""
    try:
        async for event in stream:
            frame = event.frame
            print(f"Received audio: {frame.samples_per_channel} samples, "
                  f"{frame.duration:.3f}s duration")
            
            # Access audio data
            # data: memoryview of int16 samples
            data = frame.data
            
            # Process audio (e.g., save to file, analyze, play)
            # ...
    finally:
        await stream.aclose()
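
As one concrete way to "save to file", the int16 frames can be appended to a WAV file with the standard-library wave module (a sketch assuming 48 kHz mono output; save_track_to_wav is a hypothetical helper):

import wave

from livekit import AudioStream, RemoteAudioTrack

async def save_track_to_wav(track: RemoteAudioTrack, path: str) -> None:
    """Write a remote audio track to a 16-bit PCM WAV file."""
    stream = AudioStream(track, sample_rate=48000, num_channels=1)
    with wave.open(path, "wb") as wav:
        wav.setnchannels(1)   # mono
        wav.setsampwidth(2)   # int16 = 2 bytes per sample
        wav.setframerate(48000)
        try:
            async for event in stream:
                wav.writeframes(event.frame.data)  # raw int16 PCM
        finally:
            await stream.aclose()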

Manual Track Subscription

from livekit import RemoteParticipant, RemoteTrackPublication, TrackKind

participant: RemoteParticipant = ...

# Subscribe to all audio tracks
for track_sid, publication in participant.track_publications.items():
    if isinstance(publication, RemoteTrackPublication):
        if publication.kind == TrackKind.KIND_AUDIO:
            # Subscribe to track
            publication.set_subscribed(True)

            # Check if track is available
            if publication.track:
                print(f"Subscribed to: {publication.track.name}")

Track Statistics

from livekit import Track

track: Track = ...

# Get track statistics
# Returns list of proto_stats.RtcStats
stats = await track.get_stats()

for stat in stats:
    # Access proto_stats.RtcStats fields
    # Contains: bitrate, packets, bytes, jitter, packet loss, etc.
    print(f"Track stat: {stat}")

Complete Example

import asyncio
from livekit import (
    Room,
    RoomOptions,
    AudioSource,
    LocalAudioTrack,
    RemoteAudioTrack,
    AudioStream,
    TrackPublishOptions,
    TrackSource,
    TrackKind,
    RemoteParticipant,
    Track,
    RemoteTrackPublication,
)

async def main():
    room = Room()

    # Handle remote audio tracks
    @room.on("track_subscribed")
    def on_track_subscribed(
        track: Track,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant
    ):
        if track.kind == TrackKind.KIND_AUDIO:
            print(f"Audio track from {participant.identity}: {track.name}")
            print(f"  SID: {track.sid}")
            print(f"  Muted: {track.muted}")
            print(f"  Stream state: {track.stream_state}")

            # Process the audio
            asyncio.create_task(receive_audio(track))

    # Handle track mute changes
    @room.on("track_muted")
    def on_track_muted(participant, publication):
        if publication.kind == TrackKind.KIND_AUDIO:
            print(f"{participant.identity}'s audio muted")

    @room.on("track_unmuted")
    def on_track_unmuted(participant, publication):
        if publication.kind == TrackKind.KIND_AUDIO:
            print(f"{participant.identity}'s audio unmuted")

    # Connect to room (url and token come from your LiveKit deployment)
    await room.connect(url, token, RoomOptions(auto_subscribe=True))

    # Create and publish local audio track
    source = AudioSource(sample_rate=48000, num_channels=1)
    track = LocalAudioTrack.create_audio_track("my-microphone", source)

    # Publish with options
    options = TrackPublishOptions()
    options.source = TrackSource.SOURCE_MICROPHONE
    options.dtx = True
    options.red = True
    
    publication = await room.local_participant.publish_track(track, options)

    print(f"Published audio track: {publication.sid}")

    # Generate and capture audio
    asyncio.create_task(generate_audio(source))

    # Mute/unmute demonstration
    await asyncio.sleep(5)
    track.mute()
    print("Audio muted")

    await asyncio.sleep(2)
    track.unmute()
    print("Audio unmuted")

    # Get statistics
    stats = await track.get_stats()
    print(f"Track stats: {len(stats)} items")

    # Keep running
    await asyncio.sleep(30)

    # Cleanup
    await room.local_participant.unpublish_track(track.sid)
    await source.aclose()
    await room.disconnect()

async def generate_audio(source: AudioSource):
    """Generate audio frames and capture to source."""
    from livekit import AudioFrame

    sample_rate = 48000
    num_channels = 1
    samples_per_channel = 480  # 10ms at 48kHz

    frame_duration = samples_per_channel / sample_rate

    while True:
        # Generate audio data (silence or actual audio)
        # Create frame with zeroed data
        frame = AudioFrame.create(
            sample_rate=sample_rate,
            num_channels=num_channels,
            samples_per_channel=samples_per_channel
        )

        # Optionally fill with audio data
        # data = frame.data  # memoryview of int16 samples
        # for i in range(len(data)):
        #     data[i] = generate_sample(i)

        # Capture frame to source
        await source.capture_frame(frame)

        # Wait for next frame
        await asyncio.sleep(frame_duration)

async def receive_audio(track: RemoteAudioTrack):
    """Receive and process remote audio."""
    # Create audio stream
    # sample_rate: Desired output sample rate (resampled if needed)
    # num_channels: Desired output channels
    audio_stream = AudioStream(
        track,
        sample_rate=48000,
        num_channels=1
    )

    try:
        async for event in audio_stream:
            frame = event.frame
            print(f"Received audio frame: {frame.samples_per_channel} samples, "
                  f"{frame.duration:.3f}s duration")

            # Access audio data
            # data: memoryview of int16 samples
            # For mono: [sample0, sample1, sample2, ...]
            # For stereo: [L0, R0, L1, R1, L2, R2, ...]
            data = frame.data

            # Process audio data
            # Examples:
            # - Save to file
            # - Analyze (volume, frequency)
            # - Play through speaker
            # - Apply effects
            # ...

    finally:
        await audio_stream.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Working with Audio Data

Capturing Audio Frames

from livekit import AudioSource, AudioFrame

source = AudioSource(sample_rate=48000, num_channels=1)

# Create audio frame with data
# For mono: samples_per_channel samples
# For stereo: samples_per_channel * 2 samples (interleaved)
audio_data = bytearray(480 * 1 * 2)  # 480 samples * 1 channel * 2 bytes/sample

frame = AudioFrame(
    data=audio_data,
    sample_rate=48000,
    num_channels=1,
    samples_per_channel=480
)

# Capture frame to source
await source.capture_frame(frame)
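
To fill a frame with real samples instead of silence, numpy can synthesize int16 PCM directly into the frame's buffer (a sketch; the 440 Hz tone and 0.3 amplitude are arbitrary choices, and make_tone_frame is a hypothetical helper):

import numpy as np
from livekit import AudioFrame

SAMPLE_RATE = 48000
SAMPLES_PER_CHANNEL = 480  # 10 ms at 48 kHz

def make_tone_frame(start_sample: int, freq: float = 440.0) -> AudioFrame:
    """Create a mono frame containing a sine tone."""
    t = (start_sample + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
    samples = (0.3 * 32767 * np.sin(2 * np.pi * freq * t)).astype(np.int16)
    frame = AudioFrame.create(
        sample_rate=SAMPLE_RATE,
        num_channels=1,
        samples_per_channel=SAMPLES_PER_CHANNEL,
    )
    # Copy the generated samples into the frame's int16 buffer
    np.frombuffer(frame.data, dtype=np.int16)[:] = samples
    return frame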

Checking Queue Status

from livekit import AudioSource

source = AudioSource(sample_rate=48000, num_channels=1, queue_size_ms=1000)

# Check queued duration
# Returns seconds of audio currently queued
queued = source.queued_duration
print(f"Queued audio: {queued:.3f} seconds")

# Clear queue if needed (discards buffered audio)
source.clear_queue()

# Wait for playout (blocks until queue empty)
await source.wait_for_playout()
print("All audio played out")

Audio Track Features

Discontinuous Transmission (DTX)

# Enable DTX to save bandwidth during silence
options = TrackPublishOptions()
options.dtx = True  # Stop transmission during silence

# DTX automatically detects silence and stops sending
# Opus codec handles DTX efficiently

Redundant Encoding (RED)

# Enable RED for packet loss recovery
options = TrackPublishOptions()
options.red = True  # Send redundant audio data

# RED sends older frames alongside new frames
# Recovers from packet loss without retransmission
# Higher bandwidth usage but better quality on poor networks

Best Practices

1. Resource Cleanup

# Always clean up audio resources
source = None
track = None
try:
    source = AudioSource(48000, 1)
    track = LocalAudioTrack.create_audio_track("mic", source)
    await room.local_participant.publish_track(track)
    # ... use track
finally:
    if track:
        await room.local_participant.unpublish_track(track.sid)
    if source:
        await source.aclose()
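
The same pattern can be packaged once as an async context manager so call sites cannot forget the cleanup (a sketch; published_audio_track is a hypothetical helper name):

from contextlib import asynccontextmanager

from livekit import AudioSource, LocalAudioTrack, Room

@asynccontextmanager
async def published_audio_track(room: Room, name: str = "mic"):
    """Publish a fresh audio track; unpublish and close it on exit."""
    source = AudioSource(sample_rate=48000, num_channels=1)
    track = LocalAudioTrack.create_audio_track(name, source)
    await room.local_participant.publish_track(track)
    try:
        yield source, track
    finally:
        await room.local_participant.unpublish_track(track.sid)
        await source.aclose()

# Usage:
# async with published_audio_track(room) as (source, track):
#     ...capture frames to source...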

2. Handle Mute State

@room.on("track_muted")
def on_muted(participant, publication):
    if publication.track and publication.track.kind == TrackKind.KIND_AUDIO:
        # Update UI to show muted state
        print(f"{participant.identity} muted their audio")
        # Display muted icon, etc.

3. Monitor Queue Health

source = AudioSource(48000, 1, queue_size_ms=1000)

async def monitor_queue():
    """Monitor audio queue for issues."""
    while True:
        queued = source.queued_duration
        
        if queued > 0.5:  # More than 500ms queued
            print(f"Warning: Audio queue backing up: {queued:.3f}s")
            # May indicate:
            # - Capturing too fast
            # - Network can't keep up
            # - Need to clear queue or adjust capture rate
        elif queued == 0.0:
            print("Warning: Audio queue empty (underrun)")
            # May cause audio gaps
            
        await asyncio.sleep(1.0)

asyncio.create_task(monitor_queue())

4. Use Appropriate Sample Rates

# Sample rate selection:

# 8000 Hz: Narrowband (phone quality)
# - Lowest bandwidth
# - Voice only
# - Use for: Simple voice communication
source_8k = AudioSource(sample_rate=8000, num_channels=1)

# 16000 Hz: Wideband (VoIP quality)
# - Better than phone
# - Still voice-optimized
# - Use for: VoIP applications
source_16k = AudioSource(sample_rate=16000, num_channels=1)

# 48000 Hz: Full-band (recommended)
# - Best quality
# - Music and voice
# - Use for: Professional applications
source_48k = AudioSource(sample_rate=48000, num_channels=1)

# Recommendation: Use 48000 Hz unless bandwidth is severely limited

5. Handle Track Failures

@room.on("track_subscription_failed")
def on_track_failed(participant, track_sid, error):
    """Handle track subscription failure."""
    print(f"Failed to subscribe to {track_sid}: {error}")
    
    # Common causes:
    if "codec" in error.lower():
        print("Codec not supported - participant may be using unsupported format")
    elif "permission" in error.lower():
        print("Permission denied - check token permissions")
    elif "network" in error.lower():
        print("Network error - check connectivity")

Advanced Patterns

Audio Level Monitoring

import numpy as np
from livekit import AudioFrame

def calculate_audio_level(frame: AudioFrame) -> float:
    """Calculate RMS audio level."""
    # Convert memoryview to numpy array
    samples = np.frombuffer(frame.data, dtype=np.int16)
    
    # Calculate RMS
    rms = np.sqrt(np.mean(samples.astype(np.float32) ** 2))
    
    # Normalize to 0-1 range (int16 max is 32767)
    level = rms / 32767.0
    
    return level

# Usage in stream processing
async def process_audio_stream(stream: AudioStream):
    async for event in stream:
        level = calculate_audio_level(event.frame)
        
        if level > 0.1:
            print(f"Speaking detected: {level:.2f}")
        else:
            print("Silence")

Voice Activity Detection

from livekit import AudioFrame

# Relies on calculate_audio_level() from the previous snippet.
class VoiceActivityDetector:
    """Simple voice activity detection."""
    
    def __init__(self, threshold: float = 0.02, window_size: int = 5):
        self.threshold = threshold
        self.window = []
        self.window_size = window_size
    
    def add_frame(self, frame: AudioFrame) -> bool:
        """Check if frame contains voice activity."""
        level = calculate_audio_level(frame)
        
        # Add to window
        self.window.append(level)
        if len(self.window) > self.window_size:
            self.window.pop(0)
        
        # Check if average exceeds threshold
        avg_level = sum(self.window) / len(self.window)
        return avg_level > self.threshold

# Usage
vad = VoiceActivityDetector()

async def process_with_vad(stream: AudioStream):
    async for event in stream:
        if vad.add_frame(event.frame):
            print("Voice detected")
            # Process speech
        else:
            print("Silence detected")
            # Skip or handle differently

See Also

  • Audio Frames and Sources - Low-level audio handling
  • Audio Processing - Audio processing and effects
  • Track Publications - Track publication management
  • Participants - Publishing and managing tracks