# SQL Operations

The Databricks provider offers comprehensive SQL operations for executing queries on Databricks SQL endpoints and clusters. This includes query execution, data loading with COPY INTO, and result management with multiple output formats.

## Core Operators

### DatabricksSqlOperator

Execute SQL queries on Databricks SQL endpoints with flexible output and result handling.

```python { .api }
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

class DatabricksSqlOperator(SQLExecuteQueryOperator):
    def __init__(
        self,
        sql: str | list[str],
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        output_path: str | None = None,
        output_format: str = "csv",
        csv_params: dict[str, Any] | None = None,
        client_parameters: dict[str, Any] | None = None,
        return_last: bool = True,
        do_xcom_push: bool = True,
        caller: str = "DatabricksSqlOperator",
        **kwargs
    ) -> None:
        """
        Execute SQL queries on Databricks SQL endpoints.

        Args:
            sql: SQL query or list of queries to execute
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            output_path: Path to save query results (supports templating)
            output_format: Output format - "csv", "json", or "parquet"
            csv_params: CSV-specific formatting parameters
            client_parameters: Additional client configuration parameters
            return_last: Return only the last result set for multiple queries
            do_xcom_push: Whether to push results to XCom
            caller: Caller identification for logging and monitoring
        """
```
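
The operator targets a SQL warehouse either by `sql_endpoint_name` or by an explicit `http_path`, and with the default `do_xcom_push=True` the query result is pushed to XCom. A minimal sketch (the connection ID and warehouse name are illustrative placeholders):

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Minimal usage sketch; connection ID and warehouse name are placeholders.
row_count_check = DatabricksSqlOperator(
    task_id='count_daily_orders',
    databricks_conn_id='databricks_default',
    sql_endpoint_name='my-warehouse',  # or http_path='/sql/1.0/warehouses/<id>'
    sql="SELECT COUNT(*) FROM sales.orders WHERE order_date = '{{ ds }}'",
)
```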

### DatabricksCopyIntoOperator

Load data into Databricks tables using the COPY INTO command with comprehensive format support.

```python { .api }
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

class DatabricksCopyIntoOperator(BaseOperator):
    def __init__(
        self,
        *,
        table_name: str,
        file_location: str,
        file_format: str | dict[str, Any],
        files: list[str] | None = None,
        pattern: str | None = None,
        expression_list: list[str] | None = None,
        credential: dict[str, str] | None = None,
        encryption: dict[str, str] | None = None,
        format_options: dict[str, Any] | None = None,
        force_copy: bool = False,
        validate: str | int | bool | None = None,
        copy_options: dict[str, Any] | None = None,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        caller: str = "DatabricksCopyIntoOperator",
        **kwargs
    ) -> None:
        """
        Load data using Databricks COPY INTO command.

        Args:
            table_name: Target table name for data loading
            file_location: Source file location (cloud storage path)
            file_format: File format specification (string or format options dict)
            files: Specific files to copy (optional, alternative to pattern)
            pattern: File pattern to match for copying
            expression_list: List of expressions for data transformation during copy
            credential: Credential configuration for accessing source files
            encryption: Encryption configuration for source files
            format_options: Format-specific options (delimiter, header, etc.)
            force_copy: Force copy operation even if files were previously copied
            validate: Validation mode - "ALL", number of rows, or boolean
            copy_options: Additional copy operation options
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            caller: Caller identification for logging and monitoring
        """
```
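
Under the hood, the operator renders these arguments into a single `COPY INTO` statement and executes it on the configured warehouse. A minimal sketch (table, location, and connection ID are placeholders); the comment shows roughly the shape of the generated statement:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

# Minimal usage sketch; names are placeholders. The operator issues a statement
# roughly of the form:
#   COPY INTO raw.events
#   FROM 's3://example-bucket/events/'
#   FILEFORMAT = JSON
minimal_copy = DatabricksCopyIntoOperator(
    task_id='copy_raw_events',
    table_name='raw.events',
    file_location='s3://example-bucket/events/',
    file_format='JSON',
    databricks_conn_id='databricks_default',
)
```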

## Usage Examples

### Basic Query Execution

Execute SQL queries with result export:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Simple query execution
daily_report = DatabricksSqlOperator(
    task_id='generate_daily_report',
    databricks_conn_id='databricks_sql',
    sql="""
        SELECT
            date_trunc('day', order_timestamp) as order_date,
            customer_segment,
            COUNT(*) as order_count,
            SUM(order_amount) as total_revenue
        FROM sales.orders
        WHERE date_trunc('day', order_timestamp) = '{{ ds }}'
        GROUP BY date_trunc('day', order_timestamp), customer_segment
        ORDER BY customer_segment
    """,
    catalog='sales',
    schema='reports',
    output_path='/tmp/daily_report_{{ ds }}.csv',
    output_format='csv'
)
```
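
For context, these operators are declared inside a DAG like any other Airflow task. A minimal sketch using the DAG context manager (DAG ID, schedule, and dates are illustrative; older Airflow versions use `schedule_interval` instead of `schedule`):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Illustrative DAG wiring; schedule, dates, and identifiers are placeholders.
with DAG(
    dag_id='daily_sales_reporting',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    daily_report = DatabricksSqlOperator(
        task_id='generate_daily_report',
        databricks_conn_id='databricks_sql',
        sql="SELECT COUNT(*) FROM sales.orders WHERE order_date = '{{ ds }}'",
    )
```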

### Multi-Statement SQL Execution

Execute multiple SQL statements in sequence:

```python
data_pipeline = DatabricksSqlOperator(
    task_id='run_data_pipeline',
    sql=[
        "DROP TABLE IF EXISTS staging.temp_customer_metrics",
        """
        CREATE TABLE staging.temp_customer_metrics AS
        SELECT
            customer_id,
            COUNT(DISTINCT order_id) as order_count,
            SUM(order_amount) as lifetime_value,
            AVG(order_amount) as avg_order_value,
            MAX(order_timestamp) as last_order_date
        FROM raw.orders
        WHERE order_timestamp >= '{{ macros.ds_add(ds, -30) }}'
        GROUP BY customer_id
        """,
        """
        INSERT INTO analytics.customer_metrics
        SELECT * FROM staging.temp_customer_metrics
        WHERE order_count >= 2
        """,
        "DROP TABLE staging.temp_customer_metrics"
    ],
    databricks_conn_id='databricks_analytics',
    http_path='/sql/1.0/warehouses/warehouse123',
    return_last=False  # Return every statement's result, not just the last one
)
```

### Query with Parameters and Configuration

Execute parameterized queries with session configuration:

```python
analytical_query = DatabricksSqlOperator(
    task_id='customer_segmentation',
    sql="""
        WITH customer_metrics AS (
            SELECT
                customer_id,
                SUM(order_amount) as total_spent,
                COUNT(*) as order_frequency,
                DATEDIFF(CURRENT_DATE(), MAX(order_date)) as days_since_last_order
            FROM {{ params.source_table }}
            WHERE order_date >= '{{ macros.ds_add(ds, -365) }}'
            GROUP BY customer_id
        )
        SELECT
            CASE
                WHEN total_spent > 1000 AND order_frequency > 10 THEN 'VIP'
                WHEN total_spent > 500 AND order_frequency > 5 THEN 'Premium'
                WHEN days_since_last_order <= 30 THEN 'Active'
                ELSE 'Standard'
            END as segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spending
        FROM customer_metrics
        GROUP BY segment
        ORDER BY avg_spending DESC
    """,
    # Note: params values are substituted verbatim (they are not re-rendered),
    # so Jinja macros belong in the SQL template itself rather than in params.
    params={
        'source_table': 'sales.orders'
    },
    session_configuration={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
    },
    output_path='/analytics/segments/customer_segmentation_{{ ds }}.parquet',
    output_format='parquet'
)
```

### CSV Data Loading with COPY INTO

Load CSV data with format specifications:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

load_customer_data = DatabricksCopyIntoOperator(
    task_id='load_customer_csv',
    table_name='raw.customers',
    file_location='s3://data-lake/customers/{{ ds }}/',
    file_format='CSV',
    format_options={
        'header': 'true',
        'delimiter': ',',
        'quote': '"',
        'escape': '\\',
        'inferSchema': 'true',
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    },
    pattern='customer_*.csv',
    copy_options={
        'mergeSchema': 'true',
        'force': 'false'
    },
    databricks_conn_id='databricks_etl',
    catalog='raw',
    schema='ingestion'
)
```

### JSON Data Loading

Load JSON files with nested structure handling:

```python
load_events = DatabricksCopyIntoOperator(
    task_id='load_event_json',
    table_name='events.user_actions',
    file_location=(
        's3://event-streams/user-actions/'
        'year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/'
        'month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/'
        'day={{ macros.ds_format(ds, "%Y-%m-%d", "%d") }}/'
    ),
    file_format='JSON',
    expression_list=[
        'user_id',
        'action_type',
        'timestamp::timestamp as event_timestamp',
        'properties:device_type::string as device_type',
        'properties:session_id::string as session_id',
        'properties:page_url::string as page_url'
    ],
    format_options={
        'multiLine': 'false',
        'timestampFormat': "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    },
    validate='ALL',
    databricks_conn_id='databricks_events'
)
```

### Parquet Data Loading with Credentials

Load Parquet files with cloud storage credentials:

```python
load_parquet_data = DatabricksCopyIntoOperator(
    task_id='load_sales_parquet',
    table_name='analytics.daily_sales',
    file_location='abfss://data@storageaccount.dfs.core.windows.net/sales/{{ ds }}/',
    file_format='PARQUET',
    credential={
        'AZURE_SAS_TOKEN': '{{ var.value.azure_sas_token }}'
    },
    files=['sales_summary.parquet', 'sales_details.parquet'],
    force_copy=True,
    databricks_conn_id='databricks_production',
    sql_endpoint_name='analytics-endpoint'
)
```
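
The `encryption` parameter is documented above but not shown in these examples; like `credential`, it is a dictionary whose keys are rendered into the `ENCRYPTION (...)` clause of the generated `COPY INTO` statement. A hedged sketch for S3 server-side encryption with a customer-provided key (the key names and the Airflow Variable are illustrative; check the Databricks COPY INTO documentation for the options valid for your storage type):

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

load_encrypted_data = DatabricksCopyIntoOperator(
    task_id='load_encrypted_transactions',
    table_name='secure.transactions',
    file_location='s3://secure-bucket/transactions/{{ ds }}/',
    file_format='PARQUET',
    # Rendered into the ENCRYPTION (...) clause of COPY INTO; values are placeholders.
    encryption={'TYPE': 'AWS_SSE_C', 'MASTER_KEY': '{{ var.value.s3_sse_c_key }}'},
    databricks_conn_id='databricks_etl'
)
```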

## Advanced Features

### Result Export to Multiple Formats

Export query results in different formats:

```python
# CSV export with custom formatting
csv_export = DatabricksSqlOperator(
    task_id='export_to_csv',
    sql="SELECT * FROM analytics.monthly_summary WHERE report_month = '{{ ds }}'",
    output_path='/reports/monthly_summary_{{ ds }}.csv',
    output_format='csv',
    csv_params={
        'header': True,
        'delimiter': '|',
        'quoteAll': True,
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    }
)

# JSON export with nested structures
json_export = DatabricksSqlOperator(
    task_id='export_to_json',
    sql="""
        SELECT
            customer_id,
            struct(
                first_name,
                last_name,
                email
            ) as customer_info,
            collect_list(
                struct(order_id, order_date, amount)
            ) as orders
        FROM customer_orders
        WHERE order_date >= '{{ ds }}'
        GROUP BY customer_id, first_name, last_name, email
    """,
    output_path='/exports/customers_{{ ds }}.json',
    output_format='json'
)

# Parquet export for large datasets
parquet_export = DatabricksSqlOperator(
    task_id='export_to_parquet',
    sql="SELECT * FROM large_dataset WHERE partition_date = '{{ ds }}'",
    output_path='/data/exports/large_dataset_{{ ds }}.parquet',
    output_format='parquet'
)
```

### Session Configuration and Optimization

Configure Spark SQL settings for optimal performance:

```python
optimized_query = DatabricksSqlOperator(
    task_id='optimized_aggregation',
    sql="""
        SELECT
            product_category,
            DATE_TRUNC('month', sale_date) as sale_month,
            SUM(quantity * unit_price) as revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact s
        JOIN product_dim p ON s.product_id = p.product_id
        WHERE sale_date >= '{{ macros.ds_add(ds, -90) }}'
        GROUP BY product_category, DATE_TRUNC('month', sale_date)
    """,
    session_configuration={
        # Enable adaptive query execution
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.sql.adaptive.skewJoin.enabled': 'true',

        # Optimize for large datasets
        'spark.sql.adaptive.advisoryPartitionSizeInBytes': '256MB',
        'spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold': '200MB',

        # Enable column pruning and predicate pushdown
        'spark.sql.optimizer.nestedSchemaPruning.enabled': 'true',
        'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
    },
    databricks_conn_id='databricks_analytics',
    catalog='sales',
    schema='aggregated'
)
```

### Data Quality Validation with COPY INTO

Implement data validation during loading:

```python
validated_load = DatabricksCopyIntoOperator(
    task_id='validated_data_load',
    table_name='trusted.customer_transactions',
    file_location='s3://raw-data/transactions/{{ ds }}/',
    file_format={
        'format': 'CSV',
        'options': {
            'header': 'true',
            'delimiter': ',',
            'quote': '"',
            'dateFormat': 'yyyy-MM-dd',
            'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
        }
    },
    expression_list=[
        'customer_id::long as customer_id',
        'transaction_date::date as transaction_date',
        'amount::decimal(10,2) as amount',
        'transaction_type',
        'CASE WHEN length(trim(description)) = 0 THEN NULL ELSE description END as description',
        '_metadata.file_path as source_file'
    ],
    validate=1000,  # Validate the first 1000 rows
    copy_options={
        'force': 'false',
        'mergeSchema': 'false'
    },
    databricks_conn_id='databricks_etl'
)
```

### Custom HTTP Headers and Client Configuration

Configure custom client settings and authentication headers:

```python
custom_client_query = DatabricksSqlOperator(
    task_id='query_with_custom_client',
    sql="SELECT COUNT(*) FROM system.access_logs WHERE log_date = '{{ ds }}'",
    http_headers=[
        ('X-Custom-Header', 'airflow-pipeline'),
        ('X-Request-ID', '{{ run_id }}'),
        ('X-User-Agent', 'Airflow/{{ var.value.airflow_version }}')
    ],
    client_parameters={
        'connect_timeout': 60,
        'socket_timeout': 300,
        '_user_agent_entry': 'AirflowPipeline/1.0'
    },
    databricks_conn_id='databricks_monitoring'
)
```

## Error Handling and Monitoring

### Query Result Validation

Validate query results and fail fast on empty or undersized result sets:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


def validate_results(**context):
    """Custom validation for query results."""
    ti = context['task_instance']
    results = ti.xcom_pull(task_ids='daily_metrics_query')

    if not results or len(results) == 0:
        raise ValueError("No data found for the specified date")

    row_count = len(results)
    if row_count < 100:  # Expected minimum rows
        raise ValueError(f"Insufficient data: only {row_count} rows found")


daily_metrics_query = DatabricksSqlOperator(
    task_id='daily_metrics_query',
    sql="SELECT * FROM metrics.daily_kpis WHERE metric_date = '{{ ds }}'",
    databricks_conn_id='databricks_metrics'
)

validate_query_results = PythonOperator(
    task_id='validate_query_results',
    python_callable=validate_results
)

daily_metrics_query >> validate_query_results
```

### Retry Configuration for Resilience

Configure robust retry mechanisms:

```python
from datetime import timedelta

resilient_query = DatabricksSqlOperator(
    task_id='resilient_analytics',
    sql="SELECT * FROM complex_analytics_view WHERE process_date = '{{ ds }}'",
    databricks_conn_id='databricks_analytics',
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30)
)
```

Together, these SQL operations cover query execution, data loading, and analytics on Databricks SQL endpoints, with support for multiple output formats, session-level optimization, and standard Airflow error handling and retries.