0
# Streaming I/O
1
2
Advanced streaming capabilities for real-time audio processing, media encoding/decoding, and efficient handling of large audio files. TorchAudio provides streaming interfaces for both reading and writing audio/video media with fine-grained control over codecs and processing parameters.
3
4
## Capabilities
5
6
### Stream Reading
7
8
Real-time audio stream reading with buffering and codec control.
9
10
```python { .api }
11
class StreamReader:
12
"""Stream reader for audio/video files with real-time processing capabilities."""
13
14
def __init__(self, src: str, format: Optional[str] = None,
15
option: Optional[Dict[str, str]] = None) -> None:
16
"""
17
Args:
18
src: Source path or URL
19
format: Input format override
20
option: Additional format-specific options
21
"""
22
23
def add_basic_audio_stream(self, frames_per_chunk: int, buffer_chunk_size: int = 3,
24
stream_index: Optional[int] = None,
25
decoder: Optional[str] = None,
26
decoder_option: Optional[Dict[str, str]] = None) -> int:
27
"""
28
Add basic audio stream for reading.
29
30
Args:
31
frames_per_chunk: Number of frames per chunk
32
buffer_chunk_size: Number of chunks to buffer
33
stream_index: Stream index to read from
34
decoder: Decoder to use
35
decoder_option: Decoder-specific options
36
37
Returns:
38
int: Stream index
39
"""
40
41
def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.) -> int:
42
"""
43
Process next packet from stream.
44
45
Args:
46
timeout: Timeout in milliseconds for retrying while the source is temporarily unavailable; None disables retrying
47
backoff: Time to wait before each retry, in milliseconds
48
49
Returns:
50
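int: Status code. 0 means a packet was processed and the caller may keep calling; 1 means the stream reached EOF and pending frames were flushed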
51
"""
52
53
def pop_chunks(self) -> List[Optional[torch.Tensor]]:
54
"""
55
Pop available chunks from all streams.
56
57
Returns:
58
List[Optional[torch.Tensor]]: One chunk per output stream, shaped (frames, channels) for audio; None for a stream with no chunk ready
59
"""
60
61
def get_metadata(self) -> Dict[str, Any]:
62
"""Get stream metadata including sample rate, channels, etc."""
63
64
def seek(self, timestamp: float) -> None:
65
"""Seek to timestamp in seconds."""
66
67
def close(self) -> None:
68
"""Close the stream reader."""
69
```
70
71
### Stream Writing
72
73
Real-time audio stream writing with encoding and format control.
74
75
```python { .api }
76
class StreamWriter:
77
"""Stream writer for audio/video files with real-time encoding."""
78
79
def __init__(self, dst: str, format: Optional[str] = None,
80
option: Optional[Dict[str, str]] = None) -> None:
81
"""
82
Args:
83
dst: Destination path
84
format: Output format override
85
option: Format-specific options
86
"""
87
88
def add_audio_stream(self, sample_rate: int, num_channels: int,
89
format: str = "fltp", encoder: Optional[str] = None,
90
codec_config: Optional[CodecConfig] = None,
91
encoder_option: Optional[Dict[str, str]] = None) -> int:
92
"""
93
Add audio stream for writing.
94
95
Args:
96
sample_rate: Sample rate in Hz
97
num_channels: Number of audio channels
98
format: Audio sample format
99
encoder: Encoder to use
100
codec_config: Codec configuration
101
encoder_option: Encoder-specific options
102
103
Returns:
104
int: Stream index
105
"""
106
107
def write_audio_chunk(self, stream_index: int, chunk: torch.Tensor,
108
pts: Optional[int] = None) -> None:
109
"""
110
Write audio chunk to stream.
111
112
Args:
113
stream_index: Target stream index
114
chunk: Audio tensor (frames, channels)
115
pts: Presentation timestamp
116
"""
117
118
def close(self) -> None:
119
"""Close the stream writer and finalize output."""
120
```
121
122
### Codec Configuration
123
124
Configuration classes for fine control over encoding/decoding parameters.
125
126
```python { .api }
127
class CodecConfig:
128
"""Configuration for audio/video codecs."""
129
130
def __init__(self, bit_rate: Optional[int] = None,
131
compression_level: Optional[int] = None,
132
qscale: Optional[float] = None,
133
qmin: Optional[int] = None,
134
qmax: Optional[int] = None,
135
bit_rate_tolerance: Optional[int] = None,
136
buffer_size: Optional[int] = None) -> None:
137
"""
138
Args:
139
bit_rate: Target bit rate
140
compression_level: Compression level (codec-dependent)
141
qscale: Quality scale
142
qmin: Minimum quantizer
143
qmax: Maximum quantizer
144
bit_rate_tolerance: Bit rate tolerance
145
buffer_size: Buffer size
146
"""
147
```
148
149
### Audio Effects Processing
150
151
Real-time audio effects application during streaming.
152
153
```python { .api }
154
class AudioEffector:
155
"""Apply audio effects during streaming."""
156
157
def __init__(self, effect: str, *args, **kwargs) -> None:
158
"""
159
Args:
160
effect: FFmpeg audio filter expression (e.g., "lowpass=frequency=300", "aecho=0.8:0.88:60:0.4")
161
*args, **kwargs: Effect-specific parameters
162
"""
163
164
def apply(self, waveform: torch.Tensor, sample_rate: int) -> torch.Tensor:
165
"""
166
Apply effect to audio waveform.
167
168
Args:
169
waveform: Input audio (time, channel)
170
sample_rate: Sample rate
171
172
Returns:
173
Tensor: Processed audio
174
"""
175
```
176
177
### Playback Functionality
178
179
Direct audio playback capabilities.
180
181
```python { .api }
182
def play_audio(waveform: torch.Tensor, sample_rate: int,
183
normalize: bool = True, channels_first: bool = True) -> None:
184
"""
185
Play audio directly through system audio.
186
187
Args:
188
waveform: Audio tensor to play
189
sample_rate: Sample rate in Hz
190
normalize: Whether to normalize audio volume
191
channels_first: Whether tensor is (channels, time) or (time, channels)
192
"""
193
```
194
195
## Usage Examples
196
197
### Real-time Audio Processing
198
199
```python
200
import torch
201
import torchaudio
202
from torchaudio.io import StreamReader, StreamWriter
203
204
# Set up real-time audio processing
205
def process_audio_stream(input_path: str, output_path: str):
    """Read audio chunk by chunk, apply effects, and write the result as WAV.

    Args:
        input_path: Source path or URL to read from.
        output_path: Destination path of the processed WAV file.
    """
    # Create reader and writer
    reader = StreamReader(input_path)
    writer = StreamWriter(output_path, format="wav")

    # Configure streams. The output stream mirrors the source's sample rate
    # and channel count so the chunks read below match what the writer expects.
    reader.add_basic_audio_stream(frames_per_chunk=1024, buffer_chunk_size=4)
    source = reader.get_src_stream_info(reader.default_audio_stream)
    writer.add_audio_stream(
        sample_rate=int(source.sample_rate),
        num_channels=source.num_channels,
    )

    # The destination must be opened before any chunk can be written.
    writer.open()
    try:
        reached_eof = False
        while not reached_eof:
            # process_packet() returns 0 while more packets may follow and
            # 1 once the source reached EOF and pending frames were flushed.
            reached_eof = reader.process_packet() != 0

            # One packet can complete several chunks, and EOF flushes the
            # tail, so drain the buffer instead of popping a single chunk.
            while True:
                chunk = reader.pop_chunks()[0]
                if chunk is None:
                    break

                # Apply processing (e.g., effects, filtering) and write it out
                writer.write_audio_chunk(0, apply_effects(chunk))
    finally:
        reader.close()
        writer.close()
233
234
def apply_effects(audio: torch.Tensor) -> torch.Tensor:
    """Apply an overdrive effect to one streamed chunk.

    Args:
        audio: Chunk shaped (frames, channels), as produced by
            ``StreamReader.pop_chunks()``.

    Returns:
        torch.Tensor: Processed chunk with the same (frames, channels) shape.
    """
    # overdrive() filters along the last dimension, so move time there,
    # apply the effect, and restore the (frames, channels) layout.
    driven = torchaudio.functional.overdrive(audio.t(), gain=10, colour=20)
    return driven.t().contiguous()
237
```
238
239
### Live Audio Monitoring
240
241
```python
242
import torchaudio
243
from torchaudio.io import StreamReader
244
import matplotlib.pyplot as plt
245
246
def monitor_audio_stream(source: str):
    """Monitor audio stream with real-time visualization.

    Plots the first channel of each decoded chunk until the stream ends or
    the user interrupts with Ctrl+C.

    Args:
        source: Source path or URL passed to ``StreamReader``.
    """
    reader = StreamReader(source)
    reader.add_basic_audio_stream(frames_per_chunk=2048)

    plt.ion()  # Interactive mode
    _, ax = plt.subplots()

    try:
        reached_eof = False
        while not reached_eof:
            # The timeout is given in milliseconds. A non-zero return value
            # means the source reached EOF, so stop instead of spinning.
            reached_eof = reader.process_packet(timeout=100) != 0

            chunk = reader.pop_chunks()[0]
            if chunk is None:
                continue

            # Chunks are (frames, channels): take every frame of channel 0.
            waveform = chunk[:, 0]
            ax.clear()
            ax.plot(waveform.numpy())
            ax.set_ylim([-1, 1])
            plt.pause(0.01)

    except KeyboardInterrupt:
        print("Stopping monitoring...")
    finally:
        reader.close()
        plt.ioff()
273
```
274
275
### Format Conversion Pipeline
276
277
```python
278
import torchaudio
279
from torchaudio.io import StreamReader, StreamWriter, CodecConfig
280
281
def convert_audio_format(input_path: str, output_path: str,
                         target_sample_rate: int = 44100,
                         target_channels: int = 2,
                         target_bitrate: int = 128000):
    """Convert audio to different format with streaming.

    Args:
        input_path: Source path or URL to read from.
        output_path: Destination path of the MP3 file.
        target_sample_rate: Sample rate of the output in Hz.
        target_channels: Number of channels in the output.
        target_bitrate: Target bit rate passed to the codec configuration.

    Raises:
        ValueError: If the source channel count cannot be mapped to
            ``target_channels`` (only down-mixing to mono and expanding
            from mono are supported).
    """
    # Create reader
    reader = StreamReader(input_path)
    reader.add_basic_audio_stream(frames_per_chunk=4096)

    # Create writer with codec configuration. No explicit encoder name is
    # given: the "mp3" container selects its default MP3 encoder.
    codec_config = CodecConfig(bit_rate=target_bitrate)
    writer = StreamWriter(output_path, format="mp3")
    writer.add_audio_stream(
        sample_rate=target_sample_rate,
        num_channels=target_channels,
        codec_config=codec_config,
    )

    # Set up resampling if needed. The sample rate is a property of the
    # source audio stream, not of the container metadata.
    source = reader.get_src_stream_info(reader.default_audio_stream)
    original_sr = int(source.sample_rate)

    resampler = None
    if original_sr != target_sample_rate:
        # NOTE(review): resampling chunk by chunk may introduce small
        # artifacts at chunk boundaries - confirm this is acceptable.
        resampler = torchaudio.transforms.Resample(original_sr, target_sample_rate)

    # The destination must be opened before any chunk can be written.
    writer.open()
    try:
        reached_eof = False
        while not reached_eof:
            # 0 = packet processed, keep going; 1 = EOF, pending frames flushed.
            reached_eof = reader.process_packet() != 0

            # Drain every chunk that is ready, including the tail after EOF.
            while True:
                chunk = reader.pop_chunks()[0]
                if chunk is None:
                    break

                # Chunks arrive as (frames, channels); Resample and the
                # channel logic below work on (channels, frames).
                audio = chunk.t()

                # Resample if needed
                if resampler is not None:
                    audio = resampler(audio)

                # Convert to target channels
                source_channels = audio.shape[0]
                if source_channels != target_channels:
                    if target_channels == 1:
                        audio = audio.mean(dim=0, keepdim=True)
                    elif source_channels == 1:
                        audio = audio.repeat(target_channels, 1)
                    else:
                        raise ValueError(
                            f"Cannot convert {source_channels} channels "
                            f"to {target_channels} channels"
                        )

                # The writer expects (frames, channels) again.
                writer.write_audio_chunk(0, audio.t().contiguous())
    finally:
        reader.close()
        writer.close()
334
```
335
336
### Network Audio Streaming
337
338
```python
339
import torchaudio
340
from torchaudio.io import StreamReader
341
342
def stream_from_url(url: str):
    """Stream audio from network URL.

    Prints the source stream's sample rate and channel count, then reads
    chunks until the stream ends or the user interrupts with Ctrl+C.

    Args:
        url: Network URL of the audio source.
    """
    reader = StreamReader(url)
    reader.add_basic_audio_stream(frames_per_chunk=1024, buffer_chunk_size=8)

    print(f"Streaming from: {url}")
    # Sample rate and channel count are properties of the source audio stream.
    source = reader.get_src_stream_info(reader.default_audio_stream)
    print(f"Sample rate: {int(source.sample_rate)} Hz")
    print(f"Channels: {source.num_channels}")

    chunk_count = 0
    try:
        reached_eof = False
        while not reached_eof:
            # The timeout is given in milliseconds. process_packet() returns
            # 0 while more packets may follow and 1 once the stream ended.
            reached_eof = reader.process_packet(timeout=1000) != 0

            # Drain every chunk that is ready so none is dropped from the
            # buffer, including the tail flushed at EOF.
            while True:
                audio_chunk = reader.pop_chunks()[0]
                if audio_chunk is None:
                    break

                chunk_count += 1
                if chunk_count % 100 == 0:
                    print(f"Processed {chunk_count} chunks")

                # Process audio chunk (e.g., save, analyze, play)
                # ... process audio_chunk ...

    except KeyboardInterrupt:
        print("Stream interrupted by user")
    finally:
        reader.close()
374
```
375
376
These streaming capabilities enable real-time audio processing applications, efficient handling of large media files, and integration with live audio sources and network streams.