TorchAudio is an audio package for PyTorch providing GPU-accelerated audio I/O operations, signal processing transforms, and machine learning utilities for audio data.
Advanced streaming capabilities for real-time audio processing, media encoding/decoding, and efficient handling of large audio files. TorchAudio provides streaming interfaces for both reading and writing audio/video media with fine-grained control over codecs and processing parameters.
Real-time audio stream reading with buffering and codec control.
class StreamReader:
    """Stream reader for audio/video sources with real-time processing support.

    Decodes a media source (file path or URL) incrementally, delivering
    audio as fixed-size chunks suitable for low-latency pipelines.
    """

    def __init__(self, src: str, format: Optional[str] = None,
                 option: Optional[Dict[str, str]] = None) -> None:
        """
        Args:
            src: Source path or URL.
            format: Input format override.
            option: Additional format-specific options.
        """

    def add_basic_audio_stream(self, frames_per_chunk: int, buffer_chunk_size: int = 3,
                               stream_index: Optional[int] = None,
                               decoder: Optional[str] = None,
                               decoder_option: Optional[Dict[str, str]] = None) -> int:
        """Add a basic audio stream for reading.

        Args:
            frames_per_chunk: Number of frames delivered per chunk.
            buffer_chunk_size: Number of chunks to keep buffered.
            stream_index: Index of the source stream to read from.
            decoder: Name of the decoder to use.
            decoder_option: Decoder-specific options.

        Returns:
            int: Index of the added stream.
        """

    def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.) -> int:
        """Process the next packet from the source.

        Args:
            timeout: Timeout in seconds.
            backoff: Backoff factor applied between retries.

        Returns:
            int: Number of packets processed (0 indicates end of stream).
        """

    def pop_chunks(self) -> List[Optional[torch.Tensor]]:
        """Pop the chunks currently available on every output stream.

        Returns:
            List[Optional[torch.Tensor]]: One audio chunk (or None) per stream.
        """

    def get_metadata(self) -> Dict[str, Any]:
        """Return stream metadata including sample rate, channels, etc."""

    def seek(self, timestamp: float) -> None:
        """Seek to the given timestamp, in seconds."""

    def close(self) -> None:
        """Close the stream reader."""

# Real-time audio stream writing with encoding and format control.
class StreamWriter:
    """Stream writer for audio/video destinations with real-time encoding.

    Encodes audio chunks incrementally into a destination file or stream.
    """

    def __init__(self, dst: str, format: Optional[str] = None,
                 option: Optional[Dict[str, str]] = None) -> None:
        """
        Args:
            dst: Destination path.
            format: Output format override.
            option: Format-specific options.
        """

    def add_audio_stream(self, sample_rate: int, num_channels: int,
                         format: str = "fltp", encoder: Optional[str] = None,
                         codec_config: Optional[CodecConfig] = None,
                         encoder_option: Optional[Dict[str, str]] = None) -> int:
        """Add an audio stream for writing.

        Args:
            sample_rate: Sample rate in Hz.
            num_channels: Number of audio channels.
            format: Audio sample format.
            encoder: Name of the encoder to use.
            codec_config: Codec configuration.
            encoder_option: Encoder-specific options.

        Returns:
            int: Index of the added stream.
        """

    def write_audio_chunk(self, stream_index: int, chunk: torch.Tensor,
                          pts: Optional[int] = None) -> None:
        """Write one audio chunk to a stream.

        Args:
            stream_index: Target stream index.
            chunk: Audio tensor (channels, frames).
            pts: Presentation timestamp.
        """

    def close(self) -> None:
        """Close the stream writer and finalize output."""

# Configuration classes for fine control over encoding/decoding parameters.
class CodecConfig:
    """Configuration bundle for audio/video codec parameters."""

    def __init__(self, bit_rate: Optional[int] = None,
                 compression_level: Optional[int] = None,
                 qscale: Optional[float] = None,
                 qmin: Optional[int] = None,
                 qmax: Optional[int] = None,
                 bit_rate_tolerance: Optional[int] = None,
                 buffer_size: Optional[int] = None) -> None:
        """
        Args:
            bit_rate: Target bit rate.
            compression_level: Compression level (codec-dependent).
            qscale: Quality scale.
            qmin: Minimum quantizer.
            qmax: Maximum quantizer.
            bit_rate_tolerance: Bit rate tolerance.
            buffer_size: Buffer size.
        """

# Real-time audio effects application during streaming.
class AudioEffector:
    """Apply a named audio effect to waveforms during streaming."""

    def __init__(self, effect: str, *args, **kwargs) -> None:
        """
        Args:
            effect: Effect name (e.g., "reverb", "chorus", "flanger").
            *args, **kwargs: Effect-specific parameters.
        """

    def apply(self, waveform: torch.Tensor, sample_rate: int) -> torch.Tensor:
        """Apply the configured effect to a waveform.

        Args:
            waveform: Input audio of shape (..., time).
            sample_rate: Sample rate in Hz.

        Returns:
            Tensor: Processed audio.
        """

# Direct audio playback capabilities.
def play_audio(waveform: torch.Tensor, sample_rate: int,
               normalize: bool = True, channels_first: bool = True) -> None:
    """Play audio directly through the system audio device.

    Args:
        waveform: Audio tensor to play.
        sample_rate: Sample rate in Hz.
        normalize: Whether to normalize the audio volume.
        channels_first: True if the tensor is (channels, time),
            False if it is (time, channels).
    """

import torch
import torchaudio
from torchaudio.io import StreamReader, StreamWriter
# Set up real-time audio processing
def process_audio_stream(input_path: str, output_path: str):
    """Read audio from input_path, apply effects chunk-by-chunk, and write WAV output.

    Args:
        input_path: Source media path or URL.
        output_path: Destination WAV file path.
    """
    # Create reader and writer
    reader = StreamReader(input_path)
    writer = StreamWriter(output_path, format="wav")
    # Configure one audio stream on each side
    reader.add_basic_audio_stream(frames_per_chunk=1024, buffer_chunk_size=4)
    writer.add_audio_stream(sample_rate=44100, num_channels=2)
    # Process audio in chunks
    try:
        while True:
            code = reader.process_packet()
            if code == 0:  # End of stream
                break
            chunks = reader.pop_chunks()
            if chunks[0] is not None:
                # Apply processing (e.g., effects, filtering)
                processed = apply_effects(chunks[0])
                writer.write_audio_chunk(0, processed)
    finally:
        # Nested cleanup: writer.close() must run even if reader.close()
        # raises, otherwise the output file is never finalized.
        try:
            reader.close()
        finally:
            writer.close()
def apply_effects(audio: torch.Tensor) -> torch.Tensor:
    """Example effect stage: run the waveform through an overdrive distortion."""
    processed = torchaudio.functional.overdrive(audio, gain=10, colour=20)
    return processed

import torchaudio
from torchaudio.io import StreamReader
import matplotlib.pyplot as plt
def monitor_audio_stream(source: str):
    """Monitor an audio stream with a live matplotlib waveform display.

    Runs until the user interrupts with Ctrl-C, plotting the first channel
    of each chunk as it arrives.

    Args:
        source: Source media path or URL.
    """
    reader = StreamReader(source)
    reader.add_basic_audio_stream(frames_per_chunk=2048)
    plt.ion()  # Interactive mode so the figure updates without blocking
    fig, ax = plt.subplots()
    try:
        while True:
            reader.process_packet(timeout=0.1)
            chunks = reader.pop_chunks()
            if chunks[0] is not None:
                # Visualize audio waveform
                waveform = chunks[0][0]  # First channel
                ax.clear()
                ax.plot(waveform.numpy())
                ax.set_ylim([-1, 1])
                plt.pause(0.01)
    except KeyboardInterrupt:
        print("Stopping monitoring...")
    finally:
        # Nested cleanup: restore non-interactive mode even if
        # reader.close() raises.
        try:
            reader.close()
        finally:
            plt.ioff()

import torchaudio
from torchaudio.io import StreamReader, StreamWriter, CodecConfig
def convert_audio_format(input_path: str, output_path: str,
                         target_sample_rate: int = 44100,
                         target_channels: int = 2,
                         target_bitrate: int = 128000):
    """Convert an audio file to MP3 with streaming, resampling as needed.

    Args:
        input_path: Source media path or URL.
        output_path: Destination MP3 file path.
        target_sample_rate: Output sample rate in Hz.
        target_channels: Output channel count (1 or 2 handled).
        target_bitrate: Target encoder bit rate in bits per second.
    """
    # Create reader
    reader = StreamReader(input_path)
    reader.add_basic_audio_stream(frames_per_chunk=4096)
    # Create writer with codec configuration
    codec_config = CodecConfig(bit_rate=target_bitrate)
    writer = StreamWriter(output_path, format="mp3")
    writer.add_audio_stream(
        sample_rate=target_sample_rate,
        num_channels=target_channels,
        encoder="mp3",
        codec_config=codec_config,
    )
    # Build a resampler only when the source rate differs from the target
    metadata = reader.get_metadata()
    original_sr = metadata["sample_rate"]
    resampler = None
    if original_sr != target_sample_rate:
        resampler = torchaudio.transforms.Resample(original_sr, target_sample_rate)
    # Process stream
    try:
        while reader.process_packet() != 0:
            chunks = reader.pop_chunks()
            if chunks[0] is None:
                continue
            audio = chunks[0]
            if resampler is not None:
                audio = resampler(audio)
            # Match the target channel count: downmix stereo or duplicate mono
            if audio.shape[0] != target_channels:
                if target_channels == 1 and audio.shape[0] == 2:
                    audio = audio.mean(dim=0, keepdim=True)
                elif target_channels == 2 and audio.shape[0] == 1:
                    audio = audio.repeat(2, 1)
            writer.write_audio_chunk(0, audio)
    finally:
        # Nested cleanup: writer.close() must run even if reader.close()
        # raises, so the encoded output is finalized.
        try:
            reader.close()
        finally:
            writer.close()

import torchaudio
from torchaudio.io import StreamReader
def stream_from_url(url: str):
    """Stream audio from network URL."""
    reader = StreamReader(url)
    reader.add_basic_audio_stream(frames_per_chunk=1024, buffer_chunk_size=8)
    print(f"Streaming from: {url}")
    info = reader.get_metadata()
    print(f"Sample rate: {info['sample_rate']} Hz")
    print(f"Channels: {info['num_channels']}")
    chunk_count = 0
    try:
        # Pull packets until the source reports end of stream (code 0)
        while reader.process_packet(timeout=1.0) != 0:
            available = reader.pop_chunks()
            if available[0] is None:
                continue
            chunk_count += 1
            if chunk_count % 100 == 0:
                print(f"Processed {chunk_count} chunks")
            # Process audio chunk (e.g., save, analyze, play)
            audio_chunk = available[0]
            # ... process audio_chunk ...
    except KeyboardInterrupt:
        print("Stream interrupted by user")
    finally:
        reader.close()

# These streaming capabilities enable real-time audio processing applications, efficient handling of large media files, and integration with live audio sources and network streams.
Install with Tessl CLI
npx tessl i tessl/pypi-torchaudio