# Worker Management

Worker process management and service coordination in Faust applications. Provides application lifecycle management, process coordination, service orchestration, and distributed system coordination for scalable stream processing deployments.

## Capabilities

### Worker Process Management

Core worker process management for running Faust applications with proper startup, shutdown, and lifecycle coordination across distributed environments.

```python { .api }
class Worker:
    def __init__(
        self,
        app: App,
        *,
        loglevel: str = 'info',
        logfile: str = None,
        pidfile: str = None,
        uid: int = None,
        gid: int = None,
        umask: int = None,
        workdir: str = None,
        daemon: bool = False,
        **kwargs
    ):
        """
        Worker process manager for Faust applications.

        Args:
            app: Faust application instance
            loglevel: Logging level (debug, info, warning, error)
            logfile: Path to log file
            pidfile: Path to PID file
            uid: User ID to run as
            gid: Group ID to run as
            umask: File creation mask
            workdir: Working directory
            daemon: Run as daemon process
        """

    def start(self) -> None:
        """
        Start the worker process.

        Initializes the application, starts all services, agents, and
        begins message processing. Blocks until stopped.
        """

    def stop(self) -> None:
        """
        Stop the worker process gracefully.

        Initiates shutdown sequence, stops agents, commits offsets,
        and cleanly terminates all services.
        """

    def restart(self) -> None:
        """
        Restart the worker process.

        Performs graceful shutdown followed by startup. Useful for
        configuration changes or deployment updates.
        """

    def is_running(self) -> bool:
        """
        Check if worker is currently running.

        Returns:
            True if worker process is active
        """

    def get_pid(self) -> int:
        """
        Get worker process ID.

        Returns:
            Process ID or None if not running
        """

    def setup_logging(self) -> None:
        """Configure logging for worker process."""

    def setup_signals(self) -> None:
        """Setup signal handlers for graceful shutdown."""

    def daemonize(self) -> None:
        """
        Convert process to daemon.

        Detaches from terminal and runs in background.
        """

    @property
    def app(self) -> App:
        """Associated Faust application."""

    @property
    def loglevel(self) -> str:
        """Current logging level."""

    @property
    def logfile(self) -> str:
        """Log file path."""

    @property
    def pidfile(self) -> str:
        """PID file path."""
```

### Service Management

Service framework integration for managing application services, background tasks, and coordinated startup/shutdown sequences.

```python { .api }
class Service:
    def __init__(self, **kwargs):
        """
        Base service class for coordinated lifecycle management.

        Args:
            **kwargs: Service configuration options
        """

    async def start(self) -> None:
        """
        Start the service.

        Called during application startup phase.
        """

    async def stop(self) -> None:
        """
        Stop the service gracefully.

        Called during application shutdown phase.
        """

    async def restart(self) -> None:
        """
        Restart the service.

        Default implementation stops then starts the service.
        """

    def add_dependency(self, service: 'Service') -> None:
        """
        Add service dependency.

        Args:
            service: Service that must start before this one
        """

    def remove_dependency(self, service: 'Service') -> None:
        """
        Remove service dependency.

        Args:
            service: Service to remove from dependencies
        """

    @property
    def dependencies(self) -> set:
        """Services this service depends on."""

    @property
    def label(self) -> str:
        """Service label for identification."""

    @property
    def beacon(self) -> Any:
        """Beacon for coordinating with other services."""

class ServiceManager:
    def __init__(self, app: App):
        """
        Manager for coordinating multiple services.

        Args:
            app: Faust application instance
        """

    def add_service(self, service: Service) -> None:
        """
        Add service to management.

        Args:
            service: Service instance to manage
        """

    def remove_service(self, service: Service) -> None:
        """
        Remove service from management.

        Args:
            service: Service instance to remove
        """

    async def start_services(self) -> None:
        """
        Start all services in dependency order.

        Ensures services start in correct sequence based on dependencies.
        """

    async def stop_services(self) -> None:
        """
        Stop all services in reverse dependency order.

        Ensures clean shutdown respecting service dependencies.
        """

    def get_service_status(self) -> dict:
        """
        Get status of all managed services.

        Returns:
            Dictionary mapping service names to status info
        """
```

### Process Coordination

Utilities for coordinating multiple worker processes, partition assignment, and distributed processing coordination.

```python { .api }
class ProcessCoordinator:
    def __init__(
        self,
        app: App,
        *,
        max_workers: int = None,
        worker_timeout: float = 60.0,
        coordination_topic: str = None,
        **kwargs
    ):
        """
        Coordinator for multiple worker processes.

        Args:
            app: Faust application instance
            max_workers: Maximum number of worker processes
            worker_timeout: Worker health check timeout
            coordination_topic: Topic for worker coordination
        """

    async def register_worker(self, worker_id: str, metadata: dict = None) -> None:
        """
        Register worker process with coordinator.

        Args:
            worker_id: Unique worker identifier
            metadata: Worker metadata (hostname, capabilities, etc.)
        """

    async def unregister_worker(self, worker_id: str) -> None:
        """
        Unregister worker process from coordinator.

        Args:
            worker_id: Worker identifier to remove
        """

    async def get_active_workers(self) -> list:
        """
        Get list of active worker processes.

        Returns:
            List of worker information dictionaries
        """

    async def coordinate_partition_assignment(self) -> dict:
        """
        Coordinate partition assignment across workers.

        Returns:
            Dictionary mapping workers to assigned partitions
        """

    async def handle_worker_failure(self, worker_id: str) -> None:
        """
        Handle worker process failure.

        Args:
            worker_id: Failed worker identifier
        """

    async def rebalance_load(self) -> None:
        """
        Trigger load rebalancing across workers.

        Redistributes partitions and workload based on current capacity.
        """

class PartitionAssignment:
    def __init__(self, topic: str, partition: int, worker_id: str):
        """
        Represents partition assignment to worker.

        Args:
            topic: Topic name
            partition: Partition number
            worker_id: Assigned worker identifier
        """

    @property
    def topic(self) -> str:
        """Topic name."""

    @property
    def partition(self) -> int:
        """Partition number."""

    @property
    def worker_id(self) -> str:
        """Assigned worker ID."""
```

### Deployment Management

Utilities for managing application deployment, scaling, and operational concerns in production environments.

```python { .api }
class DeploymentManager:
    def __init__(
        self,
        app: App,
        *,
        deployment_id: str = None,
        health_check_interval: float = 30.0,
        scaling_policy: dict = None,
        **kwargs
    ):
        """
        Manager for deployment and scaling operations.

        Args:
            app: Faust application instance
            deployment_id: Unique deployment identifier
            health_check_interval: Health check frequency
            scaling_policy: Auto-scaling configuration
        """

    async def deploy(self, version: str, config: dict = None) -> None:
        """
        Deploy new application version.

        Args:
            version: Application version identifier
            config: Deployment configuration
        """

    async def rollback(self, target_version: str) -> None:
        """
        Rollback to previous version.

        Args:
            target_version: Version to rollback to
        """

    async def scale_workers(self, target_count: int) -> None:
        """
        Scale worker processes to target count.

        Args:
            target_count: Desired number of workers
        """

    async def health_check(self) -> dict:
        """
        Perform comprehensive health check.

        Returns:
            Health status information
        """

    async def get_deployment_status(self) -> dict:
        """
        Get current deployment status.

        Returns:
            Deployment status information
        """

    def configure_auto_scaling(self, policy: dict) -> None:
        """
        Configure automatic scaling policy.

        Args:
            policy: Scaling policy configuration
        """

class HealthCheck:
    def __init__(self, name: str, check_func: Callable, interval: float = 30.0):
        """
        Health check definition.

        Args:
            name: Check name
            check_func: Function that performs the check
            interval: Check interval in seconds
        """

    async def execute(self) -> dict:
        """
        Execute health check.

        Returns:
            Check result with status and details
        """

    @property
    def name(self) -> str:
        """Health check name."""

    @property
    def interval(self) -> float:
        """Check interval."""
```

### Configuration Management

Runtime configuration management and environment-specific settings for worker processes and deployment scenarios.

```python { .api }
class WorkerConfig:
    def __init__(
        self,
        *,
        worker_id: str = None,
        concurrency: int = None,
        max_memory: int = None,
        timeout: float = None,
        environment: str = None,
        **kwargs
    ):
        """
        Configuration for worker processes.

        Args:
            worker_id: Worker identifier
            concurrency: Worker concurrency level
            max_memory: Maximum memory usage (bytes)
            timeout: Worker timeout
            environment: Environment name (dev, staging, prod)
        """

    def load_from_file(self, path: str) -> None:
        """
        Load configuration from file.

        Args:
            path: Configuration file path
        """

    def load_from_env(self, prefix: str = 'FAUST_') -> None:
        """
        Load configuration from environment variables.

        Args:
            prefix: Environment variable prefix
        """

    def validate(self) -> list:
        """
        Validate configuration settings.

        Returns:
            List of validation errors (empty if valid)
        """

    def merge(self, other: 'WorkerConfig') -> 'WorkerConfig':
        """
        Merge with another configuration.

        Args:
            other: Configuration to merge

        Returns:
            New merged configuration
        """

    @property
    def worker_id(self) -> str:
        """Worker identifier."""

    @property
    def concurrency(self) -> int:
        """Worker concurrency level."""

    @property
    def environment(self) -> str:
        """Environment name."""

def configure_worker(
    app: App,
    config: WorkerConfig = None,
    **kwargs
) -> Worker:
    """
    Configure worker with given settings.

    Args:
        app: Faust application
        config: Worker configuration
        **kwargs: Additional worker options

    Returns:
        Configured Worker instance
    """
```

## Usage Examples

### Basic Worker Management

```python
import faust

app = faust.App('worker-app', broker='kafka://localhost:9092')

# Create and configure worker
worker = faust.Worker(
    app,
    loglevel='info',
    logfile='/var/log/faust/worker.log'
)

# Define some agents for the worker to run
@app.agent()
async def process_events(stream):
    async for event in stream:
        print(f"Processing: {event}")

if __name__ == '__main__':
    # Start the worker (blocks until stopped)
    worker.start()
```

### Multi-Worker Coordination

```python
import os
import asyncio
from faust import ProcessCoordinator

# Worker coordination setup
coordinator = ProcessCoordinator(
    app,
    max_workers=4,
    worker_timeout=60.0
)

@app.on_startup.connect
async def register_with_coordinator():
    """Register this worker when starting."""
    worker_id = f"{os.getpid()}-{os.uname().nodename}"
    metadata = {
        'hostname': os.uname().nodename,
        'pid': os.getpid(),
        'started_at': time.time()
    }

    await coordinator.register_worker(worker_id, metadata)
    app._worker_id = worker_id

@app.on_shutdown.connect
async def unregister_from_coordinator():
    """Unregister when shutting down."""
    if hasattr(app, '_worker_id'):
        await coordinator.unregister_worker(app._worker_id)

@app.timer(interval=30.0)
async def monitor_workers():
    """Monitor worker health and rebalance if needed."""
    active_workers = await coordinator.get_active_workers()

    if len(active_workers) < coordinator.max_workers:
        print(f"Only {len(active_workers)} workers active, may need scaling")

        # Trigger rebalancing if needed
        await coordinator.rebalance_load()
```

### Service Management

```python
class DatabaseService(faust.Service):
    """Custom service for database connection management."""

    def __init__(self, connection_string: str):
        super().__init__()
        self.connection_string = connection_string
        self.connection = None

    async def start(self):
        """Initialize database connection."""
        print("Starting database service...")
        # Initialize database connection
        self.connection = await create_connection(self.connection_string)
        print("Database service started")

    async def stop(self):
        """Close database connection."""
        print("Stopping database service...")
        if self.connection:
            await self.connection.close()
        print("Database service stopped")

class CacheService(faust.Service):
    """Custom service for cache management."""

    def __init__(self, redis_url: str):
        super().__init__()
        self.redis_url = redis_url
        self.redis = None

    async def start(self):
        """Initialize Redis connection."""
        print("Starting cache service...")
        self.redis = await create_redis_connection(self.redis_url)
        print("Cache service started")

    async def stop(self):
        """Close Redis connection."""
        print("Stopping cache service...")
        if self.redis:
            await self.redis.close()
        print("Cache service stopped")

# Register services with the app
db_service = DatabaseService('postgresql://localhost/mydb')
cache_service = CacheService('redis://localhost:6379')

# Cache service depends on database service
cache_service.add_dependency(db_service)

# Add services to app
app.service(db_service)
app.service(cache_service)

# Services will start in dependency order automatically
```

### Health Monitoring and Deployment

```python
from faust import DeploymentManager, HealthCheck

async def check_database_health():
    """Database connectivity check."""
    try:
        # Check database connection
        await db_service.connection.execute('SELECT 1')
        return {'status': 'healthy', 'latency': 0.001}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

async def check_message_processing():
    """Message processing health check."""
    if hasattr(app, 'monitor'):
        events_per_sec = app.monitor.events_per_second()
        if events_per_sec > 0:
            return {'status': 'healthy', 'events_per_second': events_per_sec}
        else:
            return {'status': 'warning', 'message': 'No events being processed'}
    return {'status': 'unknown', 'message': 'No monitor available'}

# Setup deployment manager with health checks
deployment = DeploymentManager(
    app,
    deployment_id=f"faust-app-{os.getenv('VERSION', '1.0')}",
    health_check_interval=30.0,
    scaling_policy={
        'min_workers': 2,
        'max_workers': 10,
        'target_cpu_percent': 70,
        'scale_up_threshold': 80,
        'scale_down_threshold': 30
    }
)

# Register health checks
deployment.add_health_check(
    HealthCheck('database', check_database_health, interval=30.0)
)
deployment.add_health_check(
    HealthCheck('processing', check_message_processing, interval=10.0)
)

@app.timer(interval=60.0)
async def deployment_health_monitor():
    """Monitor deployment health and auto-scale if needed."""
    health_status = await deployment.health_check()

    if not health_status['healthy']:
        print(f"Health check failed: {health_status['issues']}")

        # Could trigger alerts here
        # await send_alert(health_status)

    # Check if scaling is needed
    current_load = health_status.get('cpu_percent', 0)
    scaling_policy = deployment.scaling_policy

    if current_load > scaling_policy['scale_up_threshold']:
        current_workers = len(await coordinator.get_active_workers())
        if current_workers < scaling_policy['max_workers']:
            await deployment.scale_workers(current_workers + 1)
            print(f"Scaled up to {current_workers + 1} workers")

    elif current_load < scaling_policy['scale_down_threshold']:
        current_workers = len(await coordinator.get_active_workers())
        if current_workers > scaling_policy['min_workers']:
            await deployment.scale_workers(current_workers - 1)
            print(f"Scaled down to {current_workers - 1} workers")
```

### Configuration Management

```python
from faust import WorkerConfig

# Load configuration from environment and files
config = WorkerConfig()
config.load_from_env('FAUST_')  # Load FAUST_* environment variables

# Load additional configuration from file
config_file = os.getenv('FAUST_CONFIG_FILE', 'config/production.yaml')
if os.path.exists(config_file):
    config.load_from_file(config_file)

# Validate configuration
validation_errors = config.validate()
if validation_errors:
    print(f"Configuration errors: {validation_errors}")
    sys.exit(1)

# Configure worker with loaded configuration
worker = configure_worker(app, config)

# Override specific settings for this environment
if config.environment == 'production':
    worker.loglevel = 'warning'
    worker.daemon = True
elif config.environment == 'development':
    worker.loglevel = 'debug'
    worker.daemon = False

print(f"Starting worker {config.worker_id} in {config.environment} environment")
worker.start()
```

### Production Deployment Script

```python
#!/usr/bin/env python3
"""Production deployment script for Faust application."""

import sys
import signal
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_worker():
    """Context manager for proper worker lifecycle."""
    worker = None
    try:
        # Setup worker with production configuration
        config = WorkerConfig(
            worker_id=f"worker-{os.getpid()}",
            environment='production',
            concurrency=4,
            max_memory=2 * 1024 * 1024 * 1024  # 2GB
        )

        worker = configure_worker(app, config)

        # Setup signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            print(f"Received signal {signum}, initiating shutdown...")
            if worker:
                worker.stop()

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        # Start worker in background
        worker_task = asyncio.create_task(
            asyncio.to_thread(worker.start)
        )

        yield worker

    except Exception as e:
        print(f"Error during worker startup: {e}")
        raise
    finally:
        if worker:
            print("Shutting down worker...")
            worker.stop()

async def main():
    """Main deployment entry point."""
    async with managed_worker() as worker:
        print(f"Worker {worker.get_pid()} started successfully")

        # Monitor worker health
        while worker.is_running():
            health = await deployment.health_check()
            if not health['healthy']:
                print(f"Health check failed: {health}")
                break

            await asyncio.sleep(30)

        print("Worker stopped")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Deployment interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"Deployment failed: {e}")
        sys.exit(1)
```

## Type Interfaces

```python { .api }
from typing import Protocol, Dict, List, Any, Optional, Callable

class WorkerT(Protocol):
    """Type interface for Worker."""

    app: 'AppT'
    loglevel: str
    logfile: Optional[str]

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def restart(self) -> None: ...
    def is_running(self) -> bool: ...
    def get_pid(self) -> Optional[int]: ...

class ServiceT(Protocol):
    """Type interface for Service."""

    label: str
    dependencies: set

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def restart(self) -> None: ...

class ProcessCoordinatorT(Protocol):
    """Type interface for ProcessCoordinator."""

    async def register_worker(self, worker_id: str, metadata: Optional[Dict] = None) -> None: ...
    async def unregister_worker(self, worker_id: str) -> None: ...
    async def get_active_workers(self) -> List[Dict]: ...
    async def rebalance_load(self) -> None: ...
```