0
# Events & Monitoring
1
2
Luigi's event system provides hooks for monitoring task execution, workflow progress, and integration with external monitoring systems. Events enable tracking, logging, and alerting capabilities.
3
4
## Capabilities
5
6
### Base Event Class
7
8
Foundation class for Luigi's event system that enables monitoring and tracking of task execution lifecycle.
9
10
```python { .api }
11
class Event:
    """Base class for Luigi events.

    NOTE(review): upstream Luigi appears to expose ``luigi.Event`` as a
    namespace of event-name constants (``START``, ``SUCCESS``, ``FAILURE``,
    ...) with callbacks registered through ``luigi.Task.event_handler`` --
    confirm this stub against the Luigi version being documented.
    """

    def __init__(self):
        """Initialize event instance."""

    @staticmethod
    def trigger_event(event, task_obj=None, flushing=False):
        """
        Trigger an event with optional task context.

        Args:
            event: Event instance to trigger
            task_obj: Task object associated with event (defaults to None)
            flushing: Whether this is a flush event (defaults to False)
        """
27
```
28
29
### Task Lifecycle Events
30
31
Events that are triggered during different phases of task execution.
32
33
```python { .api }
34
# Task execution events
35
class TaskEvent(Event):
    """Base class for task-related events.

    Every task event is constructed with the task it refers to; subclasses
    add further constructor arguments where they need more context.
    """

    def __init__(self, task):
        """
        Initialize task event.

        Args:
            task: Task instance associated with event
        """
45
46
class TaskStartEvent(TaskEvent):
    """Event triggered when task execution starts.

    Declares no constructor of its own, so it takes the same single
    ``task`` argument as ``TaskEvent``.
    """
48
49
class TaskSuccessEvent(TaskEvent):
    """Event triggered when task completes successfully.

    Declares no constructor of its own, so it takes the same single
    ``task`` argument as ``TaskEvent``.
    """
51
52
class TaskFailureEvent(TaskEvent):
    """Event triggered when task execution fails.

    Extends the ``TaskEvent`` constructor with the exception that caused
    the failure.
    """

    def __init__(self, task, exception):
        """
        Initialize task failure event.

        Args:
            task: Failed task instance
            exception: Exception that caused failure
        """
63
64
class TaskProcessEvent(TaskEvent):
    """Event triggered during task processing.

    Declares no constructor of its own, so it takes the same single
    ``task`` argument as ``TaskEvent``.
    """
66
67
# Dependency events
68
class DependencyDiscovered(TaskEvent):
    """Event triggered when task dependency is discovered.

    Carries both ends of the dependency edge plus a flag describing its
    direction.
    """

    def __init__(self, task, dependency, upstream):
        """
        Initialize dependency discovered event.

        Args:
            task: Task with dependency
            dependency: Discovered dependency task
            upstream: Whether dependency is upstream
        """
80
81
class DependencyMissing(TaskEvent):
    """Event triggered when required dependency is missing.

    Carries the dependent task together with the dependency that could not
    be found.
    """

    def __init__(self, task, missing_dependency):
        """
        Initialize dependency missing event.

        Args:
            task: Task with missing dependency
            missing_dependency: Missing dependency task
        """
92
```
93
94
### Execution Status Codes
95
96
Enumeration of possible execution outcomes and status codes for Luigi workflows.
97
98
```python { .api }
99
class LuigiStatusCode:
    """Status codes for Luigi execution results.

    NOTE(review): upstream ``luigi.execution_summary.LuigiStatusCode`` appears
    to be an ``enum.Enum`` whose member values are (smiley, description)
    tuples rather than the integers shown here, and ``has_value`` may not
    exist there -- confirm against the Luigi version being documented and
    compare members by identity/name rather than by number.
    """

    SUCCESS = 0
    """All tasks completed successfully without any failures."""

    SUCCESS_WITH_RETRY = 1
    """Tasks completed successfully but some required retries."""

    FAILED = 2
    """One or more tasks failed during execution."""

    FAILED_AND_SCHEDULING_FAILED = 3
    """Both task execution and scheduling encountered failures."""

    SCHEDULING_FAILED = 4
    """Task scheduling failed before execution could begin."""

    NOT_RUN = 5
    """Tasks were not executed (e.g., already complete)."""

    MISSING_EXT = 6
    """Missing external dependencies prevented execution."""

    @classmethod
    def has_value(cls, value: int) -> bool:
        """
        Check if value is a valid status code.

        Args:
            value: Status code value to check

        Returns:
            bool: True if valid status code
        """
134
```
135
136
### Execution Summary
137
138
Classes for generating and managing execution summaries with detailed statistics and results.
139
140
```python { .api }
141
class execution_summary:
    """Configuration for execution summary generation.

    The lowercase class name is intentional: it doubles as the name of the
    configuration section. (NOTE(review): presumably a ``luigi.Config``
    subclass upstream -- confirm.)
    """

    # Default is 100; see the attribute description below for its meaning.
    summary_length: int = 100
    """Maximum length of execution summary."""

    # Defaults to False, i.e. logging configuration stays enabled.
    no_configure_logging: bool = False
    """Whether to disable logging configuration."""
149
150
def summary(tasks=None, worker_obj=None) -> dict:
    """
    Generate execution summary for completed tasks.

    NOTE(review): upstream ``luigi.execution_summary.summary`` appears to take
    a single ``worker`` argument and return the formatted summary as a
    string -- confirm this signature and return type before relying on it.

    Args:
        tasks: List of tasks to summarize (defaults to None)
        worker_obj: Worker object with execution details (defaults to None)

    Returns:
        dict: Execution summary with statistics and details
    """
161
162
class LuigiRunResult:
    """Container for Luigi execution results and status.

    NOTE(review): upstream, ``luigi.build``/``luigi.run`` appear to return
    this object only when called with ``detailed_summary=True`` (otherwise a
    plain bool), and the constructor there seems to take the worker rather
    than a precomputed status -- confirm against the documented version.
    """

    def __init__(self, status: LuigiStatusCode, worker=None,
                 scheduling_succeeded: bool = True):
        """
        Initialize run result.

        Args:
            status: Execution status code
            worker: Worker instance that executed tasks (defaults to None)
            scheduling_succeeded: Whether scheduling was successful
                (defaults to True)
        """

    @property
    def status(self) -> LuigiStatusCode:
        """Get execution status code."""

    @property
    def worker(self):
        """Get worker instance."""

    @property
    def scheduling_succeeded(self) -> bool:
        """Check if scheduling succeeded."""
187
```
188
189
## Usage Examples
190
191
### Basic Event Handling
192
193
```python
194
import time

import luigi
from luigi.event import Event
196
197
class CustomEvent(Event):
    """Payload object for application-specific monitoring events.

    Attributes:
        message: Human-readable description of what happened.
        data: Extra context for the event; an empty dict when none is given.
        timestamp: Creation time in seconds since the epoch, taken from
            ``time.time()`` (requires ``import time`` at the top of the
            example).
    """

    def __init__(self, message: str, data: dict = None):
        """
        Create the payload and stamp it with the current time.

        Args:
            message: Description of the event.
            data: Optional mapping with additional context.
        """
        super().__init__()
        self.message = message
        # Fall back to a fresh dict so instances never share mutable state.
        self.data = data or {}
        self.timestamp = time.time()
205
206
# Event handler functions
207
# Name under which CustomEvent payloads are published and subscribed.
# Luigi keys callbacks by an arbitrary hashable event name.
CUSTOM_EVENT = "event.custom.monitoring"


@luigi.Task.event_handler(CUSTOM_EVENT)
def handle_custom_event(event):
    """Handle custom events.

    Registered on ``luigi.Task`` so it fires for every task that triggers
    ``CUSTOM_EVENT``.

    Args:
        event: The ``CustomEvent`` payload passed to ``trigger_event``.
    """
    print(f"Custom event: {event.message}")
    if event.data:
        print(f"Event data: {event.data}")


class MonitoredTask(luigi.Task):
    """Task that emits custom events."""

    def run(self):
        """Write the output file, publishing start/success/failure events."""
        # Emit start event. Task.trigger_event(name, *args) forwards *args
        # to every callback registered for that name.
        self.trigger_event(
            CUSTOM_EVENT,
            CustomEvent("Task started", {"task_id": self.task_id}),
        )

        try:
            # Task logic
            with self.output().open('w') as f:
                f.write("Task completed")

            # Emit success event
            self.trigger_event(
                CUSTOM_EVENT,
                CustomEvent("Task completed successfully"),
            )

        except Exception as e:
            # Emit failure event, then let Luigi see the original error.
            self.trigger_event(
                CUSTOM_EVENT,
                CustomEvent("Task failed", {"error": str(e)}),
            )
            raise

    def output(self):
        """Return the local file that marks this task as complete."""
        return luigi.LocalTarget("monitored_output.txt")
236
```
237
238
### Task Lifecycle Monitoring
239
240
```python
241
import luigi
242
from luigi.event import Event
243
import logging
244
import time
245
246
# Set up logging for events
247
logging.basicConfig(level=logging.INFO)
248
logger = logging.getLogger('luigi.events')
249
250
# Event handlers for task lifecycle
251
@luigi.Task.event_handler(luigi.Event.START)
def log_task_start(task):
    """Log when tasks start execution.

    Args:
        task: The task instance that is about to run; Luigi passes it as the
            only argument of START callbacks.
    """
    logger.info(f"STARTED: {task.task_id}")
256
257
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def log_task_success(task):
    """Log when tasks complete successfully.

    Args:
        task: The task instance that finished; Luigi passes it as the only
            argument of SUCCESS callbacks.
    """
    logger.info(f"SUCCESS: {task.task_id}")
262
263
@luigi.Task.event_handler(luigi.Event.FAILURE)
def log_task_failure(task, exception):
    """Log when tasks fail.

    Args:
        task: The task instance whose ``run`` raised.
        exception: The exception raised by the task; Luigi passes it as the
            second argument of FAILURE callbacks.
    """
    logger.error(f"FAILED: {task.task_id} - {exception}")
268
269
class LifecycleTask(luigi.Task):
    """Demo task whose start, success and failure can be observed by handlers.

    Parameters:
        task_name: Label used in log lines and in the output file name.
        should_fail: When true, ``run`` raises instead of writing output.
    """

    task_name = luigi.Parameter()
    should_fail = luigi.BoolParameter(default=False)

    def output(self):
        """Return the marker file named after ``task_name``."""
        target_path = f"output_{self.task_name}.txt"
        return luigi.LocalTarget(target_path)

    def run(self):
        """Sleep briefly, then either fail on request or write the marker."""
        logger.info(f"Executing {self.task_name}")

        # Stand-in for real work.
        time.sleep(1)

        if self.should_fail:
            failure_reason = f"Task {self.task_name} configured to fail"
            raise Exception(failure_reason)

        with self.output().open('w') as handle:
            handle.write(f"Completed: {self.task_name}")
289
290
# Run tasks with lifecycle monitoring
291
if __name__ == '__main__':
    tasks = [
        LifecycleTask(task_name="task1"),
        LifecycleTask(task_name="task2"),
        LifecycleTask(task_name="task3", should_fail=True),
    ]

    # Without detailed_summary=True, luigi.build() returns a plain bool,
    # which has no .status attribute; with it, a LuigiRunResult is returned.
    result = luigi.build(tasks, local_scheduler=True, detailed_summary=True)
    print(f"Final status: {result.status}")
300
```
301
302
### Execution Summary Analysis
303
304
```python
305
import luigi
306
from luigi.execution_summary import summary, LuigiStatusCode
307
from luigi.event import Event
308
import json
309
310
class SummaryAnalysisTask(luigi.Task):
    """Task that analyzes execution summaries.

    Runs a handful of ``SimpleTask`` instances in a nested build and writes a
    JSON report describing the outcome.
    """

    def output(self):
        """Return the JSON file that receives the analysis."""
        return luigi.LocalTarget("execution_analysis.json")

    def run(self):
        """Build the test tasks, summarize the run and persist the analysis."""
        # Create some test tasks
        test_tasks = [SimpleTask(name=f"task_{i}") for i in range(5)]

        # detailed_summary=True is required to get a LuigiRunResult (with
        # .status / .worker / .scheduling_succeeded) instead of a bare bool.
        result = luigi.build(test_tasks, local_scheduler=True, detailed_summary=True)

        # summary() takes the worker and returns the formatted summary text.
        exec_summary = summary(result.worker)

        # complete() may hit the filesystem, so evaluate it once per task.
        completed = sum(1 for task in test_tasks if task.complete())

        # Analyze results
        analysis = {
            'execution_status': result.status,
            'scheduling_succeeded': result.scheduling_succeeded,
            'summary': exec_summary,
            'task_analysis': {
                'total_tasks': len(test_tasks),
                'completed_tasks': completed,
                'failed_tasks': len(test_tasks) - completed,
            },
            'status_interpretation': self.interpret_status(result.status),
        }

        # default=str serializes the status enum member by its string form.
        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2, default=str)

    def interpret_status(self, status: LuigiStatusCode) -> str:
        """Interpret status code into human-readable description.

        Args:
            status: Status member taken from a run result.

        Returns:
            A short description, or ``"Unknown status: ..."`` for values not
            listed below.
        """
        interpretations = {
            LuigiStatusCode.SUCCESS: "All tasks completed successfully",
            LuigiStatusCode.SUCCESS_WITH_RETRY: "Tasks completed after retries",
            LuigiStatusCode.FAILED: "Some tasks failed",
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED:
                "Some tasks failed and some could not be scheduled",
            LuigiStatusCode.SCHEDULING_FAILED: "Task scheduling failed",
            LuigiStatusCode.NOT_RUN: "Tasks were not executed",
            LuigiStatusCode.MISSING_EXT: "Missing external dependencies",
        }
        return interpretations.get(status, f"Unknown status: {status}")
356
357
class SimpleTask(luigi.Task):
    """Minimal task that writes a one-line file named after ``name``."""

    name = luigi.Parameter()

    def run(self):
        """Write the task's name into its output file."""
        content = f"Simple task: {self.name}"
        with self.output().open('w') as handle:
            handle.write(content)

    def output(self):
        """Return the local target ``simple_<name>.txt``."""
        destination = f"simple_{self.name}.txt"
        return luigi.LocalTarget(destination)
366
```
367
368
### External Monitoring Integration
369
370
```python
371
import luigi
372
from luigi.event import Event
373
import requests
374
import json
375
from datetime import datetime
376
377
class MetricsCollector:
    """Collect and send metrics to external monitoring system.

    Metrics are buffered in memory by ``record_metric`` and posted as one
    JSON document by ``send_metrics``. The buffer is only cleared after a
    successful post, so a failed send is retried on the next call.
    """

    def __init__(self, metrics_endpoint: str):
        """
        Args:
            metrics_endpoint: URL that receives the POSTed metrics.
        """
        self.endpoint = metrics_endpoint
        self.metrics = []

    def record_metric(self, metric_name: str, value: float, tags: dict = None):
        """Record a metric for later sending.

        Args:
            metric_name: Name of the metric.
            value: Numeric value to report.
            tags: Optional mapping of tag names to values; defaults to an
                empty dict.
        """
        from datetime import timezone

        metric = {
            'name': metric_name,
            'value': value,
            # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated
            # and yields a naive value that consumers may misread as local.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'tags': tags or {},
        }
        self.metrics.append(metric)

    def send_metrics(self):
        """Send collected metrics to monitoring system.

        Does nothing when the buffer is empty. Network and HTTP errors are
        reported and swallowed so that monitoring never fails the workflow.
        """
        if not self.metrics:
            return

        try:
            response = requests.post(
                self.endpoint,
                json={'metrics': self.metrics},
                timeout=10,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            # Keep the buffered metrics so a later call can retry.
            print(f"Failed to send metrics: {e}")
        else:
            print(f"Sent {len(self.metrics)} metrics")
            self.metrics.clear()
411
412
# Global metrics collector
413
metrics = MetricsCollector("http://monitoring.example.com/api/metrics")
414
415
# Event handlers for metrics collection
416
def collect_task_metrics(task, metric_name):
    """Collect metrics from task events.

    Records one occurrence of ``metric_name`` tagged with the task's family
    and id.

    Args:
        task: Task instance the event refers to.
        metric_name: Metric to increment, e.g. ``'luigi.task.success'``.
    """
    metrics.record_metric(
        metric_name,
        1,
        {'task_family': task.task_family, 'task_id': task.task_id},
    )


# Luigi registers callbacks per event name on a Task class; registering on
# luigi.Task makes them fire for every task in the workflow.
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def collect_task_success(task):
    """Count a successful task run."""
    collect_task_metrics(task, 'luigi.task.success')


@luigi.Task.event_handler(luigi.Event.FAILURE)
def collect_task_failure(task, exception):
    """Count a failed task run (the exception itself is not recorded)."""
    collect_task_metrics(task, 'luigi.task.failure')
437
438
class MonitoredWorkflow(luigi.WrapperTask):
    """Wrapper that fans out to the dataset tasks and then flushes metrics."""

    def requires(self):
        """Depend on one ``ProcessDataTask`` per dataset A, B and C."""
        return [ProcessDataTask(dataset=label) for label in ("A", "B", "C")]

    def run(self):
        """Push everything the collector has buffered to the endpoint."""
        metrics.send_metrics()
451
452
class ProcessDataTask(luigi.Task):
    """Process one dataset and report business metrics about the run.

    Parameters:
        dataset: Identifier of the dataset; also used in the output name.
    """

    dataset = luigi.Parameter()

    def output(self):
        """Return the local target ``processed_<dataset>.txt``."""
        return luigi.LocalTarget(f"processed_{self.dataset}.txt")

    def run(self):
        """Write the output, then record the count and duration metrics."""
        import time

        # perf_counter is monotonic, so the duration cannot go negative if
        # the system clock is adjusted while the task runs.
        start_time = time.perf_counter()

        # Simulate processing
        with self.output().open('w') as f:
            f.write(f"Processed dataset {self.dataset}")

        processing_time = time.perf_counter() - start_time

        # Record custom business metrics only after the write succeeded, so
        # a failed run is never counted as a processed dataset.
        metrics.record_metric(
            'luigi.dataset.processed',
            1,
            {'dataset': self.dataset},
        )

        # Record processing time
        metrics.record_metric(
            'luigi.processing.duration',
            processing_time,
            {'dataset': self.dataset},
        )
480
```
481
482
### Health Check and Alerting
483
484
```python
485
import luigi
486
from luigi.event import Event
487
from luigi.execution_summary import LuigiStatusCode
488
import smtplib
489
from email.mime.text import MIMEText
490
import logging
491
492
class HealthMonitor:
    """Monitor Luigi workflow health and send alerts.

    Keeps a count of consecutive failed runs. Each failure sends a warning
    until ``max_failures`` is reached, after which a critical alert is sent.
    The first successful run after one or more failures sends a recovery
    notice and resets the count.
    """

    def __init__(self, alert_email: str, smtp_config: dict):
        """
        Args:
            alert_email: Recipient address for all notifications.
            smtp_config: Mapping with the keys ``host``, ``port``, ``user``,
                ``password`` and ``from`` used to send mail.
        """
        self.alert_email = alert_email
        self.smtp_config = smtp_config
        self.failure_count = 0
        self.max_failures = 3

    def check_health(self, result: "LuigiRunResult"):
        """Check workflow health and send alerts if needed.

        Args:
            result: Run result exposing a ``status`` attribute, as returned
                by ``luigi.build(..., detailed_summary=True)``.
        """
        status = result.status

        if status in (LuigiStatusCode.SUCCESS,
                      LuigiStatusCode.SUCCESS_WITH_RETRY):
            # Notify BEFORE resetting: the recovery notice is only sent
            # while failure_count still shows earlier failures.
            self.send_success_notification()
            self.failure_count = 0

        elif status in (LuigiStatusCode.FAILED,
                        LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED,
                        LuigiStatusCode.SCHEDULING_FAILED):
            self.failure_count += 1

            if self.failure_count >= self.max_failures:
                self.send_alert(f"Workflow has failed {self.failure_count} times")
            else:
                self.send_warning(f"Workflow failed (attempt {self.failure_count})")

    def send_alert(self, message: str):
        """Send critical alert email."""
        self._send_email(
            subject="🚨 CRITICAL: Luigi Workflow Alert",
            body=f"CRITICAL ALERT: {message}\n\nImmediate attention required.",
        )

    def send_warning(self, message: str):
        """Send warning email."""
        self._send_email(
            subject="⚠️ WARNING: Luigi Workflow Warning",
            body=f"WARNING: {message}\n\nMonitoring situation.",
        )

    def send_success_notification(self):
        """Send success notification if recovering from failures."""
        if self.failure_count > 0:
            self._send_email(
                subject="✅ SUCCESS: Luigi Workflow Recovered",
                body="Workflow has recovered and is running successfully.",
            )

    def _send_email(self, subject: str, body: str):
        """Send email notification.

        Best effort: any error is logged and swallowed so that alerting
        problems never mask the workflow result.
        """
        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.smtp_config['from']
            msg['To'] = self.alert_email

            # The context manager closes the connection even when
            # starttls/login/send raises.
            with smtplib.SMTP(self.smtp_config['host'],
                              self.smtp_config['port']) as server:
                server.starttls()
                server.login(self.smtp_config['user'], self.smtp_config['password'])
                server.send_message(msg)

            logging.info(f"Alert sent: {subject}")

        except Exception as e:
            logging.error(f"Failed to send alert: {e}")
557
558
# Configure health monitor.
# The SMTP values below are placeholders for the example; in a real
# deployment load the credentials from the environment or a secret store
# instead of committing them to source control.
health_monitor = HealthMonitor(
    alert_email="admin@example.com",
    smtp_config={
        'host': 'smtp.example.com',
        'port': 587,  # the connection is upgraded with STARTTLS before login
        'user': 'luigi@example.com',
        'password': 'password',
        'from': 'luigi@example.com'
    }
)
569
570
class MonitoredPipeline(luigi.WrapperTask):
    """Pipeline with health monitoring and alerting."""

    def requires(self):
        """Depend on three ``CriticalTask`` steps named task_0..task_2."""
        return [
            CriticalTask(step=f"task_{i}") for i in range(3)
        ]


class CriticalTask(luigi.Task):
    """Task that fails randomly so the alerting path can be exercised.

    Parameters:
        step: Label of this step. It is deliberately NOT called ``task_id``:
            Luigi assigns ``self.task_id`` itself when a task is constructed,
            which would overwrite a parameter of that name.
    """

    step = luigi.Parameter()

    def output(self):
        """Return the local target ``critical_<step>.txt``."""
        return luigi.LocalTarget(f"critical_{self.step}.txt")

    def run(self):
        """Fail about 20% of the time, otherwise write the output file."""
        # Simulate occasional failures for testing
        import random
        if random.random() < 0.2:  # 20% failure rate
            raise Exception(f"Simulated failure in {self.step}")

        with self.output().open('w') as f:
            f.write(f"Critical task {self.step} completed")
592
593
# Execute with health monitoring
594
if __name__ == '__main__':
    # detailed_summary=True makes luigi.build() return a LuigiRunResult;
    # by default it returns a bool, which check_health() cannot inspect.
    result = luigi.build(
        [MonitoredPipeline()],
        local_scheduler=True,
        detailed_summary=True,
    )
    health_monitor.check_health(result)
597
```