# Connection & Authentication

The Databricks provider offers flexible authentication and connection management through specialized hooks that support multiple authentication methods, connection pooling, and robust error handling for both REST API and SQL operations.

## Core Hooks

### DatabricksHook

Primary hook for Databricks REST API operations with comprehensive authentication support.

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook

class DatabricksHook(BaseDatabricksHook):
    def __init__(
        self,
        databricks_conn_id: str = "databricks_default",
        timeout_seconds: int | None = None,
        retry_limit: int = 3,
        retry_delay: int = 1,
        retry_args: dict[str, Any] | None = None,
        caller: str | None = None,
        **kwargs
    ) -> None:
        """
        Hook for interacting with Databricks REST API.

        Args:
            databricks_conn_id: Airflow connection ID for Databricks
            timeout_seconds: Request timeout in seconds
            retry_limit: Number of retries for failed requests
            retry_delay: Base delay between retries in seconds
            retry_args: Additional retry configuration (exponential backoff, etc.)
            caller: Caller identification for logging and debugging
        """

    def submit_run(self, json: dict[str, Any]) -> int:
        """
        Submit a one-time run to Databricks.

        Args:
            json: Run configuration dictionary

        Returns:
            Run ID of the submitted job
        """

    def run_now(self, json: dict[str, Any]) -> int:
        """
        Trigger an existing Databricks job.

        Args:
            json: Job trigger configuration

        Returns:
            Run ID of the triggered job
        """

    def get_run_state(self, run_id: int) -> RunState:
        """
        Get current state of a Databricks run.

        Args:
            run_id: Run ID to check

        Returns:
            RunState object with current status
        """

    def cancel_run(self, run_id: int) -> None:
        """
        Cancel a running Databricks job.

        Args:
            run_id: Run ID to cancel
        """

    def get_run_page_url(self, run_id: int) -> str:
        """
        Get URL for the Databricks run page.

        Args:
            run_id: Run ID

        Returns:
            Direct URL to run details page
        """
```
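
A minimal usage sketch combining these methods: submit a run, surface its run page URL, and poll until it reaches a terminal state. The notebook path and cluster ID are placeholders, and the simple polling loop is illustrative rather than production-grade (in a DAG you would normally let an operator or sensor handle waiting):

```python
import time

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Assumes an Airflow connection named "databricks_default" already exists.
hook = DatabricksHook(databricks_conn_id="databricks_default")

run_id = hook.submit_run({
    "run_name": "adhoc_smoke_test",                       # placeholder run name
    "notebook_task": {"notebook_path": "/Shared/smoke"},  # placeholder notebook
    "existing_cluster_id": "your-cluster-id",             # placeholder cluster
})

print(f"Run page: {hook.get_run_page_url(run_id)}")

# Poll until the run reaches a terminal state.
state = hook.get_run_state(run_id)
while not state.is_terminal:
    time.sleep(30)
    state = hook.get_run_state(run_id)

if not state.is_successful:
    raise RuntimeError(f"Run {run_id} finished with state: {state}")
```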

### DatabricksSqlHook

Specialized hook for SQL operations on Databricks SQL endpoints and clusters.

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

class DatabricksSqlHook(DbApiHook):
    def __init__(
        self,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        session_configuration: dict[str, str] | None = None,
        sql_endpoint_name: str | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        caller: str | None = None,
        **kwargs
    ) -> None:
        """
        Hook for SQL operations on Databricks SQL endpoints.

        Args:
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            session_configuration: Session-level Spark configuration
            sql_endpoint_name: Name of SQL endpoint to use
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            caller: Caller identification for logging
        """

    def get_conn(self) -> Connection:
        """
        Get database connection for SQL operations.

        Returns:
            Database connection object
        """

    def run(
        self,
        sql: str | list[str],
        autocommit: bool = False,
        parameters: dict[str, Any] | None = None,
        handler: Callable[[Any], Any] | None = None,
        split_statements: bool = False,
        return_last: bool = True
    ) -> Any:
        """
        Execute SQL statement(s).

        Args:
            sql: SQL query or list of queries
            autocommit: Whether to autocommit transactions
            parameters: Parameters for parameterized queries
            handler: Result handler function
            split_statements: Whether to split multiple statements
            return_last: Return only last result for multiple queries

        Returns:
            Query results based on handler or default processing
        """

    def get_pandas_df(
        self,
        sql: str,
        parameters: dict[str, Any] | None = None,
        **kwargs
    ) -> DataFrame:
        """
        Execute SQL query and return results as pandas DataFrame.

        Args:
            sql: SQL query to execute
            parameters: Query parameters

        Returns:
            pandas DataFrame with query results
        """
```
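
A short usage sketch of `run` based on the signature above, passing a handler to materialize results and splitting a multi-statement string. The connection ID and warehouse path are placeholders:

```python
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# Assumes a "databricks_default" connection and an existing SQL warehouse path.
sql_hook = DatabricksSqlHook(
    databricks_conn_id="databricks_default",
    http_path="/sql/1.0/warehouses/your-warehouse-id",  # placeholder warehouse
)

# The handler receives the DB-API cursor; here we simply fetch all rows.
rows = sql_hook.run(
    sql="SELECT current_date() AS today; SELECT 1 AS ok",
    split_statements=True,   # execute both statements
    return_last=True,        # keep only the result of the last statement
    handler=lambda cursor: cursor.fetchall(),
)
print(rows)
```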

## Authentication Methods

### Personal Access Token Authentication

The most common authentication method uses Databricks personal access tokens:

```python { .api }
# Connection configuration in Airflow
# Connection ID: databricks_token_auth
# Connection Type: Databricks
# Host: https://your-databricks-workspace.cloud.databricks.com
# Password: dapi1234567890abcdef (your personal access token)

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Use hook with token authentication
hook = DatabricksHook(
    databricks_conn_id='databricks_token_auth',
    timeout_seconds=600,
    retry_limit=3
)

# Submit job using authenticated connection
run_id = hook.submit_run({
    'run_name': 'Token Auth Example',
    'notebook_task': {
        'notebook_path': '/Shared/example_notebook'
    },
    'existing_cluster_id': 'cluster-001'
})
```
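
As one way to register the connection shown above without the Airflow UI, Airflow also accepts connections supplied through `AIRFLOW_CONN_<CONN_ID>` environment variables, including in JSON form (Airflow 2.3+). This is a sketch only; the workspace URL and token are placeholders, and in a real deployment the variable would be set by your orchestrator or secrets backend rather than in Python:

```python
import json
import os

# JSON-serialized Airflow connection for token authentication.
# Placeholders only; set this in the deployment environment in practice.
os.environ["AIRFLOW_CONN_DATABRICKS_TOKEN_AUTH"] = json.dumps(
    {
        "conn_type": "databricks",
        "host": "https://your-databricks-workspace.cloud.databricks.com",
        "password": "dapi1234567890abcdef",  # personal access token (placeholder)
    }
)
```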

### Azure Active Directory (Azure AD) Authentication

Authenticate using Azure AD for Azure Databricks workspaces:

```python { .api }
# Connection configuration for Azure AD
# Connection ID: databricks_azure_ad
# Connection Type: Databricks
# Host: https://adb-1234567890123456.7.azuredatabricks.net
# Extra: {
#   "azure_tenant_id": "12345678-1234-1234-1234-123456789012",
#   "azure_client_id": "87654321-4321-4321-4321-210987654321",
#   "azure_client_secret": "your_client_secret",
#   "use_azure_cli": false
# }

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# SQL hook with Azure AD authentication
sql_hook = DatabricksSqlHook(
    databricks_conn_id='databricks_azure_ad',
    http_path='/sql/1.0/warehouses/your-warehouse-id',
    catalog='production',
    schema='analytics'
)

# Execute query with Azure AD authentication
results = sql_hook.get_pandas_df("""
    SELECT customer_id, SUM(order_amount) as total_spent
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL 30 DAYS
    GROUP BY customer_id
""")
```

### Service Principal Authentication

Use Azure service principals for programmatic access:

```python { .api }
# Connection configuration for Service Principal
# Connection ID: databricks_service_principal
# Connection Type: Databricks
# Host: https://adb-1234567890123456.7.azuredatabricks.net
# Extra: {
#   "azure_tenant_id": "12345678-1234-1234-1234-123456789012",
#   "azure_client_id": "service-principal-client-id",
#   "azure_client_secret": "service-principal-secret"
# }

from airflow.providers.databricks.hooks.databricks import DatabricksHook

hook = DatabricksHook(
    databricks_conn_id='databricks_service_principal',
    retry_limit=5,
    retry_delay=2
)

# Create and run job with service principal auth
job_config = {
    'name': 'Service Principal Job',
    'new_cluster': {
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'Standard_DS3_v2',
        'num_workers': 2
    },
    'notebook_task': {
        'notebook_path': '/Production/ETL/daily_pipeline'
    },
    'timeout_seconds': 3600
}

job_id = hook.create_job(job_config)
run_id = hook.run_now({'job_id': job_id})
```

### AWS IAM Role Authentication

Authenticate using AWS IAM roles for AWS Databricks workspaces:

```python { .api }
# Connection configuration for AWS IAM
# Connection ID: databricks_aws_iam
# Connection Type: Databricks
# Host: https://dbc-12345678-9012.cloud.databricks.com
# Extra: {
#   "use_aws_iam_role": true,
#   "aws_region": "us-west-2"
# }

from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# SQL operations with IAM role authentication
iam_hook = DatabricksSqlHook(
    databricks_conn_id='databricks_aws_iam',
    http_path='/sql/1.0/warehouses/warehouse-123',
    session_configuration={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
    }
)

# Execute data loading operation
load_result = iam_hook.run("""
    COPY INTO production.sales_data
    FROM 's3://data-lake/sales/{{ ds }}/'
    FILEFORMAT = DELTA
    COPY_OPTIONS ('mergeSchema' = 'true')
""")
```

## Advanced Connection Configuration

### Connection Pooling and Performance

Configure connections for high-throughput scenarios:

```python { .api }
# High-performance connection configuration
# Extra configuration for optimized connection:
# {
#   "http_timeout_seconds": 300,
#   "max_connections": 50,
#   "connection_pool_size": 10,
#   "retry_config": {
#     "max_retries": 5,
#     "exponential_backoff": true,
#     "base_delay": 1,
#     "max_delay": 60
#   }
# }

from airflow.providers.databricks.hooks.databricks import DatabricksHook

# Hook with optimized retry configuration
optimized_hook = DatabricksHook(
    databricks_conn_id='databricks_high_performance',
    timeout_seconds=300,
    retry_limit=5,
    retry_delay=1,
    retry_args={
        'stop_max_attempt_number': 5,
        'wait_exponential_multiplier': 1000,
        'wait_exponential_max': 60000
    }
)

# Batch job submission with optimized connection
# batch_job_configs: a list of run configuration dicts prepared elsewhere
job_runs = []
for job_config in batch_job_configs:
    run_id = optimized_hook.submit_run(job_config)
    job_runs.append(run_id)

print(f"Submitted {len(job_runs)} jobs successfully")
```

### Multi-Environment Connection Management

Manage connections across different environments:

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.models import Variable

def get_environment_hook(environment: str) -> DatabricksHook:
    """Get Databricks hook for specific environment."""

    connection_mapping = {
        'development': 'databricks_dev',
        'staging': 'databricks_staging',
        'production': 'databricks_prod'
    }

    conn_id = connection_mapping.get(environment)
    if not conn_id:
        raise ValueError(f"Unknown environment: {environment}")

    # Environment-specific timeout and retry configuration
    timeout_config = {
        'development': 1800,  # 30 minutes for dev
        'staging': 3600,      # 1 hour for staging
        'production': 7200    # 2 hours for production
    }

    return DatabricksHook(
        databricks_conn_id=conn_id,
        timeout_seconds=timeout_config[environment],
        retry_limit=3 if environment == 'production' else 1
    )

# Usage in DAG
def submit_environment_job(**context):
    env = context['params'].get('environment', 'development')
    hook = get_environment_hook(env)

    job_config = {
        'run_name': f'{env}_data_processing',
        'notebook_task': {
            'notebook_path': f'/Repos/{env}/data-pipeline/main_notebook'
        },
        'existing_cluster_id': Variable.get(f'{env}_cluster_id')
    }

    run_id = hook.submit_run(job_config)
    return run_id
```

### Custom Authentication Headers

Configure custom headers for specialized authentication:

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

# SQL hook with custom authentication headers
custom_auth_hook = DatabricksSqlHook(
    databricks_conn_id='databricks_custom_auth',
    http_path='/sql/1.0/warehouses/custom-warehouse',
    http_headers=[
        ('X-Custom-Auth-Token', 'your-custom-token'),
        ('X-Request-Source', 'airflow-pipeline'),
        ('X-Environment', 'production'),
        ('User-Agent', 'Airflow-Databricks-Provider/1.0')
    ],
    caller='CustomAuthPipeline'
)

# Execute query with custom headers
query_results = custom_auth_hook.run("""
    SELECT
        table_name,
        COUNT(*) as row_count,
        MAX(last_modified) as last_update
    FROM information_schema.tables
    WHERE table_schema = 'analytics'
    GROUP BY table_name
""")
```

## Connection Testing and Validation

### Connection Health Checks

Implement connection validation and health monitoring:

```python { .api }
from typing import Any

from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def validate_databricks_connection(conn_id: str) -> dict[str, Any]:
    """Validate Databricks connection and return health status."""

    health_status = {
        'connection_id': conn_id,
        'rest_api_healthy': False,
        'sql_endpoint_healthy': False,
        'jobs_accessible': False,
        'errors': []
    }

    try:
        # Test REST API connection
        rest_hook = DatabricksHook(databricks_conn_id=conn_id)

        # Test API access by listing jobs
        rest_hook.list_jobs(limit=1)
        health_status['rest_api_healthy'] = True
        health_status['jobs_accessible'] = True

    except Exception as e:
        health_status['errors'].append(f"REST API error: {str(e)}")

    try:
        # Test SQL endpoint connection
        sql_hook = DatabricksSqlHook(databricks_conn_id=conn_id)

        # Test simple query
        result = sql_hook.run("SELECT 1 as test_connection")
        if result:
            health_status['sql_endpoint_healthy'] = True

    except Exception as e:
        health_status['errors'].append(f"SQL endpoint error: {str(e)}")

    return health_status

# Use in DAG for connection monitoring
def check_connection_health(**context):
    """Task to check connection health."""
    conn_id = context['params'].get('connection_id', 'databricks_default')
    health = validate_databricks_connection(conn_id)

    if not health['rest_api_healthy']:
        raise ValueError(f"REST API connection failed for {conn_id}")

    return health

# Connection health check task
health_check = PythonOperator(
    task_id='check_databricks_health',
    python_callable=check_connection_health,
    params={'connection_id': 'databricks_production'}
)
```

### Connection Retry and Fallback

Implement connection fallback strategies:

```python { .api }
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def get_reliable_databricks_hook(primary_conn: str, fallback_conn: str) -> DatabricksHook:
    """Get Databricks hook with automatic fallback."""

    try:
        # Try primary connection
        primary_hook = DatabricksHook(
            databricks_conn_id=primary_conn,
            timeout_seconds=30  # Quick timeout for testing
        )

        # Test connection with simple API call
        primary_hook.list_jobs(limit=1)
        print(f"Using primary connection: {primary_conn}")

        # Return hook with normal timeout if successful
        return DatabricksHook(
            databricks_conn_id=primary_conn,
            timeout_seconds=600,
            retry_limit=3
        )

    except Exception as e:
        print(f"Primary connection {primary_conn} failed: {str(e)}")
        print(f"Falling back to: {fallback_conn}")

        return DatabricksHook(
            databricks_conn_id=fallback_conn,
            timeout_seconds=600,
            retry_limit=5  # More retries for fallback
        )

# Usage with fallback
def resilient_job_submission(**context):
    """Submit job with connection fallback."""
    hook = get_reliable_databricks_hook(
        primary_conn='databricks_primary',
        fallback_conn='databricks_secondary'
    )

    job_config = {
        'run_name': 'Resilient Job Submission',
        'notebook_task': {
            'notebook_path': '/Shared/resilient_pipeline'
        },
        'existing_cluster_id': 'backup-cluster-001'
    }

    run_id = hook.submit_run(job_config)
    return run_id
```

### Session Configuration Management

Manage SQL session configurations for optimal performance:

```python { .api }
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

def get_optimized_sql_hook(workload_type: str) -> DatabricksSqlHook:
    """Get SQL hook optimized for specific workload types."""

    # Workload-specific configurations
    configs = {
        'etl': {
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true',
            'spark.sql.adaptive.skewJoin.enabled': 'true',
            'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
        },
        'analytics': {
            'spark.sql.execution.arrow.pyspark.enabled': 'true',
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
        },
        'ml': {
            'spark.sql.execution.arrow.maxRecordsPerBatch': '10000',
            'spark.sql.adaptive.enabled': 'true',
            'spark.task.maxFailures': '3'
        }
    }

    session_config = configs.get(workload_type, {})

    return DatabricksSqlHook(
        databricks_conn_id='databricks_sql',
        session_configuration=session_config,
        caller=f'OptimizedHook-{workload_type}'
    )

# ETL workload
etl_hook = get_optimized_sql_hook('etl')
etl_results = etl_hook.run("""
    INSERT INTO processed_data
    SELECT * FROM raw_data
    WHERE processing_date = CURRENT_DATE
""")

# Analytics workload
analytics_hook = get_optimized_sql_hook('analytics')
analytics_df = analytics_hook.get_pandas_df("""
    SELECT customer_segment, AVG(order_value)
    FROM customer_analytics
    GROUP BY customer_segment
""")
```

The connection and authentication system provides robust, flexible access to Databricks services with comprehensive error handling, multiple authentication methods, and performance optimization capabilities.