Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's parameter system provides type-safe configuration for tasks with automatic parsing, validation, and command-line integration. Parameters make tasks reusable and configurable.
The fundamental parameter class that all Luigi parameters inherit from. Provides basic parameter functionality and configuration options.
class Parameter:
    """Base parameter class for string values."""

    # NOTE(review): `_no_value` and `ParameterVisibility` are evaluated as default
    # values when this class body executes, so both must already be defined at that
    # point; in this document they appear further down.
    def __init__(self, default=_no_value, is_global: bool = False, significant: bool = True,
                 description: str = None, config_path: dict = None, positional: bool = True,
                 always_in_help: bool = False, batch_method=None,
                 visibility: ParameterVisibility = ParameterVisibility.PUBLIC):
        """
        Initialize parameter.

        Args:
            default: Default value if not provided (_no_value means required)
            is_global: Whether parameter is global across all tasks
            significant: Whether parameter is part of task's unique identifier
            description: Parameter description for help text
            config_path: Configuration path for parameter lookup
            positional: Whether parameter can be specified positionally
            always_in_help: Whether to always show parameter in help
            batch_method: Method for handling batch parameter values
            visibility: Parameter visibility level
        """

    def parse(self, x: str):
        """
        Parse string value to parameter type.

        Args:
            x: String value to parse

        Returns:
            Parsed value of appropriate type
        """

    def serialize(self, x) -> str:
        """
        Serialize parameter value to string.

        Args:
            x: Value to serialize

        Returns:
            str: Serialized string representation
        """

    def normalize(self, x):
        """Normalize parameter value."""

    # A regular method, not a @property: a property getter is invoked with only
    # `self`, so a getter that also needs task_family/param_name could never be used.
    def has_task_value(self, task_family: str, param_name: str) -> bool:
        """Check if parameter has value for specific task."""


# Basic parameter types for common data types with automatic parsing and validation.
class IntParameter(Parameter):
    """Parameter whose values are integers."""

    def parse(self, x: str) -> int:
        """Convert the string ``x`` into an ``int``."""


class FloatParameter(Parameter):
    """Parameter whose values are floating-point numbers."""

    def parse(self, x: str) -> float:
        """Convert the string ``x`` into a ``float``."""


class BoolParameter(Parameter):
    """Parameter whose values are booleans."""

    def parse(self, x: str) -> bool:
        """Convert the string ``x`` into a ``bool`` (accepts 'true', 'false', etc.)."""


class NumericalParameter(Parameter):
    """Common base for numerical parameters that check a min/max range."""

    def __init__(self, min_value=None, max_value=None, **kwargs):
        """
        Set up a numerical parameter.

        Args:
            min_value: Smallest allowed value
            max_value: Largest allowed value
            **kwargs: Additional parameter options
        """


class ChoiceParameter(Parameter):
    """Parameter restricted to one value out of a fixed set."""

    def __init__(self, choices, var_type=str, **kwargs):
        """
        Set up a choice parameter.

        Args:
            choices: List/tuple of the permitted values
            var_type: Type of the choice values
            **kwargs: Additional parameter options
        """


# Specialized parameters for handling date and time values with automatic parsing and formatting.
class DateParameter(Parameter):
    """Parameter for date values (datetime.date)."""

    def parse(self, x: str) -> date:
        """Parse string to date (YYYY-MM-DD format, e.g. 2013-07-10)."""


class MonthParameter(DateParameter):
    """Parameter for month values (YYYY-MM format)."""

    def parse(self, x: str) -> date:
        """Parse string to first day of month."""


class YearParameter(DateParameter):
    """Parameter for year values (YYYY format)."""

    def parse(self, x: str) -> date:
        """Parse string to first day of year."""


class DateHourParameter(Parameter):
    """Parameter for date-hour values in ISO 8601 form YYYY-MM-DDTHH (e.g. 2013-07-10T19)."""

    def parse(self, x: str) -> datetime:
        """Parse string to datetime with hour precision."""


class DateMinuteParameter(Parameter):
    """Parameter for date-minute values in ISO 8601 form YYYY-MM-DDTHHMM (e.g. 2013-07-10T1907)."""

    def parse(self, x: str) -> datetime:
        """Parse string to datetime with minute precision."""


class DateSecondParameter(Parameter):
    """Parameter for date-second values in ISO 8601 form YYYY-MM-DDTHHMMSS (e.g. 2013-07-10T190738)."""

    def parse(self, x: str) -> datetime:
        """Parse string to datetime with second precision."""


class DateIntervalParameter(Parameter):
    """Parameter for date interval values."""

    def parse(self, x: str):
        """Parse string to DateInterval object."""


class TimeDeltaParameter(Parameter):
    """
    Parameter for time delta values.

    Accepts an ISO 8601 duration (e.g. "PT2H", "P1DT12H") or a simple form such
    as "2 hours" or "1 day". Clock-style strings like "2:00:00" are not accepted.
    """

    def parse(self, x: str) -> timedelta:
        """Parse string to timedelta object."""


# Parameters for handling collections of values including lists, tuples, and dictionaries.
class ListParameter(Parameter):
    """Parameter holding a list of values."""

    def parse(self, x: str) -> list:
        """Decode the JSON string ``x`` into a list."""

    def normalize(self, x) -> tuple:
        """Turn a list into a tuple so that it is hashable."""


class TupleParameter(Parameter):
    """Parameter holding a tuple of values."""

    def parse(self, x: str) -> tuple:
        """Decode the JSON string ``x`` into a tuple."""


class DictParameter(Parameter):
    """Parameter holding a dictionary."""

    def parse(self, x: str) -> dict:
        """Decode the JSON string ``x`` into a dictionary."""

    def normalize(self, x):
        """Turn a dictionary into a hashable form."""


class EnumParameter(Parameter):
    """Parameter whose value is a member of an enumeration."""

    def __init__(self, enum_class, **kwargs):
        """
        Set up an enum parameter.

        Args:
            enum_class: Enum class supplying the allowed values
            **kwargs: Additional parameter options
        """

    def parse(self, x: str):
        """Convert the string ``x`` into a member of the enum."""


class EnumListParameter(ListParameter):
    """Parameter holding a list of enumeration members."""

    def __init__(self, enum_class, **kwargs):
        """
        Set up an enum list parameter.

        Args:
            enum_class: Enum class that every list element belongs to
            **kwargs: Additional parameter options
        """


# Specialized parameters for complex use cases including task references and optional parameters.
class TaskParameter(Parameter):
    """Parameter that references another task class."""

    def parse(self, x: str):
        """
        Resolve a task family name (e.g. "DataSource") to the task class.

        The result is the class itself, not an instance; callers instantiate it
        with whatever parameters that task needs.
        """


class OptionalParameter(Parameter):
    """Wrapper that makes any parameter optional."""

    # NOTE(review): upstream luigi's OptionalParameter is a str parameter that also
    # accepts None (typed variants such as OptionalIntParameter exist), not a wrapper
    # around another parameter — confirm which API this stub is meant to document.
    def __init__(self, parameter: Parameter, **kwargs):
        """
        Initialize optional parameter wrapper.

        Args:
            parameter: Base parameter to make optional
            **kwargs: Additional parameter options
        """

    def parse(self, x: str):
        """Parse using wrapped parameter if value provided."""


# Configuration classes and enums for parameter behavior and visibility.
from enum import IntEnum


class ParameterVisibility(IntEnum):
    """
    Enumeration for parameter visibility levels.

    An IntEnum, so members still compare equal to their integer values
    (``ParameterVisibility.PUBLIC == 0``) while also supporting ``serialize()``.
    """

    PUBLIC = 0  # Visible in help and task signatures
    HIDDEN = 1  # Hidden from help but included in task signatures
    PRIVATE = 2  # Excluded from task signatures

    @classmethod
    def has_value(cls, value: int) -> bool:
        """Return True if ``value`` is one of the defined visibility levels."""
        return any(value == member.value for member in cls)

    def serialize(self) -> int:
        """Return the integer value of this visibility level."""
        return self.value


# Exception classes for parameter parsing and validation errors.
class ParameterException(Exception):
    """Root of the parameter error hierarchy."""


class MissingParameterException(ParameterException):
    """Raised when a required parameter was not supplied."""


class UnknownParameterException(ParameterException):
    """Raised when an unrecognised parameter is supplied."""


class DuplicateParameterException(ParameterException):
    """Raised when the same parameter is given more than once."""


import luigi
from luigi import Task, Parameter, IntParameter, DateParameter
from datetime import date


class ProcessDataTask(Task):
    """Example task combining string, integer and date parameters."""

    dataset_name = Parameter(default="default_dataset")  # string, with a default
    batch_size = IntParameter(default=1000)  # integer
    # date.today() runs once, when this class body executes, so the default is
    # fixed at the date on which the module was imported.
    process_date = DateParameter(default=date.today())

    def output(self):
        return luigi.LocalTarget(f"output/{self.dataset_name}_{self.process_date}.csv")

    def run(self):
        print(f"Processing {self.dataset_name} with batch size {self.batch_size} for {self.process_date}")
        # Task logic goes here
        target = self.output()
        with target.open('w') as f:
            f.write(f"Processed {self.batch_size} records")


# Run with parameters from command line:
# python script.py ProcessDataTask --dataset-name "production" --batch-size 5000 --process-date 2023-01-15
import luigi
from luigi import Task, ListParameter, DictParameter
import json


class ConfigurableTask(Task):
    """Example task driven by a list of input files and a configuration dict."""

    input_files = ListParameter()  # files to process
    config = DictParameter(default={})  # configuration mapping

    def run(self):
        print(f"Processing files: {self.input_files}")
        print(f"Configuration: {self.config}")
        for file_path in self.input_files:  # one log line per input file
            print(f"Processing {file_path}")


# Run with collection parameters:
# python script.py ConfigurableTask --input-files '["file1.csv", "file2.csv"]' --config '{"timeout": 30, "retry": true}'
import luigi
from luigi import Task, ChoiceParameter, OptionalParameter, IntParameter


class FlexibleTask(Task):
    """Task with optional and choice parameters."""

    # Choice parameter with predefined options
    mode = ChoiceParameter(choices=['fast', 'accurate', 'balanced'], default='balanced')
    # Optional integer: None unless --max-memory is given. Use the typed optional
    # parameter with an explicit None default; passing IntParameter() positionally
    # would land in luigi's `default` slot, and without default=None the parameter
    # would be required rather than optional.
    max_memory = luigi.OptionalIntParameter(default=None)

    def run(self):
        print(f"Running in {self.mode} mode")
        if self.max_memory is not None:
            print(f"Memory limit: {self.max_memory} MB")
        else:
            print("No memory limit specified")


# Run with optional parameters:
# python script.py FlexibleTask --mode fast --max-memory 2048
# python script.py FlexibleTask --mode accurate  # max_memory will be None
import luigi
from luigi import Task, DateParameter, DateHourParameter, TimeDeltaParameter
from datetime import date, datetime, timedelta


class TimeBasedTask(Task):
    """Task with various date/time parameters."""

    # Daily processing date (this class attribute shadows the imported `date`
    # only inside the class body)
    date = DateParameter()
    # Hourly processing timestamp
    hour = DateHourParameter()
    # Processing window duration
    window = TimeDeltaParameter(default=timedelta(hours=1))

    def run(self):
        print(f"Processing date: {self.date}")
        print(f"Processing hour: {self.hour}")
        print(f"Window duration: {self.window}")


# Run with date/time parameters. DateHourParameter expects ISO 8601 "YYYY-MM-DDTHH";
# TimeDeltaParameter expects an ISO 8601 duration such as PT2H (or e.g. "2 hours"),
# not a clock string like "2:00:00":
# python script.py TimeBasedTask --date 2023-01-15 --hour 2023-01-15T14 --window PT2H
import luigi
from luigi import Task, TaskParameter


class DataSource(luigi.ExternalTask):
    """
    Source data task.

    It only declares output() and has no run(), so it is an ExternalTask: its
    output is treated as produced outside this pipeline.
    """

    source_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"data/source_{self.source_id}.csv")


class DataProcessor(Task):
    """
    Task that processes data from a configurable source.

    TaskParameter resolves a task family name to the task *class*, not an
    instance, so the source's own parameters are passed in explicitly here.
    """

    # Task class to read from, e.g. DataSource
    source_task = TaskParameter()
    # Forwarded to the source task when it is instantiated
    source_id = luigi.Parameter()

    def requires(self):
        return self.source_task(source_id=self.source_id)

    def output(self):
        return luigi.LocalTarget(f"data/processed_{self.source_id}.csv")

    def run(self):
        with self.input().open('r') as f:
            data = f.read()
        # Process data
        processed = data.upper()
        with self.output().open('w') as f:
            f.write(processed)


# Run with task parameter:
# python script.py DataProcessor --source-task DataSource --source-id dataset1
# Special default value sentinel indicating parameter is required
_no_value = object()

from enum import IntEnum


class ParameterVisibility(IntEnum):
    """
    Parameter visibility levels.

    An IntEnum, so members still compare equal to their integer values
    (``ParameterVisibility.PUBLIC == 0``) while also supporting ``serialize()``.
    """

    PUBLIC = 0
    HIDDEN = 1
    PRIVATE = 2

    @classmethod
    def has_value(cls, value: int) -> bool:
        """Return True if ``value`` is one of the defined visibility levels."""
        return any(value == member.value for member in cls)

    def serialize(self) -> int:
        """Return the integer value of this visibility level."""
        return self.value


# Install with Tessl CLI
npx tessl i tessl/pypi-luigi
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10