# Error Handling

This document covers Dagster's error handling system, including the error hierarchy, failure events, retry policies, and best practices for robust pipeline development. Dagster provides structured error handling with rich failure information and configurable recovery strategies.

## Error Hierarchy

Dagster provides a structured hierarchy of exceptions for different failure scenarios, enabling precise error handling and debugging.

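For example, callers can distinguish configuration problems from other failures by catching the specific exception type around in-process execution. A minimal sketch, where `limited_op` and its single integer config field are hypothetical:

```python
from dagster import DagsterInvalidConfigError, job, op


@op(config_schema={"limit": int})
def limited_op(context):
    context.log.info(f"limit={context.op_config['limit']}")


@job
def limited_job():
    limited_op()


if __name__ == "__main__":
    try:
        # A string where an int is required fails config validation before any op runs.
        limited_job.execute_in_process(
            run_config={"ops": {"limited_op": {"config": {"limit": "not-an-int"}}}}
        )
    except DagsterInvalidConfigError as exc:
        print(f"Configuration error caught before execution: {exc}")
```
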
### Base Errors

#### `DagsterError` { .api }

**Module:** `dagster_shared.error`
**Type:** Exception base class

Base class for all Dagster-specific errors with structured error information.

```python
from dagster import DagsterError, asset, op
import pandas as pd


class CustomDataError(DagsterError):
    """Custom error for data quality issues."""

    def __init__(self, message: str, data_info: dict = None):
        super().__init__(message)
        self.data_info = data_info or {}


@op
def validate_data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Op that validates data quality and raises custom errors."""

    # Check for null values
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {null_count} null values found",
            data_info={
                "null_count": null_count,
                "total_records": len(df),
                "null_percentage": (null_count / (len(df) * len(df.columns))) * 100,
                "affected_columns": df.columns[df.isnull().any()].tolist()
            }
        )

    # Check for duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {duplicate_count} duplicate records found",
            data_info={
                "duplicate_count": duplicate_count,
                "total_records": len(df),
                "duplicate_percentage": (duplicate_count / len(df)) * 100
            }
        )

    return df


@asset
def validated_customer_data(context, raw_customer_data: pd.DataFrame) -> pd.DataFrame:
    """Asset with comprehensive error handling."""

    try:
        # Validate data quality
        validated_data = validate_data_quality(raw_customer_data)

        # Additional business rule validation
        if len(validated_data) == 0:
            raise CustomDataError(
                "No valid customer records found after validation",
                data_info={"original_count": len(raw_customer_data)}
            )

        # Check required columns
        required_columns = ["customer_id", "email", "created_at"]
        missing_columns = set(required_columns) - set(validated_data.columns)
        if missing_columns:
            raise CustomDataError(
                f"Required columns missing: {missing_columns}",
                data_info={
                    "missing_columns": list(missing_columns),
                    "available_columns": list(validated_data.columns)
                }
            )

        return validated_data

    except CustomDataError as e:
        # Log detailed error information
        context.log.error(f"Data validation failed: {str(e)}")
        context.log.error(f"Error details: {e.data_info}")

        # Re-raise to fail the asset materialization
        raise

    except Exception as e:
        # Handle unexpected errors
        raise DagsterError(
            f"Unexpected error during customer data validation: {str(e)}"
        ) from e
```

#### `DagsterInvariantViolationError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invariant violations and internal consistency checks.

```python
from dagster import DagsterInvariantViolationError, op


@op
def process_configuration(context) -> dict:
    """Op that validates configuration invariants."""

    config = context.op_config

    # Validate configuration invariants
    if "batch_size" in config and "max_memory" in config:
        batch_size = config["batch_size"]
        max_memory = config["max_memory"]

        # Check invariant: batch_size * record_size should not exceed max_memory
        record_size_kb = 1  # assume roughly 1KB per record
        estimated_memory = batch_size * record_size_kb

        if estimated_memory > max_memory:
            raise DagsterInvariantViolationError(
                f"Configuration invariant violated: "
                f"batch_size ({batch_size}) * record_size ({record_size_kb}KB) = {estimated_memory}KB "
                f"exceeds max_memory ({max_memory}KB)"
            )

    # Validate required configuration relationships
    if config.get("enable_caching") and not config.get("cache_directory"):
        raise DagsterInvariantViolationError(
            "Configuration invariant violated: "
            "cache_directory must be specified when enable_caching is True"
        )

    return config
```

### Definition Errors

#### `DagsterInvalidDefinitionError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid asset, op, or job definitions.

```python
from dagster import DagsterInvalidDefinitionError, asset
import pandas as pd


def validate_asset_definition(asset_fn):
    """Decorator that validates the asset function at definition time."""

    # Check function signature
    import inspect
    signature = inspect.signature(asset_fn)

    # Validate return annotation exists
    if signature.return_annotation == inspect.Signature.empty:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a return type annotation"
        )

    # Validate docstring exists
    if not asset_fn.__doc__:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a docstring"
        )

    return asset_fn


# The validator sits below @asset so it sees the raw function,
# not the AssetsDefinition that @asset produces.
@asset
@validate_asset_definition
def well_defined_asset() -> pd.DataFrame:
    """This asset has proper definition validation."""
    return pd.DataFrame({"data": [1, 2, 3]})


# This would raise DagsterInvalidDefinitionError:
# @asset
# @validate_asset_definition
# def poorly_defined_asset():  # Missing return annotation and docstring
#     return pd.DataFrame({"data": [1, 2, 3]})
```

#### `DagsterInvalidInvocationError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid invocation of Dagster definitions.

```python
from dagster import DagsterInvalidInvocationError, In, Out, job, op
import pandas as pd


@op(
    ins={"input_data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame)
)
def strict_data_processing(input_data: pd.DataFrame) -> pd.DataFrame:
    """Op with strict type checking."""

    # Validate input type at runtime
    if not isinstance(input_data, pd.DataFrame):
        raise DagsterInvalidInvocationError(
            f"Expected pandas DataFrame, got {type(input_data)}. "
            f"This op requires DataFrame input for processing."
        )

    # Validate DataFrame structure
    required_columns = ["id", "value", "timestamp"]
    missing_columns = set(required_columns) - set(input_data.columns)

    if missing_columns:
        raise DagsterInvalidInvocationError(
            f"Input DataFrame missing required columns: {missing_columns}. "
            f"Available columns: {list(input_data.columns)}"
        )

    return input_data.dropna()


@op
def generate_invalid_data() -> dict:  # Returns dict, not DataFrame
    """Op that produces invalid output for downstream consumption."""
    return {"data": "not a dataframe"}


@job
def invalid_invocation_job():
    """Job that demonstrates invalid invocation."""
    # Wiring a dict output into a DataFrame-typed input fails when the job is executed
    strict_data_processing(generate_invalid_data())
```

### Configuration Errors

#### `DagsterInvalidConfigError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid configuration values.

```python
from dagster import DagsterInvalidConfigError, Field, Int, op


@op(config_schema={
    "batch_size": Field(Int, description="Batch size for processing"),
    "timeout_seconds": Field(Int, description="Timeout in seconds"),
    "retry_attempts": Field(Int, description="Number of retry attempts")
})
def configurable_processing_op(context):
    """Op with comprehensive config validation."""

    config = context.op_config

    # Validate configuration values
    batch_size = config["batch_size"]
    if batch_size <= 0:
        raise DagsterInvalidConfigError(
            f"batch_size must be positive, got {batch_size}"
        )

    if batch_size > 10000:
        raise DagsterInvalidConfigError(
            f"batch_size too large ({batch_size}), maximum allowed is 10000"
        )

    timeout_seconds = config["timeout_seconds"]
    if timeout_seconds < 1 or timeout_seconds > 3600:
        raise DagsterInvalidConfigError(
            f"timeout_seconds must be between 1 and 3600, got {timeout_seconds}"
        )

    retry_attempts = config["retry_attempts"]
    if retry_attempts < 0 or retry_attempts > 5:
        raise DagsterInvalidConfigError(
            f"retry_attempts must be between 0 and 5, got {retry_attempts}"
        )

    # Cross-field validation
    if timeout_seconds < batch_size * 0.1:
        raise DagsterInvalidConfigError(
            f"timeout_seconds ({timeout_seconds}) too low for batch_size ({batch_size}). "
            f"Minimum recommended: {int(batch_size * 0.1)} seconds"
        )

    context.log.info(f"Processing with batch_size={batch_size}, timeout={timeout_seconds}s")
    return "Processed with valid config"
```

### Execution Errors

#### `DagsterExecutionStepExecutionError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error during step execution with detailed execution context.

```python
from dagster import DagsterExecutionStepExecutionError, asset, op
import pandas as pd
import requests
import time
from requests.exceptions import RequestException


@op
def fetch_external_data(context) -> dict:
    """Op that fetches data from external API with error handling."""

    api_url = "https://api.example.com/data"
    max_retries = 3

    for attempt in range(max_retries):
        try:
            context.log.info(f"Fetching data from {api_url} (attempt {attempt + 1}/{max_retries})")

            response = requests.get(api_url, timeout=30)
            response.raise_for_status()

            data = response.json()
            context.log.info(f"Successfully fetched {len(data)} records")

            return data

        except RequestException as e:
            context.log.warning(f"Attempt {attempt + 1} failed: {str(e)}")

            if attempt == max_retries - 1:  # Last attempt
                raise DagsterExecutionStepExecutionError(
                    f"Failed to fetch data from {api_url} after {max_retries} attempts. "
                    f"Last error: {str(e)}",
                    user_exception=e
                ) from e

            # Wait before retry
            time.sleep(2 ** attempt)  # Exponential backoff

    # This shouldn't be reached, but just in case
    raise DagsterExecutionStepExecutionError("Unexpected error in fetch_external_data")


@asset
def processed_api_data(context, external_data: dict) -> pd.DataFrame:
    """Asset that processes external data with error handling."""

    try:
        if not external_data or "records" not in external_data:
            raise ValueError("Invalid data structure from API")

        records = external_data["records"]
        if not isinstance(records, list):
            raise ValueError(f"Expected list of records, got {type(records)}")

        df = pd.DataFrame(records)

        if len(df) == 0:
            raise ValueError("No records found in API response")

        # Validate required fields
        required_fields = ["id", "timestamp", "value"]
        missing_fields = set(required_fields) - set(df.columns)

        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Process data
        processed_df = df.dropna().reset_index(drop=True)

        context.log.info(f"Processed {len(processed_df)} valid records from {len(df)} total")

        return processed_df

    except Exception as e:
        # Wrap in execution error with context
        raise DagsterExecutionStepExecutionError(
            f"Failed to process API data: {str(e)}",
            user_exception=e
        ) from e
```

## Failure Events

Dagster provides structured failure events that capture detailed information about errors for debugging and monitoring.

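At its simplest, raising `Failure` from an op or asset fails the step and attaches structured metadata to the failure event. A minimal sketch, where `check_row_count` and its hard-coded count are hypothetical stand-ins:

```python
from dagster import Failure, MetadataValue, op


@op
def check_row_count(context):
    row_count = 0  # stand-in for a count computed from upstream data

    if row_count == 0:
        raise Failure(
            description="Expected at least one row",
            metadata={"row_count": MetadataValue.int(row_count)},
        )

    context.log.info(f"Row count OK: {row_count}")
```
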
### `Failure` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Event class

Structured failure event with metadata and debugging information.

```python
from dagster import Failure, MetadataValue, asset, op
import pandas as pd
import traceback

# load_critical_data, validate_data_integrity, process_data, DatabaseConnectionError,
# and the load_*_data_source helpers below are stand-ins for project-specific code.


@op
def risky_operation(context) -> str:
    """Op that demonstrates structured failure reporting."""

    try:
        # Simulate risky operation
        data = load_critical_data()

        if not validate_data_integrity(data):
            # Create structured failure with metadata
            raise Failure(
                description="Data integrity validation failed",
                metadata={
                    "validation_errors": MetadataValue.json({
                        "missing_records": 15,
                        "invalid_checksums": 3,
                        "schema_violations": 7
                    }),
                    "data_source": MetadataValue.text("critical_database"),
                    "validation_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
                    "expected_record_count": MetadataValue.int(1000),
                    "actual_record_count": MetadataValue.int(975),
                    "severity": MetadataValue.text("HIGH")
                }
            )

        return process_data(data)

    except DatabaseConnectionError as e:
        # Database-specific failure
        raise Failure(
            description=f"Database connection failed: {str(e)}",
            metadata={
                "connection_string": MetadataValue.text("postgresql://..."),
                "error_code": MetadataValue.text(getattr(e, 'code', 'UNKNOWN')),
                "retry_recommended": MetadataValue.bool(True),
                "estimated_downtime": MetadataValue.text("5-10 minutes"),
                "contact": MetadataValue.text("database-team@company.com")
            }
        )

    except Exception as e:
        # Generic failure with full context
        raise Failure(
            description=f"Unexpected error in risky operation: {str(e)}",
            metadata={
                "error_type": MetadataValue.text(type(e).__name__),
                "error_message": MetadataValue.text(str(e)),
                "stack_trace": MetadataValue.text(traceback.format_exc()),
                "operation_context": MetadataValue.json({
                    "step": "data_processing",
                    "input_size": "unknown",
                    "memory_usage": "high"
                }),
                "debug_info": MetadataValue.url("https://wiki.company.com/debug/risky-operation")
            }
        )


@asset
def resilient_asset(context) -> pd.DataFrame:
    """Asset with comprehensive failure handling and recovery."""

    recovery_strategies = ["primary", "secondary", "fallback"]

    for strategy in recovery_strategies:
        try:
            context.log.info(f"Attempting data load with {strategy} strategy")

            if strategy == "primary":
                data = load_primary_data_source()
            elif strategy == "secondary":
                data = load_secondary_data_source()
            elif strategy == "fallback":
                data = load_fallback_data_source()

            # Validate loaded data
            if len(data) == 0:
                raise ValueError(f"No data loaded from {strategy} source")

            context.log.info(f"Successfully loaded {len(data)} records using {strategy} strategy")

            return data

        except Exception as e:
            context.log.warning(f"Strategy {strategy} failed: {str(e)}")

            if strategy == "fallback":  # Last resort failed
                raise Failure(
                    description="All data loading strategies failed",
                    metadata={
                        "failed_strategies": MetadataValue.json(recovery_strategies),
                        "primary_error": MetadataValue.text("Connection timeout"),
                        "secondary_error": MetadataValue.text("Authentication failed"),
                        "fallback_error": MetadataValue.text(str(e)),
                        "recommended_action": MetadataValue.text(
                            "Check data source availability and credentials"
                        ),
                        "escalation_required": MetadataValue.bool(True),
                        "incident_severity": MetadataValue.text("P1")
                    }
                )

    # Should never reach here
    raise Failure("Unexpected code path in resilient_asset")
```

### Error Context and Debugging

```python
from dagster import Failure, MetadataValue, op
import pandas as pd

# perform_step1/2/3 and get_failure_location_from_traceback are stand-ins
# for project-specific helpers.


@op
def debugging_friendly_op(context) -> dict:
    """Op with comprehensive debugging information in failures."""

    debug_context = {
        "op_name": context.op_def.name,
        "run_id": context.run_id,
        "step_key": getattr(context, "step_key", None),
        "execution_time": pd.Timestamp.now().isoformat(),
        "resource_keys": list(context.op_def.required_resource_keys)
    }

    try:
        # Simulate complex operation with multiple failure points
        step1_result = perform_step1()
        debug_context["step1_completed"] = True

        step2_result = perform_step2(step1_result)
        debug_context["step2_completed"] = True

        final_result = perform_step3(step2_result)
        debug_context["step3_completed"] = True

        return final_result

    except Exception as e:
        # Add failure location to debug context
        debug_context["failure_location"] = get_failure_location_from_traceback()
        debug_context["error_type"] = type(e).__name__
        debug_context["error_message"] = str(e)

        # Determine failure category for better handling
        if isinstance(e, ConnectionError):
            failure_category = "connectivity"
            recovery_time = "1-5 minutes"
        elif isinstance(e, PermissionError):
            failure_category = "authentication"
            recovery_time = "immediate with credentials update"
        elif isinstance(e, ValueError):
            failure_category = "data_validation"
            recovery_time = "requires data fix"
        else:
            failure_category = "unknown"
            recovery_time = "unknown"

        raise Failure(
            description=f"Operation failed at {debug_context.get('failure_location', 'unknown location')}: {str(e)}",
            metadata={
                "debug_context": MetadataValue.json(debug_context),
                "failure_category": MetadataValue.text(failure_category),
                "estimated_recovery_time": MetadataValue.text(recovery_time),
                "troubleshooting_guide": MetadataValue.url(
                    f"https://docs.company.com/troubleshooting/{failure_category}"
                ),
                "runbook": MetadataValue.url(
                    f"https://runbook.company.com/{context.op_def.name}"
                )
            }
        )
```

## Retry Policies

Dagster provides configurable retry policies for automatic recovery from transient failures.

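Retry policies can be attached to individual ops and assets, as in the examples below; a policy passed as `op_retry_policy` on `@job` should also act as a default for every op in the job that does not set its own. A minimal sketch, where `flaky_extract`, `load`, and `nightly_ingest` are hypothetical names and the `op_retry_policy` argument is assumed:

```python
from dagster import Backoff, RetryPolicy, job, op

default_retry = RetryPolicy(max_retries=2, delay=5.0, backoff=Backoff.EXPONENTIAL)


@op
def flaky_extract():
    ...


@op
def load(data):
    ...


# op_retry_policy sets a job-wide default; an op-level retry_policy still takes precedence.
@job(op_retry_policy=default_retry)
def nightly_ingest():
    load(flaky_extract())
```
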
### `RetryPolicy` { .api }

**Module:** `dagster._core.definitions.policy`
**Type:** Class

Configurable retry policy with backoff and jitter strategies.

```python
from dagster import Backoff, Failure, Jitter, RetryPolicy, op
import pandas as pd
import random
import requests

# Basic retry policy
basic_retry = RetryPolicy(
    max_retries=3,
    delay=1.0  # 1 second delay
)


@op(retry_policy=basic_retry)
def simple_retry_op(context) -> str:
    """Op with basic retry policy."""

    # Simulate random failures
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure for retry testing")

    return "Success after retries"


# Exponential backoff retry policy
exponential_retry = RetryPolicy(
    max_retries=5,
    delay=1.0,
    backoff=Backoff.EXPONENTIAL,  # 1s, 2s, 4s, 8s, 16s
    jitter=Jitter.PLUS_MINUS  # Add randomness to avoid thundering herd
)


@op(retry_policy=exponential_retry)
def external_api_call(context) -> dict:
    """Op that calls external API with exponential backoff."""

    context.log.info("Attempting API call...")

    try:
        # Simulate API call that might fail due to rate limiting
        response = requests.get(
            "https://api.example.com/data",
            headers={"Authorization": "Bearer token"},
            timeout=30
        )

        if response.status_code == 429:  # Rate limited
            context.log.warning("Rate limited, will retry with backoff")
            raise Exception("Rate limit exceeded")

        response.raise_for_status()
        return response.json()

    except requests.RequestException as e:
        context.log.warning(f"API call failed: {str(e)}")
        raise


# Linear backoff retry policy
linear_retry = RetryPolicy(
    max_retries=4,
    delay=2.0,
    backoff=Backoff.LINEAR,  # 2s, 4s, 6s, 8s
    jitter=Jitter.FULL  # Randomize delay completely
)


@op(retry_policy=linear_retry)
def database_operation(context) -> pd.DataFrame:
    """Database operation with linear backoff retry."""

    # context.retry_number is 0 on the first attempt and increments on each retry
    attempt_num = context.retry_number
    context.log.info(f"Database operation attempt {attempt_num + 1}")

    try:
        # Simulate database operation (get_database_connection and DatabaseError
        # are stand-ins for your database client library)
        connection = get_database_connection()

        query = """
            SELECT id, name, value, created_at
            FROM important_table
            WHERE created_at >= NOW() - INTERVAL 1 DAY
        """

        df = pd.read_sql(query, connection)

        context.log.info(f"Successfully loaded {len(df)} records")
        return df

    except DatabaseError as e:
        context.log.warning(f"Database error: {str(e)}")

        # Check if error is retryable
        if "connection" in str(e).lower() or "timeout" in str(e).lower():
            context.log.info("Retryable database error, will attempt retry")
            raise  # Let retry policy handle it
        else:
            context.log.error("Non-retryable database error, failing immediately")
            raise Failure(
                description=f"Non-retryable database error: {str(e)}",
                metadata={
                    "error_type": "database_error",
                    "retryable": False,
                    "error_code": getattr(e, 'code', 'UNKNOWN')
                },
                allow_retries=False  # skip the op's retry policy for this failure
            )


# Conditional retry logic: a RetryPolicy retries every raised exception, so ops that
# should only retry certain errors re-raise the retryable ones (letting the policy
# handle them) and raise Failure(allow_retries=False) for errors a retry cannot fix.
conditional_retry = RetryPolicy(max_retries=3, delay=2.0)


@op(retry_policy=conditional_retry)
def smart_retry_op(context) -> str:
    """Op that retries connection errors but fails fast on validation errors."""

    operation_type = random.choice(["connection_error", "validation_error", "success"])

    try:
        if operation_type == "connection_error":
            raise ConnectionError("Network connection failed")
        elif operation_type == "validation_error":
            raise ValueError("Invalid input data")
    except (ConnectionError, TimeoutError):
        # Transient failure: re-raise so the retry policy schedules another attempt
        raise
    except ValueError as e:
        # Permanent failure: fail the step without further retries
        raise Failure(
            description=f"Non-retryable validation error: {e}",
            allow_retries=False
        ) from e

    return "Operation succeeded"
```

### `RetryRequested` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Exception class

Explicit retry request with a custom delay before the next attempt.

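The canonical pattern is to catch a transient exception and re-raise it as a `RetryRequested` so Dagster schedules another attempt of the same step. A minimal sketch against a hypothetical HTTP endpoint:

```python
from dagster import RetryRequested, op
import requests


@op
def fetch_with_retry(context):
    try:
        response = requests.get("https://api.example.com/data", timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        # Ask Dagster to re-run this step up to 3 more times, 10 seconds apart
        raise RetryRequested(max_retries=3, seconds_to_wait=10) from e
```

The fuller example below layers attempt-dependent delays and a quality-gated asset on top of this pattern.
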
```python
from dagster import Failure, MetadataValue, RetryRequested, asset, op
import pandas as pd

# load_data_with_retries and calculate_data_quality are stand-ins for
# project-specific loading and scoring helpers.


@op
def explicit_retry_op(context) -> str:
    """Op that explicitly requests retries with custom delays."""

    # context.retry_number is 0 on the first attempt and increments on each retry
    attempt_count = context.retry_number

    context.log.info(f"Attempt {attempt_count + 1}")

    # Simulate different failure scenarios
    if attempt_count == 0:
        # First attempt: request an immediate retry
        context.log.info("Initial setup required, retrying immediately")
        raise RetryRequested(max_retries=3, seconds_to_wait=0)

    elif attempt_count == 1:
        # Second attempt: wait 5 seconds before retrying
        context.log.info("Waiting for external service, retrying in 5 seconds")
        raise RetryRequested(max_retries=3, seconds_to_wait=5)

    elif attempt_count == 2:
        # Third attempt: exponential backoff
        wait_time = 2 ** attempt_count  # 4 seconds
        context.log.info(f"Backing off, retrying in {wait_time} seconds")
        raise RetryRequested(max_retries=3, seconds_to_wait=wait_time)

    # Fourth attempt: succeed
    return f"Success on attempt {attempt_count + 1}"


@asset
def resilient_data_processing(context) -> pd.DataFrame:
    """Asset with sophisticated retry logic for data processing."""

    max_attempts = 5
    base_delay = 1.0

    # Each RetryRequested re-executes the asset from the top;
    # retry_number tracks which attempt this is.
    attempt = context.retry_number
    context.log.info(f"Processing attempt {attempt + 1}/{max_attempts}")

    try:
        # Load data with potential failures
        data = load_data_with_retries()

        # Validate data quality
        if len(data) == 0:
            if attempt < max_attempts - 1:
                wait_time = base_delay * (2 ** attempt)  # Exponential backoff
                context.log.warning(
                    f"Empty dataset, waiting {wait_time}s for data arrival before retrying"
                )
                raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time)
            else:
                raise Failure(
                    description="No data available after all retry attempts",
                    metadata={
                        "total_attempts": MetadataValue.int(max_attempts),
                        "final_status": MetadataValue.text("no_data_available")
                    }
                )

        # Check data quality
        quality_score = calculate_data_quality(data)

        if quality_score < 0.8:  # 80% quality threshold
            if attempt < max_attempts - 1:
                wait_time = base_delay * (attempt + 1)  # Linear backoff for quality issues
                context.log.warning(
                    f"Quality score {quality_score:.2f} below threshold 0.8, "
                    f"waiting {wait_time}s for data refresh before retrying"
                )
                raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time)
            else:
                raise Failure(
                    description=f"Data quality too low ({quality_score:.2f}) after all attempts",
                    metadata={
                        "final_quality_score": MetadataValue.float(quality_score),
                        "quality_threshold": MetadataValue.float(0.8),
                        "total_attempts": MetadataValue.int(max_attempts)
                    }
                )

        # Success case
        context.log.info(f"Data processing succeeded on attempt {attempt + 1}")
        context.add_output_metadata({
            "successful_attempt": MetadataValue.int(attempt + 1),
            "quality_score": MetadataValue.float(quality_score),
            "record_count": MetadataValue.int(len(data))
        })

        return data

    except (RetryRequested, Failure):
        # Re-raise retry requests and structured failures untouched
        raise
    except Exception as e:
        # Handle unexpected errors
        if attempt < max_attempts - 1:
            wait_time = base_delay * (2 ** attempt)
            context.log.warning(
                f"Unexpected {type(e).__name__}: {e}, retrying in {wait_time}s"
            )
            raise RetryRequested(max_retries=max_attempts - 1, seconds_to_wait=wait_time) from e
        else:
            raise Failure(
                description=f"Unexpected error after all retry attempts: {str(e)}",
                metadata={
                    "error_type": MetadataValue.text(type(e).__name__),
                    "error_message": MetadataValue.text(str(e)),
                    "total_attempts": MetadataValue.int(max_attempts)
                }
            )
```

## Error Handling Best Practices

### Structured Error Information

```python
from dagster import Failure, MetadataValue, RetryRequested, asset
from typing import Any, Dict
import pandas as pd
import requests
import traceback

# load_raw_data and process_data are stand-ins for project-specific helpers.


class ErrorTracker:
    """Utility class for tracking and reporting errors."""

    @staticmethod
    def create_error_context(context, error: Exception) -> Dict[str, Any]:
        """Create standardized error context."""
        return {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "stack_trace": traceback.format_exc(),
            "asset_key": str(context.asset_key) if hasattr(context, 'asset_key') else None,
            "op_name": context.op_def.name if hasattr(context, 'op_def') else None,
            "run_id": context.run_id,
            "step_key": getattr(context, 'step_key', None),
            "partition_key": getattr(context, 'partition_key', None),
            "timestamp": pd.Timestamp.now().isoformat()
        }

    @staticmethod
    def should_retry(error: Exception) -> bool:
        """Determine if error is retryable."""
        retryable_errors = (
            ConnectionError,
            TimeoutError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout
        )

        non_retryable_errors = (
            ValueError,
            KeyError,
            TypeError,
            PermissionError
        )

        if isinstance(error, retryable_errors):
            return True
        elif isinstance(error, non_retryable_errors):
            return False

        # Check error message for retry indicators
        error_msg = str(error).lower()
        if any(keyword in error_msg for keyword in ["timeout", "connection", "network"]):
            return True
        elif any(keyword in error_msg for keyword in ["permission", "auth", "invalid"]):
            return False

        return True  # Default to retryable for unknown errors


@asset
def robust_data_pipeline(context) -> pd.DataFrame:
    """Asset with comprehensive error handling best practices."""

    error_tracker = ErrorTracker()

    try:
        # Step 1: Data loading with error context
        context.log.info("Starting data loading phase")

        try:
            raw_data = load_raw_data()
            context.log.info(f"Loaded {len(raw_data)} raw records")
        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)

            if error_tracker.should_retry(e):
                context.log.warning(f"Retryable error during data loading: {error_context}")
                raise RetryRequested(max_retries=3, seconds_to_wait=5.0) from e
            else:
                raise Failure(
                    description=f"Non-retryable error in data loading: {str(e)}",
                    metadata={
                        "phase": MetadataValue.text("data_loading"),
                        "error_context": MetadataValue.json(error_context),
                        "retry_recommended": MetadataValue.bool(False)
                    }
                )

        # Step 2: Data validation with structured errors
        context.log.info("Starting data validation phase")

        validation_errors = []

        if len(raw_data) == 0:
            validation_errors.append("Empty dataset")

        required_columns = ["id", "timestamp", "value"]
        missing_columns = set(required_columns) - set(raw_data.columns)
        if missing_columns:
            validation_errors.append(f"Missing columns: {missing_columns}")

        if len(raw_data) > 0:
            null_percentage = (
                raw_data.isnull().sum().sum() / (len(raw_data) * len(raw_data.columns))
            ) * 100
        else:
            null_percentage = 0.0
        if null_percentage > 5:  # 5% threshold
            validation_errors.append(f"Too many null values: {null_percentage:.1f}%")

        if validation_errors:
            raise Failure(
                description="Data validation failed",
                metadata={
                    "phase": MetadataValue.text("data_validation"),
                    "validation_errors": MetadataValue.json(validation_errors),
                    "record_count": MetadataValue.int(len(raw_data)),
                    "null_percentage": MetadataValue.float(null_percentage),
                    "data_quality_score": MetadataValue.float(max(0, 1 - null_percentage / 100))
                }
            )

        # Step 3: Data processing with progress tracking
        context.log.info("Starting data processing phase")

        try:
            processed_data = process_data(raw_data)

            # Validate processing results
            processing_loss = len(raw_data) - len(processed_data)
            loss_percentage = (processing_loss / len(raw_data)) * 100

            if loss_percentage > 20:  # 20% loss threshold
                context.log.warning(f"High data loss during processing: {loss_percentage:.1f}%")

            context.add_output_metadata({
                "input_records": MetadataValue.int(len(raw_data)),
                "output_records": MetadataValue.int(len(processed_data)),
                "processing_loss": MetadataValue.int(processing_loss),
                "loss_percentage": MetadataValue.float(loss_percentage),
                "processing_success": MetadataValue.bool(True)
            })

            return processed_data

        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)

            raise Failure(
                description=f"Data processing failed: {str(e)}",
                metadata={
                    "phase": MetadataValue.text("data_processing"),
                    "error_context": MetadataValue.json(error_context),
                    "input_records": MetadataValue.int(len(raw_data)),
                    "processing_success": MetadataValue.bool(False)
                }
            )

    except (Failure, RetryRequested):
        # Re-raise structured Dagster events
        raise

    except Exception as e:
        # Catch-all for unexpected errors
        error_context = error_tracker.create_error_context(context, e)

        raise Failure(
            description=f"Unexpected error in robust_data_pipeline: {str(e)}",
            metadata={
                "phase": MetadataValue.text("unknown"),
                "error_context": MetadataValue.json(error_context),
                "unexpected_error": MetadataValue.bool(True),
                "troubleshooting_guide": MetadataValue.url("https://docs.company.com/troubleshooting")
            }
        )
```

### Error Monitoring and Alerting

```python
from dagster import RunRequest, run_failure_sensor
from datetime import datetime
import time

# send_pager_duty_alert, send_slack_alert, and send_email_alert are stand-ins
# for project-specific alerting helpers.


@run_failure_sensor
def comprehensive_failure_monitor(context):
    """Monitor failures with detailed analysis and alerting."""

    failed_run = context.dagster_run

    # Pull the step-level failure events for the run, if any were recorded
    step_failure_events = context.get_step_failure_events()
    first_failure = step_failure_events[0] if step_failure_events else None

    # Extract failure information
    job_name = failed_run.job_name
    step_key = first_failure.step_key if first_failure else "unknown"
    error_info = first_failure.event_specific_data if first_failure else None

    # Analyze failure type
    failure_analysis = {
        "job_name": job_name,
        "step_key": step_key,
        "run_id": failed_run.run_id,
        "failure_time": datetime.now(),  # when the sensor observed the failure
        "error_category": "unknown",
        "severity": "medium",
        "auto_recoverable": False
    }

    if error_info and error_info.error:
        error_message = error_info.error.message
        error_type = error_info.error.cls_name

        # Categorize error
        if any(keyword in error_message.lower() for keyword in ["connection", "timeout", "network"]):
            failure_analysis["error_category"] = "connectivity"
            failure_analysis["auto_recoverable"] = True
            failure_analysis["severity"] = "low"
        elif any(keyword in error_message.lower() for keyword in ["permission", "auth", "credentials"]):
            failure_analysis["error_category"] = "authentication"
            failure_analysis["severity"] = "high"
        elif any(keyword in error_message.lower() for keyword in ["data", "validation", "schema"]):
            failure_analysis["error_category"] = "data_quality"
            failure_analysis["severity"] = "medium"
        elif any(keyword in error_message.lower() for keyword in ["memory", "disk", "resource"]):
            failure_analysis["error_category"] = "resource_exhaustion"
            failure_analysis["severity"] = "high"

        failure_analysis["error_message"] = error_message
        failure_analysis["error_type"] = error_type

    # Send appropriate alerts based on severity
    if failure_analysis["severity"] == "high":
        send_pager_duty_alert(failure_analysis)
        send_slack_alert(failure_analysis, channel="#critical-alerts")
    elif failure_analysis["severity"] == "medium":
        send_slack_alert(failure_analysis, channel="#data-alerts")
        send_email_alert(failure_analysis, recipients=["data-team@company.com"])

    # Log failure for analysis
    context.log.error(f"Job failure analysis: {failure_analysis}")

    # Attempt auto-recovery for recoverable failures
    if failure_analysis["auto_recoverable"]:
        context.log.info("Attempting auto-recovery for recoverable failure")

        # Request a fresh run of the failed job
        return RunRequest(
            run_key=f"auto_retry_{failed_run.run_id}_{int(time.time())}",
            job_name=job_name,
            tags={
                "retry_type": "auto_recovery",
                "original_run_id": failed_run.run_id,
                "failure_category": failure_analysis["error_category"]
            }
        )
```

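A failure sensor only runs once it is registered with the code location. A minimal sketch, reusing definitions from the examples above, registers it alongside the assets in a `Definitions` object:

```python
from dagster import Definitions

# Register the failure sensor alongside the assets it should watch over.
defs = Definitions(
    assets=[validated_customer_data, resilient_data_processing],
    sensors=[comprehensive_failure_monitor],
)
```
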
This error handling system provides structured error information, configurable retry strategies, and robust failure recovery. It enables precise error categorization, automated recovery from transient failures, and detailed debugging information for complex failure scenarios.

For monitoring failures with sensors, see [Sensors and Schedules](./sensors-schedules.md). For execution contexts that handle errors, see [Execution and Contexts](./execution-contexts.md).