or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/livekit@1.0.x

docs

audio-frames-sources.md audio-processing.md audio-tracks.md data-streaming.md e2ee.md events.md index.md participants.md room.md rpc.md track-publications.md transcription.md types-enums.md utilities.md video-frames-sources.md video-processing.md video-tracks.md
tile.json

tessl/pypi-livekit

tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities

docs/utilities.md

Utilities

Overview

Helper classes and functions for audio/video processing, synchronization, and device management. Provides utilities for common tasks like mixing audio, resampling, and synchronizing A/V streams.

Key utilities:

  • combine_audio_frames: Combine multiple audio frames
  • AudioMixer: Mix multiple audio streams
  • AudioResampler: Convert sample rates
  • AudioFilter: Load audio filter plugins
  • AVSynchronizer: Synchronize audio and video
  • MediaDevices: Native audio device access
  • FrameProcessor: Base class for custom processing
  • sine_wave_generator: Generate test audio

Import

from livekit.rtc import (
    combine_audio_frames,
    AudioMixer,
    AudioResampler,
    AudioResamplerQuality,
    AudioFilter,
    AVSynchronizer,
    MediaDevices,
    FrameProcessor,
)

# sine_wave_generator requires separate import from utils module
from livekit.rtc.utils import sine_wave_generator

Functions

combine_audio_frames

def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
    """Combine one or more AudioFrame objects into a single AudioFrame.

    Samples are concatenated in the order given, which makes this useful
    for buffering or batching audio.

    Args:
        buffer: A single AudioFrame or a list of AudioFrame objects.
            All frames must have the same sample rate and channel count.

    Returns:
        AudioFrame: Combined frame containing all samples, where
            - sample_rate and num_channels are the same as the inputs
            - samples_per_channel is the sum over all inputs
            - duration is the sum of all input durations

    Raises:
        ValueError: If buffer is empty.
        ValueError: If frames have differing sample rates.
        ValueError: If frames have differing channel counts.

    Example:
        >>> frame1 = AudioFrame.create(48000, 1, 480)  # 10ms
        >>> frame2 = AudioFrame.create(48000, 1, 480)  # 10ms
        >>> frame3 = AudioFrame.create(48000, 1, 480)  # 10ms
        >>>
        >>> # Combine into a single 30ms frame
        >>> combined = combine_audio_frames([frame1, frame2, frame3])
        >>> print(f"Duration: {combined.duration}s")  # 0.03 (30ms)
        >>>
        >>> # A single frame is also accepted (returns a copy)
        >>> single = combine_audio_frames(frame1)
    """

sine_wave_generator

async def sine_wave_generator(
    freq: float,
    duration: float,
    sample_rate: int = 48000,
    amplitude: float = 0.3,
) -> AsyncIterator[AudioFrame]:
    """Generate sine wave audio frames.

    Useful for testing audio pipelines, verifying an audio system, and
    generating notification sounds or other test signals.
    Requires the numpy package.

    Args:
        freq: Frequency of the sine wave in Hz.
            Range: 20-20000 Hz (human hearing range).
            Common values: 440 Hz (A4, tuning standard), 1000 Hz (test tone).
        duration: Duration of the audio in seconds. Must be positive.
        sample_rate: Sample rate in Hz. Default: 48000.
        amplitude: Amplitude of the sine wave in [0.0, 1.0].
            Default: 0.3 (30% volume). 1.0 is full scale and may clip.

    Yields:
        AudioFrame: Mono (1 channel) frames containing sine wave data.
            Frame size: 10ms (480 samples at 48kHz).
            NOTE(review): confirm the frame duration against the
            implementation's block size.

    Raises:
        ImportError: If numpy is not installed (pip install numpy).
        ValueError: If parameters are out of range.

    Example:
        >>> import asyncio
        >>>
        >>> # Generate a 440 Hz tone for 1 second
        >>> async def generate_tone():
        ...     async for frame in sine_wave_generator(440, 1.0):
        ...         print(f"Generated frame with {frame.samples_per_channel} samples")
        ...         # Process or capture frame
        >>>
        >>> asyncio.run(generate_tone())
        >>>
        >>> # Generate a test tone and capture it into a source.
        >>> # `async for` must run inside a coroutine.
        >>> async def capture_tone():
        ...     source = AudioSource(48000, 1)
        ...     async for frame in sine_wave_generator(1000, 2.0):
        ...         await source.capture_frame(frame)
    """

AudioMixer

class AudioMixer(AsyncIterator[AudioFrame]):
    """Mix multiple audio streams into a single output stream.

    Combines multiple audio sources by summing samples and handles timing
    and synchronization automatically. The mixer is itself an async
    iterator that yields the mixed frames.
    """

    def __init__(
        self,
        sample_rate: int,
        num_channels: int,
        *,
        blocksize: int = 0,
        stream_timeout_ms: int = 100,
        capacity: int = 100
    ) -> None:
        """Initialize AudioMixer.

        Args:
            sample_rate: Audio sample rate in Hz. All input streams are
                resampled to this rate.
            num_channels: Number of audio channels: 1 (mono) or 2 (stereo).
            blocksize: Number of samples per output frame. The default of 0
                selects sample_rate // 10 samples, i.e. 100ms blocks
                (4800 samples at 48kHz).
            stream_timeout_ms: Maximum time, in milliseconds, to wait for
                all streams before mixing. Default: 100.
            capacity: Maximum number of mixed frames held in the output
                queue, which limits memory usage. Default: 100.

        Example:
            >>> # Mix at 48kHz mono
            >>> mixer = AudioMixer(sample_rate=48000, num_channels=1)
            >>>
            >>> # Mix at 48kHz stereo with custom block size
            >>> mixer = AudioMixer(
            ...     sample_rate=48000,
            ...     num_channels=2,
            ...     blocksize=960  # 20ms at 48kHz
            ... )

        Note:
            The mixer runs as an async iterator and yields mixed frames
            continuously. Iteration finishes once end_input() has been
            called and the remaining streams have completed.
        """

    def add_stream(self, stream: AsyncIterator[AudioFrame]) -> None:
        """Add an audio stream to the mixer.

        Args:
            stream: Audio stream to mix, typically an AudioStream instance.

        Example:
            >>> stream1 = AudioStream(track1)
            >>> stream2 = AudioStream(track2)
            >>>
            >>> mixer.add_stream(stream1)
            >>> mixer.add_stream(stream2)
            >>>
            >>> # Iterate mixed output
            >>> async for mixed_frame in mixer:
            ...     await output_source.capture_frame(mixed_frame)

        Note:
            Streams can be added dynamically while mixing.
            A stream is removed automatically when it ends.
        """

    def remove_stream(self, stream: AsyncIterator[AudioFrame]) -> None:
        """Remove an audio stream from the mixer.

        Args:
            stream: Audio stream to remove. Must have been added previously.

        Raises:
            ValueError: If stream is not in the mixer.

        Example:
            >>> mixer.remove_stream(stream1)

        Note:
            Removal is immediate; the remaining streams continue mixing.
        """

    async def aclose(self) -> None:
        """Immediately stop mixing and close the mixer.

        Stops all stream processing and closes the output queue.
        The mixer cannot be reused after closing.

        Example:
            >>> await mixer.aclose()
        """

    def end_input(self) -> None:
        """Signal that no more streams will be added (flushes remaining data).

        Example:
            >>> # Add all streams
            >>> mixer.add_stream(stream1)
            >>> mixer.add_stream(stream2)
            >>>
            >>> # Signal end of input
            >>> mixer.end_input()
            >>>
            >>> # Mixer will end when all streams complete

        Note:
            The mixer keeps running until all streams end, at which point
            the output iterator completes.
            After end_input(), no more streams can be added.
        """

    def __aiter__(self) -> "AudioMixer":
        """Return self as async iterator."""

    async def __anext__(self) -> AudioFrame:
        """Get the next mixed audio frame.

        Returns:
            AudioFrame: Mixed frame.

        Raises:
            StopAsyncIteration: When mixing is complete.
        """

AudioResampler

class AudioResamplerQuality(StrEnum):
    """Audio resampler quality levels.

    Higher quality gives better audio at the cost of more CPU usage.
    Members are ordered from cheapest to most expensive.
    """
    QUICK = "quick"          # Fastest, lowest quality
    LOW = "low"              # Fast, low quality
    MEDIUM = "medium"        # Balanced; the AudioResampler default
    HIGH = "high"            # Slow, high quality
    VERY_HIGH = "very_high"  # Slowest, best quality

class AudioResampler:
    """Resample audio data from one sample rate to another using SoX.

    Uses the high-quality SoX resampler to convert sample rates with
    minimal quality loss.
    """

    def __init__(
        self,
        input_rate: int,
        output_rate: int,
        *,
        num_channels: int = 1,
        quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM
    ) -> None:
        """Initialize AudioResampler.

        Args:
            input_rate: Sample rate of the source audio, in Hz.
            output_rate: Target sample rate, in Hz.
            num_channels: Number of audio channels. Default: 1 (mono).
            quality: Resampling quality. Default: MEDIUM.
                Higher quality gives better audio but uses more CPU.

        Example:
            >>> # Resample 44.1kHz to 48kHz
            >>> resampler = AudioResampler(
            ...     input_rate=44100,
            ...     output_rate=48000,
            ...     quality=AudioResamplerQuality.HIGH
            ... )
            >>>
            >>> # Downsample 48kHz to 16kHz
            >>> resampler = AudioResampler(
            ...     input_rate=48000,
            ...     output_rate=16000
            ... )

        Note:
            The resampler maintains internal state, so use one resampler
            per input stream.

            Common conversions:
            - 44100 -> 48000: CD audio to LiveKit
            - 48000 -> 16000: LiveKit to speech recognition
            - 8000 -> 48000: Phone audio to LiveKit
        """

    def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]:
        """Push audio data into the resampler and retrieve resampled data.

        Args:
            data: Audio data to resample. A bytearray is interpreted as raw
                int16 PCM samples; for an AudioFrame the frame data is used.

        Returns:
            list[AudioFrame]: Resampled frames. May contain zero, one, or
                several frames depending on the input/output rate ratio.

        Example:
            >>> # Resample frame
            >>> input_frame = AudioFrame.create(44100, 1, 441)  # 10ms at 44.1kHz
            >>> output_frames = resampler.push(input_frame)
            >>>
            >>> for frame in output_frames:
            ...     print(f"Output: {frame.samples_per_channel} samples")
            ...     await source.capture_frame(frame)

        Note:
            An empty list is returned while the resampler is still
            buffering; several frames are returned when the ratio produces
            more data.

            Typical returns:
            - 44100 -> 48000: Usually 1-2 frames per input
            - 48000 -> 16000: Usually 0-1 frames per input

            Call flush() at the end of the stream to get remaining frames.
        """

    def flush(self) -> list[AudioFrame]:
        """Flush remaining audio data through the resampler.

        Call at the end of the stream to retrieve buffered samples.

        Returns:
            list[AudioFrame]: Remaining resampled frames; empty if nothing
                is buffered.

        Example:
            >>> # Process all frames
            >>> for input_frame in input_frames:
            ...     output_frames = resampler.push(input_frame)
            ...     for frame in output_frames:
            ...         await source.capture_frame(frame)
            >>>
            >>> # Flush remaining
            >>> final_frames = resampler.flush()
            >>> for frame in final_frames:
            ...     await source.capture_frame(frame)
        """

AudioFilter

class AudioFilter:
    """Load and manage audio filter plugins.

    Enables loading custom audio processing plugins. This is an advanced
    feature intended for specialized audio effects.
    """

    def __init__(
        self,
        module_id: str,
        path: str,
        dependencies: Optional[List[str]] = None
    ) -> None:
        """Initialize AudioFilter.

        Args:
            module_id: Unique identifier for the filter module.
            path: Path to the plugin shared library (.so, .dylib, .dll).
            dependencies: Optional list of paths to required dependencies.
                Default: None.

        Raises:
            Exception: If the plugin fails to load.
            FileNotFoundError: If path doesn't exist.

        Example:
            >>> audio_filter = AudioFilter(
            ...     module_id="my_filter",
            ...     path="/path/to/filter.so",
            ...     dependencies=["/path/to/dep1.so"]
            ... )

        Note:
            The plugin must follow the LiveKit audio filter interface.
        """

AVSynchronizer

class AVSynchronizer:
    """Synchronize audio and video capture for consistent timing.

    Ensures audio and video frames are captured with proper timing
    to maintain synchronization in published streams.
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300
    ) -> None:
        """Initialize AVSynchronizer.

        Args:
            audio_source: AudioSource for synchronized audio. Must be
                created before the synchronizer.
            video_source: VideoSource for synchronized video. Must be
                created before the synchronizer.
            video_fps: Target video frame rate in frames per second
                (e.g., 30.0, 24.0, 15.0).
            video_queue_size_ms: Video frame buffer size in milliseconds.
                Default: 100. Larger values add latency but are more stable.
            _max_delay_tolerance_ms: Max delay tolerance in milliseconds.
                Default: 300. Internal parameter.

        Example:
            >>> # Create sources
            >>> audio_source = AudioSource(sample_rate=48000, num_channels=2)
            >>> video_source = VideoSource(width=1280, height=720)
            >>>
            >>> # Create synchronizer for 30 FPS video
            >>> av_sync = AVSynchronizer(
            ...     audio_source=audio_source,
            ...     video_source=video_source,
            ...     video_fps=30.0,
            ...     video_queue_size_ms=50
            ... )

        Note:
            The synchronizer manages timing automatically: push frames as
            fast as they are available and it buffers and releases them at
            the correct times.
        """

    @property
    def actual_fps(self) -> float:
        """Actual measured frames per second.

        Returns:
            float: FPS currently being achieved; may differ from the
                target video_fps.

        Example:
            >>> print(f"Actual FPS: {av_sync.actual_fps:.1f}")

        Note:
            Useful for monitoring performance. If actual_fps is below the
            target fps, the system may be overloaded.
        """

    @property
    def last_video_time(self) -> float:
        """Time of last video frame captured.

        Returns:
            float: Timestamp in seconds.
        """

    @property
    def last_audio_time(self) -> float:
        """Time of last audio frame played out.

        Returns:
            float: Timestamp in seconds.
        """

    async def push(self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None) -> None:
        """Push a frame to the synchronizer.

        Args:
            frame: VideoFrame or AudioFrame to push; the synchronizer
                handles both types.
            timestamp: Optional timestamp in seconds.
                Default: None (uses current time).

        Example:
            >>> import time
            >>>
            >>> # Push video frame
            >>> video_frame = VideoFrame(1280, 720, VideoBufferType.RGBA, data)
            >>> await av_sync.push(video_frame)
            >>>
            >>> # Push audio frame
            >>> audio_frame = AudioFrame.create(48000, 2, 480)
            >>> await av_sync.push(audio_frame)
            >>>
            >>> # Push with explicit timestamp
            >>> timestamp = time.time()
            >>> await av_sync.push(video_frame, timestamp=timestamp)

        Note:
            Push frames as they are received or generated; the
            synchronizer handles buffering and timing.
            Video frames are buffered to match audio timing.
            Audio frames are played immediately or buffered.
        """

    async def clear_queue(self) -> None:
        """Clear all queued frames.

        Discards all buffered video and audio. Use to recover from
        synchronization issues.

        Example:
            >>> # Clear on error or reset
            >>> await av_sync.clear_queue()
        """

    async def wait_for_playout(self) -> None:
        """Wait until all frames are played out.

        Blocks (asynchronously) until the queues are empty.

        Example:
            >>> # Push all frames
            >>> for frame in frames:
            ...     await av_sync.push(frame)
            >>>
            >>> # Wait for completion
            >>> await av_sync.wait_for_playout()
            >>> print("All A/V played out")
        """

    def reset(self) -> None:
        """Reset synchronizer state.

        Clears queues and resets timing. Use when starting a new session
        or recovering from errors.

        Example:
            >>> av_sync.reset()
        """

    async def aclose(self) -> None:
        """Close the synchronizer and clean up resources.

        Example:
            >>> await av_sync.aclose()

        Note:
            Always close the synchronizer when done.
            Does not close audio_source or video_source.
        """

Usage Example:

from livekit import rtc

# NOTE: this snippet uses `await`, so it must run inside a coroutine
# (for example a function started with asyncio.run()).

# Create audio and video sources
audio_source = rtc.AudioSource(sample_rate=48000, num_channels=2)
video_source = rtc.VideoSource(width=1280, height=720)

# Create synchronizer for 30 FPS video with a 50 ms video queue
av_sync = rtc.AVSynchronizer(
    audio_source=audio_source,
    video_source=video_source,
    video_fps=30.0,
    video_queue_size_ms=50
)

# Push frames to synchronizer (handles timing automatically).
# video_frame / audio_frame are placeholders for frames produced elsewhere.
await av_sync.push(video_frame)
await av_sync.push(audio_frame)

# Check actual FPS being achieved
print(f"Actual FPS: {av_sync.actual_fps:.1f}")

# Clean up: the synchronizer does not close its sources, so close each one
# explicitly after closing the synchronizer.
await av_sync.aclose()
await audio_source.aclose()
await video_source.aclose()

MediaDevices

# Constants used as the default arguments of MediaDevices.__init__
DEFAULT_SAMPLE_RATE: int = 48000  # Hz; default for both input and output
DEFAULT_CHANNELS: int = 1  # mono
BLOCKSIZE: int = 2400  # samples per block; 50ms at 48kHz

class MediaDevices:
    """High-level interface to native audio devices.

    Provides easy access to system microphones and speakers.
    Requires sounddevice package: pip install sounddevice
    """

    def __init__(
        self,
        *,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        input_sample_rate: int = DEFAULT_SAMPLE_RATE,
        output_sample_rate: int = DEFAULT_SAMPLE_RATE,
        num_channels: int = DEFAULT_CHANNELS,
        blocksize: int = BLOCKSIZE
    ) -> None:
        """Initialize MediaDevices.

        Args:
            loop: Event loop to use. Default: None.
            input_sample_rate: Sample rate for input in Hz (default: 48000).
            output_sample_rate: Sample rate for output in Hz (default: 48000).
            num_channels: Number of channels (default: 1).
            blocksize: Block size in samples (default: 2400 = 50ms at 48kHz).

        Raises:
            ImportError: If sounddevice is not installed.
        """

    def list_input_devices(self) -> list[dict[str, Any]]:
        """List available input devices (microphones).

        Returns:
            list[dict]: One information dict per device, containing:
                - name: Device name (str)
                - index: Device index (int)
                - channels: Max input channels (int)
                - sample_rate: Default sample rate (float)

        Example:
            >>> devices = media_devices.list_input_devices()
            >>> for dev in devices:
            ...     print(f"{dev['index']}: {dev['name']}")
        """

    def list_output_devices(self) -> list[dict[str, Any]]:
        """List available output devices (speakers).

        Returns:
            list[dict]: One information dict per device.
                NOTE(review): keys presumably mirror list_input_devices();
                confirm against the implementation.
        """

    def default_input_device(self) -> Optional[int]:
        """Get default input device index.

        Returns:
            int | None: Default input device index, or None.
        """

    def default_output_device(self) -> Optional[int]:
        """Get default output device index.

        Returns:
            int | None: Default output device index, or None.
        """

    def open_input(
        self,
        *,
        enable_aec: bool = True,
        noise_suppression: bool = True,
        high_pass_filter: bool = True,
        auto_gain_control: bool = True,
        input_device: Optional[int] = None,
        queue_capacity: int = 50,
        input_channel_index: Optional[int] = None
    ) -> InputCapture:
        """Open audio input device and start capture.

        All four processing options are enabled by default.

        Args:
            enable_aec: Enable echo cancellation. Default: True.
            noise_suppression: Enable noise suppression. Default: True.
            high_pass_filter: Enable high-pass filter. Default: True.
            auto_gain_control: Enable automatic gain control. Default: True.
            input_device: Device index (None for default).
            queue_capacity: Frame queue capacity. Default: 50.
            input_channel_index: Specific channel to capture. Default: None.

        Returns:
            InputCapture: Capture object with source and controls.

        Example:
            >>> devices = MediaDevices()
            >>> capture = devices.open_input(
            ...     enable_aec=True,
            ...     noise_suppression=True,
            ...     auto_gain_control=True
            ... )
            >>>
            >>> # Use captured audio
            >>> source = capture.source
            >>> track = LocalAudioTrack.create_audio_track("mic", source)
            >>> await room.local_participant.publish_track(track)
        """

    def open_output(self, *, output_device: Optional[int] = None) -> OutputPlayer:
        """Create an OutputPlayer for audio playback.

        Args:
            output_device: Device index (None for default).

        Returns:
            OutputPlayer: Player for audio playback.

        Example:
            >>> devices = MediaDevices()
            >>> player = devices.open_output()
            >>> await player.add_track(remote_audio_track)
            >>> await player.start()
        """

InputCapture

@dataclass
class InputCapture:
    """Holds resources for an active audio input capture.

    Attributes:
        source: AudioSource that receives captured frames. Can be published
            as a LocalAudioTrack; APM is already applied if enabled.
        input_stream: Underlying low-level sounddevice.InputStream object.
        task: Background asyncio task that drains the queue and calls
            source.capture_frame.
        apm: Optional AudioProcessingModule (AEC, NS, HPF, AGC if enabled),
            or None.
        delay_estimator: Internal helper for combining capture and render
            delays.
    """

    source: AudioSource
    input_stream: Any  # sounddevice.InputStream
    task: asyncio.Task
    apm: Optional[AudioProcessingModule]
    delay_estimator: Any  # Internal helper

    async def aclose(self) -> None:
        """Stop capture and close underlying resources.

        Stops audio capture, closes the sounddevice stream and cancels the
        background task.

        Example:
            >>> capture = devices.open_input()
            >>> # ... use capture
            >>> await capture.aclose()

        Note:
            Does not close capture.source; the caller should close it.
        """

OutputPlayer

class OutputPlayer:
    """Audio output helper using sounddevice.OutputStream.

    When apm_for_reverse is provided, feeds rendered PCM into APM reverse path
    for echo cancellation.

    Plays multiple tracks simultaneously (mixes automatically).
    """

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        sample_rate: int,
        num_channels: int,
        output_device: Optional[int],
        apm_for_reverse: Optional[AudioProcessingModule],
        delay_estimator: Any
    ) -> None:
        """Initialize OutputPlayer.

        Args:
            loop: Event loop.
            sample_rate: Output sample rate.
            num_channels: Number of output channels.
            output_device: Device index (None for default).
            apm_for_reverse: APM for echo cancellation reverse stream,
                or None.
            delay_estimator: Internal delay estimator.

        Note:
            Typically created via MediaDevices.open_output() rather than
            constructed directly.
        """

    async def add_track(self, track: Track) -> None:
        """Add a track to play back.

        Args:
            track: Track to play (RemoteAudioTrack or LocalAudioTrack).
                Multiple tracks can be added; they are mixed automatically.

        Example:
            >>> player = devices.open_output()
            >>>
            >>> # Add tracks as they're subscribed
            >>> @room.on("track_subscribed")
            ... def on_track(track, publication, participant):
            ...     if track.kind == TrackKind.KIND_AUDIO:
            ...         asyncio.create_task(player.add_track(track))

        Note:
            Track audio is resampled if needed.
        """

    async def remove_track(self, track: Track) -> None:
        """Remove a track from playback.

        Stops playing the specified track; other tracks continue playing.

        Args:
            track: Track to remove.

        Example:
            >>> await player.remove_track(track)
        """

    async def start(self) -> None:
        """Start the output stream.

        Starts the sounddevice output stream.

        Example:
            >>> player = devices.open_output()
            >>> await player.add_track(track1)
            >>> await player.add_track(track2)
            >>> await player.start()

        Note:
            Must be called after adding tracks.
        """

    async def aclose(self) -> None:
        """Stop playback and close resources.

        Example:
            >>> await player.aclose()
        """

FrameProcessor

class FrameProcessor[T](ABC):
    """Generic interface for frame processing.
    
    Base class for implementing custom audio or video processing.
    Generic type T: AudioFrame or VideoFrame.
    """

    @property
    @abstractmethod
    def enabled(self) -> bool:
        """Whether processor is enabled (get/set).
        
        Returns:
            bool: True if processing enabled, False otherwise
            
        Note:
            Must implement as property with getter and setter.
        """

    @abstractmethod
    def _process(self, frame: T) -> T:
        """Process a frame.

        Args:
            frame: Frame to process
                  Type: T (AudioFrame or VideoFrame)

        Returns:
            T: Processed frame (can be same instance or new)
            
        Note:
            Called for each frame when enabled=True.
            Can modify frame in-place or create new frame.
            Should be fast (runs in hot path).
        """

    @abstractmethod
    def _close(self) -> None:
        """Close and cleanup processor resources.
        
        Returns:
            None
            
        Note:
            Called when processor is no longer needed.
            Free allocated resources.
            Not async (use async cleanup before if needed).
        """

    def _on_stream_info_updated(
        self,
        *,
        room_name: str,
        participant_identity: str,
        publication_sid: str,
    ) -> None:
        """Optional callback when stream information is updated.

        Args:
            room_name: Name of the room
            participant_identity: Identity of the participant
            publication_sid: SID of the track publication
            
        Note:
            Optional method.
            Called when stream metadata changes.
            Override if processor needs this information.
        """

    def _on_credentials_updated(self, *, token: str, url: str) -> None:
        """Optional callback when credentials are updated.

        Args:
            token: New access token
            url: LiveKit server URL
            
        Note:
            Optional method.
            Called when token is refreshed.
            Override if processor needs credentials.
        """

Example Implementation:

from livekit import FrameProcessor, AudioFrame
import numpy as np

class GainProcessor(FrameProcessor[AudioFrame]):
    """Custom audio processor that scales every sample by a gain factor.

    Frame data is interpreted as 16-bit signed PCM. Scaled samples are
    rounded to the nearest integer and clamped to the int16 range.
    """

    def __init__(self, gain: float = 1.0) -> None:
        """Create an enabled processor.

        Args:
            gain: Multiplier applied to each sample (1.0 leaves the
                audio unchanged).
        """
        self._enabled = True
        self._gain = gain

    @property
    def enabled(self) -> bool:
        """Whether gain is applied to incoming frames."""
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        self._enabled = value

    def set_gain(self, gain: float) -> None:
        """Set the gain multiplier used for subsequent frames."""
        self._gain = gain

    def _process(self, frame: AudioFrame) -> AudioFrame:
        """Apply the gain to an audio frame.

        Args:
            frame: Frame whose samples should be scaled.

        Returns:
            The input frame itself when the processor is disabled or
            the gain is exactly 1.0; otherwise a new AudioFrame with
            the same sample rate, channel count and samples per channel.
        """
        # Nothing to do: hand the frame back untouched.
        if not self._enabled or self._gain == 1.0:
            return frame

        # View the frame's buffer as int16 samples.
        pcm = np.frombuffer(frame.data, dtype=np.int16)

        # Scale in floating point so intermediate values cannot wrap.
        scaled = pcm.astype(np.float32) * self._gain

        # Round to nearest before the integer cast: a bare astype()
        # truncates toward zero, which biases every sample toward
        # silence. Then clamp to the int16 range.
        out = np.clip(np.rint(scaled), -32768, 32767).astype(np.int16)

        # Build a new frame around the processed samples.
        return AudioFrame(
            out.tobytes(),
            frame.sample_rate,
            frame.num_channels,
            frame.samples_per_channel,
        )

    def _close(self) -> None:
        """Cleanup hook; this processor holds no resources."""
        pass

# Usage
# Build a processor that multiplies samples by 1.5.
processor = GainProcessor(gain=1.5)  # 150% volume
# Attach the processor to an audio stream via the noise_cancellation argument.
# NOTE(review): `track` and `AudioStream` are not defined/imported in this
# snippet - presumably an audio track obtained elsewhere and
# `from livekit import AudioStream`; confirm before copying.
stream = AudioStream(track, noise_cancellation=processor)

Complete Examples

Example 1: Audio Mixing

from livekit import AudioMixer, AudioStream, AudioSource, LocalAudioTrack

async def mix_audio_streams(track1, track2):
    """Mix two audio tracks and publish the result as one local track.

    Both tracks are read as 48 kHz mono streams, mixed, and the mixed
    frames are captured into an AudioSource published under the name
    "mixed".

    Args:
        track1: First audio track to mix.
        track2: Second audio track to mix.

    Note:
        `room` is not a parameter of this function; presumably a
        connected Room must be available in the enclosing scope -
        confirm before copying this example.
        All streams, the mixer and the source are closed when the
        function exits, including on error or cancellation.
    """
    sample_rate = 48000
    num_channels = 1

    async def _frames(stream):
        """Unwrap AudioStream events into bare AudioFrame objects."""
        # AudioStream yields events that carry the frame in `.frame`,
        # whereas the mixer consumes the frames themselves.
        async for event in stream:
            yield event.frame

    # Create streams
    stream1 = AudioStream(track1, sample_rate=sample_rate, num_channels=num_channels)
    stream2 = AudioStream(track2, sample_rate=sample_rate, num_channels=num_channels)

    # Create mixer and output
    mixer = AudioMixer(sample_rate=sample_rate, num_channels=num_channels)
    output_source = AudioSource(sample_rate, num_channels)

    # Everything after resource creation sits inside the try block so
    # that a failure while publishing still releases the resources.
    try:
        mixer.add_stream(_frames(stream1))
        mixer.add_stream(_frames(stream2))

        output_track = LocalAudioTrack.create_audio_track("mixed", output_source)
        await room.local_participant.publish_track(output_track)

        # Mix and publish
        async for mixed_frame in mixer:
            await output_source.capture_frame(mixed_frame)
    finally:
        await mixer.aclose()
        await stream1.aclose()
        await stream2.aclose()
        await output_source.aclose()

Example 2: Audio Resampling

from livekit import AudioResampler, AudioResamplerQuality, AudioFrame

async def resample_stream(
    input_stream: AudioStream,
    output_source: AudioSource,
    *,
    input_rate: int = 44100,
    output_rate: int = 48000,
    num_channels: int = 1,
) -> None:
    """Resample an audio stream and feed the result to an audio source.

    With the default arguments this converts 44.1 kHz mono audio to
    48 kHz, using the HIGH resampler quality.

    Args:
        input_stream: Stream to read frames from. It is closed when
            this function exits, including on error.
        output_source: Source that receives the resampled frames.
        input_rate: Sample rate of the incoming frames, in Hz.
        output_rate: Sample rate of the produced frames, in Hz.
        num_channels: Number of audio channels.

    Returns:
        None (awaitable)
    """
    resampler = AudioResampler(
        input_rate=input_rate,
        output_rate=output_rate,
        num_channels=num_channels,
        quality=AudioResamplerQuality.HIGH,
    )

    try:
        async for event in input_stream:
            # One pushed frame may produce zero or more output frames.
            for frame in resampler.push(event.frame):
                await output_source.capture_frame(frame)

        # The input ended: drain whatever the resampler still buffers.
        for frame in resampler.flush():
            await output_source.capture_frame(frame)
    finally:
        await input_stream.aclose()

See Also

  • Audio Processing - Audio processing features
  • Audio Frames and Sources - Audio frame handling
  • Video Frames and Sources - Video frame handling