Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's task system provides base classes for defining workflow units with dependency management, output specification, and execution logic. Tasks form the core building blocks of Luigi workflows.
The fundamental Task class that all Luigi tasks inherit from. Defines the interface for dependency resolution, output specification, and execution logic.
class Task:
"""Base class for all Luigi tasks."""
def run(self):
"""
Execute the task logic. Must be implemented by subclasses.
"""
def output(self):
"""
Specify the output Target(s) for this task.
Returns:
Target or list of Targets
"""
def requires(self):
"""
Specify task dependencies.
Returns:
Task, list of Tasks, or dict of Tasks
"""
def complete(self) -> bool:
"""
Check if the task is complete by verifying all outputs exist.
Returns:
bool: True if task is complete, False otherwise
"""
def clone(self, **kwargs):
"""
Create a copy of this task with modified parameters.
Args:
**kwargs: Parameter overrides
Returns:
Task: New task instance with updated parameters
"""
@property
def task_id(self) -> str:
"""Unique identifier for this task instance."""
@property
def task_family(self) -> str:
"""Task family name (class name)."""
@property
def param_kwargs(self) -> dict:
"""Dictionary of parameter names and values."""

Represents tasks that exist outside the Luigi workflow, such as data files created by external systems or manual processes.
class ExternalTask(Task):
"""
Task representing external dependencies.
External tasks have no run() method since they are not executed
by Luigi. They only specify outputs that should exist.
"""
def run(self):
"""External tasks cannot be run - raises NotImplementedError."""

Task that groups multiple dependencies without producing its own output. Useful for creating workflow entry points and organizing related tasks.
class WrapperTask(Task):
"""
Task that wraps other tasks without producing output.
Wrapper tasks only specify dependencies through requires()
and have no output() or run() methods.
"""
def output(self):
"""Wrapper tasks have no output."""
return []

Base class for tasks that only hold configuration parameters without executing logic. Used for sharing configuration across multiple tasks.
class Config(Task):
"""
Task that only holds configuration parameters.
Config tasks are automatically marked as complete and are used
to share parameter values across multiple tasks.
"""

Functions for organizing tasks into namespaces to avoid naming conflicts and improve task organization.
def namespace(namespace: str = None, scope: str = ''):
    """
    Set namespace for tasks declared after this call.

    Args:
        namespace: Namespace string to prepend to task names.
            Defaults to None.
        scope: Module scope to limit namespace application.
            Defaults to the empty string.
    """
def auto_namespace(scope: str = ''):
"""
Set namespace to the module name of each task class.
Args:
scope: Module scope to limit namespace application
"""

Utility functions for working with tasks, task IDs, and task dependency structures.
def task_id_str(task_family: str, params: dict) -> str:
    """
    Generate task ID string from family and parameters.

    Args:
        task_family: Task class name.
        params: Parameter dictionary.

    Returns:
        str: Formatted task ID.
    """
def externalize(task_obj) -> ExternalTask:
    """
    Convert a regular task to an external task.

    Args:
        task_obj: Task to externalize.

    Returns:
        ExternalTask: External version of the task.
    """
def getpaths(struct):
    """
    Extract file paths from task output structure.

    Args:
        struct: Task output structure (Target, list, or dict).

    Returns:
        Generator of file paths.
    """
def flatten(struct):
    """
    Flatten nested task dependency structure.

    Args:
        struct: Nested structure of tasks, lists, and dicts.

    Returns:
        Generator of individual tasks.
    """
def flatten_output(task_output):
"""
Flatten task output structure to individual targets.
Args:
task_output: Task output (Target, list, or dict)
Returns:
Generator of individual targets
"""

Mixin class for implementing bulk completion checking to optimize performance when dealing with many tasks.
class MixinNaiveBulkComplete:
    """
    Mixin that provides naive bulk completion checking.

    Override bulk_complete() method for custom bulk operations.
    """

    def bulk_complete(self, parameter_tuples):
        """
        Check completion status for multiple parameter combinations.

        Args:
            parameter_tuples: List of parameter dictionaries.

        Returns:
            Generator of completion status booleans.
        """
class BulkCompleteNotImplementedError(NotImplementedError):
"""Exception raised when bulk completion is not implemented."""

import luigi
from luigi import Task, LocalTarget, Parameter
class DataIngestionTask(Task):
    """Task that ingests raw data."""

    # Date this ingestion run covers; also used to build the output path.
    date = luigi.DateParameter()

    def output(self):
        """Return the local CSV target for this date under ``data/raw/``."""
        return LocalTarget(f"data/raw/{self.date}.csv")

    def run(self):
        """Write a small fixed CSV (header plus two rows) to the output target."""
        # Simulate data ingestion
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")
class DataProcessingTask(Task):
"""Task that processes ingested data."""
date = luigi.DateParameter()
def requires(self):
return DataIngestionTask(date=self.date)
def output(self):
return LocalTarget(f"data/processed/{self.date}.csv")
def run(self):
# Read input data
with self.input().open('r') as f:
data = f.read()
# Process and write output
processed = data.replace('100', '1000').replace('200', '2000')
with self.output().open('w') as f:
f.write(processed)

class DailyReportTask(luigi.WrapperTask):
"""Wrapper task that generates multiple daily reports."""
date = luigi.DateParameter()
def requires(self):
return [
DataProcessingTask(date=self.date),
MetricsCalculationTask(date=self.date),
QualityCheckTask(date=self.date)
]

class InputFileTask(luigi.ExternalTask):
"""External task representing an input file."""
filename = luigi.Parameter()
def output(self):
return LocalTarget(f"input/{self.filename}")

Install with Tessl CLI
npx tessl i tessl/pypi-luigi
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10