# Scheduler & RPC

Luigi's scheduler and RPC layer coordinate task execution across multiple workers, providing centralized scheduling, dependency resolution, and distributed coordination.

## Capabilities

### Remote Scheduler Client

Client for communicating with Luigi's central scheduler daemon, used for distributed task execution and coordination.

```python { .api }
class RemoteScheduler:
    """Client for remote Luigi scheduler."""

    def __init__(self, host: str = 'localhost', port: int = 8082,
                 connect_timeout: int = None):
        """
        Initialize remote scheduler client.

        Args:
            host: Scheduler host address
            port: Scheduler port number
            connect_timeout: Connection timeout in seconds

        NOTE(review): luigi 2.x and later build this client from a single
        base URL instead, e.g. ``RemoteScheduler(url='http://localhost:8082/')``;
        confirm which form your installed version accepts.
        """

    def add_task(self, task_id: str, status: str = 'PENDING',
                 runnable: bool = True, priority: int = 0,
                 family: str = '', module: str = '', params: dict = None,
                 assistant: bool = False, tracking_url: str = None,
                 worker: str = None, batchable: bool = None,
                 retry_policy_dict: dict = None, owners: list = None,
                 **kwargs) -> dict:
        """
        Add task to scheduler.

        Args:
            task_id: Unique task identifier
            status: Task status, e.g. PENDING, RUNNING, DONE, FAILED
            runnable: Whether task can be run
            priority: Task priority (higher = more priority)
            family: Task family name
            module: Task module name
            params: Task parameters
            assistant: Whether task is assistant-generated
            tracking_url: URL for task tracking
            worker: Worker identifier
            batchable: Whether task can be batched
            retry_policy_dict: Retry policy configuration
            owners: List of task owners

        Returns:
            dict: Scheduler response
        """

    def get_work(self, worker: str, host: str = '',
                 assistant: bool = False, current_tasks: list = None) -> dict:
        """
        Get work assignment from scheduler.

        Args:
            worker: Worker identifier
            host: Worker host
            assistant: Whether worker is assistant
            current_tasks: Currently running tasks

        Returns:
            dict: Work assignment response
        """

    def ping(self, worker: str, current_tasks: list = None) -> dict:
        """
        Send heartbeat ping to scheduler.

        Args:
            worker: Worker identifier
            current_tasks: Currently running tasks

        Returns:
            dict: Ping response
        """

    def count_pending(self, worker: str) -> dict:
        """
        Get count of pending tasks.

        Args:
            worker: Worker identifier

        Returns:
            dict: Pending task count
        """

    def graph(self) -> dict:
        """
        Get dependency graph from scheduler.

        Returns:
            dict: Task dependency graph
        """

    def dep_graph(self, task_id: str) -> dict:
        """
        Get dependency graph for specific task.

        Args:
            task_id: Task identifier

        Returns:
            dict: Task dependency graph
        """

    def task_list(self, status: str = '', upstream_status: str = '',
                  limit: bool = True, search: str = None) -> dict:
        """
        Get list of tasks from scheduler.

        Args:
            status: Filter by task status
            upstream_status: Filter by upstream task status
            limit: If true, ask the scheduler to apply its ``max_shown_tasks``
                limit to the result. This is a boolean flag, not a result
                count.
            search: Search term for task filtering

        Returns:
            dict: Task list response
        """

    def fetch_error(self, task_id: str) -> dict:
        """
        Fetch error details for failed task.

        Args:
            task_id: Task identifier

        Returns:
            dict: Error details
        """
```

### RPC Communication

Low-level RPC classes used for communication between workers and the scheduler.

```python { .api }
class URLLibFetcher:
    """HTTP fetcher using urllib for RPC communication."""

    def fetch(self, full_url: str, body: dict, timeout: float) -> str:
        """
        POST an RPC request via urllib and return the reply body.

        Args:
            full_url: Complete URL for request
            body: Form fields to send; URL-encoded into the POST body
            timeout: Request timeout in seconds

        Returns:
            str: Response body decoded as UTF-8 text
        """


class RequestsFetcher:
    """HTTP fetcher using requests library for RPC communication."""

    def fetch(self, full_url: str, body: dict, timeout: float) -> str:
        """POST an RPC request via the requests library and return the response text."""


class RPCError(Exception):
    """Exception for RPC communication errors."""

    def __init__(self, message: str, sub_exception: Exception = None):
        """
        Initialize RPC error.

        Args:
            message: Error message
            sub_exception: Underlying exception, kept as ``self.sub_exception``
        """
        super().__init__(message)
        self.sub_exception = sub_exception
```

### Scheduler Configuration

Configuration options for the Luigi scheduler daemon. They are set in the `[scheduler]` section of the Luigi configuration file (see the configuration example below).

```python { .api }
class scheduler:
    """Scheduler configuration section (``[scheduler]`` in the Luigi config file)."""

    record_task_history: bool = False
    """Whether to record task execution history."""

    state_path: str = '/var/lib/luigi-server/state.pickle'
    """Path of the file the scheduler saves its state to on shutdown and reloads on start."""

    remove_delay: int = 600
    """Seconds to wait before removing a task that has no remaining stakeholders."""

    worker_disconnect_delay: int = 60
    """Seconds without a ping before a worker is considered disconnected."""

    disable_window: int = 3600
    """Window (seconds) over which task failures are counted towards disabling the task."""

    retry_delay: int = 900
    """Delay before retrying failed tasks (seconds)."""

    disable_hard_timeout: int = 999999999
    """Hard timeout for task disabling (seconds)."""

    max_shown_tasks: int = 100000
    """Maximum tasks to show in web interface."""

    max_graph_nodes: int = 100000
    """Maximum nodes in dependency graph visualization."""

    # NOTE(review): upstream luigi appears to default this option to False;
    # confirm against the installed version before relying on it.
    prune_on_get_work: bool = True
    """Whether to prune completed tasks when getting work."""

    pause_enabled: bool = True
    """Whether task pausing is enabled."""
```

## Usage Examples

### Basic Remote Scheduler Usage

```python
import luigi


class RemoteTask(luigi.Task):
    """Task that runs via remote scheduler."""

    # Named ``job_id`` rather than ``task_id``: luigi.Task.__init__ assigns its
    # own computed ``self.task_id`` after setting parameters, which would
    # silently overwrite a parameter with that name.
    job_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"output_{self.job_id}.txt")

    def run(self):
        # This task will be coordinated by the remote scheduler
        with self.output().open('w') as f:
            f.write(f"Task {self.job_id} completed")


# Run tasks with remote scheduler
if __name__ == '__main__':
    # With local_scheduler=False, luigi.build creates its own remote scheduler
    # client from these settings (or from the [core] config section), so there
    # is no need to construct a RemoteScheduler by hand.
    luigi.build(
        [RemoteTask(job_id="example")],
        local_scheduler=False,
        scheduler_host='scheduler.example.com',
        scheduler_port=8082,
    )
```

### Scheduler Health Monitoring

```python
import time

import luigi
from luigi.rpc import RemoteScheduler, RPCError


def monitor_scheduler(interval=30):
    """Monitor scheduler health and status.

    Polls the scheduler forever, printing a short report every ``interval``
    seconds. A round of RPC calls that completes without raising counts as
    healthy; RPC failures and unexpected errors are printed and polling
    continues.
    """
    scheduler = RemoteScheduler()
    worker_id = "monitoring_worker"

    while True:
        try:
            # The client returns the scheduler's payload, not a status string,
            # so reachability is signalled by the call not raising RPCError.
            scheduler.ping(worker=worker_id)
            print("✓ Scheduler is healthy")

            # Get pending task count
            pending = scheduler.count_pending(worker=worker_id)
            print(f"Pending tasks: {pending.get('n_pending_tasks', 0)}")

            # ``limit`` is a boolean flag (default True), not a result count.
            # NOTE(review): the reply is assumed to map task ids to task
            # records, or to be {'num_tasks': n} when over max_shown_tasks.
            tasks = scheduler.task_list()
            total = tasks.get('num_tasks', len(tasks))
            print(f"Total tasks in scheduler: {total}")

        except RPCError as e:
            print(f"✗ RPC Error: {e}")
        except Exception as e:
            print(f"✗ Unexpected error: {e}")

        time.sleep(interval)  # Check every `interval` seconds (default 30)


if __name__ == '__main__':
    monitor_scheduler()
```

### Task Dependency Graph Inspection

```python
import json

import luigi
from luigi.rpc import RemoteScheduler


class InspectDependencies(luigi.Task):
    """Task to inspect dependency relationships.

    Writes a JSON report with the dependency graph of ``target_task_id`` and
    summary counts for the scheduler's full graph.
    """

    target_task_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"deps_{self.target_task_id}.json")

    def run(self):
        scheduler = RemoteScheduler()

        try:
            # Get dependency graph for specific task
            dep_graph = scheduler.dep_graph(self.target_task_id)

            # Get full scheduler graph for context.
            # NOTE(review): the reply is assumed to map task ids to task
            # records that list their dependencies under 'deps'; confirm
            # against your luigi version.
            full_graph = scheduler.graph()
        except Exception as e:
            print(f"Error inspecting dependencies: {e}")
            # Re-raise instead of writing the output file: an existing output
            # makes luigi treat the task as complete, so a failure would be
            # recorded as success and never retried.
            raise

        analysis = {
            'target_task': self.target_task_id,
            'dependencies': dep_graph,
            'graph_stats': {
                'total_nodes': len(full_graph),
                'total_edges': sum(
                    len(task.get('deps', [])) for task in full_graph.values()
                ),
            },
        }

        # Save analysis
        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2)
```

### Custom Worker Implementation

```python
import logging
import os
import time

import luigi
from luigi.rpc import RemoteScheduler


class CustomWorker:
    """Custom worker implementation with enhanced monitoring.

    Polls the scheduler for work and runs each assigned task. It reports each
    task's outcome back with ``add_task`` and sends a heartbeat ``ping`` on
    every cycle.
    """

    def __init__(self, scheduler_host='localhost', scheduler_port=8082):
        # luigi.rpc.RemoteScheduler (luigi 2.x+) is built from a base URL
        # rather than separate host/port arguments.
        self.scheduler = RemoteScheduler(
            url=f"http://{scheduler_host}:{scheduler_port}/"
        )
        # Include the PID so workers started in the same second on one host do
        # not share an identifier.
        self.worker_id = f"custom_worker_{os.getpid()}_{int(time.time())}"
        self.running = True
        self.current_tasks = []

        # Logging is configured by the application entry point, not here.
        self.logger = logging.getLogger(f'worker.{self.worker_id}')

    def run(self):
        """Main worker loop; runs until ``stop()`` clears ``self.running``."""
        self.logger.info(f"Starting worker {self.worker_id}")

        while self.running:
            try:
                # Get work from scheduler
                work_response = self.scheduler.get_work(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks,
                )

                if work_response.get('n_pending_tasks', 0) > 0:
                    task_id = work_response.get('task_id')
                    if task_id:
                        self.logger.info(f"Received task: {task_id}")
                        self.execute_task(task_id)
                else:
                    self.logger.debug("No pending tasks")

                # Send heartbeat
                self.scheduler.ping(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks,
                )

                time.sleep(5)  # Poll every 5 seconds

            except Exception as e:
                # logger.exception also records the traceback.
                self.logger.exception(f"Worker error: {e}")
                time.sleep(10)  # Wait longer on error

    def execute_task(self, task_id: str):
        """Execute a task received from scheduler and report its outcome."""
        self.current_tasks.append(task_id)
        status = 'FAILED'

        try:
            self.logger.info(f"Executing task: {task_id}")

            # Here you would implement actual task execution
            # For this example, we'll simulate work
            time.sleep(2)

            status = 'DONE'
            self.logger.info(f"Task completed: {task_id}")

        except Exception as e:
            self.logger.error(f"Task failed: {task_id} - {e}")

        finally:
            self.current_tasks.remove(task_id)
            # Report the result; without this the scheduler never learns that
            # the task finished or failed.
            self.scheduler.add_task(
                task_id=task_id, status=status, worker=self.worker_id
            )

    def stop(self):
        """Stop the worker."""
        self.running = False
        self.logger.info(f"Stopping worker {self.worker_id}")


# Usage
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    worker = CustomWorker()
    try:
        worker.run()
    except KeyboardInterrupt:
        worker.stop()
```

### Scheduler Error Handling

```python
import time

import luigi
from luigi.rpc import RemoteScheduler, RPCError


class RobustSchedulerTask(luigi.Task):
    """Task with robust scheduler error handling.

    Checks that the scheduler is reachable, retrying with exponential backoff.
    If every attempt fails, it falls back to local execution.
    """

    def run(self):
        scheduler = RemoteScheduler()
        max_retries = 3

        # A bounded for-loop: every iteration is one attempt, so the check can
        # no longer spin forever on an unexpected (non-raising) reply.
        for attempt in range(1, max_retries + 1):
            try:
                # A ping that returns without raising means the scheduler is
                # reachable; the client returns the scheduler's payload, not
                # an 'ok' status string.
                scheduler.ping(worker="robust_worker")
            except RPCError as e:
                print(f"RPC Error (attempt {attempt}/{max_retries}): {e}")

                if attempt >= max_retries:
                    print("Max retries exceeded, falling back to local execution")
                    # Fallback to local processing
                    self.local_fallback()
                    return

                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                print("Scheduler connection successful")
                break

        # Normal task processing
        with self.output().open('w') as f:
            f.write("Task completed with scheduler coordination")

    def local_fallback(self):
        """Fallback execution when scheduler is unavailable."""
        print("Executing in local fallback mode")

        with self.output().open('w') as f:
            f.write("Task completed in local fallback mode")

    def output(self):
        return luigi.LocalTarget("robust_output.txt")
```

### Scheduler Configuration Example

```python
# luigi.cfg
"""
[scheduler]
# Enable task history recording
record_task_history = true

# Set state persistence file
state_path = /var/lib/luigi/scheduler.state

# Configure cleanup timings
remove_delay = 300
worker_disconnect_delay = 30
retry_delay = 300

# Configure UI limits
max_shown_tasks = 50000
max_graph_nodes = 10000

# Enable task pausing
pause_enabled = true

[core]
# Remote scheduler configuration
default_scheduler_host = scheduler.company.com
default_scheduler_port = 8082
rpc_connect_timeout = 15
rpc_retry_attempts = 5
rpc_retry_wait = 10
"""

import luigi
from luigi.configuration import get_config


class ConfiguredSchedulerTask(luigi.Task):
    """Task that reads the remote scheduler address from the [core] config."""

    def run(self):
        config = get_config()

        # Read scheduler configuration. The default is passed as the third
        # positional argument, which is the form luigi's config getters take;
        # its getint() does not accept ConfigParser's ``fallback=`` keyword.
        scheduler_host = config.get('core', 'default_scheduler_host', 'localhost')
        scheduler_port = config.getint('core', 'default_scheduler_port', 8082)

        print(f"Using scheduler: {scheduler_host}:{scheduler_port}")

        # Task execution logic
        with self.output().open('w') as f:
            f.write(f"Configured for scheduler {scheduler_host}:{scheduler_port}")

    def output(self):
        return luigi.LocalTarget("configured_output.txt")
```