An interactive, SSL/TLS-capable intercepting proxy for HTTP/1, HTTP/2, and WebSockets.
Reading and writing flows to files for replay, analysis, and testing. Supports filtering, format conversion, and batch processing for various use cases including traffic replay, debugging, and automated testing. Flows are read from binary files or streams in mitmproxy's native format.
class FlowReader:
    """
    Reads flows from a binary file or stream.

    Supports reading flows saved by FlowWriter or mitmdump.
    Also supports reading HAR (HTTP Archive) format files.
    Handles version compatibility and format validation.
    """

    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow reader.

        Parameters:
        - fo: Binary file object or stream to read from
        """

    def stream(self) -> Iterator[Flow]:
        """
        Stream flows from the file.

        Yields:
        - Flow objects (HTTPFlow, TCPFlow, UDPFlow, etc.)

        Raises:
        - FlowReadException: If file format is invalid or corrupted
        - IOError: If file cannot be read
        """

    def close(self) -> None:
        """Close the underlying file object."""


# Writes flows to binary files or streams in mitmproxy's native format.
class FlowWriter:
    """
    Writes flows to a binary file or stream.

    Creates files compatible with FlowReader and mitmproxy tools.
    Handles serialization and format versioning.
    """

    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow writer.

        Parameters:
        - fo: Binary file object or stream to write to
        """

    def add(self, flow: Flow) -> None:
        """
        Add a flow to the output file.

        Parameters:
        - flow: Flow object to write (HTTPFlow, TCPFlow, etc.)

        Raises:
        - IOError: If flow cannot be written
        - TypeError: If flow type is not supported
        """

    def close(self) -> None:
        """Close the underlying file object."""


# Writes flows with filtering capabilities based on flow properties.
class FilteredFlowWriter:
    """
    Writes flows with filtering based on flow properties.

    Allows selective writing of flows based on custom filter functions.
    """

    def __init__(self, fo: BinaryIO, filter_func: Callable[[Flow], bool]) -> None:
        """
        Initialize filtered flow writer.

        Parameters:
        - fo: Binary file object or stream to write to
        - filter_func: Function that returns True for flows to include
        """

    def add(self, flow: Flow) -> None:
        """
        Add a flow if it passes the filter.

        Parameters:
        - flow: Flow object to potentially write
        """

    def close(self) -> None:
        """Close the underlying file object."""


# Utility functions for reading flows from multiple sources.
def read_flows_from_paths(paths: Sequence[str]) -> Iterator[Flow]:
    """
    Read flows from multiple file paths.

    Parameters:
    - paths: List of file paths to read flows from

    Yields:
    - Flow objects from all specified files

    Raises:
    - FlowReadException: If any file format is invalid
    - FileNotFoundError: If any file doesn't exist
    - IOError: If files cannot be read
    """
def flow_export_formats() -> Dict[str, str]:
    """
    Get available flow export formats.

    Returns:
    - Dictionary of format_name -> description
    """

from mitmproxy import io, http
import gzip
def save_flows_to_file():
    """Persist the currently collected flows to disk in mitmproxy's format."""
    flows_to_save = []  # Assume this contains flows from somewhere

    # Serialize every flow through a FlowWriter bound to the output file.
    with open("captured_flows.mitm", "wb") as outfile:
        flow_writer = io.FlowWriter(outfile)
        for current in flows_to_save:
            flow_writer.add(current)

    print(f"Saved {len(flows_to_save)} flows to captured_flows.mitm")
def load_and_analyze_flows():
    """Load flows from file and analyze them."""
    with open("captured_flows.mitm", "rb") as infile:
        flow_reader = io.FlowReader(infile)

        http_count = 0
        total_bytes = 0

        for item in flow_reader.stream():
            # Only HTTP flows carry request/response payloads we can measure.
            if not isinstance(item, http.HTTPFlow):
                continue
            http_count += 1

            # Tally request and response payload sizes.
            if item.request.content:
                total_bytes += len(item.request.content)
            if item.response and item.response.content:
                total_bytes += len(item.response.content)

            # One-line summary per flow.
            print(f"{item.request.method} {item.request.url}")
            if item.response:
                print(f" -> {item.response.status_code}")

        print(f"Loaded {http_count} HTTP flows, {total_bytes} total bytes")
def load_from_multiple_files():
    """Load flows from multiple files."""
    file_paths = ["session1.mitm", "session2.mitm", "session3.mitm"]

    # read_flows_from_paths chains all files into one flow stream.
    all_flows = list(io.read_flows_from_paths(file_paths))
    print(f"Loaded {len(all_flows)} flows from {len(file_paths)} files")

    # Walk every flow and report the HTTP ones.
    for entry in all_flows:
        if isinstance(entry, http.HTTPFlow):
            print(f"Flow: {entry.request.method} {entry.request.url}")

from mitmproxy import io, http
def save_filtered_flows():
    """Save only specific flows based on criteria.

    Writes two files: one containing only API calls and one containing
    only failed (status >= 400) responses, using FilteredFlowWriter so
    filtering happens at write time.
    """

    def api_filter(flow):
        """Filter function to save only API calls."""
        if isinstance(flow, http.HTTPFlow):
            return "/api/" in flow.request.path
        return False

    def error_filter(flow):
        """Filter function to save only failed requests."""
        if isinstance(flow, http.HTTPFlow):
            # Bug fix: wrap in bool() — `flow.response and ...` yields None
            # when there is no response, but the filter contract promises
            # a True/False result.
            return bool(flow.response and flow.response.status_code >= 400)
        return False

    flows_to_process = []  # Assume this contains flows

    # Save API calls only
    with open("api_calls.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, api_filter)
        for flow in flows_to_process:
            writer.add(flow)

    # Save error responses only
    with open("errors.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, error_filter)
        for flow in flows_to_process:
            writer.add(flow)
def save_flows_by_domain():
    """Save flows grouped by domain, one output file per domain."""
    flows_by_domain = {}
    all_flows = []  # Assume this contains flows

    # Group flows by request host.
    for flow in all_flows:
        if isinstance(flow, http.HTTPFlow):
            flows_by_domain.setdefault(flow.request.host, []).append(flow)

    # Save each domain's flows to separate files.
    for domain, flows in flows_by_domain.items():
        filename = f"{domain.replace('.', '_')}_flows.mitm"
        with open(filename, "wb") as f:
            writer = io.FlowWriter(f)
            for flow in flows:
                writer.add(flow)
        # Bug fix: the original printed the literal placeholder "(unknown)"
        # instead of the actual output filename.
        print(f"Saved {len(flows)} flows for {domain} to {filename}")

from mitmproxy import io, http
from collections import defaultdict, Counter
import json
def analyze_flow_file(filename):
    """Comprehensive analysis of a flow file.

    Returns a statistics dict, or None if the file cannot be read.
    """
    stats = {
        'total_flows': 0,
        'http_flows': 0,
        'tcp_flows': 0,
        'udp_flows': 0,
        'methods': Counter(),
        'status_codes': Counter(),
        'domains': Counter(),
        'content_types': Counter(),
        'total_request_bytes': 0,
        'total_response_bytes': 0,
        'error_count': 0
    }
    try:
        with open(filename, "rb") as fh:
            for flow in io.FlowReader(fh).stream():
                stats['total_flows'] += 1
                if isinstance(flow, http.HTTPFlow):
                    _collect_http_stats(stats, flow)
                elif hasattr(flow, 'messages'):  # TCP/UDP flows
                    type_name = str(type(flow))
                    if 'TCPFlow' in type_name:
                        stats['tcp_flows'] += 1
                    elif 'UDPFlow' in type_name:
                        stats['udp_flows'] += 1
    except Exception as e:
        # Best-effort example: report and signal failure with None.
        print(f"Error reading flow file: {e}")
        return None
    return stats


def _collect_http_stats(stats, flow):
    """Fold a single HTTP flow into the running statistics dict."""
    stats['http_flows'] += 1
    stats['methods'][flow.request.method] += 1
    stats['domains'][flow.request.host] += 1
    if flow.request.content:
        stats['total_request_bytes'] += len(flow.request.content)
    response = flow.response
    if response:
        stats['status_codes'][response.status_code] += 1
        if response.content:
            stats['total_response_bytes'] += len(response.content)
        # Strip any "; charset=..." suffix before counting content types.
        mime = response.headers.get('content-type', 'unknown').split(';')[0]
        stats['content_types'][mime] += 1
    if flow.error:
        stats['error_count'] += 1
def print_flow_statistics(stats):
    """Print formatted flow statistics; no-op when stats is falsy."""
    if not stats:
        return

    print("Flow File Analysis:")
    print("==================")
    print(f"Total Flows: {stats['total_flows']}")
    print(f" HTTP: {stats['http_flows']}")
    print(f" TCP: {stats['tcp_flows']}")
    print(f" UDP: {stats['udp_flows']}")
    print(f" Errors: {stats['error_count']}")
    print()

    req_bytes = stats['total_request_bytes']
    resp_bytes = stats['total_response_bytes']
    print("Data Transfer:")
    print(f" Request bytes: {req_bytes:,}")
    print(f" Response bytes: {resp_bytes:,}")
    print(f" Total bytes: {req_bytes + resp_bytes:,}")
    print()

    def _section(title, counter, limit=None, trailing_blank=True):
        """Print one 'title + most_common' section, skipping empty counters."""
        if not counter:
            return
        print(title)
        for key, count in counter.most_common(limit):
            print(f" {key}: {count}")
        if trailing_blank:
            print()

    _section("HTTP Methods:", stats['methods'])
    _section("Status Codes:", stats['status_codes'])
    _section("Top Domains:", stats['domains'], limit=10)
    # The original printed no blank line after the final section.
    _section("Content Types:", stats['content_types'], trailing_blank=False)
# Usage: analyze a capture and pretty-print the results.
stats = analyze_flow_file("captured_flows.mitm")
print_flow_statistics(stats)

from mitmproxy import io, http
import json
import csv
def convert_flows_to_json(input_file, output_file):
    """Convert flows to JSON format for external analysis."""
    flows_data = []

    with open(input_file, "rb") as fh:
        for flow in io.FlowReader(fh).stream():
            # Only HTTP flows are exported.
            if not isinstance(flow, http.HTTPFlow):
                continue

            req = flow.request
            record = {
                'timestamp': req.timestamp_start,
                'method': req.method,
                'url': req.url,
                'host': req.host,
                'path': req.path,
                'request_headers': dict(req.headers),
                'request_size': len(req.content) if req.content else 0,
            }

            resp = flow.response
            if resp:
                record['status_code'] = resp.status_code
                record['response_headers'] = dict(resp.headers)
                record['response_size'] = len(resp.content) if resp.content else 0
                record['response_time'] = (
                    resp.timestamp_end - req.timestamp_start
                    if resp.timestamp_end else None
                )

            if flow.error:
                record['error'] = flow.error.msg

            flows_data.append(record)

    with open(output_file, 'w') as out:
        json.dump(flows_data, out, indent=2)

    print(f"Converted {len(flows_data)} flows to {output_file}")
def convert_flows_to_csv(input_file, output_file):
    """Convert flows to CSV format for spreadsheet analysis."""
    columns = ['timestamp', 'method', 'url', 'host', 'status_code',
               'request_size', 'response_size', 'response_time', 'error']

    with open(input_file, "rb") as fh:
        reader = io.FlowReader(fh)
        with open(output_file, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=columns)
            writer.writeheader()
            for flow in reader.stream():
                # Only HTTP flows map onto the CSV columns.
                if not isinstance(flow, http.HTTPFlow):
                    continue
                req, resp = flow.request, flow.response
                writer.writerow({
                    'timestamp': req.timestamp_start,
                    'method': req.method,
                    'url': req.url,
                    'host': req.host,
                    'request_size': len(req.content) if req.content else 0,
                    'status_code': resp.status_code if resp else None,
                    'response_size': len(resp.content) if resp and resp.content else 0,
                    'response_time': (resp.timestamp_end - req.timestamp_start)
                                     if resp and resp.timestamp_end else None,
                    'error': flow.error.msg if flow.error else None,
                })

    print(f"Converted flows to {output_file}")

from mitmproxy import io, http
import gzip
import time
def merge_flow_files(input_files, output_file):
    """Merge multiple flow files into one, sorted by timestamp.

    Parameters:
    - input_files: iterable of flow file paths to merge
    - output_file: path of the merged output file
    """
    all_flows = []

    # Read all flows from input files.
    for filename in input_files:
        # Bug fix: the original printed the literal placeholder "(unknown)"
        # instead of the file currently being read.
        print(f"Reading {filename}...")
        all_flows.extend(io.read_flows_from_paths([filename]))

    # Sort flows by timestamp; non-HTTP flows sort first (timestamp 0).
    def get_timestamp(flow):
        if isinstance(flow, http.HTTPFlow):
            return flow.request.timestamp_start
        return 0

    all_flows.sort(key=get_timestamp)

    # Write merged flows.
    with open(output_file, "wb") as f:
        writer = io.FlowWriter(f)
        for flow in all_flows:
            writer.add(flow)

    print(f"Merged {len(all_flows)} flows into {output_file}")
def compress_flow_file(input_file, output_file):
    """Compress a flow file using gzip.

    Streams the data in chunks via shutil.copyfileobj instead of loading
    the whole input file into memory at once (the original used
    f_in.read(), which buffers the entire file).

    Parameters:
    - input_file: path of the uncompressed flow file
    - output_file: path of the gzip output file
    """
    import shutil  # local import: only this function needs it

    with open(input_file, "rb") as f_in:
        with gzip.open(output_file, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
    print(f"Compressed {input_file} to {output_file}")
def decompress_and_process_flows(compressed_file):
    """Process flows from a gzip-compressed file."""
    # FlowReader accepts any binary stream, so a gzip handle works directly.
    with gzip.open(compressed_file, "rb") as fh:
        for flow in io.FlowReader(fh).stream():
            if isinstance(flow, http.HTTPFlow):
                print(f"Processing: {flow.request.method} {flow.request.url}")
                # Process the flow here

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-mitmproxy