or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

addons.mdcommands.mdconfiguration.mdconnections.mdcontent.mdflow-io.mdhttp-flows.mdindex.mdprotocols.md

flow-io.mddocs/

0

# Flow I/O and Persistence

1

2

Reading and writing flows to files for replay, analysis, and testing. Supports filtering, format conversion, and batch processing for various use cases including traffic replay, debugging, and automated testing.

3

4

## Capabilities

5

6

### FlowReader Class

7

8

Reads flows from binary files or streams in mitmproxy's native format.

9

10

```python { .api }

11

class FlowReader:

12

"""

13

Reads flows from a binary file or stream.

14

15

Supports reading flows saved by FlowWriter or mitmdump.

16

Also supports reading HAR (HTTP Archive) format files.

17

Handles version compatibility and format validation.

18

"""

19

def __init__(self, fo: BinaryIO) -> None:

20

"""

21

Initialize flow reader.

22

23

Parameters:

24

- fo: Binary file object or stream to read from

25

"""

26

27

def stream(self) -> Iterator[Flow]:

28

"""

29

Stream flows from the file.

30

31

Yields:

32

- Flow objects (HTTPFlow, TCPFlow, UDPFlow, etc.)

33

34

Raises:

35

- FlowReadException: If file format is invalid or corrupted

36

- IOError: If file cannot be read

37

"""

38

39

def close(self) -> None:

40

"""Close the underlying file object."""

41

```

42

43

### FlowWriter Class

44

45

Writes flows to binary files or streams in mitmproxy's native format.

46

47

```python { .api }

48

class FlowWriter:

49

"""

50

Writes flows to a binary file or stream.

51

52

Creates files compatible with FlowReader and mitmproxy tools.

53

Handles serialization and format versioning.

54

"""

55

def __init__(self, fo: BinaryIO) -> None:

56

"""

57

Initialize flow writer.

58

59

Parameters:

60

- fo: Binary file object or stream to write to

61

"""

62

63

def add(self, flow: Flow) -> None:

64

"""

65

Add a flow to the output file.

66

67

Parameters:

68

- flow: Flow object to write (HTTPFlow, TCPFlow, etc.)

69

70

Raises:

71

- IOError: If flow cannot be written

72

- TypeError: If flow type is not supported

73

"""

74

75

def close(self) -> None:

76

"""Close the underlying file object."""

77

```

78

79

### FilteredFlowWriter Class

80

81

Writes flows with filtering capabilities based on flow properties.

82

83

```python { .api }

84

class FilteredFlowWriter:

85

"""

86

Writes flows with filtering based on flow properties.

87

88

Allows selective writing of flows based on custom filter functions.

89

"""

90

def __init__(self, fo: BinaryIO, filter_func: Callable[[Flow], bool]) -> None:

91

"""

92

Initialize filtered flow writer.

93

94

Parameters:

95

- fo: Binary file object or stream to write to

96

- filter_func: Function that returns True for flows to include

97

"""

98

99

def add(self, flow: Flow) -> None:

100

"""

101

Add a flow if it passes the filter.

102

103

Parameters:

104

- flow: Flow object to potentially write

105

"""

106

107

def close(self) -> None:

108

"""Close the underlying file object."""

109

```

110

111

### Flow Reading Utilities

112

113

Utility functions for reading flows from multiple sources.

114

115

```python { .api }

116

def read_flows_from_paths(paths: Sequence[str]) -> Iterator[Flow]:

117

"""

118

Read flows from multiple file paths.

119

120

Parameters:

121

- paths: List of file paths to read flows from

122

123

Yields:

124

- Flow objects from all specified files

125

126

Raises:

127

- FlowReadException: If any file format is invalid

128

- FileNotFoundError: If any file doesn't exist

129

- IOError: If files cannot be read

130

"""

131

132

def flow_export_formats() -> Dict[str, str]:

133

"""

134

Get available flow export formats.

135

136

Returns:

137

- Dictionary of format_name -> description

138

"""

139

```

140

141

## Usage Examples

142

143

### Basic Flow Reading and Writing

144

145

```python

146

from mitmproxy import io, http

147

import gzip

148

149

def save_flows_to_file():

150

"""Save current flows to a file."""

151

flows_to_save = [] # Assume this contains flows from somewhere

152

153

# Write flows to file

154

with open("captured_flows.mitm", "wb") as f:

155

writer = io.FlowWriter(f)

156

for flow in flows_to_save:

157

writer.add(flow)

158

159

print(f"Saved {len(flows_to_save)} flows to captured_flows.mitm")

160

161

def load_and_analyze_flows():

162

"""Load flows from file and analyze them."""

163

with open("captured_flows.mitm", "rb") as f:

164

reader = io.FlowReader(f)

165

166

http_count = 0

167

total_bytes = 0

168

169

for flow in reader.stream():

170

if isinstance(flow, http.HTTPFlow):

171

http_count += 1

172

173

# Calculate total bytes

174

if flow.request.content:

175

total_bytes += len(flow.request.content)

176

if flow.response and flow.response.content:

177

total_bytes += len(flow.response.content)

178

179

# Print flow summary

180

print(f"{flow.request.method} {flow.request.url}")

181

if flow.response:

182

print(f" -> {flow.response.status_code}")

183

184

print(f"Loaded {http_count} HTTP flows, {total_bytes} total bytes")

185

186

def load_from_multiple_files():

187

"""Load flows from multiple files."""

188

file_paths = ["session1.mitm", "session2.mitm", "session3.mitm"]

189

190

all_flows = list(io.read_flows_from_paths(file_paths))

191

print(f"Loaded {len(all_flows)} flows from {len(file_paths)} files")

192

193

# Process all flows

194

for flow in all_flows:

195

if isinstance(flow, http.HTTPFlow):

196

print(f"Flow: {flow.request.method} {flow.request.url}")

197

```

198

199

### Filtered Flow Writing

200

201

```python

202

from mitmproxy import io, http

203

204

def save_filtered_flows():

205

"""Save only specific flows based on criteria."""

206

207

def api_filter(flow):

208

"""Filter function to save only API calls."""

209

if isinstance(flow, http.HTTPFlow):

210

return "/api/" in flow.request.path

211

return False

212

213

def error_filter(flow):

214

"""Filter function to save only failed requests."""

215

if isinstance(flow, http.HTTPFlow):

216

return flow.response and flow.response.status_code >= 400

217

return False

218

219

flows_to_process = [] # Assume this contains flows

220

221

# Save API calls only

222

with open("api_calls.mitm", "wb") as f:

223

writer = io.FilteredFlowWriter(f, api_filter)

224

for flow in flows_to_process:

225

writer.add(flow)

226

227

# Save error responses only

228

with open("errors.mitm", "wb") as f:

229

writer = io.FilteredFlowWriter(f, error_filter)

230

for flow in flows_to_process:

231

writer.add(flow)

232

233

def save_flows_by_domain():

234

"""Save flows grouped by domain."""

235

flows_by_domain = {}

236

all_flows = [] # Assume this contains flows

237

238

# Group flows by domain

239

for flow in all_flows:

240

if isinstance(flow, http.HTTPFlow):

241

domain = flow.request.host

242

if domain not in flows_by_domain:

243

flows_by_domain[domain] = []

244

flows_by_domain[domain].append(flow)

245

246

# Save each domain's flows to separate files

247

for domain, flows in flows_by_domain.items():

248

filename = f"{domain.replace('.', '_')}_flows.mitm"

249

250

with open(filename, "wb") as f:

251

writer = io.FlowWriter(f)

252

for flow in flows:

253

writer.add(flow)

254

255

print(f"Saved {len(flows)} flows for {domain} to {filename}")

256

```

257

258

### Flow Analysis and Statistics

259

260

```python

261

from mitmproxy import io, http

262

from collections import defaultdict, Counter

263

import json

264

265

def analyze_flow_file(filename):

266

"""Comprehensive analysis of a flow file."""

267

stats = {

268

'total_flows': 0,

269

'http_flows': 0,

270

'tcp_flows': 0,

271

'udp_flows': 0,

272

'methods': Counter(),

273

'status_codes': Counter(),

274

'domains': Counter(),

275

'content_types': Counter(),

276

'total_request_bytes': 0,

277

'total_response_bytes': 0,

278

'error_count': 0

279

}

280

281

try:

282

with open(filename, "rb") as f:

283

reader = io.FlowReader(f)

284

285

for flow in reader.stream():

286

stats['total_flows'] += 1

287

288

if isinstance(flow, http.HTTPFlow):

289

stats['http_flows'] += 1

290

291

# Method statistics

292

stats['methods'][flow.request.method] += 1

293

294

# Domain statistics

295

stats['domains'][flow.request.host] += 1

296

297

# Request size

298

if flow.request.content:

299

stats['total_request_bytes'] += len(flow.request.content)

300

301

# Response analysis

302

if flow.response:

303

stats['status_codes'][flow.response.status_code] += 1

304

305

if flow.response.content:

306

stats['total_response_bytes'] += len(flow.response.content)

307

308

# Content type analysis

309

content_type = flow.response.headers.get('content-type', 'unknown')

310

content_type = content_type.split(';')[0] # Remove charset info

311

stats['content_types'][content_type] += 1

312

313

# Error tracking

314

if flow.error:

315

stats['error_count'] += 1

316

317

elif hasattr(flow, 'messages'): # TCP/UDP flows

318

if 'TCPFlow' in str(type(flow)):

319

stats['tcp_flows'] += 1

320

elif 'UDPFlow' in str(type(flow)):

321

stats['udp_flows'] += 1

322

323

except Exception as e:

324

print(f"Error reading flow file: {e}")

325

return None

326

327

return stats

328

329

def print_flow_statistics(stats):

330

"""Print formatted flow statistics."""

331

if not stats:

332

return

333

334

print(f"Flow File Analysis:")

335

print(f"==================")

336

print(f"Total Flows: {stats['total_flows']}")

337

print(f" HTTP: {stats['http_flows']}")

338

print(f" TCP: {stats['tcp_flows']}")

339

print(f" UDP: {stats['udp_flows']}")

340

print(f" Errors: {stats['error_count']}")

341

print()

342

343

print(f"Data Transfer:")

344

print(f" Request bytes: {stats['total_request_bytes']:,}")

345

print(f" Response bytes: {stats['total_response_bytes']:,}")

346

print(f" Total bytes: {stats['total_request_bytes'] + stats['total_response_bytes']:,}")

347

print()

348

349

if stats['methods']:

350

print("HTTP Methods:")

351

for method, count in stats['methods'].most_common():

352

print(f" {method}: {count}")

353

print()

354

355

if stats['status_codes']:

356

print("Status Codes:")

357

for code, count in stats['status_codes'].most_common():

358

print(f" {code}: {count}")

359

print()

360

361

if stats['domains']:

362

print("Top Domains:")

363

for domain, count in stats['domains'].most_common(10):

364

print(f" {domain}: {count}")

365

print()

366

367

if stats['content_types']:

368

print("Content Types:")

369

for content_type, count in stats['content_types'].most_common():

370

print(f" {content_type}: {count}")

371

372

# Usage

373

stats = analyze_flow_file("captured_flows.mitm")

374

print_flow_statistics(stats)

375

```

376

377

### Flow Format Conversion

378

379

```python

380

from mitmproxy import io, http

381

import json

382

import csv

383

384

def convert_flows_to_json(input_file, output_file):

385

"""Convert flows to JSON format for external analysis."""

386

flows_data = []

387

388

with open(input_file, "rb") as f:

389

reader = io.FlowReader(f)

390

391

for flow in reader.stream():

392

if isinstance(flow, http.HTTPFlow):

393

flow_data = {

394

'timestamp': flow.request.timestamp_start,

395

'method': flow.request.method,

396

'url': flow.request.url,

397

'host': flow.request.host,

398

'path': flow.request.path,

399

'request_headers': dict(flow.request.headers),

400

'request_size': len(flow.request.content) if flow.request.content else 0,

401

}

402

403

if flow.response:

404

flow_data.update({

405

'status_code': flow.response.status_code,

406

'response_headers': dict(flow.response.headers),

407

'response_size': len(flow.response.content) if flow.response.content else 0,

408

'response_time': flow.response.timestamp_end - flow.request.timestamp_start if flow.response.timestamp_end else None

409

})

410

411

if flow.error:

412

flow_data['error'] = flow.error.msg

413

414

flows_data.append(flow_data)

415

416

with open(output_file, 'w') as f:

417

json.dump(flows_data, f, indent=2)

418

419

print(f"Converted {len(flows_data)} flows to {output_file}")

420

421

def convert_flows_to_csv(input_file, output_file):

422

"""Convert flows to CSV format for spreadsheet analysis."""

423

with open(input_file, "rb") as f:

424

reader = io.FlowReader(f)

425

426

with open(output_file, 'w', newline='') as csvfile:

427

fieldnames = ['timestamp', 'method', 'url', 'host', 'status_code',

428

'request_size', 'response_size', 'response_time', 'error']

429

writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

430

writer.writeheader()

431

432

for flow in reader.stream():

433

if isinstance(flow, http.HTTPFlow):

434

row = {

435

'timestamp': flow.request.timestamp_start,

436

'method': flow.request.method,

437

'url': flow.request.url,

438

'host': flow.request.host,

439

'request_size': len(flow.request.content) if flow.request.content else 0,

440

'status_code': flow.response.status_code if flow.response else None,

441

'response_size': len(flow.response.content) if flow.response and flow.response.content else 0,

442

'response_time': (flow.response.timestamp_end - flow.request.timestamp_start) if flow.response and flow.response.timestamp_end else None,

443

'error': flow.error.msg if flow.error else None

444

}

445

writer.writerow(row)

446

447

print(f"Converted flows to {output_file}")

448

```

449

450

### Advanced Flow Processing

451

452

```python

453

from mitmproxy import io, http

454

import gzip

455

import time

456

457

def merge_flow_files(input_files, output_file):

458

"""Merge multiple flow files into one, sorted by timestamp."""

459

all_flows = []

460

461

# Read all flows from input files

462

for filename in input_files:

463

print(f"Reading {filename}...")

464

flows = list(io.read_flows_from_paths([filename]))

465

all_flows.extend(flows)

466

467

# Sort flows by timestamp

468

def get_timestamp(flow):

469

if isinstance(flow, http.HTTPFlow):

470

return flow.request.timestamp_start

471

return 0

472

473

all_flows.sort(key=get_timestamp)

474

475

# Write merged flows

476

with open(output_file, "wb") as f:

477

writer = io.FlowWriter(f)

478

for flow in all_flows:

479

writer.add(flow)

480

481

print(f"Merged {len(all_flows)} flows into {output_file}")

482

483

def compress_flow_file(input_file, output_file):

484

"""Compress a flow file using gzip."""

485

with open(input_file, "rb") as f_in:

486

with gzip.open(output_file, "wb") as f_out:

487

f_out.write(f_in.read())

488

489

print(f"Compressed {input_file} to {output_file}")

490

491

def decompress_and_process_flows(compressed_file):

492

"""Process flows from a gzip-compressed file."""

493

with gzip.open(compressed_file, "rb") as f:

494

reader = io.FlowReader(f)

495

496

for flow in reader.stream():

497

if isinstance(flow, http.HTTPFlow):

498

print(f"Processing: {flow.request.method} {flow.request.url}")

499

# Process the flow here

500

```