Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's event system provides hooks for monitoring task execution, workflow progress, and integration with external monitoring systems. Events enable tracking, logging, and alerting capabilities.
Foundation class for Luigi's event system that enables monitoring and tracking of task execution lifecycle.
class Event:
    """Base class for Luigi events.

    This is a signature-only listing; the method bodies are intentionally
    empty.

    NOTE(review): upstream Luigi exposes ``luigi.Event`` as a namespace of
    event-name constants and fires events through ``Task.trigger_event`` --
    confirm this interface against the installed Luigi version.
    """

    def __init__(self):
        """Initialize event instance."""

    @staticmethod
    def trigger_event(event, task_obj=None, flushing=False):
        """
        Trigger an event with optional task context.

        Args:
            event: Event instance to trigger
            task_obj: Task object associated with event
            flushing: Whether this is a flush event
        """


# Events that are triggered during different phases of task execution.
# Task execution events
class TaskEvent(Event):
    """Base class for task-related events."""

    def __init__(self, task):
        """
        Initialize task event.

        Args:
            task: Task instance associated with event
        """
class TaskStartEvent(TaskEvent):
    """Event triggered when task execution starts."""
class TaskSuccessEvent(TaskEvent):
    """Event triggered when task completes successfully."""
class TaskFailureEvent(TaskEvent):
    """Event triggered when task execution fails."""

    def __init__(self, task, exception):
        """
        Initialize task failure event.

        Args:
            task: Failed task instance
            exception: Exception that caused failure
        """
class TaskProcessEvent(TaskEvent):
    """Event triggered during task processing."""
# Dependency events
class DependencyDiscovered(TaskEvent):
    """Event triggered when task dependency is discovered."""

    def __init__(self, task, dependency, upstream):
        """
        Initialize dependency discovered event.

        Args:
            task: Task with dependency
            dependency: Discovered dependency task
            upstream: Whether dependency is upstream
        """
class DependencyMissing(TaskEvent):
    """Event triggered when required dependency is missing."""

    def __init__(self, task, missing_dependency):
        """
        Initialize dependency missing event.

        Args:
            task: Task with missing dependency
            missing_dependency: Missing dependency task
        """


# Enumeration of possible execution outcomes and status codes for Luigi workflows.
class LuigiStatusCode:
    """Status codes for Luigi execution results.

    NOTE(review): upstream Luigi declares ``LuigiStatusCode`` as an
    ``enum.Enum``; the integer values listed here should be confirmed against
    the installed version before being compared or persisted.
    """

    SUCCESS = 0
    """All tasks completed successfully without any failures."""

    SUCCESS_WITH_RETRY = 1
    """Tasks completed successfully but some required retries."""

    FAILED = 2
    """One or more tasks failed during execution."""

    FAILED_AND_SCHEDULING_FAILED = 3
    """Both task execution and scheduling encountered failures."""

    SCHEDULING_FAILED = 4
    """Task scheduling failed before execution could begin."""

    NOT_RUN = 5
    """Tasks were not executed (e.g., already complete)."""

    MISSING_EXT = 6
    """Missing external dependencies prevented execution."""

    @classmethod
    def has_value(cls, value: int) -> bool:
        """
        Check if value is a valid status code.

        Args:
            value: Status code value to check

        Returns:
            bool: True if valid status code
        """


# Classes for generating and managing execution summaries with detailed statistics and results.
class execution_summary:
    """Configuration for execution summary generation."""

    summary_length: int = 100
    """Maximum length of execution summary."""

    no_configure_logging: bool = False
    """Whether to disable logging configuration."""
def summary(tasks=None, worker_obj=None) -> dict:
    """
    Generate execution summary for completed tasks.

    Signature-only listing; no implementation is shown here.

    Args:
        tasks: List of tasks to summarize
        worker_obj: Worker object with execution details

    Returns:
        dict: Execution summary with statistics and details
    """
class LuigiRunResult:
    """Container for Luigi execution results and status."""

    def __init__(self, status: LuigiStatusCode, worker=None,
                 scheduling_succeeded: bool = True):
        """
        Initialize run result.

        Args:
            status: Execution status code
            worker: Worker instance that executed tasks
            scheduling_succeeded: Whether scheduling was successful
        """

    @property
    def status(self) -> LuigiStatusCode:
        """Get execution status code."""

    @property
    def worker(self):
        """Get worker instance."""

    @property
    def scheduling_succeeded(self) -> bool:
        """Check if scheduling succeeded."""


import luigi
import time

from luigi.event import Event
class CustomEvent(Event):
    """Custom event for application-specific monitoring.

    Attributes:
        message: Human-readable description of what happened.
        data: Extra payload; an empty dict when none was supplied.
        timestamp: ``time.time()`` value captured at construction.
    """

    def __init__(self, message: str, data: dict = None):
        super().__init__()
        self.message = message
        # A falsy payload (None or an empty dict) is replaced by a fresh dict,
        # so instances never share one default object.
        self.data = data or {}
        self.timestamp = time.time()
# Event handler functions
# NOTE(review): `event_handler` is not part of the `Event` interface listed in
# this document; upstream Luigi registers callbacks with
# `luigi.Task.event_handler(<event name>)`. Confirm this decorator exists
# before relying on it.
@CustomEvent.event_handler
def handle_custom_event(event):
    """Handle custom events.

    Prints the event message and, when the payload is non-empty, the payload.

    Args:
        event: Object exposing ``message`` and ``data`` attributes.
    """
    print(f"Custom event: {event.message}")
    if event.data:
        print(f"Event data: {event.data}")
class MonitoredTask(luigi.Task):
    """Task that emits custom events."""

    def run(self):
        """Write the output file, emitting start/success/failure events."""
        # Emit start event
        Event.trigger_event(CustomEvent("Task started", {"task_id": self.task_id}))
        try:
            # Task logic
            with self.output().open('w') as f:
                f.write("Task completed")
        except Exception as e:
            # Emit failure event, then let the error propagate unchanged.
            Event.trigger_event(CustomEvent("Task failed", {"error": str(e)}))
            raise
        else:
            # Emit success event outside the try body so an error raised while
            # reporting success is not itself reported as a task failure.
            Event.trigger_event(CustomEvent("Task completed successfully"))

    def output(self):
        """Return the local file target this task writes."""
        return luigi.LocalTarget("monitored_output.txt")


import luigi
from luigi.event import Event
import logging
import time
# Configure the root logger at INFO level, then bind the dedicated logger
# that the event handlers in this example write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("luigi.events")
# Event handlers for task lifecycle
@luigi.Task.event_handler(luigi.Event.START)
def log_task_start(task):
    """Log when tasks start execution.

    Registered for ``luigi.Event.START`` on every task class.

    Args:
        task: The task that is starting, as passed by Luigi to START callbacks.
    """
    logger.info(f"STARTED: {task.task_id}")
@luigi.Task.event_handler(luigi.Event.SUCCESS)
def log_task_success(task):
    """Log when tasks complete successfully.

    Registered for ``luigi.Event.SUCCESS`` on every task class.

    Args:
        task: The task that finished, as passed by Luigi to SUCCESS callbacks.
    """
    logger.info(f"SUCCESS: {task.task_id}")
@luigi.Task.event_handler(luigi.Event.FAILURE)
def log_task_failure(task, exception):
    """Log when tasks fail.

    Registered for ``luigi.Event.FAILURE`` on every task class.

    Args:
        task: The task that failed.
        exception: The exception raised by the task, as passed by Luigi to
            FAILURE callbacks.
    """
    logger.error(f"FAILED: {task.task_id} - {exception}")
class LifecycleTask(luigi.Task):
    """Task with comprehensive lifecycle monitoring."""

    task_name = luigi.Parameter()
    should_fail = luigi.BoolParameter(default=False)

    def run(self):
        """Sleep briefly, then either raise (if configured) or write output."""
        logger.info(f"Executing {self.task_name}")

        # Simulate work
        time.sleep(1)

        if self.should_fail:
            raise Exception(f"Task {self.task_name} configured to fail")

        with self.output().open('w') as f:
            f.write(f"Completed: {self.task_name}")

    def output(self):
        """Return the per-task local output target."""
        return luigi.LocalTarget(f"output_{self.task_name}.txt")
# Run tasks with lifecycle monitoring
if __name__ == '__main__':
    tasks = [
        LifecycleTask(task_name="task1"),
        LifecycleTask(task_name="task2"),
        LifecycleTask(task_name="task3", should_fail=True),
    ]
    # Without detailed_summary=True, luigi.build() returns a plain bool, so
    # `result.status` below would raise AttributeError.
    result = luigi.build(tasks, local_scheduler=True, detailed_summary=True)
    print(f"Final status: {result.status}")


import luigi
from luigi.execution_summary import summary, LuigiStatusCode
from luigi.event import Event
import json
class SummaryAnalysisTask(luigi.Task):
    """Task that analyzes execution summaries."""

    def output(self):
        """Return the JSON file target the analysis is written to."""
        return luigi.LocalTarget("execution_analysis.json")

    def run(self):
        """Build five SimpleTask instances, run them, and dump an analysis."""
        # Create some test tasks
        test_tasks = [SimpleTask(name=f"task_{i}") for i in range(5)]

        # Execute tasks and collect results. detailed_summary=True is required
        # for build() to return a result object instead of a plain bool.
        result = luigi.build(test_tasks, local_scheduler=True, detailed_summary=True)

        # Generate execution summary
        exec_summary = summary(tasks=test_tasks, worker_obj=result.worker)

        # Poll completion once per task; both counters derive from this value.
        completed = sum(1 for task in test_tasks if task.complete())

        # Analyze results
        analysis = {
            'execution_status': result.status,
            'scheduling_succeeded': result.scheduling_succeeded,
            'summary': exec_summary,
            'task_analysis': {
                'total_tasks': len(test_tasks),
                'completed_tasks': completed,
                'failed_tasks': len(test_tasks) - completed,
            },
            'status_interpretation': self.interpret_status(result.status),
        }

        # Save analysis; default=str stringifies values json cannot encode.
        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2, default=str)

    def interpret_status(self, status: LuigiStatusCode) -> str:
        """Interpret status code into human-readable description.

        Args:
            status: Status code to describe.

        Returns:
            The description, or ``"Unknown status: <status>"`` when the code
            is not in the table.
        """
        interpretations = {
            LuigiStatusCode.SUCCESS: "All tasks completed successfully",
            LuigiStatusCode.SUCCESS_WITH_RETRY: "Tasks completed after retries",
            LuigiStatusCode.FAILED: "Some tasks failed",
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED:
                "Some tasks failed and task scheduling failed",
            LuigiStatusCode.SCHEDULING_FAILED: "Task scheduling failed",
            LuigiStatusCode.NOT_RUN: "Tasks were not executed",
            LuigiStatusCode.MISSING_EXT: "Missing external dependencies",
        }
        return interpretations.get(status, f"Unknown status: {status}")
class SimpleTask(luigi.Task):
    """Minimal task that writes a one-line marker file named after ``name``."""

    name = luigi.Parameter()

    def output(self):
        """Return the per-name local output target."""
        return luigi.LocalTarget(f"simple_{self.name}.txt")

    def run(self):
        """Write the marker line to the output target."""
        with self.output().open('w') as f:
            f.write(f"Simple task: {self.name}")


import luigi
from luigi.event import Event
import requests
import json
from datetime import datetime
class MetricsCollector:
    """Collect and send metrics to external monitoring system."""

    def __init__(self, metrics_endpoint: str):
        """Store the endpoint URL and start with an empty metric buffer."""
        self.endpoint = metrics_endpoint
        self.metrics = []

    def record_metric(self, metric_name: str, value: float, tags: dict = None):
        """Record a metric for later sending.

        Args:
            metric_name: Name stored under the ``'name'`` key.
            value: Value stored under the ``'value'`` key.
            tags: Optional tag mapping; an empty dict is stored when falsy.
        """
        metric = {
            'name': metric_name,
            'value': value,
            # NOTE(review): datetime.utcnow() is deprecated from Python 3.12;
            # consider a timezone-aware timestamp if consumers accept it.
            'timestamp': datetime.utcnow().isoformat(),
            'tags': tags or {},
        }
        self.metrics.append(metric)

    def send_metrics(self):
        """Send collected metrics to monitoring system.

        Does nothing when the buffer is empty. On success the buffer is
        cleared; on failure the error is printed and the buffer is kept.
        """
        if not self.metrics:
            return

        try:
            response = requests.post(
                self.endpoint,
                json={'metrics': self.metrics},
                timeout=10,
            )
            response.raise_for_status()
        except Exception as e:
            # Deliberately best-effort: a metrics outage must not fail the
            # caller, so the error is reported and swallowed.
            print(f"Failed to send metrics: {e}")
        else:
            # Only reached when the POST succeeded.
            print(f"Sent {len(self.metrics)} metrics")
            self.metrics.clear()


# Global metrics collector
metrics = MetricsCollector("http://monitoring.example.com/api/metrics")
# Event handlers for metrics collection
@luigi.Task.event_handler(luigi.Event.SUCCESS)
@luigi.Task.event_handler(luigi.Event.FAILURE)
def collect_task_metrics(task, exception=None):
    """Collect metrics from task events.

    Registered for both SUCCESS and FAILURE. Luigi passes only the task to
    SUCCESS callbacks and the task plus the exception to FAILURE callbacks,
    so the presence of ``exception`` tells the two apart.

    Args:
        task: The task that finished or failed.
        exception: The failure cause; ``None`` for a success event.
    """
    if exception is not None:
        metric_name = 'luigi.task.failure'
    else:
        metric_name = 'luigi.task.success'

    metrics.record_metric(
        metric_name,
        1,
        {'task_family': task.task_family, 'task_id': task.task_id},
    )
class MonitoredWorkflow(luigi.WrapperTask):
    """Workflow with external monitoring integration."""

    def requires(self):
        """Return one ProcessDataTask per dataset (A, B, C)."""
        return [
            ProcessDataTask(dataset="A"),
            ProcessDataTask(dataset="B"),
            ProcessDataTask(dataset="C"),
        ]

    def run(self):
        """Flush the module-level metrics collector."""
        # Send collected metrics after workflow completion.
        # NOTE(review): presumably this is skipped when every requirement is
        # already complete at scheduling time -- confirm if metrics must
        # always be flushed.
        metrics.send_metrics()
class ProcessDataTask(luigi.Task):
    """Write a marker file for one dataset and record metrics about it."""

    dataset = luigi.Parameter()

    def output(self):
        """Return the per-dataset local output target."""
        return luigi.LocalTarget(f"processed_{self.dataset}.txt")

    def run(self):
        """Record a 'processed' metric, write the output, record the duration."""
        # Local import: this example does not import `time` at module level.
        import time

        # Record custom business metrics
        metrics.record_metric(
            'luigi.dataset.processed',
            1,
            {'dataset': self.dataset},
        )

        # Simulate processing
        start_time = time.time()

        with self.output().open('w') as f:
            f.write(f"Processed dataset {self.dataset}")

        # Record processing time
        processing_time = time.time() - start_time
        metrics.record_metric(
            'luigi.processing.duration',
            processing_time,
            {'dataset': self.dataset},
        )


import luigi
from luigi.event import Event
from luigi.execution_summary import LuigiStatusCode
import smtplib
from email.mime.text import MIMEText
import logging
class HealthMonitor:
    """Monitor Luigi workflow health and send alerts."""

    def __init__(self, alert_email: str, smtp_config: dict):
        """Store alert settings and start with zero recorded failures.

        Args:
            alert_email: Recipient address for every notification.
            smtp_config: Mapping with the keys 'host', 'port', 'user',
                'password' and 'from', read when an email is sent.
        """
        self.alert_email = alert_email
        self.smtp_config = smtp_config
        self.failure_count = 0
        self.max_failures = 3

    # The annotation is a string because LuigiRunResult is declared in
    # luigi.execution_summary; `luigi.LuigiRunResult` is not a top-level name.
    def check_health(self, result: "luigi.execution_summary.LuigiRunResult"):
        """Check workflow health and send alerts if needed.

        Args:
            result: Run result whose ``status`` is compared against
                ``LuigiStatusCode`` members.
        """
        if result.status == LuigiStatusCode.SUCCESS:
            # Notify BEFORE resetting: send_success_notification() only emails
            # while failure_count is still > 0, so resetting first would
            # suppress every recovery message.
            self.send_success_notification()
            self.failure_count = 0
        elif result.status in (
            LuigiStatusCode.FAILED,
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED,
            LuigiStatusCode.SCHEDULING_FAILED,
        ):
            self.failure_count += 1

            if self.failure_count >= self.max_failures:
                self.send_alert(f"Workflow has failed {self.failure_count} times")
            else:
                self.send_warning(f"Workflow failed (attempt {self.failure_count})")

    def send_alert(self, message: str):
        """Send critical alert email."""
        self._send_email(
            subject="🚨 CRITICAL: Luigi Workflow Alert",
            body=f"CRITICAL ALERT: {message}\n\nImmediate attention required."
        )

    def send_warning(self, message: str):
        """Send warning email."""
        self._send_email(
            subject="⚠️ WARNING: Luigi Workflow Warning",
            body=f"WARNING: {message}\n\nMonitoring situation."
        )

    def send_success_notification(self):
        """Send success notification if recovering from failures."""
        if self.failure_count > 0:
            self._send_email(
                subject="✅ SUCCESS: Luigi Workflow Recovered",
                body="Workflow has recovered and is running successfully."
            )

    def _send_email(self, subject: str, body: str):
        """Send email notification.

        Failures are logged and swallowed so that a mail outage never breaks
        the caller.
        """
        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.smtp_config['from']
            msg['To'] = self.alert_email

            # The context manager closes the connection even when starttls(),
            # login() or send_message() raises.
            with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
                server.starttls()
                server.login(self.smtp_config['user'], self.smtp_config['password'])
                server.send_message(msg)

            logging.info(f"Alert sent: {subject}")
        except Exception as e:
            # Deliberately best-effort; see docstring.
            logging.error(f"Failed to send alert: {e}")


# Configure health monitor
# NOTE: the credentials below are placeholders; load real ones from the
# environment or a secret store rather than committing them.
health_monitor = HealthMonitor(
    alert_email="admin@example.com",
    smtp_config={
        'host': 'smtp.example.com',
        'port': 587,
        'user': 'luigi@example.com',
        'password': 'password',
        'from': 'luigi@example.com',
    }
)
class MonitoredPipeline(luigi.WrapperTask):
    """Pipeline with health monitoring and alerting."""

    def requires(self):
        """Return three CriticalTask instances (task_0 .. task_2)."""
        return [CriticalTask(task_id=f"task_{i}") for i in range(3)]
class CriticalTask(luigi.Task):
    """Task that fails at random to exercise the health monitor."""

    # NOTE(review): `task_id` is also the name of the identifier attribute
    # Luigi sets on every Task instance; this parameter is presumably
    # overwritten by it -- confirm and consider renaming the parameter.
    task_id = luigi.Parameter()

    def output(self):
        """Return the per-task local output target."""
        return luigi.LocalTarget(f"critical_{self.task_id}.txt")

    def run(self):
        """Raise with 20% probability, otherwise write the output file."""
        # Simulate occasional failures for testing
        import random

        if random.random() < 0.2:  # 20% failure rate
            raise Exception(f"Simulated failure in {self.task_id}")

        with self.output().open('w') as f:
            f.write(f"Critical task {self.task_id} completed")
# Execute with health monitoring
if __name__ == '__main__':
    # Without detailed_summary=True, luigi.build() returns a plain bool, and
    # check_health() needs the result object's `status` attribute.
    result = luigi.build([MonitoredPipeline()], local_scheduler=True, detailed_summary=True)
    health_monitor.check_health(result)


# Install with Tessl CLI
npx tessl i tessl/pypi-luigi
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10