CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mitmproxy

An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.

Pending
Overview
Eval results
Files

addons.mddocs/

Addon Development

Extensible addon system for custom functionality with lifecycle hooks, event handling, and access to all proxy components. Includes comprehensive built-in addons and framework for developing custom extensions.

Capabilities

Addon System Overview

The addon system provides a comprehensive framework for extending mitmproxy functionality.

def default_addons() -> List[Any]:
    """
    Get list of all built-in addons.
    
    Returns:
    - List of addon instances providing core mitmproxy functionality
    """

def concurrent(func: Callable) -> Callable:
    """
    Decorator for concurrent addon execution.
    
    Allows addon methods to run concurrently without blocking
    the main proxy processing thread.
    
    Parameters:
    - func: Addon method to make concurrent
    
    Returns:
    - Decorated function that runs concurrently
    """

Addon Context Access

Global context object providing access to proxy components.

# Context module for addon access
import mitmproxy.ctx as ctx

# Context provides access to:
# - ctx.master: Master instance
# - ctx.options: Configuration options
# - ctx.log: Logging interface

class Context:
    """
    Global context object for addon access to proxy components.
    
    Available as `ctx` in addon methods.
    """
    master: Master
    options: Options
    log: LogEntry
    
    def log_info(self, message: str) -> None:
        """Log info message."""
    
    def log_warn(self, message: str) -> None:
        """Log warning message."""
    
    def log_error(self, message: str) -> None:
        """Log error message."""

Built-in Addons

Comprehensive set of built-in addons for common functionality.

# Core functionality addons
class Core:
    """Core proxy functionality."""

class Browser:
    """Browser integration and automation."""

class Block:
    """Request/response blocking capabilities."""

class BlockList:
    """Host and URL blacklisting functionality."""

class AntiCache:
    """Cache prevention for testing."""

class AntiComp:
    """Compression disabling for analysis."""

# Playback and testing addons
class ClientPlayback:
    """Client-side request replay."""

class ServerPlayback:
    """Server-side response replay."""

# Content modification addons
class ModifyBody:
    """Request/response body modification."""

class ModifyHeaders:
    """Header modification and injection."""

class MapRemote:
    """Remote URL mapping and redirection."""

class MapLocal:
    """Local file serving and mapping."""

# Authentication and security addons
class ProxyAuth:
    """Proxy authentication management."""

class StickyAuth:
    """Authentication persistence across requests."""

class StickyCookie:
    """Cookie persistence and management."""

class UpstreamAuth:
    """Upstream server authentication."""

# Data export and persistence addons
class Save:
    """Flow saving and persistence."""

class SaveHar:
    """HAR format export."""

class Export:
    """Multi-format flow export."""

# Network and protocol addons
class DnsResolver:
    """DNS resolution and caching."""

class NextLayer:
    """Protocol layer detection and routing."""

class TlsConfig:
    """TLS/SSL configuration management."""

# User interface addons
class CommandHistory:
    """Command history management."""

class Comment:
    """Flow commenting and annotation."""

class Cut:
    """Data extraction and filtering."""

# Development and debugging addons
class ScriptLoader:
    """Custom script loading and management."""

class Onboarding:
    """User onboarding and help system."""

class DisableH2C:
    """Disable HTTP/2 cleartext upgrade for compatibility."""

class StripDnsHttpsRecords:
    """Strip DNS HTTPS records for testing."""

class UpdateAltSvc:
    """Update Alt-Svc headers for HTTP/3 support."""

Usage Examples

Basic Addon Development

from mitmproxy import http
import mitmproxy.ctx as ctx

class BasicAddon:
    """Basic addon example demonstrating common patterns."""
    
    def __init__(self):
        self.request_count = 0
        self.blocked_domains = {"malicious.com", "blocked.example"}
    
    def load(self, loader):
        """Called when the addon is loaded."""
        ctx.log.info("BasicAddon loaded")
        
        # Add custom options
        loader.add_option(
            name="basic_addon_enabled",
            typespec=bool,
            default=True,
            help="Enable basic addon functionality"
        )
    
    def configure(self, updates):
        """Called when configuration changes."""
        if "basic_addon_enabled" in updates:
            enabled = ctx.options.basic_addon_enabled
            ctx.log.info(f"BasicAddon {'enabled' if enabled else 'disabled'}")
    
    def request(self, flow: http.HTTPFlow) -> None:
        """Handle HTTP requests."""
        if not ctx.options.basic_addon_enabled:
            return
        
        self.request_count += 1
        
        # Log request
        ctx.log.info(f"Request #{self.request_count}: {flow.request.method} {flow.request.url}")
        
        # Block malicious domains
        if flow.request.host in self.blocked_domains:
            ctx.log.warn(f"Blocked request to {flow.request.host}")
            flow.response = http.Response.make(
                status_code=403,
                content="Blocked by BasicAddon",
                headers={"Content-Type": "text/plain"}
            )
            return
        
        # Add custom header
        flow.request.headers["X-BasicAddon"] = "processed"
    
    def response(self, flow: http.HTTPFlow) -> None:
        """Handle HTTP responses."""
        if not ctx.options.basic_addon_enabled:
            return
        
        if flow.response:
            # Add response header
            flow.response.headers["X-BasicAddon-Response"] = "processed"
            
            # Log response
            ctx.log.info(f"Response: {flow.response.status_code} for {flow.request.url}")
    
    def done(self):
        """Called when mitmproxy shuts down."""
        ctx.log.info(f"BasicAddon processed {self.request_count} requests")

# Register the addon
addons = [BasicAddon()]

Advanced Addon with Concurrent Processing

from mitmproxy import http
from mitmproxy.script import concurrent
import mitmproxy.ctx as ctx
import asyncio
import aiohttp
import json

class APIAnalyzerAddon:
    """Advanced addon with concurrent processing for API analysis."""
    
    def __init__(self):
        self.api_calls = []
        self.session = None
    
    def load(self, loader):
        """Initialize addon with configuration."""
        ctx.log.info("APIAnalyzerAddon loaded")
        
        # Add configuration options
        loader.add_option(
            name="api_analyzer_webhook",
            typespec=str,
            default="",
            help="Webhook URL for API analysis results"
        )
        
        loader.add_option(
            name="api_analyzer_filter",
            typespec=str,
            default="/api/",
            help="URL pattern to filter API calls"
        )
    
    async def setup_session(self):
        """Set up HTTP session for webhook calls."""
        if not self.session:
            self.session = aiohttp.ClientSession()
    
    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """Analyze API requests concurrently."""
        # Filter API calls
        filter_pattern = ctx.options.api_analyzer_filter
        if filter_pattern not in flow.request.url:
            return
        
        # Extract API call information
        api_call = {
            "timestamp": flow.request.timestamp_start,
            "method": flow.request.method,
            "url": flow.request.url,
            "headers": dict(flow.request.headers),
            "content_type": flow.request.headers.get("content-type", ""),
            "content_length": len(flow.request.content) if flow.request.content else 0
        }
        
        # Parse JSON request body
        if "application/json" in api_call["content_type"]:
            try:
                api_call["json_body"] = flow.request.json()
            except ValueError:
                api_call["json_body"] = None
        
        self.api_calls.append(api_call)
        ctx.log.info(f"Analyzed API call: {flow.request.method} {flow.request.url}")
    
    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """Analyze API responses concurrently."""
        filter_pattern = ctx.options.api_analyzer_filter
        if filter_pattern not in flow.request.url:
            return
        
        # Find corresponding request analysis
        for api_call in reversed(self.api_calls):
            if (api_call["url"] == flow.request.url and 
                api_call["method"] == flow.request.method):
                
                # Add response information
                if flow.response:
                    api_call.update({
                        "status_code": flow.response.status_code,
                        "response_headers": dict(flow.response.headers),
                        "response_content_type": flow.response.headers.get("content-type", ""),
                        "response_length": len(flow.response.content) if flow.response.content else 0,
                        "response_time": flow.response.timestamp_end - flow.request.timestamp_start if flow.response.timestamp_end else None
                    })
                    
                    # Parse JSON response
                    if "application/json" in api_call.get("response_content_type", ""):
                        try:
                            api_call["json_response"] = flow.response.json()
                        except ValueError:
                            api_call["json_response"] = None
                
                # Send to webhook if configured
                webhook_url = ctx.options.api_analyzer_webhook
                if webhook_url:
                    asyncio.create_task(self.send_webhook(api_call, webhook_url))
                
                break
    
    async def send_webhook(self, api_call_data, webhook_url):
        """Send API analysis data to webhook."""
        try:
            await self.setup_session()
            
            async with self.session.post(
                webhook_url,
                json=api_call_data,
                headers={"Content-Type": "application/json"}
            ) as response:
                if response.status == 200:
                    ctx.log.info(f"Webhook sent successfully for {api_call_data['url']}")
                else:
                    ctx.log.warn(f"Webhook failed with status {response.status}")
                    
        except Exception as e:
            ctx.log.error(f"Webhook error: {e}")
    
    def done(self):
        """Cleanup when addon shuts down."""
        if self.session:
            asyncio.create_task(self.session.close())
        
        ctx.log.info(f"APIAnalyzerAddon analyzed {len(self.api_calls)} API calls")

addons = [APIAnalyzerAddon()]

Addon with State Management

from mitmproxy import http
import mitmproxy.ctx as ctx
import sqlite3
import threading
from datetime import datetime

class TrafficLoggerAddon:
    """Addon that logs traffic to SQLite database with thread safety."""
    
    def __init__(self):
        self.db_path = "traffic_log.db"
        self.lock = threading.Lock()
        self.setup_database()
    
    def setup_database(self):
        """Initialize SQLite database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS requests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL,
                    method TEXT,
                    url TEXT,
                    host TEXT,
                    path TEXT,
                    status_code INTEGER,
                    request_size INTEGER,
                    response_size INTEGER,
                    response_time REAL,
                    user_agent TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()
    
    def load(self, loader):
        """Configure addon options."""
        loader.add_option(
            name="traffic_logger_enabled",
            typespec=bool,
            default=True,
            help="Enable traffic logging to database"
        )
        
        loader.add_option(
            name="traffic_logger_filter",
            typespec=str,
            default="",
            help="Host filter for logging (empty = log all)"
        )
    
    def request(self, flow: http.HTTPFlow) -> None:
        """Log request initiation."""
        if not ctx.options.traffic_logger_enabled:
            return
        
        # Apply host filter
        host_filter = ctx.options.traffic_logger_filter
        if host_filter and host_filter not in flow.request.host:
            return
        
        # Store request start time for later response time calculation
        flow.metadata["log_start_time"] = datetime.now()
    
    def response(self, flow: http.HTTPFlow) -> None:
        """Log completed request/response."""
        if not ctx.options.traffic_logger_enabled:
            return
        
        # Apply host filter
        host_filter = ctx.options.traffic_logger_filter
        if host_filter and host_filter not in flow.request.host:
            return
        
        # Calculate response time
        response_time = None
        if "log_start_time" in flow.metadata:
            start_time = flow.metadata["log_start_time"]
            response_time = (datetime.now() - start_time).total_seconds()
        
        # Prepare log data
        log_data = {
            "timestamp": flow.request.timestamp_start,
            "method": flow.request.method,
            "url": flow.request.url,
            "host": flow.request.host,
            "path": flow.request.path,
            "status_code": flow.response.status_code if flow.response else None,
            "request_size": len(flow.request.content) if flow.request.content else 0,
            "response_size": len(flow.response.content) if flow.response and flow.response.content else 0,
            "response_time": response_time,
            "user_agent": flow.request.headers.get("User-Agent", "")
        }
        
        # Thread-safe database insert
        with self.lock:
            try:
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute("""
                        INSERT INTO requests 
                        (timestamp, method, url, host, path, status_code, 
                         request_size, response_size, response_time, user_agent)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        log_data["timestamp"],
                        log_data["method"],
                        log_data["url"],
                        log_data["host"],
                        log_data["path"],
                        log_data["status_code"],
                        log_data["request_size"],
                        log_data["response_size"],
                        log_data["response_time"],
                        log_data["user_agent"]
                    ))
                    conn.commit()
                    
            except sqlite3.Error as e:
                ctx.log.error(f"Database error: {e}")
    
    def get_stats(self):
        """Get traffic statistics from database."""
        with self.lock:
            try:
                with sqlite3.connect(self.db_path) as conn:
                    cursor = conn.cursor()
                    
                    # Get basic stats
                    cursor.execute("SELECT COUNT(*) FROM requests")
                    total_requests = cursor.fetchone()[0]
                    
                    cursor.execute("SELECT COUNT(*) FROM requests WHERE status_code >= 400")
                    error_requests = cursor.fetchone()[0]
                    
                    cursor.execute("SELECT AVG(response_time) FROM requests WHERE response_time IS NOT NULL")
                    avg_response_time = cursor.fetchone()[0]
                    
                    return {
                        "total_requests": total_requests,
                        "error_requests": error_requests,
                        "avg_response_time": avg_response_time
                    }
                    
            except sqlite3.Error as e:
                ctx.log.error(f"Database error: {e}")
                return None
    
    def done(self):
        """Log final statistics."""
        stats = self.get_stats()
        if stats:
            ctx.log.info(f"TrafficLogger final stats: {stats}")

addons = [TrafficLoggerAddon()]

Command-based Addon

from mitmproxy import http, command
import mitmproxy.ctx as ctx
from typing import Sequence

class CommandAddon:
    """Addon that provides custom commands."""
    
    def __init__(self):
        self.blocked_hosts = set()
        self.request_count = 0
    
    @command.command("addon.block_host")
    def block_host(self, host: str) -> None:
        """Block requests to a specific host."""
        self.blocked_hosts.add(host)
        ctx.log.info(f"Blocked host: {host}")
    
    @command.command("addon.unblock_host")
    def unblock_host(self, host: str) -> None:
        """Unblock requests to a specific host."""
        self.blocked_hosts.discard(host)
        ctx.log.info(f"Unblocked host: {host}")
    
    @command.command("addon.list_blocked")
    def list_blocked(self) -> Sequence[str]:
        """List all blocked hosts."""
        return list(self.blocked_hosts)
    
    @command.command("addon.clear_blocked")
    def clear_blocked(self) -> None:
        """Clear all blocked hosts."""
        count = len(self.blocked_hosts)
        self.blocked_hosts.clear()
        ctx.log.info(f"Cleared {count} blocked hosts")
    
    @command.command("addon.stats")
    def get_stats(self) -> str:
        """Get addon statistics."""
        return f"Requests processed: {self.request_count}, Blocked hosts: {len(self.blocked_hosts)}"
    
    def request(self, flow: http.HTTPFlow) -> None:
        """Handle requests with blocking logic."""
        self.request_count += 1
        
        if flow.request.host in self.blocked_hosts:
            ctx.log.info(f"Blocked request to {flow.request.host}")
            flow.response = http.Response.make(
                status_code=403,
                content=f"Host {flow.request.host} is blocked",
                headers={"Content-Type": "text/plain"}
            )

addons = [CommandAddon()]

Addon Integration Example

# Example of loading and using custom addons

# Save addon code to files, e.g., basic_addon.py, api_analyzer.py, etc.

# Load addons in mitmproxy:
# 1. Command line: mitmproxy -s basic_addon.py -s api_analyzer.py
# 2. Programmatically:

from mitmproxy.tools.main import mitmdump
from mitmproxy import master, options
from basic_addon import BasicAddon
from api_analyzer import APIAnalyzerAddon

def run_with_addons():
    """Run mitmproxy with custom addons."""
    opts = options.Options(listen_port=8080)
    
    # Create master with addons
    m = master.Master(opts)
    m.addons.add(BasicAddon())
    m.addons.add(APIAnalyzerAddon())
    
    try:
        m.run()
    except KeyboardInterrupt:
        m.shutdown()

if __name__ == "__main__":
    run_with_addons()

Install with Tessl CLI

npx tessl i tessl/pypi-mitmproxy

docs

addons.md

commands.md

configuration.md

connections.md

content.md

flow-io.md

http-flows.md

index.md

protocols.md

tile.json