
tessl/pypi-dagster-webserver

Web UI server for Dagster, providing a GraphQL API, asset reporting, and a browser-based interface for the data orchestration platform.


Asset Reporting

HTTP endpoints that let external systems report asset events directly to a Dagster instance. They enable integration with external data pipelines, monitoring systems, and third-party tools that need to communicate asset state changes to Dagster.

Capabilities

Asset Materialization Reporting

Report when external systems have materialized (created or updated) assets, allowing Dagster to track data lineage and freshness across the entire data ecosystem.

async def handle_report_asset_materialization_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse:
    """
    Handle asset materialization reporting via HTTP.
    
    Args:
        context: Workspace request context with instance access
        request: HTTP request containing asset materialization data
        
    Returns:
        JSONResponse: Success/error response with event details
    """

Endpoint: POST /report_asset_materialization/{asset_key:path}

Usage Examples:

import requests
import json

# Basic asset materialization report
response = requests.post(
    "http://localhost:3000/report_asset_materialization/my_dataset",
    json={
        "metadata": {
            "rows": 1000,
            "columns": 10,
            "file_size": "5.2MB"
        },
        "description": "Daily ETL run completed successfully"
    }
)

# With data version and partition
response = requests.post(
    "http://localhost:3000/report_asset_materialization/sales/daily_summary",
    json={
        "data_version": "v1.2.3",
        "partition": "2024-01-15",
        "metadata": {
            "total_sales": 50000.00,
            "transaction_count": 1250
        }
    }
)

# Query parameters instead of JSON body
response = requests.post(
    "http://localhost:3000/report_asset_materialization/my_asset",
    params={
        "data_version": "v1.0.0",
        "description": "Processed via external pipeline"
    }
)
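
The three calls above differ only in which optional fields they send. A minimal sketch of a payload builder that drops unset fields might look like the following; `build_materialization_payload` is a hypothetical helper name, not part of dagster-webserver, and the field names are the ones used in the examples above.

```python
from typing import Any, Optional


def build_materialization_payload(
    metadata: Optional[dict] = None,
    description: Optional[str] = None,
    data_version: Optional[str] = None,
    partition: Optional[str] = None,
) -> dict:
    """Return a JSON body containing only the fields that were provided."""
    fields: dict[str, Any] = {
        "metadata": metadata,
        "description": description,
        "data_version": data_version,
        "partition": partition,
    }
    # Hypothetical helper: omit unset fields instead of sending nulls.
    return {name: value for name, value in fields.items() if value is not None}


payload = build_materialization_payload(
    metadata={"total_sales": 50000.00, "transaction_count": 1250},
    data_version="v1.2.3",
    partition="2024-01-15",
)
```

The resulting dictionary can be passed as the `json=` argument of `requests.post`, as in the examples above.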

Asset Check Reporting

Report the results of data quality checks or validation tests performed by external systems.

async def handle_report_asset_check_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse:
    """
    Handle asset check evaluation reporting via HTTP.
    
    Args:
        context: Workspace request context with instance access
        request: HTTP request containing asset check evaluation data
        
    Returns:
        JSONResponse: Success/error response with check details
    """

Endpoint: POST /report_asset_check/{asset_key:path}

Usage Examples:

# Successful data quality check
response = requests.post(
    "http://localhost:3000/report_asset_check/customer_data",
    json={
        "check_name": "null_values_check",
        "passed": True,
        "metadata": {
            "null_count": 0,
            "total_rows": 10000,
            "check_duration": "0.5s"
        }
    }
)

# Failed data quality check
response = requests.post(
    "http://localhost:3000/report_asset_check/product_catalog",
    json={
        "check_name": "price_validation", 
        "passed": False,
        "severity": "ERROR",
        "metadata": {
            "invalid_prices": 15,
            "error_details": "Negative prices found in products"
        }
    }
)

# Warning level check
response = requests.post(
    "http://localhost:3000/report_asset_check/user_events", 
    json={
        "check_name": "volume_check",
        "passed": True,
        "severity": "WARN",
        "metadata": {
            "daily_volume": 50000,
            "expected_min": 75000,
            "note": "Volume below expected threshold"
        }
    }
)
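
Check reports are easy to get subtly wrong, for example by misspelling a severity. The sketch below validates the fields before sending. It assumes that `severity` accepts only `WARN` and `ERROR`, which is believed to match Dagster's `AssetCheckSeverity` enum; `build_check_payload` and `ALLOWED_SEVERITIES` are hypothetical names for this example.

```python
from typing import Optional

# Assumption: only these two severities are accepted by the endpoint.
ALLOWED_SEVERITIES = {"WARN", "ERROR"}


def build_check_payload(
    check_name: str,
    passed: bool,
    severity: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> dict:
    """Build a JSON body for /report_asset_check, rejecting obvious mistakes."""
    if not check_name:
        raise ValueError("check_name is required")
    if severity is not None and severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"severity must be one of {sorted(ALLOWED_SEVERITIES)}")
    payload = {"check_name": check_name, "passed": bool(passed)}
    # Only include optional fields that the caller actually supplied.
    if severity is not None:
        payload["severity"] = severity
    if metadata:
        payload["metadata"] = metadata
    return payload
```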

Asset Observation Reporting

Report observations about asset state without indicating materialization, useful for monitoring and metadata collection.

async def handle_report_asset_observation_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse:
    """
    Handle asset observation reporting via HTTP.
    
    Args:
        context: Workspace request context with instance access
        request: HTTP request containing asset observation data
        
    Returns:
        JSONResponse: Success/error response with observation details
    """

Endpoint: POST /report_asset_observation/{asset_key:path}

Usage Examples:

# Schema change observation
response = requests.post(
    "http://localhost:3000/report_asset_observation/warehouse/customers",
    json={
        "metadata": {
            "schema_version": "v2.1",
            "columns_added": ["phone_verified", "last_login"],
            "migration_applied": "2024-01-15T10:30:00Z"
        },
        "description": "Schema migration applied successfully"
    }
)

# Performance metrics observation
response = requests.post(
    "http://localhost:3000/report_asset_observation/api/user_service",
    json={
        "metadata": {
            "avg_response_time": "45ms",
            "error_rate": "0.1%", 
            "throughput": "1000 req/min",
            "monitoring_window": "1h"
        }
    }
)

# Data freshness observation  
response = requests.post(
    "http://localhost:3000/report_asset_observation/reports/daily_kpis",
    json={
        "data_version": "2024-01-15",
        "metadata": {
            "last_updated": "2024-01-15T23:45:00Z",
            "lag_minutes": 15,
            "source_system": "analytics_pipeline"
        }
    }
)
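
For the freshness case, the lag can be derived from timestamps rather than hard-coded. This is a minimal sketch under the assumption that both timestamps are timezone-aware; `freshness_metadata` is a hypothetical helper, and the keys simply mirror the example above.

```python
from datetime import datetime, timezone


def freshness_metadata(last_updated: datetime, now: datetime, source_system: str) -> dict:
    """Build observation metadata describing how stale an asset is.

    Both datetimes must be timezone-aware, otherwise the subtraction fails.
    """
    lag = now - last_updated
    return {
        # Render the timestamp in UTC with a trailing "Z", as in the example above.
        "last_updated": last_updated.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Whole minutes elapsed since the last update.
        "lag_minutes": int(lag.total_seconds() // 60),
        "source_system": source_system,
    }


metadata = freshness_metadata(
    last_updated=datetime(2024, 1, 15, 23, 45, tzinfo=timezone.utc),
    now=datetime(2024, 1, 16, 0, 0, tzinfo=timezone.utc),
    source_system="analytics_pipeline",
)
```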

Parameter Classes

Utility classes defining parameter names for consistent API usage:

class ReportAssetMatParam:
    """Parameter names for asset materialization reporting."""
    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"

class ReportAssetCheckEvalParam:
    """Parameter names for asset check evaluation reporting."""
    asset_key = "asset_key"
    check_name = "check_name"
    metadata = "metadata"
    severity = "severity"
    passed = "passed"

class ReportAssetObsParam:
    """Parameter names for asset observation reporting."""
    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"

Asset Key Formats

Asset keys can be specified in three ways: in the URL path, in the JSON body, or as a query parameter.

URL Path Format

# Single part asset key
POST /report_asset_materialization/my_asset

# Multi-part asset key (separated by /)
POST /report_asset_materialization/warehouse/customers/daily
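
The path format above can be generated from a list of key parts. The helper below is a rough sketch: `build_report_url` and `ENDPOINTS` are hypothetical names, and it assumes that no key part contains `/`, since the path form uses `/` as the part separator. Keys with such parts would need the JSON body format instead.

```python
from urllib.parse import quote

# Endpoint path prefixes documented on this page.
ENDPOINTS = {
    "materialization": "report_asset_materialization",
    "check": "report_asset_check",
    "observation": "report_asset_observation",
}


def build_report_url(base_url: str, event: str, key_parts: list) -> str:
    """Join asset key parts into a reporting URL (hypothetical helper)."""
    if not key_parts:
        raise ValueError("asset key needs at least one part")
    for part in key_parts:
        # "/" separates parts in the URL path, so it cannot appear inside one.
        if not part or "/" in part:
            raise ValueError(f"invalid asset key part: {part!r}")
    path = "/".join(quote(part, safe="") for part in key_parts)
    return f"{base_url.rstrip('/')}/{ENDPOINTS[event]}/{path}"
```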

JSON Body Format

# Multi-part asset key as array
{
    "asset_key": ["warehouse", "customers", "daily"],
    "metadata": {...}
}

Query Parameter Format

# Database string format; leave the URL path empty so the query parameter is used
POST /report_asset_materialization/?asset_key=warehouse.customers.daily

Request Formats

All endpoints accept parameters in a JSON body, as query parameters, or as a mix of both:

JSON Request Body

requests.post(url, json={
    "metadata": {"key": "value"},
    "description": "Event description"
})

Query Parameters

requests.post(url, params={
    "metadata": json.dumps({"key": "value"}),
    "description": "Event description"
})
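
Because query parameters are plain strings, structured metadata has to be JSON-encoded first, as in the call above. The sketch below builds such a URL with the standard library only and decodes it again to confirm the round trip; `with_query_params` is a hypothetical helper name.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit


def with_query_params(url: str, description: str, metadata: dict) -> str:
    """Append description and JSON-encoded metadata as query parameters."""
    query = urlencode({"description": description, "metadata": json.dumps(metadata)})
    return f"{url}?{query}"


full_url = with_query_params(
    "http://localhost:3000/report_asset_materialization/my_asset",
    "Event description",
    {"key": "value"},
)

# Decode the query string again to confirm the metadata survives encoding.
decoded = parse_qs(urlsplit(full_url).query)
round_tripped = json.loads(decoded["metadata"][0])
```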

Mixed Format

# JSON body takes precedence over query parameters
requests.post(
    url + "?description=fallback",
    json={"description": "primary", "metadata": {...}}
)
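
The precedence rule can be modelled in a few lines. This is a client-side illustration of the documented behavior, not Dagster's actual implementation; `resolve_param` is a hypothetical name.

```python
def resolve_param(name: str, json_body: dict, query_params: dict):
    """Return the value a server following the documented precedence would use."""
    # A value in the JSON body wins over the same name in the query string.
    if name in json_body:
        return json_body[name]
    # Otherwise fall back to the query parameter, or None if it is absent too.
    return query_params.get(name)


chosen = resolve_param(
    "description",
    json_body={"description": "primary"},
    query_params={"description": "fallback"},
)
```

Under this model, the mixed request above would record the description `primary`.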

Metadata Handling

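Metadata can be any JSON-serializable value, from flat key-value pairs to nested structures. The examples below are written as Python dictionaries: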

# Simple metadata
{
    "metadata": {
        "rows": 1000,
        "size_mb": 15.5,
        "success": True
    }
}

# Complex nested metadata
{
    "metadata": {
        "statistics": {
            "mean": 42.5,
            "std_dev": 10.2,
            "quartiles": [35.0, 42.0, 50.0]
        },
        "quality_checks": [
            {"name": "null_check", "passed": True},
            {"name": "range_check", "passed": False, "details": "5 values out of range"}
        ],
        "processing_time": "00:05:23",
        "source_files": ["data1.csv", "data2.csv"]
    }
}
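
Values such as `datetime` objects are not JSON-serializable and would fail at request time. One way to catch this early is to normalize metadata before sending it. The sketch below is an assumption about how a client might do that; `normalize_metadata` is a hypothetical helper, and converting dates to ISO 8601 strings is a choice made for this example.

```python
import json
from datetime import date, datetime


def _fallback(obj):
    """Convert values that json cannot encode natively."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"metadata value of type {type(obj).__name__} is not JSON-serializable")


def normalize_metadata(metadata: dict) -> dict:
    """Round-trip metadata through JSON so only serializable values remain."""
    return json.loads(json.dumps(metadata, default=_fallback))


clean = normalize_metadata({"rows": 1000, "loaded_at": datetime(2024, 1, 15, 10, 30)})
```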

Error Handling

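The endpoints signal the outcome through the HTTP status code, and error responses carry an error message. The bodies below are illustrative and their exact fields may vary between Dagster versions, so clients should branch on the status code rather than on body fields: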

# Success response (200)
{
    "status": "success",
    "event_id": "12345",
    "message": "Asset materialization recorded"
}

# Bad request (400) - missing required fields
{
    "error": "Missing required field: check_name",
    "status": "error"
}

# Not found (404) - invalid asset key
{
    "error": "Asset key 'invalid/asset' not found",
    "status": "error"
}

# Server error (500) - internal processing error
{
    "error": "Failed to process asset event",
    "status": "error"
}
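
On the client side, the status code alone is enough to decide what to do next. The following is a minimal sketch that assumes the status code classes listed above; `classify_response` and `should_retry` are hypothetical helpers.

```python
def classify_response(status_code: int) -> str:
    """Map an HTTP status code to a coarse outcome."""
    if 200 <= status_code < 300:
        return "success"
    if 400 <= status_code < 500:
        # The request itself is wrong; resending it unchanged will not help.
        return "client_error"
    if 500 <= status_code < 600:
        # The server failed; a later retry may succeed.
        return "server_error"
    return "unexpected"


def should_retry(status_code: int) -> bool:
    """Retry only when the failure is on the server side."""
    return classify_response(status_code) == "server_error"
```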

Authentication and Security

When deployed in production, these endpoints should be secured, for example with middleware that checks an API key:

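# Example middleware for API key authentication
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from dagster_webserver.webserver import DagsterWebserver

class AssetReportingAuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        if request.url.path.startswith("/report_asset"):
            api_key = request.headers.get("X-API-Key")
            # validate_api_key is a placeholder for your own key check
            if not api_key or not validate_api_key(api_key):
                return JSONResponse(
                    {"error": "Invalid API key"},
                    status_code=401
                )
        return await call_next(request)

# Deploy with authentication; workspace_context is assumed to be created elsewhere
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app()
# Register the middleware on the Starlette app before it starts serving
app.add_middleware(AssetReportingAuthMiddleware)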
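
The middleware relies on a `validate_api_key` function that this page does not define. One possible sketch is shown below. It assumes a single shared key stored in the `DAGSTER_API_KEY` environment variable on the server, which is an assumption of this example rather than a Dagster feature, and it uses a constant-time comparison to avoid leaking key contents through timing.

```python
import hmac
import os
from typing import Optional


def validate_api_key(api_key: str, expected: Optional[str] = None) -> bool:
    """Check a presented key against the expected one (hypothetical helper)."""
    if expected is None:
        # Assumption: the server-side key lives in this environment variable.
        expected = os.getenv("DAGSTER_API_KEY", "")
    if not expected:
        # Refuse everything when no key is configured.
        return False
    # compare_digest takes the same time regardless of where the inputs differ.
    return hmac.compare_digest(api_key.encode(), expected.encode())
```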

Integration Patterns

ETL Pipeline Integration

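import os

import requests

# At end of ETL job
def report_completion(asset_name, stats):
    response = requests.post(
        f"http://dagster-webserver/report_asset_materialization/{asset_name}",
        json={
            "metadata": stats,
            "description": f"ETL completed for {asset_name}"
        },
        # Only meaningful if the deployment adds its own API key check
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        timeout=10,
    )
    # Surface 4xx/5xx responses instead of failing silently
    response.raise_for_status()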

Data Quality Monitoring

# After running data quality checks
def report_quality_check(dataset, check_name, results):
    payload = {
        "check_name": check_name,
        "passed": results["passed"],
        "metadata": results["details"],
    }
    if not results["passed"]:
        # Severity accepts WARN or ERROR, so it is only set for failures
        payload["severity"] = "ERROR"
    requests.post(
        f"http://dagster-webserver/report_asset_check/{dataset}",
        json=payload,
    )

External System Monitoring

# Periodic monitoring of external systems
def monitor_system_health(system_name):
    # get_system_health is a placeholder for your own health-collection function
    health_stats = get_system_health(system_name)
    requests.post(
        f"http://dagster-webserver/report_asset_observation/{system_name}",
        json={
            "metadata": health_stats,
            "description": "System health check"
        }
    )
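
All three patterns send a single request and ignore transient failures. A hedged sketch of a retry wrapper with exponential backoff follows. `post_with_retry` is a hypothetical helper: it takes a zero-argument `send` callable that returns an HTTP status code, so the retry logic stays independent of the HTTP library, and it does not handle connection exceptions.

```python
import time
from typing import Callable


def post_with_retry(
    send: Callable[[], int],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns a non-5xx status or attempts run out."""
    status = 0
    for attempt in range(attempts):
        status = send()
        if status < 500:
            # Success or a client error: retrying would not change the result.
            return status
        if attempt < attempts - 1:
            # Back off exponentially: base_delay, 2 * base_delay, ...
            sleep(base_delay * 2 ** attempt)
    return status
```

With `requests`, `send` could be `lambda: requests.post(url, json=payload, timeout=10).status_code`.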

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-webserver
