0
# Streaming and Custom Protocols
1
2
This section covers custom stream protocol registration, Python-based streaming, and generator-based data feeding. These features enable integration with custom data sources, network protocols, and dynamically generated content.
3
4
## Capabilities
5
6
### Custom Protocol Registration
7
8
Register custom URL schemes and stream handlers for specialized data sources.
9
10
```python { .api }
11
def register_stream_protocol(self, proto: str, open_fn=None):
12
"""
13
Register a custom stream protocol handler.
14
15
Parameters:
16
- proto: Protocol name (URL scheme)
17
- open_fn: Function to handle protocol URLs (optional if using decorator)
18
19
Returns:
20
A decorator function if open_fn is not provided
21
"""
22
```
23
24
### Python Stream Integration
25
26
Create Python-based streams that integrate with mpv's streaming system.
27
28
```python { .api }
29
def python_stream(self, name: str = None, size: int = None):
30
"""
31
Decorator for registering Python stream generators.
32
33
Parameters:
34
- name: Stream name/identifier; the stream is then played via the URL python://<name>
35
- size: Expected stream size in bytes (optional)
36
37
Returns:
38
Decorator function for stream generator registration
39
"""
40
41
def python_stream_catchall(self, cb):
42
"""
43
Register a catch-all handler for Python streams.
44
45
Parameters:
46
- cb: Callback function for unhandled stream requests
47
"""
48
```
49
50
### Byte Stream Playback
51
52
Play data directly from Python bytes objects and generators.
53
54
```python { .api }
55
def play_bytes(self, data: bytes):
56
"""
57
Play data from a bytes object.
58
59
Parameters:
60
- data: Raw media data as bytes
61
"""
62
63
def play_context(self):
64
"""
65
Context manager for streaming bytes to mpv.
66
67
Returns:
68
Context manager that yields a function for sending data
69
"""
70
```
71
72
## Stream Classes
73
74
### GeneratorStream
75
76
```python { .api }
77
class GeneratorStream:
78
"""Transform a Python generator into an mpv-compatible stream."""
79
80
def __init__(self, generator_fun, size: int = None):
81
"""
82
Initialize generator stream.
83
84
Parameters:
85
- generator_fun: Function that returns a generator yielding bytes
86
- size: Total stream size in bytes (None for unknown)
87
"""
88
89
def seek(self, offset: int) -> bool:
90
"""
91
Seek to position in stream.
92
93
Parameters:
94
- offset: Byte offset to seek to
95
96
Returns:
97
True if seek successful, False otherwise
98
"""
99
100
def read(self, size: int) -> bytes:
101
"""
102
Read data from stream.
103
104
Parameters:
105
- size: Number of bytes to read
106
107
Returns:
108
Bytes data (empty bytes when EOF)
109
"""
110
111
def close(self):
112
"""Close the stream and free resources."""
113
114
def cancel(self):
115
"""Cancel stream operation."""
116
```
117
118
## Usage Examples
119
120
### Custom Protocol Handler
121
122
```python
123
import mpv
124
import requests
125
import io
126
127
player = mpv.MPV()
128
129
# Register HTTP streaming protocol
130
@player.register_stream_protocol('myhttp')
131
def http_handler(url):
132
"""Custom HTTP streaming handler."""
133
# Remove custom protocol prefix
134
real_url = url.replace('myhttp://', 'http://')
135
136
# Stream HTTP content
137
response = requests.get(real_url, stream=True)
138
response.raise_for_status()
139
140
def data_generator():
141
for chunk in response.iter_content(chunk_size=8192):
142
if chunk:
143
yield chunk
144
145
# Determine the stream size from the Content-Length header, if available
146
content_length = response.headers.get('content-length')
147
size = int(content_length) if content_length else None
148
149
return GeneratorStream(data_generator, size=size)
150
151
# Use custom protocol
152
player.play('myhttp://example.com/video.mp4')
153
```
154
155
### Python Stream Generator
156
157
```python
158
import os
159
160
# File-based streaming
161
@player.python_stream('file_stream')
162
def file_stream_generator():
163
"""Stream file data in chunks."""
164
with open('/path/to/large_video.mp4', 'rb') as f:
165
while True:
166
chunk = f.read(8192)
167
if not chunk:
168
break
169
yield chunk
170
171
# Play using Python stream
172
player.play('python://file_stream')
173
```
174
175
### Dynamic Content Generation
176
177
```python
178
import struct
179
import math
180
181
# Generate synthetic audio data
182
@player.python_stream('sine_wave', size=44100 * 2 * 10) # 10 seconds of 16-bit mono audio; NOTE: this size counts sample data only and excludes the 44-byte WAV header yielded first
183
def generate_sine_wave():
184
"""Generate sine wave audio data."""
185
sample_rate = 44100
186
frequency = 440 # A4 note
187
duration = 10 # seconds
188
189
# WAV header
190
header = struct.pack('<4sI4s4sIHHIIHH4sI',
191
b'RIFF', 44100 * 2 * duration + 36, b'WAVE',
192
b'fmt ', 16, 1, 1, sample_rate, sample_rate * 2, 2, 16,
193
b'data', 44100 * 2 * duration
194
)
195
yield header
196
197
# Audio samples
198
for i in range(sample_rate * duration):
199
sample = int(32767 * math.sin(2 * math.pi * frequency * i / sample_rate))
200
yield struct.pack('<h', sample)
201
202
# Play generated audio
203
player.play('python://sine_wave')
204
```
205
206
### Network Stream Protocol
207
208
```python
209
import socket
210
import threading
211
212
class NetworkStreamProtocol:
213
def __init__(self, player):
214
self.player = player
215
self.streams = {}
216
217
def setup_protocol(self):
218
"""Setup network streaming protocol."""
219
220
@self.player.register_stream_protocol('netstream')
221
def network_handler(url):
222
# Parse netstream://host:port/stream_id
223
parts = url.replace('netstream://', '').split('/')
224
host_port = parts[0]
225
stream_id = parts[1] if len(parts) > 1 else 'default'
226
227
host, port = host_port.split(':')
228
return self.create_network_stream(host, int(port), stream_id)
229
230
def create_network_stream(self, host, port, stream_id):
231
"""Create network-based stream."""
232
233
def network_generator():
234
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
235
try:
236
sock.connect((host, port))
237
sock.send(f"GET_STREAM {stream_id}\n".encode())
238
239
while True:
240
data = sock.recv(8192)
241
if not data:
242
break
243
yield data
244
finally:
245
sock.close()
246
247
return GeneratorStream(network_generator)
248
249
# Usage
250
net_stream = NetworkStreamProtocol(player)
251
net_stream.setup_protocol()
252
253
# Play from network source
254
player.play('netstream://localhost:8080/stream1')
255
```
256
257
### Bytes Stream Playback
258
259
```python
260
# Load file into memory and play
261
with open('/path/to/video.mp4', 'rb') as f:
262
video_data = f.read()
263
264
player.play_bytes(video_data)
265
266
# Stream bytes using context manager
267
with player.play_context() as stream_func:
268
# Send data in chunks
269
chunk_size = 8192
270
with open('/path/to/audio.mp3', 'rb') as f:
271
while True:
272
chunk = f.read(chunk_size)
273
if not chunk:
274
break
275
stream_func(chunk)
276
```
277
278
### Advanced Stream Management
279
280
```python
281
class StreamManager:
282
def __init__(self, player):
283
self.player = player
284
self.active_streams = {}
285
self.setup_protocols()
286
287
def setup_protocols(self):
288
"""Setup multiple custom protocols."""
289
290
# Memory-based protocol
291
@self.player.register_stream_protocol('memory')
292
def memory_handler(url):
293
stream_id = url.replace('memory://', '')
294
if stream_id in self.active_streams:
295
return self.active_streams[stream_id]
296
raise ValueError(f"Unknown memory stream: {stream_id}")
297
298
# HTTP range request protocol
299
@self.player.register_stream_protocol('httprange')
300
def http_range_handler(url):
301
real_url = url.replace('httprange://', 'http://')
302
return self.create_range_stream(real_url)
303
304
# Compressed stream protocol
305
@self.player.register_stream_protocol('compressed')
306
def compressed_handler(url):
307
file_path = url.replace('compressed://', '')
308
return self.create_compressed_stream(file_path)
309
310
def register_memory_stream(self, stream_id, data):
311
"""Register data for memory:// protocol."""
312
def data_generator():
313
chunk_size = 8192
314
for i in range(0, len(data), chunk_size):
315
yield data[i:i + chunk_size]
316
317
self.active_streams[stream_id] = GeneratorStream(
318
data_generator, size=len(data))
319
320
def create_range_stream(self, url):
321
"""Create HTTP range-request capable stream."""
322
import requests
323
324
# Get content length
325
head_response = requests.head(url)
326
content_length = int(head_response.headers.get('content-length', 0))
327
328
def range_generator():
329
chunk_size = 8192
330
for start in range(0, content_length, chunk_size):
331
end = min(start + chunk_size - 1, content_length - 1)
332
headers = {'Range': f'bytes={start}-{end}'}
333
334
response = requests.get(url, headers=headers)
335
if response.status_code == 206: # Partial content
336
yield response.content
337
else:
338
break
339
340
return GeneratorStream(range_generator, size=content_length)
341
342
def create_compressed_stream(self, file_path):
343
"""Create stream from compressed file."""
344
import gzip
345
346
def compressed_generator():
347
with gzip.open(file_path, 'rb') as f:
348
while True:
349
chunk = f.read(8192)
350
if not chunk:
351
break
352
yield chunk
353
354
return GeneratorStream(compressed_generator)
355
356
# Usage
357
stream_manager = StreamManager(player)
358
359
# Register memory stream
360
with open('/path/to/video.mp4', 'rb') as f:
361
video_data = f.read()
362
stream_manager.register_memory_stream('video1', video_data)
363
364
# Play from different protocols
365
player.play('memory://video1')
366
player.play('httprange://example.com/video.mp4')
367
player.play('compressed://video.mp4.gz')
368
```
369
370
### Streaming Pipeline
371
372
```python
373
class StreamingPipeline:
374
def __init__(self, player):
375
self.player = player
376
self.processors = []
377
378
def add_processor(self, processor):
379
"""Add a data processor to the pipeline."""
380
self.processors.append(processor)
381
382
def create_pipeline_stream(self, source_generator):
383
"""Create stream with processing pipeline."""
384
385
def pipeline_generator():
386
for chunk in source_generator():
387
# Process chunk through pipeline
388
processed_chunk = chunk
389
for processor in self.processors:
390
processed_chunk = processor(processed_chunk)
391
392
if processed_chunk:
393
yield processed_chunk
394
395
return GeneratorStream(pipeline_generator)
396
397
def setup_pipeline_protocol(self):
398
"""Setup pipeline streaming protocol."""
399
400
@self.player.register_stream_protocol('pipeline')
401
def pipeline_handler(url):
402
# Parse pipeline://source_type/source_path
403
parts = url.replace('pipeline://', '').split('/', 1)
404
source_type = parts[0]
405
source_path = parts[1] if len(parts) > 1 else ''
406
407
if source_type == 'file':
408
def file_source():
409
with open(source_path, 'rb') as f:
410
while True:
411
chunk = f.read(8192)
412
if not chunk:
413
break
414
yield chunk
415
416
return self.create_pipeline_stream(file_source)
417
418
raise ValueError(f"Unknown pipeline source: {source_type}")
419
420
# Usage with processors
421
pipeline = StreamingPipeline(player)
422
423
# Add decryption processor
424
def decrypt_processor(chunk):
425
# Simple XOR decryption example
426
return bytes(b ^ 0x42 for b in chunk)
427
428
# Add decompression processor
429
def decompress_processor(chunk):
430
import zlib
431
try:
432
return zlib.decompress(chunk)
433
except:
434
return chunk # Pass through if not compressed
435
436
pipeline.add_processor(decrypt_processor)
437
pipeline.add_processor(decompress_processor)
438
pipeline.setup_pipeline_protocol()
439
440
# Play through pipeline
441
player.play('pipeline://file//path/to/encrypted_compressed_video.dat')
442
```
443
444
### Real-time Stream Generation
445
446
```python
447
import time
448
import threading
449
import queue
450
451
class RealTimeStream:
452
def __init__(self, player):
453
self.player = player
454
self.data_queue = queue.Queue()
455
self.streaming = False
456
457
def start_realtime_stream(self, stream_name):
458
"""Start real-time data streaming."""
459
460
@self.player.python_stream(stream_name)
461
def realtime_generator():
462
while self.streaming:
463
try:
464
# Get data with timeout
465
chunk = self.data_queue.get(timeout=1.0)
466
yield chunk
467
except queue.Empty:
468
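# NOTE(review): GeneratorStream.read documents empty bytes as EOF, so yielding b'' here may end the stream rather than keep it alive - confirm before relying on this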
469
yield b''
470
471
self.streaming = True
472
473
def feed_data(self, data):
474
"""Feed data to the real-time stream."""
475
if self.streaming:
476
self.data_queue.put(data)
477
478
def stop_streaming(self):
479
"""Stop real-time streaming."""
480
self.streaming = False
481
482
# Usage
483
realtime = RealTimeStream(player)
484
realtime.start_realtime_stream('live_feed')
485
486
# Start playback
487
player.play('python://live_feed')
488
489
# Feed data in separate thread
490
def data_feeder():
491
with open('/path/to/stream_source.mp4', 'rb') as f:
492
while realtime.streaming:
493
chunk = f.read(8192)
494
if not chunk:
495
break
496
realtime.feed_data(chunk)
497
time.sleep(0.01) # Simulate real-time rate
498
499
feeder_thread = threading.Thread(target=data_feeder)
500
feeder_thread.start()
501
502
# Later...
503
# realtime.stop_streaming()
504
# feeder_thread.join()
505
```