Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
72
Luigi's target system represents data inputs and outputs with existence checking, file operations, and atomic writing capabilities. Targets define what tasks produce and consume.
Abstract base class that defines the interface for all targets in Luigi. Targets represent data that tasks consume or produce.
class Target:
    """Common interface shared by every Luigi target."""

    def exists(self) -> bool:
        """
        Report whether the target is present.

        Returns:
            bool: True when the target exists, otherwise False.
        """

    def open(self, mode: str = 'r'):
        """
        Obtain a handle for reading from or writing to the target.

        Args:
            mode: File mode such as 'r', 'w' or 'a'.

        Returns:
            A file-like object.
        """


# Target implementation for local filesystem files with atomic writing,
# existence checking, and file operations.
class LocalTarget(Target):
    """Target that points at a file on the local disk."""

    def __init__(self, path: str, format=None, is_tmp: bool = False):
        """
        Build a target for a local file.

        Args:
            path: Location of the file.
            format: Optional handler for the file's format.
            is_tmp: Set when the file is temporary.
        """

    def exists(self) -> bool:
        """Tell whether the local file is present."""

    def open(self, mode: str = 'r'):
        """
        Open the underlying local file.

        Args:
            mode: File mode, e.g. 'r', 'w', 'a', 'rb' or 'wb'.

        Returns:
            A file object, which may be wrapped by the format handler.
        """

    def remove(self):
        """Delete the local file when it is present."""

    def move(self, new_path: str, raise_if_exists: bool = False):
        """
        Relocate the file.

        Args:
            new_path: Where the file should end up.
            raise_if_exists: If true, raise when the destination already exists.
        """

    def copy(self, new_path: str, raise_if_exists: bool = False):
        """
        Duplicate the file.

        Args:
            new_path: Where the copy should be written.
            raise_if_exists: If true, raise when the destination already exists.
        """

    @property
    def path(self) -> str:
        """The file's path."""

    @property
    def fs(self):
        """The filesystem handler backing this target."""


# Abstract filesystem interface that LocalTarget uses for file operations.
# Can be subclassed for custom filesystem implementations.
class FileSystem:
    """Interface that concrete filesystem backends implement."""

    def exists(self, path: str) -> bool:
        """Return whether *path* is present."""

    def remove(self, path: str, recursive: bool = True):
        """Delete the file or directory at *path*."""

    def move(self, path: str, dest: str, raise_if_exists: bool = False):
        """Relocate the file or directory at *path* to *dest*."""

    def copy(self, path: str, dest: str, raise_if_exists: bool = False):
        """Duplicate the file or directory at *path* to *dest*."""

    def listdir(self, path: str) -> list:
        """List the entries inside the directory *path*."""

    def mkdir(self, path: str, parents: bool = True, raise_if_exists: bool = False):
        """Create the directory *path*."""

    def isdir(self, path: str) -> bool:
        """Return whether *path* is a directory."""

    def isfile(self, path: str) -> bool:
        """Return whether *path* is a file."""
class LocalFileSystem(FileSystem):
    """FileSystem implementation for the local disk."""


# Base class for filesystem-based targets that provides common filesystem
# operations.
class FileSystemTarget(Target):
    """Shared base for targets stored on a filesystem."""

    def __init__(self, path: str, format=None):
        """
        Set up a filesystem-backed target.

        Args:
            path: Location of the target.
            format: Handler for the file format.
        """

    def move(self, new_path: str, raise_if_exists: bool = False):
        """Relocate the target to *new_path*."""

    def remove(self):
        """Delete the target."""

    def copy(self, new_path: str):
        """Duplicate the target at *new_path*."""

    @property
    def path(self) -> str:
        """The target's path."""

    @property
    def fs(self):
        """The filesystem handler backing this target."""


# Context manager for atomic file writing that ensures files are written
# completely before being moved to their final location.
class AtomicLocalFile:
    """
    Context manager for atomic file writing.

    Data goes to a temporary file that is only moved to the final path when
    the context exits cleanly, so readers never see a partially written file.

    NOTE: in Luigi this class is a binary writer (an ``io.BufferedWriter``
    subclass); write ``bytes``, not ``str``. For text output use
    ``LocalTarget.open('w')``.
    """

    def __init__(self, path: str):
        """
        Initialize atomic file writer.

        Args:
            path: Final file path.
        """

    def __enter__(self):
        """Enter the context and return the writable (binary) temporary file."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit the context.

        On a clean exit the temporary file is closed and moved to the final
        location. If the with-block raised, nothing is committed: the final
        path is left untouched and the exception propagates to the caller.
        """
def atomic_file(path: str):
    """
    Create atomic file context manager.

    Args:
        path: Final file path. The temporary file is created next to it, so
            the parent directory must already exist.

    Returns:
        AtomicLocalFile: Context manager for atomic writing. The handle it
        yields is a binary writer, so write ``bytes`` (for example
        ``text.encode("utf-8")``); passing a ``str`` raises ``TypeError``.
        For text output prefer ``LocalTarget(path).open('w')``.
    """


# Exception classes for filesystem and target-related errors.
class FileSystemException(Exception):
    """Root of the filesystem error hierarchy."""


class FileAlreadyExists(FileSystemException):
    """Raised when creating a file that is already present."""


class MissingParentDirectory(FileSystemException):
    """Raised when the parent directory is absent."""


class NotADirectory(FileSystemException):
    """Raised when a path expected to be a directory is not one."""


import luigi
from luigi import Task, LocalTarget


class DataTask(Task):
    """Produce a single text file for downstream tasks."""

    def output(self):
        return LocalTarget("data/output.txt")

    def run(self):
        # Write to target
        with self.output().open('w') as f:
            f.write("Hello, Luigi!")
        # Check if target exists
        assert self.output().exists()


class ProcessTask(Task):
    """Upper-case the text produced by DataTask."""

    def requires(self):
        return DataTask()

    def output(self):
        return LocalTarget("data/processed.txt")

    def run(self):
        # Read from input target
        with self.input().open('r') as f:
            data = f.read()
        # Write processed data
        with self.output().open('w') as f:
            f.write(data.upper())


class MultiOutputTask(Task):
    """Write three related files; output() returns them keyed by name."""

    def output(self):
        return {
            'summary': LocalTarget("output/summary.txt"),
            'details': LocalTarget("output/details.csv"),
            'metadata': LocalTarget("output/metadata.json"),
        }

    def run(self):
        # Write to multiple outputs
        with self.output()['summary'].open('w') as f:
            f.write("Processing completed")
        with self.output()['details'].open('w') as f:
            f.write("id,value\n1,100\n")
        with self.output()['metadata'].open('w') as f:
            f.write('{"status": "complete"}')


class ProcessMultiInputTask(Task):
    """Read MultiOutputTask's files by key and combine them into a report."""

    def requires(self):
        return MultiOutputTask()

    def output(self):
        # A task with no output is never considered complete by Luigi and is
        # re-run on every invocation, so declare where the result goes.
        return LocalTarget("output/report.txt")

    def run(self):
        # Access specific input targets
        with self.input()['summary'].open('r') as f:
            summary = f.read()
        with self.input()['details'].open('r') as f:
            details = f.read()
        # Persist what was read so the output marks the task complete.
        with self.output().open('w') as f:
            f.write(summary + "\n" + details)


import luigi
from luigi.local_target import atomic_file
# Task and LocalTarget are used below; import them so this snippet runs on its own.
from luigi import LocalTarget, Task


class SafeWriteTask(Task):
    """Write the output through luigi's atomic_file helper."""

    def output(self):
        return LocalTarget("important_data.txt")

    def run(self):
        # Use atomic writing to ensure complete file writes.
        # atomic_file yields a binary writer, so the text is encoded to bytes;
        # writing a str here would raise TypeError.
        with atomic_file(self.output().path) as f:
            f.write("Critical data that must be written atomically".encode("utf-8"))
            # If an error occurs here, the original file remains unchanged


class FileManagementTask(Task):
    """Write to a scratch target, then move it into the final location."""

    def output(self):
        return LocalTarget("final_output.txt")

    def run(self):
        # Create temporary target
        # NOTE(review): a fixed scratch name can collide if two workers run
        # this task concurrently in the same directory.
        temp_target = LocalTarget("temp_output.txt")
        # Write to temporary location
        with temp_target.open('w') as f:
            f.write("Temporary data")
        # Move to final location
        temp_target.move(self.output().path)
        # Verify final target exists
        assert self.output().exists()


# Install with Tessl CLI
npx tessl i tessl/pypi-luigidocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10