tessl install tessl/pypi-livekit@1.0.0

Python Real-time SDK for LiveKit providing WebRTC-based video, audio, and data streaming capabilities.
Helper classes and functions for audio/video processing, synchronization, and device management. Provides utilities for common tasks like mixing audio, resampling, and synchronizing A/V streams.
Key utilities:
from livekit import (
combine_audio_frames,
AudioMixer,
AudioResampler,
AudioResamplerQuality,
AudioFilter,
AVSynchronizer,
MediaDevices,
FrameProcessor,
)
# sine_wave_generator requires separate import from utils module
from livekit.rtc.utils import sine_wave_generator

def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
"""Combines one or more AudioFrame objects into a single AudioFrame.
Args:
buffer: Single AudioFrame or list of AudioFrame objects
Type: AudioFrame | list[AudioFrame]
All frames must have same sample rate and channel count
Returns:
AudioFrame: Combined frame containing all samples
Raises:
ValueError: If buffer is empty
ValueError: If frames have differing sample rates
ValueError: If frames have differing channel counts
Example:
>>> frame1 = AudioFrame.create(48000, 1, 480) # 10ms
>>> frame2 = AudioFrame.create(48000, 1, 480) # 10ms
>>> frame3 = AudioFrame.create(48000, 1, 480) # 10ms
>>>
>>> # Combine into single frame
>>> combined = combine_audio_frames([frame1, frame2, frame3])
>>> print(f"Duration: {combined.duration}s") # 0.03 (30ms)
>>>
>>> # Single frame (returns copy)
>>> single = combine_audio_frames(frame1)
Note:
Useful for buffering or batching audio.
Samples are concatenated in order.
Combined frame properties:
- sample_rate: Same as input frames
- num_channels: Same as input frames
- samples_per_channel: Sum of all inputs
- duration: Sum of all input durations
"""async def sine_wave_generator(
async def sine_wave_generator(
freq: float,
duration: float,
sample_rate: int = 48000,
amplitude: float = 0.3,
) -> AsyncIterator[AudioFrame]:
"""Generate sine wave audio frames.
Useful for testing audio pipelines and generating test signals.
Args:
freq: Frequency of the sine wave in Hz
Type: float
Range: 20-20000 Hz (human hearing range)
Common values:
- 440 Hz: A4 note (tuning standard)
- 1000 Hz: Test tone
duration: Duration of the audio in seconds
Type: float
Must be positive
sample_rate: Sample rate in Hz
Type: int
Default: 48000
amplitude: Amplitude of the sine wave
Type: float
Range: [0.0, 1.0]
Default: 0.3 (30% volume)
1.0 = full scale (may clip)
Yields:
AudioFrame: Audio frames containing sine wave data
Frames are mono (1 channel)
Frame size: 10ms (480 samples at 48kHz)
Raises:
ImportError: If numpy is not installed
Requires: pip install numpy
ValueError: If parameters out of range
Example:
>>> import asyncio
>>>
>>> # Generate 440 Hz tone for 1 second
>>> async def generate_tone():
... async for frame in sine_wave_generator(440, 1.0):
... print(f"Generated frame with {frame.samples_per_channel} samples")
... # Process or capture frame
>>>
>>> asyncio.run(generate_tone())
>>>
>>> # Generate test tone and capture
>>> source = AudioSource(48000, 1)
>>> async for frame in sine_wave_generator(1000, 2.0):
... await source.capture_frame(frame)
Note:
Requires numpy package.
Generates mono audio (1 channel).
Yields frames every 10ms.
Use cases:
- Testing audio pipeline
- Audio system verification
- Generating notification sounds
- Creating test signals
"""class AudioMixer(AsyncIterator[AudioFrame]):
"""Mix multiple audio streams into a single output stream.
Combines multiple audio sources by summing samples.
Automatically handles timing and synchronization.
"""
def __init__(
self,
sample_rate: int,
num_channels: int,
*,
blocksize: int = 0,
stream_timeout_ms: int = 100,
capacity: int = 100
) -> None:
"""Initialize AudioMixer.
Args:
sample_rate: Audio sample rate in Hz
Type: int
All input streams resampled to this rate
num_channels: Number of audio channels
Type: int
1 (mono) or 2 (stereo)
blocksize: Audio block size in samples
Type: int
Default: 0 (uses sample_rate // 10, i.e. 100ms blocks)
Number of samples per output frame
stream_timeout_ms: Max wait time for streams in milliseconds
Type: int
Default: 100
Time to wait for all streams before mixing
capacity: Max mixed frames in output queue
Type: int
Default: 100
Limits memory usage
Example:
>>> # Mix at 48kHz mono
>>> mixer = AudioMixer(sample_rate=48000, num_channels=1)
>>>
>>> # Mix at 48kHz stereo with custom block size
>>> mixer = AudioMixer(
... sample_rate=48000,
... num_channels=2,
... blocksize=960 # 20ms at 48kHz
... )
Note:
Mixer runs as async iterator.
Yields mixed frames continuously.
Stops when end_input() called or all streams end.
"""
def add_stream(self, stream: AsyncIterator[AudioFrame]) -> None:
"""Add an audio stream to the mixer.
Args:
stream: Audio stream to mix
Type: AsyncIterator[AudioFrame]
Typically AudioStream instance
Returns:
None
Example:
>>> stream1 = AudioStream(track1)
>>> stream2 = AudioStream(track2)
>>>
>>> mixer.add_stream(stream1)
>>> mixer.add_stream(stream2)
>>>
>>> # Iterate mixed output
>>> async for mixed_frame in mixer:
... await output_source.capture_frame(mixed_frame)
Note:
Can add streams dynamically while mixing.
Stream is automatically removed when it ends.
"""
def remove_stream(self, stream: AsyncIterator[AudioFrame]) -> None:
"""Remove an audio stream from the mixer.
Args:
stream: Audio stream to remove
Must be previously added
Returns:
None
Raises:
ValueError: If stream not in mixer
Example:
>>> mixer.remove_stream(stream1)
Note:
Stream removal is immediate.
Remaining streams continue mixing.
"""
async def aclose(self) -> None:
"""Immediately stop mixing and close the mixer.
Returns:
None (awaitable)
Example:
>>> await mixer.aclose()
Note:
Stops all stream processing.
Closes output queue.
Cannot be reused after closing.
"""
def end_input(self) -> None:
"""Signal that no more streams will be added (flushes remaining data).
Returns:
None
Example:
>>> # Add all streams
>>> mixer.add_stream(stream1)
>>> mixer.add_stream(stream2)
>>>
>>> # Signal end of input
>>> mixer.end_input()
>>>
>>> # Mixer will end when all streams complete
Note:
Mixer continues until all streams end.
After end_input(), cannot add more streams.
Output iterator completes when all streams done.
"""
def __aiter__(self) -> AudioMixer:
"""Return self as async iterator."""
async def __anext__(self) -> AudioFrame:
"""Get next mixed audio frame.
Returns:
AudioFrame: Mixed frame
Raises:
StopAsyncIteration: When mixing complete
"""class AudioResamplerQuality(StrEnum):
"""Audio resampler quality levels.
Higher quality = better audio, more CPU usage.
"""
QUICK = "quick" # Fastest, lowest quality
LOW = "low" # Fast, low quality
MEDIUM = "medium" # Balanced (default)
HIGH = "high" # Slow, high quality
VERY_HIGH = "very_high" # Slowest, best quality
class AudioResampler:
"""Resample audio data from one sample rate to another using Sox.
Uses high-quality SoX resampler.
Handles sample rate conversion with minimal quality loss.
"""
def __init__(
self,
input_rate: int,
output_rate: int,
*,
num_channels: int = 1,
quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM
) -> None:
"""Initialize AudioResampler.
Args:
input_rate: Input sample rate in Hz
Type: int
Source audio sample rate
output_rate: Output sample rate in Hz
Type: int
Target audio sample rate
num_channels: Number of audio channels
Type: int
Default: 1 (mono)
quality: Resampling quality
Type: AudioResamplerQuality
Default: MEDIUM
Higher quality = better audio, more CPU
Example:
>>> # Resample 44.1kHz to 48kHz
>>> resampler = AudioResampler(
... input_rate=44100,
... output_rate=48000,
... quality=AudioResamplerQuality.HIGH
... )
>>>
>>> # Downsample 48kHz to 16kHz
>>> resampler = AudioResampler(
... input_rate=48000,
... output_rate=16000
... )
Note:
Resampler maintains internal state.
One resampler per input stream.
Common conversions:
- 44100 -> 48000: CD audio to LiveKit
- 48000 -> 16000: LiveKit to speech recognition
- 8000 -> 48000: Phone audio to LiveKit
"""
def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]:
"""Push audio data into resampler and retrieve resampled data.
Args:
data: Audio data to resample
Type: bytearray | AudioFrame
If bytearray: Raw int16 PCM samples
If AudioFrame: Uses frame data
Returns:
list[AudioFrame]: List of resampled frames
May be 0, 1, or multiple frames
Depends on input/output rate ratio
Example:
>>> # Resample frame
>>> input_frame = AudioFrame.create(44100, 1, 441) # 10ms at 44.1kHz
>>> output_frames = resampler.push(input_frame)
>>>
>>> for frame in output_frames:
... print(f"Output: {frame.samples_per_channel} samples")
... await source.capture_frame(frame)
Note:
May return 0 frames while the resampler is buffering.
May return multiple frames if the rate ratio produces more data.
Typical returns:
- 44100 -> 48000: Usually 1-2 frames per input
- 48000 -> 16000: Usually 0-1 frames per input
Call flush() at end to get remaining frames.
"""
def flush(self) -> list[AudioFrame]:
"""Flush remaining audio data through resampler.
Returns:
list[AudioFrame]: Remaining resampled frames
Example:
>>> # Process all frames
>>> for input_frame in input_frames:
... output_frames = resampler.push(input_frame)
... for frame in output_frames:
... await source.capture_frame(frame)
>>>
>>> # Flush remaining
>>> final_frames = resampler.flush()
>>> for frame in final_frames:
... await source.capture_frame(frame)
Note:
Call at end of stream to get buffered samples.
Returns empty list if no buffered samples.
"""class AudioFilter:
"""Load and manage audio filter plugins.
Enables loading custom audio processing plugins.
Advanced feature for specialized audio effects.
"""
def __init__(
self,
module_id: str,
path: str,
dependencies: Optional[List[str]] = None
) -> None:
"""Initialize AudioFilter.
Args:
module_id: Module identifier
Type: str
Unique identifier for filter
path: Path to plugin library
Type: str
Path to shared library (.so, .dylib, .dll)
dependencies: Optional list of dependency paths
Type: List[str] | None
Default: None
Paths to required dependencies
Raises:
Exception: If plugin fails to load
FileNotFoundError: If path doesn't exist
Example:
>>> filter = AudioFilter(
... module_id="my_filter",
... path="/path/to/filter.so",
... dependencies=["/path/to/dep1.so"]
... )
Note:
Advanced feature for custom audio processing.
Plugin must follow LiveKit audio filter interface.
"""class AVSynchronizer:
"""Synchronize audio and video capture for consistent timing.
Ensures audio and video frames are captured with proper timing
to maintain synchronization in published streams.
"""
def __init__(
self,
*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100,
_max_delay_tolerance_ms: float = 300
):
"""Initialize AVSynchronizer.
Args:
audio_source: AudioSource for synchronized audio
Type: AudioSource
Must be created before synchronizer
video_source: VideoSource for synchronized video
Type: VideoSource
Must be created before synchronizer
video_fps: Target video frame rate
Type: float
Frames per second (e.g., 30.0, 24.0, 15.0)
video_queue_size_ms: Video frame buffer size
Type: float
Default: 100 (ms)
Larger = more latency, more stable
_max_delay_tolerance_ms: Max delay tolerance
Type: float
Default: 300 (ms)
Internal parameter
Example:
>>> # Create sources
>>> audio_source = AudioSource(sample_rate=48000, num_channels=2)
>>> video_source = VideoSource(width=1280, height=720)
>>>
>>> # Create synchronizer for 30 FPS video
>>> av_sync = AVSynchronizer(
... audio_source=audio_source,
... video_source=video_source,
... video_fps=30.0,
... video_queue_size_ms=50
... )
Note:
Synchronizer manages timing automatically.
Push frames as fast as available.
Synchronizer buffers and releases at correct times.
"""
@property
def actual_fps(self) -> float:
"""Actual measured frames per second.
Returns:
float: Current FPS being achieved
May differ from target fps
Example:
>>> print(f"Target FPS: {target_fps}")
>>> print(f"Actual FPS: {av_sync.actual_fps}")
Note:
Useful for monitoring performance.
If actual_fps < target_fps, system may be overloaded.
"""
@property
def last_video_time(self) -> float:
"""Time of last video frame captured.
Returns:
float: Timestamp in seconds
"""
@property
def last_audio_time(self) -> float:
"""Time of last audio frame played out.
Returns:
float: Timestamp in seconds
"""
async def push(self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None) -> None:
"""Push a frame to the synchronizer.
Args:
frame: Frame to push
Type: VideoFrame | AudioFrame
Synchronizer handles both types
timestamp: Optional timestamp in seconds
Type: float | None
Default: None (uses current time)
Returns:
None (awaitable)
Example:
>>> # Push video frame
>>> video_frame = VideoFrame(1280, 720, VideoBufferType.RGBA, data)
>>> await av_sync.push(video_frame)
>>>
>>> # Push audio frame
>>> audio_frame = AudioFrame.create(48000, 2, 480)
>>> await av_sync.push(audio_frame)
>>>
>>> # Push with explicit timestamp
>>> timestamp = time.time()
>>> await av_sync.push(video_frame, timestamp=timestamp)
Note:
Push frames as received/generated.
Synchronizer handles buffering and timing.
Video frames are buffered to match audio timing.
Audio frames are played immediately or buffered.
"""
async def clear_queue(self) -> None:
"""Clear all queued frames.
Returns:
None (awaitable)
Example:
>>> # Clear on error or reset
>>> await av_sync.clear_queue()
Note:
Discards all buffered video and audio.
Use to recover from synchronization issues.
"""
async def wait_for_playout(self) -> None:
"""Wait until all frames are played out.
Returns:
None (awaitable, blocks until queues empty)
Example:
>>> # Push all frames
>>> for frame in frames:
... await av_sync.push(frame)
>>>
>>> # Wait for completion
>>> await av_sync.wait_for_playout()
>>> print("All A/V played out")
"""
def reset(self) -> None:
"""Reset synchronizer state.
Returns:
None
Example:
>>> av_sync.reset()
Note:
Clears queues and resets timing.
Use when starting new session or recovering from errors.
"""
async def aclose(self) -> None:
"""Close the synchronizer and clean up resources.
Returns:
None (awaitable)
Example:
>>> await av_sync.aclose()
Note:
Always close synchronizer when done.
Does not close audio_source or video_source.
"""Usage Example:
from livekit import rtc
# Create audio and video sources
audio_source = rtc.AudioSource(sample_rate=48000, num_channels=2)
video_source = rtc.VideoSource(width=1280, height=720)
# Create synchronizer for 30 FPS video
av_sync = rtc.AVSynchronizer(
audio_source=audio_source,
video_source=video_source,
video_fps=30.0,
video_queue_size_ms=50
)
# Push frames to synchronizer (handles timing automatically)
await av_sync.push(video_frame)
await av_sync.push(audio_frame)
# Check actual FPS being achieved
print(f"Actual FPS: {av_sync.actual_fps:.1f}")
# Clean up
await av_sync.aclose()
await audio_source.aclose()
await video_source.aclose()
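A fuller sketch of the push loop follows. This is illustrative, not part of the SDK: publish_tone_with_video is a hypothetical helper, and it assumes the sources and synchronizer created above plus VideoBufferType from livekit.rtc.

import asyncio
from livekit import rtc
from livekit.rtc.utils import sine_wave_generator

async def publish_tone_with_video(av_sync: rtc.AVSynchronizer):
    """Hypothetical helper: 1s of 440 Hz tone with black 720p video at 30 FPS."""
    # All-zero RGBA buffer renders as a black frame
    black = rtc.VideoFrame(1280, 720, rtc.VideoBufferType.RGBA, bytes(1280 * 720 * 4))

    async def video_loop():
        for _ in range(30):  # 1 second at 30 FPS
            await av_sync.push(black)
            # Pacing is optional; the synchronizer buffers and releases
            # frames at the correct times either way
            await asyncio.sleep(1 / 30)

    async def audio_loop():
        # 440 Hz test tone, yielded as 10ms frames
        async for frame in sine_wave_generator(440, 1.0, sample_rate=48000):
            await av_sync.push(frame)

    await asyncio.gather(video_loop(), audio_loop())
    await av_sync.wait_for_playout()
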
# Constants
DEFAULT_SAMPLE_RATE: int = 48000
DEFAULT_CHANNELS: int = 1
BLOCKSIZE: int = 2400 # 50ms at 48kHz
class MediaDevices:
"""High-level interface to native audio devices.
Provides easy access to system microphones and speakers.
Requires sounddevice package: pip install sounddevice
"""
def __init__(
self,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
input_sample_rate: int = DEFAULT_SAMPLE_RATE,
output_sample_rate: int = DEFAULT_SAMPLE_RATE,
num_channels: int = DEFAULT_CHANNELS,
blocksize: int = BLOCKSIZE
) -> None:
"""Initialize MediaDevices.
Args:
loop: Event loop to use
input_sample_rate: Sample rate for input (default: 48000)
output_sample_rate: Sample rate for output (default: 48000)
num_channels: Number of channels (default: 1)
blocksize: Block size in samples (default: 2400 = 50ms at 48kHz)
Raises:
ImportError: If sounddevice not installed
"""
def list_input_devices(self) -> list[dict[str, Any]]:
"""List available input devices (microphones).
Returns:
list[dict]: List of device information dicts
Each dict contains:
- name: Device name (str)
- index: Device index (int)
- channels: Max input channels (int)
- sample_rate: Default sample rate (float)
Example:
>>> devices = media_devices.list_input_devices()
>>> for dev in devices:
... print(f"{dev['index']}: {dev['name']}")
"""
def list_output_devices(self) -> list[dict[str, Any]]:
"""List available output devices (speakers).
Returns:
list[dict]: List of device information dicts
"""
def default_input_device(self) -> Optional[int]:
"""Get default input device index.
Returns:
int | None: Default input device index or None
"""
def default_output_device(self) -> Optional[int]:
"""Get default output device index.
Returns:
int | None: Default output device index or None
"""
def open_input(
self,
*,
enable_aec: bool = True,
noise_suppression: bool = True,
high_pass_filter: bool = True,
auto_gain_control: bool = True,
input_device: Optional[int] = None,
queue_capacity: int = 50,
input_channel_index: Optional[int] = None
) -> InputCapture:
"""Open audio input device and start capture.
Args:
enable_aec: Enable echo cancellation
noise_suppression: Enable noise suppression
high_pass_filter: Enable high-pass filter
auto_gain_control: Enable automatic gain control
input_device: Device index (None for default)
queue_capacity: Frame queue capacity
input_channel_index: Specific channel to capture
Returns:
InputCapture: Capture object with source and controls
Example:
>>> devices = MediaDevices()
>>> capture = devices.open_input(
... enable_aec=True,
... noise_suppression=True,
... auto_gain_control=True
... )
>>>
>>> # Use captured audio
>>> source = capture.source
>>> track = LocalAudioTrack.create_audio_track("mic", source)
>>> await room.local_participant.publish_track(track)
"""
def open_output(self, *, output_device: Optional[int] = None) -> OutputPlayer:
"""Create an OutputPlayer for audio playback.
Args:
output_device: Device index (None for default)
Returns:
OutputPlayer: Player for audio playback
Example:
>>> player = devices.open_output()
>>> await player.add_track(remote_audio_track)
>>> await player.start()
"""@dataclass
class InputCapture:
"""Holds resources for an active audio input capture.
Attributes:
source: AudioSource that receives captured frames
Type: AudioSource
Can be published as LocalAudioTrack
Already has APM applied if enabled
input_stream: Underlying sounddevice.InputStream
Type: Any (sounddevice.InputStream)
Low-level stream object
task: Async task that drains queue and calls source.capture_frame
Type: asyncio.Task
Background task processing audio
apm: Optional AudioProcessingModule for processing
Type: AudioProcessingModule | None
AEC, NS, HPF, AGC if enabled
delay_estimator: Internal helper for combining capture and render delays
Type: Any (internal implementation)
"""
source: AudioSource
input_stream: Any # sounddevice.InputStream
task: asyncio.Task
apm: Optional[AudioProcessingModule]
delay_estimator: Any # Internal helper
async def aclose(self) -> None:
"""Stop capture and close underlying resources.
Returns:
None (awaitable)
Example:
>>> capture = devices.open_input()
>>> # ... use capture
>>> await capture.aclose()
Note:
Stops audio capture.
Closes sounddevice stream.
Cancels background task.
Does not close capture.source (caller should close).
"""class OutputPlayer:
"""Audio output helper using sounddevice.OutputStream.
When apm_for_reverse is provided, feeds rendered PCM into APM reverse path
for echo cancellation.
Plays multiple tracks simultaneously (mixes automatically).
"""
def __init__(
self,
loop: asyncio.AbstractEventLoop,
sample_rate: int,
num_channels: int,
output_device: Optional[int],
apm_for_reverse: Optional[AudioProcessingModule],
delay_estimator: Any
) -> None:
"""Initialize OutputPlayer.
Args:
loop: Event loop
sample_rate: Output sample rate
num_channels: Number of output channels
output_device: Device index (None for default)
apm_for_reverse: APM for echo cancellation reverse stream
delay_estimator: Internal delay estimator
Note:
Typically created via MediaDevices.open_output().
Not usually constructed directly.
"""
async def add_track(self, track: Track) -> None:
"""Add a track to play back.
Args:
track: Track to play
Type: Track (RemoteAudioTrack or LocalAudioTrack)
Multiple tracks can be added (mixed automatically)
Returns:
None (awaitable)
Example:
>>> player = devices.open_output()
>>>
>>> # Add tracks as they're subscribed
>>> @room.on("track_subscribed")
>>> def on_track(track, publication, participant):
... if track.kind == TrackKind.KIND_AUDIO:
... asyncio.create_task(player.add_track(track))
Note:
Multiple tracks are mixed automatically.
Track audio is resampled if needed.
"""
async def remove_track(self, track: Track) -> None:
"""Remove a track from playback.
Args:
track: Track to remove
Type: Track
Returns:
None (awaitable)
Example:
>>> await player.remove_track(track)
Note:
Stops playing specified track.
Other tracks continue playing.
"""
async def start(self) -> None:
"""Start the output stream.
Returns:
None (awaitable)
Example:
>>> player = devices.open_output()
>>> await player.add_track(track1)
>>> await player.add_track(track2)
>>> await player.start()
Note:
Must be called after adding tracks.
Starts sounddevice output stream.
"""
async def aclose(self) -> None:
"""Stop playback and close resources.
Returns:
None (awaitable)
Example:
>>> await player.aclose()
"""class FrameProcessor[T](ABC):
"""Generic interface for frame processing.
Base class for implementing custom audio or video processing.
Generic type T: AudioFrame or VideoFrame.
"""
@property
@abstractmethod
def enabled(self) -> bool:
"""Whether processor is enabled (get/set).
Returns:
bool: True if processing enabled, False otherwise
Note:
Must implement as property with getter and setter.
"""
@abstractmethod
def _process(self, frame: T) -> T:
"""Process a frame.
Args:
frame: Frame to process
Type: T (AudioFrame or VideoFrame)
Returns:
T: Processed frame (can be same instance or new)
Note:
Called for each frame when enabled=True.
Can modify frame in-place or create new frame.
Should be fast (runs in hot path).
"""
@abstractmethod
def _close(self) -> None:
"""Close and cleanup processor resources.
Returns:
None
Note:
Called when processor is no longer needed.
Free allocated resources.
Not async (use async cleanup before if needed).
"""
def _on_stream_info_updated(
self,
*,
room_name: str,
participant_identity: str,
publication_sid: str,
) -> None:
"""Optional callback when stream information is updated.
Args:
room_name: Name of the room
participant_identity: Identity of the participant
publication_sid: SID of the track publication
Note:
Optional method.
Called when stream metadata changes.
Override if processor needs this information.
"""
def _on_credentials_updated(self, *, token: str, url: str) -> None:
"""Optional callback when credentials are updated.
Args:
token: New access token
url: LiveKit server URL
Note:
Optional method.
Called when token is refreshed.
Override if processor needs credentials.
"""Example Implementation:
from livekit import FrameProcessor, AudioFrame, AudioStream
import numpy as np

class GainProcessor(FrameProcessor[AudioFrame]):
    """Custom audio gain processor."""

    def __init__(self, gain: float = 1.0):
        self._enabled = True
        self._gain = gain

    @property
    def enabled(self) -> bool:
        return self._enabled

    @enabled.setter
    def enabled(self, value: bool):
        self._enabled = value

    def set_gain(self, gain: float):
        """Set gain multiplier."""
        self._gain = gain

    def _process(self, frame: AudioFrame) -> AudioFrame:
        """Apply gain to audio frame."""
        if not self._enabled or self._gain == 1.0:
            return frame
        # Convert to numpy array
        samples = np.frombuffer(frame.data, dtype=np.int16)
        # Apply gain
        samples = samples.astype(np.float32) * self._gain
        # Clamp to int16 range and convert back
        samples = np.clip(samples, -32768, 32767).astype(np.int16)
        # Create new frame with processed data
        return AudioFrame(
            samples.tobytes(),
            frame.sample_rate,
            frame.num_channels,
            frame.samples_per_channel
        )

    def _close(self):
        """Cleanup resources."""
        pass

# Usage
processor = GainProcessor(gain=1.5)  # 150% volume
stream = AudioStream(track, noise_cancellation=processor)

from livekit import AudioMixer, AudioStream, AudioSource, LocalAudioTrack
async def mix_audio_streams(room, track1, track2):
    """Mix two audio streams."""
    # Create streams
    stream1 = AudioStream(track1, sample_rate=48000, num_channels=1)
    stream2 = AudioStream(track2, sample_rate=48000, num_channels=1)
    # Create mixer
    mixer = AudioMixer(sample_rate=48000, num_channels=1)
    mixer.add_stream(stream1)
    mixer.add_stream(stream2)
    # Create output
    output_source = AudioSource(48000, 1)
    output_track = LocalAudioTrack.create_audio_track("mixed", output_source)
    await room.local_participant.publish_track(output_track)
    # Mix and publish
    try:
        async for mixed_frame in mixer:
            await output_source.capture_frame(mixed_frame)
    finally:
        await mixer.aclose()
        await stream1.aclose()
        await stream2.aclose()
        await output_source.aclose()

from livekit import AudioResampler, AudioResamplerQuality, AudioFrame, AudioStream, AudioSource
async def resample_stream(input_stream: AudioStream, output_source: AudioSource):
    """Resample audio stream from 44.1kHz to 48kHz."""
    resampler = AudioResampler(
        input_rate=44100,
        output_rate=48000,
        num_channels=1,
        quality=AudioResamplerQuality.HIGH
    )
    try:
        async for event in input_stream:
            # Push frame through resampler
            output_frames = resampler.push(event.frame)
            # Capture all output frames
            for frame in output_frames:
                await output_source.capture_frame(frame)
        # Flush remaining frames
        final_frames = resampler.flush()
        for frame in final_frames:
            await output_source.capture_frame(frame)
    finally:
        await input_stream.aclose()
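Finally, a minimal end-to-end device sketch. This is illustrative and rests on assumptions: MediaDevices is reachable via the rtc namespace, room is an already-connected rtc.Room, the sounddevice package is installed, and mic_to_room_and_playback is a hypothetical helper.

import asyncio
from livekit import rtc

async def mic_to_room_and_playback(room: rtc.Room):
    """Hypothetical helper: publish the default mic and play remote audio."""
    devices = rtc.MediaDevices()

    # Microphone -> room, with APM processing enabled
    capture = devices.open_input(enable_aec=True, noise_suppression=True)
    track = rtc.LocalAudioTrack.create_audio_track("mic", capture.source)
    await room.local_participant.publish_track(track)

    # Remote audio -> default speakers; tracks are added as they arrive
    player = devices.open_output()

    @room.on("track_subscribed")
    def on_track(track, publication, participant):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            asyncio.create_task(player.add_track(track))

    # Simplifying assumption: start once, let dynamically added tracks mix in
    await player.start()
    return capture, player  # caller should aclose() both when done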