An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.
—
Client and server connection handling with comprehensive support for TLS/SSL, transparent proxying, upstream proxies, and complex network topologies. Provides detailed connection state tracking and certificate management.
Abstract base class for all connection types providing common connection attributes and state management.
class Connection:
"""
Abstract base class for client and server connections.
Attributes:
- id: Unique connection identifier
- address: Network address (host, port) or None
- state: Current connection state flags
- error: Connection error information if failed
- tls_established: Whether TLS handshake completed
- certificate_list: List of certificates in TLS chain
- alpn: Application Layer Protocol Negotiation result
- cipher: TLS cipher suite in use
- sni: Server Name Indication from TLS handshake
- timestamp_start: Connection establishment start time
- timestamp_tls_setup: TLS handshake completion time
- timestamp_end: Connection termination time
"""
id: str
address: Optional[Address]
state: ConnectionState
error: Optional[str]
tls_established: bool
certificate_list: Sequence[x509.Certificate]
alpn: Optional[bytes]
cipher: Optional[str]
sni: Optional[str]
timestamp_start: float
timestamp_tls_setup: Optional[float]
timestamp_end: Optional[float]
def __str__(self) -> str: ...
def __repr__(self) -> str: ...Represents the client-side connection from the user agent (browser, application) to the proxy.
class Client(Connection):
"""
Client connection from user agent to proxy.
Inherits all Connection attributes and adds client-specific functionality.
Represents the incoming connection that initiated the proxy request.
"""
def __init__(
self,
address: Optional[Address],
state: ConnectionState = ConnectionState.OPEN,
error: Optional[str] = None,
tls_established: bool = False,
certificate_list: Optional[Sequence[x509.Certificate]] = None,
alpn: Optional[bytes] = None,
cipher: Optional[str] = None,
sni: Optional[str] = None,
timestamp_start: float = 0,
timestamp_tls_setup: Optional[float] = None,
timestamp_end: Optional[float] = None
) -> None: ...
@classmethod
def make_dummy(cls, address: Address = ("client", 0)) -> "Client": ...Represents the server-side connection from the proxy to the target server.
class Server(Connection):
"""
Server connection from proxy to target server.
Inherits all Connection attributes and adds server-specific functionality.
Represents the outgoing connection to fulfill the client request.
"""
def __init__(
self,
address: Optional[Address],
state: ConnectionState = ConnectionState.OPEN,
error: Optional[str] = None,
tls_established: bool = False,
certificate_list: Optional[Sequence[x509.Certificate]] = None,
alpn: Optional[bytes] = None,
cipher: Optional[str] = None,
sni: Optional[str] = None,
timestamp_start: float = 0,
timestamp_tls_setup: Optional[float] = None,
timestamp_end: Optional[float] = None
) -> None: ...
@classmethod
def make_dummy(cls, address: Address = ("server", 0)) -> "Server": ...Enumeration of connection state flags using bitwise operations for complex state tracking.
class ConnectionState(Flag):
"""
Connection state flags supporting bitwise operations.
States can be combined to represent complex connection conditions.
"""
CLOSED = 0
OPEN = 1
CAN_READ = 2
CAN_WRITE = 4
# Convenience combinations
ACTIVE = OPEN | CAN_READ | CAN_WRITE # 7
def __str__(self) -> str: ...
def __repr__(self) -> str: ...Network address representation as a tuple of host and port.
# Type alias for network addresses
Address = Tuple[str, int]
# Example addresses:
# ("192.168.1.1", 8080)
# ("example.com", 443)
# ("::1", 80) # IPv6from mitmproxy import connection
from mitmproxy.connection import ConnectionState
def server_connect(flow):
"""Monitor server connection establishment."""
server = flow.server_conn
print(f"Connecting to: {server.address}")
print(f"Connection state: {server.state}")
# Check if connection is ready for I/O
if server.state & ConnectionState.CAN_WRITE:
print("Server ready for writing")
if server.state & ConnectionState.CAN_READ:
print("Server ready for reading")
# Log TLS information
if server.tls_established:
print(f"TLS cipher: {server.cipher}")
print(f"ALPN protocol: {server.alpn}")
print(f"SNI: {server.sni}")
print(f"Certificate count: {len(server.certificate_list)}")
def client_connect(flow):
"""Monitor client connection."""
client = flow.client_conn
print(f"Client connected from: {client.address}")
# Check for TLS
if client.tls_established:
print("Client using TLS")
if client.sni:
print(f"Client SNI: {client.sni}")from mitmproxy import tls
import datetime
def tls_clienthello(data):
"""Analyze client TLS hello."""
print(f"Client SNI: {data.context.client.sni}")
print(f"Client ALPN: {data.context.client.alpn}")
def tls_established_server(data):
"""Analyze established server TLS connection."""
server = data.context.server
if server.certificate_list:
cert = server.certificate_list[0] # First certificate in chain
print(f"Server certificate subject: {cert.subject}")
print(f"Server certificate issuer: {cert.issuer}")
print(f"Certificate valid from: {cert.not_valid_before}")
print(f"Certificate valid until: {cert.not_valid_after}")
# Check certificate validity
now = datetime.datetime.now(datetime.timezone.utc)
if now < cert.not_valid_before:
print("WARNING: Certificate not yet valid")
elif now > cert.not_valid_after:
print("WARNING: Certificate expired")
# Check for SAN (Subject Alternative Names)
try:
san = cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
print(f"SAN entries: {[name.value for name in san.value]}")
except x509.ExtensionNotFound:
print("No SAN extension found")from mitmproxy import connection
def server_connect(flow):
"""Handle server connection issues."""
server = flow.server_conn
# Monitor connection establishment
if server.error:
print(f"Server connection error: {server.error}")
# Could implement retry logic here
# or modify the target server
def error(flow):
"""Handle flow errors including connection issues."""
if flow.error:
error_msg = flow.error.msg
if "connection" in error_msg.lower():
print(f"Connection error: {error_msg}")
# Log connection details for debugging
if flow.server_conn:
print(f"Target: {flow.server_conn.address}")
print(f"TLS: {flow.server_conn.tls_established}")
if flow.client_conn:
print(f"Client: {flow.client_conn.address}")from mitmproxy import http
import time
def request(flow: http.HTTPFlow):
"""Track request timing."""
# Store request start time for later analysis
flow.metadata["proxy_start"] = time.time()
def server_connect(flow):
"""Track server connection timing."""
server = flow.server_conn
if server.timestamp_start:
connect_time = time.time() - server.timestamp_start
print(f"Server connection established in {connect_time:.3f}s")
if server.timestamp_tls_setup and server.timestamp_start:
tls_time = server.timestamp_tls_setup - server.timestamp_start
print(f"TLS handshake took {tls_time:.3f}s")
def response(flow: http.HTTPFlow):
"""Calculate total request timing."""
if "proxy_start" in flow.metadata:
total_time = time.time() - flow.metadata["proxy_start"]
print(f"Total request time: {total_time:.3f}s")
# Break down timing components
client = flow.client_conn
server = flow.server_conn
if (client.timestamp_start and server.timestamp_start and
server.timestamp_end and client.timestamp_end):
# Calculate various timing metrics
client_connect = server.timestamp_start - client.timestamp_start
server_connect = (server.timestamp_tls_setup or server.timestamp_start) - server.timestamp_start
request_send = flow.request.timestamp_end - flow.request.timestamp_start
response_recv = flow.response.timestamp_end - flow.response.timestamp_start
print(f"Timing breakdown:")
print(f" Client connect: {client_connect:.3f}s")
print(f" Server connect: {server_connect:.3f}s")
print(f" Request send: {request_send:.3f}s")
print(f" Response receive: {response_recv:.3f}s")from mitmproxy import connection
from mitmproxy.connection import Server
def server_connect(flow):
"""Modify server connections dynamically."""
# Route requests to different servers based on conditions
if "/api/" in flow.request.path:
# Route API requests to API server
flow.server_conn = Server(("api.internal.com", 443))
elif "/static/" in flow.request.path:
# Route static content to CDN
flow.server_conn = Server(("cdn.example.com", 443))
# Force TLS for specific connections
if flow.request.host == "insecure.example.com":
# Upgrade to secure connection
new_server = Server(("secure.example.com", 443))
new_server.tls_established = True
flow.server_conn = new_server
def client_connect(flow):
"""Handle client connection setup."""
client = flow.client_conn
# Log client information
print(f"Client {client.address} connected")
# Could implement IP-based filtering here
client_ip = client.address[0]
if client_ip in BLOCKED_IPS:
# Close connection (implementation specific)
client.state = ConnectionState.CLOSEDInstall with Tessl CLI
npx tessl i tessl/pypi-mitmproxy