An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.
—
Extensible addon system for custom functionality with lifecycle hooks, event handling, and access to all proxy components. Includes comprehensive built-in addons and a framework for developing custom extensions.
The addon system provides a comprehensive framework for extending mitmproxy functionality.
def default_addons() -> List[Any]:
    """
    Get list of all built-in addons.

    API signature listing: only the docstring is shown here, the
    implementation body is omitted.

    Returns:
    - List of addon instances providing core mitmproxy functionality
    """
def concurrent(func: Callable) -> Callable:
"""
Decorator for concurrent addon execution.
Allows addon methods to run concurrently without blocking
the main proxy processing thread.
Parameters:
- func: Addon method to make concurrent
Returns:
- Decorated function that runs concurrently
"""Global context object providing access to proxy components.
# Context module for addon access
import mitmproxy.ctx as ctx
# Context provides access to:
# - ctx.master: Master instance
# - ctx.options: Configuration options
# - ctx.log: Logging interface
class Context:
"""
Global context object for addon access to proxy components.
Available as `ctx` in addon methods.
"""
master: Master
options: Options
log: LogEntry
def log_info(self, message: str) -> None:
"""Log info message."""
def log_warn(self, message: str) -> None:
"""Log warning message."""
def log_error(self, message: str) -> None:
"""Log error message."""Comprehensive set of built-in addons for common functionality.
# Core functionality addons
class Core:
    """Core proxy functionality."""


class Browser:
    """Browser integration and automation."""


class Block:
    """Request/response blocking capabilities."""


class BlockList:
    """Host and URL blacklisting functionality."""


class AntiCache:
    """Cache prevention for testing."""


class AntiComp:
    """Compression disabling for analysis."""
# Playback and testing addons
class ClientPlayback:
    """Client-side request replay."""


class ServerPlayback:
    """Server-side response replay."""
# Content modification addons
class ModifyBody:
    """Request/response body modification."""


class ModifyHeaders:
    """Header modification and injection."""


class MapRemote:
    """Remote URL mapping and redirection."""


class MapLocal:
    """Local file serving and mapping."""
# Authentication and security addons
class ProxyAuth:
    """Proxy authentication management."""


class StickyAuth:
    """Authentication persistence across requests."""


class StickyCookie:
    """Cookie persistence and management."""


class UpstreamAuth:
    """Upstream server authentication."""
# Data export and persistence addons
class Save:
    """Flow saving and persistence."""


class SaveHar:
    """HAR format export."""


class Export:
    """Multi-format flow export."""
# Network and protocol addons
class DnsResolver:
    """DNS resolution and caching."""


class NextLayer:
    """Protocol layer detection and routing."""


class TlsConfig:
    """TLS/SSL configuration management."""
# User interface addons
class CommandHistory:
    """Command history management."""


class Comment:
    """Flow commenting and annotation."""


class Cut:
    """Data extraction and filtering."""
# Development and debugging addons
class ScriptLoader:
    """Custom script loading and management."""


class Onboarding:
    """User onboarding and help system."""


class DisableH2C:
    """Disable HTTP/2 cleartext upgrade for compatibility."""


class StripDnsHttpsRecords:
    """Strip DNS HTTPS records for testing."""
class UpdateAltSvc:
    """Update Alt-Svc headers for HTTP/3 support."""


from mitmproxy import http
import mitmproxy.ctx as ctx
class BasicAddon:
    """Basic addon example demonstrating common patterns.

    Counts requests, answers requests to a fixed set of blocked hosts with a
    403, and tags every other request/response with a marker header. All
    request/response handling is gated by the ``basic_addon_enabled`` option
    registered in ``load``.
    """

    def __init__(self):
        # Number of requests seen while the addon was enabled.
        self.request_count = 0
        # Exact host names that are answered with a 403 instead of forwarded.
        self.blocked_domains = {"malicious.com", "blocked.example"}

    def load(self, loader):
        """Called when the addon is loaded; registers the addon's option."""
        ctx.log.info("BasicAddon loaded")

        # Add custom options
        loader.add_option(
            name="basic_addon_enabled",
            typespec=bool,
            default=True,
            help="Enable basic addon functionality"
        )

    def configure(self, updates):
        """Called when configuration changes.

        Logs the new state whenever ``basic_addon_enabled`` is among the
        updated option names.
        """
        if "basic_addon_enabled" in updates:
            enabled = ctx.options.basic_addon_enabled
            ctx.log.info(f"BasicAddon {'enabled' if enabled else 'disabled'}")

    def request(self, flow: http.HTTPFlow) -> None:
        """Handle HTTP requests: count, log, block or tag them."""
        if not ctx.options.basic_addon_enabled:
            return

        self.request_count += 1

        # Log request
        ctx.log.info(f"Request #{self.request_count}: {flow.request.method} {flow.request.url}")

        # Block malicious domains: setting flow.response here answers the
        # request with the synthetic 403 built below.
        if flow.request.host in self.blocked_domains:
            ctx.log.warn(f"Blocked request to {flow.request.host}")
            flow.response = http.Response.make(
                status_code=403,
                content="Blocked by BasicAddon",
                headers={"Content-Type": "text/plain"}
            )
            return

        # Add custom header
        flow.request.headers["X-BasicAddon"] = "processed"

    def response(self, flow: http.HTTPFlow) -> None:
        """Handle HTTP responses: tag and log them."""
        if not ctx.options.basic_addon_enabled:
            return

        if flow.response:
            # Add response header
            flow.response.headers["X-BasicAddon-Response"] = "processed"

            # Log response
            ctx.log.info(f"Response: {flow.response.status_code} for {flow.request.url}")

    def done(self):
        """Called when mitmproxy shuts down; logs the final request count."""
        ctx.log.info(f"BasicAddon processed {self.request_count} requests")
# Register the addon
addons = [BasicAddon()]

from mitmproxy import http
from mitmproxy.script import concurrent
import mitmproxy.ctx as ctx
import asyncio
import aiohttp
import json
class APIAnalyzerAddon:
    """Advanced addon with concurrent processing for API analysis.

    ``request`` records every call whose URL contains the
    ``api_analyzer_filter`` option; ``response`` completes that record and,
    when ``api_analyzer_webhook`` is set, posts it to the webhook.

    Both hooks are wrapped in ``@concurrent``, so they must not call
    ``asyncio.create_task`` directly; webhook delivery is handed to the
    master's event loop instead.
    """

    def __init__(self):
        # Every analysed call, in arrival order.
        self.api_calls = []
        # Lazily created aiohttp session used for webhook delivery.
        self.session = None
        # flow.id -> record of requests still waiting for their response.
        # Keying by flow id (instead of matching on URL + method) keeps
        # overlapping identical requests from being mixed up.
        self._pending = {}
        # Reference to the session-close task so it is not garbage collected.
        self._close_task = None

    def load(self, loader):
        """Initialize addon with configuration."""
        ctx.log.info("APIAnalyzerAddon loaded")

        # Add configuration options
        loader.add_option(
            name="api_analyzer_webhook",
            typespec=str,
            default="",
            help="Webhook URL for API analysis results"
        )

        loader.add_option(
            name="api_analyzer_filter",
            typespec=str,
            default="/api/",
            help="URL pattern to filter API calls"
        )

    async def setup_session(self):
        """Set up HTTP session for webhook calls (created on first use)."""
        if not self.session:
            self.session = aiohttp.ClientSession()

    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """Analyze API requests concurrently."""
        # Filter API calls
        filter_pattern = ctx.options.api_analyzer_filter
        if filter_pattern not in flow.request.url:
            return

        # Extract API call information
        api_call = {
            "timestamp": flow.request.timestamp_start,
            "method": flow.request.method,
            "url": flow.request.url,
            "headers": dict(flow.request.headers),
            "content_type": flow.request.headers.get("content-type", ""),
            "content_length": len(flow.request.content) if flow.request.content else 0
        }

        # Parse JSON request body
        if "application/json" in api_call["content_type"]:
            try:
                api_call["json_body"] = flow.request.json()
            except (ValueError, TypeError):
                # ValueError: body is not valid JSON.
                # TypeError: body content is not available.
                api_call["json_body"] = None

        self.api_calls.append(api_call)
        self._pending[flow.id] = api_call
        ctx.log.info(f"Analyzed API call: {flow.request.method} {flow.request.url}")

    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """Analyze API responses concurrently."""
        filter_pattern = ctx.options.api_analyzer_filter
        if filter_pattern not in flow.request.url:
            return

        # Find corresponding request analysis
        api_call = self._pending.pop(flow.id, None)
        if api_call is None or not flow.response:
            return

        # Add response information
        api_call.update({
            "status_code": flow.response.status_code,
            "response_headers": dict(flow.response.headers),
            "response_content_type": flow.response.headers.get("content-type", ""),
            "response_length": len(flow.response.content) if flow.response.content else 0,
            "response_time": (
                flow.response.timestamp_end - flow.request.timestamp_start
                if flow.response.timestamp_end else None
            )
        })

        # Parse JSON response
        if "application/json" in api_call.get("response_content_type", ""):
            try:
                api_call["json_response"] = flow.response.json()
            except (ValueError, TypeError):
                api_call["json_response"] = None

        # Send to webhook if configured
        webhook_url = ctx.options.api_analyzer_webhook
        if webhook_url:
            # This hook is decorated with @concurrent, so no event loop is
            # running here and asyncio.create_task() would raise
            # RuntimeError. Hand the coroutine to the master's loop instead.
            asyncio.run_coroutine_threadsafe(
                self.send_webhook(api_call, webhook_url),
                ctx.master.event_loop,
            )

    async def send_webhook(self, api_call_data, webhook_url):
        """Send API analysis data to webhook.

        Failures are logged and never propagated, so a broken webhook cannot
        disturb proxying.
        """
        try:
            await self.setup_session()

            async with self.session.post(
                webhook_url,
                json=api_call_data,
                headers={"Content-Type": "application/json"}
            ) as response:
                # Any 2xx answer (e.g. 202/204) is a successful delivery.
                if 200 <= response.status < 300:
                    ctx.log.info(f"Webhook sent successfully for {api_call_data['url']}")
                else:
                    ctx.log.warn(f"Webhook failed with status {response.status}")

        except Exception as e:
            # Deliberately broad: webhook delivery is best-effort.
            ctx.log.error(f"Webhook error: {e}")

    def done(self):
        """Cleanup when addon shuts down."""
        if self.session:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop is running in this thread, so the close coroutine
                # cannot be scheduled from here.
                loop = None
            if loop is not None:
                # Keep a reference so the task is not collected before it runs.
                self._close_task = loop.create_task(self.session.close())

        ctx.log.info(f"APIAnalyzerAddon analyzed {len(self.api_calls)} API calls")
addons = [APIAnalyzerAddon()]

from mitmproxy import http
import mitmproxy.ctx as ctx
import sqlite3
import threading
from datetime import datetime
class TrafficLoggerAddon:
    """Addon that logs traffic to SQLite database with thread safety.

    One row per completed request/response pair is written to the
    ``requests`` table of ``traffic_log.db``. Every database access opens its
    own connection, runs under ``self.lock`` and closes the connection again.
    """

    def __init__(self):
        self.db_path = "traffic_log.db"
        # Serialises all database access performed by this addon.
        self.lock = threading.Lock()
        self.setup_database()

    def setup_database(self):
        """Initialize SQLite database (creates the table if it is missing)."""
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" only commits or rolls back; it does not close the
            # connection, hence the explicit close() in the finally clause.
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS requests (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp REAL,
                        method TEXT,
                        url TEXT,
                        host TEXT,
                        path TEXT,
                        status_code INTEGER,
                        request_size INTEGER,
                        response_size INTEGER,
                        response_time REAL,
                        user_agent TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
        finally:
            conn.close()

    def load(self, loader):
        """Configure addon options."""
        loader.add_option(
            name="traffic_logger_enabled",
            typespec=bool,
            default=True,
            help="Enable traffic logging to database"
        )

        loader.add_option(
            name="traffic_logger_filter",
            typespec=str,
            default="",
            help="Host filter for logging (empty = log all)"
        )

    def _should_log(self, flow) -> bool:
        """Return True when logging is enabled and the host passes the filter."""
        if not ctx.options.traffic_logger_enabled:
            return False

        # Apply host filter (substring match; empty filter logs everything)
        host_filter = ctx.options.traffic_logger_filter
        return not host_filter or host_filter in flow.request.host

    def request(self, flow: http.HTTPFlow) -> None:
        """Log request initiation."""
        if not self._should_log(flow):
            return

        # Store request start time for later response time calculation.
        # NOTE(review): kept as a plain float rather than a datetime object
        # so the value stays trivially serializable if flow metadata is ever
        # persisted -- confirm against the flow storage format.
        flow.metadata["log_start_time"] = datetime.now().timestamp()

    def response(self, flow: http.HTTPFlow) -> None:
        """Log completed request/response."""
        if not self._should_log(flow):
            return

        # Calculate response time (None when request() never saw this flow)
        response_time = None
        start_time = flow.metadata.get("log_start_time")
        if start_time is not None:
            response_time = datetime.now().timestamp() - start_time

        # Prepare log data
        log_data = {
            "timestamp": flow.request.timestamp_start,
            "method": flow.request.method,
            "url": flow.request.url,
            "host": flow.request.host,
            "path": flow.request.path,
            "status_code": flow.response.status_code if flow.response else None,
            "request_size": len(flow.request.content) if flow.request.content else 0,
            "response_size": len(flow.response.content) if flow.response and flow.response.content else 0,
            "response_time": response_time,
            "user_agent": flow.request.headers.get("User-Agent", "")
        }

        # Thread-safe database insert
        with self.lock:
            try:
                conn = sqlite3.connect(self.db_path)
                try:
                    with conn:  # commit on success, roll back on error
                        conn.execute("""
                            INSERT INTO requests
                            (timestamp, method, url, host, path, status_code,
                             request_size, response_size, response_time, user_agent)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """, (
                            log_data["timestamp"],
                            log_data["method"],
                            log_data["url"],
                            log_data["host"],
                            log_data["path"],
                            log_data["status_code"],
                            log_data["request_size"],
                            log_data["response_size"],
                            log_data["response_time"],
                            log_data["user_agent"]
                        ))
                finally:
                    conn.close()
            except sqlite3.Error as e:
                ctx.log.error(f"Database error: {e}")

    def get_stats(self):
        """Get traffic statistics from database.

        Returns:
        - dict with ``total_requests``, ``error_requests`` (status >= 400) and
          ``avg_response_time`` (None when no row has a response time), or
          None when the database could not be read.
        """
        with self.lock:
            try:
                conn = sqlite3.connect(self.db_path)
                try:
                    cursor = conn.cursor()

                    # Get basic stats
                    cursor.execute("SELECT COUNT(*) FROM requests")
                    total_requests = cursor.fetchone()[0]

                    cursor.execute("SELECT COUNT(*) FROM requests WHERE status_code >= 400")
                    error_requests = cursor.fetchone()[0]

                    cursor.execute("SELECT AVG(response_time) FROM requests WHERE response_time IS NOT NULL")
                    avg_response_time = cursor.fetchone()[0]

                    return {
                        "total_requests": total_requests,
                        "error_requests": error_requests,
                        "avg_response_time": avg_response_time
                    }
                finally:
                    conn.close()

            except sqlite3.Error as e:
                ctx.log.error(f"Database error: {e}")
                return None

    def done(self):
        """Log final statistics."""
        stats = self.get_stats()
        if stats:
            ctx.log.info(f"TrafficLogger final stats: {stats}")
addons = [TrafficLoggerAddon()]

from mitmproxy import http, command
import mitmproxy.ctx as ctx
from typing import Sequence
class CommandAddon:
    """Addon that provides custom commands.

    Keeps a set of blocked hosts that can be edited at runtime through the
    ``addon.*`` commands and answers requests to those hosts with a 403.
    """

    def __init__(self):
        # Exact host names currently blocked.
        self.blocked_hosts = set()
        # Number of requests seen (blocked or not).
        self.request_count = 0

    @command.command("addon.block_host")
    def block_host(self, host: str) -> None:
        """Block requests to a specific host."""
        self.blocked_hosts.add(host)
        ctx.log.info(f"Blocked host: {host}")

    @command.command("addon.unblock_host")
    def unblock_host(self, host: str) -> None:
        """Unblock requests to a specific host."""
        # discard() rather than remove(): unblocking an unknown host is a no-op.
        self.blocked_hosts.discard(host)
        ctx.log.info(f"Unblocked host: {host}")

    @command.command("addon.list_blocked")
    def list_blocked(self) -> Sequence[str]:
        """List all blocked hosts (sorted, so the output is deterministic)."""
        return sorted(self.blocked_hosts)

    @command.command("addon.clear_blocked")
    def clear_blocked(self) -> None:
        """Clear all blocked hosts."""
        count = len(self.blocked_hosts)
        self.blocked_hosts.clear()
        ctx.log.info(f"Cleared {count} blocked hosts")

    @command.command("addon.stats")
    def get_stats(self) -> str:
        """Get addon statistics."""
        return f"Requests processed: {self.request_count}, Blocked hosts: {len(self.blocked_hosts)}"

    def request(self, flow: http.HTTPFlow) -> None:
        """Handle requests with blocking logic."""
        self.request_count += 1

        if flow.request.host in self.blocked_hosts:
            ctx.log.info(f"Blocked request to {flow.request.host}")
            flow.response = http.Response.make(
                status_code=403,
                content=f"Host {flow.request.host} is blocked",
                headers={"Content-Type": "text/plain"}
            )
addons = [CommandAddon()]

# Example of loading and using custom addons
# Save addon code to files, e.g., basic_addon.py, api_analyzer.py, etc.
# Load addons in mitmproxy:
# 1. Command line: mitmproxy -s basic_addon.py -s api_analyzer.py
# 2. Programmatically:
from mitmproxy.tools.main import mitmdump
from mitmproxy import master, options
from basic_addon import BasicAddon
from api_analyzer import APIAnalyzerAddon
def run_with_addons():
    """Run mitmproxy with custom addons.

    Starts a proxy on port 8080 with ``BasicAddon`` and ``APIAnalyzerAddon``
    registered, and blocks until the proxy shuts down (for example on
    Ctrl-C).
    """
    # Imported locally so the snippet stays self-contained.
    import asyncio
    import signal

    from mitmproxy.tools.dump import DumpMaster

    async def _serve():
        opts = options.Options(listen_port=8080)

        # Create master with addons. The master has to be created while an
        # event loop is running, and DumpMaster (unlike a bare master.Master)
        # already carries the built-in addons that make it act as a proxy.
        m = DumpMaster(opts)
        m.addons.add(BasicAddon())
        m.addons.add(APIAnalyzerAddon())

        # Route Ctrl-C to a graceful shutdown so the addons' done() hooks run.
        try:
            asyncio.get_running_loop().add_signal_handler(signal.SIGINT, m.shutdown)
        except NotImplementedError:
            # Signal handlers are not available on every platform's event
            # loop; the KeyboardInterrupt fallback below covers that case.
            pass

        # Master.run() is a coroutine and has to be awaited.
        await m.run()

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # asyncio.run() has already cancelled the master task and closed the
        # loop at this point, so there is nothing left to shut down.
        pass
if __name__ == "__main__":
    run_with_addons()

Install with Tessl CLI
npx tessl i tessl/pypi-mitmproxy