# Monitoring & Sensing

The Databricks provider offers comprehensive monitoring and sensing capabilities through sensors and triggers that can monitor job completion, data availability, SQL query results, and system status, with support for both synchronous and asynchronous (deferrable) execution patterns.

## Core Sensors

### DatabricksSensor

Monitor Databricks job run completion and status with configurable polling and error handling.

```python { .api }
from airflow.providers.databricks.sensors.databricks import DatabricksSensor

class DatabricksSensor(BaseSensorOperator):
    def __init__(
        self,
        run_id: int | str,
        *,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs
    ) -> None:
        """
        Sensor for monitoring Databricks job run completion.

        Args:
            run_id: Databricks run ID to monitor (supports templating)
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
            databricks_retry_args: Additional retry configuration
            deferrable: Whether to use deferrable (async) execution
        """
```

### DatabricksSqlSensor

Monitor SQL query results and data conditions on Databricks SQL endpoints.

```python { .api }
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor

class DatabricksSqlSensor(BaseSensorOperator):
    def __init__(
        self,
        sql: str,
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        **kwargs
    ) -> None:
        """
        Sensor for monitoring SQL query results on Databricks.

        Args:
            sql: SQL query to execute for monitoring (supports templating)
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
        """
```

### DatabricksPartitionSensor

Monitor table partition availability for data pipeline orchestration.

```python { .api }
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

class DatabricksPartitionSensor(BaseSensorOperator):
    def __init__(
        self,
        table_name: str,
        partitions: dict[str, str] | list[dict[str, str]],
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        **kwargs
    ) -> None:
        """
        Sensor for monitoring table partition availability.

        Args:
            table_name: Name of table to monitor (supports templating)
            partitions: Partition specifications to check for availability
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            catalog: Catalog containing the table
            schema: Schema containing the table
        """
```

## Core Triggers

### DatabricksTrigger

Asynchronous trigger for deferrable job monitoring with efficient resource usage.

```python { .api }
from airflow.providers.databricks.triggers.databricks import DatabricksTrigger

class DatabricksTrigger(BaseTrigger):
    def __init__(
        self,
        run_id: int,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Async trigger for monitoring Databricks job runs.

        Args:
            run_id: Databricks run ID to monitor
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Async generator that yields trigger events.

        Yields:
            TriggerEvent with run completion status and metadata
        """
```
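
Triggers are normally not instantiated directly in a DAG; a deferrable operator or sensor hands control to one via `self.defer()` (Airflow 2.2+) and resumes in a callback method once the trigger fires. Below is a minimal sketch of that hand-off using the `DatabricksTrigger` constructor shown above; the `WaitForDatabricksRunOperator` class and its callback are illustrative, not part of the provider.

```python
from airflow.models import BaseOperator
from airflow.providers.databricks.triggers.databricks import DatabricksTrigger


class WaitForDatabricksRunOperator(BaseOperator):
    """Hypothetical operator that defers run monitoring to the triggerer process."""

    def __init__(self, run_id: int, databricks_conn_id: str = "databricks_default", **kwargs):
        super().__init__(**kwargs)
        self.run_id = run_id
        self.databricks_conn_id = databricks_conn_id

    def execute(self, context):
        # Release the worker slot and let the triggerer poll the run asynchronously.
        self.defer(
            trigger=DatabricksTrigger(
                run_id=self.run_id,
                databricks_conn_id=self.databricks_conn_id,
                polling_period_seconds=60,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # The task resumes here with the TriggerEvent payload emitted by the trigger.
        self.log.info("Run %s finished, trigger event: %s", self.run_id, event)
```

The provider's own sensors and operators follow this same pattern internally when `deferrable=True` is set.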

### DatabricksWorkflowTrigger

Specialized trigger for monitoring Databricks workflow execution.

```python { .api }
from airflow.providers.databricks.triggers.databricks import DatabricksWorkflowTrigger

class DatabricksWorkflowTrigger(BaseTrigger):
    def __init__(
        self,
        run_id: int,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Async trigger for monitoring Databricks workflow runs.

        Args:
            run_id: Databricks workflow run ID to monitor
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status checks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between API call retries
        """
```

## Usage Examples

### Basic Job Monitoring

Monitor job completion with simple sensor configuration:

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.databricks.sensors.databricks import DatabricksSensor

# Submit job and monitor completion
submit_job = DatabricksSubmitRunOperator(
    task_id='submit_data_processing',
    notebook_task={
        'notebook_path': '/Analytics/Daily Processing',
        'base_parameters': {'date': '{{ ds }}'}
    },
    existing_cluster_id='processing-cluster-001',
    do_xcom_push=True
)

# Monitor job completion
monitor_job = DatabricksSensor(
    task_id='wait_for_processing_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='submit_data_processing', key='run_id') }}",
    databricks_conn_id='databricks_default',
    poke_interval=60,  # Check every minute
    timeout=7200  # Timeout after 2 hours
)

submit_job >> monitor_job
```

### Deferrable Job Monitoring

Use deferrable execution for efficient resource utilization:

```python { .api }
# Long-running job with deferrable monitoring
long_job = DatabricksSubmitRunOperator(
    task_id='submit_ml_training',
    spark_python_task={
        'python_file': 'dbfs:/ml/training/train_model.py',
        'parameters': ['--epochs', '500', '--data-size', 'large']
    },
    new_cluster={
        'spark_version': '12.2.x-cpu-ml-scala2.12',
        'node_type_id': 'i3.4xlarge',
        'num_workers': 10
    },
    timeout_seconds=28800,  # 8 hours
    deferrable=True  # Use deferrable execution
)

# Deferrable sensor - doesn't consume worker slot while waiting
deferrable_monitor = DatabricksSensor(
    task_id='monitor_ml_training',
    run_id="{{ task_instance.xcom_pull(task_ids='submit_ml_training', key='run_id') }}",
    databricks_conn_id='databricks_ml',
    polling_period_seconds=300,  # Check every 5 minutes
    timeout=28800,  # 8 hour timeout
    deferrable=True  # Async monitoring
)

long_job >> deferrable_monitor
```

### SQL Data Monitoring

Monitor data availability and quality using SQL sensors:

```python { .api }
from airflow.providers.databricks.sensors.databricks_sql import DatabricksSqlSensor

# Wait for daily data to arrive
data_availability_sensor = DatabricksSqlSensor(
    task_id='wait_for_daily_data',
    sql="""
        SELECT COUNT(*) as record_count
        FROM raw.daily_transactions
        WHERE transaction_date = '{{ ds }}'
        HAVING COUNT(*) >= 10000
    """,
    databricks_conn_id='databricks_sql',
    http_path='/sql/1.0/warehouses/analytics-warehouse',
    poke_interval=300,  # Check every 5 minutes
    timeout=14400,  # Wait up to 4 hours
    catalog='production',
    schema='raw'
)

# Monitor data quality thresholds
quality_sensor = DatabricksSqlSensor(
    task_id='check_data_quality',
    sql="""
        SELECT
            COUNT(*) as total_records,
            SUM(CASE WHEN customer_id IS NOT NULL THEN 1 ELSE 0 END) as valid_customers,
            SUM(CASE WHEN amount > 0 THEN 1 ELSE 0 END) as valid_amounts
        FROM processed.daily_sales
        WHERE processing_date = '{{ ds }}'
        HAVING
            SUM(CASE WHEN customer_id IS NOT NULL THEN 1 ELSE 0 END) * 100.0 / COUNT(*) >= 95
            AND SUM(CASE WHEN amount > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) >= 98
            AND COUNT(*) >= 1000
    """,
    databricks_conn_id='databricks_sql',
    poke_interval=180,
    timeout=3600
)

data_availability_sensor >> quality_sensor
```
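
Once both sensors have passed, a downstream task can safely consume the data. A minimal sketch using `DatabricksSqlOperator` from the same provider; the summary table `analytics.daily_sales_summary` and its aggregation are illustrative assumptions.

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Hypothetical downstream step that only runs after availability and quality checks succeed
build_daily_summary = DatabricksSqlOperator(
    task_id='build_daily_summary',
    databricks_conn_id='databricks_sql',
    http_path='/sql/1.0/warehouses/analytics-warehouse',
    sql="""
        INSERT OVERWRITE TABLE analytics.daily_sales_summary
        SELECT processing_date, COUNT(*) AS transaction_count, SUM(amount) AS total_amount
        FROM processed.daily_sales
        WHERE processing_date = '{{ ds }}'
        GROUP BY processing_date
    """,
)

quality_sensor >> build_daily_summary
```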

### Partition Monitoring

Monitor table partition availability for data pipeline coordination:

```python { .api }
from airflow.providers.databricks.sensors.databricks_partition import DatabricksPartitionSensor

# Wait for specific date partition
partition_sensor = DatabricksPartitionSensor(
    task_id='wait_for_daily_partition',
    table_name='sales.daily_transactions',
    partitions={'date': '{{ ds }}'},
    databricks_conn_id='databricks_sql',
    catalog='production',
    schema='sales',
    poke_interval=600,  # Check every 10 minutes
    timeout=21600  # Wait up to 6 hours
)

# Wait for multiple partitions
multi_partition_sensor = DatabricksPartitionSensor(
    task_id='wait_for_regional_partitions',
    table_name='analytics.regional_metrics',
    partitions=[
        {'date': '{{ ds }}', 'region': 'north_america'},
        {'date': '{{ ds }}', 'region': 'europe'},
        {'date': '{{ ds }}', 'region': 'asia_pacific'}
    ],
    databricks_conn_id='databricks_analytics',
    poke_interval=300,
    timeout=7200
)
```

## Advanced Monitoring Patterns

### Conditional Processing Based on Data Status

Implement conditional workflows based on data monitoring results:

```python { .api }
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def check_data_completeness(**context):
    """Check data completeness and branch accordingly."""
    from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

    hook = DatabricksSqlHook(databricks_conn_id='databricks_sql')

    # Jinja templates are not rendered inside a python_callable,
    # so read the logical date from the task context instead of using '{{ ds }}'
    ds = context['ds']

    # Check data completeness
    result = hook.get_first(f"""
        SELECT
            COUNT(*) as record_count,
            COUNT(DISTINCT source_system) as source_count
        FROM raw.daily_ingestion
        WHERE ingestion_date = '{ds}'
    """)

    record_count = result[0] if result else 0
    source_count = result[1] if result else 0

    # Expected: 3 source systems, minimum 50000 records
    if source_count >= 3 and record_count >= 50000:
        return 'full_processing'
    elif record_count >= 10000:
        return 'partial_processing'
    else:
        return 'wait_longer'

# Branching based on data status
data_check = BranchPythonOperator(
    task_id='check_data_status',
    python_callable=check_data_completeness
)

full_processing = DatabricksSubmitRunOperator(
    task_id='full_processing',
    notebook_task={
        'notebook_path': '/pipelines/full_daily_pipeline'
    },
    existing_cluster_id='large-cluster-001'
)

partial_processing = DatabricksSubmitRunOperator(
    task_id='partial_processing',
    notebook_task={
        'notebook_path': '/pipelines/partial_daily_pipeline'
    },
    existing_cluster_id='small-cluster-001'
)

wait_longer = DummyOperator(task_id='wait_longer')

data_check >> [full_processing, partial_processing, wait_longer]
```
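
Because `BranchPythonOperator` skips every branch it does not choose, any task that fans back in downstream of the branch needs a trigger rule that tolerates skipped upstreams. A minimal sketch, assuming Airflow 2.2+ for the `none_failed_min_one_success` trigger rule; the `processing_complete` join task is illustrative.

```python
from airflow.operators.dummy import DummyOperator

# Join point after the branch: succeeds as long as the chosen branch succeeded,
# even though the other branches were skipped.
processing_complete = DummyOperator(
    task_id='processing_complete',
    trigger_rule='none_failed_min_one_success',
)

[full_processing, partial_processing, wait_longer] >> processing_complete
```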

### Multi-Level Monitoring

Implement cascading monitors for complex data dependencies:

```python { .api }
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='data_dependency_monitoring') as monitoring_group:

    # Level 1: Raw data availability
    raw_data_monitor = DatabricksSqlSensor(
        task_id='monitor_raw_data',
        sql="""
            SELECT 1
            FROM information_schema.tables
            WHERE table_name = 'raw_events_{{ ds_nodash }}'
            AND table_schema = 'landing'
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=120,
        timeout=7200
    )

    # Level 2: Data processing completion
    processing_monitor = DatabricksSqlSensor(
        task_id='monitor_processing_completion',
        sql="""
            SELECT 1
            FROM processing_status
            WHERE process_date = '{{ ds }}'
            AND status = 'COMPLETED'
            AND error_count = 0
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=300,
        timeout=10800
    )

    # Level 3: Quality validation
    quality_monitor = DatabricksSqlSensor(
        task_id='monitor_quality_validation',
        sql="""
            SELECT 1
            FROM quality_metrics
            WHERE validation_date = '{{ ds }}'
            AND overall_score >= 0.95
            AND critical_failures = 0
        """,
        databricks_conn_id='databricks_sql',
        poke_interval=180,
        timeout=3600
    )

    # Set up monitoring cascade
    raw_data_monitor >> processing_monitor >> quality_monitor
```
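
In the DAG, the task group behaves like a single node, so downstream work starts only once the last sensor in the cascade has succeeded. A small sketch; the `publish_data_marts` task and its notebook path are illustrative assumptions.

```python
# Hypothetical downstream job gated by the whole monitoring cascade
publish_marts = DatabricksSubmitRunOperator(
    task_id='publish_data_marts',
    notebook_task={'notebook_path': '/pipelines/publish_data_marts'},
    existing_cluster_id='processing-cluster-001',
)

monitoring_group >> publish_marts
```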

### Real-Time Streaming Monitoring

Monitor streaming data pipelines and real-time processing:

```python { .api }
# Monitor streaming job health
streaming_monitor = DatabricksSqlSensor(
    task_id='monitor_streaming_health',
    sql="""
        SELECT stream_id
        FROM streaming_metrics
        WHERE
            metric_timestamp >= CURRENT_TIMESTAMP - INTERVAL 5 MINUTES
            AND batch_duration_ms > 0
            AND processing_time_ms < batch_duration_ms * 0.8 -- Processing within 80% of batch interval
        GROUP BY stream_id
        HAVING COUNT(*) >= 3 -- At least 3 healthy batches in last 5 minutes
    """,
    databricks_conn_id='databricks_streaming',
    poke_interval=60,  # Check every minute
    timeout=1800,  # 30 minute timeout
    mode='reschedule'  # Don't block worker
)

# Monitor streaming lag
lag_monitor = DatabricksSqlSensor(
    task_id='monitor_streaming_lag',
    sql="""
        SELECT 1
        FROM (
            SELECT MAX(event_timestamp) as latest_processed
            FROM processed_events
        ) processed
        CROSS JOIN (
            SELECT CURRENT_TIMESTAMP as current_time
        ) current
        WHERE TIMESTAMPDIFF(MINUTE, latest_processed, current_time) <= 10 -- Max 10 minutes lag
    """,
    databricks_conn_id='databricks_streaming',
    poke_interval=300,
    timeout=3600
)
```

### Error Detection and Alerting

Implement monitoring with error detection and alerting:

```python { .api }
from airflow.operators.python import PythonOperator

def monitor_with_alerting(**context):
    """Monitor job with custom error handling and alerting."""
    import time

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    run_id = context['ti'].xcom_pull(task_ids='submit_critical_job', key='run_id')
    hook = DatabricksHook(databricks_conn_id='databricks_production')

    timeout = 7200  # 2 hours
    start_time = time.time()

    while time.time() - start_time < timeout:
        run_state = hook.get_run_state(run_id)

        if run_state.is_terminal:
            if run_state.is_successful:
                print(f"Job {run_id} completed successfully")
                return True
            else:
                # Job failed - extract error details from the terminal run state
                error_message = run_state.state_message or 'Unknown error'

                # Send alert (integrate with your alerting system)
                send_alert(
                    message=f"Critical Databricks job {run_id} failed: {error_message}",
                    severity='HIGH',
                    job_url=hook.get_run_page_url(run_id)
                )

                raise ValueError(f"Databricks job {run_id} failed: {error_message}")

        time.sleep(60)  # Check every minute

    # Timeout occurred
    send_alert(
        message=f"Databricks job {run_id} timed out after {timeout} seconds",
        severity='MEDIUM',
        job_url=hook.get_run_page_url(run_id)
    )
    raise TimeoutError(f"Job monitoring timed out for run {run_id}")

def send_alert(message: str, severity: str, job_url: str):
    """Send alert through configured alerting system."""
    # Implement your alerting logic here
    # (Slack, email, PagerDuty, etc.)
    print(f"ALERT [{severity}]: {message}")
    print(f"Job URL: {job_url}")

# Custom monitoring with alerting
custom_monitor = PythonOperator(
    task_id='monitor_with_alerts',
    python_callable=monitor_with_alerting
)
```
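
The same alerting can instead be wired through Airflow's callback mechanism, so any failure (including the sensors and operators shown earlier) is reported consistently without an inline monitoring loop. A minimal sketch reusing `send_alert`; the callback and task names are illustrative.

```python
from airflow.operators.python import PythonOperator


def alert_on_failure(context):
    """Failure callback: Airflow passes the task context of the failed task instance."""
    ti = context['task_instance']
    send_alert(
        message=f"Task {ti.task_id} in DAG {ti.dag_id} failed for {context['ds']}",
        severity='HIGH',
        job_url=ti.log_url,
    )


# Same monitoring logic, but failure alerting is handled by the callback
custom_monitor_with_callback = PythonOperator(
    task_id='monitor_with_callback',
    python_callable=monitor_with_alerting,
    on_failure_callback=alert_on_failure,
)
```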

### Performance Monitoring

Monitor job performance and resource utilization:

```python { .api }
performance_monitor = DatabricksSqlSensor(
    task_id='monitor_job_performance',
    sql="""
        WITH job_metrics AS (
            SELECT
                run_id,
                execution_duration_ms,
                cluster_size,
                shuffle_read_bytes,
                shuffle_write_bytes,
                peak_memory_usage
            FROM job_execution_metrics
            WHERE job_name = '{{ params.job_name }}'
            AND start_time >= CURRENT_DATE
        )
        SELECT 1
        FROM job_metrics
        WHERE
            execution_duration_ms < {{ params.max_duration_ms }}
            AND peak_memory_usage < {{ params.max_memory_bytes }}
            AND shuffle_read_bytes < {{ params.max_shuffle_bytes }}
        ORDER BY run_id DESC
        LIMIT 1
    """,
    params={
        'job_name': 'daily_etl_pipeline',
        'max_duration_ms': 7200000,  # 2 hours
        'max_memory_bytes': 32 * 1024**3,  # 32GB
        'max_shuffle_bytes': 100 * 1024**3  # 100GB
    },
    databricks_conn_id='databricks_metrics',
    poke_interval=300,
    timeout=3600
)
```

These monitoring and sensing capabilities provide comprehensive tools for tracking job execution, data availability, quality metrics, and system health. Both synchronous and asynchronous execution patterns are supported, letting you optimize resource usage while still receiving timely notification of pipeline status.