# Content Processing

Flexible content viewing, transformation, and analysis with support for various data formats and encoding schemes. Includes syntax highlighting, interactive content exploration, and an extensible content view system.
3
4
## Capabilities

### Content View System

An extensible system for viewing and processing different content types.
9
10
```python { .api }
11
class Contentview:
    """
    Base class for content viewers.

    A content viewer turns raw bytes into a human-readable representation,
    optionally applying syntax highlighting and formatting.
    """

    # Display name of the viewer.
    name: str
    # Content types associated with this viewer.
    content_types: List[str]

    def __call__(
        self, data: bytes, **metadata
    ) -> Tuple[str, Iterator[Tuple[str, bytes]]]:
        """
        Transform content for viewing.

        Parameters:
        - data: Raw content bytes
        - **metadata: Additional metadata (content_type, etc.)

        Returns:
        - Tuple of (description, formatted_lines)
        """
32
33
class InteractiveContentview(Contentview):
    """
    Content viewer that also handles user input.

    Builds on basic content viewing by adding interactive capabilities
    for exploring complex data structures.
    """

    def render_priority(self, data: bytes, **metadata) -> float:
        """
        Return priority for this viewer (higher = preferred).

        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata

        Returns:
        - Priority score (0.0 to 1.0)
        """
51
52
class SyntaxHighlight(Contentview):
    """
    Content viewer that applies syntax highlighting.

    Highlights code and structured data formats.
    """
58
59
def add(view: Contentview) -> None:
    """
    Register a custom content view.

    Parameters:
    - view: The content view instance that should be registered
    """
66
67
class Metadata:
    """
    Content metadata container.

    Carries the content type, encoding, and other properties that content
    viewers consult when deciding how to process a body.
    """

    # Declared content type, if any.
    content_type: Optional[str]
    # Character set, if any.
    charset: Optional[str]
    # Associated file name, if any.
    filename: Optional[str]
    # Content size.
    size: int
78
```
79
80
### Encoding Utilities

Content encoding and decoding support for various compression and transformation schemes.
83
84
```python { .api }
85
def encode(data: bytes, encoding: str) -> bytes:
    """
    Encode content using the specified encoding scheme.

    Parameters:
    - data: Raw content bytes to encode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Encoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported
    """
99
100
def decode(data: bytes, encoding: str) -> bytes:
    """
    Decode content using the specified encoding scheme.

    Parameters:
    - data: Encoded content bytes to decode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Decoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported or data is invalid
    """
114
115
# Supported encoding schemes, mapped to a short human-readable description.
# NOTE(review): these names are as documented here; confirm them against the
# library's actual codec table (HTTP's registered Brotli token is "br").
ENCODINGS = {
    # Compression codecs
    "gzip": "GNU zip compression",
    "deflate": "DEFLATE compression",
    "brotli": "Brotli compression",
    # Pass-through
    "identity": "No encoding (pass-through)",
    "compress": "Unix compress format",
    # Legacy aliases
    "x-gzip": "Legacy gzip",
    "x-deflate": "Legacy deflate",
}
125
```
126
127
## Usage Examples
128
129
### Custom Content Viewer
130
131
```python
132
from mitmproxy import contentviews
133
from mitmproxy.contentviews import base
134
import json
135
import yaml
136
137
class YAMLContentView(base.Contentview):
    """Custom content viewer for YAML files.

    Parses the body as YAML and renders it as pretty-printed, line-numbered
    JSON with a simple per-line style classification.
    """

    name = "YAML"
    content_types = ["application/yaml", "application/x-yaml", "text/yaml"]

    def __call__(self, data, **metadata):
        """Render YAML content for viewing.

        Parameters:
        - data: Raw content bytes, expected to be UTF-8 encoded YAML
        - **metadata: Additional metadata (unused here)

        Returns:
        - Tuple of (description, lines). Each line is a list of
          (style, text) segments: a line-number segment followed by the
          styled line content. On a parse or decode error the description
          is "YAML (parse error)" and the single line carries the message.
        """
        try:
            # Parse YAML content
            parsed = yaml.safe_load(data.decode("utf-8"))
        except (yaml.YAMLError, UnicodeDecodeError) as e:
            return "YAML (parse error)", [[("error", str(e).encode("utf-8"))]]

        # Convert to pretty-printed JSON for display. YAML can produce values
        # JSON has no literal for (dates, timestamps, sets, ...); since this
        # is display-only output they are deliberately rendered via str()
        # instead of letting json.dumps raise TypeError.
        formatted = json.dumps(parsed, indent=2, ensure_ascii=False, default=str)

        lines = []
        for number, line in enumerate(formatted.split("\n"), start=1):
            stripped = line.strip()
            # Simple syntax highlighting for JSON
            if stripped.startswith('"') and ":" in line:
                style = "key"  # Key lines
            elif stripped in ("{", "}", "[", "]"):
                style = "punctuation"  # Structural lines
            else:
                style = "value"  # Value lines
            # list.append() accepts exactly one argument, so the two segments
            # of a display line are grouped into a single list.
            lines.append([("text", f"{number:4d} "), (style, line.encode("utf-8"))])

        return "YAML", lines

    def render_priority(self, data, **metadata):
        """Return priority for this viewer (higher = preferred).

        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata; "content_type" is consulted

        Returns:
        - 0.9 for a declared YAML content type, 0.5 if the first 100
          characters contain a YAML-looking marker, otherwise 0.0
        """
        # High priority for YAML content types. `or ""` also covers a
        # content_type that is present but None.
        content_type = metadata.get("content_type") or ""
        if any(ct in content_type for ct in self.content_types):
            return 0.9

        # Medium priority if content looks like YAML
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError:
            # Not UTF-8 text, so this viewer cannot render it.
            return 0.0

        if any(indicator in text[:100] for indicator in ("---", "- ", ": ")):
            return 0.5
        return 0.0


# Register the custom viewer
contentviews.add(YAMLContentView())
188
189
class XMLContentView(base.Contentview):
    """Custom content viewer for XML with pretty printing."""

    name = "XML Pretty"
    content_types = ["application/xml", "text/xml"]

    def __call__(self, data, **metadata):
        """Render XML content as pretty-printed, line-numbered text.

        Parameters:
        - data: Raw XML content bytes
        - **metadata: Additional metadata (unused here)

        Returns:
        - Tuple of (description, lines). Each line is a list of
          (style, text) segments: a line-number segment followed by the
          XML line. On a parse error the description is
          "XML (parse error)" and the single line carries the message.
        """
        # Imported before the try block so `ET` is always bound when the
        # except clause below is evaluated.
        # NOTE(review): the standard-library XML parsers are not hardened
        # against maliciously constructed documents, and proxied traffic is
        # untrusted input; consider a hardened parser for production use.
        import xml.etree.ElementTree as ET
        from xml.dom import minidom

        # Parse and pretty-print XML
        try:
            root = ET.fromstring(data)
        except ET.ParseError as e:
            return "XML (parse error)", [[("error", str(e).encode("utf-8"))]]

        rough_string = ET.tostring(root, encoding="unicode")
        pretty = minidom.parseString(rough_string).toprettyxml(indent=" ")

        # Remove empty lines
        lines = [line for line in pretty.split("\n") if line.strip()]

        # Format for display with line numbers. Each display line is one list
        # of segments, because list.append() accepts exactly one argument.
        formatted_lines = [
            [("text", f"{number:4d} "), ("xml", line.encode("utf-8"))]
            for number, line in enumerate(lines, start=1)
        ]

        return f"XML ({len(lines)} lines)", formatted_lines

    def render_priority(self, data, **metadata):
        """Return priority for this viewer (higher = preferred).

        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata; "content_type" is consulted

        Returns:
        - 0.8 for a declared XML content type, 0.6 if the stripped text
          starts with "<", otherwise 0.0
        """
        # `or ""` also covers a content_type that is present but None.
        content_type = metadata.get("content_type") or ""
        if any(ct in content_type for ct in self.content_types):
            return 0.8

        try:
            text = data.decode("utf-8").strip()
        except UnicodeDecodeError:
            return 0.0

        # An XML declaration ("<?xml") also starts with "<", so one prefix
        # check covers both the declaration and a bare root element.
        if text.startswith("<"):
            return 0.6
        return 0.0


contentviews.add(XMLContentView())
235
```
236
237
### Content Processing in Addons
238
239
```python
240
from mitmproxy import http, contentviews
241
from mitmproxy.net import encoding
242
import mitmproxy.ctx as ctx
243
import gzip
244
import json
245
246
class ContentProcessorAddon:
    """Addon for comprehensive content processing.

    For each response it removes any content-encoding from the body and
    then dispatches JSON, HTML and image bodies to a dedicated method.
    """

    def response(self, flow: http.HTTPFlow):
        """Process response content.

        Parameters:
        - flow: HTTP flow whose response should be processed
        """
        if not flow.response:
            return

        content_type = flow.response.headers.get("content-type", "")
        content_encoding = flow.response.headers.get("content-encoding", "")

        # Decode compressed content
        raw_content = flow.response.raw_content
        if content_encoding and raw_content is not None:
            try:
                # `content` is already the decoded view of `raw_content`, so
                # feeding it to the codec again would decode twice. decode()
                # decodes the raw body and removes the content-encoding
                # header in one consistent step.
                flow.response.decode()
            except ValueError as e:
                ctx.log.error(f"Failed to decode {content_encoding}: {e}")
            else:
                decoded_content = flow.response.content or b""
                ctx.log.info(f"Decoded {content_encoding} content: {len(raw_content)} -> {len(decoded_content)} bytes")

                # Store original for potential re-encoding
                flow.metadata["original_encoding"] = content_encoding
                flow.metadata["original_content"] = raw_content

        if "application/json" in content_type:
            # Process JSON content
            self.process_json_content(flow)
        elif "text/html" in content_type:
            # Process HTML content
            self.process_html_content(flow)
        elif content_type.startswith("image/"):
            # Process image content
            self.process_image_content(flow)

    def process_json_content(self, flow: http.HTTPFlow):
        """Process JSON response content.

        Logs the top-level structure and a preview, then adds a "debug"
        entry to dict payloads that do not already have one.

        Parameters:
        - flow: HTTP flow whose response body is expected to be JSON
        """
        try:
            data = flow.response.json()
        except ValueError as e:
            ctx.log.error(f"Invalid JSON in response: {e}")
            return

        # Log JSON structure
        ctx.log.info(f"JSON response structure: {type(data).__name__}")
        if isinstance(data, dict):
            ctx.log.info(f"JSON keys: {list(data.keys())}")
        elif isinstance(data, list):
            ctx.log.info(f"JSON array length: {len(data)}")

        # Pretty-print JSON for debugging
        pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
        ctx.log.info(f"JSON content preview:\n{pretty_json[:500]}...")

        # Could modify JSON data here
        if isinstance(data, dict) and "debug" not in data:
            data["debug"] = {"processed_by": "mitmproxy", "timestamp": flow.response.timestamp_start}
            flow.response.set_text(json.dumps(data))

    def process_html_content(self, flow: http.HTTPFlow):
        """Process HTML response content.

        Logs the page title (when present) and counts of common elements.

        Parameters:
        - flow: HTTP flow whose response body is expected to be HTML
        """
        try:
            html_content = flow.response.get_text()
        except ValueError as e:
            # UnicodeDecodeError is a ValueError subclass, so this still
            # covers the original case plus other decoding failures.
            ctx.log.error(f"Failed to decode HTML: {e}")
            return

        if html_content is None:
            # No body available, nothing to inspect.
            return

        # Log HTML info. The closing tag is searched for after the opening
        # tag so an earlier stray "</title>" cannot produce a bogus slice.
        title_start = html_content.find("<title>")
        if title_start != -1:
            title_end = html_content.find("</title>", title_start)
            if title_end != -1:
                title = html_content[title_start + len("<title>"):title_end]
                ctx.log.info(f"HTML page title: {title}")

        # Count common elements
        element_counts = {
            "links": html_content.count("<a "),
            "images": html_content.count("<img "),
            "scripts": html_content.count("<script"),
            "forms": html_content.count("<form"),
        }
        ctx.log.info(f"HTML elements: {element_counts}")

    def process_image_content(self, flow: http.HTTPFlow):
        """Process image response content.

        Logs the declared type and size and checks the magic bytes of JPEG
        and PNG bodies.

        Parameters:
        - flow: HTTP flow whose response body is expected to be an image
        """
        content_type = flow.response.headers.get("content-type", "")
        # The body may be absent; treat that as empty instead of failing
        # on len(None).
        body = flow.response.content or b""
        content_size = len(body)

        ctx.log.info(f"Image: {content_type}, {content_size} bytes")

        # Compare only the media type: the header may carry parameters
        # after ";" and may differ in case.
        media_type = content_type.split(";")[0].strip().lower()

        # Could analyze image properties here
        if media_type == "image/jpeg":
            # Simple JPEG header analysis
            if body.startswith(b'\xff\xd8\xff'):
                ctx.log.info("Valid JPEG header detected")
        elif media_type == "image/png":
            # PNG header analysis
            if body.startswith(b'\x89PNG\r\n\x1a\n'):
                ctx.log.info("Valid PNG header detected")


addons = [ContentProcessorAddon()]
352
```
353
354
### Advanced Content Analysis
355
356
```python
357
from mitmproxy import http
358
import mitmproxy.ctx as ctx
359
import hashlib
360
import magic # python-magic library for file type detection
361
import re
362
363
class ContentAnalyzerAddon:
    """Advanced content analysis and classification.

    Keeps running statistics over all responses and logs per-response
    security and performance observations.
    """

    # Simple XSS pattern detection; compiled once instead of on every
    # response.
    _XSS_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE | re.DOTALL)
        for pattern in (
            r'<script[^>]*>.*?javascript:',
            r'on\w+\s*=\s*["\'].*?javascript:',
            r'<iframe[^>]*src\s*=\s*["\']javascript:',
        )
    )

    # Substrings whose presence in a JSON body is reported as sensitive.
    _SENSITIVE_KEYWORDS = ('password', 'token', 'secret', 'key', 'api_key', 'private')

    def __init__(self):
        # Running totals; each nested dict maps an observed value to the
        # number of responses it was seen on.
        self.content_stats = {
            "total_bytes": 0,
            "content_types": {},
            "encodings": {},
            "file_types": {},
        }

    @staticmethod
    def _bump(counts, key):
        """Increment the tally stored for `key` in the `counts` dict."""
        counts[key] = counts.get(key, 0) + 1

    def response(self, flow: http.HTTPFlow):
        """Analyze response content comprehensively.

        Parameters:
        - flow: HTTP flow whose response should be analyzed; responses
          without content are skipped
        """
        if not flow.response or not flow.response.content:
            return

        content = flow.response.content
        content_size = len(content)
        content_type = flow.response.headers.get("content-type", "unknown")
        content_encoding = flow.response.headers.get("content-encoding", "none")

        # Update statistics
        stats = self.content_stats
        stats["total_bytes"] += content_size
        self._bump(stats["content_types"], content_type)
        self._bump(stats["encodings"], content_encoding)

        # Detect actual file type using magic numbers. Only the detection
        # call sits inside the try, so failures in the bookkeeping below are
        # not misreported as detection failures.
        try:
            detected_type = magic.from_buffer(content, mime=True)
        except Exception as e:
            ctx.log.error(f"File type detection failed: {e}")
        else:
            self._bump(stats["file_types"], detected_type)

            # Check for content type mismatch. Header parameters, padding
            # and letter case are not part of the media type.
            declared_type = content_type.split(';')[0].strip().lower()
            if detected_type != declared_type:
                ctx.log.warn(f"Content type mismatch: declared={content_type}, detected={detected_type}")

        # Calculate content hash (first 16 hex digits of the SHA-256)
        content_hash = hashlib.sha256(content).hexdigest()[:16]

        # Security analysis
        self.analyze_security(flow, content, content_type)

        # Performance analysis
        self.analyze_performance(flow, content, content_size)

        # Log analysis summary
        ctx.log.info(f"Content analysis: {flow.request.url}")
        ctx.log.info(f" Size: {content_size} bytes, Type: {content_type}")
        ctx.log.info(f" Hash: {content_hash}, Encoding: {content_encoding}")

    def analyze_security(self, flow, content, content_type):
        """Analyze content for security issues.

        Parameters:
        - flow: HTTP flow being analyzed (its URL is used in log output)
        - content: Response body bytes
        - content_type: Declared content-type header value
        """
        security_issues = []

        # decode(..., errors='ignore') cannot raise UnicodeDecodeError, so
        # no exception handling is needed around the decodes below.
        if "text/html" in content_type:
            # Check for potential XSS in HTML
            html_text = content.decode('utf-8', errors='ignore')

            if any(pattern.search(html_text) for pattern in self._XSS_PATTERNS):
                security_issues.append("Potential XSS vector detected")

            # Check for inline scripts
            if '<script' in html_text and 'javascript:' in html_text:
                security_issues.append("Inline JavaScript detected")

        elif "application/json" in content_type:
            # Check for exposed sensitive data in JSON
            json_text = content.decode('utf-8', errors='ignore').lower()
            security_issues.extend(
                f"Potentially sensitive data: {keyword}"
                for keyword in self._SENSITIVE_KEYWORDS
                if keyword in json_text
            )

        # Log security issues
        if security_issues:
            ctx.log.warn(f"Security analysis for {flow.request.url}:")
            for issue in security_issues:
                ctx.log.warn(f" - {issue}")

    def analyze_performance(self, flow, content, content_size):
        """Analyze content for performance implications.

        Parameters:
        - flow: HTTP flow being analyzed (response headers are consulted)
        - content: Response body bytes (unused here)
        - content_size: Length of the response body in bytes
        """
        performance_notes = []
        headers = flow.response.headers

        # Large content warning
        if content_size > 1024 * 1024:  # > 1MB
            performance_notes.append(f"Large response: {content_size / (1024*1024):.2f} MB")

        # Check compression effectiveness
        content_encoding = headers.get("content-encoding", "")
        if not content_encoding and content_size > 1024:  # > 1KB uncompressed
            performance_notes.append("Content could benefit from compression")

        # Check caching headers
        cache_control = headers.get("cache-control", "")
        expires = headers.get("expires", "")
        etag = headers.get("etag", "")

        if not any([cache_control, expires, etag]):
            performance_notes.append("No caching headers present")

        # Log performance notes
        if performance_notes:
            ctx.log.info(f"Performance analysis for {flow.request.url}:")
            for note in performance_notes:
                ctx.log.info(f" - {note}")

    def done(self):
        """Log final content statistics."""
        stats = self.content_stats
        ctx.log.info("Content Analysis Summary:")
        ctx.log.info(f" Total bytes processed: {stats['total_bytes']:,}")
        ctx.log.info(f" Unique content types: {len(stats['content_types'])}")
        ctx.log.info(f" Most common content type: {max(stats['content_types'], key=stats['content_types'].get) if stats['content_types'] else 'None'}")
        ctx.log.info(f" Encoding distribution: {dict(list(stats['encodings'].items())[:5])}")


addons = [ContentAnalyzerAddon()]
499
```
500
501
### Content Transformation
502
503
```python
504
from mitmproxy import http
505
from mitmproxy.net import encoding
506
import mitmproxy.ctx as ctx
507
import json
508
import re
509
510
class ContentTransformerAddon:
    """Transform content based on rules and filters.

    Each rule maps a URL regular expression to a transformation method;
    every rule whose pattern matches the request URL is applied.
    """

    def __init__(self):
        self.transformation_rules = {
            # URL pattern -> transformation function
            r".*\.json$": self.transform_json,
            r".*/api/.*": self.transform_api_response,
            r".*\.html$": self.transform_html,
        }

    def response(self, flow: http.HTTPFlow):
        """Apply content transformations based on URL patterns.

        Parameters:
        - flow: HTTP flow whose response should be transformed
        """
        if not flow.response:
            return

        url = flow.request.url

        # Find matching transformation rules
        for pattern, transform_func in self.transformation_rules.items():
            if not re.match(pattern, url):
                continue
            try:
                transform_func(flow)
            except Exception as e:
                # Boundary handler: one failing transformation must not
                # prevent the remaining rules from running.
                ctx.log.error(f"Transformation failed for {url}: {e}")

    def transform_json(self, flow: http.HTTPFlow):
        """Transform JSON responses.

        Adds a "_metadata" entry to dict payloads and re-serializes the
        body as pretty-printed JSON.

        Parameters:
        - flow: HTTP flow whose response body is expected to be JSON
        """
        try:
            data = flow.response.json()
        except ValueError:
            ctx.log.warn(f"Failed to parse JSON: {flow.request.url}")
            return

        # Add metadata to all JSON responses
        if isinstance(data, dict):
            data["_metadata"] = {
                "processed_by": "mitmproxy",
                "original_size": len(flow.response.content),
                "url": flow.request.url,
            }

        # Pretty-format JSON
        flow.response.set_text(json.dumps(data, indent=2, ensure_ascii=False))

        ctx.log.info(f"Transformed JSON response: {flow.request.url}")

    def transform_api_response(self, flow: http.HTTPFlow):
        """Transform API responses with additional headers.

        Parameters:
        - flow: HTTP flow whose response headers are extended
        """
        # `time` is not among this example's imports; import it locally so
        # the call below does not raise NameError (which response() would
        # swallow, leaving the headers after this point unset).
        import time

        headers = flow.response.headers

        # Add API processing headers
        headers["X-API-Processed"] = "true"
        headers["X-Processing-Time"] = str(int(time.time()))

        # Add CORS headers for development
        headers["Access-Control-Allow-Origin"] = "*"
        headers["Access-Control-Allow-Methods"] = "GET,POST,PUT,DELETE,OPTIONS"
        headers["Access-Control-Allow-Headers"] = "Content-Type,Authorization"

        ctx.log.info(f"Transformed API response: {flow.request.url}")

    def transform_html(self, flow: http.HTTPFlow):
        """Transform HTML responses.

        Injects a debugging script before the final closing body tag (or
        appends it when there is none) and adds a meta tag after the first
        opening head tag.

        Parameters:
        - flow: HTTP flow whose response body is expected to be HTML
        """
        try:
            html_content = flow.response.get_text()
        except ValueError:
            # UnicodeDecodeError is a ValueError subclass, so this still
            # covers the original case plus other decoding failures.
            ctx.log.warn(f"Failed to decode HTML: {flow.request.url}")
            return

        if html_content is None:
            # No body available, nothing to transform.
            return

        # Inject debugging script
        debug_script = (
            "\n"
            "<script>\n"
            "console.log('Page processed by mitmproxy');\n"
            "window.mitmproxy_processed = true;\n"
            "</script>\n"
        )

        # Insert once, before the final closing </body> tag, so a literal
        # "</body>" earlier in the document does not get a second copy.
        before, closing_tag, after = html_content.rpartition("</body>")
        if closing_tag:
            html_content = before + debug_script + closing_tag + after
        else:
            html_content += debug_script

        # Add meta tag (first <head> only)
        meta_tag = '<meta name="processed-by" content="mitmproxy">'
        if "<head>" in html_content:
            html_content = html_content.replace("<head>", "<head>" + meta_tag, 1)

        flow.response.set_text(html_content)

        ctx.log.info(f"Transformed HTML response: {flow.request.url}")


addons = [ContentTransformerAddon()]
602
```