# Media Analysis and Visualization

Tools for analyzing media file properties, extracting metadata, and visualizing complex filter graphs during debugging and development.

## Capabilities

### Media Analysis and Probing

Extract comprehensive metadata and technical information from media files using ffprobe.

```python { .api }
11
def probe(filename: str, cmd: str = 'ffprobe', **kwargs) -> dict:
12
"""
13
Run ffprobe on the specified file and return JSON representation.
14
15
Parameters:
16
- filename: str, path to media file to analyze
17
- cmd: str, ffprobe command path (default: 'ffprobe')
18
- **kwargs: additional ffprobe options as keyword arguments
19
20
Returns:
21
dict: JSON data containing streams, format, and metadata information
22
23
Raises:
24
ffmpeg.Error: if ffprobe returns non-zero exit code
25
"""
26
```
27
28
**Usage Example:**
29
```python
30
import ffmpeg
31
import json
32
33
# Basic media probing
34
probe_data = ffmpeg.probe('video.mp4')
35
36
# Access format information
37
format_info = probe_data['format']
38
print(f"Duration: {format_info['duration']} seconds")
39
print(f"Bitrate: {format_info['bit_rate']} bps")
40
print(f"Size: {format_info['size']} bytes")
41
42
# Access stream information
43
streams = probe_data['streams']
44
for i, stream in enumerate(streams):
45
print(f"Stream {i}:")
46
print(f" Codec: {stream['codec_name']}")
47
print(f" Type: {stream['codec_type']}")
48
49
if stream['codec_type'] == 'video':
50
print(f" Resolution: {stream['width']}x{stream['height']}")
51
print(f" Frame rate: {stream['r_frame_rate']}")
52
print(f" Pixel format: {stream['pix_fmt']}")
53
elif stream['codec_type'] == 'audio':
54
print(f" Sample rate: {stream['sample_rate']} Hz")
55
print(f" Channels: {stream['channels']}")
56
print(f" Sample format: {stream.get('sample_fmt', 'N/A')}")
57
58
# Probe with custom options
59
detailed_probe = ffmpeg.probe(
60
'video.mp4',
61
select_streams='v:0', # Only video stream 0
62
show_entries='stream=width,height,duration,bit_rate'
63
)
64
```
65
66
**Common Probe Data Structure:**
67
```python
68
{
69
"streams": [
70
{
71
"index": 0,
72
"codec_name": "h264",
73
"codec_type": "video",
74
"width": 1920,
75
"height": 1080,
76
"r_frame_rate": "30/1",
77
"duration": "120.5",
78
"bit_rate": "2000000",
79
"pix_fmt": "yuv420p"
80
},
81
{
82
"index": 1,
83
"codec_name": "aac",
84
"codec_type": "audio",
85
"sample_rate": "48000",
86
"channels": 2,
87
"duration": "120.5",
88
"bit_rate": "128000"
89
}
90
],
91
"format": {
92
"filename": "video.mp4",
93
"nb_streams": 2,
94
"format_name": "mov,mp4,m4a,3gp,3g2,mj2",
95
"duration": "120.500000",
96
"size": "32000000",
97
"bit_rate": "2128000"
98
}
99
}
100
```

### Filter Graph Visualization

Visualize complex FFmpeg filter graphs with Graphviz for debugging and development. Requires the `graphviz` Python package; an `ImportError` is raised if it is not installed.

```python { .api }
107
def view(stream_spec, detail: bool = False, filename: str = None, pipe: bool = False, **kwargs):
108
"""
109
Visualize the filter graph using Graphviz.
110
111
Parameters:
112
- stream_spec: Stream or stream specification to visualize
113
- detail: bool, show detailed node information (args, kwargs)
114
- filename: str, output filename for visualization (auto-generated if None)
115
- pipe: bool, return raw graphviz data instead of saving to file
116
- **kwargs: additional options; show_labels (bool, default: True) toggles edge labels
117
118
Returns:
119
Stream specification (for chaining) or bytes (if pipe=True)
120
121
Raises:
122
ImportError: if graphviz package is not installed
123
ValueError: if both filename and pipe are specified
124
"""
125
```
126
127
**Usage Example:**
128
```python
129
import ffmpeg
130
131
# Create complex filter graph
132
input1 = ffmpeg.input('video1.mp4')
133
input2 = ffmpeg.input('video2.mp4')
134
overlay = ffmpeg.input('logo.png')
135
136
complex_graph = (
137
ffmpeg
138
.concat(
139
input1.trim(start=0, duration=30),
140
input2.trim(start=60, duration=30)
141
)
142
.overlay(overlay.hflip(), x=10, y=10)
143
.drawtext('Processed', x=100, y=50, fontsize=24)
144
.output('result.mp4')
145
)
146
147
# Basic visualization
148
complex_graph.view(filename='graph.png')
149
150
# Detailed visualization with node parameters
151
complex_graph.view(detail=True, filename='detailed_graph.png')
152
153
# Get visualization data without saving
154
graph_data = complex_graph.view(pipe=True)
155
156
# Visualization in Jupyter notebooks
157
from IPython.display import Image, display
158
graph_bytes = complex_graph.view(pipe=True)
159
display(Image(graph_bytes))
160
```
161
162
## Analysis Helper Functions
163
164
### Media Information Extraction
165
166
```python
167
def get_video_info(filename):
    """
    Extract key container, video, and audio information from a media file.

    Parameters:
    - filename: str, path to the media file (passed to ffmpeg.probe)

    Returns:
    dict: always contains 'duration' (float, seconds), 'size' (int, bytes)
    and 'bitrate' (int, bps) from the container. When a video stream exists
    it also contains 'width', 'height', 'fps', 'video_codec' and
    'pixel_format'; when an audio stream exists it also contains
    'audio_codec', 'sample_rate' and 'channels'. 'fps' is None when the
    reported frame rate is undefined (zero denominator, e.g. "0/0").

    Raises:
    ffmpeg.Error: propagated from ffmpeg.probe
    """
    # Local import keeps this snippet self-contained.
    from fractions import Fraction

    probe_data = ffmpeg.probe(filename)
    container = probe_data['format']
    streams = probe_data['streams']

    # Only the first stream of each kind is reported.
    video_stream = next((s for s in streams if s['codec_type'] == 'video'), None)
    audio_stream = next((s for s in streams if s['codec_type'] == 'audio'), None)

    info = {
        'duration': float(container['duration']),
        'size': int(container['size']),
        'bitrate': int(container['bit_rate']),
    }

    if video_stream is not None:
        # r_frame_rate is a rational string such as "30/1". It originates from
        # the probed file, so parse it with Fraction instead of eval().
        try:
            fps = float(Fraction(video_stream['r_frame_rate']))
        except ZeroDivisionError:
            fps = None

        info.update({
            'width': video_stream['width'],
            'height': video_stream['height'],
            'fps': fps,
            'video_codec': video_stream['codec_name'],
            'pixel_format': video_stream['pix_fmt'],
        })

    if audio_stream is not None:
        info.update({
            'audio_codec': audio_stream['codec_name'],
            'sample_rate': int(audio_stream['sample_rate']),
            'channels': audio_stream['channels'],
        })

    return info
203
204
# Usage
205
info = get_video_info('video.mp4')
206
print(f"Video: {info['width']}x{info['height']} at {info['fps']} fps")
207
print(f"Duration: {info['duration']} seconds")
208
```
209
210
### Stream Validation
211
212
```python
213
def validate_streams(*filenames):
    """
    Probe each file and summarize its streams so compatibility can be compared.

    Parameters:
    - *filenames: str, paths of the media files to probe

    Returns:
    list of dict, one entry per file in input order. A successful probe
    yields 'filename', 'video_streams' and 'audio_streams' (counts),
    'video_codec', 'audio_codec' and 'resolution' ("WIDTHxHEIGHT"); the last
    three are None when the file has no stream of that kind. A failed probe
    yields 'filename' and 'error' (str).
    """
    stream_info = []

    for filename in filenames:
        # Keep the try body to the only call expected to raise ffmpeg.Error.
        try:
            probe_data = ffmpeg.probe(filename)
        except ffmpeg.Error as e:
            # Prefer the captured stderr text over the exception message so
            # the recorded error carries ffprobe's own explanation.
            stderr = getattr(e, 'stderr', None)
            if isinstance(stderr, bytes):
                stderr = stderr.decode(errors='replace')
            detail = (stderr or '').strip() or str(e)

            print(f"Error probing {filename}: {detail}")
            stream_info.append({'filename': filename, 'error': detail})
            continue

        video_streams = [s for s in probe_data['streams'] if s['codec_type'] == 'video']
        audio_streams = [s for s in probe_data['streams'] if s['codec_type'] == 'audio']

        # Codec and resolution describe the first stream of each kind.
        first_video = video_streams[0] if video_streams else None
        first_audio = audio_streams[0] if audio_streams else None

        stream_info.append({
            'filename': filename,
            'video_streams': len(video_streams),
            'audio_streams': len(audio_streams),
            'video_codec': first_video['codec_name'] if first_video else None,
            'audio_codec': first_audio['codec_name'] if first_audio else None,
            'resolution': f"{first_video['width']}x{first_video['height']}" if first_video else None,
        })

    return stream_info
236
237
# Usage
238
files = ['clip1.mp4', 'clip2.mp4', 'clip3.mp4']
239
validation_results = validate_streams(*files)
240
241
for result in validation_results:
242
if 'error' in result:
243
print(f"ERROR: {result['filename']} - {result['error']}")
244
else:
245
print(f"OK: {result['filename']} - {result['resolution']} - {result['video_codec']}")
246
```
247
248
## Development and Debugging Patterns
249
250
### Command Inspection
251
252
```python
253
# Build and inspect commands before execution
import shlex

stream = (
    ffmpeg
    .input('input.mp4')
    .filter('scale', 640, 480)
    .filter('fps', fps=30)
    .output('output.mp4', vcodec='libx264')
)

# Get the argument list that would be executed. Each argument is quoted so
# the printed line stays unambiguous (and shell-pasteable) when a path or
# filter expression contains spaces or shell metacharacters.
cmd = ffmpeg.compile(stream)
print("Command:", ' '.join(shlex.quote(arg) for arg in cmd))

# Visualize the processing graph
stream.view(detail=True)

# Execute with error capture for debugging
try:
    ffmpeg.run(stream, capture_stderr=True)
except ffmpeg.Error as e:
    print("FFmpeg stderr:")
    # stderr is captured as bytes; substitute undecodable bytes rather than
    # raising a second exception from inside the handler.
    print(e.stderr.decode(errors='replace'))
275
```
276
277
### Performance Analysis
278
279
```python
280
import time
281
import tempfile
282
283
def benchmark_processing(input_file, operations):
    """
    Time each processing operation on input_file and record the output size.

    Parameters:
    - input_file: str, path of the media file fed to every operation
    - operations: dict mapping a label to a callable that receives the input
      stream and returns the processed stream

    Returns:
    dict mapping each label to {'duration': float seconds, 'output_size': int
    bytes}, or to {'error': str} when ffmpeg fails for that operation.
    """
    # `os` is required for temp-file cleanup; without it the unlink below
    # raised NameError, which the old bare `except` hid, leaking every file.
    import os

    results = {}

    for name, operation in operations.items():
        # Reserve a unique output path; the handle is closed on leaving the
        # `with` block, before ffmpeg writes to the file.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            temp_path = temp_file.name

        try:
            stream = operation(ffmpeg.input(input_file)).output(temp_path)

            # perf_counter is monotonic, so wall-clock adjustments cannot
            # distort the measurement.
            start_time = time.perf_counter()
            ffmpeg.run(stream, quiet=True, overwrite_output=True)
            elapsed = time.perf_counter() - start_time

            # Get output file info
            probe_data = ffmpeg.probe(temp_path)

            results[name] = {
                'duration': elapsed,
                'output_size': int(probe_data['format']['size']),
            }
        except ffmpeg.Error as e:
            results[name] = {'error': str(e)}
        finally:
            # Best-effort cleanup: ignore filesystem errors only, so genuine
            # programming errors are no longer silenced.
            try:
                os.unlink(temp_path)
            except OSError:
                pass

    return results
316
317
# Usage
318
operations = {
319
'hflip': lambda input_stream: input_stream.hflip(),
320
'scale_720p': lambda input_stream: input_stream.filter('scale', 1280, 720),
321
'blur': lambda input_stream: input_stream.filter('boxblur', '10:1'),
322
}
323
324
benchmark_results = benchmark_processing('test_video.mp4', operations)
325
for op, result in benchmark_results.items():
326
if 'error' in result:
327
print(f"{op}: ERROR - {result['error']}")
328
else:
329
print(f"{op}: {result['duration']:.2f}s, {result['output_size']} bytes")
330
```
331
332
### Interactive Development
333
334
```python
335
# Create reusable processing functions
336
def create_thumbnail(input_file, output_file, timestamp=1.0, size="320x240"):
    """
    Build a stream that writes a single scaled frame of a video to an image.

    Parameters:
    - input_file: str, path of the source video
    - output_file: str, path of the image to write
    - timestamp: passed to the input as `ss`, the position to take the frame from
    - size: str, target dimensions joined by 'x' (default: "320x240")

    Returns:
    The output stream; nothing is executed until it is passed to ffmpeg.run.
    """
    seeked_input = ffmpeg.input(input_file, ss=timestamp)

    # "320x240" -> ['320', '240'], forwarded as positional scale arguments
    dimensions = size.split('x')
    scaled = seeked_input.filter('scale', *dimensions)

    return scaled.output(output_file, vframes=1)
344
345
def add_watermark(input_file, watermark_file, output_file, position='bottom-right', margin=10):
    """
    Build a stream that overlays a watermark image on a video.

    Parameters:
    - input_file: str, path of the source video
    - watermark_file: str, path of the watermark image
    - output_file: str, path of the video to write
    - position: str, one of 'bottom-right' (default), 'bottom-left',
      'top-right', 'top-left' or 'center'; any other value falls back to
      'top-left'
    - margin: int, gap in pixels between the watermark and the frame edge
      (default: 10; not used for 'center')

    Returns:
    The output stream; nothing is executed until it is passed to ffmpeg.run.
    """
    input_stream = ffmpeg.input(input_file)
    watermark = ffmpeg.input(watermark_file)

    # Overlay expressions: main_* is the video size, overlay_* the watermark size.
    near = str(margin)
    far_x = f'main_w-overlay_w-{margin}'
    far_y = f'main_h-overlay_h-{margin}'

    placements = {
        'top-left': (near, near),
        'top-right': (far_x, near),
        'bottom-left': (near, far_y),
        'bottom-right': (far_x, far_y),
        'center': ('(main_w-overlay_w)/2', '(main_h-overlay_h)/2'),
    }

    # Unknown positions keep the original fallback of top-left.
    x, y = placements.get(position, placements['top-left'])

    return input_stream.overlay(watermark, x=x, y=y).output(output_file)
358
359
# Use in development workflow
360
input_video = 'source.mp4'
361
362
# Generate thumbnail for preview
363
thumbnail = create_thumbnail(input_video, 'thumb.jpg', timestamp=5.0)
364
thumbnail.view() # Visualize processing
365
ffmpeg.run(thumbnail)
366
367
# Add watermark with visualization
368
watermarked = add_watermark(input_video, 'logo.png', 'final.mp4')
369
watermarked.view(detail=True) # Debug the graph
370
ffmpeg.run(watermarked)
371
```