0
# CLI Framework
1
2
Command-line interface framework for building application-specific commands in Faust applications. Provides decorators, argument parsing, option handling, and integration with the Faust CLI system for creating custom management and operational commands.
3
4
## Capabilities
5
6
### Command Base Classes
7
8
Foundation classes for implementing custom CLI commands with proper argument handling, application context, and integration with the Faust command system.
9
10
```python { .api }
11
class Command:
12
def __init__(self, *args, **kwargs):
13
"""
14
Base command class for CLI operations.
15
16
Args:
17
*args: Positional arguments
18
**kwargs: Keyword arguments and options
19
"""
20
21
def run(self, *args, **kwargs) -> Any:
22
"""
23
Execute the command with given arguments.
24
25
Args:
26
*args: Command arguments
27
**kwargs: Command options
28
29
Returns:
30
Command execution result
31
"""
32
33
def add_arguments(self, parser) -> None:
34
"""
35
Add command-specific arguments to parser.
36
37
Args:
38
parser: Argument parser instance
39
"""
40
41
def handle(self, *args, **options) -> Any:
42
"""
43
Handle command execution with parsed arguments.
44
45
Args:
46
*args: Parsed arguments
47
**options: Parsed options
48
49
Returns:
50
Command result
51
"""
52
53
@property
54
def help(self) -> str:
55
"""Command help text."""
56
57
@property
58
def description(self) -> str:
59
"""Command description."""
60
61
class AppCommand(Command):
62
def __init__(self, app: App, *args, **kwargs):
63
"""
64
Application-aware command with access to Faust app instance.
65
66
Args:
67
app: Faust application instance
68
*args: Additional arguments
69
**kwargs: Additional options
70
"""
71
super().__init__(*args, **kwargs)
72
self._app = app
73
74
async def run(self, *args, **kwargs) -> Any:
75
"""
76
Execute async command with application context.
77
78
Args:
79
*args: Command arguments
80
**kwargs: Command options
81
82
Returns:
83
Command execution result
84
"""
85
86
def get_app_config(self) -> dict:
87
"""
88
Get application configuration.
89
90
Returns:
91
Application configuration dictionary
92
"""
93
94
def print_app_info(self) -> None:
95
"""Print application information."""
96
97
@property
98
def app(self) -> App:
99
"""Faust application instance."""
100
```
101
102
### Argument and Option Decorators
103
104
Decorators for defining command-line arguments and options with type validation, default values, and help text integration.
105
106
```python { .api }
107
def argument(
108
*args,
109
**kwargs
110
) -> callable:
111
"""
112
Decorator for adding positional arguments to commands.
113
114
Args:
115
*args: Argument names and configuration
116
**kwargs: Argument options (type, help, etc.)
117
118
Returns:
119
Command decorator function
120
121
Example:
122
@argument('filename', help='File to process')
123
@option('--count', type=int, default=1, help='Number of items')
124
def my_command(filename, count=1):
125
pass
126
"""
127
128
def option(
129
*args,
130
**kwargs
131
) -> callable:
132
"""
133
Decorator for adding optional arguments to commands.
134
135
Args:
136
*args: Option names (e.g., '--verbose', '-v')
137
**kwargs: Option configuration
138
139
Returns:
140
Command decorator function
141
142
Example:
143
@option('--verbose', '-v', is_flag=True, help='Verbose output')
144
@option('--config', type=str, help='Config file path')
145
def my_command(verbose=False, config=None):
146
pass
147
"""
148
149
def flag(
150
*args,
151
help: str = None,
152
**kwargs
153
) -> callable:
154
"""
155
Decorator for boolean flag options.
156
157
Args:
158
*args: Flag names (e.g., '--debug', '-d')
159
help: Help text for flag
160
**kwargs: Additional flag options
161
162
Returns:
163
Command decorator function
164
"""
165
166
def choice(
167
*choices,
168
help: str = None,
169
**kwargs
170
) -> callable:
171
"""
172
Decorator for choice-based options.
173
174
Args:
175
*choices: Valid choice values
176
help: Help text
177
**kwargs: Additional choice options
178
179
Returns:
180
Command decorator function
181
"""
182
```
183
184
### Command Registration
185
186
System for registering and discovering commands within Faust applications, including automatic command discovery and namespace management.
187
188
```python { .api }
189
def command(
190
name: str = None,
191
*,
192
base: type = None,
193
help: str = None,
194
**kwargs
195
) -> callable:
196
"""
197
Register function as a CLI command.
198
199
Args:
200
name: Command name (defaults to function name)
201
base: Base command class
202
help: Command help text
203
**kwargs: Additional command options
204
205
Returns:
206
Command decorator function
207
208
Example:
209
@app.command()
210
def my_command():
211
'''Custom command implementation.'''
212
pass
213
"""
214
215
def call_command(
216
name: str,
217
*args,
218
app: App = None,
219
**kwargs
220
) -> Any:
221
"""
222
Call registered command programmatically.
223
224
Args:
225
name: Command name
226
*args: Command arguments
227
app: Application instance
228
**kwargs: Command options
229
230
Returns:
231
Command result
232
"""
233
234
class CommandRegistry:
235
def __init__(self):
236
"""Registry for managing CLI commands."""
237
238
def register(self, name: str, command_class: type, **kwargs) -> None:
239
"""
240
Register command class with given name.
241
242
Args:
243
name: Command name
244
command_class: Command implementation class
245
**kwargs: Command metadata
246
"""
247
248
def get(self, name: str) -> type:
249
"""
250
Get command class by name.
251
252
Args:
253
name: Command name
254
255
Returns:
256
Command class
257
258
Raises:
259
KeyError: If command not found
260
"""
261
262
def list_commands(self) -> list:
263
"""
264
List all registered commands.
265
266
Returns:
267
List of command names
268
"""
269
270
def discover_commands(self, module: str) -> None:
271
"""
272
Auto-discover commands in module.
273
274
Args:
275
module: Module name to scan for commands
276
"""
277
```
278
279
### Command Decorators and Utilities
280
281
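Class-based `argument` and `option` decorators that wrap `click.argument` and `click.option`, plus a `call_command` helper that runs a command programmatically and returns its exit code together with the stdout and stderr streams.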
282
283
```python { .api }
284
class argument:
285
"""
286
Create command-line argument decorator.
287
288
This class wraps click.argument to provide command-line argument
289
parsing for Faust commands.
290
"""
291
def __init__(self, *args, **kwargs):
292
"""
293
Initialize argument decorator.
294
295
Args:
296
*args: Argument specification (same as click.argument)
297
**kwargs: Argument options (same as click.argument)
298
"""
299
300
def __call__(self, fun) -> callable:
301
"""Apply argument decorator to function."""
302
303
class option:
304
"""
305
Create command-line option decorator.
306
307
This class wraps click.option to provide command-line option
308
parsing for Faust commands.
309
"""
310
def __init__(self, *args, **kwargs):
311
"""
312
Initialize option decorator.
313
314
Args:
315
*args: Option specification (same as click.option)
316
**kwargs: Option configuration (same as click.option)
317
"""
318
319
def __call__(self, fun) -> callable:
320
"""Apply option decorator to function."""
321
322
def call_command(
323
command: str,
324
args: list = None,
325
stdout = None,
326
stderr = None,
327
side_effects: bool = False,
328
**kwargs
329
) -> tuple:
330
"""
331
Programmatically execute a Faust CLI command.
332
333
Args:
334
command: Command name to execute
335
args: List of command arguments
336
stdout: Output stream (defaults to StringIO)
337
stderr: Error stream (defaults to StringIO)
338
side_effects: Whether to allow side effects
339
**kwargs: Additional keyword arguments
340
341
Returns:
342
Tuple of (exit_code, stdout, stderr)
343
"""
344
```
345
346
### Built-in Commands
347
348
Pre-implemented commands for common Faust operations including worker management, topic operations, and application monitoring.
349
350
```python { .api }
351
class WorkerCommand(AppCommand):
352
"""Command for managing Faust workers."""
353
354
async def run(
355
self,
356
*,
357
loglevel: str = 'info',
358
logfile: str = None,
359
web_host: str = None,
360
web_port: int = None,
361
**kwargs
362
) -> None:
363
"""
364
Start Faust worker process.
365
366
Args:
367
loglevel: Logging level
368
logfile: Log file path
369
web_host: Web interface host
370
web_port: Web interface port
371
"""
372
373
class SendCommand(AppCommand):
374
"""Command for sending messages to topics."""
375
376
async def run(
377
self,
378
topic: str,
379
*,
380
key: str = None,
381
value: str = None,
382
key_type: str = None,
383
value_type: str = None,
384
partition: int = None,
385
**kwargs
386
) -> None:
387
"""
388
Send message to topic.
389
390
Args:
391
topic: Target topic name
392
key: Message key
393
value: Message value
394
key_type: Key serialization type
395
value_type: Value serialization type
396
partition: Target partition
397
"""
398
399
class TopicsCommand(AppCommand):
400
"""Command for topic management operations."""
401
402
async def run(
403
self,
404
operation: str,
405
*,
406
topic: str = None,
407
partitions: int = None,
408
replication_factor: int = None,
409
**kwargs
410
) -> None:
411
"""
412
Manage Kafka topics.
413
414
Args:
415
operation: Operation type (list, create, delete, describe)
416
topic: Topic name (for create/delete/describe)
417
partitions: Number of partitions (for create)
418
replication_factor: Replication factor (for create)
419
"""
420
421
class TablesCommand(AppCommand):
422
"""Command for table inspection and management."""
423
424
async def run(
425
self,
426
operation: str,
427
*,
428
table: str = None,
429
key: str = None,
430
**kwargs
431
) -> None:
432
"""
433
Manage application tables.
434
435
Args:
436
operation: Operation type (list, get, clear, describe)
437
table: Table name
438
key: Table key (for get operations)
439
"""
440
441
class AgentsCommand(AppCommand):
442
"""Command for agent monitoring and control."""
443
444
async def run(
445
self,
446
operation: str,
447
*,
448
agent: str = None,
449
**kwargs
450
) -> None:
451
"""
452
Monitor and control agents.
453
454
Args:
455
operation: Operation type (list, status, restart)
456
agent: Agent name (for specific operations)
457
"""
458
```
459
460
### Command Utilities
461
462
Helper functions and utilities for command implementation, including formatting, validation, and common operation patterns.
463
464
```python { .api }
465
def confirm(message: str, *, default: bool = False) -> bool:
466
"""
467
Ask user for confirmation.
468
469
Args:
470
message: Confirmation message
471
default: Default response
472
473
Returns:
474
True if user confirms
475
"""
476
477
def prompt(message: str, *, default: str = None, hide_input: bool = False) -> str:
478
"""
479
Prompt user for input.
480
481
Args:
482
message: Prompt message
483
default: Default value
484
hide_input: Hide input (for passwords)
485
486
Returns:
487
User input string
488
"""
489
490
def print_table(
491
data: list,
492
headers: list,
493
*,
494
format: str = 'table',
495
sort_key: str = None
496
) -> None:
497
"""
498
Print data in tabular format.
499
500
Args:
501
data: List of dictionaries
502
headers: Column headers
503
format: Output format (table, csv, json)
504
sort_key: Key to sort by
505
"""
506
507
def format_duration(seconds: float) -> str:
508
"""
509
Format duration in human-readable form.
510
511
Args:
512
seconds: Duration in seconds
513
514
Returns:
515
Formatted duration string
516
"""
517
518
def format_bytes(bytes_count: int) -> str:
519
"""
520
Format byte count in human-readable form.
521
522
Args:
523
bytes_count: Number of bytes
524
525
Returns:
526
Formatted byte string (KB, MB, GB, etc.)
527
"""
528
529
class ProgressBar:
530
def __init__(self, total: int, *, description: str = None):
531
"""
532
Progress bar for long-running operations.
533
534
Args:
535
total: Total number of items
536
description: Operation description
537
"""
538
539
def update(self, advance: int = 1) -> None:
540
"""
541
Update progress bar.
542
543
Args:
544
advance: Number of items completed
545
"""
546
547
def finish(self) -> None:
548
"""Complete progress bar."""
549
550
def __enter__(self):
551
return self
552
553
def __exit__(self, exc_type, exc_val, exc_tb):
554
self.finish()
555
```
556
557
## Usage Examples
558
559
### Basic Custom Command
560
561
```python
562
import faust
563
564
app = faust.App('cli-app', broker='kafka://localhost:9092')
565
566
@app.command()
567
async def hello():
568
"""Say hello command."""
569
print("Hello from Faust CLI!")
570
571
@app.command()
572
@faust.option('--name', '-n', default='World', help='Name to greet')
573
@faust.option('--count', '-c', type=int, default=1, help='Number of greetings')
574
async def greet(name: str = 'World', count: int = 1):
575
"""Greet someone multiple times."""
576
for i in range(count):
577
print(f"Hello, {name}!")
578
579
# Run with: python -m myapp greet --name=Alice --count=3
580
```
581
582
### Application Management Command
583
584
```python
585
@app.command()
586
@faust.option('--format', type=faust.choice('table', 'json', 'csv'),
587
default='table', help='Output format')
588
async def status(format: str = 'table'):
589
"""Show application status."""
590
591
# Collect status information
592
status_data = {
593
'app_id': app.conf.id,
594
'broker': app.conf.broker,
595
'agents': len(app.agents),
596
'topics': len(app.topics),
597
'tables': len(app.tables),
598
'web_enabled': app.conf.web_enabled,
599
'web_port': app.conf.web_port
600
}
601
602
if format == 'json':
603
import json
604
print(json.dumps(status_data, indent=2))
605
elif format == 'csv':
606
for key, value in status_data.items():
607
print(f"{key},{value}")
608
else:
609
print("=== Application Status ===")
610
for key, value in status_data.items():
611
print(f"{key:15}: {value}")
612
```
613
614
### Data Management Commands
615
616
```python
617
@app.command()
618
@faust.argument('table_name', help='Table name to inspect')
619
@faust.option('--limit', type=int, default=10, help='Limit number of items')
620
async def table_dump(table_name: str, limit: int = 10):
621
"""Dump table contents."""
622
623
if table_name not in app.tables:
624
print(f"Table '{table_name}' not found")
625
return
626
627
table = app.tables[table_name]
628
629
print(f"Contents of table '{table_name}':")
630
print("-" * 40)
631
632
count = 0
633
for key, value in table.items():
634
if count >= limit:
635
print(f"... (showing first {limit} items)")
636
break
637
638
print(f"{key}: {value}")
639
count += 1
640
641
print(f"\nTotal items shown: {count}")
642
643
@app.command()
644
@faust.argument('topic_name', help='Topic to send message to')
645
@faust.argument('message', help='Message to send')
646
@faust.option('--key', help='Message key')
647
@faust.option('--partition', type=int, help='Target partition')
648
async def send_message(topic_name: str, message: str, key: str = None, partition: int = None):
649
"""Send message to topic."""
650
651
if topic_name not in app.topics:
652
print(f"Topic '{topic_name}' not found")
653
return
654
655
topic = app.topics[topic_name]
656
657
try:
658
await topic.send(key=key, value=message, partition=partition)
659
print(f"Message sent to {topic_name}")
660
except Exception as e:
661
print(f"Error sending message: {e}")
662
```
663
664
### Monitoring and Diagnostics
665
666
```python
667
@app.command()
668
@faust.option('--watch', '-w', is_flag=True, help='Watch metrics continuously')
669
@faust.option('--interval', type=int, default=5, help='Update interval in seconds')
670
async def metrics(watch: bool = False, interval: int = 5):
671
"""Display application metrics."""
672
673
def print_metrics():
674
if hasattr(app, 'monitor'):
675
monitor = app.monitor
676
677
print("\033[2J\033[H") # Clear screen
678
print("=== Faust Application Metrics ===")
679
print(f"Messages received: {monitor.messages_received_total()}")
680
print(f"Messages sent: {monitor.messages_sent_total()}")
681
print(f"Events per second: {monitor.events_per_second():.2f}")
682
print(f"Table operations: {monitor.tables_get_total()}")
683
print(f"Commit latency: {monitor.commit_latency_avg():.3f}s")
684
685
# Agent status
686
print("\nAgent Status:")
687
for agent in app.agents:
688
print(f" {agent.name}: {'Running' if agent.started else 'Stopped'}")
689
else:
690
print("No monitor configured")
691
692
if watch:
693
import asyncio
694
while True:
695
print_metrics()
696
await asyncio.sleep(interval)
697
else:
698
print_metrics()
699
700
@app.command()
701
async def health():
702
"""Check application health."""
703
704
health_status = {
705
'app_started': app.started,
706
'agents_running': sum(1 for agent in app.agents if agent.started),
707
'total_agents': len(app.agents),
708
'tables_count': len(app.tables),
709
'topics_count': len(app.topics)
710
}
711
712
# Check if all agents are running
713
all_agents_running = health_status['agents_running'] == health_status['total_agents']
714
715
print("=== Health Check ===")
716
for key, value in health_status.items():
717
print(f"{key}: {value}")
718
719
if app.started and all_agents_running:
720
print("\n✅ Application is healthy")
721
return 0
722
else:
723
print("\n❌ Application has issues")
724
return 1
725
```
726
727
### Interactive Commands
728
729
```python
730
@app.command()
731
async def interactive():
732
"""Interactive command shell."""
733
734
import readline # Enable command history
735
736
print("Faust Interactive Shell")
737
print("Type 'help' for available commands or 'exit' to quit")
738
739
while True:
740
try:
741
command = input("faust> ").strip()
742
743
if command == 'exit':
744
break
745
elif command == 'help':
746
print("Available commands:")
747
for cmd_name in app._commands:
748
cmd = app._commands[cmd_name]
749
print(f" {cmd_name}: {cmd.help or 'No help available'}")
750
elif command.startswith('send '):
751
# Parse send command: send topic_name message
752
parts = command.split(' ', 2)
753
if len(parts) >= 3:
754
topic_name, message = parts[1], parts[2]
755
await send_message(topic_name, message)
756
else:
757
print("Usage: send <topic> <message>")
758
else:
759
# Try to run as registered command
760
try:
761
await faust.call_command(command, app=app)
762
except Exception as e:
763
print(f"Error: {e}")
764
765
except KeyboardInterrupt:
766
break
767
except EOFError:
768
break
769
770
print("\nGoodbye!")
771
772
@app.command()
773
@faust.argument('operation', type=faust.choice('backup', 'restore', 'list'))
774
@faust.option('--path', help='Backup file path')
775
@faust.option('--table', help='Specific table name')
776
async def backup(operation: str, path: str = None, table: str = None):
777
"""Backup and restore table data."""
778
779
import json
780
from datetime import datetime
781
782
if operation == 'backup':
783
if not path:
784
path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
785
786
backup_data = {}
787
788
tables_to_backup = [table] if table else app.tables.keys()
789
790
for table_name in tables_to_backup:
791
if table_name in app.tables:
792
table_obj = app.tables[table_name]
793
backup_data[table_name] = dict(table_obj.items())
794
795
with open(path, 'w') as f:
796
json.dump(backup_data, f, indent=2, default=str)
797
798
print(f"Backup saved to {path}")
799
800
elif operation == 'restore':
801
if not path:
802
print("--path is required for restore operation")
803
return
804
805
with open(path, 'r') as f:
806
backup_data = json.load(f)
807
808
for table_name, data in backup_data.items():
809
if table_name in app.tables:
810
table_obj = app.tables[table_name]
811
table_obj.clear()
812
table_obj.update(data)
813
print(f"Restored {len(data)} items to table '{table_name}'")
814
815
elif operation == 'list':
816
import os
817
backup_files = [f for f in os.listdir('.') if f.startswith('backup_') and f.endswith('.json')]
818
819
if backup_files:
820
print("Available backup files:")
821
for file in sorted(backup_files):
822
size = os.path.getsize(file)
823
print(f" {file} ({size} bytes)")
824
else:
825
print("No backup files found")
826
```
827
828
## Type Interfaces
829
830
```python { .api }
831
from typing import Protocol, Any, Optional, List
832
833
class CommandT(Protocol):
834
"""Type interface for Command."""
835
836
help: str
837
description: str
838
839
def run(self, *args, **kwargs) -> Any: ...
840
def add_arguments(self, parser: Any) -> None: ...
841
def handle(self, *args, **options) -> Any: ...
842
843
class AppCommandT(CommandT, Protocol):
844
"""Type interface for AppCommand."""
845
846
app: 'AppT'
847
848
def get_app_config(self) -> dict: ...
849
def print_app_info(self) -> None: ...
850
851
class CommandRegistryT(Protocol):
852
"""Type interface for CommandRegistry."""
853
854
def register(self, name: str, command_class: type, **kwargs) -> None: ...
855
def get(self, name: str) -> type: ...
856
def list_commands(self) -> List[str]: ...
857
```