Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi provides comprehensive command-line utilities for workflow management, task execution, dependency analysis, and debugging. These tools enable both interactive development and production deployment.
Primary command-line interfaces for executing Luigi workflows and managing the scheduler daemon.
# Run a task: --module names the module containing the task, MyTask is the
# task class, and task parameters and global options follow as --flags
luigi --module mymodule MyTask [--param value] [options]
# Start the central scheduler daemon; it also accepts [--unix-socket PATH]
luigid [--background] [--pidfile FILE] [--logdir DIR] [--state-path FILE] [--address ADDRESS] [--port PORT]
# Command-line options for the luigi command
class LuigiCLI:
    """Options accepted by the ``luigi`` command.

    Attribute names correspond to command-line flags with dashes replaced by
    underscores (``scheduler_host`` -> ``--scheduler-host``). Attributes that
    are only annotated (no default) have no value unless supplied.
    """

    # Task specification
    module: str  # Module containing the task (--module)
    task: str  # Task class name (positional argument)

    # Scheduler options
    scheduler_host: str = 'localhost'  # Scheduler host (--scheduler-host)
    scheduler_port: int = 8082  # Scheduler port (--scheduler-port)
    local_scheduler: bool = False  # Use local scheduler (--local-scheduler)

    # Worker options
    workers: int = 1  # Number of worker processes (--workers)
    # keep_alive and timeout live in the [worker] section, so on the command
    # line they are spelled --worker-keep-alive and --worker-timeout.
    keep_alive: bool = False  # Keep worker alive after completion
    timeout: int = 0  # Task timeout in seconds (0 = no timeout)

    # Logging options
    log_level: str = 'DEBUG'  # Logging level (--log-level)
    logging_conf_file: str  # Logging configuration file (--logging-conf-file)

    # Execution (process lock) options
    no_lock: bool = False  # Disable the duplicate-process lock (--no-lock)
    lock_size: int = 1  # Lock size (--lock-size)
    lock_pid_dir: str  # PID lock directory (--lock-pid-dir)
    take_lock: bool = False  # Take exclusive lock (--take-lock)

    # Return codes ([retcode] section; --retcode-<name> on the command line).
    # These are the values recommended by Luigi's configuration docs, not the
    # built-in defaults: Luigi defaults them to 0, so unless a [retcode]
    # section sets them a failed run still exits with status 0.
    retcode_already_running: int = 10  # Return code if already running
    retcode_missing_data: int = 20  # Return code for missing data
    retcode_not_run: int = 25  # Return code if not run
    retcode_task_failed: int = 30  # Return code for task failure
    retcode_scheduling_error: int = 35  # Return code for scheduling error

    # Help and version
    help: bool = False  # Show help message
    version: bool = False  # Show version information


# Configuration options for the Luigi scheduler daemon (luigid).
class LuigiDaemon:
    """Options for the Luigi central scheduler daemon (``luigid``).

    Attribute names correspond to ``luigid`` flags with dashes replaced by
    underscores (``state_path`` -> ``--state-path``). Attributes that are only
    annotated (no default) have no value unless supplied.
    """

    # Server options
    # NOTE(review): when --address is omitted, luigid appears to listen on all
    # interfaces rather than only localhost; pass --address localhost
    # explicitly to keep the scheduler private — confirm for your version.
    address: str = 'localhost'  # Server bind address (--address)
    port: int = 8082  # Server port (--port)

    # Process options
    background: bool = False  # Run in background (--background)
    pidfile: str  # PID file path (--pidfile)
    # NOTE(review): luigid's argument parser does not appear to define
    # --user/--group; run it under the desired account via the service
    # manager instead — confirm with `luigid --help`.
    user: str  # Run as specific user
    group: str  # Run as specific group

    # Logging options
    logdir: str  # Log directory (--logdir)
    # NOTE(review): not a luigid flag as far as known; luigid reads
    # [core] logging_conf_file from the Luigi configuration — confirm.
    logging_conf_file: str  # Logging configuration

    # State persistence
    state_path: str  # Scheduler state file path (--state-path)

    # Security options
    unix_socket: str  # Unix socket path (--unix-socket)

    # Development options
    # NOTE(review): luigid does not appear to accept --dev — confirm with
    # `luigid --help`.
    dev: bool = False  # Development mode


# Command-line utilities for analyzing task dependencies and workflow structure.
# Dependency analysis commands
from luigi.tools.deps import deps_main
from luigi.tools.deps_tree import deps_tree_main
def deps_main(task_str: str, module: str = None) -> None:
    """Analyze the dependencies of a task.

    Shell form::

        python -m luigi.tools.deps MyTask --module mymodule

    Args:
        task_str: The task name followed by its parameters.
        module: Module in which the task is defined.
    """
    return None
def deps_tree_main(task_str: str, module: str = None) -> None:
    """Visualize a task's dependency tree.

    Shell form::

        python -m luigi.tools.deps_tree MyTask --module mymodule

    Args:
        task_str: The task name followed by its parameters.
        module: Module in which the task is defined.
    """
    return None
# Task search utility
from luigi.tools.luigi_grep import luigi_grep_main
def luigi_grep_main(pattern: str, paths: list = None) -> None:
    """Search for tasks whose name matches ``pattern``.

    NOTE(review): the real ``luigi.tools.luigi_grep`` tool does not scan
    source paths. It queries a running scheduler's ``/api/graph`` endpoint and
    filters the tasks it knows about by name prefix (``--prefix``) or by
    status (``--status``), e.g.::

        python -m luigi.tools.luigi_grep --prefix ProcessData

    so ``paths`` has no counterpart there — confirm against ``--help`` of the
    installed version.

    Args:
        pattern: Search pattern; the real tool treats it as a task-name
            prefix, not a regex.
        paths: Paths to search in; not used by the real tool.
    """
    return None


# Built-in utilities for working with range tasks and date-based workflows.
# Range task commands (auto-loaded if enabled)
from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes
# Usage examples:
# luigi RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-31
# luigi RangeHourly --of MyTask --start 2023-01-01T00 --stop 2023-01-01T23
# Execute single task with parameters
luigi MyTask --param1 value1 --param2 value2 --module mypackage.tasks
# Execute with local scheduler
luigi MyTask --local-scheduler --module mypackage.tasks
# Execute with remote scheduler
luigi MyTask --scheduler-host scheduler.example.com --scheduler-port 8082 --module mypackage.tasks
# Execute with multiple workers
luigi MyTask --workers 4 --module mypackage.tasks
# Execute with a task timeout in seconds; timeout is a [worker] setting, so
# the flag carries the worker- prefix
luigi MyTask --worker-timeout 3600 --module mypackage.tasks
# Start scheduler daemon
luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi
# Start scheduler on specific address/port
luigid --address 0.0.0.0 --port 8082 --background
# Start with persistent state
luigid --state-path /var/lib/luigi/scheduler.state --background
# Run in the foreground, logging to ./logs (luigid has no --dev option)
luigid --logdir ./logs
# Analyze task dependencies
python -m luigi.tools.deps MyTask --module mypackage.tasks
# Visualize dependency tree
python -m luigi.tools.deps_tree MyTask --module mypackage.tasks
# Search the running scheduler for tasks whose names start with ProcessData
# (luigi_grep queries the scheduler's API; it does not scan source files)
python -m luigi.tools.luigi_grep --prefix ProcessData
# Analyze range task dependencies (--module is still needed to resolve MyTask)
python -m luigi.tools.deps RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-07 --module mypackage.tasks
# CLI wrapper script example
#!/usr/bin/env python3
"""Custom Luigi CLI wrapper with enhanced functionality."""
import luigi
import sys
import argparse
from luigi.cmdline import luigi_run
from luigi.configuration import get_config
def custom_luigi_main():
    """Run Luigi with a few wrapper-level options layered on top.

    ``--env``, ``--dry-run`` and ``--notify`` are consumed here; every other
    argument is forwarded to Luigi unchanged.

    Returns:
        None for a dry run, otherwise the exit status reported by Luigi.
    """
    # allow_abbrev=False stops argparse from claiming a Luigi/task flag that
    # happens to be a prefix of a wrapper flag (e.g. a task parameter
    # ``--dry`` being read as ``--dry-run``) instead of forwarding it.
    parser = argparse.ArgumentParser(description='Enhanced Luigi CLI',
                                     allow_abbrev=False)
    # Add custom options
    parser.add_argument('--env', choices=['dev', 'staging', 'prod'],
                        default='dev', help='Environment to run in')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be executed without running')
    parser.add_argument('--notify', help='Notification email for completion')

    # Parse known args, let Luigi handle the rest
    args, luigi_args = parser.parse_known_args()

    # Configure environment
    setup_environment(args.env)

    if args.dry_run:
        print("DRY RUN: Would execute:")
        print(" ".join(['luigi'] + luigi_args))
        return None

    # luigi_run's ``argv`` default is evaluated from sys.argv when
    # luigi.cmdline is imported, so rebinding sys.argv alone would still hand
    # Luigi the wrapper's own flags; pass the forwarded arguments explicitly.
    sys.argv = ['luigi'] + luigi_args
    # luigi_run finishes by calling sys.exit() with the computed return code,
    # so the status is recovered from SystemExit (otherwise nothing after the
    # call would ever run).
    try:
        luigi_run(luigi_args)
    except SystemExit as exc:
        result = 0 if exc.code is None else exc.code
    else:
        result = 0

    # Send notification if requested
    if args.notify:
        send_completion_notification(args.notify, result)

    return result
def setup_environment(env: str, config=None):
    """Configure Luigi for a specific environment.

    Writes ``[core]`` settings into the configuration object in memory; no
    file is written.

    Args:
        env: ``'prod'`` or ``'staging'``; any other value is treated as dev.
        config: Configuration object to update. Defaults to
            ``luigi.configuration.get_config()``; any ``ConfigParser``-like
            object may be passed instead.
    """
    import configparser

    if config is None:
        config = get_config()

    if env == 'prod':
        settings = {
            'scheduler_host': 'prod-scheduler.example.com',
            'log_level': 'WARNING',
        }
    elif env == 'staging':
        settings = {
            'scheduler_host': 'staging-scheduler.example.com',
            'log_level': 'INFO',
        }
    else:  # dev
        settings = {
            'local_scheduler': 'true',
            'log_level': 'DEBUG',
        }

    for option, value in settings.items():
        try:
            config.set('core', option, value)
        except configparser.NoSectionError:
            # ConfigParser.set() refuses to write into a section that does not
            # exist yet, which is the case when no config file defines [core].
            config.add_section('core')
            config.set('core', option, value)
def send_completion_notification(email: str, result):
    """Send email notification on completion.

    Placeholder: prints the message that would be sent instead of mailing it.

    Args:
        email: Recipient address.
        result: Either an object with a ``status`` attribute or a plain exit
            status value.
    """
    # A bare exit code has no ``status`` attribute; fall back to the value
    # itself rather than raising AttributeError.
    status = getattr(result, 'status', result)
    print(f"Would send notification to {email}: Status {status}")
if __name__ == '__main__':
    # Propagate the wrapper's result as the process exit status
    # (a None result exits with 0).
    sys.exit(custom_luigi_main())
#!/bin/bash
# Luigi batch execution script
# Configuration
LUIGI_MODULE="mypackage.tasks"
SCHEDULER_HOST="scheduler.example.com"
LOG_DIR="/var/log/luigi"
DATE=$(date +%Y-%m-%d)
# Create log directory
mkdir -p "$LOG_DIR"
# Run one Luigi task and report the outcome.
#   $1    task class name (also used to name the log file)
#   $2... task parameters, passed to luigi as separate arguments
# Returns luigi's exit status. NOTE: Luigi's [retcode] values default to 0, so
# a failed task only produces a non-zero status if a [retcode] section is
# configured.
run_luigi_task() {
    local task_name=$1
    shift
    local log_file="$LOG_DIR/${task_name}_${DATE}.log"
    echo "Starting $task_name at $(date)"
    luigi "$task_name" "$@" \
        --module "$LUIGI_MODULE" \
        --scheduler-host "$SCHEDULER_HOST" \
        --workers 2 \
        --worker-timeout 7200 \
        >> "$log_file" 2>&1
    local exit_code=$?
    if [ "$exit_code" -eq 0 ]; then
        echo "✓ $task_name completed successfully"
    else
        echo "✗ $task_name failed with exit code $exit_code"
        echo "Check log file: $log_file"
    fi
    return "$exit_code"
}
# Execute daily tasks; keep going after a failure, but remember it so the
# script's own exit status reflects it.
echo "Starting daily Luigi workflow for $DATE"
workflow_status=0
run_luigi_task DataIngestionTask --date "$DATE" || workflow_status=$?
run_luigi_task ProcessingTask --date "$DATE" || workflow_status=$?
run_luigi_task ReportGenerationTask --date "$DATE" || workflow_status=$?
if [ "$workflow_status" -eq 0 ]; then
    echo "Daily workflow completed at $(date)"
else
    echo "Daily workflow finished with failures at $(date)"
fi
exit "$workflow_status"
#!/bin/bash
# Luigi configuration management script
LUIGI_CONFIG_DIR="/etc/luigi"
LUIGI_ENV="${LUIGI_ENV:-development}"
# Point $LUIGI_CONFIG_DIR/luigi.cfg at luigi-<env>.cfg through a symlink.
switch_config() {
    local env=$1
    local config_file="$LUIGI_CONFIG_DIR/luigi-$env.cfg"
    local active_file="$LUIGI_CONFIG_DIR/luigi.cfg"
    if [ ! -f "$config_file" ]; then
        echo "Configuration file not found: $config_file"
        exit 1
    fi
    # `ln -sf` would silently delete a hand-maintained luigi.cfg, so only
    # ever replace a file that is already a symlink.
    if [ -e "$active_file" ] && [ ! -L "$active_file" ]; then
        echo "Refusing to replace non-symlink $active_file"
        exit 1
    fi
    ln -sfn "$config_file" "$active_file"
    echo "Switched to $env configuration"
}
# Check that the luigi CLI starts and that the configuration can be loaded.
validate_config() {
    echo "Validating Luigi configuration..."
    # `luigi --help` only proves the CLI is installed and starts; it does not
    # contact the scheduler.
    if luigi --help > /dev/null 2>&1; then
        echo "✓ Luigi CLI is working"
    else
        echo "✗ Luigi CLI not working"
        exit 1
    fi
    # Load the active configuration and show the scheduler it points at.
    # The third positional argument is the default used when the option is
    # missing.
    python3 -c '
import luigi.configuration
config = luigi.configuration.get_config()
print("Scheduler: {}".format(config.get("core", "default_scheduler_host", "localhost")))
print("Port: {}".format(config.getint("core", "default_scheduler_port", 8082)))
'
}
# Main command handling; `switch` falls back to $LUIGI_ENV when no
# environment is given.
case "$1" in
    switch)
        switch_config "${2:-$LUIGI_ENV}"
        ;;
    validate)
        validate_config
        ;;
    *)
        echo "Usage: $0 {switch|validate} [environment]"
        echo "Environments: development, staging, production"
        exit 1
        ;;
esac
#!/bin/bash
# Luigi health check and monitoring script
SCHEDULER_HOST="${LUIGI_SCHEDULER_HOST:-localhost}"
SCHEDULER_PORT="${LUIGI_SCHEDULER_PORT:-8082}"
HEALTH_CHECK_URL="http://$SCHEDULER_HOST:$SCHEDULER_PORT/api/graph"
LUIGID_PIDFILE="/var/run/luigi.pid"
LUIGID_LOGDIR="/var/log/luigi"
# Return 0 if the scheduler answers the graph endpoint successfully.
check_scheduler_health() {
    echo "Checking Luigi scheduler health..."
    # -f makes HTTP error responses (e.g. 500) count as failures instead of
    # as "responding".
    if curl -sf --connect-timeout 5 "$HEALTH_CHECK_URL" > /dev/null; then
        echo "✓ Scheduler is responding"
        return 0
    else
        echo "✗ Scheduler is not responding"
        return 1
    fi
}
# Print the total task count and a per-status breakdown.
get_scheduler_stats() {
    echo "Getting scheduler statistics..."
    local stats
    # /api/graph answers {"response": {task_id: {"status": ..., ...}, ...}},
    # i.e. a mapping keyed by task id rather than a list of nodes.
    stats=$(curl -sf --connect-timeout 5 "$HEALTH_CHECK_URL" | python3 -c '
import sys, json
try:
    data = json.load(sys.stdin)
    tasks = data.get("response") or {}
    print(f"Total tasks: {len(tasks)}")
    # Count by status
    status_counts = {}
    for task in tasks.values():
        status = task.get("status", "UNKNOWN")
        status_counts[status] = status_counts.get(status, 0) + 1
    for status, count in status_counts.items():
        print(f"{status}: {count}")
except Exception as e:
    print(f"Error parsing response: {e}")
')
    echo "$stats"
}
# Restart luigid when the health check fails.
restart_scheduler_if_needed() {
    if ! check_scheduler_health; then
        echo "Attempting to restart scheduler..."
        # Stop the daemon recorded in its pidfile. A bare `pkill -f luigid`
        # matches any process whose command line merely contains "luigid"
        # (this script's shell included, if its path does), so it is only a
        # fallback when no pidfile exists.
        if [ -s "$LUIGID_PIDFILE" ]; then
            kill "$(cat "$LUIGID_PIDFILE")" 2>/dev/null
        else
            pkill -f luigid
        fi
        sleep 5
        # Start new scheduler
        luigid --background --pidfile "$LUIGID_PIDFILE" --logdir "$LUIGID_LOGDIR"
        sleep 10
        # Check if restart was successful
        if check_scheduler_health; then
            echo "✓ Scheduler restarted successfully"
        else
            echo "✗ Scheduler restart failed"
            exit 1
        fi
    fi
}
# Main execution
case "$1" in
    health)
        check_scheduler_health
        ;;
    stats)
        get_scheduler_stats
        ;;
    restart)
        restart_scheduler_if_needed
        ;;
    monitor)
        # Continuous monitoring loop
        while true; do
            echo "=== $(date) ==="
            check_scheduler_health && get_scheduler_stats
            echo
            sleep 60
        done
        ;;
    *)
        echo "Usage: $0 {health|stats|restart|monitor}"
        exit 1
        ;;
esac
Install with Tessl CLI
npx tessl i tessl/pypi-luigi
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10