0
# GraphQL Server
1
2
A complete GraphQL server implementation with HTTP and WebSocket support that provides the core API for the Dagster UI and external integrations. It is built on Starlette/ASGI and supports queries, mutations, and real-time subscriptions.
3
4
## Capabilities
5
6
### DagsterWebserver Class
7
8
The main webserver implementation, which extends the `GraphQLServer` base class with Dagster-specific functionality.
9
10
```python { .api }
11
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
12
def __init__(
13
self,
14
process_context: T_IWorkspaceProcessContext,
15
app_path_prefix: str = "",
16
live_data_poll_rate: Optional[int] = None,
17
uses_app_path_prefix: bool = True
18
):
19
"""
20
Initialize the Dagster webserver.
21
22
Args:
23
process_context: Workspace process context with instance and code locations
24
app_path_prefix: URL path prefix for the application
25
live_data_poll_rate: Polling rate for live data updates in milliseconds
26
uses_app_path_prefix: Whether to use path prefix in routing
27
"""
28
29
def build_graphql_schema(self) -> Schema:
30
"""Build the GraphQL schema using dagster-graphql."""
31
32
def build_graphql_middleware(self) -> list:
33
"""Build GraphQL middleware stack."""
34
35
def build_middleware(self) -> list[Middleware]:
36
"""Build ASGI middleware stack with tracing support."""
37
38
def make_security_headers(self) -> dict:
39
"""Generate security headers for HTTP responses."""
40
41
def make_csp_header(self, nonce: str) -> str:
42
"""Create Content Security Policy header with nonce."""
43
44
def create_asgi_app(self, **kwargs) -> Starlette:
45
"""Create the ASGI application with all routes and middleware."""
46
```
47
48
**Usage Examples:**
49
50
```python
51
from dagster._core.workspace.context import WorkspaceProcessContext
52
from dagster_webserver.webserver import DagsterWebserver
53
import uvicorn
54
55
# Create webserver instance
56
with WorkspaceProcessContext(instance) as workspace_context:
57
webserver = DagsterWebserver(
58
workspace_context,
59
app_path_prefix="/dagster",
60
live_data_poll_rate=2000
61
)
62
63
# Get ASGI app
64
app = webserver.create_asgi_app()
65
66
# Run with uvicorn
67
uvicorn.run(app, host="127.0.0.1", port=3000)
68
69
# Custom middleware
70
webserver = DagsterWebserver(workspace_context)
71
app = webserver.create_asgi_app(
72
middleware=[custom_middleware],
73
debug=True
74
)
75
```
76
77
### GraphQL Server Base Class
78
79
An abstract base class that provides the core GraphQL server functionality; subclass it to build custom server implementations.
80
81
```python { .api }
82
class GraphQLServer(ABC, Generic[TRequestContext]):
83
def __init__(self, app_path_prefix: str = ""):
84
"""
85
Initialize GraphQL server with optional path prefix.
86
87
Args:
88
app_path_prefix: URL path prefix for GraphQL endpoints
89
"""
90
91
@abstractmethod
92
def build_graphql_schema(self) -> Schema:
93
"""Build the GraphQL schema. Must be implemented by subclasses."""
94
95
@abstractmethod
96
def build_graphql_middleware(self) -> list:
97
"""Build GraphQL middleware. Must be implemented by subclasses."""
98
99
def build_graphql_validation_rules(self) -> Collection[type[ASTValidationRule]]:
100
"""Build GraphQL validation rules. Uses standard rules by default."""
101
102
@abstractmethod
103
def build_middleware(self) -> list[Middleware]:
104
"""Build ASGI middleware stack. Must be implemented by subclasses."""
105
106
@abstractmethod
107
def build_routes(self) -> list[BaseRoute]:
108
"""Build application routes. Must be implemented by subclasses."""
109
110
@abstractmethod
111
def _make_request_context(self, conn: HTTPConnection) -> TRequestContext:
112
"""Create request context from HTTP connection. Must be implemented by subclasses."""
113
114
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
115
"""Context manager for request context lifecycle."""
116
117
async def graphql_http_endpoint(self, request: Request):
118
"""HTTP endpoint handler for GraphQL queries and mutations."""
119
120
async def graphql_ws_endpoint(self, websocket: WebSocket):
121
"""WebSocket endpoint handler for GraphQL subscriptions."""
122
123
async def execute_graphql_request(
124
self,
125
request: Request,
126
query: str,
127
variables: Optional[dict[str, Any]],
128
operation_name: Optional[str]
129
) -> JSONResponse:
130
"""Execute a GraphQL request and return JSON response."""
131
132
def create_asgi_app(self, **kwargs) -> Starlette:
133
"""Create ASGI application with routes and middleware."""
134
```
135
136
### HTTP Endpoints
137
138
In addition to the GraphQL endpoint, the webserver exposes HTTP endpoints for server information, file downloads, asset reporting, and the UI:
139
140
```python { .api }
141
# GraphQL endpoints
142
async def graphql_http_endpoint(self, request: Request):
143
"""Handle GET/POST requests to /graphql with GraphiQL support."""
144
145
# Information endpoints
146
async def webserver_info_endpoint(self, request: Request):
147
"""Return webserver version and configuration info at /server_info."""
148
149
async def dagit_info_endpoint(self, request: Request):
150
"""Legacy info endpoint at /dagit_info (deprecated but maintained for compatibility)."""
151
152
# File download endpoints
153
async def download_debug_file_endpoint(self, request: Request):
154
"""Download debug export files at /download_debug/{run_id}."""
155
156
async def download_notebook(self, request: Request):
157
"""Download/view Jupyter notebooks at /notebook."""
158
159
async def download_captured_logs_endpoint(self, request: Request):
160
"""Download compute logs at /logs/{path}."""
161
162
# Asset reporting endpoints
163
async def report_asset_materialization_endpoint(self, request: Request):
164
"""Handle asset materialization reports at /report_asset_materialization/{asset_key}."""
165
166
async def report_asset_check_endpoint(self, request: Request):
167
"""Handle asset check reports at /report_asset_check/{asset_key}."""
168
169
async def report_asset_observation_endpoint(self, request: Request):
170
"""Handle asset observation reports at /report_asset_observation/{asset_key}."""
171
172
# UI endpoints
173
async def index_html_endpoint(self, request: Request):
174
"""Serve main UI HTML with CSP headers."""
175
```
176
177
### WebSocket Support
178
179
Full WebSocket support for GraphQL subscriptions using the `graphql-ws` protocol:
180
181
```python { .api }
182
class GraphQLWS(str, Enum):
183
"""GraphQL WebSocket protocol constants."""
184
PROTOCOL = "graphql-ws"
185
CONNECTION_INIT = "connection_init"
186
CONNECTION_ACK = "connection_ack"
187
CONNECTION_ERROR = "connection_error"
188
CONNECTION_TERMINATE = "connection_terminate"
189
CONNECTION_KEEP_ALIVE = "ka"
190
START = "start"
191
DATA = "data"
192
ERROR = "error"
193
COMPLETE = "complete"
194
STOP = "stop"
195
196
async def execute_graphql_subscription(
197
self,
198
websocket: WebSocket,
199
operation_id: str,
200
query: str,
201
variables: Optional[dict[str, Any]],
202
operation_name: Optional[str]
203
) -> tuple[Optional[Task], Optional[GraphQLFormattedError]]:
204
"""Execute GraphQL subscription and return async task."""
205
```
206
207
### Middleware Support
208
209
Built-in middleware for request tracing and custom middleware integration:
210
211
```python { .api }
212
class DagsterTracedCounterMiddleware:
213
"""ASGI middleware for counting traced Dagster operations."""
214
def __init__(self, app): ...
215
async def __call__(self, scope, receive, send): ...
216
217
class TracingMiddleware:
218
"""ASGI middleware for request tracing and observability."""
219
def __init__(self, app): ...
220
async def __call__(self, scope, receive, send): ...
221
```
222
223
**Custom middleware integration:**
224
225
```python
226
from starlette.middleware.base import BaseHTTPMiddleware
227
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
228
229
class CustomAuthMiddleware(BaseHTTPMiddleware):
230
async def dispatch(self, request, call_next):
231
# Add authentication logic
232
auth_header = request.headers.get("Authorization")
233
if not auth_header:
234
return JSONResponse({"error": "Unauthorized"}, status_code=401)
235
return await call_next(request)
236
237
# Usage with webserver
238
webserver = DagsterWebserver(workspace_context)
239
app = webserver.create_asgi_app(
240
middleware=[Middleware(CustomAuthMiddleware)]
241
)
242
```
243
244
### Security Features
245
246
Built-in security headers and Content Security Policy support:
247
248
```python { .api }
249
def make_security_headers(self) -> dict:
250
"""
251
Generate security headers for HTTP responses.
252
253
Returns:
254
dict: Security headers including Cache-Control, Feature-Policy, etc.
255
"""
256
257
def make_csp_header(self, nonce: str) -> str:
258
"""
259
Create Content Security Policy header with nonce.
260
261
Args:
262
nonce: Cryptographic nonce for inline scripts
263
264
Returns:
265
str: CSP header value
266
"""
267
```
268
269
**Security headers include:**
270
- `Cache-Control: no-store`
271
- `Feature-Policy: microphone 'none'; camera 'none'`
272
- `Referrer-Policy: strict-origin-when-cross-origin`
273
- `X-Content-Type-Options: nosniff`
274
- Content Security Policy with nonce support
275
276
### Error Handling
277
278
Comprehensive error handling for GraphQL operations:
279
280
```python { .api }
281
def handle_graphql_errors(self, errors: Sequence[GraphQLError]):
282
"""
283
Process GraphQL errors and add serializable error info.
284
285
Args:
286
errors: List of GraphQL errors
287
288
Returns:
289
list: Formatted error objects with debugging information
290
"""
291
292
def _determine_status_code(
293
self,
294
resolver_errors: Optional[list[GraphQLError]],
295
captured_errors: list[Exception]
296
) -> int:
297
"""
298
Determine appropriate HTTP status code based on error types.
299
300
Returns:
301
int: HTTP status code (200, 400, or 500)
302
"""
303
```
304
305
### Static File Serving
306
307
Built-in static file serving for the Dagster UI:
308
309
```python { .api }
310
def build_static_routes(self) -> list[Route]:
311
"""Build routes for static file serving including UI assets."""
312
313
def build_routes(self) -> list[BaseRoute]:
314
"""Build all application routes including GraphQL, API, and static files."""
315
```
316
317
### Request Context Management
318
319
Each HTTP connection gets a request context with workspace access, and a context manager scopes that context's lifecycle:
320
321
```python { .api }
322
def _make_request_context(self, conn: HTTPConnection) -> BaseWorkspaceRequestContext:
323
"""Create request context from HTTP connection with workspace access."""
324
325
@contextmanager
326
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
327
"""Context manager ensuring proper request context lifecycle."""
328
```
329
330
## GraphQL Schema Integration
331
332
The webserver integrates with `dagster-graphql` for schema generation:
333
334
```python
335
from dagster_graphql.schema import create_schema
336
337
# Schema is automatically created
338
schema = webserver.build_graphql_schema()
339
340
# Custom schema modifications (advanced usage)
341
class CustomDagsterWebserver(DagsterWebserver):
342
def build_graphql_schema(self) -> Schema:
343
schema = create_schema()
344
# Add custom types or resolvers
345
return schema
346
```
347
348
## Deployment Considerations
349
350
### Production Configuration
351
352
```python
353
# Production webserver setup
354
webserver = DagsterWebserver(
355
workspace_context,
356
app_path_prefix="/dagster",
357
live_data_poll_rate=5000, # Reduce polling frequency
358
uses_app_path_prefix=True
359
)
360
361
app = webserver.create_asgi_app()
362
363
# Deploy with production ASGI server
364
# gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app
365
```
366
367
### Development Configuration
368
369
```python
370
# Development webserver setup
371
webserver = DagsterWebserver(
372
workspace_context,
373
live_data_poll_rate=1000, # Fast polling for development
374
)
375
376
app = webserver.create_asgi_app(debug=True)
377
```
378
379
### Load Balancer Integration
380
381
```python
382
# Health check support
383
class HealthCheckWebserver(DagsterWebserver):
384
def build_routes(self) -> list[BaseRoute]:
385
routes = super().build_routes()
386
routes.append(Route("/health", self.health_check, methods=["GET"]))
387
return routes
388
389
async def health_check(self, request: Request):
390
return JSONResponse({"status": "healthy", "version": __version__})
391
```