# Luigi

Luigi is a Python workflow management framework for building complex pipelines of batch jobs. It handles dependency resolution, failure recovery, and command-line integration, and it provides a web interface for visualizing and monitoring workflows. Luigi helps coordinate data processing workflows in which multiple tasks depend on each other and must run in the correct order.
## Package Information

- **Package Name**: luigi
- **Language**: Python
- **Installation**: `pip install luigi`
## Core Imports

```python
import luigi
```
Common imports for task and parameter classes:

```python
from luigi import Task, Target, LocalTarget
from luigi import Parameter, DateParameter, IntParameter
from luigi import build, run
```
## Basic Usage

```python
import luigi
from luigi import Task, LocalTarget, Parameter


class HelloWorldTask(Task):
    """A simple task that creates a greeting file."""
    name = Parameter(default="World")

    def output(self):
        """Define the output target for this task."""
        return LocalTarget(f"hello_{self.name}.txt")

    def run(self):
        """Execute the task logic."""
        with self.output().open('w') as f:
            f.write(f"Hello, {self.name}!")


class ProcessGreetingTask(Task):
    """A task that processes the greeting file."""
    name = Parameter(default="World")

    def requires(self):
        """Define task dependencies."""
        return HelloWorldTask(name=self.name)

    def output(self):
        return LocalTarget(f"processed_{self.name}.txt")

    def run(self):
        # Read input from dependency
        with self.input().open('r') as f:
            greeting = f.read()

        # Process and write output
        with self.output().open('w') as f:
            f.write(greeting.upper())


# Run the workflow
if __name__ == '__main__':
    luigi.build([ProcessGreetingTask(name="Luigi")], local_scheduler=True)
```
## Architecture

Luigi's architecture centers on three core concepts (tasks, targets, and parameters), plus two runtime components (the scheduler and workers) that execute them:

- **Tasks**: Units of work that define dependencies, outputs, and execution logic
- **Targets**: Represent data inputs and outputs and can check whether they exist
- **Parameters**: Configure and parameterize task behavior with typed values
- **Scheduler**: Central coordination service that tracks task status and dependencies and assigns runnable tasks to workers; it runs either in-process (`local_scheduler=True`) or as the `luigid` daemon
- **Workers**: Processes that execute tasks, either locally or on multiple machines that share one central scheduler

This design enables building complex data pipelines with automatic dependency resolution, failure recovery, and workflow visualization through the web interface served by the central scheduler.
## Capabilities

### Core Task Classes

Base classes for defining workflow tasks, including regular tasks, external dependencies, wrapper tasks, and configuration classes. These form the foundation of every Luigi workflow.

```python { .api }
class Task:
    def run(self): ...        # the work itself
    def output(self): ...     # Target(s) the task produces
    def requires(self): ...   # Task(s) that must complete first
    def complete(self): ...   # by default, True when every output exists

class ExternalTask(Task): ...  # data produced outside Luigi; has no run()
class WrapperTask(Task): ...   # groups requirements; complete when they all are
class Config(Task): ...        # parameters read from configuration files
```
[Task System](./tasks.md)
### Target System

Target classes that represent task inputs and outputs, with existence checks, atomic writes, and filesystem operations.

```python { .api }
class Target:
    def exists(self) -> bool: ...  # the only method every Target must implement

class LocalTarget(Target):  # subclasses Target via luigi.target.FileSystemTarget
    def __init__(self, path: str): ...
    def exists(self) -> bool: ...
    def open(self, mode='r'): ...  # mode='w' writes to a temp file, moved into place on close
    def remove(self): ...
```
[Targets](./targets.md)
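### Parameter System

Typed parameter classes for task configuration, covering primitive types, dates and times, collections, and custom parameter types. Values given on the command line or in configuration files are parsed from strings into the declared type.

```python { .api }
class Parameter:
    # Omitting `default` (Luigi's internal _no_value sentinel) makes the
    # parameter required; other options include description= and significant=.
    def __init__(self, default=_no_value): ...

class DateParameter(Parameter): ...  # datetime.date, "YYYY-MM-DD" on the command line
class IntParameter(Parameter): ...
class BoolParameter(Parameter): ...
class ListParameter(Parameter): ...  # JSON list on the command line
class DictParameter(Parameter): ...  # JSON object on the command line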
```
[Parameters](./parameters.md)
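### Workflow Execution

Main entry points for running Luigi workflows: `build` schedules a list of task instances from Python code, while `run` parses command-line style arguments to select the task and its parameters. Both handle dependency resolution, scheduling, and execution.

```python { .api }
from typing import Union

# Both return a plain bool (LuigiRunResult.scheduling_succeeded) unless
# detailed_summary=True, in which case they return the full LuigiRunResult.
def run(cmdline_args=None, main_task_cls=None,
        worker_scheduler_factory=None, use_dynamic_argparse=None,
        local_scheduler=False, detailed_summary=False) -> Union[bool, LuigiRunResult]: ...

# env_params override [core] settings, e.g. local_scheduler=True or workers=4
def build(tasks, worker_scheduler_factory=None,
          detailed_summary=False, **env_params) -> Union[bool, LuigiRunResult]: ...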
```
[Execution](./execution.md)
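### Configuration System

Configuration management for Luigi settings, task parameters, and scheduler options. Both INI-style (`luigi.cfg`) and TOML (`luigi.toml`) files are supported; the parser is chosen with the `LUIGI_CONFIG_PARSER` environment variable (`cfg` by default).

```python { .api }
def get_config() -> LuigiConfigParser: ...  # active parser (LuigiTomlParser when LUIGI_CONFIG_PARSER=toml)
def add_config_path(path: str): ...         # load an additional configuration file

class LuigiConfigParser: ...  # INI-style files
class LuigiTomlParser: ...    # TOML files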
```
[Configuration](./configuration.md)
### External System Integration

The `luigi.contrib` package provides modules for integrating with databases, cloud storage, big data platforms, job schedulers, and monitoring systems. Most of them depend on optional third-party client libraries (for example `psycopg2` for PostgreSQL or `boto3` for S3) that must be installed separately.

```python { .api }
# Database integration examples
from luigi.contrib.postgres import PostgresTarget, CopyToTable
from luigi.contrib.mysqldb import MySqlTarget
from luigi.contrib.mongodb import MongoTarget

# Cloud storage examples
from luigi.contrib.s3 import S3Target, S3Client
from luigi.contrib.gcs import GCSTarget
from luigi.contrib.azureblob import AzureBlobTarget

# Big data platform examples
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.bigquery import BigQueryTarget
```
[External Integrations](./integrations.md)
### Scheduler and RPC

Remote scheduler client (`luigi.rpc`) that workers use to talk to a central `luigid` scheduler, enabling distributed task execution and coordination across multiple worker processes and machines.

```python { .api }
# Simplified signatures: the real methods accept additional keyword arguments
class RemoteScheduler:
    def add_task(self, task_id: str, status: str, runnable: bool): ...
    def get_work(self, worker: str, host: str): ...
    def ping(self, worker: str): ...

class RPCError(Exception): ...
```
[Scheduler & RPC](./scheduler.md)
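### Events and Monitoring

Event hooks for monitoring task execution and workflow progress and for integrating with external monitoring systems. Callbacks are registered per task class with the `Task.event_handler` decorator.

```python { .api }
import enum

class Event:  # event name constants passed to Task.event_handler
    START: str
    SUCCESS: str
    FAILURE: str

# Status codes for execution results (luigi.execution_summary); Enum members, not ints
class LuigiStatusCode(enum.Enum):
    SUCCESS = ...
    SUCCESS_WITH_RETRY = ...
    FAILED = ...
    FAILED_AND_SCHEDULING_FAILED = ...
    SCHEDULING_FAILED = ...
    NOT_RUN = ...
    MISSING_EXT = ...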
```
[Events & Monitoring](./events.md)
### Command Line Tools

Command-line utilities for workflow management, dependency analysis, and task introspection.

```bash { .api }
# Main CLI commands
luigi --module mymodule MyTask --param value
luigid --background --port 8082

# Tool utilities (console scripts installed with the luigi package)
luigi-deps --module mymodule MyTask
luigi-deps-tree --module mymodule MyTask
luigi-grep --prefix MyTask
```
[Command Line Tools](./cli-tools.md)
## Common Types

```python { .api }
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, date, timedelta

# Common parameter value types (a descriptive alias, not exported by luigi)
ParameterValue = Union[str, int, float, bool, date, datetime, timedelta, List[Any], Dict[str, Any]]

# Returned by build()/run() when detailed_summary=True (luigi.execution_summary)
class LuigiRunResult:
    status: LuigiStatusCode
    worker: Any
    scheduling_succeeded: bool
    summary_text: str
    one_line_summary: str