The GraphQL frontend to Python Dagster, providing programmatic interaction with Dagster runs, repositories, and jobs through a GraphQL API.
npx @tessl/cli install tessl/pypi-dagster-graphql@1.11.00
# Dagster GraphQL

A GraphQL API for the Dagster data orchestration platform. This package supports programmatic interaction with Dagster runs, repositories, and jobs through both a Python client and direct GraphQL operations, covering job execution, run management, repository operations, and real-time monitoring.

## Package Information

- **Package Name**: dagster-graphql
- **Language**: Python
- **Installation**: `pip install dagster-graphql`
- **License**: Apache-2.0

## Core Imports

```python
from dagster_graphql import (
    DagsterGraphQLClient,
    DagsterGraphQLClientError,
    ReloadRepositoryLocationInfo,
    ReloadRepositoryLocationStatus,
    ShutdownRepositoryLocationInfo,
    ShutdownRepositoryLocationStatus,
    InvalidOutputErrorInfo,
)
```

For GraphQL schema operations:

```python
from dagster_graphql.schema import create_schema
```

For test utilities:

```python
from dagster_graphql.test.utils import (
    execute_dagster_graphql,
    define_out_of_process_context,
)
```

## Basic Usage

```python
from dagster_graphql import DagsterGraphQLClient, ReloadRepositoryLocationStatus

# Connect to Dagster GraphQL server
client = DagsterGraphQLClient("localhost", port_number=3000)

# Submit a job for execution
run_id = client.submit_job_execution(
    job_name="my_job",
    run_config={"ops": {"my_op": {"config": {"value": 42}}}},
    tags={"env": "dev"},
)

# Check run status
status = client.get_run_status(run_id)
print(f"Run {run_id} status: {status}")

# Reload repository location
reload_info = client.reload_repository_location("my_location")
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    print("Repository location reloaded successfully")
```

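A single status check rarely suffices in practice, since a submitted run usually takes a while to finish. The following is a minimal polling sketch built on `get_run_status`. The helper name `wait_for_run`, its parameters, and the `TERMINAL_STATUSES` set are illustrative assumptions and not part of the package; the set assumes that `SUCCESS`, `FAILURE`, and `CANCELED` are the terminal values of `DagsterRunStatus`.

```python
import time
from enum import Enum

# Assumption: these are the terminal values of Dagster's DagsterRunStatus enum.
TERMINAL_STATUSES = {"SUCCESS", "FAILURE", "CANCELED"}


def wait_for_run(client, run_id, poll_interval=2.0, timeout=600.0, sleep=time.sleep):
    """Poll client.get_run_status(run_id) until the run reaches a terminal state.

    `client` is any object with a get_run_status(run_id) method, such as a
    DagsterGraphQLClient. Returns the terminal status name as a string and
    raises TimeoutError if the run is still active after `timeout` seconds.
    """
    waited = 0.0
    while True:
        status = client.get_run_status(run_id)
        # Accept either an enum member or a plain string.
        name = status.value if isinstance(status, Enum) else str(status)
        if name in TERMINAL_STATUSES:
            return name
        if waited >= timeout:
            raise TimeoutError(f"Run {run_id} still {name} after {timeout} seconds")
        sleep(poll_interval)
        waited += poll_interval
```

With the client from the example above, `wait_for_run(client, run_id)` would block until the run finishes. The `sleep` parameter exists only so that tests can replace the real delay.
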
## Architecture

The dagster-graphql package provides multiple interaction layers:

- **Python Client**: High-level Python client (`DagsterGraphQLClient`) for common operations
- **GraphQL Schema**: Complete GraphQL schema for direct GraphQL operations
- **CLI Interface**: Command-line tool for scripted GraphQL operations
- **Test Utilities**: Comprehensive testing framework for GraphQL-based tests

This layered architecture enables both simple programmatic access and full GraphQL flexibility, supporting everything from basic job execution to complex data pipeline orchestration workflows.

## Capabilities

### GraphQL Client Operations

Core Python client functionality for job execution, run management, and repository operations. The `DagsterGraphQLClient` provides high-level methods for the most common Dagster operations.

```python { .api }
class DagsterGraphQLClient:
    def __init__(
        self,
        hostname: str,
        port_number: Optional[int] = None,
        transport: Optional[Transport] = None,
        use_https: bool = False,
        timeout: int = 300,
        headers: Optional[dict[str, str]] = None,
        auth: Optional[AuthBase] = None,
    ): ...

    def submit_job_execution(
        self,
        job_name: str,
        repository_location_name: Optional[str] = None,
        repository_name: Optional[str] = None,
        run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
        tags: Optional[dict[str, Any]] = None,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
    ) -> str: ...

    def get_run_status(self, run_id: str) -> DagsterRunStatus: ...
    def reload_repository_location(self, repository_location_name: str) -> ReloadRepositoryLocationInfo: ...
    def terminate_run(self, run_id: str): ...
    def terminate_runs(self, run_ids: list[str]): ...
```

[GraphQL Client](./client.md)

### GraphQL Schema Operations

Direct GraphQL schema creation and execution for custom queries, mutations, and subscriptions. Enables full access to Dagster's GraphQL API surface.

```python { .api }
def create_schema() -> graphene.Schema: ...

def execute_query(
    workspace_process_context: WorkspaceProcessContext,
    query: str,
    variables: Optional[Mapping[str, object]] = None,
): ...
```

[GraphQL Schema](./schema.md)

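Custom queries can also be sent over HTTP to a running Dagster webserver instead of being executed in process. The sketch below only builds the request, using the standard GraphQL-over-HTTP JSON payload (`query` plus `variables`). The helper name `build_graphql_request`, the `/graphql` endpoint path, and the `version` root field are assumptions made for illustration; check them against your deployment before relying on them.

```python
import json
import urllib.request


def build_graphql_request(host, port, query, variables=None, use_https=False):
    """Build (but do not send) a POST request carrying a GraphQL query."""
    scheme = "https" if use_https else "http"
    # Assumption: the webserver exposes its GraphQL endpoint at /graphql.
    url = f"{scheme}://{host}:{port}/graphql"
    payload = {"query": query, "variables": variables or {}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical query: `version` is assumed to be a root field of the schema.
request = build_graphql_request("localhost", 3000, "query { version }")

# Sending the request requires a running server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Keeping request construction separate from sending makes the payload easy to inspect and test without a server.
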
### Command Line Interface

Command-line interface for executing GraphQL operations from scripts and automation workflows.

```python { .api }
def main(): ...

def execute_query_from_cli(
    workspace_process_context,
    query,
    variables=None,
    output=None,
): ...

def execute_query_against_remote(host, query, variables): ...
```

[Command Line Interface](./cli.md)

### Test Utilities

Comprehensive testing framework for GraphQL-based testing with workspace management, query execution, and result processing utilities.

```python { .api }
def execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult: ...

def define_out_of_process_context(
    python_or_workspace_file: str,
    fn_name: Optional[str],
    instance: DagsterInstance,
    read_only: bool = False,
    read_only_locations: Optional[Mapping[str, bool]] = None,
) -> Iterator[WorkspaceRequestContext]: ...
```

[Test Utilities](./test-utilities.md)

## Exception Classes

```python { .api }
class DagsterGraphQLClientError(Exception):
    def __init__(self, *args, body=None): ...
```

## Status and Information Types

```python { .api }
class ReloadRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"

class ShutdownRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"

class ReloadRepositoryLocationInfo(NamedTuple):
    status: ReloadRepositoryLocationStatus
    failure_type: Optional[str] = None
    message: Optional[str] = None

class ShutdownRepositoryLocationInfo(NamedTuple):
    status: ShutdownRepositoryLocationStatus
    message: Optional[str] = None

class InvalidOutputErrorInfo(NamedTuple):
    step_key: str
    invalid_output_name: str
```
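
As a rough illustration of how these result types might be consumed, the sketch below turns a reload result into a one-line summary. The helper `describe_reload` is hypothetical, and the two classes are stand-in copies of the definitions above so that the example runs without a Dagster installation; in real code they would be imported from `dagster_graphql`.

```python
from enum import Enum
from typing import NamedTuple, Optional


# Stand-in copies of the types listed above (assumption: in real code these
# are imported from dagster_graphql instead of being redefined).
class ReloadRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class ReloadRepositoryLocationInfo(NamedTuple):
    status: ReloadRepositoryLocationStatus
    failure_type: Optional[str] = None
    message: Optional[str] = None


def describe_reload(info) -> str:
    """Turn a reload result into a one-line, log-friendly summary."""
    # Compare by value so the helper works with the stand-in and the real enum.
    if info.status.value == "SUCCESS":
        return "reload succeeded"
    failure_type = info.failure_type or "unknown failure"
    message = info.message or "no message provided"
    return f"reload failed ({failure_type}): {message}"


ok = ReloadRepositoryLocationInfo(status=ReloadRepositoryLocationStatus.SUCCESS)
# Hypothetical failure values, chosen only for illustration.
failed = ReloadRepositoryLocationInfo(
    status=ReloadRepositoryLocationStatus.FAILURE,
    failure_type="PythonError",
    message="could not import module",
)
print(describe_reload(ok))
print(describe_reload(failed))
```

Because `failure_type` and `message` default to `None`, the helper substitutes placeholder text rather than assuming both are always populated on failure.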