A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
Dagster provides a comprehensive, type-safe configuration system that enables parameterization of assets, operations, resources, and entire pipelines. The system supports schema validation, environment variable injection, and hierarchical configuration composition.
ConfigSchema { .api }
Module: dagster._config.config_schema
Type: Class
Configuration schema definition with validation and type coercion.
from dagster import ConfigSchema, Field, op, job
from dagster import String, Int, Float, Bool, Array, Enum, EnumValue
# Simple schema using built-in types
schema = ConfigSchema({
"name": String,
"age": Int,
"salary": Float,
"active": Bool,
"tags": Array(String)
})
# Schema with Field configurations
complex_schema = ConfigSchema({
"database": Field(
String,
description="Database connection string",
default_value="sqlite:///default.db"
),
"batch_size": Field(
Int,
description="Processing batch size",
default_value=1000,
is_required=False
),
"mode": Field(
Enum("ProcessingMode", [
EnumValue("fast", description="Fast processing"),
EnumValue("accurate", description="Accurate processing")
]),
default_value="fast"
)
})
@op(config_schema=complex_schema)
def process_data(context):
"""Op with complex configuration."""
config = context.op_config
db_conn = config["database"]
batch_size = config["batch_size"]
mode = config["mode"]
context.log.info(f"Processing with batch_size={batch_size}, mode={mode}")
# Processing logic

Field { .api }
Module: dagster._config.field
Type: Class
Configuration field with validation, defaults, and metadata.
from dagster import Field, String, Int, Shape, Array
# Basic field
name_field = Field(String, description="User name")
# Field with default value
port_field = Field(
Int,
default_value=5432,
description="Database port",
is_required=False
)
# Nested field with Shape
config_field = Field(
Shape({
"host": Field(String, description="Database host"),
"port": Field(Int, default_value=5432),
"credentials": Field(
Shape({
"username": String,
"password": String
}),
description="Database credentials"
)
}),
description="Database configuration"
)
# Array field
tags_field = Field(
Array(String),
default_value=[],
description="Processing tags"
)

Parameters:
- dagster_type: DagsterType - The type of the field
- default_value: Any - Default value if not provided
- is_required: bool = True - Whether field is required
- description: Optional[str] - Field description
- metadata: Optional[Dict[str, Any]] - Field metadata

Module: dagster._builtins
from dagster import String, Int, Float, Bool, Any, Nothing
# String type with validation
@op(config_schema={"name": String})
def string_config_op(context):
name = context.op_config["name"] # Validated as string
# Integer type
@op(config_schema={"count": Int})
def int_config_op(context):
count = context.op_config["count"] # Validated as integer
# Float type
@op(config_schema={"threshold": Float})
def float_config_op(context):
threshold = context.op_config["threshold"] # Validated as float
# Boolean type
@op(config_schema={"enabled": Bool})
def bool_config_op(context):
enabled = context.op_config["enabled"] # Validated as boolean
# Any type (no validation)
@op(config_schema={"data": Any})
def any_config_op(context):
data = context.op_config["data"] # Any type allowed
# Nothing type (void/null)
@op(config_schema={"placeholder": Nothing})
def nothing_config_op(context):
# Field must be null/None
pass

Array { .api }
Module: dagster._config.config_type
Type: Configuration type
Array/list configuration with element type validation.
from dagster import Array, String, Int, Shape
# Array of strings
@op(config_schema={"tags": Array(String)})
def string_array_op(context):
tags = context.op_config["tags"] # List[str]
# Array of integers
@op(config_schema={"numbers": Array(Int)})
def int_array_op(context):
numbers = context.op_config["numbers"] # List[int]
# Array of nested objects
@op(config_schema={
"users": Array(Shape({
"name": String,
"age": Int,
"active": Bool
}))
})
def nested_array_op(context):
users = context.op_config["users"] # List[Dict]
for user in users:
name = user["name"]
age = user["age"]
active = user["active"]

Enum { .api }
Module: dagster._config.config_type
Type: Configuration type
Enumeration configuration with predefined values.
from dagster import Enum, EnumValue, Field
# Simple enum
log_level_enum = Enum("LogLevel", [
EnumValue("DEBUG"),
EnumValue("INFO"),
EnumValue("WARNING"),
EnumValue("ERROR")
])
# Enum with descriptions
processing_mode = Enum("ProcessingMode", [
EnumValue("batch", description="Process in batches"),
EnumValue("streaming", description="Process as stream"),
EnumValue("realtime", description="Real-time processing")
])
@op(config_schema={
"log_level": log_level_enum,
"mode": processing_mode
})
def enum_config_op(context):
log_level = context.op_config["log_level"] # One of the enum values
mode = context.op_config["mode"]
context.log.info(f"Running in {mode} mode with {log_level} logging")

Shape { .api }
Module: dagster._config.field_utils
Type: Configuration utility
Object/dictionary configuration with typed fields.
from dagster import Shape, Field, String, Int, Bool
# Basic shape
user_shape = Shape({
"name": String,
"age": Int,
"email": String
})
# Shape with optional fields
config_shape = Shape({
"required_field": String,
"optional_field": Field(Int, is_required=False, default_value=42),
"nested": Shape({
"host": String,
"port": Field(Int, default_value=8080)
})
})
@op(config_schema={"user": user_shape, "config": config_shape})
def shape_config_op(context):
user = context.op_config["user"]
name = user["name"] # Guaranteed to be string
age = user["age"] # Guaranteed to be int
config = context.op_config["config"]
required = config["required_field"] # Required field
optional = config.get("optional_field", 42)  # Optional with default

Selector { .api }
Module: dagster._config.field_utils
Type: Configuration utility
One-of configuration allowing selection between alternatives.
from dagster import Selector, Field, String, Int, Shape
# Database selector - choose between different database types
database_selector = Selector({
"sqlite": Shape({
"path": String
}),
"postgres": Shape({
"host": String,
"port": Field(Int, default_value=5432),
"database": String,
"username": String,
"password": String
}),
"mysql": Shape({
"host": String,
"port": Field(Int, default_value=3306),
"database": String,
"username": String,
"password": String
})
})
@op(config_schema={"database": database_selector})
def database_op(context):
db_config = context.op_config["database"]
# Only one key will be present
if "sqlite" in db_config:
sqlite_config = db_config["sqlite"]
path = sqlite_config["path"]
# Handle SQLite connection
elif "postgres" in db_config:
pg_config = db_config["postgres"]
host = pg_config["host"]
port = pg_config["port"]
# Handle PostgreSQL connection
elif "mysql" in db_config:
mysql_config = db_config["mysql"]
# Handle MySQL connection

Permissive { .api }
Module: dagster._config.field_utils
Type: Configuration utility
Permissive configuration allowing additional unvalidated fields.
from dagster import Permissive, String, Int
# Permissive config allows extra fields
permissive_config = Permissive({
"name": String,
"age": Int
# Additional fields allowed but not validated
})
@op(config_schema={"data": permissive_config})
def permissive_op(context):
data = context.op_config["data"]
name = data["name"] # Validated
age = data["age"] # Validated
# Additional fields accessible but not validated
extra_field = data.get("some_extra_field")
custom_metadata = data.get("metadata", {})

Noneable { .api }
Module: dagster._config.config_type
Type: Configuration type
Optional configuration that can be null/None.
from dagster import Noneable, String, Int
@op(config_schema={
"required": String,
"optional": Noneable(String), # Can be string or None
"maybe_count": Noneable(Int) # Can be int or None
})
def nullable_config_op(context):
config = context.op_config
required = config["required"] # Always present
optional = config.get("optional") # May be None
maybe_count = config.get("maybe_count") # May be None
if optional is not None:
context.log.info(f"Optional value: {optional}")
if maybe_count is not None:
context.log.info(f"Count: {maybe_count}")

EnvVar { .api }
Module: dagster._config.field_utils
Type: Configuration utility
Environment variable configuration with type coercion and defaults.
from dagster import EnvVar, Field, String, Int, Bool
import os
@op(config_schema={
"database_url": Field(
String,
default_value=EnvVar("DATABASE_URL")
),
"api_key": Field(
String,
default_value=EnvVar("API_KEY")
),
"batch_size": Field(
Int,
default_value=EnvVar.int("BATCH_SIZE", default=1000)
),
"debug_mode": Field(
Bool,
default_value=EnvVar.bool("DEBUG", default=False)
)
})
def env_config_op(context):
"""Op using environment variables for configuration."""
config = context.op_config
# Values automatically loaded from environment
db_url = config["database_url"] # From DATABASE_URL env var
api_key = config["api_key"] # From API_KEY env var
batch_size = config["batch_size"] # From BATCH_SIZE env var (as int)
debug = config["debug_mode"] # From DEBUG env var (as bool)
# Environment variable types
env_string = EnvVar("STRING_VAR") # String
env_int = EnvVar.int("INT_VAR") # Integer
env_int_default = EnvVar.int("INT_VAR", default=42) # Integer with default
env_bool = EnvVar.bool("BOOL_VAR") # Boolean
env_bool_default = EnvVar.bool("BOOL_VAR", default=True)  # Boolean with default

StringSource { .api }
Module: dagster._config.source
Type: Configuration source
String configuration from various sources including environment variables.
from dagster import StringSource, IntSource, BoolSource, Field
@op(config_schema={
"connection_string": Field(StringSource),
"retry_count": Field(IntSource),
"enable_ssl": Field(BoolSource)
})
def source_config_op(context):
"""Op using configuration sources."""
config = context.op_config
# Can come from environment variables or direct values
conn_str = config["connection_string"]
retries = config["retry_count"]
ssl_enabled = config["enable_ssl"]
# Usage in run config:
run_config = {
"ops": {
"source_config_op": {
"config": {
"connection_string": {"env": "DB_CONNECTION_STRING"},
"retry_count": {"value": 3}, # Direct value
"enable_ssl": {"env": "ENABLE_SSL"}
}
}
}
}

Config { .api }
Module: dagster._config.pythonic_config
Type: Base class
Pydantic-based configuration class for type-safe, IDE-friendly configuration.
from dagster import Config, op, asset, resource
from typing import List, Optional
from pydantic import Field
class DatabaseConfig(Config):
"""Database configuration using Pydantic."""
host: str = Field(description="Database host")
port: int = Field(default=5432, description="Database port")
database: str = Field(description="Database name")
username: str = Field(description="Database username")
password: str = Field(description="Database password")
pool_size: int = Field(default=10, description="Connection pool size")
ssl_enabled: bool = Field(default=True, description="Enable SSL")
# Environment variable integration
api_key: str = Field(default_factory=lambda: os.getenv("DB_API_KEY"))
class ProcessingConfig(Config):
"""Processing configuration."""
batch_size: int = Field(default=1000, gt=0, description="Batch size")
parallel_workers: int = Field(default=4, ge=1, le=16)
timeout_seconds: float = Field(default=30.0, gt=0)
tags: List[str] = Field(default_factory=list)
debug_mode: bool = Field(default=False)
# Computed field
@property
def total_capacity(self) -> int:
return self.batch_size * self.parallel_workers
@op
def process_data(config: ProcessingConfig) -> str:
"""Op with Pythonic configuration."""
# Full IDE support and type checking
batch_size = config.batch_size # IDE knows this is int
workers = config.parallel_workers
# Access computed properties
capacity = config.total_capacity
# Pydantic validation ensures all constraints met
return f"Processing with {workers} workers, batch_size {batch_size}"
@asset
def analytics_data(config: ProcessingConfig) -> dict:
"""Asset with typed configuration."""
if config.debug_mode:
print(f"Debug: Processing {config.batch_size} records")
# Configuration is fully typed and validated
return {"processed": True, "batch_size": config.batch_size}

ConfigurableResource { .api }
Module: dagster._config.pythonic_config
Type: Base class
Configurable resource using Pydantic configuration.
from dagster import ConfigurableResource, resource
from pydantic import Field
import requests
from typing import Optional
class DatabaseResource(ConfigurableResource):
"""Database resource with configuration."""
host: str = Field(description="Database host")
port: int = Field(default=5432)
database: str = Field(description="Database name")
username: str = Field(description="Username")
password: str = Field(description="Password")
pool_size: int = Field(default=10, ge=1, le=100)
def get_connection(self):
"""Get database connection."""
# Use configuration to create connection
return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
def query(self, sql: str) -> list:
"""Execute query."""
conn = self.get_connection()
# Execute query logic
return []
class APIResource(ConfigurableResource):
"""API client resource."""
base_url: str = Field(description="API base URL")
api_key: str = Field(description="API key for authentication")
timeout: float = Field(default=30.0, gt=0)
retries: int = Field(default=3, ge=0)
def get_client(self) -> requests.Session:
"""Get configured HTTP client."""
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {self.api_key}"})
return session
def fetch_data(self, endpoint: str) -> dict:
"""Fetch data from API endpoint."""
client = self.get_client()
response = client.get(f"{self.base_url}/{endpoint}", timeout=self.timeout)
return response.json()
# Usage with assets
@asset
def user_data(database: DatabaseResource) -> list:
"""Fetch user data using database resource."""
return database.query("SELECT * FROM users")
@asset
def external_data(api_client: APIResource) -> dict:
"""Fetch external data using API resource."""
return api_client.fetch_data("users")
# Resource definitions in Definitions
from dagster import Definitions
defs = Definitions(
assets=[user_data, external_data],
resources={
"database": DatabaseResource(
host="localhost",
database="mydb",
username="user",
password="password"
),
"api_client": APIResource(
base_url="https://api.example.com",
api_key="secret-key"
)
}
)

ResourceDependency { .api }
Module: dagster._config.pythonic_config
Type: Configuration utility
Dependency injection for configurable resources.
from dagster import ConfigurableResource, ResourceDependency
class CacheResource(ConfigurableResource):
"""Cache resource."""
host: str
port: int = 6379
def get(self, key: str) -> Optional[str]:
return None # Redis logic
def set(self, key: str, value: str) -> None:
pass # Redis logic
class DatabaseService(ConfigurableResource):
"""Database service with cache dependency."""
connection_string: str
cache: ResourceDependency[CacheResource] # Resource dependency
def get_user(self, user_id: str) -> Optional[dict]:
# Try cache first
cached = self.cache.get(f"user:{user_id}")
if cached:
return json.loads(cached)
# Fetch from database
user = self.fetch_from_db(user_id)
# Cache result
if user:
self.cache.set(f"user:{user_id}", json.dumps(user))
return user
def fetch_from_db(self, user_id: str) -> Optional[dict]:
# Database fetch logic
return {"id": user_id, "name": "John"}
# Resource configuration with dependencies
defs = Definitions(
assets=[user_asset],
resources={
"cache": CacheResource(host="localhost"),
"database": DatabaseService(
connection_string="postgresql://localhost/db",
cache=ResourceDependency("cache") # Reference cache resource
)
}
)

@config_mapping { .api }
Module: dagster._core.definitions.decorators.config_mapping_decorator
Type: Function decorator
Define configuration transformations for composable definitions.
from dagster import config_mapping, op, job, Field, String, Int
@op(config_schema={"name": String, "count": Int})
def parametrized_op(context):
name = context.op_config["name"]
count = context.op_config["count"]
return f"Hello {name} x{count}"
@config_mapping(
config_schema={"user": String}, # Simplified input config
receive_processed_config_values=True
)
def simple_config_mapping(config):
"""Map simple config to complex op config."""
return {
"parametrized_op": {
"config": {
"name": config["user"],
"count": 3 # Default value
}
}
}
@job(config=simple_config_mapping)
def mapped_job():
parametrized_op()
# Usage with simplified config
from dagster import execute_job
result = execute_job(
mapped_job,
run_config={
"user": "Alice" # Simple config mapped to complex op config
}
)

ConfigMapping { .api }
Module: dagster._core.definitions.config
Type: Class
Configuration mapping for programmatic config transformation.
from dagster import ConfigMapping
# Programmatic config mapping
def transform_config(config_value):
"""Transform external config to internal format."""
return {
"ops": {
"my_op": {
"config": {
"processed_value": config_value["input"] * 2,
"metadata": {"source": "mapping"}
}
}
}
}
config_mapping = ConfigMapping(
config_fn=transform_config,
config_schema={"input": Int}
)
@job(config=config_mapping)
def configured_job():
my_op()

config_from_files { .api }
Module: dagster._core.definitions.utils
Type: Function
Load configuration from YAML or JSON files.
from dagster import config_from_files
# Load from single file
config = config_from_files(["config.yaml"])
# Load and merge multiple files
config = config_from_files([
"base_config.yaml",
"env_config.yaml",
"secrets.yaml"
])
# Use in job execution
from dagster import execute_job
result = execute_job(my_job, run_config=config)

config_from_yaml_strings { .api }
Module: dagster._core.definitions.utils
Type: Function
Load configuration from YAML strings.
from dagster import config_from_yaml_strings
yaml_config = """
ops:
my_op:
config:
batch_size: 1000
debug: true
resources:
database:
config:
host: localhost
port: 5432
"""
config = config_from_yaml_strings([yaml_config])

RunConfig { .api }
Module: dagster._core.definitions.run_config
Type: Class
Type-safe run configuration builder.
from dagster import RunConfig, job, op, Config
class MyOpConfig(Config):
value: int
name: str
@op
def my_op(config: MyOpConfig):
return config.value * 2
@job
def my_job():
my_op()
# Type-safe run config
run_config = RunConfig(
ops={
"my_op": MyOpConfig(value=42, name="test")
},
resources={
"database": {
"host": "localhost",
"port": 5432
}
}
)
# Execute with type-safe config
result = execute_job(my_job, run_config=run_config)

from dagster import Config, Field, op, job
from typing import Union, Literal
import os
class DevelopmentConfig(Config):
mode: Literal["development"] = "development"
debug: bool = True
database_url: str = "sqlite:///dev.db"
class ProductionConfig(Config):
mode: Literal["production"] = "production"
debug: bool = False
database_url: str = Field(default_factory=lambda: os.getenv("DATABASE_URL"))
ssl_required: bool = True
AppConfig = Union[DevelopmentConfig, ProductionConfig]
@op
def app_op(config: AppConfig):
"""Op with conditional configuration."""
if config.mode == "development":
# Development-specific logic
if config.debug:
print("Debug mode enabled")
elif config.mode == "production":
# Production-specific logic
if config.ssl_required:
print("SSL validation enabled")
return f"Running in {config.mode} mode"

class StorageConfig(Config):
type: Literal["s3", "gcs", "local"]
bucket: Optional[str] = None
path: str = "/tmp"
class DatabaseConfig(Config):
host: str
port: int = 5432
ssl: bool = True
class AppResourceConfig(Config):
storage: StorageConfig
database: DatabaseConfig
cache_ttl: int = 3600
class ApplicationResource(ConfigurableResource):
config: AppResourceConfig
def get_storage_client(self):
if self.config.storage.type == "s3":
return S3Client(bucket=self.config.storage.bucket)
elif self.config.storage.type == "local":
return LocalStorage(path=self.config.storage.path)
def get_database(self):
return Database(
host=self.config.database.host,
port=self.config.database.port,
ssl=self.config.database.ssl
)

This comprehensive configuration system enables type-safe, validated, and environment-aware parameterization of all Dagster definitions. The Pythonic configuration approach with Pydantic provides excellent IDE support and runtime validation, while the traditional schema approach offers maximum flexibility for complex configuration scenarios.
For resource definitions and dependency injection, see Storage and I/O. For execution contexts that receive configuration, see Execution and Contexts.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster