
# Job Monitoring Sensor


The `DbtCloudJobRunSensor` monitors dbt Cloud job runs from within Airflow workflows. It supports both traditional polling and a more resource-efficient deferrable mode, making it suitable for a range of monitoring scenarios and resource constraints.


## Capabilities


### Job Run Status Monitoring


The sensor monitors dbt Cloud job run status and waits for completion before allowing downstream tasks to proceed.


```python { .api }
class DbtCloudJobRunSensor:
    def __init__(
        self,
        *,
        dbt_cloud_conn_id: str = "dbt_cloud_default",
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Monitor the status of a dbt Cloud job run.

        Args:
            dbt_cloud_conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            account_id: dbt Cloud account ID (defaults to connection default)
            deferrable: Use async execution mode for resource efficiency
            **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if the job run has reached a terminal status.

        Args:
            context: Airflow task execution context

        Returns:
            bool: True if the job run completed successfully, False if still running

        Raises:
            DbtCloudJobRunException: If the job run fails or is cancelled
        """

    def execute(self, context: Context) -> None:
        """
        Execute the sensor (used for deferrable mode).

        Args:
            context: Airflow task execution context
        """

    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
        """
        Complete execution for deferrable sensors.

        Args:
            context: Airflow task execution context
            event: Trigger event containing job status

        Returns:
            int: dbt Cloud job run ID
        """

    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
        """
        Generate OpenLineage metadata facets for data lineage tracking.

        Args:
            task_instance: Airflow task instance

        Returns:
            OperatorLineage: OpenLineage facets for lineage tracking
        """
```


## Usage Examples


### Basic Job Run Monitoring


```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

dag = DAG(
    'dbt_workflow_with_monitoring',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# Trigger the dbt job without blocking on its completion
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    wait_for_termination=False,  # Don't wait in the operator
    dag=dag,
)

# Monitor job completion with the sensor
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,      # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag,
)

run_dbt_job >> wait_for_completion
```


### Deferrable Sensor for Resource Efficiency


```python
# Use deferrable mode for long-running jobs
wait_for_long_job = DbtCloudJobRunSensor(
    task_id='wait_for_long_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_long_job') }}",
    deferrable=True,    # Enable async monitoring
    timeout=14400,      # 4 hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag,
)
```


### Multiple Job Monitoring


```python
from airflow.operators.dummy import DummyOperator

# Monitor multiple parallel dbt jobs
start = DummyOperator(task_id='start', dag=dag)

# Start each job, then pair it with its own sensor
job_sensors = []
for i, job_id in enumerate([111, 222, 333]):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i + 1}',
        job_id=job_id,
        wait_for_termination=False,
        dag=dag,
    )

    wait_job = DbtCloudJobRunSensor(
        task_id=f'wait_job_{i + 1}',
        run_id=f"{{{{ task_instance.xcom_pull(task_ids='run_job_{i + 1}') }}}}",
        deferrable=True,
        dag=dag,
    )

    start >> run_job >> wait_job
    job_sensors.append(wait_job)

# Continue only after all jobs complete
all_complete = DummyOperator(task_id='all_jobs_complete', dag=dag)
job_sensors >> all_complete
```


### Sensor with Custom Configuration


```python
# Sensor with extended timeout and custom polling
monitor_critical_job = DbtCloudJobRunSensor(
    task_id='monitor_critical_job',
    run_id="{{ task_instance.xcom_pull(task_ids='run_critical_models') }}",
    account_id=12345,
    timeout=28800,      # 8 hours for a critical job
    poke_interval=120,  # Check every 2 minutes
    mode='poke',        # Traditional polling mode (holds a worker slot)
    dag=dag,
)
```


### Error Handling and Alerting


```python
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

def check_job_failure(context):
    """Custom logic for handling dbt job failures.

    Airflow calls failure callbacks with the task context as a single
    positional argument.
    """
    run_id = context['task_instance'].xcom_pull(task_ids='run_dbt_job')
    # Add custom failure analysis logic here
    print(f"dbt job run {run_id} monitoring failed")

# Sensor with failure handling
monitor_with_alerts = DbtCloudJobRunSensor(
    task_id='monitor_with_alerts',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}",
    timeout=3600,
    on_failure_callback=check_job_failure,
    dag=dag,
)

# Email alert on sensor failure
send_failure_alert = EmailOperator(
    task_id='send_failure_alert',
    to=['data-team@company.com'],
    subject='dbt Cloud Job Monitoring Failed',
    html_content='dbt Cloud job monitoring failed. Please check the logs.',
    trigger_rule='one_failed',  # Trigger only on upstream failure
    dag=dag,
)

monitor_with_alerts >> send_failure_alert
```


### Conditional Logic Based on Job Status


```python
from airflow.operators.python import BranchPythonOperator

def decide_next_task(**context):
    """Branch based on whether the monitoring sensor succeeded."""
    sensor_ti = context['dag_run'].get_task_instance('wait_for_dbt_completion')
    if sensor_ti and sensor_ti.state == 'success':
        return 'process_results'
    return 'handle_failure'

# Conditional workflow based on the monitoring result
branch_on_result = BranchPythonOperator(
    task_id='branch_on_result',
    python_callable=decide_next_task,
    trigger_rule='all_done',  # Run even if the sensor failed
    dag=dag,
)

process_success = DummyOperator(task_id='process_results', dag=dag)
handle_failure = DummyOperator(task_id='handle_failure', dag=dag)

wait_for_completion >> branch_on_result
branch_on_result >> [process_success, handle_failure]
```


### Integration with Data Quality Checks


```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Monitor the job, then validate its results
wait_for_models = DbtCloudJobRunSensor(
    task_id='wait_for_models',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    dag=dag,
)

# Download test results for validation
get_test_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_test_results',
    run_id="{{ task_instance.xcom_pull(task_ids='run_models') }}",
    path='run_results.json',
    dag=dag,
)

def validate_test_results(**context):
    """Validate dbt test results from the downloaded artifact."""
    # Read and validate the test results here
    pass

validate_tests = PythonOperator(
    task_id='validate_tests',
    python_callable=validate_test_results,
    dag=dag,
)

wait_for_models >> get_test_results >> validate_tests
```


### Cross-DAG Dependencies


```python
from airflow.sensors.external_task import ExternalTaskSensor

# Wait for a dbt job in another DAG
wait_external_dbt = ExternalTaskSensor(
    task_id='wait_external_dbt',
    external_dag_id='upstream_dbt_dag',
    external_task_id='wait_for_dbt_completion',
    dag=dag,
)

# Then run the local dbt job
run_downstream_job = DbtCloudRunJobOperator(
    task_id='run_downstream_job',
    job_id=67890,
    dag=dag,
)

wait_external_dbt >> run_downstream_job
```


## Configuration Options


### Sensor Parameters


The sensor inherits from `BaseSensorOperator` and supports all standard sensor configuration:


- **timeout**: Maximum time to wait for job completion (seconds)
- **poke_interval**: Time between status checks in polling mode (seconds)
- **mode**: Execution mode (`'poke'` holds a worker slot between checks, `'reschedule'` releases it)
- **soft_fail**: Whether to mark the task as skipped instead of failed on timeout
- **deferrable**: Enable async execution for resource efficiency


### Template Fields


The sensor supports Airflow templating for dynamic values:


- `dbt_cloud_conn_id`
- `run_id`
- `account_id`
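
To illustrate what templating does here, the following sketch mimics Airflow's Jinja rendering of a templated `run_id` outside a real task context; `FakeTaskInstance` is purely illustrative, standing in for the real `TaskInstance` Airflow supplies at runtime:

```python
from jinja2 import Template

# The templated string a DAG author would pass as run_id
template = "{{ task_instance.xcom_pull(task_ids='run_dbt_job') }}"

class FakeTaskInstance:
    # Stand-in for Airflow's TaskInstance, which resolves XCom values
    def xcom_pull(self, task_ids):
        return 98765  # pretend this run_id was pushed by the operator

# Airflow performs an equivalent render step before the sensor executes
rendered = Template(template).render(task_instance=FakeTaskInstance())
print(rendered)  # -> "98765"
```

Note that the rendered value is a string; the sensor converts it as needed when querying the dbt Cloud API.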


## Best Practices


### Resource Efficiency

- Use `deferrable=True` for long-running jobs to free up worker slots
- Set an appropriate `poke_interval` to balance responsiveness and API load
- Use `mode='reschedule'` for very long jobs to avoid blocking workers


### Error Handling

- Set reasonable `timeout` values based on expected job duration
- Use `soft_fail=True` for non-critical monitoring tasks
- Implement custom failure callbacks for alert and recovery logic


### Monitoring Patterns

- Combine sensors with operators that don't wait (`wait_for_termination=False`)
- Use sensors for complex conditional logic based on job outcomes
- Monitor multiple parallel jobs with individual sensors for better error isolation


## Types


```python { .api }
from typing import Any

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context

# DbtCloudJobRunSensor inherits from BaseSensorOperator, so all
# standard sensor configuration options are available.
```