Web UI server for Dagster, providing GraphQL API, asset reporting, and browser-based interface for data orchestration platform.
npx @tessl/cli install tessl/pypi-dagster-webserver@1.11.0A web server implementation for the Dagster UI that provides a browser-based interface for data orchestration and pipeline management. The webserver serves as the primary interface between users and Dagster instances, offering GraphQL APIs, asset reporting capabilities, and comprehensive web UI functionality.
pip install dagster-webserverfrom dagster_webserver.app import create_app_from_workspace_process_context
from dagster_webserver.cli import host_dagster_ui_with_workspace_process_context, create_dagster_webserver_cli
from dagster_webserver.webserver import DagsterWebserver
from dagster_webserver.version import __version__# Command line usage
dagster-webserver --host 0.0.0.0 --port 3000 --workspace-file workspace.yamlfrom dagster import DagsterInstance
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.app import create_app_from_workspace_process_context
import uvicorn
# Create workspace context
instance = DagsterInstance.get()
with WorkspaceProcessContext(instance) as workspace_context:
# Create ASGI app
app = create_app_from_workspace_process_context(
workspace_context,
path_prefix="",
live_data_poll_rate=2000
)
# Run with uvicorn
uvicorn.run(app, host="127.0.0.1", port=3000)The dagster-webserver is built on a layered architecture:
This design enables flexible deployment scenarios, from simple CLI usage to complex programmatic integration with custom middleware and routing.
Primary interface for starting and configuring the dagster webserver, including production deployments, development environments, and debugging scenarios.
def main(): ...
def host_dagster_ui_with_workspace_process_context(
workspace_process_context: IWorkspaceProcessContext,
host: Optional[str] = None,
port: Optional[int] = None,
path_prefix: str = "",
log_level: str = "warning",
live_data_poll_rate: Optional[int] = None
): ...
def create_dagster_webserver_cli() -> click.Command: ...Core functionality for creating ASGI applications from workspace contexts, enabling programmatic webserver integration and custom deployment scenarios.
def create_app_from_workspace_process_context(
workspace_process_context: IWorkspaceProcessContext,
path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
**kwargs
) -> Starlette: ...Complete GraphQL server implementation with HTTP and WebSocket support, providing the core API interface for the Dagster UI and external integrations.
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext]): ...
class GraphQLServer(ABC, Generic[TRequestContext]): ...HTTP endpoints for external systems to report asset events directly to Dagster instances, enabling integration with external data pipelines and monitoring systems.
async def handle_report_asset_materialization_request(
context: BaseWorkspaceRequestContext,
request: Request
) -> JSONResponse: ...
async def handle_report_asset_check_request(
context: BaseWorkspaceRequestContext,
request: Request
) -> JSONResponse: ...
async def handle_report_asset_observation_request(
context: BaseWorkspaceRequestContext,
request: Request
) -> JSONResponse: ...Specialized debugging functionality for loading debug export files and running webserver instances with ephemeral data for troubleshooting and development.
class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext): ...
def webserver_debug_command(input_files, port): ...# Core webserver class
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
def __init__(
self,
process_context: T_IWorkspaceProcessContext,
app_path_prefix: str = "",
live_data_poll_rate: Optional[int] = None,
uses_app_path_prefix: bool = True
): ...
# Abstract GraphQL server base
class GraphQLServer(ABC, Generic[TRequestContext]):
def __init__(self, app_path_prefix: str = ""): ...
# Debug workspace context
class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext):
def __init__(self, instance: DagsterInstance): ...
# GraphQL WebSocket protocol constants
class GraphQLWS(str, Enum):
PROTOCOL = "graphql-ws"
CONNECTION_INIT = "connection_init"
CONNECTION_ACK = "connection_ack"
CONNECTION_ERROR = "connection_error"
CONNECTION_TERMINATE = "connection_terminate"
CONNECTION_KEEP_ALIVE = "ka"
START = "start"
DATA = "data"
ERROR = "error"
COMPLETE = "complete"
STOP = "stop"
# Asset reporting parameter classes
class ReportAssetMatParam:
asset_key = "asset_key"
data_version = "data_version"
metadata = "metadata"
description = "description"
partition = "partition"
class ReportAssetCheckEvalParam:
asset_key = "asset_key"
check_name = "check_name"
metadata = "metadata"
severity = "severity"
passed = "passed"
class ReportAssetObsParam:
asset_key = "asset_key"
data_version = "data_version"
metadata = "metadata"
description = "description"
partition = "partition"# Default configuration values
DEFAULT_WEBSERVER_HOST = "127.0.0.1"
DEFAULT_WEBSERVER_PORT = 3000
DEFAULT_DB_STATEMENT_TIMEOUT = 15000 # milliseconds
DEFAULT_POOL_RECYCLE = 3600 # seconds
DEFAULT_POOL_MAX_OVERFLOW = 20
WEBSERVER_LOGGER_NAME = "dagster-webserver"
# Version information (imported from version.py)
from dagster_webserver.version import __version__