An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.

Flexible content viewing, transformation, and analysis with support for various data formats and encoding schemes. Includes syntax highlighting, interactive content exploration, and an extensible content view system for viewing and processing different content types.
class Contentview:
    """
    Base class for content viewers.

    Content viewers transform raw bytes into human-readable representations
    with optional syntax highlighting and formatting.
    """

    # Display name of the viewer (e.g. "YAML").
    name: str
    # MIME types this viewer claims to handle.
    # NOTE(review): List/Tuple/Iterator annotations require
    # `from typing import ...`, which is not visible in this chunk — confirm.
    content_types: List[str]

    def __call__(self, data: bytes, **metadata) -> Tuple[str, Iterator[Tuple[str, bytes]]]:
        """
        Transform content for viewing.

        Parameters:
        - data: Raw content bytes
        - **metadata: Additional metadata (content_type, etc.)

        Returns:
        - Tuple of (description, formatted_lines)
        """
class InteractiveContentview(Contentview):
    """
    Interactive content viewer with user input handling.

    Extends basic content viewing with interactive capabilities
    for exploring complex data structures.
    """

    def render_priority(self, data: bytes, **metadata) -> float:
        """
        Return priority for this viewer (higher = preferred).

        Parameters:
        - data: Content to potentially view
        - **metadata: Content metadata

        Returns:
        - Priority score (0.0 to 1.0)
        """
class SyntaxHighlight(Contentview):
    """
    Syntax highlighting content viewer.

    Provides syntax highlighting for code and structured data formats.
    """
def add(view: Contentview) -> None:
    """
    Register a custom content view.

    Parameters:
    - view: Content view instance to register
    """
class Metadata:
    """
    Content metadata container.

    Holds information about content type, encoding, and other properties
    used by content viewers for processing decisions.
    """

    # NOTE(review): Optional requires `from typing import Optional`,
    # which is not visible in this chunk — confirm the import exists.
    content_type: Optional[str]  # declared MIME type, e.g. "application/json"
    charset: Optional[str]       # character set, if declared
    filename: Optional[str]      # original filename, if known
    size: int                    # content length in bytes

# Content encoding and decoding support for various compression and transformation schemes.
def encode(data: bytes, encoding: str) -> bytes:
    """
    Encode content using specified encoding scheme.

    Parameters:
    - data: Raw content bytes to encode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Encoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported
    """
def decode(data: bytes, encoding: str) -> bytes:
    """
    Decode content using specified encoding scheme.

    Parameters:
    - data: Encoded content bytes to decode
    - encoding: Encoding scheme name (gzip, deflate, brotli, etc.)

    Returns:
    - Decoded content bytes

    Raises:
    - ValueError: If encoding scheme is not supported or data is invalid
    """
# Supported encoding schemes (scheme name -> human-readable description).
ENCODINGS = {
    "gzip": "GNU zip compression",
    "deflate": "DEFLATE compression",
    "brotli": "Brotli compression",
    "identity": "No encoding (pass-through)",
    "compress": "Unix compress format",
    "x-gzip": "Legacy gzip",
    "x-deflate": "Legacy deflate",
}

from mitmproxy import contentviews
from mitmproxy.contentviews import base
import json
import yaml
class YAMLContentView(base.Contentview):
    """Custom content viewer for YAML files.

    Parses the body as YAML and renders it as pretty-printed JSON with
    line numbers and simple token-based highlighting.
    """

    name = "YAML"
    content_types = ["application/yaml", "application/x-yaml", "text/yaml"]

    def __call__(self, data, **metadata):
        """Return (description, lines); each line is a list of (style, text) chunks."""
        try:
            # Parse YAML content
            parsed = yaml.safe_load(data.decode('utf-8'))
            # Convert to pretty-printed JSON for display
            formatted = json.dumps(parsed, indent=2, ensure_ascii=False)
            lines = []
            for i, line in enumerate(formatted.split('\n')):
                # Simple syntax highlighting for JSON
                stripped = line.strip()
                if stripped.startswith('"') and ':' in line:
                    style = "key"          # object-key lines
                elif stripped in ['{', '}', '[', ']']:
                    style = "punctuation"  # structural lines
                else:
                    style = "value"        # everything else
                # BUG FIX: list.append() takes exactly one argument; the
                # original passed two tuples, raising TypeError. Each display
                # line is a single list of (style, text) chunks.
                lines.append([("text", f"{i+1:4d} "), (style, line.encode('utf-8'))])
            return "YAML", lines
        except (yaml.YAMLError, UnicodeDecodeError) as e:
            # Error output uses the same list-of-chunks line format.
            return "YAML (parse error)", [[("error", str(e).encode('utf-8'))]]

    def render_priority(self, data, **metadata):
        """Prefer declared YAML content types; sniff for YAML markers otherwise."""
        # High priority for YAML content types
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.9
        # Medium priority if content looks like YAML
        try:
            text = data.decode('utf-8')
            if any(indicator in text[:100] for indicator in ['---', '- ', ': ']):
                return 0.5
        except UnicodeDecodeError:
            pass
        return 0.0
# Register the custom viewer with mitmproxy's contentview registry.
contentviews.add(YAMLContentView())
class XMLContentView(base.Contentview):
    """Custom content viewer for XML with pretty printing."""

    name = "XML Pretty"
    content_types = ["application/xml", "text/xml"]

    def __call__(self, data, **metadata):
        """Pretty-print XML; each display line is a list of (style, text) chunks."""
        # Imports hoisted out of the try-block: the original referenced
        # ET.ParseError in the except clause, which only worked because the
        # import happened to be the first statement inside the try.
        import xml.etree.ElementTree as ET
        from xml.dom import minidom
        try:
            # Parse and pretty-print XML
            root = ET.fromstring(data)
            rough_string = ET.tostring(root, encoding='unicode')
            reparsed = minidom.parseString(rough_string)
            pretty = reparsed.toprettyxml(indent=" ")
            # Remove empty lines
            lines = [line for line in pretty.split('\n') if line.strip()]
            # Format for display with line numbers.
            # BUG FIX: list.append() takes exactly one argument; the original
            # passed two tuples, raising TypeError at runtime.
            formatted_lines = []
            for i, line in enumerate(lines):
                formatted_lines.append([("text", f"{i+1:4d} "), ("xml", line.encode('utf-8'))])
            return f"XML ({len(lines)} lines)", formatted_lines
        except ET.ParseError as e:
            # Error output uses the same list-of-chunks line format.
            return "XML (parse error)", [[("error", str(e).encode('utf-8'))]]

    def render_priority(self, data, **metadata):
        """Prefer declared XML content types; sniff for markup otherwise."""
        content_type = metadata.get("content_type", "")
        if any(ct in content_type for ct in self.content_types):
            return 0.8
        # Sniff for markup. An XML declaration ("<?xml") also starts with
        # "<", so a single prefix test covers both original conditions.
        try:
            text = data.decode('utf-8').strip()
            if text.startswith('<'):
                return 0.6
        except UnicodeDecodeError:
            pass
        return 0.0
contentviews.add(XMLContentView())

from mitmproxy import http, contentviews
from mitmproxy.net import encoding
import mitmproxy.ctx as ctx
import gzip
import json
class ContentProcessorAddon:
    """Addon for comprehensive content processing.

    Decodes compressed response bodies in place, then dispatches JSON, HTML,
    and image responses to dedicated logging/processing helpers.
    """

    def response(self, flow: http.HTTPFlow):
        """Process response content."""
        if not flow.response:
            return
        content_type = flow.response.headers.get("content-type", "")
        content_encoding = flow.response.headers.get("content-encoding", "")
        # Decode compressed content
        if content_encoding:
            try:
                decoded_content = encoding.decode(flow.response.content, content_encoding)
                ctx.log.info(f"Decoded {content_encoding} content: {len(flow.response.content)} -> {len(decoded_content)} bytes")
                # Store original for potential re-encoding
                flow.metadata["original_encoding"] = content_encoding
                flow.metadata["original_content"] = flow.response.content
                # Update response with decoded content and drop the header so
                # downstream consumers don't try to decode a second time.
                flow.response.content = decoded_content
                del flow.response.headers["content-encoding"]
            except ValueError as e:
                ctx.log.error(f"Failed to decode {content_encoding}: {e}")
        # Process JSON content
        if "application/json" in content_type:
            self.process_json_content(flow)
        # Process HTML content
        elif "text/html" in content_type:
            self.process_html_content(flow)
        # Process image content
        elif content_type.startswith("image/"):
            self.process_image_content(flow)

    def process_json_content(self, flow: http.HTTPFlow):
        """Process JSON response content: log its structure, inject a debug key."""
        try:
            data = flow.response.json()
            # Log JSON structure
            ctx.log.info(f"JSON response structure: {type(data).__name__}")
            if isinstance(data, dict):
                ctx.log.info(f"JSON keys: {list(data.keys())}")
            elif isinstance(data, list):
                ctx.log.info(f"JSON array length: {len(data)}")
            # Pretty-print JSON for debugging
            pretty_json = json.dumps(data, indent=2, ensure_ascii=False)
            ctx.log.info(f"JSON content preview:\n{pretty_json[:500]}...")
            # Could modify JSON data here; only rewrite the body when a
            # "debug" key was actually injected.
            if isinstance(data, dict) and "debug" not in data:
                data["debug"] = {"processed_by": "mitmproxy", "timestamp": flow.response.timestamp_start}
                flow.response.set_text(json.dumps(data))
        except ValueError as e:
            ctx.log.error(f"Invalid JSON in response: {e}")

    def process_html_content(self, flow: http.HTTPFlow):
        """Process HTML response content: log the page title and element counts."""
        try:
            html_content = flow.response.get_text()
            # Naive title extraction: first <title>...</title> pair found.
            title_start = html_content.find("<title>")
            title_end = html_content.find("</title>")
            if title_start != -1 and title_end != -1:
                title = html_content[title_start + 7:title_end]  # 7 == len("<title>")
                ctx.log.info(f"HTML page title: {title}")
            # Count common elements (plain substring counts, not a real parse).
            element_counts = {
                "links": html_content.count("<a "),
                "images": html_content.count("<img "),
                "scripts": html_content.count("<script"),
                "forms": html_content.count("<form")
            }
            ctx.log.info(f"HTML elements: {element_counts}")
        except UnicodeDecodeError as e:
            ctx.log.error(f"Failed to decode HTML: {e}")

    def process_image_content(self, flow: http.HTTPFlow):
        """Process image response content: log size and validate magic bytes."""
        content_type = flow.response.headers.get("content-type", "")
        content_size = len(flow.response.content)
        ctx.log.info(f"Image: {content_type}, {content_size} bytes")
        # Could analyze image properties here
        if content_type == "image/jpeg":
            # Simple JPEG header analysis (SOI marker prefix)
            if flow.response.content.startswith(b'\xff\xd8\xff'):
                ctx.log.info("Valid JPEG header detected")
        elif content_type == "image/png":
            # PNG header analysis (8-byte PNG signature)
            if flow.response.content.startswith(b'\x89PNG\r\n\x1a\n'):
                ctx.log.info("Valid PNG header detected")
addons = [ContentProcessorAddon()]

from mitmproxy import http
import mitmproxy.ctx as ctx
import hashlib
import magic # python-magic library for file type detection
import re
class ContentAnalyzerAddon:
    """Advanced content analysis and classification.

    Accumulates run-wide statistics and performs lightweight heuristic
    security and performance checks on every response body.
    """

    def __init__(self):
        # Running totals across all observed responses.
        self.content_stats = {
            "total_bytes": 0,
            "content_types": {},
            "encodings": {},
            "file_types": {}
        }

    def response(self, flow: http.HTTPFlow):
        """Analyze response content comprehensively."""
        if not flow.response or not flow.response.content:
            return
        content = flow.response.content
        content_size = len(content)
        content_type = flow.response.headers.get("content-type", "unknown")
        content_encoding = flow.response.headers.get("content-encoding", "none")
        # Update statistics
        self.content_stats["total_bytes"] += content_size
        self.content_stats["content_types"][content_type] = self.content_stats["content_types"].get(content_type, 0) + 1
        self.content_stats["encodings"][content_encoding] = self.content_stats["encodings"].get(content_encoding, 0) + 1
        # Detect actual file type using magic numbers
        # (requires the third-party python-magic package).
        try:
            detected_type = magic.from_buffer(content, mime=True)
            self.content_stats["file_types"][detected_type] = self.content_stats["file_types"].get(detected_type, 0) + 1
            # Check for content type mismatch: compare against the declared
            # header with any ";..." parameters split off.
            if detected_type != content_type.split(';')[0]:
                ctx.log.warn(f"Content type mismatch: declared={content_type}, detected={detected_type}")
        except Exception as e:
            ctx.log.error(f"File type detection failed: {e}")
        # Calculate content hash (first 16 hex chars of SHA-256, log only).
        content_hash = hashlib.sha256(content).hexdigest()[:16]
        # Security analysis
        self.analyze_security(flow, content, content_type)
        # Performance analysis
        self.analyze_performance(flow, content, content_size)
        # Log analysis summary
        ctx.log.info(f"Content analysis: {flow.request.url}")
        ctx.log.info(f" Size: {content_size} bytes, Type: {content_type}")
        ctx.log.info(f" Hash: {content_hash}, Encoding: {content_encoding}")

    def analyze_security(self, flow, content, content_type):
        """Analyze content for security issues (heuristic pattern checks only)."""
        security_issues = []
        # Check for potential XSS in HTML
        if "text/html" in content_type:
            try:
                html_text = content.decode('utf-8', errors='ignore')
                # Simple XSS pattern detection
                xss_patterns = [
                    r'<script[^>]*>.*?javascript:',
                    r'on\w+\s*=\s*["\'].*?javascript:',
                    r'<iframe[^>]*src\s*=\s*["\']javascript:',
                ]
                for pattern in xss_patterns:
                    if re.search(pattern, html_text, re.IGNORECASE | re.DOTALL):
                        security_issues.append("Potential XSS vector detected")
                        break
                # Check for inline scripts
                if '<script' in html_text and 'javascript:' in html_text:
                    security_issues.append("Inline JavaScript detected")
            except UnicodeDecodeError:
                # errors='ignore' makes this unlikely; kept as a safety net.
                pass
        # Check for exposed sensitive data in JSON
        elif "application/json" in content_type:
            try:
                json_text = content.decode('utf-8', errors='ignore').lower()
                sensitive_keywords = ['password', 'token', 'secret', 'key', 'api_key', 'private']
                for keyword in sensitive_keywords:
                    if keyword in json_text:
                        security_issues.append(f"Potentially sensitive data: {keyword}")
            except UnicodeDecodeError:
                pass
        # Log security issues
        if security_issues:
            ctx.log.warn(f"Security analysis for {flow.request.url}:")
            for issue in security_issues:
                ctx.log.warn(f" - {issue}")

    def analyze_performance(self, flow, content, content_size):
        """Analyze content for performance implications."""
        performance_notes = []
        # Large content warning
        if content_size > 1024 * 1024:  # > 1MB
            performance_notes.append(f"Large response: {content_size / (1024*1024):.2f} MB")
        # Check compression effectiveness
        content_encoding = flow.response.headers.get("content-encoding", "")
        if not content_encoding and content_size > 1024:  # > 1KB uncompressed
            performance_notes.append("Content could benefit from compression")
        # Check caching headers
        cache_control = flow.response.headers.get("cache-control", "")
        expires = flow.response.headers.get("expires", "")
        etag = flow.response.headers.get("etag", "")
        if not any([cache_control, expires, etag]):
            performance_notes.append("No caching headers present")
        # Log performance notes
        if performance_notes:
            ctx.log.info(f"Performance analysis for {flow.request.url}:")
            for note in performance_notes:
                ctx.log.info(f" - {note}")

    def done(self):
        """Log final content statistics at shutdown."""
        stats = self.content_stats
        ctx.log.info("Content Analysis Summary:")
        ctx.log.info(f" Total bytes processed: {stats['total_bytes']:,}")
        ctx.log.info(f" Unique content types: {len(stats['content_types'])}")
        ctx.log.info(f" Most common content type: {max(stats['content_types'], key=stats['content_types'].get) if stats['content_types'] else 'None'}")
        ctx.log.info(f" Encoding distribution: {dict(list(stats['encodings'].items())[:5])}")
addons = [ContentAnalyzerAddon()]

from mitmproxy import http
import json
import re
import time

import mitmproxy.ctx as ctx
from mitmproxy.net import encoding
class ContentTransformerAddon:
    """Transform response content based on URL-pattern rules.

    Each entry in ``transformation_rules`` maps a regex (matched against the
    full request URL with ``re.match``) to a transformation method; every
    matching rule is applied.
    """

    def __init__(self):
        self.transformation_rules = {
            # URL pattern -> transformation function
            r".*\.json$": self.transform_json,
            r".*/api/.*": self.transform_api_response,
            r".*\.html$": self.transform_html,
        }

    def response(self, flow: http.HTTPFlow):
        """Apply content transformations based on URL patterns."""
        if not flow.response:
            return
        url = flow.request.url
        # Find matching transformation rules
        for pattern, transform_func in self.transformation_rules.items():
            if re.match(pattern, url):
                try:
                    transform_func(flow)
                except Exception as e:
                    # Best-effort: a failed transform must not break the proxy.
                    ctx.log.error(f"Transformation failed for {url}: {e}")

    def transform_json(self, flow: http.HTTPFlow):
        """Pretty-print JSON bodies and tag dict payloads with proxy metadata."""
        try:
            data = flow.response.json()
            # Add metadata to all JSON responses
            if isinstance(data, dict):
                data["_metadata"] = {
                    "processed_by": "mitmproxy",
                    "original_size": len(flow.response.content),
                    "url": flow.request.url
                }
            # Pretty-format JSON
            flow.response.set_text(json.dumps(data, indent=2, ensure_ascii=False))
            ctx.log.info(f"Transformed JSON response: {flow.request.url}")
        except ValueError:
            ctx.log.warn(f"Failed to parse JSON: {flow.request.url}")

    def transform_api_response(self, flow: http.HTTPFlow):
        """Stamp API responses with processing headers and dev CORS headers."""
        # Add API processing headers
        flow.response.headers["X-API-Processed"] = "true"
        # BUG FIX: `time` was used here without ever being imported
        # (NameError at runtime); it is now imported at module level.
        flow.response.headers["X-Processing-Time"] = str(int(time.time()))
        # Add CORS headers for development — never ship "*" origins to production.
        flow.response.headers["Access-Control-Allow-Origin"] = "*"
        flow.response.headers["Access-Control-Allow-Methods"] = "GET,POST,PUT,DELETE,OPTIONS"
        flow.response.headers["Access-Control-Allow-Headers"] = "Content-Type,Authorization"
        ctx.log.info(f"Transformed API response: {flow.request.url}")

    def transform_html(self, flow: http.HTTPFlow):
        """Inject a debug script and a marker meta tag into HTML pages."""
        try:
            html_content = flow.response.get_text()
            # Inject debugging script
            debug_script = """
<script>
console.log('Page processed by mitmproxy');
window.mitmproxy_processed = true;
</script>
"""
            # Insert before closing </body> tag when present, else append.
            if "</body>" in html_content:
                html_content = html_content.replace("</body>", debug_script + "</body>")
            else:
                html_content += debug_script
            # Add meta tag
            meta_tag = '<meta name="processed-by" content="mitmproxy">'
            if "<head>" in html_content:
                html_content = html_content.replace("<head>", "<head>" + meta_tag)
            flow.response.set_text(html_content)
            ctx.log.info(f"Transformed HTML response: {flow.request.url}")
        except UnicodeDecodeError:
            ctx.log.warn(f"Failed to decode HTML: {flow.request.url}")
addons = [ContentTransformerAddon()]

Install with Tessl CLI:
npx tessl i tessl/pypi-mitmproxy