CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

targets.mddocs/

Targets

Luigi's target system represents data inputs and outputs with existence checking, file operations, and atomic writing capabilities. Targets define what tasks produce and consume.

Capabilities

Base Target Interface

Abstract base class that defines the interface for all targets in Luigi. Targets represent data that tasks consume or produce.

class Target:
    """Abstract base class for all targets."""
    
    def exists(self) -> bool:
        """
        Check if the target exists.
        
        Returns:
            bool: True if target exists, False otherwise
        """
    
    def open(self, mode: str = 'r'):
        """
        Open the target for reading or writing.
        
        Args:
            mode: File mode ('r', 'w', 'a', etc.)
            
        Returns:
            File-like object
        """

Local Filesystem Target

Target implementation for local filesystem files with atomic writing, existence checking, and file operations.

class LocalTarget(Target):
    """Target for local filesystem files."""
    
    def __init__(self, path: str, format=None, is_tmp: bool = False):
        """
        Initialize local target.
        
        Args:
            path: File path
            format: File format handler (optional)
            is_tmp: Whether this is a temporary file
        """
    
    def exists(self) -> bool:
        """Check if the local file exists."""
    
    def open(self, mode: str = 'r'):
        """
        Open the local file.
        
        Args:
            mode: File mode ('r', 'w', 'a', 'rb', 'wb', etc.)
            
        Returns:
            File object (possibly wrapped with format handler)
        """
    
    def remove(self):
        """Remove the local file if it exists."""
    
    def move(self, new_path: str, raise_if_exists: bool = False):
        """
        Move the file to a new location.
        
        Args:
            new_path: Destination path
            raise_if_exists: Whether to raise if destination exists
        """
    
    def copy(self, new_path: str, raise_if_exists: bool = False):
        """
        Copy the file to a new location.
        
        Args:
            new_path: Destination path  
            raise_if_exists: Whether to raise if destination exists
        """
    
    @property
    def path(self) -> str:
        """Get the file path."""
    
    @property
    def fs(self):
        """Get the filesystem handler."""

Filesystem Interface

Abstract filesystem interface that LocalTarget uses for file operations. Can be subclassed for custom filesystem implementations.

class FileSystem:
    """Abstract filesystem interface."""
    
    def exists(self, path: str) -> bool:
        """Check if path exists."""
    
    def remove(self, path: str, recursive: bool = True):
        """Remove file or directory."""
    
    def move(self, path: str, dest: str, raise_if_exists: bool = False):
        """Move file or directory."""
    
    def copy(self, path: str, dest: str, raise_if_exists: bool = False):
        """Copy file or directory."""
    
    def listdir(self, path: str) -> list:
        """List directory contents."""
    
    def mkdir(self, path: str, parents: bool = True, raise_if_exists: bool = False):
        """Create directory."""
    
    def isdir(self, path: str) -> bool:
        """Check if path is a directory."""
    
    def isfile(self, path: str) -> bool:
        """Check if path is a file."""

class LocalFileSystem(FileSystem):
    """Local filesystem implementation."""

Filesystem Target Base

Base class for filesystem-based targets that provides common filesystem operations.

class FileSystemTarget(Target):
    """Base class for filesystem targets."""
    
    def __init__(self, path: str, format=None):
        """
        Initialize filesystem target.
        
        Args:
            path: Target path
            format: File format handler
        """
    
    def move(self, new_path: str, raise_if_exists: bool = False):
        """Move the target to a new location."""
    
    def remove(self):
        """Remove the target."""
    
    def copy(self, new_path: str):
        """Copy the target to a new location."""
    
    @property
    def path(self) -> str:
        """Get the target path."""
    
    @property
    def fs(self):
        """Get the filesystem handler."""

Atomic File Operations

Context manager for atomic file writing that ensures files are written completely before being moved to their final location.

class AtomicLocalFile:
    """Context manager for atomic file writing."""
    
    def __init__(self, path: str):
        """
        Initialize atomic file writer.
        
        Args:
            path: Final file path
        """
    
    def __enter__(self):
        """Enter context and return temporary file object."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context and atomically move temp file to final location."""

def atomic_file(path: str):
    """
    Create atomic file context manager.
    
    Args:
        path: Final file path
        
    Returns:
        AtomicLocalFile: Context manager for atomic writing
    """

Target Exceptions

Exception classes for filesystem and target-related errors.

class FileSystemException(Exception):
    """Base exception for filesystem errors."""

class FileAlreadyExists(FileSystemException):
    """Exception raised when attempting to create existing file."""

class MissingParentDirectory(FileSystemException):
    """Exception raised when parent directory doesn't exist."""

class NotADirectory(FileSystemException):
    """Exception raised when path is not a directory."""

Usage Examples

Basic File Target Usage

import luigi
from luigi import Task, LocalTarget

class DataTask(Task):
    def output(self):
        return LocalTarget("data/output.txt")
    
    def run(self):
        # Write to target
        with self.output().open('w') as f:
            f.write("Hello, Luigi!")
        
        # Check if target exists
        assert self.output().exists()

class ProcessTask(Task):
    def requires(self):
        return DataTask()
    
    def output(self):
        return LocalTarget("data/processed.txt")
    
    def run(self):
        # Read from input target
        with self.input().open('r') as f:
            data = f.read()
        
        # Write processed data
        with self.output().open('w') as f:
            f.write(data.upper())

Multiple Output Targets

class MultiOutputTask(Task):
    def output(self):
        return {
            'summary': LocalTarget("output/summary.txt"),
            'details': LocalTarget("output/details.csv"),
            'metadata': LocalTarget("output/metadata.json")
        }
    
    def run(self):
        # Write to multiple outputs
        with self.output()['summary'].open('w') as f:
            f.write("Processing completed")
        
        with self.output()['details'].open('w') as f:
            f.write("id,value\n1,100\n")
        
        with self.output()['metadata'].open('w') as f:
            f.write('{"status": "complete"}')

class ProcessMultiInputTask(Task):
    def requires(self):
        return MultiOutputTask()
    
    def run(self):
        # Access specific input targets
        with self.input()['summary'].open('r') as f:
            summary = f.read()
        
        with self.input()['details'].open('r') as f:
            details = f.read()

Atomic File Writing

import luigi
from luigi.local_target import atomic_file

class SafeWriteTask(Task):
    def output(self):
        return LocalTarget("important_data.txt")
    
    def run(self):
        # Use atomic writing to ensure complete file writes
        with atomic_file(self.output().path) as f:
            f.write("Critical data that must be written atomically")
            # If an error occurs here, the original file remains unchanged

Target File Operations

class FileManagementTask(Task):
    def output(self):
        return LocalTarget("final_output.txt")
    
    def run(self):
        # Create temporary target
        temp_target = LocalTarget("temp_output.txt")
        
        # Write to temporary location
        with temp_target.open('w') as f:
            f.write("Temporary data")
        
        # Move to final location
        temp_target.move(self.output().path)
        
        # Verify final target exists
        assert self.output().exists()

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json