A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's comprehensive error handling system, including the error hierarchy, failure events, retry policies, and best practices for robust pipeline development. Dagster provides structured error handling with rich failure information and configurable recovery strategies.
Dagster provides a structured hierarchy of exceptions for different failure scenarios, enabling precise error handling and debugging.
DagsterError { .api }
Module: dagster_shared.error
Type: Exception base class
Base class for all Dagster-specific errors with structured error information.
from dagster import DagsterError, op, job, asset
import pandas as pd

class CustomDataError(DagsterError):
    """Custom error for data quality issues.

    Carries a free-form ``data_info`` dict with diagnostic details about the
    failed quality check (counts, affected columns, percentages, ...).
    """

    def __init__(self, message: str, data_info: "dict | None" = None):
        """Store the message on the base DagsterError and keep data_info.

        Args:
            message: Human-readable description of the failure.
            data_info: Optional diagnostic details; defaults to an empty dict.
        """
        super().__init__(message)
        # Fix: annotation now reflects that None is an accepted default
        # (previously annotated as plain ``dict`` while defaulting to None).
        self.data_info = data_info or {}
@op
def validate_data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Validate a DataFrame, raising CustomDataError on nulls or duplicate rows."""
    # Nulls are checked first: a single null anywhere fails the whole check.
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        total_cells = len(df) * len(df.columns)
        raise CustomDataError(
            f"Data quality check failed: {null_count} null values found",
            data_info={
                "null_count": null_count,
                "total_records": len(df),
                "null_percentage": (null_count / total_cells) * 100,
                "affected_columns": df.columns[df.isnull().any()].tolist(),
            },
        )
    # Then exact-duplicate rows.
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {duplicate_count} duplicate records found",
            data_info={
                "duplicate_count": duplicate_count,
                "total_records": len(df),
                "duplicate_percentage": (duplicate_count / len(df)) * 100,
            },
        )
    return df
@asset
def validated_customer_data(raw_customer_data: pd.DataFrame) -> pd.DataFrame:
"""Asset with comprehensive error handling."""
try:
# Validate data quality
validated_data = validate_data_quality(raw_customer_data)
# Additional business rule validation
if len(validated_data) == 0:
raise CustomDataError(
"No valid customer records found after validation",
data_info={"original_count": len(raw_customer_data)}
)
# Check required columns
required_columns = ["customer_id", "email", "created_at"]
missing_columns = set(required_columns) - set(validated_data.columns)
if missing_columns:
raise CustomDataError(
f"Required columns missing: {missing_columns}",
data_info={
"missing_columns": list(missing_columns),
"available_columns": list(validated_data.columns)
}
)
return validated_data
except CustomDataError as e:
# Log detailed error information
context.log.error(f"Data validation failed: {str(e)}")
context.log.error(f"Error details: {e.data_info}")
# Re-raise to fail the asset materialization
raise
except Exception as e:
# Handle unexpected errors
raise DagsterError(
f"Unexpected error during customer data validation: {str(e)}"
) from eDagsterInvariantViolationError { .api }Module: dagster._core.errors
Type: DagsterError subclass
Error for invariant violations and internal consistency checks.
from dagster import DagsterInvariantViolationError, op
@op
def process_configuration(context) -> dict:
"""Op that validates configuration invariants."""
config = context.op_config
# Validate configuration invariants
if "batch_size" in config and "max_memory" in config:
batch_size = config["batch_size"]
max_memory = config["max_memory"]
# Check invariant: batch_size * record_size should not exceed max_memory
estimated_memory = batch_size * 1024 # Assume 1KB per record
if estimated_memory > max_memory:
raise DagsterInvariantViolationError(
f"Configuration invariant violated: "
f"batch_size ({batch_size}) * record_size (1KB) = {estimated_memory}KB "
f"exceeds max_memory ({max_memory}KB)"
)
# Validate required configuration relationships
if config.get("enable_caching") and not config.get("cache_directory"):
raise DagsterInvariantViolationError(
"Configuration invariant violated: "
"cache_directory must be specified when enable_caching is True"
)
return configDagsterInvalidDefinitionError { .api }Module: dagster._core.errors
Type: DagsterError subclass
Error for invalid asset, op, or job definitions.
from dagster import DagsterInvalidDefinitionError, asset, In, Out

def validate_asset_definition(asset_fn):
    """Decorator enforcing a return annotation and a docstring at definition time."""
    import inspect

    name = asset_fn.__name__
    signature = inspect.signature(asset_fn)
    # The annotation check runs first, so a function missing both reports
    # the missing return annotation.
    if signature.return_annotation == inspect.Signature.empty:
        raise DagsterInvalidDefinitionError(
            f"Asset function {name} must have a return type annotation"
        )
    if not asset_fn.__doc__:
        raise DagsterInvalidDefinitionError(
            f"Asset function {name} must have a docstring"
        )
    return asset_fn
# NOTE(review): @asset wraps the function in an AssetsDefinition, so
# validate_asset_definition inspects that wrapper rather than the raw
# function -- confirm the decorator order is intended.
@validate_asset_definition
@asset
def well_defined_asset() -> pd.DataFrame:
    """This asset has proper definition validation."""
    return pd.DataFrame({"data": [1, 2, 3]})

# This would raise DagsterInvalidDefinitionError:
# @validate_asset_definition
# @asset
# def poorly_defined_asset():  # Missing return annotation and docstring
#     return pd.DataFrame({"data": [1, 2, 3]})
DagsterInvalidInvocationError { .api }
Module: dagster._core.errors
Type: DagsterError subclass
Error for invalid invocation of Dagster definitions.
from dagster import DagsterInvalidInvocationError, op, job
@op(
ins={"input_data": In(dagster_type=pd.DataFrame)},
out=Out(dagster_type=pd.DataFrame)
)
def strict_data_processing(input_data: pd.DataFrame) -> pd.DataFrame:
"""Op with strict type checking."""
# Validate input type at runtime
if not isinstance(input_data, pd.DataFrame):
raise DagsterInvalidInvocationError(
f"Expected pandas DataFrame, got {type(input_data)}. "
f"This op requires DataFrame input for processing."
)
# Validate DataFrame structure
required_columns = ["id", "value", "timestamp"]
missing_columns = set(required_columns) - set(input_data.columns)
if missing_columns:
raise DagsterInvalidInvocationError(
f"Input DataFrame missing required columns: {missing_columns}. "
f"Available columns: {list(input_data.columns)}"
)
return input_data.dropna()
@op
def generate_invalid_data() -> dict: # Returns dict, not DataFrame
"""Op that produces invalid output for downstream consumption."""
return {"data": "not a dataframe"}
@job
def invalid_invocation_job():
"""Job that demonstrates invalid invocation."""
# This will raise DagsterInvalidInvocationError when executed
strict_data_processing(generate_invalid_data())DagsterInvalidConfigError { .api }Module: dagster._core.errors
Type: DagsterError subclass
Error for invalid configuration values.
from dagster import DagsterInvalidConfigError, op, Field, Int
@op(config_schema={
"batch_size": Field(Int, description="Batch size for processing"),
"timeout_seconds": Field(Int, description="Timeout in seconds"),
"retry_attempts": Field(Int, description="Number of retry attempts")
})
def configurable_processing_op(context):
"""Op with comprehensive config validation."""
config = context.op_config
# Validate configuration values
batch_size = config["batch_size"]
if batch_size <= 0:
raise DagsterInvalidConfigError(
f"batch_size must be positive, got {batch_size}"
)
if batch_size > 10000:
raise DagsterInvalidConfigError(
f"batch_size too large ({batch_size}), maximum allowed is 10000"
)
timeout_seconds = config["timeout_seconds"]
if timeout_seconds < 1 or timeout_seconds > 3600:
raise DagsterInvalidConfigError(
f"timeout_seconds must be between 1 and 3600, got {timeout_seconds}"
)
retry_attempts = config["retry_attempts"]
if retry_attempts < 0 or retry_attempts > 5:
raise DagsterInvalidConfigError(
f"retry_attempts must be between 0 and 5, got {retry_attempts}"
)
# Cross-field validation
if timeout_seconds < batch_size * 0.1:
raise DagsterInvalidConfigError(
f"timeout_seconds ({timeout_seconds}) too low for batch_size ({batch_size}). "
f"Minimum recommended: {int(batch_size * 0.1)} seconds"
)
context.log.info(f"Processing with batch_size={batch_size}, timeout={timeout_seconds}s")
return f"Processed with valid config"DagsterExecutionStepExecutionError { .api }Module: dagster._core.errors
Type: DagsterError subclass
Error during step execution with detailed execution context.
from dagster import DagsterExecutionStepExecutionError, op, asset
import requests
from requests.exceptions import RequestException

@op
def fetch_external_data(context) -> dict:
    """Fetch data from an external API, retrying with exponential backoff.

    Returns the decoded JSON payload; raises DagsterExecutionStepExecutionError
    once all attempts are exhausted.

    Fix: ``import time`` was previously executed inside the retry loop on
    every failed attempt; it is now done once, up front.
    """
    import time

    api_url = "https://api.example.com/data"
    max_retries = 3
    for attempt in range(max_retries):
        try:
            context.log.info(f"Fetching data from {api_url} (attempt {attempt + 1}/{max_retries})")
            response = requests.get(api_url, timeout=30)
            response.raise_for_status()
            data = response.json()
            context.log.info(f"Successfully fetched {len(data)} records")
            return data
        except RequestException as e:
            context.log.warning(f"Attempt {attempt + 1} failed: {str(e)}")
            if attempt == max_retries - 1:  # Last attempt: give up with full context.
                raise DagsterExecutionStepExecutionError(
                    f"Failed to fetch data from {api_url} after {max_retries} attempts. "
                    f"Last error: {str(e)}",
                    step_context=context.step_context,
                    user_exception=e,
                )
            # Exponential backoff before the next attempt: 1s, 2s, ...
            time.sleep(2 ** attempt)
    # Defensive: the loop always returns or raises, but keep a guard anyway.
    raise DagsterExecutionStepExecutionError("Unexpected error in fetch_external_data")
@asset
def processed_api_data(context, external_data: dict) -> pd.DataFrame:
"""Asset that processes external data with error handling."""
try:
if not external_data or "records" not in external_data:
raise ValueError("Invalid data structure from API")
records = external_data["records"]
if not isinstance(records, list):
raise ValueError(f"Expected list of records, got {type(records)}")
df = pd.DataFrame(records)
if len(df) == 0:
raise ValueError("No records found in API response")
# Validate required fields
required_fields = ["id", "timestamp", "value"]
missing_fields = set(required_fields) - set(df.columns)
if missing_fields:
raise ValueError(f"Missing required fields: {missing_fields}")
# Process data
processed_df = df.dropna().reset_index(drop=True)
context.log.info(f"Processed {len(processed_df)} valid records from {len(df)} total")
return processed_df
except Exception as e:
# Wrap in execution error with context
raise DagsterExecutionStepExecutionError(
f"Failed to process API data: {str(e)}",
user_exception=e
) from eDagster provides structured failure events that capture detailed information about errors for debugging and monitoring.
Failure { .api }
Module: dagster._core.definitions.events
Type: Event class
Structured failure event with metadata and debugging information.
from dagster import Failure, op, asset, MetadataValue
import traceback

@op
def risky_operation(context) -> str:
    """Op that demonstrates structured failure reporting.

    NOTE(review): load_critical_data, validate_data_integrity, process_data
    and DatabaseConnectionError are not defined in this snippet -- presumably
    provided elsewhere in the project; confirm before running.
    """
    try:
        # Simulate risky operation
        data = load_critical_data()
        if not validate_data_integrity(data):
            # Create structured failure with metadata
            raise Failure(
                description="Data integrity validation failed",
                metadata={
                    # NOTE(review): the counts below are hard-coded example
                    # values, not computed from `data`.
                    "validation_errors": MetadataValue.json({
                        "missing_records": 15,
                        "invalid_checksums": 3,
                        "schema_violations": 7
                    }),
                    "data_source": MetadataValue.text("critical_database"),
                    "validation_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
                    "expected_record_count": MetadataValue.int(1000),
                    "actual_record_count": MetadataValue.int(975),
                    "severity": MetadataValue.text("HIGH")
                }
            )
        return process_data(data)
    except DatabaseConnectionError as e:
        # Database-specific failure
        raise Failure(
            description=f"Database connection failed: {str(e)}",
            metadata={
                "connection_string": MetadataValue.text("postgresql://..."),
                "error_code": MetadataValue.text(getattr(e, 'code', 'UNKNOWN')),
                "retry_recommended": MetadataValue.bool(True),
                "estimated_downtime": MetadataValue.text("5-10 minutes"),
                "contact": MetadataValue.text("database-team@company.com")
            }
        )
    except Exception as e:
        # Generic failure with full context
        raise Failure(
            description=f"Unexpected error in risky operation: {str(e)}",
            metadata={
                "error_type": MetadataValue.text(type(e).__name__),
                "error_message": MetadataValue.text(str(e)),
                "stack_trace": MetadataValue.text(traceback.format_exc()),
                "operation_context": MetadataValue.json({
                    "step": "data_processing",
                    "input_size": "unknown",
                    "memory_usage": "high"
                }),
                "debug_info": MetadataValue.url("https://wiki.company.com/debug/risky-operation")
            }
        )
@asset
def resilient_asset(context) -> pd.DataFrame:
    """Asset with comprehensive failure handling and recovery.

    Tries each data source in order (primary -> secondary -> fallback) and
    raises a structured Failure only after every strategy has been exhausted.
    NOTE(review): the load_* helpers are not defined in this snippet;
    presumably provided elsewhere -- confirm.
    """
    recovery_strategies = ["primary", "secondary", "fallback"]
    for strategy in recovery_strategies:
        try:
            context.log.info(f"Attempting data load with {strategy} strategy")
            if strategy == "primary":
                data = load_primary_data_source()
            elif strategy == "secondary":
                data = load_secondary_data_source()
            elif strategy == "fallback":
                data = load_fallback_data_source()
            # An empty result counts as a failure so the next strategy is tried.
            if len(data) == 0:
                raise ValueError(f"No data loaded from {strategy} source")
            context.log.info(f"Successfully loaded {len(data)} records using {strategy} strategy")
            return data
        except Exception as e:
            context.log.warning(f"Strategy {strategy} failed: {str(e)}")
            if strategy == "fallback":  # Last resort failed
                raise Failure(
                    description="All data loading strategies failed",
                    metadata={
                        "failed_strategies": MetadataValue.json(recovery_strategies),
                        # NOTE(review): the primary/secondary messages below are
                        # hard-coded examples, not the actual captured errors.
                        "primary_error": MetadataValue.text("Connection timeout"),
                        "secondary_error": MetadataValue.text("Authentication failed"),
                        "fallback_error": MetadataValue.text(str(e)),
                        "recommended_action": MetadataValue.text(
                            "Check data source availability and credentials"
                        ),
                        "escalation_required": MetadataValue.bool(True),
                        "incident_severity": MetadataValue.text("P1")
                    }
                )
    # Should never reach here
    raise Failure("Unexpected code path in resilient_asset")

@op
def debugging_friendly_op(context) -> dict:
    """Op with comprehensive debugging information in failures.

    NOTE(review): perform_step1/2/3 and get_failure_location_from_traceback
    are not defined in this snippet -- confirm they exist before use.
    """
    # Execution snapshot captured up front; per-step completion flags are
    # added below so a failure pinpoints which step broke.
    debug_context = {
        "op_name": context.op_def.name,
        "run_id": context.run_id,
        "step_key": context.step_context.step.key,
        "execution_time": pd.Timestamp.now().isoformat(),
        # NOTE(review): reaches into the private _resource_defs attribute.
        "resource_keys": list(context.resources._resource_defs.keys())
    }
    try:
        step1_result = perform_step1()
        debug_context["step1_completed"] = True
        step2_result = perform_step2(step1_result)
        debug_context["step2_completed"] = True
        final_result = perform_step3(step2_result)
        debug_context["step3_completed"] = True
        return final_result
    except Exception as e:
        # Add failure location to debug context
        debug_context["failure_location"] = get_failure_location_from_traceback()
        debug_context["error_type"] = type(e).__name__
        debug_context["error_message"] = str(e)
        # Determine failure category for better handling
        if isinstance(e, ConnectionError):
            failure_category = "connectivity"
            recovery_time = "1-5 minutes"
        elif isinstance(e, PermissionError):
            failure_category = "authentication"
            recovery_time = "immediate with credentials update"
        elif isinstance(e, ValueError):
            failure_category = "data_validation"
            recovery_time = "requires data fix"
        else:
            failure_category = "unknown"
            recovery_time = "unknown"
        raise Failure(
            description=f"Operation failed at {debug_context.get('failure_location', 'unknown location')}: {str(e)}",
            metadata={
                "debug_context": MetadataValue.json(debug_context),
                "failure_category": MetadataValue.text(failure_category),
                "estimated_recovery_time": MetadataValue.text(recovery_time),
                "troubleshooting_guide": MetadataValue.url(
                    f"https://docs.company.com/troubleshooting/{failure_category}"
                ),
                "runbook": MetadataValue.url(
                    f"https://runbook.company.com/{context.op_def.name}"
                )
            }
        )
Dagster provides configurable retry policies for automatic recovery from transient failures.
RetryPolicy { .api }
Module: dagster._core.definitions.policy
Type: Class
Configurable retry policy with backoff and jitter strategies.
from dagster import RetryPolicy, Backoff, Jitter, op, job, asset
import random
import time

# Basic retry policy: up to 3 retries, fixed 1-second delay between attempts.
basic_retry = RetryPolicy(max_retries=3, delay=1.0)

@op(retry_policy=basic_retry)
def simple_retry_op(context) -> str:
    """Op with basic retry policy."""
    # Fail ~70% of the time so the retry policy has something to do.
    roll = random.random()
    if roll < 0.7:
        raise Exception("Random failure for retry testing")
    return "Success after retries"
# Exponential backoff retry policy: delays of 1s, 2s, 4s, 8s, 16s with
# plus/minus jitter to avoid a thundering herd.
exponential_retry = RetryPolicy(
    max_retries=5,
    delay=1.0,
    backoff=Backoff.EXPONENTIAL,
    jitter=Jitter.PLUS_MINUS
)

@op(retry_policy=exponential_retry)
def external_api_call(context) -> dict:
    """Call an external API; failures propagate so the policy reschedules."""
    context.log.info("Attempting API call...")
    try:
        response = requests.get(
            "https://api.example.com/data",
            headers={"Authorization": "Bearer token"},
            timeout=30
        )
        # HTTP 429 is handled before the generic status check so the log
        # records the rate-limit case explicitly.
        if response.status_code == 429:
            context.log.warning("Rate limited, will retry with backoff")
            raise Exception("Rate limit exceeded")
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        context.log.warning(f"API call failed: {str(e)}")
        raise
# Linear backoff with custom delay calculation
linear_retry = RetryPolicy(
    max_retries=4,
    delay=2.0,
    backoff=Backoff.LINEAR,  # 2s, 4s, 6s, 8s
    jitter=Jitter.FULL  # Randomize delay completely
)

@op(retry_policy=linear_retry)
def database_operation(context) -> pd.DataFrame:
    """Database operation with linear backoff retry.

    NOTE(review): get_database_connection and DatabaseError are not defined
    in this snippet; presumably imported elsewhere -- confirm.
    """
    # NOTE(review): _retry_attempt is not a documented context attribute, so
    # this will normally report attempt 1; verify before relying on it.
    attempt_num = getattr(context, '_retry_attempt', 0)
    context.log.info(f"Database operation attempt {attempt_num + 1}")
    try:
        # Simulate database operation
        connection = get_database_connection()
        query = """
SELECT id, name, value, created_at
FROM important_table
WHERE created_at >= NOW() - INTERVAL 1 DAY
"""
        df = pd.read_sql(query, connection)
        context.log.info(f"Successfully loaded {len(df)} records")
        return df
    except DatabaseError as e:
        context.log.warning(f"Database error: {str(e)}")
        # Only connection/timeout problems are worth retrying; anything else
        # fails fast with a structured Failure.
        if "connection" in str(e).lower() or "timeout" in str(e).lower():
            context.log.info("Retryable database error, will attempt retry")
            raise  # Let retry policy handle it
        else:
            context.log.error("Non-retryable database error, failing immediately")
            raise Failure(
                description=f"Non-retryable database error: {str(e)}",
                metadata={
                    "error_type": "database_error",
                    "retryable": False,
                    "error_code": getattr(e, 'code', 'UNKNOWN')
                }
            )
# Custom retry policy with conditional logic
class ConditionalRetryPolicy(RetryPolicy):
    """Custom retry policy with condition-based retry logic.

    NOTE(review): Dagster's RetryPolicy does not consult a should_retry hook,
    and AuthenticationError is undefined in this snippet -- treat this class
    as illustrative and confirm against the dagster API before use.
    """

    def __init__(self, max_retries: int = 3, delay: float = 1.0):
        super().__init__(max_retries=max_retries, delay=delay)

    def should_retry(self, context, exception: Exception) -> bool:
        """Determine if operation should be retried based on exception type."""
        # Always retry connection errors
        if isinstance(exception, (ConnectionError, TimeoutError)):
            return True
        # Retry rate limit errors
        if "rate limit" in str(exception).lower():
            return True
        # Don't retry authentication errors
        if isinstance(exception, (PermissionError, AuthenticationError)):
            return False
        # Don't retry validation errors
        if isinstance(exception, ValueError):
            return False
        # Default behavior for other exceptions
        return True

conditional_retry = ConditionalRetryPolicy(max_retries=3, delay=2.0)

@op(retry_policy=conditional_retry)
def smart_retry_op(context) -> str:
    """Op with intelligent retry logic."""
    # This will retry connection errors but not validation errors
    operation_type = random.choice(["connection_error", "validation_error", "success"])
    if operation_type == "connection_error":
        raise ConnectionError("Network connection failed")
    elif operation_type == "validation_error":
        raise ValueError("Invalid input data")
    return "Operation succeeded"
RetryRequested { .api }
Module: dagster._core.definitions.events
Type: Exception class
Explicit retry request with custom delay and metadata.
from dagster import RetryRequested, op, MetadataValue

@op
def explicit_retry_op(context) -> str:
    """Op that explicitly requests retries with custom logic.

    NOTE(review): each retry executes with a fresh context object, so the
    _attempt_count attribute stashed below is unlikely to survive between
    attempts -- confirm intent. Also confirm that the installed dagster
    version accepts a ``metadata`` kwarg on RetryRequested.
    """
    # Track retry attempts in op context
    attempt_count = getattr(context, '_attempt_count', 0)
    context._attempt_count = attempt_count + 1
    context.log.info(f"Attempt {attempt_count + 1}")
    # Simulate different failure scenarios
    if attempt_count == 0:
        # First attempt: request retry immediately
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=0,  # Immediate retry
            metadata={
                "retry_reason": MetadataValue.text("Initial setup required"),
                "attempt_number": MetadataValue.int(attempt_count + 1)
            }
        )
    elif attempt_count == 1:
        # Second attempt: wait 5 seconds before retry
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=5,
            metadata={
                "retry_reason": MetadataValue.text("Waiting for external service"),
                "wait_time_seconds": MetadataValue.int(5),
                "attempt_number": MetadataValue.int(attempt_count + 1)
            }
        )
    elif attempt_count == 2:
        # Third attempt: exponential backoff
        wait_time = 2 ** attempt_count  # 4 seconds
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=wait_time,
            metadata={
                "retry_reason": MetadataValue.text("Exponential backoff"),
                "wait_time_seconds": MetadataValue.int(wait_time),
                "attempt_number": MetadataValue.int(attempt_count + 1)
            }
        )
    # Fourth attempt: succeed
    return f"Success on attempt {attempt_count + 1}"
@asset
def resilient_data_processing(context) -> pd.DataFrame:
    """Asset with sophisticated retry logic for data processing.

    Uses exponential backoff for empty-data conditions and linear backoff for
    quality problems; after max_attempts it raises a terminal Failure.
    NOTE(review): load_data_with_retries and calculate_data_quality are not
    defined in this snippet -- confirm they exist. Also note that raising
    RetryRequested fails the step so Dagster relaunches it; it does not
    continue this in-process loop -- confirm the intended flow.
    """
    max_attempts = 5
    base_delay = 1.0
    for attempt in range(max_attempts):
        try:
            context.log.info(f"Processing attempt {attempt + 1}/{max_attempts}")
            # Load data with potential failures
            data = load_data_with_retries()
            # Validate data quality
            if len(data) == 0:
                if attempt < max_attempts - 1:
                    wait_time = base_delay * (2 ** attempt)  # Exponential backoff
                    raise RetryRequested(
                        max_retries=max_attempts - 1,
                        seconds_to_wait=wait_time,
                        metadata={
                            "retry_reason": MetadataValue.text("Empty dataset, waiting for data arrival"),
                            "wait_time": MetadataValue.float(wait_time),
                            "attempt": MetadataValue.int(attempt + 1),
                            "data_source_status": MetadataValue.text("checking")
                        }
                    )
                else:
                    raise Failure(
                        description="No data available after all retry attempts",
                        metadata={
                            "total_attempts": MetadataValue.int(max_attempts),
                            "final_status": MetadataValue.text("no_data_available")
                        }
                    )
            # Check data quality
            quality_score = calculate_data_quality(data)
            if quality_score < 0.8:  # 80% quality threshold
                if attempt < max_attempts - 1:
                    wait_time = base_delay * (attempt + 1)  # Linear backoff for quality issues
                    raise RetryRequested(
                        max_retries=max_attempts - 1,
                        seconds_to_wait=wait_time,
                        metadata={
                            "retry_reason": MetadataValue.text("Poor data quality, waiting for data refresh"),
                            "quality_score": MetadataValue.float(quality_score),
                            "quality_threshold": MetadataValue.float(0.8),
                            "wait_time": MetadataValue.float(wait_time),
                            "attempt": MetadataValue.int(attempt + 1)
                        }
                    )
                else:
                    raise Failure(
                        description=f"Data quality too low ({quality_score:.2f}) after all attempts",
                        metadata={
                            "final_quality_score": MetadataValue.float(quality_score),
                            "quality_threshold": MetadataValue.float(0.8),
                            "total_attempts": MetadataValue.int(max_attempts)
                        }
                    )
            # Success case
            context.log.info(f"Data processing succeeded on attempt {attempt + 1}")
            context.add_output_metadata({
                "successful_attempt": MetadataValue.int(attempt + 1),
                "quality_score": MetadataValue.float(quality_score),
                "record_count": MetadataValue.int(len(data))
            })
            return data
        except (RetryRequested, Failure):
            # Re-raise retry and failure events
            raise
        except Exception as e:
            # Handle unexpected errors
            if attempt < max_attempts - 1:
                wait_time = base_delay * (2 ** attempt)
                raise RetryRequested(
                    max_retries=max_attempts - 1,
                    seconds_to_wait=wait_time,
                    metadata={
                        "retry_reason": MetadataValue.text(f"Unexpected error: {str(e)}"),
                        "error_type": MetadataValue.text(type(e).__name__),
                        "wait_time": MetadataValue.float(wait_time),
                        "attempt": MetadataValue.int(attempt + 1)
                    }
                )
            else:
                raise Failure(
                    description=f"Unexpected error after all retry attempts: {str(e)}",
                    metadata={
                        "error_type": MetadataValue.text(type(e).__name__),
                        "error_message": MetadataValue.text(str(e)),
                        "total_attempts": MetadataValue.int(max_attempts)
                    }
                )
    # Should never reach here
    raise Failure("Unexpected code path in resilient_data_processing")
from dagster import asset, Failure, MetadataValue
from typing import Dict, Any
import logging
import traceback

class ErrorTracker:
    """Utility helpers for building error reports and classifying errors."""

    @staticmethod
    def create_error_context(context, error: Exception) -> Dict[str, Any]:
        """Create standardized error context."""
        # Pull optional identifiers defensively: asset/op attributes exist
        # only on some context types.
        asset_key = str(context.asset_key) if hasattr(context, 'asset_key') else None
        op_name = context.op_def.name if hasattr(context, 'op_def') else None
        return {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "stack_trace": traceback.format_exc(),
            "asset_key": asset_key,
            "op_name": op_name,
            "run_id": context.run_id,
            "step_key": getattr(context, 'step_key', None),
            "partition_key": getattr(context, 'partition_key', None),
            "timestamp": pd.Timestamp.now().isoformat()
        }

    @staticmethod
    def should_retry(error: Exception) -> bool:
        """Determine if error is retryable."""
        transient_types = (
            ConnectionError,
            TimeoutError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        )
        permanent_types = (
            ValueError,
            KeyError,
            TypeError,
            PermissionError,
        )
        if isinstance(error, transient_types):
            return True
        if isinstance(error, permanent_types):
            return False
        # Fall back to keyword sniffing on the message text.
        message = str(error).lower()
        if any(keyword in message for keyword in ["timeout", "connection", "network"]):
            return True
        if any(keyword in message for keyword in ["permission", "auth", "invalid"]):
            return False
        return True  # Default to retryable for unknown errors
@asset
def robust_data_pipeline(context) -> pd.DataFrame:
"""Asset with comprehensive error handling best practices."""
error_tracker = ErrorTracker()
try:
# Step 1: Data loading with error context
context.log.info("Starting data loading phase")
try:
raw_data = load_raw_data()
context.log.info(f"Loaded {len(raw_data)} raw records")
except Exception as e:
error_context = error_tracker.create_error_context(context, e)
if error_tracker.should_retry(e):
raise RetryRequested(
max_retries=3,
seconds_to_wait=5.0,
metadata={
"phase": MetadataValue.text("data_loading"),
"error_context": MetadataValue.json(error_context),
"retry_recommended": MetadataValue.bool(True)
}
)
else:
raise Failure(
description=f"Non-retryable error in data loading: {str(e)}",
metadata={
"phase": MetadataValue.text("data_loading"),
"error_context": MetadataValue.json(error_context),
"retry_recommended": MetadataValue.bool(False)
}
)
# Step 2: Data validation with structured errors
context.log.info("Starting data validation phase")
validation_errors = []
if len(raw_data) == 0:
validation_errors.append("Empty dataset")
required_columns = ["id", "timestamp", "value"]
missing_columns = set(required_columns) - set(raw_data.columns)
if missing_columns:
validation_errors.append(f"Missing columns: {missing_columns}")
null_percentage = (raw_data.isnull().sum().sum() / (len(raw_data) * len(raw_data.columns))) * 100
if null_percentage > 5: # 5% threshold
validation_errors.append(f"Too many null values: {null_percentage:.1f}%")
if validation_errors:
raise Failure(
description="Data validation failed",
metadata={
"phase": MetadataValue.text("data_validation"),
"validation_errors": MetadataValue.json(validation_errors),
"record_count": MetadataValue.int(len(raw_data)),
"null_percentage": MetadataValue.float(null_percentage),
"data_quality_score": MetadataValue.float(max(0, 1 - null_percentage/100))
}
)
# Step 3: Data processing with progress tracking
context.log.info("Starting data processing phase")
try:
processed_data = process_data(raw_data)
# Validate processing results
processing_loss = len(raw_data) - len(processed_data)
loss_percentage = (processing_loss / len(raw_data)) * 100
if loss_percentage > 20: # 20% loss threshold
context.log.warning(f"High data loss during processing: {loss_percentage:.1f}%")
context.add_output_metadata({
"input_records": MetadataValue.int(len(raw_data)),
"output_records": MetadataValue.int(len(processed_data)),
"processing_loss": MetadataValue.int(processing_loss),
"loss_percentage": MetadataValue.float(loss_percentage),
"processing_success": MetadataValue.bool(True)
})
return processed_data
except Exception as e:
error_context = error_tracker.create_error_context(context, e)
raise Failure(
description=f"Data processing failed: {str(e)}",
metadata={
"phase": MetadataValue.text("data_processing"),
"error_context": MetadataValue.json(error_context),
"input_records": MetadataValue.int(len(raw_data)),
"processing_success": MetadataValue.bool(False)
}
)
except (Failure, RetryRequested):
# Re-raise structured Dagster events
raise
except Exception as e:
# Catch-all for unexpected errors
error_context = error_tracker.create_error_context(context, e)
raise Failure(
description=f"Unexpected error in robust_data_pipeline: {str(e)}",
metadata={
"phase": MetadataValue.text("unknown"),
"error_context": MetadataValue.json(error_context),
"unexpected_error": MetadataValue.bool(True),
"troubleshooting_guide": MetadataValue.url("https://docs.company.com/troubleshooting")
}
)@run_failure_sensor
def comprehensive_failure_monitor(context):
    """Run-failure sensor: categorize the failure, alert, and optionally retry.

    NOTE(review): this snippet relies on names defined elsewhere (datetime,
    time, RunRequest, run_failure_sensor, send_pager_duty_alert,
    send_slack_alert, send_email_alert) -- confirm imports before use.
    """
    failed_run = context.dagster_run
    failure_event = context.failure_event
    # Extract failure information
    job_name = failed_run.job_name
    step_key = failure_event.step_key if failure_event.step_key else "unknown"
    error_info = failure_event.dagster_event.step_failure_data
    # Baseline analysis record; refined below once the error text is known.
    failure_analysis = {
        "job_name": job_name,
        "step_key": step_key,
        "run_id": failed_run.run_id,
        "failure_time": datetime.fromtimestamp(failure_event.timestamp),
        "error_category": "unknown",
        "severity": "medium",
        "auto_recoverable": False
    }
    if error_info and error_info.error:
        error_message = error_info.error.message
        # NOTE(review): error_info.error is a serialized error representation;
        # __class__.__name__ may name the wrapper type rather than the
        # original user exception -- verify.
        error_type = error_info.error.__class__.__name__
        # Categorize error by message keyword; first match wins.
        if any(keyword in error_message.lower() for keyword in ["connection", "timeout", "network"]):
            failure_analysis["error_category"] = "connectivity"
            failure_analysis["auto_recoverable"] = True
            failure_analysis["severity"] = "low"
        elif any(keyword in error_message.lower() for keyword in ["permission", "auth", "credentials"]):
            failure_analysis["error_category"] = "authentication"
            failure_analysis["severity"] = "high"
        elif any(keyword in error_message.lower() for keyword in ["data", "validation", "schema"]):
            failure_analysis["error_category"] = "data_quality"
            failure_analysis["severity"] = "medium"
        elif any(keyword in error_message.lower() for keyword in ["memory", "disk", "resource"]):
            failure_analysis["error_category"] = "resource_exhaustion"
            failure_analysis["severity"] = "high"
        failure_analysis["error_message"] = error_message
        failure_analysis["error_type"] = error_type
    # Send appropriate alerts based on severity
    if failure_analysis["severity"] == "high":
        send_pager_duty_alert(failure_analysis)
        send_slack_alert(failure_analysis, channel="#critical-alerts")
    elif failure_analysis["severity"] == "medium":
        send_slack_alert(failure_analysis, channel="#data-alerts")
        send_email_alert(failure_analysis, recipients=["data-team@company.com"])
    # Log failure for analysis
    context.log.error(f"Job failure analysis: {failure_analysis}")
    # Attempt auto-recovery for recoverable failures
    if failure_analysis["auto_recoverable"]:
        context.log.info("Attempting auto-recovery for recoverable failure")
        # NOTE(review): returning a RunRequest from a run-failure sensor only
        # launches a run on dagster versions that support it -- confirm.
        return RunRequest(
            run_key=f"auto_retry_{failed_run.run_id}_{int(time.time())}",
            job_name=job_name,
            tags={
                "retry_type": "auto_recovery",
                "original_run_id": failed_run.run_id,
                "failure_category": failure_analysis["error_category"]
            }
        )
This comprehensive error handling system provides structured error information, intelligent retry strategies, and robust failure recovery mechanisms. The system enables precise error categorization, automated recovery for transient failures, and detailed debugging information for complex failure scenarios.
For monitoring failures with sensors, see Sensors and Schedules. For execution contexts that handle errors, see Execution and Contexts.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster