A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's comprehensive event and metadata system, including materialization events, observation events, metadata values, table metadata, and code references. The event system provides rich observability and lineage tracking for all pipeline operations.
Dagster's event system captures and records all significant occurrences during pipeline execution, providing comprehensive observability and debugging capabilities.
Output { .api } — Module: dagster._core.definitions.events
Type: Class
Represents the output of an operation with value and metadata.
from dagster import op, Output, MetadataValue
import pandas as pd
import numpy as np  # `np` is used below but was missing from the original imports


@op
def process_data() -> Output[pd.DataFrame]:
    """Op producing a DataFrame output enriched with typed metadata."""
    # Build a synthetic 1000-row dataset.
    df = pd.DataFrame({
        "id": range(1000),
        "value": np.random.randn(1000),
        "category": np.random.choice(["A", "B", "C"], 1000)
    })
    # Return output with rich metadata; the plain int (`records`) is coerced
    # to a MetadataValue automatically by Dagster.
    return Output(
        value=df,
        metadata={
            "records": len(df),
            "columns": MetadataValue.int(len(df.columns)),
            "memory_usage_mb": MetadataValue.float(df.memory_usage(deep=True).sum() / 1024 / 1024),
            "categories": MetadataValue.text(",".join(df["category"].unique())),
            "data_quality_score": MetadataValue.float(0.95),
            "processing_time": MetadataValue.text("2.3 seconds"),
            "sample_data": MetadataValue.md(df.head().to_markdown())
        }
    )
@op
def validate_data(df: pd.DataFrame) -> Output[bool]:
"""Op with conditional output metadata."""
# Validation logic
null_count = df.isnull().sum().sum()
duplicate_count = df.duplicated().sum()
is_valid = null_count == 0 and duplicate_count == 0
metadata = {
"null_count": MetadataValue.int(null_count),
"duplicate_count": MetadataValue.int(duplicate_count),
"validation_status": MetadataValue.text("PASSED" if is_valid else "FAILED")
}
if not is_valid:
metadata["issues"] = MetadataValue.text(f"Found {null_count} nulls, {duplicate_count} duplicates")
return Output(
value=is_valid,
metadata=metadata
)Parameters:
value: Any - The output value
metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Output metadata
output_name: str = "result" - Name of the output
DynamicOutput { .api } — Module: dagster._core.definitions.events
Type: Class
Output for dynamic/fan-out operations that produce multiple outputs.
from dagster import op, DynamicOut, DynamicOutput, MetadataValue  # MetadataValue was missing
import pandas as pd


@op(out=DynamicOut())
def split_data_by_category(df: pd.DataFrame):
    """Op producing one dynamic output per distinct category value."""
    categories = df["category"].unique()
    for category in categories:
        category_data = df[df["category"] == category]
        yield DynamicOutput(
            value=category_data,
            mapping_key=category,  # dynamic mapping key, one per category
            metadata={
                "category": MetadataValue.text(category),
                "record_count": MetadataValue.int(len(category_data)),
                "avg_value": MetadataValue.float(category_data["value"].mean())
            }
        )
@op
def process_category(category_data: pd.DataFrame) -> dict:
    """Process individual category data into a summary dict."""
    values = category_data["value"]
    # Summarize the numeric column for this single category slice.
    summary = {
        "count": len(category_data),
        "mean": values.mean(),
        "std": values.std(),
    }
    return {
        "category": category_data["category"].iloc[0],
        "summary": summary,
    }
@job
def dynamic_processing_job():
    """Job with dynamic fan-out."""
    # split_data_by_category yields DynamicOutputs; .map fans each one out
    # to its own process_category invocation.
    # NOTE(review): `job` and `load_data` are assumed to be imported/defined
    # elsewhere in this example — confirm.
    categories = split_data_by_category(load_data())
    categories.map(process_category)
Parameters:
value: Any - The output value
mapping_key: str - Dynamic mapping key
metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Output metadata
output_name: str = "result" - Name of the output
AssetMaterialization { .api } — Module: dagster._core.definitions.events
Type: Class
Event representing the materialization of an asset with metadata and lineage.
from dagster import (
    op,
    AssetMaterialization,
    AssetKey,
    MetadataValue,
    TableSchema,   # used below but missing from the original imports
    TableColumn,   # used below but missing from the original imports
)
import pandas as pd
import os  # used for os.path.getsize below but was missing


@op
def generate_asset_materialization() -> AssetMaterialization:
    """Op that explicitly creates an asset materialization event."""
    # Process data
    df = pd.read_csv("input_data.csv")
    processed_df = df.dropna().reset_index(drop=True)
    # Save to storage
    processed_df.to_parquet("processed_data.parquet")
    # Create materialization event with lineage-friendly metadata
    return AssetMaterialization(
        asset_key=AssetKey(["warehouse", "processed_data"]),
        description="Processed customer data with null values removed",
        metadata={
            "input_records": MetadataValue.int(len(df)),
            "output_records": MetadataValue.int(len(processed_df)),
            "null_records_removed": MetadataValue.int(len(df) - len(processed_df)),
            "output_file": MetadataValue.path("processed_data.parquet"),
            "file_size_mb": MetadataValue.float(
                os.path.getsize("processed_data.parquet") / 1024 / 1024
            ),
            "processing_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
            "data_schema": MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(name=col, type=str(dtype))
                        for col, dtype in processed_df.dtypes.items()
                    ]
                )
            )
        }
    )
# Asset materialization in asset function
@asset
def customer_analytics(context) -> dict:
"""Asset with rich materialization metadata."""
# Load and process data
customers = pd.read_sql("SELECT * FROM customers", connection)
# Generate analytics
analytics = {
"total_customers": len(customers),
"active_customers": len(customers[customers["active"] == True]),
"avg_age": customers["age"].mean(),
"top_cities": customers["city"].value_counts().head(5).to_dict()
}
# Log materialization with comprehensive metadata
context.add_output_metadata({
"total_customers": MetadataValue.int(analytics["total_customers"]),
"active_rate": MetadataValue.float(
analytics["active_customers"] / analytics["total_customers"]
),
"average_age": MetadataValue.float(analytics["avg_age"]),
"top_cities": MetadataValue.json(analytics["top_cities"]),
"data_freshness": MetadataValue.timestamp(pd.Timestamp.now()),
"query_execution_time": MetadataValue.text("1.2 seconds"),
"customer_distribution": MetadataValue.md(
customers.groupby("city").size().head(10).to_markdown()
)
})
return analyticsParameters:
asset_key: AssetKey - Asset key being materialized
description: Optional[str] - Description of materialization
metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Materialization metadata
partition: Optional[str] - Partition key if asset is partitioned
tags: Optional[Dict[str, str]] - Materialization tags
AssetObservation { .api } — Module: dagster._core.definitions.events
Type: Class
Event representing observation of an asset without materialization (read-only).
from dagster import op, AssetObservation, AssetKey, MetadataValue
import pandas as pd


@op
def monitor_external_table() -> AssetObservation:
    """Monitor external data source without materializing."""
    # Pull statistics from the external system (read-only; nothing is written).
    stats = get_table_stats("external_database", "user_events")
    availability = "AVAILABLE" if stats["is_accessible"] else "UNAVAILABLE"
    observation_metadata = {
        "row_count": MetadataValue.int(stats["row_count"]),
        "size_gb": MetadataValue.float(stats["size_bytes"] / (1024 ** 3)),
        "last_updated": MetadataValue.timestamp(stats["last_modified"]),
        "health_score": MetadataValue.float(stats["health_score"]),
        "availability": MetadataValue.text(availability),
        "partition_count": MetadataValue.int(stats["partition_count"]),
        "schema_version": MetadataValue.text(stats["schema_version"]),
    }
    return AssetObservation(
        asset_key=AssetKey(["external", "user_events"]),
        description="Daily monitoring of external user events table",
        metadata=observation_metadata,
    )
# Asset observation in sensor
from dagster import sensor, RunRequest
@sensor(minimum_interval_seconds=300) # Every 5 minutes
def external_data_monitor():
"""Sensor that observes external data sources."""
observations = []
# Check multiple external sources
sources = [
{"key": ["external", "payments"], "table": "payments"},
{"key": ["external", "orders"], "table": "orders"},
{"key": ["external", "inventory"], "table": "inventory"}
]
for source in sources:
try:
stats = get_external_table_stats(source["table"])
observation = AssetObservation(
asset_key=AssetKey(source["key"]),
metadata={
"record_count": MetadataValue.int(stats["count"]),
"last_update": MetadataValue.timestamp(stats["last_modified"]),
"data_lag_minutes": MetadataValue.int(
(pd.Timestamp.now() - stats["last_modified"]).total_seconds() / 60
),
"status": MetadataValue.text("HEALTHY" if stats["is_healthy"] else "DEGRADED")
}
)
observations.append(observation)
except Exception as e:
# Log observation failure
observation = AssetObservation(
asset_key=AssetKey(source["key"]),
metadata={
"status": MetadataValue.text("ERROR"),
"error_message": MetadataValue.text(str(e)),
"check_timestamp": MetadataValue.timestamp(pd.Timestamp.now())
}
)
observations.append(observation)
return observationsParameters:
asset_key: AssetKey - Asset key being observed
description: Optional[str] - Description of observation
metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Observation metadata
partition: Optional[str] - Partition key if asset is partitioned
tags: Optional[Dict[str, str]] - Observation tags
MaterializeResult { .api } — Module: dagster._core.definitions.result
Type: Class
Result of asset materialization with metadata and asset information.
from dagster import asset, MaterializeResult, MetadataValue, AssetCheckResult  # AssetCheckResult was missing
import pandas as pd
import numpy as np


@asset
def sales_report(context, sales_data: pd.DataFrame) -> MaterializeResult:
    """Asset returning MaterializeResult with comprehensive metadata.

    NOTE(review): current Dagster versions' MaterializeResult does not accept
    a ``value`` argument (the asset value is returned separately) — confirm
    against the installed Dagster version.
    """
    # Generate sales report
    report_data = {
        "total_sales": sales_data["amount"].sum(),
        "transaction_count": len(sales_data),
        "avg_transaction": sales_data["amount"].mean(),
        "top_products": sales_data.groupby("product")["amount"].sum().nlargest(5)
    }
    # Create report DataFrame
    report_df = pd.DataFrame([
        {"metric": "total_sales", "value": report_data["total_sales"]},
        {"metric": "transaction_count", "value": report_data["transaction_count"]},
        {"metric": "avg_transaction", "value": report_data["avg_transaction"]}
    ])
    return MaterializeResult(
        value=report_df,  # the materialized asset value (see NOTE above)
        metadata={
            # Business metrics
            "total_sales": MetadataValue.float(report_data["total_sales"]),
            "transaction_count": MetadataValue.int(report_data["transaction_count"]),
            "average_transaction": MetadataValue.float(report_data["avg_transaction"]),
            # Data quality metrics
            "data_completeness": MetadataValue.float(0.98),
            "outlier_count": MetadataValue.int(
                len(sales_data[sales_data["amount"] > sales_data["amount"].quantile(0.95)])
            ),
            # Technical metadata
            "processing_time_seconds": MetadataValue.float(2.1),
            "memory_usage_mb": MetadataValue.float(
                report_df.memory_usage(deep=True).sum() / 1024 / 1024
            ),
            # Visualization metadata
            "top_products": MetadataValue.json(report_data["top_products"].to_dict()),
            "sales_distribution": MetadataValue.md(
                sales_data.describe().to_markdown()
            ),
            # File metadata
            "report_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
            "data_version": MetadataValue.text("v2.1.0")
        },
        # Optional: asset check results. bool(...) converts the numpy bool
        # produced by the comparison into a plain Python bool.
        check_results=[
            AssetCheckResult(
                passed=bool(report_data["total_sales"] > 0),
                metadata={
                    "sales_validation": MetadataValue.text(
                        "PASSED: Sales data contains valid transactions"
                    )
                }
            )
        ]
    )
# Multi-asset with MaterializeResult
@multi_asset(
outs={
"daily_summary": AssetOut(),
"weekly_summary": AssetOut()
}
)
def sales_summaries(sales_data: pd.DataFrame) -> Dict[str, MaterializeResult]:
"""Multi-asset returning MaterializeResults for each output."""
# Generate daily summary
daily = sales_data.groupby(sales_data["date"].dt.date).agg({
"amount": ["sum", "count", "mean"]
}).round(2)
# Generate weekly summary
weekly = sales_data.groupby(sales_data["date"].dt.to_period("W")).agg({
"amount": ["sum", "count", "mean"]
}).round(2)
return {
"daily_summary": MaterializeResult(
value=daily,
metadata={
"date_range": MetadataValue.text(f"{daily.index.min()} to {daily.index.max()}"),
"total_days": MetadataValue.int(len(daily)),
"avg_daily_sales": MetadataValue.float(daily[("amount", "sum")].mean())
}
),
"weekly_summary": MaterializeResult(
value=weekly,
metadata={
"week_count": MetadataValue.int(len(weekly)),
"avg_weekly_sales": MetadataValue.float(weekly[("amount", "sum")].mean()),
"peak_week": MetadataValue.text(str(weekly[("amount", "sum")].idxmax()))
}
)
}Parameters:
value: Any - The materialized asset value
metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Asset metadata
check_results: Optional[Sequence[AssetCheckResult]] - Asset check results
ObserveResult { .api } — Module: dagster._core.definitions.result
Type: Class
Result of asset observation with metadata but no materialized value.
from dagster import asset, ObserveResult, MetadataValue
@asset
def external_api_status(context) -> ObserveResult:
"""Asset that observes external API without materializing data."""
# Check API health
api_health = check_api_health("https://api.example.com")
return ObserveResult(
metadata={
"api_status": MetadataValue.text("HEALTHY" if api_health["is_up"] else "DOWN"),
"response_time_ms": MetadataValue.int(api_health["response_time_ms"]),
"success_rate": MetadataValue.float(api_health["success_rate"]),
"last_error": MetadataValue.text(api_health.get("last_error", "None")),
"check_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
"endpoint_count": MetadataValue.int(len(api_health["endpoints"])),
"rate_limit_remaining": MetadataValue.int(api_health["rate_limit_remaining"])
}
)Dagster's metadata system provides rich, typed metadata values for comprehensive observability and documentation.
MetadataValue { .api }Module: dagster._core.definitions.metadata
Type: Base class
Base class for all metadata values with type-safe metadata creation.
from dagster import MetadataValue, asset
import pandas as pd
import json
from pathlib import Path
@asset
def comprehensive_metadata_asset() -> pd.DataFrame:
"""Asset demonstrating all metadata value types."""
df = pd.DataFrame({
"id": range(100),
"value": np.random.randn(100),
"category": np.random.choice(["A", "B", "C"], 100)
})
# Save to file for path metadata
output_path = Path("/tmp/output.parquet")
df.to_parquet(output_path)
# Comprehensive metadata using all MetadataValue types
metadata = {
# Text metadata
"description": MetadataValue.text("Generated dataset with random values"),
"processing_status": MetadataValue.text("COMPLETED"),
# Numeric metadata
"record_count": MetadataValue.int(len(df)),
"column_count": MetadataValue.int(len(df.columns)),
"file_size_mb": MetadataValue.float(output_path.stat().st_size / 1024 / 1024),
"processing_time": MetadataValue.float(1.23),
# Boolean metadata
"has_nulls": MetadataValue.bool(df.isnull().any().any()),
"data_quality_passed": MetadataValue.bool(True),
# Path metadata
"output_file": MetadataValue.path(output_path),
"config_file": MetadataValue.path("/config/settings.yaml"),
# URL metadata
"data_source": MetadataValue.url("https://api.example.com/data"),
"documentation": MetadataValue.url("https://docs.example.com/dataset"),
# JSON metadata
"column_stats": MetadataValue.json({
col: {"mean": df[col].mean(), "std": df[col].std()}
for col in df.select_dtypes(include=[np.number]).columns
}),
"configuration": MetadataValue.json({
"version": "2.1.0",
"parameters": {"sample_size": 100, "random_seed": 42}
}),
# Markdown metadata
"data_preview": MetadataValue.md(df.head().to_markdown()),
"summary_report": MetadataValue.md(f"""
# Data Summary
- **Records**: {len(df)}
- **Columns**: {len(df.columns)}
- **Categories**: {df['category'].nunique()}
## Statistics
{df.describe().to_markdown()}
"""),
# Timestamp metadata
"created_at": MetadataValue.timestamp(pd.Timestamp.now()),
"data_as_of": MetadataValue.timestamp(pd.Timestamp("2023-01-01")),
# Python artifact metadata
"generator_function": MetadataValue.python_artifact(comprehensive_metadata_asset),
"processing_class": MetadataValue.python_artifact(pd.DataFrame),
# Null metadata (explicitly null values)
"optional_field": MetadataValue.null(),
# Dagster-specific metadata
"upstream_asset": MetadataValue.dagster_asset(AssetKey(["raw", "data"])),
"related_job": MetadataValue.dagster_job("etl_job"),
"pipeline_run": MetadataValue.dagster_run("run-12345")
}
return df, metadata # Return both data and metadataTableMetadataValue { .api }Module: dagster._core.definitions.metadata
Type: MetadataValue subclass
Metadata for tabular data with schema and statistics.
from dagster import TableMetadataValue, TableSchema, TableColumn, TableRecord
import pandas as pd
@asset
def customers_table() -> pd.DataFrame:
"""Asset with comprehensive table metadata."""
# Create customer data
customers_df = pd.DataFrame({
"customer_id": range(1, 1001),
"name": [f"Customer {i}" for i in range(1, 1001)],
"email": [f"customer{i}@example.com" for i in range(1, 1001)],
"age": np.random.randint(18, 80, 1000),
"city": np.random.choice(["NYC", "LA", "Chicago", "Houston"], 1000),
"signup_date": pd.date_range("2020-01-01", periods=1000, freq="D"),
"total_orders": np.random.randint(0, 50, 1000),
"lifetime_value": np.random.uniform(100, 5000, 1000).round(2)
})
# Table schema metadata
schema = TableSchema(
columns=[
TableColumn(name="customer_id", type="INTEGER", description="Unique customer identifier"),
TableColumn(name="name", type="VARCHAR(255)", description="Customer full name"),
TableColumn(name="email", type="VARCHAR(255)", description="Customer email address"),
TableColumn(name="age", type="INTEGER", description="Customer age in years"),
TableColumn(name="city", type="VARCHAR(100)", description="Customer city"),
TableColumn(name="signup_date", type="DATE", description="Customer signup date"),
TableColumn(name="total_orders", type="INTEGER", description="Total number of orders"),
TableColumn(name="lifetime_value", type="DECIMAL(10,2)", description="Customer lifetime value in USD")
]
)
# Table statistics as records
stats_records = [
TableRecord({
"column": "customer_id",
"count": len(customers_df),
"unique_count": customers_df["customer_id"].nunique(),
"null_count": customers_df["customer_id"].isnull().sum()
}),
TableRecord({
"column": "age",
"mean": customers_df["age"].mean(),
"min": customers_df["age"].min(),
"max": customers_df["age"].max(),
"std": customers_df["age"].std()
}),
TableRecord({
"column": "lifetime_value",
"mean": customers_df["lifetime_value"].mean(),
"median": customers_df["lifetime_value"].median(),
"total": customers_df["lifetime_value"].sum()
})
]
metadata = {
# Table schema
"schema": MetadataValue.table_schema(schema),
# Table statistics
"table_stats": TableMetadataValue(
records=stats_records,
schema=TableSchema(
columns=[
TableColumn("column", "VARCHAR"),
TableColumn("count", "INTEGER"),
TableColumn("mean", "FLOAT"),
TableColumn("min", "FLOAT"),
TableColumn("max", "FLOAT")
]
)
),
# Additional table metadata
"row_count": MetadataValue.int(len(customers_df)),
"column_count": MetadataValue.int(len(customers_df.columns)),
"estimated_size_mb": MetadataValue.float(
customers_df.memory_usage(deep=True).sum() / 1024 / 1024
)
}
return customers_df, metadataTableSchemaMetadataValue { .api }Module: dagster._core.definitions.metadata
Type: MetadataValue subclass
Metadata specifically for table schemas.
from dagster import TableSchemaMetadataValue, TableSchema, TableColumn, TableConstraints
from dagster import TableColumnConstraints
@asset
def validated_orders_table() -> pd.DataFrame:
"""Asset with detailed schema and constraints metadata."""
orders_df = pd.DataFrame({
"order_id": range(1, 501),
"customer_id": np.random.randint(1, 101, 500),
"product_id": np.random.randint(1, 21, 500),
"quantity": np.random.randint(1, 10, 500),
"price": np.random.uniform(10, 500, 500).round(2),
"order_date": pd.date_range("2023-01-01", periods=500, freq="H"),
"status": np.random.choice(["pending", "shipped", "delivered"], 500)
})
# Define schema with constraints
schema = TableSchema(
columns=[
TableColumn(
name="order_id",
type="INTEGER",
description="Primary key for orders",
constraints=TableColumnConstraints(
nullable=False,
unique=True
)
),
TableColumn(
name="customer_id",
type="INTEGER",
description="Foreign key to customers table",
constraints=TableColumnConstraints(nullable=False)
),
TableColumn(
name="product_id",
type="INTEGER",
description="Foreign key to products table",
constraints=TableColumnConstraints(nullable=False)
),
TableColumn(
name="quantity",
type="INTEGER",
description="Order quantity",
constraints=TableColumnConstraints(nullable=False)
),
TableColumn(
name="price",
type="DECIMAL(10,2)",
description="Order price in USD",
constraints=TableColumnConstraints(nullable=False)
),
TableColumn(
name="order_date",
type="TIMESTAMP",
description="Order creation timestamp",
constraints=TableColumnConstraints(nullable=False)
),
TableColumn(
name="status",
type="VARCHAR(20)",
description="Order status",
constraints=TableColumnConstraints(nullable=False)
)
],
constraints=TableConstraints(
other=[
"FOREIGN KEY (customer_id) REFERENCES customers(customer_id)",
"FOREIGN KEY (product_id) REFERENCES products(product_id)",
"CHECK (quantity > 0)",
"CHECK (price > 0)"
]
)
)
metadata = {
"table_schema": MetadataValue.table_schema(schema),
"schema_version": MetadataValue.text("v1.2.0"),
"last_schema_update": MetadataValue.timestamp(pd.Timestamp.now())
}
return orders_df, metadataTableColumnLineageMetadataValue { .api }Module: dagster._core.definitions.metadata
Type: MetadataValue subclass
Metadata for column-level lineage tracking.
from dagster import TableColumnLineageMetadataValue, TableColumnLineage, TableColumnDep, AssetKey
@asset
def enriched_customer_data(customers: pd.DataFrame, demographics: pd.DataFrame) -> pd.DataFrame:
"""Asset with column lineage metadata."""
# Merge customer data with demographics
enriched = customers.merge(demographics, on="customer_id", how="left")
# Add computed columns
enriched["age_group"] = pd.cut(enriched["age"], bins=[0, 25, 35, 50, 100],
labels=["18-25", "26-35", "36-50", "50+"])
enriched["is_high_value"] = enriched["lifetime_value"] > enriched["lifetime_value"].median()
# Define column lineage
lineage = TableColumnLineage(
deps_by_column={
# Direct mappings from upstream assets
"customer_id": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="customer_id")
],
"name": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="name")
],
"email": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="email")
],
"age": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
],
# Columns from demographics table
"income": [
TableColumnDep(asset_key=AssetKey("demographics"), column_name="income")
],
"education": [
TableColumnDep(asset_key=AssetKey("demographics"), column_name="education")
],
# Computed columns with dependencies
"age_group": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
],
"is_high_value": [
TableColumnDep(asset_key=AssetKey("customers"), column_name="lifetime_value")
]
}
)
metadata = {
"column_lineage": MetadataValue.column_lineage(lineage),
"enrichment_source": MetadataValue.text("demographics table"),
"computed_columns": MetadataValue.json(["age_group", "is_high_value"]),
"join_key": MetadataValue.text("customer_id")
}
return enriched, metadataCodeReferencesMetadataValue { .api }Module: dagster._core.definitions.metadata
Type: MetadataValue subclass
Metadata for linking assets to source code locations.
from dagster import CodeReferencesMetadataValue, LocalFileCodeReference, UrlCodeReference
from dagster import with_source_code_references
@with_source_code_references
@asset
def analytics_report() -> pd.DataFrame:
"""Asset with automatic source code references."""
# This decorator automatically adds code references
return generate_analytics_data()
@asset
def manual_code_references() -> pd.DataFrame:
"""Asset with manual code reference metadata."""
df = process_data()
# Manual code references
code_refs = [
LocalFileCodeReference(
file_path="/src/analytics/data_processing.py",
line_number=142,
label="Main processing function"
),
LocalFileCodeReference(
file_path="/src/analytics/transformations.py",
line_number=67,
label="Data transformation logic"
),
UrlCodeReference(
url="https://github.com/company/analytics/blob/main/src/analytics/data_processing.py#L142",
label="GitHub source"
)
]
metadata = {
"code_references": MetadataValue.code_references(code_refs),
"implementation_notes": MetadataValue.md("""
## Implementation Details
This asset implements the following logic:
1. Load raw data from warehouse
2. Apply business rules transformation
3. Generate aggregated metrics
See code references for detailed implementation.
""")
}
return df, metadata
# Link code references to Git
from dagster import link_code_references_to_git, AnchorBasedFilePathMapping
@link_code_references_to_git(
git_url="https://github.com/company/analytics",
git_branch="main",
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor="/src",
file_anchor_path_in_repository="src"
)
)
def git_linked_asset() -> pd.DataFrame:
"""Asset with Git-linked code references."""
return process_data()NotebookMetadataValue { .api }Module: dagster._core.definitions.metadata
Type: MetadataValue subclass
Metadata for Jupyter notebooks and analysis artifacts.
from dagster import NotebookMetadataValue, asset
import papermill as pm
@asset
def notebook_analysis() -> dict:
"""Asset that executes and tracks notebook analysis."""
# Execute notebook with papermill
output_notebook = "/tmp/analysis_output.ipynb"
pm.execute_notebook(
input_path="analysis_template.ipynb",
output_path=output_notebook,
parameters={"data_date": "2023-01-01", "sample_size": 1000}
)
# Extract results from executed notebook
nb = pm.read_notebook(output_notebook)
results = nb.dataframe.set_index("name")["value"].to_dict()
metadata = {
"analysis_notebook": MetadataValue.notebook(output_notebook),
"template_notebook": MetadataValue.path("analysis_template.ipynb"),
"execution_time": MetadataValue.float(nb.metadata.get("execution_time", 0)),
"kernel_name": MetadataValue.text(nb.metadata.get("kernelspec", {}).get("name")),
"cell_count": MetadataValue.int(len(nb.cells)),
"parameters": MetadataValue.json({
"data_date": "2023-01-01",
"sample_size": 1000
})
}
return results, metadataDataVersion { .api }Module: dagster._core.definitions.data_version
Type: Class
Data versioning for change tracking and lineage.
from dagster import DataVersion, DataProvenance, asset
@asset
def versioned_dataset() -> pd.DataFrame:
"""Asset with data versioning."""
# Load source data
source_df = pd.read_csv("source_data.csv")
# Process data
processed_df = source_df.dropna().reset_index(drop=True)
# Create data version based on content
content_hash = hashlib.sha256(processed_df.to_json().encode()).hexdigest()[:12]
data_version = DataVersion(content_hash)
# Add provenance information
provenance = DataProvenance(
code_version="v2.1.0",
input_data_versions={"source_data": DataVersion("abc123")},
is_user_provided=False
)
metadata = {
"data_version": MetadataValue.text(str(data_version)),
"content_hash": MetadataValue.text(content_hash),
"provenance": MetadataValue.json({
"code_version": "v2.1.0",
"input_versions": {"source_data": "abc123"}
})
}
return processed_df, metadataThis comprehensive event and metadata system provides rich observability, lineage tracking, and documentation capabilities for all Dagster pipelines. The typed metadata system ensures consistency and enables powerful filtering, search, and visualization in the Dagster UI.
For asset materialization and execution contexts that generate events, see Execution and Contexts. For sensors that can respond to asset events, see Sensors and Schedules.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster