or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.mdconfiguration.mdevents.mdexecution.mdindex.mdintegrations.mdparameters.mdscheduler.mdtargets.mdtasks.md

execution.mddocs/

0

# Execution

1

2

Luigi's execution system provides the main entry points for running workflows with dependency resolution, scheduling, and result reporting. The execution system coordinates task running and handles failures.

3

4

## Capabilities

5

6

### Main Execution Functions

7

8

Primary functions for executing Luigi workflows with different levels of control and configuration options.

9

10

```python { .api }

11

def run(cmdline_args=None, main_task_cls=None, worker_scheduler_factory=None,

12

use_dynamic_argparse=None, local_scheduler: bool = False, detailed_summary: bool = False) -> LuigiRunResult:

13

"""

14

Main entry point for running Luigi tasks from command line or programmatically.

15

16

Args:

17

cmdline_args: Command line arguments (list of strings)

18

main_task_cls: Main task class to run

19

worker_scheduler_factory: Custom worker/scheduler factory

20

use_dynamic_argparse: Deprecated parameter, ignored

21

local_scheduler: Use local scheduler instead of remote

22

detailed_summary: Include detailed execution summary

23

24

Returns:

25

LuigiRunResult: Execution result with status and details

26

"""

27

28

def build(tasks, worker_scheduler_factory=None, detailed_summary: bool = False,

29

**env_params) -> LuigiRunResult:

30

"""

31

Build and execute a list of tasks programmatically.

32

33

Args:

34

tasks: Task instance or list of task instances to execute

35

worker_scheduler_factory: Custom worker/scheduler factory

36

detailed_summary: Include detailed execution summary

37

**env_params: Additional environment parameters (e.g., local_scheduler=True)

38

39

Returns:

40

LuigiRunResult: Execution result with status and worker details

41

"""

42

```

43

44

### Execution Result Types

45

46

Classes representing the results and status of Luigi workflow execution.

47

48

```python { .api }

49

class LuigiRunResult:

50

"""Container for Luigi execution results."""

51

52

status: LuigiStatusCode

53

"""Overall execution status code."""

54

55

worker: Any

56

"""Worker instance that executed the tasks."""

57

58

scheduling_succeeded: bool

59

"""Whether task scheduling succeeded."""

60

61

class LuigiStatusCode:

62

"""Enumeration of possible execution status codes."""

63

64

SUCCESS = 0

65

"""All tasks completed successfully."""

66

67

SUCCESS_WITH_RETRY = 1

68

"""Tasks completed successfully after retries."""

69

70

FAILED = 2

71

"""Task execution failed."""

72

73

FAILED_AND_SCHEDULING_FAILED = 3

74

"""Both task execution and scheduling failed."""

75

76

SCHEDULING_FAILED = 4

77

"""Task scheduling failed."""

78

79

NOT_RUN = 5

80

"""Tasks were not run."""

81

82

MISSING_EXT = 6

83

"""Missing external dependencies."""

84

```

85

86

### Interface Configuration

87

88

Configuration class for controlling Luigi interface behavior and execution parameters.

89

90

```python { .api }

91

class core:

92

"""Core interface configuration options."""

93

94

default_scheduler_host: str

95

"""Default scheduler host address."""

96

97

default_scheduler_port: int

98

"""Default scheduler port number."""

99

100

no_configure_logging: bool

101

"""Disable automatic logging configuration."""

102

103

log_level: str

104

"""Default logging level."""

105

106

module: str

107

"""Default module for task discovery."""

108

109

parallel_scheduling: bool

110

"""Enable parallel task scheduling."""

111

112

assistant: bool

113

"""Enable Luigi assistant mode."""

114

```

115

116

### Execution Exceptions

117

118

Exception classes for execution-related errors and failures.

119

120

```python { .api }

121

class PidLockAlreadyTakenExit(SystemExit):

122

"""

123

Exception raised when PID lock is already taken.

124

125

Indicates another Luigi process is already running with the same configuration.

126

"""

127

```

128

129

## Usage Examples

130

131

### Basic Task Execution

132

133

```python

134

import luigi

135

from luigi import Task, LocalTarget, Parameter

136

137

class SimpleTask(Task):

138

name = Parameter(default="example")

139

140

def output(self):

141

return LocalTarget(f"output_{self.name}.txt")

142

143

def run(self):

144

with self.output().open('w') as f:

145

f.write(f"Hello from {self.name}")

146

147

# Execute single task programmatically

148

if __name__ == '__main__':

149

result = luigi.build([SimpleTask(name="test")], local_scheduler=True)

150

print(f"Execution status: {result.status}")

151

print(f"Scheduling succeeded: {result.scheduling_succeeded}")

152

```

153

154

### Command Line Execution

155

156

```python

157

import luigi

158

from luigi import Task, DateParameter

159

from datetime import date

160

161

class DailyTask(Task):

162

date = DateParameter(default=date.today())

163

164

def output(self):

165

return luigi.LocalTarget(f"daily_{self.date}.txt")

166

167

def run(self):

168

with self.output().open('w') as f:

169

f.write(f"Daily processing for {self.date}")

170

171

# Run from command line using luigi.run()

172

if __name__ == '__main__':

173

# This allows command line execution:

174

# python script.py DailyTask --date 2023-01-15

175

result = luigi.run()

176

177

if result.status == luigi.LuigiStatusCode.SUCCESS:

178

print("All tasks completed successfully!")

179

else:

180

print(f"Execution failed with status: {result.status}")

181

```

182

183

### Batch Task Execution

184

185

```python

186

import luigi

187

from luigi import Task, DateParameter

188

from datetime import date, timedelta

189

190

class ProcessingTask(Task):

191

date = DateParameter()

192

193

def output(self):

194

return luigi.LocalTarget(f"processed_{self.date}.txt")

195

196

def run(self):

197

with self.output().open('w') as f:

198

f.write(f"Processed {self.date}")

199

200

# Execute multiple tasks for different dates

201

if __name__ == '__main__':

202

# Create tasks for the last 7 days

203

base_date = date.today()

204

tasks = []

205

206

for i in range(7):

207

task_date = base_date - timedelta(days=i)

208

tasks.append(ProcessingTask(date=task_date))

209

210

# Execute all tasks

211

result = luigi.build(

212

tasks,

213

local_scheduler=True,

214

detailed_summary=True,

215

log_level='INFO'

216

)

217

218

print(f"Batch execution status: {result.status}")

219

220

# Check individual task completion

221

for task in tasks:

222

if task.complete():

223

print(f"✓ {task.date} completed")

224

else:

225

print(f"✗ {task.date} failed")

226

```

227

228

### Custom Execution Configuration

229

230

```python

231

import luigi

232

from luigi import Task, build

233

import logging

234

235

class ConfiguredTask(Task):

236

def output(self):

237

return luigi.LocalTarget("configured_output.txt")

238

239

def run(self):

240

# This will use the configured logging level

241

logger = logging.getLogger('luigi-interface')

242

logger.info("Running configured task")

243

244

with self.output().open('w') as f:

245

f.write("Task executed with custom configuration")

246

247

# Execute with custom configuration

248

if __name__ == '__main__':

249

# Configure logging before execution

250

logging.basicConfig(level=logging.INFO)

251

252

result = luigi.build(

253

[ConfiguredTask()],

254

local_scheduler=True,

255

detailed_summary=True,

256

log_level='INFO',

257

no_configure_logging=True # Use our custom logging config

258

)

259

260

if result.status == luigi.LuigiStatusCode.SUCCESS:

261

print("Task executed successfully with custom configuration")

262

263

# Access worker details

264

if result.worker:

265

print(f"Worker processed tasks: {len(result.worker._scheduled_tasks)}")

266

```

267

268

### Error Handling and Status Checking

269

270

```python

271

import luigi

272

from luigi import Task, LuigiStatusCode

273

274

class UnreliableTask(Task):

275

should_fail = luigi.BoolParameter(default=False)

276

277

def output(self):

278

return luigi.LocalTarget("unreliable_output.txt")

279

280

def run(self):

281

if self.should_fail:

282

raise Exception("Task configured to fail")

283

284

with self.output().open('w') as f:

285

f.write("Success!")

286

287

# Handle different execution outcomes

288

if __name__ == '__main__':

289

# Try successful execution

290

result = luigi.build([UnreliableTask(should_fail=False)], local_scheduler=True)

291

292

if result.status == LuigiStatusCode.SUCCESS:

293

print("✓ Task completed successfully")

294

elif result.status == LuigiStatusCode.SUCCESS_WITH_RETRY:

295

print("✓ Task completed after retries")

296

elif result.status == LuigiStatusCode.FAILED:

297

print("✗ Task execution failed")

298

elif result.status == LuigiStatusCode.SCHEDULING_FAILED:

299

print("✗ Task scheduling failed")

300

elif result.status == LuigiStatusCode.MISSING_EXT:

301

print("✗ Missing external dependencies")

302

else:

303

print(f"✗ Unknown status: {result.status}")

304

305

# Try failing execution

306

result = luigi.build([UnreliableTask(should_fail=True)], local_scheduler=True)

307

308

print(f"Failed execution status: {result.status}")

309

print(f"Scheduling succeeded: {result.scheduling_succeeded}")

310

```

311

312

### Remote Scheduler Execution

313

314

```python

315

import luigi

316

from luigi import Task

317

318

class RemoteTask(Task):

319

def output(self):

320

return luigi.LocalTarget("remote_output.txt")

321

322

def run(self):

323

with self.output().open('w') as f:

324

f.write("Executed via remote scheduler")

325

326

# Execute using remote scheduler

327

if __name__ == '__main__':

328

# This will connect to remote scheduler at default host:port

329

result = luigi.build(

330

[RemoteTask()],

331

local_scheduler=False # Use remote scheduler

332

)

333

334

if result.status == LuigiStatusCode.SUCCESS:

335

print("Task executed via remote scheduler")

336

else:

337

print("Remote execution failed - check scheduler is running")

338

339

# Or use luigi.run() for command line interface with remote scheduler

340

# python script.py RemoteTask --scheduler-host localhost --scheduler-port 8082

341

```