CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-mpv

A python interface to the mpv media player

Pending
Overview
Eval results
Files

docs/streaming.md

Streaming and Custom Protocols

This page covers custom stream protocol registration, Python-based streaming, and generator-based data feeding. Together these features let mpv play media from custom data sources, network protocols, and dynamically generated content.

Capabilities

Custom Protocol Registration

Register custom URL schemes and stream handlers for specialized data sources.

def register_stream_protocol(self, proto: str, open_fn=None):
    """
    Register a custom stream protocol handler.

    Can be called directly with both arguments, or with only proto and
    used as a decorator on the handler function (as the examples on this
    page do).

    Parameters:
    - proto: Protocol name (URL scheme), given without the '://' suffix,
      e.g. 'myhttp' for 'myhttp://...' URLs
    - open_fn: Function to handle protocol URLs (optional if using decorator).
      In the examples on this page it receives the full URL, scheme
      included, and returns a stream object such as a GeneratorStream.

    Returns:
    Decorator function if open_fn not provided
    """

Python Stream Integration

Create Python-based streams that integrate with mpv's streaming system.

def python_stream(self, name: str = None, size: int = None):
    """
    Decorator for registering Python stream generators.

    The decorated function is a generator function yielding the stream's
    data as bytes chunks. The examples on this page play a registered
    stream through a 'python://<name>' URL.

    Parameters:
    - name: Stream name/identifier (the part after 'python://' in the URL)
    - size: Expected stream size in bytes (optional)

    Returns:
    Decorator function for stream generator registration
    """

def python_stream_catchall(self, cb):
    """
    Register a catch-all handler for Python streams.

    Parameters:
    - cb: Callback function for unhandled stream requests, i.e. requests
      that no stream registered via python_stream handles

    NOTE(review): upstream python-mpv describes cb as taking the stream
    name and returning a (generator, size) tuple — confirm against the
    installed version.
    """

Byte Stream Playback

Play media directly from an in-memory bytes object, or stream bytes to mpv chunk by chunk.

def play_bytes(self, data: bytes):
    """
    Play data from a bytes object.

    The whole media file must already be in memory, e.g. the result of
    reading a file opened in 'rb' mode.

    Parameters:
    - data: Raw media data as bytes
    """

def play_context(self):
    """
    Context manager for streaming bytes to mpv.

    Intended for use in a `with` statement: the value bound by `as` is a
    function that is called once per chunk of bytes to send.

    Returns:
    Context manager that yields a function for sending data
    """

Stream Classes

GeneratorStream

class GeneratorStream:
    """Transform Python generator into mpv-compatible stream.

    Instances wrap a generator function and are what the protocol
    handlers on this page return from their open functions.
    """
    
    def __init__(self, generator_fun, size: int = None):
        """
        Initialize generator stream.

        Parameters:
        - generator_fun: Zero-argument function that returns a generator
          yielding bytes chunks
        - size: Total stream size in bytes (None for unknown)
        """
    
    def seek(self, offset: int) -> bool:
        """
        Seek to position in stream.

        Parameters:
        - offset: Byte offset to seek to

        Returns:
        True if seek successful, False otherwise

        NOTE(review): a plain generator cannot be repositioned; confirm
        how the installed python-mpv version treats non-zero offsets.
        """
    
    def read(self, size: int) -> bytes:
        """
        Read data from stream.

        Parameters:
        - size: Maximum number of bytes to read

        Returns:
        Bytes data (empty bytes when EOF)
        """
    
    def close(self):
        """Close the stream and free resources."""
    
    def cancel(self):
        """Cancel stream operation."""

Usage Examples

Custom Protocol Handler

import mpv
import requests
import io

player = mpv.MPV()

# Register HTTP streaming protocol
@player.register_stream_protocol('myhttp')
def http_handler(url):
    """Open a 'myhttp://' URL by streaming the matching 'http://' resource.

    Parameters:
    - url: Full URL requested from mpv, including the 'myhttp://' scheme

    Returns:
    An mpv.GeneratorStream yielding the response body in 8 KiB chunks; its
    size is taken from the Content-Length header when the server sends one.

    Raises:
    ValueError if the request fails or the server answers with an error
    status (python-mpv documents ValueError as the way for an open function
    to report a URL that cannot be opened).
    """
    scheme = 'myhttp://'
    # Swap only the leading scheme; an occurrence further inside the URL
    # (e.g. in a query string) must stay untouched.
    if url.startswith(scheme):
        real_url = 'http://' + url[len(scheme):]
    else:
        real_url = url

    # Stream HTTP content
    response = None
    try:
        response = requests.get(real_url, stream=True)
        response.raise_for_status()
    except requests.RequestException as err:
        if response is not None:
            response.close()
        raise ValueError(f"Cannot open {url}: {err}") from err

    def data_generator():
        try:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    yield chunk
        finally:
            # Release the connection once the body is consumed or abandoned
            response.close()

    # Return stream size if available
    content_length = response.headers.get('content-length')
    size = int(content_length) if content_length else None

    # GeneratorStream lives in the mpv module; the bare name is never imported here
    return mpv.GeneratorStream(data_generator, size=size)

# Use custom protocol
player.play('myhttp://example.com/video.mp4')

Python Stream Generator

import os

# File-based streaming
@player.python_stream('file_stream')
def file_stream_generator():
    """Yield the video file's contents as successive blocks of up to 8 KiB."""
    block_size = 8192
    with open('/path/to/large_video.mp4', 'rb') as source:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF
        yield from iter(lambda: source.read(block_size), b'')

# Play using Python stream
player.play('python://file_stream')

Dynamic Content Generation

import struct
import math

# Generate synthetic audio data
@player.python_stream('sine_wave', size=44 + 44100 * 2 * 10)  # 44-byte WAV header + 10 seconds of 16-bit mono audio
def generate_sine_wave():
    """Generate a 10-second, 440 Hz sine wave as a 16-bit mono PCM WAV stream.

    Yields the 44-byte WAV header first, then the samples packed as
    little-endian signed 16-bit integers in chunks of up to 4096 samples.
    The size registered with the decorator covers header plus sample data.
    """
    sample_rate = 44100
    frequency = 440  # A4 note
    duration = 10    # seconds
    bytes_per_sample = 2  # 16-bit mono
    total_samples = sample_rate * duration
    data_size = total_samples * bytes_per_sample

    # WAV header: RIFF chunk, 16-byte PCM 'fmt ' chunk, then the 'data' chunk header
    header = struct.pack('<4sI4s4sIHHIIHH4sI',
        b'RIFF', data_size + 36, b'WAVE',
        b'fmt ', 16, 1, 1, sample_rate, sample_rate * bytes_per_sample, bytes_per_sample, 16,
        b'data', data_size
    )
    yield header

    # Audio samples, batched so the consumer is not handed 2 bytes at a time
    samples_per_chunk = 4096
    for start in range(0, total_samples, samples_per_chunk):
        stop = min(start + samples_per_chunk, total_samples)
        samples = [
            int(32767 * math.sin(2 * math.pi * frequency * i / sample_rate))
            for i in range(start, stop)
        ]
        yield struct.pack(f'<{len(samples)}h', *samples)

# Play generated audio
player.play('python://sine_wave')

Network Stream Protocol

import socket
import threading

class NetworkStreamProtocol:
    """Serve 'netstream://host:port/stream_id' URLs from a raw TCP source."""

    def __init__(self, player):
        """Remember the player the protocol will be registered on."""
        self.player = player
        self.streams = {}  # not used by this class; kept for existing callers

    def setup_protocol(self):
        """Setup network streaming protocol."""

        @self.player.register_stream_protocol('netstream')
        def network_handler(url):
            # Parse netstream://host:port/stream_id
            scheme = 'netstream://'
            location = url[len(scheme):] if url.startswith(scheme) else url
            parts = location.split('/')
            host_port = parts[0]
            # A missing or empty id (e.g. trailing slash) selects the default stream
            stream_id = parts[1] if len(parts) > 1 and parts[1] else 'default'

            # Split at the LAST colon so bracketed IPv6 hosts keep their colons
            host, separator, port = host_port.rpartition(':')
            if not separator or not host:
                raise ValueError(
                    f"Expected netstream://host:port/stream_id, got: {url}")
            # int() raises ValueError for a non-numeric port
            return self.create_network_stream(
                host.strip('[]'), int(port), stream_id)

    def create_network_stream(self, host, port, stream_id):
        """Create network-based stream.

        Parameters:
        - host: Host name or IP address of the stream server
        - port: TCP port of the stream server
        - stream_id: Identifier sent to the server in the GET_STREAM request

        Returns:
        An mpv.GeneratorStream of unknown size that connects when first read
        and yields data until the server closes the connection.
        """

        def network_generator():
            # create_connection resolves IPv4 and IPv6 hosts; the with-block
            # closes the socket however the generator ends.
            with socket.create_connection((host, port)) as sock:
                # sendall, unlike send, keeps writing until the request is out
                sock.sendall(f"GET_STREAM {stream_id}\n".encode())

                while True:
                    data = sock.recv(8192)
                    if not data:
                        break
                    yield data

        # GeneratorStream lives in the mpv module; the bare name is never imported here
        return mpv.GeneratorStream(network_generator)

# Usage
net_stream = NetworkStreamProtocol(player)
net_stream.setup_protocol()  # registers the 'netstream' handler on player

# Play from network source: host 'localhost', port 8080, stream id 'stream1'
player.play('netstream://localhost:8080/stream1')

Bytes Stream Playback

# Load file into memory and play
with open('/path/to/video.mp4', 'rb') as f:
    video_data = f.read()

# Hand the complete in-memory file to the player
player.play_bytes(video_data)

# Stream bytes using context manager
with player.play_context() as stream_func:
    # Send data in chunks so the whole file never has to be in memory
    chunk_size = 8192
    with open('/path/to/audio.mp3', 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                # An empty read means end of file
                break
            stream_func(chunk)

Advanced Stream Management

class StreamManager:
    """Register and serve several custom stream protocols on one player.

    Protocols set up on construction:
    - memory://<stream_id>      data stored earlier via register_memory_stream
    - httprange://<host/path>   HTTP resource fetched with ranged GET requests
    - compressed://<file_path>  gzip file, decompressed while streaming
    """

    def __init__(self, player):
        """Store the player and register all protocols on it."""
        self.player = player
        self.active_streams = {}  # stream_id -> GeneratorStream for memory://
        self.setup_protocols()

    @staticmethod
    def _strip_scheme(url, scheme):
        """Return url without its leading '<scheme>://' prefix."""
        prefix = scheme + '://'
        return url[len(prefix):] if url.startswith(prefix) else url

    def setup_protocols(self):
        """Setup multiple custom protocols."""

        # Memory-based protocol
        @self.player.register_stream_protocol('memory')
        def memory_handler(url):
            stream_id = self._strip_scheme(url, 'memory')
            try:
                return self.active_streams[stream_id]
            except KeyError:
                raise ValueError(f"Unknown memory stream: {stream_id}") from None

        # HTTP range request protocol
        @self.player.register_stream_protocol('httprange')
        def http_range_handler(url):
            real_url = 'http://' + self._strip_scheme(url, 'httprange')
            return self.create_range_stream(real_url)

        # Compressed stream protocol
        @self.player.register_stream_protocol('compressed')
        def compressed_handler(url):
            file_path = self._strip_scheme(url, 'compressed')
            return self.create_compressed_stream(file_path)

    def register_memory_stream(self, stream_id, data):
        """Register data for memory:// protocol.

        Parameters:
        - stream_id: Name under which the data is reachable as memory://<stream_id>
        - data: Complete media data as a bytes-like object
        """
        def data_generator():
            chunk_size = 8192
            for i in range(0, len(data), chunk_size):
                yield data[i:i + chunk_size]

        # GeneratorStream lives in the mpv module; the bare name is never imported here
        self.active_streams[stream_id] = mpv.GeneratorStream(
            data_generator, size=len(data))

    def create_range_stream(self, url):
        """Create HTTP range-request capable stream.

        Parameters:
        - url: http:// URL of the resource

        Returns:
        An mpv.GeneratorStream that fetches the resource in 8 KiB ranges.

        Raises:
        ValueError if the HEAD request fails or reports no usable
        Content-Length; without a length no ranges can be computed.
        """
        import requests

        # Get content length. requests.head does not follow redirects unless
        # asked, and a redirect response carries the wrong length.
        try:
            head_response = requests.head(url, allow_redirects=True)
            head_response.raise_for_status()
            content_length = int(head_response.headers.get('content-length', 0))
        except requests.RequestException as err:
            raise ValueError(f"Cannot open {url}: {err}") from err
        if content_length <= 0:
            raise ValueError(f"No Content-Length reported for {url}")

        def range_generator():
            chunk_size = 8192
            # One session for all ranges so the connection can be reused
            with requests.Session() as session:
                for start in range(0, content_length, chunk_size):
                    end = min(start + chunk_size - 1, content_length - 1)
                    headers = {'Range': f'bytes={start}-{end}'}

                    response = session.get(url, headers=headers)
                    if response.status_code != 206:  # anything but Partial Content
                        break
                    yield response.content

        return mpv.GeneratorStream(range_generator, size=content_length)

    def create_compressed_stream(self, file_path):
        """Create stream from compressed file.

        Parameters:
        - file_path: Path of a gzip-compressed file

        Returns:
        An mpv.GeneratorStream of unknown size yielding the decompressed
        data in blocks of up to 8 KiB.
        """
        import gzip

        def compressed_generator():
            with gzip.open(file_path, 'rb') as f:
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    yield chunk

        return mpv.GeneratorStream(compressed_generator)

# Usage
stream_manager = StreamManager(player)  # the constructor registers all three protocols

# Register memory stream
with open('/path/to/video.mp4', 'rb') as f:
    video_data = f.read()
stream_manager.register_memory_stream('video1', video_data)

# Play from different protocols
player.play('memory://video1')  # the id registered above
player.play('httprange://example.com/video.mp4')  # fetched from http://example.com/video.mp4
player.play('compressed://video.mp4.gz')  # path of a gzip file

Streaming Pipeline

class StreamingPipeline:
    """Stream data to a player through a chain of per-chunk processors."""

    def __init__(self, player):
        """Store the player; the pipeline starts with no processors."""
        self.player = player
        self.processors = []

    def add_processor(self, processor):
        """Add a data processor to the pipeline.

        Parameters:
        - processor: Callable taking one chunk and returning the processed
          chunk; processors run in the order they were added
        """
        self.processors.append(processor)

    def create_pipeline_stream(self, source_generator):
        """Create stream with processing pipeline.

        Parameters:
        - source_generator: Zero-argument function returning an iterable of
          raw chunks

        Returns:
        An mpv.GeneratorStream of unknown size yielding each source chunk
        after it has passed through every processor; chunks that come out
        empty are dropped.
        """

        def pipeline_generator():
            for chunk in source_generator():
                # Process chunk through pipeline
                processed_chunk = chunk
                for processor in self.processors:
                    processed_chunk = processor(processed_chunk)

                if processed_chunk:
                    yield processed_chunk

        # GeneratorStream lives in the mpv module; the bare name is never imported here
        return mpv.GeneratorStream(pipeline_generator)

    def setup_pipeline_protocol(self):
        """Setup pipeline streaming protocol."""

        @self.player.register_stream_protocol('pipeline')
        def pipeline_handler(url):
            # Parse pipeline://source_type/source_path
            scheme = 'pipeline://'
            location = url[len(scheme):] if url.startswith(scheme) else url
            # Split once only: the path keeps its own slashes, so
            # 'file//abs/path' yields the absolute path '/abs/path'
            source_type, _, source_path = location.partition('/')

            if source_type == 'file':
                if not source_path:
                    # Fail at open time instead of on the first read
                    raise ValueError(f"Missing file path in: {url}")

                def file_source():
                    with open(source_path, 'rb') as f:
                        while True:
                            chunk = f.read(8192)
                            if not chunk:
                                break
                            yield chunk

                return self.create_pipeline_stream(file_source)

            raise ValueError(f"Unknown pipeline source: {source_type}")

# Usage with processors
pipeline = StreamingPipeline(player)

# Add decryption processor
def decrypt_processor(chunk):
    """Return chunk with every byte XOR-ed against the fixed key 0x42."""
    key = 0x42
    return bytes([value ^ key for value in chunk])

# Add compression processor  
def decompress_processor(chunk):
    """Decompress a zlib-compressed chunk; pass any other data through.

    Parameters:
    - chunk: Bytes-like object holding one complete zlib stream, or
      arbitrary uncompressed data

    Returns:
    The decompressed bytes, or chunk itself when it is not valid zlib data
    """
    import zlib
    try:
        return zlib.decompress(chunk)
    except zlib.error:
        # Only "not zlib data" means pass-through; real errors such as a
        # wrong argument type must surface instead of being swallowed.
        return chunk

# Processors run in the order they are added: decrypt first, then decompress
pipeline.add_processor(decrypt_processor)
pipeline.add_processor(decompress_processor)
pipeline.setup_pipeline_protocol()

# Play through pipeline. The handler splits at the first '/' after the source
# type, so the double slash makes the source path absolute ('/path/to/...').
player.play('pipeline://file//path/to/encrypted_compressed_video.dat')

Real-time Stream Generation

import time
import threading
import queue

class RealTimeStream:
    """Feed chunks pushed from other threads into a python:// stream."""

    def __init__(self, player, poll_timeout=1.0):
        """
        Parameters:
        - player: Player on which the stream gets registered
        - poll_timeout: Seconds the stream waits for new data before
          re-checking whether streaming was stopped
        """
        self.player = player
        self.data_queue = queue.Queue()
        self.streaming = False
        self.poll_timeout = poll_timeout

    def start_realtime_stream(self, stream_name):
        """Start real-time data streaming.

        Registers a python stream named stream_name that yields every chunk
        passed to feed_data until stop_streaming is called.
        """

        @self.player.python_stream(stream_name)
        def realtime_generator():
            while self.streaming:
                try:
                    # Get data with timeout so a stop request is noticed
                    chunk = self.data_queue.get(timeout=self.poll_timeout)
                except queue.Empty:
                    # Keep waiting. Yielding b'' here would NOT keep the
                    # stream alive: an empty chunk signals end of stream.
                    continue
                if chunk:
                    # Skip empty chunks from the producer for the same reason
                    yield chunk

        self.streaming = True

    def feed_data(self, data):
        """Feed data to the real-time stream (ignored while not streaming)."""
        if self.streaming:
            self.data_queue.put(data)

    def stop_streaming(self):
        """Stop real-time streaming; the stream ends within poll_timeout seconds."""
        self.streaming = False

# Usage
realtime = RealTimeStream(player)
realtime.start_realtime_stream('live_feed')  # registers the stream and enables feed_data

# Start playback
player.play('python://live_feed')

# Feed data in separate thread
def data_feeder():
    """Push the source file into the real-time stream, 8 KiB at a time.

    Ends at end of file or as soon as streaming has been stopped.
    """
    with open('/path/to/stream_source.mp4', 'rb') as source:
        while realtime.streaming:
            block = source.read(8192)
            if block:
                realtime.feed_data(block)
                time.sleep(0.01)  # Simulate real-time rate
            else:
                # Empty read: the file is exhausted
                break

# Run the feeder in its own thread
feeder_thread = threading.Thread(target=data_feeder)
feeder_thread.start()

# Later, to shut down: stop the stream first (this also ends the feeder
# loop, which checks realtime.streaming), then wait for the thread.
# realtime.stop_streaming()
# feeder_thread.join()

Install with Tessl CLI

npx tessl i tessl/pypi-mpv@1.0.2

docs

advanced-rendering.md

core-playback.md

event-handling.md

index.md

input-keybinding.md

playlist-media.md

property-management.md

screenshots-overlays.md

streaming.md

tile.json