A python interface to the mpv media player
—
Custom stream protocol registration, Python-based streaming, and generator-based data feeding. Enables integration with custom data sources, network protocols, and dynamic content generation.
Register custom URL schemes and stream handlers for specialized data sources.
def register_stream_protocol(self, proto: str, open_fn=None):
"""
Register a custom stream protocol handler.
Parameters:
- proto: Protocol name (URL scheme)
- open_fn: Function to handle protocol URLs (optional if using decorator)
Returns:
Decorator function if open_fn not provided
"""

Create Python-based streams that integrate with mpv's streaming system.
def python_stream(self, name: str = None, size: int = None):
"""
Decorator for registering Python stream generators.
Parameters:
- name: Stream name/identifier
- size: Expected stream size in bytes (optional)
Returns:
Decorator function for stream generator registration
"""
def python_stream_catchall(self, cb):
"""
Register a catch-all handler for Python streams.
Parameters:
- cb: Callback function for unhandled stream requests
"""

Play data directly from Python bytes objects and generators.
def play_bytes(self, data: bytes):
"""
Play data from a bytes object.
Parameters:
- data: Raw media data as bytes
"""
def play_context(self):
"""
Context manager for streaming bytes to mpv.
Returns:
Context manager that yields a function for sending data
"""

class GeneratorStream:
"""Transform Python generator into mpv-compatible stream."""
def __init__(self, generator_fun, size: int = None):
"""
Initialize generator stream.
Parameters:
- generator_fun: Function that returns a generator yielding bytes
- size: Total stream size in bytes (None for unknown)
"""
def seek(self, offset: int) -> bool:
"""
Seek to position in stream.
Parameters:
- offset: Byte offset to seek to
Returns:
True if seek successful, False otherwise
"""
def read(self, size: int) -> bytes:
"""
Read data from stream.
Parameters:
- size: Number of bytes to read
Returns:
Bytes data (empty bytes when EOF)
"""
def close(self):
"""Close the stream and free resources."""
def cancel(self):
"""Cancel stream operation."""

import mpv
import requests
import io
player = mpv.MPV()
# Register HTTP streaming protocol
@player.register_stream_protocol('myhttp')
def http_handler(url):
    """Open a ``myhttp://`` URL by streaming it over plain HTTP.

    Parameters:
    - url: Full URL as passed to the handler, including the ``myhttp://`` scheme

    Returns:
    mpv.GeneratorStream yielding the response body in 8 KiB chunks. Its size
    comes from the Content-Length header when the server sends one, else None.

    Raises:
    requests.HTTPError if the server answers with an error status.
    """
    # Swap only the leading scheme. str.replace() would also rewrite any later
    # 'myhttp://' occurrence, e.g. one embedded in a query string.
    real_url = 'http://' + url.partition('://')[2]

    # Probe once up front so HTTP errors surface when the URL is opened and
    # the stream size is known before any data is read.
    with requests.get(real_url, stream=True) as probe:
        probe.raise_for_status()
        content_length = probe.headers.get('content-length')
    size = int(content_length) if content_length else None

    def data_generator():
        # GeneratorStream takes a generator *function*, so this may be called
        # more than once (e.g. when the stream is rewound). A response body can
        # only be iterated once, so each call opens its own request and
        # releases the connection when the generator finishes or is closed.
        with requests.get(real_url, stream=True) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    yield chunk

    # GeneratorStream lives in the mpv module; only `import mpv` is in scope.
    return mpv.GeneratorStream(data_generator, size=size)
# Use custom protocol
player.play('myhttp://example.com/video.mp4')

import os
# File-based streaming
@player.python_stream('file_stream')
def file_stream_generator():
    """Yield the contents of the video file as consecutive 8 KiB byte chunks."""
    block_size = 8192
    with open('/path/to/large_video.mp4', 'rb') as source:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        yield from iter(lambda: source.read(block_size), b'')
# Play using Python stream
player.play('python://file_stream')

import struct
import math
# Generate synthetic audio data
@player.python_stream('sine_wave', size=44 + 44100 * 2 * 10)  # 44-byte WAV header + 10 seconds of 16-bit audio
def generate_sine_wave():
    """Generate a 10-second, 440 Hz sine tone as a 16-bit mono WAV stream.

    Yields the 44-byte RIFF/WAVE header first, then the PCM samples in
    chunks of up to one second each. The size passed to the decorator counts
    the header as well, because the header is part of the stream.
    """
    sample_rate = 44100
    frequency = 440  # A4 note
    duration = 10  # seconds
    total_samples = sample_rate * duration
    data_size = total_samples * 2  # two bytes per 16-bit mono sample

    # WAV header: RIFF chunk, PCM 'fmt ' chunk (mono, 16-bit), 'data' chunk
    header = struct.pack('<4sI4s4sIHHIIHH4sI',
        b'RIFF', data_size + 36, b'WAVE',
        b'fmt ', 16, 1, 1, sample_rate, sample_rate * 2, 2, 16,
        b'data', data_size
    )
    yield header

    # Audio samples. Packing a second at a time avoids ~441,000 two-byte
    # yields; the concatenated bytes are the same as packing one by one.
    for start in range(0, total_samples, sample_rate):
        stop = min(start + sample_rate, total_samples)
        samples = [
            int(32767 * math.sin(2 * math.pi * frequency * i / sample_rate))
            for i in range(start, stop)
        ]
        yield struct.pack(f'<{len(samples)}h', *samples)
# Play generated audio
player.play('python://sine_wave')

import socket
import threading
class NetworkStreamProtocol:
    """Serve ``netstream://host:port/stream_id`` URLs from a raw TCP socket."""

    def __init__(self, player):
        """Remember the player the protocol will be registered on."""
        self.player = player
        self.streams = {}

    @staticmethod
    def _parse_url(url):
        """Split a netstream URL into ``(host, port, stream_id)``.

        The stream id is the first path segment, or 'default' when the URL has
        none. Raises ValueError when the host or port is missing or the port
        is not a valid number.
        """
        from urllib.parse import urlsplit

        parts = urlsplit(url)
        port = parts.port  # raises ValueError for a non-numeric/out-of-range port
        if not parts.hostname or port is None:
            raise ValueError(
                f"Expected netstream://host:port/stream_id, got: {url}")
        stream_id = parts.path.lstrip('/').split('/')[0] or 'default'
        return parts.hostname, port, stream_id

    def setup_protocol(self):
        """Setup network streaming protocol."""
        @self.player.register_stream_protocol('netstream')
        def network_handler(url):
            # Parse netstream://host:port/stream_id
            host, port, stream_id = self._parse_url(url)
            return self.create_network_stream(host, port, stream_id)

    def create_network_stream(self, host, port, stream_id):
        """Create network-based stream.

        Each run of the generator opens a TCP connection, sends a
        ``GET_STREAM <stream_id>`` request line and yields whatever the peer
        sends until it closes the connection.
        """
        def network_generator():
            # The context manager closes the socket on exhaustion, on error,
            # and when the generator is closed early.
            with socket.create_connection((host, port)) as sock:
                # sendall() retries until the whole request is written;
                # send() may transmit only part of it.
                sock.sendall(f"GET_STREAM {stream_id}\n".encode())
                while True:
                    data = sock.recv(8192)
                    if not data:  # peer closed the connection
                        break
                    yield data

        # GeneratorStream lives in the mpv module; only `import mpv` is in scope.
        return mpv.GeneratorStream(network_generator)
# Usage
net_stream = NetworkStreamProtocol(player)
net_stream.setup_protocol()
# Play from network source
player.play('netstream://localhost:8080/stream1')

# Load file into memory and play
with open('/path/to/video.mp4', 'rb') as f:
video_data = f.read()
player.play_bytes(video_data)
# Stream bytes using context manager
with player.play_context() as stream_func:
# Send data in chunks
chunk_size = 8192
with open('/path/to/audio.mp3', 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
stream_func(chunk)

class StreamManager:
def __init__(self, player):
    """Store the player, start with no registered streams, and install the handlers."""
    self.active_streams = dict()
    self.player = player
    # Register the protocols right away so the manager is usable once built.
    self.setup_protocols()
def setup_protocols(self):
    """Setup multiple custom protocols.

    Registers three URL schemes on the player:
    - memory://<stream_id>   -> a stream stored in self.active_streams
    - httprange://<host/path> -> self.create_range_stream('http://<host/path>')
    - compressed://<path>    -> self.create_compressed_stream('<path>')
    """
    def strip_scheme(url):
        # Drop only the leading '<scheme>://'. str.replace() would also remove
        # later occurrences of the scheme inside the path or query string.
        return url.partition('://')[2]

    # Memory-based protocol
    @self.player.register_stream_protocol('memory')
    def memory_handler(url):
        stream_id = strip_scheme(url)
        try:
            return self.active_streams[stream_id]
        except KeyError:
            raise ValueError(f"Unknown memory stream: {stream_id}") from None

    # HTTP range request protocol
    @self.player.register_stream_protocol('httprange')
    def http_range_handler(url):
        return self.create_range_stream('http://' + strip_scheme(url))

    # Compressed stream protocol
    @self.player.register_stream_protocol('compressed')
    def compressed_handler(url):
        return self.create_compressed_stream(strip_scheme(url))
def register_memory_stream(self, stream_id, data):
    """Register data for memory:// protocol.

    Parameters:
    - stream_id: Name the data is stored under in self.active_streams
    - data: Bytes to serve; they are handed out in 8 KiB slices
    """
    def data_generator():
        chunk_size = 8192
        for i in range(0, len(data), chunk_size):
            yield data[i:i + chunk_size]

    # GeneratorStream lives in the mpv module; only `import mpv` is in scope,
    # so the bare name would raise NameError.
    self.active_streams[stream_id] = mpv.GeneratorStream(
        data_generator, size=len(data))
def create_range_stream(self, url):
    """Create HTTP range-request capable stream.

    Parameters:
    - url: http:// URL of the resource

    Returns:
    mpv.GeneratorStream that fetches the resource in 8 KiB ``Range`` requests

    Raises:
    requests.HTTPError if the HEAD request returns an error status;
    ValueError if the server does not report a positive Content-Length,
    since the byte ranges cannot be planned without it.
    """
    import requests

    # Get content length. Follow redirects so the length describes the final
    # resource; requests.head() does not follow them by default.
    head_response = requests.head(url, allow_redirects=True)
    head_response.raise_for_status()
    content_length = int(head_response.headers.get('content-length', 0))
    if content_length <= 0:
        # ValueError matches how memory_handler in this file reports an
        # unopenable URL. Falling through would advertise a 0-byte stream.
        raise ValueError(f"Cannot determine content length for: {url}")

    def range_generator():
        chunk_size = 8192
        # One session reuses the connection instead of reconnecting per chunk.
        with requests.Session() as session:
            for start in range(0, content_length, chunk_size):
                end = min(start + chunk_size, content_length) - 1
                headers = {'Range': f'bytes={start}-{end}'}
                response = session.get(url, headers=headers)
                if response.status_code != 206:  # not Partial Content
                    # Server ignored or refused the range; stop, do not guess.
                    break
                yield response.content

    # GeneratorStream lives in the mpv module; only `import mpv` is in scope.
    return mpv.GeneratorStream(range_generator, size=content_length)
def create_compressed_stream(self, file_path):
    """Create stream from compressed file.

    Parameters:
    - file_path: Path of a gzip-compressed file

    Returns:
    mpv.GeneratorStream yielding the decompressed data in 8 KiB chunks. No
    size is given because the decompressed length is not known up front.
    """
    import gzip

    def compressed_generator():
        with gzip.open(file_path, 'rb') as f:
            # Stop at the first empty read, i.e. end of the decompressed data.
            yield from iter(lambda: f.read(8192), b'')

    # GeneratorStream lives in the mpv module; only `import mpv` is in scope.
    return mpv.GeneratorStream(compressed_generator)
# Usage
stream_manager = StreamManager(player)
# Register memory stream
with open('/path/to/video.mp4', 'rb') as f:
video_data = f.read()
stream_manager.register_memory_stream('video1', video_data)
# Play from different protocols
player.play('memory://video1')
player.play('httprange://example.com/video.mp4')
player.play('compressed://video.mp4.gz')

class StreamingPipeline:
def __init__(self, player):
    """Store the player and start with an empty processor chain."""
    self.processors = list()
    self.player = player
def add_processor(self, processor):
    """Append a chunk-transforming callable to the end of the pipeline."""
    chain = self.processors
    chain.append(processor)
def create_pipeline_stream(self, source_generator):
    """Create stream with processing pipeline.

    Parameters:
    - source_generator: Function returning a generator of raw byte chunks

    Returns:
    mpv.GeneratorStream yielding every source chunk after it has passed
    through self.processors in registration order.
    """
    def pipeline_generator():
        for chunk in source_generator():
            # Process chunk through pipeline
            for processor in self.processors:
                chunk = processor(chunk)
            # Drop chunks a processor reduced to nothing: an empty chunk would
            # otherwise be read as end of stream.
            if chunk:
                yield chunk

    # GeneratorStream lives in the mpv module; only `import mpv` is in scope.
    return mpv.GeneratorStream(pipeline_generator)
def setup_pipeline_protocol(self):
    """Setup pipeline streaming protocol.

    Registers the ``pipeline://<source_type>/<source_path>`` scheme. Only the
    'file' source type is supported; any other type makes the handler raise
    ValueError.
    """
    @self.player.register_stream_protocol('pipeline')
    def pipeline_handler(url):
        # Parse pipeline://source_type/source_path. Only the leading scheme is
        # removed, so a path that itself contains 'pipeline://' stays intact.
        remainder = url.partition('://')[2]
        source_type, _, source_path = remainder.partition('/')

        if source_type != 'file':
            raise ValueError(f"Unknown pipeline source: {source_type}")

        def file_source():
            with open(source_path, 'rb') as f:
                # Stop at the first empty read, i.e. at end of file.
                yield from iter(lambda: f.read(8192), b'')

        return self.create_pipeline_stream(file_source)
# Usage with processors
pipeline = StreamingPipeline(player)
# Add decryption processor
def decrypt_processor(chunk):
    """Return *chunk* with every byte XOR-ed against the fixed key 0x42."""
    # Simple XOR decryption example
    key = 0x42
    return bytes([value ^ key for value in chunk])
# Add compression processor
def decompress_processor(chunk):
    """Return *chunk* zlib-decompressed, or unchanged if it is not zlib data.

    Each chunk is treated as a complete zlib stream of its own; a chunk that
    is only a fragment of a larger compressed stream fails to decompress and
    is therefore passed through as-is.
    """
    import zlib
    try:
        return zlib.decompress(chunk)
    except zlib.error:
        # Catch only decompression failures; a bare except would also swallow
        # KeyboardInterrupt and hide genuine programming errors.
        return chunk  # Pass through if not compressed
pipeline.add_processor(decrypt_processor)
pipeline.add_processor(decompress_processor)
pipeline.setup_pipeline_protocol()
# Play through pipeline
player.play('pipeline://file//path/to/encrypted_compressed_video.dat')

import time
import threading
import queue
class RealTimeStream:
    """Feed a Python stream from a queue filled by another thread."""

    def __init__(self, player, poll_timeout=1.0):
        """
        Parameters:
        - player: Player the stream is registered on
        - poll_timeout: Seconds the generator waits for queued data before it
          re-checks whether streaming was stopped
        """
        self.player = player
        self.data_queue = queue.Queue()
        self.streaming = False
        self.poll_timeout = poll_timeout

    def start_realtime_stream(self, stream_name):
        """Start real-time data streaming."""
        @self.player.python_stream(stream_name)
        def realtime_generator():
            while self.streaming:
                try:
                    # Get data with timeout so a stop request is noticed even
                    # when no data arrives.
                    chunk = self.data_queue.get(timeout=self.poll_timeout)
                except queue.Empty:
                    # Just wait again. Yielding b'' here would not keep the
                    # stream alive: an empty read marks end of stream.
                    continue
                if chunk:  # an empty chunk would likewise end the stream
                    yield chunk

        self.streaming = True

    def feed_data(self, data):
        """Feed data to the real-time stream (ignored while not streaming)."""
        if self.streaming:
            self.data_queue.put(data)

    def stop_streaming(self):
        """Stop real-time streaming; the generator ends at its next check."""
        self.streaming = False
# Usage
realtime = RealTimeStream(player)
realtime.start_realtime_stream('live_feed')
# Start playback
player.play('python://live_feed')
# Feed data in separate thread
def data_feeder():
    """Push the source file into the live stream in 8 KiB pieces.

    Stops at end of file or as soon as the stream is no longer streaming.
    """
    read_size = 8192
    pause = 0.01  # Simulate real-time rate
    with open('/path/to/stream_source.mp4', 'rb') as source:
        while realtime.streaming:
            piece = source.read(read_size)
            if not piece:
                return
            realtime.feed_data(piece)
            time.sleep(pause)
feeder_thread = threading.Thread(target=data_feeder)
feeder_thread.start()
# Later...
# realtime.stop_streaming()
# feeder_thread.join()

Install with Tessl CLI
npx tessl i tessl/pypi-mpv