0
# GraphQL Schema
1
2
Direct GraphQL schema operations provide full access to Dagster's GraphQL API surface through schema creation, query execution, and workspace management. This enables custom GraphQL operations beyond what the high-level client provides.
3
4
## Capabilities
5
6
### Schema Creation
7
8
Create and configure the complete Dagster GraphQL schema with queries, mutations, and subscriptions for direct GraphQL operations.
9
10
```python { .api }
11
def create_schema() -> graphene.Schema:
12
"""
13
Create complete GraphQL schema with queries, mutations, and subscriptions.
14
15
Returns:
16
- graphene.Schema: Configured GraphQL schema for Dagster operations
17
"""
18
```
19
20
Usage example:
21
22
```python
23
from dagster_graphql.schema import create_schema
24
import asyncio
25
26
schema = create_schema()
27
28
# Execute a GraphQL query directly
29
query = """
30
query GetRuns($limit: Int) {
31
runsOrError(limit: $limit) {
32
... on PipelineRuns {
33
results {
34
runId
35
status
36
pipelineName
37
creationTime
38
}
39
}
40
}
41
}
42
"""
43
44
result = asyncio.run(
45
schema.execute_async(query, variable_values={"limit": 10})
46
)
47
print(result.data)
48
```
49
50
### Query Execution
51
52
Execute GraphQL queries and mutations within Dagster workspace contexts with comprehensive error handling and result processing.
53
54
```python { .api }
55
def execute_query(
56
workspace_process_context: WorkspaceProcessContext,
57
query: str,
58
variables: Optional[Mapping[str, object]] = None,
59
):
60
"""
61
Execute GraphQL query against workspace context.
62
63
Parameters:
64
- workspace_process_context (WorkspaceProcessContext): Workspace context for execution
65
- query (str): GraphQL query string to execute
66
- variables (Optional[Mapping[str, object]]): Variables for GraphQL execution
67
68
Returns:
69
- dict: GraphQL execution result with data and potential errors
70
71
Note: Includes automatic error handling with stack traces for debugging
72
"""
73
```
74
75
Usage example:
76
77
```python
78
from dagster_graphql.cli import execute_query
79
from dagster._core.workspace.context import WorkspaceProcessContext
80
81
# Execute custom GraphQL query
82
query = """
83
mutation LaunchRun($executionParams: ExecutionParams!) {
84
launchPipelineExecution(executionParams: $executionParams) {
85
__typename
86
... on LaunchRunSuccess {
87
run {
88
runId
89
status
90
}
91
}
92
... on PipelineConfigValidationInvalid {
93
errors {
94
message
95
path
96
}
97
}
98
}
99
}
100
"""
101
102
variables = {
103
"executionParams": {
104
"selector": {
105
"repositoryLocationName": "my_location",
106
"repositoryName": "my_repo",
107
"pipelineName": "my_job"
108
},
109
"runConfigData": {},
110
"mode": "default"
111
}
112
}
113
114
result = execute_query(workspace_context, query, variables)
115
```
116
117
### CLI Query Execution
118
119
Execute GraphQL operations from command-line interfaces and automation scripts with file output support and remote server connectivity.
120
121
```python { .api }
122
def execute_query_from_cli(
123
workspace_process_context,
124
query,
125
variables=None,
126
output=None
127
):
128
"""
129
Execute GraphQL query from CLI with optional file output.
130
131
Parameters:
132
- workspace_process_context: Workspace context for execution
133
- query (str): GraphQL query string to execute
134
- variables (Optional[str]): JSON-encoded variables for execution
135
- output (Optional[str]): File path to store GraphQL response
136
137
Returns:
138
- str: JSON-encoded result string
139
"""
140
141
def execute_query_against_remote(host, query, variables):
142
"""
143
Execute GraphQL query against remote Dagster server.
144
145
Parameters:
146
- host (str): Host URL for remote Dagster server
147
- query (str): GraphQL query string to execute
148
- variables: Variables for GraphQL execution
149
150
Returns:
151
- dict: GraphQL execution result
152
153
Raises:
154
- click.UsageError: If host URL is invalid or server fails sanity check
155
"""
156
```
157
158
Usage example:
159
160
```python
161
# Execute query with file output
162
result = execute_query_from_cli(
163
workspace_context,
164
query,
165
variables='{"limit": 5}',
166
output="/tmp/query_result.json"
167
)
168
169
# Execute against remote server
170
remote_result = execute_query_against_remote(
171
"https://my-dagster-server.com",
172
query,
173
{"repositoryName": "analytics"}
174
)
175
```
176
177
### Predefined Query Operations
178
179
Access predefined GraphQL operations for common Dagster tasks including pipeline execution and run management.
180
181
```python { .api }
182
PREDEFINED_QUERIES = {
183
"launchPipelineExecution": LAUNCH_PIPELINE_EXECUTION_MUTATION,
184
}
185
```
186
187
Available predefined queries include:
188
189
- **LAUNCH_PIPELINE_EXECUTION_MUTATION**: Complete mutation for launching pipeline executions
190
- **SUBSCRIPTION_QUERY**: Real-time subscription for run events
191
- **RUN_EVENTS_QUERY**: Query for retrieving run events and logs
192
- **LAUNCH_MULTIPLE_RUNS_MUTATION**: Batch execution mutation
193
- **LAUNCH_PIPELINE_REEXECUTION_MUTATION**: Pipeline re-execution mutation
194
- **LAUNCH_PARTITION_BACKFILL_MUTATION**: Partition backfill operations
195
196
Usage example:
197
198
```python
199
from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION
200
from dagster_graphql.cli import execute_query
201
202
# Use predefined mutation
203
mutation = LAUNCH_PIPELINE_EXECUTION_MUTATION
204
variables = {
205
"executionParams": {
206
"selector": {
207
"repositoryLocationName": "my_location",
208
"repositoryName": "my_repo",
209
"pipelineName": "my_pipeline"
210
}
211
}
212
}
213
214
result = execute_query(workspace_context, mutation, variables)
215
```
216
217
## GraphQL Schema Components
218
219
The schema includes comprehensive type definitions for:
220
221
### Query Types
222
- Repository and job metadata queries
223
- Run status and history queries
224
- Asset and partition queries
225
- Schedule and sensor queries
226
- Configuration schema queries
227
228
### Mutation Types
229
- Job and pipeline execution mutations
230
- Run termination mutations
231
- Repository location management mutations
232
- Backfill and re-execution mutations
233
234
### Subscription Types
235
- Real-time run event subscriptions
236
- Log streaming subscriptions
237
- Asset materialization subscriptions
238
239
### Error Handling Types
240
- Comprehensive error type definitions
241
- Validation error structures
242
- Python error serialization with stack traces
243
244
Example comprehensive GraphQL operation:
245
246
```python
247
complex_query = """
248
query ComplexDagsterQuery($repoSelector: RepositorySelector!) {
249
repositoryOrError(repositorySelector: $repoSelector) {
250
... on Repository {
251
name
252
location {
253
name
254
}
255
pipelines {
256
name
257
modes {
258
name
259
resources {
260
name
261
configField {
262
configType {
263
key
264
}
265
}
266
}
267
}
268
solids {
269
name
270
inputs {
271
definition {
272
name
273
type {
274
displayName
275
}
276
}
277
}
278
outputs {
279
definition {
280
name
281
type {
282
displayName
283
}
284
}
285
}
286
}
287
}
288
}
289
... on RepositoryNotFoundError {
290
message
291
}
292
}
293
}
294
"""
295
296
variables = {
297
"repoSelector": {
298
"repositoryLocationName": "my_location",
299
"repositoryName": "my_repo"
300
}
301
}
302
303
result = execute_query(workspace_context, complex_query, variables)
304
```