Events and Metadata

This document covers Dagster's comprehensive event and metadata system, including materialization events, observation events, metadata values, table metadata, and code references. The event system provides rich observability and lineage tracking for all pipeline operations.

Core Events

Dagster's event system captures and records all significant occurrences during pipeline execution, providing comprehensive observability and debugging capabilities.
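
Events recorded during a run are persisted to the instance's event log and can be queried programmatically. A minimal sketch, assuming a configured DagsterInstance and a hypothetical asset key my_asset:

from dagster import AssetKey, DagsterEventType, DagsterInstance, EventRecordsFilter

# Connect to the instance backing the deployment (assumes DAGSTER_HOME is set)
instance = DagsterInstance.get()

# Fetch the five most recent materialization events for the asset
records = instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("my_asset"),
    ),
    limit=5,
)

for record in records:
    entry = record.event_log_entry
    print(entry.timestamp, entry.dagster_event.event_type_value)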

Output Events

Output { .api }

Module: dagster._core.definitions.events
Type: Class

Represents the output of an operation with value and metadata.

from dagster import op, Output, MetadataValue
import numpy as np
import pandas as pd

@op
def process_data() -> Output[pd.DataFrame]:
    """Op producing an output with metadata."""
    
    # Process data
    df = pd.DataFrame({
        "id": range(1000),
        "value": np.random.randn(1000),
        "category": np.random.choice(["A", "B", "C"], 1000)
    })
    
    # Return output with rich metadata
    return Output(
        value=df,
        metadata={
            "records": len(df),
            "columns": MetadataValue.int(len(df.columns)),
            "memory_usage_mb": MetadataValue.float(df.memory_usage(deep=True).sum() / 1024 / 1024),
            "categories": MetadataValue.text(",".join(df["category"].unique())),
            "data_quality_score": MetadataValue.float(0.95),
            "processing_time": MetadataValue.text("2.3 seconds"),
            "sample_data": MetadataValue.md(df.head().to_markdown())
        }
    )

@op  
def validate_data(df: pd.DataFrame) -> Output[bool]:
    """Op with conditional output metadata."""
    
    # Validation logic (cast numpy ints to plain ints for MetadataValue.int)
    null_count = int(df.isnull().sum().sum())
    duplicate_count = int(df.duplicated().sum())
    is_valid = null_count == 0 and duplicate_count == 0
    
    metadata = {
        "null_count": MetadataValue.int(null_count),
        "duplicate_count": MetadataValue.int(duplicate_count),
        "validation_status": MetadataValue.text("PASSED" if is_valid else "FAILED")
    }
    
    if not is_valid:
        metadata["issues"] = MetadataValue.text(f"Found {null_count} nulls, {duplicate_count} duplicates")
    
    return Output(
        value=is_valid,
        metadata=metadata
    )

Parameters:

  • value: Any - The output value
  • metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Output metadata
  • output_name: str = "result" - Name of the output
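
The output_name parameter matters when an op declares multiple outputs; a minimal sketch:

from dagster import Out, Output, op

@op(out={"clean": Out(), "rejected": Out()})
def split_records(records: list):
    """Yield one Output per declared output, identified by output_name."""
    yield Output([r for r in records if r is not None], output_name="clean")
    yield Output([r for r in records if r is None], output_name="rejected")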

DynamicOutput { .api }

Module: dagster._core.definitions.events
Type: Class

Output for dynamic/fan-out operations that produce multiple outputs.

from dagster import op, job, DynamicOut, DynamicOutput, MetadataValue
import pandas as pd

@op(out=DynamicOut())
def split_data_by_category(df: pd.DataFrame):
    """Op producing dynamic outputs for each category."""
    
    categories = df["category"].unique()
    
    for category in categories:
        category_data = df[df["category"] == category]
        
        yield DynamicOutput(
            value=category_data,
            mapping_key=category,  # Dynamic key
            metadata={
                "category": MetadataValue.text(category),
                "record_count": MetadataValue.int(len(category_data)),
                "avg_value": MetadataValue.float(category_data["value"].mean())
            }
        )

@op
def process_category(category_data: pd.DataFrame) -> dict:
    """Process individual category data."""
    return {
        "category": category_data["category"].iloc[0],
        "summary": {
            "count": len(category_data),
            "mean": category_data["value"].mean(),
            "std": category_data["value"].std()
        }
    }

@job
def dynamic_processing_job():
    """Job with dynamic fan-out."""
    categories = split_data_by_category(load_data())
    categories.map(process_category)

Parameters:

  • value: Any - The output value
  • mapping_key: str - Dynamic mapping key
  • metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Output metadata
  • output_name: str = "result" - Name of the output
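
After .map(...), dynamic outputs can be fanned back in with .collect(). A minimal sketch building on the ops above (load_data remains a placeholder):

@op
def merge_results(results: list) -> dict:
    """Fan-in op combining the per-category summaries."""
    return {r["category"]: r["summary"] for r in results}

@job
def fan_in_job():
    categories = split_data_by_category(load_data())
    merge_results(categories.map(process_category).collect())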

Asset Events

AssetMaterialization { .api }

Module: dagster._core.definitions.events
Type: Class

Event representing the materialization of an asset with metadata and lineage.

from dagster import (
    op, asset, AssetMaterialization, AssetKey, MetadataValue, Output,
    TableSchema, TableColumn,
)
import os
import pandas as pd

@op
def generate_asset_materialization():
    """Op that explicitly emits an asset materialization event."""
    
    # Process data
    df = pd.read_csv("input_data.csv")
    processed_df = df.dropna().reset_index(drop=True)
    
    # Save to storage
    processed_df.to_parquet("processed_data.parquet")
    
    # Yield the materialization event, followed by a conventional op output
    yield AssetMaterialization(
        asset_key=AssetKey(["warehouse", "processed_data"]),
        description="Processed customer data with null values removed",
        metadata={
            "input_records": MetadataValue.int(len(df)),
            "output_records": MetadataValue.int(len(processed_df)),
            "null_records_removed": MetadataValue.int(len(df) - len(processed_df)),
            "output_file": MetadataValue.path("processed_data.parquet"),
            "file_size_mb": MetadataValue.float(
                os.path.getsize("processed_data.parquet") / 1024 / 1024
            ),
            "processing_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
            "data_schema": MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(name=col, type=str(dtype)) 
                        for col, dtype in processed_df.dtypes.items()
                    ]
                )
            )
        }
    )
    yield Output(None)

# Asset materialization in asset function
@asset
def customer_analytics(context) -> dict:
    """Asset with rich materialization metadata."""
    
    # Load and process data (connection is a placeholder database connection)
    customers = pd.read_sql("SELECT * FROM customers", connection)
    
    # Generate analytics
    analytics = {
        "total_customers": len(customers),
        "active_customers": len(customers[customers["active"] == True]),
        "avg_age": customers["age"].mean(),
        "top_cities": customers["city"].value_counts().head(5).to_dict()
    }
    
    # Log materialization with comprehensive metadata
    context.add_output_metadata({
        "total_customers": MetadataValue.int(analytics["total_customers"]),
        "active_rate": MetadataValue.float(
            analytics["active_customers"] / analytics["total_customers"]
        ),
        "average_age": MetadataValue.float(analytics["avg_age"]),
        "top_cities": MetadataValue.json(analytics["top_cities"]),
        "data_freshness": MetadataValue.timestamp(pd.Timestamp.now()),
        "query_execution_time": MetadataValue.text("1.2 seconds"),
        "customer_distribution": MetadataValue.md(
            customers.groupby("city").size().head(10).to_markdown()
        )
    })
    
    return analytics

Parameters:

  • asset_key: AssetKey - Asset key being materialized
  • description: Optional[str] - Description of materialization
  • metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Materialization metadata
  • partition: Optional[str] - Partition key if asset is partitioned
  • tags: Optional[Dict[str, str]] - Materialization tags
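
Materializations of assets produced outside of Dagster can also be reported directly to the instance. A minimal sketch, assuming a configured DagsterInstance and a hypothetical asset key:

from dagster import AssetKey, AssetMaterialization, DagsterInstance

instance = DagsterInstance.get()

# Report a materialization that happened in an external system
instance.report_runtime_asset_materialization(
    AssetMaterialization(
        asset_key=AssetKey(["external", "nightly_export"]),
        description="Nightly export completed by an external scheduler",
        metadata={"row_count": 120_000},
    )
)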

AssetObservation { .api }

Module: dagster._core.definitions.events
Type: Class

Event representing observation of an asset without materialization (read-only).

from dagster import op, AssetObservation, AssetKey, MetadataValue, Output
import pandas as pd

@op
def monitor_external_table():
    """Monitor an external data source without materializing it."""

    # Inspect the external system (get_table_stats is a placeholder helper)
    table_stats = get_table_stats("external_database", "user_events")

    # Yield the observation event, followed by a conventional op output
    yield AssetObservation(
        asset_key=AssetKey(["external", "user_events"]),
        description="Daily monitoring of external user events table",
        metadata={
            "row_count": MetadataValue.int(table_stats["row_count"]),
            "size_gb": MetadataValue.float(table_stats["size_bytes"] / (1024**3)),
            "last_updated": MetadataValue.timestamp(table_stats["last_modified"]),
            "health_score": MetadataValue.float(table_stats["health_score"]),
            "availability": MetadataValue.text(
                "AVAILABLE" if table_stats["is_accessible"] else "UNAVAILABLE"
            ),
            "partition_count": MetadataValue.int(table_stats["partition_count"]),
            "schema_version": MetadataValue.text(table_stats["schema_version"])
        }
    )
    yield Output(None)

# Asset observations emitted from a sensor
from dagster import sensor, SensorResult

@sensor(minimum_interval_seconds=300)  # Every 5 minutes
def external_data_monitor():
    """Sensor that observes external data sources."""
    
    observations = []
    
    # Check multiple external sources
    sources = [
        {"key": ["external", "payments"], "table": "payments"},
        {"key": ["external", "orders"], "table": "orders"},
        {"key": ["external", "inventory"], "table": "inventory"}
    ]
    
    for source in sources:
        try:
            stats = get_external_table_stats(source["table"])
            
            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "record_count": MetadataValue.int(stats["count"]),
                    "last_update": MetadataValue.timestamp(stats["last_modified"]),
                    "data_lag_minutes": MetadataValue.int(
                        (pd.Timestamp.now() - stats["last_modified"]).total_seconds() / 60
                    ),
                    "status": MetadataValue.text("HEALTHY" if stats["is_healthy"] else "DEGRADED")
                }
            )
            observations.append(observation)
            
        except Exception as e:
            # Log observation failure
            observation = AssetObservation(
                asset_key=AssetKey(source["key"]),
                metadata={
                    "status": MetadataValue.text("ERROR"),
                    "error_message": MetadataValue.text(str(e)),
                    "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now())
                }
            )
            observations.append(observation)
    
    # Wrap observations in SensorResult so Dagster records them as asset events
    return SensorResult(asset_events=observations)

Parameters:

  • asset_key: AssetKey - Asset key being observed
  • description: Optional[str] - Description of observation
  • metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Observation metadata
  • partition: Optional[str] - Partition key if asset is partitioned
  • tags: Optional[Dict[str, str]] - Observation tags
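
Observations can also be logged imperatively from inside an op or asset with context.log_event; a minimal sketch with a placeholder asset key:

from dagster import AssetKey, AssetObservation, OpExecutionContext, op

@op
def check_upstream(context: OpExecutionContext) -> None:
    # Record an observation without changing the op's outputs
    context.log_event(
        AssetObservation(
            asset_key=AssetKey(["external", "user_events"]),
            metadata={"row_count": 42},
        )
    )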

Result Classes

MaterializeResult { .api }

Module: dagster._core.definitions.result
Type: Class

An object returned from an asset function to report metadata, check results, or a data version for a materialization. Unlike Output, it carries no value: the asset function is expected to persist its own data rather than hand it to an I/O manager.

from dagster import (
    asset, multi_asset, AssetCheckResult, AssetCheckSpec, AssetSpec,
    MaterializeResult, MetadataValue,
)
import pandas as pd

@asset(check_specs=[AssetCheckSpec("sales_validation", asset="sales_report")])
def sales_report(context, sales_data: pd.DataFrame) -> MaterializeResult:
    """Asset returning MaterializeResult with comprehensive metadata."""
    
    # Generate sales report
    report_data = {
        "total_sales": sales_data["amount"].sum(),
        "transaction_count": len(sales_data),
        "avg_transaction": sales_data["amount"].mean(),
        "top_products": sales_data.groupby("product")["amount"].sum().nlargest(5)
    }
    
    # Create report DataFrame
    report_df = pd.DataFrame([
        {"metric": "total_sales", "value": report_data["total_sales"]},
        {"metric": "transaction_count", "value": report_data["transaction_count"]},
        {"metric": "avg_transaction", "value": report_data["avg_transaction"]}
    ])
    
    # MaterializeResult carries no value, so persist the report directly
    report_df.to_parquet("/tmp/sales_report.parquet")

    return MaterializeResult(
        metadata={
            # Business metrics
            "total_sales": MetadataValue.float(report_data["total_sales"]),
            "transaction_count": MetadataValue.int(report_data["transaction_count"]),
            "average_transaction": MetadataValue.float(report_data["avg_transaction"]),
            
            # Data quality metrics
            "data_completeness": MetadataValue.float(0.98),
            "outlier_count": MetadataValue.int(
                len(sales_data[sales_data["amount"] > sales_data["amount"].quantile(0.95)])
            ),
            
            # Technical metadata
            "processing_time_seconds": MetadataValue.float(2.1),
            "memory_usage_mb": MetadataValue.float(
                report_df.memory_usage(deep=True).sum() / 1024 / 1024
            ),
            
            # Visualization metadata
            "top_products": MetadataValue.json(report_data["top_products"].to_dict()),
            "sales_distribution": MetadataValue.md(
                sales_data.describe().to_markdown()
            ),
            
            # File metadata  
            "report_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
            "data_version": MetadataValue.text("v2.1.0")
        },
        
        # Asset check results (requires the check_specs declared on @asset above)
        check_results=[
            AssetCheckResult(
                check_name="sales_validation",
                passed=bool(report_data["total_sales"] > 0),
                metadata={
                    "sales_validation": MetadataValue.text(
                        "PASSED: Sales data contains valid transactions"
                    )
                }
            )
        ]
    )

# Multi-asset with MaterializeResult, using asset specs
@multi_asset(
    specs=[
        AssetSpec("daily_summary"),
        AssetSpec("weekly_summary"),
    ]
)
def sales_summaries():
    """Multi-asset yielding a MaterializeResult for each spec."""

    # Specs-based multi-assets persist their own data; load_sales_data is a placeholder
    sales_data = load_sales_data()
    
    # Generate daily summary
    daily = sales_data.groupby(sales_data["date"].dt.date).agg({
        "amount": ["sum", "count", "mean"]
    }).round(2)
    
    # Generate weekly summary  
    weekly = sales_data.groupby(sales_data["date"].dt.to_period("W")).agg({
        "amount": ["sum", "count", "mean"]
    }).round(2)
    
    # Persist each summary, then report metadata per asset key
    daily.to_parquet("/tmp/daily_summary.parquet")
    weekly.to_parquet("/tmp/weekly_summary.parquet")

    yield MaterializeResult(
        asset_key="daily_summary",
        metadata={
            "date_range": MetadataValue.text(f"{daily.index.min()} to {daily.index.max()}"),
            "total_days": MetadataValue.int(len(daily)),
            "avg_daily_sales": MetadataValue.float(daily[("amount", "sum")].mean())
        }
    )
    yield MaterializeResult(
        asset_key="weekly_summary",
        metadata={
            "week_count": MetadataValue.int(len(weekly)),
            "avg_weekly_sales": MetadataValue.float(weekly[("amount", "sum")].mean()),
            "peak_week": MetadataValue.text(str(weekly[("amount", "sum")].idxmax()))
        }
    )

Parameters:

  • asset_key: Optional[CoercibleToAssetKey] - Asset key (may be omitted when the asset has a single key)
  • metadata: Optional[Dict[str, Union[MetadataValue, RawMetadataValue]]] - Asset metadata
  • check_results: Optional[Sequence[AssetCheckResult]] - Asset check results
  • data_version: Optional[DataVersion] - Logical version of the materialized data
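
A data version can be attached alongside metadata; a minimal sketch, where write_somewhere is a placeholder for the asset's own persistence logic that returns a content digest:

from dagster import DataVersion, MaterializeResult, asset

@asset
def versioned_external_write() -> MaterializeResult:
    digest = write_somewhere()  # placeholder persistence helper
    return MaterializeResult(data_version=DataVersion(digest))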

ObserveResult { .api }

Module: dagster._core.definitions.result
Type: Class

Result of an asset observation, carrying metadata but no materialized value. Intended to be returned from the observation function of an @observable_source_asset.

from dagster import observable_source_asset, ObserveResult, MetadataValue
import pandas as pd

@observable_source_asset
def external_api_status() -> ObserveResult:
    """Observe an external API without materializing any data."""

    # Check API health (check_api_health is a placeholder helper)
    api_health = check_api_health("https://api.example.com")
    
    return ObserveResult(
        metadata={
            "api_status": MetadataValue.text("HEALTHY" if api_health["is_up"] else "DOWN"),
            "response_time_ms": MetadataValue.int(api_health["response_time_ms"]),
            "success_rate": MetadataValue.float(api_health["success_rate"]),
            "last_error": MetadataValue.text(api_health.get("last_error", "None")),
            "check_timestamp": MetadataValue.timestamp(pd.Timestamp.now()),
            "endpoint_count": MetadataValue.int(len(api_health["endpoints"])),
            "rate_limit_remaining": MetadataValue.int(api_health["rate_limit_remaining"])
        }
    )

Metadata System

Dagster's metadata system provides rich, typed metadata values for comprehensive observability and documentation.
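
Plain Python values in a metadata dictionary are coerced to the matching typed value automatically (int to IntMetadataValue, str to TextMetadataValue, float to FloatMetadataValue), so the explicit constructors are mainly needed for types that cannot be inferred, such as Markdown, URLs, or tables. A minimal sketch:

from dagster import MetadataValue, Output, op

@op
def coerced_metadata() -> Output[int]:
    return Output(
        1,
        metadata={
            "rows": 100,                                 # coerced to IntMetadataValue
            "source": "s3://bucket/raw/events.parquet",  # coerced to TextMetadataValue
            "preview": MetadataValue.md("| a |\n|---|\n| 1 |"),  # explicit Markdown
        },
    )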

Core Metadata Values

MetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: Base class

Base class for all metadata values with type-safe metadata creation.

from dagster import AssetKey, MetadataValue, asset
import numpy as np
import pandas as pd
from pathlib import Path

@asset
def comprehensive_metadata_asset(context) -> pd.DataFrame:
    """Asset demonstrating all metadata value types."""
    
    df = pd.DataFrame({
        "id": range(100),
        "value": np.random.randn(100),
        "category": np.random.choice(["A", "B", "C"], 100)
    })
    
    # Save to file for path metadata
    output_path = Path("/tmp/output.parquet")
    df.to_parquet(output_path)
    
    # Comprehensive metadata using all MetadataValue types
    metadata = {
        # Text metadata
        "description": MetadataValue.text("Generated dataset with random values"),
        "processing_status": MetadataValue.text("COMPLETED"),
        
        # Numeric metadata
        "record_count": MetadataValue.int(len(df)),
        "column_count": MetadataValue.int(len(df.columns)),
        "file_size_mb": MetadataValue.float(output_path.stat().st_size / 1024 / 1024),
        "processing_time": MetadataValue.float(1.23),
        
        # Boolean metadata (cast numpy bools to plain bools)
        "has_nulls": MetadataValue.bool(bool(df.isnull().any().any())),
        "data_quality_passed": MetadataValue.bool(True),
        
        # Path metadata
        "output_file": MetadataValue.path(output_path),
        "config_file": MetadataValue.path("/config/settings.yaml"),
        
        # URL metadata
        "data_source": MetadataValue.url("https://api.example.com/data"),
        "documentation": MetadataValue.url("https://docs.example.com/dataset"),
        
        # JSON metadata
        "column_stats": MetadataValue.json({
            col: {"mean": df[col].mean(), "std": df[col].std()} 
            for col in df.select_dtypes(include=[np.number]).columns
        }),
        "configuration": MetadataValue.json({
            "version": "2.1.0",
            "parameters": {"sample_size": 100, "random_seed": 42}
        }),
        
        # Markdown metadata
        "data_preview": MetadataValue.md(df.head().to_markdown()),
        "summary_report": MetadataValue.md(f"""
        # Data Summary
        
        - **Records**: {len(df)}
        - **Columns**: {len(df.columns)} 
        - **Categories**: {df['category'].nunique()}
        
        ## Statistics
        {df.describe().to_markdown()}
        """),
        
        # Timestamp metadata (requires tz-aware datetimes or epoch floats)
        "created_at": MetadataValue.timestamp(pd.Timestamp.now(tz="UTC")),
        "data_as_of": MetadataValue.timestamp(pd.Timestamp("2023-01-01", tz="UTC")),
        
        # Python artifact metadata (expects a function or class)
        "processing_class": MetadataValue.python_artifact(pd.DataFrame),
        
        # Null metadata (explicitly null values)
        "optional_field": MetadataValue.null(),
        
        # Dagster-specific metadata
        "upstream_asset": MetadataValue.asset(AssetKey(["raw", "data"])),
        "related_job": MetadataValue.job("etl_job", location_name="analytics"),  # code location name
        "pipeline_run": MetadataValue.dagster_run("run-12345")
    }
    
    # Attach the metadata to this materialization, then return the value
    context.add_output_metadata(metadata)
    return df

Specialized Metadata Values

Table Metadata

TableMetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: MetadataValue subclass

Metadata for tabular data with schema and statistics.

from dagster import MetadataValue, TableMetadataValue, TableSchema, TableColumn, TableRecord, asset
import numpy as np
import pandas as pd

@asset
def customers_table(context) -> pd.DataFrame:
    """Asset with comprehensive table metadata."""
    
    # Create customer data
    customers_df = pd.DataFrame({
        "customer_id": range(1, 1001),
        "name": [f"Customer {i}" for i in range(1, 1001)],
        "email": [f"customer{i}@example.com" for i in range(1, 1001)],
        "age": np.random.randint(18, 80, 1000),
        "city": np.random.choice(["NYC", "LA", "Chicago", "Houston"], 1000),
        "signup_date": pd.date_range("2020-01-01", periods=1000, freq="D"),
        "total_orders": np.random.randint(0, 50, 1000),
        "lifetime_value": np.random.uniform(100, 5000, 1000).round(2)
    })
    
    # Table schema metadata
    schema = TableSchema(
        columns=[
            TableColumn(name="customer_id", type="INTEGER", description="Unique customer identifier"),
            TableColumn(name="name", type="VARCHAR(255)", description="Customer full name"),
            TableColumn(name="email", type="VARCHAR(255)", description="Customer email address"),
            TableColumn(name="age", type="INTEGER", description="Customer age in years"),
            TableColumn(name="city", type="VARCHAR(100)", description="Customer city"),
            TableColumn(name="signup_date", type="DATE", description="Customer signup date"),
            TableColumn(name="total_orders", type="INTEGER", description="Total number of orders"),
            TableColumn(name="lifetime_value", type="DECIMAL(10,2)", description="Customer lifetime value in USD")
        ]
    )
    
    # Table statistics as records
    stats_records = [
        TableRecord({
            "column": "customer_id",
            "count": len(customers_df),
            "unique_count": customers_df["customer_id"].nunique(),
            "null_count": customers_df["customer_id"].isnull().sum()
        }),
        TableRecord({
            "column": "age",
            "mean": float(customers_df["age"].mean()),
            "min": int(customers_df["age"].min()),  # cast numpy ints to plain ints
            "max": int(customers_df["age"].max()),
            "std": float(customers_df["age"].std())
        }),
        TableRecord({
            "column": "lifetime_value",
            "mean": customers_df["lifetime_value"].mean(),
            "median": customers_df["lifetime_value"].median(),
            "total": customers_df["lifetime_value"].sum()
        })
    ]
    
    metadata = {
        # Table schema
        "schema": MetadataValue.table_schema(schema),
        
        # Table statistics
        "table_stats": TableMetadataValue(
            records=stats_records,
            schema=TableSchema(
                columns=[
                    TableColumn("column", "VARCHAR"),
                    TableColumn("count", "INTEGER"),  
                    TableColumn("mean", "FLOAT"),
                    TableColumn("min", "FLOAT"),
                    TableColumn("max", "FLOAT")
                ]
            )
        ),
        
        # Additional table metadata
        "row_count": MetadataValue.int(len(customers_df)),
        "column_count": MetadataValue.int(len(customers_df.columns)),
        "estimated_size_mb": MetadataValue.float(
            customers_df.memory_usage(deep=True).sum() / 1024 / 1024
        )
    }
    
    # Attach metadata to the materialization and return the table
    context.add_output_metadata(metadata)
    return customers_df

TableSchemaMetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: MetadataValue subclass

Metadata specifically for table schemas.

from dagster import (
    MetadataValue, TableColumn, TableColumnConstraints, TableConstraints,
    TableSchema, asset,
)
import numpy as np
import pandas as pd

@asset
def validated_orders_table(context) -> pd.DataFrame:
    """Asset with detailed schema and constraints metadata."""
    
    orders_df = pd.DataFrame({
        "order_id": range(1, 501),
        "customer_id": np.random.randint(1, 101, 500),
        "product_id": np.random.randint(1, 21, 500), 
        "quantity": np.random.randint(1, 10, 500),
        "price": np.random.uniform(10, 500, 500).round(2),
        "order_date": pd.date_range("2023-01-01", periods=500, freq="H"),
        "status": np.random.choice(["pending", "shipped", "delivered"], 500)
    })
    
    # Define schema with constraints
    schema = TableSchema(
        columns=[
            TableColumn(
                name="order_id",
                type="INTEGER",
                description="Primary key for orders",
                constraints=TableColumnConstraints(
                    nullable=False,
                    unique=True
                )
            ),
            TableColumn(
                name="customer_id", 
                type="INTEGER",
                description="Foreign key to customers table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="product_id",
                type="INTEGER", 
                description="Foreign key to products table",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="quantity",
                type="INTEGER",
                description="Order quantity",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="price",
                type="DECIMAL(10,2)",
                description="Order price in USD", 
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="order_date",
                type="TIMESTAMP",
                description="Order creation timestamp",
                constraints=TableColumnConstraints(nullable=False)
            ),
            TableColumn(
                name="status",
                type="VARCHAR(20)",
                description="Order status",
                constraints=TableColumnConstraints(nullable=False)
            )
        ],
        constraints=TableConstraints(
            other=[
                "FOREIGN KEY (customer_id) REFERENCES customers(customer_id)",
                "FOREIGN KEY (product_id) REFERENCES products(product_id)",
                "CHECK (quantity > 0)",
                "CHECK (price > 0)"
            ]
        )
    )
    
    metadata = {
        "table_schema": MetadataValue.table_schema(schema),
        "schema_version": MetadataValue.text("v1.2.0"),
        "last_schema_update": MetadataValue.timestamp(pd.Timestamp.now())
    }
    
    context.add_output_metadata(metadata)
    return orders_df

TableColumnLineageMetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: MetadataValue subclass

Metadata for column-level lineage tracking.

from dagster import MetadataValue, TableColumnLineage, TableColumnDep, AssetKey, asset
import pandas as pd

@asset
def enriched_customer_data(context, customers: pd.DataFrame, demographics: pd.DataFrame) -> pd.DataFrame:
    """Asset with column lineage metadata."""
    
    # Merge customer data with demographics
    enriched = customers.merge(demographics, on="customer_id", how="left")
    
    # Add computed columns
    enriched["age_group"] = pd.cut(enriched["age"], bins=[0, 25, 35, 50, 100], 
                                  labels=["18-25", "26-35", "36-50", "50+"])
    enriched["is_high_value"] = enriched["lifetime_value"] > enriched["lifetime_value"].median()
    
    # Define column lineage
    lineage = TableColumnLineage(
        deps_by_column={
            # Direct mappings from upstream assets
            "customer_id": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="customer_id")
            ],
            "name": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="name") 
            ],
            "email": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="email")
            ],
            "age": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],
            
            # Columns from demographics table
            "income": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="income")
            ],
            "education": [
                TableColumnDep(asset_key=AssetKey("demographics"), column_name="education")
            ],
            
            # Computed columns with dependencies
            "age_group": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="age")
            ],
            "is_high_value": [
                TableColumnDep(asset_key=AssetKey("customers"), column_name="lifetime_value")
            ]
        }
    )
    
    metadata = {
        "column_lineage": MetadataValue.column_lineage(lineage),
        "enrichment_source": MetadataValue.text("demographics table"),
        "computed_columns": MetadataValue.json(["age_group", "is_high_value"]),
        "join_key": MetadataValue.text("customer_id")
    }
    
    context.add_output_metadata(metadata)
    return enriched

Code References

CodeReferencesMetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: MetadataValue subclass

Metadata for linking assets to source code locations.

from dagster import (
    CodeReferencesMetadataValue, Definitions, LocalFileCodeReference,
    MetadataValue, UrlCodeReference, asset, with_source_code_references,
)
import pandas as pd

@asset
def analytics_report() -> pd.DataFrame:
    """Asset with automatic source code references."""
    return generate_analytics_data()  # placeholder processing helper

# with_source_code_references wraps asset definitions (it is not a decorator);
# it attaches the defining file and line number of each asset as metadata
defs = Definitions(assets=with_source_code_references([analytics_report]))

@asset
def manual_code_references(context) -> pd.DataFrame:
    """Asset with manual code reference metadata."""

    df = process_data()  # placeholder processing helper
    
    # Manual code references
    code_refs = [
        LocalFileCodeReference(
            file_path="/src/analytics/data_processing.py",
            line_number=142,
            label="Main processing function"
        ),
        LocalFileCodeReference(
            file_path="/src/analytics/transformations.py", 
            line_number=67,
            label="Data transformation logic"
        ),
        UrlCodeReference(
            url="https://github.com/company/analytics/blob/main/src/analytics/data_processing.py#L142",
            label="GitHub source"
        )
    ]
    
    metadata = {
        # "dagster/code_references" is the standard key for code reference metadata
        "dagster/code_references": CodeReferencesMetadataValue(code_references=code_refs),
        "implementation_notes": MetadataValue.md("""
        ## Implementation Details
        
        This asset implements the following logic:
        1. Load raw data from warehouse
        2. Apply business rules transformation
        3. Generate aggregated metrics
        
        See code references for detailed implementation.
        """)
    }
    
    context.add_output_metadata(metadata)
    return df

# Link code references to Git permalinks
from pathlib import Path
from dagster import link_code_references_to_git, AnchorBasedFilePathMapping

# link_code_references_to_git also operates on asset definitions rather than
# acting as a decorator; the file path mapping below is illustrative
git_linked_defs = Definitions(
    assets=link_code_references_to_git(
        assets_defs=with_source_code_references([analytics_report]),
        git_url="https://github.com/company/analytics",
        git_branch="main",
        file_path_mapping=AnchorBasedFilePathMapping(
            local_file_anchor=Path(__file__),
            file_anchor_path_in_repository="src/analytics/assets.py",
        ),
    )
)

Notebook Metadata

NotebookMetadataValue { .api }

Module: dagster._core.definitions.metadata
Type: MetadataValue subclass

Metadata for Jupyter notebooks and analysis artifacts.

from dagster import NotebookMetadataValue, MetadataValue, asset
import nbformat
import papermill as pm

@asset
def notebook_analysis(context) -> dict:
    """Asset that executes and tracks a notebook analysis."""

    # Execute the notebook with papermill
    output_notebook = "/tmp/analysis_output.ipynb"
    pm.execute_notebook(
        input_path="analysis_template.ipynb",
        output_path=output_notebook,
        parameters={"data_date": "2023-01-01", "sample_size": 1000}
    )

    # Inspect the executed notebook with nbformat (papermill no longer ships
    # read_notebook); extracting results depends on how the notebook records
    # its outputs, so extract_results is a placeholder helper
    nb = nbformat.read(output_notebook, as_version=4)
    results = extract_results(nb)
    
    metadata = {
        "analysis_notebook": MetadataValue.notebook(output_notebook),
        "template_notebook": MetadataValue.path("analysis_template.ipynb"),
        # papermill records the run duration under notebook-level metadata
        "execution_time": MetadataValue.float(nb.metadata.get("papermill", {}).get("duration", 0.0)),
        "kernel_name": MetadataValue.text(nb.metadata.get("kernelspec", {}).get("name", "unknown")),
        "cell_count": MetadataValue.int(len(nb.cells)),
        "parameters": MetadataValue.json({
            "data_date": "2023-01-01",
            "sample_size": 1000
        })
    }
    
    context.add_output_metadata(metadata)
    return results

Data Versioning and Provenance

DataVersion { .api }

Module: dagster._core.definitions.data_version
Type: Class

Data versioning for change tracking and lineage.

from dagster import AssetKey, DataProvenance, DataVersion, MetadataValue, Output, asset
import hashlib
import pandas as pd

@asset
def versioned_dataset() -> Output[pd.DataFrame]:
    """Asset with an explicit data version."""
    
    # Load source data
    source_df = pd.read_csv("source_data.csv")
    
    # Process data
    processed_df = source_df.dropna().reset_index(drop=True)
    
    # Create data version based on content
    content_hash = hashlib.sha256(processed_df.to_json().encode()).hexdigest()[:12]
    data_version = DataVersion(content_hash)
    
    # Provenance is normally read from the context (context.get_asset_provenance);
    # constructing it manually here just illustrates its fields
    provenance = DataProvenance(
        code_version="v2.1.0",
        input_data_versions={AssetKey("source_data"): DataVersion("abc123")},
        is_user_provided=False
    )
    
    metadata = {
        "data_version": MetadataValue.text(data_version.value),
        "content_hash": MetadataValue.text(content_hash),
        "provenance": MetadataValue.json({
            "code_version": provenance.code_version,
            "input_versions": {"source_data": "abc123"}
        })
    }

    # Attaching the version to the Output records it as the asset's data version
    return Output(processed_df, metadata=metadata, data_version=data_version)

This comprehensive event and metadata system provides rich observability, lineage tracking, and documentation capabilities for all Dagster pipelines. The typed metadata system ensures consistency and enables powerful filtering, search, and visualization in the Dagster UI.

For asset materialization and execution contexts that generate events, see Execution and Contexts. For sensors that can respond to asset events, see Sensors and Schedules.
