Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's configuration system manages settings for tasks, scheduler, and execution behavior through configuration files and environment variables. Configuration provides flexibility for different environments and deployment scenarios.
Main configuration parser that handles INI-format configuration files with Luigi-specific extensions and parameter resolution.
def get_config() -> "LuigiConfigParser":
    """
    Get the global Luigi configuration parser instance.

    The return annotation is a quoted forward reference because
    LuigiConfigParser is defined further down in this module; an unquoted
    name is evaluated when the ``def`` statement runs and would raise
    NameError.

    Returns:
        LuigiConfigParser: Global configuration parser
    """
def add_config_path(path: str):
    """Register an extra configuration file in the configuration search paths.

    Args:
        path: Filesystem path of the configuration file to add.
    """
    ...
class LuigiConfigParser:
    """
    Luigi's configuration parser: ConfigParser semantics plus parameter resolution.

    Interface sketch only - every method below documents a signature and
    has no implementation here.
    """

    # -- value lookup ------------------------------------------------------

    def get(self, section: str, option: str, **kwargs):
        """
        Look up ``option`` in ``section``, applying parameter resolution.

        Args:
            section: Name of the configuration section
            option: Name of the option inside that section
            **kwargs: Extra lookup options (vars, fallback, etc.)

        Returns:
            The option's value after parameter substitution
        """
        ...

    def getint(self, section: str, option: str, **kwargs) -> int:
        """Look up an option and return it as an integer."""
        ...

    def getfloat(self, section: str, option: str, **kwargs) -> float:
        """Look up an option and return it as a float."""
        ...

    def getboolean(self, section: str, option: str, **kwargs) -> bool:
        """Look up an option and return it as a boolean."""
        ...

    # -- introspection -----------------------------------------------------

    def has_option(self, section: str, option: str) -> bool:
        """Tell whether ``option`` is present in ``section``."""
        ...

    def has_section(self, section: str) -> bool:
        """Tell whether ``section`` is present."""
        ...

    def sections(self) -> list:
        """List every configuration section name."""
        ...

    def options(self, section: str) -> list:
        """List the option names defined in ``section``."""
        ...

    def items(self, section: str) -> list:
        """List the ``(option, value)`` pairs defined in ``section``."""
        ...

    # -- mutation ----------------------------------------------------------

    def set(self, section: str, option: str, value: str):
        """Store ``value`` under ``section``/``option``."""
        ...

    def add_section(self, section: str):
        """Create a new configuration section."""
        ...

    def remove_section(self, section: str) -> bool:
        """Delete a configuration section."""
        ...

    def remove_option(self, section: str, option: str) -> bool:
        """Delete a single option from a section."""
        ...

    # -- loading -----------------------------------------------------------

    def read(self, filenames):
        """Load configuration from one or more files."""
        ...

    def read_dict(self, dictionary: dict):
        """Load configuration from a dictionary."""
        ...


# Alternative configuration parser that supports TOML format configuration files.
class LuigiTomlParser:
    """
    TOML-backed configuration parser for Luigi.

    Offers the same lookup surface as LuigiConfigParser, but its values
    come from TOML files instead of INI files.
    """

    # -- value lookup ------------------------------------------------------

    def get(self, section: str, option: str, **kwargs):
        """Return the raw value of ``option`` in ``section`` from the TOML data."""
        ...

    def getint(self, section: str, option: str, **kwargs) -> int:
        """Return a TOML option as an integer."""
        ...

    def getfloat(self, section: str, option: str, **kwargs) -> float:
        """Return a TOML option as a float."""
        ...

    def getboolean(self, section: str, option: str, **kwargs) -> bool:
        """Return a TOML option as a boolean."""
        ...

    # -- introspection -----------------------------------------------------

    def has_option(self, section: str, option: str) -> bool:
        """Tell whether the TOML data defines ``option`` in ``section``."""
        ...

    def has_section(self, section: str) -> bool:
        """Tell whether the TOML data defines ``section``."""
        ...


# Abstract base class for configuration parsers providing common functionality.
class BaseParser:
    """Common interface shared by Luigi's configuration parsers."""

    # -- availability & loading --------------------------------------------

    def enabled(self) -> bool:
        """Report whether this parser is enabled and usable."""
        ...

    def read(self, config_paths: list):
        """Load configuration from the given list of file paths."""
        ...

    # -- value lookup ------------------------------------------------------

    def get(self, section: str, option: str, **kwargs):
        """Return the value stored under ``section``/``option``."""
        ...

    def getint(self, section: str, option: str, **kwargs) -> int:
        """Return an option converted to an integer."""
        ...

    def getfloat(self, section: str, option: str, **kwargs) -> float:
        """Return an option converted to a float."""
        ...

    def getboolean(self, section: str, option: str, **kwargs) -> bool:
        """Return an option converted to a boolean."""
        ...

    # -- introspection -----------------------------------------------------

    def has_option(self, section: str, option: str) -> bool:
        """Tell whether ``option`` exists in ``section``."""
        ...

    def has_section(self, section: str) -> bool:
        """Tell whether ``section`` exists."""
        ...


# Luigi uses several predefined configuration sections for different aspects of the system.
# [core] section options
class CoreConfig:
    """Options read from the ``[core]`` section of the Luigi configuration."""

    # --- scheduler location -----------------------------------------------
    #: Scheduler host address used when no override is given.
    default_scheduler_host: str = 'localhost'
    #: Scheduler port used when no override is given.
    default_scheduler_port: int = 8082
    #: Explicit scheduler host override (declared only; no class default).
    scheduler_host: str
    #: Explicit scheduler port override (declared only; no class default).
    scheduler_port: int

    # --- RPC to the scheduler ---------------------------------------------
    #: Seconds to wait when opening an RPC connection.
    rpc_connect_timeout: float = 10.0
    #: How many times a failed RPC call is retried.
    rpc_retry_attempts: int = 3
    #: Wait time between RPC retries.
    rpc_retry_wait: int = 30

    # --- logging ----------------------------------------------------------
    #: When true, Luigi leaves logging configuration alone.
    no_configure_logging: bool = False
    #: Default logging level.
    log_level: str = 'DEBUG'
    #: Path to a logging configuration file (declared only; no class default).
    logging_conf_file: str

    # --- scheduling & worker behaviour ------------------------------------
    # NOTE(review): upstream Luigi documents keep_alive, max_reschedules and
    # the task timeout under [worker]; confirm these belong in [core].
    #: Enable parallel task scheduling.
    parallel_scheduling: bool = False
    #: Enable Luigi assistant mode.
    assistant: bool = False
    #: Worker timeout in seconds (0 = no timeout).
    worker_timeout: int = 0
    #: Keep the worker alive after completion.
    keep_alive: bool = False
    #: Maximum number of reschedule attempts per task.
    max_reschedules: int = 1


# [worker] section options
class WorkerConfig:
    """Worker configuration options (the ``[worker]`` section)."""

    # Keep the worker process alive.
    keep_alive: bool = False

    # Count unique task failures.
    # NOTE(review): upstream Luigi's docs describe count_uniques as "only count
    # unique pending jobs when deciding whether to stay alive" (requires
    # keep_alive); the description above may be inaccurate - confirm.
    count_uniques: bool = False

    # Count parameters in recent tasks.
    # NOTE(review): upstream Luigi's related option appears to be named
    # count_last_scheduled - confirm this attribute name before relying on it.
    count_last_params: bool = False

    # Worker timeout in seconds.
    worker_timeout: int = 0

    # Task timeout in seconds.
    timeout: int = 0

    # Maximum tasks per worker. The default is None, so the annotation must
    # admit None (it previously claimed plain ``int``). None presumably means
    # "no limit" - confirm. Quoted so the PEP 604 union is not evaluated at
    # class-creation time on interpreters older than 3.10.
    task_limit: "int | None" = None

    # Retry external task dependencies.
    retry_external_tasks: bool = False

    # Disable worker logging configuration.
    no_configure_logging: bool = False


# [scheduler] section options
class SchedulerConfig:
    """Options read from the ``[scheduler]`` section of the Luigi configuration."""

    # --- persistence ------------------------------------------------------
    #: Whether task execution history is recorded.
    record_task_history: bool = False
    #: Location of the scheduler state file (declared only; no class default).
    state_path: str

    # --- timing (all values in seconds) -----------------------------------
    #: Delay before completed tasks are removed.
    remove_delay: int = 600
    #: Delay before idle workers are disconnected.
    worker_disconnect_delay: int = 60
    #: Window used when deciding to disable failing tasks.
    disable_window: int = 3600
    #: Delay before a failed task is retried.
    retry_delay: int = 900
    #: Hard timeout for disabling tasks.
    disable_hard_timeout: int = 999999999

    # --- web UI limits ----------------------------------------------------
    #: Cap on the number of tasks shown in the web interface.
    max_shown_tasks: int = 100000
    #: Cap on the number of nodes in the dependency graph.
    max_graph_nodes: int = 100000


# luigi.cfg
[core]
scheduler_host = localhost
scheduler_port = 8082
log_level = INFO
parallel_scheduling = true

[worker]
keep_alive = true
timeout = 3600
task_limit = 10

[scheduler]
record_task_history = true
remove_delay = 300
retry_delay = 600

# Task-specific configuration
[MyTask]
batch_size = 1000
max_retries = 3

[DatabaseTask]
host = localhost
port = 5432
# NOTE: Python's configparser does not strip inline "#" comments by default,
# so comments must sit on their own line or they become part of the value.
database = mydb

# luigi.toml
# TOML form of the same settings: strings are quoted, while integers and
# booleans are written as native TOML values.
[core]
scheduler_host = "localhost"
scheduler_port = 8082
log_level = "INFO"
parallel_scheduling = true

[worker]
keep_alive = true
timeout = 3600
task_limit = 10

[scheduler]
record_task_history = true
remove_delay = 300
retry_delay = 600

[MyTask]
batch_size = 1000
max_retries = 3

import luigi
from luigi.configuration import get_config, add_config_path

# Add custom configuration file
add_config_path('/path/to/custom/luigi.cfg')

# Get configuration instance
config = get_config()

# Read configuration values.
# Luigi's parsers take the fallback value via ``default=``. Their typed
# getters (getint/getfloat/getboolean) do not accept ConfigParser's
# ``fallback=`` keyword, and the TOML parser ignores it.
scheduler_host = config.get('core', 'scheduler_host', default='localhost')
scheduler_port = config.getint('core', 'scheduler_port', default=8082)
log_level = config.get('core', 'log_level', default='INFO')

print(f"Scheduler: {scheduler_host}:{scheduler_port}")
print(f"Log level: {log_level}")

# Set configuration values programmatically (values are strings)
config.set('core', 'parallel_scheduling', 'true')
config.set('worker', 'keep_alive', 'true')

# Check if options exist
if config.has_option('MyTask', 'batch_size'):
    batch_size = config.getint('MyTask', 'batch_size')
    print(f"Batch size: {batch_size}")

import luigi
from luigi import Task, Parameter
from luigi.configuration import get_config


class ConfigurableTask(Task):
    """Task that reads configuration from config file.

    ``batch_size`` is a Luigi parameter whose value can come from the
    ``[ConfigurableTask]`` section (see the example config below).
    ``timeout`` and ``max_retries`` are read directly from the same section
    through the configuration parser, defaulting to 3600 and 3.
    """

    # Parameter with config file fallback
    batch_size = luigi.IntParameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Read additional config. Luigi's parser takes the fallback value as
        # ``default=``; its typed getters do not accept ``fallback=``.
        config = get_config()
        self.timeout = config.getint('ConfigurableTask', 'timeout', default=3600)
        self.retries = config.getint('ConfigurableTask', 'max_retries', default=3)

    def output(self):
        return luigi.LocalTarget(f"output_batch_{self.batch_size}.txt")

    def run(self):
        print(f"Running with batch_size={self.batch_size}, timeout={self.timeout}, retries={self.retries}")
        with self.output().open('w') as f:
            f.write(f"Processed with batch size {self.batch_size}")


# Configuration file would contain:
# [ConfigurableTask]
# batch_size = 5000
# timeout = 7200
# max_retries = 5

import luigi
import os
from luigi.configuration import get_config, add_config_path

# Load environment-specific configuration (chosen via the LUIGI_ENV variable)
env = os.getenv('LUIGI_ENV', 'development')
config_file = f'/etc/luigi/luigi-{env}.cfg'
if os.path.exists(config_file):
    add_config_path(config_file)


# ``Task`` is not imported in this snippet, so the base class is referenced
# through the ``luigi`` module (a bare ``Task`` raises NameError).
class EnvironmentTask(luigi.Task):
    """Task that adapts to different environments.

    Database and cache settings come from the environment-specific config
    file registered above, falling back to localhost:5432 and cache disabled.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        config = get_config()
        # Get environment-specific settings; Luigi's parser takes the
        # fallback value as ``default=``.
        self.database_host = config.get('database', 'host', default='localhost')
        self.database_port = config.getint('database', 'port', default=5432)
        self.cache_enabled = config.getboolean('cache', 'enabled', default=False)

    def output(self):
        return luigi.LocalTarget(f"output_{env}.txt")

    def run(self):
        print(f"Environment: {env}")
        print(f"Database: {self.database_host}:{self.database_port}")
        print(f"Cache enabled: {self.cache_enabled}")
        # Create the declared output; without it complete() never becomes true.
        with self.output().open('w') as f:
            f.write(f"Environment: {env}\n")
            f.write(f"Database: {self.database_host}:{self.database_port}\n")
            f.write(f"Cache enabled: {self.cache_enabled}\n")


# luigi-development.cfg:
# [database]
# host = dev-db.example.com
# port = 5432
#
# [cache]
# enabled = false
# luigi-production.cfg:
# [database]
# host = prod-db.example.com
# port = 5432
#
# [cache]
# enabled = true

import luigi
from luigi.configuration import get_config


# ``Task`` is not imported in this snippet, so the base class is referenced
# through the ``luigi`` module (a bare ``Task`` raises NameError).
class DynamicConfigTask(luigi.Task):
    """Task that modifies configuration at runtime.

    ``get_config()`` returns the global parser shared by every task, so the
    values written in ``__init__`` are visible process-wide. They are
    overwritten by the next DynamicConfigTask that gets constructed. The
    settings chosen for *this* instance are therefore also kept on ``self``,
    and ``run()`` reports those instead of reading the shared config back.
    """

    environment = luigi.Parameter(default='development')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Choose settings based on parameters
        if self.environment == 'production':
            self.log_level, self.timeout = 'WARNING', 7200
        else:
            self.log_level, self.timeout = 'DEBUG', 3600

        # Publish them to the shared configuration (ConfigParser values are
        # strings).
        # NOTE(review): logging and the worker are presumably set up before
        # tasks are instantiated, so these writes may not change the running
        # process's log level or worker timeout - confirm.
        config = get_config()
        config.set('core', 'log_level', self.log_level)
        config.set('worker', 'timeout', str(self.timeout))

    def output(self):
        return luigi.LocalTarget(f"output_{self.environment}.txt")

    def run(self):
        print(f"Running in {self.environment} mode")
        print(f"Log level: {self.log_level}, Timeout: {self.timeout}")
        # Create the declared output; without it complete() never becomes true.
        with self.output().open('w') as f:
            f.write(f"Log level: {self.log_level}, Timeout: {self.timeout}\n")


# Install with Tessl CLI
npx tessl i tessl/pypi-luigidocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10