# Execution and Contexts

This document covers Dagster's execution system, including execution contexts, executors, and execution APIs. The execution system provides rich runtime environments with logging, configuration, resources, and metadata for all computational units.

## Execution Contexts

Execution contexts provide the runtime environment for operations, assets, and other computational units. They offer access to configuration, resources, logging, metadata, and execution state.

### Operation Execution Context

#### `OpExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for operations with access to configuration, resources, and logging.

```python
from dagster import Config, OpExecutionContext, op, resource

class MyOpConfig(Config):
    batch_size: int = 1000
    debug_mode: bool = False

@resource
def database_resource():
    return {"connection": "postgresql://localhost/db"}

@op(required_resource_keys={"database"})
def process_data(context: OpExecutionContext, config: MyOpConfig, data: list) -> list:
    """Op with full context access."""

    # Access configuration (typed Config object; context.op_config holds the same values as a plain dict)
    batch_size = config.batch_size
    raw_config = context.op_config

    # Access resources
    db = context.resources.database
    connection = db["connection"]

    # Logging with different levels
    context.log.info(f"Processing {len(data)} records with batch_size {batch_size}")
    context.log.debug(f"Using database: {connection}")
    if config.debug_mode:
        context.log.warning("Debug mode is enabled")

    # Access run information
    run_id = context.run_id
    op_name = context.op_def.name

    # Access step information
    step_key = context.step_context.step.key

    # Metadata logging
    context.add_output_metadata({
        "records_processed": len(data),
        "batch_size": batch_size,
        "processing_time": "2.3s",
    })

    # Partition information (if partitioned)
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Processing partition: {partition_key}")

    # Time window for time-partitioned runs
    if hasattr(context, "partition_time_window"):
        time_window = context.partition_time_window
        context.log.info(f"Time window: {time_window.start} to {time_window.end}")

    # Process data
    processed = data[:batch_size]  # Simple processing
    return processed
```

**Key Properties and Methods:**
- `op_config: Dict[str, Any]` - Operation configuration
- `resources: Resources` - Available resources
- `log: DagsterLogManager` - Logger for the operation
- `run_id: str` - Unique run identifier
- `op_def: OpDefinition` - Operation definition
- `step_context: StepExecutionContext` - Step execution context
- `has_partition_key: bool` - Whether the current run has a partition key
- `partition_key: Optional[str]` - Partition key if partitioned
- `asset_partition_key_for_output(output_name: str) -> str` - Asset partition key for an output
- `add_output_metadata(metadata: Dict[str, Any])` - Add metadata to output
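
Structured metadata can also be attached with `MetadataValue` wrappers, which render as typed entries in the Dagster UI. A minimal sketch (the markdown and URL values are illustrative):

```python
from dagster import MetadataValue, OpExecutionContext, op

@op
def summarize(context: OpExecutionContext, data: list) -> int:
    total = len(data)
    # Attach typed metadata to the op's output
    context.add_output_metadata({
        "record_count": total,
        "preview": MetadataValue.md(f"**{total}** records processed"),
        "source_url": MetadataValue.url("https://example.com/source"),
    })
    return total
```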

### Asset Execution Context

#### `AssetExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for assets with asset-specific functionality and metadata.

```python
from dagster import asset, AssetExecutionContext
import pandas as pd

@asset(
    group_name="analytics",
    compute_kind="pandas"
)
def users_analysis(context: AssetExecutionContext, users_data: pd.DataFrame) -> pd.DataFrame:
    """Asset with comprehensive context usage."""

    # Asset-specific information
    asset_key = context.asset_key
    asset_name = asset_key.path[-1]  # Last part of asset key

    # Partition information for assets
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Materializing {asset_name} for partition {partition_key}")

    # Time window for time-partitioned assets
    if hasattr(context, "partition_time_window"):
        time_window = context.partition_time_window
        start_date = time_window.start.strftime("%Y-%m-%d")
        end_date = time_window.end.strftime("%Y-%m-%d")

        # Filter data by time window
        users_data = users_data[
            (users_data["created_at"] >= start_date) &
            (users_data["created_at"] < end_date)
        ]

    # Asset keys selected in the current run (not necessarily upstream dependencies)
    selected = context.selected_asset_keys
    context.log.info(f"Selected assets: {[key.path[-1] for key in selected]}")

    # Perform analysis
    total_users = len(users_data)
    active_users = len(users_data[users_data["active"] == True])
    avg_age = users_data["age"].mean()

    # Rich logging
    context.log.info(f"Analyzed {total_users} users")
    context.log.info(f"Active users: {active_users} ({active_users / total_users:.1%})")

    # Attach metadata to the materialization
    context.add_output_metadata({
        "total_users": total_users,
        "active_users": active_users,
        "activity_rate": active_users / total_users,
        "average_age": float(avg_age),
        "data_quality_score": 0.95,
        "last_updated": pd.Timestamp.now().isoformat(),
    })

    # Return the result DataFrame so downstream assets can consume it via the I/O manager
    return pd.DataFrame({
        "metric": ["total_users", "active_users", "average_age"],
        "value": [total_users, active_users, avg_age],
        "date": pd.Timestamp.now(),
    })

@asset
def user_segments(context: AssetExecutionContext, users_analysis: pd.DataFrame) -> dict:
    """Asset consuming an upstream asset with context."""

    # Key of the asset currently being materialized
    context.log.info(f"Materializing asset: {context.asset_key}")

    # Get upstream asset data
    total_users = users_analysis[users_analysis["metric"] == "total_users"]["value"].iloc[0]
    active_users = users_analysis[users_analysis["metric"] == "active_users"]["value"].iloc[0]

    # Generate segments
    segments = {
        "high_value": int(active_users * 0.2),
        "medium_value": int(active_users * 0.5),
        "low_value": int(active_users * 0.3),
    }

    # Log segment information
    for segment, count in segments.items():
        context.log.info(f"Segment {segment}: {count} users")

    return segments
```

**Additional Asset Context Properties:**
- `asset_key: AssetKey` - Current asset key
- `selected_asset_keys: AbstractSet[AssetKey]` - Asset keys selected in the current run
- `asset_partition_key_for_input(input_name: str) -> Optional[str]` - Partition key for input
- `asset_partition_key_for_output(output_name: str) -> Optional[str]` - Partition key for output
- `asset_partition_keys_for_input(input_name: str) -> Optional[AbstractSet[str]]` - Multiple partition keys
- `partition_time_window: Optional[TimeWindow]` - Time window for time partitions
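
For time-partitioned assets, the partition key and time window come directly from the partitions definition. A minimal sketch (the asset and column names are illustrative):

```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
import pandas as pd

daily = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily)
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    # e.g. partition key "2023-01-15" and its corresponding [start, end) window
    window = context.partition_time_window
    context.log.info(f"Partition {context.partition_key}: {window.start} -> {window.end}")
    # A hypothetical loader would filter rows to the window here
    return pd.DataFrame({"event_time": [window.start], "count": [1]})
```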

### Asset Check Execution Context

#### `AssetCheckExecutionContext` { .api }

**Module:** `dagster._core.execution.context.compute`
**Type:** Class

Execution context for asset checks with check-specific functionality.

```python
from dagster import asset_check, AssetCheckExecutionContext, AssetCheckResult
import pandas as pd

@asset_check(asset="users_data")
def users_data_quality_check(context: AssetCheckExecutionContext, users_data: pd.DataFrame) -> AssetCheckResult:
    """Asset check with context access."""

    # Asset-check-specific information
    asset_key = context.asset_key
    check_name = context.op_def.name

    context.log.info(f"Running check '{check_name}' for asset {asset_key}")

    # Perform data quality checks (cast numpy scalars to plain ints for metadata)
    null_count = int(users_data.isnull().sum().sum())
    duplicate_count = int(users_data.duplicated().sum())
    total_records = len(users_data)

    # Quality thresholds
    null_threshold = 0.05  # 5% null values allowed
    duplicate_threshold = 0.02  # 2% duplicates allowed

    null_rate = null_count / (total_records * len(users_data.columns))
    duplicate_rate = duplicate_count / total_records

    # Log detailed results
    context.log.info(f"Null rate: {null_rate:.2%} (threshold: {null_threshold:.2%})")
    context.log.info(f"Duplicate rate: {duplicate_rate:.2%} (threshold: {duplicate_threshold:.2%})")

    # Determine check result
    passed = null_rate <= null_threshold and duplicate_rate <= duplicate_threshold

    if not passed:
        context.log.warning(f"Data quality check failed for {asset_key}")

    return AssetCheckResult(
        passed=passed,
        metadata={
            "null_count": null_count,
            "null_rate": null_rate,
            "duplicate_count": duplicate_count,
            "duplicate_rate": duplicate_rate,
            "total_records": total_records,
            "check_timestamp": pd.Timestamp.now().isoformat(),
        },
    )
```
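
Checks can also report a severity so that failures surface as warnings rather than errors. A minimal sketch (the `orders` asset and the freshness threshold are illustrative):

```python
from dagster import AssetCheckResult, AssetCheckSeverity, asset_check
import pandas as pd

@asset_check(asset="orders")
def orders_freshness_check(orders: pd.DataFrame) -> AssetCheckResult:
    # Warn, instead of erroring, when the data looks stale
    latest = orders["created_at"].max()
    age_hours = (pd.Timestamp.now() - latest).total_seconds() / 3600
    return AssetCheckResult(
        passed=bool(age_hours < 24),
        severity=AssetCheckSeverity.WARN,
        metadata={"age_hours": float(age_hours)},
    )
```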

### Context Builders

Context builders enable testing and local development by creating execution contexts outside of normal pipeline runs.

#### `build_op_context` { .api }

**Module:** `dagster._core.execution.context.invocation`
**Type:** Function

Build operation execution context for testing and development.

```python
from dagster import DagsterInstance, build_op_context, op, resource

@resource
def test_database():
    return {"connection": "test://localhost/testdb"}

@op(
    config_schema={"batch_size": int},
    required_resource_keys={"database"}
)
def my_op(context):
    batch_size = context.op_config["batch_size"]
    db = context.resources.database
    return f"Processing with batch_size {batch_size} using {db['connection']}"

# Build context for testing
context = build_op_context(
    op_config={"batch_size": 500},
    resources={"database": test_database},
    partition_key="2023-01-01",
)

# Test op directly
result = my_op(context)
print(result)  # "Processing with batch_size 500 using test://localhost/testdb"

# Context with an explicit (ephemeral) instance
advanced_context = build_op_context(
    op_config={"batch_size": 1000},
    resources={"database": test_database},
    partition_key="2023-01-02",
    instance=DagsterInstance.ephemeral(),
)
```

**Parameters:**
- `op_config: Optional[Dict[str, Any]]` - Op configuration
- `config: Optional[Dict[str, Any]]` - Legacy alias for `op_config` (do not pass both)
- `resources: Optional[Dict[str, Any]]` - Resource instances or definitions
- `partition_key: Optional[str]` - Partition key
- `instance: Optional[DagsterInstance]` - Dagster instance to use
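
If a provided resource needs teardown (for example, a generator-style `@resource` that yields), use the built context as a context manager so cleanup runs when the block exits. A minimal sketch, assuming a yielding `db_connection` resource:

```python
from dagster import build_op_context, op, resource

@resource
def db_connection():
    conn = {"connection": "test://localhost/testdb"}
    try:
        yield conn          # setup
    finally:
        conn.clear()        # teardown runs when the context exits

@op(required_resource_keys={"db"})
def read_rows(context):
    return context.resources.db["connection"]

with build_op_context(resources={"db": db_connection}) as context:
    assert read_rows(context) == "test://localhost/testdb"
```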

#### `build_asset_context` { .api }

**Module:** `dagster._core.execution.context.invocation`
**Type:** Function

Build asset execution context for testing and development.

```python
from dagster import asset, build_asset_context

@asset
def my_asset(context):
    # The asset key is bound automatically when the asset is invoked directly
    asset_key = context.asset_key
    partition_key = context.partition_key if context.has_partition_key else None
    return f"Asset {asset_key} for partition {partition_key}"

# Build asset context
context = build_asset_context(
    partition_key="2023-01-01",
    resources={"database": test_database},
)

# Test asset directly
result = my_asset(context)
print(result)  # "Asset AssetKey(['my_asset']) for partition 2023-01-01"
```

**Parameters:**
- `resources: Optional[Dict[str, Any]]` - Resource instances or definitions
- `partition_key: Optional[str]` - Partition key
- `instance: Optional[DagsterInstance]` - Dagster instance to use

The asset key is not passed to the builder; it is inferred from the asset definition when the asset is invoked directly.

### Input and Output Contexts

#### `InputContext` { .api }

**Module:** `dagster._core.execution.context.input`
**Type:** Class

Context for input loading with I/O managers and input managers.

```python
from dagster import AssetIn, InputContext, asset, input_manager
import pandas as pd

@input_manager
def warehouse_input_manager(context: InputContext) -> pd.DataFrame:
    """Load an input using context information."""

    # Metadata about the upstream output that produced this input
    upstream_output = context.upstream_output
    asset_key = upstream_output.asset_key if upstream_output else None

    # Input metadata and expected type
    metadata = context.metadata
    dagster_type = context.dagster_type

    # Partition information
    if context.has_asset_partitions:
        context.log.info(f"Loading partitions: {context.asset_partition_keys}")

    context.log.info(f"Loading input for asset {asset_key}")

    # Load data based on context
    if asset_key:
        return pd.read_parquet(f"/warehouse/{asset_key.path[-1]}.parquet")
    return pd.DataFrame()

# Usage in an asset: bind the input manager to a specific input
@asset(ins={"raw_data": AssetIn(input_manager_key="warehouse_loader")})
def processed_data(context, raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset using custom input loading."""
    return raw_data.dropna()

# Register the manager under the key referenced above, e.g.:
# Definitions(assets=[processed_data], resources={"warehouse_loader": warehouse_input_manager})
```

**Key Properties:**
- `upstream_output: Optional[OutputContext]` - Upstream output context
- `asset_key: Optional[AssetKey]` - Asset key being loaded
- `metadata: Optional[Dict[str, Any]]` - Input metadata
- `config: Optional[Dict[str, Any]]` - Input configuration
- `dagster_type: Optional[DagsterType]` - Expected input type
- `log: DagsterLogManager` - Logger
- `resources: Resources` - Available resources
- `has_asset_partitions: bool` - Whether input has asset partitions
- `asset_partition_keys: AbstractSet[str]` - Asset partition keys
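
For unit tests, `build_input_context` constructs a synthetic `InputContext` outside of a run, so `load_input` implementations can be exercised directly. A minimal sketch with an illustrative parquet-backed manager:

```python
from dagster import AssetKey, InputContext, IOManager, OutputContext, build_input_context
import pandas as pd

class ParquetIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        obj.to_parquet(f"/tmp/{context.asset_key.path[-1]}.parquet")

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_parquet(f"/tmp/{context.asset_key.path[-1]}.parquet")

# Exercise load_input directly with a synthetic context
context = build_input_context(asset_key=AssetKey("users"))
users_df = ParquetIOManager().load_input(context)
```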

#### `OutputContext` { .api }

**Module:** `dagster._core.execution.context.output`
**Type:** Class

Context for output handling with I/O managers.

```python
from dagster import InputContext, IOManager, OutputContext, asset
import numpy as np
import pandas as pd

class S3IOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        """Handle output using context information."""

        # Access output metadata
        asset_key = context.asset_key
        step_key = context.step_key
        name = context.name

        # Partition information
        if context.has_asset_partitions:
            partition_keys = context.asset_partition_keys
            context.log.info(f"Storing partitions: {partition_keys}")

        # Metadata and configuration
        metadata = context.metadata
        config = context.config

        # Resource access
        s3_client = context.resources.s3

        # Generate file path
        if asset_key:
            path_parts = asset_key.path
            file_path = "/".join(path_parts) + ".parquet"
        else:
            file_path = f"{step_key}_{name}.parquet"

        context.log.info(f"Storing output to S3: s3://my-bucket/{file_path}")

        # Store data
        obj.to_parquet(f"s3://my-bucket/{file_path}")

        # Add output metadata
        context.add_output_metadata({
            "s3_path": f"s3://my-bucket/{file_path}",
            "rows": len(obj),
            "columns": len(obj.columns),
            "size_mb": obj.memory_usage(deep=True).sum() / 1024 / 1024,
        })

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load input from S3."""
        asset_key = context.asset_key
        path_parts = asset_key.path
        file_path = "/".join(path_parts) + ".parquet"

        context.log.info(f"Loading from S3: s3://my-bucket/{file_path}")
        return pd.read_parquet(f"s3://my-bucket/{file_path}")

@asset(io_manager_key="s3_manager")
def sales_data(context) -> pd.DataFrame:
    """Asset using S3 I/O manager."""
    # Generate sales data
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "sales": np.random.randint(1000, 5000, 100),
    })
```

**Key Properties:**
- `asset_key: Optional[AssetKey]` - Asset key being output
- `step_key: str` - Step key
- `name: str` - Output name
- `metadata: Optional[Dict[str, Any]]` - Output metadata
- `config: Optional[Dict[str, Any]]` - Output configuration
- `log: DagsterLogManager` - Logger
- `resources: Resources` - Available resources
- `has_asset_partitions: bool` - Whether output has asset partitions
- `asset_partition_keys: AbstractSet[str]` - Asset partition keys
- `add_output_metadata(metadata: Dict[str, Any])` - Add metadata to output
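
The counterpart builder, `build_output_context`, constructs a synthetic `OutputContext` so `handle_output` can be unit-tested without a run. A minimal sketch with an illustrative CSV-backed manager:

```python
from dagster import AssetKey, InputContext, IOManager, OutputContext, build_output_context
import pandas as pd

class CsvIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        obj.to_csv(f"/tmp/{context.asset_key.path[-1]}.csv", index=False)

    def load_input(self, context: InputContext) -> pd.DataFrame:
        return pd.read_csv(f"/tmp/{context.asset_key.path[-1]}.csv")

# Exercise handle_output directly with a synthetic context
context = build_output_context(asset_key=AssetKey("users"))
CsvIOManager().handle_output(context, pd.DataFrame({"id": [1, 2, 3]}))
```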

## Execution System

### Executors

Executors control how operations are executed, providing different execution strategies for different environments and performance requirements.
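
An executor can be set once for every asset and job in a code location by passing it to `Definitions`. A minimal sketch (the assets shown are illustrative):

```python
from dagster import Definitions, asset, multiprocess_executor

@asset
def raw_events():
    return [1, 2, 3]

@asset
def event_count(raw_events):
    return len(raw_events)

# Runs launched from this code location default to the multiprocess executor
defs = Definitions(
    assets=[raw_events, event_count],
    executor=multiprocess_executor,
)
```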

#### `Executor` { .api }

**Module:** `dagster._core.executor.base`
**Type:** Base class

Base executor interface for custom execution strategies.

```python
from dagster import Executor, InitExecutorContext, executor, Field, Int
from dagster._core.execution.retries import RetryMode

class CustomExecutor(Executor):
    """Custom executor implementation."""

    def __init__(self, max_concurrent: int = 4):
        self.max_concurrent = max_concurrent

    @property
    def retries(self) -> RetryMode:
        # Executor subclasses must declare how retries are handled
        return RetryMode.DISABLED

    def execute(self, plan_context, execution_plan):
        """Execute the execution plan."""
        # Custom execution logic
        steps = execution_plan.get_all_steps()

        # Execute steps with concurrency control
        for step in steps:
            # Step execution logic
            pass

        return []  # Return step events

@executor(
    name="custom_executor",
    config_schema={
        "max_concurrent": Field(Int, default_value=4, description="Max concurrent steps")
    }
)
def custom_executor(init_context: InitExecutorContext) -> CustomExecutor:
    """Create custom executor from configuration."""
    max_concurrent = init_context.executor_config["max_concurrent"]
    return CustomExecutor(max_concurrent=max_concurrent)
```

#### Built-in Executors

##### `in_process_executor` { .api }

**Module:** `dagster._core.definitions.executor_definition`
**Type:** ExecutorDefinition

Single-process, single-threaded executor for development and testing.

```python
from dagster import in_process_executor, job, op

@op
def op_a():
    return 1

@op
def op_b():
    return 2

@op
def op_c():
    return 3

@job(executor_def=in_process_executor)
def single_process_job():
    """Job using in-process executor."""
    op_a()
    op_b()
    op_c()

# Execute the job in the current process
result = single_process_job.execute_in_process()
```

##### `multiprocess_executor` { .api }

**Module:** `dagster._core.definitions.executor_definition`
**Type:** ExecutorDefinition

Multi-process executor for parallel execution.

```python
from dagster import DagsterInstance, execute_job, job, multiprocess_executor, reconstructable

# op_a, op_b, and op_c (which takes two inputs) are ops defined as in the previous example

@job(executor_def=multiprocess_executor)
def parallel_job():
    """Job using multiprocess executor."""
    # These ops can run in parallel
    result_a = op_a()
    result_b = op_b()

    # This op depends on both and runs after
    op_c(result_a, result_b)

# Execute with multiprocess config; execute_job needs a reconstructable job and a
# persistent instance (DagsterInstance.get() requires DAGSTER_HOME to be set)
if __name__ == "__main__":
    result = execute_job(
        reconstructable(parallel_job),
        instance=DagsterInstance.get(),
        run_config={
            "execution": {
                "config": {
                    "max_concurrent": 8,
                    "retries": {"enabled": {}},
                    "start_method": {"spawn": {}},
                }
            }
        },
    )
```

**Configuration Parameters:**
- `max_concurrent: int = 4` - Maximum concurrent processes
- `retries: Optional[RetryMode]` - Retry configuration
- `start_method: Optional[str]` - Process start method ("spawn", "fork", "forkserver")
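
An executor can also be pre-configured in code with `.configured(...)` instead of via run config. A minimal sketch:

```python
from dagster import job, multiprocess_executor, op

@op
def do_work():
    return 42

# Bake executor config into the job definition
limited_multiprocess = multiprocess_executor.configured({"max_concurrent": 8})

@job(executor_def=limited_multiprocess)
def tuned_parallel_job():
    do_work()
```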

## Execution APIs

### Job Execution

#### `execute_job` { .api }

**Module:** `dagster._core.execution.api`
**Type:** Function

Execute a job with configuration and return execution results.

```python
from dagster import DagsterInstance, execute_job, job, op, reconstructable

@op
def hello_op(name: str) -> str:
    return f"Hello, {name}!"

@job
def greeting_job():
    hello_op()

# execute_job takes a reconstructable job and a Dagster instance
instance = DagsterInstance.ephemeral()

# Basic execution (run in a single process so an ephemeral instance suffices)
result = execute_job(
    reconstructable(greeting_job),
    instance=instance,
    run_config={
        "execution": {"config": {"in_process": {}}},
        "ops": {
            "hello_op": {
                "inputs": {"name": {"value": "World"}}
            }
        },
    },
)

# Execution with tags
result = execute_job(
    reconstructable(greeting_job),
    instance=instance,
    run_config={
        "execution": {"config": {"in_process": {}}},
        "ops": {"hello_op": {"inputs": {"name": {"value": "Alice"}}}},
    },
    tags={"env": "test", "team": "data"},
)

# Check execution success
if result.success:
    print("Job executed successfully")
    print(f"Run ID: {result.run_id}")

    # Access step results
    for event in result.all_events:
        if event.is_step_success:
            print(f"Step {event.step_key} succeeded")
else:
    print("Job execution failed")
```

**Parameters:**
- `job: ReconstructableJob` - Job to execute, wrapped with `reconstructable(...)`
- `instance: DagsterInstance` - Dagster instance to execute against
- `run_config: Optional[Dict[str, Any]]` - Run configuration
- `tags: Optional[Dict[str, str]]` - Run tags
- `op_selection: Optional[Sequence[str]]` - Subset of ops to execute
- `asset_selection: Optional[Sequence[AssetKey]]` - Subset of assets to materialize
- `raise_on_error: bool` - Whether to raise on execution failure
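
For unit tests that do not need subprocess isolation, `JobDefinition.execute_in_process` is a lighter-weight alternative that runs everything in the current process. A minimal sketch using the job above:

```python
# Runs in the current process, using an ephemeral instance by default
result = greeting_job.execute_in_process(
    run_config={"ops": {"hello_op": {"inputs": {"name": {"value": "World"}}}}}
)

assert result.success
print(result.output_for_node("hello_op"))  # "Hello, World!"
```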

#### `JobExecutionResult` { .api }

**Module:** `dagster._core.execution.job_execution_result`
**Type:** Class

Result of job execution with access to events, outputs, and metadata.

```python
from dagster import DagsterEventType

# Access execution results
result = execute_job(reconstructable(my_job), instance=instance, run_config=config)

# Basic result information
success = result.success   # bool: Whether execution succeeded
run_id = result.run_id     # str: Unique run identifier
job_def = result.job_def   # JobDefinition: Job that was executed

# Event access
all_events = result.all_events  # List[DagsterEvent]: All execution events

# Asset materializations emitted during the run
for event in all_events:
    if event.event_type == DagsterEventType.ASSET_MATERIALIZATION:
        print(f"Materialized {event.asset_key}")

# Output of a specific op/node
output = result.output_for_node("hello_op")

# Failure information
if not result.success:
    failure_event = result.get_job_failure_event()
    print(f"Execution failed: {failure_event.message}")
```

### Asset Materialization

#### `materialize` { .api }

**Module:** `dagster._core.definitions.materialize`
**Type:** Function

Materialize assets with dependencies and return materialization results.

```python
from dagster import materialize, asset, Definitions
import pandas as pd

@asset
def users() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})

@asset
def user_stats(users: pd.DataFrame) -> dict:
    return {"count": len(users), "avg_id": users["id"].mean()}

# Materialize specific assets
result = materialize([users, user_stats])

# Materialize with configuration
result = materialize(
    [users, user_stats],
    run_config={
        "resources": {
            "io_manager": {
                "config": {"base_dir": "/tmp/dagster"}
            }
        }
    }
)

# Materialize with resources
from dagster import resource

@resource
def database():
    return {"connection": "postgresql://localhost/db"}

# In a code location, resources would be bound via Definitions
defs = Definitions(
    assets=[users, user_stats],
    resources={"database": database}
)

# materialize can also take resources directly
result = materialize(
    [users, user_stats],
    resources={"database": database}
)

# Materialize with partition selection
from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_data() -> pd.DataFrame:
    return pd.DataFrame({"date": [pd.Timestamp.now()], "value": [42]})

# Materialize specific partition
result = materialize(
    [daily_data],
    partition_key="2023-01-15"
)

# Check materialization results
if result.success:
    for event in result.get_asset_materialization_events():
        print(f"Materialized: {event.asset_key}")
```

**Parameters:**
- `assets: Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]]` - Assets to materialize
- `run_config: Any = None` - Run configuration (accepts various formats)
- `instance: Optional[DagsterInstance] = None` - Dagster instance (uses default if None)
- `resources: Optional[Mapping[str, object]] = None` - Resource instances
- `partition_key: Optional[str] = None` - Partition key for partitioned assets
- `raise_on_error: bool = True` - Whether to raise exception on failure
- `tags: Optional[Mapping[str, str]] = None` - Run tags for metadata
- `selection: Optional[CoercibleToAssetSelection] = None` - Asset selection for partial materialization

**Returns:** `ExecuteInProcessResult` - Contains execution results, asset materializations, and run information
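
Because `materialize` runs in process and returns an `ExecuteInProcessResult`, it drops neatly into a unit test. A minimal sketch using the assets above:

```python
from dagster import DagsterInstance, materialize

def test_user_stats():
    result = materialize(
        [users, user_stats],
        instance=DagsterInstance.ephemeral(),  # avoid touching the local DAGSTER_HOME
    )
    assert result.success
    assert result.output_for_node("user_stats")["count"] == 3
```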

#### `materialize_to_memory` { .api }

**Module:** `dagster._core.definitions.materialize`
**Type:** Function

Materialize assets to memory for testing and development.

```python
from dagster import materialize_to_memory

# Materialize to memory (no I/O)
result = materialize_to_memory([users, user_stats])

# Access materialized values directly
if result.success:
    # Get materialized asset values
    users_data = result.output_for_node("users")
    stats_data = result.output_for_node("user_stats")

    print(f"Users: {users_data}")
    print(f"Stats: {stats_data}")

# Memory materialization with partition
result = materialize_to_memory(
    [daily_data],
    partition_key="2023-01-15"
)

daily_value = result.output_for_node("daily_data")
print(f"Daily data: {daily_value}")
```
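
Because nothing is written to disk, `materialize_to_memory` pairs well with stub resources when testing assets that depend on external systems. A minimal sketch with a hypothetical `api_client` resource key:

```python
from dagster import asset, materialize_to_memory

@asset(required_resource_keys={"api_client"})
def remote_users(context) -> list:
    return context.resources.api_client.fetch_users()

class StubApiClient:
    def fetch_users(self):
        return ["alice", "bob"]

result = materialize_to_memory(
    [remote_users],
    resources={"api_client": StubApiClient()},
)
assert result.output_for_node("remote_users") == ["alice", "bob"]
```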

### Resource Building

#### `build_resources` { .api }

**Module:** `dagster._core.execution.build_resources`
**Type:** Function

Build and initialize resources outside of an execution context.

```python
from dagster import build_resources, resource

@resource(config_schema={"host": str, "port": int, "database": str})
def database_resource(init_context):
    cfg = init_context.resource_config
    connection = f"postgresql://{cfg['host']}:{cfg['port']}/{cfg['database']}"
    return {"connection": connection, "pool_size": 10}

@resource
def cache_resource():
    return {"redis_url": "redis://localhost:6379"}

# Build resources with configuration
with build_resources(
    {
        "database": database_resource,
        "cache": cache_resource,
    },
    resource_config={
        "database": {
            "config": {
                "host": "prod-db",
                "port": 5432,
                "database": "production",
            }
        }
    },
) as resources:
    # Use resources
    db = resources.database
    cache = resources.cache

    print(f"Database: {db['connection']}")
    print(f"Cache: {cache['redis_url']}")

# Resources are cleaned up when the context manager exits
```

### Configuration Validation

#### `validate_run_config` { .api }

**Module:** `dagster._core.execution.validate_run_config`
**Type:** Function

Validate run configuration against a job's config schema.

```python
from dagster import DagsterInvalidConfigError, Field, Int, String, job, op, validate_run_config

@op(config_schema={"name": String, "count": Int})
def configured_op(context):
    return f"Hello {context.op_config['name']} x{context.op_config['count']}"

@job
def configured_job():
    configured_op()

# Validate configuration
run_config = {
    "ops": {
        "configured_op": {
            "config": {
                "name": "Alice",
                "count": 5
            }
        }
    }
}

# validate_run_config returns the validated (defaults-applied) config,
# and raises DagsterInvalidConfigError if the config does not match the schema
try:
    validated = validate_run_config(configured_job, run_config)
    print("Configuration is valid")
    print(f"Validated config: {validated}")
except DagsterInvalidConfigError as e:
    print("Configuration validation failed")
    for error in e.errors:
        print(f"Error: {error.message}")
```

This execution system provides rich runtime environments, flexible execution strategies, and powerful APIs for materializing assets and executing jobs. The context system offers extensive access to configuration, resources, logging, and metadata, enabling sophisticated data pipeline implementations.

For storage and I/O management used in execution, see [Storage and I/O](./storage-io.md). For events generated during execution, see [Events and Metadata](./events-metadata.md).