CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-luigi

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

72

1.30x
Overview
Eval results
Files

docs/integrations.md

External Integrations

Luigi's contrib modules provide extensive integration with external systems including databases, cloud storage, big data platforms, job schedulers, and monitoring systems. These modules extend Luigi's capabilities to work with diverse data infrastructure.

Capabilities

Database Integration

Comprehensive database integration modules for popular database systems with specialized targets and task types.

# PostgreSQL integration
from luigi.contrib.postgres import PostgresTarget, CopyToTable, PostgresQuery

class PostgresTarget:
    """Marker-style Luigi target for a PostgreSQL table update.

    The target is considered complete once the update identified by
    ``update_id`` has been recorded for ``table``.
    """

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 5432):
        """Record the connection settings and update identity.

        Args:
            host: Database server hostname.
            database: Name of the database to connect to.
            user: Login user name.
            password: Login password.
            table: Table this update applies to.
            update_id: Unique identifier for this particular update.
            port: TCP port of the server (defaults to 5432).
        """

    def exists(self) -> bool:
        """Report whether this update has been recorded in the database."""

class CopyToTable(Task):
    """Task that copies data to PostgreSQL table."""
    
    # Connection settings; concrete subclasses set these as class attributes
    # (see the LoadDataToPostgres example later in this document).
    host: str        # database server hostname
    database: str    # database name
    user: str        # login user
    password: str    # login password
    table: str       # destination table
    
    def copy(self, file_object):
        """Copy data from file object to table."""
    
    def output(self):
        """Return PostgresTarget for the table."""

# MySQL integration  
from luigi.contrib.mysqldb import MySqlTarget

class MySqlTarget:
    """Marker-style Luigi target for a MySQL table update."""

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 3306):
        """Store connection settings; MySQL's default port 3306 is assumed."""

# MongoDB integration
from luigi.contrib.mongodb import MongoTarget

class MongoTarget:
    """Luigi target addressing a MongoDB collection."""

    def __init__(self, host: str, port: int, database: str, collection: str,
                 mongo_client=None):
        """Store the collection coordinates; an existing client may be reused
        via ``mongo_client``."""

Cloud Storage Integration

Integration modules for major cloud storage platforms with specialized targets and operations.

# Amazon S3 integration
from luigi.contrib.s3 import S3Target, S3Client, S3FlagTarget, S3PathTask

class S3Target:
    """Luigi target addressing a single object in Amazon S3."""

    def __init__(self, path: str, format=None, client=None,
                 is_tmp: bool = False):
        """Bind the target to an S3 object.

        Args:
            path: Full S3 path of the object (``s3://bucket/key``).
            format: Optional file-format handler.
            client: Optional pre-configured S3 client.
            is_tmp: Mark the object as a temporary file.
        """

    def exists(self) -> bool:
        """Report whether the object is present in S3."""

    def open(self, mode: str = 'r'):
        """Return a file-like handle on the object for reading or writing."""

    def remove(self):
        """Delete the object from S3."""

class S3Client:
    """Thin client wrapper exposing common S3 operations."""

    def exists(self, path: str) -> bool:
        """Report whether an object exists at ``path``."""

    def put(self, local_path: str, destination_s3_path: str):
        """Upload the local file at ``local_path`` to S3."""

    def get(self, s3_path: str, local_path: str):
        """Download the object at ``s3_path`` to ``local_path``."""

    def list(self, path: str) -> list:
        """Return the objects whose keys start with ``path``."""

# Google Cloud Storage integration
from luigi.contrib.gcs import GCSTarget, GCSClient

class GCSTarget:
    """Luigi target addressing an object in Google Cloud Storage."""

    def __init__(self, path: str, format=None, client=None):
        """Bind the target to a GCS path, optionally reusing a client."""

class GCSClient:
    """Thin client wrapper for Google Cloud Storage operations."""

    def exists(self, path: str) -> bool:
        """Report whether an object exists at ``path``."""

    def put(self, local_path: str, destination_gcs_path: str):
        """Upload the local file at ``local_path`` to GCS."""

# Azure Blob Storage integration  
from luigi.contrib.azureblob import AzureBlobTarget

class AzureBlobTarget:
    """Luigi target addressing a blob in Azure Blob Storage."""

    def __init__(self, container: str, blob: str,
                 account_name: str, account_key: str):
        """Bind the target to ``blob`` inside ``container`` using the given
        storage-account credentials."""

Big Data Platform Integration

Integration with big data processing platforms and frameworks.

# HDFS integration
from luigi.contrib.hdfs import HdfsTarget, HdfsClient

class HdfsTarget:
    """Luigi target addressing a file stored in HDFS."""

    def __init__(self, path: str, format=None, client=None, is_tmp: bool = False):
        """Bind the target to an HDFS path, optionally reusing a client."""

    def exists(self) -> bool:
        """Report whether the file is present in HDFS."""

    def open(self, mode: str = 'r'):
        """Return a file-like handle on the HDFS file."""

class HdfsClient:
    """Thin client wrapper exposing common HDFS operations."""

    def exists(self, path: str) -> bool:
        """Report whether ``path`` exists in HDFS."""

    def put(self, local_path: str, destination_hdfs_path: str):
        """Upload the local file at ``local_path`` into HDFS."""

    def get(self, hdfs_path: str, local_path: str):
        """Download the HDFS file at ``hdfs_path`` to ``local_path``."""

# Apache Spark integration
from luigi.contrib.spark import SparkSubmitTask, PySparkTask

class SparkSubmitTask(Task):
    """Task for submitting Spark applications."""
    
    app: str  # Spark application path
    master: str = "local[*]"  # Spark master URL
    deploy_mode: str = "client"  # Deploy mode
    executor_memory: str = "1g"  # Executor memory
    driver_memory: str = "1g"  # Driver memory
    
    def app_options(self) -> list:
        """Return application-specific options."""
        # Subclasses override this to pass CLI arguments to the application
        # (see the SparkDataProcessing example later in this document).
    
    def run(self):
        """Submit Spark application."""
        # NOTE(review): presumably invokes spark-submit with the attributes
        # above — confirm against luigi.contrib.spark before relying on it.

# Google BigQuery integration
from luigi.contrib.bigquery import BigQueryTarget, BigQueryLoadTask

class BigQueryTarget:
    """Luigi target addressing a BigQuery table."""

    def __init__(self, project_id: str, dataset_id: str, table_id: str,
                 client=None):
        """Bind the target to ``project_id.dataset_id.table_id``; an existing
        client may be reused via ``client``."""

    def exists(self) -> bool:
        """Report whether the table is present in BigQuery."""

class BigQueryLoadTask(Task):
    """Task for loading data into BigQuery."""
    
    # Destination coordinates; subclasses provide these as class attributes.
    project_id: str   # GCP project
    dataset_id: str   # BigQuery dataset
    table_id: str     # destination table
    schema: list  # Table schema
    
    def run(self):
        """Load data into BigQuery table."""

Job Scheduler Integration

Integration with HPC and cluster job schedulers.

# SLURM integration
from luigi.contrib.slurm import SlurmTask

class SlurmTask(Task):
    """Base class for SLURM job submission."""
    
    shared_tmp_dir: str  # Shared temporary directory
    job_name: str  # SLURM job name
    n_cpu: int = 1  # Number of CPUs
    mem: str = "1GB"  # Memory requirement
    time: str = "1:00:00"  # Time limit
    partition: str  # SLURM partition
    
    def work(self):
        """Define work to be done in SLURM job."""
        # Subclasses override this; the body executes on the allocated
        # compute node, not on the machine running the Luigi worker.
    
    def run(self):
        """Submit job to SLURM scheduler."""

# LSF integration  
from luigi.contrib.lsf import LSFTask

class LSFTask(Task):
    """Base class for LSF job submission."""
    
    shared_tmp_dir: str  # shared temporary directory visible to all nodes
    job_name: str        # LSF job name
    n_cpu: int = 1       # number of CPUs to request
    resource: str  # LSF resource requirements
    queue: str  # LSF queue
    
    def work(self):
        """Define work for LSF job."""
        # Subclasses override this; it runs on the LSF execution host.

# Sun Grid Engine integration
from luigi.contrib.sge import SGETask

class SGETask(Task):
    """Base class for SGE job submission."""
    
    shared_tmp_dir: str       # shared temporary directory visible to all nodes
    n_cpu: int = 1            # number of CPUs to request
    run_locally: bool = False  # when True, skip SGE and run on this machine
    
    def work(self):
        """Define work for SGE job."""
        # Subclasses override this; it runs on the SGE execution host
        # unless run_locally is True.

Container and Cloud Platform Integration

Integration with containerization platforms and cloud services.

# Docker integration
from luigi.contrib.docker_runner import DockerTask

class DockerTask(Task):
    """Task for running commands in Docker containers."""
    
    image: str  # Docker image name
    command: str  # Command to run
    # NOTE(review): a mutable class-level default ({}) is shared by every
    # instance that does not override it; in real Luigi these are declared
    # as Parameters — confirm against luigi.contrib.docker_runner.
    container_options: dict = {}  # Docker container options
    
    def run(self):
        """Run command in Docker container."""

# Kubernetes integration
from luigi.contrib.kubernetes import KubernetesJobTask

class KubernetesJobTask(Task):
    """Task for running Kubernetes jobs."""
    
    name: str  # Job name
    image: str  # Container image
    command: list  # Command to run
    
    def spec_schema(self) -> dict:
        """Return Kubernetes job spec."""
        # Subclasses override this to supply the pod/container spec dict
        # used when the Job object is created.

# AWS Batch integration
from luigi.contrib.batch import BatchTask

class BatchTask(Task):
    """Task for AWS Batch job execution."""
    
    job_name: str        # name assigned to the submitted Batch job
    job_queue: str       # Batch job queue to submit to
    job_definition: str  # registered Batch job definition to run
    
    def run(self):
        """Submit job to AWS Batch."""

# Google Cloud Dataproc integration
from luigi.contrib.dataproc import DataprocJobTask

class DataprocJobTask(Task):
    """Base class for Google Cloud Dataproc jobs."""
    
    cluster_name: str        # target Dataproc cluster
    project_id: str          # GCP project owning the cluster
    region: str = "global"   # Dataproc region (legacy "global" endpoint default)
    
    def run(self):
        """Submit job to Dataproc cluster."""

Monitoring and Metrics Integration

Integration with monitoring and metrics collection systems.

# Datadog metrics integration
from luigi.contrib.datadog_metric import DatadogMetric

class DatadogMetric:
    """A single metric sample destined for Datadog."""

    def __init__(self, metric_name: str, value: float, tags: list = None):
        """Capture the sample to be sent.

        Args:
            metric_name: Name under which the metric is reported.
            value: Numeric value of the sample.
            tags: Optional list of Datadog tags to attach.
        """

    def send(self):
        """Deliver the captured sample to Datadog."""

# Prometheus metrics integration  
from luigi.contrib.prometheus_metric import PrometheusMetric

class PrometheusMetric:
    """A single metric sample destined for a Prometheus pushgateway."""

    def __init__(self, metric_name: str, value: float, labels: dict = None):
        """Capture the sample; ``labels`` are attached as Prometheus labels."""

    def push(self):
        """Deliver the captured sample to the pushgateway."""

Usage Examples

Database Integration Example

import luigi
from luigi.contrib.postgres import PostgresTarget, CopyToTable

class LoadDataToPostgres(CopyToTable):
    """Load CSV data into PostgreSQL table."""
    
    # Connection settings consumed by the CopyToTable base class.
    host = "localhost"
    database = "mydb"
    user = "postgres"
    password = "password"
    table = "sales_data"
    
    def requires(self):
        # Upstream task that produces the CSV to load; defined elsewhere.
        return ProcessSalesData()  # Task that generates CSV
    
    def copy(self, file_object):
        """Custom copy logic if needed."""
        # Default implementation handles CSV copying
        return super().copy(file_object)

class QueryPostgresData(luigi.Task):
    """Query data from PostgreSQL."""
    
    def requires(self):
        return LoadDataToPostgres()
    
    def output(self):
        return luigi.LocalTarget("query_results.txt")
    
    def run(self):
        # Check that data was loaded
        target = PostgresTarget(
            host="localhost",
            database="mydb", 
            user="postgres",
            password="password",
            table="sales_data",
            update_id="loaded"
        )
        
        # NOTE(review): if the target does not exist, no output file is
        # written and Luigi will keep treating this task as incomplete.
        if target.exists():
            with self.output().open('w') as f:
                f.write("Data successfully loaded to PostgreSQL")

S3 Integration Example

import luigi
from luigi.contrib.s3 import S3Target, S3Client

class ProcessS3Data(luigi.Task):
    """Process data stored in S3."""
    
    bucket = luigi.Parameter()  # S3 bucket holding both raw and processed data
    key = luigi.Parameter()     # object key (filename) within the bucket
    
    def output(self):
        # Processed result lands under the processed/ prefix of the same bucket.
        return S3Target(f"s3://{self.bucket}/processed/{self.key}")
    
    def run(self):
        # Read from S3 input
        input_target = S3Target(f"s3://{self.bucket}/raw/{self.key}")
        
        with input_target.open('r') as input_file:
            data = input_file.read()
        
        # Process data (placeholder transformation for the example)
        processed_data = data.upper()
        
        # Write to S3 output
        with self.output().open('w') as output_file:
            output_file.write(processed_data)

class S3DataPipeline(luigi.Task):
    """Pipeline that processes multiple S3 files."""
    
    bucket = luigi.Parameter()
    
    def run(self):
        # List files in S3 bucket
        client = S3Client()
        files = client.list(f"s3://{self.bucket}/raw/")
        
        # Process each file
        tasks = []
        for file_path in files:
            key = file_path.split('/')[-1]  # Extract filename
            tasks.append(ProcessS3Data(bucket=self.bucket, key=key))
        
        # Build all processing tasks
        # Yielding the list registers them as dynamic dependencies; Luigi
        # suspends this task until every yielded task is complete.
        yield tasks

Spark Integration Example

import luigi
from luigi.contrib.spark import SparkSubmitTask

class SparkDataProcessing(SparkSubmitTask):
    """Process large dataset using Spark."""
    
    input_path = luigi.Parameter()   # path read by the Spark application
    output_path = luigi.Parameter()  # path written by the Spark application
    
    # Spark configuration
    app = "spark_jobs/data_processing.py"
    master = "spark://localhost:7077"
    executor_memory = "4g"
    driver_memory = "2g"
    
    def app_options(self):
        """Pass parameters to Spark application."""
        return [
            "--input", self.input_path,
            "--output", self.output_path
        ]
    
    def output(self):
        # Spark writes a _SUCCESS marker on successful job completion.
        return luigi.LocalTarget(f"{self.output_path}/_SUCCESS")

# The Spark application (data_processing.py) would contain:
# from pyspark.sql import SparkSession
# import argparse
# 
# parser = argparse.ArgumentParser()
# parser.add_argument("--input")
# parser.add_argument("--output")
# args = parser.parse_args()
# 
# spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
# df = spark.read.csv(args.input, header=True)
# processed_df = df.filter(df.amount > 100)
# processed_df.write.csv(args.output, header=True)

SLURM Cluster Integration Example

import luigi
from luigi.contrib.slurm import SlurmTask

class HeavyComputation(SlurmTask):
    """Run computationally intensive task on SLURM cluster."""
    
    dataset = luigi.Parameter()
    
    # SLURM job configuration
    shared_tmp_dir = "/shared/tmp"
    job_name = "heavy_computation"
    n_cpu = 16
    mem = "32GB"
    time = "4:00:00"
    partition = "compute"
    
    def work(self):
        """Define work to be executed on cluster."""
        # This runs on the SLURM compute node
        import heavy_computation_module
        
        result = heavy_computation_module.process_dataset(self.dataset)
        
        # Write result to shared filesystem
        with open(f"{self.shared_tmp_dir}/result_{self.dataset}.txt", 'w') as f:
            f.write(str(result))
    
    def output(self):
        # NOTE(review): the output path must be on a filesystem visible to
        # both the compute node and the Luigi worker — confirm mount setup.
        return luigi.LocalTarget(f"{self.shared_tmp_dir}/result_{self.dataset}.txt")

Multi-Platform Pipeline Example

import luigi
from luigi.contrib.s3 import S3Target
from luigi.contrib.postgres import CopyToTable
from luigi.contrib.spark import SparkSubmitTask

class DataIngestionPipeline(luigi.WrapperTask):
    """Complete data pipeline using multiple integrations."""
    
    date = luigi.DateParameter()
    
    def requires(self):
        # WrapperTask completes once every required task below is done.
        return [
            # 1. Download data from S3
            DownloadS3Data(date=self.date),
            
            # 2. Process with Spark
            SparkProcessing(date=self.date),
            
            # 3. Load to PostgreSQL
            LoadToDatabase(date=self.date),
            
            # 4. Send metrics to monitoring
            SendMetrics(date=self.date)
        ]

class DownloadS3Data(luigi.Task):
    date = luigi.DateParameter()  # date of the raw file to fetch
    
    def output(self):
        return luigi.LocalTarget(f"data/raw_{self.date}.csv")
    
    def run(self):
        # Copy the day's raw object from S3 to the local filesystem.
        # NOTE(review): read() loads the whole object into memory — fine for
        # small files; stream in chunks for large ones.
        s3_target = S3Target(f"s3://data-bucket/raw/{self.date}.csv")
        with s3_target.open('r') as s3_file, self.output().open('w') as local_file:
            local_file.write(s3_file.read())

class SparkProcessing(SparkSubmitTask):
    date = luigi.DateParameter()  # processing date forwarded to the Spark app
    
    app = "spark_jobs/daily_processing.py"
    
    def requires(self):
        return DownloadS3Data(date=self.date)
    
    def app_options(self):
        # Forward the date to the Spark application as a CLI argument.
        return ["--date", str(self.date)]

class LoadToDatabase(CopyToTable):
    date = luigi.DateParameter()  # date partition being loaded
    
    # Connection settings consumed by the CopyToTable base class.
    host = "localhost"
    database = "analytics"
    user = "luigi"
    password = "password"
    table = "daily_metrics"
    
    def requires(self):
        return SparkProcessing(date=self.date)

Install with Tessl CLI

npx tessl i tessl/pypi-luigi

docs

cli-tools.md

configuration.md

events.md

execution.md

index.md

integrations.md

parameters.md

scheduler.md

targets.md

tasks.md

tile.json