Error Handling

This document covers Dagster's comprehensive error handling system, including the error hierarchy, failure events, retry policies, and best practices for robust pipeline development. Dagster provides structured error handling with rich failure information and configurable recovery strategies.

Error Hierarchy

Dagster provides a structured hierarchy of exceptions for different failure scenarios, enabling precise error handling and debugging.
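
Because every exception in this hierarchy derives from DagsterError, handlers can catch specific subclasses first and fall back to the base class. A minimal sketch, assuming an in-process execution; run_job_safely is an illustrative helper, not part of the Dagster API:

from dagster import DagsterError, DagsterInvalidConfigError

def run_job_safely(job_def, run_config=None):
    """Illustrative helper: catch the most specific errors first."""
    try:
        return job_def.execute_in_process(run_config=run_config)
    except DagsterInvalidConfigError as e:
        print(f"Config error: {e}")
    except DagsterError as e:
        # Base class catches any remaining Dagster-specific failure
        print(f"Dagster error: {e}")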

Base Errors

DagsterError { .api }

Module: dagster_shared.error
Type: Exception base class

Base class for all Dagster-specific errors with structured error information.

from dagster import DagsterError, op, asset
from typing import Optional
import pandas as pd

class CustomDataError(DagsterError):
    """Custom error for data quality issues."""
    
    def __init__(self, message: str, data_info: Optional[dict] = None):
        super().__init__(message)
        self.data_info = data_info or {}

@op
def validate_data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Op that validates data quality and raises custom errors."""
    
    # Check for null values
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {null_count} null values found",
            data_info={
                "null_count": null_count,
                "total_records": len(df),
                "null_percentage": (null_count / (len(df) * len(df.columns))) * 100,
                "affected_columns": df.columns[df.isnull().any()].tolist()
            }
        )
    
    # Check for duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {duplicate_count} duplicate records found",
            data_info={
                "duplicate_count": duplicate_count,
                "total_records": len(df),
                "duplicate_percentage": (duplicate_count / len(df)) * 100
            }
        )
    
    return df

@asset
def validated_customer_data(context, raw_customer_data: pd.DataFrame) -> pd.DataFrame:
    """Asset with comprehensive error handling."""
    
    try:
        # Validate data quality
        validated_data = validate_data_quality(raw_customer_data)
        
        # Additional business rule validation
        if len(validated_data) == 0:
            raise CustomDataError(
                "No valid customer records found after validation",
                data_info={"original_count": len(raw_customer_data)}
            )
        
        # Check required columns
        required_columns = ["customer_id", "email", "created_at"]
        missing_columns = set(required_columns) - set(validated_data.columns)
        if missing_columns:
            raise CustomDataError(
                f"Required columns missing: {missing_columns}",
                data_info={
                    "missing_columns": list(missing_columns),
                    "available_columns": list(validated_data.columns)
                }
            )
        
        return validated_data
        
    except CustomDataError as e:
        # Log detailed error information
        context.log.error(f"Data validation failed: {str(e)}")
        context.log.error(f"Error details: {e.data_info}")
        
        # Re-raise to fail the asset materialization
        raise
        
    except Exception as e:
        # Handle unexpected errors
        raise DagsterError(
            f"Unexpected error during customer data validation: {str(e)}"
        ) from e

DagsterInvariantViolationError { .api }

Module: dagster._core.errors
Type: DagsterError subclass

Error for invariant violations and internal consistency checks.

from dagster import DagsterInvariantViolationError, op

@op
def process_configuration(context) -> dict:
    """Op that validates configuration invariants."""
    
    config = context.op_config
    
    # Validate configuration invariants
    if "batch_size" in config and "max_memory" in config:
        batch_size = config["batch_size"]
        max_memory = config["max_memory"]
        
        # Check invariant: batch_size * record_size should not exceed max_memory
        estimated_memory = batch_size * 1024  # Assume 1KB per record
        
        if estimated_memory > max_memory:
            raise DagsterInvariantViolationError(
                f"Configuration invariant violated: "
                f"batch_size ({batch_size}) * record_size (1KB) = {estimated_memory}KB "
                f"exceeds max_memory ({max_memory}KB)"
            )
    
    # Validate required configuration relationships
    if config.get("enable_caching") and not config.get("cache_directory"):
        raise DagsterInvariantViolationError(
            "Configuration invariant violated: "
            "cache_directory must be specified when enable_caching is True"
        )
    
    return config

Definition Errors

DagsterInvalidDefinitionError { .api }

Module: dagster._core.errors
Type: DagsterError subclass

Error for invalid asset, op, or job definitions.

from dagster import DagsterInvalidDefinitionError, asset
import pandas as pd

def validate_asset_definition(asset_fn):
    """Decorator that validates asset definition at definition time."""
    
    # Check function signature
    import inspect
    signature = inspect.signature(asset_fn)
    
    # Validate return annotation exists
    if signature.return_annotation == inspect.Signature.empty:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a return type annotation"
        )
    
    # Validate docstring exists
    if not asset_fn.__doc__:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a docstring"
        )
    
    return asset_fn

@asset
@validate_asset_definition
def well_defined_asset() -> pd.DataFrame:
    """This asset has proper definition validation."""
    return pd.DataFrame({"data": [1, 2, 3]})

# This would raise DagsterInvalidDefinitionError at definition time:
# @asset
# @validate_asset_definition
# def poorly_defined_asset():  # Missing return annotation and docstring
#     return pd.DataFrame({"data": [1, 2, 3]})

DagsterInvalidInvocationError { .api }

Module: dagster._core.errors
Type: DagsterError subclass

Error for invalid invocation of Dagster definitions.

from dagster import DagsterInvalidInvocationError, In, Out, op, job
import pandas as pd

@op(
    ins={"input_data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame)
)
def strict_data_processing(input_data: pd.DataFrame) -> pd.DataFrame:
    """Op with strict type checking."""
    
    # Validate input type at runtime
    if not isinstance(input_data, pd.DataFrame):
        raise DagsterInvalidInvocationError(
            f"Expected pandas DataFrame, got {type(input_data)}. "
            f"This op requires DataFrame input for processing."
        )
    
    # Validate DataFrame structure
    required_columns = ["id", "value", "timestamp"]
    missing_columns = set(required_columns) - set(input_data.columns)
    
    if missing_columns:
        raise DagsterInvalidInvocationError(
            f"Input DataFrame missing required columns: {missing_columns}. "
            f"Available columns: {list(input_data.columns)}"
        )
    
    return input_data.dropna()

@op
def generate_invalid_data() -> dict:  # Returns dict, not DataFrame
    """Op that produces invalid output for downstream consumption."""
    return {"data": "not a dataframe"}

@job
def invalid_invocation_job():
    """Job that demonstrates an invalid invocation."""
    # Fails at execution time: the dict output does not satisfy the
    # DataFrame input, so it is rejected before processing continues
    strict_data_processing(generate_invalid_data())

Configuration Errors

DagsterInvalidConfigError { .api }

Module: dagster._core.errors
Type: DagsterError subclass

Error for invalid configuration values.

from dagster import DagsterInvalidConfigError, op, Field, Int

@op(config_schema={
    "batch_size": Field(Int, description="Batch size for processing"),
    "timeout_seconds": Field(Int, description="Timeout in seconds"),
    "retry_attempts": Field(Int, description="Number of retry attempts")
})
def configurable_processing_op(context):
    """Op with comprehensive config validation."""
    
    config = context.op_config
    
    # Validate configuration values
    batch_size = config["batch_size"]
    if batch_size <= 0:
        raise DagsterInvalidConfigError(
            f"batch_size must be positive, got {batch_size}"
        )
    
    if batch_size > 10000:
        raise DagsterInvalidConfigError(
            f"batch_size too large ({batch_size}), maximum allowed is 10000"
        )
    
    timeout_seconds = config["timeout_seconds"]
    if timeout_seconds < 1 or timeout_seconds > 3600:
        raise DagsterInvalidConfigError(
            f"timeout_seconds must be between 1 and 3600, got {timeout_seconds}"
        )
    
    retry_attempts = config["retry_attempts"]
    if retry_attempts < 0 or retry_attempts > 5:
        raise DagsterInvalidConfigError(
            f"retry_attempts must be between 0 and 5, got {retry_attempts}"
        )
    
    # Cross-field validation
    if timeout_seconds < batch_size * 0.1:
        raise DagsterInvalidConfigError(
            f"timeout_seconds ({timeout_seconds}) too low for batch_size ({batch_size}). "
            f"Minimum recommended: {int(batch_size * 0.1)} seconds"
        )
    
    context.log.info(f"Processing with batch_size={batch_size}, timeout={timeout_seconds}s")
    return f"Processed with valid config"

Execution Errors

DagsterExecutionStepExecutionError { .api }

Module: dagster._core.errors
Type: DagsterError subclass

Error during step execution with detailed execution context.

from dagster import DagsterExecutionStepExecutionError, op, asset
import pandas as pd
import requests
from requests.exceptions import RequestException

@op
def fetch_external_data(context) -> dict:
    """Op that fetches data from external API with error handling."""
    
    api_url = "https://api.example.com/data"
    max_retries = 3
    
    for attempt in range(max_retries):
        try:
            context.log.info(f"Fetching data from {api_url} (attempt {attempt + 1}/{max_retries})")
            
            response = requests.get(api_url, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            context.log.info(f"Successfully fetched {len(data)} records")
            
            return data
            
        except RequestException as e:
            context.log.warning(f"Attempt {attempt + 1} failed: {str(e)}")
            
            if attempt == max_retries - 1:  # Last attempt
                raise DagsterExecutionStepExecutionError(
                    f"Failed to fetch data from {api_url} after {max_retries} attempts. "
                    f"Last error: {str(e)}",
                    step_context=context.step_context,
                    user_exception=e
                )
            
            # Wait before retry
            import time
            time.sleep(2 ** attempt)  # Exponential backoff
    
    # This shouldn't be reached, but just in case
    raise DagsterExecutionStepExecutionError("Unexpected error in fetch_external_data")

@asset
def processed_api_data(context, external_data: dict) -> pd.DataFrame:
    """Asset that processes external data with error handling."""
    
    try:
        if not external_data or "records" not in external_data:
            raise ValueError("Invalid data structure from API")
        
        records = external_data["records"]
        if not isinstance(records, list):
            raise ValueError(f"Expected list of records, got {type(records)}")
        
        df = pd.DataFrame(records)
        
        if len(df) == 0:
            raise ValueError("No records found in API response")
        
        # Validate required fields
        required_fields = ["id", "timestamp", "value"]
        missing_fields = set(required_fields) - set(df.columns)
        
        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")
        
        # Process data
        processed_df = df.dropna().reset_index(drop=True)
        
        context.log.info(f"Processed {len(processed_df)} valid records from {len(df)} total")
        
        return processed_df
        
    except Exception as e:
        # Wrap in execution error with context
        raise DagsterExecutionStepExecutionError(
            f"Failed to process API data: {str(e)}",
            user_exception=e
        ) from e

Failure Events

Dagster provides structured failure events that capture detailed information about errors for debugging and monitoring.

Failure { .api }

Module: dagster._core.definitions.events
Type: Exception class

Structured failure event with metadata and debugging information.

from dagster import Failure, op, asset, MetadataValue
import pandas as pd
import traceback

@op
def risky_operation(context) -> str:
    """Op that demonstrates structured failure reporting."""
    
    try:
        # Simulate risky operation
        data = load_critical_data()
        
        if not validate_data_integrity(data):
            # Create structured failure with metadata
            raise Failure(
                description="Data integrity validation failed",
                metadata={
                    "validation_errors": MetadataValue.json({
                        "missing_records": 15,
                        "invalid_checksums": 3,
                        "schema_violations": 7
                    }),
                    "data_source": MetadataValue.text("critical_database"),
                    "validation_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
                    "expected_record_count": MetadataValue.int(1000),
                    "actual_record_count": MetadataValue.int(975),
                    "severity": MetadataValue.text("HIGH")
                }
            )
        
        return process_data(data)
        
    except DatabaseConnectionError as e:
        # Database-specific failure
        raise Failure(
            description=f"Database connection failed: {str(e)}",
            metadata={
                "connection_string": MetadataValue.text("postgresql://..."),
                "error_code": MetadataValue.text(getattr(e, 'code', 'UNKNOWN')),
                "retry_recommended": MetadataValue.bool(True),
                "estimated_downtime": MetadataValue.text("5-10 minutes"),
                "contact": MetadataValue.text("database-team@company.com")
            }
        )
    
    except Exception as e:
        # Generic failure with full context
        raise Failure(
            description=f"Unexpected error in risky operation: {str(e)}",
            metadata={
                "error_type": MetadataValue.text(type(e).__name__),
                "error_message": MetadataValue.text(str(e)),
                "stack_trace": MetadataValue.text(traceback.format_exc()),
                "operation_context": MetadataValue.json({
                    "step": "data_processing",
                    "input_size": "unknown", 
                    "memory_usage": "high"
                }),
                "debug_info": MetadataValue.url("https://wiki.company.com/debug/risky-operation")
            }
        )

@asset
def resilient_asset(context) -> pd.DataFrame:
    """Asset with comprehensive failure handling and recovery."""
    
    recovery_strategies = ["primary", "secondary", "fallback"]
    
    for strategy in recovery_strategies:
        try:
            context.log.info(f"Attempting data load with {strategy} strategy")
            
            if strategy == "primary":
                data = load_primary_data_source()
            elif strategy == "secondary":
                data = load_secondary_data_source() 
            elif strategy == "fallback":
                data = load_fallback_data_source()
            
            # Validate loaded data
            if len(data) == 0:
                raise ValueError(f"No data loaded from {strategy} source")
            
            context.log.info(f"Successfully loaded {len(data)} records using {strategy} strategy")
            
            return data
            
        except Exception as e:
            context.log.warning(f"Strategy {strategy} failed: {str(e)}")
            
            if strategy == "fallback":  # Last resort failed
                raise Failure(
                    description="All data loading strategies failed",
                    metadata={
                        "failed_strategies": MetadataValue.json(recovery_strategies),
                        "primary_error": MetadataValue.text("Connection timeout"),
                        "secondary_error": MetadataValue.text("Authentication failed"),
                        "fallback_error": MetadataValue.text(str(e)),
                        "recommended_action": MetadataValue.text(
                            "Check data source availability and credentials"
                        ),
                        "escalation_required": MetadataValue.bool(True),
                        "incident_severity": MetadataValue.text("P1")
                    }
                )
    
    # Should never reach here
    raise Failure("Unexpected code path in resilient_asset")

Error Context and Debugging

from dagster import Failure, op, MetadataValue
import pandas as pd

@op
def debugging_friendly_op(context) -> dict:
    """Op with comprehensive debugging information in failures."""
    
    debug_context = {
        "op_name": context.op_def.name,
        "run_id": context.run_id,
        "step_key": getattr(context, "step_key", None),
        "execution_time": pd.Timestamp.now().isoformat(),
        "required_resources": list(context.op_def.required_resource_keys)
    }
    
    try:
        # Simulate complex operation with multiple failure points
        step1_result = perform_step1()
        debug_context["step1_completed"] = True
        
        step2_result = perform_step2(step1_result)
        debug_context["step2_completed"] = True
        
        final_result = perform_step3(step2_result)
        debug_context["step3_completed"] = True
        
        return final_result
        
    except Exception as e:
        # Add failure location to debug context
        debug_context["failure_location"] = get_failure_location_from_traceback()
        debug_context["error_type"] = type(e).__name__
        debug_context["error_message"] = str(e)
        
        # Determine failure category for better handling
        if isinstance(e, ConnectionError):
            failure_category = "connectivity"
            recovery_time = "1-5 minutes"
        elif isinstance(e, PermissionError):
            failure_category = "authentication"
            recovery_time = "immediate with credentials update"
        elif isinstance(e, ValueError):
            failure_category = "data_validation"
            recovery_time = "requires data fix"
        else:
            failure_category = "unknown"
            recovery_time = "unknown"
        
        raise Failure(
            description=f"Operation failed at {debug_context.get('failure_location', 'unknown location')}: {str(e)}",
            metadata={
                "debug_context": MetadataValue.json(debug_context),
                "failure_category": MetadataValue.text(failure_category),
                "estimated_recovery_time": MetadataValue.text(recovery_time),
                "troubleshooting_guide": MetadataValue.url(
                    f"https://docs.company.com/troubleshooting/{failure_category}"
                ),
                "runbook": MetadataValue.url(
                    f"https://runbook.company.com/{context.op_def.name}"
                )
            }
        )

Retry Policies

Dagster provides configurable retry policies for automatic recovery from transient failures.

RetryPolicy { .api }

Module: dagster._core.definitions.policy
Type: Class

Configurable retry policy with backoff and jitter strategies.

from dagster import RetryPolicy, Backoff, Jitter, Failure, op, job, asset
import pandas as pd
import random
import requests
import time

# Basic retry policy
basic_retry = RetryPolicy(
    max_retries=3,
    delay=1.0  # 1 second delay
)

@op(retry_policy=basic_retry)
def simple_retry_op(context) -> str:
    """Op with basic retry policy."""
    
    # Simulate random failures
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure for retry testing")
    
    return "Success after retries"

# Exponential backoff retry policy
exponential_retry = RetryPolicy(
    max_retries=5,
    delay=1.0,
    backoff=Backoff.EXPONENTIAL,  # 1s, 2s, 4s, 8s, 16s
    jitter=Jitter.PLUS_MINUS  # Add randomness to avoid thundering herd
)

@op(retry_policy=exponential_retry)
def external_api_call(context) -> dict:
    """Op that calls external API with exponential backoff."""
    
    context.log.info("Attempting API call...")
    
    try:
        # Simulate API call that might fail due to rate limiting
        response = requests.get(
            "https://api.example.com/data",
            headers={"Authorization": "Bearer token"},
            timeout=30
        )
        
        if response.status_code == 429:  # Rate limited
            context.log.warning("Rate limited, will retry with backoff")
            raise Exception("Rate limit exceeded")
        
        response.raise_for_status()
        return response.json()
        
    except requests.RequestException as e:
        context.log.warning(f"API call failed: {str(e)}")
        raise

# Linear backoff with custom delay calculation
linear_retry = RetryPolicy(
    max_retries=4,
    delay=2.0,
    backoff=Backoff.LINEAR,  # 2s, 4s, 6s, 8s
    jitter=Jitter.FULL  # Randomize delay completely
)

@op(retry_policy=linear_retry)
def database_operation(context) -> pd.DataFrame:
    """Database operation with linear backoff retry."""
    
    # context.retry_number is 0 on the first attempt and increments on each retry
    context.log.info(f"Database operation attempt {context.retry_number + 1}")
    
    try:
        # Simulate database operation
        connection = get_database_connection()
        
        query = """
        SELECT id, name, value, created_at
        FROM important_table
        WHERE created_at >= NOW() - INTERVAL 1 DAY
        """
        
        df = pd.read_sql(query, connection)
        
        context.log.info(f"Successfully loaded {len(df)} records")
        return df
        
    except DatabaseError as e:
        context.log.warning(f"Database error: {str(e)}")
        
        # Check if error is retryable
        if "connection" in str(e).lower() or "timeout" in str(e).lower():
            context.log.info("Retryable database error, will attempt retry")
            raise  # Let retry policy handle it
        else:
            context.log.error("Non-retryable database error, failing immediately")
            raise Failure(
                description=f"Non-retryable database error: {str(e)}",
                metadata={
                    "error_type": "database_error",
                    "retryable": False,
                    "error_code": getattr(e, 'code', 'UNKNOWN')
                }
            )

# Conditional retry logic: RetryPolicy is static configuration and has no
# hook for inspecting exceptions, so condition-based retries are implemented
# inside the op by raising RetryRequested only for errors worth retrying.
from dagster import Failure, RetryRequested

def is_retryable_error(exception: Exception) -> bool:
    """Determine if an operation should be retried based on the exception type."""
    
    # Always retry connection and timeout errors
    if isinstance(exception, (ConnectionError, TimeoutError)):
        return True
    
    # Retry rate limit errors
    if "rate limit" in str(exception).lower():
        return True
    
    # Don't retry permission or validation errors
    if isinstance(exception, (PermissionError, ValueError)):
        return False
    
    # Default behavior for other exceptions
    return True

@op
def smart_retry_op(context) -> str:
    """Op with intelligent retry logic."""
    
    # This will retry connection errors but not validation errors
    operation_type = random.choice(["connection_error", "validation_error", "success"])
    
    try:
        if operation_type == "connection_error":
            raise ConnectionError("Network connection failed")
        elif operation_type == "validation_error":
            raise ValueError("Invalid input data")
    except Exception as e:
        if is_retryable_error(e):
            context.log.warning(f"Retryable error, requesting retry: {e}")
            raise RetryRequested(max_retries=3, seconds_to_wait=2.0) from e
        raise Failure(description=f"Non-retryable error: {e}") from e
    
    return "Operation succeeded"

RetryRequested { .api }

Module: dagster._core.definitions.events
Type: Exception class

Explicit retry request with a custom delay before the step is re-executed.

from dagster import RetryRequested, op

@op
def explicit_retry_op(context) -> str:
    """Op that explicitly requests retries with custom logic."""
    
    # Dagster re-executes the step on retry; retry_number is 0 on the first attempt
    attempt_count = context.retry_number
    
    context.log.info(f"Attempt {attempt_count + 1}")
    
    # Simulate different failure scenarios
    if attempt_count == 0:
        # First attempt: request an immediate retry
        context.log.info("Initial setup required, retrying immediately")
        raise RetryRequested(max_retries=3, seconds_to_wait=0)
    
    elif attempt_count == 1:
        # Second attempt: wait 5 seconds before retry
        context.log.info("Waiting for external service, retrying in 5 seconds")
        raise RetryRequested(max_retries=3, seconds_to_wait=5)
    
    elif attempt_count == 2:
        # Third attempt: exponential backoff
        wait_time = 2 ** attempt_count  # 4 seconds
        context.log.info(f"Exponential backoff, retrying in {wait_time} seconds")
        raise RetryRequested(max_retries=3, seconds_to_wait=wait_time)
    
    # Fourth attempt: succeed
    return f"Success on attempt {attempt_count + 1}"

from dagster import Failure, RetryRequested, asset, MetadataValue
import pandas as pd

@asset
def resilient_data_processing(context) -> pd.DataFrame:
    """Asset with sophisticated retry logic for data processing."""
    
    max_attempts = 5
    base_delay = 1.0
    
    # RetryRequested restarts the whole step, so the attempt number comes from
    # Dagster's retry counter rather than a local loop
    attempt = context.retry_number
    context.log.info(f"Processing attempt {attempt + 1}/{max_attempts}")
    
    try:
        # Load data with potential failures
        data = load_data_with_retries()
        
        # Validate data quality
        if len(data) == 0:
            if attempt < max_attempts - 1:
                wait_time = base_delay * (2 ** attempt)  # Exponential backoff
                context.log.warning(
                    f"Empty dataset on attempt {attempt + 1}; waiting {wait_time}s for data arrival"
                )
                raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time)
            else:
                raise Failure(
                    description="No data available after all retry attempts",
                    metadata={
                        "total_attempts": MetadataValue.int(max_attempts),
                        "final_status": MetadataValue.text("no_data_available")
                    }
                )
        
        # Check data quality
        quality_score = calculate_data_quality(data)
        
        if quality_score < 0.8:  # 80% quality threshold
            if attempt < max_attempts - 1:
                wait_time = base_delay * (attempt + 1)  # Linear backoff for quality issues
                context.log.warning(
                    f"Quality score {quality_score:.2f} below threshold on attempt {attempt + 1}; "
                    f"waiting {wait_time}s for data refresh"
                )
                raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time)
            else:
                raise Failure(
                    description=f"Data quality too low ({quality_score:.2f}) after all attempts",
                    metadata={
                        "final_quality_score": MetadataValue.float(quality_score),
                        "quality_threshold": MetadataValue.float(0.8),
                        "total_attempts": MetadataValue.int(max_attempts)
                    }
                )
        
        # Success case
        context.log.info(f"Data processing succeeded on attempt {attempt + 1}")
        context.add_output_metadata({
            "successful_attempt": MetadataValue.int(attempt + 1),
            "quality_score": MetadataValue.float(quality_score),
            "record_count": MetadataValue.int(len(data))
        })
        
        return data
        
    except (RetryRequested, Failure):
        # Re-raise retry and failure events untouched
        raise
    except Exception as e:
        # Handle unexpected errors
        if attempt < max_attempts - 1:
            wait_time = base_delay * (2 ** attempt)
            context.log.warning(
                f"Unexpected {type(e).__name__} on attempt {attempt + 1}: {e}; retrying in {wait_time}s"
            )
            raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time) from e
        else:
            raise Failure(
                description=f"Unexpected error after all retry attempts: {str(e)}",
                metadata={
                    "error_type": MetadataValue.text(type(e).__name__),
                    "error_message": MetadataValue.text(str(e)),
                    "total_attempts": MetadataValue.int(max_attempts)
                }
            )

Error Handling Best Practices

Structured Error Information

from dagster import asset, Failure, RetryRequested, MetadataValue
from typing import Dict, Any
import pandas as pd
import requests
import traceback

class ErrorTracker:
    """Utility class for tracking and reporting errors."""
    
    @staticmethod
    def create_error_context(context, error: Exception) -> Dict[str, Any]:
        """Create standardized error context."""
        return {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "stack_trace": traceback.format_exc(),
            "asset_key": str(context.asset_key) if hasattr(context, 'asset_key') else None,
            "op_name": context.op_def.name if hasattr(context, 'op_def') else None,
            "run_id": context.run_id,
            "step_key": getattr(context, 'step_key', None),
            "partition_key": getattr(context, 'partition_key', None),
            "timestamp": pd.Timestamp.now().isoformat()
        }
    
    @staticmethod
    def should_retry(error: Exception) -> bool:
        """Determine if error is retryable."""
        retryable_errors = (
            ConnectionError,
            TimeoutError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout
        )
        
        non_retryable_errors = (
            ValueError,
            KeyError,
            TypeError,
            PermissionError
        )
        
        if isinstance(error, retryable_errors):
            return True
        elif isinstance(error, non_retryable_errors):
            return False
        
        # Check error message for retry indicators
        error_msg = str(error).lower()
        if any(keyword in error_msg for keyword in ["timeout", "connection", "network"]):
            return True
        elif any(keyword in error_msg for keyword in ["permission", "auth", "invalid"]):
            return False
        
        return True  # Default to retryable for unknown errors

@asset
def robust_data_pipeline(context) -> pd.DataFrame:
    """Asset with comprehensive error handling best practices."""
    
    error_tracker = ErrorTracker()
    
    try:
        # Step 1: Data loading with error context
        context.log.info("Starting data loading phase")
        
        try:
            raw_data = load_raw_data()
            context.log.info(f"Loaded {len(raw_data)} raw records")
        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)
            
            if error_tracker.should_retry(e):
                context.log.warning(
                    f"Retryable error during data loading: {error_context['error_type']}: "
                    f"{error_context['error_message']}"
                )
                raise RetryRequested(max_retries=3, seconds_to_wait=5.0) from e
            else:
                raise Failure(
                    description=f"Non-retryable error in data loading: {str(e)}",
                    metadata={
                        "phase": MetadataValue.text("data_loading"),
                        "error_context": MetadataValue.json(error_context),
                        "retry_recommended": MetadataValue.bool(False)
                    }
                )
        
        # Step 2: Data validation with structured errors
        context.log.info("Starting data validation phase")
        
        validation_errors = []
        
        if len(raw_data) == 0:
            validation_errors.append("Empty dataset")
        
        required_columns = ["id", "timestamp", "value"]
        missing_columns = set(required_columns) - set(raw_data.columns)
        if missing_columns:
            validation_errors.append(f"Missing columns: {missing_columns}")
        
        null_percentage = (raw_data.isnull().sum().sum() / (len(raw_data) * len(raw_data.columns))) * 100
        if null_percentage > 5:  # 5% threshold
            validation_errors.append(f"Too many null values: {null_percentage:.1f}%")
        
        if validation_errors:
            raise Failure(
                description="Data validation failed",
                metadata={
                    "phase": MetadataValue.text("data_validation"),
                    "validation_errors": MetadataValue.json(validation_errors),
                    "record_count": MetadataValue.int(len(raw_data)),
                    "null_percentage": MetadataValue.float(null_percentage),
                    "data_quality_score": MetadataValue.float(max(0, 1 - null_percentage/100))
                }
            )
        
        # Step 3: Data processing with progress tracking
        context.log.info("Starting data processing phase")
        
        try:
            processed_data = process_data(raw_data)
            
            # Validate processing results
            processing_loss = len(raw_data) - len(processed_data)
            loss_percentage = (processing_loss / len(raw_data)) * 100
            
            if loss_percentage > 20:  # 20% loss threshold
                context.log.warning(f"High data loss during processing: {loss_percentage:.1f}%")
            
            context.add_output_metadata({
                "input_records": MetadataValue.int(len(raw_data)),
                "output_records": MetadataValue.int(len(processed_data)),
                "processing_loss": MetadataValue.int(processing_loss),
                "loss_percentage": MetadataValue.float(loss_percentage),
                "processing_success": MetadataValue.bool(True)
            })
            
            return processed_data
            
        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)
            
            raise Failure(
                description=f"Data processing failed: {str(e)}",
                metadata={
                    "phase": MetadataValue.text("data_processing"),
                    "error_context": MetadataValue.json(error_context),
                    "input_records": MetadataValue.int(len(raw_data)),
                    "processing_success": MetadataValue.bool(False)
                }
            )
    
    except (Failure, RetryRequested):
        # Re-raise structured Dagster events
        raise
    
    except Exception as e:
        # Catch-all for unexpected errors
        error_context = error_tracker.create_error_context(context, e)
        
        raise Failure(
            description=f"Unexpected error in robust_data_pipeline: {str(e)}",
            metadata={
                "phase": MetadataValue.text("unknown"),
                "error_context": MetadataValue.json(error_context),
                "unexpected_error": MetadataValue.bool(True),
                "troubleshooting_guide": MetadataValue.url("https://docs.company.com/troubleshooting")
            }
        )

Error Monitoring and Alerting

from dagster import run_failure_sensor, RunRequest
from datetime import datetime
import time

@run_failure_sensor
def comprehensive_failure_monitor(context):
    """Monitor failures with detailed analysis and alerting."""
    
    failed_run = context.dagster_run
    failure_event = context.failure_event
    
    # Extract failure information
    job_name = failed_run.job_name
    step_key = failure_event.step_key if failure_event.step_key else "unknown"
    error_info = failure_event.event_specific_data
    
    # Analyze failure type
    failure_analysis = {
        "job_name": job_name,
        "step_key": step_key,
        "run_id": failed_run.run_id,
        "failure_time": datetime.fromtimestamp(failure_event.timestamp),
        "error_category": "unknown",
        "severity": "medium",
        "auto_recoverable": False
    }
    
    if error_info and error_info.error:
        error_message = error_info.error.message
        error_type = error_info.error.cls_name  # original exception class name
        
        # Categorize error
        if any(keyword in error_message.lower() for keyword in ["connection", "timeout", "network"]):
            failure_analysis["error_category"] = "connectivity"
            failure_analysis["auto_recoverable"] = True
            failure_analysis["severity"] = "low"
        elif any(keyword in error_message.lower() for keyword in ["permission", "auth", "credentials"]):
            failure_analysis["error_category"] = "authentication"
            failure_analysis["severity"] = "high"
        elif any(keyword in error_message.lower() for keyword in ["data", "validation", "schema"]):
            failure_analysis["error_category"] = "data_quality"
            failure_analysis["severity"] = "medium"
        elif any(keyword in error_message.lower() for keyword in ["memory", "disk", "resource"]):
            failure_analysis["error_category"] = "resource_exhaustion"
            failure_analysis["severity"] = "high"
        
        failure_analysis["error_message"] = error_message
        failure_analysis["error_type"] = error_type
    
    # Send appropriate alerts based on severity
    if failure_analysis["severity"] == "high":
        send_pager_duty_alert(failure_analysis)
        send_slack_alert(failure_analysis, channel="#critical-alerts")
    elif failure_analysis["severity"] == "medium":
        send_slack_alert(failure_analysis, channel="#data-alerts")
        send_email_alert(failure_analysis, recipients=["data-team@company.com"])
    
    # Log failure for analysis
    context.log.error(f"Job failure analysis: {failure_analysis}")
    
    # Attempt auto-recovery for recoverable failures
    if failure_analysis["auto_recoverable"]:
        context.log.info("Attempting auto-recovery for recoverable failure")
        
        # Wait a bit and retry
        return RunRequest(
            run_key=f"auto_retry_{failed_run.run_id}_{int(time.time())}",
            job_name=job_name,
            tags={
                "retry_type": "auto_recovery",
                "original_run_id": failed_run.run_id,
                "failure_category": failure_analysis["error_category"]
            }
        )

This comprehensive error handling system provides structured error information, intelligent retry strategies, and robust failure recovery mechanisms. The system enables precise error categorization, automated recovery for transient failures, and detailed debugging information for complex failure scenarios.

For monitoring failures with sensors, see Sensors and Schedules. For execution contexts that handle errors, see Execution and Contexts.
