Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The Databricks provider offers sophisticated workflow orchestration capabilities through Databricks Workflows, enabling you to create complex multi-task pipelines that run as coordinated jobs with dependency management, resource sharing, and advanced monitoring.
Create coordinated workflows that execute as unified Databricks jobs with shared resources and dependencies.
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
class DatabricksWorkflowTaskGroup(TaskGroup):
    """Task group whose member tasks are submitted as one Databricks multi-task job."""

    def __init__(
        self,
        *,
        group_id: str,
        databricks_conn_id: str = "databricks_default",
        existing_clusters: dict[str, str] | None = None,
        extra_job_params: dict[str, Any] | None = None,
        max_concurrent_runs: int = 1,
        default_task_timeout_seconds: int | None = None,
        default_task_retries: int = 0,
        prefix: str = "",
        suffix: str = "",
        dag: DAG | None = None,
        parent_group: TaskGroup | None = None,
        **kwargs
    ) -> None:
        """
        Create a workflow task group for coordinated Databricks job execution.

        Args:
            group_id: Unique identifier for the workflow group
            databricks_conn_id: Airflow connection ID for Databricks
            existing_clusters: Mapping of cluster keys to cluster IDs for reuse
            extra_job_params: Additional parameters for the Databricks job
            max_concurrent_runs: Maximum number of concurrent workflow runs
            default_task_timeout_seconds: Default timeout for workflow tasks
            default_task_retries: Default retry count for workflow tasks
            prefix: Prefix for task names within the workflow
            suffix: Suffix for task names within the workflow
            dag: Parent DAG containing this workflow
            parent_group: Parent task group if this is a nested workflow
"""Individual tasks within Databricks workflows with comprehensive configuration support.
from airflow.providers.databricks.operators.databricks_workflow import DatabricksTaskOperator
class DatabricksTaskOperator(BaseOperator):
    """Operator representing a single task inside a Databricks workflow job."""

    def __init__(
        self,
        *,
        task_config: dict[str, Any],
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retries: int | None = None,
        cluster_spec: dict[str, Any] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        depends_on: list[str] | None = None,
        **kwargs
    ) -> None:
        """
        Individual task within a Databricks workflow.

        Args:
            task_config: Complete task configuration for Databricks
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Task-specific timeout override
            retries: Task-specific retry count override
            cluster_spec: Cluster configuration for this specific task
            libraries: Libraries to install for this task
            depends_on: List of task keys this task depends on
"""Create a simple multi-stage data pipeline workflow:
from airflow.providers.databricks.operators.databricks_workflow import (
DatabricksWorkflowTaskGroup,
DatabricksTaskOperator
)
# Define workflow with multiple dependent tasks
# Example: linear four-stage ETL pipeline (extract -> transform -> load -> report)
# executed as a single coordinated Databricks job.
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline_workflow',
    databricks_conn_id='databricks_production',
    max_concurrent_runs=3,
    default_task_timeout_seconds=3600
) as data_pipeline:
    # Extract raw data
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract/daily_extract',
                'source': 'WORKSPACE',
                'base_parameters': {
                    'extraction_date': '{{ ds }}',
                    'source_systems': 'crm,billing,support',
                    'output_path': '/raw/daily/{{ ds }}'
                }
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 3,
                'spark_conf': {
                    'spark.sql.adaptive.enabled': 'true'
                }
            }
        }
    )
    # Transform and clean data
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/pipelines/transform/data_cleaner.py',
                'parameters': [
                    '--input-path', '/raw/daily/{{ ds }}',
                    '--output-path', '/processed/daily/{{ ds }}',
                    '--quality-threshold', '0.95'
                ]
            },
            # NOTE(review): 'transform_cluster' must be declared in the job's
            # 'job_clusters' list (e.g. via extra_job_params) — not shown here.
            'job_cluster_key': 'transform_cluster'
        },
        depends_on=['extract_data']
    )
    # Load into data warehouse
    load_task = DatabricksTaskOperator(
        task_id='load_to_warehouse',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'warehouse-load-query-123'
                },
                'warehouse_id': 'analytics-warehouse-001',
                'parameters': {
                    'process_date': '{{ ds }}',
                    'source_path': '/processed/daily/{{ ds }}'
                }
            }
        },
        depends_on=['transform_data']
    )
    # Generate reports
    reporting_task = DatabricksTaskOperator(
        task_id='generate_reports',
        task_config={
            'notebook_task': {
                'notebook_path': '/reporting/daily_dashboard',
                'base_parameters': {
                    'report_date': '{{ ds }}',
                    'dashboard_refresh': 'true'
                }
            },
            'existing_cluster_id': 'reporting-cluster-001'
        },
        depends_on=['load_to_warehouse']
    )
    # Define workflow dependencies
    extract_task >> transform_task >> load_task >> reporting_task

Create workflows with parallel branches and conditional execution:
# Example: ML pipeline with three parallel feature-extraction branches that
# fan in to a combine step, then train -> validate -> deploy sequentially.
with DatabricksWorkflowTaskGroup(
    group_id='ml_pipeline_workflow',
    databricks_conn_id='databricks_ml',
    existing_clusters={
        'feature_cluster': 'feature-engineering-001',
        'training_cluster': 'ml-training-gpu-001',
        'inference_cluster': 'ml-inference-001'
    },
    max_concurrent_runs=2
) as ml_pipeline:
    # Feature engineering tasks (parallel)
    customer_features = DatabricksTaskOperator(
        task_id='extract_customer_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/customer_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'lookback_days': '90'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    product_features = DatabricksTaskOperator(
        task_id='extract_product_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/product_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'category_encoding': 'onehot'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    interaction_features = DatabricksTaskOperator(
        task_id='extract_interaction_features',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/ml/features/interaction_builder.py',
                'parameters': [
                    '--date', '{{ ds }}',
                    '--interaction-types', 'customer_product,temporal',
                    '--output-format', 'delta'
                ]
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    # Feature combination and validation
    combine_features = DatabricksTaskOperator(
        task_id='combine_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/feature_combiner',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'validation_split': '0.2',
                    'target_column': 'conversion_probability'
                }
            },
            'job_cluster_key': 'feature_cluster'
        },
        depends_on=['extract_customer_features', 'extract_product_features', 'extract_interaction_features']
    )
    # Model training (conditional on feature validation)
    train_model = DatabricksTaskOperator(
        task_id='train_model',
        task_config={
            'python_wheel_task': {
                'package_name': 'ml_training_package',
                'entry_point': 'train_recommender_model',
                'parameters': [
                    '--training-data-path', '/features/combined/{{ ds }}',
                    '--model-output-path', '/models/recommender/{{ ds }}',
                    '--hyperopt-trials', '100',
                    '--early-stopping', 'true'
                ]
            },
            # NOTE(review): 'training_cluster' is declared in existing_clusters
            # above but this task provisions a new cluster instead — confirm
            # which is intended.
            'new_cluster': {
                'spark_version': '12.2.x-cpu-ml-scala2.12',
                'node_type_id': 'i3.4xlarge',
                'num_workers': 5,
                'spark_conf': {
                    'spark.task.maxFailures': '3'
                }
            },
            'libraries': [
                {'pypi': {'package': 'mlflow>=2.0.0'}},
                {'pypi': {'package': 'hyperopt>=0.2.0'}},
                {'pypi': {'package': 'xgboost>=1.6.0'}}
            ]
        },
        depends_on=['combine_features']
    )
    # Model validation and deployment
    validate_model = DatabricksTaskOperator(
        task_id='validate_model',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/validation/model_validator',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'validation_data_path': '/features/validation/{{ ds }}',
                    'performance_threshold': '0.85',
                    'deployment_environment': 'staging'
                }
            },
            'job_cluster_key': 'inference_cluster'
        },
        depends_on=['train_model']
    )
    # Deploy to production (conditional on validation success)
    deploy_model = DatabricksTaskOperator(
        task_id='deploy_to_production',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/deployment/model_deployer',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'deployment_target': 'production',
                    'canary_percentage': '10',
                    'rollback_threshold': '0.80'
                }
            },
            'existing_cluster_id': 'production-deployment-001'
        },
        depends_on=['validate_model']
    )
    # Set up dependencies
    [customer_features, product_features, interaction_features] >> combine_features
    combine_features >> train_model >> validate_model >> deploy_model

Define workflows that share cluster resources across multiple tasks:
# Example: two named job clusters shared across tasks; cluster specs and
# common libraries are supplied once at the job level via extra_job_params.
with DatabricksWorkflowTaskGroup(
    group_id='shared_cluster_workflow',
    databricks_conn_id='databricks_etl',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'etl_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.xlarge',
                    # NOTE(review): 'num_workers' and 'autoscale' are mutually
                    # exclusive in the Databricks clusters API — confirm which
                    # one should remain.
                    'num_workers': 8,
                    'autoscale': {
                        'min_workers': 2,
                        'max_workers': 10
                    },
                    'spark_conf': {
                        'spark.sql.adaptive.enabled': 'true',
                        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
                    }
                }
            },
            {
                'job_cluster_key': 'analytics_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'r5.2xlarge',
                    'num_workers': 4,
                    'spark_conf': {
                        'spark.sql.execution.arrow.pyspark.enabled': 'true'
                    }
                }
            }
        ],
        'libraries': [
            {'pypi': {'package': 'pandas>=1.5.0'}},
            {'pypi': {'package': 'numpy>=1.21.0'}},
            {'maven': {'coordinates': 'org.apache.spark:spark-avro_2.12:3.3.0'}}
        ]
    }
) as shared_workflow:
    # ETL tasks using shared ETL cluster
    extract_customers = DatabricksTaskOperator(
        task_id='extract_customers',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/customer_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--format', 'delta']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )
    extract_orders = DatabricksTaskOperator(
        task_id='extract_orders',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/order_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--include-cancelled', 'false']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )
    # Data joining and transformation
    join_data = DatabricksTaskOperator(
        task_id='join_customer_orders',
        task_config={
            'notebook_task': {
                'notebook_path': '/etl/transformers/customer_order_joiner',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'join_strategy': 'broadcast_hash',
                    'output_partitions': '100'
                }
            },
            'job_cluster_key': 'etl_cluster'
        },
        depends_on=['extract_customers', 'extract_orders']
    )
    # Analytics tasks using dedicated analytics cluster
    customer_analytics = DatabricksTaskOperator(
        task_id='customer_segmentation',
        task_config={
            'notebook_task': {
                'notebook_path': '/analytics/customer_segmentation',
                'base_parameters': {
                    'analysis_date': '{{ ds }}',
                    'segmentation_method': 'kmeans',
                    'num_clusters': '5'
                }
            },
            'job_cluster_key': 'analytics_cluster'
        },
        depends_on=['join_customer_orders']
    )
    revenue_analytics = DatabricksTaskOperator(
        task_id='revenue_analysis',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'revenue-analysis-query'
                },
                'warehouse_id': 'analytics-warehouse'
            }
        },
        depends_on=['join_customer_orders']
    )
    # Define workflow structure
    [extract_customers, extract_orders] >> join_data >> [customer_analytics, revenue_analytics]

Execute workflows using code from Git repositories:
# Example: job-level git_source lets notebook/python tasks resolve relative
# paths from a Git repository checkout instead of the workspace.
with DatabricksWorkflowTaskGroup(
    group_id='git_integrated_workflow',
    databricks_conn_id='databricks_dev',
    extra_job_params={
        'git_source': {
            'git_url': 'https://github.com/company/data-pipelines.git',
            'git_branch': '{{ params.git_branch | default("main") }}',
            'git_provider': 'gitHub'
        }
    }
) as git_workflow:
    # Data validation using Git-stored notebooks
    validate_inputs = DatabricksTaskOperator(
        task_id='validate_input_data',
        task_config={
            'notebook_task': {
                'notebook_path': 'validation/input_validator.py',
                'source': 'GIT',
                'base_parameters': {
                    'validation_date': '{{ ds }}',
                    'strict_mode': 'true'
                }
            },
            'existing_cluster_id': 'validation-cluster-001'
        }
    )
    # ETL processing
    process_data = DatabricksTaskOperator(
        task_id='process_data',
        task_config={
            'spark_python_task': {
                'python_file': 'processing/daily_processor.py',
                'source': 'GIT',
                'parameters': [
                    '--config', 'configs/production.yaml',
                    '--date', '{{ ds }}',
                    '--parallel-jobs', '4'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.2xlarge',
                'num_workers': 6
            }
        },
        depends_on=['validate_input_data']
    )
    # Quality assessment
    assess_quality = DatabricksTaskOperator(
        task_id='assess_data_quality',
        task_config={
            'python_wheel_task': {
                'package_name': 'data_quality_package',
                'entry_point': 'run_quality_checks',
                'parameters': [
                    '--data-path', '/processed/{{ ds }}',
                    '--rules-config', 'quality_rules.json'
                ]
            },
            'existing_cluster_id': 'quality-cluster-001'
        },
        depends_on=['process_data']
    )
    validate_inputs >> process_data >> assess_quality

Implement conditional logic within workflows:
# Example: an availability check fanning out to two processing paths.
# NOTE(review): as written BOTH downstream tasks run after the check — no
# branch operator or run-condition is shown, so any "conditional" execution
# must happen inside the notebooks themselves. Confirm the intended semantics.
with DatabricksWorkflowTaskGroup(
    group_id='conditional_workflow',
    databricks_conn_id='databricks_conditional'
) as conditional_workflow:
    # Check data availability
    check_data = DatabricksTaskOperator(
        task_id='check_data_availability',
        task_config={
            'notebook_task': {
                'notebook_path': '/checks/data_availability_checker',
                'base_parameters': {
                    'check_date': '{{ ds }}',
                    'required_sources': 'sales,marketing,customer_service'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        }
    )
    # Full processing (when all data available)
    full_processing = DatabricksTaskOperator(
        task_id='full_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/full_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'complete'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )
    # Partial processing (when some data missing)
    partial_processing = DatabricksTaskOperator(
        task_id='partial_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/partial_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'available_only'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )
    check_data >> [full_processing, partial_processing]

Implement comprehensive error handling and recovery:
# Example: group-level retry/timeout defaults with per-task overrides.
# NOTE(review): Databricks depends_on runs a task only after its upstream
# SUCCEEDS, so 'backup_data_processing' will not fire when the critical task
# fails, and 'notify_completion' is not truly "always runs" — it only runs
# after the backup step succeeds. Confirm the intended failure semantics.
with DatabricksWorkflowTaskGroup(
    group_id='resilient_workflow',
    databricks_conn_id='databricks_production',
    default_task_retries=2,
    default_task_timeout_seconds=7200
) as resilient_workflow:
    # Critical data processing with retry logic
    critical_processing = DatabricksTaskOperator(
        task_id='critical_data_processing',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/critical/data_processor.py',
                'parameters': ['--date', '{{ ds }}', '--retry-mode', 'true']
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 4
            }
        },
        retries=3,
        timeout_seconds=3600
    )
    # Backup processing (runs if critical processing fails)
    backup_processing = DatabricksTaskOperator(
        task_id='backup_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/backup/alternative_processor',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'fallback_mode': 'true'
                }
            },
            'existing_cluster_id': 'backup-cluster-001'
        },
        depends_on=['critical_data_processing']
    )
    # Notification task (always runs)
    notify_completion = DatabricksTaskOperator(
        task_id='notify_completion',
        task_config={
            'notebook_task': {
                'notebook_path': '/notifications/workflow_notifier',
                'base_parameters': {
                    'workflow_id': '{{ run_id }}',
                    'completion_date': '{{ ds }}',
                    'status': 'completed'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        },
        depends_on=['backup_data_processing']
    )
    critical_processing >> backup_processing >> notify_completion

Monitor workflow execution with custom status checks:
from airflow.providers.databricks.sensors.databricks import DatabricksSensor
# NOTE(review): confirm this class/import path — the provider ships
# DatabricksSqlSensor / DatabricksPartitionSensor; a run-status sensor taking
# a 'run_id' argument may not exist under this exact name.


def monitor_workflow_execution(**context):
    """Custom monitoring function for workflow status.

    Pulls the workflow's run_id from XCom and reports it.
    NOTE(review): defined for illustration only — it is not wired into the
    monitoring sensor below.
    """
    workflow_run_id = context['ti'].xcom_pull(task_ids='data_pipeline_workflow', key='run_id')
    # Custom monitoring logic
    print(f"Monitoring workflow run: {workflow_run_id}")
    return workflow_run_id


# Workflow with monitoring: poll (deferred) until the Databricks run finishes.
workflow_monitor = DatabricksSensor(
    task_id='monitor_workflow_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='data_pipeline_workflow', key='run_id') }}",
    databricks_conn_id='databricks_production',
    poke_interval=60,
    timeout=7200,
    deferrable=True
)
data_pipeline >> workflow_monitor

Optimize workflow resource allocation:
# Example: right-size compute by routing light tasks to a small autoscaling
# cluster and heavy tasks to a larger one, with job-level limits.
with DatabricksWorkflowTaskGroup(
    group_id='optimized_workflow',
    databricks_conn_id='databricks_optimized',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'small_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.large',
                    'autoscale': {'min_workers': 1, 'max_workers': 3}
                }
            },
            {
                'job_cluster_key': 'large_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.2xlarge',
                    'autoscale': {'min_workers': 4, 'max_workers': 12}
                }
            }
        ],
        'timeout_seconds': 14400,
        'max_concurrent_runs': 3
    }
) as optimized_workflow:
    # Light preprocessing on small cluster
    light_preprocessing = DatabricksTaskOperator(
        task_id='light_preprocessing',
        task_config={
            'notebook_task': {
                'notebook_path': '/preprocessing/light_cleaner'
            },
            'job_cluster_key': 'small_tasks'
        },
        timeout_seconds=1800
    )
    # Heavy computation on large cluster
    heavy_computation = DatabricksTaskOperator(
        task_id='heavy_computation',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/compute/heavy_aggregator.py'
            },
            'job_cluster_key': 'large_tasks'
        },
        timeout_seconds=10800,
        depends_on=['light_preprocessing']
    )
    light_preprocessing >> heavy_computation

The workflow orchestration capabilities provide powerful tools for creating complex, multi-task pipelines that leverage Databricks' native workflow engine while maintaining full integration with Airflow's scheduling, monitoring, and error handling systems.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-databricks