0
# Configuration
1
2
Luigi's configuration system manages settings for tasks, the scheduler, and execution behavior through configuration files and environment variables. Keeping these settings outside the code lets the same pipeline run unchanged across different environments and deployment scenarios.
3
4
## Capabilities
5
6
### Configuration Parser
7
8
Main configuration parser that handles INI-format configuration files with Luigi-specific extensions and parameter resolution.
9
10
```python { .api }
11
def get_config() -> LuigiConfigParser:
12
"""
13
Get the global Luigi configuration parser instance.
14
15
Returns:
16
LuigiConfigParser: Global configuration parser
17
"""
18
19
def add_config_path(path: str):
20
"""
21
Add a configuration file path to the configuration search paths.
22
23
Args:
24
path: Path to configuration file
25
"""
26
27
class LuigiConfigParser:
28
"""
29
Luigi's configuration parser extending ConfigParser with parameter resolution.
30
"""
31
32
def get(self, section: str, option: str, **kwargs):
33
"""
34
Get configuration value with parameter resolution.
35
36
Args:
37
section: Configuration section name
38
option: Configuration option name
39
**kwargs: Additional options (vars, fallback, etc.)
40
41
Returns:
42
Configuration value with parameter substitution
43
"""
44
45
def getint(self, section: str, option: str, **kwargs) -> int:
46
"""Get integer configuration value."""
47
48
def getfloat(self, section: str, option: str, **kwargs) -> float:
49
"""Get float configuration value."""
50
51
def getboolean(self, section: str, option: str, **kwargs) -> bool:
52
"""Get boolean configuration value."""
53
54
def has_option(self, section: str, option: str) -> bool:
55
"""Check if configuration option exists."""
56
57
def has_section(self, section: str) -> bool:
58
"""Check if configuration section exists."""
59
60
def sections(self) -> list:
61
"""Get list of configuration sections."""
62
63
def options(self, section: str) -> list:
64
"""Get list of options in a section."""
65
66
def items(self, section: str) -> list:
67
"""Get list of (option, value) pairs in a section."""
68
69
def set(self, section: str, option: str, value: str):
70
"""Set configuration value."""
71
72
def add_section(self, section: str):
73
"""Add configuration section."""
74
75
def remove_section(self, section: str) -> bool:
76
"""Remove configuration section."""
77
78
def remove_option(self, section: str, option: str) -> bool:
79
"""Remove configuration option."""
80
81
def read(self, filenames):
82
"""Read configuration from file(s)."""
83
84
def read_dict(self, dictionary: dict):
85
"""Read configuration from dictionary."""
86
```
87
88
### TOML Configuration Parser
89
90
Alternative configuration parser that supports TOML-format configuration files.
91
92
```python { .api }
93
class LuigiTomlParser:
94
"""
95
TOML configuration parser for Luigi.
96
97
Provides an interface similar to LuigiConfigParser but reads TOML files.
98
"""
99
100
def get(self, section: str, option: str, **kwargs):
101
"""Get TOML configuration value."""
102
103
def getint(self, section: str, option: str, **kwargs) -> int:
104
"""Get integer value from TOML configuration."""
105
106
def getfloat(self, section: str, option: str, **kwargs) -> float:
107
"""Get float value from TOML configuration."""
108
109
def getboolean(self, section: str, option: str, **kwargs) -> bool:
110
"""Get boolean value from TOML configuration."""
111
112
def has_option(self, section: str, option: str) -> bool:
113
"""Check if TOML option exists."""
114
115
def has_section(self, section: str) -> bool:
116
"""Check if TOML section exists."""
117
```
118
119
### Base Configuration Parser
120
121
Abstract base class for configuration parsers providing common functionality.
122
123
```python { .api }
124
class BaseParser:
125
"""Base class for configuration parsers."""
126
127
def enabled(self) -> bool:
128
"""Check if parser is enabled and available."""
129
130
def read(self, config_paths: list):
131
"""Read configuration from files."""
132
133
def get(self, section: str, option: str, **kwargs):
134
"""Get configuration value."""
135
136
def getint(self, section: str, option: str, **kwargs) -> int:
137
"""Get integer configuration value."""
138
139
def getfloat(self, section: str, option: str, **kwargs) -> float:
140
"""Get float configuration value."""
141
142
def getboolean(self, section: str, option: str, **kwargs) -> bool:
143
"""Get boolean configuration value."""
144
145
def has_option(self, section: str, option: str) -> bool:
146
"""Check if option exists."""
147
148
def has_section(self, section: str) -> bool:
149
"""Check if section exists."""
150
```
151
152
## Configuration Sections
153
154
Luigi uses several predefined configuration sections for different aspects of the system.
155
156
### Core Configuration
157
158
```python { .api }
159
# [core] section options
160
class CoreConfig:
161
"""Core Luigi configuration options."""
162
163
default_scheduler_host: str = 'localhost'
164
"""Default scheduler host address."""
165
166
default_scheduler_port: int = 8082
167
"""Default scheduler port."""
168
169
scheduler_host: str
170
"""Scheduler host override."""
171
172
scheduler_port: int
173
"""Scheduler port override."""
174
175
rpc_connect_timeout: float = 10.0
176
"""RPC connection timeout in seconds."""
177
178
rpc_retry_attempts: int = 3
179
"""Number of RPC retry attempts."""
180
181
rpc_retry_wait: int = 30
182
"""Wait time between RPC retries."""
183
184
no_configure_logging: bool = False
185
"""Disable Luigi's logging configuration."""
186
187
log_level: str = 'DEBUG'
188
"""Default logging level."""
189
190
logging_conf_file: str
191
"""Path to logging configuration file."""
192
193
parallel_scheduling: bool = False
194
"""Enable parallel task scheduling."""
195
196
assistant: bool = False
197
"""Enable Luigi assistant mode."""
198
199
worker_timeout: int = 0
200
"""Worker timeout in seconds (0 = no timeout)."""
201
202
keep_alive: bool = False
203
"""Keep worker alive after completion."""
204
205
max_reschedules: int = 1
206
"""Maximum task reschedule attempts."""
207
```
208
209
### Worker Configuration
210
211
```python { .api }
212
# [worker] section options
213
class WorkerConfig:
214
"""Worker configuration options."""
215
216
keep_alive: bool = False
217
"""Keep worker process alive."""
218
219
count_uniques: bool = False
220
"""Count unique task failures."""
221
222
count_last_params: bool = False
223
"""Count parameters in recent tasks."""
224
225
worker_timeout: int = 0
226
"""Worker timeout in seconds."""
227
228
timeout: int = 0
229
"""Task timeout in seconds."""
230
231
task_limit: int = None
232
"""Maximum tasks per worker."""
233
234
retry_external_tasks: bool = False
235
"""Retry external task dependencies."""
236
237
no_configure_logging: bool = False
238
"""Disable worker logging configuration."""
239
```
240
241
### Scheduler Configuration
242
243
```python { .api }
244
# [scheduler] section options
245
class SchedulerConfig:
246
"""Scheduler configuration options."""
247
248
record_task_history: bool = False
249
"""Record task execution history."""
250
251
state_path: str
252
"""Path to scheduler state file."""
253
254
remove_delay: int = 600
255
"""Delay before removing completed tasks (seconds)."""
256
257
worker_disconnect_delay: int = 60
258
"""Delay before disconnecting idle workers (seconds)."""
259
260
disable_window: int = 3600
261
"""Window for disabling failed tasks (seconds)."""
262
263
retry_delay: int = 900
264
"""Delay before retrying failed tasks (seconds)."""
265
266
disable_hard_timeout: int = 999999999
267
"""Hard timeout for disabling tasks (seconds)."""
268
269
max_shown_tasks: int = 100000
270
"""Maximum tasks shown in web interface."""
271
272
max_graph_nodes: int = 100000
273
"""Maximum nodes in dependency graph."""
274
```
275
276
## Usage Examples
277
278
### Basic Configuration File
279
280
```ini
281
# luigi.cfg
282
[core]
283
scheduler_host = localhost
284
scheduler_port = 8082
285
log_level = INFO
286
parallel_scheduling = true
287
288
[worker]
289
keep_alive = true
290
timeout = 3600
291
task_limit = 10
292
293
[scheduler]
294
record_task_history = true
295
remove_delay = 300
296
retry_delay = 600
297
298
# Task-specific configuration
299
[MyTask]
300
batch_size = 1000
301
max_retries = 3
302
303
[DatabaseTask]
304
host = localhost
305
port = 5432
306
database = mydb
307
```
308
309
### TOML Configuration File
310
311
```toml
312
# luigi.toml
313
[core]
314
scheduler_host = "localhost"
315
scheduler_port = 8082
316
log_level = "INFO"
317
parallel_scheduling = true
318
319
[worker]
320
keep_alive = true
321
timeout = 3600
322
task_limit = 10
323
324
[scheduler]
325
record_task_history = true
326
remove_delay = 300
327
retry_delay = 600
328
329
[MyTask]
330
batch_size = 1000
331
max_retries = 3
332
```
333
334
### Programmatic Configuration
335
336
```python
337
import luigi
338
from luigi.configuration import get_config, add_config_path
339
340
# Add custom configuration file
341
add_config_path('/path/to/custom/luigi.cfg')
342
343
# Get configuration instance
344
config = get_config()
345
346
# Read configuration values
347
scheduler_host = config.get('core', 'scheduler_host', fallback='localhost')
348
scheduler_port = config.getint('core', 'scheduler_port', fallback=8082)
349
log_level = config.get('core', 'log_level', fallback='INFO')
350
351
print(f"Scheduler: {scheduler_host}:{scheduler_port}")
352
print(f"Log level: {log_level}")
353
354
# Set configuration values programmatically
355
config.set('core', 'parallel_scheduling', 'true')
356
config.set('worker', 'keep_alive', 'true')
357
358
# Check if options exist
359
if config.has_option('MyTask', 'batch_size'):
360
batch_size = config.getint('MyTask', 'batch_size')
361
print(f"Batch size: {batch_size}")
362
```
363
364
### Task-Specific Configuration
365
366
```python
367
import luigi
368
from luigi import Task, Parameter
369
from luigi.configuration import get_config
370
371
class ConfigurableTask(Task):
372
"""Task that reads configuration from config file."""
373
374
# No default here: when batch_size is not passed explicitly, its value is read from the [ConfigurableTask] section of the config file
375
batch_size = luigi.IntParameter()
376
377
def __init__(self, *args, **kwargs):
378
super().__init__(*args, **kwargs)
379
380
# Read additional config
381
config = get_config()
382
self.timeout = config.getint('ConfigurableTask', 'timeout', fallback=3600)
383
self.retries = config.getint('ConfigurableTask', 'max_retries', fallback=3)
384
385
def output(self):
386
return luigi.LocalTarget(f"output_batch_{self.batch_size}.txt")
387
388
def run(self):
389
print(f"Running with batch_size={self.batch_size}, timeout={self.timeout}, retries={self.retries}")
390
391
with self.output().open('w') as f:
392
f.write(f"Processed with batch size {self.batch_size}")
393
394
# Configuration file would contain:
395
# [ConfigurableTask]
396
# batch_size = 5000
397
# timeout = 7200
398
# max_retries = 5
399
```
400
401
### Environment-Specific Configuration
402
403
```python
404
import luigi
405
import os
406
from luigi.configuration import get_config, add_config_path
407
408
# Load environment-specific configuration
409
env = os.getenv('LUIGI_ENV', 'development')
410
config_file = f'/etc/luigi/luigi-{env}.cfg'
411
412
if os.path.exists(config_file):
413
add_config_path(config_file)
414
415
class EnvironmentTask(luigi.Task):
416
"""Task that adapts to different environments."""
417
418
def __init__(self, *args, **kwargs):
419
super().__init__(*args, **kwargs)
420
421
config = get_config()
422
423
# Get environment-specific settings
424
self.database_host = config.get('database', 'host', fallback='localhost')
425
self.database_port = config.getint('database', 'port', fallback=5432)
426
self.cache_enabled = config.getboolean('cache', 'enabled', fallback=False)
427
428
def output(self):
429
return luigi.LocalTarget(f"output_{env}.txt")
430
431
def run(self):
432
print(f"Environment: {env}")
433
print(f"Database: {self.database_host}:{self.database_port}")
434
print(f"Cache enabled: {self.cache_enabled}")
435
436
# luigi-development.cfg:
437
# [database]
438
# host = dev-db.example.com
439
# port = 5432
440
#
441
# [cache]
442
# enabled = false
443
444
# luigi-production.cfg:
445
# [database]
446
# host = prod-db.example.com
447
# port = 5432
448
#
449
# [cache]
450
# enabled = true
451
```
452
453
### Dynamic Configuration
454
455
```python
456
import luigi
457
from luigi.configuration import get_config
458
459
class DynamicConfigTask(luigi.Task):
460
"""Task that modifies configuration at runtime."""
461
462
environment = luigi.Parameter(default='development')
463
464
def __init__(self, *args, **kwargs):
465
super().__init__(*args, **kwargs)
466
467
# Modify configuration based on parameters
468
config = get_config()
469
470
if self.environment == 'production':
471
config.set('core', 'log_level', 'WARNING')
472
config.set('worker', 'timeout', '7200')
473
else:
474
config.set('core', 'log_level', 'DEBUG')
475
config.set('worker', 'timeout', '3600')
476
477
def output(self):
478
return luigi.LocalTarget(f"output_{self.environment}.txt")
479
480
def run(self):
481
config = get_config()
482
log_level = config.get('core', 'log_level')
483
timeout = config.getint('worker', 'timeout')
484
485
print(f"Running in {self.environment} mode")
486
print(f"Log level: {log_level}, Timeout: {timeout}")
487
```