or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.md configuration.md events.md execution.md index.md integrations.md parameters.md scheduler.md targets.md tasks.md

docs/configuration.md

0

# Configuration

1

2

Luigi's configuration system manages settings for tasks, the scheduler, and execution behavior through configuration files and environment variables. This lets the same pipeline code be adapted to different environments and deployment scenarios without modification.

3

4

## Capabilities

5

6

### Configuration Parser

7

8

Main configuration parser that handles INI-format configuration files with Luigi-specific extensions and parameter resolution.

9

10

```python { .api }

11

def get_config() -> LuigiConfigParser:

12

"""

13

Get the global Luigi configuration parser instance.

14

15

Returns:

16

LuigiConfigParser: Global configuration parser

17

"""

18

19

def add_config_path(path: str):

20

"""

21

Add a configuration file path to the configuration search paths.

22

23

Args:

24

path: Path to configuration file

25

"""

26

27

class LuigiConfigParser:

28

"""

29

Luigi's configuration parser extending ConfigParser with parameter resolution.

30

"""

31

32

def get(self, section: str, option: str, **kwargs):

33

"""

34

Get configuration value with parameter resolution.

35

36

Args:

37

section: Configuration section name

38

option: Configuration option name

39

**kwargs: Additional options (vars, fallback, etc.)

40

41

Returns:

42

Configuration value with parameter substitution

43

"""

44

45

def getint(self, section: str, option: str, **kwargs) -> int:

46

"""Get integer configuration value."""

47

48

def getfloat(self, section: str, option: str, **kwargs) -> float:

49

"""Get float configuration value."""

50

51

def getboolean(self, section: str, option: str, **kwargs) -> bool:

52

"""Get boolean configuration value."""

53

54

def has_option(self, section: str, option: str) -> bool:

55

"""Check if configuration option exists."""

56

57

def has_section(self, section: str) -> bool:

58

"""Check if configuration section exists."""

59

60

def sections(self) -> list:

61

"""Get list of configuration sections."""

62

63

def options(self, section: str) -> list:

64

"""Get list of options in a section."""

65

66

def items(self, section: str) -> list:

67

"""Get list of (option, value) pairs in a section."""

68

69

def set(self, section: str, option: str, value: str):

70

"""Set configuration value."""

71

72

def add_section(self, section: str):

73

"""Add configuration section."""

74

75

def remove_section(self, section: str) -> bool:

76

"""Remove configuration section."""

77

78

def remove_option(self, section: str, option: str) -> bool:

79

"""Remove configuration option."""

80

81

def read(self, filenames):

82

"""Read configuration from file(s)."""

83

84

def read_dict(self, dictionary: dict):

85

"""Read configuration from dictionary."""

86

```

87

88

### TOML Configuration Parser

89

90

Alternative configuration parser that supports TOML format configuration files.

91

92

```python { .api }

93

class LuigiTomlParser:

94

"""

95

TOML configuration parser for Luigi.

96

97

Provides similar interface to LuigiConfigParser but reads TOML files.

98

"""

99

100

def get(self, section: str, option: str, **kwargs):

101

"""Get TOML configuration value."""

102

103

def getint(self, section: str, option: str, **kwargs) -> int:

104

"""Get integer value from TOML configuration."""

105

106

def getfloat(self, section: str, option: str, **kwargs) -> float:

107

"""Get float value from TOML configuration."""

108

109

def getboolean(self, section: str, option: str, **kwargs) -> bool:

110

"""Get boolean value from TOML configuration."""

111

112

def has_option(self, section: str, option: str) -> bool:

113

"""Check if TOML option exists."""

114

115

def has_section(self, section: str) -> bool:

116

"""Check if TOML section exists."""

117

```

118

119

### Base Configuration Parser

120

121

Abstract base class for configuration parsers providing common functionality.

122

123

```python { .api }

124

class BaseParser:

125

"""Base class for configuration parsers."""

126

127

def enabled(self) -> bool:

128

"""Check if parser is enabled and available."""

129

130

def read(self, config_paths: list):

131

"""Read configuration from files."""

132

133

def get(self, section: str, option: str, **kwargs):

134

"""Get configuration value."""

135

136

def getint(self, section: str, option: str, **kwargs) -> int:

137

"""Get integer configuration value."""

138

139

def getfloat(self, section: str, option: str, **kwargs) -> float:

140

"""Get float configuration value."""

141

142

def getboolean(self, section: str, option: str, **kwargs) -> bool:

143

"""Get boolean configuration value."""

144

145

def has_option(self, section: str, option: str) -> bool:

146

"""Check if option exists."""

147

148

def has_section(self, section: str) -> bool:

149

"""Check if section exists."""

150

```

151

152

## Configuration Sections

153

154

Luigi uses several predefined configuration sections for different aspects of the system.

155

156

### Core Configuration

157

158

```python { .api }

159

# [core] section options

160

class CoreConfig:

161

"""Core Luigi configuration options."""

162

163

default_scheduler_host: str = 'localhost'

164

"""Default scheduler host address."""

165

166

default_scheduler_port: int = 8082

167

"""Default scheduler port."""

168

169

scheduler_host: str

170

"""Scheduler host override."""

171

172

scheduler_port: int

173

"""Scheduler port override."""

174

175

rpc_connect_timeout: float = 10.0

176

"""RPC connection timeout in seconds."""

177

178

rpc_retry_attempts: int = 3

179

"""Number of RPC retry attempts."""

180

181

rpc_retry_wait: int = 30

182

"""Wait time between RPC retries."""

183

184

no_configure_logging: bool = False

185

"""Disable Luigi's logging configuration."""

186

187

log_level: str = 'DEBUG'

188

"""Default logging level."""

189

190

logging_conf_file: str

191

"""Path to logging configuration file."""

192

193

parallel_scheduling: bool = False

194

"""Enable parallel task scheduling."""

195

196

assistant: bool = False

197

"""Enable Luigi assistant mode."""

198

199

worker_timeout: int = 0

200

"""Worker timeout in seconds (0 = no timeout)."""

201

202

keep_alive: bool = False

203

"""Keep worker alive after completion."""

204

205

max_reschedules: int = 1

206

"""Maximum task reschedule attempts."""

207

```

208

209

### Worker Configuration

210

211

```python { .api }

212

# [worker] section options

213

class WorkerConfig:

214

"""Worker configuration options."""

215

216

keep_alive: bool = False

217

"""Keep worker process alive."""

218

219

count_uniques: bool = False

220

"""When deciding whether to stay alive, count only unique pending tasks (tasks that no other worker can run)."""

221

222

count_last_params: bool = False

223

"""Count parameters in recent tasks."""

224

225

worker_timeout: int = 0

226

"""Worker timeout in seconds."""

227

228

timeout: int = 0

229

"""Task timeout in seconds."""

230

231

task_limit: int = None

232

"""Maximum tasks per worker."""

233

234

retry_external_tasks: bool = False

235

"""Retry external task dependencies."""

236

237

no_configure_logging: bool = False

238

"""Disable worker logging configuration."""

239

```

240

241

### Scheduler Configuration

242

243

```python { .api }

244

# [scheduler] section options

245

class SchedulerConfig:

246

"""Scheduler configuration options."""

247

248

record_task_history: bool = False

249

"""Record task execution history."""

250

251

state_path: str

252

"""Path to scheduler state file."""

253

254

remove_delay: int = 600

255

"""Delay before removing a task that no longer has any stakeholders (seconds)."""

256

257

worker_disconnect_delay: int = 60

258

"""Time after a worker stops pinging the scheduler before it is removed and its running tasks are marked failed (seconds)."""

259

260

disable_window: int = 3600

261

"""Window for disabling failed tasks (seconds)."""

262

263

retry_delay: int = 900

264

"""Delay before retrying failed tasks (seconds)."""

265

266

disable_hard_timeout: int = 999999999

267

"""Hard timeout for disabling tasks (seconds)."""

268

269

max_shown_tasks: int = 100000

270

"""Maximum tasks shown in web interface."""

271

272

max_graph_nodes: int = 100000

273

"""Maximum nodes in dependency graph."""

274

```

275

276

## Usage Examples

277

278

### Basic Configuration File

279

280

```ini

281

# luigi.cfg

282

[core]

283

scheduler_host = localhost

284

scheduler_port = 8082

285

log_level = INFO

286

parallel_scheduling = true

287

288

[worker]

289

keep_alive = true

290

timeout = 3600

291

task_limit = 10

292

293

[scheduler]

294

record_task_history = true

295

remove_delay = 300

296

retry_delay = 600

297

298

# Task-specific configuration

299

[MyTask]

300

batch_size = 1000

301

max_retries = 3

302

303

[DatabaseTask]

304

host = localhost

305

port = 5432

306

database = mydb

307

```

308

309

### TOML Configuration File

310

311

```toml

312

# luigi.toml

313

[core]

314

scheduler_host = "localhost"

315

scheduler_port = 8082

316

log_level = "INFO"

317

parallel_scheduling = true

318

319

[worker]

320

keep_alive = true

321

timeout = 3600

322

task_limit = 10

323

324

[scheduler]

325

record_task_history = true

326

remove_delay = 300

327

retry_delay = 600

328

329

[MyTask]

330

batch_size = 1000

331

max_retries = 3

332

```

333

334

### Programmatic Configuration

335

336

```python

337

import luigi

338

from luigi.configuration import get_config, add_config_path

339

340

# Add custom configuration file

341

add_config_path('/path/to/custom/luigi.cfg')

342

343

# Get configuration instance

344

config = get_config()

345

346

# Read configuration values

347

scheduler_host = config.get('core', 'scheduler_host', fallback='localhost')

348

scheduler_port = config.getint('core', 'scheduler_port', fallback=8082)

349

log_level = config.get('core', 'log_level', fallback='INFO')

350

351

print(f"Scheduler: {scheduler_host}:{scheduler_port}")

352

print(f"Log level: {log_level}")

353

354

# Set configuration values programmatically

355

config.set('core', 'parallel_scheduling', 'true')

356

config.set('worker', 'keep_alive', 'true')

357

358

# Check if options exist

359

if config.has_option('MyTask', 'batch_size'):

360

batch_size = config.getint('MyTask', 'batch_size')

361

print(f"Batch size: {batch_size}")

362

```

363

364

### Task-Specific Configuration

365

366

```python

367

import luigi

368

from luigi import Task, Parameter

369

from luigi.configuration import get_config

370

371

class ConfigurableTask(Task):

372

"""Task that reads configuration from config file."""

373

374

# No default here: when not passed explicitly, Luigi reads the value from `batch_size` in the [ConfigurableTask] config section

375

batch_size = luigi.IntParameter()

376

377

def __init__(self, *args, **kwargs):

378

super().__init__(*args, **kwargs)

379

380

# Read additional config

381

config = get_config()

382

self.timeout = config.getint('ConfigurableTask', 'timeout', fallback=3600)

383

self.retries = config.getint('ConfigurableTask', 'max_retries', fallback=3)

384

385

def output(self):

386

return luigi.LocalTarget(f"output_batch_{self.batch_size}.txt")

387

388

def run(self):

389

print(f"Running with batch_size={self.batch_size}, timeout={self.timeout}, retries={self.retries}")

390

391

with self.output().open('w') as f:

392

f.write(f"Processed with batch size {self.batch_size}")

393

394

# Configuration file would contain:

395

# [ConfigurableTask]

396

# batch_size = 5000

397

# timeout = 7200

398

# max_retries = 5

399

```

400

401

### Environment-Specific Configuration

402

403

```python

404

import luigi

405

import os

406

from luigi.configuration import get_config, add_config_path

407

408

# Load environment-specific configuration

409

env = os.getenv('LUIGI_ENV', 'development')

410

config_file = f'/etc/luigi/luigi-{env}.cfg'

411

412

if os.path.exists(config_file):

413

add_config_path(config_file)

414

415

class EnvironmentTask(luigi.Task):

416

"""Task that adapts to different environments."""

417

418

def __init__(self, *args, **kwargs):

419

super().__init__(*args, **kwargs)

420

421

config = get_config()

422

423

# Get environment-specific settings

424

self.database_host = config.get('database', 'host', fallback='localhost')

425

self.database_port = config.getint('database', 'port', fallback=5432)

426

self.cache_enabled = config.getboolean('cache', 'enabled', fallback=False)

427

428

def output(self):

429

return luigi.LocalTarget(f"output_{env}.txt")

430

431

def run(self):

432

print(f"Environment: {env}")

433

print(f"Database: {self.database_host}:{self.database_port}")

434

print(f"Cache enabled: {self.cache_enabled}")

435

436

# luigi-development.cfg:

437

# [database]

438

# host = dev-db.example.com

439

# port = 5432

440

#

441

# [cache]

442

# enabled = false

443

444

# luigi-production.cfg:

445

# [database]

446

# host = prod-db.example.com

447

# port = 5432

448

#

449

# [cache]

450

# enabled = true

451

```

452

453

### Dynamic Configuration

454

455

```python

456

import luigi

457

from luigi.configuration import get_config

458

459

class DynamicConfigTask(luigi.Task):

460

"""Task that modifies configuration at runtime."""

461

462

environment = luigi.Parameter(default='development')

463

464

def __init__(self, *args, **kwargs):

465

super().__init__(*args, **kwargs)

466

467

# Modify configuration based on parameters

468

config = get_config()

469

470

if self.environment == 'production':

471

config.set('core', 'log_level', 'WARNING')

472

config.set('worker', 'timeout', '7200')

473

else:

474

config.set('core', 'log_level', 'DEBUG')

475

config.set('worker', 'timeout', '3600')

476

477

def output(self):

478

return luigi.LocalTarget(f"output_{self.environment}.txt")

479

480

def run(self):

481

config = get_config()

482

log_level = config.get('core', 'log_level')

483

timeout = config.getint('worker', 'timeout')

484

485

print(f"Running in {self.environment} mode")

486

print(f"Log level: {log_level}, Timeout: {timeout}")

487

```