# Flow I/O and Persistence

Reading and writing flows to files for replay, analysis, and testing. Supports filtering, format conversion, and batch processing for various use cases including traffic replay, debugging, and automated testing.

## Capabilities

### FlowReader Class

Reads flows from binary files or streams in mitmproxy's native format.
```python { .api }
class FlowReader:
    """
    Reads flows from a binary file or stream.

    Supports reading flows saved by FlowWriter or mitmdump.
    Also supports reading HAR (HTTP Archive) format files.
    Handles version compatibility and format validation.
    """
    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow reader.

        Parameters:
        - fo: Binary file object or stream to read from
        """

    def stream(self) -> Iterator[Flow]:
        """
        Stream flows from the file.

        Yields:
        - Flow objects (HTTPFlow, TCPFlow, UDPFlow, etc.)

        Raises:
        - FlowReadException: If file format is invalid or corrupted
        - IOError: If file cannot be read
        """

    def close(self) -> None:
        """Close the underlying file object."""
```
### FlowWriter Class

Writes flows to binary files or streams in mitmproxy's native format.

```python { .api }
class FlowWriter:
    """
    Writes flows to a binary file or stream.

    Creates files compatible with FlowReader and mitmproxy tools.
    Handles serialization and format versioning.
    """
    def __init__(self, fo: BinaryIO) -> None:
        """
        Initialize flow writer.

        Parameters:
        - fo: Binary file object or stream to write to
        """

    def add(self, flow: Flow) -> None:
        """
        Add a flow to the output file.

        Parameters:
        - flow: Flow object to write (HTTPFlow, TCPFlow, etc.)

        Raises:
        - IOError: If flow cannot be written
        - TypeError: If flow type is not supported
        """

    def close(self) -> None:
        """Close the underlying file object."""
```
### FilteredFlowWriter Class

Writes flows with filtering capabilities based on flow properties.

```python { .api }
class FilteredFlowWriter:
    """
    Writes flows with filtering based on flow properties.

    Allows selective writing of flows based on custom filter functions.
    """
    def __init__(self, fo: BinaryIO, filter_func: Callable[[Flow], bool]) -> None:
        """
        Initialize filtered flow writer.

        Parameters:
        - fo: Binary file object or stream to write to
        - filter_func: Function that returns True for flows to include
        """

    def add(self, flow: Flow) -> None:
        """
        Add a flow if it passes the filter.

        Parameters:
        - flow: Flow object to potentially write
        """

    def close(self) -> None:
        """Close the underlying file object."""
```
### Flow Reading Utilities

Utility functions for reading flows from multiple sources.

```python { .api }
def read_flows_from_paths(paths: Sequence[str]) -> Iterator[Flow]:
    """
    Read flows from multiple file paths.

    Parameters:
    - paths: List of file paths to read flows from

    Yields:
    - Flow objects from all specified files

    Raises:
    - FlowReadException: If any file format is invalid
    - FileNotFoundError: If any file doesn't exist
    - IOError: If files cannot be read
    """

def flow_export_formats() -> Dict[str, str]:
    """
    Get available flow export formats.

    Returns:
    - Dictionary of format_name -> description
    """
```
## Usage Examples

### Basic Flow Reading and Writing

```python
from mitmproxy import io, http
import gzip

def save_flows_to_file():
    """Save current flows to a file."""
    flows_to_save = []  # Assume this contains flows from somewhere

    # Write flows to file
    with open("captured_flows.mitm", "wb") as f:
        writer = io.FlowWriter(f)
        for flow in flows_to_save:
            writer.add(flow)

    print(f"Saved {len(flows_to_save)} flows to captured_flows.mitm")

def load_and_analyze_flows():
    """Load flows from file and analyze them."""
    with open("captured_flows.mitm", "rb") as f:
        reader = io.FlowReader(f)

        http_count = 0
        total_bytes = 0

        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                http_count += 1

                # Calculate total bytes
                if flow.request.content:
                    total_bytes += len(flow.request.content)
                if flow.response and flow.response.content:
                    total_bytes += len(flow.response.content)

                # Print flow summary
                print(f"{flow.request.method} {flow.request.url}")
                if flow.response:
                    print(f" -> {flow.response.status_code}")

        print(f"Loaded {http_count} HTTP flows, {total_bytes} total bytes")

def load_from_multiple_files():
    """Load flows from multiple files."""
    file_paths = ["session1.mitm", "session2.mitm", "session3.mitm"]

    all_flows = list(io.read_flows_from_paths(file_paths))
    print(f"Loaded {len(all_flows)} flows from {len(file_paths)} files")

    # Process all flows
    for flow in all_flows:
        if isinstance(flow, http.HTTPFlow):
            print(f"Flow: {flow.request.method} {flow.request.url}")
```
### Filtered Flow Writing

```python
from mitmproxy import io, http

def save_filtered_flows():
    """Save only specific flows based on criteria."""

    def api_filter(flow):
        """Filter function to save only API calls."""
        if isinstance(flow, http.HTTPFlow):
            return "/api/" in flow.request.path
        return False

    def error_filter(flow):
        """Filter function to save only failed requests."""
        if isinstance(flow, http.HTTPFlow):
            return flow.response and flow.response.status_code >= 400
        return False

    flows_to_process = []  # Assume this contains flows

    # Save API calls only
    with open("api_calls.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, api_filter)
        for flow in flows_to_process:
            writer.add(flow)

    # Save error responses only
    with open("errors.mitm", "wb") as f:
        writer = io.FilteredFlowWriter(f, error_filter)
        for flow in flows_to_process:
            writer.add(flow)

def save_flows_by_domain():
    """Save flows grouped by domain."""
    flows_by_domain = {}
    all_flows = []  # Assume this contains flows

    # Group flows by domain
    for flow in all_flows:
        if isinstance(flow, http.HTTPFlow):
            domain = flow.request.host
            if domain not in flows_by_domain:
                flows_by_domain[domain] = []
            flows_by_domain[domain].append(flow)

    # Save each domain's flows to separate files
    for domain, flows in flows_by_domain.items():
        filename = f"{domain.replace('.', '_')}_flows.mitm"

        with open(filename, "wb") as f:
            writer = io.FlowWriter(f)
            for flow in flows:
                writer.add(flow)

        print(f"Saved {len(flows)} flows for {domain} to {filename}")
```
### Flow Analysis and Statistics

```python
from mitmproxy import io, http
from collections import defaultdict, Counter
import json

def analyze_flow_file(filename):
    """Comprehensive analysis of a flow file."""
    stats = {
        'total_flows': 0,
        'http_flows': 0,
        'tcp_flows': 0,
        'udp_flows': 0,
        'methods': Counter(),
        'status_codes': Counter(),
        'domains': Counter(),
        'content_types': Counter(),
        'total_request_bytes': 0,
        'total_response_bytes': 0,
        'error_count': 0
    }

    try:
        with open(filename, "rb") as f:
            reader = io.FlowReader(f)

            for flow in reader.stream():
                stats['total_flows'] += 1

                if isinstance(flow, http.HTTPFlow):
                    stats['http_flows'] += 1

                    # Method statistics
                    stats['methods'][flow.request.method] += 1

                    # Domain statistics
                    stats['domains'][flow.request.host] += 1

                    # Request size
                    if flow.request.content:
                        stats['total_request_bytes'] += len(flow.request.content)

                    # Response analysis
                    if flow.response:
                        stats['status_codes'][flow.response.status_code] += 1

                        if flow.response.content:
                            stats['total_response_bytes'] += len(flow.response.content)

                        # Content type analysis
                        content_type = flow.response.headers.get('content-type', 'unknown')
                        content_type = content_type.split(';')[0]  # Remove charset info
                        stats['content_types'][content_type] += 1

                    # Error tracking
                    if flow.error:
                        stats['error_count'] += 1

                elif hasattr(flow, 'messages'):  # TCP/UDP flows
                    if 'TCPFlow' in str(type(flow)):
                        stats['tcp_flows'] += 1
                    elif 'UDPFlow' in str(type(flow)):
                        stats['udp_flows'] += 1

    except Exception as e:
        print(f"Error reading flow file: {e}")
        return None

    return stats

def print_flow_statistics(stats):
    """Print formatted flow statistics."""
    if not stats:
        return

    print(f"Flow File Analysis:")
    print(f"==================")
    print(f"Total Flows: {stats['total_flows']}")
    print(f"  HTTP: {stats['http_flows']}")
    print(f"  TCP: {stats['tcp_flows']}")
    print(f"  UDP: {stats['udp_flows']}")
    print(f"  Errors: {stats['error_count']}")
    print()

    print(f"Data Transfer:")
    print(f"  Request bytes: {stats['total_request_bytes']:,}")
    print(f"  Response bytes: {stats['total_response_bytes']:,}")
    print(f"  Total bytes: {stats['total_request_bytes'] + stats['total_response_bytes']:,}")
    print()

    if stats['methods']:
        print("HTTP Methods:")
        for method, count in stats['methods'].most_common():
            print(f"  {method}: {count}")
        print()

    if stats['status_codes']:
        print("Status Codes:")
        for code, count in stats['status_codes'].most_common():
            print(f"  {code}: {count}")
        print()

    if stats['domains']:
        print("Top Domains:")
        for domain, count in stats['domains'].most_common(10):
            print(f"  {domain}: {count}")
        print()

    if stats['content_types']:
        print("Content Types:")
        for content_type, count in stats['content_types'].most_common():
            print(f"  {content_type}: {count}")

# Usage
stats = analyze_flow_file("captured_flows.mitm")
print_flow_statistics(stats)
```
### Flow Format Conversion

```python
from mitmproxy import io, http
import json
import csv

def convert_flows_to_json(input_file, output_file):
    """Convert flows to JSON format for external analysis."""
    flows_data = []

    with open(input_file, "rb") as f:
        reader = io.FlowReader(f)

        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                flow_data = {
                    'timestamp': flow.request.timestamp_start,
                    'method': flow.request.method,
                    'url': flow.request.url,
                    'host': flow.request.host,
                    'path': flow.request.path,
                    'request_headers': dict(flow.request.headers),
                    'request_size': len(flow.request.content) if flow.request.content else 0,
                }

                if flow.response:
                    flow_data.update({
                        'status_code': flow.response.status_code,
                        'response_headers': dict(flow.response.headers),
                        'response_size': len(flow.response.content) if flow.response.content else 0,
                        'response_time': flow.response.timestamp_end - flow.request.timestamp_start if flow.response.timestamp_end else None
                    })

                if flow.error:
                    flow_data['error'] = flow.error.msg

                flows_data.append(flow_data)

    with open(output_file, 'w') as f:
        json.dump(flows_data, f, indent=2)

    print(f"Converted {len(flows_data)} flows to {output_file}")

def convert_flows_to_csv(input_file, output_file):
    """Convert flows to CSV format for spreadsheet analysis."""
    with open(input_file, "rb") as f:
        reader = io.FlowReader(f)

        with open(output_file, 'w', newline='') as csvfile:
            fieldnames = ['timestamp', 'method', 'url', 'host', 'status_code',
                          'request_size', 'response_size', 'response_time', 'error']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for flow in reader.stream():
                if isinstance(flow, http.HTTPFlow):
                    row = {
                        'timestamp': flow.request.timestamp_start,
                        'method': flow.request.method,
                        'url': flow.request.url,
                        'host': flow.request.host,
                        'request_size': len(flow.request.content) if flow.request.content else 0,
                        'status_code': flow.response.status_code if flow.response else None,
                        'response_size': len(flow.response.content) if flow.response and flow.response.content else 0,
                        'response_time': (flow.response.timestamp_end - flow.request.timestamp_start) if flow.response and flow.response.timestamp_end else None,
                        'error': flow.error.msg if flow.error else None
                    }
                    writer.writerow(row)

    print(f"Converted flows to {output_file}")
```
### Advanced Flow Processing

```python
from mitmproxy import io, http
import gzip
import time

def merge_flow_files(input_files, output_file):
    """Merge multiple flow files into one, sorted by timestamp."""
    all_flows = []

    # Read all flows from input files
    for filename in input_files:
        print(f"Reading {filename}...")
        flows = list(io.read_flows_from_paths([filename]))
        all_flows.extend(flows)

    # Sort flows by timestamp
    def get_timestamp(flow):
        if isinstance(flow, http.HTTPFlow):
            return flow.request.timestamp_start
        return 0

    all_flows.sort(key=get_timestamp)

    # Write merged flows
    with open(output_file, "wb") as f:
        writer = io.FlowWriter(f)
        for flow in all_flows:
            writer.add(flow)

    print(f"Merged {len(all_flows)} flows into {output_file}")

def compress_flow_file(input_file, output_file):
    """Compress a flow file using gzip."""
    with open(input_file, "rb") as f_in:
        with gzip.open(output_file, "wb") as f_out:
            f_out.write(f_in.read())

    print(f"Compressed {input_file} to {output_file}")

def decompress_and_process_flows(compressed_file):
    """Process flows from a gzip-compressed file."""
    with gzip.open(compressed_file, "rb") as f:
        reader = io.FlowReader(f)

        for flow in reader.stream():
            if isinstance(flow, http.HTTPFlow):
                print(f"Processing: {flow.request.method} {flow.request.url}")
                # Process the flow here
```