# Task System

Luigi's task system provides base classes for defining workflow units with dependency management, output specification, and execution logic. Tasks form the core building blocks of Luigi workflows.

## Capabilities

### Base Task Class

The fundamental Task class that all Luigi tasks inherit from. Defines the interface for dependency resolution, output specification, and execution logic.

```python { .api }
class Task:
    """Base class for all Luigi tasks."""

    def run(self):
        """
        Execute the task logic. Override in subclasses; the default
        implementation does nothing.
        """

    def output(self):
        """
        Specify the output Target(s) for this task.

        Returns:
            Target or list of Targets
        """

    def requires(self):
        """
        Specify task dependencies.

        Returns:
            Task, list of Tasks, or dict of Tasks
        """

    def complete(self) -> bool:
        """
        Check if the task is complete by verifying all outputs exist.

        Returns:
            bool: True if task is complete, False otherwise
        """

    def clone(self, **kwargs):
        """
        Create a copy of this task with modified parameters.

        Args:
            **kwargs: Parameter overrides

        Returns:
            Task: New task instance with updated parameters
        """

    @property
    def task_id(self) -> str:
        """Unique identifier for this task instance."""

    @property
    def task_family(self) -> str:
        """Task family name (class name, prefixed by the namespace if one is set)."""

    @property
    def param_kwargs(self) -> dict:
        """Dictionary of parameter names and values."""
```
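
The completeness rule described above can be modeled without Luigi. The sketch below is a simplified stand-in, not Luigi's source: `FileTarget`, `SketchTask`, `WriteGreeting`, and `demo_completeness` are hypothetical names, and treating a task with no outputs as incomplete is an assumption of this model.

```python
import os
import tempfile


class FileTarget:
    """Hypothetical stand-in for a file-based Target (not a Luigi class)."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class SketchTask:
    """Hypothetical stand-in for Task that models only output() and complete()."""

    def output(self):
        return []

    def complete(self):
        outputs = self.output()
        # Accept a single target or a list of targets.
        if not isinstance(outputs, (list, tuple)):
            outputs = [outputs]
        # Assumption of this model: no declared outputs means "not complete".
        if not outputs:
            return False
        return all(target.exists() for target in outputs)


class WriteGreeting(SketchTask):
    """Example task that writes one file into a given directory."""

    def __init__(self, directory):
        self.directory = directory

    def output(self):
        return FileTarget(os.path.join(self.directory, "greeting.txt"))

    def run(self):
        with open(self.output().path, "w") as f:
            f.write("hello\n")


def demo_completeness():
    """Return the completion status before and after running the task."""
    with tempfile.TemporaryDirectory() as tmp:
        task = WriteGreeting(tmp)
        before = task.complete()
        task.run()
        after = task.complete()
    return before, after
```

In this model, completion is derived purely from whether the declared outputs exist, so running the task a second time would be unnecessary once its file is present.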

### External Task

Represents tasks that exist outside the Luigi workflow, such as data files created by external systems or manual processes.

```python { .api }
class ExternalTask(Task):
    """
    Task representing external dependencies.

    External tasks have no run() implementation since they are not
    executed by Luigi. They only specify outputs that should exist.
    """

    run = None  # No run logic: the output is produced outside Luigi.
```

### Wrapper Task

Task that groups multiple dependencies without producing its own output. Useful for creating workflow entry points and organizing related tasks.

```python { .api }
class WrapperTask(Task):
    """
    Task that wraps other tasks without producing output.

    Wrapper tasks only declare dependencies through requires(); they
    define no run() logic and inherit the default empty output().
    """

    def complete(self) -> bool:
        """Return True once every task returned by requires() is complete."""
```
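
Because a wrapper produces no output of its own, its completion has to come from its dependencies. The following is a minimal model of that idea, assuming a wrapper counts as done once every required task is done; `Step` and `WrapperSketch` are hypothetical classes, not part of Luigi.

```python
class Step:
    """Hypothetical stand-in for a task with a settable completion flag."""

    def __init__(self, name, done=False):
        self.name = name
        self.done = done

    def requires(self):
        return []

    def complete(self):
        return self.done


class WrapperSketch(Step):
    """Models a wrapper: no output of its own, done when every dependency is done."""

    def __init__(self, name, dependencies):
        super().__init__(name)
        self.dependencies = list(dependencies)

    def requires(self):
        return self.dependencies

    def complete(self):
        return all(dep.complete() for dep in self.requires())


extract = Step("extract", done=True)
load = Step("load", done=False)
nightly = WrapperSketch("nightly", [extract, load])

status_before = nightly.complete()  # one dependency is still pending
load.done = True
status_after = nightly.complete()   # every dependency is now done
```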

### Configuration Task

Base class for tasks that only hold configuration parameters without executing logic. Used for sharing configuration across multiple tasks.

```python { .api }
class Config(Task):
    """
    Task that only holds configuration parameters.

    Config classes are typically instantiated directly to read parameter
    values, which lets multiple tasks share the same settings.
    """
```

### Task Namespacing

Functions for organizing tasks into namespaces to avoid naming conflicts and improve task organization.

```python { .api }
def namespace(namespace: str = None, scope: str = ''):
    """
    Set namespace for tasks declared after this call.

    Args:
        namespace: Namespace string to prepend to task names
        scope: Module scope to limit namespace application
    """

def auto_namespace(scope: str = ''):
    """
    Set namespace to the module name of each task class.

    Args:
        scope: Module scope to limit namespace application
    """
```

### Task Utilities

Utility functions for working with tasks, task IDs, and task dependency structures.

```python { .api }
def task_id_str(task_family: str, params: dict) -> str:
    """
    Generate task ID string from family and parameters.

    Args:
        task_family: Task class name
        params: Parameter dictionary

    Returns:
        str: Formatted task ID
    """

def externalize(taskclass_or_taskobject):
    """
    Return an externalized copy of a task whose run logic is disabled.

    Args:
        taskclass_or_taskobject: Task class or task instance to externalize

    Returns:
        A copy of the task (class or instance) that Luigi will not run
    """

def getpaths(struct):
    """
    Map every task in a structure to its output().

    Args:
        struct: Structure of tasks (Task, list, or dict)

    Returns:
        The same structure with each task replaced by its output
    """

def flatten(struct):
    """
    Flatten nested task dependency structure.

    Args:
        struct: Nested structure of tasks, lists, and dicts

    Returns:
        Flat list of the individual items
    """

def flatten_output(task):
    """
    List all output targets of a task, walking through the requirements
    of tasks that have no output of their own (such as wrapper tasks).

    Args:
        task: Task whose outputs should be collected

    Returns:
        Flat list of individual targets
    """
```
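
To make the flattening behavior concrete, here is a small pure-Python sketch of the idea. It is an illustration only, not Luigi's implementation: `flatten_sketch` is a hypothetical name, strings stand in for tasks, and dict values are visited in insertion order as a choice of this sketch.

```python
def flatten_sketch(struct):
    """Return a flat list of the leaf items in nested dicts, lists, and tuples."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        flat = []
        # Keys are dropped; only the values contribute leaf items.
        for value in struct.values():
            flat.extend(flatten_sketch(value))
        return flat
    if isinstance(struct, (list, tuple)):
        flat = []
        for item in struct:
            flat.extend(flatten_sketch(item))
        return flat
    # Anything else (for example a string or a task object) is a leaf.
    return [struct]


nested = {"raw": "t1", "derived": ["t2", ("t3", "t4")]}
flat = flatten_sketch(nested)
```

A single item, a list, and a dict all reduce to the same flat shape, which is why `requires()` and `output()` can return any of the three.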

### Bulk Operations

Mixin class for implementing bulk completion checking to optimize performance when dealing with many tasks.

```python { .api }
class MixinNaiveBulkComplete:
    """
    Mixin that provides naive bulk completion checking.

    Override bulk_complete() method for custom bulk operations.
    """

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        """
        Check completion status for multiple parameter combinations.

        Args:
            parameter_tuples: Iterable of parameter sets (tuples, dicts,
                or single values)

        Returns:
            List of the parameter sets whose tasks are already complete
        """

class BulkCompleteNotImplementedError(NotImplementedError):
    """Exception raised when bulk completion is not implemented."""
```
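
A naive bulk check can be sketched as "instantiate one task per parameter set and ask each whether it is complete". The model below assumes that `bulk_complete` reports back the parameter sets that are already complete; `DailyExport` and its `finished_days` lookup are hypothetical and stand in for a real task and its outputs.

```python
class DailyExport:
    """Hypothetical task-like class; completion is looked up in a set of finished days."""

    finished_days = {"2024-01-01", "2024-01-03"}

    def __init__(self, day):
        self.day = day

    def complete(self):
        return self.day in self.finished_days

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        # Naive strategy: build one instance per parameter set and ask each in turn.
        done = []
        for params in parameter_tuples:
            if isinstance(params, dict):
                task = cls(**params)
            elif isinstance(params, (list, tuple)):
                task = cls(*params)
            else:
                task = cls(params)
            if task.complete():
                done.append(params)
        return done


candidates = [("2024-01-01",), ("2024-01-02",), {"day": "2024-01-03"}]
completed = DailyExport.bulk_complete(candidates)
```

This costs one completeness check per parameter set. A custom override could answer the same question with a single listing or query, which is where the performance gain would come from.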

## Usage Examples

### Basic Task with Dependencies

```python
import luigi
from luigi import LocalTarget, Task

class DataIngestionTask(Task):
    """Task that ingests raw data."""
    date = luigi.DateParameter()

    def output(self):
        return LocalTarget(f"data/raw/{self.date}.csv")

    def run(self):
        # Simulate data ingestion
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")

class DataProcessingTask(Task):
    """Task that processes ingested data."""
    date = luigi.DateParameter()

    def requires(self):
        return DataIngestionTask(date=self.date)

    def output(self):
        return LocalTarget(f"data/processed/{self.date}.csv")

    def run(self):
        # Read input data
        with self.input().open('r') as f:
            data = f.read()

        # Process and write output
        processed = data.replace('100', '1000').replace('200', '2000')
        with self.output().open('w') as f:
            f.write(processed)
```
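
The example relies on the ingestion task running before the processing task. A rough model of that ordering is sketched below in plain Python. It is not Luigi's scheduler: `MemoryTask` and `run_with_dependencies` are hypothetical, and the model assumes a simple depth-first walk that skips anything already complete.

```python
class MemoryTask:
    """Hypothetical in-memory task: complete once run() has been called."""

    def __init__(self, name, dependencies=()):
        self.name = name
        self.dependencies = list(dependencies)
        self.done = False

    def requires(self):
        return self.dependencies

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def run_with_dependencies(task, log):
    """Run `task` after its requirements, skipping anything already complete."""
    if task.complete():
        return
    for dependency in task.requires():
        run_with_dependencies(dependency, log)
    task.run()
    log.append(task.name)


ingest = MemoryTask("ingest")
process = MemoryTask("process", [ingest])

order = []
run_with_dependencies(process, order)
# A second call finds everything complete and runs nothing.
run_with_dependencies(process, order)
```

Requesting only the final task is enough: its requirements are discovered through `requires()`, and work that is already complete is not repeated.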

### Wrapper Task for Multiple Dependencies

```python
import luigi

# MetricsCalculationTask and QualityCheckTask are assumed to be defined
# elsewhere, in the same way as DataProcessingTask.
class DailyReportTask(luigi.WrapperTask):
    """Wrapper task that generates multiple daily reports."""
    date = luigi.DateParameter()

    def requires(self):
        return [
            DataProcessingTask(date=self.date),
            MetricsCalculationTask(date=self.date),
            QualityCheckTask(date=self.date)
        ]
```

### External Task for Input Files

```python
import luigi

class InputFileTask(luigi.ExternalTask):
    """External task representing an input file."""
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"input/{self.filename}")
```