Pythonic bindings for FFmpeg's libraries enabling multimedia processing with audio/video encoding, decoding, format conversion, and stream manipulation.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Low-level packet handling and stream operations for precise control over media data flow and timing. PyAV provides comprehensive access to FFmpeg's stream and packet management capabilities.
Packets contain compressed media data with timing and metadata information.
class Packet:
    """Container for compressed media data.

    A packet carries the compressed payload together with its timing
    (``pts``/``dts``/``duration`` expressed in ``time_base`` units) and a set
    of boolean flags describing the payload.
    """

    # Properties
    stream: Stream  # Associated stream
    stream_index: int  # Stream index in container
    time_base: Fraction  # Time base for timestamps
    pts: int | None  # Presentation timestamp
    dts: int | None  # Decode timestamp
    pos: int  # Byte position in stream
    size: int  # Packet size in bytes
    duration: int  # Packet duration in time_base units
    opaque: object | None  # User data

    # Packet flags
    is_keyframe: bool  # True if keyframe
    is_corrupt: bool  # True if corrupt
    is_discard: bool  # True if should be discarded
    is_trusted: bool  # True if trusted
    is_disposable: bool  # True if disposable

    def __init__(self, size=0):
        """
        Create packet.

        Parameters:
        - size: int - Initial packet size
        """

    def decode(self) -> list[SubtitleSet]:
        """
        Decode subtitle packet.

        Returns:
        List of subtitle sets (for subtitle packets only)
        """

    # Inherits Buffer methods
    def update(self, input: bytes) -> None: ...
    def __buffer__(self, flags: int) -> memoryview: ...
    def __bytes__(self) -> bytes: ...


# Base stream class with common properties and methods.
class Disposition(Flag):
    """Stream disposition flags.

    Each member occupies a single bit, so members can be combined with ``|``
    and tested with ``&``.
    """

    DEFAULT = 1 << 0  # Default stream
    DUB = 1 << 1  # Dubbed stream
    ORIGINAL = 1 << 2  # Original language
    COMMENT = 1 << 3  # Commentary
    LYRICS = 1 << 4  # Lyrics
    KARAOKE = 1 << 5  # Karaoke
    FORCED = 1 << 6  # Forced subtitles
    HEARING_IMPAIRED = 1 << 7  # Hearing impaired
    VISUAL_IMPAIRED = 1 << 8  # Visual impaired
    CLEAN_EFFECTS = 1 << 9  # Clean effects
    ATTACHED_PIC = 1 << 10  # Attached picture
    TIMED_THUMBNAILS = 1 << 11  # Timed thumbnails
    CAPTIONS = 1 << 12  # Captions
    DESCRIPTIONS = 1 << 13  # Descriptions
    METADATA = 1 << 14  # Metadata
    DEPENDENT = 1 << 15  # Dependent stream
    STILL_IMAGE = 1 << 16  # Still image
class Stream:
    """Base media stream.

    Holds the properties shared by every stream type: identity, codec
    context, owning container, metadata, disposition flags and timing.
    """

    # Properties
    index: int  # Stream index
    id: int  # Stream ID
    type: str  # Stream type ('video', 'audio', 'subtitle', etc.)
    profile: str | None  # Codec profile
    codec_context: CodecContext  # Codec context
    container: Container  # Parent container
    metadata: dict[str, str]  # Stream metadata
    # Typed as the Disposition flag so it can be tested with
    # ``stream.disposition & Disposition.DEFAULT``.
    disposition: Disposition  # Disposition flags

    # Timing
    time_base: Fraction  # Stream time base
    start_time: int | None  # Start time in time_base units
    duration: int | None  # Duration in time_base units
    frames: int  # Number of frames (0 if unknown)

    # Language and title
    language: str | None  # Language code
    title: str | None  # Stream title

    def encode(self, frame=None) -> list[Packet]:
        """
        Encode frame to packets.

        Parameters:
        - frame: Frame | None - Frame to encode (None flushes)

        Returns:
        List of encoded packets
        """

    def decode(self, packet=None) -> list[Frame]:
        """
        Decode packet to frames.

        Parameters:
        - packet: Packet | None - Packet to decode (None flushes)

        Returns:
        List of decoded frames
        """


# Stream subclasses for different media types.
class AudioStream(Stream):
    """Audio stream with audio-specific properties.

    Narrows ``type`` and ``codec_context`` to their audio variants and
    exposes the audio parameters listed below directly on the stream.
    """

    type: Literal['audio']  # Stream type
    codec_context: AudioCodecContext

    # Audio properties (delegated to codec context)
    frame_size: int
    sample_rate: int
    rate: int
    bit_rate: int
    channels: int
    format: AudioFormat
    layout: AudioLayout
class VideoStream(Stream):
    """Video stream with video-specific properties.

    Narrows ``type`` and ``codec_context`` to their video variants and
    exposes the video parameters listed below directly on the stream.
    """

    type: Literal['video']  # Stream type
    codec_context: VideoCodecContext

    # Video properties (delegated to codec context)
    width: int
    height: int
    format: VideoFormat
    pix_fmt: str
    framerate: Fraction
    rate: Fraction
    bit_rate: int
    max_bit_rate: int
    sample_aspect_ratio: Fraction
    display_aspect_ratio: Fraction
class SubtitleStream(Stream):
    """Subtitle stream.

    Both decode methods return subtitle sets rather than frames.
    """

    type: Literal['subtitle']  # Stream type
    codec_context: SubtitleCodecContext

    def decode(self, packet=None) -> list[SubtitleSet]:
        """Decode subtitle packet."""

    def decode2(self, packet=None) -> list[SubtitleSet]:
        """Alternative decode method."""
        # NOTE(review): how this differs from decode() is not shown here —
        # confirm against the PyAV API reference.
class DataStream(Stream):
    """Data stream for non-media data."""

    type: Literal['data']  # Stream type
class AttachmentStream(Stream):
    """Attachment stream (e.g., cover art)."""

    type: Literal['attachment']  # Stream type
    mimetype: str | None  # MIME type of attachment


# The StreamContainer provides organized access to streams by type.
class StreamContainer:
    """Container managing streams in a media file.

    Streams are reachable as a flat sequence (``len``, iteration, indexing)
    and grouped by type through the tuple attributes below.
    """

    # Stream collections by type
    video: tuple[VideoStream, ...]
    audio: tuple[AudioStream, ...]
    subtitles: tuple[SubtitleStream, ...]
    attachments: tuple[AttachmentStream, ...]
    data: tuple[DataStream, ...]
    other: tuple[Stream, ...]

    def __len__(self) -> int:
        """Total number of streams."""

    def __iter__(self) -> Iterator[Stream]:
        """Iterate over all streams."""

    def __getitem__(self, index: int) -> Stream:
        """Get stream by index."""

    def get(self, *, video=None, audio=None, subtitles=None, data=None) -> list[Stream]:
        """
        Get streams by type and criteria.

        Parameters:
        - video: int | tuple - Video stream selection
        - audio: int | tuple - Audio stream selection
        - subtitles: int | tuple - Subtitle stream selection
        - data: int | tuple - Data stream selection

        Returns:
        List of matching streams
        """

    def best(self, kind) -> Stream | None:
        """
        Get the best stream of a given type.

        Parameters:
        - kind: str - Stream type ('video', 'audio', 'subtitle')

        Returns:
        Best stream of the specified type or None
        """


import av
# Open container and examine packets.
# The with-block closes the container even if demuxing or printing raises.
with av.open('sample.mp4') as container:
    print(f"Container has {len(container.streams)} streams:")
    for i, stream in enumerate(container.streams):
        print(f" Stream {i}: {stream.type} ({stream.codec_context.name})")
        if stream.language:
            print(f" Language: {stream.language}")
        if stream.title:
            print(f" Title: {stream.title}")

    # Process packets directly
    packet_count = 0
    for packet in container.demux():
        stream = packet.stream
        print(f"Packet {packet_count}:")
        print(f" Stream: {stream.index} ({stream.type})")
        print(f" Size: {packet.size} bytes")
        print(f" PTS: {packet.pts}")
        print(f" DTS: {packet.dts}")
        print(f" Duration: {packet.duration}")
        print(f" Keyframe: {packet.is_keyframe}")
        # Compare against None: a PTS of 0 is a valid timestamp and must not
        # be reported as missing.
        timestamp = packet.pts * stream.time_base if packet.pts is not None else None
        print(f" Time: {timestamp}")
        packet_count += 1
        if packet_count >= 10:  # Examine first 10 packets
            break


import av
def analyze_streams(filename):
    """Analyze all streams in a media file and print a report.

    Parameters:
    - filename: file to open with ``av.open``

    Returns:
    None; the report is written to stdout.
    """
    # The with-block closes the container even if reporting raises part-way.
    with av.open(filename) as container:
        streams = container.streams

        print(f"File: {filename}")
        print(f"Format: {container.format.name} ({container.format.long_name})")
        # NOTE(review): assumes an unknown container duration is reported as
        # None (as Stream.duration is); avoid dividing None in that case.
        if container.duration is not None:
            print(f"Duration: {container.duration / av.time_base:.2f} seconds")
        else:
            print("Duration: Unknown")
        print(f"Total streams: {len(streams)}")

        # Video streams
        if streams.video:
            print(f"\nVideo streams ({len(streams.video)}):")
            for stream in streams.video:
                print(f" Stream {stream.index}:")
                print(f" Codec: {stream.codec_context.name}")
                print(f" Resolution: {stream.width}x{stream.height}")
                print(f" Pixel format: {stream.format.name}")
                print(f" Frame rate: {stream.framerate}")
                print(f" Bitrate: {stream.bit_rate}")
                print(f" Duration: {stream.duration * stream.time_base if stream.duration else 'Unknown'}")
                # Check disposition
                if stream.disposition & av.stream.Disposition.DEFAULT:
                    print(" Default: Yes")
                if stream.language:
                    print(f" Language: {stream.language}")

        # Audio streams
        if streams.audio:
            print(f"\nAudio streams ({len(streams.audio)}):")
            for stream in streams.audio:
                print(f" Stream {stream.index}:")
                print(f" Codec: {stream.codec_context.name}")
                print(f" Sample rate: {stream.sample_rate}")
                print(f" Channels: {stream.channels}")
                print(f" Layout: {stream.layout.name}")
                print(f" Format: {stream.format.name}")
                print(f" Bitrate: {stream.bit_rate}")
                if stream.disposition & av.stream.Disposition.DEFAULT:
                    print(" Default: Yes")
                if stream.language:
                    print(f" Language: {stream.language}")
                if stream.title:
                    print(f" Title: {stream.title}")

        # Subtitle streams
        if streams.subtitles:
            print(f"\nSubtitle streams ({len(streams.subtitles)}):")
            for stream in streams.subtitles:
                print(f" Stream {stream.index}:")
                print(f" Codec: {stream.codec_context.name}")
                if stream.language:
                    print(f" Language: {stream.language}")
                if stream.title:
                    print(f" Title: {stream.title}")
                # Check subtitle disposition
                if stream.disposition & av.stream.Disposition.FORCED:
                    print(" Forced: Yes")
                if stream.disposition & av.stream.Disposition.HEARING_IMPAIRED:
                    print(" Hearing impaired: Yes")

        # Attachment streams (cover art, etc.)
        if streams.attachments:
            print(f"\nAttachment streams ({len(streams.attachments)}):")
            for stream in streams.attachments:
                print(f" Stream {stream.index}:")
                print(f" Codec: {stream.codec_context.name}")
                print(f" MIME type: {stream.mimetype}")
                if stream.title:
                    print(f" Filename: {stream.title}")

        # Find best streams
        best_video = streams.best('video')
        best_audio = streams.best('audio')
        if best_video:
            print(f"\nBest video stream: {best_video.index}")
        if best_audio:
            print(f"Best audio stream: {best_audio.index}")
# Analyze file
analyze_streams('movie.mkv')


import av
def extract_keyframes(input_file, output_dir, max_keyframes=20):
    """Extract keyframes with precise timing information.

    Parameters:
    - input_file: media file to open with ``av.open``
    - output_dir: directory that receives the JPEG files (created if missing)
    - max_keyframes: int - Maximum number of frames to save (default 20)

    Returns:
    None; progress is printed and frames are written to ``output_dir``.

    Raises:
    ValueError if ``max_keyframes`` is smaller than 1.
    """
    import os

    if max_keyframes < 1:
        raise ValueError("max_keyframes must be at least 1")

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    keyframe_count = 0
    # The with-block closes the container even if decoding or saving raises.
    with av.open(input_file) as container:
        video_stream = container.streams.video[0]
        time_base = video_stream.time_base

        print("Video stream info:")
        print(f" Time base: {time_base}")
        print(f" Frame rate: {video_stream.framerate}")
        # NOTE(review): assumes an unknown container duration is None.
        if container.duration is not None:
            print(f" Total duration: {container.duration / av.time_base:.2f}s")

        for packet in container.demux(video_stream):
            if not packet.is_keyframe:
                continue

            # Calculate precise timing once per packet.  Convert to float
            # because Fraction only accepts the '.3f' format spec from
            # Python 3.12 on, and compare against None so a timestamp of 0
            # is not mistaken for "missing".
            pts_seconds = float(packet.pts * time_base) if packet.pts is not None else 0.0
            dts_seconds = float(packet.dts * time_base) if packet.dts is not None else 0.0

            # Decode keyframe packet
            for frame in packet.decode():
                print(f"Keyframe {keyframe_count}:")
                print(f" PTS: {packet.pts} ({pts_seconds:.3f}s)")
                print(f" DTS: {packet.dts} ({dts_seconds:.3f}s)")
                print(f" Size: {packet.size} bytes")
                print(f" Position: {packet.pos}")

                # Save keyframe
                output_path = os.path.join(
                    output_dir,
                    f"keyframe_{keyframe_count:04d}_{pts_seconds:.3f}s.jpg"
                )
                frame.save(output_path)
                keyframe_count += 1

                # Limit extraction
                if keyframe_count >= max_keyframes:
                    break

            if keyframe_count >= max_keyframes:
                break

    print(f"Extracted {keyframe_count} keyframes to {output_dir}")
# Extract keyframes
extract_keyframes('video.mp4', 'keyframes/')


import av
def process_multi_stream(input_file, output_file):
    """Process multiple streams with different handling.

    Every video stream is re-encoded as H.264 at half resolution; only the
    first audio stream is re-encoded (AAC).  All other streams are dropped.

    Parameters:
    - input_file: source media file
    - output_file: destination media file

    Returns:
    None; a summary is printed to stdout.
    """
    # Both containers are closed even if processing raises part-way.
    with av.open(input_file) as input_container, av.open(output_file, 'w') as output_container:
        # Map input streams to output streams
        stream_mapping = {}

        # Process video streams
        for input_stream in input_container.streams.video:
            output_stream = output_container.add_stream('h264', rate=input_stream.framerate)
            output_stream.width = input_stream.width // 2  # Half resolution
            output_stream.height = input_stream.height // 2
            output_stream.pix_fmt = 'yuv420p'
            stream_mapping[input_stream.index] = output_stream
            print(f"Video stream {input_stream.index}: {input_stream.width}x{input_stream.height} -> {output_stream.width}x{output_stream.height}")

        # Process audio streams (copy first audio stream only)
        if input_container.streams.audio:
            input_stream = input_container.streams.audio[0]
            output_stream = output_container.add_stream('aac', rate=input_stream.sample_rate)
            output_stream.channels = input_stream.channels
            output_stream.layout = input_stream.layout
            stream_mapping[input_stream.index] = output_stream
            print(f"Audio stream {input_stream.index}: {input_stream.sample_rate}Hz {input_stream.channels}ch")

        # Process packets by stream
        frame_counts = {}
        # Running sample position per audio stream, used as the audio PTS.
        audio_samples = {}

        for packet in input_container.demux():
            input_stream_index = packet.stream_index
            output_stream = stream_mapping.get(input_stream_index)
            if output_stream is None:
                continue  # Skip unmapped streams

            input_stream = input_container.streams[input_stream_index]

            # Initialize frame counter
            frame_counts.setdefault(input_stream_index, 0)

            # Decode and process frames
            for frame in packet.decode():
                if input_stream.type == 'video':
                    # Resize video frame
                    resized_frame = frame.reformat(
                        width=output_stream.width,
                        height=output_stream.height
                    )
                    resized_frame.pts = frame_counts[input_stream_index]
                    resized_frame.time_base = output_stream.time_base

                    # Encode and mux
                    for out_packet in output_stream.encode(resized_frame):
                        output_container.mux(out_packet)

                elif input_stream.type == 'audio':
                    # Pass through audio (could apply processing here).
                    # Advance the PTS by the samples actually decoded; the
                    # decoded frame length need not equal the encoder's
                    # frame_size, so frame-count * frame_size would drift.
                    sample_position = audio_samples.get(input_stream_index, 0)
                    frame.pts = sample_position
                    frame.time_base = output_stream.time_base
                    audio_samples[input_stream_index] = sample_position + frame.samples

                    for out_packet in output_stream.encode(frame):
                        output_container.mux(out_packet)

                frame_counts[input_stream_index] += 1

        # Flush all encoders
        for output_stream in stream_mapping.values():
            for packet in output_stream.encode():
                output_container.mux(packet)

        # Report processing
        for stream_index, count in frame_counts.items():
            stream_type = input_container.streams[stream_index].type
            print(f"Processed {count} {stream_type} frames from stream {stream_index}")
# Process multiple streams
process_multi_stream('input.mkv', 'processed.mp4')


import av
def copy_with_metadata(input_file, output_file, new_metadata=None):
    """Copy file while modifying stream metadata.

    Video and audio streams are copied packet-by-packet; streams of any
    other type are not added to the output and their packets are skipped.

    Parameters:
    - input_file: source media file
    - output_file: destination media file
    - new_metadata: dict | None - Extra container metadata; overrides copied
      keys of the same name

    Returns:
    None; a summary is printed to stdout.
    """
    # Both containers are closed even if copying raises part-way.
    with av.open(input_file) as input_container, av.open(output_file, 'w') as output_container:
        # Copy container metadata
        for key, value in input_container.metadata.items():
            output_container.metadata[key] = value

        # Add new container metadata
        if new_metadata:
            for key, value in new_metadata.items():
                output_container.metadata[key] = value

        # Map input stream index -> output stream.  Output indices differ
        # from input indices as soon as a non-audio/video stream is skipped,
        # so the output must not be indexed by the input index.
        stream_mapping = {}

        # Process streams
        for input_stream in input_container.streams:
            if input_stream.type not in ('video', 'audio'):
                continue

            output_stream = output_container.add_stream_from_template(input_stream)
            stream_mapping[input_stream.index] = output_stream

            # Copy stream metadata
            for key, value in input_stream.metadata.items():
                output_stream.metadata[key] = value

            if input_stream.type == 'video':
                # Set custom metadata
                output_stream.metadata['encoder'] = 'PyAV'
                output_stream.metadata['processed_by'] = 'Python script'
            else:
                # Language tagging
                if not input_stream.language:
                    output_stream.language = 'eng'  # Default to English

                # Title modification
                if input_stream.title:
                    output_stream.title = f"Enhanced {input_stream.title}"
                else:
                    output_stream.title = f"Audio Track {input_stream.index}"

        # Copy data with metadata preservation
        for packet in input_container.demux():
            output_stream = stream_mapping.get(packet.stream.index)
            if output_stream is None:
                continue  # Stream was not copied to the output

            # demux() also yields empty "flush" packets (dts is None) that
            # must not be muxed, as in PyAV's remuxing example.
            if packet.dts is None:
                continue

            # Update packet stream reference
            packet.stream = output_stream
            output_container.mux(packet)

        print("Metadata copying complete:")
        print(f" Container metadata: {len(output_container.metadata)} entries")
        for i, stream in enumerate(output_container.streams):
            print(f" Stream {i} metadata: {len(stream.metadata)} entries")
            if stream.language:
                print(f" Language: {stream.language}")
            if stream.title:
                print(f" Title: {stream.title}")
# Copy with metadata
new_metadata = {
    'title': 'Processed Video',
    'artist': 'PyAV Processing',
    'creation_time': '2024-01-01T00:00:00.000000Z',
}
copy_with_metadata('input.mp4', 'output_with_metadata.mp4', new_metadata)


import av
def synchronize_streams(input_file, output_file, audio_delay_ms=0):
    """Synchronize audio and video streams with optional delay.

    The first video and first audio stream are decoded and re-encoded with
    freshly generated timestamps; the audio timestamps start at the
    requested delay.

    Parameters:
    - input_file: source media file
    - output_file: destination media file
    - audio_delay_ms: delay applied to the audio track, in milliseconds

    Returns:
    None; a summary is printed to stdout.
    """
    # Both containers are closed even if processing raises part-way.
    with av.open(input_file) as input_container, av.open(output_file, 'w') as output_container:
        # Get streams
        video_stream = input_container.streams.video[0]
        audio_stream = input_container.streams.audio[0]

        # Create output streams
        out_video = output_container.add_stream_from_template(video_stream)
        out_audio = output_container.add_stream_from_template(audio_stream)

        print("Input timing:")
        print(f" Video time base: {video_stream.time_base}")
        print(f" Audio time base: {audio_stream.time_base}")
        print(f" Audio delay: {audio_delay_ms}ms")

        # Convert the delay from milliseconds to a sample count.
        # NOTE(review): this equals time-base units only if the audio time
        # base is 1/sample_rate — confirm for the target format.
        audio_delay_units = int(audio_delay_ms * audio_stream.sample_rate / 1000)

        # Track timing
        video_pts = 0
        audio_pts = audio_delay_units  # Start with delay

        # Process packets with timing adjustment
        for packet in input_container.demux():
            if packet.stream == video_stream:
                # Process video packets
                for frame in packet.decode():
                    frame.pts = video_pts
                    frame.time_base = out_video.time_base
                    for out_packet in out_video.encode(frame):
                        output_container.mux(out_packet)
                    video_pts += 1

            elif packet.stream == audio_stream:
                # Process audio packets with delay
                for frame in packet.decode():
                    frame.pts = audio_pts
                    frame.time_base = out_audio.time_base
                    for out_packet in out_audio.encode(frame):
                        output_container.mux(out_packet)
                    audio_pts += frame.samples

        # Flush encoders
        for packet in out_video.encode():
            output_container.mux(packet)
        for packet in out_audio.encode():
            output_container.mux(packet)

        print("Synchronization complete:")
        print(f" Final video PTS: {video_pts}")
        print(f" Final audio PTS: {audio_pts}")
        print(f" Audio delay applied: {audio_delay_ms}ms")
# Synchronize with 100ms audio delay
synchronize_streams('input.mp4', 'synchronized.mp4', audio_delay_ms=100)

Install with Tessl CLI:

npx tessl i tessl/pypi-av