Luigi is a Python workflow management framework for building complex pipelines of batch jobs, with built-in dependency resolution and task scheduling.

Luigi's contrib modules provide extensive integration with external systems, including databases, cloud storage, big data platforms, job schedulers, and monitoring systems. These modules extend Luigi's capabilities to work with diverse data infrastructure.
Comprehensive database integration modules for popular database systems with specialized targets and task types.
# PostgreSQL integration
from luigi.contrib.postgres import PostgresTarget, CopyToTable, PostgresQuery
class PostgresTarget:
    """Target for PostgreSQL database tables."""

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 5432):
        """
        Initialize PostgreSQL target.

        Args:
            host: Database host
            database: Database name
            user: Username
            password: Password
            table: Table name
            update_id: Unique identifier for this update
            port: Database port (default 5432)
        """

    def exists(self) -> bool:
        """Check if the target exists in database."""
class CopyToTable(Task):
    """Task that copies data to PostgreSQL table."""

    # Connection and destination settings, supplied by subclasses.
    host: str
    database: str
    user: str
    password: str
    table: str

    def copy(self, file_object):
        """Copy data from file object to table."""

    def output(self):
        """Return PostgresTarget for the table."""
# MySQL integration
from luigi.contrib.mysqldb import MySqlTarget
class MySqlTarget:
    """Target for MySQL database tables."""

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 3306):
        """Initialize MySQL target (default port 3306)."""
# MongoDB integration
from luigi.contrib.mongodb import MongoTarget
class MongoTarget:
    """Target for MongoDB collections."""

    def __init__(self, host: str, port: int, database: str, collection: str,
                 mongo_client=None):
        """Initialize MongoDB target; an existing client may be passed via mongo_client."""

Integration modules for major cloud storage platforms with specialized targets and operations.
# Amazon S3 integration
from luigi.contrib.s3 import S3Target, S3Client, S3FlagTarget, S3PathTask
class S3Target:
    """Target for Amazon S3 objects."""

    def __init__(self, path: str, format=None, client=None,
                 is_tmp: bool = False):
        """
        Initialize S3 target.

        Args:
            path: S3 path (s3://bucket/key)
            format: File format handler
            client: S3 client instance
            is_tmp: Whether this is a temporary file
        """

    def exists(self) -> bool:
        """Check if S3 object exists."""

    def open(self, mode: str = 'r'):
        """Open S3 object for reading/writing."""

    def remove(self):
        """Delete S3 object."""
class S3Client:
    """Client for S3 operations."""

    def exists(self, path: str) -> bool:
        """Check if S3 object exists."""

    def put(self, local_path: str, destination_s3_path: str):
        """Upload local file to S3."""

    def get(self, s3_path: str, local_path: str):
        """Download S3 object to local file."""

    def list(self, path: str) -> list:
        """List S3 objects with path prefix."""
# Google Cloud Storage integration
from luigi.contrib.gcs import GCSTarget, GCSClient
class GCSTarget:
    """Target for Google Cloud Storage objects."""

    def __init__(self, path: str, format=None, client=None):
        """Initialize GCS target."""
class GCSClient:
    """Client for GCS operations."""

    def exists(self, path: str) -> bool:
        """Check if GCS object exists."""

    def put(self, local_path: str, destination_gcs_path: str):
        """Upload local file to GCS."""
# Azure Blob Storage integration
from luigi.contrib.azureblob import AzureBlobTarget
class AzureBlobTarget:
    """Target for Azure Blob Storage."""

    def __init__(self, container: str, blob: str,
                 account_name: str, account_key: str):
        """Initialize Azure Blob target."""

Integration with big data processing platforms and frameworks.
# HDFS integration
from luigi.contrib.hdfs import HdfsTarget, HdfsClient
class HdfsTarget:
    """Target for HDFS files."""

    def __init__(self, path: str, format=None, client=None, is_tmp: bool = False):
        """Initialize HDFS target."""

    def exists(self) -> bool:
        """Check if HDFS file exists."""

    def open(self, mode: str = 'r'):
        """Open HDFS file."""
class HdfsClient:
    """Client for HDFS operations."""

    def exists(self, path: str) -> bool:
        """Check if HDFS path exists."""

    def put(self, local_path: str, destination_hdfs_path: str):
        """Upload local file to HDFS."""

    def get(self, hdfs_path: str, local_path: str):
        """Download HDFS file to local path."""
# Apache Spark integration
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
class SparkSubmitTask(Task):
    """Task for submitting Spark applications."""

    app: str                     # Spark application path
    master: str = "local[*]"     # Spark master URL
    deploy_mode: str = "client"  # Deploy mode
    executor_memory: str = "1g"  # Executor memory
    driver_memory: str = "1g"    # Driver memory

    def app_options(self) -> list:
        """Return application-specific options."""

    def run(self):
        """Submit Spark application."""
# Google BigQuery integration
from luigi.contrib.bigquery import BigQueryTarget, BigQueryLoadTask
class BigQueryTarget:
    """Target for BigQuery tables."""

    def __init__(self, project_id: str, dataset_id: str, table_id: str,
                 client=None):
        """Initialize BigQuery target."""

    def exists(self) -> bool:
        """Check if BigQuery table exists."""
class BigQueryLoadTask(Task):
    """Task for loading data into BigQuery."""

    project_id: str
    dataset_id: str
    table_id: str
    schema: list  # Table schema

    def run(self):
        """Load data into BigQuery table."""

Integration with HPC and cluster job schedulers.
# SLURM integration
from luigi.contrib.slurm import SlurmTask
class SlurmTask(Task):
    """Base class for SLURM job submission."""

    shared_tmp_dir: str     # Shared temporary directory
    job_name: str           # SLURM job name
    n_cpu: int = 1          # Number of CPUs
    mem: str = "1GB"        # Memory requirement
    time: str = "1:00:00"   # Time limit
    partition: str          # SLURM partition

    def work(self):
        """Define work to be done in SLURM job."""

    def run(self):
        """Submit job to SLURM scheduler."""
# LSF integration
from luigi.contrib.lsf import LSFTask
class LSFTask(Task):
    """Base class for LSF job submission."""

    shared_tmp_dir: str
    job_name: str
    n_cpu: int = 1
    resource: str  # LSF resource requirements
    queue: str     # LSF queue

    def work(self):
        """Define work for LSF job."""
# Sun Grid Engine integration
from luigi.contrib.sge import SGETask
class SGETask(Task):
    """Base class for SGE job submission."""

    shared_tmp_dir: str
    n_cpu: int = 1
    run_locally: bool = False

    def work(self):
        """Define work for SGE job."""

Integration with containerization platforms and cloud services.
# Docker integration
from luigi.contrib.docker_runner import DockerTask
class DockerTask(Task):
    """Task for running commands in Docker containers."""

    image: str    # Docker image name
    command: str  # Command to run
    # NOTE(review): class-level mutable default ({}) would be shared across
    # instances if mutated in place — confirm this is intended.
    container_options: dict = {}  # Docker container options

    def run(self):
        """Run command in Docker container."""
# Kubernetes integration
from luigi.contrib.kubernetes import KubernetesJobTask
class KubernetesJobTask(Task):
    """Task for running Kubernetes jobs."""

    name: str       # Job name
    image: str      # Container image
    command: list   # Command to run

    def spec_schema(self) -> dict:
        """Return Kubernetes job spec."""
# AWS Batch integration
from luigi.contrib.batch import BatchTask
class BatchTask(Task):
"""Task for AWS Batch job execution."""
job_name: str
job_queue: str
job_definition: str
def run(self):
"""Submit job to AWS Batch."""
# Google Cloud Dataproc integration
from luigi.contrib.dataproc import DataprocJobTask
class DataprocJobTask(Task):
    """Base class for Google Cloud Dataproc jobs."""

    cluster_name: str
    project_id: str
    region: str = "global"

    def run(self):
        """Submit job to Dataproc cluster."""

Integration with monitoring and metrics collection systems.
# Datadog metrics integration
from luigi.contrib.datadog_metric import DatadogMetric
class DatadogMetric:
    """Send metrics to Datadog."""

    def __init__(self, metric_name: str, value: float, tags: list = None):
        """
        Initialize Datadog metric.

        Args:
            metric_name: Metric name
            value: Metric value
            tags: List of tags (optional)
        """

    def send(self):
        """Send metric to Datadog."""
# Prometheus metrics integration
from luigi.contrib.prometheus_metric import PrometheusMetric
class PrometheusMetric:
    """Send metrics to Prometheus pushgateway."""

    def __init__(self, metric_name: str, value: float, labels: dict = None):
        """Initialize Prometheus metric."""

    def push(self):
        """Push metric to Prometheus."""

import luigi
from luigi.contrib.postgres import PostgresTarget, CopyToTable
class LoadDataToPostgres(CopyToTable):
    """Load CSV data into PostgreSQL table."""

    # Connection settings (hard-coded for this example).
    host = "localhost"
    database = "mydb"
    user = "postgres"
    password = "password"
    table = "sales_data"

    def requires(self):
        return ProcessSalesData()  # Task that generates CSV

    def copy(self, file_object):
        """Custom copy logic if needed."""
        # Default implementation handles CSV copying
        return super().copy(file_object)
class QueryPostgresData(luigi.Task):
    """Query data from PostgreSQL."""

    def requires(self):
        return LoadDataToPostgres()

    def output(self):
        return luigi.LocalTarget("query_results.txt")

    def run(self):
        # Check that data was loaded
        target = PostgresTarget(
            host="localhost",
            database="mydb",
            user="postgres",
            password="password",
            table="sales_data",
            update_id="loaded"
        )
        if target.exists():
            with self.output().open('w') as f:
                f.write("Data successfully loaded to PostgreSQL")

import luigi
from luigi.contrib.s3 import S3Target, S3Client
class ProcessS3Data(luigi.Task):
    """Read a raw object from S3, upper-case its contents, and write the result back."""

    bucket = luigi.Parameter()
    key = luigi.Parameter()

    def output(self):
        # Processed copy lives under the processed/ prefix of the same bucket.
        return S3Target(f"s3://{self.bucket}/processed/{self.key}")

    def run(self):
        # Pull the raw object down first, then transform, then publish.
        source = S3Target(f"s3://{self.bucket}/raw/{self.key}")
        with source.open('r') as raw:
            contents = raw.read()
        transformed = contents.upper()
        with self.output().open('w') as sink:
            sink.write(transformed)
class S3DataPipeline(luigi.Task):
    """Pipeline that processes multiple S3 files."""

    bucket = luigi.Parameter()

    def run(self):
        # Discover every raw object under the bucket's raw/ prefix.
        listing = S3Client().list(f"s3://{self.bucket}/raw/")
        # One processing task per file; the final path segment is the key.
        pending = [
            ProcessS3Data(bucket=self.bucket, key=path.split('/')[-1])
            for path in listing
        ]
        # Dynamic dependencies: yield the whole batch for Luigi to schedule.
        yield pending

import luigi
from luigi.contrib.spark import SparkSubmitTask
class SparkDataProcessing(SparkSubmitTask):
    """Process large dataset using Spark."""

    input_path = luigi.Parameter()
    output_path = luigi.Parameter()

    # Spark configuration
    app = "spark_jobs/data_processing.py"
    master = "spark://localhost:7077"
    executor_memory = "4g"
    driver_memory = "2g"

    def app_options(self):
        """Pass parameters to Spark application."""
        return [
            "--input", self.input_path,
            "--output", self.output_path
        ]

    def output(self):
        # Spark writes a _SUCCESS marker on successful completion.
        return luigi.LocalTarget(f"{self.output_path}/_SUCCESS")
# The Spark application (data_processing.py) would contain:
# from pyspark.sql import SparkSession
# import argparse
#
# parser = argparse.ArgumentParser()
# parser.add_argument("--input")
# parser.add_argument("--output")
# args = parser.parse_args()
#
# spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
# df = spark.read.csv(args.input, header=True)
# processed_df = df.filter(df.amount > 100)
# processed_df.write.csv(args.output, header=True)

import luigi
from luigi.contrib.slurm import SlurmTask
class HeavyComputation(SlurmTask):
    """Run computationally intensive task on SLURM cluster."""

    dataset = luigi.Parameter()

    # SLURM job configuration
    shared_tmp_dir = "/shared/tmp"
    job_name = "heavy_computation"
    n_cpu = 16
    mem = "32GB"
    time = "4:00:00"
    partition = "compute"

    def work(self):
        """Define work to be executed on cluster."""
        # This runs on the SLURM compute node
        import heavy_computation_module
        result = heavy_computation_module.process_dataset(self.dataset)
        # Write result to shared filesystem
        with open(f"{self.shared_tmp_dir}/result_{self.dataset}.txt", 'w') as f:
            f.write(str(result))

    def output(self):
        return luigi.LocalTarget(f"{self.shared_tmp_dir}/result_{self.dataset}.txt")

import luigi
from luigi.contrib.s3 import S3Target
from luigi.contrib.postgres import CopyToTable
from luigi.contrib.spark import SparkSubmitTask
class DataIngestionPipeline(luigi.WrapperTask):
    """Complete data pipeline using multiple integrations."""

    date = luigi.DateParameter()

    def requires(self):
        return [
            # 1. Download data from S3
            DownloadS3Data(date=self.date),
            # 2. Process with Spark
            SparkProcessing(date=self.date),
            # 3. Load to PostgreSQL
            LoadToDatabase(date=self.date),
            # 4. Send metrics to monitoring
            SendMetrics(date=self.date)
        ]
class DownloadS3Data(luigi.Task):
    """Download the day's raw CSV from S3 to the local filesystem."""

    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"data/raw_{self.date}.csv")

    def run(self):
        s3_target = S3Target(f"s3://data-bucket/raw/{self.date}.csv")
        # Stream the S3 object straight into the local target.
        with s3_target.open('r') as s3_file, self.output().open('w') as local_file:
            local_file.write(s3_file.read())
class SparkProcessing(SparkSubmitTask):
    """Run the daily Spark processing job over the downloaded data."""

    date = luigi.DateParameter()
    app = "spark_jobs/daily_processing.py"

    def requires(self):
        return DownloadS3Data(date=self.date)

    def app_options(self):
        return ["--date", str(self.date)]
class LoadToDatabase(CopyToTable):
    """Load the day's processed metrics into PostgreSQL."""

    date = luigi.DateParameter()

    # Connection settings (hard-coded for this example).
    host = "localhost"
    database = "analytics"
    user = "luigi"
    password = "password"
    table = "daily_metrics"

    def requires(self):
        return SparkProcessing(date=self.date)

Install with Tessl CLI
npx tessl i tessl/pypi-luigidocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10