CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

cli-tools.mddocs/

Command Line Tools

Luigi provides comprehensive command-line utilities for workflow management, task execution, dependency analysis, and debugging. These tools enable both interactive development and production deployment.

Capabilities

Main CLI Commands

Primary command-line interfaces for executing Luigi workflows and managing the scheduler daemon.

# Main task execution command
luigi --module mymodule MyTask [--param value] [options]

# Scheduler daemon
luigid [--background] [--pidfile FILE] [--logdir DIR] [--state-path FILE] [--address ADDRESS] [--port PORT]

# Command-line options for luigi command
class LuigiCLI:
    """Reference listing of the options accepted by the ``luigi`` command.

    The class carries no behaviour: each attribute is an annotated option
    name, and the assigned value (where present) is the default this listing
    gives for it. Attributes without a value have no default listed here.
    """
    
    # Task specification
    module: str  # Dotted module that contains the task class
    task: str    # Name of the task class to run
    
    # Scheduler options
    scheduler_host: str = 'localhost'  # Host of the central scheduler
    scheduler_port: int = 8082         # Port of the central scheduler
    local_scheduler: bool = False      # Use a local scheduler
    
    # Worker options
    workers: int = 1           # Number of worker processes
    keep_alive: bool = False   # Keep worker alive after completion
    # NOTE(review): stock Luigi appears to expose this as --worker-timeout
    # rather than --timeout — confirm against the installed version.
    timeout: int = 0           # Task timeout (0 = no timeout)
    
    # Logging options
    log_level: str = 'DEBUG'   # Logging level
    logging_conf_file: str     # Path to a logging configuration file
    
    # Execution options
    no_lock: bool = False      # Disable file locking
    lock_size: int = 1         # Lock file size
    lock_pid_dir: str         # Directory holding PID lock files
    take_lock: bool = False    # Take exclusive lock
    
    # Output options (process exit codes per outcome)
    # NOTE(review): upstream Luigi documents these return codes as 0 unless
    # set in the [retcode] configuration section — confirm that the values
    # below are meant as examples rather than built-in defaults.
    retcode_already_running: int = 10    # Return code if already running
    retcode_missing_data: int = 20       # Return code for missing data
    retcode_not_run: int = 25           # Return code if not run
    retcode_task_failed: int = 30       # Return code for task failure
    retcode_scheduling_error: int = 35   # Return code for scheduling error
    
    # Help and version
    help: bool = False         # Show help message
    version: bool = False      # Show version information

Scheduler Daemon Options

Configuration options for the Luigi scheduler daemon (luigid).

class LuigiDaemon:
    """Reference listing of the options accepted by the ``luigid`` daemon.

    Like a plain record, the class only declares annotated option names;
    an assigned value is the default given by this listing, and attributes
    without a value have no default listed here.
    """
    
    # Server options
    address: str = 'localhost'  # Address the server binds to
    port: int = 8082           # Port the server listens on
    
    # Process options
    background: bool = False    # Run in background
    pidfile: str              # PID file path
    user: str                 # Run as specific user
    group: str                # Run as specific group
    
    # Logging options
    logdir: str               # Directory for log files
    logging_conf_file: str    # Path to a logging configuration file
    
    # State persistence
    state_path: str           # Scheduler state file path
    
    # Security options
    unix_socket: str          # Unix socket path
    
    # Development options
    dev: bool = False         # Development mode

Dependency Analysis Tools

Command-line utilities for analyzing task dependencies and workflow structure.

# Dependency analysis commands
from luigi.tools.deps import deps_main
from luigi.tools.deps_tree import deps_tree_main

def deps_main(task_str: str, module: str = None) -> None:
    """
    Analyze task dependencies.

    Signature-only stub: the body consists of this docstring alone, so
    calling it performs no work and returns None.
    
    Usage: python -m luigi.tools.deps MyTask --module mymodule
    
    Args:
        task_str: Task name and parameters
        module: Module containing task (defaults to None)

    Returns:
        None
    """

def deps_tree_main(task_str: str, module: str = None) -> None:
    """
    Display dependency tree visualization.

    Signature-only stub: the body consists of this docstring alone, so
    calling it performs no work and returns None.
    
    Usage: python -m luigi.tools.deps_tree MyTask --module mymodule
    
    Args:
        task_str: Task name and parameters
        module: Module containing task (defaults to None)

    Returns:
        None
    """

# Task search utility
from luigi.tools.luigi_grep import luigi_grep_main

def luigi_grep_main(pattern: str, paths: list = None) -> None:
    """
    Search for tasks matching pattern.

    Signature-only stub: the body consists of this docstring alone, so
    calling it performs no work and returns None.
    
    Usage: python -m luigi.tools.luigi_grep "pattern" [paths...]
    
    Args:
        pattern: Search pattern (regex)
        paths: Paths to search in (defaults to None)

    Returns:
        None
    """

Range Task Utilities

Built-in utilities for working with range tasks and date-based workflows.

# Range task commands (auto-loaded if enabled)
from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes

# Usage examples:
# luigi RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-31
# luigi RangeHourly --of MyTask --start 2023-01-01T00 --stop 2023-01-01T23

Usage Examples

Basic Task Execution

# Execute single task with parameters
luigi MyTask --param1 value1 --param2 value2 --module mypackage.tasks

# Execute with local scheduler
luigi MyTask --local-scheduler --module mypackage.tasks

# Execute with remote scheduler
luigi MyTask --scheduler-host scheduler.example.com --scheduler-port 8082 --module mypackage.tasks

# Execute with multiple workers
luigi MyTask --workers 4 --module mypackage.tasks

# Execute with timeout
luigi MyTask --worker-timeout 3600 --module mypackage.tasks

Scheduler Daemon Management

# Start scheduler daemon
luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi

# Start scheduler on specific address/port
luigid --address 0.0.0.0 --port 8082 --background

# Start with persistent state
luigid --state-path /var/lib/luigi/scheduler.state --background

# Development mode with verbose logging
luigid --dev --logdir ./logs

Dependency Analysis

# Analyze task dependencies
python -m luigi.tools.deps MyTask --module mypackage.tasks

# Visualize dependency tree
python -m luigi.tools.deps_tree MyTask --module mypackage.tasks

# Search for tasks
python -m luigi.tools.luigi_grep "ProcessData" ./tasks/

# Analyze range task dependencies
python -m luigi.tools.deps RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-07

Advanced CLI Usage

# CLI wrapper script example
#!/usr/bin/env python3
"""Custom Luigi CLI wrapper with enhanced functionality."""

import luigi
import sys
import argparse
from luigi.cmdline import luigi_run
from luigi.configuration import get_config

def custom_luigi_main():
    """Run Luigi behind a wrapper that adds ``--env``, ``--dry-run`` and ``--notify``.

    Options the wrapper does not recognise are forwarded to Luigi untouched.

    Returns:
        None after a dry run, or whatever ``luigi_run`` returns should it
        ever return normally.

    Raises:
        SystemExit: Luigi reports its outcome through ``sys.exit()``; the
            exception is re-raised after the optional notification so the
            process keeps Luigi's exit code.
    """
    parser = argparse.ArgumentParser(description='Enhanced Luigi CLI')

    # Wrapper-specific options
    parser.add_argument('--env', choices=['dev', 'staging', 'prod'],
                        default='dev', help='Environment to run in')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be executed without running')
    parser.add_argument('--notify', help='Notification email for completion')

    # Everything argparse does not recognise belongs to Luigi
    args, luigi_args = parser.parse_known_args()

    setup_environment(args.env)

    if args.dry_run:
        print("DRY RUN: Would execute:")
        print(" ".join(['luigi'] + luigi_args))
        return None

    # Hand the arguments over explicitly: luigi_run's default argv is bound
    # when luigi.cmdline is imported, so reassigning sys.argv at this point
    # would not be seen and Luigi would still receive the wrapper's options.
    try:
        result = luigi_run(luigi_args)
    except SystemExit as exit_request:
        # Normal completion path: Luigi exits with its return code. Send the
        # notification before letting the exit propagate; the code is wrapped
        # so the notifier can keep reading a ``status`` attribute.
        if args.notify:
            send_completion_notification(
                args.notify, argparse.Namespace(status=exit_request.code))
        raise

    if args.notify:
        send_completion_notification(args.notify, result)

    return result

def setup_environment(env: str):
    """Apply the ``[core]`` overrides for the selected environment.

    Args:
        env: ``'prod'`` or ``'staging'`` select the matching remote
            scheduler; any other value gets the development settings
            (local scheduler, DEBUG logging).
    """
    config = get_config()

    # A ConfigParser-style set() raises NoSectionError when the section does
    # not exist yet, which happens when no configuration file defines [core].
    if not config.has_section('core'):
        config.add_section('core')

    overrides_by_env = {
        'prod': {
            'scheduler_host': 'prod-scheduler.example.com',
            'log_level': 'WARNING',
        },
        'staging': {
            'scheduler_host': 'staging-scheduler.example.com',
            'log_level': 'INFO',
        },
    }
    dev_overrides = {
        'local_scheduler': 'true',
        'log_level': 'DEBUG',
    }

    for option, value in overrides_by_env.get(env, dev_overrides).items():
        config.set('core', option, value)

def send_completion_notification(email: str, result):
    """Report the run outcome to *email*.

    Placeholder implementation: nothing is sent; the message that would be
    delivered is printed instead.

    Args:
        email: Recipient address.
        result: Object exposing a ``status`` attribute.
    """
    status = result.status
    message = f"Would send notification to {email}: Status {status}"
    print(message)

if __name__ == '__main__':
    custom_luigi_main()

Batch Execution Scripts

#!/bin/bash
# Luigi batch execution script

# Configuration
# Module passed to luigi via --module
LUIGI_MODULE="mypackage.tasks" 
# Central scheduler host passed via --scheduler-host
SCHEDULER_HOST="scheduler.example.com"
# Directory that receives one log file per task run
LOG_DIR="/var/log/luigi"
# Today's date (YYYY-MM-DD); used for log file names and as the task date
DATE=$(date +%Y-%m-%d)

# Create log directory (-p: no error if it already exists)
mkdir -p "$LOG_DIR"

# Function to run Luigi task with error handling
run_luigi_task() {
    # Run one Luigi task, appending its output to a per-task log file.
    #
    # Arguments: the task name followed by its parameters, given either as
    #            separate words or as one string ("MyTask --date 2023-01-01").
    # Returns:   luigi's exit code (2 if no task name was given).
    #
    # Split on whitespace so luigi receives the task name and each parameter
    # as separate arguments; passing "MyTask --date X" as a single word makes
    # luigi look for a task literally named that way.
    local -a task_args
    read -r -a task_args <<< "$*"

    if [ "${#task_args[@]}" -eq 0 ]; then
        echo "run_luigi_task: no task name given" >&2
        return 2
    fi

    # Only the task name goes into messages and the log file name, so the
    # path contains neither spaces nor parameter flags.
    local task_name=${task_args[0]}
    local log_file="$LOG_DIR/${task_name}_${DATE}.log"

    echo "Starting $task_name at $(date)"

    # --worker-timeout: the per-task timeout is a worker setting, which the
    # luigi command line exposes with the "worker-" prefix.
    luigi "${task_args[@]}" \
        --module "$LUIGI_MODULE" \
        --scheduler-host "$SCHEDULER_HOST" \
        --workers 2 \
        --worker-timeout 7200 \
        >> "$log_file" 2>&1

    # Capture immediately; any later command would overwrite $?
    local exit_code=$?

    if [ $exit_code -eq 0 ]; then
        echo "✓ $task_name completed successfully"
    else
        echo "✗ $task_name failed with exit code $exit_code"
        echo "Check log file: $log_file"
    fi

    return $exit_code
}

# Execute daily tasks
echo "Starting daily Luigi workflow for $DATE"

# Every task is still attempted, but failures are remembered so the script's
# exit status tells cron or a supervisor whether the whole run succeeded.
workflow_failed=0

run_luigi_task "DataIngestionTask --date $DATE" || workflow_failed=1
run_luigi_task "ProcessingTask --date $DATE" || workflow_failed=1
run_luigi_task "ReportGenerationTask --date $DATE" || workflow_failed=1

if [ "$workflow_failed" -eq 0 ]; then
    echo "Daily workflow completed at $(date)"
else
    echo "Daily workflow finished with failures at $(date)"
fi

exit "$workflow_failed"

Configuration Management

#!/bin/bash
# Luigi configuration management script

# Directory holding the per-environment files (luigi-<env>.cfg) and the
# active luigi.cfg link
LUIGI_CONFIG_DIR="/etc/luigi"
# Environment name, taken from the caller's LUIGI_ENV or "development" when
# unset/empty (not referenced elsewhere in this script as shown)
LUIGI_ENV="${LUIGI_ENV:-development}"

# Function to switch Luigi configuration
switch_config() {
    # Point $LUIGI_CONFIG_DIR/luigi.cfg at the configuration of one environment.
    #
    # Arguments: $1 - environment name; luigi-<name>.cfg must exist in
    #                 $LUIGI_CONFIG_DIR.
    # Exits the script with status 1 if the name is missing, the file does
    # not exist, or the link cannot be created.
    local env=$1

    # Without a name the lookup below would complain about "luigi-.cfg",
    # which hides the real mistake.
    if [ -z "$env" ]; then
        echo "No environment given" >&2
        exit 1
    fi

    local config_file="$LUIGI_CONFIG_DIR/luigi-$env.cfg"

    if [ ! -f "$config_file" ]; then
        echo "Configuration file not found: $config_file" >&2
        exit 1
    fi

    # Link active configuration; only report success if the link was created
    if ! ln -sf "$config_file" "$LUIGI_CONFIG_DIR/luigi.cfg"; then
        echo "Could not link $config_file to $LUIGI_CONFIG_DIR/luigi.cfg" >&2
        exit 1
    fi

    echo "Switched to $env configuration"
}

# Function to validate configuration
validate_config() {
    # Check that the luigi command runs, then print the scheduler host and
    # port read from the [core] section of the active configuration.
    # Exits the script with status 1 if the luigi command fails.
    echo "Validating Luigi configuration..."
    
    # Check that the luigi command can be executed; only `luigi --help` is
    # run here (output discarded), so no scheduler is addressed by this step
    if luigi --help > /dev/null 2>&1; then
        echo "✓ Luigi CLI is working"
    else
        echo "✗ Luigi CLI not working"
        exit 1
    fi
    
    # Test configuration parsing: load the configuration in Python and print
    # the scheduler host/port, falling back to localhost/8082 when unset.
    # The \" escapes keep the double quotes inside the shell string.
    python3 -c "
import luigi.configuration
config = luigi.configuration.get_config()
print(f'Scheduler: {config.get(\"core\", \"default_scheduler_host\", fallback=\"localhost\")}')
print(f'Port: {config.getint(\"core\", \"default_scheduler_port\", fallback=8082)}')
"
}

# Main command handling
# Dispatch on the first argument: "switch <environment>" or "validate";
# anything else prints the usage text and exits with status 1.
if [[ "$1" == "switch" ]]; then
    switch_config "$2"
elif [[ "$1" == "validate" ]]; then
    validate_config
else
    echo "Usage: $0 {switch|validate} [environment]"
    echo "Environments: development, staging, production"
    exit 1
fi

Monitoring and Health Checks

#!/bin/bash
# Luigi health check and monitoring script

# Scheduler location; each value comes from the environment when set and
# non-empty, otherwise the default after ":-" is used
SCHEDULER_HOST="${LUIGI_SCHEDULER_HOST:-localhost}"
SCHEDULER_PORT="${LUIGI_SCHEDULER_PORT:-8082}"
# Endpoint queried by both the health check and the statistics function
HEALTH_CHECK_URL="http://$SCHEDULER_HOST:$SCHEDULER_PORT/api/graph"

# Function to check scheduler health
check_scheduler_health() {
    # Probe the scheduler's HTTP endpoint.
    # Returns 0 if $HEALTH_CHECK_URL answers successfully, 1 otherwise.
    echo "Checking Luigi scheduler health..."

    # -f: treat HTTP error responses (4xx/5xx) as failures; without it curl
    #     exits 0 for any response, so a broken scheduler looks healthy.
    # --max-time: bound the whole request, not just the connection attempt,
    #     so a hung scheduler cannot stall the check.
    if curl -sf --connect-timeout 5 --max-time 15 "$HEALTH_CHECK_URL" > /dev/null; then
        echo "✓ Scheduler is responding"
        return 0
    else
        echo "✗ Scheduler is not responding"
        return 1
    fi
}

# Function to get scheduler statistics  
get_scheduler_stats() {
    # Print the total number of tasks known to the scheduler and a count per
    # task status, based on the JSON returned by $HEALTH_CHECK_URL.
    # Returns non-zero if the request fails or the response is not JSON.
    echo "Getting scheduler statistics..."

    # Declared separately from the assignments: "local var=$(cmd)" would
    # replace the command's exit status with that of "local".
    local response stats rc

    if ! response=$(curl -sf --connect-timeout 5 --max-time 15 "$HEALTH_CHECK_URL"); then
        echo "Error fetching scheduler statistics from $HEALTH_CHECK_URL" >&2
        return 1
    fi

    # The Python source sits inside a double-quoted shell string, so it
    # deliberately uses only single quotes and no dollar signs.
    stats=$(printf '%s' "$response" | python3 -c "
import sys, json
from collections import Counter

try:
    data = json.load(sys.stdin)
except ValueError as e:
    print(f'Error parsing response: {e}')
    sys.exit(1)

response = data.get('response', {}) if isinstance(data, dict) else {}

# Accept both shapes: a 'nodes' list, or a mapping of task id to task
# details (which is what the graph endpoint appears to return - confirm
# against the scheduler version in use).
if isinstance(response, dict) and isinstance(response.get('nodes'), list):
    nodes = response['nodes']
elif isinstance(response, dict):
    nodes = list(response.values())
else:
    nodes = []

print(f'Total tasks: {len(nodes)}')

# Count by status
status_counts = Counter(
    node.get('status', 'UNKNOWN') if isinstance(node, dict) else 'UNKNOWN'
    for node in nodes
)
for status, count in status_counts.items():
    print(f'{status}: {count}')
")
    rc=$?

    echo "$stats"
    return $rc
}

# Function to restart scheduler if unhealthy
restart_scheduler_if_needed() {
    # Restart luigid when the health check fails; do nothing otherwise.
    # Exits the script with status 1 if the scheduler is still unhealthy
    # after the restart.

    # Healthy scheduler: nothing to do
    if check_scheduler_health; then
        return 0
    fi

    echo "Attempting to restart scheduler..."

    # Stop whatever matches "luigid" on its command line and give it time
    # to shut down
    pkill -f luigid
    sleep 5

    # Launch a fresh daemon and give it time to come up
    luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi
    sleep 10

    # Verify the restart
    if ! check_scheduler_health; then
        echo "✗ Scheduler restart failed"
        exit 1
    fi

    echo "✓ Scheduler restarted successfully"
}

# Main execution
# Dispatch on the first argument; unknown commands print usage and exit 1
case "$1" in
    health)  check_scheduler_health ;;
    stats)   get_scheduler_stats ;;
    restart) restart_scheduler_if_needed ;;
    monitor)
        # Poll once a minute until interrupted; statistics are printed only
        # when the health check passes
        while :; do
            echo "=== $(date) ==="
            if check_scheduler_health; then
                get_scheduler_stats
            fi
            echo
            sleep 60
        done
        ;;
    *)
        echo "Usage: $0 {health|stats|restart|monitor}"
        exit 1
        ;;
esac

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json