# Async Job Trigger


The `DbtCloudRunJobTrigger` provides asynchronous monitoring of dbt Cloud job runs for deferrable Airflow tasks. Because it polls job status from the triggerer process without occupying a worker slot, it makes efficient use of resources and is ideal for long-running dbt Cloud jobs.


## Capabilities


### Async Job Status Monitoring


The trigger runs asynchronously to monitor dbt Cloud job status and emit events when job states change.


```python { .api }
class DbtCloudRunJobTrigger(BaseTrigger):
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None,
    ):
        """
        Async trigger for monitoring dbt Cloud job status.

        Args:
            conn_id: Airflow connection ID for dbt Cloud
            run_id: dbt Cloud job run ID to monitor
            end_time: Unix timestamp at which monitoring times out
            poll_interval: Seconds between status checks
            account_id: dbt Cloud account ID (optional)
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the trigger for persistence across Airflow restarts.

        Returns:
            tuple[str, dict]: (class_path, kwargs) for trigger reconstruction
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Main async execution loop that monitors job status.

        Yields:
            TriggerEvent: Events containing job status updates and completion
        """

    async def is_still_running(self, hook: DbtCloudHook) -> bool:
        """
        Check whether the job run is still in progress.

        Args:
            hook: DbtCloudHook instance for API communication

        Returns:
            bool: True if the job is still running, False once a terminal state is reached
        """
```
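
For orientation, here is a minimal sketch of how a deferrable operator hands off to this trigger. It relies on the standard `BaseOperator.defer()` mechanism; the operator itself and its exact wiring are illustrative, not the provider's actual implementation.

```python
import time

from airflow.models.baseoperator import BaseOperator
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger


class ExampleDeferringOperator(BaseOperator):
    """Illustrative only: defers execution until the trigger fires."""

    def execute(self, context):
        run_id = 98765  # assume the dbt Cloud job run was already started
        self.defer(
            trigger=DbtCloudRunJobTrigger(
                conn_id="dbt_cloud_default",
                run_id=run_id,
                end_time=time.time() + 3600,
                poll_interval=60,
                account_id=None,
            ),
            method_name="execute_complete",  # resumed with the event payload
        )

    def execute_complete(self, context, event):
        # `event` is the TriggerEvent payload described later on this page.
        if event["status"] != "success":
            raise RuntimeError(event["message"])
        return event["run_id"]
```

While deferred, the task occupies no worker slot; the triggerer process runs the polling loop, and a worker is only re-engaged to run `execute_complete`.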


## Usage Examples


### Basic Deferrable Operator Usage


The trigger is typically used internally by deferrable operators, but understanding its behavior helps with debugging and advanced configurations.


```python
from datetime import datetime

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dag = DAG(
    'deferrable_dbt_workflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
)

# The trigger is automatically used when deferrable=True
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_job',
    job_id=12345,
    deferrable=True,     # This enables the trigger
    timeout=7200,        # 2 hours
    check_interval=300,  # Check every 5 minutes
    dag=dag,
)
```


### Sensor with Trigger


```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Sensor using the trigger for async monitoring
monitor_job = DbtCloudJobRunSensor(
    task_id='monitor_dbt_job',
    run_id="{{ task_instance.xcom_pull(task_ids='start_job') }}",
    deferrable=True,     # Enables trigger usage
    timeout=14400,       # 4 hours
    poke_interval=180,   # Check every 3 minutes
    dag=dag,
)
```
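
The `start_job` task referenced by the XCom pull above is assumed, not shown. One plausible definition is a fire-and-forget run whose return value (the run ID, pushed to XCom automatically) feeds the sensor:

```python
# Assumed upstream task: starts the job without waiting, leaving the
# monitoring to the sensor above.
start_job = DbtCloudRunJobOperator(
    task_id='start_job',
    job_id=12345,
    wait_for_termination=False,
    dag=dag,
)

start_job >> monitor_job
```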


### Custom Trigger Configuration


While the trigger is typically managed automatically, understanding its configuration helps with troubleshooting:


```python
import time

from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger

# Example of trigger parameters (normally handled automatically)
trigger = DbtCloudRunJobTrigger(
    conn_id='dbt_cloud_default',
    run_id=98765,
    end_time=time.time() + 3600,  # 1 hour from now
    poll_interval=60,             # Check every minute
    account_id=12345,
)
```
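
Calling `serialize()` on such an instance shows what Airflow persists so the triggerer can rebuild it after a restart; the output sketched in the comments is illustrative:

```python
classpath, kwargs = trigger.serialize()
# classpath -> "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger"
# kwargs    -> {"conn_id": "dbt_cloud_default", "run_id": 98765, ...}
```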


### Monitoring Trigger Events


Trigger events are delivered back to the deferred task itself, not to downstream tasks: when the trigger fires, the operator resumes in its completion method with the event payload. A sketch of hooking into that payload, assuming the provider's `execute_complete(context, event)` convention:

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator


class AuditedDbtCloudRunJobOperator(DbtCloudRunJobOperator):
    """Logs the trigger event before normal completion handling."""

    def execute_complete(self, context, event):
        run_id = event.get('run_id')
        status = event.get('status')
        print(f"dbt job run {run_id} completed with status: {status}")

        if status == 'success':
            return run_id
        raise Exception(f"Job failed with status: {status}")


run_dbt_job_audited = AuditedDbtCloudRunJobOperator(
    task_id='run_dbt_job_audited',
    job_id=12345,
    deferrable=True,
    dag=dag,
)
```


## Trigger Event Structure


The trigger emits events with the following structure:


```python
{
    "status": "success" | "error" | "cancelled" | "timeout",
    "run_id": int,
    "message": str,
    "job_run_url": str  # Optional URL to dbt Cloud UI
}
```
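
These payloads are produced by the trigger's `run()` loop and wrapped in `TriggerEvent` objects. A minimal sketch of such a loop, where `fetch_status` stands in for the real dbt Cloud status lookup and is not part of the provider API:

```python
import asyncio
import time

from airflow.triggers.base import TriggerEvent


async def poll_until_done(run_id, end_time, poll_interval, fetch_status):
    """Illustrative polling loop; `fetch_status` is a hypothetical coroutine."""
    while time.time() < end_time:
        status = await fetch_status(run_id)
        if status in ("success", "error", "cancelled"):  # terminal states
            return TriggerEvent({
                "status": status,
                "run_id": run_id,
                "message": f"dbt Cloud job run finished with status {status}",
            })
        await asyncio.sleep(poll_interval)
    return TriggerEvent({
        "status": "timeout",
        "run_id": run_id,
        "message": "dbt Cloud job run monitoring timed out",
    })
```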


### Event Types


#### Success Event

```python
{
    "status": "success",
    "run_id": 12345,
    "message": "dbt Cloud job run completed successfully",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}
```


#### Error Event

```python
{
    "status": "error",
    "run_id": 12345,
    "message": "dbt Cloud job run failed",
    "job_run_url": "https://cloud.getdbt.com/deploy/1234/projects/5678/runs/12345"
}
```


#### Timeout Event

```python
{
    "status": "timeout",
    "run_id": 12345,
    "message": "dbt Cloud job run monitoring timed out after 3600 seconds"
}
```


## Advanced Usage Patterns


### Multiple Job Monitoring


```python
# Monitor multiple jobs with separate triggers (automatic)
jobs = [111, 222, 333]

for i, job_id in enumerate(jobs):
    run_job = DbtCloudRunJobOperator(
        task_id=f'run_job_{i}',
        job_id=job_id,
        deferrable=True,  # Each gets its own trigger
        wait_for_termination=True,
        dag=dag,
    )
```


### Trigger with Custom Polling Intervals


```python
# Fast polling for critical jobs
critical_job = DbtCloudRunJobOperator(
    task_id='critical_job',
    job_id=99999,
    deferrable=True,
    check_interval=30,   # Check every 30 seconds
    timeout=1800,        # 30 minutes max
    dag=dag,
)

# Slow polling for batch jobs
batch_job = DbtCloudRunJobOperator(
    task_id='batch_job',
    job_id=11111,
    deferrable=True,
    check_interval=600,  # Check every 10 minutes
    timeout=28800,       # 8 hours max
    dag=dag,
)
```


### Error Handling with Triggers


```python
def handle_trigger_failure(**context):
    """Custom error handling for trigger failures."""
    exception = context.get('exception')

    if 'timeout' in str(exception).lower():
        print("Job monitoring timed out - job may still be running")
        # Implement custom timeout handling
    else:
        print(f"Job monitoring failed: {exception}")
        # Implement custom error handling

long_running_job = DbtCloudRunJobOperator(
    task_id='long_running_job',
    job_id=55555,
    deferrable=True,
    timeout=43200,  # 12 hours
    on_failure_callback=handle_trigger_failure,
    dag=dag,
)
```


### Resource-Efficient Workflows


```python
from airflow.operators.dummy import DummyOperator

# Start multiple long-running jobs without blocking workers
start = DummyOperator(task_id='start', dag=dag)

# All these jobs run concurrently without occupying worker slots
deferrable_jobs = []
for i in range(10):  # 10 parallel jobs
    job = DbtCloudRunJobOperator(
        task_id=f'dbt_job_{i}',
        job_id=10000 + i,
        deferrable=True,  # Uses trigger - no worker blocking
        dag=dag,
    )
    start >> job
    deferrable_jobs.append(job)

# Proceed after all complete
finish = DummyOperator(task_id='finish', dag=dag)
deferrable_jobs >> finish
```


## Performance Considerations


### Trigger Efficiency

- **Resource Usage**: Triggers run in the Airflow triggerer process, not worker processes
- **Scale**: Hundreds of triggers can run concurrently with minimal resource impact
- **Persistence**: Triggers are serialized and resume automatically after Airflow restarts


### Optimal Configuration

- **Poll Interval**: Balance responsiveness against API load
  - Critical jobs: 30-60 seconds
  - Regular jobs: 60-300 seconds
  - Batch jobs: 300-600 seconds
- **Timeout**: Set based on expected job duration plus buffer (sized in the sketch below)
  - Add 50-100% buffer to expected runtime
  - Consider downstream task dependencies
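
One way to turn those rules of thumb into operator arguments; the job ID and the numbers themselves are hypothetical:

```python
# Job normally takes ~40 minutes; add a ~75% buffer and poll in the
# "regular jobs" band.
expected_runtime = 40 * 60  # seconds

sized_job = DbtCloudRunJobOperator(
    task_id='sized_job',
    job_id=42424,                           # hypothetical job ID
    deferrable=True,
    check_interval=120,                     # within the 60-300 s band
    timeout=int(expected_runtime * 1.75),   # 4200 s
    dag=dag,
)
```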


### Best Practices

- Use deferrable mode for jobs longer than 5 minutes
- Set reasonable poll intervals to avoid API rate limits
- Monitor trigger performance in the Airflow UI's "Triggers" view
- Use triggers for I/O-bound operations (API polling, file watching)


## Debugging Triggers


### Trigger Logs

Depending on your Airflow version, log lines from individual triggers are forwarded into the deferred task's log in the Airflow UI (look for lines attributed to the triggerer). Otherwise, inspect the output of the `airflow triggerer` service process directly; where those logs land depends on how the service is deployed.
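
For local debugging you can also drive the trigger by hand in an event loop. A minimal sketch, assuming a `trigger` instance like the one built under "Custom Trigger Configuration" and a working dbt Cloud connection:

```python
import asyncio


async def debug_run(trigger):
    # run() yields TriggerEvent objects; this trigger emits one and finishes.
    async for event in trigger.run():
        print(event.payload)
        break

# asyncio.run(debug_run(trigger))
```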


### Common Issues

1. **Connection Errors**: Check the dbt Cloud connection configuration
2. **Timeout Issues**: Verify timeout values against expected job duration
3. **API Rate Limits**: Increase poll intervals if hitting rate limits
4. **Trigger Restart**: Triggers resume automatically after Airflow restarts


## Types


```python { .api }
from typing import Any, AsyncIterator, Dict

from airflow.triggers.base import BaseTrigger, TriggerEvent


class TriggerEvent:
    """Event emitted by triggers to notify task completion."""
    payload: Dict[str, Any]

# DbtCloudRunJobTrigger inherits from BaseTrigger; standard trigger
# serialization and async patterns apply.
```