CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster

A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.

Pending
Overview
Eval results
Files

docs/configuration.md

Configuration System

Dagster provides a comprehensive, type-safe configuration system that enables parameterization of assets, operations, resources, and entire pipelines. The system supports schema validation, environment variable injection, and hierarchical configuration composition.

Configuration Schema System

ConfigSchema { .api }

Module: dagster._config.config_schema
Type: Class

Configuration schema definition with validation and type coercion.

from dagster import ConfigSchema, Field, op, job
from dagster import String, Int, Float, Bool, Array, Enum, EnumValue

# Simple schema using built-in types
schema = ConfigSchema({
    "name": String,
    "age": Int, 
    "salary": Float,
    "active": Bool,
    "tags": Array(String)
})

# Schema with Field configurations
complex_schema = ConfigSchema({
    "database": Field(
        String,
        description="Database connection string",
        default_value="sqlite:///default.db"
    ),
    "batch_size": Field(
        Int, 
        description="Processing batch size",
        default_value=1000,
        is_required=False
    ),
    "mode": Field(
        Enum("ProcessingMode", [
            EnumValue("fast", description="Fast processing"),
            EnumValue("accurate", description="Accurate processing")
        ]),
        default_value="fast"
    )
})

@op(config_schema=complex_schema)
def process_data(context):
    """Op with complex configuration."""
    config = context.op_config
    db_conn = config["database"]
    batch_size = config["batch_size"] 
    mode = config["mode"]
    
    context.log.info(f"Processing with batch_size={batch_size}, mode={mode}")
    # Processing logic

Field { .api }

Module: dagster._config.field
Type: Class

Configuration field with validation, defaults, and metadata.

from dagster import Field, String, Int, Shape, Array

# Basic field
name_field = Field(String, description="User name")

# Field with default value
port_field = Field(
    Int,
    default_value=5432,
    description="Database port",
    is_required=False
)

# Nested field with Shape
config_field = Field(
    Shape({
        "host": Field(String, description="Database host"),
        "port": Field(Int, default_value=5432),
        "credentials": Field(
            Shape({
                "username": String,
                "password": String
            }),
            description="Database credentials"
        )
    }),
    description="Database configuration"
)

# Array field
tags_field = Field(
    Array(String),
    default_value=[],
    description="Processing tags"
)

Parameters:

  • dagster_type: DagsterType - The type of the field
  • default_value: Any - Default value if not provided
  • is_required: bool = True - Whether field is required
  • description: Optional[str] - Field description
  • metadata: Optional[Dict[str, Any]] - Field metadata

Configuration Types

Scalar Types

Built-in Scalar Types { .api }

Module: dagster._builtins

from dagster import String, Int, Float, Bool, Any, Nothing

# String type with validation
@op(config_schema={"name": String})
def string_config_op(context):
    name = context.op_config["name"]  # Validated as string

# Integer type
@op(config_schema={"count": Int}) 
def int_config_op(context):
    count = context.op_config["count"]  # Validated as integer

# Float type
@op(config_schema={"threshold": Float})
def float_config_op(context):
    threshold = context.op_config["threshold"]  # Validated as float

# Boolean type
@op(config_schema={"enabled": Bool})
def bool_config_op(context):
    enabled = context.op_config["enabled"]  # Validated as boolean

# Any type (no validation)
@op(config_schema={"data": Any})
def any_config_op(context):
    data = context.op_config["data"]  # Any type allowed

# Nothing type (void/null)
@op(config_schema={"placeholder": Nothing})
def nothing_config_op(context):
    # Field must be null/None
    pass

Collection Types

Array { .api }

Module: dagster._config.config_type
Type: Configuration type

Array/list configuration with element type validation.

from dagster import Array, String, Int, Shape

# Array of strings
@op(config_schema={"tags": Array(String)})
def string_array_op(context):
    tags = context.op_config["tags"]  # List[str]

# Array of integers  
@op(config_schema={"numbers": Array(Int)})
def int_array_op(context):
    numbers = context.op_config["numbers"]  # List[int]

# Array of nested objects
@op(config_schema={
    "users": Array(Shape({
        "name": String,
        "age": Int,
        "active": Bool
    }))
})
def nested_array_op(context):
    users = context.op_config["users"]  # List[Dict]
    for user in users:
        name = user["name"]
        age = user["age"] 
        active = user["active"]

Enum { .api }

Module: dagster._config.config_type
Type: Configuration type

Enumeration configuration with predefined values.

from dagster import Enum, EnumValue, Field

# Simple enum
log_level_enum = Enum("LogLevel", [
    EnumValue("DEBUG"),
    EnumValue("INFO"), 
    EnumValue("WARNING"),
    EnumValue("ERROR")
])

# Enum with descriptions
processing_mode = Enum("ProcessingMode", [
    EnumValue("batch", description="Process in batches"),
    EnumValue("streaming", description="Process as stream"),
    EnumValue("realtime", description="Real-time processing")
])

@op(config_schema={
    "log_level": log_level_enum,
    "mode": processing_mode
})
def enum_config_op(context):
    log_level = context.op_config["log_level"]  # One of the enum values
    mode = context.op_config["mode"]
    
    context.log.info(f"Running in {mode} mode with {log_level} logging")

Complex Types

Shape { .api }

Module: dagster._config.field_utils
Type: Configuration utility

Object/dictionary configuration with typed fields.

from dagster import Shape, Field, String, Int, Bool

# Basic shape
user_shape = Shape({
    "name": String,
    "age": Int,
    "email": String
})

# Shape with optional fields
config_shape = Shape({
    "required_field": String,
    "optional_field": Field(Int, is_required=False, default_value=42),
    "nested": Shape({
        "host": String,
        "port": Field(Int, default_value=8080)
    })
})

@op(config_schema={"user": user_shape, "config": config_shape})
def shape_config_op(context):
    user = context.op_config["user"]
    name = user["name"]  # Guaranteed to be string
    age = user["age"]    # Guaranteed to be int
    
    config = context.op_config["config"]
    required = config["required_field"]  # Required field
    optional = config.get("optional_field", 42)  # Optional with default

Selector { .api }

Module: dagster._config.field_utils
Type: Configuration utility

One-of configuration allowing selection between alternatives.

from dagster import Selector, Field, String, Int, Shape

# Database selector - choose between different database types
database_selector = Selector({
    "sqlite": Shape({
        "path": String
    }),
    "postgres": Shape({
        "host": String,
        "port": Field(Int, default_value=5432),
        "database": String,
        "username": String,
        "password": String
    }),
    "mysql": Shape({
        "host": String,
        "port": Field(Int, default_value=3306),
        "database": String,
        "username": String,
        "password": String
    })
})

@op(config_schema={"database": database_selector})
def database_op(context):
    db_config = context.op_config["database"]
    
    # Only one key will be present
    if "sqlite" in db_config:
        sqlite_config = db_config["sqlite"]
        path = sqlite_config["path"]
        # Handle SQLite connection
    elif "postgres" in db_config:
        pg_config = db_config["postgres"]
        host = pg_config["host"]
        port = pg_config["port"]
        # Handle PostgreSQL connection
    elif "mysql" in db_config:
        mysql_config = db_config["mysql"] 
        # Handle MySQL connection

Permissive { .api }

Module: dagster._config.field_utils
Type: Configuration utility

Permissive configuration allowing additional unvalidated fields.

from dagster import Permissive, String, Int

# Permissive config allows extra fields
permissive_config = Permissive({
    "name": String,
    "age": Int
    # Additional fields allowed but not validated
})

@op(config_schema={"data": permissive_config})
def permissive_op(context):
    data = context.op_config["data"]
    name = data["name"]  # Validated
    age = data["age"]    # Validated
    
    # Additional fields accessible but not validated
    extra_field = data.get("some_extra_field")
    custom_metadata = data.get("metadata", {})

Optional Types

Noneable { .api }

Module: dagster._config.config_type
Type: Configuration type

Optional configuration that can be null/None.

from dagster import Noneable, String, Int

@op(config_schema={
    "required": String,
    "optional": Noneable(String),  # Can be string or None
    "maybe_count": Noneable(Int)   # Can be int or None
})
def nullable_config_op(context):
    config = context.op_config
    required = config["required"]  # Always present
    optional = config.get("optional")  # May be None
    maybe_count = config.get("maybe_count")  # May be None
    
    if optional is not None:
        context.log.info(f"Optional value: {optional}")
    
    if maybe_count is not None:
        context.log.info(f"Count: {maybe_count}")

Environment Variable Configuration

EnvVar { .api }

Module: dagster._config.field_utils
Type: Configuration utility

Environment variable configuration with type coercion and defaults.

from dagster import EnvVar, Field, String, Int, Bool
import os

@op(config_schema={
    "database_url": Field(
        String,
        default_value=EnvVar("DATABASE_URL")
    ),
    "api_key": Field(
        String, 
        default_value=EnvVar("API_KEY")
    ),
    "batch_size": Field(
        Int,
        default_value=EnvVar.int("BATCH_SIZE", default=1000)
    ),
    "debug_mode": Field(
        Bool,
        default_value=EnvVar.bool("DEBUG", default=False)
    )
})
def env_config_op(context):
    """Op using environment variables for configuration."""
    config = context.op_config
    
    # Values automatically loaded from environment
    db_url = config["database_url"]    # From DATABASE_URL env var
    api_key = config["api_key"]        # From API_KEY env var  
    batch_size = config["batch_size"]  # From BATCH_SIZE env var (as int)
    debug = config["debug_mode"]       # From DEBUG env var (as bool)

# Environment variable helpers
env_string = EnvVar("STRING_VAR")   # Resolves to the variable's string value
env_int = EnvVar.int("INT_VAR")     # Resolves to the variable's value coerced to int
# NOTE(review): current Dagster releases expose only string resolution and
# EnvVar.int — there is no EnvVar.bool and no `default=` parameter. For boolean
# flags or fallback defaults, read the variable with os.getenv and convert
# explicitly (e.g. os.getenv("DEBUG", "false").lower() == "true").

Configuration Sources

StringSource { .api }

Module: dagster._config.source
Type: Configuration source

String configuration from various sources including environment variables.

from dagster import StringSource, IntSource, BoolSource, Field

@op(config_schema={
    "connection_string": Field(StringSource),
    "retry_count": Field(IntSource), 
    "enable_ssl": Field(BoolSource)
})
def source_config_op(context):
    """Op using configuration sources."""
    config = context.op_config
    
    # Can come from environment variables or direct values
    conn_str = config["connection_string"]
    retries = config["retry_count"]
    ssl_enabled = config["enable_ssl"]

# Usage in run config:
run_config = {
    "ops": {
        "source_config_op": {
            "config": {
                "connection_string": {"env": "DB_CONNECTION_STRING"},
                "retry_count": {"value": 3},  # Direct value
                "enable_ssl": {"env": "ENABLE_SSL"}
            }
        }
    }
}

Pythonic Configuration System

Config { .api }

Module: dagster._config.pythonic_config
Type: Base class

Pydantic-based configuration class for type-safe, IDE-friendly configuration.

from dagster import Config, op, asset, resource
from typing import List, Optional
from pydantic import Field
import os

class DatabaseConfig(Config):
    """Database configuration using Pydantic."""
    host: str = Field(description="Database host")
    port: int = Field(default=5432, description="Database port")
    database: str = Field(description="Database name")
    username: str = Field(description="Database username") 
    password: str = Field(description="Database password")
    pool_size: int = Field(default=10, description="Connection pool size")
    ssl_enabled: bool = Field(default=True, description="Enable SSL")
    
    # Environment variable integration
    api_key: str = Field(default_factory=lambda: os.getenv("DB_API_KEY"))
    
class ProcessingConfig(Config):
    """Processing configuration."""
    batch_size: int = Field(default=1000, gt=0, description="Batch size")
    parallel_workers: int = Field(default=4, ge=1, le=16)
    timeout_seconds: float = Field(default=30.0, gt=0)
    tags: List[str] = Field(default_factory=list)
    debug_mode: bool = Field(default=False)
    
    # Computed field
    @property
    def total_capacity(self) -> int:
        return self.batch_size * self.parallel_workers

@op
def process_data(config: ProcessingConfig) -> str:
    """Op with Pythonic configuration."""
    # Full IDE support and type checking
    batch_size = config.batch_size  # IDE knows this is int
    workers = config.parallel_workers
    
    # Access computed properties
    capacity = config.total_capacity
    
    # Pydantic validation ensures all constraints met
    return f"Processing with {workers} workers, batch_size {batch_size}"

@asset  
def analytics_data(config: ProcessingConfig) -> dict:
    """Asset with typed configuration."""
    if config.debug_mode:
        print(f"Debug: Processing {config.batch_size} records")
    
    # Configuration is fully typed and validated
    return {"processed": True, "batch_size": config.batch_size}

ConfigurableResource { .api }

Module: dagster._config.pythonic_config
Type: Base class

Configurable resource using Pydantic configuration.

from dagster import ConfigurableResource, resource
from pydantic import Field
import requests
from typing import Optional

class DatabaseResource(ConfigurableResource):
    """Database resource with configuration."""
    host: str = Field(description="Database host")
    port: int = Field(default=5432)
    database: str = Field(description="Database name")
    username: str = Field(description="Username")
    password: str = Field(description="Password")
    pool_size: int = Field(default=10, ge=1, le=100)
    
    def get_connection(self):
        """Get database connection."""
        # Use configuration to create connection
        return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
    
    def query(self, sql: str) -> list:
        """Execute query."""
        conn = self.get_connection()
        # Execute query logic
        return []

class APIResource(ConfigurableResource):
    """API client resource."""
    base_url: str = Field(description="API base URL")
    api_key: str = Field(description="API key for authentication")
    timeout: float = Field(default=30.0, gt=0)
    retries: int = Field(default=3, ge=0)
    
    def get_client(self) -> requests.Session:
        """Get configured HTTP client."""
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {self.api_key}"})
        return session
    
    def fetch_data(self, endpoint: str) -> dict:
        """Fetch data from API endpoint."""
        client = self.get_client()
        response = client.get(f"{self.base_url}/{endpoint}", timeout=self.timeout)
        return response.json()

# Usage with assets
@asset
def user_data(database: DatabaseResource) -> list:
    """Fetch user data using database resource."""
    return database.query("SELECT * FROM users")

@asset
def external_data(api_client: APIResource) -> dict:
    """Fetch external data using API resource."""
    return api_client.fetch_data("users")

# Resource definitions in Definitions
from dagster import Definitions

defs = Definitions(
    assets=[user_data, external_data],
    resources={
        "database": DatabaseResource(
            host="localhost",
            database="mydb", 
            username="user",
            password="password"
        ),
        "api_client": APIResource(
            base_url="https://api.example.com",
            api_key="secret-key"
        )
    }
)

ResourceDependency { .api }

Module: dagster._config.pythonic_config
Type: Configuration utility

Dependency injection for configurable resources.

from dagster import ConfigurableResource, ResourceDependency
from typing import Optional
import json

class CacheResource(ConfigurableResource):
    """Cache resource."""
    host: str
    port: int = 6379
    
    def get(self, key: str) -> Optional[str]:
        return None  # Redis logic
    
    def set(self, key: str, value: str) -> None:
        pass  # Redis logic

class DatabaseService(ConfigurableResource):
    """Database service with cache dependency."""
    connection_string: str
    cache: ResourceDependency[CacheResource]  # Resource dependency
    
    def get_user(self, user_id: str) -> Optional[dict]:
        # Try cache first
        cached = self.cache.get(f"user:{user_id}")
        if cached:
            return json.loads(cached)
        
        # Fetch from database
        user = self.fetch_from_db(user_id)
        
        # Cache result
        if user:
            self.cache.set(f"user:{user_id}", json.dumps(user))
        
        return user
    
    def fetch_from_db(self, user_id: str) -> Optional[dict]:
        # Database fetch logic
        return {"id": user_id, "name": "John"}

# Resource configuration with dependencies. ResourceDependency is an annotation
# type used in the field declaration above; it is not instantiated with a string
# key. Satisfy the dependency by passing the resource instance itself (Dagster
# also supports binding by key via Definitions resources).
cache_resource = CacheResource(host="localhost")

defs = Definitions(
    assets=[user_asset],
    resources={
        "cache": cache_resource,
        "database": DatabaseService(
            connection_string="postgresql://localhost/db",
            cache=cache_resource  # Pass the resource instance as the dependency
        )
    }
)

Configuration Mapping

@config_mapping { .api }

Module: dagster._core.definitions.decorators.config_mapping_decorator
Type: Function decorator

Define configuration transformations for composable definitions.

from dagster import config_mapping, op, job, Field, String, Int

@op(config_schema={"name": String, "count": Int})
def parametrized_op(context):
    name = context.op_config["name"]
    count = context.op_config["count"]
    return f"Hello {name} x{count}"

@config_mapping(
    config_schema={"user": String},  # Simplified input config
    receive_processed_config_values=True
)
def simple_config_mapping(config):
    """Map simple config to complex op config."""
    return {
        "parametrized_op": {
            "config": {
                "name": config["user"],
                "count": 3  # Default value
            }
        }
    }

@job(config=simple_config_mapping)
def mapped_job():
    parametrized_op()

# Usage with simplified config
from dagster import execute_job
result = execute_job(
    mapped_job,
    run_config={
        "user": "Alice"  # Simple config mapped to complex op config
    }
)

ConfigMapping { .api }

Module: dagster._core.definitions.config
Type: Class

Configuration mapping for programmatic config transformation.

from dagster import ConfigMapping

# Programmatic config mapping
def transform_config(config_value):
    """Transform external config to internal format."""
    return {
        "ops": {
            "my_op": {
                "config": {
                    "processed_value": config_value["input"] * 2,
                    "metadata": {"source": "mapping"}
                }
            }
        }
    }

config_mapping = ConfigMapping(
    config_fn=transform_config,
    config_schema={"input": Int}
)

@job(config=config_mapping)
def configured_job():
    my_op()

Configuration Utilities

Configuration Loading

config_from_files { .api }

Module: dagster._core.definitions.utils
Type: Function

Load configuration from YAML or JSON files.

from dagster import config_from_files

# Load from single file
config = config_from_files(["config.yaml"])

# Load and merge multiple files
config = config_from_files([
    "base_config.yaml",
    "env_config.yaml",
    "secrets.yaml"
])

# Use in job execution
from dagster import execute_job
result = execute_job(my_job, run_config=config)

config_from_yaml_strings { .api }

Module: dagster._core.definitions.utils
Type: Function

Load configuration from YAML strings.

from dagster import config_from_yaml_strings

yaml_config = """
ops:
  my_op:
    config:
      batch_size: 1000
      debug: true
resources:
  database:
    config:
      host: localhost
      port: 5432
"""

config = config_from_yaml_strings([yaml_config])

RunConfig { .api }

Module: dagster._core.definitions.run_config
Type: Class

Type-safe run configuration builder.

from dagster import RunConfig, job, op, Config

class MyOpConfig(Config):
    value: int
    name: str

@op
def my_op(config: MyOpConfig):
    return config.value * 2

@job
def my_job():
    my_op()

# Type-safe run config
run_config = RunConfig(
    ops={
        "my_op": MyOpConfig(value=42, name="test")
    },
    resources={
        "database": {
            "host": "localhost",
            "port": 5432
        }
    }
)

# Execute with type-safe config
result = execute_job(my_job, run_config=run_config)

Advanced Configuration Patterns

Conditional Configuration

from dagster import Config, Field, op, job
from typing import Union, Literal
import os

class DevelopmentConfig(Config):
    mode: Literal["development"] = "development"
    debug: bool = True
    database_url: str = "sqlite:///dev.db"

class ProductionConfig(Config):
    mode: Literal["production"] = "production"  
    debug: bool = False
    database_url: str = Field(default_factory=lambda: os.getenv("DATABASE_URL"))
    ssl_required: bool = True

AppConfig = Union[DevelopmentConfig, ProductionConfig]

@op
def app_op(config: AppConfig):
    """Op with conditional configuration."""
    if config.mode == "development":
        # Development-specific logic
        if config.debug:
            print("Debug mode enabled")
    elif config.mode == "production":
        # Production-specific logic
        if config.ssl_required:
            print("SSL validation enabled")
    
    return f"Running in {config.mode} mode"

Nested Resource Configuration

class StorageConfig(Config):
    type: Literal["s3", "gcs", "local"]
    bucket: Optional[str] = None
    path: str = "/tmp"

class DatabaseConfig(Config):
    host: str
    port: int = 5432
    ssl: bool = True

class AppResourceConfig(Config):
    storage: StorageConfig
    database: DatabaseConfig
    cache_ttl: int = 3600

class ApplicationResource(ConfigurableResource):
    config: AppResourceConfig
    
    def get_storage_client(self):
        if self.config.storage.type == "s3":
            return S3Client(bucket=self.config.storage.bucket)
        elif self.config.storage.type == "local":
            return LocalStorage(path=self.config.storage.path)
    
    def get_database(self):
        return Database(
            host=self.config.database.host,
            port=self.config.database.port,
            ssl=self.config.database.ssl
        )

This comprehensive configuration system enables type-safe, validated, and environment-aware parameterization of all Dagster definitions. The Pythonic configuration approach with Pydantic provides excellent IDE support and runtime validation, while the traditional schema approach offers maximum flexibility for complex configuration scenarios.

For resource definitions and dependency injection, see Storage and I/O. For execution contexts that receive configuration, see Execution and Contexts.

Install with Tessl CLI

npx tessl i tessl/pypi-dagster

docs

configuration.md

core-definitions.md

error-handling.md

events-metadata.md

execution-contexts.md

index.md

partitions.md

sensors-schedules.md

storage-io.md

tile.json