CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-webserver

Web UI server for Dagster, providing GraphQL API, asset reporting, and browser-based interface for data orchestration platform.

Pending
Overview
Eval results
Files

docs/graphql.md

GraphQL Server

Complete GraphQL server implementation with HTTP and WebSocket support, providing the core API interface for the Dagster UI and external integrations. Built on Starlette/ASGI with support for queries, mutations, and real-time subscriptions.

Capabilities

DagsterWebserver Class

Main webserver implementation that extends the GraphQL server base class with Dagster-specific functionality.

class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
    """
    Dagster-specific webserver built on the GraphQLServer base class.

    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(
        self,
        process_context: T_IWorkspaceProcessContext,
        app_path_prefix: str = "",
        live_data_poll_rate: Optional[int] = None,
        uses_app_path_prefix: bool = True
    ):
        """
        Initialize the Dagster webserver.

        Args:
            process_context: Workspace process context with instance and code locations
            app_path_prefix: URL path prefix for the application (defaults to "")
            live_data_poll_rate: Polling rate for live data updates in milliseconds
                (defaults to None)
            uses_app_path_prefix: Whether to use path prefix in routing (defaults to True)
        """

    def build_graphql_schema(self) -> Schema:
        """Build the GraphQL schema using dagster-graphql."""

    def build_graphql_middleware(self) -> list:
        """Build GraphQL middleware stack."""

    def build_middleware(self) -> list[Middleware]:
        """Build ASGI middleware stack with tracing support."""

    def make_security_headers(self) -> dict:
        """Generate security headers for HTTP responses, returned as a dict."""

    def make_csp_header(self, nonce: str) -> str:
        """Create the Content Security Policy header value for the given nonce."""

    def create_asgi_app(self, **kwargs) -> Starlette:
        """Create the ASGI (Starlette) application with all routes and middleware."""

Usage Examples:

from dagster import DagsterInstance
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.webserver import DagsterWebserver
import uvicorn

# The Dagster instance backing the webserver; DagsterInstance.get() loads
# the instance configured through $DAGSTER_HOME.
instance = DagsterInstance.get()

# Create webserver instance
# NOTE(review): WorkspaceProcessContext may need further arguments (for
# example a workspace load target) depending on the Dagster version — confirm.
with WorkspaceProcessContext(instance) as workspace_context:
    webserver = DagsterWebserver(
        workspace_context,
        app_path_prefix="/dagster",
        live_data_poll_rate=2000,
    )

    # Get ASGI app
    app = webserver.create_asgi_app()

    # Run with uvicorn. This call blocks, so the workspace context stays
    # open for as long as the server is running.
    uvicorn.run(app, host="127.0.0.1", port=3000)

# Custom middleware
# workspace_context is only valid inside its `with` block, so this separate
# variant opens its own context instead of reusing the one closed above.
with WorkspaceProcessContext(instance) as workspace_context:
    webserver = DagsterWebserver(workspace_context)
    # `custom_middleware` is a placeholder the caller must define.
    # NOTE(review): create_asgi_app() also builds a middleware list through
    # build_middleware(); confirm that passing middleware= here does not
    # collide with it. Overriding build_middleware() in a subclass is the
    # safer extension point.
    app = webserver.create_asgi_app(
        middleware=[custom_middleware],
        debug=True,
    )
    uvicorn.run(app, host="127.0.0.1", port=3000)

GraphQL Server Base Class

Abstract base class providing GraphQL server functionality that can be extended for custom implementations.

class GraphQLServer(ABC, Generic[TRequestContext]):
    """
    Abstract base for a GraphQL server, generic over the request context type.

    Subclasses must implement the methods marked @abstractmethod below.
    Signature listing only: method bodies are omitted in this reference.
    """

    def __init__(self, app_path_prefix: str = ""):
        """
        Initialize GraphQL server with optional path prefix.

        Args:
            app_path_prefix: URL path prefix for GraphQL endpoints (defaults to "")
        """

    @abstractmethod
    def build_graphql_schema(self) -> Schema:
        """Build the GraphQL schema. Must be implemented by subclasses."""

    @abstractmethod
    def build_graphql_middleware(self) -> list:
        """Build GraphQL middleware. Must be implemented by subclasses."""

    def build_graphql_validation_rules(self) -> Collection[type[ASTValidationRule]]:
        """Build GraphQL validation rules. Uses standard rules by default."""

    @abstractmethod
    def build_middleware(self) -> list[Middleware]:
        """Build ASGI middleware stack. Must be implemented by subclasses."""

    @abstractmethod
    def build_routes(self) -> list[BaseRoute]:
        """Build application routes. Must be implemented by subclasses."""

    @abstractmethod
    def _make_request_context(self, conn: HTTPConnection) -> TRequestContext:
        """Create request context from HTTP connection. Must be implemented by subclasses."""

    # NOTE(review): the "Request Context Management" section of this document
    # shows this method decorated with @contextmanager; the decorator is not
    # repeated in this listing.
    def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
        """Context manager for request context lifecycle."""

    async def graphql_http_endpoint(self, request: Request):
        """HTTP endpoint handler for GraphQL queries and mutations."""

    async def graphql_ws_endpoint(self, websocket: WebSocket):
        """WebSocket endpoint handler for GraphQL subscriptions."""

    async def execute_graphql_request(
        self,
        request: Request,
        query: str,
        variables: Optional[dict[str, Any]],
        operation_name: Optional[str]
    ) -> JSONResponse:
        """
        Execute a GraphQL request and return JSON response.

        Args:
            request: The incoming HTTP request
            query: GraphQL document text
            variables: Variables for the operation, or None
            operation_name: Name of the operation to run, or None
        """

    def create_asgi_app(self, **kwargs) -> Starlette:
        """Create ASGI application with routes and middleware."""

HTTP Endpoints

The webserver provides comprehensive HTTP endpoints beyond GraphQL:

# NOTE(review): every handler below takes `self`, so these are presumably
# methods shown without their enclosing class body — confirm which class.

# GraphQL endpoints
async def graphql_http_endpoint(self, request: Request):
    """Handle GET/POST requests to /graphql with GraphiQL support."""

# Information endpoints
async def webserver_info_endpoint(self, request: Request):
    """Return webserver version and configuration info at /server_info."""

async def dagit_info_endpoint(self, request: Request):
    """Legacy info endpoint at /dagit_info (deprecated but maintained for compatibility)."""

# File download endpoints
async def download_debug_file_endpoint(self, request: Request):
    """Download debug export files at /download_debug/{run_id}."""

async def download_notebook(self, request: Request):
    """Download/view Jupyter notebooks at /notebook."""

async def download_captured_logs_endpoint(self, request: Request):
    """Download compute logs at /logs/{path}."""

# Asset reporting endpoints
async def report_asset_materialization_endpoint(self, request: Request):
    """Handle asset materialization reports at /report_asset_materialization/{asset_key}."""

async def report_asset_check_endpoint(self, request: Request):
    """Handle asset check reports at /report_asset_check/{asset_key}."""

async def report_asset_observation_endpoint(self, request: Request):
    """Handle asset observation reports at /report_asset_observation/{asset_key}."""

# UI endpoints
async def index_html_endpoint(self, request: Request):
    """Serve main UI HTML with CSP headers."""

WebSocket Support

Full WebSocket support for GraphQL subscriptions using the graphql-ws protocol:

class GraphQLWS(str, Enum):
    """
    GraphQL WebSocket protocol constants.

    Each member's value is a literal protocol string. Because the enum also
    subclasses str, members compare equal to those plain strings.
    """
    # Subprotocol name
    PROTOCOL = "graphql-ws"
    # Connection-level message types
    CONNECTION_INIT = "connection_init"
    CONNECTION_ACK = "connection_ack"
    CONNECTION_ERROR = "connection_error"
    CONNECTION_TERMINATE = "connection_terminate"
    CONNECTION_KEEP_ALIVE = "ka"
    # Operation-level message types
    START = "start"
    DATA = "data"
    ERROR = "error"
    COMPLETE = "complete"
    STOP = "stop"

async def execute_graphql_subscription(
    self,
    websocket: WebSocket,
    operation_id: str,
    query: str,
    variables: Optional[dict[str, Any]],
    operation_name: Optional[str]
) -> tuple[Optional[Task], Optional[GraphQLFormattedError]]:
    """
    Execute GraphQL subscription and return async task.

    Args:
        websocket: WebSocket connection the subscription runs over
        operation_id: Identifier of this subscription operation
        query: GraphQL document text
        variables: Variables for the operation, or None
        operation_name: Name of the operation to run, or None

    Returns:
        A (task, error) pair; either element may be None.
    """

Middleware Support

Built-in middleware for request tracing and custom middleware integration:

class DagsterTracedCounterMiddleware:
    """ASGI middleware for counting traced Dagster operations."""
    # `app` is presumably the wrapped (inner) ASGI application — confirm.
    def __init__(self, app): ...
    # ASGI-style call: connection scope plus receive/send callables.
    async def __call__(self, scope, receive, send): ...

class TracingMiddleware:
    """ASGI middleware for request tracing and observability."""
    # `app` is presumably the wrapped (inner) ASGI application — confirm.
    def __init__(self, app): ...
    # ASGI-style call: connection scope plus receive/send callables.
    async def __call__(self, scope, receive, send): ...

Custom middleware integration:

from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse  # used by dispatch() below


class CustomAuthMiddleware(BaseHTTPMiddleware):
    """Reject any HTTP request that carries no Authorization header."""

    async def dispatch(self, request, call_next):
        # Add authentication logic
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            # Short-circuit: the wrapped application is never called.
            return JSONResponse({"error": "Unauthorized"}, status_code=401)
        return await call_next(request)


# Usage with webserver
class AuthenticatedWebserver(DagsterWebserver):
    """DagsterWebserver with CustomAuthMiddleware in front of the built-in stack."""

    def build_middleware(self) -> list[Middleware]:
        # NOTE(review): create_asgi_app() already takes its middleware from
        # build_middleware(), so passing middleware= to it as well looks
        # like it would collide — confirm. Extending the list here works
        # either way.
        return [Middleware(CustomAuthMiddleware), *super().build_middleware()]


webserver = AuthenticatedWebserver(workspace_context)
app = webserver.create_asgi_app()

Security Features

Built-in security headers and Content Security Policy support:

def make_security_headers(self) -> dict:
    """
    Generate security headers for HTTP responses.

    Returns:
        dict: Security headers. The list that follows these signatures names
            Cache-Control, Feature-Policy, Referrer-Policy and
            X-Content-Type-Options.
    """

def make_csp_header(self, nonce: str) -> str:
    """
    Create Content Security Policy header with nonce.

    Args:
        nonce: Cryptographic nonce for inline scripts

    Returns:
        str: CSP header value
    """

Security headers include:

  • Cache-Control: no-store
  • Feature-Policy: microphone 'none'; camera 'none'
  • Referrer-Policy: strict-origin-when-cross-origin
  • X-Content-Type-Options: nosniff
  • Content Security Policy with nonce support

Error Handling

GraphQL errors are converted into serializable error info, and the HTTP status code of the response is chosen from the kinds of errors that occurred:

def handle_graphql_errors(self, errors: Sequence[GraphQLError]) -> list:
    """
    Process GraphQL errors and add serializable error info.

    Args:
        errors: Sequence of GraphQL errors

    Returns:
        list: Formatted error objects with debugging information
    """

def _determine_status_code(
    self,
    resolver_errors: Optional[list[GraphQLError]],
    captured_errors: list[Exception]
) -> int:
    """
    Determine appropriate HTTP status code based on error types.

    Args:
        resolver_errors: GraphQL errors from resolvers, or None
        captured_errors: Exceptions captured while handling the request

    Returns:
        int: HTTP status code (200, 400, or 500)
    """

Static File Serving

Built-in static file serving for the Dagster UI:

def build_static_routes(self) -> list[Route]:
    """Build routes for static file serving including UI assets."""

# NOTE(review): presumably includes the routes from build_static_routes() — confirm.
def build_routes(self) -> list[BaseRoute]:
    """Build all application routes including GraphQL, API, and static files."""

Request Context Management

Each request gets a workspace request context created from the incoming HTTP connection; a context manager scopes its lifecycle:

def _make_request_context(self, conn: HTTPConnection) -> BaseWorkspaceRequestContext:
    """Create request context from HTTP connection with workspace access."""

# Decorated with @contextmanager, so callers use it in a `with` statement.
@contextmanager
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
    """Context manager ensuring proper request context lifecycle."""

GraphQL Schema Integration

The webserver integrates with dagster-graphql for schema generation:

from dagster_graphql.schema import create_schema

# Schema is automatically created
# (`webserver` is an existing DagsterWebserver instance)
schema = webserver.build_graphql_schema()

# Custom schema modifications (advanced usage)
class CustomDagsterWebserver(DagsterWebserver):
    """Webserver variant that supplies its own GraphQL schema."""

    def build_graphql_schema(self) -> Schema:
        """Return the schema from create_schema(); customizations go before the return."""
        schema = create_schema()
        # Add custom types or resolvers
        return schema

Deployment Considerations

Production Configuration

# Production webserver setup
# Assumes `workspace_context` is an open workspace process context, as in
# the usage example earlier in this document.
webserver = DagsterWebserver(
    workspace_context,
    app_path_prefix="/dagster",
    live_data_poll_rate=5000,  # Reduce polling frequency (value is in milliseconds)
    uses_app_path_prefix=True
)

app = webserver.create_asgi_app()

# Deploy with production ASGI server
# (`app:app` assumes this code lives in a module named app.py that exposes `app`)
# gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app

Development Configuration

# Development webserver setup
# Assumes `workspace_context` is an open workspace process context, as in
# the usage example earlier in this document.
webserver = DagsterWebserver(
    workspace_context,
    live_data_poll_rate=1000,  # Fast polling for development (value is in milliseconds)
)

# debug=True is forwarded through create_asgi_app(**kwargs).
app = webserver.create_asgi_app(debug=True)

Load Balancer Integration

from dagster_webserver.version import __version__
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import BaseRoute, Route


# Health check support
class HealthCheckWebserver(DagsterWebserver):
    """DagsterWebserver that also answers GET /health, e.g. for load balancer probes."""

    def build_routes(self) -> list[BaseRoute]:
        """Return the parent's routes with the /health route placed first."""
        # Starlette tries routes in list order. Registering /health ahead of
        # the inherited routes keeps a catch-all UI route later in the list
        # from shadowing it.
        health_route = Route("/health", self.health_check, methods=["GET"])
        return [health_route, *super().build_routes()]

    async def health_check(self, request: Request):
        """Report a static healthy status together with the webserver version."""
        return JSONResponse({"status": "healthy", "version": __version__})

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-webserver

docs

application.md

asset-reporting.md

cli.md

debug.md

graphql.md

index.md

tile.json