CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

docs/execution.md

Execution

Luigi's execution system provides the main entry points for running workflows with dependency resolution, scheduling, and result reporting. The execution system coordinates task running and handles failures.

Capabilities

Main Execution Functions

Primary functions for executing Luigi workflows with different levels of control and configuration options.

def run(cmdline_args=None, main_task_cls=None, worker_scheduler_factory=None,
        use_dynamic_argparse=None, local_scheduler: bool = False,
        detailed_summary: bool = False) -> "LuigiRunResult | bool":
    """
    Main entry point for running Luigi tasks from command line or programmatically.

    Args:
        cmdline_args: Command line arguments (list of strings)
        main_task_cls: Main task class to run
        worker_scheduler_factory: Custom worker/scheduler factory
        use_dynamic_argparse: Deprecated parameter, ignored
        local_scheduler: Use local scheduler instead of remote
        detailed_summary: Return the full LuigiRunResult instead of a bool

    Returns:
        LuigiRunResult when ``detailed_summary=True``. Otherwise only the
        ``scheduling_succeeded`` bool is returned, which has no ``status``
        or ``worker`` attribute.
    """

def build(tasks, worker_scheduler_factory=None, detailed_summary: bool = False,
          **env_params) -> "LuigiRunResult | bool":
    """
    Build and execute a list of tasks programmatically.

    Args:
        tasks: Task instance or list of task instances to execute
        worker_scheduler_factory: Custom worker/scheduler factory
        detailed_summary: Return the full LuigiRunResult instead of a bool
        **env_params: Additional environment parameters (e.g., local_scheduler=True)

    Returns:
        LuigiRunResult when ``detailed_summary=True``. Otherwise only the
        ``scheduling_succeeded`` bool is returned, which has no ``status``
        or ``worker`` attribute.
    """

Execution Result Types

Classes representing the results and status of Luigi workflow execution.

class LuigiRunResult:
    """Result object describing one Luigi execution."""

    # Overall execution status code.
    status: LuigiStatusCode

    # Worker instance that executed the tasks.
    worker: Any

    # Whether task scheduling succeeded.
    scheduling_succeeded: bool

class LuigiStatusCode:
    """Status codes describing the overall outcome of a Luigi execution."""

    # NOTE(review): the integer values are as listed on this page; confirm
    # them against the installed luigi package before relying on them.
    SUCCESS = 0                       # All tasks completed successfully.
    SUCCESS_WITH_RETRY = 1            # Tasks completed successfully after retries.
    FAILED = 2                        # Task execution failed.
    FAILED_AND_SCHEDULING_FAILED = 3  # Both task execution and scheduling failed.
    SCHEDULING_FAILED = 4             # Task scheduling failed.
    NOT_RUN = 5                       # Tasks were not run.
    MISSING_EXT = 6                   # Missing external dependencies.

Interface Configuration

Configuration class for controlling Luigi interface behavior and execution parameters.

class core:
    """Configuration options for the Luigi core interface."""

    # Default scheduler host address.
    default_scheduler_host: str

    # Default scheduler port number.
    default_scheduler_port: int

    # Disable automatic logging configuration.
    no_configure_logging: bool

    # Default logging level.
    log_level: str

    # Default module for task discovery.
    module: str

    # Enable parallel task scheduling.
    parallel_scheduling: bool

    # Enable Luigi assistant mode.
    assistant: bool

Execution Exceptions

Exception classes for execution-related errors and failures.

class PidLockAlreadyTakenExit(SystemExit):
    """
    Raised when the PID lock is already taken.

    Signals that another Luigi process with the same configuration is
    already running. It subclasses SystemExit.
    """

Usage Examples

Basic Task Execution

import luigi
from luigi import Task, LocalTarget, Parameter

class SimpleTask(Task):
    """Write a one-line greeting for ``name`` to a local text file."""

    name = Parameter(default="example")

    def output(self):
        """Return the local target named after the ``name`` parameter."""
        target_path = f"output_{self.name}.txt"
        return LocalTarget(target_path)

    def run(self):
        """Write the greeting into the output target."""
        target = self.output()
        with target.open('w') as handle:
            handle.write(f"Hello from {self.name}")

# Execute single task programmatically
if __name__ == '__main__':
    # detailed_summary=True makes build() return a LuigiRunResult. Without it,
    # build() returns a plain bool and the attribute reads below would raise
    # AttributeError.
    result = luigi.build(
        [SimpleTask(name="test")],
        local_scheduler=True,
        detailed_summary=True,
    )
    print(f"Execution status: {result.status}")
    print(f"Scheduling succeeded: {result.scheduling_succeeded}")

Command Line Execution

import luigi
from luigi import Task, DateParameter
from datetime import date

class DailyTask(Task):
    """Write a per-day marker file for the ``date`` parameter."""

    # The default is computed once, when the class body is executed.
    date = DateParameter(default=date.today())

    def output(self):
        """Return the local target named after the task's date."""
        target_path = f"daily_{self.date}.txt"
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write the daily processing message into the output target."""
        target = self.output()
        with target.open('w') as handle:
            handle.write(f"Daily processing for {self.date}")

# Run from command line using luigi.run()
if __name__ == '__main__':
    # This allows command line execution:
    # python script.py DailyTask --date 2023-01-15
    #
    # detailed_summary=True is required here: without it luigi.run() returns
    # a plain bool, which has no .status attribute.
    result = luigi.run(detailed_summary=True)

    if result.status == luigi.LuigiStatusCode.SUCCESS:
        print("All tasks completed successfully!")
    else:
        print(f"Execution failed with status: {result.status}")

Batch Task Execution

import luigi
from luigi import Task, DateParameter
from datetime import date, timedelta

class ProcessingTask(Task):
    """Write a marker file recording that ``date`` was processed."""

    date = DateParameter()

    def output(self):
        """Return the local target named after the task's date."""
        target_path = f"processed_{self.date}.txt"
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write the processed message into the output target."""
        target = self.output()
        with target.open('w') as handle:
            handle.write(f"Processed {self.date}")

# Execute multiple tasks for different dates
if __name__ == '__main__':
    # One task per day for the last 7 days, starting with today
    base_date = date.today()
    tasks = [
        ProcessingTask(date=base_date - timedelta(days=offset))
        for offset in range(7)
    ]

    # Execute all tasks in a single build
    result = luigi.build(tasks, local_scheduler=True, detailed_summary=True, log_level='INFO')

    print(f"Batch execution status: {result.status}")

    # Report completion for each task individually
    for task in tasks:
        if not task.complete():
            print(f"✗ {task.date} failed")
            continue
        print(f"✓ {task.date} completed")

Custom Execution Configuration

import luigi
from luigi import Task, build
import logging

class ConfiguredTask(Task):
    """Log one message and write a fixed confirmation file."""

    def output(self):
        """Return the local target for the confirmation file."""
        return luigi.LocalTarget("configured_output.txt")

    def run(self):
        """Log through the 'luigi-interface' logger, then write the output."""
        # This will use the configured logging level
        logging.getLogger('luigi-interface').info("Running configured task")

        target = self.output()
        with target.open('w') as handle:
            handle.write("Task executed with custom configuration")

# Execute with custom configuration
if __name__ == '__main__':
    # Install our own logging setup before Luigi runs
    logging.basicConfig(level=logging.INFO)

    # NOTE(review): confirm that build() accepts no_configure_logging as a
    # keyword; it may only be readable from the [core] config section.
    build_options = dict(
        local_scheduler=True,
        detailed_summary=True,
        log_level='INFO',
        no_configure_logging=True,  # Use our custom logging config
    )
    result = luigi.build([ConfiguredTask()], **build_options)

    succeeded = result.status == luigi.LuigiStatusCode.SUCCESS
    if succeeded:
        print("Task executed successfully with custom configuration")

    # Access worker details (reads a private attribute of the worker)
    if result.worker:
        print(f"Worker processed tasks: {len(result.worker._scheduled_tasks)}")

Error Handling and Status Checking

import luigi
from luigi import Task, LuigiStatusCode

class UnreliableTask(Task):
    """Task that fails on demand, controlled by ``should_fail``."""

    should_fail = luigi.BoolParameter(default=False)

    def output(self):
        """Return the local target written on success."""
        return luigi.LocalTarget("unreliable_output.txt")

    def run(self):
        """Raise when configured to fail; otherwise write the output."""
        if self.should_fail:
            raise Exception("Task configured to fail")

        target = self.output()
        with target.open('w') as handle:
            handle.write("Success!")

# Handle different execution outcomes
if __name__ == '__main__':
    # detailed_summary=True is required on both calls: without it build()
    # returns a plain bool, which has no .status or .scheduling_succeeded.

    # Try successful execution
    result = luigi.build(
        [UnreliableTask(should_fail=False)],
        local_scheduler=True,
        detailed_summary=True,
    )

    if result.status == LuigiStatusCode.SUCCESS:
        print("✓ Task completed successfully")
    elif result.status == LuigiStatusCode.SUCCESS_WITH_RETRY:
        print("✓ Task completed after retries")
    elif result.status == LuigiStatusCode.FAILED:
        print("✗ Task execution failed")
    elif result.status == LuigiStatusCode.SCHEDULING_FAILED:
        print("✗ Task scheduling failed")
    elif result.status == LuigiStatusCode.MISSING_EXT:
        print("✗ Missing external dependencies")
    else:
        print(f"✗ Unknown status: {result.status}")

    # Try failing execution
    result = luigi.build(
        [UnreliableTask(should_fail=True)],
        local_scheduler=True,
        detailed_summary=True,
    )

    print(f"Failed execution status: {result.status}")
    print(f"Scheduling succeeded: {result.scheduling_succeeded}")

Remote Scheduler Execution

import luigi
from luigi import Task

class RemoteTask(Task):
    """Write a fixed message to a local file."""

    def output(self):
        """Return the local target for the message file."""
        return luigi.LocalTarget("remote_output.txt")

    def run(self):
        """Write the message into the output target."""
        target = self.output()
        with target.open('w') as handle:
            handle.write("Executed via remote scheduler")

# Execute using remote scheduler
if __name__ == '__main__':
    # This will connect to remote scheduler at default host:port.
    # detailed_summary=True is required so that build() returns a
    # LuigiRunResult; without it the result is a plain bool with no .status.
    result = luigi.build(
        [RemoteTask()],
        local_scheduler=False,  # Use remote scheduler
        detailed_summary=True,
    )

    # Only `luigi` and `Task` are imported in this example, so the status
    # enum is referenced through the package namespace.
    if result.status == luigi.LuigiStatusCode.SUCCESS:
        print("Task executed via remote scheduler")
    else:
        print("Remote execution failed - check scheduler is running")

    # Or use luigi.run() for command line interface with remote scheduler
    # python script.py RemoteTask --scheduler-host localhost --scheduler-port 8082

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json