tessl/pypi-apache-airflow-providers-databricks

Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows


Workflow Orchestration

The Databricks provider offers sophisticated workflow orchestration capabilities through Databricks Workflows, enabling you to create complex multi-task pipelines that run as coordinated jobs with dependency management, resource sharing, and advanced monitoring.

Core Components

DatabricksWorkflowTaskGroup

Create coordinated workflows that execute as unified Databricks jobs with shared resources and dependencies.

from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

class DatabricksWorkflowTaskGroup(TaskGroup):
    def __init__(
        self,
        *,
        group_id: str,
        databricks_conn_id: str = "databricks_default",
        existing_clusters: dict[str, str] | None = None,
        extra_job_params: dict[str, Any] | None = None,
        max_concurrent_runs: int = 1,
        default_task_timeout_seconds: int | None = None,
        default_task_retries: int = 0,
        prefix: str = "",
        suffix: str = "",
        dag: DAG | None = None,
        parent_group: TaskGroup | None = None,
        **kwargs
    ) -> None:
        """
        Create a workflow task group for coordinated Databricks job execution.
        
        Args:
            group_id: Unique identifier for the workflow group
            databricks_conn_id: Airflow connection ID for Databricks
            existing_clusters: Mapping of cluster keys to cluster IDs for reuse
            extra_job_params: Additional parameters for the Databricks job
            max_concurrent_runs: Maximum number of concurrent workflow runs
            default_task_timeout_seconds: Default timeout for workflow tasks
            default_task_retries: Default retry count for workflow tasks
            prefix: Prefix for task names within the workflow
            suffix: Suffix for task names within the workflow
            dag: Parent DAG containing this workflow
            parent_group: Parent task group if this is a nested workflow
        """

DatabricksTaskOperator

Define individual tasks within a Databricks workflow, with full control over each task's configuration.

from airflow.providers.databricks.operators.databricks_workflow import DatabricksTaskOperator

class DatabricksTaskOperator(BaseOperator):
    def __init__(
        self,
        *,
        task_config: dict[str, Any],
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retries: int | None = None,
        cluster_spec: dict[str, Any] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        depends_on: list[str] | None = None,
        **kwargs
    ) -> None:
        """
        Individual task within a Databricks workflow.
        
        Args:
            task_config: Complete task configuration for Databricks
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Task-specific timeout override
            retries: Task-specific retry count override
            cluster_spec: Cluster configuration for this specific task
            libraries: Libraries to install for this task
            depends_on: List of task keys this task depends on
        """

Usage Examples

Basic Workflow Creation

Create a simple multi-stage data pipeline workflow:

from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
    DatabricksTaskOperator
)

# Define workflow with multiple dependent tasks
with DatabricksWorkflowTaskGroup(
    group_id='data_pipeline_workflow',
    databricks_conn_id='databricks_production',
    max_concurrent_runs=3,
    default_task_timeout_seconds=3600
) as data_pipeline:
    
    # Extract raw data
    extract_task = DatabricksTaskOperator(
        task_id='extract_data',
        task_config={
            'notebook_task': {
                'notebook_path': '/pipelines/extract/daily_extract',
                'source': 'WORKSPACE',
                'base_parameters': {
                    'extraction_date': '{{ ds }}',
                    'source_systems': 'crm,billing,support',
                    'output_path': '/raw/daily/{{ ds }}'
                }
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 3,
                'spark_conf': {
                    'spark.sql.adaptive.enabled': 'true'
                }
            }
        }
    )
    
    # Transform and clean data
    transform_task = DatabricksTaskOperator(
        task_id='transform_data',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/pipelines/transform/data_cleaner.py',
                'parameters': [
                    '--input-path', '/raw/daily/{{ ds }}',
                    '--output-path', '/processed/daily/{{ ds }}',
                    '--quality-threshold', '0.95'
                ]
            },
            'job_cluster_key': 'transform_cluster'
        },
        depends_on=['extract_data']
    )
    
    # Load into data warehouse
    load_task = DatabricksTaskOperator(
        task_id='load_to_warehouse',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'warehouse-load-query-123'
                },
                'warehouse_id': 'analytics-warehouse-001',
                'parameters': {
                    'process_date': '{{ ds }}',
                    'source_path': '/processed/daily/{{ ds }}'
                }
            }
        },
        depends_on=['transform_data']
    )
    
    # Generate reports
    reporting_task = DatabricksTaskOperator(
        task_id='generate_reports',
        task_config={
            'notebook_task': {
                'notebook_path': '/reporting/daily_dashboard',
                'base_parameters': {
                    'report_date': '{{ ds }}',
                    'dashboard_refresh': 'true'
                }
            },
            'existing_cluster_id': 'reporting-cluster-001'
        },
        depends_on=['load_to_warehouse']
    )

    # Mirror the task dependencies in Airflow's own graph
    extract_task >> transform_task >> load_task >> reporting_task
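
Under the hood, a task group like this maps onto a single Databricks job: one `tasks` array where each entry names its upstream dependencies by task key. A rough, hand-built sketch of that shape (field names follow the Databricks Jobs API 2.1 layout; the payload the provider actually emits may differ in detail):

```python
# Hand-built sketch of the single-job payload a workflow like the one above
# could translate to. Field names follow the Databricks Jobs API 2.1 layout;
# the provider's actual output may differ in detail.
job_payload = {
    "name": "data_pipeline_workflow",
    "max_concurrent_runs": 3,
    "tasks": [
        {"task_key": "extract_data",
         "notebook_task": {"notebook_path": "/pipelines/extract/daily_extract"}},
        {"task_key": "transform_data",
         "depends_on": [{"task_key": "extract_data"}],
         "spark_python_task": {"python_file": "dbfs:/pipelines/transform/data_cleaner.py"}},
        {"task_key": "load_to_warehouse",
         "depends_on": [{"task_key": "transform_data"}],
         "sql_task": {"warehouse_id": "analytics-warehouse-001"}},
    ],
}

# Each task names its upstream dependencies by task_key:
chain = [(t["task_key"], [d["task_key"] for d in t.get("depends_on", [])])
         for t in job_payload["tasks"]]
print(chain)
```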

Complex Multi-Branch Workflow

Create workflows with parallel branches and conditional execution:

with DatabricksWorkflowTaskGroup(
    group_id='ml_pipeline_workflow',
    databricks_conn_id='databricks_ml',
    existing_clusters={
        'feature_cluster': 'feature-engineering-001',
        'training_cluster': 'ml-training-gpu-001',
        'inference_cluster': 'ml-inference-001'
    },
    max_concurrent_runs=2
) as ml_pipeline:
    
    # Feature engineering tasks (parallel)
    customer_features = DatabricksTaskOperator(
        task_id='extract_customer_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/customer_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'lookback_days': '90'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    
    product_features = DatabricksTaskOperator(
        task_id='extract_product_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/product_features',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'category_encoding': 'onehot'
                }
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    
    interaction_features = DatabricksTaskOperator(
        task_id='extract_interaction_features',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/ml/features/interaction_builder.py',
                'parameters': [
                    '--date', '{{ ds }}',
                    '--interaction-types', 'customer_product,temporal',
                    '--output-format', 'delta'
                ]
            },
            'job_cluster_key': 'feature_cluster'
        }
    )
    
    # Feature combination and validation
    combine_features = DatabricksTaskOperator(
        task_id='combine_features',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/features/feature_combiner',
                'base_parameters': {
                    'feature_date': '{{ ds }}',
                    'validation_split': '0.2',
                    'target_column': 'conversion_probability'
                }
            },
            'job_cluster_key': 'feature_cluster'
        },
        depends_on=['extract_customer_features', 'extract_product_features', 'extract_interaction_features']
    )
    
    # Model training (conditional on feature validation)
    train_model = DatabricksTaskOperator(
        task_id='train_model',
        task_config={
            'python_wheel_task': {
                'package_name': 'ml_training_package',
                'entry_point': 'train_recommender_model',
                'parameters': [
                    '--training-data-path', '/features/combined/{{ ds }}',
                    '--model-output-path', '/models/recommender/{{ ds }}',
                    '--hyperopt-trials', '100',
                    '--early-stopping', 'true'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-cpu-ml-scala2.12',
                'node_type_id': 'i3.4xlarge',
                'num_workers': 5,
                'spark_conf': {
                    'spark.task.maxFailures': '3'
                }
            },
            'libraries': [
                {'pypi': {'package': 'mlflow>=2.0.0'}},
                {'pypi': {'package': 'hyperopt>=0.2.0'}},
                {'pypi': {'package': 'xgboost>=1.6.0'}}
            ]
        },
        depends_on=['combine_features']
    )
    
    # Model validation and deployment
    validate_model = DatabricksTaskOperator(
        task_id='validate_model',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/validation/model_validator',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'validation_data_path': '/features/validation/{{ ds }}',
                    'performance_threshold': '0.85',
                    'deployment_environment': 'staging'
                }
            },
            'job_cluster_key': 'inference_cluster'
        },
        depends_on=['train_model']
    )
    
    # Deploy to production (conditional on validation success)
    deploy_model = DatabricksTaskOperator(
        task_id='deploy_to_production',
        task_config={
            'notebook_task': {
                'notebook_path': '/ml/deployment/model_deployer',
                'base_parameters': {
                    'model_path': '/models/recommender/{{ ds }}',
                    'deployment_target': 'production',
                    'canary_percentage': '10',
                    'rollback_threshold': '0.80'
                }
            },
            'existing_cluster_id': 'production-deployment-001'
        },
        depends_on=['validate_model']
    )
    
    # Set up dependencies
    [customer_features, product_features, interaction_features] >> combine_features
    combine_features >> train_model >> validate_model >> deploy_model

Workflow with Shared Job Clusters

Define workflows that share cluster resources across multiple tasks:

with DatabricksWorkflowTaskGroup(
    group_id='shared_cluster_workflow',
    databricks_conn_id='databricks_etl',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'etl_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.xlarge',
                    'num_workers': 8,
                    'autoscale': {
                        'min_workers': 2,
                        'max_workers': 10
                    },
                    'spark_conf': {
                        'spark.sql.adaptive.enabled': 'true',
                        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
                    }
                }
            },
            {
                'job_cluster_key': 'analytics_cluster',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'r5.2xlarge',
                    'num_workers': 4,
                    'spark_conf': {
                        'spark.sql.execution.arrow.pyspark.enabled': 'true'
                    }
                }
            }
        ],
        'libraries': [
            {'pypi': {'package': 'pandas>=1.5.0'}},
            {'pypi': {'package': 'numpy>=1.21.0'}},
            {'maven': {'coordinates': 'org.apache.spark:spark-avro_2.12:3.3.0'}}
        ]
    }
) as shared_workflow:
    
    # ETL tasks using shared ETL cluster
    extract_customers = DatabricksTaskOperator(
        task_id='extract_customers',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/customer_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--format', 'delta']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )
    
    extract_orders = DatabricksTaskOperator(
        task_id='extract_orders',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/etl/extractors/order_extractor.py',
                'parameters': ['--date', '{{ ds }}', '--include-cancelled', 'false']
            },
            'job_cluster_key': 'etl_cluster'
        }
    )
    
    # Data joining and transformation
    join_data = DatabricksTaskOperator(
        task_id='join_customer_orders',
        task_config={
            'notebook_task': {
                'notebook_path': '/etl/transformers/customer_order_joiner',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'join_strategy': 'broadcast_hash',
                    'output_partitions': '100'
                }
            },
            'job_cluster_key': 'etl_cluster'
        },
        depends_on=['extract_customers', 'extract_orders']
    )
    
    # Analytics tasks using dedicated analytics cluster
    customer_analytics = DatabricksTaskOperator(
        task_id='customer_segmentation',
        task_config={
            'notebook_task': {
                'notebook_path': '/analytics/customer_segmentation',
                'base_parameters': {
                    'analysis_date': '{{ ds }}',
                    'segmentation_method': 'kmeans',
                    'num_clusters': '5'
                }
            },
            'job_cluster_key': 'analytics_cluster'
        },
        depends_on=['join_customer_orders']
    )
    
    revenue_analytics = DatabricksTaskOperator(
        task_id='revenue_analysis',
        task_config={
            'sql_task': {
                'query': {
                    'query_id': 'revenue-analysis-query'
                },
                'warehouse_id': 'analytics-warehouse'
            }
        },
        depends_on=['join_customer_orders']
    )
    
    # Define workflow structure
    [extract_customers, extract_orders] >> join_data >> [customer_analytics, revenue_analytics]
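
Because tasks reference shared clusters by job_cluster_key, a typo silently points a task at a nonexistent cluster and the job fails only at submission time. A small pre-flight check can catch this earlier; the helper below is a hypothetical sketch, not a provider API:

```python
# Hypothetical pre-flight check (not a provider API): verify that every
# job_cluster_key referenced by a task is actually declared in job_clusters.

def undeclared_cluster_keys(job_clusters: list[dict], task_configs: list[dict]) -> set[str]:
    declared = {c["job_cluster_key"] for c in job_clusters}
    referenced = {t["job_cluster_key"] for t in task_configs if "job_cluster_key" in t}
    return referenced - declared

clusters = [{"job_cluster_key": "etl_cluster"}, {"job_cluster_key": "analytics_cluster"}]
tasks = [
    {"job_cluster_key": "etl_cluster"},
    {"job_cluster_key": "analytics_clutser"},  # deliberate typo
]
print(undeclared_cluster_keys(clusters, tasks))  # {'analytics_clutser'}
```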

Git-Integrated Workflow

Execute workflows using code from Git repositories:

with DatabricksWorkflowTaskGroup(
    group_id='git_integrated_workflow',
    databricks_conn_id='databricks_dev',
    extra_job_params={
        'git_source': {
            'git_url': 'https://github.com/company/data-pipelines.git',
            'git_branch': '{{ params.git_branch | default("main") }}',
            'git_provider': 'gitHub'
        }
    }
) as git_workflow:
    
    # Data validation using Git-stored notebooks
    validate_inputs = DatabricksTaskOperator(
        task_id='validate_input_data',
        task_config={
            'notebook_task': {
                'notebook_path': 'validation/input_validator.py',
                'source': 'GIT',
                'base_parameters': {
                    'validation_date': '{{ ds }}',
                    'strict_mode': 'true'
                }
            },
            'existing_cluster_id': 'validation-cluster-001'
        }
    )
    
    # ETL processing
    process_data = DatabricksTaskOperator(
        task_id='process_data',
        task_config={
            'spark_python_task': {
                'python_file': 'processing/daily_processor.py',
                'source': 'GIT',
                'parameters': [
                    '--config', 'configs/production.yaml',
                    '--date', '{{ ds }}',
                    '--parallel-jobs', '4'
                ]
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.2xlarge',
                'num_workers': 6
            }
        },
        depends_on=['validate_input_data']
    )
    
    # Quality assessment
    assess_quality = DatabricksTaskOperator(
        task_id='assess_data_quality',
        task_config={
            'python_wheel_task': {
                'package_name': 'data_quality_package',
                'entry_point': 'run_quality_checks',
                'parameters': [
                    '--data-path', '/processed/{{ ds }}',
                    '--rules-config', 'quality_rules.json'
                ]
            },
            'existing_cluster_id': 'quality-cluster-001'
        },
        depends_on=['process_data']
    )

    validate_inputs >> process_data >> assess_quality
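
Tasks that set 'source': 'GIT' only resolve if the job carries a git_source block, so the two settings must stay in sync. A hypothetical consistency check (names are assumptions, not a provider API):

```python
# Hypothetical consistency check (not a provider API): GIT-sourced tasks
# require a git_source block at the job level.

def git_tasks_without_git_source(job_params: dict, task_configs: list[dict]) -> list[str]:
    has_git = "git_source" in job_params
    offenders = []
    for cfg in task_configs:
        for key in ("notebook_task", "spark_python_task"):
            if cfg.get(key, {}).get("source") == "GIT" and not has_git:
                offenders.append(cfg.get("task_key", "<unknown>"))
    return offenders

params = {}  # no git_source configured
tasks = [{"task_key": "validate_input_data",
          "notebook_task": {"notebook_path": "validation/input_validator.py",
                            "source": "GIT"}}]
print(git_tasks_without_git_source(params, tasks))  # ['validate_input_data']
```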

Advanced Workflow Features

Conditional Task Execution

Implement conditional logic within workflows. Note that both downstream tasks below are scheduled after the availability check; each pipeline inspects the check's result and exits early when its branch does not apply, since the task group itself does not branch:

with DatabricksWorkflowTaskGroup(
    group_id='conditional_workflow',
    databricks_conn_id='databricks_conditional'
) as conditional_workflow:
    
    # Check data availability
    check_data = DatabricksTaskOperator(
        task_id='check_data_availability',
        task_config={
            'notebook_task': {
                'notebook_path': '/checks/data_availability_checker',
                'base_parameters': {
                    'check_date': '{{ ds }}',
                    'required_sources': 'sales,marketing,customer_service'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        }
    )
    
    # Full processing (when all data available)
    full_processing = DatabricksTaskOperator(
        task_id='full_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/full_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'complete'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )
    
    # Partial processing (when some data missing)
    partial_processing = DatabricksTaskOperator(
        task_id='partial_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/processing/partial_pipeline',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'processing_mode': 'available_only'
                }
            },
            'job_cluster_key': 'processing_cluster'
        },
        depends_on=['check_data_availability']
    )
    
    check_data >> [full_processing, partial_processing]
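
The branch decision itself has to live somewhere, typically inside the availability checker. A pure-Python sketch of the selection rule it might apply (the helper and task names are assumptions for illustration):

```python
# Hypothetical selection rule the availability check could apply: run the
# full pipeline only when every required source is present.

def choose_processing_mode(available: set[str], required: set[str]) -> str:
    return "full_data_processing" if required <= available else "partial_data_processing"

required = {"sales", "marketing", "customer_service"}
print(choose_processing_mode({"sales", "marketing", "customer_service"}, required))
print(choose_processing_mode({"sales", "marketing"}, required))
```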

Workflow with Error Handling

Implement comprehensive error handling and recovery:

with DatabricksWorkflowTaskGroup(
    group_id='resilient_workflow',
    databricks_conn_id='databricks_production',
    default_task_retries=2,
    default_task_timeout_seconds=7200
) as resilient_workflow:
    
    # Critical data processing with retry logic
    critical_processing = DatabricksTaskOperator(
        task_id='critical_data_processing',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/critical/data_processor.py',
                'parameters': ['--date', '{{ ds }}', '--retry-mode', 'true']
            },
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 4
            }
        },
        retries=3,
        timeout_seconds=3600
    )
    
    # Backup processing. Note: with depends_on alone this runs only after
    # critical_data_processing succeeds; to trigger it on failure instead,
    # add a Databricks run_if condition (e.g. 'run_if': 'AT_LEAST_ONE_FAILED')
    # to the task_config.
    backup_processing = DatabricksTaskOperator(
        task_id='backup_data_processing',
        task_config={
            'notebook_task': {
                'notebook_path': '/backup/alternative_processor',
                'base_parameters': {
                    'process_date': '{{ ds }}',
                    'fallback_mode': 'true'
                }
            },
            'existing_cluster_id': 'backup-cluster-001'
        },
        depends_on=['critical_data_processing']
    )
    
    # Notification task. Note: this runs only when its upstream succeeds;
    # add 'run_if': 'ALL_DONE' to the task_config to run it regardless of outcome.
    notify_completion = DatabricksTaskOperator(
        task_id='notify_completion',
        task_config={
            'notebook_task': {
                'notebook_path': '/notifications/workflow_notifier',
                'base_parameters': {
                    'workflow_id': '{{ run_id }}',
                    'completion_date': '{{ ds }}',
                    'status': 'completed'
                }
            },
            'existing_cluster_id': 'utility-cluster-001'
        },
        depends_on=['backup_data_processing']
    )
    
    critical_processing >> backup_processing >> notify_completion
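
Per-task settings here override the group defaults (retries=2, timeout=7200s), with unset values falling back. A sketch of that resolution rule, under the assumed semantics that a task-level value wins and None defers to the group setting:

```python
# Sketch (assumed semantics) of how per-task overrides could fall back to the
# group defaults: a task-level value wins; None defers to the group setting.

def effective_setting(task_value, group_default):
    return task_value if task_value is not None else group_default

# Group defaults from the example: retries=2, timeout=7200s.
print(effective_setting(3, 2))        # critical task overrides retries -> 3
print(effective_setting(None, 7200))  # backup task inherits the timeout -> 7200
```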

Monitoring and Troubleshooting

Workflow Status Monitoring

Monitor workflow execution with custom status checks:

from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.sensors.python import PythonSensor

def workflow_finished(**context):
    """Return True once the Databricks run has reached a terminal state."""
    run_id = context['ti'].xcom_pull(task_ids='data_pipeline_workflow', key='run_id')
    hook = DatabricksHook(databricks_conn_id='databricks_production')
    state = hook.get_run_state(run_id)
    print(f"Workflow run {run_id} state: {state}")
    return state.is_terminal

# Poll until the workflow run finishes
workflow_monitor = PythonSensor(
    task_id='monitor_workflow_completion',
    python_callable=workflow_finished,
    poke_interval=60,
    timeout=7200
)

data_pipeline >> workflow_monitor
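
Stripped of Airflow machinery, run monitoring is a polling loop over the run's life-cycle state. The sketch below injects the state fetcher so it can be faked with a canned sequence; a real implementation would call the Databricks Runs API instead:

```python
# Pure-Python sketch of the polling loop a run monitor performs. The fetch
# function is injected, so here it is faked with a canned state sequence.

TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}

def wait_for_run(fetch_state, max_polls: int = 10) -> str:
    """Poll fetch_state() until a terminal life-cycle state is seen."""
    for _ in range(max_polls):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
    raise TimeoutError("run did not reach a terminal state")

states = iter(["PENDING", "RUNNING", "RUNNING", "TERMINATED"])
print(wait_for_run(lambda: next(states)))  # TERMINATED
```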

Resource Usage Optimization

Optimize workflow resource allocation:

with DatabricksWorkflowTaskGroup(
    group_id='optimized_workflow',
    databricks_conn_id='databricks_optimized',
    extra_job_params={
        'job_clusters': [
            {
                'job_cluster_key': 'small_tasks',
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.large',
                    'autoscale': {'min_workers': 1, 'max_workers': 3}
                }
            },
            {
                'job_cluster_key': 'large_tasks', 
                'new_cluster': {
                    'spark_version': '12.2.x-scala2.12',
                    'node_type_id': 'i3.2xlarge',
                    'autoscale': {'min_workers': 4, 'max_workers': 12}
                }
            }
        ],
        'timeout_seconds': 14400,
        'max_concurrent_runs': 3
    }
) as optimized_workflow:
    
    # Light preprocessing on small cluster
    light_preprocessing = DatabricksTaskOperator(
        task_id='light_preprocessing',
        task_config={
            'notebook_task': {
                'notebook_path': '/preprocessing/light_cleaner'
            },
            'job_cluster_key': 'small_tasks'
        },
        timeout_seconds=1800
    )
    
    # Heavy computation on large cluster
    heavy_computation = DatabricksTaskOperator(
        task_id='heavy_computation',
        task_config={
            'spark_python_task': {
                'python_file': 'dbfs:/compute/heavy_aggregator.py'
            },
            'job_cluster_key': 'large_tasks'
        },
        timeout_seconds=10800,
        depends_on=['light_preprocessing']
    )
    
    light_preprocessing >> heavy_computation

The workflow orchestration capabilities provide powerful tools for creating complex, multi-task pipelines that leverage Databricks' native workflow engine while maintaining full integration with Airflow's scheduling, monitoring, and error handling systems.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-databricks
