A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's execution system, including execution contexts, executors, and execution APIs. The execution system provides rich runtime environments with logging, configuration, resources, and metadata for all computational units.
Execution contexts provide the runtime environment for operations, assets, and other computational units. They offer access to configuration, resources, logging, metadata, and execution state.
OpExecutionContext { .api }
Module: dagster._core.execution.context.compute
Type: Class
Execution context for operations with access to configuration, resources, and logging.
from dagster import op, OpExecutionContext, Config, resource
import logging
class MyOpConfig(Config):
batch_size: int = 1000
debug_mode: bool = False
@resource
def database_resource():
return {"connection": "postgresql://localhost/db"}
@op(required_resource_keys={"database"})
def process_data(context: OpExecutionContext, data: list) -> list:
"""Op with full context access."""
# Access configuration
config = context.op_config
batch_size = config.get("batch_size", 100)
# Access resources
db = context.resources.database
connection = db["connection"]
# Logging with different levels
context.log.info(f"Processing {len(data)} records with batch_size {batch_size}")
context.log.debug(f"Using database: {connection}")
context.log.warning("This is a warning message")
# Access run information
run_id = context.run_id
op_name = context.op_def.name
# Access step information
step_key = context.step_context.step.key
# Metadata logging
context.add_output_metadata({
"records_processed": len(data),
"batch_size": batch_size,
"processing_time": "2.3s"
})
# Partition information (if partitioned)
if context.has_partition_key:
partition_key = context.partition_key
context.log.info(f"Processing partition: {partition_key}")
# Time window for time-partitioned assets
if hasattr(context, 'partition_time_window'):
time_window = context.partition_time_window
context.log.info(f"Time window: {time_window.start} to {time_window.end}")
# Process data
processed = data[:batch_size] # Simple processing
return processed

Key Properties and Methods:

- op_config: Dict[str, Any] - Operation configuration
- resources: Resources - Available resources
- log: DagsterLogManager - Logger for the operation
- run_id: str - Unique run identifier
- op_def: OpDefinition - Operation definition
- step_context: StepExecutionContext - Step execution context
- has_partition_key: bool - Whether operation is partitioned
- partition_key: Optional[str] - Partition key if partitioned
- asset_partition_key_for_output: Optional[str] - Asset partition key
- add_output_metadata(metadata: Dict[str, Any]) - Add metadata to output

AssetExecutionContext { .api }
Module: dagster._core.execution.context.compute
Type: Class
Execution context for assets with asset-specific functionality and metadata.
from dagster import asset, AssetExecutionContext, MaterializeResult
import pandas as pd
@asset(
group_name="analytics",
compute_kind="pandas"
)
def users_analysis(context: AssetExecutionContext, users_data: pd.DataFrame) -> MaterializeResult:
"""Asset with comprehensive context usage."""
# Asset-specific information
asset_key = context.asset_key
asset_name = asset_key.path[-1] # Last part of asset key
# Partition information for assets
if context.has_partition_key:
partition_key = context.partition_key
context.log.info(f"Materializing {asset_name} for partition {partition_key}")
# Time window for time-partitioned assets
if hasattr(context, 'partition_time_window'):
time_window = context.partition_time_window
start_date = time_window.start.strftime('%Y-%m-%d')
end_date = time_window.end.strftime('%Y-%m-%d')
# Filter data by time window
users_data = users_data[
(users_data['created_at'] >= start_date) &
(users_data['created_at'] < end_date)
]
# Access upstream asset values
upstream_assets = context.selected_asset_keys
context.log.info(f"Depends on assets: {[key.path[-1] for key in upstream_assets]}")
# Perform analysis
total_users = len(users_data)
active_users = len(users_data[users_data['active'] == True])
avg_age = users_data['age'].mean()
# Rich logging
context.log.info(f"Analyzed {total_users} users")
context.log.info(f"Active users: {active_users} ({active_users/total_users:.1%})")
# Create result DataFrame
analysis_result = pd.DataFrame({
'metric': ['total_users', 'active_users', 'average_age'],
'value': [total_users, active_users, avg_age],
'date': pd.Timestamp.now()
})
# Return MaterializeResult with metadata
return MaterializeResult(
value=analysis_result,
metadata={
"total_users": total_users,
"active_users": active_users,
"activity_rate": active_users / total_users,
"average_age": avg_age,
"data_quality_score": 0.95,
"last_updated": pd.Timestamp.now().isoformat()
}
)
@asset
def user_segments(context: AssetExecutionContext, users_analysis: pd.DataFrame) -> dict:
"""Asset consuming upstream asset with context."""
# Access asset lineage information
context.log.info(f"Consuming asset: {context.asset_key}")
# Get upstream asset data
total_users = users_analysis[users_analysis['metric'] == 'total_users']['value'].iloc[0]
active_users = users_analysis[users_analysis['metric'] == 'active_users']['value'].iloc[0]
# Generate segments
segments = {
'high_value': int(active_users * 0.2),
'medium_value': int(active_users * 0.5),
'low_value': int(active_users * 0.3)
}
# Log segment information
for segment, count in segments.items():
context.log.info(f"Segment {segment}: {count} users")
return segments

Additional Asset Context Properties:

- asset_key: AssetKey - Current asset key
- selected_asset_keys: AbstractSet[AssetKey] - Selected asset keys in materialization
- asset_partition_key_for_input(input_name: str) -> Optional[str] - Partition key for input
- asset_partition_key_for_output(output_name: str) -> Optional[str] - Partition key for output
- asset_partition_keys_for_input(input_name: str) -> Optional[AbstractSet[str]] - Multiple partition keys
- partition_time_window: Optional[TimeWindow] - Time window for time partitions

AssetCheckExecutionContext { .api }
Module: dagster._core.execution.context.compute
Type: Class
Execution context for asset checks with check-specific functionality.
from dagster import asset_check, AssetCheckExecutionContext, AssetCheckResult
import pandas as pd
@asset_check(asset="users_data")
def users_data_quality_check(context: AssetCheckExecutionContext, users_data: pd.DataFrame) -> AssetCheckResult:
"""Asset check with context access."""
# Asset check specific information
asset_key = context.asset_key
check_name = context.op_def.name
context.log.info(f"Running check '{check_name}' for asset {asset_key}")
# Perform data quality checks
null_count = users_data.isnull().sum().sum()
duplicate_count = users_data.duplicated().sum()
total_records = len(users_data)
# Quality thresholds
null_threshold = 0.05 # 5% null values allowed
duplicate_threshold = 0.02 # 2% duplicates allowed
null_rate = null_count / (total_records * len(users_data.columns))
duplicate_rate = duplicate_count / total_records
# Log detailed results
context.log.info(f"Null rate: {null_rate:.2%} (threshold: {null_threshold:.2%})")
context.log.info(f"Duplicate rate: {duplicate_rate:.2%} (threshold: {duplicate_threshold:.2%})")
# Determine check result
passed = null_rate <= null_threshold and duplicate_rate <= duplicate_threshold
if not passed:
context.log.warning(f"Data quality check failed for {asset_key}")
return AssetCheckResult(
passed=passed,
metadata={
"null_count": null_count,
"null_rate": null_rate,
"duplicate_count": duplicate_count,
"duplicate_rate": duplicate_rate,
"total_records": total_records,
"check_timestamp": pd.Timestamp.now().isoformat()
}
)

Context builders enable testing and local development by creating execution contexts outside of normal pipeline runs.
build_op_context { .api }
Module: dagster._core.execution.context.invocation
Type: Function
Build operation execution context for testing and development.
from dagster import build_op_context, op, resource
@resource
def test_database():
return {"connection": "test://localhost/testdb"}
@op(
config_schema={"batch_size": int},
required_resource_keys={"database"}
)
def my_op(context):
batch_size = context.op_config["batch_size"]
db = context.resources.database
return f"Processing with batch_size {batch_size} using {db['connection']}"
# Build context for testing
context = build_op_context(
config={"batch_size": 100},
resources={"database": test_database},
partition_key="2023-01-01",
op_config={"batch_size": 500} # Alternative config specification
)
# Test op directly
result = my_op(context)
print(result) # "Processing with batch_size 500 using test://localhost/testdb"
# Advanced context building
from dagster import DagsterLogManager, build_init_logger_context
logger_def = colored_console_logger
logger_context = build_init_logger_context()
logger = logger_def.logger_fn(logger_context)
advanced_context = build_op_context(
config={"batch_size": 1000},
resources={"database": test_database},
partition_key="2023-01-02",
run_id="test-run-12345",
logger_defs={"custom": logger_def},
tags={"env": "test", "team": "data"}
)

Parameters:

- config: Optional[Dict[str, Any]] - Op configuration
- resources: Optional[Dict[str, Any]] - Resource instances
- partition_key: Optional[str] - Partition key
- run_id: Optional[str] - Run ID
- logger_defs: Optional[Dict[str, LoggerDefinition]] - Logger definitions
- op_config: Optional[Dict[str, Any]] - Alternative config specification
- tags: Optional[Dict[str, str]] - Execution tags
- run_tags: Optional[Dict[str, str]] - Run-level tags

build_asset_context { .api }
Module: dagster._core.execution.context.invocation
Type: Function
Build asset execution context for testing and development.
from dagster import build_asset_context, asset, AssetKey
import pandas as pd
@asset
def my_asset(context):
asset_key = context.asset_key
partition_key = context.partition_key if context.has_partition_key else None
return f"Asset {asset_key} for partition {partition_key}"
# Build asset context
context = build_asset_context(
asset_key=AssetKey("my_asset"),
partition_key="2023-01-01",
resources={"database": test_database},
config={"processing_mode": "test"}
)
# Test asset directly
result = my_asset(context)
print(result) # "Asset ['my_asset'] for partition 2023-01-01"
# Context with upstream asset keys
upstream_context = build_asset_context(
asset_key=AssetKey(["analytics", "user_metrics"]),
partition_key="2023-01-01",
selected_asset_keys={AssetKey("users"), AssetKey("events")},
resources={"warehouse": warehouse_resource}
)

Parameters:

- asset_key: Optional[AssetKey] - Asset key
- config: Optional[Dict[str, Any]] - Asset configuration
- resources: Optional[Dict[str, Any]] - Resource instances
- partition_key: Optional[str] - Partition key
- selected_asset_keys: Optional[AbstractSet[AssetKey]] - Selected asset keys
- run_id: Optional[str] - Run ID
- tags: Optional[Dict[str, str]] - Execution tags
- run_tags: Optional[Dict[str, str]] - Run-level tags

InputContext { .api }
Module: dagster._core.execution.context.input
Type: Class
Context for input loading with I/O managers and input managers.
from dagster import InputContext, input_manager, IOManager
import pandas as pd
class DataWarehouseInputManager(IOManager):
def load_input(self, context: InputContext) -> pd.DataFrame:
"""Load input using context information."""
# Access input metadata
upstream_output = context.upstream_output
asset_key = upstream_output.asset_key if upstream_output else None
# Input configuration
metadata = context.metadata
dagster_type = context.dagster_type
# Partition information
if context.has_asset_partitions:
asset_partitions = context.asset_partition_keys
context.log.info(f"Loading partitions: {asset_partitions}")
# Resource access
if hasattr(context, 'resources'):
warehouse = context.resources.data_warehouse
context.log.info(f"Loading input for asset {asset_key}")
# Load data based on context
if asset_key:
return pd.read_parquet(f"/warehouse/{asset_key.path[-1]}.parquet")
else:
return pd.DataFrame()
@input_manager
def warehouse_input_manager() -> DataWarehouseInputManager:
return DataWarehouseInputManager()
# Usage in asset
@asset(input_manager_key="warehouse_loader")
def processed_data(context, raw_data: pd.DataFrame) -> pd.DataFrame:
"""Asset using custom input loading."""
return raw_data.dropna()

Key Properties:

- upstream_output: Optional[OutputContext] - Upstream output context
- asset_key: Optional[AssetKey] - Asset key being loaded
- metadata: Optional[Dict[str, Any]] - Input metadata
- config: Optional[Dict[str, Any]] - Input configuration
- dagster_type: Optional[DagsterType] - Expected input type
- log: DagsterLogManager - Logger
- resources: Resources - Available resources
- has_asset_partitions: bool - Whether input has asset partitions
- asset_partition_keys: AbstractSet[str] - Asset partition keys

OutputContext { .api }
Module: dagster._core.execution.context.output
Type: Class
Context for output handling with I/O managers.
from dagster import OutputContext, IOManager, asset
import pandas as pd
import os
class S3IOManager(IOManager):
def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
"""Handle output using context information."""
# Access output metadata
asset_key = context.asset_key
step_key = context.step_key
name = context.name
# Partition information
if context.has_asset_partitions:
partition_keys = context.asset_partition_keys
context.log.info(f"Storing partitions: {partition_keys}")
# Metadata and configuration
metadata = context.metadata
config = context.config
# Resource access
s3_client = context.resources.s3
# Generate file path
if asset_key:
path_parts = asset_key.path
file_path = "/".join(path_parts) + ".parquet"
else:
file_path = f"{step_key}_{name}.parquet"
context.log.info(f"Storing output to S3: s3://my-bucket/{file_path}")
# Store data
obj.to_parquet(f"s3://my-bucket/{file_path}")
# Add output metadata
context.add_output_metadata({
"s3_path": f"s3://my-bucket/{file_path}",
"rows": len(obj),
"columns": len(obj.columns),
"size_mb": obj.memory_usage(deep=True).sum() / 1024 / 1024
})
def load_input(self, context: InputContext) -> pd.DataFrame:
"""Load input from S3."""
asset_key = context.asset_key
path_parts = asset_key.path
file_path = "/".join(path_parts) + ".parquet"
context.log.info(f"Loading from S3: s3://my-bucket/{file_path}")
return pd.read_parquet(f"s3://my-bucket/{file_path}")
@asset(io_manager_key="s3_manager")
def sales_data(context) -> pd.DataFrame:
"""Asset using S3 I/O manager."""
# Generate sales data
return pd.DataFrame({
"date": pd.date_range("2023-01-01", periods=100),
"sales": np.random.randint(1000, 5000, 100)
})

Key Properties:

- asset_key: Optional[AssetKey] - Asset key being output
- step_key: str - Step key
- name: str - Output name
- metadata: Optional[Dict[str, Any]] - Output metadata
- config: Optional[Dict[str, Any]] - Output configuration
- log: DagsterLogManager - Logger
- resources: Resources - Available resources
- has_asset_partitions: bool - Whether output has asset partitions
- asset_partition_keys: AbstractSet[str] - Asset partition keys
- add_output_metadata(metadata: Dict[str, Any]) - Add metadata to output

Executors control how operations are executed, providing different execution strategies for different environments and performance requirements.
Executor { .api }
Module: dagster._core.executor.base
Type: Base class
Base executor interface for custom execution strategies.
from dagster import Executor, InitExecutorContext, StepExecutionContext
from dagster import executor, Field, Int
class CustomExecutor(Executor):
"""Custom executor implementation."""
def __init__(self, max_concurrent: int = 4):
self.max_concurrent = max_concurrent
def execute(self, plan_context, execution_plan):
"""Execute the execution plan."""
# Custom execution logic
steps = execution_plan.get_all_steps()
# Execute steps with concurrency control
for step in steps:
# Step execution logic
pass
return [] # Return step events
@executor(
name="custom_executor",
config_schema={
"max_concurrent": Field(Int, default_value=4, description="Max concurrent steps")
}
)
def custom_executor(init_context: InitExecutorContext) -> CustomExecutor:
"""Create custom executor from configuration."""
max_concurrent = init_context.executor_config["max_concurrent"]
return CustomExecutor(max_concurrent=max_concurrent)

in_process_executor { .api }
Module: dagster._core.definitions.executor_definition
Type: ExecutorDefinition
Single-process, single-threaded executor for development and testing.
from dagster import job, in_process_executor
@job(executor_def=in_process_executor)
def single_process_job():
"""Job using in-process executor."""
op_a()
op_b()
op_c()
# Execute job
from dagster import execute_job
result = execute_job(single_process_job)

multiprocess_executor { .api }
Module: dagster._core.definitions.executor_definition
Type: ExecutorDefinition
Multi-process executor for parallel execution.
from dagster import job, multiprocess_executor
@job(executor_def=multiprocess_executor)
def parallel_job():
"""Job using multiprocess executor."""
# These ops can run in parallel
result_a = op_a()
result_b = op_b()
# This op depends on both and runs after
op_c(result_a, result_b)
# Execute with multiprocess config
result = execute_job(
parallel_job,
run_config={
"execution": {
"config": {
"multiprocess": {
"max_concurrent": 8,
"retries": {"enabled": {}},
"start_method": "spawn"
}
}
}
}
)

Configuration Parameters:

- max_concurrent: int = 4 - Maximum concurrent processes
- retries: Optional[RetryMode] - Retry configuration
- start_method: Optional[str] - Process start method ("spawn", "fork", "forkserver")

execute_job { .api }
Module: dagster._core.execution.api
Type: Function
Execute a job with configuration and return execution results.
from dagster import execute_job, job, op, Config, DagsterInstance
@op
def hello_op(name: str) -> str:
return f"Hello, {name}!"
@job
def greeting_job():
hello_op()
# Basic execution
result = execute_job(
greeting_job,
run_config={
"ops": {
"hello_op": {
"inputs": {"name": {"value": "World"}}
}
}
}
)
# Execution with instance and tags
from dagster import DagsterInstance
instance = DagsterInstance.ephemeral()
result = execute_job(
greeting_job,
run_config={"ops": {"hello_op": {"inputs": {"name": {"value": "Alice"}}}}},
instance=instance,
tags={"env": "test", "team": "data"},
run_id="custom-run-id-123"
)
# Check execution success
if result.success:
print("Job executed successfully")
print(f"Run ID: {result.run_id}")
# Access step results
for event in result.all_events:
if event.is_step_success:
print(f"Step {event.step_key} succeeded")
else:
print("Job execution failed")
print(f"Failure info: {result.failure_info}")

Parameters:

- job_def: JobDefinition - Job definition to execute
- run_config: Optional[Dict[str, Any]] - Run configuration
- instance: Optional[DagsterInstance] - Dagster instance
- partition_key: Optional[str] - Partition key for partitioned jobs
- run_id: Optional[str] - Custom run ID
- tags: Optional[Dict[str, str]] - Run tags
- raise_on_error: bool = True - Whether to raise on execution failure

JobExecutionResult { .api }
Module: dagster._core.execution.job_execution_result
Type: Class
Result of job execution with access to events, outputs, and metadata.
# Access execution results
result = execute_job(my_job, run_config=config)
# Basic result information
success = result.success # bool: Whether execution succeeded
run_id = result.run_id # str: Unique run identifier
job_def = result.job_def # JobDefinition: Job that was executed
# Event access
all_events = result.all_events # List[DagsterEvent]: All execution events
step_events = result.step_event_list # List[DagsterEvent]: Step-specific events
# Asset materializations
materializations = result.asset_materializations
for materialization in materializations:
asset_key = materialization.asset_key
metadata = materialization.metadata_entries
print(f"Materialized {asset_key} with metadata: {metadata}")
# Step outputs
output = result.output_for_step("step_name", "output_name")
step_outputs = result.step_outputs_by_step_key # Dict of outputs by step
# Failure information
if not result.success:
failure_info = result.failure_info
print(f"Execution failed: {failure_info}")

materialize { .api }
Module: dagster._core.definitions.materialize
Type: Function
Materialize assets with dependencies and return materialization results.
from dagster import materialize, asset, Definitions
import pandas as pd
@asset
def users() -> pd.DataFrame:
return pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
@asset
def user_stats(users: pd.DataFrame) -> dict:
return {"count": len(users), "avg_id": users["id"].mean()}
# Materialize specific assets
result = materialize([users, user_stats])
# Materialize with configuration
result = materialize(
[users, user_stats],
run_config={
"resources": {
"io_manager": {
"config": {"base_path": "/tmp/dagster"}
}
}
}
)
# Materialize with resources
from dagster import resource
@resource
def database():
return {"connection": "postgresql://localhost/db"}
defs = Definitions(
assets=[users, user_stats],
resources={"database": database}
)
result = materialize(
[users, user_stats],
resources={"database": database}
)
# Materialize with partition selection
from dagster import DailyPartitionsDefinition
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
@asset(partitions_def=daily_partitions)
def daily_data() -> pd.DataFrame:
return pd.DataFrame({"date": [pd.Timestamp.now()], "value": [42]})
# Materialize specific partition
result = materialize(
[daily_data],
partition_key="2023-01-15"
)
# Check materialization results
if result.success:
materializations = result.asset_materializations
for mat in materializations:
print(f"Materialized: {mat.asset_key}")
print(f"Metadata: {mat.metadata}")

Parameters:

- assets: Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]] - Assets to materialize
- run_config: Any = None - Run configuration (accepts various formats)
- instance: Optional[DagsterInstance] = None - Dagster instance (uses default if None)
- resources: Optional[Mapping[str, object]] = None - Resource instances
- partition_key: Optional[str] = None - Partition key for partitioned assets
- raise_on_error: bool = True - Whether to raise exception on failure
- tags: Optional[Mapping[str, str]] = None - Run tags for metadata
- selection: Optional[CoercibleToAssetSelection] = None - Asset selection for partial materialization

Returns: ExecuteInProcessResult - Contains execution results, asset materializations, and run information
materialize_to_memory { .api }
Module: dagster._core.definitions.materialize
Type: Function
Materialize assets to memory for testing and development.
from dagster import materialize_to_memory
# Materialize to memory (no I/O)
result = materialize_to_memory([users, user_stats])
# Access materialized values directly
if result.success:
# Get materialized asset values
users_data = result.output_for_node("users")
stats_data = result.output_for_node("user_stats")
print(f"Users: {users_data}")
print(f"Stats: {stats_data}")
# Memory materialization with partition
result = materialize_to_memory(
[daily_data],
partition_key="2023-01-15"
)
daily_value = result.output_for_node("daily_data")
print(f"Daily data: {daily_value}")

build_resources { .api }
Module: dagster._core.execution.build_resources
Type: Function
Build and initialize resources outside of execution context.
from dagster import build_resources, resource, Config
class DatabaseConfig(Config):
host: str = "localhost"
port: int = 5432
database: str = "mydb"
@resource(config_schema=DatabaseConfig)
def database_resource(config: DatabaseConfig):
connection = f"postgresql://{config.host}:{config.port}/{config.database}"
return {"connection": connection, "pool_size": 10}
@resource
def cache_resource():
return {"redis_url": "redis://localhost:6379"}
# Build resources with configuration
with build_resources({
"database": database_resource,
"cache": cache_resource
}, run_config={
"resources": {
"database": {
"config": {
"host": "prod-db",
"port": 5432,
"database": "production"
}
}
}
}) as resources:
# Use resources
db = resources.database
cache = resources.cache
print(f"Database: {db['connection']}")
print(f"Cache: {cache['redis_url']}")
# Resources are automatically cleaned up

validate_run_config { .api }
Module: dagster._core.execution.validate_run_config
Type: Function
Validate run configuration against job schema.
from dagster import validate_run_config, job, op, Field, String, Int
@op(config_schema={"name": String, "count": Int})
def configured_op(context):
return f"Hello {context.op_config['name']} x{context.op_config['count']}"
@job
def configured_job():
configured_op()
# Validate configuration
run_config = {
"ops": {
"configured_op": {
"config": {
"name": "Alice",
"count": 5
}
}
}
}
validation_result = validate_run_config(configured_job, run_config)
if validation_result.success:
print("Configuration is valid")
print(f"Validated config: {validation_result.run_config}")
else:
print("Configuration validation failed")
for error in validation_result.errors:
print(f"Error: {error.message}")
print(f"Path: {error.stack}")

This comprehensive execution system provides rich runtime environments, flexible execution strategies, and powerful APIs for materializing assets and executing jobs. The context system offers extensive access to configuration, resources, logging, and metadata, enabling sophisticated data pipeline implementations.
For storage and I/O management used in execution, see Storage and I/O. For events generated during execution, see Events and Metadata.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster