# SQL Operations

The Databricks provider offers comprehensive SQL operations for executing queries on Databricks SQL endpoints and clusters. This includes query execution, data loading with COPY INTO, and export of query results in CSV, JSON, or Parquet format.

## Core Operators

### DatabricksSqlOperator

Execute SQL queries on Databricks SQL endpoints with flexible output and result handling.

```python { .api }
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

class DatabricksSqlOperator(SQLExecuteQueryOperator):
    def __init__(
        self,
        sql: str | list[str],
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        output_path: str | None = None,
        output_format: str = "csv",
        csv_params: dict[str, Any] | None = None,
        client_parameters: dict[str, Any] | None = None,
        return_last: bool = True,
        do_xcom_push: bool = True,
        caller: str = "DatabricksSqlOperator",
        **kwargs
    ) -> None:
        """
        Execute SQL queries on Databricks SQL endpoints.

        Args:
            sql: SQL query or list of queries to execute
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            output_path: Path to save query results (supports templating)
            output_format: Output format - "csv", "json", or "parquet"
            csv_params: CSV-specific formatting parameters
            client_parameters: Additional client configuration parameters
            return_last: Return only the last result set for multiple queries
            do_xcom_push: Whether to push results to XCom
            caller: Caller identification for logging and monitoring
        """
```
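
As a quick orientation before the full examples below, here is a minimal sketch showing two common wiring choices: resolving the warehouse by `sql_endpoint_name` instead of supplying `http_path`, and reading the pushed result from XCom in a downstream task. The connection ID and endpoint name are placeholders.

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Resolve the SQL endpoint by name instead of supplying an explicit http_path.
row_count_check = DatabricksSqlOperator(
    task_id='row_count_check',
    databricks_conn_id='databricks_default',   # placeholder connection ID
    sql_endpoint_name='analytics-endpoint',    # placeholder endpoint name
    sql="SELECT COUNT(*) AS row_count FROM sales.orders WHERE order_date = '{{ ds }}'",
)

# With do_xcom_push=True (the default), a downstream task can read the result, e.g.:
#   results = ti.xcom_pull(task_ids='row_count_check')
```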

### DatabricksCopyIntoOperator

Load data into Databricks tables using the COPY INTO command with comprehensive format support.

```python { .api }
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

class DatabricksCopyIntoOperator(BaseOperator):
    def __init__(
        self,
        *,
        table_name: str,
        file_location: str,
        file_format: str | dict[str, Any],
        files: list[str] | None = None,
        pattern: str | None = None,
        expression_list: list[str] | None = None,
        credential: dict[str, str] | None = None,
        encryption: dict[str, str] | None = None,
        format_options: dict[str, Any] | None = None,
        force_copy: bool = False,
        validate: str | int | bool | None = None,
        copy_options: dict[str, Any] | None = None,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        caller: str = "DatabricksCopyIntoOperator",
        **kwargs
    ) -> None:
        """
        Load data using Databricks COPY INTO command.

        Args:
            table_name: Target table name for data loading
            file_location: Source file location (cloud storage path)
            file_format: File format specification (string or format options dict)
            files: Specific files to copy (optional, alternative to pattern)
            pattern: File pattern to match for copying
            expression_list: List of expressions for data transformation during copy
            credential: Credential configuration for accessing source files
            encryption: Encryption configuration for source files
            format_options: Format-specific options (delimiter, header, etc.)
            force_copy: Force copy operation even if files were previously copied
            validate: Validation mode - "ALL", number of rows, or boolean
            copy_options: Additional copy operation options
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            caller: Caller identification for logging and monitoring
        """
```
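
To make these parameters concrete, the sketch below approximates the shape of the COPY INTO statement that the main parameters map to. It is an illustration of the SQL structure only, not the operator's actual rendering logic, and the table, path, and options are placeholders reused from the examples further down.

```python
# Illustrative only: approximate the COPY INTO statement that the main parameters map to.
def sketch_copy_into(table_name, file_location, file_format,
                     pattern=None, format_options=None, copy_options=None):
    def clause(opts):
        return ", ".join(f"'{k}' = '{v}'" for k, v in opts.items())

    parts = [f"COPY INTO {table_name}",
             f"FROM '{file_location}'",
             f"FILEFORMAT = {file_format}"]
    if pattern:
        parts.append(f"PATTERN = '{pattern}'")
    if format_options:
        parts.append(f"FORMAT_OPTIONS ({clause(format_options)})")
    if copy_options:
        parts.append(f"COPY_OPTIONS ({clause(copy_options)})")
    return "\n".join(parts)

print(sketch_copy_into(
    table_name="raw.customers",
    file_location="s3://data-lake/customers/2024-01-01/",   # placeholder path
    file_format="CSV",
    pattern="customer_*.csv",
    format_options={"header": "true", "delimiter": ","},
    copy_options={"mergeSchema": "true"},
))
```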

## Usage Examples

### Basic Query Execution

Execute SQL queries with result export:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Simple query execution
daily_report = DatabricksSqlOperator(
    task_id='generate_daily_report',
    databricks_conn_id='databricks_sql',
    sql="""
        SELECT
            date_trunc('day', order_timestamp) as order_date,
            customer_segment,
            COUNT(*) as order_count,
            SUM(order_amount) as total_revenue
        FROM sales.orders
        WHERE date_trunc('day', order_timestamp) = '{{ ds }}'
        GROUP BY date_trunc('day', order_timestamp), customer_segment
        ORDER BY customer_segment
    """,
    catalog='sales',
    schema='reports',
    output_path='/tmp/daily_report_{{ ds }}.csv',
    output_format='csv'
)
```

### Multi-Statement SQL Execution

Execute multiple SQL statements in sequence:

```python
data_pipeline = DatabricksSqlOperator(
    task_id='run_data_pipeline',
    sql=[
        "DROP TABLE IF EXISTS staging.temp_customer_metrics",
        """
        CREATE TABLE staging.temp_customer_metrics AS
        SELECT
            customer_id,
            COUNT(DISTINCT order_id) as order_count,
            SUM(order_amount) as lifetime_value,
            AVG(order_amount) as avg_order_value,
            MAX(order_timestamp) as last_order_date
        FROM raw.orders
        WHERE order_timestamp >= '{{ macros.ds_add(ds, -30) }}'
        GROUP BY customer_id
        """,
        """
        INSERT INTO analytics.customer_metrics
        SELECT * FROM staging.temp_customer_metrics
        WHERE order_count >= 2
        """,
        "DROP TABLE staging.temp_customer_metrics"
    ],
    databricks_conn_id='databricks_analytics',
    http_path='/sql/1.0/warehouses/warehouse123',
    return_last=False  # Return results from every statement, not just the last one
)
```

### Query with Parameters and Configuration

Execute parameterized queries with session configuration:

```python
analytical_query = DatabricksSqlOperator(
    task_id='customer_segmentation',
    sql="""
        WITH customer_metrics AS (
            SELECT
                customer_id,
                SUM(order_amount) as total_spent,
                COUNT(*) as order_frequency,
                DATEDIFF(CURRENT_DATE(), MAX(order_date)) as days_since_last_order
            FROM {{ params.source_table }}
            WHERE order_date >= '{{ macros.ds_add(ds, -365) }}'
            GROUP BY customer_id
        )
        SELECT
            CASE
                WHEN total_spent > 1000 AND order_frequency > 10 THEN 'VIP'
                WHEN total_spent > 500 AND order_frequency > 5 THEN 'Premium'
                WHEN days_since_last_order <= 30 THEN 'Active'
                ELSE 'Standard'
            END as segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spending
        FROM customer_metrics
        GROUP BY segment
        ORDER BY avg_spending DESC
    """,
    params={
        # Values inside params are not rendered as Jinja templates themselves,
        # so date macros are written directly in the SQL above.
        'source_table': 'sales.orders'
    },
    session_configuration={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
    },
    output_path='/analytics/segments/customer_segmentation_{{ ds }}.parquet',
    output_format='parquet'
)
```

### CSV Data Loading with COPY INTO

Load CSV data with format specifications:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

load_customer_data = DatabricksCopyIntoOperator(
    task_id='load_customer_csv',
    table_name='raw.customers',
    file_location='s3://data-lake/customers/{{ ds }}/',
    file_format='CSV',
    format_options={
        'header': 'true',
        'delimiter': ',',
        'quote': '"',
        'escape': '\\',
        'inferSchema': 'true',
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    },
    pattern='customer_*.csv',
    copy_options={
        'mergeSchema': 'true',
        'force': 'false'
    },
    databricks_conn_id='databricks_etl',
    catalog='raw',
    schema='ingestion'
)
```

### JSON Data Loading

Load JSON files with nested structure handling:

```python
load_events = DatabricksCopyIntoOperator(
    task_id='load_event_json',
    table_name='events.user_actions',
    file_location='s3://event-streams/user-actions/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/day={{ macros.ds_format(ds, "%Y-%m-%d", "%d") }}/',
    file_format='JSON',
    expression_list=[
        'user_id',
        'action_type',
        'timestamp::timestamp as event_timestamp',
        'properties:device_type::string as device_type',
        'properties:session_id::string as session_id',
        'properties:page_url::string as page_url'
    ],
    format_options={
        'multiLine': 'false',
        'timestampFormat': 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
    },
    validate='ALL',
    databricks_conn_id='databricks_events'
)
```

### Parquet Data Loading with Credentials

Load Parquet files with cloud storage credentials:

```python
load_parquet_data = DatabricksCopyIntoOperator(
    task_id='load_sales_parquet',
    table_name='analytics.daily_sales',
    file_location='abfss://data@storageaccount.dfs.core.windows.net/sales/{{ ds }}/',
    file_format='PARQUET',
    credential={
        'AZURE_SAS_TOKEN': '{{ var.value.azure_sas_token }}'
    },
    files=['sales_summary.parquet', 'sales_details.parquet'],
    force_copy=True,
    databricks_conn_id='databricks_production',
    sql_endpoint_name='analytics-endpoint'
)
```
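
The `encryption` parameter follows the same pattern as `credential`. Below is a hedged sketch, assuming the dictionary keys mirror COPY INTO's `ENCRYPTION (TYPE = ..., MASTER_KEY = ...)` clause for an SSE-C encrypted S3 source; the bucket path and Airflow variable are placeholders.

```python
load_encrypted = DatabricksCopyIntoOperator(
    task_id='load_encrypted_parquet',
    table_name='secure.transactions',
    file_location='s3://secure-bucket/transactions/{{ ds }}/',   # placeholder bucket
    file_format='PARQUET',
    encryption={
        'TYPE': 'AWS_SSE_C',                          # assumed key names, mirroring the SQL ENCRYPTION clause
        'MASTER_KEY': '{{ var.value.s3_sse_c_key }}'  # placeholder Airflow variable
    },
    databricks_conn_id='databricks_production',
    sql_endpoint_name='analytics-endpoint'
)
```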

## Advanced Features

### Result Export to Multiple Formats

Export query results in different formats:

```python
# CSV export with custom formatting
csv_export = DatabricksSqlOperator(
    task_id='export_to_csv',
    sql="SELECT * FROM analytics.monthly_summary WHERE report_month = '{{ ds }}'",
    output_path='/reports/monthly_summary_{{ ds }}.csv',
    output_format='csv',
    csv_params={
        'header': True,
        'delimiter': '|',
        'quoteAll': True,
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    }
)

# JSON export with nested structures
json_export = DatabricksSqlOperator(
    task_id='export_to_json',
    sql="""
        SELECT
            customer_id,
            struct(
                first_name,
                last_name,
                email
            ) as customer_info,
            collect_list(
                struct(order_id, order_date, amount)
            ) as orders
        FROM customer_orders
        WHERE order_date >= '{{ ds }}'
        GROUP BY customer_id, first_name, last_name, email
    """,
    output_path='/exports/customers_{{ ds }}.json',
    output_format='json'
)

# Parquet export for large datasets
parquet_export = DatabricksSqlOperator(
    task_id='export_to_parquet',
    sql="SELECT * FROM large_dataset WHERE partition_date = '{{ ds }}'",
    output_path='/data/exports/large_dataset_{{ ds }}.parquet',
    output_format='parquet'
)
```

### Session Configuration and Optimization

Configure Spark SQL settings for optimal performance:

```python
optimized_query = DatabricksSqlOperator(
    task_id='optimized_aggregation',
    sql="""
        SELECT
            product_category,
            DATE_TRUNC('month', sale_date) as sale_month,
            SUM(quantity * unit_price) as revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact s
        JOIN product_dim p ON s.product_id = p.product_id
        WHERE sale_date >= '{{ macros.ds_add(ds, -90) }}'
        GROUP BY product_category, DATE_TRUNC('month', sale_date)
    """,
    session_configuration={
        # Enable adaptive query execution
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.sql.adaptive.skewJoin.enabled': 'true',

        # Optimize for large datasets
        'spark.sql.adaptive.advisoryPartitionSizeInBytes': '256MB',
        'spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold': '200MB',

        # Enable column pruning and predicate pushdown
        'spark.sql.optimizer.nestedSchemaPruning.enabled': 'true',
        'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
    },
    databricks_conn_id='databricks_analytics',
    catalog='sales',
    schema='aggregated'
)
```

### Data Quality Validation with COPY INTO

Implement data validation during loading:

```python
validated_load = DatabricksCopyIntoOperator(
    task_id='validated_data_load',
    table_name='trusted.customer_transactions',
    file_location='s3://raw-data/transactions/{{ ds }}/',
    file_format={
        'format': 'CSV',
        'options': {
            'header': 'true',
            'delimiter': ',',
            'quote': '"',
            'dateFormat': 'yyyy-MM-dd',
            'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
        }
    },
    expression_list=[
        'customer_id::long as customer_id',
        'transaction_date::date as transaction_date',
        'amount::decimal(10,2) as amount',
        'transaction_type',
        'CASE WHEN length(trim(description)) = 0 THEN NULL ELSE description END as description',
        '_metadata.file_path as source_file'
    ],
    validate=1000,  # Validate first 1000 rows
    copy_options={
        'force': 'false',
        'mergeSchema': 'false'
    },
    databricks_conn_id='databricks_etl'
)
```
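
Where a post-load check is useful, a `DatabricksSqlOperator` can be chained after the COPY INTO task. A minimal sketch against the same (illustrative) target table:

```python
# Optional follow-up check: confirm that rows actually landed for this batch.
verify_load = DatabricksSqlOperator(
    task_id='verify_transaction_load',
    sql="""
        SELECT COUNT(*) AS loaded_rows
        FROM trusted.customer_transactions
        WHERE transaction_date = '{{ ds }}'
    """,
    databricks_conn_id='databricks_etl'
)

validated_load >> verify_load
```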

### Custom HTTP Headers and Client Configuration

Configure custom client settings and authentication headers:

```python
custom_client_query = DatabricksSqlOperator(
    task_id='query_with_custom_client',
    sql="SELECT COUNT(*) FROM system.access_logs WHERE log_date = '{{ ds }}'",
    http_headers=[
        ('X-Custom-Header', 'airflow-pipeline'),
        ('X-Request-ID', '{{ run_id }}'),
        ('X-User-Agent', 'Airflow/{{ var.value.airflow_version }}')
    ],
    client_parameters={
        'connect_timeout': 60,
        'socket_timeout': 300,
        '_user_agent_entry': 'AirflowPipeline/1.0'
    },
    databricks_conn_id='databricks_monitoring'
)
```

## Error Handling and Monitoring

### Query Result Validation

Validate query results and handle empty result sets:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


def validate_results(**context):
    """Custom validation for query results."""
    ti = context['task_instance']
    results = ti.xcom_pull(task_ids='daily_metrics_query')

    if not results or len(results) == 0:
        raise ValueError("No data found for the specified date")

    row_count = len(results)
    if row_count < 100:  # Expected minimum rows
        raise ValueError(f"Insufficient data: only {row_count} rows found")


daily_metrics_query = DatabricksSqlOperator(
    task_id='daily_metrics_query',
    sql="SELECT * FROM metrics.daily_kpis WHERE metric_date = '{{ ds }}'",
    databricks_conn_id='databricks_metrics'
)

validate_query_results = PythonOperator(
    task_id='validate_query_results',
    python_callable=validate_results
)

daily_metrics_query >> validate_query_results
```

### Retry Configuration for Resilience

Configure robust retry behaviour for transient failures:

```python
from datetime import timedelta

resilient_query = DatabricksSqlOperator(
    task_id='resilient_analytics',
    sql="SELECT * FROM complex_analytics_view WHERE process_date = '{{ ds }}'",
    databricks_conn_id='databricks_analytics',
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30)
)
```

Together, these SQL operations cover query execution, data loading, and analytics on Databricks SQL endpoints, with robust error handling, multiple output formats, and performance tuning options.