or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.mdclient.mdindex.mdschema.mdtest-utilities.md

client.mddocs/

0

# GraphQL Client

1

2

The DagsterGraphQLClient provides a high-level Python interface for interacting with Dagster's GraphQL API. It handles connection management, authentication, error handling, and provides convenient methods for the most common Dagster operations.

3

4

## Capabilities

5

6

### Client Initialization

7

8

Create a connection to a Dagster GraphQL server with various configuration options including authentication, timeouts, and custom transport protocols.

9

10

```python { .api }

11

class DagsterGraphQLClient:

12

def __init__(

13

self,

14

hostname: str,

15

port_number: Optional[int] = None,

16

transport: Optional[Transport] = None,

17

use_https: bool = False,

18

timeout: int = 300,

19

headers: Optional[dict[str, str]] = None,

20

auth: Optional[AuthBase] = None,

21

):

22

"""

23

Initialize GraphQL client for Dagster API.

24

25

Parameters:

26

- hostname (str): Hostname for the Dagster GraphQL API

27

- port_number (Optional[int]): Port number to connect to on the host

28

- transport (Optional[Transport]): Custom transport for connection

29

- use_https (bool): Whether to use https in the URL connection string

30

- timeout (int): Number of seconds before requests should time out

31

- headers (Optional[dict[str, str]]): Additional headers to include in requests

32

- auth (Optional[AuthBase]): Authentication handler for requests

33

34

Raises:

35

- DagsterGraphQLClientError: If connection to the host fails

36

"""

37

```

38

39

Usage example:

40

41

```python

42

# Basic connection

43

client = DagsterGraphQLClient("localhost", port_number=3000)

44

45

# HTTPS connection with authentication

46

client = DagsterGraphQLClient(

47

"my-dagster.example.com",

48

use_https=True,

49

headers={"Dagster-Cloud-Api-Token": "your-token-here"},

50

timeout=60

51

)

52

```

53

54

### Job Execution

55

56

Submit jobs for execution with comprehensive configuration options including run config, tags, op selection, and asset selection.

57

58

```python { .api }

59

def submit_job_execution(

60

self,

61

job_name: str,

62

repository_location_name: Optional[str] = None,

63

repository_name: Optional[str] = None,

64

run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,

65

tags: Optional[dict[str, Any]] = None,

66

op_selection: Optional[Sequence[str]] = None,

67

asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,

68

) -> str:

69

"""

70

Submit a job for execution with configuration.

71

72

Parameters:

73

- job_name (str): The job's name

74

- repository_location_name (Optional[str]): Repository location name

75

- repository_name (Optional[str]): Repository name

76

- run_config (Optional[Union[RunConfig, Mapping[str, Any]]]): Run configuration

77

- tags (Optional[dict[str, Any]]): Tags to add to the job execution

78

- op_selection (Optional[Sequence[str]]): List of ops to execute

79

- asset_selection (Optional[Sequence[CoercibleToAssetKey]]): List of asset keys to execute

80

81

Returns:

82

- str: Run ID of the submitted job

83

84

Raises:

85

- DagsterGraphQLClientError: For various execution errors including invalid steps,

86

configuration validation errors, job not found, or internal framework errors

87

"""

88

```

89

90

Usage example:

91

92

```python

93

# Basic job execution

94

run_id = client.submit_job_execution("my_job")

95

96

# Job execution with configuration and tags

97

run_id = client.submit_job_execution(

98

job_name="data_pipeline",

99

run_config={

100

"ops": {

101

"process_data": {

102

"config": {

103

"input_path": "/data/input.csv",

104

"output_path": "/data/output.parquet"

105

}

106

}

107

}

108

},

109

tags={

110

"environment": "production",

111

"team": "data-engineering"

112

}

113

)

114

115

# Execute specific ops within a job

116

run_id = client.submit_job_execution(

117

job_name="complex_pipeline",

118

op_selection=["extract_data", "transform_data"]

119

)

120

```

121

122

### Run Status Management

123

124

Monitor and retrieve the status of pipeline runs to track execution progress and outcomes.

125

126

```python { .api }

127

def get_run_status(self, run_id: str) -> DagsterRunStatus:

128

"""

129

Get the status of a given Pipeline Run.

130

131

Parameters:

132

- run_id (str): Run ID of the requested pipeline run

133

134

Returns:

135

- DagsterRunStatus: Status enum describing the state of the pipeline run

136

137

Raises:

138

- DagsterGraphQLClientError: If the run ID is not found or internal framework errors occur

139

"""

140

```

141

142

Usage example:

143

144

```python

145

from dagster import DagsterRunStatus

146

147

# Check run status

148

status = client.get_run_status(run_id)

149

150

if status == DagsterRunStatus.SUCCESS:

151

print("Job completed successfully")

152

elif status == DagsterRunStatus.FAILURE:

153

print("Job failed")

154

elif status == DagsterRunStatus.STARTED:

155

print("Job is currently running")

156

```

157

158

### Repository Location Management

159

160

Reload and manage repository locations to refresh metadata and apply configuration changes without restarting the Dagster server.

161

162

```python { .api }

163

def reload_repository_location(

164

self, repository_location_name: str

165

) -> ReloadRepositoryLocationInfo:

166

"""

167

Reload a Dagster Repository Location and all its repositories.

168

169

Parameters:

170

- repository_location_name (str): The name of the repository location

171

172

Returns:

173

- ReloadRepositoryLocationInfo: Object with information about the reload result

174

"""

175

176

def shutdown_repository_location(

177

self, repository_location_name: str

178

) -> ShutdownRepositoryLocationInfo:

179

"""

180

Shut down the server serving metadata for the repository location.

181

182

Note: This method is deprecated and will be removed in version 2.0.

183

184

Parameters:

185

- repository_location_name (str): The name of the repository location

186

187

Returns:

188

- ShutdownRepositoryLocationInfo: Object with information about the shutdown result

189

"""

190

```

191

192

Usage example:

193

194

```python

195

# Reload repository location

196

reload_info = client.reload_repository_location("my_code_location")

197

198

if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:

199

print("Repository location reloaded successfully")

200

else:

201

print(f"Reload failed: {reload_info.failure_type} - {reload_info.message}")

202

```

203

204

### Run Termination

205

206

Stop running pipeline executions programmatically for both individual runs and batches of runs.

207

208

```python { .api }

209

def terminate_run(self, run_id: str):

210

"""

211

Terminate a pipeline run.

212

213

Parameters:

214

- run_id (str): The run ID of the pipeline run to terminate

215

216

Raises:

217

- DagsterGraphQLClientError: If the run ID is not found or termination fails

218

"""

219

220

def terminate_runs(self, run_ids: list[str]):

221

"""

222

Terminate multiple pipeline runs.

223

224

Parameters:

225

- run_ids (list[str]): List of run IDs of the pipeline runs to terminate

226

227

Raises:

228

- DagsterGraphQLClientError: If some or all run terminations fail

229

"""

230

```

231

232

Usage example:

233

234

```python

235

# Terminate a single run

236

client.terminate_run(run_id)

237

238

# Terminate multiple runs

239

failed_run_ids = ["run_1", "run_2", "run_3"]

240

try:

241

client.terminate_runs(failed_run_ids)

242

print("All runs terminated successfully")

243

except DagsterGraphQLClientError as e:

244

print(f"Some terminations failed: {e}")

245

```

246

247

## Error Handling

248

249

The client raises `DagsterGraphQLClientError` for various error conditions:

250

251

- **Connection errors**: When unable to connect to the GraphQL server

252

- **Invalid step errors**: When a job has invalid steps

253

- **Configuration validation errors**: When run config doesn't match job requirements

254

- **Job not found errors**: When the specified job doesn't exist

255

- **Run conflicts**: When conflicting job runs exist in storage

256

- **Python errors**: For internal framework errors

257

258

Example error handling:

259

260

```python

261

from dagster_graphql import DagsterGraphQLClientError

262

263

try:

264

run_id = client.submit_job_execution(

265

job_name="nonexistent_job",

266

run_config={"invalid": "config"}

267

)

268

except DagsterGraphQLClientError as e:

269

if "JobNotFoundError" in str(e):

270

print("Job does not exist")

271

elif "RunConfigValidationInvalid" in str(e):

272

print("Invalid run configuration")

273

else:

274

print(f"Execution failed: {e}")

275

```