# Workflow Orchestration

The Databricks provider supports workflow orchestration through Databricks Workflows, letting you build multi-task pipelines that run as a single coordinated Databricks job with dependency management, shared resources, and monitoring.

## Core Components

### DatabricksWorkflowTaskGroup

Create coordinated workflows that execute as unified Databricks jobs with shared resources and dependencies.

```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

class DatabricksWorkflowTaskGroup(TaskGroup):
    def __init__(
        self,
        *,
        group_id: str,
        databricks_conn_id: str = "databricks_default",
        existing_clusters: dict[str, str] | None = None,
        extra_job_params: dict[str, Any] | None = None,
        max_concurrent_runs: int = 1,
        default_task_timeout_seconds: int | None = None,
        default_task_retries: int = 0,
        prefix: str = "",
        suffix: str = "",
        dag: DAG | None = None,
        parent_group: TaskGroup | None = None,
        **kwargs
    ) -> None:
        """
        Create a workflow task group for coordinated Databricks job execution.

        Args:
            group_id: Unique identifier for the workflow group
            databricks_conn_id: Airflow connection ID for Databricks
            existing_clusters: Mapping of cluster keys to cluster IDs for reuse
            extra_job_params: Additional parameters for the Databricks job
            max_concurrent_runs: Maximum number of concurrent workflow runs
            default_task_timeout_seconds: Default timeout for workflow tasks
            default_task_retries: Default retry count for workflow tasks
            prefix: Prefix for task names within the workflow
            suffix: Suffix for task names within the workflow
            dag: Parent DAG containing this workflow
            parent_group: Parent task group if this is a nested workflow
        """
```
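
A minimal sketch of attaching the task group to a DAG, assuming Airflow 2.4+ (where `schedule` replaced `schedule_interval`) and a `databricks_default` connection; the DAG id, notebook path, and cluster id are illustrative placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksTaskOperator,
    DatabricksWorkflowTaskGroup,
)

with DAG(
    dag_id="example_databricks_workflow",   # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # Every task created inside this context becomes one task of a single
    # coordinated Databricks job.
    with DatabricksWorkflowTaskGroup(
        group_id="example_workflow",
        databricks_conn_id="databricks_default",
    ) as workflow:
        ingest = DatabricksTaskOperator(
            task_id="ingest",
            task_config={
                "notebook_task": {"notebook_path": "/examples/ingest"},  # placeholder path
                "existing_cluster_id": "example-cluster-id",             # placeholder cluster
            },
        )
```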

### DatabricksTaskOperator

Define individual tasks within a Databricks workflow, with comprehensive per-task configuration support.

```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import DatabricksTaskOperator

class DatabricksTaskOperator(BaseOperator):
    def __init__(
        self,
        *,
        task_config: dict[str, Any],
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retries: int | None = None,
        cluster_spec: dict[str, Any] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        depends_on: list[str] | None = None,
        **kwargs
    ) -> None:
        """
        Individual task within a Databricks workflow.

        Args:
            task_config: Complete task configuration for Databricks
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Task-specific timeout override
            retries: Task-specific retry count override
            cluster_spec: Cluster configuration for this specific task
            libraries: Libraries to install for this task
            depends_on: List of task keys this task depends on
        """
```

## Usage Examples

### Basic Workflow Creation

Create a simple multi-stage data pipeline workflow:

```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
    DatabricksTaskOperator
)

# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline_workflow',
    databricks_conn_id='databricks_production',
    max_concurrent_runs=3,
    default_task_timeout_seconds=3600
) as data_pipeline:

    # Extract raw data
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract/daily_extract',
                'source': 'WORKSPACE',
                'base_parameters': {
                    'extraction_date': '{{ ds }}',
                    'source_systems': 'crm,billing,support',
                    'output_path': '/raw/daily/{{ ds }}'
                }
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 3,
                'spark_conf': {
                    'spark.sql.adaptive.enabled': 'true'
                }
            }
        }
    )

    # Transform and clean data
    # Note: 'transform_cluster' must be declared as a job cluster via
    # extra_job_params['job_clusters'] (see the shared job cluster example below)
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/pipelines/transform/data_cleaner.py',
                'parameters': [
                    '--input-path', '/raw/daily/{{ ds }}',
                    '--output-path', '/processed/daily/{{ ds }}',
                    '--quality-threshold', '0.95'
                ]
            },
            'job_cluster_key': 'transform_cluster'
        },
        depends_on=['extract_data']
    )

    # Load into data warehouse
    load_task = DatabricksTaskOperator(
        task_id='load_to_warehouse',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'warehouse-load-query-123'
                },
                'warehouse_id': 'analytics-warehouse-001',
                'parameters': {
                    'process_date': '{{ ds }}',
                    'source_path': '/processed/daily/{{ ds }}'
                }
            }
        },
        depends_on=['transform_data']
    )

    # Generate reports
    reporting_task = DatabricksTaskOperator(
        task_id='generate_reports',
        task_config={
            'notebook_task': {
                'notebook_path': '/reporting/daily_dashboard',
                'base_parameters': {
                    'report_date': '{{ ds }}',
                    'dashboard_refresh': 'true'
                }
            },
            'existing_cluster_id': 'reporting-cluster-001'
        },
        depends_on=['load_to_warehouse']
    )

    # Define workflow dependencies
    extract_task >> transform_task >> load_task >> reporting_task
```

### Complex Multi-Branch Workflow

Create workflows with parallel branches and conditional execution:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='ml_pipeline_workflow',
    databricks_conn_id='databricks_ml',
    existing_clusters={
        'feature_cluster': 'feature-engineering-001',
        'training_cluster': 'ml-training-gpu-001',
        'inference_cluster': 'ml-inference-001'
    },
    max_concurrent_runs=2
) as ml_pipeline:

    # Feature engineering tasks (parallel)
    customer_features = DatabricksTaskOperator(
        task_id='extract_customer_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/customer_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'lookback_days': '90'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    product_features = DatabricksTaskOperator(
        task_id='extract_product_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/product_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'category_encoding': 'onehot'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    interaction_features = DatabricksTaskOperator(
        task_id='extract_interaction_features',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/ml/features/interaction_builder.py',
                'parameters': [
                    '--date', '{{ ds }}',
                    '--interaction-types', 'customer_product,temporal',
                    '--output-format', 'delta'
                ]
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    # Feature combination and validation
    combine_features = DatabricksTaskOperator(
        task_id='combine_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/feature_combiner',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'validation_split': '0.2',
                    'target_column': 'conversion_probability'
                }
            },
            'job_cluster_key': 'feature_cluster'
        },
        depends_on=['extract_customer_features', 'extract_product_features', 'extract_interaction_features']
    )

    # Model training (conditional on feature validation)
    train_model = DatabricksTaskOperator(
        task_id='train_model',
        task_config={
            'python_wheel_task': {
                'package_name': 'ml_training_package',
                'entry_point': 'train_recommender_model',
                'parameters': [
                    '--training-data-path', '/features/combined/{{ ds }}',
                    '--model-output-path', '/models/recommender/{{ ds }}',
                    '--hyperopt-trials', '100',
                    '--early-stopping', 'true'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-cpu-ml-scala2.12',
                'node_type_id': 'i3.4xlarge',
                'num_workers': 5,
                'spark_conf': {
                    'spark.task.maxFailures': '3'
                }
            },
            'libraries': [
                {'pypi': {'package': 'mlflow>=2.0.0'}},
                {'pypi': {'package': 'hyperopt>=0.2.0'}},
                {'pypi': {'package': 'xgboost>=1.6.0'}}
            ]
        },
        depends_on=['combine_features']
    )

    # Model validation and deployment
    validate_model = DatabricksTaskOperator(
        task_id='validate_model',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/validation/model_validator',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'validation_data_path': '/features/validation/{{ ds }}',
                    'performance_threshold': '0.85',
                    'deployment_environment': 'staging'
                }
            },
            'job_cluster_key': 'inference_cluster'
        },
        depends_on=['train_model']
    )

    # Deploy to production (conditional on validation success)
    deploy_model = DatabricksTaskOperator(
        task_id='deploy_to_production',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/deployment/model_deployer',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'deployment_target': 'production',
                    'canary_percentage': '10',
                    'rollback_threshold': '0.80'
                }
            },
            'existing_cluster_id': 'production-deployment-001'
        },
        depends_on=['validate_model']
    )

    # Set up dependencies
    [customer_features, product_features, interaction_features] >> combine_features
    combine_features >> train_model >> validate_model >> deploy_model
```

### Workflow with Shared Job Clusters

Define workflows that share cluster resources across multiple tasks:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='shared_cluster_workflow',
    databricks_conn_id='databricks_etl',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'etl_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.xlarge',
                    # autoscale and a fixed num_workers are mutually exclusive,
                    # so only the autoscale range is specified here
                    'autoscale': {
                        'min_workers': 2,
                        'max_workers': 10
                    },
                    'spark_conf': {
                        'spark.sql.adaptive.enabled': 'true',
                        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
                    }
                }
            },
            {
                'job_cluster_key': 'analytics_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'r5.2xlarge',
                    'num_workers': 4,
                    'spark_conf': {
                        'spark.sql.execution.arrow.pyspark.enabled': 'true'
                    }
                }
            }
        ],
        'libraries': [
            {'pypi': {'package': 'pandas>=1.5.0'}},
            {'pypi': {'package': 'numpy>=1.21.0'}},
            {'maven': {'coordinates': 'org.apache.spark:spark-avro_2.12:3.3.0'}}
        ]
    }
) as shared_workflow:

    # ETL tasks using shared ETL cluster
    extract_customers = DatabricksTaskOperator(
        task_id='extract_customers',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/customer_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--format', 'delta']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )

    extract_orders = DatabricksTaskOperator(
        task_id='extract_orders',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/order_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--include-cancelled', 'false']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )

    # Data joining and transformation
    join_data = DatabricksTaskOperator(
        task_id='join_customer_orders',
        task_config={
            'notebook_task': {
                'notebook_path': '/etl/transformers/customer_order_joiner',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'join_strategy': 'broadcast_hash',
                    'output_partitions': '100'
                }
            },
            'job_cluster_key': 'etl_cluster'
        },
        depends_on=['extract_customers', 'extract_orders']
    )

    # Analytics tasks using dedicated analytics cluster
    customer_analytics = DatabricksTaskOperator(
        task_id='customer_segmentation',
        task_config={
            'notebook_task': {
                'notebook_path': '/analytics/customer_segmentation',
                'base_parameters': {
                    'analysis_date': '{{ ds }}',
                    'segmentation_method': 'kmeans',
                    'num_clusters': '5'
                }
            },
            'job_cluster_key': 'analytics_cluster'
        },
        depends_on=['join_customer_orders']
    )

    revenue_analytics = DatabricksTaskOperator(
        task_id='revenue_analysis',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'revenue-analysis-query'
                },
                'warehouse_id': 'analytics-warehouse'
            }
        },
        depends_on=['join_customer_orders']
    )

    # Define workflow structure
    [extract_customers, extract_orders] >> join_data >> [customer_analytics, revenue_analytics]
```

### Git-Integrated Workflow

Execute workflows using code from Git repositories:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='git_integrated_workflow',
    databricks_conn_id='databricks_dev',
    extra_job_params={
        'git_source': {
            'git_url': 'https://github.com/company/data-pipelines.git',
            'git_branch': '{{ params.git_branch | default("main") }}',
            'git_provider': 'gitHub'
        }
    }
) as git_workflow:

    # Data validation using Git-stored notebooks
    validate_inputs = DatabricksTaskOperator(
        task_id='validate_input_data',
        task_config={
            'notebook_task': {
                'notebook_path': 'validation/input_validator.py',
                'source': 'GIT',
                'base_parameters': {
                    'validation_date': '{{ ds }}',
                    'strict_mode': 'true'
                }
            },
            'existing_cluster_id': 'validation-cluster-001'
        }
    )

    # ETL processing
    process_data = DatabricksTaskOperator(
        task_id='process_data',
        task_config={
            'spark_python_task': {
                'python_file': 'processing/daily_processor.py',
                'source': 'GIT',
                'parameters': [
                    '--config', 'configs/production.yaml',
                    '--date', '{{ ds }}',
                    '--parallel-jobs', '4'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.2xlarge',
                'num_workers': 6
            }
        },
        depends_on=['validate_input_data']
    )

    # Quality assessment
    assess_quality = DatabricksTaskOperator(
        task_id='assess_data_quality',
        task_config={
            'python_wheel_task': {
                'package_name': 'data_quality_package',
                'entry_point': 'run_quality_checks',
                'parameters': [
                    '--data-path', '/processed/{{ ds }}',
                    '--rules-config', 'quality_rules.json'
                ]
            },
            'existing_cluster_id': 'quality-cluster-001'
        },
        depends_on=['process_data']
    )

    validate_inputs >> process_data >> assess_quality
```

## Advanced Workflow Features

### Conditional Task Execution

Implement conditional logic within workflows:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='conditional_workflow',
    databricks_conn_id='databricks_conditional'
) as conditional_workflow:

    # Check data availability
    check_data = DatabricksTaskOperator(
        task_id='check_data_availability',
        task_config={
            'notebook_task': {
                'notebook_path': '/checks/data_availability_checker',
                'base_parameters': {
                    'check_date': '{{ ds }}',
                    'required_sources': 'sales,marketing,customer_service'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        }
    )

    # Full processing (when all data available)
    full_processing = DatabricksTaskOperator(
        task_id='full_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/full_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'complete'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )

    # Partial processing (when some data missing)
    partial_processing = DatabricksTaskOperator(
        task_id='partial_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/partial_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'available_only'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )

    check_data >> [full_processing, partial_processing]
```
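
As written, both processing tasks run whenever the availability check succeeds; the actual branching happens inside the notebooks. If you want the workflow engine itself to gate a branch, the Databricks Jobs API offers a `condition_task`. The sketch below is a hypothetical illustration only: it assumes `condition_task` fields pass through `task_config` unchanged, that the checker notebook publishes a `data_complete` task value via `dbutils.jobs.taskValues.set`, and that the task key seen by Databricks matches the Airflow task_id. Verify all three against your provider version.

```python
# Hypothetical gate inside the same DatabricksWorkflowTaskGroup as above.
# NOTE: if task_config is templated by Airflow, wrap the Databricks '{{...}}'
# reference in {% raw %} ... {% endraw %} so Jinja does not consume it.
all_data_available = DatabricksTaskOperator(
    task_id='all_data_available',
    task_config={
        'condition_task': {
            'left': '{{tasks.check_data_availability.values.data_complete}}',
            'op': 'EQUAL_TO',
            'right': 'true'
        }
    },
    depends_on=['check_data_availability']
)

# Route only the full-processing branch through the gate.
check_data >> all_data_available >> full_processing
check_data >> partial_processing
```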

### Workflow with Error Handling

Combine workflow-level retry and timeout defaults with task-level overrides and fallback tasks:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='resilient_workflow',
    databricks_conn_id='databricks_production',
    default_task_retries=2,
    default_task_timeout_seconds=7200
) as resilient_workflow:

    # Critical data processing with retry logic
    critical_processing = DatabricksTaskOperator(
        task_id='critical_data_processing',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/critical/data_processor.py',
                'parameters': ['--date', '{{ ds }}', '--retry-mode', 'true']
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 4
            }
        },
        retries=3,
        timeout_seconds=3600
    )

    # Backup processing (runs after critical processing and its retries complete;
    # see the note below for failure-only execution)
    backup_processing = DatabricksTaskOperator(
        task_id='backup_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/backup/alternative_processor',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'fallback_mode': 'true'
                }
            },
            'existing_cluster_id': 'backup-cluster-001'
        },
        depends_on=['critical_data_processing']
    )

    # Notification task (runs once backup processing completes)
    notify_completion = DatabricksTaskOperator(
        task_id='notify_completion',
        task_config={
            'notebook_task': {
                'notebook_path': '/notifications/workflow_notifier',
                'base_parameters': {
                    'workflow_id': '{{ run_id }}',
                    'completion_date': '{{ ds }}',
                    'status': 'completed'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        },
        depends_on=['backup_data_processing']
    )

    critical_processing >> backup_processing >> notify_completion
```
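
Note that with a plain dependency chain the backup task only runs after the critical task succeeds (including its retries). To run the fallback only when the critical task fails, the Databricks Jobs API offers a task-level `run_if` field; the sketch below is a hypothetical illustration that assumes `run_if` can be forwarded through `task_config`, so confirm against your provider version.

```python
# Hypothetical fallback task; assumes the Databricks task-level 'run_if'
# field is forwarded as part of task_config.
backup_processing = DatabricksTaskOperator(
    task_id='backup_data_processing',
    task_config={
        'notebook_task': {
            'notebook_path': '/backup/alternative_processor',
            'base_parameters': {'process_date': '{{ ds }}', 'fallback_mode': 'true'}
        },
        'existing_cluster_id': 'backup-cluster-001',
        'run_if': 'AT_LEAST_ONE_FAILED'  # only run when an upstream task failed
    },
    depends_on=['critical_data_processing']
)

# A notification that should run regardless of upstream outcome would use
# 'run_if': 'ALL_DONE' instead.
```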

## Monitoring and Troubleshooting

### Workflow Status Monitoring

Monitor workflow execution with custom status checks:

```python { .api }
from airflow.providers.databricks.sensors.databricks import DatabricksSensor

def monitor_workflow_execution(**context):
    """Custom monitoring function for workflow status."""
    workflow_run_id = context['ti'].xcom_pull(task_ids='data_pipeline_workflow', key='run_id')

    # Custom monitoring logic
    print(f"Monitoring workflow run: {workflow_run_id}")

    return workflow_run_id

# Workflow with monitoring
workflow_monitor = DatabricksSensor(
    task_id='monitor_workflow_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='data_pipeline_workflow', key='run_id') }}",
    databricks_conn_id='databricks_production',
    poke_interval=60,
    timeout=7200,
    deferrable=True
)

data_pipeline >> workflow_monitor
```
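
The `monitor_workflow_execution` helper above is only defined, never scheduled. A minimal sketch of wiring it into the DAG with the stock Airflow `PythonOperator` (the task_id is illustrative):

```python
from airflow.operators.python import PythonOperator

# Runs the custom monitoring callback after the workflow group finishes;
# the Airflow context is passed to the **context parameter automatically.
run_monitoring = PythonOperator(
    task_id='log_workflow_run_id',
    python_callable=monitor_workflow_execution,
)

data_pipeline >> run_monitoring
```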

### Resource Usage Optimization

Optimize workflow resource allocation by matching each task to an appropriately sized job cluster:

```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='optimized_workflow',
    databricks_conn_id='databricks_optimized',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'small_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.large',
                    'autoscale': {'min_workers': 1, 'max_workers': 3}
                }
            },
            {
                'job_cluster_key': 'large_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.2xlarge',
                    'autoscale': {'min_workers': 4, 'max_workers': 12}
                }
            }
        ],
        'timeout_seconds': 14400,
        'max_concurrent_runs': 3
    }
) as optimized_workflow:

    # Light preprocessing on small cluster
    light_preprocessing = DatabricksTaskOperator(
        task_id='light_preprocessing',
        task_config={
            'notebook_task': {
                'notebook_path': '/preprocessing/light_cleaner'
            },
            'job_cluster_key': 'small_tasks'
        },
        timeout_seconds=1800
    )

    # Heavy computation on large cluster
    heavy_computation = DatabricksTaskOperator(
        task_id='heavy_computation',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/compute/heavy_aggregator.py'
            },
            'job_cluster_key': 'large_tasks'
        },
        timeout_seconds=10800,
        depends_on=['light_preprocessing']
    )

    light_preprocessing >> heavy_computation
```

The workflow orchestration capabilities provide powerful tools for creating complex, multi-task pipelines that leverage Databricks' native workflow engine while maintaining full integration with Airflow's scheduling, monitoring, and error handling systems.