or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.md · configuration.md · events.md · execution.md · index.md · integrations.md · parameters.md · scheduler.md · targets.md · tasks.md

scheduler.mddocs/

0

# Scheduler & RPC

1

2

Luigi's scheduler and RPC system coordinates task execution across multiple workers with centralized scheduling, dependency resolution, and distributed coordination capabilities.

3

4

## Capabilities

5

6

### Remote Scheduler Client

7

8

Client for communicating with Luigi's central scheduler daemon for distributed task execution and coordination.

9

10

```python { .api }

11

class RemoteScheduler:
    """Client for a remote Luigi scheduler (``luigid``).

    Every RPC method is sent to the scheduler over HTTP and returns the
    scheduler's reply payload directly (already unwrapped from the JSON
    ``{"response": ...}`` envelope) -- there is no separate "ok" flag.
    Connection failures are retried ``[core] rpc_retry_attempts`` times,
    ``rpc_retry_wait`` seconds apart, before :class:`RPCError` is raised.
    """

    def __init__(self, url: str = 'http://localhost:8082/',
                 connect_timeout: float = None):
        """
        Initialize remote scheduler client.

        Args:
            url: Base URL of the scheduler, e.g.
                ``'http://scheduler.example.com:8082/'``.
            connect_timeout: Request timeout in seconds. When None, the
                ``[core] rpc_connect_timeout`` setting is used.
        """

    def add_task(self, task_id: str, status: str = 'PENDING',
                 runnable: bool = True, priority: int = 0,
                 family: str = '', module: str = '', params: dict = None,
                 assistant: bool = False, tracking_url: str = None,
                 worker: str = None, batchable: bool = None,
                 retry_policy_dict: dict = None, owners: list = None,
                 **kwargs) -> dict:
        """
        Add a task to the scheduler, or update an existing task's state.

        Workers also use this call to report a task's outcome (e.g.
        ``status='DONE'`` or ``'FAILED'``).

        Args:
            task_id: Unique task identifier
            status: Task status (PENDING, RUNNING, DONE, FAILED, ...)
            runnable: Whether the calling worker can run the task
            priority: Task priority (higher = scheduled earlier)
            family: Task family name
            module: Task module name
            params: Task parameters
            assistant: Whether the calling worker is an assistant
            tracking_url: URL for task tracking
            worker: Worker identifier
            batchable: Whether the task can be batched
            retry_policy_dict: Retry policy configuration
            owners: List of task owners
            **kwargs: Further scheduler options such as ``deps``,
                ``new_deps``, ``expl`` and ``resources``.

        Returns:
            dict: Scheduler reply payload
        """

    def get_work(self, worker: str, host: str = '',
                 assistant: bool = False, current_tasks: list = None,
                 **kwargs) -> dict:
        """
        Ask the scheduler for a task to run.

        Args:
            worker: Worker identifier
            host: Worker host
            assistant: Whether the worker is an assistant
            current_tasks: Ids of tasks the worker is currently running

        Returns:
            dict: Work assignment. ``'task_id'`` is None when nothing is
            runnable for this worker; other keys include
            ``'n_pending_tasks'`` and ``'running_tasks'``, plus
            ``'task_family'``/``'task_module'``/``'task_params'`` when a
            task was assigned.
        """

    def ping(self, worker: str, current_tasks: list = None) -> dict:
        """
        Send a heartbeat so the scheduler keeps the worker registered.

        Args:
            worker: Worker identifier
            current_tasks: Ids of tasks the worker is currently running

        Returns:
            dict: Reply of the form ``{'rpc_messages': [...]}``. A successful
            return (rather than an ``RPCError``) is what indicates that the
            scheduler is reachable.
        """

    def count_pending(self, worker: str) -> dict:
        """
        Count the pending tasks relevant to a worker.

        Args:
            worker: Worker identifier

        Returns:
            dict: Counts such as ``'n_pending_tasks'`` and
            ``'n_unique_pending'``
        """

    def graph(self) -> dict:
        """
        Get the dependency graph of all active tasks.

        Returns:
            dict: Mapping of task id to serialized task info; each task's
            direct dependencies are listed under its ``'deps'`` key.
        """

    def dep_graph(self, task_id: str, include_done: bool = True,
                  **kwargs) -> dict:
        """
        Get the dependency graph rooted at a specific task.

        Args:
            task_id: Task identifier
            include_done: Whether to include tasks that are already DONE

        Returns:
            dict: Mapping of task id to serialized task info for the task
            and its dependencies (empty if the task is unknown). Large
            graphs are capped by ``[scheduler] max_graph_nodes``.
        """

    def task_list(self, status: str = '', upstream_status: str = '',
                  limit: bool = True, search: str = None,
                  max_shown_tasks: int = None, **kwargs) -> dict:
        """
        Get tasks from the scheduler, optionally filtered.

        Args:
            status: Filter by task status ('' for all)
            upstream_status: Filter by upstream task status
            limit: If true, cap the result at ``max_shown_tasks``. This is a
                flag, not a count: when the match exceeds the cap, only
                ``{'num_tasks': N}`` is returned.
            search: Search term for task filtering
            max_shown_tasks: Override for ``[scheduler] max_shown_tasks``

        Returns:
            dict: Mapping of task id to task info, or ``{'num_tasks': N}``
            when the result was capped.
        """

    def fetch_error(self, task_id: str) -> dict:
        """
        Fetch error details for a failed task.

        Args:
            task_id: Task identifier

        Returns:
            dict: Error details, including the stored explanation under
            ``'error'``
        """

136

```

137

138

### RPC Communication

139

140

Low-level RPC communication classes for scheduler-worker interaction.

141

142

```python { .api }

143

class URLLibFetcher:
    """HTTP fetcher using urllib for RPC communication.

    Used by RemoteScheduler when the ``requests`` library is not installed.
    """

    def fetch(self, full_url: str, body: dict, timeout: float) -> str:
        """
        POST form data to the scheduler and return the reply text.

        Args:
            full_url: Complete URL for the request
            body: Form fields to send (RemoteScheduler sends
                ``{'data': <json-encoded arguments>}``)
            timeout: Request timeout in seconds

        Returns:
            str: Decoded response body (JSON text)
        """

158

159

class RequestsFetcher:
    """HTTP fetcher using the requests library for RPC communication."""

    def fetch(self, full_url: str, body: dict, timeout: float) -> str:
        """
        POST form data with requests and return the reply text.

        Args:
            full_url: Complete URL for the request
            body: Form fields to send
            timeout: Request timeout in seconds

        Returns:
            str: Decoded response body (JSON text)
        """

164

165

class RPCError(Exception):
    """Raised when a call to the remote scheduler cannot be completed."""

    def __init__(self, message: str, sub_exception: Exception = None):
        """
        Initialize RPC error.

        Args:
            message: Error message (also what ``str(error)`` returns)
            sub_exception: Underlying exception, available afterwards as
                ``error.sub_exception``
        """
        # Pass only the message to Exception so str() is the message rather
        # than a tuple repr of both arguments.
        super().__init__(message)
        self.sub_exception = sub_exception

176

```

177

178

### Scheduler Configuration

179

180

Configuration options for the Luigi scheduler daemon (`luigid`), set in the `[scheduler]` section of the Luigi configuration file.

181

182

```python { .api }

183

class scheduler:
    """Options of the ``[scheduler]`` configuration section read by luigid."""

    # Seconds to wait before retrying a failed task.
    retry_delay: float = 900.0

    # Seconds to keep a task that no worker is interested in any more before
    # removing it from the scheduler.
    remove_delay: float = 600.0

    # Seconds after a worker's last ping before it is treated as disconnected
    # and removed.
    worker_disconnect_delay: float = 60.0

    # File used to persist scheduler state between restarts.
    state_path: str = '/var/lib/luigi-server/state.pickle'

    # Window (seconds) over which failures are counted against retry_count.
    disable_window: int = 3600

    # Number of failures within disable_window after which a task is disabled.
    retry_count: int = 999999999

    # A task failing again this many seconds after its first failure is disabled.
    disable_hard_timeout: int = 999999999

    # Seconds an automatic disable lasts.
    disable_persist: int = 86400

    # Maximum tasks returned by task_list (and so shown in the web interface).
    max_shown_tasks: int = 100000

    # Maximum nodes returned for dependency-graph visualization.
    max_graph_nodes: int = 100000

    # Whether to record task execution history in a database.
    record_task_history: bool = False

    # Whether to prune stale scheduler state on every get_work request.
    prune_on_get_work: bool = False

    # Whether task pausing is enabled.
    pause_enabled: bool = True

221

```

222

223

## Usage Examples

224

225

### Basic Remote Scheduler Usage

226

227

```python

228

import luigi


class RemoteTask(luigi.Task):
    """Task whose execution is coordinated by a central (remote) scheduler."""

    # Deliberately not called ``task_id``: luigi.Task.__init__ assigns
    # self.task_id the task's unique identifier, which would clobber a
    # parameter of that name.
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"output_{self.name}.txt")

    def run(self):
        # This task will be coordinated by the remote scheduler
        with self.output().open('w') as f:
            f.write(f"Task {self.name} completed")


# Run tasks with remote scheduler
if __name__ == '__main__':
    # luigi.build creates the scheduler client itself; local_scheduler=False
    # selects the central scheduler at scheduler_host:scheduler_port.
    luigi.build(
        [RemoteTask(name="example")],
        local_scheduler=False,
        scheduler_host='scheduler.example.com',
        scheduler_port=8082,
    )

251

```

252

253

### Scheduler Health Monitoring

254

255

```python

256

import time

from luigi.rpc import RemoteScheduler, RPCError


def monitor_scheduler():
    """Monitor scheduler health and status, polling every 30 seconds.

    Runs until interrupted; errors are reported and polling continues.
    """
    scheduler = RemoteScheduler()
    worker_id = "monitoring_worker"

    while True:
        try:
            # RemoteScheduler methods return the reply payload directly
            # (ping() gives {'rpc_messages': [...]}) and raise RPCError when
            # the scheduler is unreachable -- returning at all means healthy.
            scheduler.ping(worker=worker_id)
            print("✓ Scheduler is healthy")

            # Get pending task count
            pending = scheduler.count_pending(worker=worker_id)
            print(f"Pending tasks: {pending.get('n_pending_tasks', 0)}")

            # task_list() returns {task_id: task_info}, or {'num_tasks': N}
            # when N exceeds [scheduler] max_shown_tasks (limit=True default).
            tasks = scheduler.task_list()
            total = tasks['num_tasks'] if 'num_tasks' in tasks else len(tasks)
            print(f"Total tasks in scheduler: {total}")

        except RPCError as e:
            print(f"✗ RPC Error: {e}")
        except Exception as e:
            # Top-level monitoring loop: report and keep polling.
            print(f"✗ Unexpected error: {e}")

        time.sleep(30)  # Check every 30 seconds


if __name__ == '__main__':
    monitor_scheduler()

294

```

295

296

### Task Dependency Graph Inspection

297

298

```python

299

import json

import luigi
from luigi.rpc import RemoteScheduler


class InspectDependencies(luigi.Task):
    """Write a JSON summary of a scheduler task's dependency relationships.

    target_task_id must be the id of a task known to the central scheduler.
    """

    target_task_id = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"deps_{self.target_task_id}.json")

    def run(self):
        scheduler = RemoteScheduler()

        # Both calls return {task_id: task_info}; dep_graph() covers only the
        # target task and its dependencies ({} for an unknown id).
        dep_graph = scheduler.dep_graph(self.target_task_id)
        full_graph = scheduler.graph()

        analysis = {
            'target_task': self.target_task_id,
            'dependencies': dep_graph,
            'graph_stats': {
                'total_nodes': len(full_graph),
                # Each task_info lists its direct dependencies under 'deps'.
                'total_edges': sum(
                    len(info.get('deps', [])) for info in full_graph.values()
                ),
            },
        }

        # Failures are intentionally not caught here: writing an output file
        # on error would make luigi consider the task complete and never
        # retry it. Letting the exception propagate marks the task FAILED.
        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2)

340

```

341

342

### Custom Worker Implementation

343

344

```python

345

import json
import logging
import os
import socket
import time

from luigi.rpc import RemoteScheduler
from luigi.task_status import DONE, FAILED


class CustomWorker:
    """Custom polling worker that talks to the central scheduler directly.

    Illustrative only. Note that a (non-assistant) worker is only handed
    tasks it has registered itself via ``add_task``.
    """

    def __init__(self, scheduler_host='localhost', scheduler_port=8082):
        # RemoteScheduler is addressed by the scheduler's base URL.
        self.scheduler = RemoteScheduler(
            f"http://{scheduler_host}:{scheduler_port}/")
        self.host = socket.gethostname()
        # Host and pid keep ids unique for workers started in the same second.
        self.worker_id = (
            f"custom_worker_{self.host}_{os.getpid()}_{int(time.time())}")
        self.running = True
        self.current_tasks = []
        self.logger = logging.getLogger(f'worker.{self.worker_id}')

    def run(self):
        """Main worker loop: poll for work until stop() is called."""
        self.logger.info("Starting worker %s", self.worker_id)

        while self.running:
            try:
                work_response = self.scheduler.get_work(
                    worker=self.worker_id,
                    host=self.host,
                    current_tasks=self.current_tasks,
                )

                # 'task_id' is None when nothing is runnable for this worker.
                task_id = work_response.get('task_id')
                if task_id:
                    self.logger.info("Received task: %s", task_id)
                    self.execute_task(task_id)
                else:
                    self.logger.debug("No pending tasks")

                # Heartbeat so the scheduler keeps this worker registered.
                self.scheduler.ping(
                    worker=self.worker_id,
                    current_tasks=self.current_tasks,
                )

                time.sleep(5)  # Poll every 5 seconds

            except Exception:
                self.logger.exception("Worker error")
                time.sleep(10)  # Wait longer on error

    def execute_task(self, task_id: str):
        """Execute a task received from the scheduler and report its outcome."""
        self.current_tasks.append(task_id)
        status, expl = DONE, ''

        try:
            self.logger.info("Executing task: %s", task_id)

            # Here you would implement actual task execution.
            # For this example, we simulate work.
            time.sleep(2)

            self.logger.info("Task completed: %s", task_id)

        except Exception as e:
            status, expl = FAILED, str(e)
            self.logger.exception("Task failed: %s", task_id)

        finally:
            self.current_tasks.remove(task_id)

        # Report the result; otherwise the scheduler keeps the task RUNNING
        # for this worker. (luigi's own worker sends `expl` JSON-encoded.)
        self.scheduler.add_task(
            task_id=task_id,
            status=status,
            expl=json.dumps(expl),
            worker=self.worker_id,
        )

    def stop(self):
        """Stop the worker."""
        self.running = False
        self.logger.info("Stopping worker %s", self.worker_id)


# Usage
if __name__ == '__main__':
    # Configure logging once for the program, not per worker instance.
    logging.basicConfig(level=logging.INFO)
    worker = CustomWorker()
    try:
        worker.run()
    except KeyboardInterrupt:
        worker.stop()

430

```

431

432

### Scheduler Error Handling

433

434

```python

435

import time

import luigi
from luigi.rpc import RemoteScheduler, RPCError


class RobustSchedulerTask(luigi.Task):
    """Task that checks scheduler reachability and falls back to local work."""

    def run(self):
        scheduler = RemoteScheduler()
        max_retries = 3

        for attempt in range(1, max_retries + 1):
            try:
                # ping() raises RPCError when the scheduler is unreachable;
                # its return value is the reply payload, not an 'ok' flag.
                scheduler.ping(worker="robust_worker")
            except RPCError as e:
                print(f"RPC Error (attempt {attempt}/{max_retries}): {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
            else:
                print("Scheduler connection successful")
                break
        else:
            # Every attempt raised RPCError.
            print("Max retries exceeded, falling back to local execution")
            self.local_fallback()
            return

        # Normal task processing
        with self.output().open('w') as f:
            f.write("Task completed with scheduler coordination")

    def local_fallback(self):
        """Fallback execution when scheduler is unavailable."""
        print("Executing in local fallback mode")

        with self.output().open('w') as f:
            f.write("Task completed in local fallback mode")

    def output(self):
        return luigi.LocalTarget("robust_output.txt")

480

```

481

482

### Scheduler Configuration Example

483

484

```python

485

# luigi.cfg
"""
[scheduler]
# Enable task history recording
record_task_history = true

# Set state persistence file
state_path = /var/lib/luigi/scheduler.state

# Configure cleanup timings
remove_delay = 300
worker_disconnect_delay = 30
retry_delay = 300

# Configure UI limits
max_shown_tasks = 50000
max_graph_nodes = 10000

# Enable task pausing
pause_enabled = true

[core]
# Remote scheduler configuration
default_scheduler_host = scheduler.company.com
default_scheduler_port = 8082
rpc_connect_timeout = 15
rpc_retry_attempts = 5
rpc_retry_wait = 10
"""

import luigi
from luigi.configuration import get_config


class ConfiguredSchedulerTask(luigi.Task):
    """Task that reads the remote scheduler settings from configuration."""

    def run(self):
        config = get_config()

        # Defaults are passed positionally: luigi's parser signature is
        # get/getint(section, option, default), and its getint() does not
        # accept configparser's ``fallback=`` keyword.
        scheduler_host = config.get('core', 'default_scheduler_host',
                                    'localhost')
        scheduler_port = config.getint('core', 'default_scheduler_port',
                                       8082)

        print(f"Using scheduler: {scheduler_host}:{scheduler_port}")

        # Task execution logic
        with self.output().open('w') as f:
            f.write(f"Configured for scheduler {scheduler_host}:{scheduler_port}")

    def output(self):
        return luigi.LocalTarget("configured_output.txt")

538

```