CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

docs/scheduler.md

Scheduler & RPC

Luigi's scheduler and RPC system coordinate task execution across multiple workers, providing centralized scheduling, dependency resolution, and distributed coordination.

Capabilities

Remote Scheduler Client

Client for communicating with Luigi's central scheduler daemon for distributed task execution and coordination.

class RemoteScheduler:
    """Client for remote Luigi scheduler.

    This is an interface summary: every method documents the call it makes
    to the central scheduler daemon and carries no implementation here.
    """

    def __init__(self, host: str = 'localhost', port: int = 8082,
                 connect_timeout: int = None):
        """
        Initialize remote scheduler client.

        Args:
            host: Scheduler host address
            port: Scheduler port number
            connect_timeout: Connection timeout in seconds; ``None`` (the
                default) means no explicit timeout is supplied by the caller
        """

    def add_task(self, task_id: str, status: str = 'PENDING',
                 runnable: bool = True, priority: int = 0,
                 family: str = '', module: str = '', params: dict = None,
                 assistant: bool = False, tracking_url: str = None,
                 worker: str = None, batchable: bool = None,
                 retry_policy_dict: dict = None, owners: list = None,
                 **kwargs) -> dict:
        """
        Add task to scheduler.

        Args:
            task_id: Unique task identifier
            status: Task status (PENDING, RUNNING, DONE, FAILED)
            runnable: Whether task can be run
            priority: Task priority (higher = more priority)
            family: Task family name
            module: Task module name
            params: Task parameters
            assistant: Whether task is assistant-generated
            tracking_url: URL for task tracking
            worker: Worker identifier
            batchable: Whether task can be batched
            retry_policy_dict: Retry policy configuration
            owners: List of task owners
            **kwargs: Any further keyword arguments accepted by the call

        Returns:
            dict: Scheduler response
        """

    def get_work(self, worker: str, host: str = '',
                 assistant: bool = False, current_tasks: list = None) -> dict:
        """
        Get work assignment from scheduler.

        Args:
            worker: Worker identifier
            host: Worker host
            assistant: Whether worker is assistant
            current_tasks: Currently running tasks

        Returns:
            dict: Work assignment response
        """

    def ping(self, worker: str, current_tasks: list = None) -> dict:
        """
        Send heartbeat ping to scheduler.

        Args:
            worker: Worker identifier
            current_tasks: Currently running tasks

        Returns:
            dict: Ping response
        """

    def count_pending(self, worker: str) -> dict:
        """
        Get count of pending tasks.

        Args:
            worker: Worker identifier

        Returns:
            dict: Pending task count
        """

    def graph(self) -> dict:
        """
        Get dependency graph from scheduler.

        Returns:
            dict: Task dependency graph
        """

    def dep_graph(self, task_id: str) -> dict:
        """
        Get dependency graph for specific task.

        Args:
            task_id: Task identifier

        Returns:
            dict: Task dependency graph
        """

    def task_list(self, status: str = '', upstream_status: str = '',
                  limit: bool = True, search: str = None) -> dict:
        """
        Get list of tasks from scheduler.

        Args:
            status: Filter by task status
            upstream_status: Filter by upstream task status
            limit: Flag (not a count) controlling whether the number of
                results is limited; defaults to ``True``
            search: Search term for task filtering

        Returns:
            dict: Task list response
        """

    def fetch_error(self, task_id: str) -> dict:
        """
        Fetch error details for failed task.

        Args:
            task_id: Task identifier

        Returns:
            dict: Error details
        """

RPC Communication

Low-level RPC communication classes for scheduler-worker interaction.

class URLLibFetcher:
    """RPC transport that performs its HTTP requests through urllib."""

    def fetch(self, full_url: str, body: bytes, timeout: int) -> bytes:
        """
        Send one HTTP request and hand back the response payload.

        Args:
            full_url: The complete URL the request is sent to
            body: Payload to send as the request body
            timeout: Timeout applied to the request

        Returns:
            bytes: The data received in response
        """

class RequestsFetcher:
    """RPC transport that performs its HTTP requests through the requests library."""

    def fetch(self, full_url: str, body: bytes, timeout: int) -> bytes:
        """
        Send one HTTP request via the requests library.

        Args:
            full_url: The complete URL the request is sent to
            body: Payload to send as the request body
            timeout: Timeout applied to the request

        Returns:
            bytes: The data received in response
        """

class RPCError(Exception):
    """Raised when RPC communication with the scheduler goes wrong."""

    def __init__(self, message: str, sub_exception: Exception = None):
        """
        Build an RPC error.

        Args:
            message: Human-readable description of the failure
            sub_exception: The underlying exception, if there is one
        """

Scheduler Configuration

Configuration options for the Luigi scheduler daemon.

class scheduler:
    """Scheduler configuration section.

    Each attribute below is one scheduler option together with its default
    value. Every option is declared exactly once.
    """

    record_task_history: bool = False
    """Whether to record task execution history."""

    state_path: str = ''
    """Path to scheduler state persistence file."""

    remove_delay: int = 600
    """Seconds to wait before removing completed tasks."""

    worker_disconnect_delay: int = 60
    """Seconds to wait before considering worker disconnected."""

    disable_window: int = 3600
    """Time window for disabling failed tasks (seconds)."""

    retry_delay: int = 900
    """Delay before retrying failed tasks (seconds)."""

    disable_hard_timeout: int = 999999999
    """Hard timeout for task disabling (seconds)."""

    max_shown_tasks: int = 100000
    """Maximum tasks to show in web interface."""

    max_graph_nodes: int = 100000
    """Maximum nodes in dependency graph visualization."""

    prune_on_get_work: bool = True
    """Whether to prune completed tasks when getting work."""

    pause_enabled: bool = True
    """Whether task pausing is enabled."""

Usage Examples

Basic Remote Scheduler Usage

import luigi
from luigi.rpc import RemoteScheduler

# Connect to remote scheduler
# NOTE: this client object only illustrates how a RemoteScheduler is
# constructed; it is not passed to the luigi.build() call further down.
scheduler = RemoteScheduler(host='scheduler.example.com', port=8082)

class RemoteTask(luigi.Task):
    """Example task meant to be run via a remote scheduler.

    NOTE(review): ``task_id`` may also be an attribute that ``luigi.Task``
    manages itself -- confirm this parameter is not overwritten.
    """

    task_id = luigi.Parameter()

    def output(self):
        """Return the local file target this task writes to."""
        target_path = f"output_{self.task_id}.txt"
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write a short completion message into the output target."""
        # This task will be coordinated by remote scheduler
        with self.output().open('w') as out_file:
            completion_note = f"Task {self.task_id} completed"
            out_file.write(completion_note)

# Script entry point: build the example task without the local scheduler
if __name__ == '__main__':
    # local_scheduler=False is what selects the remote scheduler here
    example_tasks = [RemoteTask(task_id="example")]
    luigi.build(example_tasks, local_scheduler=False)

Scheduler Health Monitoring

import luigi
from luigi.rpc import RemoteScheduler, RPCError
import time

def monitor_scheduler():
    """Poll the scheduler forever and print a health report each round.

    Every round pings the scheduler; on an ``'ok'`` reply it also prints the
    pending-task count and the size of the task list. Errors are printed
    rather than raised, so the loop never terminates on its own.
    """
    client = RemoteScheduler()
    worker_name = "monitoring_worker"

    while True:
        try:
            # Heartbeat first: the rest is only queried when it succeeds
            ping_reply = client.ping(worker=worker_name)

            if ping_reply.get('response') != 'ok':
                print("✗ Scheduler ping failed")
            else:
                print("✓ Scheduler is healthy")

                pending_reply = client.count_pending(worker=worker_name)
                print(f"Pending tasks: {pending_reply.get('n_pending_tasks', 0)}")

                listing = client.task_list(limit=10)
                known_tasks = listing.get('response', [])
                print(f"Total tasks in scheduler: {len(known_tasks)}")

        except RPCError as rpc_err:
            print(f"✗ RPC Error: {rpc_err}")
        except Exception as other_err:
            print(f"✗ Unexpected error: {other_err}")

        # Pause between rounds, whether or not the round succeeded
        time.sleep(30)

# Start the monitoring loop only when this file is executed as a script
if __name__ == '__main__':
    monitor_scheduler()

Task Dependency Graph Inspection

import luigi
from luigi.rpc import RemoteScheduler
import json

class InspectDependencies(luigi.Task):
    """Dump the scheduler's dependency information for one task to JSON."""

    target_task_id = luigi.Parameter()

    def output(self):
        """Return the local JSON file target the analysis is written to."""
        report_path = f"deps_{self.target_task_id}.json"
        return luigi.LocalTarget(report_path)

    def run(self):
        """Query the scheduler and write the analysis (or the error) as JSON.

        Any exception raised while querying or writing is printed and
        replaced by a small ``{'error': ...}`` document in the output file.
        """
        client = RemoteScheduler()

        try:
            # Dependencies of the requested task
            task_deps = client.dep_graph(self.target_task_id)

            # Whole-scheduler graph, used only for the summary counts
            whole_graph = client.graph()
            node_count = len(whole_graph.get('response', {}).get('nodes', []))
            edge_count = len(whole_graph.get('response', {}).get('edges', []))

            report = {
                'target_task': self.target_task_id,
                'dependencies': task_deps,
                'graph_stats': {
                    'total_nodes': node_count,
                    'total_edges': edge_count
                }
            }

            with self.output().open('w') as report_file:
                json.dump(report, report_file, indent=2)

        except Exception as err:
            print(f"Error inspecting dependencies: {err}")
            # Still produce an output file, holding only the error text
            with self.output().open('w') as report_file:
                json.dump({'error': str(err)}, report_file)

Custom Worker Implementation

import luigi
from luigi.rpc import RemoteScheduler
from luigi.worker import Worker
import time
import logging

class CustomWorker:
    """Hand-rolled worker that polls a remote scheduler and logs its activity."""

    def __init__(self, scheduler_host='localhost', scheduler_port=8082):
        """Create the scheduler client, a time-based worker id and a logger.

        Args:
            scheduler_host: Host passed to the RemoteScheduler client
            scheduler_port: Port passed to the RemoteScheduler client
        """
        self.scheduler = RemoteScheduler(host=scheduler_host, port=scheduler_port)
        started_at = int(time.time())
        self.worker_id = f"custom_worker_{started_at}"
        self.running = True
        self.current_tasks = []

        # Logging setup: INFO level, logger named after this worker
        logging.basicConfig(level=logging.INFO)
        logger_name = f'worker.{self.worker_id}'
        self.logger = logging.getLogger(logger_name)

    def run(self):
        """Poll for work and send heartbeats until ``running`` is cleared."""
        self.logger.info(f"Starting worker {self.worker_id}")

        while self.running:
            try:
                assignment = self.scheduler.get_work(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks
                )

                has_pending = assignment.get('n_pending_tasks', 0) > 0
                if not has_pending:
                    self.logger.debug("No pending tasks")
                else:
                    assigned_id = assignment.get('task_id')
                    if assigned_id:
                        self.logger.info(f"Received task: {assigned_id}")
                        self.execute_task(assigned_id)

                # Heartbeat after each polling round
                self.scheduler.ping(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks
                )

                time.sleep(5)  # regular polling interval

            except Exception as err:
                self.logger.error(f"Worker error: {err}")
                time.sleep(10)  # back off longer after a failure

    def execute_task(self, task_id: str):
        """Track ``task_id`` as running while (simulated) work is performed.

        Args:
            task_id: Identifier of the task received from the scheduler
        """
        self.current_tasks.append(task_id)

        try:
            self.logger.info(f"Executing task: {task_id}")

            # Placeholder for real task execution: just wait two seconds
            time.sleep(2)

            self.logger.info(f"Task completed: {task_id}")

        except Exception as err:
            self.logger.error(f"Task failed: {task_id} - {err}")

        finally:
            # Success or failure, the task is no longer in flight
            self.current_tasks.remove(task_id)

    def stop(self):
        """Clear the running flag so the main loop exits, then log it."""
        self.running = False
        self.logger.info(f"Stopping worker {self.worker_id}")

# Usage
if __name__ == '__main__':
    worker = CustomWorker()
    try:
        # run() keeps looping while the worker's running flag is set
        worker.run()
    except KeyboardInterrupt:
        # Ctrl+C: clear the running flag and log the shutdown
        worker.stop()

Scheduler Error Handling

import luigi
from luigi.rpc import RemoteScheduler, RPCError

class RobustSchedulerTask(luigi.Task):
    """Task with robust scheduler error handling.

    ``run`` pings the scheduler up to three times with exponential backoff
    and falls back to local processing when no attempt succeeds.
    """

    def run(self):
        """Ping the scheduler with bounded retries, then write the output.

        Every unsuccessful attempt counts towards the retry limit, whether
        it raised ``RPCError`` or returned a response other than ``'ok'``,
        so the loop always terminates. When all attempts fail,
        ``local_fallback`` writes the output instead.
        """
        # Imported here because the surrounding snippet does not import it
        import time

        scheduler = RemoteScheduler()
        max_retries = 3

        for attempt in range(1, max_retries + 1):
            try:
                # Try to communicate with scheduler
                response = scheduler.ping(worker="robust_worker")
            except RPCError as e:
                print(f"RPC Error (attempt {attempt}/{max_retries}): {e}")
            else:
                if response.get('response') == 'ok':
                    print("Scheduler connection successful")
                    break
                # A non-'ok' reply is a failed attempt too; without this the
                # loop would spin forever without ever backing off
                print(f"Unexpected ping response "
                      f"(attempt {attempt}/{max_retries}): {response!r}")

            if attempt < max_retries:
                time.sleep(2 ** attempt)  # Exponential backoff
        else:
            # Loop finished without a successful ping
            print("Max retries exceeded, falling back to local execution")
            # Fallback to local processing
            self.local_fallback()
            return

        # Normal task processing
        with self.output().open('w') as f:
            f.write("Task completed with scheduler coordination")

    def local_fallback(self):
        """Fallback execution when scheduler is unavailable."""
        print("Executing in local fallback mode")

        with self.output().open('w') as f:
            f.write("Task completed in local fallback mode")

    def output(self):
        """Return the local file target both execution paths write to."""
        return luigi.LocalTarget("robust_output.txt")

Scheduler Configuration Example

# luigi.cfg
# The triple-quoted string below holds example INI-style configuration text.
# As a bare string expression it has no effect when this snippet runs.
"""
[scheduler]
# Enable task history recording
record_task_history = true

# Set state persistence file
state_path = /var/lib/luigi/scheduler.state

# Configure cleanup timings
remove_delay = 300
worker_disconnect_delay = 30
retry_delay = 300

# Configure UI limits
max_shown_tasks = 50000  
max_graph_nodes = 10000

# Enable task pausing
pause_enabled = true

[core]
# Remote scheduler configuration
default_scheduler_host = scheduler.company.com  
default_scheduler_port = 8082
rpc_connect_timeout = 15
rpc_retry_attempts = 5
rpc_retry_wait = 10
"""

import luigi
from luigi.configuration import get_config

class ConfiguredSchedulerTask(luigi.Task):
    """Example task that reports the scheduler address found in the config."""

    def run(self):
        """Read the scheduler host/port from ``[core]`` and record them."""
        cfg = get_config()

        # Both lookups supply a fallback for when the option is not set
        host = cfg.get('core', 'default_scheduler_host',
                       fallback='localhost')
        port = cfg.getint('core', 'default_scheduler_port',
                          fallback=8082)
        endpoint = f"{host}:{port}"

        print(f"Using scheduler: {endpoint}")

        # Persist the resolved address as this task's output
        with self.output().open('w') as out_file:
            out_file.write(f"Configured for scheduler {endpoint}")

    def output(self):
        """Return the local file target written by ``run``."""
        return luigi.LocalTarget("configured_output.txt")

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json