An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.
—
Support for TCP, UDP, and WebSocket protocols beyond HTTP, enabling comprehensive network traffic analysis and modification for various protocol types. Each protocol provides flow tracking with message-level granularity.
Raw TCP connection handling with message-level tracking and content inspection capabilities.
class TCPFlow(Flow):
"""
A TCP connection flow tracking raw TCP messages.
Attributes:
- messages: List of TCP messages exchanged
- client_conn: Client connection details
- server_conn: Server connection details
- live: Whether the connection is still active
- error: Error information if the flow failed
"""
messages: List[TCPMessage]
client_conn: Client
server_conn: Server
live: bool
error: Optional[Error]
def __init__(self, client_conn: Client, server_conn: Server) -> None: ...
class TCPMessage:
"""
Individual TCP message within a flow.
Attributes:
- from_client: True if message is from client to server
- content: Raw message content as bytes
- timestamp: Message timestamp
"""
from_client: bool
content: bytes
timestamp: float
def __init__(self, from_client: bool, content: bytes, timestamp: float = 0) -> None: ...
def __str__(self) -> str: ...
def __repr__(self) -> str: ...UDP packet tracking with support for connectionless protocol patterns.
class UDPFlow(Flow):
"""
A UDP flow tracking individual UDP messages/packets.
Attributes:
- messages: List of UDP messages exchanged
- client_conn: Client connection details
- server_conn: Server connection details
- live: Whether the flow is still active
- error: Error information if the flow failed
"""
messages: List[UDPMessage]
client_conn: Client
server_conn: Server
live: bool
error: Optional[Error]
def __init__(self, client_conn: Client, server_conn: Server) -> None: ...
class UDPMessage:
"""
Individual UDP message/packet within a flow.
Attributes:
- from_client: True if message is from client to server
- content: UDP packet content as bytes
- timestamp: Message timestamp
"""
from_client: bool
content: bytes
timestamp: float
def __init__(self, from_client: bool, content: bytes, timestamp: float = 0) -> None: ...
def __str__(self) -> str: ...
def __repr__(self) -> str: ...WebSocket message tracking within HTTP flows with support for different message types.
class WebSocketMessage:
"""
Individual WebSocket message within an HTTP flow.
Attributes:
- type: WebSocket message type (text, binary, ping, pong, close)
- content: Message content as bytes
- from_client: True if message is from client to server
- timestamp: Message timestamp
- killed: Whether the message was killed/blocked
"""
type: int
content: bytes
from_client: bool
timestamp: float
killed: bool
def __init__(
self,
type: int,
content: bytes,
from_client: bool = True,
timestamp: float = 0
) -> None: ...
def kill(self) -> None: ...
def __str__(self) -> str: ...
def __repr__(self) -> str: ...
class WebSocketData:
"""
Container for WebSocket messages within an HTTP flow.
Provides access to all WebSocket messages exchanged after the
HTTP upgrade handshake.
"""
messages: List[WebSocketMessage]
def __init__(self) -> None: ...
# Type alias for WebSocket message state
WebSocketMessageState = Tuple[int, bool] # (opcode, from_client)DNS query and response tracking for DNS-over-HTTPS and transparent DNS proxying.
class DNSFlow(Flow):
"""
A DNS query/response flow.
Attributes:
- request: DNS query message
- response: DNS response message (None if not yet received)
- client_conn: Client connection details
- server_conn: Server connection details
- live: Whether the flow is still active
- error: Error information if the flow failed
"""
request: DNSMessage
response: Optional[DNSMessage]
client_conn: Client
server_conn: Server
live: bool
error: Optional[Error]
def __init__(self, client_conn: Client, server_conn: Server, request: DNSMessage) -> None: ...
class DNSMessage:
"""
DNS query or response message.
Attributes:
- query: True if this is a query, False if response
- content: Raw DNS message content as bytes
- timestamp: Message timestamp
"""
query: bool
content: bytes
timestamp: float
def __init__(self, query: bool, content: bytes, timestamp: float = 0) -> None: ...from mitmproxy import tcp
def tcp_message(flow: tcp.TCPFlow):
"""Intercept and modify TCP messages."""
message = flow.messages[-1] # Get the latest message
print(f"TCP message: {len(message.content)} bytes")
print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
# Log raw content (be careful with binary data)
try:
content_str = message.content.decode('utf-8', errors='replace')
print(f"Content preview: {content_str[:100]}...")
except:
print(f"Binary content: {message.content[:50].hex()}...")
# Modify content
if message.from_client and b"password" in message.content:
# Replace password in client messages
message.content = message.content.replace(b"password", b"********")
# Block specific content
if b"BLOCKED_COMMAND" in message.content:
# Kill the message to prevent forwarding
flow.kill()
def tcp_start(flow: tcp.TCPFlow):
"""Handle TCP flow start."""
print(f"TCP connection: {flow.client_conn.address} -> {flow.server_conn.address}")
# Could implement protocol detection here
# by examining the first few bytes
def tcp_end(flow: tcp.TCPFlow):
"""Handle TCP flow completion."""
total_messages = len(flow.messages)
total_bytes = sum(len(msg.content) for msg in flow.messages)
print(f"TCP flow completed: {total_messages} messages, {total_bytes} bytes")from mitmproxy import udp
import struct
def udp_message(flow: udp.UDPFlow):
"""Analyze UDP packets."""
message = flow.messages[-1]
print(f"UDP packet: {len(message.content)} bytes")
print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
# Example: Parse DNS queries (assuming DNS over UDP)
if flow.server_conn.address[1] == 53: # DNS port
try:
# Simple DNS header parsing
if len(message.content) >= 12:
header = struct.unpack('!HHHHHH', message.content[:12])
transaction_id, flags, questions, answers, authority, additional = header
print(f"DNS Transaction ID: {transaction_id}")
print(f"Questions: {questions}, Answers: {answers}")
# Could parse questions/answers here
except struct.error:
print("Invalid DNS packet format")
# Modify UDP content
if message.from_client and b"blocked.com" in message.content:
# Replace blocked domain
message.content = message.content.replace(b"blocked.com", b"allowed.com")
def udp_start(flow: udp.UDPFlow):
"""Handle UDP flow start."""
print(f"UDP flow: {flow.client_conn.address} -> {flow.server_conn.address}")
def udp_end(flow: udp.UDPFlow):
"""Handle UDP flow completion."""
print(f"UDP flow ended: {len(flow.messages)} packets")from mitmproxy import http, websocket
import json
def websocket_message(flow: http.HTTPFlow):
"""Handle WebSocket messages within HTTP flows."""
if not hasattr(flow, 'websocket') or not flow.websocket:
return
message = flow.websocket.messages[-1]
print(f"WebSocket message type: {message.type}")
print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
print(f"Content length: {len(message.content)} bytes")
# Handle different message types
if message.type == 1: # Text message
try:
text = message.content.decode('utf-8')
print(f"Text message: {text[:100]}...")
# Try to parse as JSON
try:
data = json.loads(text)
print(f"JSON data: {data}")
# Modify JSON messages
if message.from_client and "command" in data:
if data["command"] == "delete_all":
# Block dangerous commands
message.kill()
return
# Add metadata to commands
data["proxy_timestamp"] = message.timestamp
message.content = json.dumps(data).encode('utf-8')
except json.JSONDecodeError:
print("Not JSON format")
except UnicodeDecodeError:
print("Not valid UTF-8 text")
elif message.type == 2: # Binary message
print(f"Binary message: {message.content[:50].hex()}...")
# Could handle binary protocols here
elif message.type == 8: # Close frame
print("WebSocket close frame")
elif message.type in (9, 10): # Ping/Pong
print(f"WebSocket {'ping' if message.type == 9 else 'pong'}")
def websocket_start(flow: http.HTTPFlow):
"""Handle WebSocket handshake completion."""
print(f"WebSocket connection established: {flow.request.url}")
# Initialize WebSocket data container if needed
if not hasattr(flow, 'websocket'):
flow.websocket = websocket.WebSocketData()
def websocket_end(flow: http.HTTPFlow):
"""Handle WebSocket connection termination."""
if hasattr(flow, 'websocket') and flow.websocket:
message_count = len(flow.websocket.messages)
print(f"WebSocket connection closed: {message_count} messages exchanged")from mitmproxy import tcp, flow
def tcp_message(flow: tcp.TCPFlow):
"""Implement protocol detection for TCP flows."""
if len(flow.messages) == 1: # First message
message = flow.messages[0]
# Simple protocol detection based on first bytes
if message.content.startswith(b'GET ') or message.content.startswith(b'POST '):
print("Detected HTTP over TCP (should have been handled as HTTP)")
elif message.content.startswith(b'\x16\x03'): # TLS handshake
print("Detected TLS handshake")
elif len(message.content) >= 4:
# Check for common protocol signatures
first_bytes = message.content[:4]
if first_bytes == b'SSH-':
print("Detected SSH protocol")
elif first_bytes.startswith(b'\x00\x00'):
print("Possible binary protocol")
else:
print(f"Unknown protocol, first bytes: {first_bytes.hex()}")
def next_layer(nextlayer):
"""Implement custom protocol detection logic."""
# This would be called before TCP flow creation
# to determine if the connection should be handled differently
client_hello = nextlayer.data_client()
if client_hello:
# Examine initial data to determine protocol
if client_hello.startswith(b'GET ') or client_hello.startswith(b'POST '):
# Should be handled as HTTP
nextlayer.layer = HttpLayer(nextlayer.context)
elif client_hello.startswith(b'\x16\x03'):
# TLS handshake - continue with TLS layer
nextlayer.layer = TlsLayer(nextlayer.context)
else:
# Default to TCP for unknown protocols
nextlayer.layer = TcpLayer(nextlayer.context)from mitmproxy import http, tcp, udp
class ProtocolStats:
def __init__(self):
self.http_flows = 0
self.tcp_flows = 0
self.udp_flows = 0
self.websocket_messages = 0
self.total_bytes = 0
def log_stats(self):
print(f"Protocol Statistics:")
print(f" HTTP flows: {self.http_flows}")
print(f" TCP flows: {self.tcp_flows}")
print(f" UDP flows: {self.udp_flows}")
print(f" WebSocket messages: {self.websocket_messages}")
print(f" Total bytes: {self.total_bytes}")
stats = ProtocolStats()
def response(flow: http.HTTPFlow):
"""Track HTTP flow statistics."""
stats.http_flows += 1
if flow.request.content:
stats.total_bytes += len(flow.request.content)
if flow.response and flow.response.content:
stats.total_bytes += len(flow.response.content)
def tcp_end(flow: tcp.TCPFlow):
"""Track TCP flow statistics."""
stats.tcp_flows += 1
for message in flow.messages:
stats.total_bytes += len(message.content)
def udp_end(flow: udp.UDPFlow):
"""Track UDP flow statistics."""
stats.udp_flows += 1
for message in flow.messages:
stats.total_bytes += len(message.content)
def websocket_message(flow: http.HTTPFlow):
"""Track WebSocket message statistics."""
if hasattr(flow, 'websocket') and flow.websocket:
stats.websocket_messages += 1
message = flow.websocket.messages[-1]
stats.total_bytes += len(message.content)
def done():
"""Print final statistics when mitmproxy shuts down."""
stats.log_stats()Install with Tessl CLI
npx tessl i tessl/pypi-mitmproxy