# External Integrations

Luigi's contrib modules provide extensive integration with external systems including databases, cloud storage, big data platforms, job schedulers, and monitoring systems. These modules extend Luigi's capabilities to work with diverse data infrastructure.

## Capabilities

### Database Integration

Comprehensive database integration modules for popular database systems with specialized targets and task types.

```python { .api }
# PostgreSQL integration
from luigi.contrib.postgres import PostgresTarget, CopyToTable, PostgresQuery

class PostgresTarget:
    """Target for PostgreSQL database tables."""

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 5432):
        """
        Initialize PostgreSQL target.

        Args:
            host: Database host
            database: Database name
            user: Username
            password: Password
            table: Table name
            update_id: Unique identifier for this update
            port: Database port (default 5432)
        """

    def exists(self) -> bool:
        """Check if the target exists in database."""

class CopyToTable(Task):
    """Task that copies data to PostgreSQL table."""

    host: str
    database: str
    user: str
    password: str
    table: str

    def copy(self, file_object):
        """Copy data from file object to table."""

    def output(self):
        """Return PostgresTarget for the table."""

# MySQL integration
from luigi.contrib.mysqldb import MySqlTarget

class MySqlTarget:
    """Target for MySQL database tables."""

    def __init__(self, host: str, database: str, user: str, password: str,
                 table: str, update_id: str, port: int = 3306):
        """Initialize MySQL target."""

# MongoDB integration
from luigi.contrib.mongodb import MongoTarget

class MongoTarget:
    """Target for MongoDB collections."""

    def __init__(self, host: str, port: int, database: str, collection: str,
                 mongo_client=None):
        """Initialize MongoDB target."""
```

### Cloud Storage Integration

Integration modules for major cloud storage platforms with specialized targets and operations.

```python { .api }
# Amazon S3 integration
from luigi.contrib.s3 import S3Target, S3Client, S3FlagTarget, S3PathTask

class S3Target:
    """Target for Amazon S3 objects."""

    def __init__(self, path: str, format=None, client=None,
                 is_tmp: bool = False):
        """
        Initialize S3 target.

        Args:
            path: S3 path (s3://bucket/key)
            format: File format handler
            client: S3 client instance
            is_tmp: Whether this is a temporary file
        """

    def exists(self) -> bool:
        """Check if S3 object exists."""

    def open(self, mode: str = 'r'):
        """Open S3 object for reading/writing."""

    def remove(self):
        """Delete S3 object."""

class S3Client:
    """Client for S3 operations."""

    def exists(self, path: str) -> bool:
        """Check if S3 object exists."""

    def put(self, local_path: str, destination_s3_path: str):
        """Upload local file to S3."""

    def get(self, s3_path: str, local_path: str):
        """Download S3 object to local file."""

    def list(self, path: str) -> list:
        """List S3 objects with path prefix."""

# Google Cloud Storage integration
from luigi.contrib.gcs import GCSTarget, GCSClient

class GCSTarget:
    """Target for Google Cloud Storage objects."""

    def __init__(self, path: str, format=None, client=None):
        """Initialize GCS target."""

class GCSClient:
    """Client for GCS operations."""

    def exists(self, path: str) -> bool:
        """Check if GCS object exists."""

    def put(self, local_path: str, destination_gcs_path: str):
        """Upload local file to GCS."""

# Azure Blob Storage integration
from luigi.contrib.azureblob import AzureBlobTarget

class AzureBlobTarget:
    """Target for Azure Blob Storage."""

    def __init__(self, container: str, blob: str,
                 account_name: str, account_key: str):
        """Initialize Azure Blob target."""
```

### Big Data Platform Integration

Integration with big data processing platforms and frameworks.

```python { .api }
# HDFS integration
from luigi.contrib.hdfs import HdfsTarget, HdfsClient

class HdfsTarget:
    """Target for HDFS files."""

    def __init__(self, path: str, format=None, client=None, is_tmp: bool = False):
        """Initialize HDFS target."""

    def exists(self) -> bool:
        """Check if HDFS file exists."""

    def open(self, mode: str = 'r'):
        """Open HDFS file."""

class HdfsClient:
    """Client for HDFS operations."""

    def exists(self, path: str) -> bool:
        """Check if HDFS path exists."""

    def put(self, local_path: str, destination_hdfs_path: str):
        """Upload local file to HDFS."""

    def get(self, hdfs_path: str, local_path: str):
        """Download HDFS file to local path."""

# Apache Spark integration
from luigi.contrib.spark import SparkSubmitTask, PySparkTask

class SparkSubmitTask(Task):
    """Task for submitting Spark applications."""

    app: str  # Spark application path
    master: str = "local[*]"  # Spark master URL
    deploy_mode: str = "client"  # Deploy mode
    executor_memory: str = "1g"  # Executor memory
    driver_memory: str = "1g"  # Driver memory

    def app_options(self) -> list:
        """Return application-specific options."""

    def run(self):
        """Submit Spark application."""

# Google BigQuery integration
from luigi.contrib.bigquery import BigQueryTarget, BigQueryLoadTask

class BigQueryTarget:
    """Target for BigQuery tables."""

    def __init__(self, project_id: str, dataset_id: str, table_id: str,
                 client=None):
        """Initialize BigQuery target."""

    def exists(self) -> bool:
        """Check if BigQuery table exists."""

class BigQueryLoadTask(Task):
    """Task for loading data into BigQuery."""

    project_id: str
    dataset_id: str
    table_id: str
    schema: list  # Table schema

    def run(self):
        """Load data into BigQuery table."""
```

### Job Scheduler Integration

Integration with HPC and cluster job schedulers.

```python { .api }
# SLURM integration
from luigi.contrib.slurm import SlurmTask

class SlurmTask(Task):
    """Base class for SLURM job submission."""

    shared_tmp_dir: str  # Shared temporary directory
    job_name: str  # SLURM job name
    n_cpu: int = 1  # Number of CPUs
    mem: str = "1GB"  # Memory requirement
    time: str = "1:00:00"  # Time limit
    partition: str  # SLURM partition

    def work(self):
        """Define work to be done in SLURM job."""

    def run(self):
        """Submit job to SLURM scheduler."""

# LSF integration
from luigi.contrib.lsf import LSFTask

class LSFTask(Task):
    """Base class for LSF job submission."""

    shared_tmp_dir: str
    job_name: str
    n_cpu: int = 1
    resource: str  # LSF resource requirements
    queue: str  # LSF queue

    def work(self):
        """Define work for LSF job."""

# Sun Grid Engine integration
from luigi.contrib.sge import SGETask

class SGETask(Task):
    """Base class for SGE job submission."""

    shared_tmp_dir: str
    n_cpu: int = 1
    run_locally: bool = False

    def work(self):
        """Define work for SGE job."""
```

### Container and Cloud Platform Integration

Integration with containerization platforms and cloud services.

```python { .api }
# Docker integration
from luigi.contrib.docker_runner import DockerTask

class DockerTask(Task):
    """Task for running commands in Docker containers."""

    image: str  # Docker image name
    command: str  # Command to run
    container_options: dict = {}  # Docker container options

    def run(self):
        """Run command in Docker container."""

# Kubernetes integration
from luigi.contrib.kubernetes import KubernetesJobTask

class KubernetesJobTask(Task):
    """Task for running Kubernetes jobs."""

    name: str  # Job name
    image: str  # Container image
    command: list  # Command to run

    def spec_schema(self) -> dict:
        """Return Kubernetes job spec."""

# AWS Batch integration
from luigi.contrib.batch import BatchTask

class BatchTask(Task):
    """Task for AWS Batch job execution."""

    job_name: str
    job_queue: str
    job_definition: str

    def run(self):
        """Submit job to AWS Batch."""

# Google Cloud Dataproc integration
from luigi.contrib.dataproc import DataprocJobTask

class DataprocJobTask(Task):
    """Base class for Google Cloud Dataproc jobs."""

    cluster_name: str
    project_id: str
    region: str = "global"

    def run(self):
        """Submit job to Dataproc cluster."""
```

### Monitoring and Metrics Integration

Integration with monitoring and metrics collection systems.

```python { .api }
# Datadog metrics integration
from luigi.contrib.datadog_metric import DatadogMetric

class DatadogMetric:
    """Send metrics to Datadog."""

    def __init__(self, metric_name: str, value: float, tags: list = None):
        """
        Initialize Datadog metric.

        Args:
            metric_name: Metric name
            value: Metric value
            tags: List of tags
        """

    def send(self):
        """Send metric to Datadog."""

# Prometheus metrics integration
from luigi.contrib.prometheus_metric import PrometheusMetric

class PrometheusMetric:
    """Send metrics to Prometheus pushgateway."""

    def __init__(self, metric_name: str, value: float, labels: dict = None):
        """Initialize Prometheus metric."""

    def push(self):
        """Push metric to Prometheus."""
```

## Usage Examples

### Database Integration Example

```python
import luigi
from luigi.contrib.postgres import PostgresTarget, CopyToTable

class LoadDataToPostgres(CopyToTable):
    """Load CSV data into PostgreSQL table."""

    host = "localhost"
    database = "mydb"
    user = "postgres"
    password = "password"
    table = "sales_data"

    def requires(self):
        return ProcessSalesData()  # Task that generates CSV

    def copy(self, file_object):
        """Custom copy logic if needed."""
        # Default implementation handles CSV copying
        return super().copy(file_object)

class QueryPostgresData(luigi.Task):
    """Query data from PostgreSQL."""

    def requires(self):
        return LoadDataToPostgres()

    def output(self):
        return luigi.LocalTarget("query_results.txt")

    def run(self):
        # Check that data was loaded
        target = PostgresTarget(
            host="localhost",
            database="mydb",
            user="postgres",
            password="password",
            table="sales_data",
            update_id="loaded"
        )

        if target.exists():
            with self.output().open('w') as f:
                f.write("Data successfully loaded to PostgreSQL")
```

### S3 Integration Example

```python
import luigi
from luigi.contrib.s3 import S3Target, S3Client

class ProcessS3Data(luigi.Task):
    """Process data stored in S3."""

    bucket = luigi.Parameter()
    key = luigi.Parameter()

    def output(self):
        return S3Target(f"s3://{self.bucket}/processed/{self.key}")

    def run(self):
        # Read from S3 input
        input_target = S3Target(f"s3://{self.bucket}/raw/{self.key}")

        with input_target.open('r') as input_file:
            data = input_file.read()

        # Process data
        processed_data = data.upper()

        # Write to S3 output
        with self.output().open('w') as output_file:
            output_file.write(processed_data)

class S3DataPipeline(luigi.Task):
    """Pipeline that processes multiple S3 files."""

    bucket = luigi.Parameter()

    def run(self):
        # List files in S3 bucket
        client = S3Client()
        files = client.list(f"s3://{self.bucket}/raw/")

        # Process each file
        tasks = []
        for file_path in files:
            key = file_path.split('/')[-1]  # Extract filename
            tasks.append(ProcessS3Data(bucket=self.bucket, key=key))

        # Build all processing tasks
        yield tasks
```

### Spark Integration Example

```python
import luigi
from luigi.contrib.spark import SparkSubmitTask

class SparkDataProcessing(SparkSubmitTask):
    """Process large dataset using Spark."""

    input_path = luigi.Parameter()
    output_path = luigi.Parameter()

    # Spark configuration
    app = "spark_jobs/data_processing.py"
    master = "spark://localhost:7077"
    executor_memory = "4g"
    driver_memory = "2g"

    def app_options(self):
        """Pass parameters to Spark application."""
        return [
            "--input", self.input_path,
            "--output", self.output_path
        ]

    def output(self):
        return luigi.LocalTarget(f"{self.output_path}/_SUCCESS")

# The Spark application (data_processing.py) would contain:
# from pyspark.sql import SparkSession
# import argparse
#
# parser = argparse.ArgumentParser()
# parser.add_argument("--input")
# parser.add_argument("--output")
# args = parser.parse_args()
#
# spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
# df = spark.read.csv(args.input, header=True)
# processed_df = df.filter(df.amount > 100)
# processed_df.write.csv(args.output, header=True)
```

### SLURM Cluster Integration Example

```python
import luigi
from luigi.contrib.slurm import SlurmTask

class HeavyComputation(SlurmTask):
    """Run computationally intensive task on SLURM cluster."""

    dataset = luigi.Parameter()

    # SLURM job configuration
    shared_tmp_dir = "/shared/tmp"
    job_name = "heavy_computation"
    n_cpu = 16
    mem = "32GB"
    time = "4:00:00"
    partition = "compute"

    def work(self):
        """Define work to be executed on cluster."""
        # This runs on the SLURM compute node
        import heavy_computation_module

        result = heavy_computation_module.process_dataset(self.dataset)

        # Write result to shared filesystem
        with open(f"{self.shared_tmp_dir}/result_{self.dataset}.txt", 'w') as f:
            f.write(str(result))

    def output(self):
        return luigi.LocalTarget(f"{self.shared_tmp_dir}/result_{self.dataset}.txt")
```

### Multi-Platform Pipeline Example

```python
import luigi
from luigi.contrib.s3 import S3Target
from luigi.contrib.postgres import CopyToTable
from luigi.contrib.spark import SparkSubmitTask

class DataIngestionPipeline(luigi.WrapperTask):
    """Complete data pipeline using multiple integrations."""

    date = luigi.DateParameter()

    def requires(self):
        return [
            # 1. Download data from S3
            DownloadS3Data(date=self.date),

            # 2. Process with Spark
            SparkProcessing(date=self.date),

            # 3. Load to PostgreSQL
            LoadToDatabase(date=self.date),

            # 4. Send metrics to monitoring
            SendMetrics(date=self.date)
        ]

class DownloadS3Data(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"data/raw_{self.date}.csv")

    def run(self):
        s3_target = S3Target(f"s3://data-bucket/raw/{self.date}.csv")
        with s3_target.open('r') as s3_file, self.output().open('w') as local_file:
            local_file.write(s3_file.read())

class SparkProcessing(SparkSubmitTask):
    date = luigi.DateParameter()

    app = "spark_jobs/daily_processing.py"

    def requires(self):
        return DownloadS3Data(date=self.date)

    def app_options(self):
        return ["--date", str(self.date)]

class LoadToDatabase(CopyToTable):
    date = luigi.DateParameter()

    host = "localhost"
    database = "analytics"
    user = "luigi"
    password = "password"
    table = "daily_metrics"

    def requires(self):
        return SparkProcessing(date=self.date)
```