tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

Task System

Luigi's task system provides base classes for defining workflow units with dependency management, output specification, and execution logic. Tasks form the core building blocks of Luigi workflows.

Capabilities

Base Task Class

The fundamental Task class that all Luigi tasks inherit from. Defines the interface for dependency resolution, output specification, and execution logic.

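```python
class Task:
    """Base class for all Luigi tasks."""

    def run(self):
        """
        Execute the task logic. Override in subclasses; the default
        implementation does nothing.
        """

    def output(self):
        """
        Specify the output Target(s) for this task.

        Returns:
            Target, or a (possibly nested) list or dict of Targets.
            Defaults to [] (no outputs).
        """

    def requires(self):
        """
        Specify task dependencies.

        Returns:
            Task, or a (possibly nested) list or dict of Tasks.
            Defaults to [] (no dependencies).
        """

    def complete(self) -> bool:
        """
        Check whether the task is complete.

        The default implementation returns True only if every Target in
        output() exists. A task with no outputs is never complete unless
        complete() is overridden (Luigi emits a warning in that case).

        Returns:
            bool: True if task is complete, False otherwise
        """

    def clone(self, cls=None, **kwargs):
        """
        Create a new task instance from this one with some parameters changed.

        Args:
            cls: Task class to instantiate (defaults to this task's class)
            **kwargs: Parameter overrides; parameters that are not overridden
                are copied from this instance when cls declares them

        Returns:
            Task: New task instance with updated parameters
        """

    @property
    def task_id(self) -> str:
        """Unique ID built from the task family and its significant parameters."""

    @property
    def task_family(self) -> str:
        """Task family name: the class name, prefixed with "<namespace>." if a namespace is set."""

    @property
    def param_kwargs(self) -> dict:
        """Dictionary of parameter names and values."""
```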

External Task

Represents dependencies that Luigi does not produce itself, such as data files created by external systems or manual processes.

```python
class ExternalTask(Task):
    """
    Task representing an external dependency.

    External tasks are not executed by Luigi: they only declare outputs
    that some other system is expected to produce. Luigi treats any task
    whose run attribute is None as external.
    """

    run = None  # no run() implementation, so Luigi never tries to run it
```

Wrapper Task

Task that groups multiple dependencies without producing its own output. Useful for creating workflow entry points and organizing related tasks.

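```python
class WrapperTask(Task):
    """
    Task that wraps other tasks without producing output of its own.

    Wrapper tasks declare dependencies through requires() only. They are
    complete as soon as all of their requirements are complete.
    """

    def complete(self):
        """Complete when every required task is complete."""
        return all(r.complete() for r in flatten(self.requires()))
```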

Configuration Task

Base class for tasks that only hold configuration parameters without executing logic. Used for sharing configuration across multiple tasks.

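```python
class Config(Task):
    """
    Class that only holds configuration parameters.

    Config classes are not scheduled or run. Their parameter values are
    read from the configuration file section named after the class (or
    from the command line), and other tasks read them by instantiating
    the class, e.g. MyConfig().some_param.
    """
```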

Task Namespacing

Functions for grouping tasks into namespaces. A namespace is prefixed to the task family name, which avoids conflicts between tasks that share a class name.

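```python
def namespace(namespace: str = None, scope: str = ''):
    """
    Set the namespace for tasks declared after this call.

    Args:
        namespace: Namespace to prepend to task family names
            (calling with None or no argument resets it)
        scope: Module name; only task classes defined in this module or
            its submodules are affected ('' applies to all modules)
    """

def auto_namespace(scope: str = ''):
    """
    Use each task class's module name (__module__) as its namespace,
    for tasks declared after this call.

    Args:
        scope: Module name limiting which task classes are affected
    """
```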

Task Utilities

Utility functions in the luigi.task module for working with tasks, task IDs, and nested task and target structures.

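```python
def task_id_str(task_family: str, params: dict) -> str:
    """
    Generate the canonical task ID string from a family and parameters.

    Args:
        task_family: Task family name (class name, plus namespace if set)
        params: Mapping of parameter names to their serialized string values

    Returns:
        str: Task ID made of the family, a short summary of the first few
        parameter values, and a hash of all parameters
    """
```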

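```python
def externalize(taskclass_or_taskobject):
    """
    Return an externalized copy of a task class or task instance.

    The argument is copied and the copy's run attribute is set to None,
    so Luigi treats it as an external dependency. The original is not
    modified.

    Args:
        taskclass_or_taskobject: Task class or task instance to externalize

    Returns:
        Copy of the task class or instance, with run set to None
    """
```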

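```python
def getpaths(struct):
    """
    Map each task in a structure to its output().

    Task.input() is implemented as getpaths(self.requires()).

    Args:
        struct: Task, or a (possibly nested) list, tuple, or dict of Tasks

    Returns:
        Structure of the same shape, with every Task replaced by its output()
    """
```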

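```python
def flatten(struct):
    """
    Flatten a nested structure of dicts, lists, and single items into a list.

    Dict values are included (keys are dropped), strings are treated as
    single items, and None flattens to an empty list.

    Args:
        struct: Nested structure of items (tasks, targets, ...) in lists and dicts

    Returns:
        list: The individual items
    """
```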

```python
def flatten_output(task):
    """
    List all output targets of a task, recursing into the requirements of
    tasks that have no outputs of their own (such as wrapper tasks).

    Args:
        task: Task instance

    Returns:
        list: Individual Targets
    """
```

Bulk Operations

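Range tools (luigi.tools.range) call a task class's bulk_complete() to check the completeness of many parameter combinations in one call. The default Task.bulk_complete() raises BulkCompleteNotImplementedError. MixinNaiveBulkComplete supplies a simple fallback implementation.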

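```python
class MixinNaiveBulkComplete:
    """
    Mixin that provides a naive bulk_complete() implementation: it
    instantiates the task for each parameter combination and calls
    complete() in a loop.

    Suited to tasks whose complete() is cheap. Tasks that can check many
    combinations faster should override bulk_complete() instead.
    """

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        """
        Return the parameter combinations whose tasks are complete.

        Args:
            parameter_tuples: Iterable whose elements are tuples or lists
                (positional parameters), dicts (keyword parameters), or
                single values (the first parameter)

        Returns:
            list: The elements of parameter_tuples whose tasks are complete
        """

class BulkCompleteNotImplementedError(NotImplementedError):
    """Raised by the default Task.bulk_complete(), which is not implemented."""
```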

Usage Examples

Basic Task with Dependencies

```python
import datetime

import luigi
from luigi import Task, LocalTarget


class DataIngestionTask(Task):
    """Task that ingests raw data."""
    date = luigi.DateParameter()

    def output(self):
        return LocalTarget(f"data/raw/{self.date}.csv")

    def run(self):
        # Simulate data ingestion; opening a LocalTarget for writing creates data/raw/ if needed
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")


class DataProcessingTask(Task):
    """Task that processes ingested data."""
    date = luigi.DateParameter()

    def requires(self):
        return DataIngestionTask(date=self.date)

    def output(self):
        return LocalTarget(f"data/processed/{self.date}.csv")

    def run(self):
        # self.input() is the output() of the task returned by requires()
        with self.input().open('r') as f:
            data = f.read()

        # Process and write output
        processed = data.replace('100', '1000').replace('200', '2000')
        with self.output().open('w') as f:
            f.write(processed)


if __name__ == "__main__":
    # Run in-process; DataIngestionTask runs first because it is required
    luigi.build([DataProcessingTask(date=datetime.date(2024, 1, 1))], local_scheduler=True)
```

Wrapper Task for Multiple Dependencies

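```python
class DailyReportTask(luigi.WrapperTask):
    """Wrapper task that generates multiple daily reports."""
    date = luigi.DateParameter()

    def requires(self):
        # MetricsCalculationTask and QualityCheckTask stand for further
        # tasks defined elsewhere in the pipeline (not shown here)
        return [
            DataProcessingTask(date=self.date),
            MetricsCalculationTask(date=self.date),
            QualityCheckTask(date=self.date)
        ]
```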

External Task for Input Files

```python
class InputFileTask(luigi.ExternalTask):
    """External task representing an input file."""
    filename = luigi.Parameter()

    def output(self):
        return LocalTarget(f"input/{self.filename}")
```

Install with Tessl CLI

npx tessl i tessl/pypi-luigi
