CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mitmproxy

An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.

Pending
Overview
Eval results
Files

flow-io.mddocs/

Flow I/O and Persistence

Reading and writing flows to files for replay, analysis, and testing. Supports filtering, format conversion, and batch processing for various use cases including traffic replay, debugging, and automated testing.

Capabilities

FlowReader Class

Reads flows from binary files or streams in mitmproxy's native format.

class FlowReader:
    """
    Reads flows from a binary file or stream.
    
    Supports reading flows saved by FlowWriter or mitmdump.
    Also supports reading HAR (HTTP Archive) format files.
    Handles version compatibility and format validation.
    """
    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow reader.
        
        Parameters:
        - fo: Binary file object or stream to read from
        """
    
    def stream(self) -> Iterator[Flow]:
        """
        Stream flows from the file.
        
        Yields:
        - Flow objects (HTTPFlow, TCPFlow, UDPFlow, etc.)
        
        Raises:
        - FlowReadException: If file format is invalid or corrupted
        - IOError: If file cannot be read
        """
    
    def close(self) -> None:
        """Close the underlying file object."""

FlowWriter Class

Writes flows to binary files or streams in mitmproxy's native format.

class FlowWriter:
    """
    Writes flows to a binary file or stream.
    
    Creates files compatible with FlowReader and mitmproxy tools.
    Handles serialization and format versioning.
    """
    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow writer.
        
        Parameters:
        - fo: Binary file object or stream to write to
        """
    
    def add(self, flow: Flow) -> None:
        """
        Add a flow to the output file.
        
        Parameters:
        - flow: Flow object to write (HTTPFlow, TCPFlow, etc.)
        
        Raises:
        - IOError: If flow cannot be written
        - TypeError: If flow type is not supported
        """
    
    def close(self) -> None:
        """Close the underlying file object."""

FilteredFlowWriter Class

Writes flows with filtering capabilities based on flow properties.

class FilteredFlowWriter:
    """
    Writes flows with filtering based on flow properties.
    
    Allows selective writing of flows based on custom filter functions.
    """
    def __init__(self, fo: BinaryIO, filter_func: Callable[[Flow], bool]) -> None:
        """
        Initialize filtered flow writer.
        
        Parameters:
        - fo: Binary file object or stream to write to
        - filter_func: Function that returns True for flows to include
        """
    
    def add(self, flow: Flow) -> None:
        """
        Add a flow if it passes the filter.
        
        Parameters:
        - flow: Flow object to potentially write
        """
    
    def close(self) -> None:
        """Close the underlying file object."""

Flow Reading Utilities

Utility functions for reading flows from multiple sources.

def read_flows_from_paths(paths: Sequence[str]) -> Iterator[Flow]:
    """
    Read flows from multiple file paths.
    
    Parameters:
    - paths: List of file paths to read flows from
    
    Yields:
    - Flow objects from all specified files
    
    Raises:
    - FlowReadException: If any file format is invalid
    - FileNotFoundError: If any file doesn't exist
    - IOError: If files cannot be read
    """

def flow_export_formats() -> Dict[str, str]:
    """
    Get available flow export formats.
    
    Returns:
    - Dictionary of format_name -> description
    """

Usage Examples

Basic Flow Reading and Writing

from mitmproxy import io, http
import gzip

def save_flows_to_file():
    """Save current flows to a file."""
    flows_to_save = []  # Assume this contains flows from somewhere
    
    # Write flows to file
    with open("captured_flows.mitm", "wb") as f:
        writer = io.FlowWriter(f)
        for flow in flows_to_save:
            writer.add(flow)
    
    print(f"Saved {len(flows_to_save)} flows to captured_flows.mitm")

def load_and_analyze_flows():
    """Load flows from file and analyze them."""
    with open("captured_flows.mitm", "rb") as f:
        reader = io.FlowReader(f)
        
        http_count = 0
        total_bytes = 0
        
        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                http_count += 1
                
                # Calculate total bytes
                if flow.request.content:
                    total_bytes += len(flow.request.content)
                if flow.response and flow.response.content:
                    total_bytes += len(flow.response.content)
                
                # Print flow summary
                print(f"{flow.request.method} {flow.request.url}")
                if flow.response:
                    print(f"  -> {flow.response.status_code}")
        
        print(f"Loaded {http_count} HTTP flows, {total_bytes} total bytes")

def load_from_multiple_files():
    """Load flows from multiple files."""
    file_paths = ["session1.mitm", "session2.mitm", "session3.mitm"]
    
    all_flows = list(io.read_flows_from_paths(file_paths))
    print(f"Loaded {len(all_flows)} flows from {len(file_paths)} files")
    
    # Process all flows
    for flow in all_flows:
        if isinstance(flow, http.HTTPFlow):
            print(f"Flow: {flow.request.method} {flow.request.url}")

Filtered Flow Writing

from mitmproxy import io, http

def save_filtered_flows():
    """Save only specific flows based on criteria."""
    
    def api_filter(flow):
        """Filter function to save only API calls."""
        if isinstance(flow, http.HTTPFlow):
            return "/api/" in flow.request.path
        return False
    
    def error_filter(flow):
        """Filter function to save only failed requests."""
        if isinstance(flow, http.HTTPFlow):
            return flow.response and flow.response.status_code >= 400
        return False
    
    flows_to_process = []  # Assume this contains flows
    
    # Save API calls only
    with open("api_calls.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, api_filter)
        for flow in flows_to_process:
            writer.add(flow)
    
    # Save error responses only
    with open("errors.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, error_filter)
        for flow in flows_to_process:
            writer.add(flow)

def save_flows_by_domain():
    """Save flows grouped by domain."""
    flows_by_domain = {}
    all_flows = []  # Assume this contains flows
    
    # Group flows by domain
    for flow in all_flows:
        if isinstance(flow, http.HTTPFlow):
            domain = flow.request.host
            if domain not in flows_by_domain:
                flows_by_domain[domain] = []
            flows_by_domain[domain].append(flow)
    
    # Save each domain's flows to separate files
    for domain, flows in flows_by_domain.items():
        filename = f"{domain.replace('.', '_')}_flows.mitm"
        
        with open(filename, "wb") as f:
            writer = io.FlowWriter(f)
            for flow in flows:
                writer.add(flow)
        
        print(f"Saved {len(flows)} flows for {domain} to {filename}")

Flow Analysis and Statistics

from mitmproxy import io, http
from collections import defaultdict, Counter
import json

def analyze_flow_file(filename):
    """Comprehensive analysis of a flow file."""
    stats = {
        'total_flows': 0,
        'http_flows': 0,
        'tcp_flows': 0,
        'udp_flows': 0,
        'methods': Counter(),
        'status_codes': Counter(),
        'domains': Counter(),
        'content_types': Counter(),
        'total_request_bytes': 0,
        'total_response_bytes': 0,
        'error_count': 0
    }
    
    try:
        with open(filename, "rb") as f:
            reader = io.FlowReader(f)
            
            for flow in reader.stream():
                stats['total_flows'] += 1
                
                if isinstance(flow, http.HTTPFlow):
                    stats['http_flows'] += 1
                    
                    # Method statistics
                    stats['methods'][flow.request.method] += 1
                    
                    # Domain statistics
                    stats['domains'][flow.request.host] += 1
                    
                    # Request size
                    if flow.request.content:
                        stats['total_request_bytes'] += len(flow.request.content)
                    
                    # Response analysis
                    if flow.response:
                        stats['status_codes'][flow.response.status_code] += 1
                        
                        if flow.response.content:
                            stats['total_response_bytes'] += len(flow.response.content)
                        
                        # Content type analysis
                        content_type = flow.response.headers.get('content-type', 'unknown')
                        content_type = content_type.split(';')[0]  # Remove charset info
                        stats['content_types'][content_type] += 1
                    
                    # Error tracking
                    if flow.error:
                        stats['error_count'] += 1
                
                elif hasattr(flow, 'messages'):  # TCP/UDP flows
                    if 'TCPFlow' in str(type(flow)):
                        stats['tcp_flows'] += 1
                    elif 'UDPFlow' in str(type(flow)):
                        stats['udp_flows'] += 1
    
    except Exception as e:
        print(f"Error reading flow file: {e}")
        return None
    
    return stats

def print_flow_statistics(stats):
    """Print formatted flow statistics."""
    if not stats:
        return
    
    print(f"Flow File Analysis:")
    print(f"==================")
    print(f"Total Flows: {stats['total_flows']}")
    print(f"  HTTP: {stats['http_flows']}")
    print(f"  TCP: {stats['tcp_flows']}")
    print(f"  UDP: {stats['udp_flows']}")
    print(f"  Errors: {stats['error_count']}")
    print()
    
    print(f"Data Transfer:")
    print(f"  Request bytes: {stats['total_request_bytes']:,}")
    print(f"  Response bytes: {stats['total_response_bytes']:,}")
    print(f"  Total bytes: {stats['total_request_bytes'] + stats['total_response_bytes']:,}")
    print()
    
    if stats['methods']:
        print("HTTP Methods:")
        for method, count in stats['methods'].most_common():
            print(f"  {method}: {count}")
        print()
    
    if stats['status_codes']:
        print("Status Codes:")
        for code, count in stats['status_codes'].most_common():
            print(f"  {code}: {count}")
        print()
    
    if stats['domains']:
        print("Top Domains:")
        for domain, count in stats['domains'].most_common(10):
            print(f"  {domain}: {count}")
        print()
    
    if stats['content_types']:
        print("Content Types:")
        for content_type, count in stats['content_types'].most_common():
            print(f"  {content_type}: {count}")

# Usage
stats = analyze_flow_file("captured_flows.mitm")
print_flow_statistics(stats)

Flow Format Conversion

from mitmproxy import io, http
import json
import csv

def convert_flows_to_json(input_file, output_file):
    """Convert flows to JSON format for external analysis."""
    flows_data = []
    
    with open(input_file, "rb") as f:
        reader = io.FlowReader(f)
        
        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                flow_data = {
                    'timestamp': flow.request.timestamp_start,
                    'method': flow.request.method,
                    'url': flow.request.url,
                    'host': flow.request.host,
                    'path': flow.request.path,
                    'request_headers': dict(flow.request.headers),
                    'request_size': len(flow.request.content) if flow.request.content else 0,
                }
                
                if flow.response:
                    flow_data.update({
                        'status_code': flow.response.status_code,
                        'response_headers': dict(flow.response.headers),
                        'response_size': len(flow.response.content) if flow.response.content else 0,
                        'response_time': flow.response.timestamp_end - flow.request.timestamp_start if flow.response.timestamp_end else None
                    })
                
                if flow.error:
                    flow_data['error'] = flow.error.msg
                
                flows_data.append(flow_data)
    
    with open(output_file, 'w') as f:
        json.dump(flows_data, f, indent=2)
    
    print(f"Converted {len(flows_data)} flows to {output_file}")

def convert_flows_to_csv(input_file, output_file):
    """Convert flows to CSV format for spreadsheet analysis."""
    with open(input_file, "rb") as f:
        reader = io.FlowReader(f)
        
        with open(output_file, 'w', newline='') as csvfile:
            fieldnames = ['timestamp', 'method', 'url', 'host', 'status_code', 
                         'request_size', 'response_size', 'response_time', 'error']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            
            for flow in reader.stream():
                if isinstance(flow, http.HTTPFlow):
                    row = {
                        'timestamp': flow.request.timestamp_start,
                        'method': flow.request.method,
                        'url': flow.request.url,
                        'host': flow.request.host,
                        'request_size': len(flow.request.content) if flow.request.content else 0,
                        'status_code': flow.response.status_code if flow.response else None,
                        'response_size': len(flow.response.content) if flow.response and flow.response.content else 0,
                        'response_time': (flow.response.timestamp_end - flow.request.timestamp_start) if flow.response and flow.response.timestamp_end else None,
                        'error': flow.error.msg if flow.error else None
                    }
                    writer.writerow(row)
    
    print(f"Converted flows to {output_file}")

Advanced Flow Processing

from mitmproxy import io, http
import gzip
import time

def merge_flow_files(input_files, output_file):
    """Merge multiple flow files into one, sorted by timestamp."""
    all_flows = []
    
    # Read all flows from input files
    for filename in input_files:
        print(f"Reading {filename}...")
        flows = list(io.read_flows_from_paths([filename]))
        all_flows.extend(flows)
    
    # Sort flows by timestamp
    def get_timestamp(flow):
        if isinstance(flow, http.HTTPFlow):
            return flow.request.timestamp_start
        return 0
    
    all_flows.sort(key=get_timestamp)
    
    # Write merged flows
    with open(output_file, "wb") as f:
        writer = io.FlowWriter(f)
        for flow in all_flows:
            writer.add(flow)
    
    print(f"Merged {len(all_flows)} flows into {output_file}")

def compress_flow_file(input_file, output_file):
    """Compress a flow file using gzip."""
    with open(input_file, "rb") as f_in:
        with gzip.open(output_file, "wb") as f_out:
            f_out.write(f_in.read())
    
    print(f"Compressed {input_file} to {output_file}")

def decompress_and_process_flows(compressed_file):
    """Process flows from a gzip-compressed file."""
    with gzip.open(compressed_file, "rb") as f:
        reader = io.FlowReader(f)
        
        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                print(f"Processing: {flow.request.method} {flow.request.url}")
                # Process the flow here

Install with Tessl CLI

npx tessl i tessl/pypi-mitmproxy

docs

addons.md

commands.md

configuration.md

connections.md

content.md

flow-io.md

http-flows.md

index.md

protocols.md

tile.json