0
# Error Handling
1
2
Comprehensive exception hierarchy for dbt integration error handling and debugging. The dagster-dbt package provides specific exception classes to help identify and handle different types of integration errors.
3
4
## Capabilities
5
6
### Base Exception Classes
7
8
#### DagsterDbtError
9
10
Base exception class for all dagster-dbt integration errors.
11
12
```python { .api }
13
class DagsterDbtError(Exception):
14
"""
15
Base exception class for dagster-dbt integration errors.
16
17
All dagster-dbt specific exceptions inherit from this base class,
18
allowing for broad exception handling when needed.
19
"""
20
```
21
22
### CLI and Runtime Errors
23
24
#### DagsterDbtCliRuntimeError
25
26
Raised when dbt CLI commands fail during execution.
27
28
```python { .api }
29
class DagsterDbtCliRuntimeError(DagsterDbtError):
30
"""
31
Raised when dbt CLI command execution fails.
32
33
This exception is raised when a dbt CLI command (run, test, build, etc.)
34
fails during execution, providing details about the failure.
35
36
Attributes:
37
- logs: dbt CLI output logs
38
- events: Parsed dbt events if available
39
- returncode: CLI process return code
40
"""
41
```
42
43
### Project and File Errors
44
45
#### DagsterDbtProjectNotFoundError
46
47
Raised when a dbt project directory or configuration cannot be found.
48
49
```python { .api }
50
class DagsterDbtProjectNotFoundError(DagsterDbtError):
51
"""
52
Raised when dbt project cannot be located or accessed.
53
54
This exception is raised when the specified dbt project directory
55
does not exist or does not contain a valid dbt_project.yml file.
56
"""
57
```
58
59
#### DagsterDbtManifestNotFoundError
60
61
Raised when the dbt manifest.json file cannot be found or loaded.
62
63
```python { .api }
64
class DagsterDbtManifestNotFoundError(DagsterDbtError):
65
"""
66
Raised when dbt manifest.json cannot be found or loaded.
67
68
This exception indicates that the manifest.json file is missing,
69
corrupted, or inaccessible. The manifest is required for most
70
dagster-dbt integration functionality.
71
"""
72
```
73
74
#### DagsterDbtProjectYmlFileNotFoundError
75
76
Raised when the dbt_project.yml file cannot be found.
77
78
```python { .api }
79
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError):
80
"""
81
Raised when dbt_project.yml file cannot be found.
82
83
This exception is raised when the dbt_project.yml configuration
84
file is missing from the specified project directory.
85
"""
86
```
87
88
#### DagsterDbtProfilesDirectoryNotFoundError
89
90
Raised when the dbt profiles directory cannot be found.
91
92
```python { .api }
93
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError):
94
"""
95
Raised when dbt profiles directory cannot be located.
96
97
This exception occurs when the specified dbt profiles directory
98
does not exist or is not accessible.
99
"""
100
```
101
102
### Cloud Integration Errors
103
104
#### DagsterDbtCloudJobInvariantViolationError
105
106
Raised when dbt Cloud job constraints or invariants are violated.
107
108
```python { .api }
109
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError):
110
"""
111
Raised when dbt Cloud job invariants are violated.
112
113
This exception is raised when dbt Cloud job configuration or
114
execution violates expected constraints or business rules.
115
"""
116
```
117
118
## Usage Examples
119
120
### Basic Error Handling
121
122
```python
123
from dagster import asset, AssetExecutionContext
124
from dagster_dbt import DbtCliResource
125
from dagster_dbt.errors import (
126
DagsterDbtError,
127
DagsterDbtCliRuntimeError,
128
DagsterDbtManifestNotFoundError
129
)
130
131
@asset
132
def dbt_models_with_error_handling(
133
context: AssetExecutionContext,
134
dbt: DbtCliResource
135
):
136
"""Run dbt models with comprehensive error handling."""
137
try:
138
# Execute dbt command
139
dbt_run = dbt.cli(["run"], context=context)
140
141
# Process events
142
for event in dbt_run.stream():
143
context.log.info(f"dbt: {event}")
144
145
# Check for success
146
if not dbt_run.is_successful:
147
raise DagsterDbtCliRuntimeError("dbt run failed")
148
149
return {"status": "success"}
150
151
except DagsterDbtCliRuntimeError as e:
152
context.log.error(f"dbt CLI execution failed: {e}")
153
# Could retry, send alert, or fail gracefully
154
raise
155
156
except DagsterDbtManifestNotFoundError as e:
157
context.log.error(f"dbt manifest not found: {e}")
158
# Could trigger manifest generation
159
context.log.info("Attempting to generate manifest...")
160
dbt.cli(["parse"], context=context)
161
# Retry the original operation
162
return dbt_models_with_error_handling(context, dbt)
163
164
except DagsterDbtError as e:
165
context.log.error(f"General dbt integration error: {e}")
166
raise
167
168
except Exception as e:
169
context.log.error(f"Unexpected error: {e}")
170
raise
171
```
172
173
### Project Validation with Error Handling
174
175
```python
176
from dagster_dbt.errors import (
177
DagsterDbtProjectNotFoundError,
178
DagsterDbtProjectYmlFileNotFoundError,
179
DagsterDbtProfilesDirectoryNotFoundError
180
)
181
import os
182
183
def validate_dbt_project_setup(project_dir: str, profiles_dir: str = None) -> dict:
184
"""Validate dbt project setup with detailed error handling."""
185
validation_result = {
186
"valid": True,
187
"errors": [],
188
"warnings": []
189
}
190
191
try:
192
# Check project directory
193
if not os.path.exists(project_dir):
194
raise DagsterDbtProjectNotFoundError(
195
f"Project directory not found: {project_dir}"
196
)
197
198
# Check dbt_project.yml
199
project_yml_path = os.path.join(project_dir, "dbt_project.yml")
200
if not os.path.exists(project_yml_path):
201
raise DagsterDbtProjectYmlFileNotFoundError(
202
f"dbt_project.yml not found in {project_dir}"
203
)
204
205
# Check profiles directory if specified
206
if profiles_dir and not os.path.exists(profiles_dir):
207
raise DagsterDbtProfilesDirectoryNotFoundError(
208
f"Profiles directory not found: {profiles_dir}"
209
)
210
211
# Check for manifest (warning if missing)
212
manifest_path = os.path.join(project_dir, "target", "manifest.json")
213
if not os.path.exists(manifest_path):
214
validation_result["warnings"].append(
215
"Manifest not found - run 'dbt parse' to generate"
216
)
217
218
validation_result["message"] = "dbt project validation successful"
219
220
except DagsterDbtProjectNotFoundError as e:
221
validation_result["valid"] = False
222
validation_result["errors"].append(f"Project error: {e}")
223
224
except DagsterDbtProjectYmlFileNotFoundError as e:
225
validation_result["valid"] = False
226
validation_result["errors"].append(f"Configuration error: {e}")
227
228
except DagsterDbtProfilesDirectoryNotFoundError as e:
229
validation_result["valid"] = False
230
validation_result["errors"].append(f"Profiles error: {e}")
231
232
except Exception as e:
233
validation_result["valid"] = False
234
validation_result["errors"].append(f"Unexpected error: {e}")
235
236
return validation_result
237
238
# Validate project setup
239
result = validate_dbt_project_setup(
240
project_dir="./my_dbt_project",
241
profiles_dir="~/.dbt"
242
)
243
244
if result["valid"]:
245
print("✓ dbt project setup is valid")
246
if result["warnings"]:
247
for warning in result["warnings"]:
248
print(f"⚠ Warning: {warning}")
249
else:
250
print("✗ dbt project setup has errors:")
251
for error in result["errors"]:
252
print(f" - {error}")
253
```
254
255
### Cloud Integration Error Handling
256
257
```python
258
from dagster_dbt.cloud_v2 import DbtCloudWorkspace
259
from dagster_dbt.errors import DagsterDbtCloudJobInvariantViolationError
260
261
def run_dbt_cloud_job_with_validation(
262
workspace: DbtCloudWorkspace,
263
job_id: int,
264
max_retries: int = 3
265
) -> dict:
266
"""Run dbt Cloud job with error handling and retries."""
267
for attempt in range(max_retries):
268
try:
269
# Validate job configuration
270
job = workspace.get_job(job_id)
271
if job.state != 1: # Active state
272
raise DagsterDbtCloudJobInvariantViolationError(
273
f"Job {job_id} is not in active state"
274
)
275
276
# Run the job
277
run = workspace.run_job(
278
job_id=job_id,
279
cause=f"Triggered by Dagster (attempt {attempt + 1})"
280
)
281
282
# Wait for completion and check result
283
while not run.is_complete:
284
time.sleep(30)
285
run = workspace.get_run(run.id)
286
287
if run.is_success:
288
return {
289
"status": "success",
290
"run_id": run.id,
291
"attempt": attempt + 1
292
}
293
else:
294
raise DagsterDbtCliRuntimeError(
295
f"dbt Cloud job failed with status: {run.status_humanized}"
296
)
297
298
except DagsterDbtCloudJobInvariantViolationError as e:
299
# Don't retry invariant violations
300
return {
301
"status": "error",
302
"error": f"Job configuration error: {e}",
303
"retryable": False
304
}
305
306
except DagsterDbtCliRuntimeError as e:
307
if attempt == max_retries - 1:
308
# Last attempt failed
309
return {
310
"status": "error",
311
"error": f"Job execution failed after {max_retries} attempts: {e}",
312
"retryable": True
313
}
314
else:
315
# Wait before retry
316
print(f"Attempt {attempt + 1} failed, retrying in 60 seconds...")
317
time.sleep(60)
318
continue
319
320
except Exception as e:
321
return {
322
"status": "error",
323
"error": f"Unexpected error: {e}",
324
"retryable": False
325
}
326
327
return {
328
"status": "error",
329
"error": "Max retries exceeded",
330
"retryable": True
331
}
332
```
333
334
### Custom Error Handler
335
336
```python
337
from dagster import failure_hook, HookContext
338
from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError
339
340
@failure_hook
341
def dbt_error_handler(context: HookContext):
342
"""Custom failure hook for dbt-related errors."""
343
if context.failure_event and context.failure_event.failure_data:
344
error = context.failure_event.failure_data.error
345
346
if isinstance(error, DagsterDbtCliRuntimeError):
347
context.log.error("dbt CLI command failed")
348
# Could send notification, create incident, etc.
349
send_dbt_failure_alert({
350
"job_name": context.job_name,
351
"op_name": context.op.name,
352
"error_type": "dbt_cli_runtime_error",
353
"error_message": str(error)
354
})
355
356
elif isinstance(error, DagsterDbtError):
357
context.log.error("dbt integration error occurred")
358
send_dbt_failure_alert({
359
"job_name": context.job_name,
360
"op_name": context.op.name,
361
"error_type": "dbt_integration_error",
362
"error_message": str(error)
363
})
364
365
else:
366
context.log.error("Non-dbt error in dbt-related operation")
367
368
def send_dbt_failure_alert(error_info: dict):
369
"""Send alert for dbt failures (placeholder implementation)."""
370
print(f"ALERT: dbt failure in {error_info['job_name']}: {error_info['error_message']}")
371
# Implement actual alerting logic (Slack, email, PagerDuty, etc.)
372
373
# Use the failure hook
374
from dagster import job, op
375
376
@op
377
def failing_dbt_op():
378
from dagster_dbt.errors import DagsterDbtCliRuntimeError
379
raise DagsterDbtCliRuntimeError("Simulated dbt CLI failure")
380
381
@job(hooks={dbt_error_handler})
382
def job_with_dbt_error_handling():
383
failing_dbt_op()
384
```
385
386
## Error Recovery Patterns
387
388
### Automatic Manifest Generation
389
390
```python
391
from dagster_dbt.errors import DagsterDbtManifestNotFoundError
392
393
def ensure_manifest_exists(dbt_resource: DbtCliResource, context) -> bool:
394
"""Ensure manifest exists, generating if necessary."""
395
try:
396
# Try to access manifest
397
manifest_path = os.path.join(dbt_resource.project_dir, "target", "manifest.json")
398
if not os.path.exists(manifest_path):
399
raise DagsterDbtManifestNotFoundError("Manifest not found")
400
return True
401
402
except DagsterDbtManifestNotFoundError:
403
context.log.info("Manifest not found, generating...")
404
try:
405
# Generate manifest with dbt parse
406
parse_result = dbt_resource.cli(["parse"], context=context)
407
for event in parse_result.stream():
408
pass # Process events
409
410
if parse_result.is_successful:
411
context.log.info("Manifest generated successfully")
412
return True
413
else:
414
context.log.error("Failed to generate manifest")
415
return False
416
417
except Exception as e:
418
context.log.error(f"Error generating manifest: {e}")
419
return False
420
```
421
422
## Type Definitions
423
424
```python { .api }
425
# All error classes inherit from Exception and DagsterDbtError
426
from typing import Optional, List, Dict, Any
427
```