CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-av

Pythonic bindings for FFmpeg's libraries enabling multimedia processing with audio/video encoding, decoding, format conversion, and stream manipulation.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/filters.md

Filter System

Audio and video filtering capabilities using FFmpeg's filter system. PyAV provides access to FFmpeg's comprehensive filter library for audio and video processing pipelines.

Capabilities

Filter Graph

Filter graphs organize and execute audio/video processing pipelines.

class Graph:
    """Filter graph for audio/video processing.

    Organizes filter contexts into a pipeline: add filters, link them,
    call configure(), then drive the graph with push()/pull().
    """
    
    # Properties
    configured: bool            # True once configure() has been called
    
    def __init__(self):
        """Create new, empty filter graph."""
    
    def add(self, filter_name: str, *args, **kwargs) -> FilterContext:
        """
        Add filter to graph.
        
        Parameters:
        - filter_name: str - Filter name (see filters_available)
        - *args: Positional filter arguments
        - **kwargs: Named filter arguments
        
        Returns:
        FilterContext for the added filter
        """
    
    def add_buffer(self, template=None, width=None, height=None, format=None,
                   name=None, **kwargs) -> FilterContext:
        """
        Add video buffer source (graph input for video frames).
        
        Parameters:
        - template: VideoStream - Template stream for properties
        - width: int - Frame width
        - height: int - Frame height
        - format: str - Pixel format
        - name: str - Buffer name
        
        Returns:
        Video buffer FilterContext
        """
    
    def add_abuffer(self, template=None, format=None, layout=None, rate=None,
                    name=None, **kwargs) -> FilterContext:
        """
        Add audio buffer source (graph input for audio frames).
        
        Parameters:
        - template: AudioStream - Template stream for properties
        - format: str - Audio format
        - layout: str - Channel layout
        - rate: int - Sample rate
        - name: str - Buffer name
        
        Returns:
        Audio buffer FilterContext
        """
    
    def configure(self) -> None:
        """Configure the filter graph; required before pushing frames."""
    
    def link_nodes(self, output_ctx, input_ctx, output_idx=0, input_idx=0) -> None:
        """
        Link filter contexts.
        
        Parameters:
        - output_ctx: FilterContext - Source context
        - input_ctx: FilterContext - Destination context
        - output_idx: int - Output pad index on the source
        - input_idx: int - Input pad index on the destination
        """
    
    def set_audio_frame_size(self, nb_samples: int) -> None:
        """Set audio frame size (samples per frame) for the graph."""
    
    def push(self, frame) -> None:
        """Push frame to graph input."""
    
    def pull(self) -> Frame | None:
        """Pull frame from graph output; None when no frame is ready."""
    
    def vpush(self, frame) -> None:
        """Push video frame to graph."""
    
    def vpull(self) -> VideoFrame | None:
        """Pull video frame from graph; None when no frame is ready."""

Filter Information

Filter discovery and introspection.

# Module-level registry of filter names usable with Graph.add() / Filter()
filters_available: set[str]  # Set of available filter names

class Filter:
    """Filter information (introspection for a single named filter)."""
    
    # Properties
    name: str                   # Filter name
    description: str            # Human-readable filter description
    descriptor: Descriptor      # Filter descriptor with options
    options: tuple[Option, ...] # Available options
    flags: int                 # Filter flags (bitmask)
    
    def __init__(self, name: str):
        """
        Get filter by name.
        
        Parameters:
        - name: str - Filter name (see filters_available)
        """

Filter Context

Individual filter instances within a graph.

class FilterContext:
    """Filter instance in a graph."""
    
    # Properties
    name: str | None           # Filter context name (may be unset)
    graph: Graph              # Parent graph
    
    def init(self, args=None, **kwargs) -> None:
        """
        Initialize filter context.
        
        Parameters:
        - args: str - Filter argument string (FFmpeg option syntax)
        - **kwargs: Named arguments
        """
    
    def link_to(self, target, output_idx: int = 0, input_idx: int = 0) -> None:
        """
        Link this context's output pad to another context's input pad.
        
        Parameters:
        - target: FilterContext - Target context
        - output_idx: int - Output pad index on this context
        - input_idx: int - Input pad index on the target
        """
    
    def push(self, frame) -> None:
        """
        Push frame to this filter.
        
        Parameters:
        - frame: Frame - Input frame
        """
    
    def pull(self) -> Frame | None:
        """
        Pull processed frame.
        
        Returns:
        Processed frame, or None when no frame is ready
        """

Audio Normalization

Specialized audio loudness normalization filter.

def stats(loudnorm_args: str, stream: AudioStream) -> bytes:
    """
    Generate loudness normalization statistics (first loudnorm pass).
    
    Parameters:
    - loudnorm_args: str - Loudnorm filter arguments
    - stream: AudioStream - Input audio stream
    
    Returns:
    Statistics data (bytes) to feed the second-pass normalization
    """

Usage Examples

Basic Video Filtering

import av

# Open input video
input_container = av.open('input.mp4')
input_stream = input_container.streams.video[0]

# Create output: H.264 at the source frame rate and dimensions
output_container = av.open('filtered.mp4', 'w')
output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
output_stream.width = input_stream.width
output_stream.height = input_stream.height
output_stream.pix_fmt = 'yuv420p'

# Create filter graph
graph = av.filter.Graph()

# Add input buffer (takes its properties from the input stream)
buffer_ctx = graph.add_buffer(template=input_stream)

# Add scale filter (resize)
scale_ctx = graph.add('scale', width=1280, height=720)

# Add color adjustment filter
colorbalance_ctx = graph.add('colorbalance', 
                            rs=0.1,  # Red shadows
                            gs=-0.1, # Green shadows  
                            bs=0.05) # Blue shadows

# Add output buffersink (graph output)
buffersink_ctx = graph.add('buffersink')

# Link filters: buffer -> scale -> colorbalance -> buffersink
buffer_ctx.link_to(scale_ctx)
scale_ctx.link_to(colorbalance_ctx)
colorbalance_ctx.link_to(buffersink_ctx)

# Configure graph (required before pushing frames)
graph.configure()

# Process frames
for frame in input_container.decode(input_stream):
    # Push frame to filter graph
    buffer_ctx.push(frame)
    
    # Drain filtered frames. pull() signals "nothing ready" either by
    # returning None or by raising BlockingIOError; both end the loop.
    while True:
        try:
            filtered_frame = buffersink_ctx.pull()
            if filtered_frame is None:
                break
                
            # Set timing for output
            # NOTE(review): if a filter ever emits several frames per
            # input, they would all share frame.pts here — confirm 1:1.
            filtered_frame.pts = frame.pts
            filtered_frame.time_base = input_stream.time_base
            
            # Encode and write
            for packet in output_stream.encode(filtered_frame):
                output_container.mux(packet)
                
        except av.BlockingIOError:
            break

# Flush delayed frames from the encoder
for packet in output_stream.encode():
    output_container.mux(packet)

input_container.close()
output_container.close()

Audio Filtering

import av

# Decode the source audio, run it through a volume/EQ chain, re-encode.
src = av.open('input.wav')
audio_in = src.streams.audio[0]

# Destination: 16-bit PCM at the source sample rate and channel layout.
dst = av.open('filtered.wav', 'w')
audio_out = dst.add_stream('pcm_s16le', rate=audio_in.sample_rate)
audio_out.channels = audio_in.channels
audio_out.layout = audio_in.layout

# Build the processing graph:
#   abuffer -> volume -> highpass -> lowpass -> abuffersink
graph = av.filter.Graph()
source = graph.add_abuffer(template=audio_in)
gain = graph.add('volume', volume=1.5)               # boost volume by 50%
hp = graph.add('highpass', frequency=80, poles=2)    # remove low rumble
lp = graph.add('lowpass', frequency=15000, poles=2)  # tame high-end noise
sink = graph.add('abuffersink')

# Link each stage to the next, in order.
for upstream, downstream in ((source, gain), (gain, hp), (hp, lp), (lp, sink)):
    upstream.link_to(downstream)

# Graph must be configured before any frame is pushed.
graph.configure()

# Feed decoded frames through the graph, encoding whatever comes out.
for frame in src.decode(audio_in):
    source.push(frame)
    
    while True:
        try:
            out_frame = sink.pull()
            if out_frame is None:
                break
            
            # Carry the source timing over to the filtered frame.
            out_frame.pts = frame.pts
            out_frame.time_base = audio_in.time_base
            
            for packet in audio_out.encode(out_frame):
                dst.mux(packet)
                
        except av.BlockingIOError:
            break

# Drain any frames still buffered inside the encoder.
for packet in audio_out.encode():
    dst.mux(packet)

src.close()
dst.close()

Complex Video Filter Chain

import av

def create_complex_video_filter(input_stream):
    """Build a multi-stage video enhancement graph for *input_stream*.

    Pipeline: buffer -> yadif -> hqdn3d -> curves -> unsharp -> scale
    -> vignette -> buffersink.

    Returns:
    (graph, source_ctx, sink_ctx) — the configured graph plus the
    contexts used to push input frames and pull processed frames.
    """
    graph = av.filter.Graph()

    # Graph input, taking its properties from the template stream.
    src = graph.add_buffer(template=input_stream)

    # Intermediate stages, in processing order.
    stages = [
        # 1. Deinterlace if needed
        graph.add('yadif', mode=0, parity=-1),
        # 2. Denoise
        graph.add('hqdn3d', luma_temporal=4.0, chroma_temporal=3.0),
        # 3. Color correction
        graph.add('curves',
                  red='0/0 0.5/0.58 1/1',      # Slight red lift
                  green='0/0 0.5/0.5 1/1',     # No green change
                  blue='0/0 0.5/0.42 1/1'),    # Slight blue reduction
        # 4. Sharpen
        graph.add('unsharp',
                  luma_msize_x=5, luma_msize_y=5,
                  luma_amount=1.2,
                  chroma_msize_x=3, chroma_msize_y=3,
                  chroma_amount=0.8),
        # 5. Scale to target resolution
        graph.add('scale', width=1920, height=1080),
        # 6. Add subtle vignette
        graph.add('vignette', angle='PI/4', x0='w/2', y0='h/2'),
    ]

    # Graph output.
    sink = graph.add('buffersink')

    # Chain everything in order: src -> stages... -> sink
    chain = [src, *stages, sink]
    for upstream, downstream in zip(chain, chain[1:]):
        upstream.link_to(downstream)

    graph.configure()

    return graph, src, sink

# Use complex filter
input_container = av.open('input.mov')
input_stream = input_container.streams.video[0]

# Output: H.264 at fixed 1080p (matches the scale stage in the chain)
output_container = av.open('processed.mp4', 'w')
output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
output_stream.width = 1920
output_stream.height = 1080
output_stream.pix_fmt = 'yuv420p'

# Create filter chain (configured graph plus its source/sink contexts)
graph, buffer_input, buffer_output = create_complex_video_filter(input_stream)

print("Processing with complex filter chain:")
print("- Deinterlacing")
print("- Noise reduction")
print("- Color correction")
print("- Sharpening")
print("- Scaling to 1080p")
print("- Vignette effect")

# Process video: push each decoded frame, then drain the sink
frame_count = 0
for frame in input_container.decode(input_stream):
    buffer_input.push(frame)
    
    while True:
        try:
            processed_frame = buffer_output.pull()
            if processed_frame is None:
                break
                
            # Re-time output with a running frame counter in the
            # encoder's time base (source timestamps are discarded).
            processed_frame.pts = frame_count
            processed_frame.time_base = output_stream.time_base
            
            for packet in output_stream.encode(processed_frame):
                output_container.mux(packet)
                
            frame_count += 1
            
        except av.BlockingIOError:
            break

# Flush delayed frames from the encoder
for packet in output_stream.encode():
    output_container.mux(packet)

print(f"Processed {frame_count} frames")
input_container.close()
output_container.close()

Audio Loudness Normalization

import av

def normalize_audio_loudness(input_file, output_file, target_lufs=-23.0):
    """Normalize audio to target loudness using a two-pass loudnorm process.
    
    Parameters:
    - input_file: str - Path of the audio file to analyze and normalize
    - output_file: str - Path of the AAC output file to write
    - target_lufs: float - Target integrated loudness in LUFS
    
    Pass 1 runs the input through loudnorm in analysis mode (output is
    discarded); pass 2 re-opens the input and applies loudnorm with the
    measured values so it can run in linear (non-dynamic) mode.
    """
    
    # First pass: analyze audio
    print("First pass: analyzing audio...")
    
    input_container = av.open(input_file)
    input_stream = input_container.streams.audio[0]
    
    # Create analysis filter graph: abuffer -> loudnorm -> abuffersink
    graph = av.filter.Graph()
    abuffer = graph.add_abuffer(template=input_stream)
    
    # Loudnorm filter for analysis
    loudnorm = graph.add('loudnorm', 
                        I=target_lufs,      # Target integrated loudness
                        TP=-2.0,            # Target true peak
                        LRA=11.0,           # Target loudness range
                        print_format='json')
    
    abuffersink = graph.add('abuffersink')
    
    abuffer.link_to(loudnorm)
    loudnorm.link_to(abuffersink)
    graph.configure()
    
    # Process for analysis (discard output)
    for frame in input_container.decode(input_stream):
        abuffer.push(frame)
        
        while True:
            try:
                # Drain and discard. FIX: also stop when pull() returns
                # None ("no frame ready") — the previous loop exited only
                # on BlockingIOError and would spin forever otherwise.
                if abuffersink.pull() is None:
                    break
            except av.BlockingIOError:
                break
    
    input_container.close()
    
    # Get analysis results. NOTE: in a real implementation these would be
    # parsed from the loudnorm filter's JSON stats output; here we
    # simulate the measured values for the example.
    analysis_stats = {
        'input_i': -16.0,     # Input integrated loudness
        'input_tp': -1.5,     # Input true peak
        'input_lra': 15.2,    # Input loudness range
        'input_thresh': -26.8, # Input threshold
        'target_offset': -7.0  # Calculated offset
    }
    
    print("Analysis complete:")
    print(f"  Input integrated loudness: {analysis_stats['input_i']} LUFS")
    print(f"  Target offset: {analysis_stats['target_offset']} dB")
    
    # Second pass: apply normalization
    print("Second pass: applying normalization...")
    
    input_container = av.open(input_file)
    input_stream = input_container.streams.audio[0]
    
    output_container = av.open(output_file, 'w')
    output_stream = output_container.add_stream('aac', rate=input_stream.sample_rate)
    output_stream.channels = input_stream.channels
    output_stream.layout = input_stream.layout
    
    # Create normalization filter with the measured analysis results,
    # enabling linear (one-shot gain) normalization.
    graph = av.filter.Graph()
    abuffer = graph.add_abuffer(template=input_stream)
    
    loudnorm = graph.add('loudnorm',
                        I=target_lufs,
                        TP=-2.0,
                        LRA=11.0,
                        measured_I=analysis_stats['input_i'],
                        measured_TP=analysis_stats['input_tp'],
                        measured_LRA=analysis_stats['input_lra'],
                        measured_thresh=analysis_stats['input_thresh'],
                        offset=analysis_stats['target_offset'],
                        linear=True)
    
    abuffersink = graph.add('abuffersink')
    
    abuffer.link_to(loudnorm)
    loudnorm.link_to(abuffersink)
    graph.configure()
    
    # Process and encode normalized audio
    for frame in input_container.decode(input_stream):
        abuffer.push(frame)
        
        while True:
            try:
                normalized_frame = abuffersink.pull()
                if normalized_frame is None:
                    break
                    
                for packet in output_stream.encode(normalized_frame):
                    output_container.mux(packet)
                    
            except av.BlockingIOError:
                break
    
    # Flush delayed frames from the encoder
    for packet in output_stream.encode():
        output_container.mux(packet)
    
    input_container.close()
    output_container.close()
    
    print(f"Normalization complete: {output_file}")

# Normalize to -16 LUFS (a common streaming/podcast loudness target)
normalize_audio_loudness('input.wav', 'normalized.aac', target_lufs=-16.0)

Custom Filter Chain Builder

import av

class FilterChainBuilder:
    """Fluent builder for linear PyAV filter chains.
    
    Usage: add_input_buffer(stream) -> add_filter(...)* ->
    add_output_buffer() -> build(). Each added context is linked after
    the previously added one, forming a single linear chain.
    """
    
    def __init__(self):
        self.graph = av.filter.Graph()
        self.contexts = []        # every context, in chain order
        self.last_context = None  # current tail of the chain
        # Media type of the input ('video'/'audio'); set by
        # add_input_buffer and used to pick the matching sink.
        self._media_type = None
        
    def add_input_buffer(self, stream):
        """Add input buffer for an audio or video stream.
        
        Parameters:
        - stream - decoded stream; its .type picks buffer vs abuffer
        
        Raises ValueError for other stream types. Returns self.
        """
        if stream.type == 'video':
            ctx = self.graph.add_buffer(template=stream)
        elif stream.type == 'audio':
            ctx = self.graph.add_abuffer(template=stream)
        else:
            raise ValueError(f"Unsupported stream type: {stream.type}")
        
        # Record the media type: FilterContext does not retain the
        # template, so this is the reliable way to choose the sink later.
        self._media_type = stream.type
        self.contexts.append(ctx)
        self.last_context = ctx
        return self
    
    def add_filter(self, filter_name, **kwargs):
        """Append a named filter, linked after the current tail. Returns self."""
        ctx = self.graph.add(filter_name, **kwargs)
        
        if self.last_context:
            self.last_context.link_to(ctx)
            
        self.contexts.append(ctx)
        self.last_context = ctx
        return self
    
    def add_output_buffer(self):
        """Append the sink matching the input's media type. Returns self.
        
        FIX: the previous version probed hasattr(contexts[0], 'template'),
        which FilterContext never exposes, so the check was always false
        and audio chains silently received a video 'buffersink'. The
        media type recorded by add_input_buffer is used instead.
        """
        sink_name = 'abuffersink' if self._media_type == 'audio' else 'buffersink'
        ctx = self.graph.add(sink_name)
        
        if self.last_context:
            self.last_context.link_to(ctx)
            
        self.contexts.append(ctx)
        self.last_context = ctx
        return self
    
    def build(self):
        """Configure the graph; return (graph, input_ctx, output_ctx)."""
        self.graph.configure()
        return self.graph, self.contexts[0], self.contexts[-1]

# Example usage
def process_with_builder(input_file, output_file):
    """Process video using the FilterChainBuilder.
    
    Parameters:
    - input_file: str - Source video path
    - output_file: str - Destination path (H.264, 1280x720)
    """
    
    input_container = av.open(input_file)
    input_stream = input_container.streams.video[0]
    
    # Output: H.264 at fixed 720p (matches the scale filter below)
    output_container = av.open(output_file, 'w')
    output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
    output_stream.width = 1280
    output_stream.height = 720
    output_stream.pix_fmt = 'yuv420p'
    
    # Build filter chain fluently; build() configures the graph and
    # returns it with the input/output contexts.
    builder = FilterChainBuilder()
    graph, input_ctx, output_ctx = (builder
        .add_input_buffer(input_stream)
        .add_filter('scale', width=1280, height=720)
        .add_filter('eq', brightness=0.1, contrast=1.1, saturation=1.2)
        .add_filter('unsharp', luma_amount=0.8)
        .add_output_buffer()
        .build())
    
    print("Created filter chain: input -> scale -> eq -> unsharp -> output")
    
    # Process frames: push each decoded frame, then drain the sink
    for frame in input_container.decode(input_stream):
        input_ctx.push(frame)
        
        while True:
            try:
                filtered_frame = output_ctx.pull()
                if filtered_frame is None:
                    break
                    
                # Keep source timing on the filtered frame
                filtered_frame.pts = frame.pts
                filtered_frame.time_base = input_stream.time_base
                
                for packet in output_stream.encode(filtered_frame):
                    output_container.mux(packet)
                    
            except av.BlockingIOError:
                break
    
    # Flush delayed frames from the encoder
    for packet in output_stream.encode():
        output_container.mux(packet)
    
    input_container.close()
    output_container.close()

# Run the builder-based pipeline on a sample file
process_with_builder('input.mp4', 'enhanced.mp4')

Available Filters Reference

import av

def list_available_filters():
    """Print a summary of available filters grouped by rough category.
    
    Categorization is heuristic: it looks for 'video'/'audio' in each
    filter's description text. Filters whose metadata cannot be read are
    grouped under "other" with a placeholder description.
    """
    
    print(f"Total available filters: {len(av.filters_available)}")
    
    # Categorize filters
    video_filters = []
    audio_filters = []
    other_filters = []
    
    for filter_name in sorted(av.filters_available):
        try:
            filter_obj = av.Filter(filter_name)
            description = filter_obj.description
            
            if 'video' in description.lower():
                video_filters.append((filter_name, description))
            elif 'audio' in description.lower():
                audio_filters.append((filter_name, description))
            else:
                other_filters.append((filter_name, description))
        # FIX: was a bare `except:`, which also swallows
        # KeyboardInterrupt/SystemExit and hides real bugs.
        except Exception:
            other_filters.append((filter_name, "No description"))
    
    print(f"\nVideo filters ({len(video_filters)}):")
    for name, desc in video_filters[:10]:  # Show first 10
        print(f"  {name:20} - {desc[:60]}...")
    
    print(f"\nAudio filters ({len(audio_filters)}):")
    for name, desc in audio_filters[:10]:  # Show first 10
        print(f"  {name:20} - {desc[:60]}...")
    
    print(f"\nOther filters ({len(other_filters)}):")
    for name, desc in other_filters[:5]:  # Show first 5
        print(f"  {name:20} - {desc[:60]}...")

# List filters
list_available_filters()

# Get detailed info about a specific filter via Filter introspection
scale_filter = av.Filter('scale')
print(f"\nScale filter details:")
print(f"  Name: {scale_filter.name}")
print(f"  Description: {scale_filter.description}")
print(f"  Options: {len(scale_filter.options)} available")

Install with Tessl CLI

npx tessl i tessl/pypi-av

docs

audio.md

codecs.md

containers.md

filters.md

index.md

streams.md

video.md

tile.json