A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
Comprehensive exception hierarchy for dbt integration error handling and debugging. The dagster-dbt package provides specific exception classes to help identify and handle different types of integration errors.
Base exception class for all dagster-dbt integration errors.
class DagsterDbtError(Exception):
    """Base exception class for dagster-dbt integration errors.

    All dagster-dbt specific exceptions inherit from this base class,
    allowing for broad exception handling when needed.
    """


# Raised when dbt CLI commands fail during execution.
class DagsterDbtCliRuntimeError(DagsterDbtError):
    """Raised when dbt CLI command execution fails.

    This exception is raised when a dbt CLI command (run, test, build, etc.)
    fails during execution, providing details about the failure.

    Attributes:
        - logs: dbt CLI output logs
        - events: Parsed dbt events if available
        - returncode: CLI process return code

    NOTE(review): none of these attributes is defined in this class body;
    confirm where (and whether) they are set before relying on them.
    """


# Raised when a dbt project directory or configuration cannot be found.
class DagsterDbtProjectNotFoundError(DagsterDbtError):
    """Raised when dbt project cannot be located or accessed.

    This exception is raised when the specified dbt project directory
    does not exist or does not contain a valid dbt_project.yml file.
    """


# Raised when the dbt manifest.json file cannot be found or loaded.
class DagsterDbtManifestNotFoundError(DagsterDbtError):
    """Raised when dbt manifest.json cannot be found or loaded.

    This exception indicates that the manifest.json file is missing,
    corrupted, or inaccessible. The manifest is required for most
    dagster-dbt integration functionality.
    """


# Raised when the dbt_project.yml file cannot be found.
class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError):
    """Raised when dbt_project.yml file cannot be found.

    This exception is raised when the dbt_project.yml configuration
    file is missing from the specified project directory.
    """


# Raised when the dbt profiles directory cannot be found.
class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError):
    """Raised when dbt profiles directory cannot be located.

    This exception occurs when the specified dbt profiles directory
    does not exist or is not accessible.
    """


# Raised when dbt Cloud job constraints or invariants are violated.
class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError):
    """Raised when dbt Cloud job invariants are violated.

    This exception is raised when dbt Cloud job configuration or
    execution violates expected constraints or business rules.
    """


from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource
from dagster_dbt.errors import (
DagsterDbtError,
DagsterDbtCliRuntimeError,
DagsterDbtManifestNotFoundError
)
def _run_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource) -> dict:
    """Invoke ``dbt run`` once, log every streamed event, and verify success.

    Returns:
        ``{"status": "success"}`` when the invocation reports success.

    Raises:
        DagsterDbtCliRuntimeError: if the invocation reports failure.
    """
    dbt_run = dbt.cli(["run"], context=context)
    for event in dbt_run.stream():
        context.log.info(f"dbt: {event}")
    # ``is_successful`` must be *called*: a bare bound method is always truthy,
    # which would make this failure check dead code.
    if not dbt_run.is_successful():
        raise DagsterDbtCliRuntimeError("dbt run failed")
    return {"status": "success"}


@asset
def dbt_models_with_error_handling(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    """Run dbt models with comprehensive error handling.

    Each dagster-dbt error type is logged with its own message and then
    re-raised. A missing manifest gets one recovery attempt: ``dbt parse`` is
    run to completion and the models are retried exactly once. A second
    manifest failure is re-raised instead of retrying forever.

    Returns:
        ``{"status": "success"}`` when ``dbt run`` succeeds.
    """
    manifest_regenerated = False
    while True:
        try:
            return _run_dbt_models(context, dbt)
        except DagsterDbtCliRuntimeError as e:
            context.log.error(f"dbt CLI execution failed: {e}")
            # Could retry, send alert, or fail gracefully
            raise
        except DagsterDbtManifestNotFoundError as e:
            context.log.error(f"dbt manifest not found: {e}")
            if manifest_regenerated:
                # Parsing already ran once and did not help; give up.
                raise
            context.log.info("Attempting to generate manifest...")
            # Block until the parse finishes before retrying the run.
            dbt.cli(["parse"], context=context).wait()
            manifest_regenerated = True
        except DagsterDbtError as e:
            context.log.error(f"General dbt integration error: {e}")
            raise
        except Exception as e:
            context.log.error(f"Unexpected error: {e}")
            raise


from dagster_dbt.errors import (
    DagsterDbtProjectNotFoundError,
    DagsterDbtProjectYmlFileNotFoundError,
    DagsterDbtProfilesDirectoryNotFoundError,
)
import os
def validate_dbt_project_setup(project_dir: str, profiles_dir: "str | None" = None) -> dict:
    """Validate dbt project setup with detailed error handling.

    The checks run in this order:

    1. ``project_dir`` is a directory.
    2. ``project_dir`` contains a ``dbt_project.yml`` file.
    3. ``profiles_dir``, when given, is a directory.

    A missing ``target/manifest.json`` is reported as a warning, not an error.
    A leading ``~`` in either path is expanded to the user's home directory,
    because ``os.path`` checks do not expand it themselves.

    Args:
        project_dir: Path to the dbt project directory.
        profiles_dir: Optional path to the dbt profiles directory.

    Returns:
        A dict with these keys:

        - ``valid`` (bool)
        - ``errors`` (list of str)
        - ``warnings`` (list of str)
        - ``message`` (str), present only when all checks pass
    """
    validation_result = {
        "valid": True,
        "errors": [],
        "warnings": [],
    }
    project_dir = os.path.expanduser(project_dir)
    if profiles_dir:
        profiles_dir = os.path.expanduser(profiles_dir)

    try:
        # Check project directory (a plain file is not a project directory)
        if not os.path.isdir(project_dir):
            raise DagsterDbtProjectNotFoundError(
                f"Project directory not found: {project_dir}"
            )
        # Check dbt_project.yml
        project_yml_path = os.path.join(project_dir, "dbt_project.yml")
        if not os.path.isfile(project_yml_path):
            raise DagsterDbtProjectYmlFileNotFoundError(
                f"dbt_project.yml not found in {project_dir}"
            )
        # Check profiles directory if specified
        if profiles_dir and not os.path.isdir(profiles_dir):
            raise DagsterDbtProfilesDirectoryNotFoundError(
                f"Profiles directory not found: {profiles_dir}"
            )
        # Check for manifest (warning only: `dbt parse` can regenerate it)
        manifest_path = os.path.join(project_dir, "target", "manifest.json")
        if not os.path.exists(manifest_path):
            validation_result["warnings"].append(
                "Manifest not found - run 'dbt parse' to generate"
            )
        validation_result["message"] = "dbt project validation successful"
    except DagsterDbtProjectNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Project error: {e}")
    except DagsterDbtProjectYmlFileNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Configuration error: {e}")
    except DagsterDbtProfilesDirectoryNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Profiles error: {e}")
    except Exception as e:
        # Deliberate catch-all: this function reports problems, it never raises.
        validation_result["valid"] = False
        validation_result["errors"].append(f"Unexpected error: {e}")
    return validation_result
# Validate project setup.
# "~" is expanded explicitly because os.path existence checks treat it as a
# literal directory name, not the user's home directory.
result = validate_dbt_project_setup(
    project_dir="./my_dbt_project",
    profiles_dir=os.path.expanduser("~/.dbt"),
)
if result["valid"]:
    print("✓ dbt project setup is valid")
    # An empty warnings list simply prints nothing.
    for warning in result["warnings"]:
        print(f"⚠ Warning: {warning}")
else:
    print("✗ dbt project setup has errors:")
    for error in result["errors"]:
        print(f" - {error}")
from dagster_dbt.cloud_v2 import DbtCloudWorkspace
from dagster_dbt.errors import DagsterDbtCloudJobInvariantViolationError
def run_dbt_cloud_job_with_validation(
workspace: DbtCloudWorkspace,
job_id: int,
max_retries: int = 3
) -> dict:
"""Run dbt Cloud job with error handling and retries."""
for attempt in range(max_retries):
try:
# Validate job configuration
job = workspace.get_job(job_id)
if job.state != 1: # Active state
raise DagsterDbtCloudJobInvariantViolationError(
f"Job {job_id} is not in active state"
)
# Run the job
run = workspace.run_job(
job_id=job_id,
cause=f"Triggered by Dagster (attempt {attempt + 1})"
)
# Wait for completion and check result
while not run.is_complete:
time.sleep(30)
run = workspace.get_run(run.id)
if run.is_success:
return {
"status": "success",
"run_id": run.id,
"attempt": attempt + 1
}
else:
raise DagsterDbtCliRuntimeError(
f"dbt Cloud job failed with status: {run.status_humanized}"
)
except DagsterDbtCloudJobInvariantViolationError as e:
# Don't retry invariant violations
return {
"status": "error",
"error": f"Job configuration error: {e}",
"retryable": False
}
except DagsterDbtCliRuntimeError as e:
if attempt == max_retries - 1:
# Last attempt failed
return {
"status": "error",
"error": f"Job execution failed after {max_retries} attempts: {e}",
"retryable": True
}
else:
# Wait before retry
print(f"Attempt {attempt + 1} failed, retrying in 60 seconds...")
time.sleep(60)
continue
except Exception as e:
return {
"status": "error",
"error": f"Unexpected error: {e}",
"retryable": False
}
return {
"status": "error",
"error": "Max retries exceeded",
"retryable": True
}from dagster import failure_hook, HookContext
from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError
@failure_hook
def dbt_error_handler(context: HookContext):
    """Custom failure hook for dbt-related errors.

    The exception raised by the failed op is classified as follows:

    - A dbt CLI runtime error is logged and alerted as
      ``"dbt_cli_runtime_error"``.
    - Any other dagster-dbt error is logged and alerted as
      ``"dbt_integration_error"``.
    - Anything else is only logged.
    """
    # ``op_exception`` is the live exception object. The older
    # ``failure_event.failure_data.error`` is a serialized error summary, so
    # isinstance() checks against it never match.
    error = context.op_exception
    if error is None:
        return  # nothing to classify

    # Check the subclass first: a CLI runtime error is also a DagsterDbtError.
    if isinstance(error, DagsterDbtCliRuntimeError):
        context.log.error("dbt CLI command failed")
        error_type = "dbt_cli_runtime_error"
    elif isinstance(error, DagsterDbtError):
        context.log.error("dbt integration error occurred")
        error_type = "dbt_integration_error"
    else:
        context.log.error("Non-dbt error in dbt-related operation")
        return

    # Could also create an incident, page on-call, etc.
    send_dbt_failure_alert({
        "job_name": context.job_name,
        "op_name": context.op.name,
        "error_type": error_type,
        "error_message": str(error),
    })
def send_dbt_failure_alert(error_info: dict):
    """Announce a dbt failure on stdout (placeholder alert channel).

    Only the ``job_name`` and ``error_message`` entries of *error_info* are
    used; a missing entry raises ``KeyError``. Swap the print for a real
    channel (Slack, email, PagerDuty, etc.) in production.
    """
    failed_job = error_info["job_name"]
    detail = error_info["error_message"]
    print(f"ALERT: dbt failure in {failed_job}: {detail}")
# Use the failure hook: a single-op job whose op always fails, so that
# dbt_error_handler fires on every run.
from dagster import job, op


@op
def failing_dbt_op():
    # Simulates a dbt CLI failure; the local import mirrors a self-contained op.
    from dagster_dbt.errors import DagsterDbtCliRuntimeError

    simulated_failure = DagsterDbtCliRuntimeError("Simulated dbt CLI failure")
    raise simulated_failure


@job(hooks={dbt_error_handler})
def job_with_dbt_error_handling():
    # Wire the failing op into the job; the hook is attached at job level.
    failing_dbt_op()


from dagster_dbt.errors import DagsterDbtManifestNotFoundError
def ensure_manifest_exists(dbt_resource: "DbtCliResource", context) -> bool:
    """Ensure manifest exists, generating it with ``dbt parse`` if necessary.

    Args:
        dbt_resource: Resource whose ``project_dir`` locates the dbt project and
            whose ``cli`` method runs dbt commands.
        context: Dagster execution context. It is used for logging and passed
            through to ``cli``.

    Returns:
        ``True`` if ``<project_dir>/target/manifest.json`` already exists, or if
        ``dbt parse`` reports success. ``False`` if the parse reports failure
        or raises.
    """
    manifest_path = os.path.join(dbt_resource.project_dir, "target", "manifest.json")
    if os.path.isfile(manifest_path):
        return True

    context.log.info("Manifest not found, generating...")
    try:
        # wait() runs the parse to completion. It may raise on failure,
        # which is handled below.
        parse_result = dbt_resource.cli(["parse"], context=context).wait()
        # is_successful is a method; an uncalled bound method is always truthy.
        if parse_result.is_successful():
            context.log.info("Manifest generated successfully")
            return True
        context.log.error("Failed to generate manifest")
        return False
    except Exception as e:
        # Best-effort helper: report failure to the caller instead of raising.
        context.log.error(f"Error generating manifest: {e}")
        return False
# All error classes inherit from Exception and DagsterDbtError
from typing import Optional, List, Dict, Any
Install with Tessl CLI
npx tessl i tessl/pypi-dagster-dbt