# Workflow Orchestration

Through Databricks Workflows, the Databricks provider lets you build complex multi-task pipelines that run as a single coordinated Databricks job, with dependency management, shared cluster resources, and built-in monitoring.

## Core Components

### DatabricksWorkflowTaskGroup

Create coordinated workflows that execute as a single Databricks job with shared resources and task dependencies.
```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

class DatabricksWorkflowTaskGroup(TaskGroup):
    def __init__(
        self,
        *,
        group_id: str,
        databricks_conn_id: str = "databricks_default",
        existing_clusters: dict[str, str] | None = None,
        extra_job_params: dict[str, Any] | None = None,
        max_concurrent_runs: int = 1,
        default_task_timeout_seconds: int | None = None,
        default_task_retries: int = 0,
        prefix: str = "",
        suffix: str = "",
        dag: DAG | None = None,
        parent_group: TaskGroup | None = None,
        **kwargs
    ) -> None:
        """
        Create a workflow task group for coordinated Databricks job execution.

        Args:
            group_id: Unique identifier for the workflow group
            databricks_conn_id: Airflow connection ID for Databricks
            existing_clusters: Mapping of cluster keys to cluster IDs for reuse
            extra_job_params: Additional parameters for the Databricks job
            max_concurrent_runs: Maximum number of concurrent workflow runs
            default_task_timeout_seconds: Default timeout for workflow tasks
            default_task_retries: Default retry count for workflow tasks
            prefix: Prefix for task names within the workflow
            suffix: Suffix for task names within the workflow
            dag: Parent DAG containing this workflow
            parent_group: Parent task group if this is a nested workflow
        """
```
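
The task group is used as a context manager inside a DAG, like a regular Airflow `TaskGroup`; the tasks declared inside it are compiled into one Databricks job. A minimal sketch, assuming a recent Airflow 2.x installation and a Databricks connection named `databricks_default` (the DAG name is a placeholder):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

with DAG(
    dag_id="databricks_workflow_example",  # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # Every operator defined inside this block becomes a task of one Databricks job.
    with DatabricksWorkflowTaskGroup(
        group_id="example_workflow",
        databricks_conn_id="databricks_default",
    ) as workflow:
        ...  # add DatabricksTaskOperator tasks here (see the examples below)
```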

### DatabricksTaskOperator

Defines an individual task within a Databricks workflow, with full control over the task's Databricks configuration.
```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import DatabricksTaskOperator

class DatabricksTaskOperator(BaseOperator):
    def __init__(
        self,
        *,
        task_config: dict[str, Any],
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retries: int | None = None,
        cluster_spec: dict[str, Any] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        depends_on: list[str] | None = None,
        **kwargs
    ) -> None:
        """
        Individual task within a Databricks workflow.

        Args:
            task_config: Complete task configuration for Databricks
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Task-specific timeout override
            retries: Task-specific retry count override
            cluster_spec: Cluster configuration for this specific task
            libraries: Libraries to install for this task
            depends_on: List of task keys this task depends on
        """
```
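
The `task_config` dictionary mirrors a task definition in the Databricks Jobs API (`notebook_task`, `spark_python_task`, `sql_task`, cluster references, and so on). As a minimal sketch, with a placeholder notebook path and cluster ID:

```python
from airflow.providers.databricks.operators.databricks_workflow import DatabricksTaskOperator

# Placeholder notebook path and cluster ID; substitute your own resources.
run_notebook = DatabricksTaskOperator(
    task_id="run_daily_notebook",
    databricks_conn_id="databricks_default",
    task_config={
        "notebook_task": {
            "notebook_path": "/Shared/example_notebook",
            "base_parameters": {"run_date": "{{ ds }}"},
        },
        "existing_cluster_id": "example-cluster-id",
    },
)
```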

## Usage Examples

### Basic Workflow Creation

Create a simple multi-stage data pipeline workflow:
```python { .api }
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
    DatabricksTaskOperator
)

# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline_workflow',
    databricks_conn_id='databricks_production',
    max_concurrent_runs=3,
    default_task_timeout_seconds=3600
) as data_pipeline:

    # Extract raw data
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract/daily_extract',
                'source': 'WORKSPACE',
                'base_parameters': {
                    'extraction_date': '{{ ds }}',
                    'source_systems': 'crm,billing,support',
                    'output_path': '/raw/daily/{{ ds }}'
                }
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 3,
                'spark_conf': {
                    'spark.sql.adaptive.enabled': 'true'
                }
            }
        }
    )

    # Transform and clean data
    # Note: 'transform_cluster' must be defined as a shared job cluster via
    # extra_job_params={'job_clusters': [...]} (see the shared-cluster example below).
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/pipelines/transform/data_cleaner.py',
                'parameters': [
                    '--input-path', '/raw/daily/{{ ds }}',
                    '--output-path', '/processed/daily/{{ ds }}',
                    '--quality-threshold', '0.95'
                ]
            },
            'job_cluster_key': 'transform_cluster'
        },
        depends_on=['extract_data']
    )

    # Load into data warehouse
    load_task = DatabricksTaskOperator(
        task_id='load_to_warehouse',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'warehouse-load-query-123'
                },
                'warehouse_id': 'analytics-warehouse-001',
                'parameters': {
                    'process_date': '{{ ds }}',
                    'source_path': '/processed/daily/{{ ds }}'
                }
            }
        },
        depends_on=['transform_data']
    )

    # Generate reports
    reporting_task = DatabricksTaskOperator(
        task_id='generate_reports',
        task_config={
            'notebook_task': {
                'notebook_path': '/reporting/daily_dashboard',
                'base_parameters': {
                    'report_date': '{{ ds }}',
                    'dashboard_refresh': 'true'
                }
            },
            'existing_cluster_id': 'reporting-cluster-001'
        },
        depends_on=['load_to_warehouse']
    )

    # Define workflow dependencies
    extract_task >> transform_task >> load_task >> reporting_task
```

### Complex Multi-Branch Workflow

Create workflows with parallel branches and conditional execution:
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='ml_pipeline_workflow',
    databricks_conn_id='databricks_ml',
    existing_clusters={
        'feature_cluster': 'feature-engineering-001',
        'training_cluster': 'ml-training-gpu-001',
        'inference_cluster': 'ml-inference-001'
    },
    max_concurrent_runs=2
) as ml_pipeline:

    # Feature engineering tasks (parallel)
    customer_features = DatabricksTaskOperator(
        task_id='extract_customer_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/customer_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'lookback_days': '90'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    product_features = DatabricksTaskOperator(
        task_id='extract_product_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/product_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'category_encoding': 'onehot'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    interaction_features = DatabricksTaskOperator(
        task_id='extract_interaction_features',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/ml/features/interaction_builder.py',
                'parameters': [
                    '--date', '{{ ds }}',
                    '--interaction-types', 'customer_product,temporal',
                    '--output-format', 'delta'
                ]
            },
            'job_cluster_key': 'feature_cluster'
        }
    )

    # Feature combination and validation
    combine_features = DatabricksTaskOperator(
        task_id='combine_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/feature_combiner',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'validation_split': '0.2',
                    'target_column': 'conversion_probability'
                }
            },
            'job_cluster_key': 'feature_cluster'
        },
        depends_on=['extract_customer_features', 'extract_product_features', 'extract_interaction_features']
    )

    # Model training (conditional on feature validation)
    train_model = DatabricksTaskOperator(
        task_id='train_model',
        task_config={
            'python_wheel_task': {
                'package_name': 'ml_training_package',
                'entry_point': 'train_recommender_model',
                'parameters': [
                    '--training-data-path', '/features/combined/{{ ds }}',
                    '--model-output-path', '/models/recommender/{{ ds }}',
                    '--hyperopt-trials', '100',
                    '--early-stopping', 'true'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-cpu-ml-scala2.12',
                'node_type_id': 'i3.4xlarge',
                'num_workers': 5,
                'spark_conf': {
                    'spark.task.maxFailures': '3'
                }
            },
            'libraries': [
                {'pypi': {'package': 'mlflow>=2.0.0'}},
                {'pypi': {'package': 'hyperopt>=0.2.0'}},
                {'pypi': {'package': 'xgboost>=1.6.0'}}
            ]
        },
        depends_on=['combine_features']
    )

    # Model validation and deployment
    validate_model = DatabricksTaskOperator(
        task_id='validate_model',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/validation/model_validator',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'validation_data_path': '/features/validation/{{ ds }}',
                    'performance_threshold': '0.85',
                    'deployment_environment': 'staging'
                }
            },
            'job_cluster_key': 'inference_cluster'
        },
        depends_on=['train_model']
    )

    # Deploy to production (conditional on validation success)
    deploy_model = DatabricksTaskOperator(
        task_id='deploy_to_production',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/deployment/model_deployer',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'deployment_target': 'production',
                    'canary_percentage': '10',
                    'rollback_threshold': '0.80'
                }
            },
            'existing_cluster_id': 'production-deployment-001'
        },
        depends_on=['validate_model']
    )

    # Set up dependencies
    [customer_features, product_features, interaction_features] >> combine_features
    combine_features >> train_model >> validate_model >> deploy_model
```

### Workflow with Shared Job Clusters

Define workflows that share cluster resources across multiple tasks:
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='shared_cluster_workflow',
    databricks_conn_id='databricks_etl',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'etl_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.xlarge',
                    'autoscale': {
                        'min_workers': 2,
                        'max_workers': 10
                    },
                    'spark_conf': {
                        'spark.sql.adaptive.enabled': 'true',
                        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
                    }
                }
            },
            {
                'job_cluster_key': 'analytics_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'r5.2xlarge',
                    'num_workers': 4,
                    'spark_conf': {
                        'spark.sql.execution.arrow.pyspark.enabled': 'true'
                    }
                }
            }
        ],
        'libraries': [
            {'pypi': {'package': 'pandas>=1.5.0'}},
            {'pypi': {'package': 'numpy>=1.21.0'}},
            {'maven': {'coordinates': 'org.apache.spark:spark-avro_2.12:3.3.0'}}
        ]
    }
) as shared_workflow:

    # ETL tasks using shared ETL cluster
    extract_customers = DatabricksTaskOperator(
        task_id='extract_customers',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/customer_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--format', 'delta']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )

    extract_orders = DatabricksTaskOperator(
        task_id='extract_orders',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/order_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--include-cancelled', 'false']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )

    # Data joining and transformation
    join_data = DatabricksTaskOperator(
        task_id='join_customer_orders',
        task_config={
            'notebook_task': {
                'notebook_path': '/etl/transformers/customer_order_joiner',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'join_strategy': 'broadcast_hash',
                    'output_partitions': '100'
                }
            },
            'job_cluster_key': 'etl_cluster'
        },
        depends_on=['extract_customers', 'extract_orders']
    )

    # Analytics tasks using dedicated analytics cluster
    customer_analytics = DatabricksTaskOperator(
        task_id='customer_segmentation',
        task_config={
            'notebook_task': {
                'notebook_path': '/analytics/customer_segmentation',
                'base_parameters': {
                    'analysis_date': '{{ ds }}',
                    'segmentation_method': 'kmeans',
                    'num_clusters': '5'
                }
            },
            'job_cluster_key': 'analytics_cluster'
        },
        depends_on=['join_customer_orders']
    )

    revenue_analytics = DatabricksTaskOperator(
        task_id='revenue_analysis',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'revenue-analysis-query'
                },
                'warehouse_id': 'analytics-warehouse'
            }
        },
        depends_on=['join_customer_orders']
    )

    # Define workflow structure
    [extract_customers, extract_orders] >> join_data >> [customer_analytics, revenue_analytics]
```

### Git-Integrated Workflow

Execute workflows using code from Git repositories:
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='git_integrated_workflow',
    databricks_conn_id='databricks_dev',
    extra_job_params={
        'git_source': {
            'git_url': 'https://github.com/company/data-pipelines.git',
            'git_branch': '{{ params.git_branch | default("main") }}',
            'git_provider': 'gitHub'
        }
    }
) as git_workflow:

    # Data validation using Git-stored notebooks
    validate_inputs = DatabricksTaskOperator(
        task_id='validate_input_data',
        task_config={
            'notebook_task': {
                'notebook_path': 'validation/input_validator.py',
                'source': 'GIT',
                'base_parameters': {
                    'validation_date': '{{ ds }}',
                    'strict_mode': 'true'
                }
            },
            'existing_cluster_id': 'validation-cluster-001'
        }
    )

    # ETL processing
    process_data = DatabricksTaskOperator(
        task_id='process_data',
        task_config={
            'spark_python_task': {
                'python_file': 'processing/daily_processor.py',
                'source': 'GIT',
                'parameters': [
                    '--config', 'configs/production.yaml',
                    '--date', '{{ ds }}',
                    '--parallel-jobs', '4'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.2xlarge',
                'num_workers': 6
            }
        },
        depends_on=['validate_input_data']
    )

    # Quality assessment
    assess_quality = DatabricksTaskOperator(
        task_id='assess_data_quality',
        task_config={
            'python_wheel_task': {
                'package_name': 'data_quality_package',
                'entry_point': 'run_quality_checks',
                'parameters': [
                    '--data-path', '/processed/{{ ds }}',
                    '--rules-config', 'quality_rules.json'
                ]
            },
            'existing_cluster_id': 'quality-cluster-001'
        },
        depends_on=['process_data']
    )

    validate_inputs >> process_data >> assess_quality
```
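
The `git_source` block follows the Databricks Jobs API, which accepts exactly one of `git_branch`, `git_tag`, or `git_commit`. To pin a workflow to an immutable revision rather than a moving branch, a short sketch (the tag value is a placeholder):

```python
# Pin the workflow's code to a release tag (or a commit SHA) instead of a branch.
extra_job_params = {
    'git_source': {
        'git_url': 'https://github.com/company/data-pipelines.git',
        'git_provider': 'gitHub',
        'git_tag': 'v1.4.2',  # placeholder; alternatively use 'git_commit': '<sha>'
    }
}
```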

## Advanced Workflow Features

### Conditional Task Execution

Branch a workflow on the result of an upfront data-availability check. The dependency graph below only sequences the tasks; the decision about which branch does meaningful work is made inside the notebooks themselves (or with a Databricks condition task, sketched after the example):
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='conditional_workflow',
    databricks_conn_id='databricks_conditional'
) as conditional_workflow:

    # Check data availability
    check_data = DatabricksTaskOperator(
        task_id='check_data_availability',
        task_config={
            'notebook_task': {
                'notebook_path': '/checks/data_availability_checker',
                'base_parameters': {
                    'check_date': '{{ ds }}',
                    'required_sources': 'sales,marketing,customer_service'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        }
    )

    # Full processing (when all data available)
    full_processing = DatabricksTaskOperator(
        task_id='full_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/full_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'complete'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )

    # Partial processing (when some data missing)
    partial_processing = DatabricksTaskOperator(
        task_id='partial_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/partial_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'available_only'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )

    check_data >> [full_processing, partial_processing]
```
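
To have Databricks itself skip a branch, the Jobs API also offers condition tasks, which evaluate a task value produced by an upstream task (for example via `dbutils.jobs.taskValues.set` in the availability-check notebook). A hedged sketch; the task value name is hypothetical, and how the condition's outcome is wired to downstream tasks depends on your provider version:

```python
# Condition task: evaluates a task value published by the availability check.
# 'all_sources_present' is a hypothetical value set by that notebook.
data_complete = DatabricksTaskOperator(
    task_id='data_complete',
    task_config={
        'condition_task': {
            'left': '{{tasks.check_data_availability.values.all_sources_present}}',
            'op': 'EQUAL_TO',
            'right': 'true',
        }
    },
    depends_on=['check_data_availability'],
)
```

In the resulting job spec, downstream tasks can depend on the condition task's `"outcome"` (`"true"` or `"false"`), so only one branch actually runs.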

### Workflow with Error Handling

Configure retries and timeouts for critical tasks, followed by fallback processing and notification steps. Within a Databricks workflow, a downstream task runs only after its upstream tasks succeed, so a genuinely failure-triggered fallback requires a `run_if` setting on the Databricks task (sketched after the example):
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='resilient_workflow',
    databricks_conn_id='databricks_production',
    default_task_retries=2,
    default_task_timeout_seconds=7200
) as resilient_workflow:

    # Critical data processing with retry logic
    critical_processing = DatabricksTaskOperator(
        task_id='critical_data_processing',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/critical/data_processor.py',
                'parameters': ['--date', '{{ ds }}', '--retry-mode', 'true']
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 4
            }
        },
        retries=3,
        timeout_seconds=3600
    )

    # Backup processing (with default dependency semantics this runs only after
    # critical processing succeeds; see the run_if sketch below for a failure-triggered fallback)
    backup_processing = DatabricksTaskOperator(
        task_id='backup_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/backup/alternative_processor',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'fallback_mode': 'true'
                }
            },
            'existing_cluster_id': 'backup-cluster-001'
        },
        depends_on=['critical_data_processing']
    )

    # Notification task (runs once backup processing has completed successfully)
    notify_completion = DatabricksTaskOperator(
        task_id='notify_completion',
        task_config={
            'notebook_task': {
                'notebook_path': '/notifications/workflow_notifier',
                'base_parameters': {
                    'workflow_id': '{{ run_id }}',
                    'completion_date': '{{ ds }}',
                    'status': 'completed'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        },
        depends_on=['backup_data_processing']
    )

    critical_processing >> backup_processing >> notify_completion
```
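
For a genuinely failure-triggered fallback, the Databricks Jobs API supports a per-task `run_if` setting (values such as `ALL_SUCCESS`, `ALL_DONE`, or `AT_LEAST_ONE_FAILED`). A hedged sketch, assuming the provider passes `run_if` through `task_config` unchanged:

```python
# Failure-triggered variant of the backup step: 'run_if' is a Databricks Jobs API
# task setting; AT_LEAST_ONE_FAILED runs this task only when an upstream task failed.
backup_processing = DatabricksTaskOperator(
    task_id='backup_data_processing',
    task_config={
        'notebook_task': {
            'notebook_path': '/backup/alternative_processor',
            'base_parameters': {'process_date': '{{ ds }}', 'fallback_mode': 'true'},
        },
        'existing_cluster_id': 'backup-cluster-001',
        'run_if': 'AT_LEAST_ONE_FAILED',
    },
    depends_on=['critical_data_processing'],
)
```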

## Monitoring and Troubleshooting

### Workflow Status Monitoring

Monitor workflow execution with custom status checks:
```python { .api }
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def monitor_workflow_execution(**context):
    """Custom monitoring function: report the state of the workflow's Databricks job run."""
    workflow_run_id = context['ti'].xcom_pull(task_ids='data_pipeline_workflow', key='run_id')

    # Query the Databricks Jobs API for the current run state
    hook = DatabricksHook(databricks_conn_id='databricks_production')
    run_state = hook.get_run_state(workflow_run_id)
    print(f"Workflow run {workflow_run_id}: {run_state.life_cycle_state} ({run_state.state_message})")

    return run_state.result_state

# Workflow with monitoring
workflow_monitor = PythonOperator(
    task_id='monitor_workflow_completion',
    python_callable=monitor_workflow_execution,
)

data_pipeline >> workflow_monitor
```

### Resource Usage Optimization

Optimize workflow resource allocation:
```python { .api }
with DatabricksWorkflowTaskGroup(
    group_id='optimized_workflow',
    databricks_conn_id='databricks_optimized',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'small_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.large',
                    'autoscale': {'min_workers': 1, 'max_workers': 3}
                }
            },
            {
                'job_cluster_key': 'large_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.2xlarge',
                    'autoscale': {'min_workers': 4, 'max_workers': 12}
                }
            }
        ],
        'timeout_seconds': 14400,
        'max_concurrent_runs': 3
    }
) as optimized_workflow:

    # Light preprocessing on small cluster
    light_preprocessing = DatabricksTaskOperator(
        task_id='light_preprocessing',
        task_config={
            'notebook_task': {
                'notebook_path': '/preprocessing/light_cleaner'
            },
            'job_cluster_key': 'small_tasks'
        },
        timeout_seconds=1800
    )

    # Heavy computation on large cluster
    heavy_computation = DatabricksTaskOperator(
        task_id='heavy_computation',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/compute/heavy_aggregator.py'
            },
            'job_cluster_key': 'large_tasks'
        },
        timeout_seconds=10800,
        depends_on=['light_preprocessing']
    )

    light_preprocessing >> heavy_computation
```

These workflow orchestration capabilities let you build complex multi-task pipelines on Databricks' native workflow engine while retaining full integration with Airflow's scheduling, monitoring, and error-handling systems.