# Events and Metadata

This document covers Dagster's comprehensive event and metadata system, including materialization events, observation events, metadata values, table metadata, and code references. The event system provides rich observability and lineage tracking for all pipeline operations.

## Core Events

Dagster's event system captures and records all significant occurrences during pipeline execution, providing comprehensive observability and debugging capabilities.

### Output Events

#### `Output` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Represents the output of an operation with value and metadata.

```python
from dagster import op, Output, MetadataValue
import numpy as np
import pandas as pd

@op
def process_data() -> Output[pd.DataFrame]:
    """Op producing an output with metadata."""

    # Build an example DataFrame
    df = pd.DataFrame({
        "id": range(1000),
        "value": np.random.randn(1000),
        "category": np.random.choice(["A", "B", "C"], 1000)
    })

    # Return output with rich metadata
    return Output(
        value=df,
        metadata={
            "records": len(df),  # raw values are coerced to metadata values
            "columns": MetadataValue.int(len(df.columns)),
            "memory_usage_mb": MetadataValue.float(df.memory_usage(deep=True).sum() / 1024 / 1024),
            "categories": MetadataValue.text(",".join(df["category"].unique())),
            "data_quality_score": MetadataValue.float(0.95),
            "processing_time": MetadataValue.text("2.3 seconds"),
            "sample_data": MetadataValue.md(df.head().to_markdown())
        }
    )

@op
def validate_data(df: pd.DataFrame) -> Output[bool]:
    """Op with conditional output metadata."""

    # Validation logic (cast numpy ints to plain ints for MetadataValue.int)
    null_count = int(df.isnull().sum().sum())
    duplicate_count = int(df.duplicated().sum())
    is_valid = null_count == 0 and duplicate_count == 0

    metadata = {
        "null_count": MetadataValue.int(null_count),
        "duplicate_count": MetadataValue.int(duplicate_count),
        "validation_status": MetadataValue.text("PASSED" if is_valid else "FAILED")
    }

    if not is_valid:
        metadata["issues"] = MetadataValue.text(f"Found {null_count} nulls, {duplicate_count} duplicates")

    return Output(
        value=is_valid,
        metadata=metadata
    )
```

**Parameters:**

- `value: Any` - The output value
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Output metadata
- `output_name: str = "result"` - Name of the output
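
The `output_name` argument matters when an op declares several named outputs: it ties each `Output` to its declared `Out`. A minimal sketch (the op and output names are illustrative):

```python
from dagster import op, Out, Output

@op(out={"valid": Out(), "invalid": Out()})
def split_records(records: list):
    """Yield two named outputs; each Output is matched to an Out by output_name."""
    valid = [r for r in records if r.get("ok")]
    invalid = [r for r in records if not r.get("ok")]

    yield Output(valid, output_name="valid", metadata={"count": len(valid)})
    yield Output(invalid, output_name="invalid", metadata={"count": len(invalid)})
```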

#### `DynamicOutput` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Output for dynamic/fan-out operations that produce multiple outputs.

```python
from dagster import job, op, DynamicOut, DynamicOutput, MetadataValue
import pandas as pd

@op(out=DynamicOut())
def split_data_by_category(df: pd.DataFrame):
    """Op producing dynamic outputs for each category."""

    categories = df["category"].unique()

    for category in categories:
        category_data = df[df["category"] == category]

        yield DynamicOutput(
            value=category_data,
            mapping_key=category,  # dynamic key identifying this output
            metadata={
                "category": MetadataValue.text(category),
                "record_count": MetadataValue.int(len(category_data)),
                "avg_value": MetadataValue.float(category_data["value"].mean())
            }
        )

@op
def process_category(category_data: pd.DataFrame) -> dict:
    """Process individual category data."""
    return {
        "category": category_data["category"].iloc[0],
        "summary": {
            "count": len(category_data),
            "mean": category_data["value"].mean(),
            "std": category_data["value"].std()
        }
    }

@job
def dynamic_processing_job():
    """Job with dynamic fan-out (load_data is assumed to be an op defined elsewhere)."""
    categories = split_data_by_category(load_data())
    categories.map(process_category)
```

**Parameters:**

- `value: Any` - The output value
- `mapping_key: str` - Dynamic mapping key
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Output metadata
- `output_name: str = "result"` - Name of the output
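
Mapped dynamic outputs can also be fanned back in with `.collect()`, which passes the list of per-key results to a downstream op. A sketch continuing the example above:

```python
from dagster import job, op

@op
def summarize_categories(category_summaries: list) -> dict:
    """Combine the per-category results produced by the mapped op."""
    return {s["category"]: s["summary"] for s in category_summaries}

@job
def fan_in_job():
    categories = split_data_by_category(load_data())  # load_data assumed as above
    summaries = categories.map(process_category)
    summarize_categories(summaries.collect())
```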

### Asset Events

#### `AssetMaterialization` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Event representing the materialization of an asset with metadata and lineage.

```python
import os

import pandas as pd
from dagster import (
    op,
    asset,
    AssetMaterialization,
    AssetKey,
    MetadataValue,
    TableSchema,
    TableColumn,
)

@op
def generate_asset_materialization():
    """Op that explicitly records an asset materialization event."""

    # Process data
    df = pd.read_csv("input_data.csv")
    processed_df = df.dropna().reset_index(drop=True)

    # Save to storage
    processed_df.to_parquet("processed_data.parquet")

    # Yield the materialization event so Dagster records it
    yield AssetMaterialization(
        asset_key=AssetKey(["warehouse", "processed_data"]),
        description="Processed customer data with null values removed",
        metadata={
            "input_records": MetadataValue.int(len(df)),
            "output_records": MetadataValue.int(len(processed_df)),
            "null_records_removed": MetadataValue.int(len(df) - len(processed_df)),
            "output_file": MetadataValue.path("processed_data.parquet"),
            "file_size_mb": MetadataValue.float(
                os.path.getsize("processed_data.parquet") / 1024 / 1024
            ),
            # timestamp metadata expects a timezone-aware datetime or a Unix float
            "processing_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
            "data_schema": MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(name=col, type=str(dtype))
                        for col, dtype in processed_df.dtypes.items()
                    ]
                )
            )
        }
    )

# Asset materialization metadata in an asset function
@asset
def customer_analytics(context) -> dict:
    """Asset with rich materialization metadata."""

    # Load and process data (`connection` is assumed to be a DB connection created elsewhere)
    customers = pd.read_sql("SELECT * FROM customers", connection)

    # Generate analytics
    analytics = {
        "total_customers": len(customers),
        "active_customers": int(customers["active"].sum()),
        "avg_age": float(customers["age"].mean()),
        "top_cities": customers["city"].value_counts().head(5).to_dict()
    }

    # Attach metadata to this materialization
    context.add_output_metadata({
        "total_customers": MetadataValue.int(analytics["total_customers"]),
        "active_rate": MetadataValue.float(
            analytics["active_customers"] / analytics["total_customers"]
        ),
        "average_age": MetadataValue.float(analytics["avg_age"]),
        "top_cities": MetadataValue.json(analytics["top_cities"]),
        "data_freshness": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
        "query_execution_time": MetadataValue.text("1.2 seconds"),
        "customer_distribution": MetadataValue.md(
            customers.groupby("city").size().head(10).to_markdown()
        )
    })

    return analytics
```

**Parameters:**

- `asset_key: AssetKey` - Asset key being materialized
- `description: Optional[str]` - Description of materialization
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Materialization metadata
- `partition: Optional[str]` - Partition key if asset is partitioned
- `tags: Optional[Dict[str, str]]` - Materialization tags
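
Materializations for assets managed outside the asset graph (external tables, files written by other systems) can also be logged imperatively from an op via `context.log_event`, including the `partition` argument listed above. A sketch with an illustrative asset key and partition:

```python
from dagster import op, AssetMaterialization, AssetKey, MetadataValue, OpExecutionContext

@op
def record_external_export(context: OpExecutionContext):
    """Log a materialization for an externally managed asset, scoped to one partition."""
    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey(["external", "daily_export"]),
            partition="2023-01-01",  # illustrative partition key
            metadata={"row_count": MetadataValue.int(10_000)},
        )
    )
```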

#### `AssetObservation` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Event representing observation of an asset without materialization (read-only).

```python
import pandas as pd
from dagster import op, AssetObservation, AssetKey, MetadataValue

@op
def monitor_external_table():
    """Monitor an external data source without materializing it.

    `get_table_stats` is assumed to be a helper defined elsewhere.
    """

    # Connect to external system (no materialization)
    table_stats = get_table_stats("external_database", "user_events")

    # Yield the observation event so Dagster records it
    yield AssetObservation(
        asset_key=AssetKey(["external", "user_events"]),
        description="Daily monitoring of external user events table",
        metadata={
            "row_count": MetadataValue.int(table_stats["row_count"]),
            "size_gb": MetadataValue.float(table_stats["size_bytes"] / (1024**3)),
            # assumes `last_modified` is a timezone-aware datetime
            "last_updated": MetadataValue.timestamp(table_stats["last_modified"]),
            "health_score": MetadataValue.float(table_stats["health_score"]),
            "availability": MetadataValue.text(
                "AVAILABLE" if table_stats["is_accessible"] else "UNAVAILABLE"
            ),
            "partition_count": MetadataValue.int(table_stats["partition_count"]),
            "schema_version": MetadataValue.text(table_stats["schema_version"])
        }
    )

# Asset observations emitted from a sensor
from dagster import sensor, SensorResult

@sensor(minimum_interval_seconds=300)  # every 5 minutes
def external_data_monitor():
    """Sensor that observes external data sources.

    `get_external_table_stats` is assumed to be a helper defined elsewhere;
    reporting observations via SensorResult.asset_events requires a recent Dagster version.
    """

    observations = []

    # Check multiple external sources
    sources = [
        {"key": ["external", "payments"], "table": "payments"},
        {"key": ["external", "orders"], "table": "orders"},
        {"key": ["external", "inventory"], "table": "inventory"}
    ]

    for source in sources:
        try:
            stats = get_external_table_stats(source["table"])

            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "record_count": MetadataValue.int(stats["count"]),
                    "last_update": MetadataValue.timestamp(stats["last_modified"]),
                    "data_lag_minutes": MetadataValue.int(
                        int((pd.Timestamp.now(tz="UTC") - stats["last_modified"]).total_seconds() / 60)
                    ),
                    "status": MetadataValue.text("HEALTHY" if stats["is_healthy"] else "DEGRADED")
                }
            )
            observations.append(observation)

        except Exception as e:
            # Record the failed check as an observation
            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "status": MetadataValue.text("ERROR"),
                    "error_message": MetadataValue.text(str(e)),
                    "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC"))
                }
            )
            observations.append(observation)

    return SensorResult(asset_events=observations)
```

**Parameters:**

- `asset_key: AssetKey` - Asset key being observed
- `description: Optional[str]` - Description of observation
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Observation metadata
- `partition: Optional[str]` - Partition key if asset is partitioned
- `tags: Optional[Dict[str, str]]` - Observation tags
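
Like materializations, observations can be logged imperatively from an op or asset body with `context.log_event`, including the `partition` argument listed above. A minimal sketch (the asset key, partition, and metadata values are illustrative):

```python
from dagster import asset, AssetObservation, AssetKey, MetadataValue, AssetExecutionContext

@asset
def freshness_check(context: AssetExecutionContext) -> None:
    """Record an observation about an upstream, externally managed table."""
    context.log_event(
        AssetObservation(
            asset_key=AssetKey(["external", "user_events"]),
            partition="2023-01-01",  # illustrative partition key
            metadata={"lag_minutes": MetadataValue.int(12)},
        )
    )
```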

### Result Classes

#### `MaterializeResult` { .api }

**Module:** `dagster._core.definitions.result`
**Type:** Class

Result object for reporting an asset materialization's metadata, checks, and data version. `MaterializeResult` carries no value: the asset function is expected to persist its own data rather than hand a value to an I/O manager.

```python
import numpy as np
import pandas as pd
from dagster import (
    asset,
    multi_asset,
    AssetCheckResult,
    AssetCheckSpec,
    AssetSpec,
    MaterializeResult,
    MetadataValue,
)

@asset(check_specs=[AssetCheckSpec("sales_positive", asset="sales_report")])
def sales_report(context, sales_data: pd.DataFrame) -> MaterializeResult:
    """Asset returning MaterializeResult with comprehensive metadata.

    MaterializeResult has no value parameter, so the report is written to
    storage inside the function instead of being handed to an I/O manager.
    """

    # Generate sales report
    report_data = {
        "total_sales": float(sales_data["amount"].sum()),
        "transaction_count": len(sales_data),
        "avg_transaction": float(sales_data["amount"].mean()),
        "top_products": sales_data.groupby("product")["amount"].sum().nlargest(5)
    }

    # Create and persist the report DataFrame
    report_df = pd.DataFrame([
        {"metric": "total_sales", "value": report_data["total_sales"]},
        {"metric": "transaction_count", "value": report_data["transaction_count"]},
        {"metric": "avg_transaction", "value": report_data["avg_transaction"]}
    ])
    report_df.to_parquet("sales_report.parquet")  # illustrative storage location

    return MaterializeResult(
        metadata={
            # Business metrics
            "total_sales": MetadataValue.float(report_data["total_sales"]),
            "transaction_count": MetadataValue.int(report_data["transaction_count"]),
            "average_transaction": MetadataValue.float(report_data["avg_transaction"]),

            # Data quality metrics
            "data_completeness": MetadataValue.float(0.98),
            "outlier_count": MetadataValue.int(
                len(sales_data[sales_data["amount"] > sales_data["amount"].quantile(0.95)])
            ),

            # Technical metadata
            "processing_time_seconds": MetadataValue.float(2.1),
            "memory_usage_mb": MetadataValue.float(
                report_df.memory_usage(deep=True).sum() / 1024 / 1024
            ),

            # Visualization metadata
            "top_products": MetadataValue.json(
                {k: float(v) for k, v in report_data["top_products"].to_dict().items()}
            ),
            "sales_distribution": MetadataValue.md(
                sales_data.describe().to_markdown()
            ),

            # File metadata
            "report_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
            "data_version": MetadataValue.text("v2.1.0")
        },

        # Optional: asset check results (check name matches the AssetCheckSpec above)
        check_results=[
            AssetCheckResult(
                check_name="sales_positive",
                passed=report_data["total_sales"] > 0,
                metadata={
                    "sales_validation": MetadataValue.text(
                        "PASSED: Sales data contains valid transactions"
                    )
                }
            )
        ]
    )

# Multi-asset yielding one MaterializeResult per asset spec
@multi_asset(
    specs=[
        AssetSpec("daily_summary", deps=["sales_data"]),
        AssetSpec("weekly_summary", deps=["sales_data"]),
    ]
)
def sales_summaries():
    """Multi-asset yielding a MaterializeResult for each output.

    `load_sales_data` is an assumed helper; storage is handled manually here.
    """

    sales_data = load_sales_data()

    # Generate daily and weekly summaries
    daily = sales_data.groupby(sales_data["date"].dt.date).agg({
        "amount": ["sum", "count", "mean"]
    }).round(2)
    weekly = sales_data.groupby(sales_data["date"].dt.to_period("W")).agg({
        "amount": ["sum", "count", "mean"]
    }).round(2)

    # Persist the summaries (illustrative storage locations)
    daily.to_parquet("daily_summary.parquet")
    weekly.to_parquet("weekly_summary.parquet")

    yield MaterializeResult(
        asset_key="daily_summary",
        metadata={
            "date_range": MetadataValue.text(f"{daily.index.min()} to {daily.index.max()}"),
            "total_days": MetadataValue.int(len(daily)),
            "avg_daily_sales": MetadataValue.float(float(daily[("amount", "sum")].mean()))
        }
    )
    yield MaterializeResult(
        asset_key="weekly_summary",
        metadata={
            "week_count": MetadataValue.int(len(weekly)),
            "avg_weekly_sales": MetadataValue.float(float(weekly[("amount", "sum")].mean())),
            "peak_week": MetadataValue.text(str(weekly[("amount", "sum")].idxmax()))
        }
    )
```

**Parameters:**

- `asset_key: Optional[CoercibleToAssetKey]` - Asset key being materialized (may be omitted for single-asset functions)
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Asset metadata
- `check_results: Optional[Sequence[AssetCheckResult]]` - Asset check results
- `data_version: Optional[DataVersion]` - Data version to record for this materialization
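
A minimal `MaterializeResult` sketch for an asset that persists its own output (the file path, metadata, and version string are illustrative):

```python
import pandas as pd
from dagster import asset, DataVersion, MaterializeResult, MetadataValue

@asset
def exported_orders() -> MaterializeResult:
    """Write the data ourselves and report only metadata back to Dagster."""
    df = pd.DataFrame({"order_id": [1, 2, 3]})
    df.to_csv("/tmp/exported_orders.csv", index=False)  # illustrative path

    return MaterializeResult(
        metadata={"rows": MetadataValue.int(len(df))},
        data_version=DataVersion("orders-2023-01-01"),  # illustrative version
    )
```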

#### `ObserveResult` { .api }

**Module:** `dagster._core.definitions.result`
**Type:** Class

Result of asset observation with metadata but no materialized value.

```python
import pandas as pd
from dagster import asset, ObserveResult, MetadataValue

@asset
def external_api_status(context) -> ObserveResult:
    """Asset that observes an external API without materializing data.

    `check_api_health` is assumed to be a helper defined elsewhere.
    """

    # Check API health
    api_health = check_api_health("https://api.example.com")

    return ObserveResult(
        metadata={
            "api_status": MetadataValue.text("HEALTHY" if api_health["is_up"] else "DOWN"),
            "response_time_ms": MetadataValue.int(api_health["response_time_ms"]),
            "success_rate": MetadataValue.float(api_health["success_rate"]),
            "last_error": MetadataValue.text(api_health.get("last_error", "None")),
            "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
            "endpoint_count": MetadataValue.int(len(api_health["endpoints"])),
            "rate_limit_remaining": MetadataValue.int(api_health["rate_limit_remaining"])
        }
    )
```

## Metadata System

Dagster's metadata system provides rich, typed metadata values for comprehensive observability and documentation.
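
Metadata can be attached both at definition time (on the `@asset` decorator, visible in the catalog before any run) and at runtime (per materialization). A small sketch contrasting the two; the asset name and values are illustrative:

```python
from dagster import asset, MetadataValue

@asset(
    metadata={  # definition-time metadata, shown on the asset itself
        "owner": "analytics-team",
        "docs": MetadataValue.url("https://docs.example.com/daily_metrics"),
    }
)
def daily_metrics(context) -> list:
    metrics = [1, 2, 3]
    # runtime metadata, attached to this specific materialization
    context.add_output_metadata({"rows": MetadataValue.int(len(metrics))})
    return metrics
```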

### Core Metadata Values

#### `MetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** Base class

Base class for all metadata values with type-safe metadata creation.

```python
from pathlib import Path

import numpy as np
import pandas as pd
from dagster import asset, AssetKey, MetadataValue

@asset
def comprehensive_metadata_asset(context) -> pd.DataFrame:
    """Asset demonstrating the main metadata value types."""

    df = pd.DataFrame({
        "id": range(100),
        "value": np.random.randn(100),
        "category": np.random.choice(["A", "B", "C"], 100)
    })

    # Save to file for path metadata
    output_path = Path("/tmp/output.parquet")
    df.to_parquet(output_path)

    # Metadata using the various MetadataValue constructors
    metadata = {
        # Text metadata
        "description": MetadataValue.text("Generated dataset with random values"),
        "processing_status": MetadataValue.text("COMPLETED"),

        # Numeric metadata
        "record_count": MetadataValue.int(len(df)),
        "column_count": MetadataValue.int(len(df.columns)),
        "file_size_mb": MetadataValue.float(output_path.stat().st_size / 1024 / 1024),
        "processing_time": MetadataValue.float(1.23),

        # Boolean metadata
        "has_nulls": MetadataValue.bool(bool(df.isnull().any().any())),
        "data_quality_passed": MetadataValue.bool(True),

        # Path metadata
        "output_file": MetadataValue.path(output_path),
        "config_file": MetadataValue.path("/config/settings.yaml"),

        # URL metadata
        "data_source": MetadataValue.url("https://api.example.com/data"),
        "documentation": MetadataValue.url("https://docs.example.com/dataset"),

        # JSON metadata
        "column_stats": MetadataValue.json({
            col: {"mean": float(df[col].mean()), "std": float(df[col].std())}
            for col in df.select_dtypes(include=[np.number]).columns
        }),
        "configuration": MetadataValue.json({
            "version": "2.1.0",
            "parameters": {"sample_size": 100, "random_seed": 42}
        }),

        # Markdown metadata
        "data_preview": MetadataValue.md(df.head().to_markdown()),
        "summary_report": MetadataValue.md(f"""
# Data Summary

- **Records**: {len(df)}
- **Columns**: {len(df.columns)}
- **Categories**: {df['category'].nunique()}

## Statistics
{df.describe().to_markdown()}
"""),

        # Timestamp metadata (expects a timezone-aware datetime or a Unix float)
        "created_at": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
        "data_as_of": MetadataValue.timestamp(pd.Timestamp("2023-01-01", tz="UTC")),

        # Python artifact metadata (module and name of a class or function)
        "reader_function": MetadataValue.python_artifact(pd.read_parquet),
        "processing_class": MetadataValue.python_artifact(pd.DataFrame),

        # Null metadata (explicitly null values)
        "optional_field": MetadataValue.null(),

        # Dagster-specific metadata (rendered as links in the UI)
        "upstream_asset": MetadataValue.asset(AssetKey(["raw", "data"])),
        "pipeline_run": MetadataValue.dagster_run("run-12345")
    }

    # Attach the metadata to this materialization and return the data
    context.add_output_metadata(metadata)
    return df
```
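
Plain Python values (`RawMetadataValue`) can also be passed directly; Dagster coerces common types such as `int`, `float`, `str`, and JSON-serializable dicts to the corresponding typed metadata values. A minimal sketch:

```python
from dagster import asset

@asset
def coerced_metadata_example(context) -> list:
    rows = [1, 2, 3]
    # raw values are coerced: int -> IntMetadataValue, str -> TextMetadataValue,
    # dict -> JsonMetadataValue
    context.add_output_metadata({
        "row_count": len(rows),
        "status": "COMPLETED",
        "params": {"seed": 42},
    })
    return rows
```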

### Specialized Metadata Values

#### Table Metadata

##### `TableMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for tabular data with schema and statistics.

```python
import numpy as np
import pandas as pd
from dagster import (
    asset,
    MetadataValue,
    TableMetadataValue,
    TableSchema,
    TableColumn,
    TableRecord,
)

@asset
def customers_table(context) -> pd.DataFrame:
    """Asset with comprehensive table metadata."""

    # Create customer data
    customers_df = pd.DataFrame({
        "customer_id": range(1, 1001),
        "name": [f"Customer {i}" for i in range(1, 1001)],
        "email": [f"customer{i}@example.com" for i in range(1, 1001)],
        "age": np.random.randint(18, 80, 1000),
        "city": np.random.choice(["NYC", "LA", "Chicago", "Houston"], 1000),
        "signup_date": pd.date_range("2020-01-01", periods=1000, freq="D"),
        "total_orders": np.random.randint(0, 50, 1000),
        "lifetime_value": np.random.uniform(100, 5000, 1000).round(2)
    })

    # Table schema metadata
    schema = TableSchema(
        columns=[
            TableColumn(name="customer_id", type="INTEGER", description="Unique customer identifier"),
            TableColumn(name="name", type="VARCHAR(255)", description="Customer full name"),
            TableColumn(name="email", type="VARCHAR(255)", description="Customer email address"),
            TableColumn(name="age", type="INTEGER", description="Customer age in years"),
            TableColumn(name="city", type="VARCHAR(100)", description="Customer city"),
            TableColumn(name="signup_date", type="DATE", description="Customer signup date"),
            TableColumn(name="total_orders", type="INTEGER", description="Total number of orders"),
            TableColumn(name="lifetime_value", type="DECIMAL(10,2)", description="Customer lifetime value in USD")
        ]
    )

    # Per-column statistics as table records (values cast to plain Python scalars)
    stats_records = [
        TableRecord({
            "column": "customer_id",
            "count": len(customers_df),
            "unique_count": int(customers_df["customer_id"].nunique()),
            "null_count": int(customers_df["customer_id"].isnull().sum())
        }),
        TableRecord({
            "column": "age",
            "mean": float(customers_df["age"].mean()),
            "min": int(customers_df["age"].min()),
            "max": int(customers_df["age"].max()),
            "std": float(customers_df["age"].std())
        }),
        TableRecord({
            "column": "lifetime_value",
            "mean": float(customers_df["lifetime_value"].mean()),
            "median": float(customers_df["lifetime_value"].median()),
            "total": float(customers_df["lifetime_value"].sum())
        })
    ]

    metadata = {
        # Table schema
        "schema": MetadataValue.table_schema(schema),

        # Table statistics
        "table_stats": TableMetadataValue(
            records=stats_records,
            schema=TableSchema(
                columns=[
                    TableColumn("column", "VARCHAR"),
                    TableColumn("count", "INTEGER"),
                    TableColumn("unique_count", "INTEGER"),
                    TableColumn("null_count", "INTEGER"),
                    TableColumn("mean", "FLOAT"),
                    TableColumn("median", "FLOAT"),
                    TableColumn("min", "FLOAT"),
                    TableColumn("max", "FLOAT"),
                    TableColumn("std", "FLOAT"),
                    TableColumn("total", "FLOAT")
                ]
            )
        ),

        # Additional table metadata
        "row_count": MetadataValue.int(len(customers_df)),
        "column_count": MetadataValue.int(len(customers_df.columns)),
        "estimated_size_mb": MetadataValue.float(
            customers_df.memory_usage(deep=True).sum() / 1024 / 1024
        )
    }

    context.add_output_metadata(metadata)
    return customers_df
```

##### `TableSchemaMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata specifically for table schemas.

```python
import numpy as np
import pandas as pd
from dagster import (
    asset,
    MetadataValue,
    TableSchema,
    TableColumn,
    TableColumnConstraints,
    TableConstraints,
)

@asset
def validated_orders_table(context) -> pd.DataFrame:
    """Asset with detailed schema and constraints metadata."""

    orders_df = pd.DataFrame({
        "order_id": range(1, 501),
        "customer_id": np.random.randint(1, 101, 500),
        "product_id": np.random.randint(1, 21, 500),
        "quantity": np.random.randint(1, 10, 500),
        "price": np.random.uniform(10, 500, 500).round(2),
        "order_date": pd.date_range("2023-01-01", periods=500, freq="h"),
        "status": np.random.choice(["pending", "shipped", "delivered"], 500)
    })

    # Define schema with constraints
    schema = TableSchema(
        columns=[
            TableColumn(
                name="order_id",
                type="INTEGER",
                description="Primary key for orders",
                constraints=TableColumnConstraints(
                    nullable=False,
                    unique=True
                )
            ),
            TableColumn(
                name="customer_id",
                type="INTEGER",
                description="Foreign key to customers table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="product_id",
                type="INTEGER",
                description="Foreign key to products table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="quantity",
                type="INTEGER",
                description="Order quantity",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="price",
                type="DECIMAL(10,2)",
                description="Order price in USD",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="order_date",
                type="TIMESTAMP",
                description="Order creation timestamp",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="status",
                type="VARCHAR(20)",
                description="Order status",
                constraints=TableColumnConstraints(nullable=False)
            )
        ],
        constraints=TableConstraints(
            other=[
                "FOREIGN KEY (customer_id) REFERENCES customers(customer_id)",
                "FOREIGN KEY (product_id) REFERENCES products(product_id)",
                "CHECK (quantity > 0)",
                "CHECK (price > 0)"
            ]
        )
    )

    metadata = {
        "table_schema": MetadataValue.table_schema(schema),
        "schema_version": MetadataValue.text("v1.2.0"),
        "last_schema_update": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC"))
    }

    context.add_output_metadata(metadata)
    return orders_df
```

##### `TableColumnLineageMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for column-level lineage tracking.

```python
import pandas as pd
from dagster import (
    asset,
    AssetKey,
    MetadataValue,
    TableColumnDep,
    TableColumnLineage,
)

@asset
def enriched_customer_data(context, customers: pd.DataFrame, demographics: pd.DataFrame) -> pd.DataFrame:
    """Asset with column lineage metadata."""

    # Merge customer data with demographics
    enriched = customers.merge(demographics, on="customer_id", how="left")

    # Add computed columns
    enriched["age_group"] = pd.cut(enriched["age"], bins=[0, 25, 35, 50, 100],
                                   labels=["18-25", "26-35", "36-50", "50+"])
    enriched["is_high_value"] = enriched["lifetime_value"] > enriched["lifetime_value"].median()

    # Define column lineage
    lineage = TableColumnLineage(
        deps_by_column={
            # Direct mappings from upstream assets
            "customer_id": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="customer_id")
            ],
            "name": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="name")
            ],
            "email": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="email")
            ],
            "age": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],

            # Columns from the demographics table
            "income": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="income")
            ],
            "education": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="education")
            ],

            # Computed columns with their source dependencies
            "age_group": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],
            "is_high_value": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="lifetime_value")
            ]
        }
    )

    metadata = {
        # Dagster's UI reads column lineage from the reserved "dagster/column_lineage" key
        "dagster/column_lineage": MetadataValue.column_lineage(lineage),
        "enrichment_source": MetadataValue.text("demographics table"),
        "computed_columns": MetadataValue.json(["age_group", "is_high_value"]),
        "join_key": MetadataValue.text("customer_id")
    }

    context.add_output_metadata(metadata)
    return enriched
```

### Code References

#### `CodeReferencesMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for linking assets to source code locations.

```python
from pathlib import Path

import pandas as pd
from dagster import (
    asset,
    Definitions,
    CodeReferencesMetadataValue,
    LocalFileCodeReference,
    UrlCodeReference,
    MetadataValue,
    with_source_code_references,
)

@asset
def analytics_report() -> pd.DataFrame:
    """Asset whose source location is attached automatically (see Definitions below)."""
    return generate_analytics_data()  # assumed helper defined elsewhere

@asset(
    metadata={
        # Manual code references are attached as definition metadata
        # under the reserved "dagster/code_references" key.
        "dagster/code_references": CodeReferencesMetadataValue(
            code_references=[
                LocalFileCodeReference(
                    file_path="/src/analytics/data_processing.py",
                    line_number=142,
                    label="Main processing function"
                ),
                LocalFileCodeReference(
                    file_path="/src/analytics/transformations.py",
                    line_number=67,
                    label="Data transformation logic"
                ),
                UrlCodeReference(
                    url="https://github.com/company/analytics/blob/main/src/analytics/data_processing.py#L142",
                    label="GitHub source"
                )
            ]
        ),
        "implementation_notes": MetadataValue.md("""
## Implementation Details

This asset implements the following logic:
1. Load raw data from warehouse
2. Apply business rules transformation
3. Generate aggregated metrics

See code references for detailed implementation.
""")
    }
)
def manual_code_references() -> pd.DataFrame:
    """Asset with manual code reference metadata."""
    return process_data()  # assumed helper defined elsewhere

# with_source_code_references wraps asset definitions and attaches the
# file and line where each asset is defined.
defs = Definitions(
    assets=with_source_code_references([analytics_report, manual_code_references])
)

# Optionally rewrite local file references into links to the Git repository
# (in practice you would build a single Definitions object).
from dagster import AnchorBasedFilePathMapping, link_code_references_to_git

defs_with_git_links = Definitions(
    assets=link_code_references_to_git(
        assets_defs=with_source_code_references([analytics_report, manual_code_references]),
        git_url="https://github.com/company/analytics",
        git_branch="main",
        file_path_mapping=AnchorBasedFilePathMapping(
            local_file_anchor=Path(__file__),
            file_anchor_path_in_repository="src/analytics/assets.py",  # illustrative repo path
        ),
    )
)
```

### Notebook Metadata

#### `NotebookMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for Jupyter notebooks and analysis artifacts.

```python
import nbformat
import papermill as pm
from dagster import asset, MetadataValue

@asset
def notebook_analysis(context) -> dict:
    """Asset that executes and tracks a notebook analysis."""

    # Execute the notebook with papermill
    output_notebook = "/tmp/analysis_output.ipynb"
    parameters = {"data_date": "2023-01-01", "sample_size": 1000}
    pm.execute_notebook(
        input_path="analysis_template.ipynb",
        output_path=output_notebook,
        parameters=parameters
    )

    # Inspect the executed notebook (papermill records execution info in the notebook metadata)
    nb = nbformat.read(output_notebook, as_version=4)
    papermill_meta = nb.metadata.get("papermill", {})

    # How results are extracted depends on the notebook itself; here we assume it
    # writes a summary that read_notebook_results() parses (assumed helper).
    results = read_notebook_results(output_notebook)

    metadata = {
        "analysis_notebook": MetadataValue.notebook(output_notebook),
        "template_notebook": MetadataValue.path("analysis_template.ipynb"),
        "execution_time": MetadataValue.float(float(papermill_meta.get("duration", 0) or 0)),
        "kernel_name": MetadataValue.text(nb.metadata.get("kernelspec", {}).get("name", "unknown")),
        "cell_count": MetadataValue.int(len(nb.cells)),
        "parameters": MetadataValue.json(parameters)
    }

    context.add_output_metadata(metadata)
    return results
```

937

938

### Data Versioning and Provenance

939

940

#### `DataVersion` { .api }

941

942

**Module:** `dagster._core.definitions.data_version`

943

**Type:** Class

944

945

Data versioning for change tracking and lineage.

946

947

```python
import hashlib

import pandas as pd
from dagster import asset, DataVersion, MetadataValue, Output

@asset
def versioned_dataset() -> Output[pd.DataFrame]:
    """Asset with an explicit, content-derived data version."""

    # Load source data
    source_df = pd.read_csv("source_data.csv")

    # Process data
    processed_df = source_df.dropna().reset_index(drop=True)

    # Create a data version based on content
    content_hash = hashlib.sha256(processed_df.to_json().encode()).hexdigest()[:12]
    data_version = DataVersion(content_hash)

    # Dagster combines this data version with the asset's code version and the
    # data versions of its upstream inputs to derive provenance for the run.
    metadata = {
        "content_hash": MetadataValue.text(content_hash),
        "code_version": MetadataValue.text("v2.1.0")
    }

    # Attach the data version to the output
    return Output(processed_df, metadata=metadata, data_version=data_version)
```
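
Data versions are most useful alongside code versions and observable source assets: declaring `code_version` on an asset and returning a `DataVersion` from an observation lets Dagster flag stale materializations. A minimal sketch (file name and version string are illustrative):

```python
import hashlib

from dagster import asset, observable_source_asset, DataVersion

@observable_source_asset
def source_file() -> DataVersion:
    """Report the current version of an external file without materializing it."""
    with open("source_data.csv", "rb") as f:
        return DataVersion(hashlib.sha256(f.read()).hexdigest()[:12])

@asset(deps=[source_file], code_version="v2.1.0")
def derived_dataset() -> list:
    """Flagged as stale in the UI when source_file's version or this code_version changes."""
    return [1, 2, 3]
```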

982

983

This comprehensive event and metadata system provides rich observability, lineage tracking, and documentation capabilities for all Dagster pipelines. The typed metadata system ensures consistency and enables powerful filtering, search, and visualization in the Dagster UI.


For asset materialization and execution contexts that generate events, see [Execution and Contexts](./execution-contexts.md). For sensors that can respond to asset events, see [Sensors and Schedules](./sensors-schedules.md).