"""
Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines.

Command-line interface framework for building application-specific commands in Faust applications. Provides decorators, argument parsing, option handling, and integration with the Faust CLI system for creating custom management and operational commands.

Foundation classes for implementing custom CLI commands with proper argument handling, application context, and integration with the Faust command system.
"""
from typing import Any


class Command:
    """Base class for CLI commands.

    This is an API stub: every method body consists of documentation only,
    so each method and both properties currently evaluate to ``None``.
    """

    def __init__(self, *args, **kwargs):
        """
        Base command class for CLI operations.

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments and options
        """

    def run(self, *args, **kwargs) -> Any:
        """
        Execute the command with given arguments.

        Args:
            *args: Command arguments
            **kwargs: Command options

        Returns:
            Command execution result
        """

    def add_arguments(self, parser) -> None:
        """
        Add command-specific arguments to parser.

        Args:
            parser: Argument parser instance
        """

    def handle(self, *args, **options) -> Any:
        """
        Handle command execution with parsed arguments.

        Args:
            *args: Parsed arguments
            **options: Parsed options

        Returns:
            Command result
        """

    @property
    def help(self) -> str:
        """Command help text."""

    @property
    def description(self) -> str:
        """Command description."""
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from faust import App


class AppCommand(Command):
    """Command that holds a reference to the Faust application."""

    def __init__(self, app: "App", *args, **kwargs):
        """
        Application-aware command with access to Faust app instance.

        Args:
            app: Faust application instance
            *args: Additional arguments
            **kwargs: Additional options
        """
        super().__init__(*args, **kwargs)
        # Stored privately: ``app`` is exposed as a read-only property, so
        # assigning ``self.app`` here would raise AttributeError.
        self._app = app

    async def run(self, *args, **kwargs) -> Any:
        """
        Execute async command with application context.

        Unlike ``Command.run`` this is a coroutine and must be awaited.

        Args:
            *args: Command arguments
            **kwargs: Command options

        Returns:
            Command execution result
        """

    def get_app_config(self) -> dict:
        """
        Get application configuration.

        Returns:
            Application configuration dictionary
        """

    def print_app_info(self) -> None:
        """Print application information."""

    @property
    def app(self) -> "App":
        """Faust application instance."""
        return self._app


# Decorators for defining command-line arguments and options with type validation, default values, and help text integration.
from typing import Callable


def argument(
    *args,
    **kwargs
) -> Callable:
    """
    Decorator for adding positional arguments to commands.

    Args:
        *args: Argument names and configuration
        **kwargs: Argument options (type, help, etc.)

    Returns:
        Command decorator function

    Example:
        @argument('filename', help='File to process')
        @argument('--count', type=int, default=1, help='Number of items')
        def my_command(filename, count=1):
            pass
    """
from typing import Callable


def option(
    *args,
    **kwargs
) -> Callable:
    """
    Decorator for adding optional arguments to commands.

    Args:
        *args: Option names (e.g., '--verbose', '-v')
        **kwargs: Option configuration

    Returns:
        Command decorator function

    Example:
        @option('--verbose', '-v', is_flag=True, help='Verbose output')
        @option('--config', type=str, help='Config file path')
        def my_command(verbose=False, config=None):
            pass
    """
from typing import Callable, Optional


def flag(
    *args,
    help: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for boolean flag options.

    Args:
        *args: Flag names (e.g., '--debug', '-d')
        help: Help text for flag
        **kwargs: Additional flag options

    Returns:
        Command decorator function
    """
from typing import Callable, Optional


def choice(
    *choices,
    help: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for choice-based options.

    Args:
        *choices: Valid choice values
        help: Help text
        **kwargs: Additional choice options

    Returns:
        Command decorator function
    """


# System for registering and discovering commands within Faust applications, including automatic command discovery and namespace management.
from typing import Callable, Optional


def command(
    name: Optional[str] = None,
    *,
    base: Optional[type] = None,
    help: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Register function as a CLI command.

    Args:
        name: Command name (defaults to function name)
        base: Base command class
        help: Command help text
        **kwargs: Additional command options

    Returns:
        Command decorator function

    Example:
        @app.command()
        def my_command():
            '''Custom command implementation.'''
            pass
    """
from typing import TYPE_CHECKING, Any, Optional

if TYPE_CHECKING:
    from faust import App


def call_command(
    name: str,
    *args,
    app: Optional["App"] = None,
    **kwargs
) -> Any:
    """
    Call registered command programmatically.

    Args:
        name: Command name
        *args: Command arguments
        app: Application instance
        **kwargs: Command options

    Returns:
        Command result
    """
from typing import List


class CommandRegistry:
    """Registry for managing CLI commands.

    This is an API stub: the methods document the intended contract but
    contain no implementation, so each currently returns ``None``.
    """

    def __init__(self):
        """Registry for managing CLI commands."""

    def register(self, name: str, command_class: type, **kwargs) -> None:
        """
        Register command class with given name.

        Args:
            name: Command name
            command_class: Command implementation class
            **kwargs: Command metadata
        """

    def get(self, name: str) -> type:
        """
        Get command class by name.

        Args:
            name: Command name

        Returns:
            Command class

        Raises:
            KeyError: If command not found
        """

    def list_commands(self) -> List[str]:
        """
        List all registered commands.

        Returns:
            List of command names
        """

    def discover_commands(self, module: str) -> None:
        """
        Auto-discover commands in module.

        Args:
            module: Module name to scan for commands
        """


# Decorators and utility functions for creating command-line arguments, options, and executing commands programmatically.
from typing import Callable


class argument:
    """
    Create command-line argument decorator.

    This class wraps click.argument to provide command-line argument
    parsing for Faust commands.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize argument decorator.

        Args:
            *args: Argument specification (same as click.argument)
            **kwargs: Argument options (same as click.argument)
        """

    def __call__(self, fun) -> Callable:
        """Apply argument decorator to function."""
from typing import Callable


class option:
    """
    Create command-line option decorator.

    This class wraps click.option to provide command-line option
    parsing for Faust commands.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize option decorator.

        Args:
            *args: Option specification (same as click.option)
            **kwargs: Option configuration (same as click.option)
        """

    def __call__(self, fun) -> Callable:
        """Apply option decorator to function."""
from typing import Optional


def call_command(
    command: str,
    args: Optional[list] = None,
    stdout=None,
    stderr=None,
    side_effects: bool = False,
    **kwargs
) -> tuple:
    """
    Programmatically execute a Faust CLI command.

    Args:
        command: Command name to execute
        args: List of command arguments
        stdout: Output stream (defaults to StringIO)
        stderr: Error stream (defaults to StringIO)
        side_effects: Whether to allow side effects
        **kwargs: Additional keyword arguments

    Returns:
        Tuple of (exit_code, stdout, stderr)
    """


# Pre-implemented commands for common Faust operations including worker management, topic operations, and application monitoring.
from typing import Optional


class WorkerCommand(AppCommand):
    """Command for managing Faust workers."""

    async def run(
        self,
        *,
        loglevel: str = 'info',
        logfile: Optional[str] = None,
        web_host: Optional[str] = None,
        web_port: Optional[int] = None,
        **kwargs
    ) -> None:
        """
        Start Faust worker process.

        Args:
            loglevel: Logging level
            logfile: Log file path
            web_host: Web interface host
            web_port: Web interface port
            **kwargs: Any further keyword options accepted by the signature
        """
from typing import Optional


class SendCommand(AppCommand):
    """Command for sending messages to topics."""

    async def run(
        self,
        topic: str,
        *,
        key: Optional[str] = None,
        value: Optional[str] = None,
        key_type: Optional[str] = None,
        value_type: Optional[str] = None,
        partition: Optional[int] = None,
        **kwargs
    ) -> None:
        """
        Send message to topic.

        Args:
            topic: Target topic name
            key: Message key
            value: Message value
            key_type: Key serialization type
            value_type: Value serialization type
            partition: Target partition
            **kwargs: Any further keyword options accepted by the signature
        """
from typing import Optional


class TopicsCommand(AppCommand):
    """Command for topic management operations."""

    async def run(
        self,
        operation: str,
        *,
        topic: Optional[str] = None,
        partitions: Optional[int] = None,
        replication_factor: Optional[int] = None,
        **kwargs
    ) -> None:
        """
        Manage Kafka topics.

        Args:
            operation: Operation type (list, create, delete, describe)
            topic: Topic name (for create/delete/describe)
            partitions: Number of partitions (for create)
            replication_factor: Replication factor (for create)
            **kwargs: Any further keyword options accepted by the signature
        """
from typing import Optional


class TablesCommand(AppCommand):
    """Command for table inspection and management."""

    async def run(
        self,
        operation: str,
        *,
        table: Optional[str] = None,
        key: Optional[str] = None,
        **kwargs
    ) -> None:
        """
        Manage application tables.

        Args:
            operation: Operation type (list, get, clear, describe)
            table: Table name
            key: Table key (for get operations)
            **kwargs: Any further keyword options accepted by the signature
        """
from typing import Optional


class AgentsCommand(AppCommand):
    """Command for agent monitoring and control."""

    async def run(
        self,
        operation: str,
        *,
        agent: Optional[str] = None,
        **kwargs
    ) -> None:
        """
        Monitor and control agents.

        Args:
            operation: Operation type (list, status, restart)
            agent: Agent name (for specific operations)
            **kwargs: Any further keyword options accepted by the signature
        """


# Helper functions and utilities for command implementation, including formatting, validation, and common operation patterns.
def confirm(message: str, *, default: bool = False) -> bool:
    """
    Ask user for confirmation.

    Args:
        message: Confirmation message
        default: Default response

    Returns:
        True if user confirms
    """
from typing import Optional


def prompt(message: str, *, default: Optional[str] = None, hide_input: bool = False) -> str:
    """
    Prompt user for input.

    Args:
        message: Prompt message
        default: Default value
        hide_input: Hide input (for passwords)

    Returns:
        User input string
    """
from typing import Optional


def print_table(
    data: list,
    headers: list,
    *,
    format: str = 'table',
    sort_key: Optional[str] = None
) -> None:
    """
    Print data in tabular format.

    Args:
        data: List of dictionaries
        headers: Column headers
        format: Output format (table, csv, json)
        sort_key: Key to sort by
    """
def format_duration(seconds: float) -> str:
    """
    Format duration in human-readable form.

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted duration string
    """
def format_bytes(bytes_count: int) -> str:
    """
    Format byte count in human-readable form.

    Args:
        bytes_count: Number of bytes

    Returns:
        Formatted byte string (KB, MB, GB, etc.)
    """
from typing import Optional


class ProgressBar:
    """Progress bar for long-running operations, usable as a context manager."""

    def __init__(self, total: int, *, description: Optional[str] = None):
        """
        Progress bar for long-running operations.

        Args:
            total: Total number of items
            description: Operation description
        """

    def update(self, advance: int = 1) -> None:
        """
        Update progress bar.

        Args:
            advance: Number of items completed
        """

    def finish(self) -> None:
        """Complete progress bar."""

    def __enter__(self) -> "ProgressBar":
        """Enter the ``with`` block and hand back this progress bar."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the bar when the ``with`` block ends; exceptions are not suppressed."""
        self.finish()


import faust
# Application instance that the example commands below register themselves on.
app = faust.App('cli-app', broker='kafka://localhost:9092')
@app.command()
async def hello():
    """Print a fixed greeting."""
    print("Hello from Faust CLI!")
@app.command()
@faust.option('--name', '-n', default='World', help='Name to greet')
@faust.option('--count', '-c', type=int, default=1, help='Number of greetings')
async def greet(name: str = 'World', count: int = 1):
    """Greet someone multiple times.

    Args:
        name: Who to greet.
        count: How many greeting lines to print.
    """
    # The option declares the same default as the signature: a click-style
    # option without a default is passed as None, which would otherwise
    # print "Hello, None!".
    for _ in range(count):
        print(f"Hello, {name}!")

# Run with: python -m myapp greet --name=Alice --count=3


@app.command()
@faust.option('--format', type=faust.choice('table', 'json', 'csv'),
              default='table', help='Output format')
async def status(format: str = 'table'):
    """Show application status.

    Args:
        format: Output format; 'json' and 'csv' are handled explicitly and
            any other value falls back to the plain-text table.
    """
    # Collect status information
    status_data = {
        'app_id': app.conf.id,
        'broker': app.conf.broker,
        'agents': len(app.agents),
        'topics': len(app.topics),
        'tables': len(app.tables),
        'web_enabled': app.conf.web_enabled,
        'web_port': app.conf.web_port
    }
    if format == 'json':
        import json
        # default=str: fall back to the string form for any setting value
        # that is not a JSON-native type instead of raising TypeError.
        print(json.dumps(status_data, indent=2, default=str))
    elif format == 'csv':
        import csv
        import sys
        # csv.writer quotes values that contain commas or quotes, which a
        # hand-built "key,value" line would not.
        writer = csv.writer(sys.stdout, lineterminator='\n')
        writer.writerows((key, str(value)) for key, value in status_data.items())
    else:
        print("=== Application Status ===")
        for key, value in status_data.items():
            print(f"{key:15}: {value}")


@app.command()
@faust.argument('table_name', help='Table name to inspect')
@faust.option('--limit', type=int, default=10, help='Limit number of items')
async def table_dump(table_name: str, limit: int = 10):
    """Dump table contents.

    Args:
        table_name: Name looked up in ``app.tables``.
        limit: Maximum number of items to print.
    """
    if table_name not in app.tables:
        print(f"Table '{table_name}' not found")
        return
    table = app.tables[table_name]
    print(f"Contents of table '{table_name}':")
    print("-" * 40)
    shown = 0
    for key, value in table.items():
        if shown >= limit:
            # Only reached when the table holds more than ``limit`` items.
            print(f"... (showing first {limit} items)")
            break
        print(f"{key}: {value}")
        shown += 1
    print(f"\nTotal items shown: {shown}")
@app.command()
@faust.argument('topic_name', help='Topic to send message to')
@faust.argument('message', help='Message to send')
@faust.option('--key', help='Message key')
@faust.option('--partition', type=int, help='Target partition')
async def send_message(topic_name: str, message: str, key: str = None, partition: int = None):
    """Send message to topic.

    Args:
        topic_name: Name looked up in ``app.topics``.
        message: Value to send.
        key: Optional message key.
        partition: Optional target partition.
    """
    if topic_name not in app.topics:
        print(f"Topic '{topic_name}' not found")
        return
    topic = app.topics[topic_name]
    try:
        await topic.send(key=key, value=message, partition=partition)
    except Exception as e:
        # Command-line boundary: report the failure instead of a traceback.
        print(f"Error sending message: {e}")
    else:
        print(f"Message sent to {topic_name}")


@app.command()
@faust.option('--watch', '-w', is_flag=True, help='Watch metrics continuously')
@faust.option('--interval', type=int, default=5, help='Update interval in seconds')
async def metrics(watch: bool = False, interval: int = 5):
    """Display application metrics.

    Args:
        watch: When true, redraw the metrics forever, sleeping ``interval``
            seconds between redraws.
        interval: Seconds between redraws in watch mode.
    """
    def print_metrics():
        if not hasattr(app, 'monitor'):
            print("No monitor configured")
            return
        monitor = app.monitor
        print("\033[2J\033[H")  # Clear screen
        print("=== Faust Application Metrics ===")
        print(f"Messages received: {monitor.messages_received_total()}")
        print(f"Messages sent: {monitor.messages_sent_total()}")
        print(f"Events per second: {monitor.events_per_second():.2f}")
        print(f"Table operations: {monitor.tables_get_total()}")
        print(f"Commit latency: {monitor.commit_latency_avg():.3f}s")
        # Agent status
        print("\nAgent Status:")
        # app.agents maps agent name -> agent; iterating the mapping itself
        # yields the names, so walk the values to reach the agent objects.
        for agent in app.agents.values():
            print(f"  {agent.name}: {'Running' if agent.started else 'Stopped'}")

    if watch:
        import asyncio
        while True:
            print_metrics()
            await asyncio.sleep(interval)
    else:
        print_metrics()
@app.command()
async def health():
    """Check application health.

    Returns:
        0 when the app is started and every agent is running, otherwise 1.
    """
    # app.agents maps agent name -> agent; iterating the mapping itself
    # yields the names, so take the values to reach the agent objects.
    agents = list(app.agents.values())
    health_status = {
        'app_started': app.started,
        'agents_running': sum(1 for agent in agents if agent.started),
        'total_agents': len(agents),
        'tables_count': len(app.tables),
        'topics_count': len(app.topics)
    }
    # Check if all agents are running
    all_agents_running = health_status['agents_running'] == health_status['total_agents']
    print("=== Health Check ===")
    for key, value in health_status.items():
        print(f"{key}: {value}")
    if app.started and all_agents_running:
        print("\n✅ Application is healthy")
        return 0
    print("\n❌ Application has issues")
    return 1


@app.command()
async def interactive():
    """Interactive command shell.

    Reads commands from standard input until 'exit', Ctrl-C or end of input.
    'help' lists the registered commands, 'send <topic> <message>' forwards
    to ``send_message``, and anything else is dispatched through
    ``faust.call_command``.
    """
    try:
        import readline  # noqa: F401 -- imported only to enable command history
    except ImportError:
        # readline is not available on every platform; the shell still works,
        # just without line editing and history.
        pass
    print("Faust Interactive Shell")
    print("Type 'help' for available commands or 'exit' to quit")
    while True:
        try:
            # NOTE: input() blocks the event loop while waiting for the user.
            command = input("faust> ").strip()
            if not command:
                # A blank line is not a command; prompt again.
                continue
            if command == 'exit':
                break
            elif command == 'help':
                print("Available commands:")
                for cmd_name in app._commands:
                    cmd = app._commands[cmd_name]
                    print(f"  {cmd_name}: {cmd.help or 'No help available'}")
            elif command.startswith('send '):
                # Parse send command: send topic_name message
                parts = command.split(' ', 2)
                if len(parts) >= 3:
                    topic_name, message = parts[1], parts[2]
                    await send_message(topic_name, message)
                else:
                    print("Usage: send <topic> <message>")
            else:
                # Try to run as registered command
                try:
                    await faust.call_command(command, app=app)
                except Exception as e:
                    print(f"Error: {e}")
        except (KeyboardInterrupt, EOFError):
            break
    print("\nGoodbye!")
@app.command()
@faust.argument('operation', type=faust.choice('backup', 'restore', 'list'))
@faust.option('--path', help='Backup file path')
@faust.option('--table', help='Specific table name')
async def backup(operation: str, path: str = None, table: str = None):
    """Backup and restore table data.

    Args:
        operation: 'backup' writes tables to a JSON file, 'restore' loads a
            JSON file back into the tables, 'list' shows backup files in the
            current directory.
        path: Backup file path; generated from the current time for 'backup'
            when omitted, required for 'restore'.
        table: Back up only this table instead of all tables.
    """
    import json
    from datetime import datetime

    def json_key(key):
        # json.dump only accepts str/int/float/bool/None keys and does not
        # apply ``default`` to keys, so stringify everything else.
        if key is None or isinstance(key, (str, int, float, bool)):
            return key
        return str(key)

    if operation == 'backup':
        if not path:
            path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        backup_data = {}
        tables_to_backup = [table] if table else list(app.tables)
        for table_name in tables_to_backup:
            if table_name not in app.tables:
                # Say so instead of silently writing an empty backup.
                print(f"Table '{table_name}' not found, skipping")
                continue
            table_obj = app.tables[table_name]
            backup_data[table_name] = {
                json_key(key): value for key, value in table_obj.items()
            }
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(backup_data, f, indent=2, default=str)
        print(f"Backup saved to {path}")
    elif operation == 'restore':
        if not path:
            print("--path is required for restore operation")
            return
        try:
            with open(path, 'r', encoding='utf-8') as f:
                backup_data = json.load(f)
        except FileNotFoundError:
            print(f"Backup file '{path}' not found")
            return
        except json.JSONDecodeError as exc:
            print(f"Backup file '{path}' is not valid JSON: {exc}")
            return
        for table_name, data in backup_data.items():
            if table_name not in app.tables:
                print(f"Table '{table_name}' not found, skipping")
                continue
            table_obj = app.tables[table_name]
            # Replace, not merge: existing contents are dropped first.
            table_obj.clear()
            table_obj.update(data)
            print(f"Restored {len(data)} items to table '{table_name}'")
    elif operation == 'list':
        import os
        backup_files = [
            name for name in os.listdir('.')
            if name.startswith('backup_') and name.endswith('.json')
        ]
        if backup_files:
            print("Available backup files:")
            for name in sorted(backup_files):
                size = os.path.getsize(name)
                print(f"  {name} ({size} bytes)")
        else:
            print("No backup files found")


from typing import Protocol, Any, Optional, List
class CommandT(Protocol):
    """Type interface for Command."""

    help: str
    description: str

    def run(self, *args, **kwargs) -> Any: ...

    def add_arguments(self, parser: Any) -> None: ...

    def handle(self, *args, **options) -> Any: ...
class AppCommandT(CommandT, Protocol):
    """Type interface for AppCommand."""

    app: 'AppT'

    def get_app_config(self) -> dict: ...

    def print_app_info(self) -> None: ...
class CommandRegistryT(Protocol):
    """Type interface for CommandRegistry."""

    def register(self, name: str, command_class: type, **kwargs) -> None: ...

    def get(self, name: str) -> type: ...

    def list_commands(self) -> List[str]: ...


# Install with Tessl CLI
# npx tessl i tessl/pypi-faust