# GraphQL Server

The GraphQL server implementation supports HTTP and WebSocket transports and provides the core API for the Dagster UI and external integrations. It is built on Starlette/ASGI and handles queries, mutations, and real-time subscriptions.

## Capabilities

### DagsterWebserver Class

Main webserver implementation that extends the GraphQL server base class with Dagster-specific functionality.

```python { .api }
class DagsterWebserver(GraphQLServer[BaseWorkspaceRequestContext], Generic[T_IWorkspaceProcessContext]):
    def __init__(
        self,
        process_context: T_IWorkspaceProcessContext,
        app_path_prefix: str = "",
        live_data_poll_rate: Optional[int] = None,
        uses_app_path_prefix: bool = True
    ):
        """
        Initialize the Dagster webserver.

        Args:
            process_context: Workspace process context with instance and code locations
            app_path_prefix: URL path prefix for the application
            live_data_poll_rate: Polling rate for live data updates in milliseconds
            uses_app_path_prefix: Whether to use path prefix in routing
        """

    def build_graphql_schema(self) -> Schema:
        """Build the GraphQL schema using dagster-graphql."""

    def build_graphql_middleware(self) -> list:
        """Build GraphQL middleware stack."""

    def build_middleware(self) -> list[Middleware]:
        """Build ASGI middleware stack with tracing support."""

    def make_security_headers(self) -> dict:
        """Generate security headers for HTTP responses."""

    def make_csp_header(self, nonce: str) -> str:
        """Create Content Security Policy header with nonce."""

    def create_asgi_app(self, **kwargs) -> Starlette:
        """Create the ASGI application with all routes and middleware."""
```

**Usage Examples:**

```python
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.webserver import DagsterWebserver
import uvicorn

# `instance` is assumed to be an existing DagsterInstance.
# Create webserver instance
with WorkspaceProcessContext(instance) as workspace_context:
    webserver = DagsterWebserver(
        workspace_context,
        app_path_prefix="/dagster",
        live_data_poll_rate=2000
    )

    # Get ASGI app
    app = webserver.create_asgi_app()

    # Run with uvicorn
    uvicorn.run(app, host="127.0.0.1", port=3000)

# Custom middleware (`custom_middleware` is a placeholder for your own
# middleware; `workspace_context` comes from an open context as above)
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app(
    middleware=[custom_middleware],
    debug=True
)
```

### GraphQL Server Base Class

Abstract base class providing GraphQL server functionality that can be extended for custom implementations.

```python { .api }
class GraphQLServer(ABC, Generic[TRequestContext]):
    def __init__(self, app_path_prefix: str = ""):
        """
        Initialize GraphQL server with optional path prefix.

        Args:
            app_path_prefix: URL path prefix for GraphQL endpoints
        """

    @abstractmethod
    def build_graphql_schema(self) -> Schema:
        """Build the GraphQL schema. Must be implemented by subclasses."""

    @abstractmethod
    def build_graphql_middleware(self) -> list:
        """Build GraphQL middleware. Must be implemented by subclasses."""

    def build_graphql_validation_rules(self) -> Collection[type[ASTValidationRule]]:
        """Build GraphQL validation rules. Uses standard rules by default."""

    @abstractmethod
    def build_middleware(self) -> list[Middleware]:
        """Build ASGI middleware stack. Must be implemented by subclasses."""

    @abstractmethod
    def build_routes(self) -> list[BaseRoute]:
        """Build application routes. Must be implemented by subclasses."""

    @abstractmethod
    def _make_request_context(self, conn: HTTPConnection) -> TRequestContext:
        """Create request context from HTTP connection. Must be implemented by subclasses."""

    @contextmanager
    def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
        """Context manager for request context lifecycle."""

    async def graphql_http_endpoint(self, request: Request):
        """HTTP endpoint handler for GraphQL queries and mutations."""

    async def graphql_ws_endpoint(self, websocket: WebSocket):
        """WebSocket endpoint handler for GraphQL subscriptions."""

    async def execute_graphql_request(
        self,
        request: Request,
        query: str,
        variables: Optional[dict[str, Any]],
        operation_name: Optional[str]
    ) -> JSONResponse:
        """Execute a GraphQL request and return JSON response."""

    def create_asgi_app(self, **kwargs) -> Starlette:
        """Create ASGI application with routes and middleware."""
```
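The base class reads as a template-method design: subclasses supply the schema, middleware, and routes, and `create_asgi_app` assembles them. The following is a minimal standard-library sketch of that pattern only. `MiniServer`, `EchoServer`, and the plain dict standing in for the Starlette application are hypothetical and not part of `dagster-webserver`.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TContext = TypeVar("TContext")


class MiniServer(ABC, Generic[TContext]):
    """Hypothetical stand-in for GraphQLServer, reduced to the assembly step."""

    def __init__(self, app_path_prefix: str = ""):
        self._app_path_prefix = app_path_prefix

    @abstractmethod
    def build_routes(self) -> list:
        """Return route paths; subclasses must implement this."""

    @abstractmethod
    def build_middleware(self) -> list:
        """Return middleware names; subclasses must implement this."""

    def create_app(self, **kwargs) -> dict:
        # Assemble the pieces the subclass provides, plus any extra options.
        return {
            "routes": [self._app_path_prefix + path for path in self.build_routes()],
            "middleware": self.build_middleware(),
            **kwargs,
        }


class EchoServer(MiniServer[dict]):
    """Hypothetical concrete server that fills in the abstract hooks."""

    def build_routes(self) -> list:
        return ["/graphql", "/server_info"]

    def build_middleware(self) -> list:
        return ["tracing"]


app = EchoServer(app_path_prefix="/dagster").create_app(debug=True)
print(app["routes"])
```

Because the hooks are abstract, instantiating `MiniServer` directly raises `TypeError`, which mirrors why `GraphQLServer` has to be subclassed before it can be used.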

### HTTP Endpoints

Beyond GraphQL, the webserver exposes HTTP endpoints for server information, file downloads, asset reporting, and the UI:

```python { .api }
# GraphQL endpoints
async def graphql_http_endpoint(self, request: Request):
    """Handle GET/POST requests to /graphql with GraphiQL support."""

# Information endpoints
async def webserver_info_endpoint(self, request: Request):
    """Return webserver version and configuration info at /server_info."""

async def dagit_info_endpoint(self, request: Request):
    """Legacy info endpoint at /dagit_info (deprecated but maintained for compatibility)."""

# File download endpoints
async def download_debug_file_endpoint(self, request: Request):
    """Download debug export files at /download_debug/{run_id}."""

async def download_notebook(self, request: Request):
    """Download/view Jupyter notebooks at /notebook."""

async def download_captured_logs_endpoint(self, request: Request):
    """Download compute logs at /logs/{path}."""

# Asset reporting endpoints
async def report_asset_materialization_endpoint(self, request: Request):
    """Handle asset materialization reports at /report_asset_materialization/{asset_key}."""

async def report_asset_check_endpoint(self, request: Request):
    """Handle asset check reports at /report_asset_check/{asset_key}."""

async def report_asset_observation_endpoint(self, request: Request):
    """Handle asset observation reports at /report_asset_observation/{asset_key}."""

# UI endpoints
async def index_html_endpoint(self, request: Request):
    """Serve main UI HTML with CSP headers."""
```
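From a client's point of view, the `/graphql` endpoint can be called with a plain JSON POST. The sketch below builds such a request with the standard library without sending it. The helper `build_graphql_request`, the host and port, the `/dagster` prefix, and the `version` query field are assumptions chosen for illustration; the JSON body uses the conventional `query` / `variables` / `operationName` keys.

```python
import json
import urllib.request


def build_graphql_request(base_url, query, variables=None,
                          operation_name=None, app_path_prefix=""):
    """Build (but do not send) a POST request for the /graphql endpoint."""
    payload = {"query": query, "variables": variables or {}}
    if operation_name:
        payload["operationName"] = operation_name
    # The path prefix, if any, sits between the host and the endpoint path.
    url = base_url.rstrip("/") + app_path_prefix + "/graphql"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_graphql_request(
    "http://127.0.0.1:3000",
    "query ServerVersion { version }",  # assumed field, for illustration
    operation_name="ServerVersion",
    app_path_prefix="/dagster",
)
print(request.full_url)

# To send it against a running webserver:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```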

### WebSocket Support

GraphQL subscriptions are served over WebSocket using the `graphql-ws` protocol:

```python { .api }
class GraphQLWS(str, Enum):
    """GraphQL WebSocket protocol constants."""
    PROTOCOL = "graphql-ws"
    CONNECTION_INIT = "connection_init"
    CONNECTION_ACK = "connection_ack"
    CONNECTION_ERROR = "connection_error"
    CONNECTION_TERMINATE = "connection_terminate"
    CONNECTION_KEEP_ALIVE = "ka"
    START = "start"
    DATA = "data"
    ERROR = "error"
    COMPLETE = "complete"
    STOP = "stop"

async def execute_graphql_subscription(
    self,
    websocket: WebSocket,
    operation_id: str,
    query: str,
    variables: Optional[dict[str, Any]],
    operation_name: Optional[str]
) -> tuple[Optional[Task], Optional[GraphQLFormattedError]]:
    """Execute GraphQL subscription and return async task."""
```
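To make the message types concrete, here is a simplified, standard-library sketch of how a server might dispatch incoming `graphql-ws` messages. It is an illustration of the protocol flow under stated assumptions, not Dagster's implementation: `handle_client_message` is a hypothetical helper, the enum is a trimmed copy of the constants above, and the exact replies the real server sends may differ.

```python
import json
from enum import Enum


class GraphQLWS(str, Enum):
    """Trimmed copy of the protocol constants, for a self-contained example."""
    CONNECTION_INIT = "connection_init"
    CONNECTION_ACK = "connection_ack"
    CONNECTION_TERMINATE = "connection_terminate"
    START = "start"
    ERROR = "error"
    STOP = "stop"


def handle_client_message(raw: str, active: set) -> list:
    """Return the server replies (as dicts) for one client message.

    `active` holds the ids of running subscriptions and is updated in place.
    """
    message = json.loads(raw)
    kind = message.get("type")
    op_id = message.get("id")

    if kind == GraphQLWS.CONNECTION_INIT:
        # Handshake: acknowledge the connection.
        return [{"type": GraphQLWS.CONNECTION_ACK.value}]
    if kind == GraphQLWS.START:
        # A real server would start a task here that streams "data" messages.
        active.add(op_id)
        return []
    if kind == GraphQLWS.STOP:
        # The client no longer wants results for this operation.
        active.discard(op_id)
        return []
    if kind == GraphQLWS.CONNECTION_TERMINATE:
        active.clear()
        return []
    return [{
        "type": GraphQLWS.ERROR.value,
        "id": op_id,
        "payload": {"message": f"unknown message type {kind!r}"},
    }]


subscriptions = set()
print(handle_client_message('{"type": "connection_init"}', subscriptions))
```

Tracking active operation ids in a set mirrors the role of `operation_id` in `execute_graphql_subscription`: each `start` message names the operation that a later `stop` cancels.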

### Middleware Support

Built-in middleware for request tracing and custom middleware integration:

```python { .api }
class DagsterTracedCounterMiddleware:
    """ASGI middleware for counting traced Dagster operations."""
    def __init__(self, app): ...
    async def __call__(self, scope, receive, send): ...

class TracingMiddleware:
    """ASGI middleware for request tracing and observability."""
    def __init__(self, app): ...
    async def __call__(self, scope, receive, send): ...
```

**Custom middleware integration:**

```python
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class CustomAuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Add authentication logic
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return JSONResponse({"error": "Unauthorized"}, status_code=401)
        return await call_next(request)

# Usage with webserver
webserver = DagsterWebserver(workspace_context)
app = webserver.create_asgi_app(
    middleware=[Middleware(CustomAuthMiddleware)]
)
```
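The built-in middleware classes use the raw ASGI shape, `__init__(self, app)` plus `async __call__(self, scope, receive, send)`. As a sketch of how the same authentication check could look in that style, the example below uses only the standard library. `RequireAuthHeader`, `ok_app`, and `status_for` are hypothetical names for illustration, and the check only tests that the header is present, not that it is valid.

```python
import asyncio
import json


class RequireAuthHeader:
    """Pure-ASGI middleware: reject HTTP requests lacking an Authorization header."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            # ASGI headers are a list of (name, value) byte pairs, names lowercased.
            headers = dict(scope.get("headers", []))
            if b"authorization" not in headers:
                body = json.dumps({"error": "Unauthorized"}).encode()
                await send({
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [(b"content-type", b"application/json")],
                })
                await send({"type": "http.response.body", "body": body})
                return
        # Non-HTTP scopes and authorized requests pass through unchanged.
        await self.app(scope, receive, send)


async def ok_app(scope, receive, send):
    """Hypothetical downstream app that always answers 200."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


def status_for(headers):
    """Drive the middleware once and return the response status code."""
    sent = []

    async def send(message):
        sent.append(message)

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    scope = {"type": "http", "method": "GET", "path": "/graphql", "headers": headers}
    asyncio.run(RequireAuthHeader(ok_app)(scope, receive, send))
    return sent[0]["status"]


print(status_for([]))
print(status_for([(b"authorization", b"Bearer token")]))
```

A raw ASGI middleware also sees `websocket` scopes, so the same class could be extended to guard subscription connections, which `BaseHTTPMiddleware` does not intercept.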

### Security Features

Built-in security headers and Content Security Policy support:

```python { .api }
def make_security_headers(self) -> dict:
    """
    Generate security headers for HTTP responses.

    Returns:
        dict: Security headers including Cache-Control, Feature-Policy, etc.
    """

def make_csp_header(self, nonce: str) -> str:
    """
    Create Content Security Policy header with nonce.

    Args:
        nonce: Cryptographic nonce for inline scripts

    Returns:
        str: CSP header value
    """
```

**Security headers include:**

- `Cache-Control: no-store`
- `Feature-Policy: microphone 'none'; camera 'none'`
- `Referrer-Policy: strict-origin-when-cross-origin`
- `X-Content-Type-Options: nosniff`
- Content Security Policy with nonce support
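The headers listed above can be combined with a per-response nonce roughly as follows. This is a minimal sketch: the static header values come from the list, while the CSP directive string, the use of `secrets.token_urlsafe` for the nonce, and the helper `headers_for_index_page` are assumptions for illustration rather than the webserver's actual policy.

```python
import secrets


def make_security_headers() -> dict:
    """Static headers taken from the documented list."""
    return {
        "Cache-Control": "no-store",
        "Feature-Policy": "microphone 'none'; camera 'none'",
        "Referrer-Policy": "strict-origin-when-cross-origin",
        "X-Content-Type-Options": "nosniff",
    }


def make_csp_header(nonce: str) -> str:
    """Hypothetical policy; the real directives are defined by the webserver."""
    return f"default-src 'self'; script-src 'self' 'nonce-{nonce}'"


def headers_for_index_page() -> tuple:
    """Return (nonce, headers) for one HTML response."""
    nonce = secrets.token_urlsafe(16)  # fresh, unpredictable value per response
    headers = make_security_headers()
    headers["Content-Security-Policy"] = make_csp_header(nonce)
    return nonce, headers


nonce, headers = headers_for_index_page()
print(headers["Content-Security-Policy"])
```

The same nonce has to appear on each inline `<script nonce="...">` tag in the served HTML, otherwise the browser blocks the script under this kind of policy.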

### Error Handling

GraphQL errors are converted into serializable error information and mapped to an HTTP status code:

```python { .api }
def handle_graphql_errors(self, errors: Sequence[GraphQLError]):
    """
    Process GraphQL errors and add serializable error info.

    Args:
        errors: List of GraphQL errors

    Returns:
        list: Formatted error objects with debugging information
    """

def _determine_status_code(
    self,
    resolver_errors: Optional[list[GraphQLError]],
    captured_errors: list[Exception]
) -> int:
    """
    Determine appropriate HTTP status code based on error types.

    Returns:
        int: HTTP status code (200, 400, or 500)
    """
```
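The documentation states only that the result is 200, 400, or 500. One plausible mapping is sketched below; the precedence rule (captured exceptions mean a server fault, remaining errors mean a bad request) is an assumption for illustration and may not match the actual implementation. `determine_status_code` is a hypothetical free function, and plain `Exception` objects stand in for `GraphQLError`.

```python
from typing import Optional, Sequence


def determine_status_code(
    resolver_errors: Optional[Sequence[Exception]],
    captured_errors: Sequence[Exception],
) -> int:
    """Assumed rule: server-side failures -> 500, other errors -> 400, else 200."""
    if captured_errors:
        # An exception was captured during execution: treat as a server fault.
        return 500
    if resolver_errors:
        # Errors without a captured exception, e.g. a malformed query.
        return 400
    return 200


print(determine_status_code(None, []))
print(determine_status_code([ValueError("bad query")], []))
print(determine_status_code([RuntimeError("boom")], [RuntimeError("boom")]))
```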

### Static File Serving

Built-in static file serving for the Dagster UI:

```python { .api }
def build_static_routes(self) -> list[Route]:
    """Build routes for static file serving including UI assets."""

def build_routes(self) -> list[BaseRoute]:
    """Build all application routes including GraphQL, API, and static files."""
```

### Request Context Management

Each request gets a workspace-aware request context, and a context manager controls its lifecycle:

```python { .api }
def _make_request_context(self, conn: HTTPConnection) -> BaseWorkspaceRequestContext:
    """Create request context from HTTP connection with workspace access."""

@contextmanager
def request_context(self, conn: HTTPConnection) -> Iterator[TRequestContext]:
    """Context manager ensuring proper request context lifecycle."""
```
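The pairing of a factory method with a `@contextmanager` wrapper can be illustrated with the standard library alone. In this sketch `DemoContext`, `make_request_context`, and the `close` step are hypothetical stand-ins; the point is only that cleanup runs when the request finishes, even if the handler raises.

```python
from contextlib import contextmanager
from typing import Iterator


class DemoContext:
    """Hypothetical request-scoped object with an explicit cleanup step."""

    def __init__(self, path: str):
        self.path = path
        self.closed = False

    def close(self) -> None:
        self.closed = True


def make_request_context(path: str) -> DemoContext:
    """Stand-in for _make_request_context: build one context per request."""
    return DemoContext(path)


@contextmanager
def request_context(path: str) -> Iterator[DemoContext]:
    context = make_request_context(path)
    try:
        yield context
    finally:
        # Runs even if the handler raises, so per-request resources are released.
        context.close()


with request_context("/graphql") as ctx:
    print(ctx.closed)  # still open inside the block
print(ctx.closed)  # released once the block exits
```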

## GraphQL Schema Integration

The webserver integrates with `dagster-graphql` for schema generation:

```python
from dagster_graphql.schema import create_schema
from dagster_webserver.webserver import DagsterWebserver
from graphene import Schema

# Schema is automatically created
schema = webserver.build_graphql_schema()

# Custom schema modifications (advanced usage)
class CustomDagsterWebserver(DagsterWebserver):
    def build_graphql_schema(self) -> Schema:
        schema = create_schema()
        # Add custom types or resolvers
        return schema
```

## Deployment Considerations

### Production Configuration

```python
# Production webserver setup
webserver = DagsterWebserver(
    workspace_context,
    app_path_prefix="/dagster",
    live_data_poll_rate=5000,  # Reduce polling frequency
    uses_app_path_prefix=True
)

app = webserver.create_asgi_app()

# Deploy with production ASGI server
# gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app
```

### Development Configuration

```python
# Development webserver setup
webserver = DagsterWebserver(
    workspace_context,
    live_data_poll_rate=1000,  # Fast polling for development
)

app = webserver.create_asgi_app(debug=True)
```

### Load Balancer Integration

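```python
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import BaseRoute, Route

from dagster_webserver.version import __version__  # assumed location of the version string
from dagster_webserver.webserver import DagsterWebserver

# Health check support
class HealthCheckWebserver(DagsterWebserver):
    def build_routes(self) -> list[BaseRoute]:
        routes = super().build_routes()
        routes.append(Route("/health", self.health_check, methods=["GET"]))
        return routes

    async def health_check(self, request: Request):
        return JSONResponse({"status": "healthy", "version": __version__})
```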