CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-av

Pythonic bindings for FFmpeg's libraries enabling multimedia processing with audio/video encoding, decoding, format conversion, and stream manipulation.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/streams.md

Packet and Stream Management

Low-level packet handling and stream operations for precise control over media data flow and timing. PyAV provides comprehensive access to FFmpeg's stream and packet management capabilities.

Capabilities

Packets

Packets contain compressed media data with timing and metadata information.

class Packet:
    """Container for compressed media data.

    A packet holds one chunk of encoded data for its associated stream,
    together with timing fields expressed in ``time_base`` units.
    """
    
    # Properties
    stream: Stream              # Associated stream
    stream_index: int          # Stream index in container
    time_base: Fraction        # Time base for timestamps
    pts: int | None           # Presentation timestamp (None if unset; 0 is valid)
    dts: int | None           # Decode timestamp (None if unset; 0 is valid)
    pos: int                  # Byte position in stream
    size: int                 # Packet size in bytes
    duration: int             # Packet duration in time_base units
    opaque: object | None     # User data
    
    # Packet flags
    is_keyframe: bool         # True if keyframe
    is_corrupt: bool          # True if corrupt
    is_discard: bool          # True if should be discarded
    is_trusted: bool          # True if trusted
    is_disposable: bool       # True if disposable
    
    def __init__(self, size=0):
        """
        Create packet.
        
        Parameters:
        - size: int - Initial packet size
        """
    
    def decode(self) -> list[Frame | SubtitleSet]:
        """
        Decode this packet using its associated stream.
        
        Returns:
        List of decoded frames for video/audio packets, or subtitle sets
        for subtitle packets. The list may be empty when the decoder needs
        more data before it can emit output.
        """
    
    # Inherits Buffer methods
    def update(self, input: bytes) -> None: ...
    def __buffer__(self, flags: int) -> memoryview: ...
    def __bytes__(self) -> bytes: ...

Base Stream

Base stream class with common properties and methods.

class Disposition(Flag):
    """Stream disposition flags.

    Values mirror FFmpeg's ``AV_DISPOSITION_*`` bit positions. Note the gap
    in the bit layout: the flags from CAPTIONS onward start at bit 16, not
    directly after TIMED_THUMBNAILS (bit 11).

    NOTE(review): upstream PyAV may spell these members in lowercase
    (e.g. ``Disposition.default``) - confirm against the installed version.
    """
    DEFAULT = 1 << 0            # Default stream
    DUB = 1 << 1                # Dubbed stream
    ORIGINAL = 1 << 2           # Original language
    COMMENT = 1 << 3            # Commentary
    LYRICS = 1 << 4             # Lyrics
    KARAOKE = 1 << 5            # Karaoke
    FORCED = 1 << 6             # Forced subtitles
    HEARING_IMPAIRED = 1 << 7   # Hearing impaired
    VISUAL_IMPAIRED = 1 << 8    # Visual impaired
    CLEAN_EFFECTS = 1 << 9      # Clean effects
    ATTACHED_PIC = 1 << 10      # Attached picture
    TIMED_THUMBNAILS = 1 << 11  # Timed thumbnails
    # Bits 12-15 are not used by the flags below.
    CAPTIONS = 1 << 16          # Captions
    DESCRIPTIONS = 1 << 17      # Descriptions
    METADATA = 1 << 18          # Metadata
    DEPENDENT = 1 << 19         # Dependent stream
    STILL_IMAGE = 1 << 20       # Still image

class Stream:
    """Base media stream.

    Holds identity, timing and metadata attributes common to every stream
    type, plus encode/decode entry points.
    """
    
    # Properties
    index: int                 # Stream index
    id: int                   # Stream ID
    type: str                 # Stream type ('video', 'audio', 'subtitle', etc.)
    profile: str | None       # Codec profile
    codec_context: CodecContext # Codec context
    container: Container      # Parent container
    metadata: dict[str, str]  # Stream metadata
    # Typed as the Flag enum rather than int: a plain ``Flag`` cannot be
    # combined with an int, so ``stream.disposition & Disposition.X`` only
    # works when this attribute is itself a Disposition.
    disposition: Disposition  # Disposition flags
    
    # Timing
    time_base: Fraction       # Stream time base
    start_time: int | None    # Start time in time_base units
    duration: int | None      # Duration in time_base units
    frames: int               # Number of frames (0 if unknown)
    
    # Language and title
    language: str | None      # Language code
    title: str | None         # Stream title
    
    def encode(self, frame: Frame | None = None) -> list[Packet]:
        """
        Encode frame to packets.
        
        Parameters:
        - frame: Frame | None - Frame to encode (None flushes)
        
        Returns:
        List of encoded packets
        """
    
    def decode(self, packet: Packet | None = None) -> list[Frame]:
        """
        Decode packet to frames.
        
        Parameters:
        - packet: Packet | None - Packet to decode (None flushes)
        
        Returns:
        List of decoded frames
        """

Specialized Stream Types

Stream subclasses for different media types.

class AudioStream(Stream):
    """Audio stream with audio-specific properties.

    Narrows ``type`` and ``codec_context`` from the base ``Stream`` and lists
    the audio attributes that this stub marks as delegated to the codec
    context.
    """
    
    type: Literal['audio']     # Stream type
    codec_context: AudioCodecContext  # Audio-specific codec context
    
    # Audio properties (delegated to codec context)
    frame_size: int            # NOTE(review): presumably samples per encoded frame - confirm
    sample_rate: int           # NOTE(review): presumably samples per second (Hz) - confirm
    rate: int                  # NOTE(review): looks like an alias of sample_rate - confirm
    bit_rate: int              # NOTE(review): presumably bits per second - confirm
    channels: int              # Number of audio channels
    format: AudioFormat        # Sample format descriptor
    layout: AudioLayout        # Channel layout descriptor

class VideoStream(Stream):
    """Video stream with video-specific properties.

    Narrows ``type`` and ``codec_context`` from the base ``Stream`` and lists
    the video attributes that this stub marks as delegated to the codec
    context.
    """
    
    type: Literal['video']     # Stream type
    codec_context: VideoCodecContext  # Video-specific codec context
    
    # Video properties (delegated to codec context)
    width: int                 # NOTE(review): presumably frame width in pixels - confirm
    height: int                # NOTE(review): presumably frame height in pixels - confirm
    format: VideoFormat        # Pixel format descriptor
    pix_fmt: str               # Pixel format name (e.g. 'yuv420p')
    framerate: Fraction        # Frame rate as an exact fraction
    rate: Fraction             # NOTE(review): looks like an alias of framerate - confirm
    bit_rate: int              # NOTE(review): presumably bits per second - confirm
    max_bit_rate: int          # NOTE(review): presumably an upper bound on bit_rate - confirm
    sample_aspect_ratio: Fraction   # Aspect ratio of a single pixel/sample
    display_aspect_ratio: Fraction  # Aspect ratio of the displayed picture

class SubtitleStream(Stream):
    """Subtitle stream.

    Overrides ``decode`` so that it yields subtitle sets instead of frames.
    """
    
    type: Literal['subtitle']  # Stream type
    codec_context: SubtitleCodecContext  # Subtitle-specific codec context
    
    def decode(self, packet=None) -> list[SubtitleSet]:
        """Decode subtitle packet.

        Parameters:
        - packet: Packet to decode; defaults to None

        Returns:
        List of subtitle sets
        """
    
    def decode2(self, packet=None) -> list[SubtitleSet]:
        """Alternative decode method.

        NOTE(review): how this differs from ``decode`` is not shown in this
        stub - confirm against the library reference before relying on it.
        """

class DataStream(Stream):
    """Data stream for non-media data.

    Adds no attributes beyond the base ``Stream``; only ``type`` is narrowed.
    """
    
    type: Literal['data']      # Stream type

class AttachmentStream(Stream):
    """Attachment stream (e.g., cover art).

    Narrows ``type`` and adds the attachment's MIME type.
    """
    
    type: Literal['attachment'] # Stream type
    mimetype: str | None       # MIME type of attachment (None when not available)

Stream Container Management

The StreamContainer provides organized access to streams by type.

class StreamContainer:
    """Container managing streams in a media file.

    Exposes every stream through the sequence protocol (``len``, iteration,
    integer indexing) and additionally groups streams into per-type tuples.
    """
    
    # Stream collections by type
    video: tuple[VideoStream, ...]            # All video streams
    audio: tuple[AudioStream, ...]            # All audio streams
    subtitles: tuple[SubtitleStream, ...]     # All subtitle streams
    attachments: tuple[AttachmentStream, ...] # All attachment streams
    data: tuple[DataStream, ...]              # All data streams
    other: tuple[Stream, ...]                 # Streams not covered by the groups above
    
    def __len__(self) -> int:
        """Total number of streams."""
    
    def __iter__(self) -> Iterator[Stream]:
        """Iterate over all streams."""
    
    def __getitem__(self, index: int) -> Stream:
        """Get stream by index."""
    
    def get(self, *, video=None, audio=None, subtitles=None, data=None) -> list[Stream]:
        """
        Get streams by type and criteria.

        All selectors are keyword-only and default to None.
        
        Parameters:
        - video: int | tuple - Video stream selection
        - audio: int | tuple - Audio stream selection
        - subtitles: int | tuple - Subtitle stream selection
        - data: int | tuple - Data stream selection
        
        Returns:
        List of matching streams
        """
    
    def best(self, kind: str) -> Stream | None:
        """
        Get the best stream of a given type.
        
        Parameters:
        - kind: str - Stream type ('video', 'audio', 'subtitle')
        
        Returns:
        Best stream of the specified type or None
        """

Usage Examples

Basic Packet Inspection

import av

# Open container and examine packets.
# The context manager closes the container even if inspection fails midway.
with av.open('sample.mp4') as container:
    print(f"Container has {len(container.streams)} streams:")
    for i, stream in enumerate(container.streams):
        print(f"  Stream {i}: {stream.type} ({stream.codec_context.name})")
        if stream.language:
            print(f"    Language: {stream.language}")
        if stream.title:
            print(f"    Title: {stream.title}")

    # Process packets directly
    for packet_count, packet in enumerate(container.demux()):
        if packet_count >= 10:  # Examine first 10 packets
            break

        stream = packet.stream

        print(f"Packet {packet_count}:")
        print(f"  Stream: {stream.index} ({stream.type})")
        print(f"  Size: {packet.size} bytes")
        print(f"  PTS: {packet.pts}")
        print(f"  DTS: {packet.dts}")
        print(f"  Duration: {packet.duration}")
        print(f"  Keyframe: {packet.is_keyframe}")
        # Compare against None explicitly: a PTS of 0 is a valid timestamp
        # (typically the very first packet) and must not be shown as missing.
        print(f"  Time: {packet.pts * stream.time_base if packet.pts is not None else None}")

Stream Selection and Analysis

import av

def analyze_streams(filename):
    """Print a summary of all streams in a media file.

    Reports container-level information, then video, audio, subtitle and
    attachment streams in turn, and finally the streams selected by
    ``container.streams.best``.

    Parameters:
    - filename: Path or URL passed to ``av.open``
    """

    # The context manager guarantees the container is closed even if one of
    # the attribute lookups below raises.
    with av.open(filename) as container:
        streams = container.streams

        print(f"File: {filename}")
        print(f"Format: {container.format.name} ({container.format.long_name})")
        # Not every input reports a duration; dividing None would raise a
        # TypeError, so fall back to the same 'Unknown' used for streams.
        if container.duration is not None:
            print(f"Duration: {container.duration / av.time_base:.2f} seconds")
        else:
            print("Duration: Unknown")
        print(f"Total streams: {len(streams)}")

        # Video streams
        if streams.video:
            print(f"\nVideo streams ({len(streams.video)}):")
            for stream in streams.video:
                print(f"  Stream {stream.index}:")
                print(f"    Codec: {stream.codec_context.name}")
                print(f"    Resolution: {stream.width}x{stream.height}")
                print(f"    Pixel format: {stream.format.name}")
                print(f"    Frame rate: {stream.framerate}")
                print(f"    Bitrate: {stream.bit_rate}")
                print(f"    Duration: {stream.duration * stream.time_base if stream.duration else 'Unknown'}")

                # Check disposition
                if stream.disposition & av.stream.Disposition.DEFAULT:
                    print("    Default: Yes")
                if stream.language:
                    print(f"    Language: {stream.language}")

        # Audio streams
        if streams.audio:
            print(f"\nAudio streams ({len(streams.audio)}):")
            for stream in streams.audio:
                print(f"  Stream {stream.index}:")
                print(f"    Codec: {stream.codec_context.name}")
                print(f"    Sample rate: {stream.sample_rate}")
                print(f"    Channels: {stream.channels}")
                print(f"    Layout: {stream.layout.name}")
                print(f"    Format: {stream.format.name}")
                print(f"    Bitrate: {stream.bit_rate}")

                if stream.disposition & av.stream.Disposition.DEFAULT:
                    print("    Default: Yes")
                if stream.language:
                    print(f"    Language: {stream.language}")
                if stream.title:
                    print(f"    Title: {stream.title}")

        # Subtitle streams
        if streams.subtitles:
            print(f"\nSubtitle streams ({len(streams.subtitles)}):")
            for stream in streams.subtitles:
                print(f"  Stream {stream.index}:")
                print(f"    Codec: {stream.codec_context.name}")
                if stream.language:
                    print(f"    Language: {stream.language}")
                if stream.title:
                    print(f"    Title: {stream.title}")

                # Check subtitle disposition
                if stream.disposition & av.stream.Disposition.FORCED:
                    print("    Forced: Yes")
                if stream.disposition & av.stream.Disposition.HEARING_IMPAIRED:
                    print("    Hearing impaired: Yes")

        # Attachment streams (cover art, etc.)
        if streams.attachments:
            print(f"\nAttachment streams ({len(streams.attachments)}):")
            for stream in streams.attachments:
                print(f"  Stream {stream.index}:")
                print(f"    Codec: {stream.codec_context.name}")
                print(f"    MIME type: {stream.mimetype}")
                if stream.title:
                    print(f"    Filename: {stream.title}")

        # Find best streams
        best_video = streams.best('video')
        best_audio = streams.best('audio')

        # Compare against None rather than relying on truthiness of the
        # stream object itself.
        if best_video is not None:
            print(f"\nBest video stream: {best_video.index}")
        if best_audio is not None:
            print(f"Best audio stream: {best_audio.index}")

# Analyze file
analyze_streams('movie.mkv')

Precise Packet Timing

import av

def extract_keyframes(input_file, output_dir):
    """Extract keyframes with precise timing information.

    Demuxes the first video stream, decodes only packets flagged as
    keyframes, prints their timing, and saves each decoded frame as a JPEG
    in ``output_dir``. Extraction stops after 20 keyframes.

    Parameters:
    - input_file: Path or URL passed to ``av.open``
    - output_dir: Directory for the extracted images (created if missing)
    """

    import os

    # exist_ok avoids the check-then-create race of testing os.path.exists first
    os.makedirs(output_dir, exist_ok=True)

    keyframe_count = 0

    # The context manager closes the container even if decoding or saving fails
    with av.open(input_file) as container:
        video_stream = container.streams.video[0]
        time_base = video_stream.time_base

        print("Video stream info:")
        print(f"  Time base: {time_base}")
        print(f"  Frame rate: {video_stream.framerate}")
        # Duration is not always known; dividing None would raise TypeError
        if container.duration is not None:
            print(f"  Total duration: {container.duration / av.time_base:.2f}s")
        else:
            print("  Total duration: Unknown")

        for packet in container.demux(video_stream):
            if not packet.is_keyframe:
                continue

            # Decode keyframe packet
            for frame in packet.decode():
                # Calculate precise timing. Convert to float because
                # Fraction only accepts float-style format specs such as
                # ':.3f' from Python 3.12 onwards. Missing timestamps keep
                # the original fallback of 0.
                pts_seconds = float(packet.pts * time_base) if packet.pts is not None else 0.0
                dts_seconds = float(packet.dts * time_base) if packet.dts is not None else 0.0

                print(f"Keyframe {keyframe_count}:")
                print(f"  PTS: {packet.pts} ({pts_seconds:.3f}s)")
                print(f"  DTS: {packet.dts} ({dts_seconds:.3f}s)")
                print(f"  Size: {packet.size} bytes")
                print(f"  Position: {packet.pos}")

                # Save keyframe
                output_path = os.path.join(
                    output_dir,
                    f"keyframe_{keyframe_count:04d}_{pts_seconds:.3f}s.jpg"
                )
                frame.save(output_path)

                keyframe_count += 1

                # Limit extraction
                if keyframe_count >= 20:
                    break

            # Propagate the limit out of the packet loop as well
            if keyframe_count >= 20:
                break

    print(f"Extracted {keyframe_count} keyframes to {output_dir}")

# Extract keyframes
extract_keyframes('video.mp4', 'keyframes/')

Multi-Stream Processing

import av

def process_multi_stream(input_file, output_file):
    """Process multiple streams with different handling.

    Every video stream is re-encoded to H.264 at half resolution; only the
    first audio stream is kept and re-encoded to AAC. Streams of any other
    type are skipped.

    Parameters:
    - input_file: Path or URL of the source media
    - output_file: Path of the file to write
    """

    # Both containers are context-managed so they are closed (and the output
    # finalized) even if decoding or encoding raises part-way through.
    with av.open(input_file) as input_container, \
            av.open(output_file, 'w') as output_container:

        # Map input stream index -> output stream
        stream_mapping = {}

        # Process video streams
        for input_stream in input_container.streams.video:
            output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
            output_stream.width = input_stream.width // 2  # Half resolution
            output_stream.height = input_stream.height // 2
            output_stream.pix_fmt = 'yuv420p'

            stream_mapping[input_stream.index] = output_stream
            print(f"Video stream {input_stream.index}: {input_stream.width}x{input_stream.height} -> {output_stream.width}x{output_stream.height}")

        # Process audio streams (copy first audio stream only)
        if input_container.streams.audio:
            input_stream = input_container.streams.audio[0]
            output_stream = output_container.add_stream('aac', rate=input_stream.sample_rate)
            output_stream.channels = input_stream.channels
            output_stream.layout = input_stream.layout

            stream_mapping[input_stream.index] = output_stream
            print(f"Audio stream {input_stream.index}: {input_stream.sample_rate}Hz {input_stream.channels}ch")

        # Per-stream decoded frame counts, and running audio sample position
        frame_counts = {}
        samples_emitted = {}

        for packet in input_container.demux():
            input_stream_index = packet.stream_index

            output_stream = stream_mapping.get(input_stream_index)
            if output_stream is None:
                continue  # Skip unmapped streams

            input_stream = input_container.streams[input_stream_index]

            # Initialize counters the first time a stream is seen
            frame_counts.setdefault(input_stream_index, 0)
            samples_emitted.setdefault(input_stream_index, 0)

            # Decode and process frames
            for frame in packet.decode():
                if input_stream.type == 'video':
                    # Resize video frame
                    resized_frame = frame.reformat(
                        width=output_stream.width,
                        height=output_stream.height
                    )
                    resized_frame.pts = frame_counts[input_stream_index]
                    resized_frame.time_base = output_stream.time_base

                    # Encode and mux
                    for out_packet in output_stream.encode(resized_frame):
                        output_container.mux(out_packet)

                elif input_stream.type == 'audio':
                    # Pass through audio (could apply processing here).
                    # Timestamp by samples actually emitted so far: decoded
                    # frames need not match the encoder's frame_size, so
                    # count * frame_size can drift from the real position.
                    # NOTE(review): assumes the time base assigned below is
                    # 1/sample_rate - confirm for the target muxer.
                    frame.pts = samples_emitted[input_stream_index]
                    frame.time_base = output_stream.time_base
                    samples_emitted[input_stream_index] += frame.samples

                    for out_packet in output_stream.encode(frame):
                        output_container.mux(out_packet)

                frame_counts[input_stream_index] += 1

        # Flush all encoders
        for output_stream in stream_mapping.values():
            for packet in output_stream.encode():
                output_container.mux(packet)

        # Report processing
        for stream_index, count in frame_counts.items():
            stream_type = input_container.streams[stream_index].type
            print(f"Processed {count} {stream_type} frames from stream {stream_index}")

# Process multiple streams
process_multi_stream('input.mkv', 'processed.mp4')

Stream Metadata Manipulation

import av

def copy_with_metadata(input_file, output_file, new_metadata=None):
    """Copy file while modifying stream metadata.

    Video and audio streams are copied packet-for-packet (no re-encoding)
    with their metadata carried over and adjusted. Streams of any other
    type are not copied to the output.

    Parameters:
    - input_file: Path or URL of the source media
    - output_file: Path of the file to write
    - new_metadata: Optional dict of container-level entries that are added
      to (and override) the metadata copied from the input
    """

    # Context managers close both containers (finalizing the output) even
    # if muxing fails part-way through.
    with av.open(input_file) as input_container, \
            av.open(output_file, 'w') as output_container:

        # Copy container metadata
        for key, value in input_container.metadata.items():
            output_container.metadata[key] = value

        # Add new container metadata
        if new_metadata:
            for key, value in new_metadata.items():
                output_container.metadata[key] = value

        # Map input stream index -> output stream. Output streams are only
        # created for video/audio, so output indices do not line up with
        # input indices whenever the input has other stream types.
        stream_mapping = {}

        # Process streams
        for input_stream in input_container.streams:
            if input_stream.type not in ('video', 'audio'):
                continue

            output_stream = output_container.add_stream_from_template(input_stream)
            stream_mapping[input_stream.index] = output_stream

            # Copy stream metadata
            for key, value in input_stream.metadata.items():
                output_stream.metadata[key] = value

            if input_stream.type == 'video':
                # Set custom metadata
                output_stream.metadata['encoder'] = 'PyAV'
                output_stream.metadata['processed_by'] = 'Python script'
            else:
                # Language tagging
                if not input_stream.language:
                    output_stream.language = 'eng'  # Default to English

                # Title modification
                if input_stream.title:
                    output_stream.title = f"Enhanced {input_stream.title}"
                else:
                    output_stream.title = f"Audio Track {input_stream.index}"

        # Copy data with metadata preservation
        for packet in input_container.demux():
            output_stream = stream_mapping.get(packet.stream.index)
            if output_stream is None:
                continue  # Stream type that was not copied

            # demux() also yields empty "flushing" packets with no DTS;
            # they carry no data and must not be muxed.
            if packet.dts is None:
                continue

            # Update packet stream reference
            packet.stream = output_stream
            output_container.mux(packet)

        print("Metadata copying complete:")
        print(f"  Container metadata: {len(output_container.metadata)} entries")
        for i, stream in enumerate(output_container.streams):
            print(f"  Stream {i} metadata: {len(stream.metadata)} entries")
            if stream.language:
                print(f"    Language: {stream.language}")
            if stream.title:
                print(f"    Title: {stream.title}")

# Copy with metadata
new_metadata = {
    'title': 'Processed Video',
    'artist': 'PyAV Processing',
    'creation_time': '2024-01-01T00:00:00.000000Z'
}

copy_with_metadata('input.mp4', 'output_with_metadata.mp4', new_metadata)

Stream Time Synchronization

import av

def synchronize_streams(input_file, output_file, audio_delay_ms=0):
    """Synchronize audio and video streams with optional delay.

    Re-stamps the first video stream with a running frame counter and the
    first audio stream with a running sample counter that starts at the
    requested delay, then re-encodes both into the output file.

    Parameters:
    - input_file: Path or URL of the source media
    - output_file: Path of the file to write
    - audio_delay_ms: Delay applied to the audio track, in milliseconds
    """

    # Context managers close both containers (finalizing the output) even
    # if decoding or encoding raises part-way through.
    with av.open(input_file) as input_container, \
            av.open(output_file, 'w') as output_container:

        # Get streams
        video_stream = input_container.streams.video[0]
        audio_stream = input_container.streams.audio[0]

        # Create output streams
        out_video = output_container.add_stream_from_template(video_stream)
        out_audio = output_container.add_stream_from_template(audio_stream)

        print("Input timing:")
        print(f"  Video time base: {video_stream.time_base}")
        print(f"  Audio time base: {audio_stream.time_base}")
        print(f"  Audio delay: {audio_delay_ms}ms")

        # Convert the delay from milliseconds to a number of audio samples.
        # NOTE(review): audio PTS below is counted in samples while the
        # frame's time base is taken from the output stream; this only lines
        # up if that time base is 1/sample_rate - confirm.
        audio_delay_samples = int(audio_delay_ms * audio_stream.sample_rate / 1000)

        # Track timing
        video_pts = 0
        audio_pts = audio_delay_samples  # Start with delay

        # Process packets with timing adjustment
        for packet in input_container.demux():
            if packet.stream == video_stream:
                # Process video packets
                for frame in packet.decode():
                    # NOTE(review): one tick per frame assumes the output
                    # video time base equals 1/framerate - confirm.
                    frame.pts = video_pts
                    frame.time_base = out_video.time_base

                    for out_packet in out_video.encode(frame):
                        output_container.mux(out_packet)

                    video_pts += 1

            elif packet.stream == audio_stream:
                # Process audio packets with delay
                for frame in packet.decode():
                    frame.pts = audio_pts
                    frame.time_base = out_audio.time_base

                    for out_packet in out_audio.encode(frame):
                        output_container.mux(out_packet)

                    # Advance by the samples in this frame
                    audio_pts += frame.samples

        # Flush encoders
        for packet in out_video.encode():
            output_container.mux(packet)
        for packet in out_audio.encode():
            output_container.mux(packet)

        print("Synchronization complete:")
        print(f"  Final video PTS: {video_pts}")
        print(f"  Final audio PTS: {audio_pts}")
        print(f"  Audio delay applied: {audio_delay_ms}ms")

# Synchronize with 100ms audio delay
synchronize_streams('input.mp4', 'synchronized.mp4', audio_delay_ms=100)

Install with Tessl CLI

npx tessl i tessl/pypi-av

docs

audio.md

codecs.md

containers.md

filters.md

index.md

streams.md

video.md

tile.json