Web UI server for Dagster, providing a GraphQL API, asset reporting, and a browser-based interface for the data orchestration platform.
—
Complete GraphQL server implementation with HTTP and WebSocket support, providing the core API interface for the Dagster UI and external integrations. Built on Starlette/ASGI with support for queries, mutations, and real-time subscriptions.
Main webserver implementation that extends the GraphQL server base class with Dagster-specific functionality.
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
def __init__(
self,
process_context: T_IWorkspaceProcessContext,
app_path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
uses_app_path_prefix: bool = True
):
"""
Initialize the Dagster webserver.
Args:
process_context: Workspace process context with instance and code locations
app_path_prefix: URL path prefix for the application
live_data_poll_rate: Polling rate for live data updates in milliseconds
uses_app_path_prefix: Whether to use path prefix in routing
"""
def build_graphql_schema(self) -> Schema:
"""Build the GraphQL schema using dagster-graphql."""
def build_graphql_middleware(self) -> list:
"""Build GraphQL middleware stack."""
def build_middleware(self) -> list[Middleware]:
"""Build ASGI middleware stack with tracing support."""
def make_security_headers(self) -> dict:
"""Generate security headers for HTTP responses."""
def make_csp_header(self, nonce: str) -> str:
"""Create Content Security Policy header with nonce."""
def create_asgi_app(self, **kwargs) -> Starlette:
"""Create the ASGI application with all routes and middleware."""

Usage Examples:
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.webserver import DagsterWebserver
import uvicorn

# Create webserver instance
# NOTE(review): `instance` is not defined in this snippet; it must be created
# by the caller before this point.
with WorkspaceProcessContext(instance) as workspace_context:
    webserver = DagsterWebserver(
        workspace_context,
        app_path_prefix="/dagster",
        live_data_poll_rate=2000,
    )

    # Get ASGI app
    app = webserver.create_asgi_app()

    # Run with uvicorn
    # Kept inside the `with` block so the workspace context is still open
    # while the server is handling requests.
    uvicorn.run(app, host="127.0.0.1", port=3000)
# Custom middleware
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app(
middleware=[custom_middleware],
debug=True
)

Abstract base class providing GraphQL server functionality that can be extended for custom implementations.
class GraphQLServer(ABC, Generic[TRequestContext]):
def __init__(self, app_path_prefix: str = ""):
"""
Initialize GraphQL server with optional path prefix.
Args:
app_path_prefix: URL path prefix for GraphQL endpoints
"""
@abstractmethod
def build_graphql_schema(self) -> Schema:
"""Build the GraphQL schema. Must be implemented by subclasses."""
@abstractmethod
def build_graphql_middleware(self) -> list:
"""Build GraphQL middleware. Must be implemented by subclasses."""
def build_graphql_validation_rules(self) -> Collection[type[ASTValidationRule]]:
"""Build GraphQL validation rules. Uses standard rules by default."""
@abstractmethod
def build_middleware(self) -> list[Middleware]:
"""Build ASGI middleware stack. Must be implemented by subclasses."""
@abstractmethod
def build_routes(self) -> list[BaseRoute]:
"""Build application routes. Must be implemented by subclasses."""
@abstractmethod
def _make_request_context(self, conn: HTTPConnection) -> TRequestContext:
"""Create request context from HTTP connection. Must be implemented by subclasses."""
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
"""Context manager for request context lifecycle."""
async def graphql_http_endpoint(self, request: Request):
"""HTTP endpoint handler for GraphQL queries and mutations."""
async def graphql_ws_endpoint(self, websocket: WebSocket):
"""WebSocket endpoint handler for GraphQL subscriptions."""
async def execute_graphql_request(
self,
request: Request,
query: str,
variables: Optional[dict[str, Any]],
operation_name: Optional[str]
) -> JSONResponse:
"""Execute a GraphQL request and return JSON response."""
def create_asgi_app(self, **kwargs) -> Starlette:
"""Create ASGI application with routes and middleware."""

The webserver provides comprehensive HTTP endpoints beyond GraphQL:
# GraphQL endpoints
async def graphql_http_endpoint(self, request: Request):
"""Handle GET/POST requests to /graphql with GraphiQL support."""
# Information endpoints
async def webserver_info_endpoint(self, request: Request):
"""Return webserver version and configuration info at /server_info."""
async def dagit_info_endpoint(self, request: Request):
"""Legacy info endpoint at /dagit_info (deprecated but maintained for compatibility)."""
# File download endpoints
async def download_debug_file_endpoint(self, request: Request):
"""Download debug export files at /download_debug/{run_id}."""
async def download_notebook(self, request: Request):
"""Download/view Jupyter notebooks at /notebook."""
async def download_captured_logs_endpoint(self, request: Request):
"""Download compute logs at /logs/{path}."""
# Asset reporting endpoints
async def report_asset_materialization_endpoint(self, request: Request):
"""Handle asset materialization reports at /report_asset_materialization/{asset_key}."""
async def report_asset_check_endpoint(self, request: Request):
"""Handle asset check reports at /report_asset_check/{asset_key}."""
async def report_asset_observation_endpoint(self, request: Request):
"""Handle asset observation reports at /report_asset_observation/{asset_key}."""
# UI endpoints
async def index_html_endpoint(self, request: Request):
"""Serve main UI HTML with CSP headers."""

Full WebSocket support for GraphQL subscriptions using the graphql-ws protocol:
class GraphQLWS(str, Enum):
    """GraphQL WebSocket protocol constants.

    Each member mixes in ``str``, so it compares equal to its raw string
    value (e.g. ``GraphQLWS.START == "start"``) and can be looked up from
    that value (``GraphQLWS("start")``).
    """

    # Name of the WebSocket subprotocol.
    PROTOCOL = "graphql-ws"

    # Connection-level message types.
    CONNECTION_INIT = "connection_init"
    CONNECTION_ACK = "connection_ack"
    CONNECTION_ERROR = "connection_error"
    CONNECTION_TERMINATE = "connection_terminate"
    CONNECTION_KEEP_ALIVE = "ka"

    # Operation-level message types.
    START = "start"
    DATA = "data"
    ERROR = "error"
    COMPLETE = "complete"
    STOP = "stop"
async def execute_graphql_subscription(
self,
websocket: WebSocket,
operation_id: str,
query: str,
variables: Optional[dict[str, Any]],
operation_name: Optional[str]
) -> tuple[Optional[Task], Optional[GraphQLFormattedError]]:
"""Execute GraphQL subscription and return async task."""

Built-in middleware for request tracing and custom middleware integration:
class DagsterTracedCounterMiddleware:
"""ASGI middleware for counting traced Dagster operations."""
def __init__(self, app): ...
async def __call__(self, scope, receive, send): ...
class TracingMiddleware:
"""ASGI middleware for request tracing and observability."""
def __init__(self, app): ...
async def __call__(self, scope, receive, send): ...

Custom middleware integration:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware import Middleware
from starlette.responses import JSONResponse


class CustomAuthMiddleware(BaseHTTPMiddleware):
    """Example middleware that rejects requests without an Authorization header."""

    async def dispatch(self, request, call_next):
        """Return a 401 JSON error if the header is missing, else forward the request.

        Args:
            request: Incoming request whose headers are inspected.
            call_next: Awaitable callable that passes the request down the stack.

        Returns:
            A ``JSONResponse`` with status 401 when the ``Authorization`` header
            is absent or empty; otherwise the response produced by ``call_next``.
        """
        # Add authentication logic
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return JSONResponse({"error": "Unauthorized"}, status_code=401)
        return await call_next(request)
# Usage with webserver
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app(
middleware=[Middleware(CustomAuthMiddleware)]
)

Built-in security headers and Content Security Policy support:
def make_security_headers(self) -> dict:
"""
Generate security headers for HTTP responses.
Returns:
dict: Security headers including Cache-Control, Feature-Policy, etc.
"""
def make_csp_header(self, nonce: str) -> str:
"""
Create Content Security Policy header with nonce.
Args:
nonce: Cryptographic nonce for inline scripts
Returns:
str: CSP header value
"""

Security headers include:

- Cache-Control: no-store
- Feature-Policy: microphone 'none'; camera 'none'
- Referrer-Policy: strict-origin-when-cross-origin
- X-Content-Type-Options: nosniff

Comprehensive error handling for GraphQL operations:
def handle_graphql_errors(self, errors: Sequence[GraphQLError]):
"""
Process GraphQL errors and add serializable error info.
Args:
errors: List of GraphQL errors
Returns:
list: Formatted error objects with debugging information
"""
def _determine_status_code(
self,
resolver_errors: Optional[list[GraphQLError]],
captured_errors: list[Exception]
) -> int:
"""
Determine appropriate HTTP status code based on error types.
Returns:
int: HTTP status code (200, 400, or 500)
"""

Built-in static file serving for the Dagster UI:
def build_static_routes(self) -> list[Route]:
"""Build routes for static file serving including UI assets."""
def build_routes(self) -> list[BaseRoute]:
"""Build all application routes including GraphQL, API, and static files."""

Sophisticated request context management for workspace integration:
def _make_request_context(self, conn: HTTPConnection) -> BaseWorkspaceRequestContext:
"""Create request context from HTTP connection with workspace access."""
@contextmanager
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
"""Context manager ensuring proper request context lifecycle."""

The webserver integrates with dagster-graphql for schema generation:
from dagster_graphql.schema import create_schema
# Schema is automatically created
schema = webserver.build_graphql_schema()
# Custom schema modifications (advanced usage)
class CustomDagsterWebserver(DagsterWebserver):
def build_graphql_schema(self) -> Schema:
schema = create_schema()
# Add custom types or resolvers
return schema

# Production webserver setup
webserver = DagsterWebserver(
workspace_context,
app_path_prefix="/dagster",
live_data_poll_rate=5000, # Reduce polling frequency
uses_app_path_prefix=True
)
app = webserver.create_asgi_app()
# Deploy with production ASGI server
# gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app

# Development webserver setup
webserver = DagsterWebserver(
workspace_context,
live_data_poll_rate=1000, # Fast polling for development
)
app = webserver.create_asgi_app(debug=True)

# Health check support
class HealthCheckWebserver(DagsterWebserver):
def build_routes(self) -> list[BaseRoute]:
routes = super().build_routes()
routes.append(Route("/health", self.health_check, methods=["GET"]))
return routes
async def health_check(self, request: Request):
return JSONResponse({"status": "healthy", "version": __version__})

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-webserver