CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

parameters.mddocs/

Parameters

Luigi's parameter system provides type-safe configuration for tasks with automatic parsing, validation, and command-line integration. Parameters make tasks reusable and configurable.

Capabilities

Base Parameter Class

The fundamental parameter class that all Luigi parameters inherit from. Provides basic parameter functionality and configuration options.

class Parameter:
    """Base parameter class for string values."""
    
    def __init__(self, default=_no_value, is_global: bool = False, significant: bool = True, 
                 description: str = None, config_path: dict = None, positional: bool = True,
                 always_in_help: bool = False, batch_method=None,
                 visibility: ParameterVisibility = ParameterVisibility.PUBLIC):
        """
        Initialize parameter.
        
        Args:
            default: Default value if not provided (_no_value means required)
            is_global: Whether parameter is global across all tasks
            significant: Whether parameter is part of task's unique identifier
            description: Parameter description for help text
            config_path: Configuration path for parameter lookup
            positional: Whether parameter can be specified positionally
            always_in_help: Whether to always show parameter in help
            batch_method: Method for handling batch parameter values
            visibility: Parameter visibility level
        """
    
    def parse(self, x: str):
        """
        Parse string value to parameter type.
        
        Args:
            x: String value to parse
            
        Returns:
            Parsed value of appropriate type
        """
    
    def serialize(self, x) -> str:
        """
        Serialize parameter value to string.
        
        Args:
            x: Value to serialize
            
        Returns:
            str: Serialized string representation
        """
    
    def normalize(self, x):
        """Normalize parameter value."""
    
    @property
    def has_task_value(self, task_family: str, param_name: str) -> bool:
        """Check if parameter has value for specific task."""

Primitive Parameter Types

Basic parameter types for common data types with automatic parsing and validation.

class IntParameter(Parameter):
    """Parameter for integer values."""
    
    def parse(self, x: str) -> int:
        """Parse string to integer."""

class FloatParameter(Parameter):
    """Parameter for floating-point values."""
    
    def parse(self, x: str) -> float:
        """Parse string to float."""

class BoolParameter(Parameter):
    """Parameter for boolean values."""
    
    def parse(self, x: str) -> bool:
        """Parse string to boolean (accepts 'true', 'false', etc.)."""

class NumericalParameter(Parameter):
    """Base class for numerical parameters with min/max validation."""
    
    def __init__(self, min_value=None, max_value=None, **kwargs):
        """
        Initialize numerical parameter.
        
        Args:
            min_value: Minimum allowed value
            max_value: Maximum allowed value
            **kwargs: Additional parameter options  
        """

class ChoiceParameter(Parameter):
    """Parameter that accepts one value from a predefined set."""
    
    def __init__(self, choices, var_type=str, **kwargs):
        """
        Initialize choice parameter.
        
        Args:
            choices: List/tuple of allowed values
            var_type: Type for choice values
            **kwargs: Additional parameter options
        """

Date and Time Parameters

Specialized parameters for handling date and time values with automatic parsing and formatting.

class DateParameter(Parameter):
    """Parameter for date values (datetime.date)."""
    
    def parse(self, x: str) -> date:
        """Parse string to date (YYYY-MM-DD format)."""

class MonthParameter(DateParameter):
    """Parameter for month values (YYYY-MM format)."""
    
    def parse(self, x: str) -> date:
        """Parse string to first day of month."""

class YearParameter(DateParameter):
    """Parameter for year values (YYYY format)."""
    
    def parse(self, x: str) -> date:
        """Parse string to first day of year."""

class DateHourParameter(Parameter):
    """Parameter for date-hour values (YYYY-MM-DD-HH format)."""
    
    def parse(self, x: str) -> datetime:
        """Parse string to datetime with hour precision."""

class DateMinuteParameter(Parameter):
    """Parameter for date-minute values (YYYY-MM-DD-HH-MM format)."""
    
    def parse(self, x: str) -> datetime:
        """Parse string to datetime with minute precision."""

class DateSecondParameter(Parameter):
    """Parameter for date-second values (YYYY-MM-DD-HH-MM-SS format)."""
    
    def parse(self, x: str) -> datetime:
        """Parse string to datetime with second precision."""

class DateIntervalParameter(Parameter):
    """Parameter for date interval values."""
    
    def parse(self, x: str):
        """Parse string to DateInterval object."""

class TimeDeltaParameter(Parameter):
    """Parameter for time delta values."""
    
    def parse(self, x: str) -> timedelta:
        """Parse string to timedelta object."""

Collection Parameters

Parameters for handling collections of values including lists, tuples, and dictionaries.

class ListParameter(Parameter):
    """Parameter for list values."""
    
    def parse(self, x: str) -> list:
        """Parse JSON string to list."""
    
    def normalize(self, x) -> tuple:
        """Normalize list to tuple for hashing."""

class TupleParameter(Parameter):
    """Parameter for tuple values."""
    
    def parse(self, x: str) -> tuple:
        """Parse JSON string to tuple."""

class DictParameter(Parameter):
    """Parameter for dictionary values."""
    
    def parse(self, x: str) -> dict:
        """Parse JSON string to dictionary."""
    
    def normalize(self, x):
        """Normalize dictionary for hashing."""

class EnumParameter(Parameter):
    """Parameter for enumeration values."""
    
    def __init__(self, enum_class, **kwargs):
        """
        Initialize enum parameter.
        
        Args:
            enum_class: Enum class for allowed values
            **kwargs: Additional parameter options
        """
    
    def parse(self, x: str):
        """Parse string to enum value."""

class EnumListParameter(ListParameter):
    """Parameter for lists of enumeration values."""
    
    def __init__(self, enum_class, **kwargs):
        """
        Initialize enum list parameter.
        
        Args:
            enum_class: Enum class for list elements
            **kwargs: Additional parameter options
        """

Advanced Parameters

Specialized parameters for complex use cases including task references and optional parameters.

class TaskParameter(Parameter):
    """Parameter that references another task."""
    
    def parse(self, x: str):
        """Parse string to task instance."""

class OptionalParameter(Parameter):
    """Wrapper that makes any parameter optional."""
    
    def __init__(self, parameter: Parameter, **kwargs):
        """
        Initialize optional parameter wrapper.
        
        Args:
            parameter: Base parameter to make optional
            **kwargs: Additional parameter options
        """
    
    def parse(self, x: str):
        """Parse using wrapped parameter if value provided."""

Parameter Configuration

Configuration classes and enums for parameter behavior and visibility.

class ParameterVisibility:
    """Enumeration for parameter visibility levels."""
    PUBLIC = 0     # Visible in help and task signatures
    HIDDEN = 1     # Hidden from help but included in task signatures  
    PRIVATE = 2    # Excluded from task signatures

    @classmethod
    def has_value(cls, value: int) -> bool:
        """Check if value is valid visibility level."""
    
    def serialize(self) -> int:
        """Serialize visibility level."""

Parameter Exceptions

Exception classes for parameter parsing and validation errors.

class ParameterException(Exception):
    """Base exception for parameter errors."""

class MissingParameterException(ParameterException):
    """Exception raised when required parameter is missing."""

class UnknownParameterException(ParameterException):
    """Exception raised when unknown parameter is provided."""

class DuplicateParameterException(ParameterException):
    """Exception raised when parameter is specified multiple times."""

Usage Examples

Basic Parameter Usage

import luigi
from luigi import Task, Parameter, IntParameter, DateParameter
from datetime import date

class ProcessDataTask(Task):
    """Task with various parameter types."""
    
    # String parameter with default
    dataset_name = Parameter(default="default_dataset")
    
    # Integer parameter
    batch_size = IntParameter(default=1000)
    
    # Date parameter
    process_date = DateParameter(default=date.today())
    
    def output(self):
        return luigi.LocalTarget(f"output/{self.dataset_name}_{self.process_date}.csv")
    
    def run(self):
        print(f"Processing {self.dataset_name} with batch size {self.batch_size} for {self.process_date}")
        
        # Task logic here
        with self.output().open('w') as f:
            f.write(f"Processed {self.batch_size} records")

# Run with parameters from command line:
# python script.py ProcessDataTask --dataset-name "production" --batch-size 5000 --process-date 2023-01-15

Collection Parameters

import luigi
from luigi import Task, ListParameter, DictParameter
import json

class ConfigurableTask(Task):
    """Task with collection parameters."""
    
    # List of input files
    input_files = ListParameter()
    
    # Configuration dictionary
    config = DictParameter(default={})
    
    def run(self):
        print(f"Processing files: {self.input_files}")
        print(f"Configuration: {self.config}")
        
        # Process each input file
        for file_path in self.input_files:
            print(f"Processing {file_path}")

# Run with collection parameters:
# python script.py ConfigurableTask --input-files '["file1.csv", "file2.csv"]' --config '{"timeout": 30, "retry": true}'

Optional and Choice Parameters

import luigi
from luigi import Task, ChoiceParameter, OptionalParameter, IntParameter

class FlexibleTask(Task):
    """Task with optional and choice parameters."""
    
    # Choice parameter with predefined options
    mode = ChoiceParameter(choices=['fast', 'accurate', 'balanced'], default='balanced')
    
    # Optional parameter that may or may not be provided
    max_memory = OptionalParameter(IntParameter())
    
    def run(self):
        print(f"Running in {self.mode} mode")
        
        if self.max_memory is not None:
            print(f"Memory limit: {self.max_memory} MB")
        else:
            print("No memory limit specified")

# Run with optional parameters:
# python script.py FlexibleTask --mode fast --max-memory 2048
# python script.py FlexibleTask --mode accurate  # max_memory will be None

Date and Time Parameters

import luigi
from luigi import Task, DateParameter, DateHourParameter, TimeDeltaParameter
from datetime import date, datetime, timedelta

class TimeBasedTask(Task):
    """Task with various date/time parameters."""
    
    # Daily processing date
    date = DateParameter()
    
    # Hourly processing timestamp  
    hour = DateHourParameter()
    
    # Processing window duration
    window = TimeDeltaParameter(default=timedelta(hours=1))
    
    def run(self):
        print(f"Processing date: {self.date}")
        print(f"Processing hour: {self.hour}")
        print(f"Window duration: {self.window}")

# Run with date/time parameters:
# python script.py TimeBasedTask --date 2023-01-15 --hour 2023-01-15-14 --window "2:00:00"

Task Parameter for Dependencies

import luigi
from luigi import Task, TaskParameter

class DataSource(Task):
    """Source data task."""
    source_id = luigi.Parameter()
    
    def output(self):
        return luigi.LocalTarget(f"data/source_{self.source_id}.csv")

class DataProcessor(Task):
    """Task that processes data from a configurable source."""
    
    # Parameter that references another task
    source_task = TaskParameter()
    
    def requires(self):
        return self.source_task
    
    def output(self):
        return luigi.LocalTarget(f"data/processed_{self.source_task.source_id}.csv")
    
    def run(self):
        with self.input().open('r') as f:
            data = f.read()
        
        # Process data
        processed = data.upper()
        
        with self.output().open('w') as f:
            f.write(processed)

# Run with task parameter:
# python script.py DataProcessor --source-task DataSource --source-task-source-id "dataset1"

Constants

# Special default value sentinel indicating parameter is required
_no_value = object()

class ParameterVisibility:
    """Parameter visibility levels."""
    PUBLIC = 0
    HIDDEN = 1  
    PRIVATE = 2

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json