or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.md · configuration.md · events.md · execution.md · index.md · integrations.md · parameters.md · scheduler.md · targets.md · tasks.md

docs/integrations.md

0

# External Integrations

1

2

Luigi's contrib modules provide extensive integration with external systems including databases, cloud storage, big data platforms, job schedulers, and monitoring systems. These modules extend Luigi's capabilities to work with diverse data infrastructure.

3

4

## Capabilities

5

6

### Database Integration

7

8

Comprehensive database integration modules for popular database systems with specialized targets and task types.

9

10

```python { .api }

11

# PostgreSQL integration

12

from luigi.contrib.postgres import PostgresTarget, CopyToTable, PostgresQuery

13

14

class PostgresTarget:

15

"""Target for PostgreSQL database tables."""

16

17

def __init__(self, host: str, database: str, user: str, password: str,

18

table: str, update_id: str, port: int = 5432):

19

"""

20

Initialize PostgreSQL target.

21

22

Args:

23

host: Database host

24

database: Database name

25

user: Username

26

password: Password

27

table: Table name

28

update_id: Unique identifier for this update

29

port: Database port (default 5432)

30

"""

31

32

def exists(self) -> bool:

33

"""Check if the target exists in database."""

34

35

class CopyToTable(Task):

36

"""Task that copies data to PostgreSQL table."""

37

38

host: str

39

database: str

40

user: str

41

password: str

42

table: str

43

44

def copy(self, file_object):

45

"""Copy data from file object to table."""

46

47

def output(self):

48

"""Return PostgresTarget for the table."""

49

50

# MySQL integration

51

from luigi.contrib.mysqldb import MySqlTarget

52

53

class MySqlTarget:

54

"""Target for MySQL database tables."""

55

56

def __init__(self, host: str, database: str, user: str, password: str,

57

table: str, update_id: str, port: int = 3306):

58

"""Initialize MySQL target."""

59

60

# MongoDB integration

61

from luigi.contrib.mongodb import MongoTarget

62

63

class MongoTarget:

64

"""Target for MongoDB collections."""

65

66

def __init__(self, host: str, port: int, database: str, collection: str,

67

mongo_client=None):

68

"""Initialize MongoDB target."""

69

```

70

71

### Cloud Storage Integration

72

73

Integration modules for major cloud storage platforms with specialized targets and operations.

74

75

```python { .api }

76

# Amazon S3 integration

77

from luigi.contrib.s3 import S3Target, S3Client, S3FlagTarget, S3PathTask

78

79

class S3Target:

80

"""Target for Amazon S3 objects."""

81

82

def __init__(self, path: str, format=None, client=None,

83

is_tmp: bool = False):

84

"""

85

Initialize S3 target.

86

87

Args:

88

path: S3 path (s3://bucket/key)

89

format: File format handler

90

client: S3 client instance

91

is_tmp: Whether this is a temporary file

92

"""

93

94

def exists(self) -> bool:

95

"""Check if S3 object exists."""

96

97

def open(self, mode: str = 'r'):

98

"""Open S3 object for reading/writing."""

99

100

def remove(self):

101

"""Delete S3 object."""

102

103

class S3Client:

104

"""Client for S3 operations."""

105

106

def exists(self, path: str) -> bool:

107

"""Check if S3 object exists."""

108

109

def put(self, local_path: str, destination_s3_path: str):

110

"""Upload local file to S3."""

111

112

def get(self, s3_path: str, local_path: str):

113

"""Download S3 object to local file."""

114

115

def list(self, path: str) -> list:

116

"""List S3 objects with path prefix."""

117

118

# Google Cloud Storage integration

119

from luigi.contrib.gcs import GCSTarget, GCSClient

120

121

class GCSTarget:

122

"""Target for Google Cloud Storage objects."""

123

124

def __init__(self, path: str, format=None, client=None):

125

"""Initialize GCS target."""

126

127

class GCSClient:

128

"""Client for GCS operations."""

129

130

def exists(self, path: str) -> bool:

131

"""Check if GCS object exists."""

132

133

def put(self, local_path: str, destination_gcs_path: str):

134

"""Upload local file to GCS."""

135

136

# Azure Blob Storage integration

137

from luigi.contrib.azureblob import AzureBlobTarget

138

139

class AzureBlobTarget:

140

"""Target for Azure Blob Storage."""

141

142

def __init__(self, container: str, blob: str,

143

account_name: str, account_key: str):

144

"""Initialize Azure Blob target."""

145

```

146

147

### Big Data Platform Integration

148

149

Integration with big data processing platforms and frameworks.

150

151

```python { .api }

152

# HDFS integration

153

from luigi.contrib.hdfs import HdfsTarget, HdfsClient

154

155

class HdfsTarget:

156

"""Target for HDFS files."""

157

158

def __init__(self, path: str, format=None, client=None, is_tmp: bool = False):

159

"""Initialize HDFS target."""

160

161

def exists(self) -> bool:

162

"""Check if HDFS file exists."""

163

164

def open(self, mode: str = 'r'):

165

"""Open HDFS file."""

166

167

class HdfsClient:

168

"""Client for HDFS operations."""

169

170

def exists(self, path: str) -> bool:

171

"""Check if HDFS path exists."""

172

173

def put(self, local_path: str, destination_hdfs_path: str):

174

"""Upload local file to HDFS."""

175

176

def get(self, hdfs_path: str, local_path: str):

177

"""Download HDFS file to local path."""

178

179

# Apache Spark integration

180

from luigi.contrib.spark import SparkSubmitTask, PySparkTask

181

182

class SparkSubmitTask(Task):

183

"""Task for submitting Spark applications."""

184

185

app: str # Spark application path

186

master: str = "local[*]" # Spark master URL

187

deploy_mode: str = "client" # Deploy mode

188

executor_memory: str = "1g" # Executor memory

189

driver_memory: str = "1g" # Driver memory

190

191

def app_options(self) -> list:

192

"""Return application-specific options."""

193

194

def run(self):

195

"""Submit Spark application."""

196

197

# Google BigQuery integration

198

from luigi.contrib.bigquery import BigQueryTarget, BigQueryLoadTask

199

200

class BigQueryTarget:

201

"""Target for BigQuery tables."""

202

203

def __init__(self, project_id: str, dataset_id: str, table_id: str,

204

client=None):

205

"""Initialize BigQuery target."""

206

207

def exists(self) -> bool:

208

"""Check if BigQuery table exists."""

209

210

class BigQueryLoadTask(Task):

211

"""Task for loading data into BigQuery."""

212

213

project_id: str

214

dataset_id: str

215

table_id: str

216

schema: list # Table schema

217

218

def run(self):

219

"""Load data into BigQuery table."""

220

```

221

222

### Job Scheduler Integration

223

224

Integration with HPC and cluster job schedulers.

225

226

```python { .api }

227

# SLURM integration

228

from luigi.contrib.slurm import SlurmTask

229

230

class SlurmTask(Task):

231

"""Base class for SLURM job submission."""

232

233

shared_tmp_dir: str # Shared temporary directory

234

job_name: str # SLURM job name

235

n_cpu: int = 1 # Number of CPUs

236

mem: str = "1GB" # Memory requirement

237

time: str = "1:00:00" # Time limit

238

partition: str # SLURM partition

239

240

def work(self):

241

"""Define work to be done in SLURM job."""

242

243

def run(self):

244

"""Submit job to SLURM scheduler."""

245

246

# LSF integration

247

from luigi.contrib.lsf import LSFTask

248

249

class LSFTask(Task):

250

"""Base class for LSF job submission."""

251

252

shared_tmp_dir: str

253

job_name: str

254

n_cpu: int = 1

255

resource: str # LSF resource requirements

256

queue: str # LSF queue

257

258

def work(self):

259

"""Define work for LSF job."""

260

261

# Sun Grid Engine integration

262

from luigi.contrib.sge import SGETask

263

264

class SGETask(Task):

265

"""Base class for SGE job submission."""

266

267

shared_tmp_dir: str

268

n_cpu: int = 1

269

run_locally: bool = False

270

271

def work(self):

272

"""Define work for SGE job."""

273

```

274

275

### Container and Cloud Platform Integration

276

277

Integration with containerization platforms and cloud services.

278

279

```python { .api }

280

# Docker integration

281

from luigi.contrib.docker_runner import DockerTask

282

283

class DockerTask(Task):

284

"""Task for running commands in Docker containers."""

285

286

image: str # Docker image name

287

command: str # Command to run

288

container_options: dict = {} # Docker container options

289

290

def run(self):

291

"""Run command in Docker container."""

292

293

# Kubernetes integration

294

from luigi.contrib.kubernetes import KubernetesJobTask

295

296

class KubernetesJobTask(Task):

297

"""Task for running Kubernetes jobs."""

298

299

name: str # Job name

300

image: str # Container image

301

command: list # Command to run

302

303

def spec_schema(self) -> dict:

304

"""Return Kubernetes job spec."""

305

306

# AWS Batch integration

307

from luigi.contrib.batch import BatchTask

308

309

class BatchTask(Task):

310

"""Task for AWS Batch job execution."""

311

312

job_name: str

313

job_queue: str

314

job_definition: str

315

316

def run(self):

317

"""Submit job to AWS Batch."""

318

319

# Google Cloud Dataproc integration

320

from luigi.contrib.dataproc import DataprocJobTask

321

322

class DataprocJobTask(Task):

323

"""Base class for Google Cloud Dataproc jobs."""

324

325

cluster_name: str

326

project_id: str

327

region: str = "global"

328

329

def run(self):

330

"""Submit job to Dataproc cluster."""

331

```

332

333

### Monitoring and Metrics Integration

334

335

Integration with monitoring and metrics collection systems.

336

337

```python { .api }

338

# Datadog metrics integration

339

from luigi.contrib.datadog_metric import DatadogMetric

340

341

class DatadogMetric:

342

"""Send metrics to Datadog."""

343

344

def __init__(self, metric_name: str, value: float, tags: list = None):

345

"""

346

Initialize Datadog metric.

347

348

Args:

349

metric_name: Metric name

350

value: Metric value

351

tags: List of tags

352

"""

353

354

def send(self):

355

"""Send metric to Datadog."""

356

357

# Prometheus metrics integration

358

from luigi.contrib.prometheus_metric import PrometheusMetric

359

360

class PrometheusMetric:

361

"""Send metrics to Prometheus pushgateway."""

362

363

def __init__(self, metric_name: str, value: float, labels: dict = None):

364

"""Initialize Prometheus metric."""

365

366

def push(self):

367

"""Push metric to Prometheus."""

368

```

369

370

## Usage Examples

371

372

### Database Integration Example

373

374

```python

375

import luigi

376

from luigi.contrib.postgres import PostgresTarget, CopyToTable

377

378

class LoadDataToPostgres(CopyToTable):

379

"""Load CSV data into PostgreSQL table."""

380

381

host = "localhost"

382

database = "mydb"

383

user = "postgres"

384

password = "password"

385

table = "sales_data"

386

387

def requires(self):

388

return ProcessSalesData() # Task that generates CSV

389

390

def copy(self, file_object):

391

"""Custom copy logic if needed."""

392

# Default implementation handles CSV copying

393

return super().copy(file_object)

394

395

class QueryPostgresData(luigi.Task):

396

"""Query data from PostgreSQL."""

397

398

def requires(self):

399

return LoadDataToPostgres()

400

401

def output(self):

402

return luigi.LocalTarget("query_results.txt")

403

404

def run(self):

405

# Check that data was loaded

406

target = PostgresTarget(

407

host="localhost",

408

database="mydb",

409

user="postgres",

410

password="password",

411

table="sales_data",

412

update_id="loaded"

413

)

414

415

if target.exists():

416

with self.output().open('w') as f:

417

f.write("Data successfully loaded to PostgreSQL")

418

```

419

420

### S3 Integration Example

421

422

```python

423

import luigi

424

from luigi.contrib.s3 import S3Target, S3Client

425

426

class ProcessS3Data(luigi.Task):

427

"""Process data stored in S3."""

428

429

bucket = luigi.Parameter()

430

key = luigi.Parameter()

431

432

def output(self):

433

return S3Target(f"s3://{self.bucket}/processed/{self.key}")

434

435

def run(self):

436

# Read from S3 input

437

input_target = S3Target(f"s3://{self.bucket}/raw/{self.key}")

438

439

with input_target.open('r') as input_file:

440

data = input_file.read()

441

442

# Process data

443

processed_data = data.upper()

444

445

# Write to S3 output

446

with self.output().open('w') as output_file:

447

output_file.write(processed_data)

448

449

class S3DataPipeline(luigi.Task):

450

"""Pipeline that processes multiple S3 files."""

451

452

bucket = luigi.Parameter()

453

454

def run(self):

455

# List files in S3 bucket

456

client = S3Client()

457

files = client.list(f"s3://{self.bucket}/raw/")

458

459

# Process each file

460

tasks = []

461

for file_path in files:

462

key = file_path.split('/')[-1] # Extract filename

463

tasks.append(ProcessS3Data(bucket=self.bucket, key=key))

464

465

# Build all processing tasks

466

yield tasks

467

```

468

469

### Spark Integration Example

470

471

```python

472

import luigi

473

from luigi.contrib.spark import SparkSubmitTask

474

475

class SparkDataProcessing(SparkSubmitTask):

476

"""Process large dataset using Spark."""

477

478

input_path = luigi.Parameter()

479

output_path = luigi.Parameter()

480

481

# Spark configuration

482

app = "spark_jobs/data_processing.py"

483

master = "spark://localhost:7077"

484

executor_memory = "4g"

485

driver_memory = "2g"

486

487

def app_options(self):

488

"""Pass parameters to Spark application."""

489

return [

490

"--input", self.input_path,

491

"--output", self.output_path

492

]

493

494

def output(self):

495

return luigi.LocalTarget(f"{self.output_path}/_SUCCESS")

496

497

# The Spark application (data_processing.py) would contain:

498

# from pyspark.sql import SparkSession

499

# import argparse

500

#

501

# parser = argparse.ArgumentParser()

502

# parser.add_argument("--input")

503

# parser.add_argument("--output")

504

# args = parser.parse_args()

505

#

506

# spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

507

# df = spark.read.csv(args.input, header=True)

508

# processed_df = df.filter(df.amount > 100)

509

# processed_df.write.csv(args.output, header=True)

510

```

511

512

### SLURM Cluster Integration Example

513

514

```python

515

import luigi

516

from luigi.contrib.slurm import SlurmTask

517

518

class HeavyComputation(SlurmTask):

519

"""Run computationally intensive task on SLURM cluster."""

520

521

dataset = luigi.Parameter()

522

523

# SLURM job configuration

524

shared_tmp_dir = "/shared/tmp"

525

job_name = "heavy_computation"

526

n_cpu = 16

527

mem = "32GB"

528

time = "4:00:00"

529

partition = "compute"

530

531

def work(self):

532

"""Define work to be executed on cluster."""

533

# This runs on the SLURM compute node

534

import heavy_computation_module

535

536

result = heavy_computation_module.process_dataset(self.dataset)

537

538

# Write result to shared filesystem

539

with open(f"{self.shared_tmp_dir}/result_{self.dataset}.txt", 'w') as f:

540

f.write(str(result))

541

542

def output(self):

543

return luigi.LocalTarget(f"{self.shared_tmp_dir}/result_{self.dataset}.txt")

544

```

545

546

### Multi-Platform Pipeline Example

547

548

```python

549

import luigi

550

from luigi.contrib.s3 import S3Target

551

from luigi.contrib.postgres import CopyToTable

552

from luigi.contrib.spark import SparkSubmitTask

553

554

class DataIngestionPipeline(luigi.WrapperTask):

555

"""Complete data pipeline using multiple integrations."""

556

557

date = luigi.DateParameter()

558

559

def requires(self):

560

return [

561

# 1. Download data from S3

562

DownloadS3Data(date=self.date),

563

564

# 2. Process with Spark

565

SparkProcessing(date=self.date),

566

567

# 3. Load to PostgreSQL

568

LoadToDatabase(date=self.date),

569

570

# 4. Send metrics to monitoring

571

SendMetrics(date=self.date)

572

]

573

574

class DownloadS3Data(luigi.Task):

575

date = luigi.DateParameter()

576

577

def output(self):

578

return luigi.LocalTarget(f"data/raw_{self.date}.csv")

579

580

def run(self):

581

s3_target = S3Target(f"s3://data-bucket/raw/{self.date}.csv")

582

with s3_target.open('r') as s3_file, self.output().open('w') as local_file:

583

local_file.write(s3_file.read())

584

585

class SparkProcessing(SparkSubmitTask):

586

date = luigi.DateParameter()

587

588

app = "spark_jobs/daily_processing.py"

589

590

def requires(self):

591

return DownloadS3Data(date=self.date)

592

593

def app_options(self):

594

return ["--date", str(self.date)]

595

596

class LoadToDatabase(CopyToTable):

597

date = luigi.DateParameter()

598

599

host = "localhost"

600

database = "analytics"

601

user = "luigi"

602

password = "password"

603

table = "daily_metrics"

604

605

def requires(self):

606

return SparkProcessing(date=self.date)

607

```