CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster

A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.

Pending
Overview
Eval results
Files

docs/execution-contexts.md

Execution and Contexts

This document covers Dagster's execution system, including execution contexts, executors, and execution APIs. The execution system provides rich runtime environments with logging, configuration, resources, and metadata for all computational units.

Execution Contexts

Execution contexts provide the runtime environment for operations, assets, and other computational units. They offer access to configuration, resources, logging, metadata, and execution state.

Operation Execution Context

OpExecutionContext { .api }

Module: dagster._core.execution.context.compute
Type: Class

Execution context for operations with access to configuration, resources, and logging.

from dagster import op, OpExecutionContext, Config, resource
import logging

# Typed config schema for the example. Note that process_data below reads
# context.op_config directly and does not reference this class.
class MyOpConfig(Config):
    # Batch size setting; defaults to 1000.
    batch_size: int = 1000
    # Debug flag; defaults to False.
    debug_mode: bool = False

@resource
def database_resource():
    # Resource value is a plain dict carrying the connection string.
    settings = {}
    settings["connection"] = "postgresql://localhost/db"
    return settings

@op(required_resource_keys={"database"})
def process_data(context: OpExecutionContext, data: list) -> list:
    """Op with full context access."""
    # Logs run details, attaches output metadata, and returns the leading
    # ``batch_size`` records of ``data``.

    # Local import keeps this example self-contained.
    from dagster import DagsterInvariantViolationError

    # Access configuration. Fall back to an empty mapping so .get() still
    # works when the run supplies no op config.
    config = context.op_config or {}
    batch_size = config.get("batch_size", 100)

    # Access resources
    db = context.resources.database
    connection = db["connection"]

    # Logging with different levels
    context.log.info(f"Processing {len(data)} records with batch_size {batch_size}")
    context.log.debug(f"Using database: {connection}")
    context.log.warning("This is a warning message")

    # Access run information
    run_id = context.run_id
    op_name = context.op_def.name

    # Access step information
    step_key = context.step_context.step.key

    # Metadata logging
    context.add_output_metadata({
        "records_processed": len(data),
        "batch_size": batch_size,
        "processing_time": "2.3s"
    })

    # Partition information (if partitioned)
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Processing partition: {partition_key}")

        # Time window for time-partitioned runs. hasattr() only suppresses
        # AttributeError, so it cannot be used to probe this property; ask for
        # the window and treat the invariant error as "no time window".
        try:
            time_window = context.partition_time_window
        except DagsterInvariantViolationError:
            time_window = None
        if time_window is not None:
            context.log.info(f"Time window: {time_window.start} to {time_window.end}")

    # Process data
    processed = data[:batch_size]  # Simple processing
    return processed

Key Properties and Methods:

  • op_config: Dict[str, Any] - Operation configuration
  • resources: Resources - Available resources
  • log: DagsterLogManager - Logger for the operation
  • run_id: str - Unique run identifier
  • op_def: OpDefinition - Operation definition
  • step_context: StepExecutionContext - Step execution context
  • has_partition_key: bool - Whether operation is partitioned
  • partition_key: Optional[str] - Partition key if partitioned
  • asset_partition_key_for_output(output_name: str) -> Optional[str] - Asset partition key for an output
  • add_output_metadata(metadata: Dict[str, Any]) - Add metadata to output

Asset Execution Context

AssetExecutionContext { .api }

Module: dagster._core.execution.context.compute
Type: Class

Execution context for assets with asset-specific functionality and metadata.

from dagster import asset, AssetExecutionContext, MaterializeResult
import pandas as pd

@asset(
    group_name="analytics",
    compute_kind="pandas"
)
def users_analysis(context: AssetExecutionContext, users_data: pd.DataFrame) -> MaterializeResult:
    """Asset with comprehensive context usage."""
    # Local import keeps this example self-contained.
    from dagster import DagsterInvariantViolationError

    # Asset-specific information
    asset_key = context.asset_key
    asset_name = asset_key.path[-1]  # Last part of asset key

    # Partition information for assets
    if context.has_partition_key:
        partition_key = context.partition_key
        context.log.info(f"Materializing {asset_name} for partition {partition_key}")

        # Time window for time-partitioned assets. hasattr() only suppresses
        # AttributeError, so probe the property explicitly instead.
        try:
            time_window = context.partition_time_window
        except DagsterInvariantViolationError:
            time_window = None

        if time_window is not None:
            start_date = time_window.start.strftime('%Y-%m-%d')
            end_date = time_window.end.strftime('%Y-%m-%d')

            # Filter data by time window (start inclusive, end exclusive)
            users_data = users_data[
                (users_data['created_at'] >= start_date) &
                (users_data['created_at'] < end_date)
            ]

    # Asset keys selected for this materialization. These are the keys being
    # materialized, not this asset's upstream dependencies.
    selected_keys = context.selected_asset_keys
    context.log.info(f"Selected assets: {[key.path[-1] for key in selected_keys]}")

    # Perform analysis
    total_users = len(users_data)
    active_users = int(users_data['active'].eq(True).sum())
    avg_age = users_data['age'].mean()
    # Guard the ratio: the frame may be empty, e.g. after the time-window filter.
    activity_rate = active_users / total_users if total_users else 0.0

    # Rich logging
    context.log.info(f"Analyzed {total_users} users")
    context.log.info(f"Active users: {active_users} ({activity_rate:.1%})")

    # Create result DataFrame
    analysis_result = pd.DataFrame({
        'metric': ['total_users', 'active_users', 'average_age'],
        'value': [total_users, active_users, avg_age],
        'date': pd.Timestamp.now()
    })

    # Return MaterializeResult with metadata
    return MaterializeResult(
        value=analysis_result,
        metadata={
            "total_users": total_users,
            "active_users": active_users,
            "activity_rate": activity_rate,
            "average_age": avg_age,
            "data_quality_score": 0.95,
            "last_updated": pd.Timestamp.now().isoformat()
        }
    )

@asset
def user_segments(context: AssetExecutionContext, users_analysis: pd.DataFrame) -> dict:
    """Asset consuming upstream asset with context."""

    def metric_value(metric_name):
        # First 'value' entry of the rows whose 'metric' equals metric_name.
        matching_rows = users_analysis[users_analysis['metric'] == metric_name]
        return matching_rows['value'].iloc[0]

    # Report which asset is being computed.
    context.log.info(f"Consuming asset: {context.asset_key}")

    # Pull the headline numbers out of the upstream frame.
    total_users = metric_value('total_users')
    active_users = metric_value('active_users')

    # Each tier is a fixed share of the active-user count, truncated to int.
    tier_shares = (('high_value', 0.2), ('medium_value', 0.5), ('low_value', 0.3))
    segments = {tier: int(active_users * share) for tier, share in tier_shares}

    # One log line per tier.
    for segment in segments:
        context.log.info(f"Segment {segment}: {segments[segment]} users")

    return segments

Additional Asset Context Properties:

  • asset_key: AssetKey - Current asset key
  • selected_asset_keys: AbstractSet[AssetKey] - Selected asset keys in materialization
  • asset_partition_key_for_input(input_name: str) -> Optional[str] - Partition key for input
  • asset_partition_key_for_output(output_name: str) -> Optional[str] - Partition key for output
  • asset_partition_keys_for_input(input_name: str) -> Optional[AbstractSet[str]] - Multiple partition keys
  • partition_time_window: Optional[TimeWindow] - Time window for time partitions

Asset Check Execution Context

AssetCheckExecutionContext { .api }

Module: dagster._core.execution.context.compute
Type: Class

Execution context for asset checks with check-specific functionality.

from dagster import asset_check, AssetCheckExecutionContext, AssetCheckResult
import pandas as pd

@asset_check(asset="users_data")
def users_data_quality_check(context: AssetCheckExecutionContext, users_data: pd.DataFrame) -> AssetCheckResult:
    """Asset check with context access."""
    # Passes when both the null-cell rate and the duplicate-row rate are at or
    # below their thresholds. An empty frame yields rates of 0.0 and passes.

    # Asset check specific information
    asset_key = context.asset_key
    check_name = context.op_def.name

    context.log.info(f"Running check '{check_name}' for asset {asset_key}")

    # Perform data quality checks. Convert pandas/numpy scalars to built-in
    # ints so `passed` and the metadata hold plain Python values.
    total_records = len(users_data)
    total_cells = total_records * len(users_data.columns)
    null_count = int(users_data.isnull().sum().sum())
    duplicate_count = int(users_data.duplicated().sum())

    # Quality thresholds
    null_threshold = 0.05  # 5% null values allowed
    duplicate_threshold = 0.02  # 2% duplicates allowed

    # Guard both ratios against an empty frame (zero rows or zero columns).
    null_rate = null_count / total_cells if total_cells else 0.0
    duplicate_rate = duplicate_count / total_records if total_records else 0.0

    # Log detailed results
    context.log.info(f"Null rate: {null_rate:.2%} (threshold: {null_threshold:.2%})")
    context.log.info(f"Duplicate rate: {duplicate_rate:.2%} (threshold: {duplicate_threshold:.2%})")

    # Determine check result
    passed = null_rate <= null_threshold and duplicate_rate <= duplicate_threshold

    if not passed:
        context.log.warning(f"Data quality check failed for {asset_key}")

    return AssetCheckResult(
        passed=passed,
        metadata={
            "null_count": null_count,
            "null_rate": null_rate,
            "duplicate_count": duplicate_count,
            "duplicate_rate": duplicate_rate,
            "total_records": total_records,
            "check_timestamp": pd.Timestamp.now().isoformat()
        }
    )

Context Builders

Context builders enable testing and local development by creating execution contexts outside of normal pipeline runs.

build_op_context { .api }

Module: dagster._core.execution.context.invocation
Type: Function

Build operation execution context for testing and development.

from dagster import build_op_context, op, resource

@resource
def test_database():
    # Stand-in database settings used when building test contexts.
    connection_url = "test://localhost/testdb"
    return {"connection": connection_url}

@op(
    config_schema={"batch_size": int},
    required_resource_keys={"database"}
)
def my_op(context):
    # Reports the configured batch size and the connection string taken from
    # the "database" resource.
    configured_size = context.op_config["batch_size"]
    connection = context.resources.database['connection']
    return f"Processing with batch_size {configured_size} using {connection}"

# Build context for testing
# Supply the op config exactly once: build_op_context rejects a call that
# sets both ``op_config`` and its legacy alias ``config``.
context = build_op_context(
    resources={"database": test_database},
    partition_key="2023-01-01",
    op_config={"batch_size": 500}
)

# Test op directly
result = my_op(context)
print(result)  # "Processing with batch_size 500 using test://localhost/testdb"

# Advanced context building
# colored_console_logger is used below, so it must be imported here.
from dagster import DagsterLogManager, build_init_logger_context, colored_console_logger

logger_def = colored_console_logger
logger_context = build_init_logger_context()
logger = logger_def.logger_fn(logger_context)

# NOTE(review): confirm that run_id, logger_defs and tags are accepted by the
# installed build_op_context signature before relying on this call.
advanced_context = build_op_context(
    config={"batch_size": 1000},
    resources={"database": test_database},
    partition_key="2023-01-02",
    run_id="test-run-12345",
    logger_defs={"custom": logger_def},
    tags={"env": "test", "team": "data"}
)

Parameters:

  • config: Optional[Dict[str, Any]] - Op configuration
  • resources: Optional[Dict[str, Any]] - Resource instances
  • partition_key: Optional[str] - Partition key
  • run_id: Optional[str] - Run ID
  • logger_defs: Optional[Dict[str, LoggerDefinition]] - Logger definitions
  • op_config: Optional[Dict[str, Any]] - Alternative config specification
  • tags: Optional[Dict[str, str]] - Execution tags
  • run_tags: Optional[Dict[str, str]] - Run-level tags

build_asset_context { .api }

Module: dagster._core.execution.context.invocation
Type: Function

Build asset execution context for testing and development.

from dagster import build_asset_context, asset, AssetKey
import pandas as pd

@asset
def my_asset(context):
    # Describes the asset key and, when the run is partitioned, the partition key.
    asset_key = context.asset_key
    if context.has_partition_key:
        partition_key = context.partition_key
    else:
        partition_key = None
    return f"Asset {asset_key} for partition {partition_key}"

# Build asset context
# NOTE(review): test_database is defined in the build_op_context example
# earlier on this page, not in this snippet.
# NOTE(review): confirm the keyword arguments used here (asset_key, config,
# selected_asset_keys) against the installed build_asset_context signature.
context = build_asset_context(
    asset_key=AssetKey("my_asset"),
    partition_key="2023-01-01",
    resources={"database": test_database},
    config={"processing_mode": "test"}
)

# Test asset directly
result = my_asset(context)
print(result)  # "Asset ['my_asset'] for partition 2023-01-01"

# Context with upstream asset keys
# NOTE(review): warehouse_resource is not defined anywhere in this document;
# supply a real resource before running this call.
upstream_context = build_asset_context(
    asset_key=AssetKey(["analytics", "user_metrics"]),
    partition_key="2023-01-01",
    selected_asset_keys={AssetKey("users"), AssetKey("events")},
    resources={"warehouse": warehouse_resource}
)

Parameters:

  • asset_key: Optional[AssetKey] - Asset key
  • config: Optional[Dict[str, Any]] - Asset configuration
  • resources: Optional[Dict[str, Any]] - Resource instances
  • partition_key: Optional[str] - Partition key
  • selected_asset_keys: Optional[AbstractSet[AssetKey]] - Selected asset keys
  • run_id: Optional[str] - Run ID
  • tags: Optional[Dict[str, str]] - Execution tags
  • run_tags: Optional[Dict[str, str]] - Run-level tags

Input and Output Contexts

InputContext { .api }

Module: dagster._core.execution.context.input
Type: Class

Context for input loading with I/O managers and input managers.

from dagster import InputContext, input_manager, IOManager
import pandas as pd

class DataWarehouseInputManager(IOManager):
    """Load inputs from parquet files under ``/warehouse``.

    Only the input side is implemented. ``handle_output`` exists solely to
    satisfy the abstract ``IOManager`` interface so the class can be
    instantiated.
    """

    def handle_output(self, context, obj) -> None:
        """Reject writes; this manager only loads inputs."""
        raise NotImplementedError(
            "DataWarehouseInputManager only supports loading inputs"
        )

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load input using context information."""

        # Access input metadata. Check has_asset_key first: reading asset_key
        # on an output that has none raises instead of returning None.
        upstream_output = context.upstream_output
        if upstream_output is not None and upstream_output.has_asset_key:
            asset_key = upstream_output.asset_key
        else:
            asset_key = None

        # Input configuration
        metadata = context.metadata
        dagster_type = context.dagster_type

        # Partition information
        if context.has_asset_partitions:
            asset_partitions = context.asset_partition_keys
            context.log.info(f"Loading partitions: {asset_partitions}")

        # Resource access
        if hasattr(context, 'resources'):
            warehouse = context.resources.data_warehouse

        context.log.info(f"Loading input for asset {asset_key}")

        # Load data based on context; without an asset key there is nothing
        # to locate, so return an empty frame.
        if asset_key:
            return pd.read_parquet(f"/warehouse/{asset_key.path[-1]}.parquet")
        else:
            return pd.DataFrame()

@input_manager
def warehouse_input_manager() -> DataWarehouseInputManager:
    # Factory returning a fresh DataWarehouseInputManager.
    manager = DataWarehouseInputManager()
    return manager

# Usage in asset
@asset(input_manager_key="warehouse_loader")
def processed_data(context, raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset using custom input loading."""
    # Rows containing any missing value are removed.
    cleaned = raw_data.dropna()
    return cleaned

Key Properties:

  • upstream_output: Optional[OutputContext] - Upstream output context
  • asset_key: Optional[AssetKey] - Asset key being loaded
  • metadata: Optional[Dict[str, Any]] - Input metadata
  • config: Optional[Dict[str, Any]] - Input configuration
  • dagster_type: Optional[DagsterType] - Expected input type
  • log: DagsterLogManager - Logger
  • resources: Resources - Available resources
  • has_asset_partitions: bool - Whether input has asset partitions
  • asset_partition_keys: AbstractSet[str] - Asset partition keys

OutputContext { .api }

Module: dagster._core.execution.context.output
Type: Class

Context for output handling with I/O managers.

from dagster import OutputContext, IOManager, asset
import pandas as pd
import os

class S3IOManager(IOManager):
    """Store and load DataFrames as parquet objects under ``s3://my-bucket``."""

    # Bucket prefix shared by handle_output and load_input.
    _BUCKET_URI = "s3://my-bucket"

    def _uri_for(self, file_path: str) -> str:
        """Return the full S3 URI for a bucket-relative ``file_path``."""
        return f"{self._BUCKET_URI}/{file_path}"

    def handle_output(self, context: OutputContext, obj: pd.DataFrame) -> None:
        """Handle output using context information."""

        # Access output metadata
        step_key = context.step_key
        name = context.name

        # Partition information
        if context.has_asset_partitions:
            partition_keys = context.asset_partition_keys
            context.log.info(f"Storing partitions: {partition_keys}")

        # Metadata and configuration
        metadata = context.metadata
        config = context.config

        # Resource access
        s3_client = context.resources.s3

        # Generate file path. Check has_asset_key rather than reading
        # asset_key directly, so the step/output-name fallback is reachable
        # for outputs that have no asset key.
        if context.has_asset_key:
            file_path = "/".join(context.asset_key.path) + ".parquet"
        else:
            file_path = f"{step_key}_{name}.parquet"

        target_uri = self._uri_for(file_path)
        context.log.info(f"Storing output to S3: {target_uri}")

        # Store data
        obj.to_parquet(target_uri)

        # Add output metadata
        context.add_output_metadata({
            "s3_path": target_uri,
            "rows": len(obj),
            "columns": len(obj.columns),
            "size_mb": obj.memory_usage(deep=True).sum() / 1024 / 1024
        })

    # The annotation is a string because this snippet's import line does not
    # bring InputContext into scope.
    def load_input(self, context: "InputContext") -> pd.DataFrame:
        """Load input from S3."""
        file_path = "/".join(context.asset_key.path) + ".parquet"
        source_uri = self._uri_for(file_path)

        context.log.info(f"Loading from S3: {source_uri}")
        return pd.read_parquet(source_uri)

@asset(io_manager_key="s3_manager")
def sales_data(context) -> pd.DataFrame:
    """Asset using S3 I/O manager."""
    # numpy is not imported anywhere in this example, so import it here;
    # otherwise the `np` reference below raises NameError.
    import numpy as np

    # Generate sales data: 100 daily rows with random sales in [1000, 5000).
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "sales": np.random.randint(1000, 5000, 100)
    })

Key Properties:

  • asset_key: Optional[AssetKey] - Asset key being output
  • step_key: str - Step key
  • name: str - Output name
  • metadata: Optional[Dict[str, Any]] - Output metadata
  • config: Optional[Dict[str, Any]] - Output configuration
  • log: DagsterLogManager - Logger
  • resources: Resources - Available resources
  • has_asset_partitions: bool - Whether output has asset partitions
  • asset_partition_keys: AbstractSet[str] - Asset partition keys
  • add_output_metadata(metadata: Dict[str, Any]) - Add metadata to output

Execution System

Executors

Executors control how operations are executed, providing different execution strategies for different environments and performance requirements.

Executor { .api }

Module: dagster._core.executor.base
Type: Base class

Base executor interface for custom execution strategies.

from dagster import Executor, InitExecutorContext, StepExecutionContext
from dagster import executor, Field, Int

class CustomExecutor(Executor):
    """Custom executor implementation."""

    def __init__(self, max_concurrent: int = 4):
        # Upper bound on concurrently running steps; stored for use by execute().
        self.max_concurrent = max_concurrent

    @property
    def retries(self):
        """Retry mode for this executor.

        The base ``Executor`` declares ``retries`` abstract, so the subclass
        must define it to be instantiable. Retries are reported as disabled
        because ``execute`` below implements no retry handling.
        """
        # NOTE(review): RetryMode lives in a private module; confirm the
        # import path against the installed Dagster version.
        from dagster._core.execution.retries import RetryMode

        return RetryMode.DISABLED

    def execute(self, plan_context, execution_plan):
        """Execute the execution plan."""
        # Custom execution logic
        steps = execution_plan.get_all_steps()

        # Execute steps with concurrency control
        for step in steps:
            # Step execution logic
            pass

        return []  # Return step events

@executor(
    name="custom_executor",
    config_schema={
        "max_concurrent": Field(Int, default_value=4, description="Max concurrent steps")
    }
)
def custom_executor(init_context: InitExecutorContext) -> CustomExecutor:
    """Create custom executor from configuration."""
    # The concurrency limit comes straight from the executor config.
    return CustomExecutor(max_concurrent=init_context.executor_config["max_concurrent"])

Built-in Executors

in_process_executor { .api }

Module: dagster._core.definitions.executor_definition
Type: ExecutorDefinition

Single-process, single-threaded executor for development and testing.

from dagster import job, in_process_executor

@job(executor_def=in_process_executor)
def single_process_job():
    """Job using in-process executor."""
    # NOTE(review): op_a, op_b and op_c are not defined in this snippet; they
    # must be ops defined elsewhere.
    # The three ops are invoked without passing values between them.
    op_a()
    op_b()
    op_c()

# Execute job
# NOTE(review): confirm that execute_job accepts a JobDefinition on its own;
# the Dagster API reference describes a reconstructable job plus an instance.
from dagster import execute_job
result = execute_job(single_process_job)

multiprocess_executor { .api }

Module: dagster._core.definitions.executor_definition
Type: ExecutorDefinition

Multi-process executor for parallel execution.

from dagster import job, multiprocess_executor

@job(executor_def=multiprocess_executor)
def parallel_job():
    """Job using multiprocess executor."""
    # op_a and op_b take no inputs from each other, so they can run in parallel.
    first_output = op_a()
    second_output = op_b()

    # op_c consumes both outputs and therefore runs after them.
    op_c(first_output, second_output)

# Execute with multiprocess config
# NOTE(review): confirm the nesting of the "execution" config; when
# multiprocess_executor is set directly as executor_def, the options may
# belong directly under "config" without the extra "multiprocess" level.
result = execute_job(
    parallel_job,
    run_config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 8,
                    "retries": {"enabled": {}},
                    "start_method": "spawn"
                }
            }
        }
    }
)

Configuration Parameters:

  • max_concurrent: int = 4 - Maximum concurrent processes
  • retries: Optional[RetryMode] - Retry configuration
  • start_method: Optional[str] - Process start method ("spawn", "fork", "forkserver")

Execution APIs

Job Execution

execute_job { .api }

Module: dagster._core.execution.api
Type: Function

Execute a job with configuration and return execution results.

from dagster import execute_job, job, op, Config, DagsterInstance

@op
def hello_op(name: str) -> str:
    # Greets whoever is passed in as ``name``.
    return "Hello, {}!".format(name)

@job
def greeting_job():
    # hello_op's ``name`` input is left unconnected here; the run config in
    # the calls below supplies it under ops.hello_op.inputs.name.
    hello_op()

# Basic execution
# NOTE(review): confirm that execute_job accepts a JobDefinition on its own;
# the Dagster API reference describes a reconstructable job plus an instance.
result = execute_job(
    greeting_job,
    run_config={
        "ops": {
            "hello_op": {
                # Value for hello_op's unconnected ``name`` input.
                "inputs": {"name": {"value": "World"}}
            }
        }
    }
)

# Execution with instance and tags
from dagster import DagsterInstance

# Ephemeral instance created just for this run.
instance = DagsterInstance.ephemeral()
# NOTE(review): confirm that run_id is an accepted execute_job argument.
result = execute_job(
    greeting_job,
    run_config={"ops": {"hello_op": {"inputs": {"name": {"value": "Alice"}}}}},
    instance=instance,
    tags={"env": "test", "team": "data"},
    run_id="custom-run-id-123"
)

# Check execution success
if result.success:
    print("Job executed successfully")
    print(f"Run ID: {result.run_id}")
    
    # Access step results: report every event flagged as a step success.
    for event in result.all_events:
        if event.is_step_success:
            print(f"Step {event.step_key} succeeded")
else:
    print("Job execution failed")
    print(f"Failure info: {result.failure_info}")

Parameters:

  • job_def: JobDefinition - Job definition to execute
  • run_config: Optional[Dict[str, Any]] - Run configuration
  • instance: Optional[DagsterInstance] - Dagster instance
  • partition_key: Optional[str] - Partition key for partitioned jobs
  • run_id: Optional[str] - Custom run ID
  • tags: Optional[Dict[str, str]] - Run tags
  • raise_on_error: bool = True - Whether to raise on execution failure

JobExecutionResult { .api }

Module: dagster._core.execution.job_execution_result
Type: Class

Result of job execution with access to events, outputs, and metadata.

# Access execution results
result = execute_job(my_job, run_config=config)

# Basic result information
success = result.success  # bool: Whether execution succeeded
run_id = result.run_id   # str: Unique run identifier
job_def = result.job_def # JobDefinition: Job that was executed

# Event access
all_events = result.all_events  # List[DagsterEvent]: All execution events
step_events = result.step_event_list  # List[DagsterEvent]: Step-specific events

# Asset materializations
materializations = result.asset_materializations
for materialization in materializations:
    asset_key = materialization.asset_key
    # Read `metadata`, matching the materialize() example on this page.
    metadata = materialization.metadata
    print(f"Materialized {asset_key} with metadata: {metadata}")

# Step outputs
output = result.output_for_step("step_name", "output_name")
step_outputs = result.step_outputs_by_step_key  # Dict of outputs by step

# Failure information
if not result.success:
    failure_info = result.failure_info
    print(f"Execution failed: {failure_info}")

Asset Materialization

materialize { .api }

Module: dagster._core.definitions.materialize
Type: Function

Materialize assets with dependencies and return materialization results.

from dagster import materialize, asset, Definitions
import pandas as pd

@asset
def users() -> pd.DataFrame:
    # Three sample users with ids and names.
    columns = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]}
    return pd.DataFrame(columns)

@asset
def user_stats(users: pd.DataFrame) -> dict:
    # Summarises ``users`` as a row count and the mean of its "id" column.
    row_count = len(users)
    mean_id = users["id"].mean()
    return {"count": row_count, "avg_id": mean_id}

# Materialize specific assets
result = materialize([users, user_stats])

# Materialize with configuration
# The run config below sets a base_path option on the "io_manager" resource.
result = materialize(
    [users, user_stats],
    run_config={
        "resources": {
            "io_manager": {
                "config": {"base_path": "/tmp/dagster"}
            }
        }
    }
)

# Materialize with resources
from dagster import resource

@resource
def database():
    # Resource value is a dict holding the connection string.
    connection_url = "postgresql://localhost/db"
    return {"connection": connection_url}

# `defs` is built here but not used by the materialize() call below, which
# receives the assets and resources directly.
defs = Definitions(
    assets=[users, user_stats],
    resources={"database": database}
)

result = materialize(
    [users, user_stats],
    resources={"database": database}
)

# Materialize with partition selection
from dagster import DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def daily_data() -> pd.DataFrame:
    # Single-row frame stamped with the current time and a fixed value of 42.
    current_time = pd.Timestamp.now()
    return pd.DataFrame({"date": [current_time], "value": [42]})

# Materialize specific partition  
result = materialize(
    [daily_data],
    partition_key="2023-01-15"
)

# Check materialization results
if result.success:
    materializations = result.asset_materializations
    # Print the key and metadata of each materialization.
    for mat in materializations:
        print(f"Materialized: {mat.asset_key}")
        print(f"Metadata: {mat.metadata}")

Parameters:

  • assets: Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]] - Assets to materialize
  • run_config: Any = None - Run configuration (accepts various formats)
  • instance: Optional[DagsterInstance] = None - Dagster instance (uses default if None)
  • resources: Optional[Mapping[str, object]] = None - Resource instances
  • partition_key: Optional[str] = None - Partition key for partitioned assets
  • raise_on_error: bool = True - Whether to raise exception on failure
  • tags: Optional[Mapping[str, str]] = None - Run tags for metadata
  • selection: Optional[CoercibleToAssetSelection] = None - Asset selection for partial materialization

Returns: ExecuteInProcessResult - Contains execution results, asset materializations, and run information

materialize_to_memory { .api }

Module: dagster._core.definitions.materialize
Type: Function

Materialize assets to memory for testing and development.

from dagster import materialize_to_memory

# Materialize to memory (no I/O)
# `users`, `user_stats` and `daily_data` are the assets defined in the
# materialize example earlier on this page.
result = materialize_to_memory([users, user_stats])

# Access materialized values directly
if result.success:
    # Get materialized asset values, looked up by node name
    users_data = result.output_for_node("users")
    stats_data = result.output_for_node("user_stats")
    
    print(f"Users: {users_data}")
    print(f"Stats: {stats_data}")

# Memory materialization with partition
result = materialize_to_memory(
    [daily_data],
    partition_key="2023-01-15"
)

daily_value = result.output_for_node("daily_data") 
print(f"Daily data: {daily_value}")

Resource Building

build_resources { .api }

Module: dagster._core.execution.build_resources
Type: Function

Build and initialize resources outside of execution context.

from dagster import build_resources, resource, Config

# Connection settings consumed by database_resource below.
class DatabaseConfig(Config):
    # Host name; defaults to "localhost".
    host: str = "localhost"
    # Port number; defaults to 5432.
    port: int = 5432
    # Database name; defaults to "mydb".
    database: str = "mydb"

@resource(config_schema=DatabaseConfig)
def database_resource(config: DatabaseConfig):
    # NOTE(review): confirm that a @resource function receives a
    # DatabaseConfig instance here rather than an init context.
    host, port, database = config.host, config.port, config.database
    return {"connection": f"postgresql://{host}:{port}/{database}", "pool_size": 10}

@resource
def cache_resource():
    # Resource value is a dict holding the Redis URL.
    redis_url = "redis://localhost:6379"
    return {"redis_url": redis_url}

# Build resources with configuration
# NOTE(review): confirm the keyword used to pass resource config; the
# installed build_resources signature may expect a different argument name
# and config shape than `run_config` with a "resources" wrapper.
with build_resources({
    "database": database_resource,
    "cache": cache_resource
}, run_config={
    "resources": {
        "database": {
            "config": {
                "host": "prod-db",
                "port": 5432,
                "database": "production"
            }
        }
    }
}) as resources:
    # Use resources; each is exposed as an attribute named after its key.
    db = resources.database
    cache = resources.cache
    
    print(f"Database: {db['connection']}")
    print(f"Cache: {cache['redis_url']}")
    
    # Resources are automatically cleaned up

Configuration Validation

validate_run_config { .api }

Module: dagster._core.execution.validate_run_config
Type: Function

Validate run configuration against job schema.

from dagster import validate_run_config, job, op, Field, String, Int

@op(config_schema={"name": String, "count": Int})
def configured_op(context):
    # Formats a greeting from the op's "name" and "count" config values.
    op_settings = context.op_config
    return "Hello {} x{}".format(op_settings['name'], op_settings['count'])

@job
def configured_job():
    # Single-op job; configured_op reads its "name" and "count" from op config.
    configured_op()

# Validate configuration
from dagster import DagsterInvalidConfigError

run_config = {
    "ops": {
        "configured_op": {
            "config": {
                "name": "Alice",
                "count": 5
            }
        }
    }
}

# validate_run_config returns the validated config as a dict and raises
# DagsterInvalidConfigError when the config does not match the job's schema.
# There is no result object carrying `.success` / `.errors`, so failure is
# handled through the exception.
try:
    validation_result = validate_run_config(configured_job, run_config)
except DagsterInvalidConfigError as exc:
    print("Configuration validation failed")
    for error in exc.errors:
        print(f"Error: {error.message}")
        print(f"Path: {error.stack}")
else:
    print("Configuration is valid")
    print(f"Validated config: {validation_result}")

This comprehensive execution system provides rich runtime environments, flexible execution strategies, and powerful APIs for materializing assets and executing jobs. The context system offers extensive access to configuration, resources, logging, and metadata, enabling sophisticated data pipeline implementations.

For storage and I/O management used in execution, see Storage and I/O. For events generated during execution, see Events and Metadata.

Install with Tessl CLI

npx tessl i tessl/pypi-dagster

docs

configuration.md

core-definitions.md

error-handling.md

events-metadata.md

execution-contexts.md

index.md

partitions.md

sensors-schedules.md

storage-io.md

tile.json