or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md audio-processing.md audio-tracks.md data-streaming.md e2ee.md events.md index.md participants.md room.md rpc.md track-publications.md transcription.md types-enums.md utilities.md video-frames-sources.md video-processing.md video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/audio-frames-sources.md

Audio Frames and Sources

Overview

Low-level audio handling for creating, managing, and capturing audio frames to sources. Provides direct control over audio data for advanced use cases.

Key concepts:

  • AudioFrame: Container for PCM audio samples (int16 format)
  • AudioSource: Real-time audio output with internal queue
  • AudioStream: Async iterator for receiving audio frames
  • Sample rate: Samples per second (Hz) - typically 48000, 16000, or 8000
  • Channels: 1 (mono) or 2 (stereo)
  • Samples: int16 PCM samples, interleaved by channel

Import

from livekit import AudioFrame, AudioSource, AudioStream, AudioFrameEvent, NoiseCancellationOptions

AudioFrame

class AudioFrame:
    """Represents a frame of audio data with int16 samples interleaved by channel.
    
    Audio data format:
    - Sample format: 16-bit signed integer (int16)
    - Range: -32768 to 32767
    - Interleaving: [L0, R0, L1, R1, ...] for stereo
                   [S0, S1, S2, ...] for mono
    """

    def __init__(
        self,
        data: Union[bytes, bytearray, memoryview],
        sample_rate: int,
        num_channels: int,
        samples_per_channel: int
    ) -> None:
        """Initialize AudioFrame with audio data.
        
        Args:
            data: Raw audio data
                 Type: bytes | bytearray | memoryview
                 Must be at least num_channels * samples_per_channel * 2 bytes
                 Format: int16 PCM samples, interleaved by channel
                 
            sample_rate: Sample rate in Hz
                        Type: int
                        Common values: 48000, 16000, 8000
                        
            num_channels: Number of audio channels
                         Type: int
                         1 (mono) or 2 (stereo)
                         
            samples_per_channel: Number of samples per channel
                                Type: int
                                Frame duration = samples_per_channel / sample_rate
            
        Raises:
            ValueError: If data length is invalid
                       Required: num_channels * samples_per_channel * 2 bytes
            
        Example:
            >>> # Mono audio, 10ms at 48kHz
            >>> sample_rate = 48000
            >>> samples_per_channel = 480  # 10ms
            >>> data = bytearray(480 * 1 * 2)  # 480 samples * 1 channel * 2 bytes
            >>> frame = AudioFrame(data, sample_rate, 1, samples_per_channel)
            >>> 
            >>> # Stereo audio, 20ms at 48kHz
            >>> data = bytearray(960 * 2 * 2)  # 960 samples * 2 channels * 2 bytes
            >>> frame = AudioFrame(data, 48000, 2, 960)
        """

    @staticmethod
    def create(sample_rate: int, num_channels: int, samples_per_channel: int) -> AudioFrame:
        """Create a new empty AudioFrame with zeroed data.
        
        Args:
            sample_rate: Sample rate in Hz
            num_channels: Number of channels (1 or 2)
            samples_per_channel: Samples per channel
            
        Returns:
            AudioFrame: New frame with zeroed data
            
        Example:
            >>> # Create 10ms mono frame at 48kHz
            >>> frame = AudioFrame.create(48000, 1, 480)
            >>> print(f"Duration: {frame.duration}s")  # 0.01
            >>> 
            >>> # Create 20ms stereo frame at 16kHz
            >>> frame = AudioFrame.create(16000, 2, 320)
            
        Note:
            Convenience method for creating frames.
            Data is zeroed (silence).
            Fill data using frame.data memoryview.
        """

    @property
    def userdata(self) -> dict[str, Any]:
        """User data dictionary associated with the frame.
        
        Returns:
            dict: Dictionary for storing custom metadata
            
        Example:
            >>> frame.userdata["timestamp"] = time.time()
            >>> frame.userdata["source"] = "microphone"
            
        Note:
            Not transmitted with frame.
            Used for local tracking/metadata.
        """

    @property
    def data(self) -> memoryview:
        """Memory view of audio data as 16-bit signed integers.
        
        Returns:
            memoryview: View of audio data (int16)
                       Can be modified in-place
                       
        Example:
            >>> frame = AudioFrame.create(48000, 1, 480)
            >>> data = frame.data
            >>> print(f"Total samples: {len(data)}")  # 480 for mono
            >>> 
            >>> # Modify samples
            >>> data[0] = 1000    # Set first sample
            >>> data[100] = -500  # Set another sample
            >>> 
            >>> # For stereo, samples are interleaved:
            >>> # data[0] = left0, data[1] = right0
            >>> # data[2] = left1, data[3] = right1
            
        Note:
            memoryview can be converted to numpy array:
            >>> import numpy as np
            >>> samples = np.frombuffer(data, dtype=np.int16)
            
            For stereo, reshape for separate channels:
            >>> samples = samples.reshape(-1, 2)  # Shape: (samples_per_channel, 2)
            >>> left = samples[:, 0]
            >>> right = samples[:, 1]
        """

    @property
    def sample_rate(self) -> int:
        """Sample rate in Hz.
        
        Returns:
            int: Samples per second (e.g., 48000, 16000, 8000)
        """

    @property
    def num_channels(self) -> int:
        """Number of audio channels.
        
        Returns:
            int: 1 (mono) or 2 (stereo)
        """

    @property
    def samples_per_channel(self) -> int:
        """Number of samples per channel.
        
        Returns:
            int: Sample count per channel
            
        Note:
            Total samples in data = samples_per_channel * num_channels
            
            For mono: len(data) == samples_per_channel
            For stereo: len(data) == samples_per_channel * 2
        """

    @property
    def duration(self) -> float:
        """Duration of the frame in seconds.
        
        Returns:
            float: Frame duration in seconds
                  Calculated as: samples_per_channel / sample_rate
                  
        Example:
            >>> frame = AudioFrame.create(48000, 1, 480)
            >>> print(frame.duration)  # 0.01 (10ms)
            >>> 
            >>> frame = AudioFrame.create(48000, 1, 2400)
            >>> print(frame.duration)  # 0.05 (50ms)
        """

    def to_wav_bytes(self) -> bytes:
        """Convert the audio frame to WAV-formatted bytes.
        
        Returns:
            bytes: Complete WAV file as bytes
                  Includes WAV header and audio data
                  
        Example:
            >>> frame = AudioFrame.create(48000, 1, 480)
            >>> # Fill with audio data
            >>> wav_bytes = frame.to_wav_bytes()
            >>> 
            >>> # Save to file
            >>> with open("audio.wav", "wb") as f:
            ...     f.write(wav_bytes)
            
        Note:
            Creates complete WAV file in memory.
            Useful for:
            - Saving audio snippets
            - Debugging audio pipeline
            - Exporting audio
            
            WAV format:
            - Header: 44 bytes
            - Data: frame audio samples
            - Total: 44 + (samples_per_channel * num_channels * 2) bytes
        """

AudioSource

class AudioSource:
    """Real-time audio source with internal audio queue.
    
    Manages audio frame queue for publishing.
    Handles timing and buffering automatically.
    """

    def __init__(
        self,
        sample_rate: int,
        num_channels: int,
        queue_size_ms: int = 1000,
        loop: asyncio.AbstractEventLoop | None = None
    ) -> None:
        """Initialize AudioSource.
        
        Args:
            sample_rate: Sample rate in Hz
                        Type: int
                        Common values: 48000, 16000, 8000
                        
            num_channels: Number of audio channels
                         Type: int
                         1 (mono) or 2 (stereo)
                         
            queue_size_ms: Buffer size in milliseconds
                          Type: int
                          Default: 1000 (1 second)
                          Range: 100-10000 recommended
                          
            loop: Event loop to use
                 Type: asyncio.AbstractEventLoop | None
                 Default: None (uses current loop)
                 
        Returns:
            None (``__init__`` initializes the instance in place)
            
        Raises:
            ValueError: If parameters invalid
            RuntimeError: If loop not available
            
        Example:
            >>> # Mono source at 48kHz with 1s buffer
            >>> source = AudioSource(
            ...     sample_rate=48000,
            ...     num_channels=1,
            ...     queue_size_ms=1000
            ... )
            >>> 
            >>> # Stereo source at 16kHz with 500ms buffer
            >>> source = AudioSource(16000, 2, queue_size_ms=500)
            
        Note:
            Sample rate and channels are fixed at creation.
            To change, create new source.
            
            Queue size affects latency:
            - Smaller (100-500ms): Lower latency, risk of underruns
            - Larger (1000-5000ms): Higher latency, more stable
        """

    @property
    def sample_rate(self) -> int:
        """Sample rate in Hz.
        
        Returns:
            int: Sample rate (e.g., 48000)
        """

    @property
    def num_channels(self) -> int:
        """Number of audio channels.
        
        Returns:
            int: 1 (mono) or 2 (stereo)
        """

    @property
    def queued_duration(self) -> float:
        """Current duration of queued audio data in seconds.
        
        Returns:
            float: Seconds of audio currently in queue
                  0.0 if queue empty
                  
        Example:
            >>> queued = source.queued_duration
            >>> print(f"Queued: {queued:.3f}s")
            >>> 
            >>> if queued > 1.0:
            ...     print("Queue backing up!")
            ... elif queued < 0.1:
            ...     print("Queue running low")
            
        Note:
            Monitor this to detect issues:
            - Too high: Capturing faster than can send
            - Too low: May cause underruns (gaps)
            
            Healthy range: 0.2 - 0.8 seconds
        """

    def clear_queue(self) -> None:
        """Clear the internal audio queue, discarding all buffered audio data.
        
        Returns:
            None
            
        Example:
            >>> # Clear queue if backed up
            >>> if source.queued_duration > 2.0:
            ...     source.clear_queue()
            ...     print("Queue cleared")
            
        Note:
            Discards all queued audio immediately.
            Next capture_frame() starts fresh.
            
            Use cases:
            - Recover from queue backup
            - Reset after long pause
            - Clear stale audio data
        """

    async def capture_frame(self, frame: AudioFrame) -> None:
        """Capture an audio frame and queue it for playback.
        
        Args:
            frame: AudioFrame to capture
                  Must match source sample_rate and num_channels
                  
        Returns:
            None (awaitable)
            
        Raises:
            ValueError: If frame sample rate or channels don't match source
            RuntimeError: If queue is full and cannot accept frame
            
        Example:
            >>> source = AudioSource(48000, 1)
            >>> frame = AudioFrame.create(48000, 1, 480)
            >>> 
            >>> # Fill frame with audio data
            >>> # ... 
            >>> 
            >>> # Capture to source
            >>> await source.capture_frame(frame)
            
        Note:
            Frame is queued for publishing.
            Blocks if queue is full (back pressure).
            
            Publishing happens asynchronously in background.
            No need to manage timing - source handles it.
        """

    async def wait_for_playout(self) -> None:
        """Wait for the audio source to finish playing all queued audio.
        
        Returns:
            None (awaitable, blocks until queue empty)
            
        Example:
            >>> # Capture multiple frames
            >>> for frame in frames:
            ...     await source.capture_frame(frame)
            >>> 
            >>> # Wait for all frames to play out
            >>> await source.wait_for_playout()
            >>> print("All audio played")
            
        Note:
            Blocks until queued_duration reaches 0.
            Useful for ensuring audio completes before:
            - Disconnecting
            - Switching tracks
            - Ending session
        """

    async def aclose(self) -> None:
        """Close the audio source and clean up resources.
        
        Returns:
            None (awaitable)
            
        Example:
            >>> source = AudioSource(48000, 1)
            >>> # ... use source
            >>> await source.aclose()
            
        Note:
            Always close sources when done.
            Releases internal resources and queue.
            Source cannot be reused after closing.
            
            Best practice:
            >>> try:
            ...     source = AudioSource(48000, 1)
            ...     # Use source
            ... finally:
            ...     await source.aclose()
        """

NoiseCancellationOptions

@dataclass
class NoiseCancellationOptions:
    """Options for configuring noise cancellation in audio streams.

    Attributes:
        module_id: Identifier for the noise cancellation module to use
                  Type: str
                  Module-specific identifier
                  
        options: Dictionary of module-specific configuration options
                Type: dict[str, Any]
                Configuration depends on module_id
    """
    # e.g. "krisp" — valid values depend on the installed NC implementation
    module_id: str
    # module-specific settings, e.g. {"level": "high"}
    options: dict[str, Any]

Used with AudioStream to enable noise cancellation on received audio. The specific module_id and options depend on the noise cancellation implementation being used.

Example:

# Create audio stream with noise cancellation
nc_options = NoiseCancellationOptions(
    module_id="krisp",  # Example module
    options={"level": "high"}
)

stream = AudioStream(
    track,
    noise_cancellation=nc_options
)

AudioStream

@dataclass
class AudioFrameEvent:
    """Event representing a received audio frame.
    
    Attributes:
        frame: The audio frame
              Type: AudioFrame
    """
    # the decoded AudioFrame yielded by AudioStream iteration
    frame: AudioFrame

class AudioStream(AsyncIterator[AudioFrameEvent]):
    """Asynchronous audio stream for receiving audio frames.
    
    Async iterator that yields AudioFrameEvent objects.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
        **kwargs
    ) -> None:
        """Initialize AudioStream.
        
        Args:
            track: Audio track to receive from
                  Type: Track (RemoteAudioTrack or LocalAudioTrack)
                  
            loop: Event loop to use
                 Type: asyncio.AbstractEventLoop | None
                 Default: None (uses current loop)
                 
            capacity: Internal frame queue capacity
                     Type: int
                     Default: 0 (unbounded)
                     >0: Bounded queue (drops frames if full)
                     
            sample_rate: Sample rate for audio stream
                        Type: int
                        Default: 48000
                        SDK resamples if track has different rate
                        
            num_channels: Number of audio channels
                         Type: int
                         Default: 1 (mono)
                         SDK converts if track has different channels
                         
            frame_size_ms: Frame size in milliseconds
                          Type: int | None
                          Default: None (uses default, typically 10ms)
                          
            noise_cancellation: Noise cancellation options or processor
                               Type: NoiseCancellationOptions | FrameProcessor | None
                               Default: None (no noise cancellation)
                               
        Example:
            >>> # Basic stream
            >>> stream = AudioStream(track)
            >>> 
            >>> # Stream with custom settings
            >>> stream = AudioStream(
            ...     track,
            ...     sample_rate=16000,  # Resample to 16kHz
            ...     num_channels=1,     # Convert to mono
            ...     capacity=100        # Buffer up to 100 frames
            ... )
            >>> 
            >>> # Stream with noise cancellation
            >>> nc_options = NoiseCancellationOptions(
            ...     module_id="krisp",
            ...     options={}
            ... )
            >>> stream = AudioStream(track, noise_cancellation=nc_options)
        """

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None
    ) -> AudioStream:
        """Create an AudioStream from a participant's audio track.
        
        Args:
            participant: Participant to receive audio from
            track_source: Track source type (e.g., SOURCE_MICROPHONE)
            ... (other args same as __init__)
            
        Returns:
            AudioStream: Stream for specified track source
            
        Raises:
            ValueError: If participant doesn't have track with specified source
            
        Example:
            >>> # Stream microphone audio from participant
            >>> stream = AudioStream.from_participant(
            ...     participant=remote_participant,
            ...     track_source=TrackSource.SOURCE_MICROPHONE
            ... )
            
        Note:
            Convenience method for common use case.
            Finds first track matching source type.
        """

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        frame_size_ms: int | None = None,
        noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None
    ) -> AudioStream:
        """Create an AudioStream from an existing track.
        
        Alternative constructor (same as __init__).
        
        Example:
            >>> stream = AudioStream.from_track(track=my_track)
        """

    async def aclose(self) -> None:
        """Close the audio stream and clean up resources.
        
        Returns:
            None (awaitable)
            
        Example:
            >>> stream = AudioStream(track)
            >>> try:
            ...     async for event in stream:
            ...         process(event.frame)
            ... finally:
            ...     await stream.aclose()
            
        Note:
            Always close streams when done.
            Releases resources and stops receiving frames.
        """

    def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
        """Return self as async iterator.
        
        Returns:
            AsyncIterator[AudioFrameEvent]: Self
        """

    async def __anext__(self) -> AudioFrameEvent:
        """Get next audio frame event.
        
        Returns:
            AudioFrameEvent: Next frame event
            
        Raises:
            StopAsyncIteration: When stream ends
            
        Example:
            >>> async for event in stream:
            ...     frame = event.frame
            ...     print(f"Received {frame.samples_per_channel} samples")
        """

Complete Example

import asyncio
from livekit import (
    Room, AudioSource, LocalAudioTrack, AudioFrame,
    AudioStream, TrackKind, RemoteAudioTrack
)

async def main():
    """Connect to a room, publish a synthetic microphone track, and
    receive remote audio for 30 seconds.

    Assumes ``url`` and ``token`` are defined in the surrounding scope.
    """
    room = Room()

    # Register the handler BEFORE connecting so tracks subscribed
    # during/right after connection are not missed.
    @room.on("track_subscribed")
    def on_track(track, publication, participant):
        if track.kind == TrackKind.KIND_AUDIO:
            asyncio.create_task(receive_audio(track))

    # The room must be connected before tracks can be published.
    await room.connect(url, token)

    # Publishing audio
    source = AudioSource(sample_rate=48000, num_channels=1)
    track = LocalAudioTrack.create_audio_track("mic", source)
    await room.local_participant.publish_track(track)

    # Generate and capture audio in the background
    asyncio.create_task(generate_audio(source))

    await asyncio.sleep(30)

    # Cleanup
    await source.aclose()
    await room.disconnect()

async def generate_audio(source: AudioSource):
    """Generate 10ms frames of silence and capture them to *source*.

    Uses a monotonic deadline instead of a fixed ``sleep(0.01)`` so that
    per-iteration overhead does not accumulate as timing drift (see the
    "Handle Timing Properly" best practice below).
    """
    import time  # monotonic clock for drift-free pacing

    sample_rate = 48000
    samples_per_channel = 480  # 10ms
    frame_duration = samples_per_channel / sample_rate

    next_deadline = time.monotonic()
    while True:
        # Create frame; AudioFrame.create() zeroes the data (silence)
        frame = AudioFrame.create(sample_rate, 1, samples_per_channel)

        # Fill with audio data (silence in this example)
        # In production, get from microphone:
        # data = frame.data
        # for i in range(len(data)):
        #     data[i] = microphone_sample(i)

        # Capture (may block briefly for back pressure)
        await source.capture_frame(frame)

        # Drift-compensated pacing: sleep until the next frame deadline
        next_deadline += frame_duration
        await asyncio.sleep(max(0.0, next_deadline - time.monotonic()))

async def receive_audio(track: RemoteAudioTrack):
    """Consume audio frames from a remote track and process them."""
    audio_stream = AudioStream(track, sample_rate=48000, num_channels=1)

    try:
        async for frame_event in audio_stream:
            received = frame_event.frame
            print(f"Received {received.samples_per_channel} samples, "
                  f"{received.duration:.3f}s duration")

            # memoryview of int16 samples
            pcm = received.data

            # Processing hook — e.g.:
            # - Play through speaker
            # - Save to file
            # - Analyze audio
            # - Apply effects

    finally:
        await audio_stream.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Match Sample Rates

# Source and frames must match
source = AudioSource(sample_rate=48000, num_channels=1)

# Good: Matching sample rate
frame = AudioFrame.create(48000, 1, 480)
await source.capture_frame(frame)

# Bad: Mismatched sample rate
# frame = AudioFrame.create(16000, 1, 160)
# await source.capture_frame(frame)  # Raises ValueError

2. Monitor Queue Health

async def monitor_queue(source: AudioSource):
    """Periodically report unhealthy audio-source queue depths."""
    HIGH_WATER = 1.5   # seconds — capture outpacing playout
    LOW_WATER = 0.05   # seconds — risk of audio gaps

    while True:
        depth = source.queued_duration

        if depth > HIGH_WATER:
            # Consider: clear_queue() or slow down capture
            print(f"Queue backing up: {depth:.3f}s")
        elif depth < LOW_WATER:
            # May cause audio gaps
            print(f"Queue running low: {depth:.3f}s")

        await asyncio.sleep(1.0)

asyncio.create_task(monitor_queue(source))

3. Handle Timing Properly

async def capture_with_timing(source: AudioSource):
    """Capture with accurate timing.

    Uses ``time.monotonic()`` rather than ``time.time()``: the monotonic
    clock cannot jump backwards/forwards on wall-clock adjustments
    (NTP, DST), so frame pacing stays stable.
    """
    import time  # required for the pacing clock

    sample_rate = 48000
    samples_per_channel = 480  # 10ms
    frame_duration = samples_per_channel / sample_rate

    next_capture_time = time.monotonic()

    while True:
        # Create and capture frame
        frame = AudioFrame.create(sample_rate, 1, samples_per_channel)
        # Fill with data...
        await source.capture_frame(frame)

        # Advance the deadline by exactly one frame duration
        next_capture_time += frame_duration

        # Sleep until next frame (with drift compensation)
        sleep_time = max(0, next_capture_time - time.monotonic())
        await asyncio.sleep(sleep_time)

4. Clean Up Streams

# Always use try/finally
stream = AudioStream(track)
try:
    async for event in stream:
        process(event.frame)
finally:
    await stream.aclose()

# Or use context pattern
async def process_stream_safely(track):
    """Consume an audio stream with error logging and guaranteed cleanup."""
    audio = AudioStream(track)
    try:
        async for evt in audio:
            process(evt.frame)
    except Exception as e:
        print(f"Stream error: {e}")
    finally:
        await audio.aclose()

See Also

  • Audio Tracks - Publishing and managing audio tracks
  • Audio Processing - Audio processing features
  • Utilities - Audio utilities and helpers (mixer, resampler)