Python workflow management framework for building complex pipelines of batch jobs, with dependency resolution and task scheduling.
Luigi's scheduler and RPC system coordinates task execution across multiple workers with centralized scheduling, dependency resolution, and distributed coordination capabilities.
Client for communicating with Luigi's central scheduler daemon for distributed task execution and coordination.
from typing import Optional, Union


class RemoteScheduler:
    """Client for a remote Luigi central scheduler.

    Mirrors the scheduler's RPC endpoints (``add_task``, ``get_work``,
    ``ping``, ...) so workers and tooling can coordinate distributed task
    execution over HTTP.
    """

    def __init__(self, host: str = 'localhost', port: int = 8082,
                 connect_timeout: Optional[int] = None):
        """
        Initialize the remote scheduler client.

        Args:
            host: Scheduler host address.
            port: Scheduler port number.
            connect_timeout: Connection timeout in seconds, or None to use
                the configured default.
        """

    def add_task(self, task_id: str, status: str = 'PENDING',
                 runnable: bool = True, priority: int = 0,
                 family: str = '', module: str = '',
                 params: Optional[dict] = None,
                 assistant: bool = False,
                 tracking_url: Optional[str] = None,
                 worker: Optional[str] = None,
                 batchable: Optional[bool] = None,
                 retry_policy_dict: Optional[dict] = None,
                 owners: Optional[list] = None,
                 **kwargs) -> dict:
        """
        Register a task (or update its status) with the scheduler.

        Args:
            task_id: Unique task identifier.
            status: Task status (PENDING, RUNNING, DONE, FAILED).
            runnable: Whether the task can be run.
            priority: Task priority (higher = more priority).
            family: Task family name.
            module: Task module name.
            params: Task parameters.
            assistant: Whether the task is assistant-generated.
            tracking_url: URL for task tracking.
            worker: Worker identifier.
            batchable: Whether the task can be batched.
            retry_policy_dict: Retry policy configuration.
            owners: List of task owners.

        Returns:
            dict: Scheduler response.
        """

    def get_work(self, worker: str, host: str = '',
                 assistant: bool = False,
                 current_tasks: Optional[list] = None) -> dict:
        """
        Request a work assignment from the scheduler.

        Args:
            worker: Worker identifier.
            host: Worker host.
            assistant: Whether the worker is an assistant.
            current_tasks: Ids of the tasks the worker is currently running.

        Returns:
            dict: Work assignment response.
        """

    def ping(self, worker: str, current_tasks: Optional[list] = None) -> dict:
        """
        Send a heartbeat ping so the scheduler knows the worker is alive.

        Args:
            worker: Worker identifier.
            current_tasks: Ids of the tasks the worker is currently running.

        Returns:
            dict: Ping response.
        """

    def count_pending(self, worker: str) -> dict:
        """
        Get the count of tasks pending for a worker.

        Args:
            worker: Worker identifier.

        Returns:
            dict: Pending task count.
        """

    def graph(self) -> dict:
        """
        Get the full dependency graph known to the scheduler.

        Returns:
            dict: Task dependency graph.
        """

    def dep_graph(self, task_id: str) -> dict:
        """
        Get the dependency graph rooted at a specific task.

        Args:
            task_id: Task identifier.

        Returns:
            dict: Task dependency graph.
        """

    def task_list(self, status: str = '', upstream_status: str = '',
                  limit: Union[bool, int] = True,
                  search: Optional[str] = None) -> dict:
        """
        Get a list of tasks from the scheduler.

        Args:
            status: Filter by task status.
            upstream_status: Filter by upstream task status.
            limit: Limit number of results; the default True presumably
                applies the scheduler's configured display limit — verify
                against the scheduler API.
            search: Search term for task filtering.

        Returns:
            dict: Task list response.
        """

    def fetch_error(self, task_id: str) -> dict:
        """
        Fetch error details for a failed task.

        Args:
            task_id: Task identifier.

        Returns:
            dict: Error details.
        """


# Low-level RPC communication classes for scheduler-worker interaction.
class URLLibFetcher:
    """RPC transport that performs HTTP requests with the stdlib urllib."""

    def fetch(self, full_url: str, body: bytes, timeout: int) -> bytes:
        """
        Send *body* to *full_url* over HTTP and return the raw response.

        Args:
            full_url: Complete URL for the request.
            body: Request body data.
            timeout: Request timeout in seconds.

        Returns:
            bytes: Raw response payload.
        """
class RequestsFetcher:
    """RPC transport that performs HTTP requests via the requests library."""

    def fetch(self, full_url: str, body: bytes, timeout: int) -> bytes:
        """
        Send *body* to *full_url* using requests and return the response.

        Args:
            full_url: Complete URL for the request.
            body: Request body data.
            timeout: Request timeout in seconds.

        Returns:
            bytes: Raw response payload.
        """
class RPCError(Exception):
    """Raised when communication with the remote scheduler fails."""

    def __init__(self, message: str, sub_exception: Exception = None):
        """
        Create an RPC error.

        Args:
            message: Human-readable error message.
            sub_exception: Underlying transport/parsing exception, if any.
        """


# Configuration options for the Luigi scheduler daemon.
class scheduler:
    """Configuration options for the ``[scheduler]`` section of luigi.cfg."""
    # NOTE: the original declared record_task_history twice with divergent
    # descriptions; the duplicate has been removed and the texts merged.
    record_task_history: bool = False
    """Whether to record and maintain task execution history."""
    state_path: str = ''
    """Path to scheduler state persistence file."""
    remove_delay: int = 600
    """Seconds to wait before removing completed tasks."""
    worker_disconnect_delay: int = 60
    """Seconds to wait before considering a worker disconnected."""
    disable_window: int = 3600
    """Time window for disabling failed tasks (seconds)."""
    retry_delay: int = 900
    """Delay before retrying failed tasks (seconds)."""
    disable_hard_timeout: int = 999999999
    """Hard timeout for task disabling (seconds)."""
    max_shown_tasks: int = 100000
    """Maximum tasks to show in the web interface."""
    max_graph_nodes: int = 100000
    """Maximum nodes in dependency graph visualization."""
    prune_on_get_work: bool = True
    """Whether to prune completed tasks when getting work."""
    pause_enabled: bool = True
    """Whether task pausing is enabled."""


import luigi
from luigi.rpc import RemoteScheduler

# Connect to the remote scheduler daemon.
scheduler = RemoteScheduler(host='scheduler.example.com', port=8082)


class RemoteTask(luigi.Task):
    """Task whose execution is coordinated by a remote scheduler."""

    task_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"output_{self.task_id}.txt")

    def run(self):
        # Ordering and dependencies are resolved by the central scheduler.
        target = self.output()
        with target.open('w') as out:
            out.write(f"Task {self.task_id} completed")


# Running with local_scheduler=False routes through the central scheduler.
if __name__ == '__main__':
    luigi.build([RemoteTask(task_id="example")], local_scheduler=False)


import luigi
from luigi.rpc import RemoteScheduler, RPCError
import time


def monitor_scheduler():
    """Poll the scheduler and report health, pending counts, and task totals."""
    client = RemoteScheduler()
    worker_id = "monitoring_worker"
    while True:
        try:
            # The heartbeat ping doubles as a health probe.
            reply = client.ping(worker=worker_id)
            if reply.get('response') != 'ok':
                print("✗ Scheduler ping failed")
            else:
                print("✓ Scheduler is healthy")
                pending = client.count_pending(worker=worker_id)
                print(f"Pending tasks: {pending.get('n_pending_tasks', 0)}")
                tasks = client.task_list(limit=10)
                print(f"Total tasks in scheduler: {len(tasks.get('response', []))}")
        except RPCError as e:
            print(f"✗ RPC Error: {e}")
        except Exception as e:
            print(f"✗ Unexpected error: {e}")
        time.sleep(30)  # Check every 30 seconds


if __name__ == '__main__':
    monitor_scheduler()


import luigi
from luigi.rpc import RemoteScheduler
import json


class InspectDependencies(luigi.Task):
    """Inspect and persist the dependency relationships of a target task."""

    target_task_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"deps_{self.target_task_id}.json")

    def run(self):
        client = RemoteScheduler()
        try:
            # Dependency graph rooted at the target task.
            dep_graph = client.dep_graph(self.target_task_id)
            # Full scheduler graph for overall context statistics.
            full_graph = client.graph()
            graph_payload = full_graph.get('response', {})
            analysis = {
                'target_task': self.target_task_id,
                'dependencies': dep_graph,
                'graph_stats': {
                    'total_nodes': len(graph_payload.get('nodes', [])),
                    'total_edges': len(graph_payload.get('edges', []))
                }
            }
            with self.output().open('w') as f:
                json.dump(analysis, f, indent=2)
        except Exception as e:
            print(f"Error inspecting dependencies: {e}")
            # Write an error stub so the output target still exists.
            with self.output().open('w') as f:
                json.dump({'error': str(e)}, f)


import luigi
from luigi.rpc import RemoteScheduler
from luigi.worker import Worker
import time
import logging
class CustomWorker:
    """Custom worker implementation with enhanced monitoring.

    Polls the remote scheduler for work, executes received tasks, and sends
    heartbeat pings so the scheduler does not consider the worker disconnected.
    """

    def __init__(self, scheduler_host='localhost', scheduler_port=8082):
        # RPC client used for all scheduler round-trips (get_work / ping).
        self.scheduler = RemoteScheduler(host=scheduler_host, port=scheduler_port)
        # Timestamp suffix keeps ids of concurrently launched workers distinct.
        self.worker_id = f"custom_worker_{int(time.time())}"
        self.running = True
        # Task ids currently claimed; reported to the scheduler on each RPC.
        self.current_tasks = []
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(f'worker.{self.worker_id}')

    def run(self):
        """Main worker loop."""
        self.logger.info(f"Starting worker {self.worker_id}")
        while self.running:
            try:
                # Get work from scheduler
                work_response = self.scheduler.get_work(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks
                )
                if work_response.get('n_pending_tasks', 0) > 0:
                    # The scheduler may report pending work without assigning a
                    # task to this worker, so task_id can be absent/None.
                    task_id = work_response.get('task_id')
                    if task_id:
                        self.logger.info(f"Received task: {task_id}")
                        self.execute_task(task_id)
                else:
                    self.logger.debug("No pending tasks")
                # Send heartbeat
                self.scheduler.ping(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks
                )
                time.sleep(5)  # Poll every 5 seconds
            except Exception as e:
                # Keep the loop alive across transient failures (RPC errors etc.).
                self.logger.error(f"Worker error: {e}")
                time.sleep(10)  # Wait longer on error

    def execute_task(self, task_id: str):
        """Execute a task received from scheduler."""
        self.current_tasks.append(task_id)
        try:
            self.logger.info(f"Executing task: {task_id}")
            # Here you would implement actual task execution
            # For this example, we'll simulate work
            time.sleep(2)
            self.logger.info(f"Task completed: {task_id}")
        except Exception as e:
            self.logger.error(f"Task failed: {task_id} - {e}")
        finally:
            # Always release the claim, even if execution failed.
            self.current_tasks.remove(task_id)

    def stop(self):
        """Stop the worker."""
        self.running = False
        self.logger.info(f"Stopping worker {self.worker_id}")
# Usage: run the worker until interrupted with Ctrl-C.
if __name__ == '__main__':
    custom_worker = CustomWorker()
    try:
        custom_worker.run()
    except KeyboardInterrupt:
        custom_worker.stop()


import luigi
import time

from luigi.rpc import RemoteScheduler, RPCError


class RobustSchedulerTask(luigi.Task):
    """Task with robust scheduler error handling.

    Pings the central scheduler with a bounded number of retries and
    exponential backoff; if the scheduler stays unreachable it falls back
    to local execution instead of hanging.
    """

    def run(self):
        scheduler = RemoteScheduler()
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                # Try to communicate with scheduler
                response = scheduler.ping(worker="robust_worker")
                if response.get('response') == 'ok':
                    print("Scheduler connection successful")
                    break
                # Fix: a non-'ok' reply previously never advanced the retry
                # counter, looping forever; count it as a failed attempt too.
                print(f"RPC Error (attempt {attempt}/{max_retries}): ping not ok")
            except RPCError as e:
                print(f"RPC Error (attempt {attempt}/{max_retries}): {e}")
            if attempt >= max_retries:
                print("Max retries exceeded, falling back to local execution")
                # Fallback to local processing
                self.local_fallback()
                return
            time.sleep(2 ** attempt)  # Exponential backoff
        # Normal task processing
        with self.output().open('w') as f:
            f.write("Task completed with scheduler coordination")

    def local_fallback(self):
        """Fallback execution when scheduler is unavailable."""
        print("Executing in local fallback mode")
        with self.output().open('w') as f:
            f.write("Task completed in local fallback mode")

    def output(self):
        return luigi.LocalTarget("robust_output.txt")


# luigi.cfg
"""
[scheduler]
# Enable task history recording
record_task_history = true
# Set state persistence file
state_path = /var/lib/luigi/scheduler.state
# Configure cleanup timings
remove_delay = 300
worker_disconnect_delay = 30
retry_delay = 300
# Configure UI limits
max_shown_tasks = 50000
max_graph_nodes = 10000
# Enable task pausing
pause_enabled = true
[core]
# Remote scheduler configuration
default_scheduler_host = scheduler.company.com
default_scheduler_port = 8082
rpc_connect_timeout = 15
rpc_retry_attempts = 5
rpc_retry_wait = 10
"""
import luigi
from luigi.configuration import get_config


class ConfiguredSchedulerTask(luigi.Task):
    """Task that reads scheduler connection settings from luigi configuration."""

    def output(self):
        return luigi.LocalTarget("configured_output.txt")

    def run(self):
        cfg = get_config()
        # Pull the remote scheduler endpoint from [core], with local defaults.
        scheduler_host = cfg.get('core', 'default_scheduler_host', fallback='localhost')
        scheduler_port = cfg.getint('core', 'default_scheduler_port', fallback=8082)
        print(f"Using scheduler: {scheduler_host}:{scheduler_port}")
        # Task execution logic goes here; record the resolved endpoint.
        with self.output().open('w') as f:
            f.write(f"Configured for scheduler {scheduler_host}:{scheduler_port}")


# Install with Tessl CLI
npx tessl i tessl/pypi-luigidocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10