Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The Databricks provider offers powerful job management capabilities for executing various types of tasks on Databricks clusters. This includes one-time runs, triggering existing jobs, and specialized notebook execution with comprehensive parameter support.
Submit one-time runs to Databricks with flexible task configurations and cluster management.
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
class DatabricksSubmitRunOperator(BaseOperator):
def __init__(
self,
*,
tasks: list[dict[str, Any]] | None = None,
spark_jar_task: dict[str, Any] | None = None,
notebook_task: dict[str, Any] | None = None,
spark_python_task: dict[str, Any] | None = None,
spark_submit_task: dict[str, Any] | None = None,
pipeline_task: dict[str, Any] | None = None,
python_wheel_task: dict[str, Any] | None = None,
dbt_task: dict[str, Any] | None = None,
sql_task: dict[str, Any] | None = None,
new_cluster: dict[str, Any] | None = None,
existing_cluster_id: str | None = None,
job_clusters: list[dict[str, Any]] | None = None,
libraries: list[dict[str, Any]] | None = None,
run_name: str | None = None,
timeout_seconds: int | None = None,
databricks_conn_id: str = "databricks_default",
polling_period_seconds: int = 30,
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
databricks_retry_args: dict[str, Any] | None = None,
do_xcom_push: bool = True,
idempotency_token: str | None = None,
access_control_list: list[dict[str, Any]] | None = None,
wait_for_termination: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
git_source: dict[str, Any] | None = None,
**kwargs
) -> None:
"""
Submit a one-time run to Databricks.
Args:
tasks: List of tasks to execute in the run
spark_jar_task: Configuration for Spark JAR task
notebook_task: Configuration for notebook task
spark_python_task: Configuration for Spark Python task
spark_submit_task: Configuration for Spark submit task
pipeline_task: Configuration for Delta Live Tables pipeline task
python_wheel_task: Configuration for Python wheel task
dbt_task: Configuration for dbt task
sql_task: Configuration for SQL task
new_cluster: New cluster configuration for the run
existing_cluster_id: ID of existing cluster to use
job_clusters: Job cluster configurations
libraries: Libraries to install on the cluster
run_name: Name for the run (defaults to Airflow task name)
timeout_seconds: Maximum time to wait for job completion
databricks_conn_id: Airflow connection ID for Databricks
polling_period_seconds: Seconds between status polls
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
databricks_retry_args: Additional retry configuration
do_xcom_push: Whether to push run metadata to XCom
idempotency_token: Token to ensure idempotent execution
access_control_list: Access control permissions for the run
wait_for_termination: Whether to wait for run completion
deferrable: Whether to use deferrable execution
git_source: Git source configuration for code
"""Trigger existing Databricks jobs with parameter overrides and monitoring.
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
class DatabricksRunNowOperator(BaseOperator):
def __init__(
self,
*,
job_id: int | None = None,
job_name: str | None = None,
notebook_params: dict[str, str] | None = None,
python_params: list[str] | None = None,
spark_submit_params: list[str] | None = None,
jar_params: list[str] | None = None,
sql_params: dict[str, str] | None = None,
dbt_commands: list[str] | None = None,
python_named_params: dict[str, str] | None = None,
pipeline_params: dict[str, str] | None = None,
wait_for_termination: bool = True,
timeout_seconds: int | None = None,
databricks_conn_id: str = "databricks_default",
polling_period_seconds: int = 30,
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
do_xcom_push: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs
) -> None:
"""
Trigger an existing Databricks job.
Args:
job_id: Databricks job ID to trigger
job_name: Databricks job name to trigger (alternative to job_id)
notebook_params: Parameters for notebook tasks
python_params: Parameters for Python tasks
spark_submit_params: Parameters for Spark submit tasks
jar_params: Parameters for JAR tasks
sql_params: Parameters for SQL tasks
dbt_commands: Commands for dbt tasks
python_named_params: Named parameters for Python tasks
pipeline_params: Parameters for pipeline tasks
wait_for_termination: Whether to wait for job completion
timeout_seconds: Maximum time to wait for job completion
databricks_conn_id: Airflow connection ID for Databricks
polling_period_seconds: Seconds between status polls
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
do_xcom_push: Whether to push run metadata to XCom
deferrable: Whether to use deferrable execution
"""Execute Databricks notebooks with parameter support and source management.
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
class DatabricksNotebookOperator(BaseOperator):
def __init__(
self,
*,
notebook_path: str,
source: str = "WORKSPACE",
base_parameters: dict[str, str] | None = None,
new_cluster: dict[str, Any] | None = None,
existing_cluster_id: str | None = None,
job_cluster_key: str | None = None,
libraries: list[dict[str, Any]] | None = None,
run_name: str | None = None,
timeout_seconds: int | None = None,
databricks_conn_id: str = "databricks_default",
polling_period_seconds: int = 30,
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
do_xcom_push: bool = True,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
wait_for_termination: bool = True,
**kwargs
) -> None:
"""
Execute a Databricks notebook.
Args:
notebook_path: Path to the notebook in Databricks workspace or repo
source: Source type - "WORKSPACE" or "GIT"
base_parameters: Parameters to pass to the notebook
new_cluster: New cluster configuration for notebook execution
existing_cluster_id: ID of existing cluster to use
job_cluster_key: Key of job cluster to use (for workflow contexts)
libraries: Libraries to install on the cluster
run_name: Name for the notebook run
timeout_seconds: Maximum time to wait for notebook completion
databricks_conn_id: Airflow connection ID for Databricks
polling_period_seconds: Seconds between status polls
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
do_xcom_push: Whether to push run metadata to XCom
deferrable: Whether to use deferrable execution
wait_for_termination: Whether to wait for notebook completion
"""Execute a Python script on a new cluster:
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
spark_job = DatabricksSubmitRunOperator(
task_id='run_data_processing',
spark_python_task={
'python_file': 'dbfs:/mnt/scripts/process_data.py',
'parameters': [
'--input-path', '/data/raw/{{ ds }}',
'--output-path', '/data/processed/{{ ds }}',
'--partition-count', '10'
]
},
new_cluster={
'spark_version': '12.2.x-scala2.12',
'node_type_id': 'i3.xlarge',
'num_workers': 5,
'spark_conf': {
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true'
}
},
libraries=[
{'pypi': {'package': 'pandas==1.5.0'}},
{'pypi': {'package': 'numpy>=1.21.0'}}
],
timeout_seconds=7200,
databricks_conn_id='databricks_production'
)

Run a Scala JAR with specific parameters:
jar_job = DatabricksSubmitRunOperator(
task_id='run_scala_etl',
spark_jar_task={
'main_class_name': 'com.company.etl.DataProcessor',
'parameters': [
'--config', 'production.conf',
'--date', '{{ ds }}',
'--batch-size', '1000'
]
},
libraries=[
{'jar': 'dbfs:/mnt/jars/data-processor-1.2.3.jar'},
{'maven': {'coordinates': 'org.apache.kafka:kafka-clients:3.0.0'}}
],
existing_cluster_id='0123-456789-etl001',
run_name='ETL Process {{ ds }}',
idempotency_token='etl_{{ ds }}_{{ dag_run.run_id }}'
)

Execute a notebook with dynamic parameters:
notebook_task = DatabricksNotebookOperator(
task_id='run_analysis_notebook',
notebook_path='/Shared/Analytics/Daily Report',
base_parameters={
'report_date': '{{ ds }}',
'customer_segment': 'premium',
'output_format': 'parquet',
'include_charts': 'true'
},
existing_cluster_id='0123-456789-analytics',
libraries=[
{'pypi': {'package': 'matplotlib'}},
{'pypi': {'package': 'seaborn'}}
],
timeout_seconds=1800
)

Trigger a pre-configured Databricks job:
trigger_job = DatabricksRunNowOperator(
task_id='trigger_daily_pipeline',
job_id=123456,
notebook_params={
'input_date': '{{ ds }}',
'refresh_mode': 'incremental'
},
python_params=['--verbose', '--config=prod'],
wait_for_termination=True,
timeout_seconds=3600
)

Submit a SQL task as part of a job run:
sql_job = DatabricksSubmitRunOperator(
task_id='run_sql_aggregation',
sql_task={
'query': {
'query_id': 'abc123-def456-789' # Reference to saved query
},
'warehouse_id': 'warehouse-xyz789',
'parameters': {
'start_date': '{{ ds }}',
'end_date': '{{ next_ds }}'
}
},
run_name='SQL Aggregation {{ ds }}',
timeout_seconds=1200
)

Use deferrable mode for long-running jobs:
long_running_job = DatabricksSubmitRunOperator(
task_id='long_ml_training',
spark_python_task={
'python_file': 'dbfs:/mnt/ml/train_model.py',
'parameters': ['--epochs', '100', '--data-path', '/data/training']
},
new_cluster={
'spark_version': '12.2.x-cpu-ml-scala2.12',
'node_type_id': 'i3.2xlarge',
'num_workers': 8
},
timeout_seconds=14400, # 4 hours
deferrable=True, # Use async execution
polling_period_seconds=60 # Check every minute
)

Define reusable cluster configurations within job submissions:
job_with_clusters = DatabricksSubmitRunOperator(
task_id='multi_cluster_job',
tasks=[
{
'task_key': 'extract',
'job_cluster_key': 'extract_cluster',
'spark_python_task': {
'python_file': 'dbfs:/scripts/extract.py'
}
},
{
'task_key': 'transform',
'job_cluster_key': 'transform_cluster',
'depends_on': [{'task_key': 'extract'}],
'spark_python_task': {
'python_file': 'dbfs:/scripts/transform.py'
}
}
],
job_clusters=[
{
'job_cluster_key': 'extract_cluster',
'new_cluster': {
'spark_version': '12.2.x-scala2.12',
'node_type_id': 'i3.large',
'num_workers': 2
}
},
{
'job_cluster_key': 'transform_cluster',
'new_cluster': {
'spark_version': '12.2.x-scala2.12',
'node_type_id': 'i3.xlarge',
'num_workers': 8
}
}
]
)

Set permissions for job runs:
secured_job = DatabricksSubmitRunOperator(
task_id='secured_data_job',
notebook_task={
'notebook_path': '/Secure/Financial Analysis'
},
existing_cluster_id='secure-cluster-001',
access_control_list=[
{
'user_name': 'analyst@company.com',
'permission_level': 'CAN_VIEW'
},
{
'group_name': 'data-engineers',
'permission_level': 'CAN_MANAGE_RUN'
}
]
)

Execute code directly from Git repositories:
git_job = DatabricksSubmitRunOperator(
task_id='run_from_git',
notebook_task={
'notebook_path': 'notebooks/data_pipeline.py',
'source': 'GIT'
},
git_source={
'git_url': 'https://github.com/company/data-pipelines.git',
'git_branch': 'main',
'git_provider': 'gitHub'
},
existing_cluster_id='dev-cluster-001'
)

Automatically push job metadata to XCom for downstream tasks:
# Job run pushes run_id, job_id, and run_page_url to XCom
job_run = DatabricksSubmitRunOperator(
task_id='data_processing',
spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
existing_cluster_id='cluster-001',
do_xcom_push=True # Default is True
)
# Downstream task can access run information
def get_job_results(**context):
run_id = context['ti'].xcom_pull(task_ids='data_processing', key='run_id')
run_url = context['ti'].xcom_pull(task_ids='data_processing', key='run_page_url')
print(f"Job {run_id} completed. View at: {run_url}")Configure robust error handling:
resilient_job = DatabricksSubmitRunOperator(
task_id='resilient_processing',
spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
existing_cluster_id='cluster-001',
timeout_seconds=3600, # 1 hour job timeout
databricks_retry_limit=5, # Retry API calls 5 times
databricks_retry_delay=30, # Wait 30 seconds between retries
databricks_retry_args={
'stop_max_attempt_number': 3,
'wait_exponential_multiplier': 1000
}
)

The job management operators provide comprehensive control over Databricks job execution with support for all major task types, cluster configurations, and monitoring capabilities. They integrate seamlessly with Airflow's templating, XCom, and error handling systems.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-databricks