# Targets

Luigi's target system represents the data that tasks consume and produce. Targets support existence checks, file operations, and atomic writes, and they define the inputs and outputs of each task.

## Capabilities

### Base Target Interface

Abstract base class that defines the interface for all targets in Luigi. Targets represent data that tasks consume or produce.

```python { .api }
class Target:
    """Abstract base class for all targets."""

    def exists(self) -> bool:
        """
        Check if the target exists.

        Returns:
            bool: True if target exists, False otherwise
        """

    def open(self, mode: str = 'r'):
        """
        Open the target for reading or writing.

        Args:
            mode: File mode ('r', 'w', 'a', etc.)

        Returns:
            File-like object
        """
```
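
To make the interface concrete, here is a minimal sketch of a custom target. `Target` below is a stand-in abstract class that mirrors only the `exists()` part of the interface, so the snippet runs without Luigi installed; in a real pipeline you would subclass `luigi.Target` instead. `InMemoryTarget`, `_Writer`, and the `_STORE` dictionary are hypothetical names used purely for illustration.

```python
import abc
import io


class Target(abc.ABC):
    """Stand-in for luigi.Target so this sketch runs without Luigi installed."""

    @abc.abstractmethod
    def exists(self) -> bool:
        """Return True if the data this target represents is present."""


class InMemoryTarget(Target):
    """Hypothetical target backed by a process-wide dictionary."""

    _STORE: dict = {}  # maps key -> stored text

    def __init__(self, key: str):
        self.key = key

    def exists(self) -> bool:
        return self.key in self._STORE

    def open(self, mode: str = "r"):
        if mode == "r":
            return io.StringIO(self._STORE[self.key])
        if mode == "w":
            return _Writer(self)
        raise ValueError(f"unsupported mode: {mode!r}")


class _Writer(io.StringIO):
    """Buffer that publishes its contents to the store when closed."""

    def __init__(self, target: InMemoryTarget):
        super().__init__()
        self._target = target

    def close(self):
        if not self.closed:
            self._target._STORE[self._target.key] = self.getvalue()
        super().close()


target = InMemoryTarget("report")
assert not target.exists()      # nothing has been written yet
with target.open("w") as f:
    f.write("done")
assert target.exists()          # the data is now present
```

Because the data only appears in the store when the writer is closed, `exists()` never reports a half-written value, which is the same property the file-based targets aim for.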

### Local Filesystem Target

Target implementation for local filesystem files with atomic writing, existence checking, and file operations.

```python { .api }
class LocalTarget(Target):
    """Target for local filesystem files."""

    def __init__(self, path: str, format=None, is_tmp: bool = False):
        """
        Initialize local target.

        Args:
            path: File path
            format: File format handler (optional)
            is_tmp: Whether this is a temporary file
        """

    def exists(self) -> bool:
        """Check if the local file exists."""

    def open(self, mode: str = 'r'):
        """
        Open the local file.

        Args:
            mode: File mode ('r', 'w', 'a', 'rb', 'wb', etc.)

        Returns:
            File object (possibly wrapped with format handler)
        """

    def remove(self):
        """Remove the local file if it exists."""

    def move(self, new_path: str, raise_if_exists: bool = False):
        """
        Move the file to a new location.

        Args:
            new_path: Destination path
            raise_if_exists: Whether to raise if destination exists
        """

    def copy(self, new_path: str, raise_if_exists: bool = False):
        """
        Copy the file to a new location.

        Args:
            new_path: Destination path
            raise_if_exists: Whether to raise if destination exists
        """

    @property
    def path(self) -> str:
        """Get the file path."""

    @property
    def fs(self):
        """Get the filesystem handler."""
```

### Filesystem Interface

Abstract filesystem interface that LocalTarget uses for file operations. Can be subclassed for custom filesystem implementations.

```python { .api }
class FileSystem:
    """Abstract filesystem interface."""

    def exists(self, path: str) -> bool:
        """Check if path exists."""

    def remove(self, path: str, recursive: bool = True):
        """Remove file or directory."""

    def move(self, path: str, dest: str, raise_if_exists: bool = False):
        """Move file or directory."""

    def copy(self, path: str, dest: str, raise_if_exists: bool = False):
        """Copy file or directory."""

    def listdir(self, path: str) -> list:
        """List directory contents."""

    def mkdir(self, path: str, parents: bool = True, raise_if_exists: bool = False):
        """Create directory."""

    def isdir(self, path: str) -> bool:
        """Check if path is a directory."""

    def isfile(self, path: str) -> bool:
        """Check if path is a file."""

class LocalFileSystem(FileSystem):
    """Local filesystem implementation."""
```
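
As an illustration of how the `parents`, `recursive`, and `raise_if_exists` flags might behave, the following sketch maps part of the interface onto the Python standard library. It is not Luigi's `LocalFileSystem` source: `SketchLocalFileSystem` is a hypothetical class, `FileAlreadyExists` is a local stand-in for Luigi's exception, and the flag semantics are assumptions inferred from the parameter names above.

```python
import os
import shutil
import tempfile


class FileAlreadyExists(Exception):
    """Stand-in for Luigi's exception of the same name."""


class SketchLocalFileSystem:
    """Hypothetical local filesystem following the interface above."""

    def exists(self, path: str) -> bool:
        return os.path.exists(path)

    def isdir(self, path: str) -> bool:
        return os.path.isdir(path)

    def listdir(self, path: str) -> list:
        return sorted(os.listdir(path))

    def mkdir(self, path: str, parents: bool = True, raise_if_exists: bool = False):
        if os.path.exists(path):
            if raise_if_exists:
                raise FileAlreadyExists(path)
            return  # assumed behaviour: an existing directory is accepted silently
        if parents:
            os.makedirs(path)   # create missing intermediate directories too
        else:
            os.mkdir(path)      # fails if the parent directory is missing

    def move(self, path: str, dest: str, raise_if_exists: bool = False):
        if raise_if_exists and os.path.exists(dest):
            raise FileAlreadyExists(dest)
        shutil.move(path, dest)

    def remove(self, path: str, recursive: bool = True):
        if recursive and os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)


fs = SketchLocalFileSystem()
root = tempfile.mkdtemp()

nested = os.path.join(root, "a", "b")
fs.mkdir(nested)                      # parents=True also creates "a"
fs.mkdir(nested)                      # already present: accepted silently

try:
    fs.mkdir(nested, raise_if_exists=True)
    raised = False
except FileAlreadyExists:
    raised = True

fs.move(nested, os.path.join(root, "a", "c"))
listing = fs.listdir(os.path.join(root, "a"))
fs.remove(root)                       # recursive=True removes the whole tree
```

Keeping path-level operations in a separate filesystem object lets a target delegate to it through its `fs` property, so the same target logic can be reused with a different storage backend.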

### Filesystem Target Base

Base class for filesystem-based targets that provides common filesystem operations.

```python { .api }
class FileSystemTarget(Target):
    """Base class for filesystem targets."""

    def __init__(self, path: str, format=None):
        """
        Initialize filesystem target.

        Args:
            path: Target path
            format: File format handler
        """

    def move(self, new_path: str, raise_if_exists: bool = False):
        """Move the target to a new location."""

    def remove(self):
        """Remove the target."""

    def copy(self, new_path: str):
        """Copy the target to a new location."""

    @property
    def path(self) -> str:
        """Get the target path."""

    @property
    def fs(self):
        """Get the filesystem handler."""
```

### Atomic File Operations

Context manager for atomic file writing that ensures files are written completely before being moved to their final location.

```python { .api }
class AtomicLocalFile:
    """Context manager for atomic file writing."""

    def __init__(self, path: str):
        """
        Initialize atomic file writer.

        Args:
            path: Final file path
        """

    def __enter__(self):
        """Enter context and return temporary file object."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context and atomically move temp file to final location."""

def atomic_file(path: str):
    """
    Create atomic file context manager.

    Args:
        path: Final file path

    Returns:
        AtomicLocalFile: Context manager for atomic writing
    """
```
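
The write-to-a-temporary-file-then-rename technique described above can be sketched with the standard library alone. This is an illustration of the general pattern, not Luigi's implementation; `atomic_write` is a hypothetical helper, and the rename is only atomic when the temporary file and the destination are on the same filesystem.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def atomic_write(path: str):
    """Hypothetical helper: write to a temp file, then rename it onto `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file next to the destination so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w") as f:
            yield f
        os.replace(tmp_path, path)    # publish the finished file in one step
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)       # discard the partial file on any failure
        raise


workdir = tempfile.mkdtemp()
final_path = os.path.join(workdir, "result.txt")

with atomic_write(final_path) as f:
    f.write("complete")

try:
    with atomic_write(final_path) as f:
        f.write("partial")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with open(final_path) as f:
    assert f.read() == "complete"     # the failed write never replaced the file
```

This matters for a workflow engine because a task's completeness is judged by whether its output exists: a half-written file at the final path would look like a finished task.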

### Target Exceptions

Exception classes for filesystem and target-related errors.

```python { .api }
class FileSystemException(Exception):
    """Base exception for filesystem errors."""

class FileAlreadyExists(FileSystemException):
    """Exception raised when attempting to create existing file."""

class MissingParentDirectory(FileSystemException):
    """Exception raised when parent directory doesn't exist."""

class NotADirectory(FileSystemException):
    """Exception raised when path is not a directory."""
```
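
Because the specific exceptions share a base class, a handler can treat one case specially and fall back to `FileSystemException` for the rest. The sketch below redeclares part of the hierarchy as local stand-ins so it runs without Luigi; `create_new_file` and `describe` are hypothetical helpers, and the conditions under which they raise are assumptions for illustration.

```python
import os
import tempfile


class FileSystemException(Exception):
    """Stand-in for the base filesystem exception."""


class FileAlreadyExists(FileSystemException):
    """Stand-in: the path to create is already present."""


class MissingParentDirectory(FileSystemException):
    """Stand-in: the parent directory does not exist."""


def create_new_file(path: str) -> None:
    """Hypothetical helper that refuses to overwrite and requires the parent directory."""
    parent = os.path.dirname(path)
    if not os.path.isdir(parent):
        raise MissingParentDirectory(parent)
    if os.path.exists(path):
        raise FileAlreadyExists(path)
    with open(path, "w"):
        pass  # create an empty file


def describe(path: str) -> str:
    """Report what happened when trying to create `path`."""
    try:
        create_new_file(path)
    except FileAlreadyExists:
        return "exists"                    # specific case handled first
    except FileSystemException as exc:
        return type(exc).__name__          # any other filesystem error
    return "created"


root = tempfile.mkdtemp()
results = [
    describe(os.path.join(root, "a.txt")),             # first attempt creates the file
    describe(os.path.join(root, "a.txt")),             # second attempt finds it present
    describe(os.path.join(root, "missing", "b.txt")),  # parent directory is absent
]
```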

## Usage Examples

### Basic File Target Usage

```python
import luigi
from luigi import Task, LocalTarget

class DataTask(Task):
    def output(self):
        return LocalTarget("data/output.txt")

    def run(self):
        # Write to target
        with self.output().open('w') as f:
            f.write("Hello, Luigi!")

        # Check if target exists
        assert self.output().exists()

class ProcessTask(Task):
    def requires(self):
        return DataTask()

    def output(self):
        return LocalTarget("data/processed.txt")

    def run(self):
        # Read from input target
        with self.input().open('r') as f:
            data = f.read()

        # Write processed data
        with self.output().open('w') as f:
            f.write(data.upper())
```

### Multiple Output Targets

```python
from luigi import Task, LocalTarget

class MultiOutputTask(Task):
    def output(self):
        # Returning a dict lets downstream tasks look up each target by name
        return {
            'summary': LocalTarget("output/summary.txt"),
            'details': LocalTarget("output/details.csv"),
            'metadata': LocalTarget("output/metadata.json")
        }

    def run(self):
        # Write to multiple outputs
        with self.output()['summary'].open('w') as f:
            f.write("Processing completed")

        with self.output()['details'].open('w') as f:
            f.write("id,value\n1,100\n")

        with self.output()['metadata'].open('w') as f:
            f.write('{"status": "complete"}')

class ProcessMultiInputTask(Task):
    def requires(self):
        return MultiOutputTask()

    def run(self):
        # Access specific input targets
        with self.input()['summary'].open('r') as f:
            summary = f.read()

        with self.input()['details'].open('r') as f:
            details = f.read()
```

### Atomic File Writing

```python
from luigi import Task, LocalTarget
from luigi.local_target import atomic_file

class SafeWriteTask(Task):
    def output(self):
        return LocalTarget("important_data.txt")

    def run(self):
        # Use atomic writing to ensure complete file writes.
        # atomic_file is a binary writer, so pass bytes rather than str.
        with atomic_file(self.output().path) as f:
            f.write(b"Critical data that must be written atomically")
            # If an error occurs here, the original file remains unchanged
```

### Target File Operations

```python
from luigi import Task, LocalTarget

class FileManagementTask(Task):
    def output(self):
        return LocalTarget("final_output.txt")

    def run(self):
        # Create temporary target
        temp_target = LocalTarget("temp_output.txt")

        # Write to temporary location
        with temp_target.open('w') as f:
            f.write("Temporary data")

        # Move to final location
        temp_target.move(self.output().path)

        # Verify final target exists
        assert self.output().exists()
```