or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

addons.md · commands.md · configuration.md · connections.md · content.md · flow-io.md · http-flows.md · index.md · protocols.md

commands.mddocs/

0

# Command System

1

2

Type-safe command framework for interactive control and automation. It supports command composition, argument validation, and execution; ships built-in commands for all major operations; and can be extended with custom commands.

3

4

## Capabilities

5

6

### CommandManager Class

7

8

Central command management and execution system.

9

10

```python { .api }

11

class CommandManager:

12

"""

13

Central command manager for mitmproxy.

14

15

Manages command registration, validation, and execution with type safety

16

and comprehensive error handling.

17

"""

18

def execute(self, cmdstr: str) -> Any:

19

"""

20

Execute a command string.

21

22

Parameters:

23

- cmdstr: Complete command string with arguments

24

25

Returns:

26

- Command execution result

27

28

Raises:

29

- CommandError: If command syntax is invalid or execution fails

30

"""

31

32

def call(self, path: str, *args: Any) -> Any:

33

"""

34

Call a command by path with arguments.

35

36

Parameters:

37

- path: Command path (e.g., "view.flows.select")

38

- *args: Command arguments

39

40

Returns:

41

- Command execution result

42

"""

43

44

def add(self, path: str, func: Callable) -> None:

45

"""

46

Register a new command.

47

48

Parameters:

49

- path: Command path for registration

50

- func: Function to execute for this command

51

"""

52

53

def collect_commands(self, addon: Any) -> None:

54

"""

55

Collect commands from an addon object.

56

57

Parameters:

58

- addon: Addon instance with @command.command decorated methods

59

"""

60

```

61

62

### Type System

63

64

Comprehensive type system for command arguments and validation.

65

66

```python { .api }

67

class TypeManager:

68

"""

69

Type management system for command arguments.

70

71

Provides type validation, conversion, and completion support

72

for command arguments.

73

"""

74

def command(self, name: str) -> Callable:

75

"""

76

Get command by name with type information.

77

78

Parameters:

79

- name: Command name

80

81

Returns:

82

- Command callable with type metadata

83

"""

84

85

def validate_arg(self, arg_type: type, value: str) -> Any:

86

"""

87

Validate and convert command argument.

88

89

Parameters:

90

- arg_type: Expected argument type

91

- value: String value to validate

92

93

Returns:

94

- Converted value

95

96

Raises:

97

- TypeError: If value cannot be converted to expected type

98

"""

99

100

def complete_arg(self, arg_type: type, prefix: str) -> List[str]:

101

"""

102

Get completion suggestions for argument.

103

104

Parameters:

105

- arg_type: Argument type

106

- prefix: Current input prefix

107

108

Returns:

109

- List of completion suggestions

110

"""

111

112

def verify_arg_signature(func: Callable, args: List[Any]) -> bool:

113

"""

114

Verify function signature matches provided arguments.

115

116

Parameters:

117

- func: Function to verify

118

- args: Arguments to check against signature

119

120

Returns:

121

- True if signature matches, False otherwise

122

"""

123

124

def typename(arg_type: type) -> str:

125

"""

126

Convert Python type to display string.

127

128

Parameters:

129

- arg_type: Python type object

130

131

Returns:

132

- Human-readable type name

133

"""

134

```

135

136

### Built-in Command Types

137

138

Standard command argument types with validation and completion.

139

140

```python { .api }

141

class CommandTypes:

142

"""Built-in command types with validation and completion."""

143

144

# Basic types

145

class Str(str):

146

"""String argument type."""

147

148

class Int(int):

149

"""Integer argument type."""

150

151

class Bool(bool):

152

"""Boolean argument type."""

153

154

# File system types

155

class Path(str):

156

"""File path argument with completion."""

157

158

def complete(self, prefix: str) -> List[str]:

159

"""Complete file paths."""

160

161

# Command types

162

class Cmd(str):

163

"""Command name argument with completion."""

164

165

def complete(self, prefix: str) -> List[str]:

166

"""Complete command names."""

167

168

class CmdArgs(str):

169

"""Command arguments as string."""

170

171

# Flow types

172

class Flow(object):

173

"""Flow object argument."""

174

175

class Flows(list):

176

"""List of flow objects."""

177

178

# Data types

179

class CutSpec(list):

180

"""Cut specification for data extraction."""

181

182

class Data(list):

183

"""Tabular data representation."""

184

185

# Choice types

186

class Choice(str):

187

"""Choice from predefined options."""

188

189

def __init__(self, choices: List[str]):

190

self.choices = choices

191

192

def complete(self, prefix: str) -> List[str]:

193

"""Complete from available choices."""

194

195

# Special types

196

class Marker(str):

197

"""Flow marker string."""

198

199

class Unknown(str):

200

"""Unknown/dynamic type."""

201

202

class Space(str):

203

"""Whitespace representation."""

204

```

205

206

## Usage Examples

207

208

### Custom Command Development

209

210

```python

211

from mitmproxy import command, http

212

import mitmproxy.ctx as ctx

213

from typing import Sequence

214

215

class FlowCommandsAddon:

216

"""Addon providing custom flow manipulation commands."""

217

218

@command.command("flow.mark")

219

def mark_flows(self, flows: Sequence[http.HTTPFlow], marker: str) -> None:
    """
    Set the same marker on every given flow and log how many were marked.

    Usage: flow.mark @focus "important"

    Parameters:
    - flows: Flows to mark
    - marker: Marker value assigned to each flow's ``marked`` attribute
    """
    tagged = 0
    # enumerate keeps the running total without a separate increment.
    for tagged, entry in enumerate(flows, start=1):
        entry.marked = marker

    ctx.log.info(f"Marked {tagged} flows with '{marker}'")

231

232

@command.command("flow.clear_marks")

233

def clear_marks(self, flows: Sequence[http.HTTPFlow]) -> None:
    """
    Reset the marker of every flow that currently has one.

    Usage: flow.clear_marks @all

    Parameters:
    - flows: Flows to inspect; flows without a marker are left untouched
    """
    cleared = 0
    for entry in flows:
        if not entry.marked:
            # Nothing to clear, and it must not be counted.
            continue
        entry.marked = ""
        cleared += 1

    ctx.log.info(f"Cleared marks from {cleared} flows")

246

247

@command.command("flow.stats")

248

def flow_stats(self, flows: Sequence[http.HTTPFlow]) -> str:
    """
    Summarize the selected flows as a multi-line text report.

    The report lists the number of flows, the combined size of request and
    response bodies, per-method and per-status-code counts, and the five
    hosts seen most often. A flow without a response contributes only its
    request to the totals.

    Usage: flow.stats @all

    Parameters:
    - flows: Flows to summarize

    Returns:
    - The report text, or "No flows selected" when flows is empty
    """
    from collections import Counter

    if not flows:
        return "No flows selected"

    methods = Counter()
    status_codes = Counter()
    hosts = Counter()
    total_bytes = 0

    for flow in flows:
        request = flow.request
        methods[request.method] += 1
        hosts[request.host] += 1
        total_bytes += len(request.content or b"")

        # A flow may have no response yet (or ever); only touch response
        # attributes when one exists, otherwise this would raise.
        response = flow.response
        if response:
            status_codes[response.status_code] += 1
            total_bytes += len(response.content or b"")

    # Stable sort: hosts with equal counts keep first-seen order.
    top_hosts = sorted(hosts.items(), key=lambda item: item[1], reverse=True)[:5]

    stats = [
        f"Flow Statistics ({len(flows)} flows):",
        f"Total bytes: {total_bytes:,}",
        f"Methods: {dict(sorted(methods.items()))}",
        f"Status codes: {dict(sorted(status_codes.items()))}",
        f"Top hosts: {dict(top_hosts)}",
    ]

    return "\n".join(stats)

291

292

@command.command("flow.filter_by_size")

293

def filter_by_size(self, flows: Sequence[http.HTTPFlow], min_size: int = 0, max_size: int = 0) -> Sequence[http.HTTPFlow]:
    """
    Keep only flows whose response body size lies within the given bounds.

    Flows without a response are always dropped. A bound of 0 (or less)
    disables that side of the check.

    Usage: flow.filter_by_size @all 1024 10240

    Parameters:
    - flows: Flows to filter
    - min_size: Smallest accepted response size in bytes
    - max_size: Largest accepted response size in bytes

    Returns:
    - List of flows that passed the size check
    """
    def within_bounds(body_len: int) -> bool:
        too_small = min_size > 0 and body_len < min_size
        too_large = max_size > 0 and body_len > max_size
        return not (too_small or too_large)

    kept = [
        entry
        for entry in flows
        if entry.response and within_bounds(len(entry.response.content or b""))
    ]

    ctx.log.info(f"Filtered to {len(kept)} flows by size")
    return kept

316

317

@command.command("flow.export_json")

318

def export_json(self, flows: Sequence[http.HTTPFlow], filename: str) -> None:
    """
    Write a JSON summary of the given flows to a file.

    Each flow becomes one object with its method, URL, status code,
    request start timestamp, and request/response body sizes.

    Usage: flow.export_json @all flows.json

    Parameters:
    - flows: Flows to export
    - filename: Path of the file to create or overwrite
    """
    import json

    def summarize(entry):
        # Status code and response size fall back to None / 0 when the
        # flow has no response.
        reply = entry.response
        return {
            "method": entry.request.method,
            "url": entry.request.url,
            "status_code": reply.status_code if reply else None,
            "timestamp": entry.request.timestamp_start,
            "request_size": len(entry.request.content or b""),
            "response_size": len(reply.content or b"") if reply else 0,
        }

    records = [summarize(entry) for entry in flows]

    with open(filename, 'w') as out:
        json.dump(records, out, indent=2)

    ctx.log.info(f"Exported {len(flows)} flows to {filename}")

342

343

addons = [FlowCommandsAddon()]

344

```

345

346

### Advanced Command Usage

347

348

```python

349

from mitmproxy import command, http, options

350

import mitmproxy.ctx as ctx

351

from typing import Sequence, Optional

352

import re

353

import time

354

355

class AdvancedCommandsAddon:

356

"""Advanced command examples with complex functionality."""

357

358

@command.command("proxy.mode")

359

def set_proxy_mode(self, mode: str) -> str:
    """
    Assign a new value to the proxy ``mode`` option.

    Usage: proxy.mode transparent
    Usage: proxy.mode reverse:https://api.example.com

    Parameters:
    - mode: Mode specification to assign

    Returns:
    - Confirmation text, or "Error: ..." if the assignment raised
    """
    # NOTE(review): some mitmproxy releases may declare this option as a
    # sequence of strings rather than a single string - confirm that a plain
    # string is accepted by the targeted version.
    previous = ctx.options.mode

    try:
        ctx.options.mode = mode
        ctx.log.info(f"Proxy mode changed from '{previous}' to '{mode}'")
    except Exception as e:
        ctx.log.error(f"Failed to set mode to '{mode}': {e}")
        return f"Error: {e}"
    else:
        return f"Mode changed to: {mode}"

375

376

@command.command("proxy.intercept")

377

def set_intercept(self, pattern: Optional[str] = None) -> str:
    """
    Set or clear the intercept pattern option.

    Usage: proxy.intercept "~u /api/"
    Usage: proxy.intercept  # Clear intercept

    Parameters:
    - pattern: Filter expression to intercept on; an empty or missing
      value is reported as clearing the intercept

    Returns:
    - Text describing the new intercept state
    """
    # The previous value was read into a local that was never used; the
    # dead read has been removed.
    ctx.options.intercept = pattern

    if not pattern:
        ctx.log.info("Intercept cleared")
        return "Intercept cleared"

    ctx.log.info(f"Intercept pattern set to: {pattern}")
    return f"Intercepting: {pattern}"

393

394

@command.command("flow.replay")

395

def replay_flows(self, flows: Sequence[http.HTTPFlow], count: int = 1) -> str:
    """
    Log a replay line for every flow, repeated ``count`` times.

    Usage: flow.replay @focus 3

    Parameters:
    - flows: Flows to replay
    - count: Number of passes over the flows

    Returns:
    - Summary text, or "No flows to replay" when flows is empty
    """
    if not flows:
        return "No flows to replay"

    passes = range(count)
    for _ in passes:
        for entry in flows:
            # Placeholder: only logs; a real implementation would hand the
            # flow to the replay functionality here.
            ctx.log.info(f"Replaying: {entry.request.method} {entry.request.url}")

    # len(range(count)) is 0 for non-positive counts, matching the number
    # of log lines actually emitted.
    replayed = len(passes) * len(flows)
    return f"Replayed {replayed} requests ({len(flows)} flows × {count} times)"

412

413

@command.command("flow.search")

414

def search_flows(self, flows: Sequence[http.HTTPFlow], pattern: str, field: str = "url") -> Sequence[http.HTTPFlow]:
    """
    Return the flows whose chosen field matches a regular expression.

    Matching is case-insensitive and uses ``search`` semantics (the pattern
    may match anywhere in the field). Response-based fields are treated as
    empty text for flows without a response.

    Usage: flow.search @all "api" url
    Usage: flow.search @all "json" content_type

    Parameters:
    - flows: Flows to search
    - pattern: Regular expression to look for
    - field: One of "url", "method", "host", "content_type", "status"

    Returns:
    - List of matching flows; empty if the field name is unknown or the
      pattern is not a valid regular expression (both cases are logged)
    """
    extractors = {
        "url": lambda f: f.request.url,
        "method": lambda f: f.request.method,
        "host": lambda f: f.request.host,
        "content_type": lambda f: f.response.headers.get("content-type", "") if f.response else "",
        "status": lambda f: str(f.response.status_code) if f.response else "",
    }

    extract = extractors.get(field)
    if extract is None:
        # Previously an unknown field silently searched the empty string.
        ctx.log.warn(f"Unknown search field '{field}'. Valid options: {sorted(extractors)}")
        return []

    # Compile once up front: this validates the pattern before any work
    # is done and avoids a per-flow pattern lookup.
    try:
        matcher = re.compile(pattern, re.IGNORECASE)
    except re.error as e:
        ctx.log.error(f"Invalid search pattern '{pattern}': {e}")
        return []

    results = [flow for flow in flows if matcher.search(extract(flow))]

    ctx.log.info(f"Found {len(results)} flows matching '{pattern}' in {field}")
    return results

442

443

@command.command("flow.time_range")

444

def filter_by_time(self, flows: Sequence[http.HTTPFlow], start_time: float, end_time: float) -> Sequence[http.HTTPFlow]:
    """
    Keep flows whose request start timestamp lies in a closed interval.

    Usage: flow.time_range @all 1609459200 1609545600

    Parameters:
    - flows: Flows to filter
    - start_time: Lower bound (inclusive), Unix timestamp
    - end_time: Upper bound (inclusive), Unix timestamp

    Returns:
    - List of flows inside the time range
    """
    in_range = [
        entry
        for entry in flows
        if start_time <= entry.request.timestamp_start <= end_time
    ]

    ctx.log.info(f"Found {len(in_range)} flows in time range")
    return in_range

458

459

@command.command("debug.log_level")

460

def set_log_level(self, level: str) -> str:
    """
    Validate a log level name and report it.

    Usage: debug.log_level info
    Usage: debug.log_level debug

    Parameters:
    - level: One of "debug", "info", "warn", "error" (exact match)

    Returns:
    - Confirmation text, or a message listing the valid levels
    """
    valid_levels = ["debug", "info", "warn", "error"]

    if level in valid_levels:
        # Placeholder: only logs the choice; applying the level to the
        # logger is not implemented here.
        ctx.log.info(f"Log level set to: {level}")
        return f"Log level: {level}"

    return f"Invalid level. Valid options: {valid_levels}"

474

475

@command.command("debug.memory_stats")

476

def memory_stats(self) -> str:
    """
    Report memory, CPU, and open-file figures for the current process.

    Usage: debug.memory_stats

    Returns:
    - Multi-line text with RSS and VMS in MB, CPU percent, and the number
      of open files
    """
    import psutil

    # With no pid argument psutil inspects the current process.
    process = psutil.Process()
    memory_info = process.memory_info()

    # On a freshly created Process object, cpu_percent() without an
    # interval has no earlier sample to compare against and reports 0.0.
    # Sampling over a short interval gives a real figure; note that this
    # blocks the caller for that long.
    cpu = process.cpu_percent(interval=0.1)

    stats = [
        "Memory Statistics:",
        f"RSS: {memory_info.rss / 1024 / 1024:.1f} MB",
        f"VMS: {memory_info.vms / 1024 / 1024:.1f} MB",
        f"CPU Percent: {cpu:.1f}%",
        f"Open Files: {len(process.open_files())}",
    ]

    return "\n".join(stats)

497

498

addons = [AdvancedCommandsAddon()]

499

```

500

501

### Command Composition and Scripting

502

503

```python

504

from mitmproxy import command, http

505

import mitmproxy.ctx as ctx

506

507

class ScriptingCommandsAddon:

508

"""Commands for scripting and automation."""

509

510

@command.command("script.wait")

511

def wait_command(self, seconds: float) -> str:
    """
    Block for the given number of seconds.

    Usage: script.wait 2.5

    Parameters:
    - seconds: Non-negative duration to wait

    Returns:
    - Confirmation text, or "Error: ..." for a negative or NaN duration
    """
    import time

    # time.sleep raises ValueError for negative or NaN values; report
    # that as a message instead. "not >=" also catches NaN.
    if not seconds >= 0:
        return f"Error: seconds must be non-negative, got {seconds}"

    # NOTE(review): this sleep blocks the calling thread for the whole
    # duration - confirm that is acceptable where commands are executed.
    time.sleep(seconds)
    return f"Waited {seconds} seconds"

520

521

@command.command("script.log")

522

def log_command(self, level: str, message: str) -> str:
    """
    Emit a message through the log at the requested level.

    Levels "info", "warn", and "error" use the matching log method; any
    other level is logged at info with the level name as a prefix.

    Usage: script.log info "Processing complete"

    Parameters:
    - level: Log level name
    - message: Text to log

    Returns:
    - "Logged: <message>"
    """
    emitters = {
        "info": ctx.log.info,
        "warn": ctx.log.warn,
        "error": ctx.log.error,
    }

    emit = emitters.get(level)
    if emit is not None:
        emit(message)
    else:
        ctx.log.info(f"[{level}] {message}")

    return f"Logged: {message}"

538

539

@command.command("script.execute")

540

def execute_script(self, script_path: str) -> str:
    """
    Run the commands listed in a text file, one per line.

    Blank lines and lines starting with '#' are skipped. A failing
    command is logged and does not stop the remaining commands.

    Usage: script.execute commands.txt

    Parameters:
    - script_path: Path of the command file (read as UTF-8)

    Returns:
    - Summary with the number of commands that ran successfully, or an
      error message if the file is missing or cannot be processed
    """
    try:
        # Decode explicitly as UTF-8 so the result does not depend on
        # the platform's locale encoding.
        with open(script_path, 'r', encoding="utf-8") as f:
            commands = [raw.strip() for raw in f]

        executed = 0
        for line in commands:
            if not line or line.startswith('#'):
                continue
            try:
                result = ctx.master.commands.execute(line)
                ctx.log.info(f"Executed: {line} -> {result}")
                executed += 1
            except Exception as e:
                # Best effort: report the failure and keep going.
                ctx.log.error(f"Failed to execute '{line}': {e}")

        return f"Executed {executed} commands from {script_path}"

    except FileNotFoundError:
        return f"Script file not found: {script_path}"
    except Exception as e:
        # Command boundary: report any other failure as text, not a raise.
        return f"Script execution error: {e}"

567

568

@command.command("script.conditional")

569

def conditional_command(self, condition: str, true_cmd: str, false_cmd: str = "") -> str:
    """
    Evaluate a condition and run one of two commands depending on it.

    Any occurrence of "flow.count" in the condition is replaced by the
    current number of flows in the master's view before evaluation.

    Usage: script.conditional "flow.count > 10" "proxy.intercept ~all" "proxy.intercept"

    Parameters:
    - condition: Expression to evaluate
    - true_cmd: Command executed when the condition is truthy
    - false_cmd: Command executed when the condition is falsy

    Returns:
    - Result of the executed command, a message when no command applied,
      or an error message if evaluation or execution raised
    """
    try:
        if "flow.count" in condition:
            flow_count = len(ctx.master.view)
            condition = condition.replace("flow.count", str(flow_count))

        # SECURITY: eval() runs arbitrary Python supplied by whoever issues
        # the command. Demo only - never expose this to untrusted input.
        result = eval(condition)

        # Pick the branch for the outcome; an empty command means nothing
        # should be executed for that outcome.
        chosen = true_cmd if result else false_cmd
        if chosen:
            return ctx.master.commands.execute(chosen)

        return f"Condition '{condition}' evaluated to {result}, no command executed"

    except Exception as e:
        return f"Condition evaluation error: {e}"

594

595

addons = [ScriptingCommandsAddon()]

596

```

597

598

### Command Integration Example

599

600

```python

601

# Example of using commands programmatically

602

603

from mitmproxy import master, options

604

from mitmproxy.tools.main import mitmdump

605

606

def automated_proxy_session():
    """
    Example of automated proxy control using commands.

    Builds a master with a fixed set of options, registers the three
    example addons, executes a list of start-up commands (printing each
    result or failure), then runs the master until interrupted. The
    master is always shut down at the end.
    """
    # Proxy configuration
    opts = options.Options(
        listen_port=8080,
        web_open_browser=False
    )

    m = master.Master(opts)

    # Register the custom command addons in a fixed order
    for addon in (FlowCommandsAddon(), AdvancedCommandsAddon(), ScriptingCommandsAddon()):
        m.addons.add(addon)

    startup_commands = (
        "proxy.mode transparent",
        "proxy.intercept ~u /api/",
        "script.log info 'Proxy automation started'",
        "debug.log_level debug",
    )

    try:
        # NOTE(review): presumably meant to defer starting the main loop;
        # confirm the master actually reads a ``run_loop`` attribute.
        m.run_loop = False  # Don't start main loop yet

        # Run the start-up commands; one failing does not stop the rest
        for command_line in startup_commands:
            try:
                outcome = m.commands.execute(command_line)
                print(f"Command: {command_line} -> {outcome}")
            except Exception as e:
                print(f"Command failed: {command_line} -> {e}")

        # Run proxy
        m.run()

    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        m.shutdown()

649

650

if __name__ == "__main__":

651

automated_proxy_session()

652

```