docs/worker-management.md

Related docs: authentication.md · cli-framework.md · core-application.md · data-management.md · index.md · monitoring.md · serialization.md · stream-processing.md · topics-channels.md · windowing.md · worker-management.md

# Worker Management

Worker process management and service coordination in Faust applications. Provides application lifecycle management, process coordination, service orchestration, and distributed system coordination for scalable stream processing deployments.

## Capabilities

### Worker Process Management

Core worker process management for running Faust applications with proper startup, shutdown, and lifecycle coordination across distributed environments.

```python { .api }
class Worker:
    def __init__(
        self,
        app: App,
        *,
        loglevel: str = 'info',
        logfile: str = None,
        pidfile: str = None,
        uid: int = None,
        gid: int = None,
        umask: int = None,
        workdir: str = None,
        daemon: bool = False,
        **kwargs
    ):
        """
        Worker process manager for Faust applications.

        Args:
            app: Faust application instance
            loglevel: Logging level (debug, info, warning, error)
            logfile: Path to log file
            pidfile: Path to PID file
            uid: User ID to run as
            gid: Group ID to run as
            umask: File creation mask
            workdir: Working directory
            daemon: Run as daemon process
        """

    def start(self) -> None:
        """
        Start the worker process.

        Initializes the application, starts all services, agents, and
        begins message processing. Blocks until stopped.
        """

    def stop(self) -> None:
        """
        Stop the worker process gracefully.

        Initiates shutdown sequence, stops agents, commits offsets,
        and cleanly terminates all services.
        """

    def restart(self) -> None:
        """
        Restart the worker process.

        Performs graceful shutdown followed by startup. Useful for
        configuration changes or deployment updates.
        """

    def is_running(self) -> bool:
        """
        Check if worker is currently running.

        Returns:
            True if worker process is active
        """

    def get_pid(self) -> int:
        """
        Get worker process ID.

        Returns:
            Process ID or None if not running
        """

    def setup_logging(self) -> None:
        """Configure logging for worker process."""

    def setup_signals(self) -> None:
        """Setup signal handlers for graceful shutdown."""

    def daemonize(self) -> None:
        """
        Convert process to daemon.

        Detaches from terminal and runs in background.
        """

    @property
    def app(self) -> App:
        """Associated Faust application."""

    @property
    def loglevel(self) -> str:
        """Current logging level."""

    @property
    def logfile(self) -> str:
        """Log file path."""

    @property
    def pidfile(self) -> str:
        """PID file path."""
```

### Service Management

Service framework integration for managing application services, background tasks, and coordinated startup/shutdown sequences.

```python { .api }
class Service:
    def __init__(self, **kwargs):
        """
        Base service class for coordinated lifecycle management.

        Args:
            **kwargs: Service configuration options
        """

    async def start(self) -> None:
        """
        Start the service.

        Called during application startup phase.
        """

    async def stop(self) -> None:
        """
        Stop the service gracefully.

        Called during application shutdown phase.
        """

    async def restart(self) -> None:
        """
        Restart the service.

        Default implementation stops then starts the service.
        """

    def add_dependency(self, service: 'Service') -> None:
        """
        Add service dependency.

        Args:
            service: Service that must start before this one
        """

    def remove_dependency(self, service: 'Service') -> None:
        """
        Remove service dependency.

        Args:
            service: Service to remove from dependencies
        """

    @property
    def dependencies(self) -> set:
        """Services this service depends on."""

    @property
    def label(self) -> str:
        """Service label for identification."""

    @property
    def beacon(self) -> any:
        """Beacon for coordinating with other services."""

class ServiceManager:
    def __init__(self, app: App):
        """
        Manager for coordinating multiple services.

        Args:
            app: Faust application instance
        """

    def add_service(self, service: Service) -> None:
        """
        Add service to management.

        Args:
            service: Service instance to manage
        """

    def remove_service(self, service: Service) -> None:
        """
        Remove service from management.

        Args:
            service: Service instance to remove
        """

    async def start_services(self) -> None:
        """
        Start all services in dependency order.

        Ensures services start in correct sequence based on dependencies.
        """

    async def stop_services(self) -> None:
        """
        Stop all services in reverse dependency order.

        Ensures clean shutdown respecting service dependencies.
        """

    def get_service_status(self) -> dict:
        """
        Get status of all managed services.

        Returns:
            Dictionary mapping service names to status info
        """
```

### Process Coordination

Utilities for coordinating multiple worker processes, partition assignment, and distributed processing coordination.

```python { .api }
class ProcessCoordinator:
    def __init__(
        self,
        app: App,
        *,
        max_workers: int = None,
        worker_timeout: float = 60.0,
        coordination_topic: str = None,
        **kwargs
    ):
        """
        Coordinator for multiple worker processes.

        Args:
            app: Faust application instance
            max_workers: Maximum number of worker processes
            worker_timeout: Worker health check timeout
            coordination_topic: Topic for worker coordination
        """

    async def register_worker(self, worker_id: str, metadata: dict = None) -> None:
        """
        Register worker process with coordinator.

        Args:
            worker_id: Unique worker identifier
            metadata: Worker metadata (hostname, capabilities, etc.)
        """

    async def unregister_worker(self, worker_id: str) -> None:
        """
        Unregister worker process from coordinator.

        Args:
            worker_id: Worker identifier to remove
        """

    async def get_active_workers(self) -> list:
        """
        Get list of active worker processes.

        Returns:
            List of worker information dictionaries
        """

    async def coordinate_partition_assignment(self) -> dict:
        """
        Coordinate partition assignment across workers.

        Returns:
            Dictionary mapping workers to assigned partitions
        """

    async def handle_worker_failure(self, worker_id: str) -> None:
        """
        Handle worker process failure.

        Args:
            worker_id: Failed worker identifier
        """

    async def rebalance_load(self) -> None:
        """
        Trigger load rebalancing across workers.

        Redistributes partitions and workload based on current capacity.
        """

class PartitionAssignment:
    def __init__(self, topic: str, partition: int, worker_id: str):
        """
        Represents partition assignment to worker.

        Args:
            topic: Topic name
            partition: Partition number
            worker_id: Assigned worker identifier
        """

    @property
    def topic(self) -> str:
        """Topic name."""

    @property
    def partition(self) -> int:
        """Partition number."""

    @property
    def worker_id(self) -> str:
        """Assigned worker ID."""
```

### Deployment Management

Utilities for managing application deployment, scaling, and operational concerns in production environments.

```python { .api }
class DeploymentManager:
    def __init__(
        self,
        app: App,
        *,
        deployment_id: str = None,
        health_check_interval: float = 30.0,
        scaling_policy: dict = None,
        **kwargs
    ):
        """
        Manager for deployment and scaling operations.

        Args:
            app: Faust application instance
            deployment_id: Unique deployment identifier
            health_check_interval: Health check frequency
            scaling_policy: Auto-scaling configuration
        """

    async def deploy(self, version: str, config: dict = None) -> None:
        """
        Deploy new application version.

        Args:
            version: Application version identifier
            config: Deployment configuration
        """

    async def rollback(self, target_version: str) -> None:
        """
        Rollback to previous version.

        Args:
            target_version: Version to rollback to
        """

    async def scale_workers(self, target_count: int) -> None:
        """
        Scale worker processes to target count.

        Args:
            target_count: Desired number of workers
        """

    async def health_check(self) -> dict:
        """
        Perform comprehensive health check.

        Returns:
            Health status information
        """

    async def get_deployment_status(self) -> dict:
        """
        Get current deployment status.

        Returns:
            Deployment status information
        """

    def configure_auto_scaling(self, policy: dict) -> None:
        """
        Configure automatic scaling policy.

        Args:
            policy: Scaling policy configuration
        """

class HealthCheck:
    def __init__(self, name: str, check_func: callable, interval: float = 30.0):
        """
        Health check definition.

        Args:
            name: Check name
            check_func: Function that performs the check
            interval: Check interval in seconds
        """

    async def execute(self) -> dict:
        """
        Execute health check.

        Returns:
            Check result with status and details
        """

    @property
    def name(self) -> str:
        """Health check name."""

    @property
    def interval(self) -> float:
        """Check interval."""
```

### Configuration Management

Runtime configuration management and environment-specific settings for worker processes and deployment scenarios.

```python { .api }
class WorkerConfig:
    def __init__(
        self,
        *,
        worker_id: str = None,
        concurrency: int = None,
        max_memory: int = None,
        timeout: float = None,
        environment: str = None,
        **kwargs
    ):
        """
        Configuration for worker processes.

        Args:
            worker_id: Worker identifier
            concurrency: Worker concurrency level
            max_memory: Maximum memory usage (bytes)
            timeout: Worker timeout
            environment: Environment name (dev, staging, prod)
        """

    def load_from_file(self, path: str) -> None:
        """
        Load configuration from file.

        Args:
            path: Configuration file path
        """

    def load_from_env(self, prefix: str = 'FAUST_') -> None:
        """
        Load configuration from environment variables.

        Args:
            prefix: Environment variable prefix
        """

    def validate(self) -> list:
        """
        Validate configuration settings.

        Returns:
            List of validation errors (empty if valid)
        """

    def merge(self, other: 'WorkerConfig') -> 'WorkerConfig':
        """
        Merge with another configuration.

        Args:
            other: Configuration to merge

        Returns:
            New merged configuration
        """

    @property
    def worker_id(self) -> str:
        """Worker identifier."""

    @property
    def concurrency(self) -> int:
        """Worker concurrency level."""

    @property
    def environment(self) -> str:
        """Environment name."""

def configure_worker(
    app: App,
    config: WorkerConfig = None,
    **kwargs
) -> Worker:
    """
    Configure worker with given settings.

    Args:
        app: Faust application
        config: Worker configuration
        **kwargs: Additional worker options

    Returns:
        Configured Worker instance
    """
```

## Usage Examples

### Basic Worker Management

```python
import faust

app = faust.App('worker-app', broker='kafka://localhost:9092')

# Create and configure worker
worker = faust.Worker(
    app,
    loglevel='info',
    logfile='/var/log/faust/worker.log'
)

# Define some agents for the worker to run
@app.agent()
async def process_events(stream):
    async for event in stream:
        print(f"Processing: {event}")

if __name__ == '__main__':
    # Start the worker (blocks until stopped)
    worker.start()
```

### Multi-Worker Coordination

```python
import os
import time
import asyncio
from faust import ProcessCoordinator

# Worker coordination setup
coordinator = ProcessCoordinator(
    app,
    max_workers=4,
    worker_timeout=60.0
)

@app.on_startup.connect
async def register_with_coordinator():
    """Register this worker when starting."""
    worker_id = f"{os.getpid()}-{os.uname().nodename}"
    metadata = {
        'hostname': os.uname().nodename,
        'pid': os.getpid(),
        'started_at': time.time()
    }

    await coordinator.register_worker(worker_id, metadata)
    app._worker_id = worker_id

@app.on_shutdown.connect
async def unregister_from_coordinator():
    """Unregister when shutting down."""
    if hasattr(app, '_worker_id'):
        await coordinator.unregister_worker(app._worker_id)

@app.timer(interval=30.0)
async def monitor_workers():
    """Monitor worker health and rebalance if needed."""
    active_workers = await coordinator.get_active_workers()

    if len(active_workers) < coordinator.max_workers:
        print(f"Only {len(active_workers)} workers active, may need scaling")

        # Trigger rebalancing if needed
        await coordinator.rebalance_load()
```

### Service Management

```python
class DatabaseService(faust.Service):
    """Custom service for database connection management."""

    def __init__(self, connection_string: str):
        super().__init__()
        self.connection_string = connection_string
        self.connection = None

    async def start(self):
        """Initialize database connection."""
        print("Starting database service...")
        # Initialize database connection
        self.connection = await create_connection(self.connection_string)
        print("Database service started")

    async def stop(self):
        """Close database connection."""
        print("Stopping database service...")
        if self.connection:
            await self.connection.close()
        print("Database service stopped")

class CacheService(faust.Service):
    """Custom service for cache management."""

    def __init__(self, redis_url: str):
        super().__init__()
        self.redis_url = redis_url
        self.redis = None

    async def start(self):
        """Initialize Redis connection."""
        print("Starting cache service...")
        self.redis = await create_redis_connection(self.redis_url)
        print("Cache service started")

    async def stop(self):
        """Close Redis connection."""
        print("Stopping cache service...")
        if self.redis:
            await self.redis.close()
        print("Cache service stopped")

# Register services with the app
db_service = DatabaseService('postgresql://localhost/mydb')
cache_service = CacheService('redis://localhost:6379')

# Cache service depends on database service
cache_service.add_dependency(db_service)

# Add services to app
app.service(db_service)
app.service(cache_service)

# Services will start in dependency order automatically
```

### Health Monitoring and Deployment

```python
import os

from faust import DeploymentManager, HealthCheck

async def check_database_health():
    """Database connectivity check."""
    try:
        # Check database connection
        await db_service.connection.execute('SELECT 1')
        return {'status': 'healthy', 'latency': 0.001}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

async def check_message_processing():
    """Message processing health check."""
    if hasattr(app, 'monitor'):
        events_per_sec = app.monitor.events_per_second()
        if events_per_sec > 0:
            return {'status': 'healthy', 'events_per_second': events_per_sec}
        else:
            return {'status': 'warning', 'message': 'No events being processed'}
    return {'status': 'unknown', 'message': 'No monitor available'}

# Setup deployment manager with health checks
deployment = DeploymentManager(
    app,
    deployment_id=f"faust-app-{os.getenv('VERSION', '1.0')}",
    health_check_interval=30.0,
    scaling_policy={
        'min_workers': 2,
        'max_workers': 10,
        'target_cpu_percent': 70,
        'scale_up_threshold': 80,
        'scale_down_threshold': 30
    }
)

# Register health checks
deployment.add_health_check(
    HealthCheck('database', check_database_health, interval=30.0)
)
deployment.add_health_check(
    HealthCheck('processing', check_message_processing, interval=10.0)
)

@app.timer(interval=60.0)
async def deployment_health_monitor():
    """Monitor deployment health and auto-scale if needed."""
    health_status = await deployment.health_check()

    if not health_status['healthy']:
        print(f"Health check failed: {health_status['issues']}")

        # Could trigger alerts here
        # await send_alert(health_status)

    # Check if scaling is needed
    current_load = health_status.get('cpu_percent', 0)
    scaling_policy = deployment.scaling_policy

    if current_load > scaling_policy['scale_up_threshold']:
        current_workers = len(await coordinator.get_active_workers())
        if current_workers < scaling_policy['max_workers']:
            await deployment.scale_workers(current_workers + 1)
            print(f"Scaled up to {current_workers + 1} workers")

    elif current_load < scaling_policy['scale_down_threshold']:
        current_workers = len(await coordinator.get_active_workers())
        if current_workers > scaling_policy['min_workers']:
            await deployment.scale_workers(current_workers - 1)
            print(f"Scaled down to {current_workers - 1} workers")
```

### Configuration Management

```python
import os
import sys

from faust import WorkerConfig

# Load configuration from environment and files
config = WorkerConfig()
config.load_from_env('FAUST_')  # Load FAUST_* environment variables

# Load additional configuration from file
config_file = os.getenv('FAUST_CONFIG_FILE', 'config/production.yaml')
if os.path.exists(config_file):
    config.load_from_file(config_file)

# Validate configuration
validation_errors = config.validate()
if validation_errors:
    print(f"Configuration errors: {validation_errors}")
    sys.exit(1)

# Configure worker with loaded configuration
worker = configure_worker(app, config)

# Override specific settings for this environment
if config.environment == 'production':
    worker.loglevel = 'warning'
    worker.daemon = True
elif config.environment == 'development':
    worker.loglevel = 'debug'
    worker.daemon = False

print(f"Starting worker {config.worker_id} in {config.environment} environment")
worker.start()
```

### Production Deployment Script

```python
#!/usr/bin/env python3
"""Production deployment script for Faust application."""

import os
import sys
import signal
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_worker():
    """Context manager for proper worker lifecycle."""
    worker = None
    try:
        # Setup worker with production configuration
        config = WorkerConfig(
            worker_id=f"worker-{os.getpid()}",
            environment='production',
            concurrency=4,
            max_memory=2 * 1024 * 1024 * 1024  # 2GB
        )

        worker = configure_worker(app, config)

        # Setup signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            print(f"Received signal {signum}, initiating shutdown...")
            if worker:
                worker.stop()

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        # Start worker in background
        worker_task = asyncio.create_task(
            asyncio.to_thread(worker.start)
        )

        yield worker

    except Exception as e:
        print(f"Error during worker startup: {e}")
        raise
    finally:
        if worker:
            print("Shutting down worker...")
            worker.stop()

async def main():
    """Main deployment entry point."""
    async with managed_worker() as worker:
        print(f"Worker {worker.get_pid()} started successfully")

        # Monitor worker health
        while worker.is_running():
            health = await deployment.health_check()
            if not health['healthy']:
                print(f"Health check failed: {health}")
                break

            await asyncio.sleep(30)

        print("Worker stopped")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Deployment interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"Deployment failed: {e}")
        sys.exit(1)
```

## Type Interfaces

```python { .api }
from typing import Protocol, Dict, List, Any, Optional, Callable

class WorkerT(Protocol):
    """Type interface for Worker."""

    app: 'AppT'
    loglevel: str
    logfile: Optional[str]

    def start(self) -> None: ...
    def stop(self) -> None: ...
    def restart(self) -> None: ...
    def is_running(self) -> bool: ...
    def get_pid(self) -> Optional[int]: ...

class ServiceT(Protocol):
    """Type interface for Service."""

    label: str
    dependencies: set

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def restart(self) -> None: ...

class ProcessCoordinatorT(Protocol):
    """Type interface for ProcessCoordinator."""

    async def register_worker(self, worker_id: str, metadata: Optional[Dict] = None) -> None: ...
    async def unregister_worker(self, worker_id: str) -> None: ...
    async def get_active_workers(self) -> List[Dict]: ...
    async def rebalance_load(self) -> None: ...
```