CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mitmproxy

An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.

Pending
Overview
Eval results
Files

protocols.mddocs/

Multi-Protocol Support

Support for TCP, UDP, and WebSocket protocols beyond HTTP, enabling comprehensive network traffic analysis and modification for various protocol types. Each protocol provides flow tracking with message-level granularity.

Capabilities

TCP Flow Management

Raw TCP connection handling with message-level tracking and content inspection capabilities.

class TCPFlow(Flow):
    """
    A TCP connection flow tracking raw TCP messages.
    
    Attributes:
    - messages: List of TCP messages exchanged
    - client_conn: Client connection details
    - server_conn: Server connection details
    - live: Whether the connection is still active
    - error: Error information if the flow failed
    """
    messages: List[TCPMessage]
    client_conn: Client
    server_conn: Server
    live: bool
    error: Optional[Error]
    
    def __init__(self, client_conn: Client, server_conn: Server) -> None: ...

class TCPMessage:
    """
    Individual TCP message within a flow.
    
    Attributes:
    - from_client: True if message is from client to server
    - content: Raw message content as bytes
    - timestamp: Message timestamp
    """
    from_client: bool
    content: bytes
    timestamp: float
    
    def __init__(self, from_client: bool, content: bytes, timestamp: float = 0) -> None: ...
    
    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...

UDP Flow Management

UDP packet tracking with support for connectionless protocol patterns.

class UDPFlow(Flow):
    """
    A UDP flow tracking individual UDP messages/packets.
    
    Attributes:
    - messages: List of UDP messages exchanged
    - client_conn: Client connection details
    - server_conn: Server connection details
    - live: Whether the flow is still active
    - error: Error information if the flow failed
    """
    messages: List[UDPMessage]
    client_conn: Client
    server_conn: Server
    live: bool
    error: Optional[Error]
    
    def __init__(self, client_conn: Client, server_conn: Server) -> None: ...

class UDPMessage:
    """
    Individual UDP message/packet within a flow.
    
    Attributes:
    - from_client: True if message is from client to server
    - content: UDP packet content as bytes
    - timestamp: Message timestamp
    """
    from_client: bool
    content: bytes
    timestamp: float
    
    def __init__(self, from_client: bool, content: bytes, timestamp: float = 0) -> None: ...
    
    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...

WebSocket Support

WebSocket message tracking within HTTP flows with support for different message types.

class WebSocketMessage:
    """
    Individual WebSocket message within an HTTP flow.
    
    Attributes:
    - type: WebSocket message type (text, binary, ping, pong, close)
    - content: Message content as bytes
    - from_client: True if message is from client to server
    - timestamp: Message timestamp
    - killed: Whether the message was killed/blocked
    """
    type: int
    content: bytes
    from_client: bool
    timestamp: float
    killed: bool
    
    def __init__(
        self,
        type: int,
        content: bytes,
        from_client: bool = True,
        timestamp: float = 0
    ) -> None: ...
    
    def kill(self) -> None: ...
    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...

class WebSocketData:
    """
    Container for WebSocket messages within an HTTP flow.
    
    Provides access to all WebSocket messages exchanged after the
    HTTP upgrade handshake.
    """
    messages: List[WebSocketMessage]
    
    def __init__(self) -> None: ...

# Type alias for WebSocket message state
WebSocketMessageState = Tuple[int, bool]  # (opcode, from_client)

DNS Flow Support

DNS query and response tracking for DNS-over-HTTPS and transparent DNS proxying.

class DNSFlow(Flow):
    """
    A DNS query/response flow.
    
    Attributes:
    - request: DNS query message
    - response: DNS response message (None if not yet received)
    - client_conn: Client connection details
    - server_conn: Server connection details
    - live: Whether the flow is still active
    - error: Error information if the flow failed
    """
    request: DNSMessage
    response: Optional[DNSMessage]
    client_conn: Client
    server_conn: Server
    live: bool
    error: Optional[Error]
    
    def __init__(self, client_conn: Client, server_conn: Server, request: DNSMessage) -> None: ...

class DNSMessage:
    """
    DNS query or response message.
    
    Attributes:
    - query: True if this is a query, False if response
    - content: Raw DNS message content as bytes
    - timestamp: Message timestamp
    """
    query: bool
    content: bytes
    timestamp: float
    
    def __init__(self, query: bool, content: bytes, timestamp: float = 0) -> None: ...

Usage Examples

TCP Flow Interception

from mitmproxy import tcp

def tcp_message(flow: tcp.TCPFlow):
    """Intercept and modify TCP messages."""
    message = flow.messages[-1]  # Get the latest message
    
    print(f"TCP message: {len(message.content)} bytes")
    print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
    
    # Log raw content (be careful with binary data)
    try:
        content_str = message.content.decode('utf-8', errors='replace')
        print(f"Content preview: {content_str[:100]}...")
    except:
        print(f"Binary content: {message.content[:50].hex()}...")
    
    # Modify content
    if message.from_client and b"password" in message.content:
        # Replace password in client messages
        message.content = message.content.replace(b"password", b"********")
    
    # Block specific content
    if b"BLOCKED_COMMAND" in message.content:
        # Kill the message to prevent forwarding
        flow.kill()

def tcp_start(flow: tcp.TCPFlow):
    """Handle TCP flow start."""
    print(f"TCP connection: {flow.client_conn.address} -> {flow.server_conn.address}")
    
    # Could implement protocol detection here
    # by examining the first few bytes

def tcp_end(flow: tcp.TCPFlow):
    """Handle TCP flow completion."""
    total_messages = len(flow.messages)
    total_bytes = sum(len(msg.content) for msg in flow.messages)
    
    print(f"TCP flow completed: {total_messages} messages, {total_bytes} bytes")

UDP Packet Analysis

from mitmproxy import udp
import struct

def udp_message(flow: udp.UDPFlow):
    """Analyze UDP packets."""
    message = flow.messages[-1]
    
    print(f"UDP packet: {len(message.content)} bytes")
    print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
    
    # Example: Parse DNS queries (assuming DNS over UDP)
    if flow.server_conn.address[1] == 53:  # DNS port
        try:
            # Simple DNS header parsing
            if len(message.content) >= 12:
                header = struct.unpack('!HHHHHH', message.content[:12])
                transaction_id, flags, questions, answers, authority, additional = header
                
                print(f"DNS Transaction ID: {transaction_id}")
                print(f"Questions: {questions}, Answers: {answers}")
                
                # Could parse questions/answers here
                
        except struct.error:
            print("Invalid DNS packet format")
    
    # Modify UDP content
    if message.from_client and b"blocked.com" in message.content:
        # Replace blocked domain
        message.content = message.content.replace(b"blocked.com", b"allowed.com")

def udp_start(flow: udp.UDPFlow):
    """Handle UDP flow start."""
    print(f"UDP flow: {flow.client_conn.address} -> {flow.server_conn.address}")

def udp_end(flow: udp.UDPFlow):
    """Handle UDP flow completion."""
    print(f"UDP flow ended: {len(flow.messages)} packets")

WebSocket Message Handling

from mitmproxy import http, websocket
import json

def websocket_message(flow: http.HTTPFlow):
    """Handle WebSocket messages within HTTP flows."""
    if not hasattr(flow, 'websocket') or not flow.websocket:
        return
    
    message = flow.websocket.messages[-1]
    
    print(f"WebSocket message type: {message.type}")
    print(f"Direction: {'client->server' if message.from_client else 'server->client'}")
    print(f"Content length: {len(message.content)} bytes")
    
    # Handle different message types
    if message.type == 1:  # Text message
        try:
            text = message.content.decode('utf-8')
            print(f"Text message: {text[:100]}...")
            
            # Try to parse as JSON
            try:
                data = json.loads(text)
                print(f"JSON data: {data}")
                
                # Modify JSON messages
                if message.from_client and "command" in data:
                    if data["command"] == "delete_all":
                        # Block dangerous commands
                        message.kill()
                        return
                    
                    # Add metadata to commands
                    data["proxy_timestamp"] = message.timestamp
                    message.content = json.dumps(data).encode('utf-8')
                
            except json.JSONDecodeError:
                print("Not JSON format")
                
        except UnicodeDecodeError:
            print("Not valid UTF-8 text")
    
    elif message.type == 2:  # Binary message
        print(f"Binary message: {message.content[:50].hex()}...")
        
        # Could handle binary protocols here
        
    elif message.type == 8:  # Close frame
        print("WebSocket close frame")
    
    elif message.type in (9, 10):  # Ping/Pong
        print(f"WebSocket {'ping' if message.type == 9 else 'pong'}")

def websocket_start(flow: http.HTTPFlow):
    """Handle WebSocket handshake completion."""
    print(f"WebSocket connection established: {flow.request.url}")
    
    # Initialize WebSocket data container if needed
    if not hasattr(flow, 'websocket'):
        flow.websocket = websocket.WebSocketData()

def websocket_end(flow: http.HTTPFlow):
    """Handle WebSocket connection termination."""
    if hasattr(flow, 'websocket') and flow.websocket:
        message_count = len(flow.websocket.messages)
        print(f"WebSocket connection closed: {message_count} messages exchanged")

Protocol Detection and Routing

from mitmproxy import tcp, flow

def tcp_message(flow: tcp.TCPFlow):
    """Implement protocol detection for TCP flows."""
    if len(flow.messages) == 1:  # First message
        message = flow.messages[0]
        
        # Simple protocol detection based on first bytes
        if message.content.startswith(b'GET ') or message.content.startswith(b'POST '):
            print("Detected HTTP over TCP (should have been handled as HTTP)")
        
        elif message.content.startswith(b'\x16\x03'):  # TLS handshake
            print("Detected TLS handshake")
        
        elif len(message.content) >= 4:
            # Check for common protocol signatures
            first_bytes = message.content[:4]
            
            if first_bytes == b'SSH-':
                print("Detected SSH protocol")
            elif first_bytes.startswith(b'\x00\x00'):
                print("Possible binary protocol")
            else:
                print(f"Unknown protocol, first bytes: {first_bytes.hex()}")

def next_layer(nextlayer):
    """Implement custom protocol detection logic."""
    # This would be called before TCP flow creation
    # to determine if the connection should be handled differently
    
    client_hello = nextlayer.data_client()
    
    if client_hello:
        # Examine initial data to determine protocol
        if client_hello.startswith(b'GET ') or client_hello.startswith(b'POST '):
            # Should be handled as HTTP
            nextlayer.layer = HttpLayer(nextlayer.context)
        
        elif client_hello.startswith(b'\x16\x03'):
            # TLS handshake - continue with TLS layer
            nextlayer.layer = TlsLayer(nextlayer.context)
        
        else:
            # Default to TCP for unknown protocols
            nextlayer.layer = TcpLayer(nextlayer.context)

Multi-Protocol Flow Analysis

from mitmproxy import http, tcp, udp

class ProtocolStats:
    def __init__(self):
        self.http_flows = 0
        self.tcp_flows = 0
        self.udp_flows = 0
        self.websocket_messages = 0
        self.total_bytes = 0

    def log_stats(self):
        print(f"Protocol Statistics:")
        print(f"  HTTP flows: {self.http_flows}")
        print(f"  TCP flows: {self.tcp_flows}")
        print(f"  UDP flows: {self.udp_flows}")
        print(f"  WebSocket messages: {self.websocket_messages}")
        print(f"  Total bytes: {self.total_bytes}")

stats = ProtocolStats()

def response(flow: http.HTTPFlow):
    """Track HTTP flow statistics."""
    stats.http_flows += 1
    if flow.request.content:
        stats.total_bytes += len(flow.request.content)
    if flow.response and flow.response.content:
        stats.total_bytes += len(flow.response.content)

def tcp_end(flow: tcp.TCPFlow):
    """Track TCP flow statistics."""
    stats.tcp_flows += 1
    for message in flow.messages:
        stats.total_bytes += len(message.content)

def udp_end(flow: udp.UDPFlow):
    """Track UDP flow statistics."""
    stats.udp_flows += 1
    for message in flow.messages:
        stats.total_bytes += len(message.content)

def websocket_message(flow: http.HTTPFlow):
    """Track WebSocket message statistics."""
    if hasattr(flow, 'websocket') and flow.websocket:
        stats.websocket_messages += 1
        message = flow.websocket.messages[-1]
        stats.total_bytes += len(message.content)

def done():
    """Print final statistics when mitmproxy shuts down."""
    stats.log_stats()

Install with Tessl CLI

npx tessl i tessl/pypi-mitmproxy

docs

addons.md

commands.md

configuration.md

connections.md

content.md

flow-io.md

http-flows.md

index.md

protocols.md

tile.json