# Connection & Authentication

The Databricks provider offers flexible authentication and connection management through specialized hooks that support multiple authentication methods, connection pooling, and robust error handling for both REST API and SQL operations.

## Core Hooks

### DatabricksHook

Primary hook for Databricks REST API operations with comprehensive authentication support.

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook

class DatabricksHook(BaseDatabricksHook):
    def __init__(
        self,
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retry_limit: int = 3,
        retry_delay: int = 1,
        retry_args: dict[str, Any] | None = None,
        caller: str | None = None,
        **kwargs
    ) -> None:
        """
        Hook for interacting with the Databricks REST API.

        Args:
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Request timeout in seconds
            retry_limit: Number of retries for failed requests
            retry_delay: Base delay between retries in seconds
            retry_args: Additional retry configuration (exponential backoff, etc.)
            caller: Caller identification for logging and debugging
        """

    def submit_run(self, json: dict[str, Any]) -> int:
        """
        Submit a one-time run to Databricks.

        Args:
            json: Run configuration dictionary

        Returns:
            Run ID of the submitted job
        """

    def run_now(self, json: dict[str, Any]) -> int:
        """
        Trigger an existing Databricks job.

        Args:
            json: Job trigger configuration

        Returns:
            Run ID of the triggered job
        """

    def get_run_state(self, run_id: int) -> RunState:
        """
        Get current state of a Databricks run.

        Args:
            run_id: Run ID to check

        Returns:
            RunState object with current status
        """

    def cancel_run(self, run_id: int) -> None:
        """
        Cancel a running Databricks job.

        Args:
            run_id: Run ID to cancel
        """

    def get_run_page_url(self, run_id: int) -> str:
        """
        Get URL for the Databricks run page.

        Args:
            run_id: Run ID

        Returns:
            Direct URL to run details page
        """
```
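
A minimal end-to-end sketch of how these methods fit together, assuming a connection named `databricks_default`, a notebook at `/Shared/example_notebook`, and an existing cluster ID (all placeholders): submit a run, log its page URL, and poll `get_run_state` until the run reaches a terminal state.

```python { .api }
import time

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Placeholder connection and cluster IDs -- replace with values from your workspace.
hook = DatabricksHook(databricks_conn_id="databricks_default", retry_limit=3)

run_id = hook.submit_run({
    "run_name": "adhoc_smoke_test",
    "notebook_task": {"notebook_path": "/Shared/example_notebook"},
    "existing_cluster_id": "cluster-001",
})
print(f"Track the run at: {hook.get_run_page_url(run_id)}")

# Poll until the run reaches a terminal state, then check the result.
state = hook.get_run_state(run_id)
while not state.is_terminal:
    time.sleep(30)
    state = hook.get_run_state(run_id)

if not state.is_successful:
    raise RuntimeError(f"Run {run_id} did not succeed: {state.state_message}")
```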

### DatabricksSqlHook

Specialized hook for SQL operations on Databricks SQL endpoints and clusters.

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

class DatabricksSqlHook(DbApiHook):
    def __init__(
        self,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        session_configuration: dict[str, str] | None = None,
        sql_endpoint_name: str | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        caller: str | None = None,
        **kwargs
    ) -> None:
        """
        Hook for SQL operations on Databricks SQL endpoints.

        Args:
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            session_configuration: Session-level Spark configuration
            sql_endpoint_name: Name of SQL endpoint to use
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            caller: Caller identification for logging
        """

    def get_conn(self) -> Connection:
        """
        Get database connection for SQL operations.

        Returns:
            Database connection object
        """

    def run(
        self,
        sql: str | list[str],
        autocommit: bool = False,
        parameters: dict[str, Any] | None = None,
        handler: Callable[[Any], Any] | None = None,
        split_statements: bool = False,
        return_last: bool = True
    ) -> Any:
        """
        Execute SQL statement(s).

        Args:
            sql: SQL query or list of queries
            autocommit: Whether to autocommit transactions
            parameters: Parameters for parameterized queries
            handler: Result handler function
            split_statements: Whether to split multiple statements
            return_last: Return only last result for multiple queries

        Returns:
            Query results based on handler or default processing
        """

    def get_pandas_df(
        self,
        sql: str,
        parameters: dict[str, Any] | None = None,
        **kwargs
    ) -> DataFrame:
        """
        Execute SQL query and return results as pandas DataFrame.

        Args:
            sql: SQL query to execute
            parameters: Query parameters

        Returns:
            pandas DataFrame with query results
        """
```
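
A short usage sketch for the SQL hook; the connection ID, warehouse path, catalog, and table name are placeholders, and `fetch_all_handler` comes from the `common.sql` provider that `DbApiHook` builds on. Passing a list of statements with `return_last=False` returns one result set per statement.

```python { .api }
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Placeholder connection and warehouse path -- adjust to your workspace.
sql_hook = DatabricksSqlHook(
    databricks_conn_id="databricks_default",
    http_path="/sql/1.0/warehouses/your-warehouse-id",
    catalog="main",
    schema="default",
)

# Run two statements in one call and keep every statement's result.
results = sql_hook.run(
    sql=[
        "CREATE TABLE IF NOT EXISTS demo_events (id INT, payload STRING)",
        "SELECT COUNT(*) AS n FROM demo_events",
    ],
    handler=fetch_all_handler,
    return_last=False,
)
print(results)
```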

## Authentication Methods

### Personal Access Token Authentication

The most common authentication method uses a Databricks personal access token:

```python { .api }
# Connection configuration in Airflow
# Connection ID: databricks_token_auth
# Connection Type: Databricks
# Host: https://your-databricks-workspace.cloud.databricks.com
# Password: dapi1234567890abcdef (your personal access token)

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Use hook with token authentication
hook = DatabricksHook(
    databricks_conn_id='databricks_token_auth',
    timeout_seconds=600,
    retry_limit=3
)

# Submit job using authenticated connection
run_id = hook.submit_run({
    'run_name': 'Token Auth Example',
    'notebook_task': {
        'notebook_path': '/Shared/example_notebook'
    },
    'existing_cluster_id': 'cluster-001'
})
```
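
The same token connection can also be supplied outside the UI, for example through an `AIRFLOW_CONN_*` environment variable. This is a sketch assuming Airflow's JSON connection format (supported in recent Airflow releases); the host and token are placeholders:

```python { .api }
import json
import os

# JSON-serialized connection exposed as an environment variable; Airflow resolves
# AIRFLOW_CONN_DATABRICKS_TOKEN_AUTH to the connection ID 'databricks_token_auth'.
os.environ["AIRFLOW_CONN_DATABRICKS_TOKEN_AUTH"] = json.dumps({
    "conn_type": "databricks",
    "host": "https://your-databricks-workspace.cloud.databricks.com",
    "password": "dapi1234567890abcdef",  # personal access token
})
```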

### Azure Active Directory (Azure AD) Authentication

Authenticate using Azure AD for Azure Databricks workspaces:

```python { .api }
# Connection configuration for Azure AD
# Connection ID: databricks_azure_ad
# Connection Type: Databricks
# Host: https://adb-1234567890123456.7.azuredatabricks.net
# Extra: {
#   "azure_tenant_id": "12345678-1234-1234-1234-123456789012",
#   "azure_client_id": "87654321-4321-4321-4321-210987654321",
#   "azure_client_secret": "your_client_secret",
#   "use_azure_cli": false
# }

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# SQL hook with Azure AD authentication
sql_hook = DatabricksSqlHook(
    databricks_conn_id='databricks_azure_ad',
    http_path='/sql/1.0/warehouses/your-warehouse-id',
    catalog='production',
    schema='analytics'
)

# Execute query with Azure AD authentication
results = sql_hook.get_pandas_df("""
    SELECT customer_id, SUM(order_amount) as total_spent
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL 30 DAYS
    GROUP BY customer_id
""")
```

### Service Principal Authentication

Use Azure service principals for programmatic access:

```python { .api }
# Connection configuration for Service Principal
# Connection ID: databricks_service_principal
# Connection Type: Databricks
# Host: https://adb-1234567890123456.7.azuredatabricks.net
# Extra: {
#   "azure_tenant_id": "12345678-1234-1234-1234-123456789012",
#   "azure_client_id": "service-principal-client-id",
#   "azure_client_secret": "service-principal-secret"
# }

from airflow.providers.databricks.hooks.databricks import DatabricksHook

hook = DatabricksHook(
    databricks_conn_id='databricks_service_principal',
    retry_limit=5,
    retry_delay=2
)

# Create and run job with service principal auth
job_config = {
    'name': 'Service Principal Job',
    'new_cluster': {
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'Standard_DS3_v2',
        'num_workers': 2
    },
    'notebook_task': {
        'notebook_path': '/Production/ETL/daily_pipeline'
    },
    'timeout_seconds': 3600
}

job_id = hook.create_job(job_config)
run_id = hook.run_now({'job_id': job_id})
```

### AWS IAM Role Authentication

Authenticate using AWS IAM roles for AWS Databricks workspaces:

```python { .api }
# Connection configuration for AWS IAM
# Connection ID: databricks_aws_iam
# Connection Type: Databricks
# Host: https://dbc-12345678-9012.cloud.databricks.com
# Extra: {
#   "use_aws_iam_role": true,
#   "aws_region": "us-west-2"
# }

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# SQL operations with IAM role authentication
iam_hook = DatabricksSqlHook(
    databricks_conn_id='databricks_aws_iam',
    http_path='/sql/1.0/warehouses/warehouse-123',
    session_configuration={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
    }
)

# Execute data loading operation
# (COPY INTO accepts file formats such as PARQUET, CSV, or JSON as the source format)
load_result = iam_hook.run("""
    COPY INTO production.sales_data
    FROM 's3://data-lake/sales/{{ ds }}/'
    FILEFORMAT = PARQUET
    COPY_OPTIONS ('mergeSchema' = 'true')
""")
```
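
Note that the `{{ ds }}` macro above is only rendered when the SQL passes through a templated operator field; when the hook is called directly from a Python callable, the partition date has to be supplied explicitly. A sketch under that assumption (the callable name, bucket, table, and connection are placeholders):

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def load_daily_sales(**context):
    """Build the COPY INTO statement from the logical date and run it via the hook."""
    ds = context["ds"]  # logical date as YYYY-MM-DD, provided by Airflow
    iam_hook = DatabricksSqlHook(
        databricks_conn_id="databricks_aws_iam",
        http_path="/sql/1.0/warehouses/warehouse-123",
    )
    iam_hook.run(f"""
        COPY INTO production.sales_data
        FROM 's3://data-lake/sales/{ds}/'
        FILEFORMAT = PARQUET
        COPY_OPTIONS ('mergeSchema' = 'true')
    """)
```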

## Advanced Connection Configuration

### Connection Pooling and Performance

Configure connections for high-throughput scenarios:

```python { .api }
# High-performance connection configuration
# Extra configuration for optimized connection:
# {
#   "http_timeout_seconds": 300,
#   "max_connections": 50,
#   "connection_pool_size": 10,
#   "retry_config": {
#     "max_retries": 5,
#     "exponential_backoff": true,
#     "base_delay": 1,
#     "max_delay": 60
#   }
# }

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Hook with optimized retry configuration
optimized_hook = DatabricksHook(
    databricks_conn_id='databricks_high_performance',
    timeout_seconds=300,
    retry_limit=5,
    retry_delay=1,
    retry_args={
        'stop_max_attempt_number': 5,
        'wait_exponential_multiplier': 1000,
        'wait_exponential_max': 60000
    }
)

# Batch job submission with optimized connection
# (batch_job_configs is a list of Runs Submit payloads prepared elsewhere)
job_runs = []
for job_config in batch_job_configs:
    run_id = optimized_hook.submit_run(job_config)
    job_runs.append(run_id)

print(f"Submitted {len(job_runs)} jobs successfully")
```
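
Depending on the provider version, `retry_args` may be forwarded to the hook's `tenacity`-based retry machinery, in which case the options are expressed with tenacity primitives rather than flat keys. A hedged sketch of that form; verify against the provider version you run:

```python { .api }
from tenacity import stop_after_attempt, wait_exponential

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Assumes the installed provider passes retry_args through to tenacity.Retrying.
tenacity_hook = DatabricksHook(
    databricks_conn_id="databricks_high_performance",
    retry_limit=5,
    retry_args={
        "stop": stop_after_attempt(5),
        "wait": wait_exponential(multiplier=1, max=60),
    },
)
```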

### Multi-Environment Connection Management

Manage connections across different environments:

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.models import Variable

def get_environment_hook(environment: str) -> DatabricksHook:
    """Get Databricks hook for specific environment."""

    connection_mapping = {
        'development': 'databricks_dev',
        'staging': 'databricks_staging',
        'production': 'databricks_prod'
    }

    conn_id = connection_mapping.get(environment)
    if not conn_id:
        raise ValueError(f"Unknown environment: {environment}")

    # Environment-specific timeout and retry configuration
    timeout_config = {
        'development': 1800,  # 30 minutes for dev
        'staging': 3600,      # 1 hour for staging
        'production': 7200    # 2 hours for production
    }

    return DatabricksHook(
        databricks_conn_id=conn_id,
        timeout_seconds=timeout_config[environment],
        retry_limit=3 if environment == 'production' else 1
    )

# Usage in DAG
def submit_environment_job(**context):
    env = context['params'].get('environment', 'development')
    hook = get_environment_hook(env)

    job_config = {
        'run_name': f'{env}_data_processing',
        'notebook_task': {
            'notebook_path': f'/Repos/{env}/data-pipeline/main_notebook'
        },
        'existing_cluster_id': Variable.get(f'{env}_cluster_id')
    }

    run_id = hook.submit_run(job_config)
    return run_id
```
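
To run the helper from a DAG, the callable can be wired to a `PythonOperator` and the environment passed through DAG params. A sketch with an illustrative DAG ID and schedule, using Airflow 2.x import paths and assuming `submit_environment_job` from the block above is in scope:

```python { .api }
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="databricks_env_submit",      # illustrative DAG ID
    start_date=datetime(2024, 1, 1),
    schedule=None,
    params={"environment": "staging"},   # read by submit_environment_job at runtime
) as dag:
    submit_job = PythonOperator(
        task_id="submit_environment_job",
        python_callable=submit_environment_job,
    )
```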
409
410
### Custom Authentication Headers
411
412
Configure custom headers for specialized authentication:
413
414
```python { .api }
415
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
416
417
# SQL hook with custom authentication headers
418
custom_auth_hook = DatabricksSqlHook(
419
databricks_conn_id='databricks_custom_auth',
420
http_path='/sql/1.0/warehouses/custom-warehouse',
421
http_headers=[
422
('X-Custom-Auth-Token', 'your-custom-token'),
423
('X-Request-Source', 'airflow-pipeline'),
424
('X-Environment', 'production'),
425
('User-Agent', 'Airflow-Databricks-Provider/1.0')
426
],
427
caller='CustomAuthPipeline'
428
)
429
430
# Execute query with custom headers
431
query_results = custom_auth_hook.run("""
432
SELECT
433
table_name,
434
COUNT(*) as row_count,
435
MAX(last_modified) as last_update
436
FROM information_schema.tables
437
WHERE table_schema = 'analytics'
438
GROUP BY table_name
439
""")
440
```
441

## Connection Testing and Validation

### Connection Health Checks

Implement connection validation and health monitoring:

```python { .api }
from typing import Any

from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def validate_databricks_connection(conn_id: str) -> dict[str, Any]:
    """Validate Databricks connection and return health status."""

    health_status = {
        'connection_id': conn_id,
        'rest_api_healthy': False,
        'sql_endpoint_healthy': False,
        'jobs_api_accessible': False,
        'errors': []
    }

    try:
        # Test REST API connection
        rest_hook = DatabricksHook(databricks_conn_id=conn_id)

        # Test Jobs API access by listing a single job
        rest_hook.list_jobs(limit=1)
        health_status['rest_api_healthy'] = True
        health_status['jobs_api_accessible'] = True

    except Exception as e:
        health_status['errors'].append(f"REST API error: {str(e)}")

    try:
        # Test SQL endpoint connection
        sql_hook = DatabricksSqlHook(databricks_conn_id=conn_id)

        # Test simple query
        result = sql_hook.run("SELECT 1 as test_connection")
        if result:
            health_status['sql_endpoint_healthy'] = True

    except Exception as e:
        health_status['errors'].append(f"SQL endpoint error: {str(e)}")

    return health_status

# Use in DAG for connection monitoring
def check_connection_health(**context):
    """Task to check connection health."""
    conn_id = context['params'].get('connection_id', 'databricks_default')
    health = validate_databricks_connection(conn_id)

    if not health['rest_api_healthy']:
        raise ValueError(f"REST API connection failed for {conn_id}")

    return health

# Connection health check task
health_check = PythonOperator(
    task_id='check_databricks_health',
    python_callable=check_connection_health,
    params={'connection_id': 'databricks_production'}
)
```

### Connection Retry and Fallback

Implement connection fallback strategies:

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def get_reliable_databricks_hook(primary_conn: str, fallback_conn: str) -> DatabricksHook:
    """Get Databricks hook with automatic fallback."""

    try:
        # Try primary connection
        primary_hook = DatabricksHook(
            databricks_conn_id=primary_conn,
            timeout_seconds=30  # Quick timeout for testing
        )

        # Test connection with simple API call
        primary_hook.list_jobs(limit=1)
        print(f"Using primary connection: {primary_conn}")

        # Return hook with normal timeout if successful
        return DatabricksHook(
            databricks_conn_id=primary_conn,
            timeout_seconds=600,
            retry_limit=3
        )

    except Exception as e:
        print(f"Primary connection {primary_conn} failed: {str(e)}")
        print(f"Falling back to: {fallback_conn}")

        return DatabricksHook(
            databricks_conn_id=fallback_conn,
            timeout_seconds=600,
            retry_limit=5  # More retries for fallback
        )

# Usage with fallback
def resilient_job_submission(**context):
    """Submit job with connection fallback."""
    hook = get_reliable_databricks_hook(
        primary_conn='databricks_primary',
        fallback_conn='databricks_secondary'
    )

    job_config = {
        'run_name': 'Resilient Job Submission',
        'notebook_task': {
            'notebook_path': '/Shared/resilient_pipeline'
        },
        'existing_cluster_id': 'backup-cluster-001'
    }

    run_id = hook.submit_run(job_config)
    return run_id
```

### Session Configuration Management

Manage SQL session configurations for optimal performance:

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def get_optimized_sql_hook(workload_type: str) -> DatabricksSqlHook:
    """Get SQL hook optimized for specific workload types."""

    # Workload-specific configurations
    configs = {
        'etl': {
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true',
            'spark.sql.adaptive.skewJoin.enabled': 'true',
            'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
        },
        'analytics': {
            'spark.sql.execution.arrow.pyspark.enabled': 'true',
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
        },
        'ml': {
            'spark.sql.execution.arrow.maxRecordsPerBatch': '10000',
            'spark.sql.adaptive.enabled': 'true',
            'spark.task.maxFailures': '3'
        }
    }

    session_config = configs.get(workload_type, {})

    return DatabricksSqlHook(
        databricks_conn_id='databricks_sql',
        session_configuration=session_config,
        caller=f'OptimizedHook-{workload_type}'
    )

# ETL workload
etl_hook = get_optimized_sql_hook('etl')
etl_results = etl_hook.run("""
    INSERT INTO processed_data
    SELECT * FROM raw_data
    WHERE processing_date = CURRENT_DATE
""")

# Analytics workload
analytics_hook = get_optimized_sql_hook('analytics')
analytics_df = analytics_hook.get_pandas_df("""
    SELECT customer_segment, AVG(order_value)
    FROM customer_analytics
    GROUP BY customer_segment
""")
```

The connection and authentication system provides robust, flexible access to Databricks services with comprehensive error handling, multiple authentication methods, and performance optimization capabilities.