# Execution and Contexts

This document covers Dagster's execution system, including execution contexts, executors, and execution APIs. The execution system provides rich runtime environments with logging, configuration, resources, and metadata for all computational units.

## Execution Contexts

Execution contexts provide the runtime environment for operations, assets, and other computational units. They offer access to configuration, resources, logging, metadata, and execution state.

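Before the detailed reference, here is a minimal sketch of how contexts appear in user code (the `greet` op and `greeting` asset are illustrative names, not part of the API):

```python
from dagster import asset, op, AssetExecutionContext, OpExecutionContext

@op
def greet(context: OpExecutionContext) -> str:
    # Ops receive an OpExecutionContext as their first parameter
    context.log.info(f"running op {context.op_def.name}")
    return "hello"

@asset
def greeting(context: AssetExecutionContext) -> str:
    # Assets receive an AssetExecutionContext with asset-specific metadata
    context.log.info(f"materializing {context.asset_key}")
    return "hello"
```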
### Operation Execution Context

#### `OpExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for operations with access to configuration, resources, and logging.

```python
from dagster import op, OpExecutionContext, Config, resource

class MyOpConfig(Config):
    batch_size: int = 1000
    debug_mode: bool = False

@resource
def database_resource():
    return {"connection": "postgresql://localhost/db"}

@op(required_resource_keys={"database"})
def process_data(context: OpExecutionContext, data: list) -> list:
    """Op with full context access."""

    # Access configuration
    config = context.op_config
    batch_size = config.get("batch_size", 100)

    # Access resources
    db = context.resources.database
    connection = db["connection"]

    # Logging with different levels
    context.log.info(f"Processing {len(data)} records with batch_size {batch_size}")
    context.log.debug(f"Using database: {connection}")
    context.log.warning("This is a warning message")

    # Access run information
    run_id = context.run_id
    op_name = context.op_def.name

    # Access step information
    step_key = context.step_context.step.key

    # Metadata logging
    context.add_output_metadata({
        "records_processed": len(data),
        "batch_size": batch_size,
        "processing_time": "2.3s"
    })

    # Partition information (if partitioned)
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Processing partition: {partition_key}")

    # Time window for time-partitioned assets
    if hasattr(context, 'partition_time_window'):
        time_window = context.partition_time_window
        context.log.info(f"Time window: {time_window.start} to {time_window.end}")

    # Process data
    processed = data[:batch_size]  # Simple processing
    return processed
```

**Key Properties and Methods:**

- `op_config: Dict[str, Any]` - Operation configuration
- `resources: Resources` - Available resources
- `log: DagsterLogManager` - Logger for the operation
- `run_id: str` - Unique run identifier
- `op_def: OpDefinition` - Operation definition
- `step_context: StepExecutionContext` - Step execution context
- `has_partition_key: bool` - Whether the current run has a partition key
- `partition_key: Optional[str]` - Partition key if partitioned
- `asset_partition_key_for_output: Optional[str]` - Asset partition key for the output
- `add_output_metadata(metadata: Dict[str, Any])` - Add metadata to the current output

### Asset Execution Context

#### `AssetExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for assets with asset-specific functionality and metadata.

```python
from dagster import asset, AssetExecutionContext, Output
import pandas as pd

@asset(
    group_name="analytics",
    compute_kind="pandas"
)
def users_analysis(context: AssetExecutionContext, users_data: pd.DataFrame) -> Output[pd.DataFrame]:
    """Asset with comprehensive context usage."""

    # Asset-specific information
    asset_key = context.asset_key
    asset_name = asset_key.path[-1]  # Last part of asset key

    # Partition information for assets
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Materializing {asset_name} for partition {partition_key}")

    # Time window for time-partitioned assets
    if hasattr(context, 'partition_time_window'):
        time_window = context.partition_time_window
        start_date = time_window.start.strftime('%Y-%m-%d')
        end_date = time_window.end.strftime('%Y-%m-%d')

        # Filter data by time window
        users_data = users_data[
            (users_data['created_at'] >= start_date) &
            (users_data['created_at'] < end_date)
        ]

    # Asset keys selected in the current materialization
    selected_assets = context.selected_asset_keys
    context.log.info(f"Selected assets: {[key.path[-1] for key in selected_assets]}")

    # Perform analysis
    total_users = len(users_data)
    active_users = len(users_data[users_data['active']])
    avg_age = users_data['age'].mean()

    # Rich logging
    context.log.info(f"Analyzed {total_users} users")
    context.log.info(f"Active users: {active_users} ({active_users/total_users:.1%})")

    # Create result DataFrame
    analysis_result = pd.DataFrame({
        'metric': ['total_users', 'active_users', 'average_age'],
        'value': [total_users, active_users, avg_age],
        'date': pd.Timestamp.now()
    })

    # Return the DataFrame wrapped in an Output so metadata is attached to the materialization
    return Output(
        analysis_result,
        metadata={
            "total_users": total_users,
            "active_users": active_users,
            "activity_rate": active_users / total_users,
            "average_age": float(avg_age),
            "data_quality_score": 0.95,
            "last_updated": pd.Timestamp.now().isoformat()
        }
    )

@asset
def user_segments(context: AssetExecutionContext, users_analysis: pd.DataFrame) -> dict:
    """Asset consuming upstream asset with context."""

    # Access asset lineage information
    context.log.info(f"Consuming asset: {context.asset_key}")

    # Get upstream asset data
    total_users = users_analysis[users_analysis['metric'] == 'total_users']['value'].iloc[0]
    active_users = users_analysis[users_analysis['metric'] == 'active_users']['value'].iloc[0]

    # Generate segments
    segments = {
        'high_value': int(active_users * 0.2),
        'medium_value': int(active_users * 0.5),
        'low_value': int(active_users * 0.3)
    }

    # Log segment information
    for segment, count in segments.items():
        context.log.info(f"Segment {segment}: {count} users")

    return segments
```

**Additional Asset Context Properties:**

- `asset_key: AssetKey` - Current asset key
- `selected_asset_keys: AbstractSet[AssetKey]` - Selected asset keys in materialization
- `asset_partition_key_for_input(input_name: str) -> Optional[str]` - Partition key for input
- `asset_partition_key_for_output(output_name: str) -> Optional[str]` - Partition key for output
- `asset_partition_keys_for_input(input_name: str) -> Optional[AbstractSet[str]]` - Multiple partition keys
- `partition_time_window: Optional[TimeWindow]` - Time window for time partitions

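For time-partitioned assets, `partition_time_window` can be read directly rather than guarded with `hasattr` as above. A minimal sketch, assuming a daily-partitioned asset (`daily_events` is an illustrative name):

```python
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition
import pandas as pd

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    # For a daily partition such as "2023-01-15", the window spans that day
    window = context.partition_time_window
    context.log.info(f"Loading events from {window.start} to {window.end}")

    # Hypothetical data bounded by the partition's time window
    return pd.DataFrame({"partition_start": [window.start], "value": [42]})
```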
### Asset Check Execution Context

#### `AssetCheckExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for asset checks with check-specific functionality.

```python
from dagster import asset_check, AssetCheckExecutionContext, AssetCheckResult
import pandas as pd

@asset_check(asset="users_data")
def users_data_quality_check(context: AssetCheckExecutionContext, users_data: pd.DataFrame) -> AssetCheckResult:
    """Asset check with context access."""

    # Asset check specific information
    asset_key = context.asset_key
    check_name = context.op_def.name

    context.log.info(f"Running check '{check_name}' for asset {asset_key}")

    # Perform data quality checks
    null_count = users_data.isnull().sum().sum()
    duplicate_count = users_data.duplicated().sum()
    total_records = len(users_data)

    # Quality thresholds
    null_threshold = 0.05  # 5% null values allowed
    duplicate_threshold = 0.02  # 2% duplicates allowed

    null_rate = null_count / (total_records * len(users_data.columns))
    duplicate_rate = duplicate_count / total_records

    # Log detailed results
    context.log.info(f"Null rate: {null_rate:.2%} (threshold: {null_threshold:.2%})")
    context.log.info(f"Duplicate rate: {duplicate_rate:.2%} (threshold: {duplicate_threshold:.2%})")

    # Determine check result (coerce numpy scalars to plain Python types)
    passed = bool(null_rate <= null_threshold and duplicate_rate <= duplicate_threshold)

    if not passed:
        context.log.warning(f"Data quality check failed for {asset_key}")

    return AssetCheckResult(
        passed=passed,
        metadata={
            "null_count": int(null_count),
            "null_rate": float(null_rate),
            "duplicate_count": int(duplicate_count),
            "duplicate_rate": float(duplicate_rate),
            "total_records": total_records,
            "check_timestamp": pd.Timestamp.now().isoformat()
        }
    )
```

### Context Builders

Context builders enable testing and local development by creating execution contexts outside of normal pipeline runs.

#### `build_op_context` { .api }

**Module:** `dagster._core.execution.context.invocation`
**Type:** Function

Build operation execution context for testing and development.

```python
from dagster import build_op_context, op, resource

@resource
def test_database():
    return {"connection": "test://localhost/testdb"}

@op(
    config_schema={"batch_size": int},
    required_resource_keys={"database"}
)
def my_op(context):
    batch_size = context.op_config["batch_size"]
    db = context.resources.database
    return f"Processing with batch_size {batch_size} using {db['connection']}"

# Build context for testing
context = build_op_context(
    op_config={"batch_size": 500},
    resources={"database": test_database},
    partition_key="2023-01-01"
)

# Test op directly
result = my_op(context)
print(result)  # "Processing with batch_size 500 using test://localhost/testdb"

# Advanced context building
from dagster import build_init_logger_context, colored_console_logger  # logger import path may vary by Dagster version

logger_def = colored_console_logger
logger_context = build_init_logger_context()
logger = logger_def.logger_fn(logger_context)

advanced_context = build_op_context(
    op_config={"batch_size": 1000},
    resources={"database": test_database},
    partition_key="2023-01-02",
    run_id="test-run-12345",
    logger_defs={"custom": logger_def},
    tags={"env": "test", "team": "data"}
)
```

**Parameters:**

- `config: Optional[Dict[str, Any]]` - Op configuration
- `resources: Optional[Dict[str, Any]]` - Resource instances
- `partition_key: Optional[str]` - Partition key
- `run_id: Optional[str]` - Run ID
- `logger_defs: Optional[Dict[str, LoggerDefinition]]` - Logger definitions
- `op_config: Optional[Dict[str, Any]]` - Alternative config specification
- `tags: Optional[Dict[str, str]]` - Execution tags
- `run_tags: Optional[Dict[str, str]]` - Run-level tags

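A common use of `build_op_context` is unit-testing an op in isolation. A hedged sketch in pytest style (`clean_rows` is a hypothetical op, not part of Dagster):

```python
from dagster import build_op_context, op

@op(config_schema={"min_value": int})
def clean_rows(context, rows: list) -> list:
    threshold = context.op_config["min_value"]
    context.log.info(f"Filtering {len(rows)} rows with threshold {threshold}")
    return [r for r in rows if r >= threshold]

def test_clean_rows():
    # Direct invocation: pass the built context as the first argument
    context = build_op_context(op_config={"min_value": 3})
    assert clean_rows(context, [1, 2, 3, 4]) == [3, 4]
```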
#### `build_asset_context` { .api }

**Module:** `dagster._core.execution.context.invocation`
**Type:** Function

Build asset execution context for testing and development.

```python
from dagster import build_asset_context, asset, AssetKey

@asset
def my_asset(context):
    asset_key = context.asset_key
    partition_key = context.partition_key if context.has_partition_key else None
    return f"Asset {asset_key} for partition {partition_key}"

# Build asset context (test_database is the resource defined in the previous example)
context = build_asset_context(
    asset_key=AssetKey("my_asset"),
    partition_key="2023-01-01",
    resources={"database": test_database},
    config={"processing_mode": "test"}
)

# Test asset directly
result = my_asset(context)
print(result)  # "Asset ['my_asset'] for partition 2023-01-01"

# Context with upstream asset keys (warehouse_resource assumed defined elsewhere)
upstream_context = build_asset_context(
    asset_key=AssetKey(["analytics", "user_metrics"]),
    partition_key="2023-01-01",
    selected_asset_keys={AssetKey("users"), AssetKey("events")},
    resources={"warehouse": warehouse_resource}
)
```

**Parameters:**

- `asset_key: Optional[AssetKey]` - Asset key
- `config: Optional[Dict[str, Any]]` - Asset configuration
- `resources: Optional[Dict[str, Any]]` - Resource instances
- `partition_key: Optional[str]` - Partition key
- `selected_asset_keys: Optional[AbstractSet[AssetKey]]` - Selected asset keys
- `run_id: Optional[str]` - Run ID
- `tags: Optional[Dict[str, str]]` - Execution tags
- `run_tags: Optional[Dict[str, str]]` - Run-level tags

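Asset bodies can be unit-tested the same way by direct invocation. A hedged sketch for a statically partitioned asset (`regional_report` is an illustrative name):

```python
from dagster import asset, build_asset_context, StaticPartitionsDefinition

regions = StaticPartitionsDefinition(["us", "eu", "apac"])

@asset(partitions_def=regions)
def regional_report(context) -> str:
    # The partition key comes from the built context during the test
    return f"report for {context.partition_key}"

def test_regional_report_partition():
    context = build_asset_context(partition_key="eu")
    assert regional_report(context) == "report for eu"
```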
### Input and Output Contexts

#### `InputContext` { .api }

**Module:** `dagster._core.execution.context.input`
**Type:** Class

Context for input loading with I/O managers and input managers.

```python
from dagster import InputContext, input_manager, IOManager, asset
import pandas as pd

class DataWarehouseInputManager(IOManager):
    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load input using context information."""

        # Access input metadata
        upstream_output = context.upstream_output
        asset_key = upstream_output.asset_key if upstream_output else None

        # Input configuration
        metadata = context.metadata
        dagster_type = context.dagster_type

        # Partition information
        if context.has_asset_partitions:
            asset_partitions = context.asset_partition_keys
            context.log.info(f"Loading partitions: {asset_partitions}")

        # Resource access
        if hasattr(context, 'resources'):
            warehouse = context.resources.data_warehouse

        context.log.info(f"Loading input for asset {asset_key}")

        # Load data based on context
        if asset_key:
            return pd.read_parquet(f"/warehouse/{asset_key.path[-1]}.parquet")
        else:
            return pd.DataFrame()

@input_manager
def warehouse_input_manager() -> DataWarehouseInputManager:
    return DataWarehouseInputManager()

# Usage in asset
@asset(input_manager_key="warehouse_loader")
def processed_data(context, raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset using custom input loading."""
    return raw_data.dropna()
```

**Key Properties:**

- `upstream_output: Optional[OutputContext]` - Upstream output context
- `asset_key: Optional[AssetKey]` - Asset key being loaded
- `metadata: Optional[Dict[str, Any]]` - Input metadata
- `config: Optional[Dict[str, Any]]` - Input configuration
- `dagster_type: Optional[DagsterType]` - Expected input type
- `log: DagsterLogManager` - Logger
- `resources: Resources` - Available resources
- `has_asset_partitions: bool` - Whether input has asset partitions
- `asset_partition_keys: AbstractSet[str]` - Asset partition keys

#### `OutputContext` { .api }

**Module:** `dagster._core.execution.context.output`
**Type:** Class

Context for output handling with I/O managers.

```python
from dagster import OutputContext, InputContext, IOManager, asset
import pandas as pd
import numpy as np

class S3IOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        """Handle output using context information."""

        # Access output metadata
        asset_key = context.asset_key
        step_key = context.step_key
        name = context.name

        # Partition information
        if context.has_asset_partitions:
            partition_keys = context.asset_partition_keys
            context.log.info(f"Storing partitions: {partition_keys}")

        # Metadata and configuration
        metadata = context.metadata
        config = context.config

        # Resource access
        s3_client = context.resources.s3

        # Generate file path
        if asset_key:
            path_parts = asset_key.path
            file_path = "/".join(path_parts) + ".parquet"
        else:
            file_path = f"{step_key}_{name}.parquet"

        context.log.info(f"Storing output to S3: s3://my-bucket/{file_path}")

        # Store data
        obj.to_parquet(f"s3://my-bucket/{file_path}")

        # Add output metadata
        context.add_output_metadata({
            "s3_path": f"s3://my-bucket/{file_path}",
            "rows": len(obj),
            "columns": len(obj.columns),
            "size_mb": obj.memory_usage(deep=True).sum() / 1024 / 1024
        })

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load input from S3."""
        asset_key = context.asset_key
        path_parts = asset_key.path
        file_path = "/".join(path_parts) + ".parquet"

        context.log.info(f"Loading from S3: s3://my-bucket/{file_path}")
        return pd.read_parquet(f"s3://my-bucket/{file_path}")

@asset(io_manager_key="s3_manager")
def sales_data(context) -> pd.DataFrame:
    """Asset using S3 I/O manager."""
    # Generate sales data
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "sales": np.random.randint(1000, 5000, 100)
    })
```

**Key Properties:**

- `asset_key: Optional[AssetKey]` - Asset key being output
- `step_key: str` - Step key
- `name: str` - Output name
- `metadata: Optional[Dict[str, Any]]` - Output metadata
- `config: Optional[Dict[str, Any]]` - Output configuration
- `log: DagsterLogManager` - Logger
- `resources: Resources` - Available resources
- `has_asset_partitions: bool` - Whether output has asset partitions
- `asset_partition_keys: AbstractSet[str]` - Asset partition keys
- `add_output_metadata(metadata: Dict[str, Any])` - Add metadata to output

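To actually use a custom I/O manager like the one above, it has to be registered under the resource key the asset references (`s3_manager` here), along with the `s3` resource it reads from the context. A hedged sketch, assuming the `S3IOManager` and `sales_data` definitions above; the `s3_client_resource` stand-in is hypothetical:

```python
from dagster import Definitions, io_manager, resource

@io_manager
def s3_io_manager(init_context):
    return S3IOManager()

@resource
def s3_client_resource(init_context):
    # Placeholder for a real S3 client (e.g. a boto3 client)
    return object()

defs = Definitions(
    assets=[sales_data],
    resources={
        "s3_manager": s3_io_manager,   # matches io_manager_key on the asset
        "s3": s3_client_resource,      # used inside handle_output via context.resources.s3
    },
)
```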
## Execution System

### Executors

Executors control how operations are executed, providing different execution strategies for different environments and performance requirements.

#### `Executor` { .api }

**Module:** `dagster._core.executor.base`
**Type:** Base class

Base executor interface for custom execution strategies.

```python
from dagster import Executor, InitExecutorContext, executor, Field, Int

class CustomExecutor(Executor):
    """Custom executor implementation."""

    def __init__(self, max_concurrent: int = 4):
        self.max_concurrent = max_concurrent

    def execute(self, plan_context, execution_plan):
        """Execute the execution plan."""
        # Custom execution logic
        steps = execution_plan.get_all_steps()

        # Execute steps with concurrency control
        for step in steps:
            # Step execution logic
            pass

        return []  # Return step events

@executor(
    name="custom_executor",
    config_schema={
        "max_concurrent": Field(Int, default_value=4, description="Max concurrent steps")
    }
)
def custom_executor(init_context: InitExecutorContext) -> CustomExecutor:
    """Create custom executor from configuration."""
    max_concurrent = init_context.executor_config["max_concurrent"]
    return CustomExecutor(max_concurrent=max_concurrent)
```

#### Built-in Executors

##### `in_process_executor` { .api }

**Module:** `dagster._core.definitions.executor_definition`
**Type:** ExecutorDefinition

Single-process, single-threaded executor for development and testing.

```python
from dagster import job, in_process_executor

@job(executor_def=in_process_executor)
def single_process_job():
    """Job using in-process executor."""
    # op_a, op_b, and op_c are ops assumed to be defined elsewhere
    op_a()
    op_b()
    op_c()

# Execute job
from dagster import execute_job

result = execute_job(single_process_job)
```

##### `multiprocess_executor` { .api }

**Module:** `dagster._core.definitions.executor_definition`
**Type:** ExecutorDefinition

Multi-process executor for parallel execution.

```python
from dagster import job, multiprocess_executor

@job(executor_def=multiprocess_executor)
def parallel_job():
    """Job using multiprocess executor."""
    # These ops can run in parallel
    result_a = op_a()
    result_b = op_b()

    # This op depends on both and runs after
    op_c(result_a, result_b)

# Execute with multiprocess config
result = execute_job(
    parallel_job,
    run_config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 8,
                    "retries": {"enabled": {}},
                    "start_method": "spawn"
                }
            }
        }
    }
)
```

**Configuration Parameters:**

- `max_concurrent: int = 4` - Maximum concurrent processes
- `retries: Optional[RetryMode]` - Retry configuration
- `start_method: Optional[str]` - Process start method ("spawn", "fork", "forkserver")

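An executor can also be set once for a whole code location instead of per job. A hedged sketch using `Definitions` and `configured` (the `users` and `user_stats` names stand in for assets defined elsewhere):

```python
from dagster import Definitions, multiprocess_executor

defs = Definitions(
    assets=[users, user_stats],
    # Applies to jobs and materializations in this code location unless overridden per job
    executor=multiprocess_executor.configured({"max_concurrent": 4}),
)
```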
## Execution APIs

### Job Execution

#### `execute_job` { .api }

**Module:** `dagster._core.execution.api`
**Type:** Function

Execute a job with configuration and return execution results.

```python
from dagster import execute_job, job, op, DagsterInstance

@op
def hello_op(name: str) -> str:
    return f"Hello, {name}!"

@job
def greeting_job():
    hello_op()

# Basic execution
result = execute_job(
    greeting_job,
    run_config={
        "ops": {
            "hello_op": {
                "inputs": {"name": {"value": "World"}}
            }
        }
    }
)

# Execution with instance and tags
instance = DagsterInstance.ephemeral()
result = execute_job(
    greeting_job,
    run_config={"ops": {"hello_op": {"inputs": {"name": {"value": "Alice"}}}}},
    instance=instance,
    tags={"env": "test", "team": "data"},
    run_id="custom-run-id-123"
)

# Check execution success
if result.success:
    print("Job executed successfully")
    print(f"Run ID: {result.run_id}")

    # Access step results
    for event in result.all_events:
        if event.is_step_success:
            print(f"Step {event.step_key} succeeded")
else:
    print("Job execution failed")
    print(f"Failure info: {result.failure_info}")
```

**Parameters:**

- `job_def: JobDefinition` - Job definition to execute
- `run_config: Optional[Dict[str, Any]]` - Run configuration
- `instance: Optional[DagsterInstance]` - Dagster instance
- `partition_key: Optional[str]` - Partition key for partitioned jobs
- `run_id: Optional[str]` - Custom run ID
- `tags: Optional[Dict[str, str]]` - Run tags
- `raise_on_error: bool = True` - Whether to raise on execution failure

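Depending on the Dagster version, `execute_job` may expect a reconstructable job (so worker processes can re-import it) together with an explicit instance, particularly when the multiprocess executor is involved. A hedged sketch reusing `greeting_job` from above:

```python
from dagster import DagsterInstance, execute_job, reconstructable

if __name__ == "__main__":
    # reconstructable() requires the job to be defined at module scope
    instance = DagsterInstance.ephemeral()
    result = execute_job(reconstructable(greeting_job), instance=instance)
    print(result.success)
```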
#### `JobExecutionResult` { .api }

**Module:** `dagster._core.execution.job_execution_result`
**Type:** Class

Result of job execution with access to events, outputs, and metadata.

```python
# Access execution results
result = execute_job(my_job, run_config=config)

# Basic result information
success = result.success    # bool: Whether execution succeeded
run_id = result.run_id      # str: Unique run identifier
job_def = result.job_def    # JobDefinition: Job that was executed

# Event access
all_events = result.all_events          # List[DagsterEvent]: All execution events
step_events = result.step_event_list    # List[DagsterEvent]: Step-specific events

# Asset materializations
materializations = result.asset_materializations
for materialization in materializations:
    asset_key = materialization.asset_key
    metadata = materialization.metadata_entries
    print(f"Materialized {asset_key} with metadata: {metadata}")

# Step outputs
output = result.output_for_step("step_name", "output_name")
step_outputs = result.step_outputs_by_step_key  # Dict of outputs by step

# Failure information
if not result.success:
    failure_info = result.failure_info
    print(f"Execution failed: {failure_info}")
```

### Asset Materialization

#### `materialize` { .api }

**Module:** `dagster._core.definitions.materialize`
**Type:** Function

Materialize assets with dependencies and return materialization results.

```python
from dagster import materialize, asset, Definitions
import pandas as pd

@asset
def users() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})

@asset
def user_stats(users: pd.DataFrame) -> dict:
    return {"count": len(users), "avg_id": users["id"].mean()}

# Materialize specific assets
result = materialize([users, user_stats])

# Materialize with configuration
result = materialize(
    [users, user_stats],
    run_config={
        "resources": {
            "io_manager": {
                "config": {"base_path": "/tmp/dagster"}
            }
        }
    }
)

# Materialize with resources
from dagster import resource

@resource
def database():
    return {"connection": "postgresql://localhost/db"}

defs = Definitions(
    assets=[users, user_stats],
    resources={"database": database}
)

result = materialize(
    [users, user_stats],
    resources={"database": database}
)

# Materialize with partition selection
from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_data() -> pd.DataFrame:
    return pd.DataFrame({"date": [pd.Timestamp.now()], "value": [42]})

# Materialize specific partition
result = materialize(
    [daily_data],
    partition_key="2023-01-15"
)

# Check materialization results
if result.success:
    materializations = result.asset_materializations
    for mat in materializations:
        print(f"Materialized: {mat.asset_key}")
        print(f"Metadata: {mat.metadata}")
```

808

809

**Parameters:**

810

- `assets: Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]]` - Assets to materialize

811

- `run_config: Any = None` - Run configuration (accepts various formats)

812

- `instance: Optional[DagsterInstance] = None` - Dagster instance (uses default if None)

813

- `resources: Optional[Mapping[str, object]] = None` - Resource instances

814

- `partition_key: Optional[str] = None` - Partition key for partitioned assets

815

- `raise_on_error: bool = True` - Whether to raise exception on failure

816

- `tags: Optional[Mapping[str, str]] = None` - Run tags for metadata

817

- `selection: Optional[CoercibleToAssetSelection] = None` - Asset selection for partial materialization

818

819

**Returns:** `ExecuteInProcessResult` - Contains execution results, asset materializations, and run information

820

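The `selection` parameter limits which of the supplied assets are actually materialized while keeping the full graph available for dependency resolution. A hedged sketch reusing the assets above (selection strings follow Dagster's asset-selection syntax):

```python
from dagster import materialize

# Pass both assets so dependencies resolve, but only materialize user_stats;
# the users input is loaded via the I/O manager rather than recomputed.
result = materialize(
    [users, user_stats],
    selection="user_stats",
)
```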
821

#### `materialize_to_memory` { .api }

822

823

**Module:** `dagster._core.definitions.materialize`

824

**Type:** Function

825

826

Materialize assets to memory for testing and development.

827

828

```python
from dagster import materialize_to_memory

# Materialize to memory (no I/O)
result = materialize_to_memory([users, user_stats])

# Access materialized values directly
if result.success:
    # Get materialized asset values
    users_data = result.output_for_node("users")
    stats_data = result.output_for_node("user_stats")

    print(f"Users: {users_data}")
    print(f"Stats: {stats_data}")

# Memory materialization with partition
result = materialize_to_memory(
    [daily_data],
    partition_key="2023-01-15"
)

daily_value = result.output_for_node("daily_data")
print(f"Daily data: {daily_value}")
```
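Because nothing touches disk, `materialize_to_memory` fits naturally into unit tests. A hedged sketch in pytest style, reusing the `users` and `user_stats` assets defined above:

```python
def test_user_stats():
    result = materialize_to_memory([users, user_stats])
    assert result.success

    # Values are held in memory, so they can be asserted on directly
    stats = result.output_for_node("user_stats")
    assert stats["count"] == 3
    assert stats["avg_id"] == 2.0
```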
### Resource Building

#### `build_resources` { .api }

**Module:** `dagster._core.execution.build_resources`
**Type:** Function

Build and initialize resources outside of an execution context.

```python
from dagster import build_resources, resource, Config

class DatabaseConfig(Config):
    host: str = "localhost"
    port: int = 5432
    database: str = "mydb"

@resource(config_schema=DatabaseConfig)
def database_resource(config: DatabaseConfig):
    connection = f"postgresql://{config.host}:{config.port}/{config.database}"
    return {"connection": connection, "pool_size": 10}

@resource
def cache_resource():
    return {"redis_url": "redis://localhost:6379"}

# Build resources with configuration
with build_resources({
    "database": database_resource,
    "cache": cache_resource
}, run_config={
    "resources": {
        "database": {
            "config": {
                "host": "prod-db",
                "port": 5432,
                "database": "production"
            }
        }
    }
}) as resources:
    # Use resources
    db = resources.database
    cache = resources.cache

    print(f"Database: {db['connection']}")
    print(f"Cache: {cache['redis_url']}")

# Resources are automatically cleaned up
```
### Configuration Validation

#### `validate_run_config` { .api }

**Module:** `dagster._core.execution.validate_run_config`
**Type:** Function

Validate run configuration against a job's config schema.

```python
from dagster import validate_run_config, job, op, String, Int
from dagster import DagsterInvalidConfigError

@op(config_schema={"name": String, "count": Int})
def configured_op(context):
    return f"Hello {context.op_config['name']} x{context.op_config['count']}"

@job
def configured_job():
    configured_op()

# Validate configuration
run_config = {
    "ops": {
        "configured_op": {
            "config": {
                "name": "Alice",
                "count": 5
            }
        }
    }
}

try:
    # Returns the validated run config; raises DagsterInvalidConfigError if invalid
    validated = validate_run_config(configured_job, run_config)
    print("Configuration is valid")
    print(f"Validated config: {validated}")
except DagsterInvalidConfigError as err:
    print("Configuration validation failed")
    print(err)
```
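Validation also works as a CI guard for run configuration. A hedged sketch of a pytest test, assuming `validate_run_config` raises `DagsterInvalidConfigError` on invalid input as shown above:

```python
import pytest
from dagster import DagsterInvalidConfigError, validate_run_config

def test_rejects_wrong_type():
    # "count" must be an Int, so a string should be rejected
    bad_config = {"ops": {"configured_op": {"config": {"name": "Alice", "count": "five"}}}}
    with pytest.raises(DagsterInvalidConfigError):
        validate_run_config(configured_job, bad_config)
```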
This execution system provides rich runtime environments, flexible execution strategies, and powerful APIs for materializing assets and executing jobs. The context system offers extensive access to configuration, resources, logging, and metadata, enabling sophisticated data pipeline implementations.

For storage and I/O management used in execution, see [Storage and I/O](./storage-io.md). For events generated during execution, see [Events and Metadata](./events-metadata.md).