0
# Test Utilities
1
2
Comprehensive testing framework for GraphQL-based testing with workspace management, query execution, and result processing utilities. These utilities enable robust testing of Dagster GraphQL operations in various contexts.
3
4
## Capabilities
5
6
### GraphQL Query Execution
7
8
Execute GraphQL queries in test contexts with comprehensive error handling and result processing.
9
10
```python { .api }
11
def execute_dagster_graphql(
12
context: WorkspaceRequestContext,
13
query: str,
14
variables: Optional[GqlVariables] = None,
15
schema: graphene.Schema = SCHEMA,
16
) -> GqlResult:
17
"""
18
Execute GraphQL query in test context with error handling.
19
20
Parameters:
21
- context (WorkspaceRequestContext): Workspace context for execution
22
- query (str): GraphQL query string to execute
23
- variables (Optional[GqlVariables]): Variables for GraphQL execution
24
- schema (graphene.Schema): GraphQL schema to use (defaults to SCHEMA)
25
26
Returns:
27
- GqlResult: Result object with data and errors properties
28
29
Note: Automatically clears loaders between requests and handles GraphQL errors
30
by raising the original exception for easier debugging.
31
"""
32
33
def async_execute_dagster_graphql(
34
context: WorkspaceRequestContext,
35
query: str,
36
variables: Optional[GqlVariables] = None,
37
schema: graphene.Schema = SCHEMA,
38
) -> GqlResult:
39
"""
40
Asynchronous version of GraphQL query execution for async tests.
41
42
Parameters: Same as execute_dagster_graphql
43
Returns: Same as execute_dagster_graphql
44
"""
45
```
46
47
Usage example:
48
49
```python
50
from dagster_graphql.test.utils import execute_dagster_graphql
51
52
def test_job_execution(graphql_context):
53
query = """
54
mutation LaunchJob($selector: PipelineSelector!) {
55
launchPipelineExecution(executionParams: {selector: $selector}) {
56
__typename
57
... on LaunchRunSuccess {
58
run {
59
runId
60
status
61
}
62
}
63
}
64
}
65
"""
66
67
variables = {
68
"selector": {
69
"repositoryLocationName": "test_location",
70
"repositoryName": "test_repo",
71
"pipelineName": "my_job"
72
}
73
}
74
75
result = execute_dagster_graphql(graphql_context, query, variables)
76
assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
77
run_id = result.data["launchPipelineExecution"]["run"]["runId"]
78
assert run_id is not None
79
```
80
81
### Subscription Testing
82
83
Execute GraphQL subscriptions for testing real-time event processing and streaming operations.
84
85
```python { .api }
86
def execute_dagster_graphql_subscription(
87
context: WorkspaceRequestContext,
88
query: str,
89
variables: Optional[GqlVariables] = None,
90
schema: graphene.Schema = SCHEMA,
91
) -> Sequence[GqlResult]:
92
"""
93
Execute GraphQL subscription query and return results.
94
95
Parameters:
96
- context (WorkspaceRequestContext): Workspace context for execution
97
- query (str): GraphQL subscription query string
98
- variables (Optional[GqlVariables]): Variables for subscription
99
- schema (graphene.Schema): GraphQL schema to use
100
101
Returns:
102
- Sequence[GqlResult]: Sequence of subscription results
103
104
Note: Returns first payload from subscription for testing purposes.
105
"""
106
```
107
108
Usage example:
109
110
```python
111
def test_run_events_subscription(graphql_context):
112
subscription = """
113
subscription RunEvents($runId: ID!) {
114
pipelineRunLogs(runId: $runId) {
115
__typename
116
... on PipelineRunLogsSubscriptionSuccess {
117
messages {
118
__typename
119
... on RunStartEvent {
120
runId
121
}
122
}
123
}
124
}
125
}
126
"""
127
128
results = execute_dagster_graphql_subscription(
129
graphql_context,
130
subscription,
131
{"runId": "test-run-id"}
132
)
133
134
assert len(results) > 0
135
assert results[0].data is not None
136
```
137
138
### Test Context Management
139
140
Create and manage workspace contexts for testing with various configuration options and repository setups.
141
142
```python { .api }
143
def define_out_of_process_context(
144
python_or_workspace_file: str,
145
fn_name: Optional[str],
146
instance: DagsterInstance,
147
read_only: bool = False,
148
read_only_locations: Optional[Mapping[str, bool]] = None,
149
) -> Iterator[WorkspaceRequestContext]:
150
"""
151
Create out-of-process workspace context for testing.
152
153
Parameters:
154
- python_or_workspace_file (str): Python file or workspace YAML file path
155
- fn_name (Optional[str]): Function name for repository definition
156
- instance (DagsterInstance): Dagster instance for testing
157
- read_only (bool): Whether workspace should be read-only
158
- read_only_locations (Optional[Mapping[str, bool]]): Per-location read-only settings
159
160
Yields:
161
- WorkspaceRequestContext: Configured workspace context for testing
162
"""
163
164
def temp_workspace_file(python_fns: list[tuple[str, str, Optional[str]]]) -> Iterator[str]:
165
"""
166
Create temporary workspace configuration file for testing.
167
168
Parameters:
169
- python_fns (list[tuple[str, str, Optional[str]]]): List of tuples containing
170
(location_name, python_file_path, function_name)
171
172
Yields:
173
- str: Path to temporary workspace.yaml file
174
"""
175
```
176
177
Usage example:
178
179
```python
180
from dagster import DagsterInstance
181
from dagster_graphql.test.utils import define_out_of_process_context, temp_workspace_file
182
183
def test_multi_location_workspace():
184
with temp_workspace_file([
185
("location_a", "repos/repo_a.py", "get_repo_a"),
186
("location_b", "repos/repo_b.py", "get_repo_b")
187
]) as workspace_file:
188
with DagsterInstance.ephemeral() as instance:
189
with define_out_of_process_context(
190
workspace_file, None, instance
191
) as context:
192
# Test multi-location operations
193
locations = context.code_locations
194
assert len(locations) == 2
195
assert any(loc.name == "location_a" for loc in locations)
196
assert any(loc.name == "location_b" for loc in locations)
197
```
198
199
### Test Helper Functions
200
201
Utility functions for inferring repository information and creating selectors for GraphQL operations.
202
203
```python { .api }
204
def infer_repository(graphql_context: WorkspaceRequestContext) -> RemoteRepository:
205
"""
206
Automatically infer repository from GraphQL context.
207
208
Parameters:
209
- graphql_context (WorkspaceRequestContext): Workspace context
210
211
Returns:
212
- RemoteRepository: Inferred repository object
213
"""
214
215
def infer_repository_selector(
216
graphql_context: WorkspaceRequestContext,
217
location_name: Optional[str] = None
218
) -> Selector:
219
"""
220
Create repository selector dictionary from context.
221
222
Parameters:
223
- graphql_context (WorkspaceRequestContext): Workspace context
224
- location_name (Optional[str]): Specific location name to use
225
226
Returns:
227
- Selector: Dictionary with repositoryLocationName and repositoryName
228
"""
229
230
def infer_job_selector(
231
graphql_context: WorkspaceRequestContext,
232
job_name: str,
233
op_selection: Optional[Sequence[str]] = None,
234
asset_selection: Optional[Sequence[GqlAssetKey]] = None,
235
asset_check_selection: Optional[Sequence[GqlAssetCheckHandle]] = None,
236
location_name: Optional[str] = None,
237
) -> Selector:
238
"""
239
Create job selector dictionary with optional op/asset selection.
240
241
Parameters:
242
- graphql_context (WorkspaceRequestContext): Workspace context
243
- job_name (str): Name of the job
244
- op_selection (Optional[Sequence[str]]): Specific ops to select
245
- asset_selection (Optional[Sequence[GqlAssetKey]]): Specific assets to select
246
- asset_check_selection (Optional[Sequence[GqlAssetCheckHandle]]): Asset checks to select
247
- location_name (Optional[str]): Specific location name
248
249
Returns:
250
- Selector: Dictionary with job selection parameters
251
"""
252
```
253
254
Usage example:
255
256
```python
257
def test_job_with_op_selection(graphql_context):
258
# Automatically infer repository and create job selector
259
job_selector = infer_job_selector(
260
graphql_context,
261
"complex_pipeline",
262
op_selection=["extract_data", "transform_data"]
263
)
264
265
query = """
266
query JobDetails($selector: PipelineSelector!) {
267
pipelineOrError(params: $selector) {
268
... on Pipeline {
269
name
270
solids {
271
name
272
}
273
}
274
}
275
}
276
"""
277
278
result = execute_dagster_graphql(
279
graphql_context,
280
query,
281
{"selector": job_selector}
282
)
283
284
assert result.data["pipelineOrError"]["name"] == "complex_pipeline"
285
```
286
287
### Asset Testing Utilities
288
289
Specialized utilities for testing asset materialization and asset-based workflows.
290
291
```python { .api }
292
def materialize_assets(
293
context: WorkspaceRequestContext,
294
asset_selection: Optional[Sequence[AssetKey]] = None,
295
partition_keys: Optional[Sequence[str]] = None,
296
run_config_data: Optional[Mapping[str, Any]] = None,
297
location_name: Optional[str] = None,
298
) -> Union[GqlResult, Sequence[GqlResult]]:
299
"""
300
Execute asset materialization through GraphQL API.
301
302
Parameters:
303
- context (WorkspaceRequestContext): Workspace context
304
- asset_selection (Optional[Sequence[AssetKey]]): Specific assets to materialize
305
- partition_keys (Optional[Sequence[str]]): Partition keys for partitioned assets
306
- run_config_data (Optional[Mapping[str, Any]]): Run configuration
307
- location_name (Optional[str]): Specific location name
308
309
Returns:
310
- Union[GqlResult, Sequence[GqlResult]]: Single result or sequence for partitioned runs
311
"""
312
```
313
314
Usage example:
315
316
```python
317
from dagster import AssetKey
318
from dagster_graphql.test.utils import materialize_assets
319
320
def test_asset_materialization(graphql_context):
321
# Materialize specific assets
322
result = materialize_assets(
323
graphql_context,
324
asset_selection=[AssetKey("users"), AssetKey("orders")],
325
run_config_data={
326
"ops": {
327
"extract_users": {
328
"config": {"source": "production_db"}
329
}
330
}
331
}
332
)
333
334
assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
335
336
# Test partitioned asset materialization
337
partitioned_results = materialize_assets(
338
graphql_context,
339
asset_selection=[AssetKey("daily_metrics")],
340
partition_keys=["2023-10-01", "2023-10-02", "2023-10-03"]
341
)
342
343
assert len(partitioned_results) == 3
344
for result in partitioned_results:
345
assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
346
```
347
348
### Advanced Testing Patterns
349
350
Complete testing patterns for complex GraphQL operations:
351
352
```python
353
def test_complete_workflow(graphql_context):
354
"""Test complete workflow from job execution to completion."""
355
356
# 1. Execute job
357
job_result = execute_dagster_graphql(graphql_context, """
358
mutation {
359
launchPipelineExecution(executionParams: {
360
selector: {
361
repositoryLocationName: "test_location"
362
repositoryName: "test_repo"
363
pipelineName: "etl_pipeline"
364
}
365
}) {
366
... on LaunchRunSuccess {
367
run { runId }
368
}
369
}
370
}
371
""")
372
373
run_id = job_result.data["launchPipelineExecution"]["run"]["runId"]
374
375
# 2. Wait for completion with periodic status checks
376
execute_dagster_graphql_and_finish_runs(graphql_context, f"""
377
query {{
378
runOrError(runId: "{run_id}") {{
379
... on Run {{
380
status
381
stats {{
382
stepsSucceededCount
383
stepsFailed
384
}}
385
}}
386
}}
387
}}
388
""")
389
390
# 3. Verify final results
391
final_result = execute_dagster_graphql(graphql_context, f"""
392
query {{
393
runOrError(runId: "{run_id}") {{
394
... on Run {{
395
status
396
assets {{
397
key {{
398
path
399
}}
400
materialization {{
401
timestamp
402
}}
403
}}
404
}}
405
}}
406
}}
407
""")
408
409
run_data = final_result.data["runOrError"]
410
assert run_data["status"] == "SUCCESS"
411
assert len(run_data["assets"]) > 0
412
```
413
414
## Type Definitions for Testing
415
416
```python { .api }
417
class GqlResult(Protocol):
418
@property
419
def data(self) -> Mapping[str, Any]: ...
420
421
@property
422
def errors(self) -> Optional[Sequence[str]]: ...
423
424
class GqlTag(TypedDict):
425
key: str
426
value: str
427
428
class GqlAssetKey(TypedDict):
429
path: Sequence[str]
430
431
class GqlAssetCheckHandle(TypedDict):
432
assetKey: GqlAssetKey
433
name: str
434
435
Selector = dict[str, Any]
436
GqlVariables = Mapping[str, Any]
437
```