or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cli-tools.mdconfiguration.mdevents.mdexecution.mdindex.mdintegrations.mdparameters.mdscheduler.mdtargets.mdtasks.md
tile.json

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/luigi@2.8.x

To install, run

npx @tessl/cli install tessl/pypi-luigi@2.8.0

index.mddocs/

Luigi

Luigi is a Python workflow management framework for building complex pipelines of batch jobs. It handles dependency resolution, failure recovery, command line integration, and provides a web interface for workflow visualization and monitoring. Luigi helps coordinate data processing workflows where multiple tasks depend on each other and need to be executed in the correct order.

Package Information

  • Package Name: luigi
  • Language: Python
  • Installation: pip install luigi

Core Imports

import luigi

Common imports for task and parameter classes:

from luigi import Task, Target, LocalTarget
from luigi import Parameter, DateParameter, IntParameter
from luigi import build, run

Basic Usage

import luigi
from luigi import Task, LocalTarget, Parameter

class HelloWorldTask(Task):
    """A simple task that creates a greeting file."""
    name = Parameter(default="World")
    
    def output(self):
        """Define the output target for this task."""
        return LocalTarget(f"hello_{self.name}.txt")
    
    def run(self):
        """Execute the task logic."""
        with self.output().open('w') as f:
            f.write(f"Hello, {self.name}!")

class ProcessGreetingTask(Task):
    """A task that processes the greeting file."""
    name = Parameter(default="World")
    
    def requires(self):
        """Define task dependencies."""
        return HelloWorldTask(name=self.name)
    
    def output(self):
        return LocalTarget(f"processed_{self.name}.txt")
    
    def run(self):
        # Read input from dependency
        with self.input().open('r') as f:
            greeting = f.read()
        
        # Process and write output
        with self.output().open('w') as f:
            f.write(greeting.upper())

# Run the workflow
if __name__ == '__main__':
    luigi.build([ProcessGreetingTask(name="Luigi")], local_scheduler=True)

Architecture

Luigi's architecture centers around three core concepts:

  • Tasks: Units of work that define dependencies, outputs, and execution logic
  • Targets: Represent data inputs and outputs with existence checking capabilities
  • Parameters: Configure and parameterize task behavior with type-safe values
  • Scheduler: Central coordination service for dependency resolution and task execution
  • Workers: Execute tasks locally or across multiple machines

This design enables building complex data pipelines with automatic dependency resolution, failure recovery, and workflow visualization through Luigi's web interface.

Capabilities

Core Task Classes

Base classes for defining workflow tasks including regular tasks, external dependencies, wrapper tasks, and configuration tasks. These form the foundation of all Luigi workflows.

class Task:
    def run(self): ...
    def output(self): ...
    def requires(self): ...
    def complete(self): ...

class ExternalTask(Task): ...
class WrapperTask(Task): ...
class Config(Task): ...

Task System

Target System

File and data target classes for representing task inputs and outputs with existence checking, atomic writes, and filesystem operations.

class Target:
    def exists(self) -> bool: ...
    def open(self, mode='r'): ...

class LocalTarget(Target):
    def __init__(self, path: str): ...
    def exists(self) -> bool: ...
    def open(self, mode='r'): ...
    def remove(self): ...

Targets

Parameter System

Type-safe parameter system for task configuration including primitive types, date/time parameters, collections, and custom parameter types.

class Parameter:
    def __init__(self, default=None): ...

class DateParameter(Parameter): ...
class IntParameter(Parameter): ...
class BoolParameter(Parameter): ...
class ListParameter(Parameter): ...
class DictParameter(Parameter): ...

Parameters

Workflow Execution

Main entry points for running Luigi workflows with dependency resolution, scheduling, and execution management.

def run(cmdline_args=None, main_task_cls=None, 
        worker_scheduler_factory=None, use_dynamic_argparse=None,
        local_scheduler=False, detailed_summary=False) -> LuigiRunResult: ...

def build(tasks, worker_scheduler_factory=None, 
          detailed_summary=False, **env_params) -> LuigiRunResult: ...

Execution

Configuration System

Configuration management for Luigi settings, task parameters, and scheduler options with support for multiple configuration file formats.

def get_config() -> LuigiConfigParser: ...
def add_config_path(path: str): ...

class LuigiConfigParser: ...
class LuigiTomlParser: ...

Configuration

External System Integration

Comprehensive contrib modules for integrating with databases, cloud storage, big data platforms, job schedulers, and monitoring systems.

# Database integration examples
from luigi.contrib.postgres import PostgresTarget, CopyToTable
from luigi.contrib.mysql import MySqlTarget
from luigi.contrib.mongodb import MongoTarget

# Cloud storage examples  
from luigi.contrib.s3 import S3Target, S3Client
from luigi.contrib.gcs import GCSTarget
from luigi.contrib.azureblob import AzureBlobTarget

# Big data platform examples
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.bigquery import BigQueryTarget

External Integrations

Scheduler and RPC

Remote scheduler client for distributed task execution and coordination across multiple worker processes and machines.

class RemoteScheduler:
    def add_task(self, task_id: str, status: str, runnable: bool): ...
    def get_work(self, worker: str, host: str): ...
    def ping(self, worker: str): ...

class RPCError(Exception): ...

Scheduler & RPC

Events and Monitoring

Event system for monitoring task execution, workflow progress, and integrating with external monitoring systems.

class Event: ...

# Status codes for execution results
class LuigiStatusCode:
    SUCCESS: int
    SUCCESS_WITH_RETRY: int  
    FAILED: int
    FAILED_AND_SCHEDULING_FAILED: int

Events & Monitoring

Command Line Tools

Command-line utilities for workflow management, dependency analysis, and task introspection.

# Main CLI commands
luigi --module mymodule MyTask --param value
luigid --background --port 8082

# Tool utilities
luigi.tools.deps MyTask
luigi.tools.deps_tree MyTask  
luigi.tools.luigi_grep pattern

Command Line Tools

Common Types

from typing import Any, Dict, List, Optional, Union
from datetime import datetime, date, timedelta

# Common parameter value types
ParameterValue = Union[str, int, float, bool, date, datetime, timedelta, List[Any], Dict[str, Any]]

# Task execution result
class LuigiRunResult:
    status: LuigiStatusCode
    worker: Any
    scheduling_succeeded: bool