CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

docs/events.md

Events & Monitoring

Luigi's event system provides hooks for monitoring task execution, workflow progress, and integration with external monitoring systems. Events enable tracking, logging, and alerting capabilities.

Capabilities

Base Event Class

Foundation class for Luigi's event system that enables monitoring and tracking of task execution lifecycle.

class Event:
    """Base class for Luigi events.

    Interface stub: only the signatures are listed here; the method bodies
    are omitted.

    NOTE(review): upstream ``luigi.event.Event`` appears to be a namespace of
    string constants (``Event.START``, ``Event.SUCCESS``, ``Event.FAILURE``,
    ...), with handlers registered through ``luigi.Task.event_handler`` and
    fired through ``Task.trigger_event`` -- confirm this listing against the
    installed Luigi version before relying on it.
    """
    
    def __init__(self):
        """Initialize event instance. Takes no arguments."""
    
    @staticmethod
    def trigger_event(event, task_obj=None, flushing=False):
        """
        Trigger an event with optional task context.

        Declared as a static method, so it is called on the class
        (``Event.trigger_event(...)``) rather than on an instance.
        
        Args:
            event: Event instance to trigger
            task_obj: Task object associated with event (defaults to None)
            flushing: Whether this is a flush event (defaults to False)
        """

Task Lifecycle Events

Events that are triggered during different phases of task execution.

# Task execution events
class TaskEvent(Event):
    """Base class for task-related events.

    Extends ``Event`` with a constructor that takes the task the event
    refers to. Interface stub: the constructor body is omitted.

    NOTE(review): upstream Luigi does not appear to ship per-event classes
    like this one; events there are identified by ``luigi.Event`` string
    constants -- confirm against the installed Luigi version.
    """
    
    def __init__(self, task):
        """
        Initialize task event.
        
        Args:
            task: Task instance associated with event
        """

class TaskStartEvent(TaskEvent):
    """Event triggered when task execution starts.

    Adds nothing to ``TaskEvent``: it defines no constructor of its own, so
    it is built from the task instance alone.
    """

class TaskSuccessEvent(TaskEvent):
    """Event triggered when task completes successfully.

    Adds nothing to ``TaskEvent``: it defines no constructor of its own, so
    it is built from the task instance alone.
    """

class TaskFailureEvent(TaskEvent):
    """Event triggered when task execution fails.

    Unlike the other task events, the constructor also takes the exception
    that caused the failure. Interface stub: the constructor body is omitted,
    so the attribute names under which ``task`` and ``exception`` are stored
    are not shown here.
    """
    
    def __init__(self, task, exception):
        """
        Initialize task failure event.
        
        Args:
            task: Failed task instance
            exception: Exception that caused failure
        """

class TaskProcessEvent(TaskEvent):
    """Event triggered during task processing.

    Adds nothing to ``TaskEvent``: it defines no constructor of its own, so
    it is built from the task instance alone.
    """

# Dependency events
class DependencyDiscovered(TaskEvent):
    """Event triggered when task dependency is discovered.

    Carries the task, the dependency that was found, and a flag describing
    the direction of the relationship. Interface stub: the constructor body
    is omitted.
    """
    
    def __init__(self, task, dependency, upstream):
        """
        Initialize dependency discovered event.
        
        Args:
            task: Task with dependency
            dependency: Discovered dependency task
            upstream: Whether dependency is upstream
        """

class DependencyMissing(TaskEvent):
    """Event triggered when required dependency is missing.

    Carries both the task and the dependency it is missing. Interface stub:
    the constructor body is omitted.
    """
    
    def __init__(self, task, missing_dependency):
        """
        Initialize dependency missing event.
        
        Args:
            task: Task with missing dependency
            missing_dependency: Missing dependency task
        """

Execution Status Codes

Enumeration of possible execution outcomes and status codes for Luigi workflows.

class LuigiStatusCode:
    """Status codes for Luigi execution results.

    Each code is listed as a class attribute with an integer value from 0 to
    6, followed by a string describing it.

    NOTE(review): upstream ``luigi.execution_summary.LuigiStatusCode``
    appears to be an ``enum.Enum`` whose member values are
    ``(emoticon, description)`` tuples rather than integers, and it does not
    appear to define ``has_value`` -- confirm before comparing a status
    against the integers shown here.
    """
    
    SUCCESS = 0
    """All tasks completed successfully without any failures."""
    
    SUCCESS_WITH_RETRY = 1
    """Tasks completed successfully but some required retries."""
    
    FAILED = 2
    """One or more tasks failed during execution."""
    
    FAILED_AND_SCHEDULING_FAILED = 3
    """Both task execution and scheduling encountered failures."""
    
    SCHEDULING_FAILED = 4
    """Task scheduling failed before execution could begin."""
    
    # NOTE(review): upstream appears to describe NOT_RUN as tasks that were
    # not granted run permission by the scheduler -- confirm the wording below.
    NOT_RUN = 5
    """Tasks were not executed (e.g., already complete)."""
    
    MISSING_EXT = 6
    """Missing external dependencies prevented execution."""

    @classmethod
    def has_value(cls, value: int) -> bool:
        """
        Check if value is a valid status code.

        Interface stub: the body is omitted in this listing.
        
        Args:
            value: Status code value to check
            
        Returns:
            bool: True if valid status code
        """

Execution Summary

Classes for generating and managing execution summaries with detailed statistics and results.

class execution_summary:
    """Configuration for execution summary generation.

    Lists two settings with their defaults: ``summary_length`` (100) and
    ``no_configure_logging`` (False).

    NOTE(review): upstream ``execution_summary`` appears to be a
    ``luigi.Config`` section whose ``summary_length`` defaults to 5, and
    ``no_configure_logging`` appears to belong to Luigi's core configuration
    rather than to this section -- confirm the values shown here.
    """
    
    summary_length: int = 100
    """Maximum length of execution summary."""
    
    no_configure_logging: bool = False
    """Whether to disable logging configuration."""

def summary(tasks=None, worker_obj=None) -> dict:
    """
    Generate execution summary for completed tasks.

    Interface stub: the body is omitted in this listing.

    NOTE(review): upstream ``luigi.execution_summary.summary`` appears to
    take a single ``worker`` argument and return a human-readable string
    rather than a dict -- confirm the signature before calling it with the
    keyword arguments shown here.
    
    Args:
        tasks: List of tasks to summarize (defaults to None)
        worker_obj: Worker object with execution details (defaults to None)
        
    Returns:
        dict: Execution summary with statistics and details
    """

class LuigiRunResult:
    """Container for Luigi execution results and status.

    Exposes three read-only properties: ``status``, ``worker`` and
    ``scheduling_succeeded``. Interface stub: all bodies are omitted.

    NOTE(review): upstream ``LuigiRunResult`` appears to be constructed as
    ``LuigiRunResult(worker, worker_add_run_status=True)`` and to expose
    ``summary_text`` and ``one_line_summary`` as well -- confirm the
    constructor signature shown here.
    """
    
    def __init__(self, status: LuigiStatusCode, worker=None, 
                 scheduling_succeeded: bool = True):
        """
        Initialize run result.
        
        Args:
            status: Execution status code
            worker: Worker instance that executed tasks (defaults to None)
            scheduling_succeeded: Whether scheduling was successful
                (defaults to True)
        """
    
    @property
    def status(self) -> LuigiStatusCode:
        """Get execution status code."""
    
    @property
    def worker(self):
        """Get worker instance."""
    
    @property
    def scheduling_succeeded(self) -> bool:
        """Check if scheduling succeeded."""

Usage Examples

Basic Event Handling

import time

import luigi
from luigi.event import Event

class CustomEvent(Event):
    """Payload for an application-specific monitoring event.

    Luigi identifies an event by a hashable key. ``NAME`` is the key used
    both to register handlers and to trigger the event; an instance carries
    the data delivered to those handlers.
    """

    # Key under which handlers are registered and the event is triggered.
    NAME = "event.custom.monitoring"

    def __init__(self, message: str, data: dict = None):
        """
        Build the payload.

        Args:
            message: Human-readable description of what happened
            data: Optional extra context; an empty dict is stored when omitted
        """
        super().__init__()
        self.message = message
        self.data = data or {}
        # Wall-clock creation time, in seconds since the epoch.
        self.timestamp = time.time()

# Event handler functions
@luigi.Task.event_handler(CustomEvent.NAME)
def handle_custom_event(event):
    """Handle custom events.

    Registered on ``luigi.Task``, so it fires for any task that triggers
    ``CustomEvent.NAME``.

    Args:
        event: The ``CustomEvent`` payload passed to ``trigger_event``
    """
    print(f"Custom event: {event.message}")
    if event.data:
        print(f"Event data: {event.data}")

class MonitoredTask(luigi.Task):
    """Task that emits custom events."""

    def run(self):
        """Write the output file, emitting start/success/failure events."""
        # Emit start event
        self._emit("Task started", {"task_id": self.task_id})

        try:
            # Task logic
            with self.output().open('w') as f:
                f.write("Task completed")
        except Exception as e:
            # Emit failure event, then let Luigi see the original error
            self._emit("Task failed", {"error": str(e)})
            raise
        else:
            # Emit success event
            self._emit("Task completed successfully")

    def _emit(self, message, data=None):
        """Trigger ``CustomEvent.NAME`` on this task with a fresh payload."""
        # Task.trigger_event forwards the extra positional argument to every
        # handler registered for the key.
        self.trigger_event(CustomEvent.NAME, CustomEvent(message, data))

    def output(self):
        """Local file that marks this task as complete."""
        return luigi.LocalTarget("monitored_output.txt")

Task Lifecycle Monitoring

import luigi
from luigi.event import Event
import logging
import time

# Named logger for event messages; basicConfig sets up the root logger at INFO level
logger = logging.getLogger('luigi.events')
logging.basicConfig(level=logging.INFO)

# Event handlers for task lifecycle
@luigi.Task.event_handler(Event.START)
def log_task_start(task):
    """Log when tasks start execution.

    Registered for ``Event.START`` on ``luigi.Task``, so it runs for every
    task. Luigi calls start handlers with the task as the only argument.

    Args:
        task: The task that is about to run
    """
    logger.info(f"STARTED: {task.task_id}")

@luigi.Task.event_handler(Event.SUCCESS)
def log_task_success(task):
    """Log when tasks complete successfully.

    Registered for ``Event.SUCCESS`` on ``luigi.Task``, so it runs for every
    task. Luigi calls success handlers with the task as the only argument.

    Args:
        task: The task that finished successfully
    """
    logger.info(f"SUCCESS: {task.task_id}")

@luigi.Task.event_handler(Event.FAILURE)
def log_task_failure(task, exception):
    """Log when tasks fail.

    Registered for ``Event.FAILURE`` on ``luigi.Task``, so it runs for every
    task. Luigi calls failure handlers with the task and the exception that
    was raised.

    Args:
        task: The task whose ``run`` raised
        exception: The exception that caused the failure
    """
    logger.error(f"FAILED: {task.task_id} - {exception}")

class LifecycleTask(luigi.Task):
    """Example task whose start, success and failure can be observed.

    Parameters:
        task_name: Label used in log messages and in the output file name
        should_fail: When true, ``run`` raises instead of writing the output
    """

    task_name = luigi.Parameter()
    should_fail = luigi.BoolParameter(default=False)

    def output(self):
        """Target written on success: ``output_<task_name>.txt``."""
        return luigi.LocalTarget(f"output_{self.task_name}.txt")

    def run(self):
        """Log, pause for one second, then either write the output or fail."""
        logger.info(f"Executing {self.task_name}")

        # Stand-in for real work
        time.sleep(1)

        if not self.should_fail:
            with self.output().open('w') as handle:
                handle.write(f"Completed: {self.task_name}")
            return

        raise Exception(f"Task {self.task_name} configured to fail")

# Run tasks with lifecycle monitoring
if __name__ == '__main__':
    tasks = [
        LifecycleTask(task_name="task1"),
        LifecycleTask(task_name="task2"),
        LifecycleTask(task_name="task3", should_fail=True)
    ]

    # detailed_summary=True makes build() return a LuigiRunResult; by default
    # it returns only a bool, which has no .status attribute.
    result = luigi.build(tasks, local_scheduler=True, detailed_summary=True)
    print(f"Final status: {result.status}")

Execution Summary Analysis

import luigi
from luigi.execution_summary import summary, LuigiStatusCode
from luigi.event import Event
import json

class SummaryAnalysisTask(luigi.Task):
    """Task that analyzes execution summaries.

    Runs a handful of ``SimpleTask`` instances in a nested build and writes a
    JSON report about the outcome.
    """

    def output(self):
        """JSON file the analysis is written to."""
        return luigi.LocalTarget("execution_analysis.json")

    def run(self):
        """Build the test tasks, then dump status, summary and counts as JSON."""
        # Create some test tasks
        test_tasks = [
            SimpleTask(name=f"task_{i}") for i in range(5)
        ]

        # detailed_summary=True makes build() return a LuigiRunResult; by
        # default it returns only a bool, which has no .status or .worker.
        result = luigi.build(test_tasks, local_scheduler=True, detailed_summary=True)

        # summary() takes the worker as its single positional argument
        exec_summary = summary(result.worker)

        # Probe each task's completeness once and reuse the count below
        completed = sum(1 for task in test_tasks if task.complete())

        # Analyze results
        analysis = {
            'execution_status': result.status,
            'scheduling_succeeded': result.scheduling_succeeded,
            'summary': exec_summary,
            'task_analysis': {
                'total_tasks': len(test_tasks),
                'completed_tasks': completed,
                'failed_tasks': len(test_tasks) - completed
            },
            'status_interpretation': self.interpret_status(result.status)
        }

        # default=str serializes non-JSON values (such as the status code)
        # by their string form
        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2, default=str)

    def interpret_status(self, status: LuigiStatusCode) -> str:
        """Interpret status code into human-readable description.

        Args:
            status: Status code taken from a run result

        Returns:
            The description for ``status``, or ``"Unknown status: ..."`` when
            the code is not in the table.
        """
        interpretations = {
            LuigiStatusCode.SUCCESS: "All tasks completed successfully",
            LuigiStatusCode.SUCCESS_WITH_RETRY: "Tasks completed after retries",
            LuigiStatusCode.FAILED: "Some tasks failed",
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED: "Some tasks failed and task scheduling failed",
            LuigiStatusCode.SCHEDULING_FAILED: "Task scheduling failed",
            LuigiStatusCode.NOT_RUN: "Tasks were not executed",
            LuigiStatusCode.MISSING_EXT: "Missing external dependencies"
        }
        return interpretations.get(status, f"Unknown status: {status}")

class SimpleTask(luigi.Task):
    """Minimal task that writes one line naming itself.

    Parameters:
        name: Label used in the output file name and in its contents
    """

    name = luigi.Parameter()

    def run(self):
        """Write ``Simple task: <name>`` to the output target."""
        target = self.output()
        with target.open('w') as handle:
            handle.write(f"Simple task: {self.name}")

    def output(self):
        """Target for this task: ``simple_<name>.txt``."""
        return luigi.LocalTarget(f"simple_{self.name}.txt")

External Monitoring Integration

import luigi
from luigi.event import Event
import requests
import json
from datetime import datetime

class MetricsCollector:
    """Buffer metrics in memory and POST them to a monitoring endpoint."""

    def __init__(self, metrics_endpoint: str):
        """
        Args:
            metrics_endpoint: URL that ``send_metrics`` posts to
        """
        self.metrics = []
        self.endpoint = metrics_endpoint

    def record_metric(self, metric_name: str, value: float, tags: dict = None):
        """Append one timestamped metric to the pending buffer.

        Args:
            metric_name: Name stored under ``'name'``
            value: Number stored under ``'value'``
            tags: Optional labels; an empty dict is stored when omitted
        """
        self.metrics.append({
            'name': metric_name,
            'value': value,
            # Naive UTC time in ISO-8601 form
            'timestamp': datetime.utcnow().isoformat(),
            'tags': tags or {},
        })

    def send_metrics(self):
        """POST the buffered metrics and empty the buffer on success.

        Does nothing when the buffer is empty. Any error is printed rather
        than raised, and the buffer is then left untouched.
        """
        if not self.metrics:
            return

        payload = {'metrics': self.metrics}
        try:
            response = requests.post(self.endpoint, json=payload, timeout=10)
            response.raise_for_status()
            print(f"Sent {len(self.metrics)} metrics")
            self.metrics.clear()
        except Exception as e:
            print(f"Failed to send metrics: {e}")

# Global metrics collector: one module-level instance that the event handler
# and the tasks below record into. The endpoint URL is a placeholder.
metrics = MetricsCollector("http://monitoring.example.com/api/metrics")

# Event handlers for metrics collection
@luigi.Task.event_handler(Event.SUCCESS)
@luigi.Task.event_handler(Event.FAILURE)
def collect_task_metrics(task, exception=None):
    """Collect metrics from task events.

    Registered on ``luigi.Task`` for both ``Event.SUCCESS`` and
    ``Event.FAILURE``. Luigi calls success handlers with the task only and
    failure handlers with the task and the raised exception, so a non-None
    ``exception`` marks a failure.

    Args:
        task: The task that succeeded or failed
        exception: The exception that caused the failure; None on success
    """
    metric_name = 'luigi.task.success' if exception is None else 'luigi.task.failure'
    metrics.record_metric(
        metric_name,
        1,
        {'task_family': task.task_family, 'task_id': task.task_id}
    )

class MonitoredWorkflow(luigi.WrapperTask):
    """Wrapper that processes datasets A, B and C, then flushes metrics."""

    def run(self):
        """Send the metrics gathered while the required tasks ran."""
        metrics.send_metrics()

    def requires(self):
        """One ``ProcessDataTask`` per dataset, in the order A, B, C."""
        return [ProcessDataTask(dataset=label) for label in ("A", "B", "C")]

class ProcessDataTask(luigi.Task):
    """Write a marker file for one dataset and record metrics about it.

    Parameters:
        dataset: Label used in the output file name and as the metric tag
    """

    dataset = luigi.Parameter()

    def run(self):
        """Record a count metric, write the output, record the elapsed time."""
        import time

        # Business metric: one dataset processed
        metrics.record_metric('luigi.dataset.processed', 1, {'dataset': self.dataset})

        # Time only the write itself
        began = time.time()
        with self.output().open('w') as sink:
            sink.write(f"Processed dataset {self.dataset}")
        elapsed = time.time() - began

        metrics.record_metric('luigi.processing.duration', elapsed, {'dataset': self.dataset})

    def output(self):
        """Target for this task: ``processed_<dataset>.txt``."""
        return luigi.LocalTarget(f"processed_{self.dataset}.txt")

Health Check and Alerting

import luigi
from luigi.event import Event
from luigi.execution_summary import LuigiStatusCode
import smtplib
from email.mime.text import MIMEText
import logging

class HealthMonitor:
    """Monitor Luigi workflow health and send alerts.

    Counts consecutive failed runs. Each failure below ``max_failures`` sends
    a warning, each failure at or above it sends a critical alert, and the
    first success after a failure sends a recovery notice.
    """

    def __init__(self, alert_email: str, smtp_config: dict):
        """
        Args:
            alert_email: Recipient address for every notification
            smtp_config: Connection settings; the keys ``'host'``, ``'port'``,
                ``'user'``, ``'password'`` and ``'from'`` are read when
                sending
        """
        self.alert_email = alert_email
        self.smtp_config = smtp_config
        self.failure_count = 0
        self.max_failures = 3

    # The annotation is a string so defining the class does not depend on
    # where Luigi exports LuigiRunResult.
    def check_health(self, result: "luigi.execution_summary.LuigiRunResult"):
        """Check workflow health and send alerts if needed.

        Args:
            result: Run result whose ``status`` is inspected. Statuses that
                are neither SUCCESS nor a failure leave the counter as is.
        """
        failure_statuses = (
            LuigiStatusCode.FAILED,
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED,
            LuigiStatusCode.SCHEDULING_FAILED,
        )

        if result.status == LuigiStatusCode.SUCCESS:
            # Notify before resetting: the recovery notice is only sent while
            # failure_count is still positive.
            self.send_success_notification()
            self.failure_count = 0

        elif result.status in failure_statuses:
            self.failure_count += 1

            if self.failure_count >= self.max_failures:
                self.send_alert(f"Workflow has failed {self.failure_count} times")
            else:
                self.send_warning(f"Workflow failed (attempt {self.failure_count})")

    def send_alert(self, message: str):
        """Send critical alert email."""
        self._send_email(
            subject="🚨 CRITICAL: Luigi Workflow Alert",
            body=f"CRITICAL ALERT: {message}\n\nImmediate attention required."
        )

    def send_warning(self, message: str):
        """Send warning email."""
        self._send_email(
            subject="⚠️ WARNING: Luigi Workflow Warning", 
            body=f"WARNING: {message}\n\nMonitoring situation."
        )

    def send_success_notification(self):
        """Send success notification if recovering from failures."""
        if self.failure_count > 0:
            self._send_email(
                subject="✅ SUCCESS: Luigi Workflow Recovered",
                body="Workflow has recovered and is running successfully."
            )

    def _send_email(self, subject: str, body: str):
        """Send email notification.

        Best effort: any error is logged rather than raised, so a mail outage
        does not break the caller.
        """
        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.smtp_config['from']
            msg['To'] = self.alert_email

            # The context manager closes the connection even when starttls,
            # login or send_message raises.
            with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
                server.starttls()
                server.login(self.smtp_config['user'], self.smtp_config['password'])
                server.send_message(msg)

            logging.info(f"Alert sent: {subject}")

        except Exception as e:
            logging.error(f"Failed to send alert: {e}")

# Configure health monitor: one module-level instance used by the script below.
# Every value here is a placeholder; in a real deployment the SMTP password
# should come from a secret store or the environment, not from source code.
health_monitor = HealthMonitor(
    alert_email="admin@example.com",
    smtp_config={
        'host': 'smtp.example.com',
        'port': 587,
        'user': 'luigi@example.com',
        'password': 'password',
        'from': 'luigi@example.com'
    }
)

class MonitoredPipeline(luigi.WrapperTask):
    """Pipeline with health monitoring and alerting."""

    def requires(self):
        """Three ``CriticalTask`` instances named task_0 .. task_2."""
        return [
            CriticalTask(task_name=f"task_{i}") for i in range(3)
        ]

class CriticalTask(luigi.Task):
    """Task that fails at random about one run in five.

    Parameters:
        task_name: Label used in the output file name and in messages
    """

    # Not named ``task_id``: luigi.Task assigns its own generated identifier
    # to ``self.task_id`` during construction, which would overwrite a
    # parameter of that name.
    task_name = luigi.Parameter()

    def output(self):
        """Target for this task: ``critical_<task_name>.txt``."""
        return luigi.LocalTarget(f"critical_{self.task_name}.txt")

    def run(self):
        """Raise with 20% probability; otherwise write the output file."""
        # Simulate occasional failures for testing
        import random
        if random.random() < 0.2:  # 20% failure rate
            raise Exception(f"Simulated failure in {self.task_name}")

        with self.output().open('w') as f:
            f.write(f"Critical task {self.task_name} completed")

# Execute with health monitoring
if __name__ == '__main__':
    # detailed_summary=True makes build() return a LuigiRunResult; by default
    # it returns only a bool, which check_health cannot read a status from.
    result = luigi.build([MonitoredPipeline()], local_scheduler=True, detailed_summary=True)
    health_monitor.check_health(result)

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json