An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.
—
Type-safe command framework for interactive control and automation. Supports complex command composition, validation, and execution with built-in commands for all major operations and extensible custom command development.
Central command management and execution system.
class CommandManager:
    """
    Central command manager for mitmproxy.

    Manages command registration, validation, and execution with type safety
    and comprehensive error handling.

    This is an interface outline: every method below is a signature plus
    docstring only, with no implementation shown.
    """

    def execute(self, cmdstr: str) -> Any:
        """
        Execute a command string.

        Parameters:
        - cmdstr: Complete command string with arguments

        Returns:
        - Command execution result

        Raises:
        - CommandError: If command syntax is invalid or execution fails
        """

    def call(self, path: str, *args: Any) -> Any:
        """
        Call a command by path with arguments.

        Parameters:
        - path: Command path (e.g., "view.flows.select")
        - *args: Command arguments

        Returns:
        - Command execution result
        """

    def add(self, path: str, func: Callable) -> None:
        """
        Register a new command.

        Parameters:
        - path: Command path for registration
        - func: Function to execute for this command
        """

    def collect_commands(self, addon: Any) -> None:
        """
        Collect commands from an addon object.

        Parameters:
        - addon: Addon instance with @command.command decorated methods
        """


# Comprehensive type system for command arguments and validation.
class TypeManager:
    """
    Type management system for command arguments.

    Provides type validation, conversion, and completion support
    for command arguments.

    This is an interface outline: every method below is a signature plus
    docstring only, with no implementation shown.
    """

    def command(self, name: str) -> Callable:
        """
        Get command by name with type information.

        Parameters:
        - name: Command name

        Returns:
        - Command callable with type metadata
        """

    def validate_arg(self, arg_type: type, value: str) -> Any:
        """
        Validate and convert command argument.

        Parameters:
        - arg_type: Expected argument type
        - value: String value to validate

        Returns:
        - Converted value

        Raises:
        - TypeError: If value cannot be converted to expected type
        """

    def complete_arg(self, arg_type: type, prefix: str) -> List[str]:
        """
        Get completion suggestions for argument.

        Parameters:
        - arg_type: Argument type
        - prefix: Current input prefix

        Returns:
        - List of completion suggestions
        """
def verify_arg_signature(func: Callable, args: List[Any]) -> bool:
    """
    Verify function signature matches provided arguments.

    Signature-only outline; no implementation is shown here.

    Parameters:
    - func: Function to verify
    - args: Arguments to check against signature

    Returns:
    - True if signature matches, False otherwise
    """
def typename(arg_type: type) -> str:
    """
    Convert Python type to display string.

    Signature-only outline; no implementation is shown here.

    Parameters:
    - arg_type: Python type object

    Returns:
    - Human-readable type name
    """


# Standard command argument types with validation and completion.
class CommandTypes:
    """Built-in command types with validation and completion.

    Namespace of marker classes, one per argument kind. The ``complete``
    methods are signature-only outlines with no implementation shown.
    """

    # Basic types
    class Str(str):
        """String argument type."""

    class Int(int):
        """Integer argument type."""

    # ``bool`` is final in Python and cannot be used as a base class
    # (doing so raises TypeError at class creation), so the boolean marker
    # derives from ``int``, which is bool's own base type.
    class Bool(int):
        """Boolean argument type."""

    # File system types
    class Path(str):
        """File path argument with completion."""

        def complete(self, prefix: str) -> List[str]:
            """Complete file paths."""

    # Command types
    class Cmd(str):
        """Command name argument with completion."""

        def complete(self, prefix: str) -> List[str]:
            """Complete command names."""

    class CmdArgs(str):
        """Command arguments as string."""

    # Flow types
    class Flow:
        """Flow object argument."""

    class Flows(list):
        """List of flow objects."""

    # Data types
    class CutSpec(list):
        """Cut specification for data extraction."""

    class Data(list):
        """Tabular data representation."""

    # Choice types
    class Choice(str):
        """Choice from predefined options."""

        def __init__(self, choices: List[str]):
            # The permitted option strings are kept on the instance.
            self.choices = choices

        def complete(self, prefix: str) -> List[str]:
            """Complete from available choices."""

    # Special types
    class Marker(str):
        """Flow marker string."""

    class Unknown(str):
        """Unknown/dynamic type."""

    class Space(str):
        """Whitespace representation."""
from typing import Sequence

import mitmproxy.ctx as ctx
from mitmproxy import command, http
class FlowCommandsAddon:
    """Addon providing custom flow manipulation commands."""

    @command.command("flow.mark")
    def mark_flows(self, flows: Sequence[http.HTTPFlow], marker: str) -> None:
        """
        Mark flows with a custom marker.

        Usage: flow.mark @focus "important"

        Parameters:
        - flows: Flows whose ``marked`` attribute is overwritten
        - marker: Marker string assigned to every flow
        """
        for flow in flows:
            flow.marked = marker
        ctx.log.info(f"Marked {len(flows)} flows with '{marker}'")

    @command.command("flow.clear_marks")
    def clear_marks(self, flows: Sequence[http.HTTPFlow]) -> None:
        """
        Clear markers from flows.

        Usage: flow.clear_marks @all

        Only flows that currently carry a truthy marker are touched and
        counted in the log message.
        """
        cleared = 0
        for flow in flows:
            if flow.marked:
                flow.marked = ""
                cleared += 1
        ctx.log.info(f"Cleared marks from {cleared} flows")

    @command.command("flow.stats")
    def flow_stats(self, flows: Sequence[http.HTTPFlow]) -> str:
        """
        Get statistics for selected flows.

        Usage: flow.stats @all

        Returns:
        - Multi-line summary: flow count, total request+response body bytes,
          per-method counts, per-status counts, and the five busiest hosts.
          Returns "No flows selected" for an empty selection.
        """
        from collections import Counter

        if not flows:
            return "No flows selected"

        methods = Counter()
        status_codes = Counter()
        hosts = Counter()
        total_bytes = 0

        for flow in flows:
            methods[flow.request.method] += 1
            hosts[flow.request.host] += 1
            total_bytes += len(flow.request.content or b"")
            # A flow may have no response yet; only touch it when present.
            if flow.response:
                status_codes[flow.response.status_code] += 1
                total_bytes += len(flow.response.content or b"")

        stats = [
            f"Flow Statistics ({len(flows)} flows):",
            f"Total bytes: {total_bytes:,}",
            f"Methods: {dict(sorted(methods.items()))}",
            f"Status codes: {dict(sorted(status_codes.items()))}",
            f"Top hosts: {dict(hosts.most_common(5))}",
        ]
        return "\n".join(stats)

    @command.command("flow.filter_by_size")
    def filter_by_size(
        self,
        flows: Sequence[http.HTTPFlow],
        min_size: int = 0,
        max_size: int = 0,
    ) -> Sequence[http.HTTPFlow]:
        """
        Filter flows by response size.

        Usage: flow.filter_by_size @all 1024 10240

        Parameters:
        - flows: Candidate flows; flows without a response are dropped
        - min_size: Lower bound in bytes; values <= 0 disable the bound
        - max_size: Upper bound in bytes; values <= 0 disable the bound

        Returns:
        - Flows whose response body length lies within the active bounds
        """
        filtered = []
        for flow in flows:
            if not flow.response:
                continue
            size = len(flow.response.content or b"")
            if min_size > 0 and size < min_size:
                continue
            if max_size > 0 and size > max_size:
                continue
            filtered.append(flow)

        ctx.log.info(f"Filtered to {len(filtered)} flows by size")
        return filtered

    @command.command("flow.export_json")
    def export_json(self, flows: Sequence[http.HTTPFlow], filename: str) -> None:
        """
        Export flows to JSON file.

        Usage: flow.export_json @all flows.json

        Writes one object per flow with method, URL, status code, request
        start timestamp, and request/response body sizes. Flows without a
        response get a null status code and a response size of 0.
        """
        import json

        flow_data = [
            {
                "method": flow.request.method,
                "url": flow.request.url,
                "status_code": flow.response.status_code if flow.response else None,
                "timestamp": flow.request.timestamp_start,
                "request_size": len(flow.request.content or b""),
                "response_size": len(flow.response.content or b"") if flow.response else 0,
            }
            for flow in flows
        ]

        # Explicit encoding keeps the output independent of the platform locale.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(flow_data, f, indent=2)

        ctx.log.info(f"Exported {len(flow_data)} flows to {filename}")
addons = [FlowCommandsAddon()]

from mitmproxy import command, http, options
import mitmproxy.ctx as ctx
from typing import Sequence, Optional
import re
import time
class AdvancedCommandsAddon:
    """Advanced command examples with complex functionality."""

    @command.command("proxy.mode")
    def set_proxy_mode(self, mode: str) -> str:
        """
        Change proxy mode dynamically.

        Usage: proxy.mode transparent
        Usage: proxy.mode reverse:https://api.example.com

        Returns:
        - Confirmation text, or "Error: ..." if assigning the option raised
        """
        old_mode = ctx.options.mode
        try:
            # NOTE(review): the expected value type of the `mode` option may
            # differ between mitmproxy releases (single string vs. list of
            # mode specs) - confirm against the targeted version.
            ctx.options.mode = mode
            ctx.log.info(f"Proxy mode changed from '{old_mode}' to '{mode}'")
            return f"Mode changed to: {mode}"
        except Exception as e:
            # Deliberately broad: any failure is reported back to the user.
            ctx.log.error(f"Failed to set mode to '{mode}': {e}")
            return f"Error: {e}"

    @command.command("proxy.intercept")
    def set_intercept(self, pattern: Optional[str] = None) -> str:
        """
        Set or clear intercept pattern.

        Usage: proxy.intercept "~u /api/"
        Usage: proxy.intercept # Clear intercept

        Parameters:
        - pattern: Filter expression; None or an empty string clears it
        """
        ctx.options.intercept = pattern
        if pattern:
            ctx.log.info(f"Intercept pattern set to: {pattern}")
            return f"Intercepting: {pattern}"
        ctx.log.info("Intercept cleared")
        return "Intercept cleared"

    @command.command("flow.replay")
    def replay_flows(self, flows: Sequence[http.HTTPFlow], count: int = 1) -> str:
        """
        Replay flows multiple times.

        Usage: flow.replay @focus 3

        This example only logs each replay; it does not resend requests.
        """
        if not flows:
            return "No flows to replay"

        replayed = 0
        for _ in range(count):
            for flow in flows:
                # This would typically use the replay functionality
                ctx.log.info(f"Replaying: {flow.request.method} {flow.request.url}")
                replayed += 1

        return f"Replayed {replayed} requests ({len(flows)} flows × {count} times)"

    @command.command("flow.search")
    def search_flows(
        self,
        flows: Sequence[http.HTTPFlow],
        pattern: str,
        field: str = "url",
    ) -> Sequence[http.HTTPFlow]:
        """
        Search flows by pattern in specified field.

        Usage: flow.search @all "api" url
        Usage: flow.search @all "json" content_type

        Parameters:
        - flows: Flows to search
        - pattern: Regular expression, matched case-insensitively
        - field: One of url, method, host, content_type, status

        Returns:
        - Flows whose selected field matches; an unknown field logs a
          warning and yields an empty list

        Raises:
        - re.error: If pattern is not a valid regular expression
        """
        # Each extractor yields the text to search; response-derived fields
        # fall back to "" when the flow has no response.
        extractors = {
            "url": lambda f: f.request.url,
            "method": lambda f: f.request.method,
            "host": lambda f: f.request.host,
            "content_type": lambda f: (
                f.response.headers.get("content-type", "") if f.response else ""
            ),
            "status": lambda f: str(f.response.status_code) if f.response else "",
        }
        extract = extractors.get(field)
        if extract is None:
            ctx.log.warn(
                f"Unknown search field '{field}'; valid fields: {sorted(extractors)}"
            )
            return []

        # Compile once instead of re-resolving the pattern for every flow.
        regex = re.compile(pattern, re.IGNORECASE)
        results = [flow for flow in flows if regex.search(extract(flow))]

        ctx.log.info(f"Found {len(results)} flows matching '{pattern}' in {field}")
        return results

    @command.command("flow.time_range")
    def filter_by_time(
        self,
        flows: Sequence[http.HTTPFlow],
        start_time: float,
        end_time: float,
    ) -> Sequence[http.HTTPFlow]:
        """
        Filter flows by time range (Unix timestamps).

        Usage: flow.time_range @all 1609459200 1609545600

        Both bounds are inclusive and compared against the request's
        ``timestamp_start``.
        """
        results = [
            flow
            for flow in flows
            if start_time <= flow.request.timestamp_start <= end_time
        ]
        ctx.log.info(f"Found {len(results)} flows in time range")
        return results

    @command.command("debug.log_level")
    def set_log_level(self, level: str) -> str:
        """
        Set logging level.

        Usage: debug.log_level info
        Usage: debug.log_level debug

        This example validates and reports the level only; it does not
        change any logging configuration.
        """
        valid_levels = ["debug", "info", "warn", "error"]
        if level not in valid_levels:
            return f"Invalid level. Valid options: {valid_levels}"

        # This would set the actual log level
        ctx.log.info(f"Log level set to: {level}")
        return f"Log level: {level}"

    @command.command("debug.memory_stats")
    def memory_stats(self) -> str:
        """
        Get memory usage statistics.

        Usage: debug.memory_stats

        Returns:
        - Multi-line report (RSS, VMS, CPU percent, open file count) for the
          current process, or an explanatory message when the optional
          ``psutil`` package is not installed
        """
        import os

        try:
            import psutil
        except ImportError:
            return "Memory statistics unavailable: psutil is not installed"

        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()

        stats = [
            "Memory Statistics:",
            f"RSS: {memory_info.rss / 1024 / 1024:.1f} MB",
            f"VMS: {memory_info.vms / 1024 / 1024:.1f} MB",
            f"CPU Percent: {process.cpu_percent():.1f}%",
            f"Open Files: {len(process.open_files())}",
        ]
        return "\n".join(stats)
addons = [AdvancedCommandsAddon()]

from mitmproxy import command, http
import mitmproxy.ctx as ctx
class ScriptingCommandsAddon:
    """Commands for scripting and automation."""

    @command.command("script.wait")
    def wait_command(self, seconds: float) -> str:
        """
        Wait for specified seconds.

        Usage: script.wait 2.5

        NOTE: uses time.sleep, which blocks the calling thread for the
        whole duration.
        """
        import time

        time.sleep(seconds)
        return f"Waited {seconds} seconds"

    @command.command("script.log")
    def log_command(self, level: str, message: str) -> str:
        """
        Log a message at specified level.

        Usage: script.log info "Processing complete"

        Levels info, warn and error map to the matching logger method; any
        other level is logged at info with a "[level]" prefix.
        """
        loggers = {
            "info": ctx.log.info,
            "warn": ctx.log.warn,
            "error": ctx.log.error,
        }
        log = loggers.get(level)
        if log is not None:
            log(message)
        else:
            ctx.log.info(f"[{level}] {message}")
        return f"Logged: {message}"

    @command.command("script.execute")
    def execute_script(self, script_path: str) -> str:
        """
        Execute a series of commands from file.

        Usage: script.execute commands.txt

        Blank lines and lines starting with '#' are skipped. A failing
        command is logged and does not stop the remaining commands.

        Returns:
        - Summary with the number of successfully executed commands, or an
          error message if the file could not be read
        """
        # Only the file read sits inside the try, so I/O problems are
        # reported distinctly from per-command failures below.
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                commands = f.readlines()
        except FileNotFoundError:
            return f"Script file not found: {script_path}"
        except (OSError, UnicodeDecodeError) as e:
            return f"Script execution error: {e}"

        executed = 0
        for line in commands:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            try:
                result = ctx.master.commands.execute(line)
            except Exception as e:
                # Best effort: report the failure and continue with the rest.
                ctx.log.error(f"Failed to execute '{line}': {e}")
            else:
                ctx.log.info(f"Executed: {line} -> {result}")
                executed += 1

        return f"Executed {executed} commands from {script_path}"

    @command.command("script.conditional")
    def conditional_command(self, condition: str, true_cmd: str, false_cmd: str = "") -> str:
        """
        Execute command based on condition.

        Usage: script.conditional "flow.count > 10" "proxy.intercept ~all" "proxy.intercept"

        Parameters:
        - condition: Expression to evaluate; the token "flow.count" is
          replaced by len(ctx.master.view) before evaluation
        - true_cmd: Command executed when the condition is truthy
        - false_cmd: Command executed when the condition is falsy (optional)

        Returns:
        - String form of the executed command's result, a note that nothing
          was executed, or an error message if evaluation/execution raised
        """
        try:
            # Simple condition evaluation (in real implementation, this would be more robust)
            if "flow.count" in condition:
                # Get current flow count
                flow_count = len(ctx.master.view)
                condition = condition.replace("flow.count", str(flow_count))

            # SECURITY: eval() runs arbitrary Python supplied by whoever can
            # issue this command. Demo only - do not expose to untrusted
            # input; replace with a restricted expression parser before
            # real use.
            result = eval(condition)

            # The command is declared to return str, so the executed
            # command's result (which may be None or any other type) is
            # converted to its string form.
            if result and true_cmd:
                return str(ctx.master.commands.execute(true_cmd))
            if not result and false_cmd:
                return str(ctx.master.commands.execute(false_cmd))
            return f"Condition '{condition}' evaluated to {result}, no command executed"
        except Exception as e:
            return f"Condition evaluation error: {e}"
addons = [ScriptingCommandsAddon()]

# Example of using commands programmatically
from mitmproxy import master, options
from mitmproxy.tools.main import mitmdump
def automated_proxy_session():
    """Example of automated proxy control using commands.

    Builds a master with the three example addons, runs a fixed list of
    startup commands (printing each result or failure), then runs the proxy
    until interrupted. The master is always shut down on exit.
    """
    # Set up proxy with options
    opts = options.Options(
        listen_port=8080,
        web_open_browser=False,
    )

    # Create master
    # NOTE(review): the Master constructor signature and whether run() is
    # synchronous depend on the mitmproxy version - confirm before reuse.
    m = master.Master(opts)

    # Add custom command addons
    m.addons.add(FlowCommandsAddon())
    m.addons.add(AdvancedCommandsAddon())
    m.addons.add(ScriptingCommandsAddon())

    try:
        # Start proxy
        m.run_loop = False  # Don't start main loop yet

        # Execute initial commands
        commands_to_run = [
            "proxy.mode transparent",
            "proxy.intercept ~u /api/",
            "script.log info 'Proxy automation started'",
            "debug.log_level debug",
        ]
        for cmd in commands_to_run:
            try:
                result = m.commands.execute(cmd)
                print(f"Command: {cmd} -> {result}")
            except Exception as e:
                # One failing startup command must not abort the session.
                print(f"Command failed: {cmd} -> {e}")

        # Run proxy
        m.run()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        m.shutdown()
if __name__ == "__main__":
    automated_proxy_session()

Install with Tessl CLI
npx tessl i tessl/pypi-mitmproxy