# Command System

Type-safe command framework for interactive control and automation. Supports complex command composition, validation, and execution with built-in commands for all major operations and extensible custom command development.

## Capabilities

### CommandManager Class

Central command management and execution system.

```python { .api }
class CommandManager:
    """
    Central command manager for mitmproxy.

    Manages command registration, validation, and execution with type safety
    and comprehensive error handling.

    This is an API outline: the methods below carry signatures and
    documentation only, no implementation.
    """

    def execute(self, cmdstr: str) -> Any:
        """
        Execute a command string.

        Parameters:
        - cmdstr: Complete command string with arguments

        Returns:
        - Command execution result

        Raises:
        - CommandError: If command syntax is invalid or execution fails
        """

    def call(self, path: str, *args: Any) -> Any:
        """
        Call a command by path with arguments.

        Parameters:
        - path: Command path (e.g., "view.flows.select")
        - *args: Command arguments

        Returns:
        - Command execution result
        """

    def add(self, path: str, func: Callable) -> None:
        """
        Register a new command.

        Parameters:
        - path: Command path for registration
        - func: Function to execute for this command
        """

    def collect_commands(self, addon: Any) -> None:
        """
        Collect commands from an addon object.

        Parameters:
        - addon: Addon instance with @command.command decorated methods
        """
```

### Type System

Comprehensive type system for command arguments and validation.

```python { .api }
class TypeManager:
    """
    Type management system for command arguments.

    Provides type validation, conversion, and completion support
    for command arguments.

    This is an API outline: the methods below carry signatures and
    documentation only, no implementation.
    """

    def command(self, name: str) -> Callable:
        """
        Get command by name with type information.

        Parameters:
        - name: Command name

        Returns:
        - Command callable with type metadata
        """

    def validate_arg(self, arg_type: type, value: str) -> Any:
        """
        Validate and convert command argument.

        Parameters:
        - arg_type: Expected argument type
        - value: String value to validate

        Returns:
        - Converted value

        Raises:
        - TypeError: If value cannot be converted to expected type
        """

    def complete_arg(self, arg_type: type, prefix: str) -> List[str]:
        """
        Get completion suggestions for argument.

        Parameters:
        - arg_type: Argument type
        - prefix: Current input prefix

        Returns:
        - List of completion suggestions
        """
111
112
def verify_arg_signature(func: Callable, args: List[Any]) -> bool:
    """
    Verify function signature matches provided arguments.

    API outline only: signature and documentation, no implementation.

    Parameters:
    - func: Function to verify
    - args: Arguments to check against signature

    Returns:
    - True if signature matches, False otherwise
    """
123
124
def typename(arg_type: type) -> str:
    """
    Convert Python type to display string.

    API outline only: signature and documentation, no implementation.

    Parameters:
    - arg_type: Python type object

    Returns:
    - Human-readable type name
    """
```

### Built-in Command Types

Standard command argument types with validation and completion.

```python { .api }
class CommandTypes:
    """Built-in command types with validation and completion."""

    # Basic types
    class Str(str):
        """String argument type."""

    class Int(int):
        """Integer argument type."""

    class Bool(int):
        """
        Boolean argument type.

        Python does not allow ``bool`` to be subclassed, so this derives from
        ``int`` (the base of ``bool``) and normalizes every value to 0 or 1
        using ordinary truthiness.
        """

        def __new__(cls, value: object = False) -> "CommandTypes.Bool":
            return super().__new__(cls, bool(value))

    # File system types
    class Path(str):
        """File path argument with completion."""

        def complete(self, prefix: str) -> List[str]:
            """Complete file paths."""

    # Command types
    class Cmd(str):
        """Command name argument with completion."""

        def complete(self, prefix: str) -> List[str]:
            """Complete command names."""

    class CmdArgs(str):
        """Command arguments as string."""

    # Flow types
    class Flow(object):
        """Flow object argument."""

    class Flows(list):
        """List of flow objects."""

    # Data types
    class CutSpec(list):
        """Cut specification for data extraction."""

    class Data(list):
        """Tabular data representation."""

    # Choice types
    class Choice(str):
        """Choice from predefined options."""

        def __init__(self, choices: List[str]):
            # The permitted options are kept on the instance for completion.
            self.choices = choices

        def complete(self, prefix: str) -> List[str]:
            """
            Complete from available choices.

            Parameters:
            - prefix: Current input prefix

            Returns:
            - The stored choices that start with prefix, in their original order
            """
            return [choice for choice in self.choices if choice.startswith(prefix)]

    # Special types
    class Marker(str):
        """Flow marker string."""

    class Unknown(str):
        """Unknown/dynamic type."""

    class Space(str):
        """Whitespace representation."""
```

## Usage Examples

### Custom Command Development

```python
from mitmproxy import command, http
212
import mitmproxy.ctx as ctx
213
from typing import Sequence
214
215
class FlowCommandsAddon:
    """Addon providing custom flow manipulation commands."""

    @command.command("flow.mark")
    def mark_flows(self, flows: Sequence[http.HTTPFlow], marker: str) -> None:
        """
        Mark flows with a custom marker.

        Usage: flow.mark @focus "important"

        Parameters:
        - flows: Flows to mark
        - marker: Marker value assigned to each flow's ``marked`` attribute
        """
        count = 0
        for flow in flows:
            flow.marked = marker
            count += 1

        ctx.log.info(f"Marked {count} flows with '{marker}'")

    @command.command("flow.clear_marks")
    def clear_marks(self, flows: Sequence[http.HTTPFlow]) -> None:
        """
        Clear markers from flows.

        Usage: flow.clear_marks @all

        Only flows that currently carry a marker are reset and counted.
        """
        count = 0
        for flow in flows:
            if flow.marked:
                flow.marked = ""
                count += 1

        ctx.log.info(f"Cleared marks from {count} flows")

    @command.command("flow.stats")
    def flow_stats(self, flows: Sequence[http.HTTPFlow]) -> str:
        """
        Get statistics for selected flows.

        Usage: flow.stats @all

        Returns:
        - A multi-line summary with the flow count, total request plus
          response body bytes, per-method and per-status tallies, and the
          five most frequent hosts. "No flows selected" if flows is empty.
        """
        from collections import Counter

        if not flows:
            return "No flows selected"

        methods = Counter()
        status_codes = Counter()
        hosts = Counter()
        total_bytes = 0

        for flow in flows:
            methods[flow.request.method] += 1
            hosts[flow.request.host] += 1
            total_bytes += len(flow.request.content or b"")

            # A flow may have no response; the byte count has to skip it
            # just like the status tally does, otherwise it raises
            # AttributeError on None.
            if flow.response:
                status_codes[flow.response.status_code] += 1
                total_bytes += len(flow.response.content or b"")

        top_hosts = sorted(hosts.items(), key=lambda item: item[1], reverse=True)[:5]

        stats = [
            f"Flow Statistics ({len(flows)} flows):",
            f"Total bytes: {total_bytes:,}",
            f"Methods: {dict(sorted(methods.items()))}",
            f"Status codes: {dict(sorted(status_codes.items()))}",
            f"Top hosts: {dict(top_hosts)}",
        ]

        return "\n".join(stats)

    @command.command("flow.filter_by_size")
    def filter_by_size(self, flows: Sequence[http.HTTPFlow], min_size: int = 0, max_size: int = 0) -> Sequence[http.HTTPFlow]:
        """
        Filter flows by response size.

        Usage: flow.filter_by_size @all 1024 10240

        Parameters:
        - flows: Candidate flows; flows without a response are dropped
        - min_size: Minimum response body size in bytes; 0 disables the bound
        - max_size: Maximum response body size in bytes; 0 disables the bound

        Returns:
        - Flows whose response body size lies within the active bounds
        """
        filtered = []

        for flow in flows:
            if not flow.response:
                continue

            size = len(flow.response.content or b"")
            too_small = min_size > 0 and size < min_size
            too_large = max_size > 0 and size > max_size
            if too_small or too_large:
                continue

            filtered.append(flow)

        ctx.log.info(f"Filtered to {len(filtered)} flows by size")
        return filtered

    @command.command("flow.export_json")
    def export_json(self, flows: Sequence[http.HTTPFlow], filename: str) -> None:
        """
        Export flows to JSON file.

        Usage: flow.export_json @all flows.json

        Each flow becomes one object with method, url, status_code,
        timestamp, request_size and response_size. Flows without a response
        get a null status code and a response size of 0.
        """
        import json

        flow_data = [
            {
                "method": flow.request.method,
                "url": flow.request.url,
                "status_code": flow.response.status_code if flow.response else None,
                "timestamp": flow.request.timestamp_start,
                "request_size": len(flow.request.content or b""),
                "response_size": len(flow.response.content or b"") if flow.response else 0,
            }
            for flow in flows
        ]

        # Explicit encoding keeps the output independent of the platform locale.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(flow_data, f, indent=2)

        ctx.log.info(f"Exported {len(flow_data)} flows to {filename}")


addons = [FlowCommandsAddon()]
```

### Advanced Command Usage

```python
from mitmproxy import command, http, options
350
import mitmproxy.ctx as ctx
351
from typing import Sequence, Optional
352
import re
353
import time
354
355
class AdvancedCommandsAddon:
    """Advanced command examples with complex functionality."""

    @command.command("proxy.mode")
    def set_proxy_mode(self, mode: str) -> str:
        """
        Change proxy mode dynamically.

        Usage: proxy.mode transparent
        Usage: proxy.mode reverse:https://api.example.com

        Returns:
        - A confirmation string, or "Error: ..." if assigning the option raised
        """
        old_mode = ctx.options.mode

        try:
            # NOTE(review): assigns the raw string; confirm the value type the
            # installed mitmproxy version expects for the ``mode`` option.
            ctx.options.mode = mode
            ctx.log.info(f"Proxy mode changed from '{old_mode}' to '{mode}'")
            return f"Mode changed to: {mode}"
        except Exception as e:
            ctx.log.error(f"Failed to set mode to '{mode}': {e}")
            return f"Error: {e}"

    @command.command("proxy.intercept")
    def set_intercept(self, pattern: Optional[str] = None) -> str:
        """
        Set or clear intercept pattern.

        Usage: proxy.intercept "~u /api/"
        Usage: proxy.intercept  # Clear intercept

        Parameters:
        - pattern: Filter expression to intercept on; None or "" clears it
        """
        ctx.options.intercept = pattern

        if pattern:
            ctx.log.info(f"Intercept pattern set to: {pattern}")
            return f"Intercepting: {pattern}"

        ctx.log.info("Intercept cleared")
        return "Intercept cleared"

    @command.command("flow.replay")
    def replay_flows(self, flows: Sequence[http.HTTPFlow], count: int = 1) -> str:
        """
        Replay flows multiple times.

        Usage: flow.replay @focus 3

        This example only logs what it would replay; it does not resend
        anything.

        Parameters:
        - flows: Flows to replay
        - count: Number of passes over flows; must be at least 1
        """
        if not flows:
            return "No flows to replay"
        if count < 1:
            # range() would silently do nothing and the summary would report
            # a zero or negative repeat count.
            return f"Invalid replay count: {count}"

        replayed = 0
        for _ in range(count):
            for flow in flows:
                # This would typically use the replay functionality
                ctx.log.info(f"Replaying: {flow.request.method} {flow.request.url}")
                replayed += 1

        return f"Replayed {replayed} requests ({len(flows)} flows × {count} times)"

    @command.command("flow.search")
    def search_flows(self, flows: Sequence[http.HTTPFlow], pattern: str, field: str = "url") -> Sequence[http.HTTPFlow]:
        """
        Search flows by pattern in specified field.

        Usage: flow.search @all "api" url
        Usage: flow.search @all "json" content_type

        Parameters:
        - flows: Flows to search
        - pattern: Regular expression, matched case-insensitively
        - field: One of url, method, host, content_type, status

        Returns:
        - Flows whose field text matches pattern. An unknown field or an
          invalid pattern is logged as an error and yields an empty list.
        """
        extractors = {
            "url": lambda f: f.request.url,
            "method": lambda f: f.request.method,
            "host": lambda f: f.request.host,
            "content_type": lambda f: f.response.headers.get("content-type", "") if f.response else "",
            "status": lambda f: str(f.response.status_code) if f.response else "",
        }

        extract = extractors.get(field)
        if extract is None:
            # Searching an empty string would make patterns such as ".*"
            # match every flow, so reject the field outright.
            ctx.log.error(f"Unknown search field '{field}'. Valid fields: {sorted(extractors)}")
            return []

        try:
            # Compile once instead of re-parsing the pattern for every flow.
            matcher = re.compile(pattern, re.IGNORECASE)
        except re.error as e:
            ctx.log.error(f"Invalid search pattern '{pattern}': {e}")
            return []

        results = [flow for flow in flows if matcher.search(extract(flow))]

        ctx.log.info(f"Found {len(results)} flows matching '{pattern}' in {field}")
        return results

    @command.command("flow.time_range")
    def filter_by_time(self, flows: Sequence[http.HTTPFlow], start_time: float, end_time: float) -> Sequence[http.HTTPFlow]:
        """
        Filter flows by time range (Unix timestamps).

        Usage: flow.time_range @all 1609459200 1609545600

        Returns:
        - Flows whose request start timestamp lies in [start_time, end_time]
        """
        results = [
            flow
            for flow in flows
            if start_time <= flow.request.timestamp_start <= end_time
        ]

        ctx.log.info(f"Found {len(results)} flows in time range")
        return results

    @command.command("debug.log_level")
    def set_log_level(self, level: str) -> str:
        """
        Set logging level.

        Usage: debug.log_level info
        Usage: debug.log_level debug

        This example validates and reports the level; it does not change any
        logger configuration.
        """
        valid_levels = ["debug", "info", "warn", "error"]
        if level not in valid_levels:
            return f"Invalid level. Valid options: {valid_levels}"

        # This would set the actual log level
        ctx.log.info(f"Log level set to: {level}")
        return f"Log level: {level}"

    @command.command("debug.memory_stats")
    def memory_stats(self) -> str:
        """
        Get memory usage statistics.

        Usage: debug.memory_stats

        Returns:
        - A multi-line summary of RSS, VMS, CPU percent and open file count
          for the current process, or a notice if psutil is not installed
        """
        import os

        try:
            import psutil
        except ImportError:
            # psutil is an optional third-party package.
            return "Memory statistics unavailable: psutil is not installed"

        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()

        stats = [
            "Memory Statistics:",
            f"RSS: {memory_info.rss / 1024 / 1024:.1f} MB",
            f"VMS: {memory_info.vms / 1024 / 1024:.1f} MB",
            # NOTE(review): psutil documents that the first non-blocking
            # cpu_percent() call on a Process returns 0.0, so this figure is
            # likely always 0.0 on a fresh Process object. Confirm.
            f"CPU Percent: {process.cpu_percent():.1f}%",
            f"Open Files: {len(process.open_files())}",
        ]

        return "\n".join(stats)


addons = [AdvancedCommandsAddon()]
```

### Command Composition and Scripting

```python
from mitmproxy import command, http
505
import mitmproxy.ctx as ctx
506
507
class ScriptingCommandsAddon:
    """Commands for scripting and automation."""

    @command.command("script.wait")
    def wait_command(self, seconds: float) -> str:
        """
        Wait for specified seconds.

        Usage: script.wait 2.5
        """
        import time

        # NOTE(review): time.sleep blocks the calling thread. If commands run
        # on the proxy's event loop this stalls all traffic. Confirm.
        time.sleep(seconds)
        return f"Waited {seconds} seconds"

    @command.command("script.log")
    def log_command(self, level: str, message: str) -> str:
        """
        Log a message at specified level.

        Usage: script.log info "Processing complete"

        Levels info, warn and error go to the matching ctx.log method; any
        other level is logged at info with the level name as a prefix.
        """
        if level == "info":
            ctx.log.info(message)
        elif level == "warn":
            ctx.log.warn(message)
        elif level == "error":
            ctx.log.error(message)
        else:
            ctx.log.info(f"[{level}] {message}")

        return f"Logged: {message}"

    @command.command("script.execute")
    def execute_script(self, script_path: str) -> str:
        """
        Execute a series of commands from file.

        Usage: script.execute commands.txt

        Blank lines and lines starting with '#' are skipped. A command that
        fails is logged and does not stop the remaining commands.

        Returns:
        - A summary with the number of commands that ran successfully, or an
          error message if the file could not be read
        """
        # Only the file read belongs in this try: per-command failures are
        # handled individually below.
        try:
            with open(script_path, "r", encoding="utf-8") as f:
                lines = [raw.strip() for raw in f]
        except FileNotFoundError:
            return f"Script file not found: {script_path}"
        except (OSError, UnicodeDecodeError) as e:
            return f"Script execution error: {e}"

        executed = 0
        for line in lines:
            if not line or line.startswith("#"):
                continue
            try:
                result = ctx.master.commands.execute(line)
            except Exception as e:
                ctx.log.error(f"Failed to execute '{line}': {e}")
            else:
                ctx.log.info(f"Executed: {line} -> {result}")
                executed += 1

        return f"Executed {executed} commands from {script_path}"

    @command.command("script.conditional")
    def conditional_command(self, condition: str, true_cmd: str, false_cmd: str = "") -> str:
        """
        Execute command based on condition.

        Usage: script.conditional "flow.count > 10" "proxy.intercept ~all" "proxy.intercept"

        Parameters:
        - condition: Python expression; "flow.count" is replaced by
          len(ctx.master.view) before evaluation
        - true_cmd: Command executed when the condition is truthy
        - false_cmd: Command executed when the condition is falsy (optional)

        Returns:
        - The executed command's result, a notice that nothing ran, or an
          error message if evaluation or execution raised
        """
        try:
            expression = condition
            # Simple condition evaluation (in real implementation, this would be more robust)
            if "flow.count" in expression:
                flow_count = len(ctx.master.view)
                expression = expression.replace("flow.count", str(flow_count))

            # SECURITY: eval() on operator-supplied text is unsafe and kept for
            # demo purposes only. Builtins are withheld to narrow what an
            # expression can reach, but this is NOT a sandbox; replace it with
            # a real expression parser before using it on untrusted input.
            result = eval(expression, {"__builtins__": {}}, {})

            if result and true_cmd:
                return ctx.master.commands.execute(true_cmd)
            if not result and false_cmd:
                return ctx.master.commands.execute(false_cmd)
            return f"Condition '{expression}' evaluated to {result}, no command executed"

        except Exception as e:
            return f"Condition evaluation error: {e}"


addons = [ScriptingCommandsAddon()]
```

### Command Integration Example

```python
# Example of using commands programmatically
602
603
from mitmproxy import master, options
604
from mitmproxy.tools.main import mitmdump
605
606
def automated_proxy_session():
    """
    Example of automated proxy control using commands.

    Builds a Master from a fixed set of options, registers the three example
    addons, runs a list of start-up commands (printing each result or
    failure), then runs the proxy until interrupted. The master is always
    shut down on exit.

    NOTE(review): FlowCommandsAddon, AdvancedCommandsAddon and
    ScriptingCommandsAddon are not imported in this snippet; they must be
    made available from the earlier examples.
    """
    # Set up proxy with options
    opts = options.Options(
        listen_port=8080,
        web_open_browser=False,
    )

    # Create master
    m = master.Master(opts)

    # Add custom command addons
    m.addons.add(FlowCommandsAddon())
    m.addons.add(AdvancedCommandsAddon())
    m.addons.add(ScriptingCommandsAddon())

    try:
        # Start proxy
        # NOTE(review): confirm that Master honours a ``run_loop`` attribute
        # in the targeted mitmproxy version.
        m.run_loop = False  # Don't start main loop yet

        # Execute initial commands
        commands_to_run = [
            "proxy.mode transparent",
            "proxy.intercept ~u /api/",
            "script.log info 'Proxy automation started'",
            "debug.log_level debug",
        ]

        for cmd in commands_to_run:
            # One failing command must not prevent the rest from running.
            try:
                result = m.commands.execute(cmd)
                print(f"Command: {cmd} -> {result}")
            except Exception as e:
                print(f"Command failed: {cmd} -> {e}")

        # Run proxy
        m.run()

    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        m.shutdown()


if __name__ == "__main__":
    automated_proxy_session()
```