0
# Execution
1
2
Luigi's execution system provides the main entry points for running workflows with dependency resolution, scheduling, and result reporting. The execution system coordinates task running and handles failures.
3
4
## Capabilities
5
6
### Main Execution Functions
7
8
Primary functions for executing Luigi workflows with different levels of control and configuration options.
9
10
```python { .api }
11
def run(cmdline_args=None, main_task_cls=None, worker_scheduler_factory=None,
12
use_dynamic_argparse=None, local_scheduler: bool = False, detailed_summary: bool = False) -> LuigiRunResult:
13
"""
14
Main entry point for running Luigi tasks from command line or programmatically.
15
16
Args:
17
cmdline_args: Command line arguments (list of strings)
18
main_task_cls: Main task class to run
19
worker_scheduler_factory: Custom worker/scheduler factory
20
use_dynamic_argparse: Deprecated parameter, ignored
21
local_scheduler: Use local scheduler instead of remote
22
detailed_summary: Include detailed execution summary
23
24
Returns:
25
LuigiRunResult when detailed_summary=True; otherwise a bool indicating whether scheduling succeeded
26
"""
27
28
def build(tasks, worker_scheduler_factory=None, detailed_summary: bool = False,
29
**env_params) -> LuigiRunResult:
30
"""
31
Build and execute a list of tasks programmatically.
32
33
Args:
34
tasks: Task instance or list of task instances to execute
35
worker_scheduler_factory: Custom worker/scheduler factory
36
detailed_summary: Include detailed execution summary
37
**env_params: Additional environment parameters (e.g., local_scheduler=True)
38
39
Returns:
40
LuigiRunResult (status and worker details) when detailed_summary=True; otherwise a bool indicating whether scheduling succeeded
41
"""
42
```
43
44
### Execution Result Types
45
46
Classes representing the results and status of Luigi workflow execution.
47
48
```python { .api }
49
class LuigiRunResult:
50
"""Container for Luigi execution results."""
51
52
status: LuigiStatusCode
53
"""Overall execution status code."""
54
55
worker: Any
56
"""Worker instance that executed the tasks."""
57
58
scheduling_succeeded: bool
59
"""Whether task scheduling succeeded."""
60
61
class LuigiStatusCode:
62
"""Enumeration (enum.Enum) of possible execution status codes; compare against the members themselves rather than relying on the values listed here."""
63
64
SUCCESS = 0
65
"""All tasks completed successfully."""
66
67
SUCCESS_WITH_RETRY = 1
68
"""Tasks completed successfully after retries."""
69
70
FAILED = 2
71
"""Task execution failed."""
72
73
FAILED_AND_SCHEDULING_FAILED = 3
74
"""Both task execution and scheduling failed."""
75
76
SCHEDULING_FAILED = 4
77
"""Task scheduling failed."""
78
79
NOT_RUN = 5
80
"""Tasks were not run."""
81
82
MISSING_EXT = 6
83
"""Missing external dependencies."""
84
```
85
86
### Interface Configuration
87
88
Configuration class for controlling Luigi interface behavior and execution parameters.
89
90
```python { .api }
91
class core:
92
"""Core interface configuration options."""
93
94
default_scheduler_host: str
95
"""Default scheduler host address."""
96
97
default_scheduler_port: int
98
"""Default scheduler port number."""
99
100
no_configure_logging: bool
101
"""Disable automatic logging configuration."""
102
103
log_level: str
104
"""Default logging level."""
105
106
module: str
107
"""Default module for task discovery."""
108
109
parallel_scheduling: bool
110
"""Enable parallel task scheduling."""
111
112
assistant: bool
113
"""Enable Luigi assistant mode."""
114
```
115
116
### Execution Exceptions
117
118
Exception classes for execution-related errors and failures.
119
120
```python { .api }
121
class PidLockAlreadyTakenExit(SystemExit):
122
"""
123
Exception raised when PID lock is already taken.
124
125
Indicates another Luigi process is already running with the same configuration.
126
"""
127
```
128
129
## Usage Examples
130
131
### Basic Task Execution
132
133
```python
134
import luigi
135
from luigi import Task, LocalTarget, Parameter
136
137
class SimpleTask(Task):
138
name = Parameter(default="example")
139
140
def output(self):
141
return LocalTarget(f"output_{self.name}.txt")
142
143
def run(self):
144
with self.output().open('w') as f:
145
f.write(f"Hello from {self.name}")
146
147
# Execute single task programmatically
148
if __name__ == '__main__':
149
result = luigi.build([SimpleTask(name="test")], local_scheduler=True, detailed_summary=True)
150
print(f"Execution status: {result.status}")
151
print(f"Scheduling succeeded: {result.scheduling_succeeded}")
152
```
153
154
### Command Line Execution
155
156
```python
157
import luigi
158
from luigi import Task, DateParameter
159
from datetime import date
160
161
class DailyTask(Task):
162
date = DateParameter(default=date.today())
163
164
def output(self):
165
return luigi.LocalTarget(f"daily_{self.date}.txt")
166
167
def run(self):
168
with self.output().open('w') as f:
169
f.write(f"Daily processing for {self.date}")
170
171
# Run from command line using luigi.run()
172
if __name__ == '__main__':
173
# This allows command line execution:
174
# python script.py DailyTask --date 2023-01-15
175
result = luigi.run(detailed_summary=True)
176
177
if result.status == luigi.LuigiStatusCode.SUCCESS:
178
print("All tasks completed successfully!")
179
else:
180
print(f"Execution failed with status: {result.status}")
181
```
182
183
### Batch Task Execution
184
185
```python
186
import luigi
187
from luigi import Task, DateParameter
188
from datetime import date, timedelta
189
190
class ProcessingTask(Task):
191
date = DateParameter()
192
193
def output(self):
194
return luigi.LocalTarget(f"processed_{self.date}.txt")
195
196
def run(self):
197
with self.output().open('w') as f:
198
f.write(f"Processed {self.date}")
199
200
# Execute multiple tasks for different dates
201
if __name__ == '__main__':
202
# Create tasks for the last 7 days
203
base_date = date.today()
204
tasks = []
205
206
for i in range(7):
207
task_date = base_date - timedelta(days=i)
208
tasks.append(ProcessingTask(date=task_date))
209
210
# Execute all tasks
211
result = luigi.build(
212
tasks,
213
local_scheduler=True,
214
detailed_summary=True,
215
log_level='INFO'
216
)
217
218
print(f"Batch execution status: {result.status}")
219
220
# Check individual task completion
221
for task in tasks:
222
if task.complete():
223
print(f"✓ {task.date} completed")
224
else:
225
print(f"✗ {task.date} failed")
226
```
227
228
### Custom Execution Configuration
229
230
```python
231
import luigi
232
from luigi import Task, build
233
import logging
234
235
class ConfiguredTask(Task):
236
def output(self):
237
return luigi.LocalTarget("configured_output.txt")
238
239
def run(self):
240
# This will use the configured logging level
241
logger = logging.getLogger('luigi-interface')
242
logger.info("Running configured task")
243
244
with self.output().open('w') as f:
245
f.write("Task executed with custom configuration")
246
247
# Execute with custom configuration
248
if __name__ == '__main__':
249
# Configure logging before execution
250
logging.basicConfig(level=logging.INFO)
251
252
result = luigi.build(
253
[ConfiguredTask()],
254
local_scheduler=True,
255
detailed_summary=True,
256
log_level='INFO',
257
no_configure_logging=True # Use our custom logging config
258
)
259
260
if result.status == luigi.LuigiStatusCode.SUCCESS:
261
print("Task executed successfully with custom configuration")
262
263
# Access worker details (note: _scheduled_tasks is a private attribute, not part of the public API)
264
if result.worker:
265
print(f"Worker processed tasks: {len(result.worker._scheduled_tasks)}")
266
```
267
268
### Error Handling and Status Checking
269
270
```python
271
import luigi
272
from luigi import Task, LuigiStatusCode
273
274
class UnreliableTask(Task):
275
should_fail = luigi.BoolParameter(default=False)
276
277
def output(self):
278
return luigi.LocalTarget("unreliable_output.txt")
279
280
def run(self):
281
if self.should_fail:
282
raise Exception("Task configured to fail")
283
284
with self.output().open('w') as f:
285
f.write("Success!")
286
287
# Handle different execution outcomes
288
if __name__ == '__main__':
289
# Try successful execution
290
result = luigi.build([UnreliableTask(should_fail=False)], local_scheduler=True, detailed_summary=True)
291
292
if result.status == LuigiStatusCode.SUCCESS:
293
print("✓ Task completed successfully")
294
elif result.status == LuigiStatusCode.SUCCESS_WITH_RETRY:
295
print("✓ Task completed after retries")
296
elif result.status == LuigiStatusCode.FAILED:
297
print("✗ Task execution failed")
298
elif result.status == LuigiStatusCode.SCHEDULING_FAILED:
299
print("✗ Task scheduling failed")
300
elif result.status == LuigiStatusCode.MISSING_EXT:
301
print("✗ Missing external dependencies")
302
else:
303
print(f"✗ Unknown status: {result.status}")
304
305
# Try failing execution
306
result = luigi.build([UnreliableTask(should_fail=True)], local_scheduler=True, detailed_summary=True)
307
308
print(f"Failed execution status: {result.status}")
309
print(f"Scheduling succeeded: {result.scheduling_succeeded}")
310
```
311
312
### Remote Scheduler Execution
313
314
```python
315
import luigi
316
from luigi import Task
317
318
class RemoteTask(Task):
319
def output(self):
320
return luigi.LocalTarget("remote_output.txt")
321
322
def run(self):
323
with self.output().open('w') as f:
324
f.write("Executed via remote scheduler")
325
326
# Execute using remote scheduler
327
if __name__ == '__main__':
328
# This will connect to remote scheduler at default host:port
329
result = luigi.build(
330
[RemoteTask()],
331
local_scheduler=False, detailed_summary=True # Use remote scheduler; return a LuigiRunResult
332
)
333
334
if result.status == luigi.LuigiStatusCode.SUCCESS:
335
print("Task executed via remote scheduler")
336
else:
337
print("Remote execution failed - check scheduler is running")
338
339
# Or use luigi.run() for command line interface with remote scheduler
340
# python script.py RemoteTask --scheduler-host localhost --scheduler-port 8082
341
```