
# Signals and Events

Signal-based event system for monitoring task lifecycle, worker status, and system events. These hooks enable custom logging, debugging, and integration with external systems throughout Celery's execution pipeline.

## Capabilities

### Task Lifecycle Signals

Signals fired during task execution phases, providing hooks for monitoring, logging, and custom behavior at each stage.

```python { .api }
# Task execution signals
before_task_publish = Signal()
after_task_publish = Signal()
task_prerun = Signal()
task_postrun = Signal()
task_success = Signal()
task_failure = Signal()
task_retry = Signal()
task_revoked = Signal()
task_received = Signal()
task_rejected = Signal()
task_unknown = Signal()

class Signal:
    def connect(self, callback, sender=None, weak=True, dispatch_uid=None):
        """
        Connect callback to signal.

        Args:
            callback (callable): Function to call when signal fires
            sender: Signal sender filter (None for all senders)
            weak (bool): Use weak references to callback
            dispatch_uid: Unique identifier for this connection
        """

    def disconnect(self, callback=None, sender=None, dispatch_uid=None):
        """
        Disconnect callback from signal.

        Args:
            callback (callable): Callback to disconnect
            sender: Sender filter
            dispatch_uid: Connection identifier

        Returns:
            bool: True if disconnected
        """

    def send(self, sender, **kwargs):
        """
        Send signal to all connected callbacks.

        Args:
            sender: Signal sender
            **kwargs: Signal data

        Returns:
            list: [(receiver, response), ...] for each callback
        """

def before_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, declare=None, retry_policy=None, **kwargs):
    """
    Signal fired before task is published to broker.

    Args:
        sender: Publisher instance
        headers (dict): Message headers
        body (dict): Message body
        routing_key (str): Message routing key
        exchange (str): Exchange name
        declare (list): Exchanges/queues to declare
        retry_policy (dict): Retry configuration
    """

def after_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, **kwargs):
    """
    Signal fired after task is published to broker.

    Args:
        sender: Publisher instance
        headers (dict): Message headers
        body (dict): Message body
        routing_key (str): Message routing key
        exchange (str): Exchange name
    """

def task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """
    Signal fired before task execution.

    Args:
        sender: Task class
        task_id (str): Task ID
        task: Task instance
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
    """

def task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """
    Signal fired after task execution.

    Args:
        sender: Task class
        task_id (str): Task ID
        task: Task instance
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
        retval: Task return value
        state (str): Final task state
    """

def task_success(sender=None, result=None, **kwds):
    """
    Signal fired when task succeeds.

    Args:
        sender: Task class
        result: Task return value
    """

def task_failure(sender=None, task_id=None, exception=None, einfo=None, **kwds):
    """
    Signal fired when task fails.

    Args:
        sender: Task class
        task_id (str): Task ID
        exception: Exception instance
        einfo: Exception info object
    """

def task_retry(sender=None, task_id=None, reason=None, einfo=None, **kwds):
    """
    Signal fired when task is retried.

    Args:
        sender: Task class
        task_id (str): Task ID
        reason: Retry reason/exception
        einfo: Exception info
    """

def task_revoked(sender=None, request=None, terminated=None, signum=None, expired=None, **kwds):
    """
    Signal fired when task is revoked.

    Args:
        sender: Task class
        request: Task request object
        terminated (bool): Process was terminated
        signum (int): Signal number if terminated
        expired (bool): Task expired
    """

def task_received(sender=None, request=None, **kwargs):
    """
    Signal fired when worker receives task.

    Args:
        sender: Consumer instance
        request: Task request object
    """

def task_rejected(sender=None, message=None, exc=None, **kwargs):
    """
    Signal fired when task is rejected.

    Args:
        sender: Consumer instance
        message: Rejected message
        exc: Rejection exception
    """

def task_unknown(sender=None, message=None, exc=None, name=None, id=None, **kwargs):
    """
    Signal fired when unknown task received.

    Args:
        sender: Consumer instance
        message: Unknown message
        exc: Exception raised
        name (str): Unknown task name
        id (str): Task ID
    """
```
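
As a quick illustration of the connection API above, a handler can be registered with a `dispatch_uid` so that repeated imports of the module do not attach the same callback twice (a minimal sketch; the handler name and audit message are illustrative):

```python
from celery.signals import task_prerun

def audit_task_start(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    """Print a line for every task start; sender is the task being executed."""
    print(f'audit: {task.name}[{task_id}] args={args} kwargs={kwargs}')

# dispatch_uid keeps the handler from being registered twice if this module
# is imported more than once.
task_prerun.connect(audit_task_start, dispatch_uid='audit-task-start')

# The handler can later be removed explicitly:
# task_prerun.disconnect(audit_task_start, dispatch_uid='audit-task-start')
```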

### Worker Lifecycle Signals

Signals for monitoring worker startup, shutdown, and process management events.

```python { .api }
# Worker lifecycle signals
worker_init = Signal()
worker_process_init = Signal()
worker_process_shutdown = Signal()
worker_ready = Signal()
worker_shutdown = Signal()
worker_shutting_down = Signal()
celeryd_init = Signal()
celeryd_after_setup = Signal()

def worker_init(sender=None, **kwargs):
    """
    Signal fired when worker initializes.

    Args:
        sender: Worker instance
    """

def worker_process_init(sender=None, **kwargs):
    """
    Signal fired when worker process initializes.

    Args:
        sender: Worker instance
    """

def worker_process_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """
    Signal fired when worker process shuts down.

    Args:
        sender: Worker instance
        pid (int): Process ID
        exitcode (int): Process exit code
    """

def worker_ready(sender=None, **kwargs):
    """
    Signal fired when worker is ready to receive tasks.

    Args:
        sender: Worker instance
    """

def worker_shutdown(sender=None, **kwargs):
    """
    Signal fired when worker shuts down.

    Args:
        sender: Worker instance
    """

def worker_shutting_down(sender=None, **kwargs):
    """
    Signal fired when worker begins shutdown.

    Args:
        sender: Worker instance
    """

def celeryd_init(sender=None, instance=None, conf=None, **kwargs):
    """
    Signal fired when celery daemon initializes.

    Args:
        sender: Worker class
        instance: Worker instance
        conf: Configuration object
    """

def celeryd_after_setup(sender=None, instance=None, conf=None, **kwargs):
    """
    Signal fired after celery daemon setup.

    Args:
        sender: Worker class
        instance: Worker instance
        conf: Configuration object
    """
```
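
A common use of these hooks is allocating per-process resources, since pool child processes should not reuse connections inherited across `fork`. A minimal sketch, where a placeholder object stands in for a real database or HTTP client:

```python
from celery.signals import worker_process_init, worker_process_shutdown

# Placeholder for a non-fork-safe resource (database connection, gRPC channel, ...)
_connection = None

@worker_process_init.connect
def open_process_resources(**kwargs):
    """Create the resource inside each pool child process."""
    global _connection
    _connection = object()  # stands in for e.g. db_client.connect()

@worker_process_shutdown.connect
def close_process_resources(pid=None, exitcode=None, **kwargs):
    """Release the per-process resource when the pool child exits."""
    global _connection
    _connection = None
```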

### Beat Scheduler Signals

Signals for monitoring periodic task scheduler events and beat service lifecycle.

```python { .api }
# Beat scheduler signals
beat_init = Signal()
beat_embedded_init = Signal()

def beat_init(sender=None, **kwargs):
    """
    Signal fired when beat scheduler initializes.

    Args:
        sender: Beat service instance
    """

def beat_embedded_init(sender=None, **kwargs):
    """
    Signal fired when embedded beat initializes.

    Args:
        sender: Beat service instance
    """
```
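
For example, `beat_init` can confirm that the scheduler process came up, e.g. for a startup log line or a health-check ping (a minimal sketch; the printed message is illustrative):

```python
from celery.signals import beat_init

@beat_init.connect
def on_beat_start(sender=None, **kwargs):
    """Log when the beat service starts; sender is the beat Service instance."""
    print(f'beat started with scheduler {getattr(sender, "scheduler", None)!r}')
```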

### Logging and Setup Signals

Signals for customizing logging configuration and system setup procedures.

```python { .api }
# Logging signals
setup_logging = Signal()
after_setup_logger = Signal()
after_setup_task_logger = Signal()

def setup_logging(sender=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired during logging setup.

    Args:
        sender: Logging setup caller
        loglevel (int): Log level
        logfile (str): Log file path
        format (str): Log format string
        colorize (bool): Enable colored output
    """

def after_setup_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired after logger setup.

    Args:
        sender: Logger setup caller
        logger: Logger instance
        loglevel (int): Log level
        logfile (str): Log file path
        format (str): Log format
        colorize (bool): Colored output enabled
    """

def after_setup_task_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired after task logger setup.

    Args:
        sender: Task logger setup caller
        logger: Task logger instance
        loglevel (int): Log level
        logfile (str): Log file path
        format (str): Log format
        colorize (bool): Colored output enabled
    """
```
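
These hooks complement the fuller logging example under Usage Examples below; as a small illustration, `after_setup_task_logger` can adjust the task logger after Celery has configured it (a minimal sketch; the third-party logger name is illustrative):

```python
import logging

from celery.signals import after_setup_task_logger

@after_setup_task_logger.connect
def quiet_noisy_library(sender=None, logger=None, loglevel=None, **kwargs):
    """Tweak the task logger once Celery has finished configuring it."""
    # Silence a chatty dependency without touching Celery's own handlers.
    logging.getLogger('chatty.dependency').setLevel(logging.WARNING)
    logger.debug('task logger configured at level %s', loglevel)
```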

### Signal Connection Decorators

Convenience decorators for connecting signal handlers to specific events.

```python { .api }
def receiver(signal, **kwargs):
    """
    Decorator for connecting signal handlers.

    Args:
        signal: Signal instance to connect to
        **kwargs: Connection options (sender, dispatch_uid, etc.)

    Returns:
        Decorator function
    """

# Usage patterns
@receiver(task_success)
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle task success."""
    pass

@receiver(task_failure)
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure."""
    pass
```
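
`Signal.connect` itself also works as a decorator, which is the form the usage examples below rely on (a minimal sketch using `task_retry`):

```python
from celery.signals import task_retry

@task_retry.connect
def log_retry(sender=None, task_id=None, reason=None, einfo=None, **kwargs):
    """Record each retry together with the reason it was triggered."""
    print(f'task {task_id} will be retried: {reason}')
```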

## Usage Examples

### Basic Signal Handlers

```python
from celery import Celery
from celery.signals import (
    task_prerun, task_postrun, task_success, task_failure,
    worker_ready, worker_shutdown
)

app = Celery('signal_example')

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start."""
    print(f'Task {task.name}[{task_id}] starting with args={args}, kwargs={kwargs}')

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """Log task completion."""
    print(f'Task {task.name}[{task_id}] finished with state={state}, result={retval}')

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle successful task completion."""
    print(f'Task {sender.name} succeeded with result: {result}')

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure."""
    print(f'Task {sender.name}[{task_id}] failed: {exception}')
    print(f'Traceback: {einfo.traceback}')

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log when worker becomes ready."""
    print(f'Worker {sender.hostname} is ready to receive tasks')

@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Log worker shutdown."""
    print(f'Worker {sender.hostname} is shutting down')
```

### Advanced Monitoring and Metrics

```python
import os
import time
from datetime import datetime
from celery.signals import (
    before_task_publish, after_task_publish,
    task_prerun, task_postrun, task_success, task_failure,
    worker_process_init
)

# Task execution metrics
task_metrics = {
    'published': 0,
    'started': 0,
    'completed': 0,
    'failed': 0,
    'total_execution_time': 0
}

@before_task_publish.connect
def track_task_published(sender=None, headers=None, body=None, **kwargs):
    """Track task publication."""
    task_metrics['published'] += 1
    task_name = headers.get('task', 'unknown')
    print(f'Publishing task: {task_name}')

@task_prerun.connect
def track_task_start(sender=None, task_id=None, task=None, **kwargs):
    """Track task execution start."""
    task_metrics['started'] += 1

    # Store start time for duration calculation
    task.request.start_time = time.time()

    print(f'Starting task {task.name}[{task_id}] at {datetime.now()}')

@task_postrun.connect
def track_task_completion(sender=None, task_id=None, task=None, state=None, **kwargs):
    """Track task completion and calculate duration."""
    task_metrics['completed'] += 1

    # Calculate execution time
    if hasattr(task.request, 'start_time'):
        duration = time.time() - task.request.start_time
        task_metrics['total_execution_time'] += duration
        print(f'Task {task.name}[{task_id}] completed in {duration:.2f}s with state {state}')

@task_failure.connect
def track_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Track task failures."""
    task_metrics['failed'] += 1
    print(f'Task {sender.name}[{task_id}] failed: {type(exception).__name__}: {exception}')

def print_metrics():
    """Print current metrics."""
    print("\n=== Task Metrics ===")
    print(f"Published: {task_metrics['published']}")
    print(f"Started: {task_metrics['started']}")
    print(f"Completed: {task_metrics['completed']}")
    print(f"Failed: {task_metrics['failed']}")
    if task_metrics['completed'] > 0:
        avg_time = task_metrics['total_execution_time'] / task_metrics['completed']
        print(f"Average execution time: {avg_time:.2f}s")

@worker_process_init.connect
def setup_worker_monitoring(sender=None, **kwargs):
    """Setup worker-specific monitoring."""
    # worker_process_init may be dispatched with sender=None, so report the OS pid directly.
    print(f'Worker process initialized with PID {os.getpid()}')
```

### Custom Logging Integration

```python
import logging
from celery.signals import (
    setup_logging, after_setup_task_logger,
    task_prerun, task_postrun, task_failure
)

# Custom logger setup
@setup_logging.connect
def setup_custom_logging(sender=None, loglevel=None, logfile=None, **kwargs):
    """Setup custom logging configuration."""

    # Configure root logger
    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(logfile or 'celery.log'),
            logging.StreamHandler()
        ]
    )

    # Connecting a receiver to setup_logging is what disables Celery's
    # default logging configuration; the return value is not used.
    return True

@after_setup_task_logger.connect
def configure_task_logger(sender=None, logger=None, **kwargs):
    """Configure task-specific logger."""

    # Add custom handler for task logs
    task_handler = logging.FileHandler('tasks.log')
    task_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    ))

    logger.addHandler(task_handler)
    logger.info('Custom task logger configured')

# Task-specific logging
@task_prerun.connect
def log_task_start(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start with context."""
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Starting task {task_id} with args={args}, kwargs={kwargs}')

@task_postrun.connect
def log_task_end(sender=None, task_id=None, task=None, retval=None, state=None, **kwds):
    """Log task completion."""
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Task {task_id} finished with state={state}')

@task_failure.connect
def log_task_error(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Log task errors with full context."""
    logger = logging.getLogger(f'tasks.{sender.name}')
    # logging's exc_info expects an exception or exc_info tuple, not Celery's einfo object
    logger.error(f'Task {task_id} failed: {exception}', exc_info=exception)
```

### External System Integration

```python
import requests
from datetime import datetime
from celery.signals import task_success, task_failure, worker_ready

# Webhook notifications
WEBHOOK_URL = 'https://monitoring.example.com/webhook'

@task_success.connect
def notify_task_success(sender=None, result=None, **kwargs):
    """Send webhook notification on task success."""

    payload = {
        'event': 'task_success',
        'task_name': sender.name,
        # task_success does not pass a task_id; the running task's id is on sender.request
        'task_id': getattr(sender.request, 'id', None),
        'result': str(result),
        'timestamp': datetime.now().isoformat()
    }

    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send webhook: {e}')

@task_failure.connect
def notify_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Send alert on task failure."""

    payload = {
        'event': 'task_failure',
        'task_name': sender.name,
        'task_id': task_id,
        'error': str(exception),
        'error_type': type(exception).__name__,
        'timestamp': datetime.now().isoformat(),
        'severity': 'high'
    }

    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send failure alert: {e}')

# Metrics collection
@worker_ready.connect
def register_worker(sender=None, **kwargs):
    """Register worker with monitoring system."""

    payload = {
        'event': 'worker_ready',
        'hostname': sender.hostname,
        'pid': getattr(sender, 'pid', None),
        'timestamp': datetime.now().isoformat()
    }

    try:
        requests.post(f'{WEBHOOK_URL}/workers', json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to register worker: {e}')
```

### Signal-Based Task Routing

```python
from celery.signals import before_task_publish

@before_task_publish.connect
def route_priority_tasks(sender=None, headers=None, body=None, routing_key=None, **kwargs):
    """Dynamically route tasks based on priority."""

    task_name = headers.get('task')
    # Under task protocol 2 the body is an (args, kwargs, embed) tuple;
    # protocol 1 used a dict with a 'kwargs' key.
    task_kwargs = body[1] if isinstance(body, (tuple, list)) else body.get('kwargs', {})
    priority = task_kwargs.get('priority', 'normal')

    # Route high priority tasks to dedicated queue
    if priority == 'high':
        headers['routing_key'] = 'high_priority'
        print(f'Routing {task_name} to high priority queue')

    # Route long-running tasks to separate workers
    elif task_name in ['process_large_file', 'generate_report']:
        headers['routing_key'] = 'long_running'
        print(f'Routing {task_name} to long running queue')
```

### Debugging and Development

```python
import json
import time

from celery.signals import (
    task_prerun, task_postrun, task_success, task_failure,
    task_retry, task_revoked
)

# Debug handler that logs all task activity
@task_prerun.connect
@task_postrun.connect
@task_success.connect
@task_failure.connect
@task_retry.connect
@task_revoked.connect
def debug_task_signals(sender=None, signal=None, **kwargs):
    """Log all task signals for debugging."""

    # Receivers are called with the dispatching Signal instance as `signal`;
    # its name identifies which signal fired.
    signal_name = getattr(signal, 'name', None) or repr(signal)

    debug_info = {
        'signal': signal_name,
        'task': getattr(sender, 'name', str(sender)) if sender else None,
        'data': {k: str(v) for k, v in kwargs.items() if k not in ['einfo']}
    }

    print(f'DEBUG SIGNAL: {json.dumps(debug_info, indent=2)}')

# Performance monitoring
task_times = {}

@task_prerun.connect
def start_timer(sender=None, task_id=None, **kwargs):
    """Start timing task execution."""
    task_times[task_id] = time.time()

@task_postrun.connect
def end_timer(sender=None, task_id=None, **kwargs):
    """End timing and log duration."""
    if task_id in task_times:
        duration = time.time() - task_times[task_id]
        print(f'Task {sender.name}[{task_id}] took {duration:.3f}s')
        del task_times[task_id]
```

### Conditional Signal Handlers

```python
import os

from celery.signals import task_failure

# Handle failures differently depending on environment
@task_failure.connect
def production_error_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Handle errors differently in production vs development."""

    if os.environ.get('ENVIRONMENT') == 'production':
        # Send to error tracking service
        send_to_sentry(exception, task_id, sender.name)

        # Page on-call engineer for critical tasks
        if sender.name in ['process_payment', 'send_notification']:
            page_oncall_engineer(f'Critical task {sender.name} failed: {exception}')
    else:
        # Just log in development
        print(f'DEV: Task {sender.name}[{task_id}] failed: {exception}')

def send_to_sentry(exception, task_id, task_name):
    """Send error to Sentry."""
    # Sentry integration code
    pass

def page_oncall_engineer(message):
    """Send alert to on-call engineer."""
    # Alerting system integration
    pass
```