Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
npx @tessl/cli install tessl/pypi-luigi@2.8.0

Luigi is a Python workflow management framework for building complex pipelines of batch jobs. It handles dependency resolution, failure recovery, command line integration, and provides a web interface for workflow visualization and monitoring. Luigi helps coordinate data processing workflows where multiple tasks depend on each other and need to be executed in the correct order.
pip install luigi

import luigi

Common imports for task and parameter classes:
from luigi import Task, Target, LocalTarget
from luigi import Parameter, DateParameter, IntParameter
from luigi import build, run

import luigi
from luigi import Task, LocalTarget, Parameter
class HelloWorldTask(Task):
    """A simple task that creates a greeting file."""

    # Name to greet; also interpolated into the output file name.
    name = Parameter(default="World")

    def output(self):
        """Define the output target for this task."""
        return LocalTarget(f"hello_{self.name}.txt")

    def run(self):
        """Execute the task logic: write the greeting to the output target."""
        with self.output().open('w') as f:
            f.write(f"Hello, {self.name}!")
class ProcessGreetingTask(Task):
    """A task that processes the greeting file."""

    # Forwarded to the HelloWorldTask dependency and used in the output file name.
    name = Parameter(default="World")

    def requires(self):
        """Define task dependencies."""
        return HelloWorldTask(name=self.name)

    def output(self):
        """Define the output target for the processed greeting."""
        return LocalTarget(f"processed_{self.name}.txt")

    def run(self):
        """Upper-case the dependency's greeting and write it to the output."""
        # Read input from dependency
        with self.input().open('r') as f:
            greeting = f.read()

        # Process and write output
        with self.output().open('w') as f:
            f.write(greeting.upper())
# Run the workflow
if __name__ == '__main__':
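    luigi.build([ProcessGreetingTask(name="Luigi")], local_scheduler=True)

Luigi's architecture centers around three core concepts:

- Tasks: units of work that declare their dependencies (`requires`), their outputs (`output`), and their execution logic (`run`).
- Targets: the inputs and outputs of tasks, with existence checking.
- Parameters: typed values used to configure tasks.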
This design enables building complex data pipelines with automatic dependency resolution, failure recovery, and workflow visualization through Luigi's web interface.
Base classes for defining workflow tasks including regular tasks, external dependencies, wrapper tasks, and configuration tasks. These form the foundation of all Luigi workflows.
class Task:
    # Signature listing only; method bodies are elided.
    def run(self): ...        # execute the task logic
    def output(self): ...     # define the output target(s) of the task
    def requires(self): ...   # define the task's dependencies
    def complete(self): ...   # presumably reports whether the task is already done -- confirm against the Luigi docs
class ExternalTask(Task): ...
class WrapperTask(Task): ...
class Config(Task): ...

File and data target classes for representing task inputs and outputs with existence checking, atomic writes, and filesystem operations.
class Target:
    # Signature listing only; method bodies are elided.
    def exists(self) -> bool: ...   # existence check for the target
    def open(self, mode='r'): ...   # open the target; mode defaults to reading
class LocalTarget(Target):
def __init__(self, path: str): ...
def exists(self) -> bool: ...
def open(self, mode='r'): ...
def remove(self): ...

Type-safe parameter system for task configuration including primitive types, date/time parameters, collections, and custom parameter types.
class Parameter:
    # Signature listing only; the constructor body is elided.
    def __init__(self, default=None): ...   # default: value used when none is supplied
class DateParameter(Parameter): ...
class IntParameter(Parameter): ...
class BoolParameter(Parameter): ...
class ListParameter(Parameter): ...
class DictParameter(Parameter): ...

Main entry points for running Luigi workflows with dependency resolution, scheduling, and execution management.
def run(cmdline_args=None, main_task_cls=None,
worker_scheduler_factory=None, use_dynamic_argparse=None,
local_scheduler=False, detailed_summary=False) -> LuigiRunResult: ...
def build(tasks, worker_scheduler_factory=None,
detailed_summary=False, **env_params) -> LuigiRunResult: ...

Configuration management for Luigi settings, task parameters, and scheduler options with support for multiple configuration file formats.
def get_config() -> LuigiConfigParser: ...
def add_config_path(path: str): ...
class LuigiConfigParser: ...
class LuigiTomlParser: ...

Comprehensive contrib modules for integrating with databases, cloud storage, big data platforms, job schedulers, and monitoring systems.
# Database integration examples
from luigi.contrib.postgres import PostgresTarget, CopyToTable
from luigi.contrib.mysql import MySqlTarget
from luigi.contrib.mongodb import MongoTarget
# Cloud storage examples
from luigi.contrib.s3 import S3Target, S3Client
from luigi.contrib.gcs import GCSTarget
from luigi.contrib.azureblob import AzureBlobTarget
# Big data platform examples
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.bigquery import BigQueryTarget

Remote scheduler client for distributed task execution and coordination across multiple worker processes and machines.
class RemoteScheduler:
    # Signature listing only; method bodies are elided.
    def add_task(self, task_id: str, status: str, runnable: bool): ...
    def get_work(self, worker: str, host: str): ...
    def ping(self, worker: str): ...
class RPCError(Exception): ...

Event system for monitoring task execution, workflow progress, and integrating with external monitoring systems.
class Event: ...
# Status codes for execution results
class LuigiStatusCode:
SUCCESS: int
SUCCESS_WITH_RETRY: int
FAILED: int
FAILED_AND_SCHEDULING_FAILED: int

Command-line utilities for workflow management, dependency analysis, and task introspection.
# Main CLI commands
luigi --module mymodule MyTask --param value
luigid --background --port 8082
# Tool utilities
luigi.tools.deps MyTask
luigi.tools.deps_tree MyTask
luigi.tools.luigi_grep pattern

from typing import Any, Dict, List, Optional, Union
from datetime import datetime, date, timedelta
# Common parameter value types
ParameterValue = Union[str, int, float, bool, date, datetime, timedelta, List[Any], Dict[str, Any]]
# Task execution result
class LuigiRunResult:
    # Attribute listing only; no values are assigned here.
    status: LuigiStatusCode      # execution status code
    worker: Any                  # type is not specified in this listing
    scheduling_succeeded: bool   # presumably True when every task could be scheduled -- confirm against the Luigi docs