# Events and Metadata

This document covers Dagster's event and metadata system, including materialization events, observation events, metadata values, table metadata, and code references. The event system provides observability and lineage tracking for all pipeline operations.

## Core Events

Dagster's event system captures and records all significant occurrences during pipeline execution, providing the foundation for observability and debugging through the event log.
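Events recorded during runs land in the instance's event log, which can be queried programmatically. A minimal sketch of reading recent materializations back out (this assumes a configured `DAGSTER_HOME`; the asset key is illustrative):

```python
from dagster import AssetKey, DagsterEventType, DagsterInstance, EventRecordsFilter

# Fetch the five most recent materializations of one asset from the event log.
instance = DagsterInstance.get()
records = instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey(["warehouse", "processed_data"]),  # illustrative key
    ),
    limit=5,
)
for record in records:
    # Each record wraps the underlying AssetMaterialization event data
    materialization = record.asset_materialization
    if materialization is not None:
        print(record.timestamp, dict(materialization.metadata))
```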
### Output Events

#### `Output` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Represents the output of an op, pairing the output value with optional metadata.
```python
from dagster import op, Output, MetadataValue
import numpy as np
import pandas as pd

@op
def process_data() -> Output[pd.DataFrame]:
    """Op producing an output with metadata."""

    # Process data
    df = pd.DataFrame({
        "id": range(1000),
        "value": np.random.randn(1000),
        "category": np.random.choice(["A", "B", "C"], 1000)
    })

    # Return output with rich metadata
    return Output(
        value=df,
        metadata={
            "records": len(df),  # plain values are coerced to MetadataValue
            "columns": MetadataValue.int(len(df.columns)),
            "memory_usage_mb": MetadataValue.float(df.memory_usage(deep=True).sum() / 1024 / 1024),
            "categories": MetadataValue.text(",".join(df["category"].unique())),
            "data_quality_score": MetadataValue.float(0.95),
            "processing_time": MetadataValue.text("2.3 seconds"),
            "sample_data": MetadataValue.md(df.head().to_markdown())
        }
    )

@op
def validate_data(df: pd.DataFrame) -> Output[bool]:
    """Op with conditional output metadata."""

    # Validation logic (cast numpy integers to plain ints for metadata)
    null_count = int(df.isnull().sum().sum())
    duplicate_count = int(df.duplicated().sum())
    is_valid = null_count == 0 and duplicate_count == 0

    metadata = {
        "null_count": MetadataValue.int(null_count),
        "duplicate_count": MetadataValue.int(duplicate_count),
        "validation_status": MetadataValue.text("PASSED" if is_valid else "FAILED")
    }

    if not is_valid:
        metadata["issues"] = MetadataValue.text(f"Found {null_count} nulls, {duplicate_count} duplicates")

    return Output(
        value=is_valid,
        metadata=metadata
    )
```
**Parameters:**

- `value: Any` - The output value
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Output metadata
- `output_name: str = "result"` - Name of the output
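When an op declares multiple outputs, `output_name` selects which declared output each `Output` event belongs to. A minimal sketch (the op and output names here are illustrative):

```python
from dagster import Out, Output, op

@op(out={"clean": Out(), "rejected": Out()})
def split_records(records: list):
    """Yield one Output per declared output, selected via output_name."""
    clean = [r for r in records if r.get("valid")]
    rejected = [r for r in records if not r.get("valid")]

    yield Output(clean, output_name="clean", metadata={"count": len(clean)})
    yield Output(rejected, output_name="rejected", metadata={"count": len(rejected)})
```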
#### `DynamicOutput` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Output for dynamic fan-out operations that produce a variable number of outputs at runtime.
```python
from dagster import DynamicOut, DynamicOutput, MetadataValue, job, op
import pandas as pd

@op(out=DynamicOut())
def split_data_by_category(df: pd.DataFrame):
    """Op producing dynamic outputs for each category."""

    categories = df["category"].unique()

    for category in categories:
        category_data = df[df["category"] == category]

        yield DynamicOutput(
            value=category_data,
            mapping_key=category,  # Dynamic key identifying this output
            metadata={
                "category": MetadataValue.text(category),
                "record_count": MetadataValue.int(len(category_data)),
                "avg_value": MetadataValue.float(category_data["value"].mean())
            }
        )

@op
def process_category(category_data: pd.DataFrame) -> dict:
    """Process individual category data."""
    return {
        "category": category_data["category"].iloc[0],
        "summary": {
            "count": len(category_data),
            "mean": category_data["value"].mean(),
            "std": category_data["value"].std()
        }
    }

@job
def dynamic_processing_job():
    """Job with dynamic fan-out."""
    # load_data is assumed to be an op defined elsewhere
    categories = split_data_by_category(load_data())
    categories.map(process_category)
```
**Parameters:**

- `value: Any` - The output value
- `mapping_key: str` - Dynamic mapping key (a unique, identifier-safe string)
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Output metadata
- `output_name: str = "result"` - Name of the output
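Dynamic outputs can also be fanned back in: calling `.collect()` on the mapped results gathers them into a single list input for a downstream op. A sketch reusing the ops above:

```python
from dagster import job, op

@op
def summarize_categories(results: list) -> dict:
    """Fan-in op that receives every mapped result as one list."""
    return {r["category"]: r["summary"] for r in results}

@job
def fan_out_fan_in_job():
    # split_data_by_category, process_category, and load_data are the
    # ops from the example above
    categories = split_data_by_category(load_data())
    summaries = categories.map(process_category)
    summarize_categories(summaries.collect())
```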
### Asset Events

#### `AssetMaterialization` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Event representing the materialization of an asset, with metadata and lineage.
```python
from dagster import asset, op, AssetMaterialization, AssetKey, MetadataValue
from dagster import TableSchema, TableColumn
import os
import pandas as pd

@op
def log_asset_materialization(context) -> None:
    """Op that explicitly logs an asset materialization event."""

    # Process data
    df = pd.read_csv("input_data.csv")
    processed_df = df.dropna().reset_index(drop=True)

    # Save to storage
    processed_df.to_parquet("processed_data.parquet")

    # Log the materialization event (ops record events via the context
    # rather than returning them as output values)
    context.log_event(
        AssetMaterialization(
            asset_key=AssetKey(["warehouse", "processed_data"]),
            description="Processed customer data with null values removed",
            metadata={
                "input_records": MetadataValue.int(len(df)),
                "output_records": MetadataValue.int(len(processed_df)),
                "null_records_removed": MetadataValue.int(len(df) - len(processed_df)),
                "output_file": MetadataValue.path("processed_data.parquet"),
                "file_size_mb": MetadataValue.float(
                    os.path.getsize("processed_data.parquet") / 1024 / 1024
                ),
                # Timestamp metadata requires timezone-aware datetimes
                "processing_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
                "data_schema": MetadataValue.table_schema(
                    TableSchema(
                        columns=[
                            TableColumn(name=col, type=str(dtype))
                            for col, dtype in processed_df.dtypes.items()
                        ]
                    )
                )
            }
        )
    )

# Materialization metadata in an asset function
@asset
def customer_analytics(context) -> dict:
    """Asset with rich materialization metadata."""

    # Load and process data (connection is assumed to be defined elsewhere)
    customers = pd.read_sql("SELECT * FROM customers", connection)

    # Generate analytics
    analytics = {
        "total_customers": len(customers),
        "active_customers": len(customers[customers["active"] == True]),
        "avg_age": customers["age"].mean(),
        "top_cities": customers["city"].value_counts().head(5).to_dict()
    }

    # Attach comprehensive metadata to this materialization
    context.add_output_metadata({
        "total_customers": MetadataValue.int(analytics["total_customers"]),
        "active_rate": MetadataValue.float(
            analytics["active_customers"] / analytics["total_customers"]
        ),
        "average_age": MetadataValue.float(analytics["avg_age"]),
        "top_cities": MetadataValue.json(analytics["top_cities"]),
        "data_freshness": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
        "query_execution_time": MetadataValue.text("1.2 seconds"),
        "customer_distribution": MetadataValue.md(
            customers.groupby("city").size().head(10).to_markdown()
        )
    })

    return analytics
```
**Parameters:**

- `asset_key: AssetKey` - Asset key being materialized
- `description: Optional[str]` - Description of the materialization
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Materialization metadata
- `partition: Optional[str]` - Partition key if the asset is partitioned
- `tags: Optional[Dict[str, str]]` - Materialization tags
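Materializations do not have to originate inside a Dagster run. When an external system updates an asset, the event can be reported directly against the instance; a minimal sketch (assumes a configured `DAGSTER_HOME`):

```python
from dagster import AssetKey, AssetMaterialization, DagsterInstance, MetadataValue

# Report a materialization performed by an external system (no run involved).
instance = DagsterInstance.get()
instance.report_runless_asset_event(
    AssetMaterialization(
        asset_key=AssetKey(["warehouse", "processed_data"]),
        description="Table rebuilt by an external scheduler",
        metadata={"row_count": MetadataValue.int(1000)},
    )
)
```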
#### `AssetObservation` { .api }

**Module:** `dagster._core.definitions.events`
**Type:** Class

Event representing a read-only observation of an asset, recorded without materializing it.
```python
from dagster import op, sensor, AssetObservation, AssetKey, MetadataValue, SensorResult
import pandas as pd

@op
def monitor_external_table(context) -> None:
    """Monitor an external data source without materializing it."""

    # Connect to the external system (get_table_stats is assumed to be
    # defined elsewhere; no materialization happens here)
    table_stats = get_table_stats("external_database", "user_events")

    context.log_event(
        AssetObservation(
            asset_key=AssetKey(["external", "user_events"]),
            description="Daily monitoring of external user events table",
            metadata={
                "row_count": MetadataValue.int(table_stats["row_count"]),
                "size_gb": MetadataValue.float(table_stats["size_bytes"] / (1024**3)),
                "last_updated": MetadataValue.timestamp(table_stats["last_modified"]),
                "health_score": MetadataValue.float(table_stats["health_score"]),
                "availability": MetadataValue.text(
                    "AVAILABLE" if table_stats["is_accessible"] else "UNAVAILABLE"
                ),
                "partition_count": MetadataValue.int(table_stats["partition_count"]),
                "schema_version": MetadataValue.text(table_stats["schema_version"])
            }
        )
    )

# Asset observations emitted from a sensor
@sensor(minimum_interval_seconds=300)  # Every 5 minutes
def external_data_monitor():
    """Sensor that observes external data sources."""

    observations = []

    # Check multiple external sources
    sources = [
        {"key": ["external", "payments"], "table": "payments"},
        {"key": ["external", "orders"], "table": "orders"},
        {"key": ["external", "inventory"], "table": "inventory"}
    ]

    for source in sources:
        try:
            # get_external_table_stats is assumed to be defined elsewhere and
            # to return timezone-aware datetimes for last_modified
            stats = get_external_table_stats(source["table"])

            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "record_count": MetadataValue.int(stats["count"]),
                    "last_update": MetadataValue.timestamp(stats["last_modified"]),
                    "data_lag_minutes": MetadataValue.int(
                        int((pd.Timestamp.now(tz="UTC") - stats["last_modified"]).total_seconds() / 60)
                    ),
                    "status": MetadataValue.text("HEALTHY" if stats["is_healthy"] else "DEGRADED")
                }
            )
            observations.append(observation)

        except Exception as e:
            # Record the observation failure
            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "status": MetadataValue.text("ERROR"),
                    "error_message": MetadataValue.text(str(e)),
                    "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC"))
                }
            )
            observations.append(observation)

    # Sensors report asset events via SensorResult rather than returning them directly
    return SensorResult(asset_events=observations)
```
**Parameters:**

- `asset_key: AssetKey` - Asset key being observed
- `description: Optional[str]` - Description of the observation
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Observation metadata
- `partition: Optional[str]` - Partition key if the asset is partitioned
- `tags: Optional[Dict[str, str]]` - Observation tags
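Observations can also be logged from inside an asset or op body via the context, for example to record facts about an upstream source read during materialization:

```python
from dagster import asset, AssetKey, AssetObservation, MetadataValue

@asset
def derived_table(context) -> list:
    """Asset that records an observation about the source it reads."""
    rows = [1, 2, 3]  # stand-in for data loaded from an upstream source

    context.log_event(
        AssetObservation(
            asset_key=AssetKey(["external", "user_events"]),
            metadata={"rows_read": MetadataValue.int(len(rows))},
        )
    )
    return rows
```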
### Result Classes

#### `MaterializeResult` { .api }

**Module:** `dagster._core.definitions.result`
**Type:** Class

Result object for an asset that persists its own data: it carries metadata, optional check results, and an optional data version, but no output value for an I/O manager.
```python
from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetSpec,
    MaterializeResult,
    MetadataValue,
    asset,
    multi_asset,
)
import pandas as pd

@asset(check_specs=[AssetCheckSpec("sales_nonempty", asset="sales_report")])
def sales_report(sales_data: pd.DataFrame) -> MaterializeResult:
    """Asset returning MaterializeResult with comprehensive metadata.

    MaterializeResult carries no output value, so the asset persists its
    own data rather than handing a value to an I/O manager.
    """

    # Generate sales report
    report_data = {
        "total_sales": sales_data["amount"].sum(),
        "transaction_count": len(sales_data),
        "avg_transaction": sales_data["amount"].mean(),
        "top_products": sales_data.groupby("product")["amount"].sum().nlargest(5)
    }

    # Create and persist the report DataFrame
    report_df = pd.DataFrame([
        {"metric": "total_sales", "value": report_data["total_sales"]},
        {"metric": "transaction_count", "value": report_data["transaction_count"]},
        {"metric": "avg_transaction", "value": report_data["avg_transaction"]}
    ])
    report_df.to_parquet("sales_report.parquet")

    return MaterializeResult(
        metadata={
            # Business metrics
            "total_sales": MetadataValue.float(float(report_data["total_sales"])),
            "transaction_count": MetadataValue.int(report_data["transaction_count"]),
            "average_transaction": MetadataValue.float(float(report_data["avg_transaction"])),

            # Data quality metrics
            "data_completeness": MetadataValue.float(0.98),
            "outlier_count": MetadataValue.int(
                len(sales_data[sales_data["amount"] > sales_data["amount"].quantile(0.95)])
            ),

            # Technical metadata
            "processing_time_seconds": MetadataValue.float(2.1),
            "memory_usage_mb": MetadataValue.float(
                report_df.memory_usage(deep=True).sum() / 1024 / 1024
            ),

            # Visualization metadata
            "top_products": MetadataValue.json(report_data["top_products"].to_dict()),
            "sales_distribution": MetadataValue.md(
                sales_data.describe().to_markdown()
            ),

            # File metadata
            "report_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
            "data_version": MetadataValue.text("v2.1.0")
        },

        # Check results must match the check_specs declared on the asset
        check_results=[
            AssetCheckResult(
                check_name="sales_nonempty",
                passed=bool(report_data["total_sales"] > 0),
                metadata={
                    "sales_validation": MetadataValue.text(
                        "PASSED: Sales data contains valid transactions"
                    )
                }
            )
        ]
    )

# Multi-asset yielding one MaterializeResult per materialized asset
@multi_asset(
    specs=[
        AssetSpec("daily_summary", deps=["sales_data"]),
        AssetSpec("weekly_summary", deps=["sales_data"]),
    ]
)
def sales_summaries():
    """Multi-asset yielding a MaterializeResult for each output asset."""

    # Spec-based multi-assets manage their own I/O; the source path is assumed
    sales_data = pd.read_parquet("sales_data.parquet")

    # Generate the summaries
    daily = (
        sales_data.groupby(sales_data["date"].dt.date)["amount"]
        .agg(["sum", "count", "mean"])
        .round(2)
    )
    weekly = (
        sales_data.groupby(sales_data["date"].dt.to_period("W"))["amount"]
        .agg(["sum", "count", "mean"])
        .round(2)
    )
    weekly.index = weekly.index.astype(str)  # periods are not parquet-serializable

    # Persist both summaries
    daily.to_parquet("daily_summary.parquet")
    weekly.to_parquet("weekly_summary.parquet")

    yield MaterializeResult(
        asset_key="daily_summary",
        metadata={
            "date_range": MetadataValue.text(f"{daily.index.min()} to {daily.index.max()}"),
            "total_days": MetadataValue.int(len(daily)),
            "avg_daily_sales": MetadataValue.float(float(daily["sum"].mean()))
        }
    )
    yield MaterializeResult(
        asset_key="weekly_summary",
        metadata={
            "week_count": MetadataValue.int(len(weekly)),
            "avg_weekly_sales": MetadataValue.float(float(weekly["sum"].mean())),
            "peak_week": MetadataValue.text(str(weekly["sum"].idxmax()))
        }
    )
```
**Parameters:**

- `asset_key: Optional[CoercibleToAssetKey]` - Asset key being materialized (defaults to the asset being defined)
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Asset metadata
- `check_results: Optional[Sequence[AssetCheckResult]]` - Asset check results
- `data_version: Optional[DataVersion]` - Data version of the materialization

Unlike `Output`, `MaterializeResult` takes no `value`; the asset function is responsible for persisting its own data.
#### `ObserveResult` { .api }

**Module:** `dagster._core.definitions.result`
**Type:** Class

Result of an asset observation: metadata (and optionally a data version) without a materialized value.
```python
from dagster import observable_source_asset, ObserveResult, MetadataValue
import pandas as pd

@observable_source_asset
def external_api_status() -> ObserveResult:
    """Observable source asset that checks an external API without materializing data."""

    # check_api_health is assumed to be defined elsewhere
    api_health = check_api_health("https://api.example.com")

    return ObserveResult(
        metadata={
            "api_status": MetadataValue.text("HEALTHY" if api_health["is_up"] else "DOWN"),
            "response_time_ms": MetadataValue.int(api_health["response_time_ms"]),
            "success_rate": MetadataValue.float(api_health["success_rate"]),
            "last_error": MetadataValue.text(api_health.get("last_error", "None")),
            "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
            "endpoint_count": MetadataValue.int(len(api_health["endpoints"])),
            "rate_limit_remaining": MetadataValue.int(api_health["rate_limit_remaining"])
        }
    )
```
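`ObserveResult` accepts a parameter set analogous to `MaterializeResult` (this list is based on the class's fields and should be checked against your Dagster version):

- `asset_key: Optional[CoercibleToAssetKey]` - Asset key being observed (defaults to the asset being defined)
- `metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]]` - Observation metadata
- `check_results: Optional[Sequence[AssetCheckResult]]` - Asset check results
- `data_version: Optional[DataVersion]` - Observed data version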
## Metadata System

Dagster's metadata system provides rich, typed metadata values for observability and documentation.

### Core Metadata Values

#### `MetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** Base class

Base class for all metadata values, with static constructors for type-safe metadata creation.
```python
from dagster import AssetKey, MetadataValue, asset
import numpy as np
import pandas as pd
from pathlib import Path

@asset
def comprehensive_metadata_asset(context) -> pd.DataFrame:
    """Asset demonstrating the main metadata value types."""

    df = pd.DataFrame({
        "id": range(100),
        "value": np.random.randn(100),
        "category": np.random.choice(["A", "B", "C"], 100)
    })

    # Save to file for path metadata
    output_path = Path("/tmp/output.parquet")
    df.to_parquet(output_path)

    # Metadata using the MetadataValue constructors
    metadata = {
        # Text metadata
        "description": MetadataValue.text("Generated dataset with random values"),
        "processing_status": MetadataValue.text("COMPLETED"),

        # Numeric metadata
        "record_count": MetadataValue.int(len(df)),
        "column_count": MetadataValue.int(len(df.columns)),
        "file_size_mb": MetadataValue.float(output_path.stat().st_size / 1024 / 1024),
        "processing_time": MetadataValue.float(1.23),

        # Boolean metadata
        "has_nulls": MetadataValue.bool(bool(df.isnull().any().any())),
        "data_quality_passed": MetadataValue.bool(True),

        # Path metadata
        "output_file": MetadataValue.path(output_path),
        "config_file": MetadataValue.path("/config/settings.yaml"),

        # URL metadata
        "data_source": MetadataValue.url("https://api.example.com/data"),
        "documentation": MetadataValue.url("https://docs.example.com/dataset"),

        # JSON metadata
        "column_stats": MetadataValue.json({
            col: {"mean": float(df[col].mean()), "std": float(df[col].std())}
            for col in df.select_dtypes(include=[np.number]).columns
        }),
        "configuration": MetadataValue.json({
            "version": "2.1.0",
            "parameters": {"sample_size": 100, "random_seed": 42}
        }),

        # Markdown metadata
        "data_preview": MetadataValue.md(df.head().to_markdown()),
        "summary_report": MetadataValue.md(f"""
# Data Summary

- **Records**: {len(df)}
- **Columns**: {len(df.columns)}
- **Categories**: {df['category'].nunique()}

## Statistics
{df.describe().to_markdown()}
"""),

        # Timestamp metadata (timezone-aware values are required)
        "created_at": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
        "data_as_of": MetadataValue.timestamp(pd.Timestamp("2023-01-01", tz="UTC")),

        # Python artifact metadata (references a plain function or class)
        "generator_function": MetadataValue.python_artifact(pd.read_csv),
        "processing_class": MetadataValue.python_artifact(pd.DataFrame),

        # Null metadata (explicitly null values)
        "optional_field": MetadataValue.null(),

        # Dagster-specific metadata
        "upstream_asset": MetadataValue.asset(AssetKey(["raw", "data"])),
        "related_job": MetadataValue.job("etl_job", location_name="analytics"),  # location name assumed
        "pipeline_run": MetadataValue.dagster_run("run-12345")
    }

    # Attach the metadata to this asset's materialization
    context.add_output_metadata(metadata)
    return df
```
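Plain Python values can also be passed wherever metadata is accepted; Dagster coerces raw values (`str`, `int`, `float`, `bool`, and JSON-serializable `dict`/`list`) to the matching `MetadataValue` types:

```python
from dagster import asset

@asset
def coerced_metadata_asset(context) -> list:
    rows = [1, 2, 3]
    # Raw values are coerced: int -> IntMetadataValue, str -> TextMetadataValue,
    # bool -> BoolMetadataValue, dict/list -> JsonMetadataValue
    context.add_output_metadata({
        "row_count": len(rows),
        "source": "inline example",
        "complete": True,
    })
    return rows
```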
### Specialized Metadata Values

#### Table Metadata

##### `TableMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for tabular data with schema and statistics.
```python
from dagster import asset, MetadataValue, TableMetadataValue, TableSchema, TableColumn, TableRecord
import numpy as np
import pandas as pd

@asset
def customers_table(context) -> pd.DataFrame:
    """Asset with comprehensive table metadata."""

    # Create customer data
    customers_df = pd.DataFrame({
        "customer_id": range(1, 1001),
        "name": [f"Customer {i}" for i in range(1, 1001)],
        "email": [f"customer{i}@example.com" for i in range(1, 1001)],
        "age": np.random.randint(18, 80, 1000),
        "city": np.random.choice(["NYC", "LA", "Chicago", "Houston"], 1000),
        "signup_date": pd.date_range("2020-01-01", periods=1000, freq="D"),
        "total_orders": np.random.randint(0, 50, 1000),
        "lifetime_value": np.random.uniform(100, 5000, 1000).round(2)
    })

    # Table schema metadata
    schema = TableSchema(
        columns=[
            TableColumn(name="customer_id", type="INTEGER", description="Unique customer identifier"),
            TableColumn(name="name", type="VARCHAR(255)", description="Customer full name"),
            TableColumn(name="email", type="VARCHAR(255)", description="Customer email address"),
            TableColumn(name="age", type="INTEGER", description="Customer age in years"),
            TableColumn(name="city", type="VARCHAR(100)", description="Customer city"),
            TableColumn(name="signup_date", type="DATE", description="Customer signup date"),
            TableColumn(name="total_orders", type="INTEGER", description="Total number of orders"),
            TableColumn(name="lifetime_value", type="DECIMAL(10,2)", description="Customer lifetime value in USD")
        ]
    )

    # Per-column statistics as table records; values must be plain
    # str/int/float/bool, so numpy scalars are cast explicitly
    stats_records = [
        TableRecord({
            "column": "customer_id",
            "count": int(customers_df["customer_id"].count()),
            "mean": float(customers_df["customer_id"].mean())
        }),
        TableRecord({
            "column": "age",
            "count": int(customers_df["age"].count()),
            "mean": float(customers_df["age"].mean())
        }),
        TableRecord({
            "column": "lifetime_value",
            "count": int(customers_df["lifetime_value"].count()),
            "mean": float(customers_df["lifetime_value"].mean())
        })
    ]

    metadata = {
        # Table schema
        "schema": MetadataValue.table_schema(schema),

        # Table statistics (records plus the schema describing them)
        "table_stats": TableMetadataValue(
            records=stats_records,
            schema=TableSchema(
                columns=[
                    TableColumn("column", "VARCHAR"),
                    TableColumn("count", "INTEGER"),
                    TableColumn("mean", "FLOAT")
                ]
            )
        ),

        # Additional table metadata
        "row_count": MetadataValue.int(len(customers_df)),
        "column_count": MetadataValue.int(len(customers_df.columns)),
        "estimated_size_mb": MetadataValue.float(
            customers_df.memory_usage(deep=True).sum() / 1024 / 1024
        )
    }

    context.add_output_metadata(metadata)
    return customers_df
```
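The same structure can be built with the `MetadataValue.table(...)` convenience constructor instead of instantiating `TableMetadataValue` directly (this constructor has been marked experimental in some releases):

```python
from dagster import MetadataValue, TableColumn, TableRecord, TableSchema

table_metadata = MetadataValue.table(
    records=[TableRecord({"column": "age", "count": 1000, "mean": 48.6})],
    schema=TableSchema(
        columns=[
            TableColumn("column", "VARCHAR"),
            TableColumn("count", "INTEGER"),
            TableColumn("mean", "FLOAT"),
        ]
    ),
)
```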
##### `TableSchemaMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata specifically for table schemas.
```python
from dagster import asset, MetadataValue, TableSchema, TableColumn
from dagster import TableConstraints, TableColumnConstraints
import numpy as np
import pandas as pd

@asset
def validated_orders_table(context) -> pd.DataFrame:
    """Asset with detailed schema and constraints metadata."""

    orders_df = pd.DataFrame({
        "order_id": range(1, 501),
        "customer_id": np.random.randint(1, 101, 500),
        "product_id": np.random.randint(1, 21, 500),
        "quantity": np.random.randint(1, 10, 500),
        "price": np.random.uniform(10, 500, 500).round(2),
        "order_date": pd.date_range("2023-01-01", periods=500, freq="h"),
        "status": np.random.choice(["pending", "shipped", "delivered"], 500)
    })

    # Define schema with column- and table-level constraints
    schema = TableSchema(
        columns=[
            TableColumn(
                name="order_id",
                type="INTEGER",
                description="Primary key for orders",
                constraints=TableColumnConstraints(nullable=False, unique=True)
            ),
            TableColumn(
                name="customer_id",
                type="INTEGER",
                description="Foreign key to customers table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="product_id",
                type="INTEGER",
                description="Foreign key to products table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="quantity",
                type="INTEGER",
                description="Order quantity",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="price",
                type="DECIMAL(10,2)",
                description="Order price in USD",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="order_date",
                type="TIMESTAMP",
                description="Order creation timestamp",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="status",
                type="VARCHAR(20)",
                description="Order status",
                constraints=TableColumnConstraints(nullable=False)
            )
        ],
        constraints=TableConstraints(
            other=[
                "FOREIGN KEY (customer_id) REFERENCES customers(customer_id)",
                "FOREIGN KEY (product_id) REFERENCES products(product_id)",
                "CHECK (quantity > 0)",
                "CHECK (price > 0)"
            ]
        )
    )

    metadata = {
        "table_schema": MetadataValue.table_schema(schema),
        "schema_version": MetadataValue.text("v1.2.0"),
        "last_schema_update": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC"))
    }

    context.add_output_metadata(metadata)
    return orders_df
```
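A schema can also be attached at definition time under the reserved `dagster/column_schema` metadata key, which the UI uses to render column details (a sketch following Dagster's documented key convention):

```python
from dagster import asset, TableColumn, TableSchema

@asset(
    metadata={
        "dagster/column_schema": TableSchema(
            columns=[
                TableColumn("order_id", "INTEGER", description="Primary key for orders"),
                TableColumn("status", "VARCHAR(20)", description="Order status"),
            ]
        )
    }
)
def orders_with_declared_schema():
    """Asset whose column schema is declared statically."""
    ...
```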
##### `TableColumnLineageMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for column-level lineage tracking.
```python
from dagster import asset, AssetKey, MetadataValue, TableColumnDep, TableColumnLineage
import pandas as pd

@asset
def enriched_customer_data(context, customers: pd.DataFrame, demographics: pd.DataFrame) -> pd.DataFrame:
    """Asset with column lineage metadata."""

    # Merge customer data with demographics
    enriched = customers.merge(demographics, on="customer_id", how="left")

    # Add computed columns
    enriched["age_group"] = pd.cut(
        enriched["age"],
        bins=[0, 25, 35, 50, 100],
        labels=["18-25", "26-35", "36-50", "50+"]
    )
    enriched["is_high_value"] = enriched["lifetime_value"] > enriched["lifetime_value"].median()

    # Define column lineage
    lineage = TableColumnLineage(
        deps_by_column={
            # Direct mappings from upstream assets
            "customer_id": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="customer_id")
            ],
            "name": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="name")
            ],
            "email": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="email")
            ],
            "age": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],

            # Columns from the demographics table
            "income": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="income")
            ],
            "education": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="education")
            ],

            # Computed columns with their dependencies
            "age_group": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],
            "is_high_value": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="lifetime_value")
            ]
        }
    )

    metadata = {
        # "dagster/column_lineage" is the reserved key the UI reads for lineage
        "dagster/column_lineage": MetadataValue.column_lineage(lineage),
        "enrichment_source": MetadataValue.text("demographics table"),
        "computed_columns": MetadataValue.json(["age_group", "is_high_value"]),
        "join_key": MetadataValue.text("customer_id")
    }

    context.add_output_metadata(metadata)
    return enriched
```
### Code References

#### `CodeReferencesMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for linking assets to source code locations.
```python
from pathlib import Path

from dagster import (
    AnchorBasedFilePathMapping,
    CodeReferencesMetadataValue,
    Definitions,
    LocalFileCodeReference,
    MetadataValue,
    UrlCodeReference,
    asset,
    link_code_references_to_git,
    with_source_code_references,
)
import pandas as pd

@asset
def analytics_report() -> pd.DataFrame:
    """Asset whose code reference is attached automatically below."""
    # generate_analytics_data is assumed to be defined elsewhere
    return generate_analytics_data()

@asset(
    metadata={
        # "dagster/code_references" is the reserved key the UI reads
        "dagster/code_references": CodeReferencesMetadataValue(
            code_references=[
                LocalFileCodeReference(
                    file_path="/src/analytics/data_processing.py",
                    line_number=142,
                    label="Main processing function"
                ),
                LocalFileCodeReference(
                    file_path="/src/analytics/transformations.py",
                    line_number=67,
                    label="Data transformation logic"
                ),
                UrlCodeReference(
                    url="https://github.com/company/analytics/blob/main/src/analytics/data_processing.py#L142",
                    label="GitHub source"
                )
            ]
        ),
        "implementation_notes": MetadataValue.md("""
## Implementation Details

This asset implements the following logic:
1. Load raw data from warehouse
2. Apply business rules transformation
3. Generate aggregated metrics

See code references for detailed implementation.
""")
    }
)
def manual_code_references() -> pd.DataFrame:
    """Asset with manual code reference metadata."""
    # process_data is assumed to be defined elsewhere
    return process_data()

# with_source_code_references operates on a collection of asset definitions,
# attaching each asset's definition site automatically
defs = Definitions(
    assets=with_source_code_references([analytics_report, manual_code_references])
)

# Optionally rewrite local file references as Git permalinks
git_linked_defs = Definitions(
    assets=link_code_references_to_git(
        assets_defs=with_source_code_references([analytics_report, manual_code_references]),
        git_url="https://github.com/company/analytics",
        git_branch="main",
        file_path_mapping=AnchorBasedFilePathMapping(
            local_file_anchor=Path(__file__),
            file_anchor_path_in_repository="src/definitions.py",  # repository path assumed
        ),
    )
)
```

### Notebook Metadata

#### `NotebookMetadataValue` { .api }

**Module:** `dagster._core.definitions.metadata`
**Type:** MetadataValue subclass

Metadata for Jupyter notebooks and analysis artifacts.
```python
from dagster import asset, MetadataValue
import nbformat
import papermill as pm

@asset
def notebook_analysis(context) -> dict:
    """Asset that executes and tracks a notebook analysis."""

    parameters = {"data_date": "2023-01-01", "sample_size": 1000}

    # Execute the notebook with papermill
    output_notebook = "/tmp/analysis_output.ipynb"
    pm.execute_notebook(
        input_path="analysis_template.ipynb",
        output_path=output_notebook,
        parameters=parameters
    )

    # Inspect the executed notebook; pulling computed results back out would
    # typically use a helper such as scrapbook (extract_results is assumed here)
    nb = nbformat.read(output_notebook, as_version=4)
    results = extract_results(nb)

    metadata = {
        "analysis_notebook": MetadataValue.notebook(output_notebook),
        "template_notebook": MetadataValue.path("analysis_template.ipynb"),
        "execution_time": MetadataValue.float(
            float(nb.metadata.get("papermill", {}).get("duration") or 0.0)
        ),
        "kernel_name": MetadataValue.text(
            nb.metadata.get("kernelspec", {}).get("name", "unknown")
        ),
        "cell_count": MetadataValue.int(len(nb.cells)),
        "parameters": MetadataValue.json(parameters)
    }

    context.add_output_metadata(metadata)
    return results
```
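For notebooks as first-class assets, the `dagstermill` integration can define an asset directly from a notebook file; a minimal sketch (paths illustrative):

```python
from dagstermill import define_dagstermill_asset

analysis_notebook = define_dagstermill_asset(
    name="analysis_notebook",
    notebook_path="analysis_template.ipynb",
)
```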
### Data Versioning and Provenance

#### `DataVersion` { .api }

**Module:** `dagster._core.definitions.data_version`
**Type:** Class

Data versioning for change tracking and lineage.
```python
from dagster import asset, AssetKey, DataProvenance, DataVersion, MetadataValue, Output
import hashlib
import pandas as pd

@asset
def versioned_dataset() -> Output[pd.DataFrame]:
    """Asset with an explicit, content-derived data version."""

    # Load source data
    source_df = pd.read_csv("source_data.csv")

    # Process data
    processed_df = source_df.dropna().reset_index(drop=True)

    # Create a data version from a hash of the content
    content_hash = hashlib.sha256(processed_df.to_json().encode()).hexdigest()[:12]
    data_version = DataVersion(content_hash)

    # Provenance describes the code and input versions a version was derived
    # from (normally recorded by the framework; constructed here to illustrate)
    provenance = DataProvenance(
        code_version="v2.1.0",
        input_data_versions={AssetKey("source_data"): DataVersion("abc123")},
        is_user_provided=False
    )

    # Attach the data version to the output
    return Output(
        processed_df,
        data_version=data_version,
        metadata={
            "content_hash": MetadataValue.text(content_hash),
            "provenance": MetadataValue.json({
                "code_version": provenance.code_version,
                "input_versions": {"source_data": "abc123"}
            })
        }
    )
```
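Data versions are also how observable source assets report external change: the observation function returns a `DataVersion`, and downstream assets become stale when it changes. A minimal sketch (the file path is illustrative):

```python
import hashlib

from dagster import DataVersion, observable_source_asset

@observable_source_asset
def source_file():
    """Report a content hash so downstream assets can detect changes."""
    with open("source_data.csv", "rb") as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    return DataVersion(digest)
```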
This event and metadata system provides observability, lineage tracking, and documentation for all Dagster pipelines. The typed metadata system keeps entries consistent and enables powerful filtering, search, and visualization in the Dagster UI.

For the execution contexts that generate these events, see [Execution and Contexts](./execution-contexts.md). For sensors that can respond to asset events, see [Sensors and Schedules](./sensors-schedules.md).