# Error Handling

This document covers Dagster's error handling system: the error hierarchy, failure events, retry policies, and best practices for building robust pipelines. Dagster provides structured error handling with rich failure information and configurable recovery strategies.

## Error Hierarchy

Dagster provides a structured hierarchy of exceptions for different failure scenarios, enabling precise error handling and debugging.
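
Each of the specific exception classes documented below derives from `DagsterError`, so a single `except DagsterError:` clause catches any Dagster-raised error. A quick check of the hierarchy:

```python
from dagster import (
    DagsterError,
    DagsterInvalidConfigError,
    DagsterInvalidDefinitionError,
    DagsterInvariantViolationError,
)

# Every Dagster-specific exception subclasses DagsterError.
for exc in (
    DagsterInvalidConfigError,
    DagsterInvalidDefinitionError,
    DagsterInvariantViolationError,
):
    assert issubclass(exc, DagsterError)
```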

### Base Errors

#### `DagsterError` { .api }

**Module:** `dagster_shared.error`
**Type:** Exception base class

Base class for all Dagster-specific errors with structured error information.

```python
from dagster import DagsterError, asset, op
import pandas as pd


class CustomDataError(DagsterError):
    """Custom error for data quality issues."""

    def __init__(self, message: str, data_info: dict = None):
        super().__init__(message)
        self.data_info = data_info or {}


@op
def validate_data_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Op that validates data quality and raises custom errors."""

    # Check for null values
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {null_count} null values found",
            data_info={
                "null_count": null_count,
                "total_records": len(df),
                "null_percentage": (null_count / (len(df) * len(df.columns))) * 100,
                "affected_columns": df.columns[df.isnull().any()].tolist(),
            },
        )

    # Check for duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        raise CustomDataError(
            f"Data quality check failed: {duplicate_count} duplicate records found",
            data_info={
                "duplicate_count": duplicate_count,
                "total_records": len(df),
                "duplicate_percentage": (duplicate_count / len(df)) * 100,
            },
        )

    return df


@asset
def validated_customer_data(context, raw_customer_data: pd.DataFrame) -> pd.DataFrame:
    """Asset with comprehensive error handling."""

    try:
        # Validate data quality (direct invocation of the op)
        validated_data = validate_data_quality(raw_customer_data)

        # Additional business rule validation
        if len(validated_data) == 0:
            raise CustomDataError(
                "No valid customer records found after validation",
                data_info={"original_count": len(raw_customer_data)},
            )

        # Check required columns
        required_columns = ["customer_id", "email", "created_at"]
        missing_columns = set(required_columns) - set(validated_data.columns)
        if missing_columns:
            raise CustomDataError(
                f"Required columns missing: {missing_columns}",
                data_info={
                    "missing_columns": list(missing_columns),
                    "available_columns": list(validated_data.columns),
                },
            )

        return validated_data

    except CustomDataError as e:
        # Log detailed error information
        context.log.error(f"Data validation failed: {str(e)}")
        context.log.error(f"Error details: {e.data_info}")

        # Re-raise to fail the asset materialization
        raise

    except Exception as e:
        # Handle unexpected errors
        raise DagsterError(
            f"Unexpected error during customer data validation: {str(e)}"
        ) from e
```

#### `DagsterInvariantViolationError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invariant violations and internal consistency checks.

```python
from dagster import DagsterInvariantViolationError, op


@op
def process_configuration(context) -> dict:
    """Op that validates configuration invariants."""

    config = context.op_config

    # Validate configuration invariants
    if "batch_size" in config and "max_memory" in config:
        batch_size = config["batch_size"]
        max_memory = config["max_memory"]

        # Check invariant: batch_size records at ~1KB each must fit in max_memory (KB)
        estimated_memory_kb = batch_size  # assume ~1KB per record

        if estimated_memory_kb > max_memory:
            raise DagsterInvariantViolationError(
                f"Configuration invariant violated: "
                f"batch_size ({batch_size}) * record_size (1KB) = {estimated_memory_kb}KB "
                f"exceeds max_memory ({max_memory}KB)"
            )

    # Validate required configuration relationships
    if config.get("enable_caching") and not config.get("cache_directory"):
        raise DagsterInvariantViolationError(
            "Configuration invariant violated: "
            "cache_directory must be specified when enable_caching is True"
        )

    return config
```

### Definition Errors

#### `DagsterInvalidDefinitionError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid asset, op, or job definitions.

```python
from dagster import DagsterInvalidDefinitionError, asset
import inspect
import pandas as pd


def validate_asset_definition(asset_fn):
    """Decorator that validates an asset function at definition time."""

    # Check function signature
    signature = inspect.signature(asset_fn)

    # Validate return annotation exists
    if signature.return_annotation == inspect.Signature.empty:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a return type annotation"
        )

    # Validate docstring exists
    if not asset_fn.__doc__:
        raise DagsterInvalidDefinitionError(
            f"Asset function {asset_fn.__name__} must have a docstring"
        )

    return asset_fn


@asset
@validate_asset_definition
def well_defined_asset() -> pd.DataFrame:
    """This asset has proper definition validation."""
    return pd.DataFrame({"data": [1, 2, 3]})


# This would raise DagsterInvalidDefinitionError at definition time:
# @asset
# @validate_asset_definition
# def poorly_defined_asset():  # Missing return annotation and docstring
#     return pd.DataFrame({"data": [1, 2, 3]})
```
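
Dagster itself raises `DagsterInvalidDefinitionError` when a definition is internally inconsistent, for example when an `ins` declaration has no matching function parameter. A minimal sketch (the op name is hypothetical):

```python
from dagster import DagsterInvalidDefinitionError, In, op

try:
    # The declared input "payload" has no corresponding function parameter,
    # so the decorator rejects the definition immediately.
    @op(ins={"payload": In()})
    def mismatched_op():
        return None
except DagsterInvalidDefinitionError as e:
    print(f"Definition rejected: {e}")
```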

#### `DagsterInvalidInvocationError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid invocation of Dagster definitions.

```python
from dagster import DagsterInvalidInvocationError, In, Out, job, op
import pandas as pd


@op(
    ins={"input_data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame),
)
def strict_data_processing(input_data: pd.DataFrame) -> pd.DataFrame:
    """Op with strict type checking."""

    # Validate input type at runtime
    if not isinstance(input_data, pd.DataFrame):
        raise DagsterInvalidInvocationError(
            f"Expected pandas DataFrame, got {type(input_data)}. "
            f"This op requires DataFrame input for processing."
        )

    # Validate DataFrame structure
    required_columns = ["id", "value", "timestamp"]
    missing_columns = set(required_columns) - set(input_data.columns)

    if missing_columns:
        raise DagsterInvalidInvocationError(
            f"Input DataFrame missing required columns: {missing_columns}. "
            f"Available columns: {list(input_data.columns)}"
        )

    return input_data.dropna()


@op
def generate_invalid_data() -> dict:  # Returns dict, not DataFrame
    """Op that produces invalid output for downstream consumption."""
    return {"data": "not a dataframe"}


@job
def invalid_invocation_job():
    """Job that demonstrates an invalid invocation."""
    # When executed, the dict output fails the DataFrame input check
    strict_data_processing(generate_invalid_data())
```
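
Dagster raises `DagsterInvalidInvocationError` itself mainly around direct invocation of definitions, for example calling a context-requiring op without supplying a context. A minimal sketch (the op name is hypothetical):

```python
from dagster import DagsterInvalidInvocationError, build_op_context, op


@op
def logs_something(context) -> str:
    context.log.info("running")
    return "ok"


try:
    logs_something()  # direct call without a context
except DagsterInvalidInvocationError as e:
    print(f"Invalid invocation: {e}")

# Supplying a context via build_op_context makes direct invocation work.
assert logs_something(build_op_context()) == "ok"
```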

### Configuration Errors

#### `DagsterInvalidConfigError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error for invalid configuration values.

```python
from dagster import DagsterInvalidConfigError, Field, Int, op


@op(config_schema={
    "batch_size": Field(Int, description="Batch size for processing"),
    "timeout_seconds": Field(Int, description="Timeout in seconds"),
    "retry_attempts": Field(Int, description="Number of retry attempts"),
})
def configurable_processing_op(context):
    """Op with comprehensive config validation."""

    config = context.op_config

    # Validate configuration values
    batch_size = config["batch_size"]
    if batch_size <= 0:
        raise DagsterInvalidConfigError(
            f"batch_size must be positive, got {batch_size}"
        )

    if batch_size > 10000:
        raise DagsterInvalidConfigError(
            f"batch_size too large ({batch_size}), maximum allowed is 10000"
        )

    timeout_seconds = config["timeout_seconds"]
    if timeout_seconds < 1 or timeout_seconds > 3600:
        raise DagsterInvalidConfigError(
            f"timeout_seconds must be between 1 and 3600, got {timeout_seconds}"
        )

    retry_attempts = config["retry_attempts"]
    if retry_attempts < 0 or retry_attempts > 5:
        raise DagsterInvalidConfigError(
            f"retry_attempts must be between 0 and 5, got {retry_attempts}"
        )

    # Cross-field validation
    if timeout_seconds < batch_size * 0.1:
        raise DagsterInvalidConfigError(
            f"timeout_seconds ({timeout_seconds}) too low for batch_size ({batch_size}). "
            f"Minimum recommended: {int(batch_size * 0.1)} seconds"
        )

    context.log.info(f"Processing with batch_size={batch_size}, timeout={timeout_seconds}s")
    return "Processed with valid config"
```
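
Dagster also raises `DagsterInvalidConfigError` itself when supplied run config fails schema validation, before any op executes. A minimal sketch (the op and job names here are hypothetical):

```python
from dagster import DagsterInvalidConfigError, Field, Int, job, op


@op(config_schema={"batch_size": Field(Int)})
def sized_op(context) -> int:
    return context.op_config["batch_size"]


@job
def sized_job():
    sized_op()


try:
    # "ten" does not satisfy the Int field, so validation fails up front.
    sized_job.execute_in_process(
        run_config={"ops": {"sized_op": {"config": {"batch_size": "ten"}}}}
    )
except DagsterInvalidConfigError as e:
    print(f"Config rejected: {e}")
```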

### Execution Errors

#### `DagsterExecutionStepExecutionError` { .api }

**Module:** `dagster._core.errors`
**Type:** DagsterError subclass

Error during step execution with detailed execution context.

```python
from dagster import DagsterExecutionStepExecutionError, asset, op
import pandas as pd
import requests
from requests.exceptions import RequestException
import time


@op
def fetch_external_data(context) -> dict:
    """Op that fetches data from an external API with error handling."""

    api_url = "https://api.example.com/data"
    max_retries = 3

    for attempt in range(max_retries):
        try:
            context.log.info(f"Fetching data from {api_url} (attempt {attempt + 1}/{max_retries})")

            response = requests.get(api_url, timeout=30)
            response.raise_for_status()

            data = response.json()
            context.log.info(f"Successfully fetched {len(data)} records")

            return data

        except RequestException as e:
            context.log.warning(f"Attempt {attempt + 1} failed: {str(e)}")

            if attempt == max_retries - 1:  # Last attempt
                raise DagsterExecutionStepExecutionError(
                    f"Failed to fetch data from {api_url} after {max_retries} attempts. "
                    f"Last error: {str(e)}",
                    step_context=context.step_context,
                    user_exception=e,
                )

            # Wait before retry (exponential backoff)
            time.sleep(2 ** attempt)

    # This shouldn't be reached, but just in case
    raise DagsterExecutionStepExecutionError("Unexpected error in fetch_external_data")


@asset
def processed_api_data(context, external_data: dict) -> pd.DataFrame:
    """Asset that processes external data with error handling."""

    try:
        if not external_data or "records" not in external_data:
            raise ValueError("Invalid data structure from API")

        records = external_data["records"]
        if not isinstance(records, list):
            raise ValueError(f"Expected list of records, got {type(records)}")

        df = pd.DataFrame(records)

        if len(df) == 0:
            raise ValueError("No records found in API response")

        # Validate required fields
        required_fields = ["id", "timestamp", "value"]
        missing_fields = set(required_fields) - set(df.columns)

        if missing_fields:
            raise ValueError(f"Missing required fields: {missing_fields}")

        # Process data
        processed_df = df.dropna().reset_index(drop=True)

        context.log.info(f"Processed {len(processed_df)} valid records from {len(df)} total")

        return processed_df

    except Exception as e:
        # Wrap in execution error with context
        raise DagsterExecutionStepExecutionError(
            f"Failed to process API data: {str(e)}",
            user_exception=e,
        ) from e
```

## Failure Events

Dagster provides structured failure events that capture detailed information about errors for debugging and monitoring.

### `Failure` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Event class

Structured failure event with metadata and debugging information.

```python
from dagster import Failure, MetadataValue, asset, op
import pandas as pd
import traceback

# load_critical_data, validate_data_integrity, process_data, the load_*_data_source
# helpers, and DatabaseConnectionError are assumed to be defined elsewhere.


@op
def risky_operation(context) -> str:
    """Op that demonstrates structured failure reporting."""

    try:
        # Simulate risky operation
        data = load_critical_data()

        if not validate_data_integrity(data):
            # Create structured failure with metadata
            raise Failure(
                description="Data integrity validation failed",
                metadata={
                    "validation_errors": MetadataValue.json({
                        "missing_records": 15,
                        "invalid_checksums": 3,
                        "schema_violations": 7,
                    }),
                    "data_source": MetadataValue.text("critical_database"),
                    "validation_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
                    "expected_record_count": MetadataValue.int(1000),
                    "actual_record_count": MetadataValue.int(975),
                    "severity": MetadataValue.text("HIGH"),
                },
            )

        return process_data(data)

    except DatabaseConnectionError as e:
        # Database-specific failure
        raise Failure(
            description=f"Database connection failed: {str(e)}",
            metadata={
                "connection_string": MetadataValue.text("postgresql://..."),
                "error_code": MetadataValue.text(getattr(e, 'code', 'UNKNOWN')),
                "retry_recommended": MetadataValue.bool(True),
                "estimated_downtime": MetadataValue.text("5-10 minutes"),
                "contact": MetadataValue.text("database-team@company.com"),
            },
        )

    except Exception as e:
        # Generic failure with full context
        raise Failure(
            description=f"Unexpected error in risky operation: {str(e)}",
            metadata={
                "error_type": MetadataValue.text(type(e).__name__),
                "error_message": MetadataValue.text(str(e)),
                "stack_trace": MetadataValue.text(traceback.format_exc()),
                "operation_context": MetadataValue.json({
                    "step": "data_processing",
                    "input_size": "unknown",
                    "memory_usage": "high",
                }),
                "debug_info": MetadataValue.url("https://wiki.company.com/debug/risky-operation"),
            },
        )


@asset
def resilient_asset(context) -> pd.DataFrame:
    """Asset with comprehensive failure handling and recovery."""

    recovery_strategies = ["primary", "secondary", "fallback"]

    for strategy in recovery_strategies:
        try:
            context.log.info(f"Attempting data load with {strategy} strategy")

            if strategy == "primary":
                data = load_primary_data_source()
            elif strategy == "secondary":
                data = load_secondary_data_source()
            elif strategy == "fallback":
                data = load_fallback_data_source()

            # Validate loaded data
            if len(data) == 0:
                raise ValueError(f"No data loaded from {strategy} source")

            context.log.info(f"Successfully loaded {len(data)} records using {strategy} strategy")

            return data

        except Exception as e:
            context.log.warning(f"Strategy {strategy} failed: {str(e)}")

            if strategy == "fallback":  # Last resort failed
                raise Failure(
                    description="All data loading strategies failed",
                    metadata={
                        "failed_strategies": MetadataValue.json(recovery_strategies),
                        "primary_error": MetadataValue.text("Connection timeout"),
                        "secondary_error": MetadataValue.text("Authentication failed"),
                        "fallback_error": MetadataValue.text(str(e)),
                        "recommended_action": MetadataValue.text(
                            "Check data source availability and credentials"
                        ),
                        "escalation_required": MetadataValue.bool(True),
                        "incident_severity": MetadataValue.text("P1"),
                    },
                )

    # Should never reach here
    raise Failure("Unexpected code path in resilient_asset")
```
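
When an op or asset has a retry policy but a particular failure should not be retried, recent Dagster releases let `Failure` take an `allow_retries` flag to opt out for that specific raise. A minimal sketch, assuming the hypothetical op below:

```python
from dagster import Failure, RetryPolicy, op


@op(retry_policy=RetryPolicy(max_retries=3, delay=2.0))
def no_retry_on_bad_input(context, payload: dict) -> dict:
    if "records" not in payload:
        # A malformed payload will not get better on retry, so suppress
        # the op's retry policy for this specific failure.
        raise Failure(
            description="Payload is missing the 'records' key",
            allow_retries=False,
        )
    return payload
```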

### Error Context and Debugging

```python
from dagster import Failure, MetadataValue, op
import pandas as pd

# perform_step1/2/3 and get_failure_location_from_traceback are assumed
# to be defined elsewhere.


@op
def debugging_friendly_op(context) -> dict:
    """Op with comprehensive debugging information in failures."""

    debug_context = {
        "op_name": context.op_def.name,
        "run_id": context.run_id,
        "step_key": context.step_context.step.key,
        "execution_time": pd.Timestamp.now().isoformat(),
        "resource_keys": list(context.resources._resource_defs.keys()),
    }

    try:
        # Simulate complex operation with multiple failure points
        step1_result = perform_step1()
        debug_context["step1_completed"] = True

        step2_result = perform_step2(step1_result)
        debug_context["step2_completed"] = True

        final_result = perform_step3(step2_result)
        debug_context["step3_completed"] = True

        return final_result

    except Exception as e:
        # Add failure location to debug context
        debug_context["failure_location"] = get_failure_location_from_traceback()
        debug_context["error_type"] = type(e).__name__
        debug_context["error_message"] = str(e)

        # Determine failure category for better handling
        if isinstance(e, ConnectionError):
            failure_category = "connectivity"
            recovery_time = "1-5 minutes"
        elif isinstance(e, PermissionError):
            failure_category = "authentication"
            recovery_time = "immediate with credentials update"
        elif isinstance(e, ValueError):
            failure_category = "data_validation"
            recovery_time = "requires data fix"
        else:
            failure_category = "unknown"
            recovery_time = "unknown"

        raise Failure(
            description=f"Operation failed at {debug_context.get('failure_location', 'unknown location')}: {str(e)}",
            metadata={
                "debug_context": MetadataValue.json(debug_context),
                "failure_category": MetadataValue.text(failure_category),
                "estimated_recovery_time": MetadataValue.text(recovery_time),
                "troubleshooting_guide": MetadataValue.url(
                    f"https://docs.company.com/troubleshooting/{failure_category}"
                ),
                "runbook": MetadataValue.url(
                    f"https://runbook.company.com/{context.op_def.name}"
                ),
            },
        )
```

## Retry Policies

Dagster provides configurable retry policies for automatic recovery from transient failures.

### `RetryPolicy` { .api }

**Module:** `dagster._core.definitions.policy`
**Type:** Class

Configurable retry policy with backoff and jitter strategies.

```python
from dagster import Backoff, Failure, Jitter, RetryPolicy, op
import pandas as pd
import random
import requests

# get_database_connection, DatabaseError, and AuthenticationError are assumed
# to be defined elsewhere.

# Basic retry policy
basic_retry = RetryPolicy(
    max_retries=3,
    delay=1.0,  # 1 second delay
)


@op(retry_policy=basic_retry)
def simple_retry_op(context) -> str:
    """Op with basic retry policy."""

    # Simulate random failures
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure for retry testing")

    return "Success after retries"


# Exponential backoff retry policy
exponential_retry = RetryPolicy(
    max_retries=5,
    delay=1.0,
    backoff=Backoff.EXPONENTIAL,  # 1s, 2s, 4s, 8s, 16s
    jitter=Jitter.PLUS_MINUS,  # Add randomness to avoid thundering herd
)


@op(retry_policy=exponential_retry)
def external_api_call(context) -> dict:
    """Op that calls an external API with exponential backoff."""

    context.log.info("Attempting API call...")

    try:
        # Simulate API call that might fail due to rate limiting
        response = requests.get(
            "https://api.example.com/data",
            headers={"Authorization": "Bearer token"},
            timeout=30,
        )

        if response.status_code == 429:  # Rate limited
            context.log.warning("Rate limited, will retry with backoff")
            raise Exception("Rate limit exceeded")

        response.raise_for_status()
        return response.json()

    except requests.RequestException as e:
        context.log.warning(f"API call failed: {str(e)}")
        raise


# Linear backoff retry policy
linear_retry = RetryPolicy(
    max_retries=4,
    delay=2.0,
    backoff=Backoff.LINEAR,  # 2s, 4s, 6s, 8s
    jitter=Jitter.FULL,  # Randomize delay completely
)


@op(retry_policy=linear_retry)
def database_operation(context) -> pd.DataFrame:
    """Database operation with linear backoff retry."""

    attempt_num = getattr(context, '_retry_attempt', 0)
    context.log.info(f"Database operation attempt {attempt_num + 1}")

    try:
        # Simulate database operation
        connection = get_database_connection()

        query = """
            SELECT id, name, value, created_at
            FROM important_table
            WHERE created_at >= NOW() - INTERVAL 1 DAY
        """

        df = pd.read_sql(query, connection)

        context.log.info(f"Successfully loaded {len(df)} records")
        return df

    except DatabaseError as e:
        context.log.warning(f"Database error: {str(e)}")

        # Check if error is retryable
        if "connection" in str(e).lower() or "timeout" in str(e).lower():
            context.log.info("Retryable database error, will attempt retry")
            raise  # Let retry policy handle it
        else:
            context.log.error("Non-retryable database error, failing immediately")
            raise Failure(
                description=f"Non-retryable database error: {str(e)}",
                metadata={
                    "error_type": "database_error",
                    "retryable": False,
                    "error_code": getattr(e, 'code', 'UNKNOWN'),
                },
            )


# Custom retry policy with conditional logic
class ConditionalRetryPolicy(RetryPolicy):
    """Custom retry policy with condition-based retry logic."""

    def __init__(self, max_retries: int = 3, delay: float = 1.0):
        super().__init__(max_retries=max_retries, delay=delay)

    def should_retry(self, context, exception: Exception) -> bool:
        """Determine if operation should be retried based on exception type."""

        # Always retry connection errors
        if isinstance(exception, (ConnectionError, TimeoutError)):
            return True

        # Retry rate limit errors
        if "rate limit" in str(exception).lower():
            return True

        # Don't retry authentication errors
        if isinstance(exception, (PermissionError, AuthenticationError)):
            return False

        # Don't retry validation errors
        if isinstance(exception, ValueError):
            return False

        # Default behavior for other exceptions
        return True


conditional_retry = ConditionalRetryPolicy(max_retries=3, delay=2.0)


@op(retry_policy=conditional_retry)
def smart_retry_op(context) -> str:
    """Op with intelligent retry logic."""

    # This will retry connection errors but not validation errors
    operation_type = random.choice(["connection_error", "validation_error", "success"])

    if operation_type == "connection_error":
        raise ConnectionError("Network connection failed")
    elif operation_type == "validation_error":
        raise ValueError("Invalid input data")

    return "Operation succeeded"
```
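
Retry policies can also be attached at the job level via `op_retry_policy`, which applies to every op in the job that does not define its own policy. A minimal sketch (the op and job names are hypothetical):

```python
from dagster import Backoff, RetryPolicy, job, op


@op
def flaky_extract() -> list:
    # Transient infrastructure errors raised here are retried by the
    # job-level policy below.
    return ["row-1", "row-2"]


@op(retry_policy=RetryPolicy(max_retries=1))
def load(rows: list) -> None:
    # An op-level policy overrides the job-level default for this op.
    print(f"loaded {len(rows)} rows")


@job(op_retry_policy=RetryPolicy(max_retries=3, delay=2.0, backoff=Backoff.EXPONENTIAL))
def nightly_ingest():
    load(flaky_extract())
```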

### `RetryRequested` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Exception class

Explicit retry request with custom delay and metadata.

```python
from dagster import Failure, MetadataValue, RetryRequested, asset, op
import pandas as pd

# load_data_with_retries and calculate_data_quality are assumed to be
# defined elsewhere.


@op
def explicit_retry_op(context) -> str:
    """Op that explicitly requests retries with custom logic."""

    # Track retry attempts in op context
    attempt_count = getattr(context, '_attempt_count', 0)
    context._attempt_count = attempt_count + 1

    context.log.info(f"Attempt {attempt_count + 1}")

    # Simulate different failure scenarios
    if attempt_count == 0:
        # First attempt: request retry immediately
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=0,  # Immediate retry
            metadata={
                "retry_reason": MetadataValue.text("Initial setup required"),
                "attempt_number": MetadataValue.int(attempt_count + 1),
            },
        )

    elif attempt_count == 1:
        # Second attempt: wait 5 seconds before retry
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=5,
            metadata={
                "retry_reason": MetadataValue.text("Waiting for external service"),
                "wait_time_seconds": MetadataValue.int(5),
                "attempt_number": MetadataValue.int(attempt_count + 1),
            },
        )

    elif attempt_count == 2:
        # Third attempt: exponential backoff
        wait_time = 2 ** attempt_count  # 4 seconds
        raise RetryRequested(
            max_retries=3,
            seconds_to_wait=wait_time,
            metadata={
                "retry_reason": MetadataValue.text("Exponential backoff"),
                "wait_time_seconds": MetadataValue.int(wait_time),
                "attempt_number": MetadataValue.int(attempt_count + 1),
            },
        )

    # Fourth attempt: succeed
    return f"Success on attempt {attempt_count + 1}"


@asset
def resilient_data_processing(context) -> pd.DataFrame:
    """Asset with sophisticated retry logic for data processing."""

    max_attempts = 5
    base_delay = 1.0

    for attempt in range(max_attempts):
        try:
            context.log.info(f"Processing attempt {attempt + 1}/{max_attempts}")

            # Load data with potential failures
            data = load_data_with_retries()

            # Validate data quality
            if len(data) == 0:
                if attempt < max_attempts - 1:
                    wait_time = base_delay * (2 ** attempt)  # Exponential backoff

                    raise RetryRequested(
                        max_retries=max_attempts - 1,
                        seconds_to_wait=wait_time,
                        metadata={
                            "retry_reason": MetadataValue.text("Empty dataset, waiting for data arrival"),
                            "wait_time": MetadataValue.float(wait_time),
                            "attempt": MetadataValue.int(attempt + 1),
                            "data_source_status": MetadataValue.text("checking"),
                        },
                    )
                else:
                    raise Failure(
                        description="No data available after all retry attempts",
                        metadata={
                            "total_attempts": MetadataValue.int(max_attempts),
                            "final_status": MetadataValue.text("no_data_available"),
                        },
                    )

            # Check data quality
            quality_score = calculate_data_quality(data)

            if quality_score < 0.8:  # 80% quality threshold
                if attempt < max_attempts - 1:
                    wait_time = base_delay * (attempt + 1)  # Linear backoff for quality issues

                    raise RetryRequested(
                        max_retries=max_attempts - 1,
                        seconds_to_wait=wait_time,
                        metadata={
                            "retry_reason": MetadataValue.text("Poor data quality, waiting for data refresh"),
                            "quality_score": MetadataValue.float(quality_score),
                            "quality_threshold": MetadataValue.float(0.8),
                            "wait_time": MetadataValue.float(wait_time),
                            "attempt": MetadataValue.int(attempt + 1),
                        },
                    )
                else:
                    raise Failure(
                        description=f"Data quality too low ({quality_score:.2f}) after all attempts",
                        metadata={
                            "final_quality_score": MetadataValue.float(quality_score),
                            "quality_threshold": MetadataValue.float(0.8),
                            "total_attempts": MetadataValue.int(max_attempts),
                        },
                    )

            # Success case
            context.log.info(f"Data processing succeeded on attempt {attempt + 1}")
            context.add_output_metadata({
                "successful_attempt": MetadataValue.int(attempt + 1),
                "quality_score": MetadataValue.float(quality_score),
                "record_count": MetadataValue.int(len(data)),
            })

            return data

        except (RetryRequested, Failure):
            # Re-raise retry and failure events
            raise
        except Exception as e:
            # Handle unexpected errors
            if attempt < max_attempts - 1:
                wait_time = base_delay * (2 ** attempt)

                raise RetryRequested(
                    max_retries=max_attempts - 1,
                    seconds_to_wait=wait_time,
                    metadata={
                        "retry_reason": MetadataValue.text(f"Unexpected error: {str(e)}"),
                        "error_type": MetadataValue.text(type(e).__name__),
                        "wait_time": MetadataValue.float(wait_time),
                        "attempt": MetadataValue.int(attempt + 1),
                    },
                )
            else:
                raise Failure(
                    description=f"Unexpected error after all retry attempts: {str(e)}",
                    metadata={
                        "error_type": MetadataValue.text(type(e).__name__),
                        "error_message": MetadataValue.text(str(e)),
                        "total_attempts": MetadataValue.int(max_attempts),
                    },
                )

    # Should never reach here
    raise Failure("Unexpected code path in resilient_data_processing")
```

## Error Handling Best Practices

### Structured Error Information

```python
from dagster import Failure, MetadataValue, RetryRequested, asset
from typing import Any, Dict
import pandas as pd
import requests
import traceback

# load_raw_data and process_data are assumed to be defined elsewhere.


class ErrorTracker:
    """Utility class for tracking and reporting errors."""

    @staticmethod
    def create_error_context(context, error: Exception) -> Dict[str, Any]:
        """Create standardized error context."""
        return {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "stack_trace": traceback.format_exc(),
            "asset_key": str(context.asset_key) if hasattr(context, 'asset_key') else None,
            "op_name": context.op_def.name if hasattr(context, 'op_def') else None,
            "run_id": context.run_id,
            "step_key": getattr(context, 'step_key', None),
            "partition_key": getattr(context, 'partition_key', None),
            "timestamp": pd.Timestamp.now().isoformat(),
        }

    @staticmethod
    def should_retry(error: Exception) -> bool:
        """Determine if error is retryable."""
        retryable_errors = (
            ConnectionError,
            TimeoutError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        )

        non_retryable_errors = (
            ValueError,
            KeyError,
            TypeError,
            PermissionError,
        )

        if isinstance(error, retryable_errors):
            return True
        elif isinstance(error, non_retryable_errors):
            return False

        # Check error message for retry indicators
        error_msg = str(error).lower()
        if any(keyword in error_msg for keyword in ["timeout", "connection", "network"]):
            return True
        elif any(keyword in error_msg for keyword in ["permission", "auth", "invalid"]):
            return False

        return True  # Default to retryable for unknown errors


@asset
def robust_data_pipeline(context) -> pd.DataFrame:
    """Asset with comprehensive error handling best practices."""

    error_tracker = ErrorTracker()

    try:
        # Step 1: Data loading with error context
        context.log.info("Starting data loading phase")

        try:
            raw_data = load_raw_data()
            context.log.info(f"Loaded {len(raw_data)} raw records")
        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)

            if error_tracker.should_retry(e):
                raise RetryRequested(
                    max_retries=3,
                    seconds_to_wait=5.0,
                    metadata={
                        "phase": MetadataValue.text("data_loading"),
                        "error_context": MetadataValue.json(error_context),
                        "retry_recommended": MetadataValue.bool(True),
                    },
                )
            else:
                raise Failure(
                    description=f"Non-retryable error in data loading: {str(e)}",
                    metadata={
                        "phase": MetadataValue.text("data_loading"),
                        "error_context": MetadataValue.json(error_context),
                        "retry_recommended": MetadataValue.bool(False),
                    },
                )

        # Step 2: Data validation with structured errors
        context.log.info("Starting data validation phase")

        validation_errors = []

        if len(raw_data) == 0:
            validation_errors.append("Empty dataset")

        required_columns = ["id", "timestamp", "value"]
        missing_columns = set(required_columns) - set(raw_data.columns)
        if missing_columns:
            validation_errors.append(f"Missing columns: {missing_columns}")

        null_percentage = (raw_data.isnull().sum().sum() / (len(raw_data) * len(raw_data.columns))) * 100
        if null_percentage > 5:  # 5% threshold
            validation_errors.append(f"Too many null values: {null_percentage:.1f}%")

        if validation_errors:
            raise Failure(
                description="Data validation failed",
                metadata={
                    "phase": MetadataValue.text("data_validation"),
                    "validation_errors": MetadataValue.json(validation_errors),
                    "record_count": MetadataValue.int(len(raw_data)),
                    "null_percentage": MetadataValue.float(null_percentage),
                    "data_quality_score": MetadataValue.float(max(0, 1 - null_percentage / 100)),
                },
            )

        # Step 3: Data processing with progress tracking
        context.log.info("Starting data processing phase")

        try:
            processed_data = process_data(raw_data)

            # Validate processing results
            processing_loss = len(raw_data) - len(processed_data)
            loss_percentage = (processing_loss / len(raw_data)) * 100

            if loss_percentage > 20:  # 20% loss threshold
                context.log.warning(f"High data loss during processing: {loss_percentage:.1f}%")

            context.add_output_metadata({
                "input_records": MetadataValue.int(len(raw_data)),
                "output_records": MetadataValue.int(len(processed_data)),
                "processing_loss": MetadataValue.int(processing_loss),
                "loss_percentage": MetadataValue.float(loss_percentage),
                "processing_success": MetadataValue.bool(True),
            })

            return processed_data

        except Exception as e:
            error_context = error_tracker.create_error_context(context, e)

            raise Failure(
                description=f"Data processing failed: {str(e)}",
                metadata={
                    "phase": MetadataValue.text("data_processing"),
                    "error_context": MetadataValue.json(error_context),
                    "input_records": MetadataValue.int(len(raw_data)),
                    "processing_success": MetadataValue.bool(False),
                },
            )

    except (Failure, RetryRequested):
        # Re-raise structured Dagster events
        raise
    except Exception as e:
        # Catch-all for unexpected errors
        error_context = error_tracker.create_error_context(context, e)

        raise Failure(
            description=f"Unexpected error in robust_data_pipeline: {str(e)}",
            metadata={
                "phase": MetadataValue.text("unknown"),
                "error_context": MetadataValue.json(error_context),
                "unexpected_error": MetadataValue.bool(True),
                "troubleshooting_guide": MetadataValue.url("https://docs.company.com/troubleshooting"),
            },
        )
```

### Error Monitoring and Alerting

```python
from dagster import RunRequest, run_failure_sensor
from datetime import datetime
import time

# send_pager_duty_alert, send_slack_alert, and send_email_alert are assumed
# to be defined elsewhere.


@run_failure_sensor
def comprehensive_failure_monitor(context):
    """Monitor failures with detailed analysis and alerting."""

    failed_run = context.dagster_run
    failure_event = context.failure_event

    # Extract failure information
    job_name = failed_run.job_name
    step_key = failure_event.step_key if failure_event.step_key else "unknown"
    error_info = failure_event.dagster_event.step_failure_data

    # Analyze failure type
    failure_analysis = {
        "job_name": job_name,
        "step_key": step_key,
        "run_id": failed_run.run_id,
        "failure_time": datetime.fromtimestamp(failure_event.timestamp),
        "error_category": "unknown",
        "severity": "medium",
        "auto_recoverable": False,
    }

    if error_info and error_info.error:
        error_message = error_info.error.message
        error_type = error_info.error.__class__.__name__

        # Categorize error
        if any(keyword in error_message.lower() for keyword in ["connection", "timeout", "network"]):
            failure_analysis["error_category"] = "connectivity"
            failure_analysis["auto_recoverable"] = True
            failure_analysis["severity"] = "low"
        elif any(keyword in error_message.lower() for keyword in ["permission", "auth", "credentials"]):
            failure_analysis["error_category"] = "authentication"
            failure_analysis["severity"] = "high"
        elif any(keyword in error_message.lower() for keyword in ["data", "validation", "schema"]):
            failure_analysis["error_category"] = "data_quality"
            failure_analysis["severity"] = "medium"
        elif any(keyword in error_message.lower() for keyword in ["memory", "disk", "resource"]):
            failure_analysis["error_category"] = "resource_exhaustion"
            failure_analysis["severity"] = "high"

        failure_analysis["error_message"] = error_message
        failure_analysis["error_type"] = error_type

    # Send appropriate alerts based on severity
    if failure_analysis["severity"] == "high":
        send_pager_duty_alert(failure_analysis)
        send_slack_alert(failure_analysis, channel="#critical-alerts")
    elif failure_analysis["severity"] == "medium":
        send_slack_alert(failure_analysis, channel="#data-alerts")
        send_email_alert(failure_analysis, recipients=["data-team@company.com"])

    # Log failure for analysis
    context.log.error(f"Job failure analysis: {failure_analysis}")

    # Attempt auto-recovery for recoverable failures
    if failure_analysis["auto_recoverable"]:
        context.log.info("Attempting auto-recovery for recoverable failure")

        # Re-launch the failed job with retry tags
        return RunRequest(
            run_key=f"auto_retry_{failed_run.run_id}_{int(time.time())}",
            job_name=job_name,
            tags={
                "retry_type": "auto_recovery",
                "original_run_id": failed_run.run_id,
                "failure_category": failure_analysis["error_category"],
            },
        )
```
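
The failure sensor only runs once it is included in the code location, for example via `Definitions`. A minimal sketch reusing the sensor defined above (the asset and job names are hypothetical):

```python
from dagster import Definitions, asset, define_asset_job


@asset
def monitored_asset() -> int:
    return 42


monitored_job = define_asset_job("monitored_job", selection="monitored_asset")

defs = Definitions(
    assets=[monitored_asset],
    jobs=[monitored_job],
    sensors=[comprehensive_failure_monitor],
)
```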

Together, these tools provide structured error information, configurable retry strategies, and robust failure recovery: errors can be categorized precisely, transient failures recovered automatically, and complex failure scenarios debugged with rich metadata.

For monitoring failures with sensors, see [Sensors and Schedules](./sensors-schedules.md). For execution contexts that handle errors, see [Execution and Contexts](./execution-contexts.md).