Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's execution system provides the main entry points for running workflows with dependency resolution, scheduling, and result reporting. The execution system coordinates task running and handles failures.
Primary functions for executing Luigi workflows with different levels of control and configuration options.
def run(cmdline_args=None, main_task_cls=None, worker_scheduler_factory=None,
use_dynamic_argparse=None, local_scheduler: bool = False, detailed_summary: bool = False) -> LuigiRunResult:
"""
Main entry point for running Luigi tasks from command line or programmatically.
Args:
cmdline_args: Command line arguments (list of strings)
main_task_cls: Main task class to run
worker_scheduler_factory: Custom worker/scheduler factory
use_dynamic_argparse: Deprecated parameter, ignored
local_scheduler: Use local scheduler instead of remote
detailed_summary: Include detailed execution summary
Returns:
LuigiRunResult: Execution result with status and details
"""
def build(tasks, worker_scheduler_factory=None, detailed_summary: bool = False,
**env_params) -> LuigiRunResult:
"""
Build and execute a list of tasks programmatically.
Args:
tasks: Task instance or list of task instances to execute
worker_scheduler_factory: Custom worker/scheduler factory
detailed_summary: Include detailed execution summary
**env_params: Additional environment parameters (e.g., local_scheduler=True)
Returns:
LuigiRunResult: Execution result with status and worker details
"""Classes representing the results and status of Luigi workflow execution.
class LuigiRunResult:
"""Container for Luigi execution results."""
status: LuigiStatusCode
"""Overall execution status code."""
worker: Any
"""Worker instance that executed the tasks."""
scheduling_succeeded: bool
"""Whether task scheduling succeeded."""
class LuigiStatusCode:
"""Enumeration of possible execution status codes."""
SUCCESS = 0
"""All tasks completed successfully."""
SUCCESS_WITH_RETRY = 1
"""Tasks completed successfully after retries."""
FAILED = 2
"""Task execution failed."""
FAILED_AND_SCHEDULING_FAILED = 3
"""Both task execution and scheduling failed."""
SCHEDULING_FAILED = 4
"""Task scheduling failed."""
NOT_RUN = 5
"""Tasks were not run."""
MISSING_EXT = 6
"""Missing external dependencies."""Configuration class for controlling Luigi interface behavior and execution parameters.
class core:
"""Core interface configuration options."""
default_scheduler_host: str
"""Default scheduler host address."""
default_scheduler_port: int
"""Default scheduler port number."""
no_configure_logging: bool
"""Disable automatic logging configuration."""
log_level: str
"""Default logging level."""
module: str
"""Default module for task discovery."""
parallel_scheduling: bool
"""Enable parallel task scheduling."""
assistant: bool
"""Enable Luigi assistant mode."""Exception classes for execution-related errors and failures.
class PidLockAlreadyTakenExit(SystemExit):
"""
Exception raised when PID lock is already taken.
Indicates another Luigi process is already running with the same configuration.
"""import luigi
from luigi import Task, LocalTarget, Parameter
class SimpleTask(Task):
name = Parameter(default="example")
def output(self):
return LocalTarget(f"output_{self.name}.txt")
def run(self):
with self.output().open('w') as f:
f.write(f"Hello from {self.name}")
# Execute single task programmatically
if __name__ == '__main__':
result = luigi.build([SimpleTask(name="test")], local_scheduler=True)
print(f"Execution status: {result.status}")
print(f"Scheduling succeeded: {result.scheduling_succeeded}")import luigi
from luigi import Task, DateParameter
from datetime import date
class DailyTask(Task):
date = DateParameter(default=date.today())
def output(self):
return luigi.LocalTarget(f"daily_{self.date}.txt")
def run(self):
with self.output().open('w') as f:
f.write(f"Daily processing for {self.date}")
# Run from command line using luigi.run()
if __name__ == '__main__':
# This allows command line execution:
# python script.py DailyTask --date 2023-01-15
result = luigi.run()
if result.status == luigi.LuigiStatusCode.SUCCESS:
print("All tasks completed successfully!")
else:
print(f"Execution failed with status: {result.status}")import luigi
from luigi import Task, DateParameter
from datetime import date, timedelta
class ProcessingTask(Task):
date = DateParameter()
def output(self):
return luigi.LocalTarget(f"processed_{self.date}.txt")
def run(self):
with self.output().open('w') as f:
f.write(f"Processed {self.date}")
# Execute multiple tasks for different dates
if __name__ == '__main__':
# Create tasks for the last 7 days
base_date = date.today()
tasks = []
for i in range(7):
task_date = base_date - timedelta(days=i)
tasks.append(ProcessingTask(date=task_date))
# Execute all tasks
result = luigi.build(
tasks,
local_scheduler=True,
detailed_summary=True,
log_level='INFO'
)
print(f"Batch execution status: {result.status}")
# Check individual task completion
for task in tasks:
if task.complete():
print(f"✓ {task.date} completed")
else:
print(f"✗ {task.date} failed")import luigi
from luigi import Task, build
import logging
class ConfiguredTask(Task):
def output(self):
return luigi.LocalTarget("configured_output.txt")
def run(self):
# This will use the configured logging level
logger = logging.getLogger('luigi-interface')
logger.info("Running configured task")
with self.output().open('w') as f:
f.write("Task executed with custom configuration")
# Execute with custom configuration
if __name__ == '__main__':
# Configure logging before execution
logging.basicConfig(level=logging.INFO)
result = luigi.build(
[ConfiguredTask()],
local_scheduler=True,
detailed_summary=True,
log_level='INFO',
no_configure_logging=True # Use our custom logging config
)
if result.status == luigi.LuigiStatusCode.SUCCESS:
print("Task executed successfully with custom configuration")
# Access worker details
if result.worker:
print(f"Worker processed tasks: {len(result.worker._scheduled_tasks)}")import luigi
from luigi import Task, LuigiStatusCode
class UnreliableTask(Task):
should_fail = luigi.BoolParameter(default=False)
def output(self):
return luigi.LocalTarget("unreliable_output.txt")
def run(self):
if self.should_fail:
raise Exception("Task configured to fail")
with self.output().open('w') as f:
f.write("Success!")
# Handle different execution outcomes
if __name__ == '__main__':
# Try successful execution
result = luigi.build([UnreliableTask(should_fail=False)], local_scheduler=True)
if result.status == LuigiStatusCode.SUCCESS:
print("✓ Task completed successfully")
elif result.status == LuigiStatusCode.SUCCESS_WITH_RETRY:
print("✓ Task completed after retries")
elif result.status == LuigiStatusCode.FAILED:
print("✗ Task execution failed")
elif result.status == LuigiStatusCode.SCHEDULING_FAILED:
print("✗ Task scheduling failed")
elif result.status == LuigiStatusCode.MISSING_EXT:
print("✗ Missing external dependencies")
else:
print(f"✗ Unknown status: {result.status}")
# Try failing execution
result = luigi.build([UnreliableTask(should_fail=True)], local_scheduler=True)
print(f"Failed execution status: {result.status}")
print(f"Scheduling succeeded: {result.scheduling_succeeded}")import luigi
from luigi import Task
class RemoteTask(Task):
def output(self):
return luigi.LocalTarget("remote_output.txt")
def run(self):
with self.output().open('w') as f:
f.write("Executed via remote scheduler")
# Execute using remote scheduler
if __name__ == '__main__':
# This will connect to remote scheduler at default host:port
result = luigi.build(
[RemoteTask()],
local_scheduler=False # Use remote scheduler
)
if result.status == LuigiStatusCode.SUCCESS:
print("Task executed via remote scheduler")
else:
print("Remote execution failed - check scheduler is running")
# Or use luigi.run() for command line interface with remote scheduler
# python script.py RemoteTask --scheduler-host localhost --scheduler-port 8082Install with Tessl CLI
npx tessl i tessl/pypi-luigidocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10