# GraphQL Client

`DagsterGraphQLClient` provides a high-level Python interface to Dagster's GraphQL API. It manages the connection, authentication, and error handling, and exposes convenience methods for the most common Dagster operations.

## Capabilities

### Client Initialization

Create a connection to a Dagster GraphQL server. Configuration options cover authentication, request timeouts, and custom transports.

```python { .api }
class DagsterGraphQLClient:
    def __init__(
        self,
        hostname: str,
        port_number: Optional[int] = None,
        transport: Optional[Transport] = None,
        use_https: bool = False,
        timeout: int = 300,
        headers: Optional[dict[str, str]] = None,
        auth: Optional[AuthBase] = None,
    ):
        """
        Initialize GraphQL client for Dagster API.

        Parameters:
        - hostname (str): Hostname for the Dagster GraphQL API
        - port_number (Optional[int]): Port number to connect to on the host
        - transport (Optional[Transport]): Custom transport for connection
        - use_https (bool): Whether to use https in the URL connection string
        - timeout (int): Number of seconds before requests should time out
        - headers (Optional[dict[str, str]]): Additional headers to include in requests
        - auth (Optional[AuthBase]): Authentication handler for requests

        Raises:
        - DagsterGraphQLClientError: If connection to the host fails
        """
```

Usage example:

```python
from dagster_graphql import DagsterGraphQLClient

# Basic connection
client = DagsterGraphQLClient("localhost", port_number=3000)

# HTTPS connection with authentication
client = DagsterGraphQLClient(
    "my-dagster.example.com",
    use_https=True,
    headers={"Dagster-Cloud-Api-Token": "your-token-here"},
    timeout=60,
)
```
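
To make the roles of `hostname`, `port_number`, and `use_https` concrete, here is a minimal sketch of how a connection URL could be composed from them. `build_graphql_url` is a hypothetical helper, not part of the client, and the `/graphql` path is an assumption; the client's actual internals may differ.

```python
from typing import Optional


def build_graphql_url(
    hostname: str,
    port_number: Optional[int] = None,
    use_https: bool = False,
) -> str:
    """Hypothetical helper: compose an endpoint URL from the constructor arguments."""
    scheme = "https" if use_https else "http"
    # Append the port only when one was given.
    host = f"{hostname}:{port_number}" if port_number is not None else hostname
    # Assumption: the GraphQL API is served under the /graphql path.
    return f"{scheme}://{host}/graphql"


print(build_graphql_url("localhost", port_number=3000))
print(build_graphql_url("my-dagster.example.com", use_https=True))
```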

### Job Execution

Submit a job for execution, optionally specifying run config, tags, an op selection, or an asset selection.

```python { .api }
def submit_job_execution(
    self,
    job_name: str,
    repository_location_name: Optional[str] = None,
    repository_name: Optional[str] = None,
    run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
    tags: Optional[dict[str, Any]] = None,
    op_selection: Optional[Sequence[str]] = None,
    asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
) -> str:
    """
    Submit a job for execution with configuration.

    Parameters:
    - job_name (str): The job's name
    - repository_location_name (Optional[str]): Repository location name
    - repository_name (Optional[str]): Repository name
    - run_config (Optional[Union[RunConfig, Mapping[str, Any]]]): Run configuration
    - tags (Optional[dict[str, Any]]): Tags to add to the job execution
    - op_selection (Optional[Sequence[str]]): List of ops to execute
    - asset_selection (Optional[Sequence[CoercibleToAssetKey]]): List of asset keys to execute

    Returns:
    - str: Run ID of the submitted job

    Raises:
    - DagsterGraphQLClientError: For various execution errors including invalid steps,
      configuration validation errors, job not found, or internal framework errors
    """
```

Usage example:

```python
# Basic job execution
run_id = client.submit_job_execution("my_job")

# Job execution with configuration and tags
run_id = client.submit_job_execution(
    job_name="data_pipeline",
    run_config={
        "ops": {
            "process_data": {
                "config": {
                    "input_path": "/data/input.csv",
                    "output_path": "/data/output.parquet"
                }
            }
        }
    },
    tags={
        "environment": "production",
        "team": "data-engineering"
    }
)

# Execute specific ops within a job
run_id = client.submit_job_execution(
    job_name="complex_pipeline",
    op_selection=["extract_data", "transform_data"]
)
```
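
The nested `run_config` dictionary in the example above follows an `ops -> <op name> -> config` layout. As a rough sketch, a small helper can build that nesting from a flat mapping of op names to settings. `build_op_run_config` is a hypothetical name introduced here for illustration and is not part of the client.

```python
from typing import Any, Mapping


def build_op_run_config(op_configs: Mapping[str, Mapping[str, Any]]) -> dict[str, Any]:
    """Hypothetical helper: wrap per-op settings in the ops/<name>/config layout."""
    return {
        "ops": {
            # Copy each settings mapping so the caller's input is not shared.
            name: {"config": dict(settings)}
            for name, settings in op_configs.items()
        }
    }


run_config = build_op_run_config(
    {
        "process_data": {
            "input_path": "/data/input.csv",
            "output_path": "/data/output.parquet",
        }
    }
)
print(run_config)
```

The resulting dictionary could then be passed as the `run_config` argument of `submit_job_execution`.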

### Run Status Management

Retrieve the status of a run to track its progress and outcome.

```python { .api }
def get_run_status(self, run_id: str) -> DagsterRunStatus:
    """
    Get the status of a given Pipeline Run.

    Parameters:
    - run_id (str): Run ID of the requested pipeline run

    Returns:
    - DagsterRunStatus: Status enum describing the state of the pipeline run

    Raises:
    - DagsterGraphQLClientError: If the run ID is not found or internal framework errors occur
    """
```

Usage example:

```python
from dagster import DagsterRunStatus

# Check run status
status = client.get_run_status(run_id)

if status == DagsterRunStatus.SUCCESS:
    print("Job completed successfully")
elif status == DagsterRunStatus.FAILURE:
    print("Job failed")
elif status == DagsterRunStatus.STARTED:
    print("Job is currently running")
```
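
A single status check is often wrapped in a polling loop that waits for the run to finish. The following is a minimal sketch of such a loop; `wait_for_terminal_status`, its parameters, and the default interval and timeout are all assumptions made for illustration, not part of the client. It takes a callable so that it stays independent of any particular client instance.

```python
import time
from typing import Callable, Collection, TypeVar

S = TypeVar("S")


def wait_for_terminal_status(
    get_status: Callable[[], S],
    terminal_statuses: Collection[S],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
) -> S:
    """Hypothetical helper: poll get_status() until it returns a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal_statuses:
            return status
        # Give up once the deadline has passed without a terminal status.
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still in status {status!r} after {timeout} seconds")
        time.sleep(poll_interval)


# With the real client, usage might look like this (left commented out so the
# sketch runs without a Dagster server):
# final = wait_for_terminal_status(
#     lambda: client.get_run_status(run_id),
#     {DagsterRunStatus.SUCCESS, DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED},
# )
```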

### Repository Location Management

Reload a repository location to refresh its metadata and pick up configuration changes without restarting the Dagster server.

```python { .api }
def reload_repository_location(
    self, repository_location_name: str
) -> ReloadRepositoryLocationInfo:
    """
    Reload a Dagster Repository Location and all its repositories.

    Parameters:
    - repository_location_name (str): The name of the repository location

    Returns:
    - ReloadRepositoryLocationInfo: Object with information about the reload result
    """

def shutdown_repository_location(
    self, repository_location_name: str
) -> ShutdownRepositoryLocationInfo:
    """
    Shut down the server serving metadata for the repository location.

    Note: This method is deprecated and will be removed in version 2.0.

    Parameters:
    - repository_location_name (str): The name of the repository location

    Returns:
    - ShutdownRepositoryLocationInfo: Object with information about the shutdown result
    """
```

Usage example:

```python
from dagster_graphql import ReloadRepositoryLocationStatus

# Reload repository location
reload_info = client.reload_repository_location("my_code_location")

if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    print("Repository location reloaded successfully")
else:
    print(f"Reload failed: {reload_info.failure_type} - {reload_info.message}")
```

### Run Termination

Terminate in-progress runs programmatically, either one at a time or in batches.

```python { .api }
def terminate_run(self, run_id: str):
    """
    Terminate a pipeline run.

    Parameters:
    - run_id (str): The run ID of the pipeline run to terminate

    Raises:
    - DagsterGraphQLClientError: If the run ID is not found or termination fails
    """

def terminate_runs(self, run_ids: list[str]):
    """
    Terminate multiple pipeline runs.

    Parameters:
    - run_ids (list[str]): List of run IDs of the pipeline runs to terminate

    Raises:
    - DagsterGraphQLClientError: If some or all run terminations fail
    """
```

Usage example:

```python
from dagster_graphql import DagsterGraphQLClientError

# Terminate a single run
client.terminate_run(run_id)

# Terminate multiple runs
run_ids_to_terminate = ["run_1", "run_2", "run_3"]
try:
    client.terminate_runs(run_ids_to_terminate)
    print("All runs terminated successfully")
except DagsterGraphQLClientError as e:
    print(f"Some terminations failed: {e}")
```
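
When a batch termination fails, it can be useful to know which runs were affected. One possible approach, sketched below, is to terminate runs one at a time and collect the failures. `terminate_individually` is a hypothetical helper, not part of the client; it catches the generic `Exception` only to stay self-contained, and with the real client the handler would be narrowed to `DagsterGraphQLClientError`.

```python
from typing import Callable, Iterable


def terminate_individually(
    terminate_one: Callable[[str], object],
    run_ids: Iterable[str],
) -> dict[str, Exception]:
    """Hypothetical helper: terminate runs one by one and return failures by run ID."""
    failures: dict[str, Exception] = {}
    for run_id in run_ids:
        try:
            terminate_one(run_id)
        except Exception as exc:  # narrow to DagsterGraphQLClientError with the real client
            failures[run_id] = exc
    return failures


# With the real client, usage might look like this:
# failures = terminate_individually(client.terminate_run, ["run_1", "run_2", "run_3"])
# for run_id, error in failures.items():
#     print(f"Could not terminate {run_id}: {error}")
```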

## Error Handling

The client raises `DagsterGraphQLClientError` for various error conditions:

- **Connection errors**: When unable to connect to the GraphQL server
- **Invalid step errors**: When a job has invalid steps
- **Configuration validation errors**: When run config doesn't match job requirements
- **Job not found errors**: When the specified job doesn't exist
- **Run conflicts**: When conflicting job runs exist in storage
- **Python errors**: For internal framework errors

Example error handling:

```python
from dagster_graphql import DagsterGraphQLClientError

try:
    run_id = client.submit_job_execution(
        job_name="nonexistent_job",
        run_config={"invalid": "config"}
    )
except DagsterGraphQLClientError as e:
    if "JobNotFoundError" in str(e):
        print("Job does not exist")
    elif "RunConfigValidationInvalid" in str(e):
        print("Invalid run configuration")
    else:
        print(f"Execution failed: {e}")
```
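
The string checks in the example above can be gathered into one place. The sketch below shows one way to do that, assuming the error type name appears in the exception's string form as it does in that example. `classify_client_error` and the category labels are hypothetical and introduced only for illustration.

```python
# Marker substrings taken from the example above, mapped to hypothetical labels.
ERROR_CATEGORIES = {
    "JobNotFoundError": "job_not_found",
    "RunConfigValidationInvalid": "invalid_run_config",
}


def classify_client_error(exc: Exception) -> str:
    """Hypothetical helper: map an exception to a coarse category label."""
    text = str(exc)
    for marker, category in ERROR_CATEGORIES.items():
        if marker in text:
            return category
    # Anything unrecognized falls into a catch-all bucket.
    return "other"


print(classify_client_error(Exception("JobNotFoundError", "No such job")))
```

Keeping the markers in a dictionary means a new error type can be handled by adding one entry rather than another `elif` branch.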