Web UI server for Dagster, providing a GraphQL API, asset-reporting endpoints, and a browser-based interface for the data orchestration platform.
—
HTTP endpoints that let external systems report asset events directly to a Dagster instance. They enable integration with external data pipelines, monitoring systems, and third-party tools that need to communicate asset state changes to Dagster.
Report when external systems have materialized (created or updated) assets, allowing Dagster to track data lineage and freshness across the entire data ecosystem.
async def handle_report_asset_materialization_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """Handle asset materialization reporting via HTTP.

    Endpoint: POST /report_asset_materialization/{asset_key:path}

    Args:
        context: Workspace request context with instance access.
        request: HTTP request containing asset materialization data.

    Returns:
        JSONResponse: Success/error response with event details.
    """
    ...
Usage Examples:
import requests
import json
# Basic asset materialization report
response = requests.post(
"http://localhost:3000/report_asset_materialization/my_dataset",
json={
"metadata": {
"rows": 1000,
"columns": 10,
"file_size": "5.2MB"
},
"description": "Daily ETL run completed successfully"
}
)
# With data version and partition
response = requests.post(
"http://localhost:3000/report_asset_materialization/sales/daily_summary",
json={
"data_version": "v1.2.3",
"partition": "2024-01-15",
"metadata": {
"total_sales": 50000.00,
"transaction_count": 1250
}
}
)
# Query parameters instead of JSON body
response = requests.post(
"http://localhost:3000/report_asset_materialization/my_asset",
params={
"data_version": "v1.0.0",
"description": "Processed via external pipeline"
}
)
Report the results of data quality checks or validation tests performed by external systems.
async def handle_report_asset_check_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """Handle asset check evaluation reporting via HTTP.

    Endpoint: POST /report_asset_check/{asset_key:path}

    Args:
        context: Workspace request context with instance access.
        request: HTTP request containing asset check evaluation data.

    Returns:
        JSONResponse: Success/error response with check details.
    """
    ...
Usage Examples:
# Successful data quality check
response = requests.post(
"http://localhost:3000/report_asset_check/customer_data",
json={
"check_name": "null_values_check",
"passed": True,
"metadata": {
"null_count": 0,
"total_rows": 10000,
"check_duration": "0.5s"
}
}
)
# Failed data quality check
response = requests.post(
"http://localhost:3000/report_asset_check/product_catalog",
json={
"check_name": "price_validation",
"passed": False,
"severity": "ERROR",
"metadata": {
"invalid_prices": 15,
"error_details": "Negative prices found in products"
}
}
)
# Warning level check
response = requests.post(
"http://localhost:3000/report_asset_check/user_events",
json={
"check_name": "volume_check",
"passed": True,
"severity": "WARN",
"metadata": {
"daily_volume": 50000,
"expected_min": 75000,
"note": "Volume below expected threshold"
}
}
)
Report observations about an asset's state without recording a materialization; useful for monitoring and metadata collection.
async def handle_report_asset_observation_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """Handle asset observation reporting via HTTP.

    Endpoint: POST /report_asset_observation/{asset_key:path}

    Args:
        context: Workspace request context with instance access.
        request: HTTP request containing asset observation data.

    Returns:
        JSONResponse: Success/error response with observation details.
    """
    ...
Usage Examples:
# Schema change observation
response = requests.post(
"http://localhost:3000/report_asset_observation/warehouse/customers",
json={
"metadata": {
"schema_version": "v2.1",
"columns_added": ["phone_verified", "last_login"],
"migration_applied": "2024-01-15T10:30:00Z"
},
"description": "Schema migration applied successfully"
}
)
# Performance metrics observation
response = requests.post(
"http://localhost:3000/report_asset_observation/api/user_service",
json={
"metadata": {
"avg_response_time": "45ms",
"error_rate": "0.1%",
"throughput": "1000 req/min",
"monitoring_window": "1h"
}
}
)
# Data freshness observation
response = requests.post(
"http://localhost:3000/report_asset_observation/reports/daily_kpis",
json={
"data_version": "2024-01-15",
"metadata": {
"last_updated": "2024-01-15T23:45:00Z",
"lag_minutes": 15,
"source_system": "analytics_pipeline"
}
}
)
Utility classes defining parameter names for consistent API usage:
class ReportAssetMatParam:
    """Request field names accepted when reporting an asset materialization.

    Each attribute holds the literal field name, so callers can refer to
    fields symbolically instead of repeating string literals.
    """

    asset_key: str = "asset_key"
    data_version: str = "data_version"
    metadata: str = "metadata"
    description: str = "description"
    partition: str = "partition"
class ReportAssetCheckEvalParam:
    """Request field names accepted when reporting an asset check evaluation.

    Each attribute holds the literal field name, so callers can refer to
    fields symbolically instead of repeating string literals.
    """

    asset_key: str = "asset_key"
    check_name: str = "check_name"
    metadata: str = "metadata"
    severity: str = "severity"
    passed: str = "passed"
class ReportAssetObsParam:
    """Request field names accepted when reporting an asset observation.

    Each attribute holds the literal field name, so callers can refer to
    fields symbolically instead of repeating string literals.
    """

    asset_key: str = "asset_key"
    data_version: str = "data_version"
    metadata: str = "metadata"
    description: str = "description"
    partition: str = "partition"


# Asset keys can be specified in multiple ways:
# Single part asset key
POST /report_asset_materialization/my_asset
# Multi-part asset key (separated by /)
POST /report_asset_materialization/warehouse/customers/daily
# Multi-part asset key as array (in the JSON body)
{
"asset_key": ["warehouse", "customers", "daily"],
"metadata": {...}
}
# Dot-separated key as a query parameter (database string format)
POST /report_asset_materialization/?asset_key=warehouse.customers.daily
All endpoints support both JSON body and query parameters:
requests.post(url, json={
"metadata": {"key": "value"},
"description": "Event description"
})
requests.post(url, params={
"metadata": json.dumps({"key": "value"}),
"description": "Event description"
})
# JSON body takes precedence over query parameters
requests.post(
url + "?description=fallback",
json={"description": "primary", "metadata": {...}}
)
Metadata can be any JSON-serializable data:
# Simple metadata
{
"metadata": {
"rows": 1000,
"size_mb": 15.5,
"success": True
}
}
# Complex nested metadata
{
"metadata": {
"statistics": {
"mean": 42.5,
"std_dev": 10.2,
"quartiles": [35.0, 42.0, 50.0]
},
"quality_checks": [
{"name": "null_check", "passed": True},
{"name": "range_check", "passed": False, "details": "5 values out of range"}
],
"processing_time": "00:05:23",
"source_files": ["data1.csv", "data2.csv"]
}
}
The endpoints return appropriate HTTP status codes and error messages:
# Success response (200)
{
"status": "success",
"event_id": "12345",
"message": "Asset materialization recorded"
}
# Bad request (400) - missing required fields
{
"error": "Missing required field: check_name",
"status": "error"
}
# Not found (404) - invalid asset key
{
"error": "Asset key 'invalid/asset' not found",
"status": "error"
}
# Server error (500) - internal processing error
{
"error": "Failed to process asset event",
"status": "error"
}
When deployed in production, these endpoints should be secured, for example with API-key authentication:
# Example middleware for API key authentication
class AssetReportingAuthMiddleware(BaseHTTPMiddleware):
    """Require a valid ``X-API-Key`` header on every ``/report_asset*`` request.

    Requests to any other path are passed through untouched.
    """

    async def dispatch(self, request, call_next):
        if not request.url.path.startswith("/report_asset"):
            return await call_next(request)

        supplied_key = request.headers.get("X-API-Key")
        # Short-circuits on a missing/empty key, so the validator only ever
        # sees a non-empty value.
        authorized = bool(supplied_key) and validate_api_key(supplied_key)
        if authorized:
            return await call_next(request)

        return JSONResponse(
            {"error": "Invalid API key"},
            status_code=401,
        )
# Deploy with authentication
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app(
middleware=[Middleware(AssetReportingAuthMiddleware)]
)
# At end of ETL job
def report_completion(asset_name, stats, *, timeout=10.0):
    """Report a finished ETL run to Dagster as an asset materialization.

    Args:
        asset_name: Asset key path; ``/`` separates key components
            (e.g. ``"warehouse/customers"``).
        stats: JSON-serializable metadata attached to the materialization.
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver, so callers can
        inspect the outcome.
    """
    import logging
    import os
    from urllib.parse import quote

    response = requests.post(
        # quote() keeps "/" (the key separator) but escapes spaces, "?", "#", ...
        f"http://dagster-webserver/report_asset_materialization/{quote(asset_name)}",
        json={
            "metadata": stats,
            "description": f"ETL completed for {asset_name}",
        },
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        # Without a timeout, requests can block indefinitely on a hung server.
        timeout=timeout,
    )
    if not response.ok:
        # Don't lose the lineage event silently, but don't fail the ETL job
        # after its real work has already finished either.
        logging.getLogger(__name__).warning(
            "Dagster did not accept materialization report for %s: HTTP %s %s",
            asset_name,
            response.status_code,
            response.text,
        )
    return response


# After running data quality checks
def report_quality_check(dataset, check_name, results, *, severity="ERROR", timeout=10.0):
    """Report the outcome of an external data quality check to Dagster.

    Args:
        dataset: Asset key path of the checked asset; ``/`` separates key
            components.
        check_name: Name of the asset check.
        results: Mapping with a boolean ``"passed"`` entry and a ``"details"``
            entry that is sent as the check's metadata.
        severity: Severity attached to a *failed* check. The documented values
            are ``"ERROR"`` and ``"WARN"``.
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver.
    """
    import logging
    import os
    from urllib.parse import quote

    passed = results["passed"]
    payload = {
        "check_name": check_name,
        "passed": passed,
        "metadata": results["details"],
    }
    if not passed:
        # Severity only qualifies failures. The old "INFO" fallback for passing
        # checks is not a valid check severity (only ERROR/WARN are).
        payload["severity"] = severity

    response = requests.post(
        f"http://dagster-webserver/report_asset_check/{quote(dataset)}",
        json=payload,
        # Send the API key so deployments using X-API-Key auth accept the report.
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        timeout=timeout,
    )
    if not response.ok:
        logging.getLogger(__name__).warning(
            "Dagster did not accept check %r for %s: HTTP %s %s",
            check_name,
            dataset,
            response.status_code,
            response.text,
        )
    return response


# Periodic monitoring of external systems
def monitor_system_health(system_name, *, timeout=10.0):
    """Report a health snapshot of an external system to Dagster.

    The snapshot is sent as an asset observation (metadata only, no
    materialization) under the asset key ``system_name``.

    Args:
        system_name: Asset key path for the monitored system; ``/`` separates
            key components.
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver.
    """
    import logging
    import os
    from urllib.parse import quote

    # NOTE(review): get_system_health() does not receive system_name, so every
    # monitored system may report identical stats -- confirm its signature.
    health_stats = get_system_health()
    response = requests.post(
        f"http://dagster-webserver/report_asset_observation/{quote(system_name)}",
        json={
            "metadata": health_stats,
            "description": "System health check",
        },
        # Send the API key so deployments using X-API-Key auth accept the report.
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        timeout=timeout,
    )
    if not response.ok:
        logging.getLogger(__name__).warning(
            "Dagster did not accept health observation for %s: HTTP %s %s",
            system_name,
            response.status_code,
            response.text,
        )
    return response


# Install with Tessl CLI
npx tessl i tessl/pypi-dagster-webserver