CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

worker-management.mddocs/

Worker Management

Worker process management and service coordination in Faust applications. Provides application lifecycle management, process coordination, and service orchestration for scalable, distributed stream processing deployments.

Capabilities

Worker Process Management

Core worker process management for running Faust applications with proper startup, shutdown, and lifecycle coordination across distributed environments.

class Worker:
    def __init__(
        self,
        app: 'App',
        *,
        loglevel: str = 'info',
        logfile: Optional[str] = None,
        pidfile: Optional[str] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
        umask: Optional[int] = None,
        workdir: Optional[str] = None,
        daemon: bool = False,
        **kwargs
    ):
        """
        Worker process manager for Faust applications.
        
        Args:
            app: Faust application instance
            loglevel: Logging level (debug, info, warning, error)
            logfile: Path to log file
            pidfile: Path to PID file
            uid: User ID to run as
            gid: Group ID to run as
            umask: File creation mask
            workdir: Working directory
            daemon: Run as daemon process
        """

    def start(self) -> None:
        """
        Start the worker process.
        
        Initializes the application, starts all services, agents, and
        begins message processing. Blocks until stopped.
        """

    def stop(self) -> None:
        """
        Stop the worker process gracefully.
        
        Initiates shutdown sequence, stops agents, commits offsets,
        and cleanly terminates all services.
        """

    def restart(self) -> None:
        """
        Restart the worker process.
        
        Performs graceful shutdown followed by startup. Useful for
        configuration changes or deployment updates.
        """

    def is_running(self) -> bool:
        """
        Check if worker is currently running.
        
        Returns:
            True if worker process is active
        """

    def get_pid(self) -> Optional[int]:
        """
        Get worker process ID.
        
        Returns:
            Process ID or None if not running
        """

    def setup_logging(self) -> None:
        """Configure logging for worker process."""

    def setup_signals(self) -> None:
        """Setup signal handlers for graceful shutdown."""

    def daemonize(self) -> None:
        """
        Convert process to daemon.
        
        Detaches from terminal and runs in background.
        """

    @property
    def app(self) -> 'App':
        """Associated Faust application."""

    @property
    def loglevel(self) -> str:
        """Current logging level."""

    @property
    def logfile(self) -> Optional[str]:
        """Log file path, or None when no log file is configured."""

    @property
    def pidfile(self) -> Optional[str]:
        """PID file path, or None when no PID file is configured."""

Service Management

Service framework integration for managing application services, background tasks, and coordinated startup/shutdown sequences.

class Service:
    def __init__(self, **kwargs):
        """
        Base service class for coordinated lifecycle management.
        
        Args:
            **kwargs: Service configuration options
        """

    async def start(self) -> None:
        """
        Start the service.
        
        Called during application startup phase.
        """

    async def stop(self) -> None:
        """
        Stop the service gracefully.
        
        Called during application shutdown phase.
        """

    async def restart(self) -> None:
        """
        Restart the service.
        
        Default implementation stops then starts the service.
        """

    def add_dependency(self, service: 'Service') -> None:
        """
        Add service dependency.
        
        Args:
            service: Service that must start before this one
        """

    def remove_dependency(self, service: 'Service') -> None:
        """
        Remove service dependency.
        
        Args:
            service: Service to remove from dependencies
        """

    @property
    def dependencies(self) -> set:
        """Services this service depends on."""

    @property
    def label(self) -> str:
        """Service label for identification."""

    @property
    def beacon(self) -> Any:
        """Beacon for coordinating with other services."""

class ServiceManager:
    def __init__(self, app: 'App'):
        """
        Manager for coordinating multiple services.
        
        Args:
            app: Faust application instance
        """

    def add_service(self, service: 'Service') -> None:
        """
        Add service to management.
        
        Args:
            service: Service instance to manage
        """

    def remove_service(self, service: 'Service') -> None:
        """
        Remove service from management.
        
        Args:
            service: Service instance to remove
        """

    async def start_services(self) -> None:
        """
        Start all services in dependency order.
        
        Ensures services start in correct sequence based on dependencies.
        """

    async def stop_services(self) -> None:
        """
        Stop all services in reverse dependency order.
        
        Ensures clean shutdown respecting service dependencies.
        """

    def get_service_status(self) -> dict:
        """
        Get status of all managed services.
        
        Returns:
            Dictionary mapping service names to status info
        """

Process Coordination

Utilities for coordinating multiple worker processes, assigning partitions, and distributing processing work across workers.

class ProcessCoordinator:
    def __init__(
        self,
        app: 'App',
        *,
        max_workers: Optional[int] = None,
        worker_timeout: float = 60.0,
        coordination_topic: Optional[str] = None,
        **kwargs
    ):
        """
        Coordinator for multiple worker processes.
        
        Args:
            app: Faust application instance
            max_workers: Maximum number of worker processes
            worker_timeout: Worker health check timeout (seconds)
            coordination_topic: Topic for worker coordination
        """

    async def register_worker(self, worker_id: str, metadata: Optional[dict] = None) -> None:
        """
        Register worker process with coordinator.
        
        Args:
            worker_id: Unique worker identifier
            metadata: Worker metadata (hostname, capabilities, etc.)
        """

    async def unregister_worker(self, worker_id: str) -> None:
        """
        Unregister worker process from coordinator.
        
        Args:
            worker_id: Worker identifier to remove
        """

    async def get_active_workers(self) -> list:
        """
        Get list of active worker processes.
        
        Returns:
            List of worker information dictionaries
        """

    async def coordinate_partition_assignment(self) -> dict:
        """
        Coordinate partition assignment across workers.
        
        Returns:
            Dictionary mapping workers to assigned partitions
        """

    async def handle_worker_failure(self, worker_id: str) -> None:
        """
        Handle worker process failure.
        
        Args:
            worker_id: Failed worker identifier
        """

    async def rebalance_load(self) -> None:
        """
        Trigger load rebalancing across workers.
        
        Redistributes partitions and workload based on current capacity.
        """

class PartitionAssignment:
    def __init__(self, topic: str, partition: int, worker_id: str):
        """
        A single topic-partition assigned to a specific worker.
        
        Args:
            topic: Name of the topic
            partition: Partition number within the topic
            worker_id: Identifier of the worker holding the assignment
        """

    @property
    def topic(self) -> str:
        """Name of the assigned topic."""

    @property
    def partition(self) -> int:
        """Partition number within the topic."""

    @property
    def worker_id(self) -> str:
        """Identifier of the assigned worker."""

Deployment Management

Utilities for managing application deployment, scaling, and operational concerns in production environments.

class DeploymentManager:
    def __init__(
        self,
        app: 'App',
        *,
        deployment_id: Optional[str] = None,
        health_check_interval: float = 30.0,
        scaling_policy: Optional[dict] = None,
        **kwargs
    ):
        """
        Manager for deployment and scaling operations.
        
        Args:
            app: Faust application instance
            deployment_id: Unique deployment identifier
            health_check_interval: Health check frequency (seconds)
            scaling_policy: Auto-scaling configuration
        """

    def add_health_check(self, check: 'HealthCheck') -> None:
        """
        Register a health check with this deployment.
        
        Args:
            check: HealthCheck instance to run at its configured interval
        """

    async def deploy(self, version: str, config: Optional[dict] = None) -> None:
        """
        Deploy new application version.
        
        Args:
            version: Application version identifier
            config: Deployment configuration
        """

    async def rollback(self, target_version: str) -> None:
        """
        Rollback to previous version.
        
        Args:
            target_version: Version to rollback to
        """

    async def scale_workers(self, target_count: int) -> None:
        """
        Scale worker processes to target count.
        
        Args:
            target_count: Desired number of workers
        """

    async def health_check(self) -> dict:
        """
        Perform comprehensive health check.
        
        Returns:
            Health status information
        """

    async def get_deployment_status(self) -> dict:
        """
        Get current deployment status.
        
        Returns:
            Deployment status information
        """

    def configure_auto_scaling(self, policy: dict) -> None:
        """
        Configure automatic scaling policy.
        
        Args:
            policy: Scaling policy configuration
        """

class HealthCheck:
    def __init__(self, name: str, check_func: Callable, interval: float = 30.0):
        """
        Health check definition.
        
        Args:
            name: Check name
            check_func: Function that performs the check
            interval: Check interval in seconds
        """

    async def execute(self) -> dict:
        """
        Execute health check.
        
        Returns:
            Check result with status and details
        """

    @property
    def name(self) -> str:
        """Health check name."""

    @property
    def interval(self) -> float:
        """Check interval in seconds."""

Configuration Management

Runtime configuration management and environment-specific settings for worker processes and deployment scenarios.

class WorkerConfig:
    def __init__(
        self,
        *,
        worker_id: Optional[str] = None,
        concurrency: Optional[int] = None,
        max_memory: Optional[int] = None,
        timeout: Optional[float] = None,
        environment: Optional[str] = None,
        **kwargs
    ):
        """
        Configuration for worker processes.
        
        Args:
            worker_id: Worker identifier
            concurrency: Worker concurrency level
            max_memory: Maximum memory usage (bytes)
            timeout: Worker timeout
            environment: Environment name (dev, staging, prod)
        """

    def load_from_file(self, path: str) -> None:
        """
        Load configuration from file.
        
        Args:
            path: Configuration file path
        """

    def load_from_env(self, prefix: str = 'FAUST_') -> None:
        """
        Load configuration from environment variables.
        
        Args:
            prefix: Environment variable prefix
        """

    def validate(self) -> list:
        """
        Validate configuration settings.
        
        Returns:
            List of validation errors (empty if valid)
        """

    def merge(self, other: 'WorkerConfig') -> 'WorkerConfig':
        """
        Merge with another configuration.
        
        Args:
            other: Configuration to merge
            
        Returns:
            New merged configuration
        """

    @property
    def worker_id(self) -> Optional[str]:
        """Worker identifier."""

    @property
    def concurrency(self) -> Optional[int]:
        """Worker concurrency level."""

    @property
    def environment(self) -> Optional[str]:
        """Environment name."""

def configure_worker(
    app: 'App',
    config: Optional['WorkerConfig'] = None,
    **kwargs
) -> 'Worker':
    """
    Configure worker with given settings.
    
    Args:
        app: Faust application
        config: Worker configuration (None uses defaults)
        **kwargs: Additional worker options
        
    Returns:
        Configured Worker instance
    """

Usage Examples

Basic Worker Management

import faust

app = faust.App('worker-app', broker='kafka://localhost:9092')

# Create and configure worker
# NOTE(review): assumes /var/log/faust/ already exists and is writable — confirm
worker = faust.Worker(
    app,
    loglevel='info',
    logfile='/var/log/faust/worker.log'
)

# Define some agents for the worker to run
@app.agent()
async def process_events(stream):
    # Handle each event from the stream as it arrives.
    async for event in stream:
        print(f"Processing: {event}")

if __name__ == '__main__':
    # Start the worker (blocks until stopped)
    worker.start()

Multi-Worker Coordination

import os
import time
import asyncio
from faust import ProcessCoordinator

# Worker coordination setup
# NOTE(review): `app` is assumed to be defined as in the basic example above.
coordinator = ProcessCoordinator(
    app,
    max_workers=4,
    worker_timeout=60.0
)

@app.on_startup.connect
async def register_with_coordinator():
    """Register this worker when starting."""
    # Worker identity combines PID and hostname so it is unique per process.
    worker_id = f"{os.getpid()}-{os.uname().nodename}"
    metadata = {
        'hostname': os.uname().nodename,
        'pid': os.getpid(),
        'started_at': time.time()
    }
    
    await coordinator.register_worker(worker_id, metadata)
    app._worker_id = worker_id

@app.on_shutdown.connect
async def unregister_from_coordinator():
    """Unregister when shutting down."""
    if hasattr(app, '_worker_id'):
        await coordinator.unregister_worker(app._worker_id)

@app.timer(interval=30.0)
async def monitor_workers():
    """Monitor worker health and rebalance if needed."""
    active_workers = await coordinator.get_active_workers()
    
    if len(active_workers) < coordinator.max_workers:
        print(f"Only {len(active_workers)} workers active, may need scaling")
    
    # Trigger rebalancing if needed
    await coordinator.rebalance_load()

Service Management

class DatabaseService(faust.Service):
    """Custom service for database connection management."""
    
    def __init__(self, connection_string: str):
        super().__init__()
        self.connection_string = connection_string
        self.connection = None  # opened lazily in start()
    
    async def start(self):
        """Initialize database connection."""
        print("Starting database service...")
        # Initialize database connection
        # NOTE(review): create_connection is a placeholder — substitute your
        # database driver's async connect function.
        self.connection = await create_connection(self.connection_string)
        print("Database service started")
    
    async def stop(self):
        """Close database connection."""
        print("Stopping database service...")
        if self.connection:
            await self.connection.close()
        print("Database service stopped")

class CacheService(faust.Service):
    """Custom service for cache management."""
    
    def __init__(self, redis_url: str):
        super().__init__()
        self.redis_url = redis_url
        self.redis = None  # opened lazily in start()
    
    async def start(self):
        """Initialize Redis connection."""
        print("Starting cache service...")
        # NOTE(review): create_redis_connection is a placeholder — substitute
        # your Redis client's async connect function.
        self.redis = await create_redis_connection(self.redis_url)
        print("Cache service started")
    
    async def stop(self):
        """Close Redis connection."""
        print("Stopping cache service...")
        if self.redis:
            await self.redis.close()
        print("Cache service stopped")

# Register services with the app
db_service = DatabaseService('postgresql://localhost/mydb')
cache_service = CacheService('redis://localhost:6379')

# Cache service depends on database service
cache_service.add_dependency(db_service)

# Add services to app
app.service(db_service)
app.service(cache_service)

# Services will start in dependency order automatically

Health Monitoring and Deployment

from faust import DeploymentManager, HealthCheck

async def check_database_health():
    """Database connectivity check.

    Returns a dict with 'status' and either a measured 'latency' (seconds)
    on success or an 'error' message on failure.
    """
    import time
    try:
        # Measure actual round-trip latency instead of reporting a
        # hard-coded value.
        started = time.perf_counter()
        await db_service.connection.execute('SELECT 1')
        return {'status': 'healthy', 'latency': time.perf_counter() - started}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

async def check_message_processing():
    """Message processing health check."""
    # Without a monitor we cannot tell anything about throughput.
    if not hasattr(app, 'monitor'):
        return {'status': 'unknown', 'message': 'No monitor available'}
    events_per_sec = app.monitor.events_per_second()
    if events_per_sec > 0:
        return {'status': 'healthy', 'events_per_second': events_per_sec}
    return {'status': 'warning', 'message': 'No events being processed'}

# Setup deployment manager with health checks
deployment = DeploymentManager(
    app,
    deployment_id=f"faust-app-{os.getenv('VERSION', '1.0')}",
    health_check_interval=30.0,
    scaling_policy={
        'min_workers': 2,
        'max_workers': 10,
        'target_cpu_percent': 70,
        'scale_up_threshold': 80,
        'scale_down_threshold': 30
    }
)

# Register health checks
# NOTE(review): add_health_check is not listed in the DeploymentManager API
# documented above — confirm it exists before relying on this example.
deployment.add_health_check(
    HealthCheck('database', check_database_health, interval=30.0)
)
deployment.add_health_check(
    HealthCheck('processing', check_message_processing, interval=10.0)
)

@app.timer(interval=60.0)
async def deployment_health_monitor():
    """Monitor deployment health and auto-scale if needed."""
    status = await deployment.health_check()

    if not status['healthy']:
        print(f"Health check failed: {status['issues']}")
        # Could trigger alerts here
        # await send_alert(status)

    # Compare current load against the configured scaling thresholds.
    policy = deployment.scaling_policy
    load = status.get('cpu_percent', 0)

    if load > policy['scale_up_threshold']:
        workers = len(await coordinator.get_active_workers())
        if workers < policy['max_workers']:
            await deployment.scale_workers(workers + 1)
            print(f"Scaled up to {workers + 1} workers")

    elif load < policy['scale_down_threshold']:
        workers = len(await coordinator.get_active_workers())
        if workers > policy['min_workers']:
            await deployment.scale_workers(workers - 1)
            print(f"Scaled down to {workers - 1} workers")

Configuration Management

import os
import sys

from faust import WorkerConfig

# Load configuration from environment and files
config = WorkerConfig()
config.load_from_env('FAUST_')  # Load FAUST_* environment variables

# Load additional configuration from file
config_file = os.getenv('FAUST_CONFIG_FILE', 'config/production.yaml')
if os.path.exists(config_file):
    config.load_from_file(config_file)

# Validate configuration and abort on errors
validation_errors = config.validate()
if validation_errors:
    print(f"Configuration errors: {validation_errors}")
    sys.exit(1)

# Configure worker with loaded configuration
worker = configure_worker(app, config)

# Override specific settings for this environment
if config.environment == 'production':
    worker.loglevel = 'warning'
    worker.daemon = True
elif config.environment == 'development':
    worker.loglevel = 'debug'
    worker.daemon = False

print(f"Starting worker {config.worker_id} in {config.environment} environment")
worker.start()

Production Deployment Script

#!/usr/bin/env python3
"""Production deployment script for Faust application."""

import os
import sys
import signal
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_worker():
    """Context manager for proper worker lifecycle.

    Yields a started Worker; guarantees worker.stop() on exit.
    """
    worker = None
    try:
        # Setup worker with production configuration
        config = WorkerConfig(
            worker_id=f"worker-{os.getpid()}",
            environment='production',
            concurrency=4,
            max_memory=2 * 1024 * 1024 * 1024  # 2GB
        )
        
        worker = configure_worker(app, config)
        
        # Setup signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            print(f"Received signal {signum}, initiating shutdown...")
            if worker:
                worker.stop()
        
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        
        # Start worker in background
        # NOTE(review): worker_task is never awaited, so exceptions raised by
        # worker.start() are dropped — consider awaiting or cancelling it
        # during shutdown.
        worker_task = asyncio.create_task(
            asyncio.to_thread(worker.start)
        )
        
        yield worker
        
    except Exception as e:
        print(f"Error during worker startup: {e}")
        raise
    finally:
        if worker:
            print("Shutting down worker...")
            worker.stop()

async def main():
    """Main deployment entry point.

    Starts a managed worker and polls its health every 30 seconds until it
    stops or a health check reports unhealthy.
    """
    async with managed_worker() as worker:
        print(f"Worker {worker.get_pid()} started successfully")
        
        # Monitor worker health
        # NOTE(review): `deployment` is not created in this script — it is
        # assumed to come from the DeploymentManager example; confirm.
        while worker.is_running():
            health = await deployment.health_check()
            if not health['healthy']:
                print(f"Health check failed: {health}")
                break
            
            await asyncio.sleep(30)
        
        print("Worker stopped")

if __name__ == '__main__':
    try:
        # Run the async entry point; exit code reflects the outcome.
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Deployment interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"Deployment failed: {e}")
        sys.exit(1)

Type Interfaces

from typing import Protocol, Dict, List, Any, Optional, Callable

class WorkerT(Protocol):
    """Structural type interface for Worker implementations."""
    
    # Attributes any conforming Worker must expose.
    app: 'AppT'
    loglevel: str
    logfile: Optional[str]
    
    def start(self) -> None: ...
    def stop(self) -> None: ...
    def restart(self) -> None: ...
    def is_running(self) -> bool: ...
    def get_pid(self) -> Optional[int]: ...

class ServiceT(Protocol):
    """Structural type interface for Service implementations."""
    
    # Attributes any conforming Service must expose.
    label: str
    dependencies: set
    
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def restart(self) -> None: ...

class ProcessCoordinatorT(Protocol):
    """Structural type interface for ProcessCoordinator implementations."""
    
    # All coordination operations are coroutines.
    async def register_worker(self, worker_id: str, metadata: Optional[Dict] = None) -> None: ...
    async def unregister_worker(self, worker_id: str) -> None: ...
    async def get_active_workers(self) -> List[Dict]: ...
    async def rebalance_load(self) -> None: ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json