# Luigi


Luigi is a Python workflow management framework for building complex pipelines of batch jobs. It handles dependency resolution, failure recovery, command line integration, and provides a web interface for workflow visualization and monitoring. Luigi helps coordinate data processing workflows where multiple tasks depend on each other and need to be executed in the correct order.

## Package Information

- **Package Name**: luigi
- **Language**: Python
- **Installation**: `pip install luigi`

## Core Imports

```python
import luigi
```

Common imports for task and parameter classes:

```python
from luigi import Task, Target, LocalTarget
from luigi import Parameter, DateParameter, IntParameter
from luigi import build, run
```

## Basic Usage

```python
import luigi
from luigi import Task, LocalTarget, Parameter

class HelloWorldTask(Task):
    """A simple task that creates a greeting file."""
    name = Parameter(default="World")

    def output(self):
        """Define the output target for this task."""
        return LocalTarget(f"hello_{self.name}.txt")

    def run(self):
        """Execute the task logic."""
        with self.output().open('w') as f:
            f.write(f"Hello, {self.name}!")

class ProcessGreetingTask(Task):
    """A task that processes the greeting file."""
    name = Parameter(default="World")

    def requires(self):
        """Define task dependencies."""
        return HelloWorldTask(name=self.name)

    def output(self):
        return LocalTarget(f"processed_{self.name}.txt")

    def run(self):
        # Read input from dependency
        with self.input().open('r') as f:
            greeting = f.read()

        # Process and write output
        with self.output().open('w') as f:
            f.write(greeting.upper())

# Run the workflow
if __name__ == '__main__':
    luigi.build([ProcessGreetingTask(name="Luigi")], local_scheduler=True)
```

## Architecture

Luigi's architecture centers around five core concepts:

- **Tasks**: Units of work that define dependencies, outputs, and execution logic
- **Targets**: Represent data inputs and outputs with existence checking capabilities
- **Parameters**: Configure and parameterize task behavior with type-safe values
- **Scheduler**: Central coordination service for dependency resolution and task execution
- **Workers**: Execute tasks locally or across multiple machines

This design enables building complex data pipelines with automatic dependency resolution, failure recovery, and workflow visualization through Luigi's web interface.
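The idea of resolving dependencies by asking each task whether it is already complete can be illustrated with a small standard-library sketch. This is a conceptual toy, not Luigi's scheduler or worker code: `ToyTask` and `run_with_dependencies` are hypothetical names invented for the illustration, and a boolean flag stands in for "the output target exists".

```python
class ToyTask:
    """Hypothetical stand-in for a task: a name, dependencies, and a done flag."""

    def __init__(self, name, requires=()):
        self.name = name
        self._requires = list(requires)
        self.done = False  # stands in for "the output target exists"

    def requires(self):
        return self._requires

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def run_with_dependencies(task, log):
    """Run `task` after everything it requires, skipping completed tasks."""
    if task.complete():
        return
    for dep in task.requires():
        run_with_dependencies(dep, log)
    task.run()
    log.append(task.name)


extract = ToyTask("extract")
clean = ToyTask("clean", requires=[extract])
report = ToyTask("report", requires=[clean, extract])

order = []
run_with_dependencies(report, order)
print(order)  # ['extract', 'clean', 'report']

# A second call does nothing because every task now reports complete().
run_with_dependencies(report, order)
print(order)  # ['extract', 'clean', 'report']
```

The second call shows why re-running a pipeline is cheap: work that already produced its output is skipped, which is also how a failed run can resume from the point of failure.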


## Capabilities

### Core Task Classes

Base classes for defining workflow tasks including regular tasks, external dependencies, wrapper tasks, and configuration tasks. These form the foundation of all Luigi workflows.

```python { .api }
class Task:
    def run(self): ...
    def output(self): ...
    def requires(self): ...
    def complete(self): ...

class ExternalTask(Task): ...
class WrapperTask(Task): ...
class Config(Task): ...
```

[Task System](./tasks.md)

### Target System

File and data target classes for representing task inputs and outputs with existence checking, atomic writes, and filesystem operations.

```python { .api }
class Target:
    def exists(self) -> bool: ...
    def open(self, mode='r'): ...

class LocalTarget(Target):
    def __init__(self, path: str): ...
    def exists(self) -> bool: ...
    def open(self, mode='r'): ...
    def remove(self): ...
```

[Targets](./targets.md)

### Parameter System

Type-safe parameter system for task configuration including primitive types, date/time parameters, collections, and custom parameter types.

```python { .api }
class Parameter:
    def __init__(self, default=None): ...

class DateParameter(Parameter): ...
class IntParameter(Parameter): ...
class BoolParameter(Parameter): ...
class ListParameter(Parameter): ...
class DictParameter(Parameter): ...
```

[Parameters](./parameters.md)

### Workflow Execution

Main entry points for running Luigi workflows with dependency resolution, scheduling, and execution management.

```python { .api }
# In current Luigi releases, a LuigiRunResult is returned only when
# detailed_summary=True; otherwise both functions return a bool.
def run(cmdline_args=None, main_task_cls=None,
        worker_scheduler_factory=None, use_dynamic_argparse=None,
        local_scheduler=False, detailed_summary=False) -> LuigiRunResult: ...

def build(tasks, worker_scheduler_factory=None,
          detailed_summary=False, **env_params) -> LuigiRunResult: ...
```

[Execution](./execution.md)

### Configuration System

Configuration management for Luigi settings, task parameters, and scheduler options with support for multiple configuration file formats.

```python { .api }
def get_config() -> LuigiConfigParser: ...
def add_config_path(path: str): ...

class LuigiConfigParser: ...
class LuigiTomlParser: ...
```

[Configuration](./configuration.md)

### External System Integration

Comprehensive contrib modules for integrating with databases, cloud storage, big data platforms, job schedulers, and monitoring systems.

```python { .api }
# Database integration examples
from luigi.contrib.postgres import PostgresTarget, CopyToTable
from luigi.contrib.mysqldb import MySqlTarget
from luigi.contrib.mongodb import MongoTarget

# Cloud storage examples
from luigi.contrib.s3 import S3Target, S3Client
from luigi.contrib.gcs import GCSTarget
from luigi.contrib.azureblob import AzureBlobTarget

# Big data platform examples
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.bigquery import BigQueryTarget
```

[External Integrations](./integrations.md)

### Scheduler and RPC

Remote scheduler client for distributed task execution and coordination across multiple worker processes and machines.

```python { .api }
class RemoteScheduler:
    def add_task(self, task_id: str, status: str, runnable: bool): ...
    def get_work(self, worker: str, host: str): ...
    def ping(self, worker: str): ...

class RPCError(Exception): ...
```

[Scheduler & RPC](./scheduler.md)

### Events and Monitoring

Event system for monitoring task execution, workflow progress, and integrating with external monitoring systems.

```python { .api }
class Event: ...

# Status codes for execution results
class LuigiStatusCode:
    SUCCESS: int
    SUCCESS_WITH_RETRY: int
    FAILED: int
    FAILED_AND_SCHEDULING_FAILED: int
```

[Events & Monitoring](./events.md)

### Command Line Tools

Command-line utilities for workflow management, dependency analysis, and task introspection.

```python { .api }
# Main CLI commands
luigi --module mymodule MyTask --param value
luigid --background --port 8082

# Tool utilities
luigi.tools.deps MyTask
luigi.tools.deps_tree MyTask
luigi.tools.luigi_grep pattern
```

[Command Line Tools](./cli-tools.md)

## Common Types

```python { .api }
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, date, timedelta

# Common parameter value types
ParameterValue = Union[str, int, float, bool, date, datetime, timedelta, List[Any], Dict[str, Any]]

# Task execution result
class LuigiRunResult:
    status: LuigiStatusCode
    worker: Any
    scheduling_succeeded: bool
```