Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The Databricks provider offers comprehensive SQL operations for executing queries on Databricks SQL endpoints and clusters. This includes query execution, data loading with COPY INTO, and result management with multiple output formats.
Execute SQL queries on Databricks SQL endpoints with flexible output and result handling.
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
class DatabricksSqlOperator(SQLExecuteQueryOperator):
def __init__(
self,
sql: str | list[str],
*,
databricks_conn_id: str = "databricks_default",
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration: dict[str, str] | None = None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str | None = None,
schema: str | None = None,
output_path: str | None = None,
output_format: str = "csv",
csv_params: dict[str, Any] | None = None,
client_parameters: dict[str, Any] | None = None,
return_last: bool = True,
do_xcom_push: bool = True,
caller: str = "DatabricksSqlOperator",
**kwargs
) -> None:
"""
Execute SQL queries on Databricks SQL endpoints.
Args:
sql: SQL query or list of queries to execute
databricks_conn_id: Airflow connection ID for Databricks
http_path: HTTP path to SQL endpoint or cluster
sql_endpoint_name: Name of SQL endpoint to use
session_configuration: Session-level configuration parameters
http_headers: Additional HTTP headers for requests
catalog: Default catalog for SQL operations
schema: Default schema for SQL operations
output_path: Path to save query results (supports templating)
output_format: Output format - "csv", "json", or "parquet"
csv_params: CSV-specific formatting parameters
client_parameters: Additional client configuration parameters
return_last: Return only the last result set for multiple queries
do_xcom_push: Whether to push results to XCom
caller: Caller identification for logging and monitoring
"""

Load data into Databricks tables using the COPY INTO command with comprehensive format support.
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator
class DatabricksCopyIntoOperator(BaseOperator):
def __init__(
self,
*,
table_name: str,
file_location: str,
file_format: str | dict[str, Any],
files: list[str] | None = None,
pattern: str | None = None,
expression_list: list[str] | None = None,
credential: dict[str, str] | None = None,
encryption: dict[str, str] | None = None,
format_options: dict[str, Any] | None = None,
force_copy: bool = False,
validate: str | int | bool | None = None,
copy_options: dict[str, Any] | None = None,
databricks_conn_id: str = "databricks_default",
http_path: str | None = None,
sql_endpoint_name: str | None = None,
session_configuration: dict[str, str] | None = None,
http_headers: list[tuple[str, str]] | None = None,
catalog: str | None = None,
schema: str | None = None,
caller: str = "DatabricksCopyIntoOperator",
**kwargs
) -> None:
"""
Load data using Databricks COPY INTO command.
Args:
table_name: Target table name for data loading
file_location: Source file location (cloud storage path)
file_format: File format specification (string or format options dict)
files: Specific files to copy (optional, alternative to pattern)
pattern: File pattern to match for copying
expression_list: List of expressions for data transformation during copy
credential: Credential configuration for accessing source files
encryption: Encryption configuration for source files
format_options: Format-specific options (delimiter, header, etc.)
force_copy: Force copy operation even if files were previously copied
validate: Validation mode - "ALL", number of rows, or boolean
copy_options: Additional copy operation options
databricks_conn_id: Airflow connection ID for Databricks
http_path: HTTP path to SQL endpoint or cluster
sql_endpoint_name: Name of SQL endpoint to use
session_configuration: Session-level configuration parameters
http_headers: Additional HTTP headers for requests
catalog: Default catalog for SQL operations
schema: Default schema for SQL operations
caller: Caller identification for logging and monitoring
"""

Execute SQL queries with result export:
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
# Simple query execution
daily_report = DatabricksSqlOperator(
task_id='generate_daily_report',
databricks_conn_id='databricks_sql',
sql="""
SELECT
date_trunc('day', order_timestamp) as order_date,
customer_segment,
COUNT(*) as order_count,
SUM(order_amount) as total_revenue
FROM sales.orders
WHERE date_trunc('day', order_timestamp) = '{{ ds }}'
GROUP BY date_trunc('day', order_timestamp), customer_segment
ORDER BY customer_segment
""",
catalog='sales',
schema='reports',
output_path='/tmp/daily_report_{{ ds }}.csv',
output_format='csv'
)

Execute multiple SQL statements in sequence:
data_pipeline = DatabricksSqlOperator(
task_id='run_data_pipeline',
sql=[
"DROP TABLE IF EXISTS staging.temp_customer_metrics",
"""
CREATE TABLE staging.temp_customer_metrics AS
SELECT
customer_id,
COUNT(DISTINCT order_id) as order_count,
SUM(order_amount) as lifetime_value,
AVG(order_amount) as avg_order_value,
MAX(order_timestamp) as last_order_date
FROM raw.orders
WHERE order_timestamp >= '{{ macros.ds_add(ds, -30) }}'
GROUP BY customer_id
""",
"""
INSERT INTO analytics.customer_metrics
SELECT * FROM staging.temp_customer_metrics
WHERE order_count >= 2
""",
"DROP TABLE staging.temp_customer_metrics"
],
databricks_conn_id='databricks_analytics',
http_path='/sql/1.0/warehouses/warehouse123',
return_last=False # Don't return results for pipeline operations
)

Execute parameterized queries with session configuration:
analytical_query = DatabricksSqlOperator(
task_id='customer_segmentation',
sql="""
WITH customer_metrics AS (
SELECT
customer_id,
SUM(order_amount) as total_spent,
COUNT(*) as order_frequency,
DATEDIFF(CURRENT_DATE(), MAX(order_date)) as days_since_last_order
FROM {{ params.source_table }}
WHERE order_date >= '{{ params.analysis_start_date }}'
GROUP BY customer_id
)
SELECT
CASE
WHEN total_spent > 1000 AND order_frequency > 10 THEN 'VIP'
WHEN total_spent > 500 AND order_frequency > 5 THEN 'Premium'
WHEN days_since_last_order <= 30 THEN 'Active'
ELSE 'Standard'
END as segment,
COUNT(*) as customer_count,
AVG(total_spent) as avg_spending
FROM customer_metrics
GROUP BY segment
ORDER BY avg_spending DESC
""",
params={
'source_table': 'sales.orders',
'analysis_start_date': '{{ macros.ds_add(ds, -365) }}'
},
session_configuration={
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
},
output_path='/analytics/segments/customer_segmentation_{{ ds }}.parquet',
output_format='parquet'
)

Load CSV data with format specifications:
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator
load_customer_data = DatabricksCopyIntoOperator(
task_id='load_customer_csv',
table_name='raw.customers',
file_location='s3://data-lake/customers/{{ ds }}/',
file_format='CSV',
format_options={
'header': 'true',
'delimiter': ',',
'quote': '"',
'escape': '\\',
'inferSchema': 'true',
'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
},
pattern='customer_*.csv',
copy_options={
'mergeSchema': 'true',
'force': 'false'
},
databricks_conn_id='databricks_etl',
catalog='raw',
schema='ingestion'
)

Load JSON files with nested structure handling:
load_events = DatabricksCopyIntoOperator(
task_id='load_event_json',
table_name='events.user_actions',
file_location='s3://event-streams/user-actions/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/day={{ macros.ds_format(ds, "%Y-%m-%d", "%d") }}/',
file_format='JSON',
expression_list=[
'user_id',
'action_type',
'timestamp::timestamp as event_timestamp',
'properties:device_type::string as device_type',
'properties:session_id::string as session_id',
'properties:page_url::string as page_url'
],
format_options={
'multiLine': 'false',
'timestampFormat': 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
},
validate='ALL',
databricks_conn_id='databricks_events'
)

Load Parquet files with cloud storage credentials:
load_parquet_data = DatabricksCopyIntoOperator(
task_id='load_sales_parquet',
table_name='analytics.daily_sales',
file_location='abfss://data@storageaccount.dfs.core.windows.net/sales/{{ ds }}/',
file_format='PARQUET',
credential={
'AZURE_SAS_TOKEN': '{{ var.value.azure_sas_token }}'
},
files=['sales_summary.parquet', 'sales_details.parquet'],
force_copy=True,
databricks_conn_id='databricks_production',
sql_endpoint_name='analytics-endpoint'
)

Export query results in different formats:
# CSV export with custom formatting
csv_export = DatabricksSqlOperator(
task_id='export_to_csv',
sql="SELECT * FROM analytics.monthly_summary WHERE report_month = '{{ ds }}'",
output_path='/reports/monthly_summary_{{ ds }}.csv',
output_format='csv',
csv_params={
'header': True,
'delimiter': '|',
'quoteAll': True,
'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
}
)
# JSON export with nested structures
json_export = DatabricksSqlOperator(
task_id='export_to_json',
sql="""
SELECT
customer_id,
struct(
first_name,
last_name,
email
) as customer_info,
collect_list(
struct(order_id, order_date, amount)
) as orders
FROM customer_orders
WHERE order_date >= '{{ ds }}'
GROUP BY customer_id, first_name, last_name, email
""",
output_path='/exports/customers_{{ ds }}.json',
output_format='json'
)
# Parquet export for large datasets
parquet_export = DatabricksSqlOperator(
task_id='export_to_parquet',
sql="SELECT * FROM large_dataset WHERE partition_date = '{{ ds }}'",
output_path='/data/exports/large_dataset_{{ ds }}.parquet',
output_format='parquet'
)

Configure Spark SQL settings for optimal performance:
optimized_query = DatabricksSqlOperator(
task_id='optimized_aggregation',
sql="""
SELECT
product_category,
DATE_TRUNC('month', sale_date) as sale_month,
SUM(quantity * unit_price) as revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales_fact s
JOIN product_dim p ON s.product_id = p.product_id
WHERE sale_date >= '{{ macros.ds_add(ds, -90) }}'
GROUP BY product_category, DATE_TRUNC('month', sale_date)
""",
session_configuration={
# Enable adaptive query execution
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
'spark.sql.adaptive.skewJoin.enabled': 'true',
# Optimize for large datasets
'spark.sql.adaptive.advisoryPartitionSizeInBytes': '256MB',
'spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold': '200MB',
# Enable column pruning and predicate pushdown
'spark.sql.optimizer.nestedSchemaPruning.enabled': 'true',
'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
},
databricks_conn_id='databricks_analytics',
catalog='sales',
schema='aggregated'
)

Implement data validation during loading:
validated_load = DatabricksCopyIntoOperator(
task_id='validated_data_load',
table_name='trusted.customer_transactions',
file_location='s3://raw-data/transactions/{{ ds }}/',
file_format={
'format': 'CSV',
'options': {
'header': 'true',
'delimiter': ',',
'quote': '"',
'dateFormat': 'yyyy-MM-dd',
'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
}
},
expression_list=[
'customer_id::long as customer_id',
'transaction_date::date as transaction_date',
'amount::decimal(10,2) as amount',
'transaction_type',
'CASE WHEN length(trim(description)) = 0 THEN NULL ELSE description END as description',
'_metadata.file_path as source_file'
],
validate=1000, # Validate first 1000 rows
copy_options={
'force': 'false',
'mergeSchema': 'false'
},
databricks_conn_id='databricks_etl'
)

Configure custom client settings and authentication headers:
custom_client_query = DatabricksSqlOperator(
task_id='query_with_custom_client',
sql="SELECT COUNT(*) FROM system.access_logs WHERE log_date = '{{ ds }}'",
http_headers=[
('X-Custom-Header', 'airflow-pipeline'),
('X-Request-ID', '{{ run_id }}'),
('X-User-Agent', 'Airflow/{{ var.value.airflow_version }}')
],
client_parameters={
'connect_timeout': 60,
'socket_timeout': 300,
'_user_agent_entry': 'AirflowPipeline/1.0'
},
databricks_conn_id='databricks_monitoring'
)

Validate query results and handle empty results:
def validate_results(**context):
"""Custom validation for query results."""
ti = context['task_instance']
results = ti.xcom_pull(task_ids='daily_metrics_query')
if not results or len(results) == 0:
raise ValueError("No data found for the specified date")
row_count = len(results)
if row_count < 100: # Expected minimum rows
raise ValueError(f"Insufficient data: only {row_count} rows found")
validated_query = DatabricksSqlOperator(
task_id='daily_metrics_query',
sql="SELECT * FROM metrics.daily_kpis WHERE metric_date = '{{ ds }}'",
databricks_conn_id='databricks_metrics'
) >> PythonOperator(
task_id='validate_query_results',
python_callable=validate_results
)

Configure robust retry mechanisms:
resilient_query = DatabricksSqlOperator(
task_id='resilient_analytics',
sql="SELECT * FROM complex_analytics_view WHERE process_date = '{{ ds }}'",
databricks_conn_id='databricks_analytics',
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30)
)

The SQL operations provide comprehensive capabilities for data processing, loading, and analytics on Databricks SQL endpoints with robust error handling, multiple output formats, and optimization features.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-databricks