# Test Utilities

Testing utilities for Dagster's GraphQL layer, covering workspace management, query execution, and result processing. They support robust testing of Dagster GraphQL operations in a variety of contexts.

## Capabilities

### GraphQL Query Execution

Execute GraphQL queries in test contexts with comprehensive error handling and result processing.

```python { .api }
def execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult:
    """
    Execute GraphQL query in test context with error handling.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context for execution
    - query (str): GraphQL query string to execute
    - variables (Optional[GqlVariables]): Variables for GraphQL execution
    - schema (graphene.Schema): GraphQL schema to use (defaults to SCHEMA)

    Returns:
    - GqlResult: Result object with data and errors properties

    Note: Automatically clears loaders between requests and handles GraphQL errors
    by raising the original exception for easier debugging.
    """

def async_execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult:
    """
    Asynchronous version of GraphQL query execution for async tests.

    Parameters: Same as execute_dagster_graphql
    Returns: Same as execute_dagster_graphql
    """
```

Usage example:

```python
from dagster_graphql.test.utils import execute_dagster_graphql

def test_job_execution(graphql_context):
    query = """
        mutation LaunchJob($selector: PipelineSelector!) {
          launchPipelineExecution(executionParams: {selector: $selector}) {
            __typename
            ... on LaunchRunSuccess {
              run {
                runId
                status
              }
            }
          }
        }
    """

    variables = {
        "selector": {
            "repositoryLocationName": "test_location",
            "repositoryName": "test_repo",
            "pipelineName": "my_job"
        }
    }

    result = execute_dagster_graphql(graphql_context, query, variables)
    assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
    run_id = result.data["launchPipelineExecution"]["run"]["runId"]
    assert run_id is not None
```
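
The asynchronous variant mirrors the synchronous call but must be awaited from an async test. A minimal sketch, assuming `pytest-asyncio` (or an equivalent async test runner) is available and that `async_execute_dagster_graphql` is awaitable; it reuses the query and variables from the example above:

```python
import pytest

from dagster_graphql.test.utils import async_execute_dagster_graphql

@pytest.mark.asyncio  # assumes pytest-asyncio is installed
async def test_job_execution_async(graphql_context):
    # Same mutation and variables as the synchronous example above
    query = """
        mutation LaunchJob($selector: PipelineSelector!) {
          launchPipelineExecution(executionParams: {selector: $selector}) {
            __typename
          }
        }
    """
    variables = {
        "selector": {
            "repositoryLocationName": "test_location",
            "repositoryName": "test_repo",
            "pipelineName": "my_job",
        }
    }

    result = await async_execute_dagster_graphql(graphql_context, query, variables)
    assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
```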

### Subscription Testing

Execute GraphQL subscriptions for testing real-time event processing and streaming operations.

```python { .api }
def execute_dagster_graphql_subscription(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> Sequence[GqlResult]:
    """
    Execute GraphQL subscription query and return results.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context for execution
    - query (str): GraphQL subscription query string
    - variables (Optional[GqlVariables]): Variables for subscription
    - schema (graphene.Schema): GraphQL schema to use

    Returns:
    - Sequence[GqlResult]: Sequence of subscription results

    Note: Returns first payload from subscription for testing purposes.
    """
```

Usage example:

```python
from dagster_graphql.test.utils import execute_dagster_graphql_subscription

def test_run_events_subscription(graphql_context):
    subscription = """
        subscription RunEvents($runId: ID!) {
          pipelineRunLogs(runId: $runId) {
            __typename
            ... on PipelineRunLogsSubscriptionSuccess {
              messages {
                __typename
                ... on RunStartEvent {
                  runId
                }
              }
            }
          }
        }
    """

    results = execute_dagster_graphql_subscription(
        graphql_context,
        subscription,
        {"runId": "test-run-id"}
    )

    assert len(results) > 0
    assert results[0].data is not None
```

### Test Context Management

Create and manage workspace contexts for testing with various configuration options and repository setups.

```python { .api }
def define_out_of_process_context(
    python_or_workspace_file: str,
    fn_name: Optional[str],
    instance: DagsterInstance,
    read_only: bool = False,
    read_only_locations: Optional[Mapping[str, bool]] = None,
) -> Iterator[WorkspaceRequestContext]:
    """
    Create out-of-process workspace context for testing.

    Parameters:
    - python_or_workspace_file (str): Python file or workspace YAML file path
    - fn_name (Optional[str]): Function name for repository definition
    - instance (DagsterInstance): Dagster instance for testing
    - read_only (bool): Whether workspace should be read-only
    - read_only_locations (Optional[Mapping[str, bool]]): Per-location read-only settings

    Yields:
    - WorkspaceRequestContext: Configured workspace context for testing
    """

def temp_workspace_file(python_fns: list[tuple[str, str, Optional[str]]]) -> Iterator[str]:
    """
    Create temporary workspace configuration file for testing.

    Parameters:
    - python_fns (list[tuple[str, str, Optional[str]]]): List of tuples containing
      (location_name, python_file_path, function_name)

    Yields:
    - str: Path to temporary workspace.yaml file
    """
```

Usage example:

```python
from dagster import DagsterInstance
from dagster_graphql.test.utils import define_out_of_process_context, temp_workspace_file

def test_multi_location_workspace():
    with temp_workspace_file([
        ("location_a", "repos/repo_a.py", "get_repo_a"),
        ("location_b", "repos/repo_b.py", "get_repo_b")
    ]) as workspace_file:
        with DagsterInstance.ephemeral() as instance:
            with define_out_of_process_context(
                workspace_file, None, instance
            ) as context:
                # Test multi-location operations
                locations = context.code_locations
                assert len(locations) == 2
                assert any(loc.name == "location_a" for loc in locations)
                assert any(loc.name == "location_b" for loc in locations)
```
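
For a single code location defined in a Python file, pass the file path together with the name of the function that returns the repository. A minimal sketch, assuming a hypothetical `repos/repo_a.py` that defines a `get_repo_a` repository function:

```python
from dagster import DagsterInstance
from dagster_graphql.test.utils import define_out_of_process_context

def test_single_location_from_python_file():
    with DagsterInstance.ephemeral() as instance:
        # Point directly at a Python file and the repository function it defines
        with define_out_of_process_context(
            "repos/repo_a.py", "get_repo_a", instance
        ) as context:
            assert len(context.code_locations) == 1
```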

### Test Helper Functions

Utility functions for inferring repository information and creating selectors for GraphQL operations.

```python { .api }
def infer_repository(graphql_context: WorkspaceRequestContext) -> RemoteRepository:
    """
    Automatically infer repository from GraphQL context.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context

    Returns:
    - RemoteRepository: Inferred repository object
    """

def infer_repository_selector(
    graphql_context: WorkspaceRequestContext,
    location_name: Optional[str] = None
) -> Selector:
    """
    Create repository selector dictionary from context.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context
    - location_name (Optional[str]): Specific location name to use

    Returns:
    - Selector: Dictionary with repositoryLocationName and repositoryName
    """

def infer_job_selector(
    graphql_context: WorkspaceRequestContext,
    job_name: str,
    op_selection: Optional[Sequence[str]] = None,
    asset_selection: Optional[Sequence[GqlAssetKey]] = None,
    asset_check_selection: Optional[Sequence[GqlAssetCheckHandle]] = None,
    location_name: Optional[str] = None,
) -> Selector:
    """
    Create job selector dictionary with optional op/asset selection.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context
    - job_name (str): Name of the job
    - op_selection (Optional[Sequence[str]]): Specific ops to select
    - asset_selection (Optional[Sequence[GqlAssetKey]]): Specific assets to select
    - asset_check_selection (Optional[Sequence[GqlAssetCheckHandle]]): Asset checks to select
    - location_name (Optional[str]): Specific location name

    Returns:
    - Selector: Dictionary with job selection parameters
    """
```

Usage example:

```python
from dagster_graphql.test.utils import execute_dagster_graphql, infer_job_selector

def test_job_with_op_selection(graphql_context):
    # Automatically infer repository and create job selector
    job_selector = infer_job_selector(
        graphql_context,
        "complex_pipeline",
        op_selection=["extract_data", "transform_data"]
    )

    query = """
        query JobDetails($selector: PipelineSelector!) {
          pipelineOrError(params: $selector) {
            ... on Pipeline {
              name
              solids {
                name
              }
            }
          }
        }
    """

    result = execute_dagster_graphql(
        graphql_context,
        query,
        {"selector": job_selector}
    )

    assert result.data["pipelineOrError"]["name"] == "complex_pipeline"
```
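
`infer_repository_selector` is useful when a query expects a repository selector rather than a job selector. A brief sketch, assuming a `repositoryOrError(repositorySelector: ...)`-style query; the exact field names are illustrative and may need to be adapted to the schema version under test:

```python
from dagster_graphql.test.utils import execute_dagster_graphql, infer_repository_selector

def test_repository_selector(graphql_context):
    # Builds a {"repositoryLocationName": ..., "repositoryName": ...} dict from the context
    repository_selector = infer_repository_selector(graphql_context)

    query = """
        query RepositoryDetails($repositorySelector: RepositorySelector!) {
          repositoryOrError(repositorySelector: $repositorySelector) {
            ... on Repository {
              name
            }
          }
        }
    """

    result = execute_dagster_graphql(
        graphql_context, query, {"repositorySelector": repository_selector}
    )
    assert result.data["repositoryOrError"]["name"] is not None
```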

### Asset Testing Utilities

Specialized utilities for testing asset materialization and asset-based workflows.

```python { .api }
def materialize_assets(
    context: WorkspaceRequestContext,
    asset_selection: Optional[Sequence[AssetKey]] = None,
    partition_keys: Optional[Sequence[str]] = None,
    run_config_data: Optional[Mapping[str, Any]] = None,
    location_name: Optional[str] = None,
) -> Union[GqlResult, Sequence[GqlResult]]:
    """
    Execute asset materialization through GraphQL API.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context
    - asset_selection (Optional[Sequence[AssetKey]]): Specific assets to materialize
    - partition_keys (Optional[Sequence[str]]): Partition keys for partitioned assets
    - run_config_data (Optional[Mapping[str, Any]]): Run configuration
    - location_name (Optional[str]): Specific location name

    Returns:
    - Union[GqlResult, Sequence[GqlResult]]: Single result or sequence for partitioned runs
    """
```

Usage example:

```python
from dagster import AssetKey
from dagster_graphql.test.utils import materialize_assets

def test_asset_materialization(graphql_context):
    # Materialize specific assets
    result = materialize_assets(
        graphql_context,
        asset_selection=[AssetKey("users"), AssetKey("orders")],
        run_config_data={
            "ops": {
                "extract_users": {
                    "config": {"source": "production_db"}
                }
            }
        }
    )

    assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"

    # Test partitioned asset materialization
    partitioned_results = materialize_assets(
        graphql_context,
        asset_selection=[AssetKey("daily_metrics")],
        partition_keys=["2023-10-01", "2023-10-02", "2023-10-03"]
    )

    assert len(partitioned_results) == 3
    for result in partitioned_results:
        assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
```

### Advanced Testing Patterns

Complete testing patterns for complex GraphQL operations:

```python
from dagster_graphql.test.utils import (
    execute_dagster_graphql,
    execute_dagster_graphql_and_finish_runs,
)

def test_complete_workflow(graphql_context):
    """Test complete workflow from job execution to completion."""

    # 1. Execute job
    job_result = execute_dagster_graphql(graphql_context, """
        mutation {
          launchPipelineExecution(executionParams: {
            selector: {
              repositoryLocationName: "test_location"
              repositoryName: "test_repo"
              pipelineName: "etl_pipeline"
            }
          }) {
            ... on LaunchRunSuccess {
              run { runId }
            }
          }
        }
    """)

    run_id = job_result.data["launchPipelineExecution"]["run"]["runId"]

    # 2. Wait for completion with periodic status checks
    execute_dagster_graphql_and_finish_runs(graphql_context, f"""
        query {{
          runOrError(runId: "{run_id}") {{
            ... on Run {{
              status
              stats {{
                stepsSucceededCount
                stepsFailed
              }}
            }}
          }}
        }}
    """)

    # 3. Verify final results
    final_result = execute_dagster_graphql(graphql_context, f"""
        query {{
          runOrError(runId: "{run_id}") {{
            ... on Run {{
              status
              assets {{
                key {{
                  path
                }}
                materialization {{
                  timestamp
                }}
              }}
            }}
          }}
        }}
    """)

    run_data = final_result.data["runOrError"]
    assert run_data["status"] == "SUCCESS"
    assert len(run_data["assets"]) > 0
```

## Type Definitions for Testing

```python { .api }
class GqlResult(Protocol):
    @property
    def data(self) -> Mapping[str, Any]: ...

    @property
    def errors(self) -> Optional[Sequence[str]]: ...

class GqlTag(TypedDict):
    key: str
    value: str

class GqlAssetKey(TypedDict):
    path: Sequence[str]

class GqlAssetCheckHandle(TypedDict):
    assetKey: GqlAssetKey
    name: str

Selector = dict[str, Any]
GqlVariables = Mapping[str, Any]
```
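
These aliases are plain dictionaries, so they can be written inline in tests and passed straight to the selector helpers. A brief sketch, assuming a hypothetical job named `asset_job` with an asset check, and assuming the selector keys match the shape shown in the earlier examples:

```python
from dagster_graphql.test.utils import infer_job_selector

def test_typed_selector_inputs(graphql_context):
    # GqlAssetKey and GqlAssetCheckHandle are TypedDicts, so literal dicts work
    users_key = {"path": ["users"]}                       # GqlAssetKey
    check = {"assetKey": users_key, "name": "non_empty"}  # GqlAssetCheckHandle

    selector = infer_job_selector(
        graphql_context,
        "asset_job",  # hypothetical job name
        asset_selection=[users_key],
        asset_check_selection=[check],
    )

    # The resulting Selector is a plain dict, ready to use as a GraphQL variable
    assert selector["pipelineName"] == "asset_job"
```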