or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application.mdasset-reporting.mdcli.mddebug.mdgraphql.mdindex.md
tile.json

tessl/pypi-dagster-webserver

Web UI server for Dagster, providing GraphQL API, asset reporting, and browser-based interface for data orchestration platform.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/dagster-webserver@1.11.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-webserver@1.11.0

index.mddocs/

Dagster Webserver

A web server implementation for the Dagster UI that provides a browser-based interface for data orchestration and pipeline management. The webserver serves as the primary interface between users and Dagster instances, offering GraphQL APIs, asset reporting capabilities, and comprehensive web UI functionality.

Package Information

  • Package Name: dagster-webserver
  • Package Type: pypi
  • Language: Python
  • Installation: pip install dagster-webserver
  • Version: 1.11.8

Core Imports

from dagster_webserver.app import create_app_from_workspace_process_context
from dagster_webserver.cli import host_dagster_ui_with_workspace_process_context, create_dagster_webserver_cli
from dagster_webserver.webserver import DagsterWebserver
from dagster_webserver.version import __version__

Basic Usage

Starting the Webserver (CLI)

# Command line usage
dagster-webserver --host 0.0.0.0 --port 3000 --workspace-file workspace.yaml

Programmatic Usage

from dagster import DagsterInstance
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.app import create_app_from_workspace_process_context
import uvicorn

# Create workspace context
instance = DagsterInstance.get()
with WorkspaceProcessContext(instance) as workspace_context:
    # Create ASGI app
    app = create_app_from_workspace_process_context(
        workspace_context,
        path_prefix="",
        live_data_poll_rate=2000
    )
    
    # Run with uvicorn
    uvicorn.run(app, host="127.0.0.1", port=3000)

Architecture

The dagster-webserver is built on a layered architecture:

  • CLI Layer: Command-line interface for server management and configuration
  • Application Layer: ASGI application factory and routing logic
  • GraphQL Layer: GraphQL server implementation with HTTP and WebSocket support
  • Asset Reporting Layer: HTTP endpoints for external asset event reporting
  • Web UI Layer: Static file serving and HTML template rendering
  • Debug Layer: Specialized debugging tools and ephemeral instance support

This design enables flexible deployment scenarios, from simple CLI usage to complex programmatic integration with custom middleware and routing.

Capabilities

Command Line Interface

Primary interface for starting and configuring the dagster webserver, including production deployments, development environments, and debugging scenarios.

def main(): ...
def host_dagster_ui_with_workspace_process_context(
    workspace_process_context: IWorkspaceProcessContext,
    host: Optional[str] = None,
    port: Optional[int] = None,
    path_prefix: str = "",
    log_level: str = "warning",
    live_data_poll_rate: Optional[int] = None
): ...
def create_dagster_webserver_cli() -> click.Command: ...

CLI Commands

Application Factory

Core functionality for creating ASGI applications from workspace contexts, enabling programmatic webserver integration and custom deployment scenarios.

def create_app_from_workspace_process_context(
    workspace_process_context: IWorkspaceProcessContext,
    path_prefix: str = "",
    live_data_poll_rate: Optional[int] = None,
    **kwargs
) -> Starlette: ...

Application Factory

GraphQL Server

Complete GraphQL server implementation with HTTP and WebSocket support, providing the core API interface for the Dagster UI and external integrations.

class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext]): ...
class GraphQLServer(ABC, Generic[TRequestContext]): ...

GraphQL Server

Asset Reporting

HTTP endpoints for external systems to report asset events directly to Dagster instances, enabling integration with external data pipelines and monitoring systems.

async def handle_report_asset_materialization_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse: ...
async def handle_report_asset_check_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse: ...
async def handle_report_asset_observation_request(
    context: BaseWorkspaceRequestContext,
    request: Request
) -> JSONResponse: ...

Asset Reporting

Debug Tools

Specialized debugging functionality for loading debug export files and running webserver instances with ephemeral data for troubleshooting and development.

class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext): ...
def webserver_debug_command(input_files, port): ...

Debug Tools

Types

# Core webserver class
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
    def __init__(
        self,
        process_context: T_IWorkspaceProcessContext,
        app_path_prefix: str = "",
        live_data_poll_rate: Optional[int] = None,
        uses_app_path_prefix: bool = True
    ): ...

# Abstract GraphQL server base
class GraphQLServer(ABC, Generic[TRequestContext]):
    def __init__(self, app_path_prefix: str = ""): ...

# Debug workspace context  
class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext):
    def __init__(self, instance: DagsterInstance): ...

# GraphQL WebSocket protocol constants
class GraphQLWS(str, Enum):
    PROTOCOL = "graphql-ws"
    CONNECTION_INIT = "connection_init"
    CONNECTION_ACK = "connection_ack"
    CONNECTION_ERROR = "connection_error"
    CONNECTION_TERMINATE = "connection_terminate"
    CONNECTION_KEEP_ALIVE = "ka"
    START = "start"
    DATA = "data"
    ERROR = "error"
    COMPLETE = "complete"
    STOP = "stop"

# Asset reporting parameter classes
class ReportAssetMatParam:
    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"

class ReportAssetCheckEvalParam:
    asset_key = "asset_key"
    check_name = "check_name"
    metadata = "metadata"
    severity = "severity"
    passed = "passed"

class ReportAssetObsParam:
    asset_key = "asset_key"
    data_version = "data_version"
    metadata = "metadata"
    description = "description"
    partition = "partition"

Constants

# Default configuration values
DEFAULT_WEBSERVER_HOST = "127.0.0.1"
DEFAULT_WEBSERVER_PORT = 3000
DEFAULT_DB_STATEMENT_TIMEOUT = 15000  # milliseconds
DEFAULT_POOL_RECYCLE = 3600  # seconds
DEFAULT_POOL_MAX_OVERFLOW = 20
WEBSERVER_LOGGER_NAME = "dagster-webserver"

# Version information (imported from version.py)
from dagster_webserver.version import __version__