Pythonic bindings for FFmpeg's libraries enabling multimedia processing with audio/video encoding, decoding, format conversion, and stream manipulation.
—
Quality: Pending — has not yet been reviewed for best practices.
Impact: Pending — no eval scenarios have been run.
Audio and video filtering capabilities using FFmpeg's filter system. PyAV provides access to FFmpeg's comprehensive filter library for audio and video processing pipelines.
Filter graphs organize and execute audio/video processing pipelines.
class Graph:
    """Filter graph for audio/video processing.

    Wraps an FFmpeg filter graph: add buffer sources, filters, and
    sinks, link them together, call configure(), then push frames in
    and pull processed frames out.
    """

    # Properties
    configured: bool  # True if graph is configured (configure() succeeded)

    def __init__(self):
        """Create new filter graph."""

    def add(self, filter_name: str, *args, **kwargs) -> FilterContext:
        """
        Add filter to graph.
        Parameters:
        - filter_name: str - Filter name
        - *args: Positional filter arguments
        - **kwargs: Named filter arguments
        Returns:
        FilterContext for the added filter
        """

    def add_buffer(self, template=None, width: int | None = None,
                   height: int | None = None, format: str | None = None,
                   name: str | None = None, **kwargs) -> FilterContext:
        """
        Add video buffer source.
        Parameters:
        - template: VideoStream - Template stream for properties
        - width: int - Frame width
        - height: int - Frame height
        - format: str - Pixel format
        - name: str - Buffer name
        Returns:
        Video buffer FilterContext
        """

    def add_abuffer(self, template=None, format: str | None = None,
                    layout: str | None = None, rate: int | None = None,
                    name: str | None = None, **kwargs) -> FilterContext:
        """
        Add audio buffer source.
        Parameters:
        - template: AudioStream - Template stream for properties
        - format: str - Audio format
        - layout: str - Channel layout
        - rate: int - Sample rate
        - name: str - Buffer name
        Returns:
        Audio buffer FilterContext
        """

    def configure(self) -> None:
        """Configure the filter graph for processing."""

    def link_nodes(self, output_ctx, input_ctx, output_idx: int = 0,
                   input_idx: int = 0) -> None:
        """
        Link filter contexts.
        Parameters:
        - output_ctx: FilterContext - Source context
        - input_ctx: FilterContext - Destination context
        - output_idx: int - Output pad index
        - input_idx: int - Input pad index
        """

    def set_audio_frame_size(self, nb_samples: int) -> None:
        """Set audio frame size for the graph."""

    def push(self, frame) -> None:
        """Push frame to graph input."""

    def pull(self) -> Frame | None:
        """Pull frame from graph output."""

    def vpush(self, frame) -> None:
        """Push video frame to graph."""

    def vpull(self) -> VideoFrame | None:
        """Pull video frame from graph."""

Filter discovery and introspection.
# Available filters
filters_available: set[str]  # Set of available filter names

class Filter:
    """Filter information.

    Introspection handle for a single named FFmpeg filter (not an
    instance in a graph — see FilterContext for that).
    """

    # Properties
    name: str  # Filter name
    description: str  # Filter description
    descriptor: Descriptor  # Filter descriptor with options
    options: tuple[Option, ...]  # Available options
    flags: int  # Filter flags

    def __init__(self, name: str):
        """
        Get filter by name.
        Parameters:
        - name: str - Filter name
        """

Individual filter instances within a graph.
class FilterContext:
    """Filter instance in a graph.

    Created via Graph.add()/add_buffer()/add_abuffer(); link contexts
    with link_to(), then move frames with push()/pull().
    """

    # Properties
    name: str | None  # Filter context name
    graph: Graph  # Parent graph

    def init(self, args: str | None = None, **kwargs) -> None:
        """
        Initialize filter context.
        Parameters:
        - args: str - Filter argument string
        - **kwargs: Named arguments
        """

    def link_to(self, target, output_idx: int = 0, input_idx: int = 0) -> None:
        """
        Link this context to another.
        Parameters:
        - target: FilterContext - Target context
        - output_idx: int - Output pad index
        - input_idx: int - Input pad index
        """

    def push(self, frame) -> None:
        """
        Push frame to this filter.
        Parameters:
        - frame: Frame - Input frame
        """

    def pull(self) -> Frame | None:
        """
        Pull processed frame.
        Returns:
        Processed frame or None
        """

Specialized audio loudness normalization filter.
def stats(loudnorm_args: str, stream: AudioStream) -> bytes:
    """
    Generate loudness normalization statistics.

    Runs the loudnorm filter in analysis mode over *stream* so the
    measured values can feed a second normalization pass.
    Parameters:
    - loudnorm_args: str - Loudnorm filter arguments
    - stream: AudioStream - Input audio stream
    Returns:
    Statistics data for second-pass normalization
    """

import av
# --- Example: resize + colour-balance a video file ---

# Source container and its first video stream.
src = av.open('input.mp4')
src_stream = src.streams.video[0]

# Destination container; H.264 stream mirroring the source geometry.
dst = av.open('filtered.mp4', 'w')
enc_stream = dst.add_stream('h264', rate=src_stream.framerate)
enc_stream.width = src_stream.width
enc_stream.height = src_stream.height
enc_stream.pix_fmt = 'yuv420p'

# Build the chain: buffer -> scale -> colorbalance -> buffersink.
fg = av.filter.Graph()
src_ctx = fg.add_buffer(template=src_stream)
scaler = fg.add('scale', width=1280, height=720)
balance = fg.add('colorbalance',
                 rs=0.1,    # red shadows
                 gs=-0.1,   # green shadows
                 bs=0.05)   # blue shadows
sink_ctx = fg.add('buffersink')
src_ctx.link_to(scaler)
scaler.link_to(balance)
balance.link_to(sink_ctx)
fg.configure()

# Feed every decoded frame through the graph; encode whatever comes out.
for frame in src.decode(src_stream):
    src_ctx.push(frame)
    try:
        # Drain the sink until it returns None or signals "would block".
        while (out_frame := sink_ctx.pull()) is not None:
            # Carry the source timing onto the filtered frame.
            out_frame.pts = frame.pts
            out_frame.time_base = src_stream.time_base
            for packet in enc_stream.encode(out_frame):
                dst.mux(packet)
    except av.BlockingIOError:
        pass

# Drain the encoder and close both containers.
for packet in enc_stream.encode():
    dst.mux(packet)
src.close()
dst.close()
import av
# --- Example: gain boost + band-limiting on an audio file ---

# Source WAV and its audio stream.
audio_in = av.open('input.wav')
a_stream = audio_in.streams.audio[0]

# 16-bit PCM output matching the source channel setup.
audio_out = av.open('filtered.wav', 'w')
pcm_stream = audio_out.add_stream('pcm_s16le', rate=a_stream.sample_rate)
pcm_stream.channels = a_stream.channels
pcm_stream.layout = a_stream.layout

# Chain: abuffer -> volume -> highpass -> lowpass -> abuffersink.
chain = av.filter.Graph()
a_src = chain.add_abuffer(template=a_stream)
gain = chain.add('volume', volume=1.5)               # +50% gain
hp = chain.add('highpass', frequency=80, poles=2)    # cut rumble
lp = chain.add('lowpass', frequency=15000, poles=2)  # cut hiss
a_sink = chain.add('abuffersink')
a_src.link_to(gain)
gain.link_to(hp)
hp.link_to(lp)
lp.link_to(a_sink)
chain.configure()

# Run every decoded frame through the chain and encode the result.
for frame in audio_in.decode(a_stream):
    a_src.push(frame)
    try:
        while (out_frame := a_sink.pull()) is not None:
            # Keep the original timing on the filtered frame.
            out_frame.pts = frame.pts
            out_frame.time_base = a_stream.time_base
            for packet in pcm_stream.encode(out_frame):
                audio_out.mux(packet)
    except av.BlockingIOError:
        pass

# Flush the encoder and close both containers.
for packet in pcm_stream.encode():
    audio_out.mux(packet)
audio_in.close()
audio_out.close()
import av
def create_complex_video_filter(input_stream):
    """Create complex video processing filter chain.

    Builds: buffer -> yadif -> hqdn3d -> curves -> unsharp -> scale
    -> vignette -> buffersink, configures the graph, and returns
    (graph, input_buffer_ctx, output_sink_ctx).
    """
    graph = av.filter.Graph()
    # Input buffer sourced from the stream's properties
    buffer_ctx = graph.add_buffer(template=input_stream)
    # 1. Deinterlace if needed (mode=0: one frame out per frame in;
    #    parity=-1: auto-detect field order)
    deinterlace_ctx = graph.add('yadif', mode=0, parity=-1)
    # 2. Temporal denoise
    denoise_ctx = graph.add('hqdn3d', luma_temporal=4.0, chroma_temporal=3.0)
    # 3. Color correction via per-channel tone curves
    curves_ctx = graph.add('curves',
                           red='0/0 0.5/0.58 1/1',   # Slight red lift
                           green='0/0 0.5/0.5 1/1',  # No green change
                           blue='0/0 0.5/0.42 1/1')  # Slight blue reduction
    # 4. Sharpen (stronger on luma than chroma)
    sharpen_ctx = graph.add('unsharp',
                            luma_msize_x=5, luma_msize_y=5,
                            luma_amount=1.2,
                            chroma_msize_x=3, chroma_msize_y=3,
                            chroma_amount=0.8)
    # 5. Scale to target resolution
    scale_ctx = graph.add('scale', width=1920, height=1080)
    # 6. Add subtle vignette centred on the frame
    vignette_ctx = graph.add('vignette', angle='PI/4', x0='w/2', y0='h/2')
    # Output sink
    buffersink_ctx = graph.add('buffersink')
    # Link all filters in order
    buffer_ctx.link_to(deinterlace_ctx)
    deinterlace_ctx.link_to(denoise_ctx)
    denoise_ctx.link_to(curves_ctx)
    curves_ctx.link_to(sharpen_ctx)
    sharpen_ctx.link_to(scale_ctx)
    scale_ctx.link_to(vignette_ctx)
    vignette_ctx.link_to(buffersink_ctx)
    graph.configure()
    return graph, buffer_ctx, buffersink_ctx

# Use complex filter
input_container = av.open('input.mov')
input_stream = input_container.streams.video[0]
output_container = av.open('processed.mp4', 'w')
output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
output_stream.width = 1920
output_stream.height = 1080
output_stream.pix_fmt = 'yuv420p'
# Create filter chain
graph, buffer_input, buffer_output = create_complex_video_filter(input_stream)
print("Processing with complex filter chain:")
print("- Deinterlacing")
print("- Noise reduction")
print("- Color correction")
print("- Sharpening")
print("- Scaling to 1080p")
print("- Vignette effect")
# Process video
frame_count = 0
for frame in input_container.decode(input_stream):
    buffer_input.push(frame)
    while True:
        try:
            processed_frame = buffer_output.pull()
            if processed_frame is None:
                break
            # Re-stamp timing: output frames are numbered sequentially
            # in the output stream's time base rather than copying the
            # source pts.
            processed_frame.pts = frame_count
            processed_frame.time_base = output_stream.time_base
            for packet in output_stream.encode(processed_frame):
                output_container.mux(packet)
            frame_count += 1
        except av.BlockingIOError:
            break
# Flush the encoder
for packet in output_stream.encode():
    output_container.mux(packet)
print(f"Processed {frame_count} frames")
input_container.close()
output_container.close()
import av
def normalize_audio_loudness(input_file, output_file, target_lufs=-23.0):
    """Normalize audio to target loudness using two-pass process.

    Pass 1 runs the ``loudnorm`` filter in analysis mode; pass 2 re-runs
    it with the measured values so it can apply linear gain.
    NOTE: this example does not actually parse the pass-1 JSON output —
    the measured stats below are hard-coded for illustration.
    """
    # First pass: analyze audio
    print("First pass: analyzing audio...")
    input_container = av.open(input_file)
    input_stream = input_container.streams.audio[0]
    # Create analysis filter graph: abuffer -> loudnorm -> abuffersink
    graph = av.filter.Graph()
    abuffer = graph.add_abuffer(template=input_stream)
    # Loudnorm filter for analysis (stats printed as JSON by FFmpeg)
    loudnorm = graph.add('loudnorm',
                         I=target_lufs,   # Target integrated loudness
                         TP=-2.0,         # Target true peak
                         LRA=11.0,        # Target loudness range
                         print_format='json')
    abuffersink = graph.add('abuffersink')
    abuffer.link_to(loudnorm)
    loudnorm.link_to(abuffersink)
    graph.configure()
    # Process for analysis (filtered frames are discarded)
    for frame in input_container.decode(input_stream):
        abuffer.push(frame)
        while True:
            try:
                abuffersink.pull()  # Discard frame, just analyze
            except av.BlockingIOError:
                break
    input_container.close()
    # Get analysis results (in real implementation, this would be extracted from filter)
    # For this example, we'll simulate the stats
    analysis_stats = {
        'input_i': -16.0,       # Input integrated loudness (LUFS)
        'input_tp': -1.5,       # Input true peak (dBTP)
        'input_lra': 15.2,      # Input loudness range (LU)
        'input_thresh': -26.8,  # Input gating threshold
        'target_offset': -7.0   # Calculated gain offset
    }
    print(f"Analysis complete:")
    print(f" Input integrated loudness: {analysis_stats['input_i']} LUFS")
    print(f" Target offset: {analysis_stats['target_offset']} dB")
    # Second pass: apply normalization
    print("Second pass: applying normalization...")
    input_container = av.open(input_file)
    input_stream = input_container.streams.audio[0]
    output_container = av.open(output_file, 'w')
    output_stream = output_container.add_stream('aac', rate=input_stream.sample_rate)
    output_stream.channels = input_stream.channels
    output_stream.layout = input_stream.layout
    # Create normalization filter fed with the measured values
    graph = av.filter.Graph()
    abuffer = graph.add_abuffer(template=input_stream)
    loudnorm = graph.add('loudnorm',
                         I=target_lufs,
                         TP=-2.0,
                         LRA=11.0,
                         measured_I=analysis_stats['input_i'],
                         measured_TP=analysis_stats['input_tp'],
                         measured_LRA=analysis_stats['input_lra'],
                         measured_thresh=analysis_stats['input_thresh'],
                         offset=analysis_stats['target_offset'],
                         linear=True)  # single linear gain, not dynamic
    abuffersink = graph.add('abuffersink')
    abuffer.link_to(loudnorm)
    loudnorm.link_to(abuffersink)
    graph.configure()
    # Process and encode normalized audio
    for frame in input_container.decode(input_stream):
        abuffer.push(frame)
        while True:
            try:
                normalized_frame = abuffersink.pull()
                if normalized_frame is None:
                    break
                for packet in output_stream.encode(normalized_frame):
                    output_container.mux(packet)
            except av.BlockingIOError:
                break
    # Flush the encoder
    for packet in output_stream.encode():
        output_container.mux(packet)
    input_container.close()
    output_container.close()
    print(f"Normalization complete: {output_file}")

# Normalize audio file
normalize_audio_loudness('input.wav', 'normalized.aac', target_lufs=-16.0)
import av
class FilterChainBuilder:
    """Fluent builder for linear PyAV filter chains.

    Usage: add_input_buffer(stream), then any number of
    add_filter(...), then add_output_buffer(), then build().
    Each add_* method returns self for chaining.
    """

    def __init__(self):
        self.graph = av.filter.Graph()
        self.contexts = []
        self.last_context = None
        # Remembers whether the chain started from an 'audio' or
        # 'video' stream so add_output_buffer() can pick the right sink.
        self._input_kind = None

    def add_input_buffer(self, stream):
        """Add input buffer for an audio or video stream.

        Raises ValueError for any other stream type.
        """
        if stream.type == 'video':
            ctx = self.graph.add_buffer(template=stream)
        elif stream.type == 'audio':
            ctx = self.graph.add_abuffer(template=stream)
        else:
            raise ValueError(f"Unsupported stream type: {stream.type}")
        self._input_kind = stream.type
        self.contexts.append(ctx)
        self.last_context = ctx
        return self

    def add_filter(self, filter_name, **kwargs):
        """Add a named filter and link it after the previous stage."""
        ctx = self.graph.add(filter_name, **kwargs)
        if self.last_context:
            self.last_context.link_to(ctx)
        self.contexts.append(ctx)
        self.last_context = ctx
        return self

    def add_output_buffer(self):
        """Add the sink matching the chain's input stream type.

        Fix: the previous implementation probed the first context for a
        ``template`` attribute, which FilterContext never exposes, so
        audio chains were silently terminated with a video 'buffersink'.
        We now use the stream type recorded by add_input_buffer().
        """
        if self._input_kind == 'audio':
            ctx = self.graph.add('abuffersink')
        else:
            ctx = self.graph.add('buffersink')  # default to video
        if self.last_context:
            self.last_context.link_to(ctx)
        self.contexts.append(ctx)
        self.last_context = ctx
        return self

    def build(self):
        """Configure the graph and return (graph, first_ctx, last_ctx)."""
        self.graph.configure()
        return self.graph, self.contexts[0], self.contexts[-1]
# Example usage
def process_with_builder(input_file, output_file):
    """Process video using filter chain builder.

    Opens *input_file*, runs its first video stream through
    scale -> eq -> unsharp, and encodes the result to *output_file*
    as 720p H.264.
    """
    input_container = av.open(input_file)
    input_stream = input_container.streams.video[0]
    output_container = av.open(output_file, 'w')
    output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
    output_stream.width = 1280
    output_stream.height = 720
    output_stream.pix_fmt = 'yuv420p'
    # Build filter chain fluently
    builder = FilterChainBuilder()
    graph, input_ctx, output_ctx = (builder
                                    .add_input_buffer(input_stream)
                                    .add_filter('scale', width=1280, height=720)
                                    .add_filter('eq', brightness=0.1, contrast=1.1, saturation=1.2)
                                    .add_filter('unsharp', luma_amount=0.8)
                                    .add_output_buffer()
                                    .build())
    print("Created filter chain: input -> scale -> eq -> unsharp -> output")
    # Process frames
    for frame in input_container.decode(input_stream):
        input_ctx.push(frame)
        while True:
            try:
                filtered_frame = output_ctx.pull()
                if filtered_frame is None:
                    break
                # Preserve source timing on the filtered frame
                filtered_frame.pts = frame.pts
                filtered_frame.time_base = input_stream.time_base
                for packet in output_stream.encode(filtered_frame):
                    output_container.mux(packet)
            except av.BlockingIOError:
                break
    # Flush the encoder
    for packet in output_stream.encode():
        output_container.mux(packet)
    input_container.close()
    output_container.close()

# Use builder
process_with_builder('input.mp4', 'enhanced.mp4')
import av
def list_available_filters():
    """Print a categorized summary of the filters FFmpeg exposes.

    Filters are bucketed by scanning each description for the words
    'video'/'audio' — a heuristic, not an authoritative classification.
    Only the first few entries of each bucket are printed.
    """
    print(f"Total available filters: {len(av.filters_available)}")
    # Categorize filters by description keywords
    video_filters = []
    audio_filters = []
    other_filters = []
    for filter_name in sorted(av.filters_available):
        try:
            filter_obj = av.Filter(filter_name)
            description = filter_obj.description
            if 'video' in description.lower():
                video_filters.append((filter_name, description))
            elif 'audio' in description.lower():
                audio_filters.append((filter_name, description))
            else:
                other_filters.append((filter_name, description))
        except Exception:
            # Was a bare ``except:``, which would also swallow
            # KeyboardInterrupt/SystemExit. Exception still tolerates
            # filters whose metadata cannot be loaded.
            other_filters.append((filter_name, "No description"))
    print(f"\nVideo filters ({len(video_filters)}):")
    for name, desc in video_filters[:10]:  # Show first 10
        print(f" {name:20} - {desc[:60]}...")
    print(f"\nAudio filters ({len(audio_filters)}):")
    for name, desc in audio_filters[:10]:  # Show first 10
        print(f" {name:20} - {desc[:60]}...")
    print(f"\nOther filters ({len(other_filters)}):")
    for name, desc in other_filters[:5]:  # Show first 5
        print(f" {name:20} - {desc[:60]}...")
# List all filters, grouped by category
list_available_filters()
# Get detailed info about a specific filter by name
scale_filter = av.Filter('scale')
print(f"\nScale filter details:")
print(f" Name: {scale_filter.name}")
print(f" Description: {scale_filter.description}")
print(f" Options: {len(scale_filter.options)} available")
Install with Tessl CLI
npx tessl i tessl/pypi-av