# Task System

Luigi's task system provides base classes for defining workflow units with dependency management, output specification, and execution logic. Tasks form the core building blocks of Luigi workflows.

## Capabilities

### Base Task Class

The fundamental Task class that all Luigi tasks inherit from. Defines the interface for dependency resolution, output specification, and execution logic.

```python { .api }
class Task:
    """Base class for all Luigi tasks."""

    def run(self):
        """
        Execute the task logic. Must be implemented by subclasses.
        """

    def output(self):
        """
        Specify the output Target(s) for this task.

        Returns:
            Target or list of Targets
        """

    def requires(self):
        """
        Specify task dependencies.

        Returns:
            Task, list of Tasks, or dict of Tasks
        """

    def complete(self) -> bool:
        """
        Check if the task is complete by verifying all outputs exist.

        Returns:
            bool: True if task is complete, False otherwise
        """

    def clone(self, **kwargs):
        """
        Create a copy of this task with modified parameters.

        Args:
            **kwargs: Parameter overrides

        Returns:
            Task: New task instance with updated parameters
        """

    @property
    def task_id(self) -> str:
        """Unique identifier for this task instance."""

    @property
    def task_family(self) -> str:
        """Task family name (class name)."""

    @property
    def param_kwargs(self) -> dict:
        """Dictionary of parameter names and values."""
```

### External Task

Represents tasks that exist outside the Luigi workflow, such as data files created by external systems or manual processes.

```python { .api }
class ExternalTask(Task):
    """
    Task representing external dependencies.

    External tasks have no run() implementation since they are not executed
    by Luigi. They only specify outputs that should exist.
    """

    run = None  # No run implementation: output() is produced outside Luigi
```

### Wrapper Task

Task that groups multiple dependencies without producing its own output. Useful for creating workflow entry points and organizing related tasks.

```python { .api }
class WrapperTask(Task):
    """
    Task that wraps other tasks without producing output.

    Wrapper tasks only specify dependencies through requires()
    and have no output() or run() methods.
    """

    def output(self):
        """Wrapper tasks have no output."""
        return []
```

### Configuration Task

Base class for tasks that only hold configuration parameters without executing logic. Used for sharing configuration across multiple tasks.

```python { .api }
class Config(Task):
    """
    Task that only holds configuration parameters.

    Config tasks are automatically marked as complete and are used
    to share parameter values across multiple tasks.
    """
```

### Task Namespacing

Functions for organizing tasks into namespaces to avoid naming conflicts and improve task organization.

```python { .api }
def namespace(namespace: str = None, scope: str = ''):
    """
    Set namespace for tasks declared after this call.

    Args:
        namespace: Namespace string to prepend to task names
        scope: Module scope to limit namespace application
    """

def auto_namespace(scope: str = ''):
    """
    Set namespace to the module name of each task class.

    Args:
        scope: Module scope to limit namespace application
    """
```

### Task Utilities

Utility functions for working with tasks, task IDs, and task dependency structures.

```python { .api }
def task_id_str(task_family: str, params: dict) -> str:
    """
    Generate task ID string from family and parameters.

    Args:
        task_family: Task class name
        params: Parameter dictionary

    Returns:
        str: Formatted task ID
    """

def externalize(task_obj):
    """
    Convert a regular task to an external task.

    Args:
        task_obj: Task class or task instance to externalize

    Returns:
        Copy of the task with run() disabled, so Luigi treats it as external
    """

def getpaths(struct):
    """
    Map every task in a structure to its output(), keeping the structure's shape.

    Args:
        struct: Task, list, or dict of tasks

    Returns:
        Same structure with each task replaced by its output Target(s)
    """

def flatten(struct):
    """
    Flatten nested task dependency structure.

    Args:
        struct: Nested structure of tasks, lists, and dicts

    Returns:
        Flat list of individual tasks
    """

def flatten_output(task):
    """
    List a task's output targets, falling back to the outputs of its
    requirements when the task itself has none.

    Args:
        task: Task whose outputs should be collected

    Returns:
        Flat list of individual targets
    """
```

### Bulk Operations

Mixin class for implementing bulk completion checking to optimize performance when dealing with many tasks.

```python { .api }
class MixinNaiveBulkComplete:
    """
    Mixin that provides naive bulk completion checking.

    Override bulk_complete() method for custom bulk operations.
    """

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        """
        Check completion status for multiple parameter combinations.

        Args:
            parameter_tuples: Iterable of parameter values, tuples, or dicts

        Returns:
            List of the entries from parameter_tuples whose tasks are complete
        """

class BulkCompleteNotImplementedError(NotImplementedError):
    """Exception raised when bulk completion is not implemented."""
```

## Usage Examples

### Basic Task with Dependencies

```python
import luigi
from luigi import Task, LocalTarget, Parameter

class DataIngestionTask(Task):
    """Task that ingests raw data."""
    date = luigi.DateParameter()

    def output(self):
        return LocalTarget(f"data/raw/{self.date}.csv")

    def run(self):
        # Simulate data ingestion
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")

class DataProcessingTask(Task):
    """Task that processes ingested data."""
    date = luigi.DateParameter()

    def requires(self):
        return DataIngestionTask(date=self.date)

    def output(self):
        return LocalTarget(f"data/processed/{self.date}.csv")

    def run(self):
        # Read input data
        with self.input().open('r') as f:
            data = f.read()

        # Process and write output
        processed = data.replace('100', '1000').replace('200', '2000')
        with self.output().open('w') as f:
            f.write(processed)
```

### Wrapper Task for Multiple Dependencies

```python
import luigi

# DataProcessingTask is defined in the example above; MetricsCalculationTask
# and QualityCheckTask are assumed to be defined elsewhere in the same way.
class DailyReportTask(luigi.WrapperTask):
    """Wrapper task that generates multiple daily reports."""
    date = luigi.DateParameter()

    def requires(self):
        return [
            DataProcessingTask(date=self.date),
            MetricsCalculationTask(date=self.date),
            QualityCheckTask(date=self.date)
        ]
```

### External Task for Input Files

```python
import luigi
from luigi import LocalTarget

class InputFileTask(luigi.ExternalTask):
    """External task representing an input file."""
    filename = luigi.Parameter()

    def output(self):
        return LocalTarget(f"input/{self.filename}")
```