CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-dbt

A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

Comprehensive exception hierarchy for dbt integration error handling and debugging. The dagster-dbt package provides specific exception classes to help identify and handle different types of integration errors.

Capabilities

Base Exception Classes

DagsterDbtError

Base exception class for all dagster-dbt integration errors.

class DagsterDbtError(Exception):
    """Root of the dagster-dbt exception hierarchy.

    Every exception specific to the dagster-dbt integration derives from
    this class, so one ``except DagsterDbtError`` clause is enough to
    catch any of them when fine-grained handling is not required.
    """

CLI and Runtime Errors

DagsterDbtCliRuntimeError

Raised when dbt CLI commands fail during execution.

class DagsterDbtCliRuntimeError(DagsterDbtError):
    """Signal that a dbt CLI command failed while executing.

    Raised when an invocation of a dbt CLI command (``run``, ``test``,
    ``build``, etc.) does not complete successfully. The exception is
    meant to carry details about the failure.

    Attributes listed by this reference (the stub shown here does not
    define them itself):
    - logs: dbt CLI output logs
    - events: Parsed dbt events, if available
    - returncode: Return code of the CLI process
    """

Project and File Errors

DagsterDbtProjectNotFoundError

Raised when a dbt project directory or configuration cannot be found.

class DagsterDbtProjectNotFoundError(DagsterDbtError):
    """Signal that the dbt project could not be located or accessed.

    Raised when the given dbt project directory is missing, or when it
    does not contain a valid ``dbt_project.yml`` file.
    """

DagsterDbtManifestNotFoundError

Raised when the dbt manifest.json file cannot be found or loaded.

class DagsterDbtManifestNotFoundError(DagsterDbtError):
    """Signal that the dbt ``manifest.json`` could not be found or loaded.

    The manifest file is missing, corrupted, or inaccessible. Most of the
    dagster-dbt integration depends on the manifest, so this error usually
    has to be resolved before anything else can proceed.
    """

DagsterDbtProjectYmlFileNotFoundError

Raised when the dbt_project.yml file cannot be found.

class DagsterDbtProjectYmlFileNotFoundError(DagsterDbtError):
    """Signal that ``dbt_project.yml`` could not be found.

    Raised when the specified project directory does not contain the
    ``dbt_project.yml`` configuration file.
    """

DagsterDbtProfilesDirectoryNotFoundError

Raised when the dbt profiles directory cannot be found.

class DagsterDbtProfilesDirectoryNotFoundError(DagsterDbtError):
    """Signal that the dbt profiles directory could not be located.

    Raised when the specified dbt profiles directory does not exist or
    cannot be accessed.
    """

Cloud Integration Errors

DagsterDbtCloudJobInvariantViolationError

Raised when dbt Cloud job constraints or invariants are violated.

class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError):
    """Signal that a dbt Cloud job violated one of its invariants.

    Raised when the configuration or execution of a dbt Cloud job breaks
    an expected constraint or business rule.
    """

Usage Examples

Basic Error Handling

from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource
from dagster_dbt.errors import (
    DagsterDbtError,
    DagsterDbtCliRuntimeError,
    DagsterDbtManifestNotFoundError
)

def _run_dbt_models_once(context: AssetExecutionContext, dbt: DbtCliResource) -> dict:
    """Invoke ``dbt run`` once, logging every streamed event.

    Returns:
        ``{"status": "success"}`` when the invocation succeeds.

    Raises:
        DagsterDbtCliRuntimeError: If the invocation reports failure.
    """
    dbt_run = dbt.cli(["run"], context=context)

    # Draining the stream is what drives the dbt process to completion.
    for event in dbt_run.stream():
        context.log.info(f"dbt: {event}")

    # ``is_successful`` is a method: the bare attribute is a bound method and
    # therefore always truthy, which would make this check a no-op.
    if not dbt_run.is_successful():
        raise DagsterDbtCliRuntimeError("dbt run failed")

    return {"status": "success"}


@asset
def dbt_models_with_error_handling(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    """Run dbt models with comprehensive error handling.

    A missing manifest triggers one ``dbt parse`` followed by a single retry
    of the run. Every other error is logged and re-raised so the failure
    stays visible to Dagster.

    Args:
        context: Execution context, used for logging and passed to dbt.
        dbt: dbt CLI resource used to invoke dbt commands.

    Returns:
        ``{"status": "success"}`` when dbt completes successfully.

    Raises:
        DagsterDbtCliRuntimeError: If a dbt CLI invocation fails.
        DagsterDbtError: For any other dagster-dbt integration error,
            including a manifest that is still missing after the retry.
    """
    try:
        try:
            return _run_dbt_models_once(context, dbt)
        except DagsterDbtManifestNotFoundError as e:
            context.log.error(f"dbt manifest not found: {e}")
            context.log.info("Attempting to generate manifest...")
            # Block until parsing has finished; otherwise the retry could
            # start before the manifest has been written.
            dbt.cli(["parse"], context=context).wait()
            # Retry exactly once. Re-invoking the decorated asset here would
            # recurse without bound if the manifest stays missing.
            return _run_dbt_models_once(context, dbt)

    except DagsterDbtCliRuntimeError as e:
        context.log.error(f"dbt CLI execution failed: {e}")
        # Could retry, send alert, or fail gracefully
        raise

    except DagsterDbtError as e:
        context.log.error(f"General dbt integration error: {e}")
        raise

    except Exception as e:
        context.log.error(f"Unexpected error: {e}")
        raise

Project Validation with Error Handling

from dagster_dbt.errors import (
    DagsterDbtProjectNotFoundError,
    DagsterDbtProjectYmlFileNotFoundError,
    DagsterDbtProfilesDirectoryNotFoundError
)
import os

def validate_dbt_project_setup(project_dir: str, profiles_dir: str = None) -> dict:
    """Validate a dbt project layout, reporting problems instead of raising.

    Args:
        project_dir: Path to the dbt project directory. A leading ``~`` is
            expanded to the user's home directory before checking.
        profiles_dir: Optional path to the dbt profiles directory. It is
            only checked when truthy; a leading ``~`` is expanded as well.

    Returns:
        A dict with the keys:
        - ``"valid"``: ``False`` as soon as one check fails, else ``True``.
        - ``"errors"``: list holding the message of the first failed check.
        - ``"warnings"``: non-fatal findings (currently a missing manifest).
        - ``"message"``: present only when validation succeeds.
    """
    validation_result = {
        "valid": True,
        "errors": [],
        "warnings": []
    }

    try:
        # os.path.exists() does not expand "~", so a path such as "~/.dbt"
        # would always be reported missing without this step. Error messages
        # keep the path exactly as the caller supplied it.
        resolved_project_dir = os.path.expanduser(project_dir)
        resolved_profiles_dir = (
            os.path.expanduser(profiles_dir) if profiles_dir else profiles_dir
        )

        # Check project directory
        if not os.path.exists(resolved_project_dir):
            raise DagsterDbtProjectNotFoundError(
                f"Project directory not found: {project_dir}"
            )

        # Check dbt_project.yml
        project_yml_path = os.path.join(resolved_project_dir, "dbt_project.yml")
        if not os.path.exists(project_yml_path):
            raise DagsterDbtProjectYmlFileNotFoundError(
                f"dbt_project.yml not found in {project_dir}"
            )

        # Check profiles directory if specified
        if resolved_profiles_dir and not os.path.exists(resolved_profiles_dir):
            raise DagsterDbtProfilesDirectoryNotFoundError(
                f"Profiles directory not found: {profiles_dir}"
            )

        # Check for manifest (warning if missing)
        manifest_path = os.path.join(resolved_project_dir, "target", "manifest.json")
        if not os.path.exists(manifest_path):
            validation_result["warnings"].append(
                "Manifest not found - run 'dbt parse' to generate"
            )

        validation_result["message"] = "dbt project validation successful"

    except DagsterDbtProjectNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Project error: {e}")

    except DagsterDbtProjectYmlFileNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Configuration error: {e}")

    except DagsterDbtProfilesDirectoryNotFoundError as e:
        validation_result["valid"] = False
        validation_result["errors"].append(f"Profiles error: {e}")

    except Exception as e:
        # Deliberate catch-all: this function reports problems through its
        # return value rather than propagating them.
        validation_result["valid"] = False
        validation_result["errors"].append(f"Unexpected error: {e}")

    return validation_result

# Run the validation against an example project and report the outcome.
result = validate_dbt_project_setup(
    project_dir="./my_dbt_project",
    profiles_dir="~/.dbt",
)

if not result["valid"]:
    # Failure: list every collected error.
    print("✗ dbt project setup has errors:")
    for error in result["errors"]:
        print(f"  - {error}")
else:
    print("✓ dbt project setup is valid")
    # A falsy warnings value iterates as an empty tuple and prints nothing.
    for warning in result["warnings"] or ():
        print(f"⚠ Warning: {warning}")

Cloud Integration Error Handling

from dagster_dbt.cloud_v2 import DbtCloudWorkspace
from dagster_dbt.errors import DagsterDbtCloudJobInvariantViolationError

def run_dbt_cloud_job_with_validation(
    workspace: DbtCloudWorkspace,
    job_id: int,
    max_retries: int = 3
) -> dict:
    """Run a dbt Cloud job with validation, polling, and bounded retries.

    Args:
        workspace: dbt Cloud workspace used to look up and trigger the job.
            NOTE(review): ``get_job``/``run_job``/``get_run`` and the run
            attributes read below are used as this example shows them;
            confirm them against the ``DbtCloudWorkspace`` API.
        job_id: Identifier of the dbt Cloud job to run.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        On success: ``{"status": "success", "run_id": ..., "attempt": ...}``.
        On failure: ``{"status": "error", "error": ..., "retryable": ...}``.
        ``retryable`` is ``False`` for invariant violations and unexpected
        errors, and ``True`` once all attempts have failed.
    """
    # Imported locally so the example is self-contained: its own import lines
    # bring in neither ``time`` (needed by the sleeps below) nor
    # ``DagsterDbtCliRuntimeError`` (raised and caught below).
    import time

    from dagster_dbt.errors import DagsterDbtCliRuntimeError

    for attempt in range(max_retries):
        try:
            # Validate job configuration
            job = workspace.get_job(job_id)
            if job.state != 1:  # Active state
                raise DagsterDbtCloudJobInvariantViolationError(
                    f"Job {job_id} is not in active state"
                )

            # Run the job
            run = workspace.run_job(
                job_id=job_id,
                cause=f"Triggered by Dagster (attempt {attempt + 1})"
            )

            # Poll until the run reaches a terminal state.
            while not run.is_complete:
                time.sleep(30)
                run = workspace.get_run(run.id)

            if not run.is_success:
                raise DagsterDbtCliRuntimeError(
                    f"dbt Cloud job failed with status: {run.status_humanized}"
                )

            return {
                "status": "success",
                "run_id": run.id,
                "attempt": attempt + 1
            }

        except DagsterDbtCloudJobInvariantViolationError as e:
            # Don't retry invariant violations
            return {
                "status": "error",
                "error": f"Job configuration error: {e}",
                "retryable": False
            }

        except DagsterDbtCliRuntimeError as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                return {
                    "status": "error",
                    "error": f"Job execution failed after {max_retries} attempts: {e}",
                    "retryable": True
                }

            # Wait before the next attempt.
            print(f"Attempt {attempt + 1} failed, retrying in 60 seconds...")
            time.sleep(60)

        except Exception as e:
            return {
                "status": "error",
                "error": f"Unexpected error: {e}",
                "retryable": False
            }

    # Only reached when the loop body never ran (``max_retries`` <= 0).
    return {
        "status": "error",
        "error": "Max retries exceeded",
        "retryable": True
    }

Custom Error Handler

from dagster import failure_hook, HookContext
from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError

@failure_hook
def dbt_error_handler(context: HookContext):
    """Custom failure hook that alerts on dbt-related op failures.

    The exception raised by the failed op is classified as a dbt CLI runtime
    error, another dagster-dbt integration error, or a non-dbt error. The
    first two are logged and forwarded to ``send_dbt_failure_alert``; the
    last is only logged.

    Args:
        context: Hook context of the failed op.
    """
    # ``op_exception`` is the exception object thrown by the failed op, so
    # the isinstance checks below can classify it directly.
    error = context.op_exception
    if error is None:
        return

    # Most specific class first: DagsterDbtCliRuntimeError is itself a
    # DagsterDbtError.
    if isinstance(error, DagsterDbtCliRuntimeError):
        log_message = "dbt CLI command failed"
        error_type = "dbt_cli_runtime_error"
    elif isinstance(error, DagsterDbtError):
        log_message = "dbt integration error occurred"
        error_type = "dbt_integration_error"
    else:
        context.log.error("Non-dbt error in dbt-related operation")
        return

    context.log.error(log_message)
    # Could send notification, create incident, etc.
    send_dbt_failure_alert({
        "job_name": context.job_name,
        "op_name": context.op.name,
        "error_type": error_type,
        "error_message": str(error)
    })

def send_dbt_failure_alert(error_info: dict):
    """Emit an alert for a dbt failure (placeholder: prints to stdout).

    Args:
        error_info: Mapping that must contain the keys ``"job_name"`` and
            ``"error_message"``; other keys are ignored here.
    """
    # Implement actual alerting logic (Slack, email, PagerDuty, etc.)
    alert_text = f"ALERT: dbt failure in {error_info['job_name']}: {error_info['error_message']}"
    print(alert_text)

# Use the failure hook
from dagster import job, op

@op
def failing_dbt_op():
    """Op that always fails with a simulated dbt CLI runtime error."""
    from dagster_dbt.errors import DagsterDbtCliRuntimeError

    simulated_failure = DagsterDbtCliRuntimeError("Simulated dbt CLI failure")
    raise simulated_failure

# Job that attaches the dbt_error_handler hook and runs a single op,
# failing_dbt_op.
@job(hooks={dbt_error_handler})
def job_with_dbt_error_handling():
    failing_dbt_op()

Error Recovery Patterns

Automatic Manifest Generation

from dagster_dbt.errors import DagsterDbtManifestNotFoundError

def ensure_manifest_exists(dbt_resource: DbtCliResource, context) -> bool:
    """Ensure the dbt manifest exists, generating it with ``dbt parse`` if not.

    Args:
        dbt_resource: dbt CLI resource. Its ``project_dir`` is used to locate
            ``target/manifest.json`` and its ``cli`` method to run dbt.
        context: Object providing ``log.info``/``log.error``; it is also
            passed through to the dbt CLI invocation.

    Returns:
        ``True`` if the manifest already exists or ``dbt parse`` completed
        successfully. ``False`` if generation failed for any reason; the
        failure is logged rather than raised.
    """
    manifest_path = os.path.join(dbt_resource.project_dir, "target", "manifest.json")
    if os.path.exists(manifest_path):
        return True

    context.log.info("Manifest not found, generating...")
    try:
        # Generate manifest with dbt parse and block until it has finished.
        parse_result = dbt_resource.cli(["parse"], context=context)
        parse_result.wait()

        # ``is_successful`` is a method: the bare attribute is a bound method
        # and always truthy, so failures would otherwise go undetected.
        if parse_result.is_successful():
            context.log.info("Manifest generated successfully")
            return True

        context.log.error("Failed to generate manifest")
        return False

    except Exception as e:
        # Best-effort helper: report failure through the return value.
        context.log.error(f"Error generating manifest: {e}")
        return False

Type Definitions

# Every error class derives from DagsterDbtError, which itself derives from Exception
from typing import Optional, List, Dict, Any

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-dbt

docs

asset-creation.md

cli-resource.md

component-system.md

dbt-cloud-legacy.md

dbt-cloud-v2.md

error-handling.md

freshness-checks.md

index.md

project-management.md

translation-system.md

utilities.md

tile.json