tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities
Low-level audio handling for creating and managing audio frames, capturing them to sources, and receiving them from streams. Provides direct control over audio data for advanced use cases.
Key concepts:
- AudioFrame: a frame of int16 PCM audio data
- AudioSource: queues captured frames for publishing as a local track
- AudioStream: async iterator over received audio frames
- AudioFrameEvent: event wrapping a received AudioFrame
- NoiseCancellationOptions: configuration for noise cancellation on received audio

from livekit import AudioFrame, AudioSource, AudioStream, AudioFrameEvent, NoiseCancellationOptions

class AudioFrame:
"""Represents a frame of audio data with int16 samples interleaved by channel.
Audio data format:
- Sample format: 16-bit signed integer (int16)
- Range: -32768 to 32767
- Interleaving: [L0, R0, L1, R1, ...] for stereo
[S0, S1, S2, ...] for mono
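Example:
>>> # A sketch of the interleaved stereo layout using numpy (an
>>> # assumption; any producer of int16 bytes works):
>>> import numpy as np
>>> samples = np.zeros((480, 2), dtype=np.int16)  # 480 L/R sample pairs
>>> samples[:, 0] = 1000   # left channel
>>> samples[:, 1] = -1000  # right channel
>>> frame = AudioFrame(samples.tobytes(), 48000, 2, 480)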
"""
def __init__(
self,
data: Union[bytes, bytearray, memoryview],
sample_rate: int,
num_channels: int,
samples_per_channel: int
) -> None:
"""Initialize AudioFrame with audio data.
Args:
data: Raw audio data
Type: bytes | bytearray | memoryview
Must be at least num_channels * samples_per_channel * 2 bytes
Format: int16 PCM samples, interleaved by channel
sample_rate: Sample rate in Hz
Type: int
Common values: 48000, 16000, 8000
num_channels: Number of audio channels
Type: int
1 (mono) or 2 (stereo)
samples_per_channel: Number of samples per channel
Type: int
Frame duration = samples_per_channel / sample_rate
Raises:
ValueError: If data length is invalid
Required: num_channels * samples_per_channel * 2 bytes
Example:
>>> # Mono audio, 10ms at 48kHz
>>> sample_rate = 48000
>>> samples_per_channel = 480 # 10ms
>>> data = bytearray(480 * 1 * 2) # 480 samples * 1 channel * 2 bytes
>>> frame = AudioFrame(data, sample_rate, 1, samples_per_channel)
>>>
>>> # Stereo audio, 20ms at 48kHz
>>> data = bytearray(960 * 2 * 2) # 960 samples * 2 channels * 2 bytes
>>> frame = AudioFrame(data, 48000, 2, 960)
"""
@staticmethod
def create(sample_rate: int, num_channels: int, samples_per_channel: int) -> AudioFrame:
"""Create a new empty AudioFrame with zeroed data.
Args:
sample_rate: Sample rate in Hz
num_channels: Number of channels (1 or 2)
samples_per_channel: Samples per channel
Returns:
AudioFrame: New frame with zeroed data
Example:
>>> # Create 10ms mono frame at 48kHz
>>> frame = AudioFrame.create(48000, 1, 480)
>>> print(f"Duration: {frame.duration}s") # 0.01
>>>
>>> # Create 20ms stereo frame at 16kHz
>>> frame = AudioFrame.create(16000, 2, 320)
Note:
Convenience method for creating frames.
Data is zeroed (silence).
Fill data using frame.data memoryview.
"""
@property
def userdata(self) -> dict[str, Any]:
"""User data dictionary associated with the frame.
Returns:
dict: Dictionary for storing custom metadata
Example:
>>> frame.userdata["timestamp"] = time.time()
>>> frame.userdata["source"] = "microphone"
Note:
Not transmitted with frame.
Used for local tracking/metadata.
"""
@property
def data(self) -> memoryview:
"""Memory view of audio data as 16-bit signed integers.
Returns:
memoryview: View of audio data (int16)
Can be modified in-place
Example:
>>> frame = AudioFrame.create(48000, 1, 480)
>>> data = frame.data
>>> print(f"Total samples: {len(data)}") # 480 for mono
>>>
>>> # Modify samples
>>> data[0] = 1000 # Set first sample
>>> data[100] = -500 # Set another sample
>>>
>>> # For stereo, samples are interleaved:
>>> # data[0] = left0, data[1] = right0
>>> # data[2] = left1, data[3] = right1
Note:
memoryview can be converted to numpy array:
>>> import numpy as np
>>> samples = np.frombuffer(data, dtype=np.int16)
For stereo, reshape for separate channels:
>>> samples = samples.reshape(-1, 2) # Shape: (samples_per_channel, 2)
>>> left = samples[:, 0]
>>> right = samples[:, 1]
"""
@property
def sample_rate(self) -> int:
"""Sample rate in Hz.
Returns:
int: Samples per second (e.g., 48000, 16000, 8000)
"""
@property
def num_channels(self) -> int:
"""Number of audio channels.
Returns:
int: 1 (mono) or 2 (stereo)
"""
@property
def samples_per_channel(self) -> int:
"""Number of samples per channel.
Returns:
int: Sample count per channel
Note:
Total samples in data = samples_per_channel * num_channels
For mono: len(data) == samples_per_channel
For stereo: len(data) == samples_per_channel * 2
"""
@property
def duration(self) -> float:
"""Duration of the frame in seconds.
Returns:
float: Frame duration in seconds
Calculated as: samples_per_channel / sample_rate
Example:
>>> frame = AudioFrame.create(48000, 1, 480)
>>> print(frame.duration) # 0.01 (10ms)
>>>
>>> frame = AudioFrame.create(48000, 1, 2400)
>>> print(frame.duration) # 0.05 (50ms)
"""
def to_wav_bytes(self) -> bytes:
"""Convert the audio frame to WAV-formatted bytes.
Returns:
bytes: Complete WAV file as bytes
Includes WAV header and audio data
Example:
>>> frame = AudioFrame.create(48000, 1, 480)
>>> # Fill with audio data
>>> wav_bytes = frame.to_wav_bytes()
>>>
>>> # Save to file
>>> with open("audio.wav", "wb") as f:
... f.write(wav_bytes)
Note:
Creates complete WAV file in memory.
Useful for:
- Saving audio snippets
- Debugging audio pipeline
- Exporting audio
WAV format:
- Header: 44 bytes
- Data: frame audio samples
- Total: 44 + (samples_per_channel * num_channels * 2) bytes
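For example, a 10ms mono frame at 48kHz (per the formula above):
>>> frame = AudioFrame.create(48000, 1, 480)
>>> wav = frame.to_wav_bytes()
>>> len(wav)  # 44 + (480 * 1 * 2) = 1004 bytes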
"""class AudioSource:
"""Real-time audio source with internal audio queue.
Manages audio frame queue for publishing.
Handles timing and buffering automatically.
"""
def __init__(
self,
sample_rate: int,
num_channels: int,
queue_size_ms: int = 1000,
loop: asyncio.AbstractEventLoop | None = None
) -> None:
"""Initialize AudioSource.
Args:
sample_rate: Sample rate in Hz
Type: int
Common values: 48000, 16000, 8000
num_channels: Number of audio channels
Type: int
1 (mono) or 2 (stereo)
queue_size_ms: Buffer size in milliseconds
Type: int
Default: 1000 (1 second)
Range: 100-10000 recommended
loop: Event loop to use
Type: asyncio.AbstractEventLoop | None
Default: None (uses current loop)
Returns:
AudioSource instance
Raises:
ValueError: If parameters invalid
RuntimeError: If loop not available
Example:
>>> # Mono source at 48kHz with 1s buffer
>>> source = AudioSource(
... sample_rate=48000,
... num_channels=1,
... queue_size_ms=1000
... )
>>>
>>> # Stereo source at 16kHz with 500ms buffer
>>> source = AudioSource(16000, 2, queue_size_ms=500)
Note:
Sample rate and channels are fixed at creation.
To change, create new source.
Queue size affects latency:
- Smaller (100-500ms): Lower latency, risk of underruns
- Larger (1000-5000ms): Higher latency, more stable
"""
@property
def sample_rate(self) -> int:
"""Sample rate in Hz.
Returns:
int: Sample rate (e.g., 48000)
"""
@property
def num_channels(self) -> int:
"""Number of audio channels.
Returns:
int: 1 (mono) or 2 (stereo)
"""
@property
def queued_duration(self) -> float:
"""Current duration of queued audio data in seconds.
Returns:
float: Seconds of audio currently in queue
0.0 if queue empty
Example:
>>> queued = source.queued_duration
>>> print(f"Queued: {queued:.3f}s")
>>>
>>> if queued > 1.0:
... print("Queue backing up!")
... elif queued < 0.1:
... print("Queue running low")
Note:
Monitor this to detect issues:
- Too high: Capturing faster than can send
- Too low: May cause underruns (gaps)
Healthy range: 0.2 - 0.8 seconds
"""
def clear_queue(self) -> None:
"""Clear the internal audio queue, discarding all buffered audio data.
Returns:
None
Example:
>>> # Clear queue if backed up
>>> if source.queued_duration > 2.0:
... source.clear_queue()
... print("Queue cleared")
Note:
Discards all queued audio immediately.
Next capture_frame() starts fresh.
Use cases:
- Recover from queue backup
- Reset after long pause
- Clear stale audio data
"""
async def capture_frame(self, frame: AudioFrame) -> None:
"""Capture an audio frame and queue it for playback.
Args:
frame: AudioFrame to capture
Must match source sample_rate and num_channels
Returns:
None (awaitable)
Raises:
ValueError: If frame sample rate or channels don't match source
RuntimeError: If queue is full and cannot accept frame
Example:
>>> source = AudioSource(48000, 1)
>>> frame = AudioFrame.create(48000, 1, 480)
>>>
>>> # Fill frame with audio data
>>> # ...
>>>
>>> # Capture to source
>>> await source.capture_frame(frame)
Note:
Frame is queued for publishing.
Blocks if queue is full (back pressure).
Publishing happens asynchronously in background.
No need to manage timing - source handles it.
"""
async def wait_for_playout(self) -> None:
"""Wait for the audio source to finish playing all queued audio.
Returns:
None (awaitable, blocks until queue empty)
Example:
>>> # Capture multiple frames
>>> for frame in frames:
... await source.capture_frame(frame)
>>>
>>> # Wait for all frames to play out
>>> await source.wait_for_playout()
>>> print("All audio played")
Note:
Blocks until queued_duration reaches 0.
Useful for ensuring audio completes before:
- Disconnecting
- Switching tracks
- Ending session
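E.g., a graceful-shutdown sketch (room as in the usage example below):
>>> await source.wait_for_playout()
>>> await source.aclose()
>>> await room.disconnect()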
"""
async def aclose(self) -> None:
"""Close the audio source and clean up resources.
Returns:
None (awaitable)
Example:
>>> source = AudioSource(48000, 1)
>>> # ... use source
>>> await source.aclose()
Note:
Always close sources when done.
Releases internal resources and queue.
Source cannot be reused after closing.
Best practice:
>>> try:
... source = AudioSource(48000, 1)
... # Use source
... finally:
... await source.aclose()
"""@dataclass
class NoiseCancellationOptions:
"""Options for configuring noise cancellation in audio streams.
Attributes:
module_id: Identifier for the noise cancellation module to use
Type: str
Module-specific identifier
options: Dictionary of module-specific configuration options
Type: dict[str, Any]
Configuration depends on module_id
"""
module_id: str
options: dict[str, Any]

Used with AudioStream to enable noise cancellation on received audio. The specific module_id and options depend on the noise cancellation implementation being used.
Example:
# Create audio stream with noise cancellation
nc_options = NoiseCancellationOptions(
module_id="krisp", # Example module
options={"level": "high"}
)
stream = AudioStream(
track,
noise_cancellation=nc_options
)

@dataclass
class AudioFrameEvent:
"""Event representing a received audio frame.
Attributes:
frame: The audio frame
Type: AudioFrame
"""
frame: AudioFrame
class AudioStream(AsyncIterator[AudioFrameEvent]):
"""Asynchronous audio stream for receiving audio frames.
Async iterator that yields AudioFrameEvent objects.
"""
def __init__(
self,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
**kwargs
) -> None:
"""Initialize AudioStream.
Args:
track: Audio track to receive from
Type: Track (RemoteAudioTrack or LocalAudioTrack)
loop: Event loop to use
Type: asyncio.AbstractEventLoop | None
Default: None (uses current loop)
capacity: Internal frame queue capacity
Type: int
Default: 0 (unbounded)
>0: Bounded queue (drops frames if full)
sample_rate: Sample rate for audio stream
Type: int
Default: 48000
SDK resamples if track has different rate
num_channels: Number of audio channels
Type: int
Default: 1 (mono)
SDK converts if track has different channels
frame_size_ms: Frame size in milliseconds
Type: int | None
Default: None (uses default, typically 10ms)
noise_cancellation: Noise cancellation options or processor
Type: NoiseCancellationOptions | FrameProcessor | None
Default: None (no noise cancellation)
Example:
>>> # Basic stream
>>> stream = AudioStream(track)
>>>
>>> # Stream with custom settings
>>> stream = AudioStream(
... track,
... sample_rate=16000, # Resample to 16kHz
... num_channels=1, # Convert to mono
... capacity=100 # Buffer up to 100 frames
... )
>>>
>>> # Stream with noise cancellation
>>> nc_options = NoiseCancellationOptions(
... module_id="krisp",
... options={}
... )
>>> stream = AudioStream(track, noise_cancellation=nc_options)
"""
@classmethod
def from_participant(
cls,
*,
participant: Participant,
track_source: TrackSource.ValueType,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None
) -> AudioStream:
"""Create an AudioStream from a participant's audio track.
Args:
participant: Participant to receive audio from
track_source: Track source type (e.g., SOURCE_MICROPHONE)
... (other args same as __init__)
Returns:
AudioStream: Stream for specified track source
Raises:
ValueError: If participant doesn't have track with specified source
Example:
>>> # Stream microphone audio from participant
>>> stream = AudioStream.from_participant(
... participant=remote_participant,
... track_source=TrackSource.SOURCE_MICROPHONE
... )
Note:
Convenience method for common use case.
Finds first track matching source type.
"""
@classmethod
def from_track(
cls,
*,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None
) -> AudioStream:
"""Create an AudioStream from an existing track.
Alternative constructor (same as __init__).
Example:
>>> stream = AudioStream.from_track(track=my_track)
"""
async def aclose(self) -> None:
"""Close the audio stream and clean up resources.
Returns:
None (awaitable)
Example:
>>> stream = AudioStream(track)
>>> try:
... async for event in stream:
... process(event.frame)
... finally:
... await stream.aclose()
Note:
Always close streams when done.
Releases resources and stops receiving frames.
"""
def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
"""Return self as async iterator.
Returns:
AsyncIterator[AudioFrameEvent]: Self
"""
async def __anext__(self) -> AudioFrameEvent:
"""Get next audio frame event.
Returns:
AudioFrameEvent: Next frame event
Raises:
StopAsyncIteration: When stream ends
Example:
>>> async for event in stream:
... frame = event.frame
... print(f"Received {frame.samples_per_channel} samples")
"""import asyncio
from livekit import (
Room, AudioSource, LocalAudioTrack, AudioFrame,
AudioStream, TrackKind, RemoteAudioTrack
)
async def main():
room = Room()
# Publishing audio
source = AudioSource(sample_rate=48000, num_channels=1)
track = LocalAudioTrack.create_audio_track("mic", source)
await room.local_participant.publish_track(track)
# Generate and capture audio
asyncio.create_task(generate_audio(source))
# Receiving audio
@room.on("track_subscribed")
def on_track(track, publication, participant):
if track.kind == TrackKind.KIND_AUDIO:
asyncio.create_task(receive_audio(track))
await room.connect(url, token)
await asyncio.sleep(30)
# Cleanup
await source.aclose()
await room.disconnect()
async def generate_audio(source: AudioSource):
"""Generate audio frames."""
sample_rate = 48000
samples_per_channel = 480 # 10ms
while True:
# Create frame
frame = AudioFrame.create(sample_rate, 1, samples_per_channel)
# Fill with audio data (silence in this example)
# In production, get from microphone:
# data = frame.data
# for i in range(len(data)):
# data[i] = microphone_sample(i)
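# Or, as a sketch, fill with a 440Hz tone (numpy assumed; n is a
# hypothetical frame counter maintained outside the loop):
# t = (np.arange(samples_per_channel) + n * samples_per_channel) / sample_rate
# tone = (0.1 * 32767 * np.sin(2 * np.pi * 440 * t)).astype(np.int16)
# np.frombuffer(frame.data, dtype=np.int16)[:] = tone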
# Capture
await source.capture_frame(frame)
# Wait for next frame
await asyncio.sleep(0.01) # 10ms
async def receive_audio(track: RemoteAudioTrack):
"""Receive and process audio."""
stream = AudioStream(track, sample_rate=48000, num_channels=1)
try:
async for event in stream:
frame = event.frame
print(f"Received {frame.samples_per_channel} samples, "
f"{frame.duration:.3f}s duration")
# Access data
data = frame.data # memoryview of int16
# Process audio
# Examples:
# - Play through speaker
# - Save to file
# - Analyze audio
# - Apply effects
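# For instance, a simple RMS level meter (numpy assumed):
# samples = np.frombuffer(data, dtype=np.int16)
# rms = np.sqrt(np.mean(samples.astype(np.float64) ** 2))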
finally:
await stream.aclose()
if __name__ == "__main__":
asyncio.run(main())

# Source and frames must match
source = AudioSource(sample_rate=48000, num_channels=1)
# Good: Matching sample rate
frame = AudioFrame.create(48000, 1, 480)
await source.capture_frame(frame)
# Bad: Mismatched sample rate
# frame = AudioFrame.create(16000, 1, 160)
# await source.capture_frame(frame)  # Raises ValueError

async def monitor_queue(source: AudioSource):
"""Monitor audio source queue."""
while True:
queued = source.queued_duration
if queued > 1.5:
print(f"Queue backing up: {queued:.3f}s")
# Consider: clear_queue() or slow down capture
elif queued < 0.05:
print(f"Queue running low: {queued:.3f}s")
# May cause audio gaps
await asyncio.sleep(1.0)
asyncio.create_task(monitor_queue(source))

# Capture frames with drift-compensated timing
import time

async def capture_with_timing(source: AudioSource):
"""Capture with accurate timing."""
sample_rate = 48000
samples_per_channel = 480 # 10ms
frame_duration = samples_per_channel / sample_rate
next_capture_time = time.time()
while True:
# Create and capture frame
frame = AudioFrame.create(sample_rate, 1, samples_per_channel)
# Fill with data...
await source.capture_frame(frame)
# Calculate next capture time
next_capture_time += frame_duration
# Sleep until next frame (with drift compensation)
now = time.time()
sleep_time = max(0, next_capture_time - now)
await asyncio.sleep(sleep_time)

# Always use try/finally
stream = AudioStream(track)
try:
async for event in stream:
process(event.frame)
finally:
await stream.aclose()
# Or use context pattern
async def process_stream_safely(track):
stream = AudioStream(track)
try:
async for event in stream:
process(event.frame)
except Exception as e:
print(f"Stream error: {e}")
finally:
await stream.aclose()
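A fuller pattern tying the pieces together: streaming a local WAV file through an AudioSource in 10ms frames. This is a sketch; it assumes a 16-bit PCM WAV whose sample rate and channel count already match the source (wave is stdlib).

# Play a WAV file through an AudioSource
import wave

async def play_wav(source: AudioSource, path: str):
    """Stream a 16-bit PCM WAV file (rate/channels must match the source)."""
    samples_per_channel = source.sample_rate // 100  # 10ms frames
    frame_bytes = samples_per_channel * source.num_channels * 2
    with wave.open(path, "rb") as wav:
        while True:
            chunk = wav.readframes(samples_per_channel)
            if not chunk:
                break
            if len(chunk) < frame_bytes:
                # Pad the final partial frame with silence
                chunk += b"\x00" * (frame_bytes - len(chunk))
            frame = AudioFrame(chunk, source.sample_rate,
                               source.num_channels, samples_per_channel)
            await source.capture_frame(frame)  # blocks when the queue is full
    await source.wait_for_playout()  # let all queued audio finish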