# Monitoring & Sensing

The Databricks provider offers comprehensive monitoring and sensing capabilities through sensors and triggers that track job completion, data availability, SQL query results, and system status, with support for both synchronous and asynchronous (deferrable) execution patterns.

## Core Sensors

### DatabricksSensor

Monitor Databricks job run completion and status with configurable polling and error handling.

```python { .api }
from airflow.providers.databricks.sensors.databricks import DatabricksSensor

class DatabricksSensor(BaseSensorOperator):
    def __init__(
        self,
        run_id: int | str,
        *,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs
    ) -> None:
        """
        Sensor for monitoring Databricks job run completion.

        Args:
            run_id: Databricks run ID to monitor (supports templating)
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
            databricks_retry_args: Additional retry configuration
            deferrable: Whether to use deferrable (async) execution
        """
```

### DatabricksSqlSensor

Monitor SQL query results and data conditions on Databricks SQL endpoints.

```python { .api }
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor

class DatabricksSqlSensor(BaseSensorOperator):
    def __init__(
        self,
        sql: str,
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        **kwargs
    ) -> None:
        """
        Sensor for monitoring SQL query results on Databricks.

        Args:
            sql: SQL query to execute for monitoring (supports templating)
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
        """
```

### DatabricksPartitionSensor

Monitor table partition availability for data pipeline orchestration.

```python { .api }
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

class DatabricksPartitionSensor(BaseSensorOperator):
    def __init__(
        self,
        table_name: str,
        partitions: dict[str, str] | list[dict[str, str]],
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        **kwargs
    ) -> None:
        """
        Sensor for monitoring table partition availability.

        Args:
            table_name: Name of table to monitor (supports templating)
            partitions: Partition specifications to check for availability
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            catalog: Catalog containing the table
            schema: Schema containing the table
        """
```

## Core Triggers

### DatabricksTrigger

Asynchronous trigger for deferrable job monitoring with efficient resource usage.

```python { .api }
from airflow.providers.databricks.triggers.databricks import DatabricksTrigger

class DatabricksTrigger(BaseTrigger):
    def __init__(
        self,
        run_id: int,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Async trigger for monitoring Databricks job runs.

        Args:
            run_id: Databricks run ID to monitor
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Async generator that yields trigger events.

        Yields:
            TriggerEvent with run completion status and metadata
        """
```
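
Triggers are not normally instantiated directly in a DAG. A deferrable operator or sensor constructs the trigger in `execute()`, hands it to `self.defer()`, and resumes in a completion callback once the triggerer yields a `TriggerEvent`. The following is a minimal sketch of that pattern, assuming the `DatabricksTrigger` signature above; the sensor class and the event payload keys are illustrative rather than part of the provider:

```python
from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.databricks.triggers.databricks import DatabricksTrigger


class ExampleDeferrableRunSensor(BaseSensorOperator):
    """Hypothetical sensor that hands run polling off to DatabricksTrigger."""

    def __init__(self, run_id: int, databricks_conn_id: str = "databricks_default", **kwargs):
        super().__init__(**kwargs)
        self.run_id = run_id
        self.databricks_conn_id = databricks_conn_id

    def execute(self, context):
        # Defer immediately: the triggerer polls Databricks asynchronously and
        # wakes this task up once the run reaches a terminal state.
        self.defer(
            trigger=DatabricksTrigger(
                run_id=self.run_id,
                databricks_conn_id=self.databricks_conn_id,
                polling_period_seconds=60,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # The event payload keys used here ("status", "message") are assumed
        # for illustration; inspect the actual TriggerEvent your trigger emits.
        if event and event.get("status") == "error":
            raise AirflowException(event.get("message", "Databricks run failed"))
        return event
```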

### DatabricksWorkflowTrigger

Specialized trigger for monitoring Databricks workflow execution.

```python { .api }
from airflow.providers.databricks.triggers.databricks import DatabricksWorkflowTrigger

class DatabricksWorkflowTrigger(BaseTrigger):
    def __init__(
        self,
        run_id: int,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Async trigger for monitoring Databricks workflow runs.

        Args:
            run_id: Databricks workflow run ID to monitor
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
        """
```

## Usage Examples

### Basic Job Monitoring

Monitor job completion with simple sensor configuration:

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.databricks.sensors.databricks import DatabricksSensor

# Submit job and monitor completion
submit_job = DatabricksSubmitRunOperator(
    task_id='submit_data_processing',
    notebook_task={
        'notebook_path': '/Analytics/Daily Processing',
        'base_parameters': {'date': '{{ ds }}'}
    },
    existing_cluster_id='processing-cluster-001',
    do_xcom_push=True
)

# Monitor job completion
monitor_job = DatabricksSensor(
    task_id='wait_for_processing_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='submit_data_processing', key='run_id') }}",
    databricks_conn_id='databricks_default',
    poke_interval=60,  # Check every minute
    timeout=7200  # Timeout after 2 hours
)

submit_job >> monitor_job
```
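
The usage snippets in this and the following sections omit the surrounding DAG definition for brevity. A minimal scaffold they could live in might look like this (the DAG id, schedule, and start date are illustrative):

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="databricks_monitoring_example",  # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # use schedule_interval on Airflow < 2.4
    catchup=False,
) as dag:
    # Declare the operators and sensors from these examples here,
    # e.g. submit_job >> monitor_job
    ...
```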

### Deferrable Job Monitoring

Use deferrable execution for efficient resource utilization:

```python { .api }
# Long-running job with deferrable monitoring
long_job = DatabricksSubmitRunOperator(
    task_id='submit_ml_training',
    spark_python_task={
        'python_file': 'dbfs:/ml/training/train_model.py',
        'parameters': ['--epochs', '500', '--data-size', 'large']
    },
    new_cluster={
        'spark_version': '12.2.x-cpu-ml-scala2.12',
        'node_type_id': 'i3.4xlarge',
        'num_workers': 10
    },
    timeout_seconds=28800,  # 8 hours
    deferrable=True  # Use deferrable execution
)

# Deferrable sensor - doesn't consume worker slot while waiting
deferrable_monitor = DatabricksSensor(
    task_id='monitor_ml_training',
    run_id="{{ task_instance.xcom_pull(task_ids='submit_ml_training', key='run_id') }}",
    databricks_conn_id='databricks_ml',
    polling_period_seconds=300,  # Check every 5 minutes
    timeout=28800,  # 8 hour timeout
    deferrable=True  # Async monitoring
)

long_job >> deferrable_monitor
```
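
Note that deferrable execution hands the wait over to the Airflow triggerer component, so a triggerer process must be running in the deployment; otherwise deferred tasks will never resume.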

### SQL Data Monitoring

Monitor data availability and quality using SQL sensors:

```python { .api }
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor

# Wait for daily data to arrive
data_availability_sensor = DatabricksSqlSensor(
    task_id='wait_for_daily_data',
    sql="""
        SELECT COUNT(*) as record_count
        FROM raw.daily_transactions
        WHERE transaction_date = '{{ ds }}'
        HAVING COUNT(*) >= 10000
    """,
    databricks_conn_id='databricks_sql',
    http_path='/sql/1.0/warehouses/analytics-warehouse',
    poke_interval=300,  # Check every 5 minutes
    timeout=14400,  # Wait up to 4 hours
    catalog='production',
    schema='raw'
)

# Monitor data quality thresholds
quality_sensor = DatabricksSqlSensor(
    task_id='check_data_quality',
    sql="""
        SELECT
            COUNT(*) as total_records,
            SUM(CASE WHEN customer_id IS NOT NULL THEN 1 ELSE 0 END) as valid_customers,
            SUM(CASE WHEN amount > 0 THEN 1 ELSE 0 END) as valid_amounts
        FROM processed.daily_sales
        WHERE processing_date = '{{ ds }}'
        HAVING
            (valid_customers * 100.0 / total_records) >= 95
            AND (valid_amounts * 100.0 / total_records) >= 98
            AND total_records >= 1000
    """,
    databricks_conn_id='databricks_sql',
    poke_interval=180,
    timeout=3600
)

data_availability_sensor >> quality_sensor
```

### Partition Monitoring

Monitor table partition availability for data pipeline coordination:

```python { .api }
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Wait for specific date partition
partition_sensor = DatabricksPartitionSensor(
    task_id='wait_for_daily_partition',
    table_name='sales.daily_transactions',
    partitions={'date': '{{ ds }}'},
    databricks_conn_id='databricks_sql',
    catalog='production',
    schema='sales',
    poke_interval=600,  # Check every 10 minutes
    timeout=21600  # Wait up to 6 hours
)

# Wait for multiple partitions
multi_partition_sensor = DatabricksPartitionSensor(
    task_id='wait_for_regional_partitions',
    table_name='analytics.regional_metrics',
    partitions=[
        {'date': '{{ ds }}', 'region': 'north_america'},
        {'date': '{{ ds }}', 'region': 'europe'},
        {'date': '{{ ds }}', 'region': 'asia_pacific'}
    ],
    databricks_conn_id='databricks_analytics',
    poke_interval=300,
    timeout=7200
)
```

## Advanced Monitoring Patterns

### Conditional Processing Based on Data Status

Implement conditional workflows based on data monitoring results:

```python { .api }
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def check_data_completeness(**context):
    """Check data completeness and branch accordingly."""
    from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

    hook = DatabricksSqlHook(databricks_conn_id='databricks_sql')

    # Jinja templates are not rendered inside a python_callable,
    # so read the logical date from the task context instead.
    ds = context['ds']

    # Check data completeness
    result = hook.get_first(f"""
        SELECT
            COUNT(*) as record_count,
            COUNT(DISTINCT source_system) as source_count
        FROM raw.daily_ingestion
        WHERE ingestion_date = '{ds}'
    """)

    record_count = result[0] if result else 0
    source_count = result[1] if result else 0

    # Expected: 3 source systems, minimum 50000 records
    if source_count >= 3 and record_count >= 50000:
        return 'full_processing'
    elif record_count >= 10000:
        return 'partial_processing'
    else:
        return 'wait_longer'

# Branching based on data status
data_check = BranchPythonOperator(
    task_id='check_data_status',
    python_callable=check_data_completeness
)

full_processing = DatabricksSubmitRunOperator(
    task_id='full_processing',
    notebook_task={
        'notebook_path': '/pipelines/full_daily_pipeline'
    },
    existing_cluster_id='large-cluster-001'
)

partial_processing = DatabricksSubmitRunOperator(
    task_id='partial_processing',
    notebook_task={
        'notebook_path': '/pipelines/partial_daily_pipeline'
    },
    existing_cluster_id='small-cluster-001'
)

wait_longer = DummyOperator(task_id='wait_longer')

data_check >> [full_processing, partial_processing, wait_longer]
```

### Multi-Level Monitoring

Implement cascading monitors for complex data dependencies:

```python { .api }
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='data_dependency_monitoring') as monitoring_group:

    # Level 1: Raw data availability
    raw_data_monitor = DatabricksSqlSensor(
        task_id='monitor_raw_data',
        sql="""
            SELECT 1
            FROM information_schema.tables
            WHERE table_name = 'raw_events_{{ ds_nodash }}'
              AND table_schema = 'landing'
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=120,
        timeout=7200
    )

    # Level 2: Data processing completion
    processing_monitor = DatabricksSqlSensor(
        task_id='monitor_processing_completion',
        sql="""
            SELECT 1
            FROM processing_status
            WHERE process_date = '{{ ds }}'
              AND status = 'COMPLETED'
              AND error_count = 0
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=300,
        timeout=10800
    )

    # Level 3: Quality validation
    quality_monitor = DatabricksSqlSensor(
        task_id='monitor_quality_validation',
        sql="""
            SELECT 1
            FROM quality_metrics
            WHERE validation_date = '{{ ds }}'
              AND overall_score >= 0.95
              AND critical_failures = 0
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=180,
        timeout=3600
    )

    # Set up monitoring cascade
    raw_data_monitor >> processing_monitor >> quality_monitor
```

### Real-Time Streaming Monitoring

Monitor streaming data pipelines and real-time processing:

```python { .api }
# Monitor streaming job health
streaming_monitor = DatabricksSqlSensor(
    task_id='monitor_streaming_health',
    sql="""
        SELECT COUNT(*) AS healthy_batches
        FROM streaming_metrics
        WHERE
            metric_timestamp >= CURRENT_TIMESTAMP - INTERVAL 5 MINUTES
            AND batch_duration_ms > 0
            AND processing_time_ms < batch_duration_ms * 0.8  -- Processing within 80% of batch interval
        HAVING COUNT(*) >= 3  -- At least 3 healthy batches in last 5 minutes
    """,
    databricks_conn_id='databricks_streaming',
    poke_interval=60,  # Check every minute
    timeout=1800,  # 30 minute timeout
    mode='reschedule'  # Don't block worker
)

# Monitor streaming lag
lag_monitor = DatabricksSqlSensor(
    task_id='monitor_streaming_lag',
    sql="""
        SELECT 1
        FROM (
            SELECT MAX(event_timestamp) as latest_processed
            FROM processed_events
        ) processed
        CROSS JOIN (
            SELECT CURRENT_TIMESTAMP as current_time
        ) current
        WHERE TIMESTAMPDIFF(MINUTE, latest_processed, current_time) <= 10  -- Max 10 minutes lag
    """,
    databricks_conn_id='databricks_streaming',
    poke_interval=300,
    timeout=3600
)
```
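
Here `mode='reschedule'` releases the worker slot between pokes by rescheduling the sensor, while `deferrable=True` goes a step further and moves the wait to the triggerer; both options avoid occupying a worker for the full duration of a long wait.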

### Error Detection and Alerting

Implement monitoring with error detection and alerting:

```python { .api }
from airflow.operators.python import PythonOperator

def monitor_with_alerting(**context):
    """Monitor job with custom error handling and alerting."""
    import time

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    run_id = context['ti'].xcom_pull(task_ids='submit_critical_job', key='run_id')
    hook = DatabricksHook(databricks_conn_id='databricks_production')

    timeout = 7200  # 2 hours
    start_time = time.time()

    while time.time() - start_time < timeout:
        run_state = hook.get_run_state(run_id)

        if run_state.is_terminal:
            if run_state.is_successful:
                print(f"Job {run_id} completed successfully")
                return True
            else:
                # Job failed - extract error details
                run_details = hook.get_run(run_id)
                error_message = run_details.get('state', {}).get('state_message', 'Unknown error')

                # Send alert (integrate with your alerting system)
                send_alert(
                    message=f"Critical Databricks job {run_id} failed: {error_message}",
                    severity='HIGH',
                    job_url=hook.get_run_page_url(run_id)
                )

                raise ValueError(f"Databricks job {run_id} failed: {error_message}")

        time.sleep(60)  # Check every minute

    # Timeout occurred
    send_alert(
        message=f"Databricks job {run_id} timed out after {timeout} seconds",
        severity='MEDIUM',
        job_url=hook.get_run_page_url(run_id)
    )
    raise TimeoutError(f"Job monitoring timed out for run {run_id}")

def send_alert(message: str, severity: str, job_url: str):
    """Send alert through configured alerting system."""
    # Implement your alerting logic here
    # (Slack, email, PagerDuty, etc.)
    print(f"ALERT [{severity}]: {message}")
    print(f"Job URL: {job_url}")

# Custom monitoring with alerting
custom_monitor = PythonOperator(
    task_id='monitor_with_alerts',
    python_callable=monitor_with_alerting
)
```

### Performance Monitoring

Monitor job performance and resource utilization:

```python { .api }
performance_monitor = DatabricksSqlSensor(
    task_id='monitor_job_performance',
    sql="""
        WITH job_metrics AS (
            SELECT
                run_id,
                execution_duration_ms,
                cluster_size,
                shuffle_read_bytes,
                shuffle_write_bytes,
                peak_memory_usage
            FROM job_execution_metrics
            WHERE job_name = '{{ params.job_name }}'
              AND start_time >= CURRENT_DATE
        )
        SELECT 1
        FROM job_metrics
        WHERE
            execution_duration_ms < {{ params.max_duration_ms }}
            AND peak_memory_usage < {{ params.max_memory_bytes }}
            AND shuffle_read_bytes < {{ params.max_shuffle_bytes }}
        ORDER BY run_id DESC
        LIMIT 1
    """,
    params={
        'job_name': 'daily_etl_pipeline',
        'max_duration_ms': 7200000,  # 2 hours
        'max_memory_bytes': 32 * 1024**3,  # 32GB
        'max_shuffle_bytes': 100 * 1024**3  # 100GB
    },
    databricks_conn_id='databricks_metrics',
    poke_interval=300,
    timeout=3600
)
```

The monitoring and sensing capabilities provide comprehensive tools for tracking job execution, data availability, quality metrics, and system health. Support for both synchronous and asynchronous execution patterns optimizes resource usage while still providing timely notification of pipeline status.