# Configuration System

Dagster provides a type-safe configuration system for parameterizing assets, ops, resources, and entire jobs. It supports schema validation, environment variable injection, and hierarchical composition of configuration.

## Configuration Schema System

### `ConfigSchema` { .api }

**Module:** `dagster._config.config_schema`
**Type:** Placeholder type

Stands for any value Dagster accepts as a config schema: a built-in scalar type, a plain dictionary of fields, a `Field`, or one of the types described below (`Shape`, `Selector`, `Permissive`, `Array`, `Enum`, `Noneable`). It is a documentation placeholder rather than a class to instantiate, so the examples pass dictionaries directly to `config_schema`.

```python
from dagster import Field, op
from dagster import String, Int, Float, Bool, Array, Enum, EnumValue

# Simple schema using built-in types (a plain dict is a valid config schema)
schema = {
    "name": String,
    "age": Int,
    "salary": Float,
    "active": Bool,
    "tags": Array(String)
}

# Schema with Field configurations
complex_schema = {
    "database": Field(
        String,
        description="Database connection string",
        default_value="sqlite:///default.db"
    ),
    "batch_size": Field(
        Int,
        description="Processing batch size",
        default_value=1000,
        is_required=False
    ),
    "mode": Field(
        Enum("ProcessingMode", [
            EnumValue("fast", description="Fast processing"),
            EnumValue("accurate", description="Accurate processing")
        ]),
        default_value="fast"
    )
}

@op(config_schema=complex_schema)
def process_data(context):
    """Op with complex configuration."""
    config = context.op_config
    db_conn = config["database"]
    batch_size = config["batch_size"]
    mode = config["mode"]

    context.log.info(f"Processing with batch_size={batch_size}, mode={mode}")
    # Processing logic
```

### `Field` { .api }

**Module:** `dagster._config.field`
**Type:** Class

Configuration field with a type, an optional default value, and a description.

```python
from dagster import Field, String, Int, Shape, Array

# Basic field
name_field = Field(String, description="User name")

# Field with default value
port_field = Field(
    Int,
    default_value=5432,
    description="Database port",
    is_required=False
)

# Nested field with Shape
config_field = Field(
    Shape({
        "host": Field(String, description="Database host"),
        "port": Field(Int, default_value=5432),
        "credentials": Field(
            Shape({
                "username": String,
                "password": String
            }),
            description="Database credentials"
        )
    }),
    description="Database configuration"
)

# Array field
tags_field = Field(
    Array(String),
    default_value=[],
    description="Processing tags"
)
```

**Parameters:**
- `config: Any` - The config type or schema of the field (for example `String`, `Shape({...})`, or a plain dict)
- `default_value: Any` - Default value used when the field is not provided
- `is_required: Optional[bool] = None` - Whether the field is required; when omitted, a field is required unless it has a default value
- `description: Optional[str]` - Field description
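
The interaction between a default value and the required flag can be modeled in a few lines of plain Python. This is a minimal sketch, not Dagster's implementation: `fill_field` and the `_NO_DEFAULT` sentinel are hypothetical names, and it assumes the required flag is inferred from the presence of a default when it is not passed explicitly.

```python
from typing import Any, Dict, Optional

_NO_DEFAULT = object()  # hypothetical sentinel meaning "no default_value was given"


def fill_field(
    provided: Dict[str, Any],
    name: str,
    default: Any = _NO_DEFAULT,
    is_required: Optional[bool] = None,
) -> Dict[str, Any]:
    """Return a copy of `provided` with the default for `name` filled in if needed."""
    # Assumption: without an explicit flag, a field is required only if it has no default.
    if is_required is None:
        is_required = default is _NO_DEFAULT

    result = dict(provided)
    if name not in result:
        if default is not _NO_DEFAULT:
            result[name] = default  # omitted by the user, so the default applies
        elif is_required:
            raise KeyError(f"Missing required config entry '{name}'")
    return result
```

With this model, `fill_field({}, "port", default=5432)` fills in the port, while `fill_field({}, "name")` raises because the field has neither a value nor a default.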

## Configuration Types

### Scalar Types

#### Built-in Scalar Types { .api }

**Module:** `dagster._builtins`

```python
from dagster import String, Int, Float, Bool, Any, op

# String type with validation
@op(config_schema={"name": String})
def string_config_op(context):
    name = context.op_config["name"]  # Validated as string

# Integer type
@op(config_schema={"count": Int})
def int_config_op(context):
    count = context.op_config["count"]  # Validated as integer

# Float type
@op(config_schema={"threshold": Float})
def float_config_op(context):
    threshold = context.op_config["threshold"]  # Validated as float

# Boolean type
@op(config_schema={"enabled": Bool})
def bool_config_op(context):
    enabled = context.op_config["enabled"]  # Validated as boolean

# Any type (no validation)
@op(config_schema={"data": Any})
def any_config_op(context):
    data = context.op_config["data"]  # Any type allowed

# Note: `Nothing` is a Dagster type for expressing ordering dependencies
# between ops. It is not a config type, so it is not shown in a
# config_schema here; use `Noneable` for config values that may be null.
```

### Collection Types

#### `Array` { .api }

**Module:** `dagster._config.config_type`
**Type:** Configuration type

Array/list configuration with element type validation.

```python
from dagster import Array, Bool, Int, Shape, String, op

# Array of strings
@op(config_schema={"tags": Array(String)})
def string_array_op(context):
    tags = context.op_config["tags"]  # List[str]

# Array of integers
@op(config_schema={"numbers": Array(Int)})
def int_array_op(context):
    numbers = context.op_config["numbers"]  # List[int]

# Array of nested objects
@op(config_schema={
    "users": Array(Shape({
        "name": String,
        "age": Int,
        "active": Bool
    }))
})
def nested_array_op(context):
    users = context.op_config["users"]  # List[Dict]
    for user in users:
        name = user["name"]
        age = user["age"]
        active = user["active"]
```

#### `Enum` { .api }

**Module:** `dagster._config.config_type`
**Type:** Configuration type

Enumeration configuration with predefined values.

```python
from dagster import Enum, EnumValue, Field, op

# Simple enum
log_level_enum = Enum("LogLevel", [
    EnumValue("DEBUG"),
    EnumValue("INFO"),
    EnumValue("WARNING"),
    EnumValue("ERROR")
])

# Enum with descriptions
processing_mode = Enum("ProcessingMode", [
    EnumValue("batch", description="Process in batches"),
    EnumValue("streaming", description="Process as stream"),
    EnumValue("realtime", description="Real-time processing")
])

@op(config_schema={
    "log_level": log_level_enum,
    "mode": processing_mode
})
def enum_config_op(context):
    log_level = context.op_config["log_level"]  # One of the enum values
    mode = context.op_config["mode"]

    context.log.info(f"Running in {mode} mode with {log_level} logging")
```

### Complex Types

#### `Shape` { .api }

**Module:** `dagster._config.field_utils`
**Type:** Configuration utility

Object/dictionary configuration with typed fields.

```python
from dagster import Shape, Field, String, Int, Bool, op

# Basic shape
user_shape = Shape({
    "name": String,
    "age": Int,
    "email": String
})

# Shape with optional fields
config_shape = Shape({
    "required_field": String,
    "optional_field": Field(Int, is_required=False, default_value=42),
    "nested": Shape({
        "host": String,
        "port": Field(Int, default_value=8080)
    })
})

@op(config_schema={"user": user_shape, "config": config_shape})
def shape_config_op(context):
    user = context.op_config["user"]
    name = user["name"]  # Guaranteed to be string
    age = user["age"]    # Guaranteed to be int

    config = context.op_config["config"]
    required = config["required_field"]          # Required field
    optional = config.get("optional_field", 42)  # Default (42) is filled in when omitted
```

#### `Selector` { .api }

**Module:** `dagster._config.field_utils`
**Type:** Configuration utility

One-of configuration: exactly one of the declared alternatives must be provided.

```python
from dagster import Selector, Field, String, Int, Shape, op

# Database selector - choose between different database types
database_selector = Selector({
    "sqlite": Shape({
        "path": String
    }),
    "postgres": Shape({
        "host": String,
        "port": Field(Int, default_value=5432),
        "database": String,
        "username": String,
        "password": String
    }),
    "mysql": Shape({
        "host": String,
        "port": Field(Int, default_value=3306),
        "database": String,
        "username": String,
        "password": String
    })
})

@op(config_schema={"database": database_selector})
def database_op(context):
    db_config = context.op_config["database"]

    # Only one key will be present
    if "sqlite" in db_config:
        sqlite_config = db_config["sqlite"]
        path = sqlite_config["path"]
        # Handle SQLite connection
    elif "postgres" in db_config:
        pg_config = db_config["postgres"]
        host = pg_config["host"]
        port = pg_config["port"]
        # Handle PostgreSQL connection
    elif "mysql" in db_config:
        mysql_config = db_config["mysql"]
        # Handle MySQL connection
```
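
The one-of rule can be illustrated without Dagster. The following is a minimal plain-Python sketch of the check a selector implies; `pick_branch` is a hypothetical helper, not part of the Dagster API, and the real validation produces Dagster's own error types.

```python
from typing import Any, Dict, Tuple


def pick_branch(selector_value: Dict[str, Any], allowed: Tuple[str, ...]) -> Tuple[str, Any]:
    """Return (branch_name, branch_config) for a one-of config value."""
    # A selector value must contain exactly one key.
    if len(selector_value) != 1:
        raise ValueError(
            f"Expected exactly one of {allowed}, got {sorted(selector_value)}"
        )
    name, branch = next(iter(selector_value.items()))
    # The single key must be one of the declared alternatives.
    if name not in allowed:
        raise ValueError(f"Unknown alternative '{name}', expected one of {allowed}")
    return name, branch
```

For example, `pick_branch({"sqlite": {"path": "app.db"}}, ("sqlite", "postgres", "mysql"))` returns the branch name together with its config, while a value with two keys is rejected.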

#### `Permissive` { .api }

**Module:** `dagster._config.field_utils`
**Type:** Configuration utility

Permissive configuration: declared fields are validated, and additional fields are accepted without validation.

```python
from dagster import Permissive, String, Int, op

# Permissive config allows extra fields
permissive_config = Permissive({
    "name": String,
    "age": Int
    # Additional fields allowed but not validated
})

@op(config_schema={"data": permissive_config})
def permissive_op(context):
    data = context.op_config["data"]
    name = data["name"]  # Validated
    age = data["age"]    # Validated

    # Additional fields accessible but not validated
    extra_field = data.get("some_extra_field")
    custom_metadata = data.get("metadata", {})
```
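
To make the "validated plus pass-through" behavior concrete, here is a small plain-Python sketch. `validate_permissive` is a hypothetical stand-in that checks declared keys with `isinstance`; Dagster's own validation is richer and reports errors differently.

```python
from typing import Any, Dict


def validate_permissive(value: Dict[str, Any], declared: Dict[str, type]) -> Dict[str, Any]:
    """Check declared keys and pass every other key through untouched."""
    for key, expected in declared.items():
        if key not in value:
            raise KeyError(f"Missing required field '{key}'")
        if not isinstance(value[key], expected):
            raise TypeError(
                f"Field '{key}' should be {expected.__name__}, "
                f"got {type(value[key]).__name__}"
            )
    # Undeclared keys are neither checked nor removed.
    return dict(value)
```

Only `name` and `age` are checked in a call such as `validate_permissive({"name": "Ada", "age": 36, "team": "core"}, {"name": str, "age": int})`; the `team` entry is returned as given.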

### Optional Types

#### `Noneable` { .api }

**Module:** `dagster._config.config_type`
**Type:** Configuration type

Optional configuration that can be null/None.

```python
from dagster import Noneable, String, Int, op

@op(config_schema={
    "required": String,
    "optional": Noneable(String),  # Can be string or None
    "maybe_count": Noneable(Int)   # Can be int or None
})
def nullable_config_op(context):
    config = context.op_config
    required = config["required"]            # Always present
    optional = config.get("optional")        # May be None
    maybe_count = config.get("maybe_count")  # May be None

    if optional is not None:
        context.log.info(f"Optional value: {optional}")

    if maybe_count is not None:
        context.log.info(f"Count: {maybe_count}")
```

## Environment Variable Configuration

### `EnvVar` { .api }

**Module:** `dagster._config.field_utils`
**Type:** Configuration utility

Reference to an environment variable whose value is read when configuration is resolved at run time rather than when the code is loaded. It is intended for Pythonic config and resources. `EnvVar("NAME")` yields a string value and `EnvVar.int("NAME")` coerces the value to an integer.

```python
from dagster import ConfigurableResource, Definitions, EnvVar

class WarehouseResource(ConfigurableResource):
    """Resource whose settings come from the environment."""
    database_url: str
    api_key: str
    batch_size: int = 1000

defs = Definitions(
    resources={
        "warehouse": WarehouseResource(
            # Resolved from the environment when the resource is initialized
            database_url=EnvVar("DATABASE_URL"),  # From DATABASE_URL env var
            api_key=EnvVar("API_KEY"),            # From API_KEY env var
            # EnvVar.int coerces the variable's value to an integer
            batch_size=EnvVar.int("BATCH_SIZE"),  # From BATCH_SIZE env var (as int)
        )
    }
)

# Environment variable types
env_string = EnvVar("STRING_VAR")  # String
env_int = EnvVar.int("INT_VAR")    # Integer

# Reading a variable eagerly, with a fallback when it is unset.
# Booleans are parsed by hand here from the string value.
debug = EnvVar("DEBUG").get_value(default="false") == "true"
```

### Configuration Sources

#### `StringSource` { .api }

**Module:** `dagster._config.source`
**Type:** Configuration source

Config types whose value is given either directly or as `{"env": "VAR_NAME"}`, in which case it is read from the named environment variable. `IntSource` and `BoolSource` are the integer and boolean counterparts of `StringSource`.

```python
from dagster import StringSource, IntSource, BoolSource, Field, op

@op(config_schema={
    "connection_string": Field(StringSource),
    "retry_count": Field(IntSource),
    "enable_ssl": Field(BoolSource)
})
def source_config_op(context):
    """Op using configuration sources."""
    config = context.op_config

    # Can come from environment variables or direct values
    conn_str = config["connection_string"]
    retries = config["retry_count"]
    ssl_enabled = config["enable_ssl"]

# Usage in run config:
run_config = {
    "ops": {
        "source_config_op": {
            "config": {
                "connection_string": {"env": "DB_CONNECTION_STRING"},
                "retry_count": 3,  # Direct value
                "enable_ssl": {"env": "ENABLE_SSL"}
            }
        }
    }
}
```
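
The resolution rule for source types can be sketched in plain Python. This is an illustrative model only: `resolve_source` and its `cast` and `environ` parameters are hypothetical, and it assumes a source value is either a literal or a mapping with the single key `env`.

```python
import os
from typing import Any, Callable, Mapping, Optional


def resolve_source(
    value: Any,
    cast: Callable[[str], Any] = str,
    environ: Optional[Mapping[str, str]] = None,
) -> Any:
    """Resolve a source-style config value to a concrete value."""
    env = os.environ if environ is None else environ
    # {"env": "NAME"} means: look the value up in the environment, then cast it.
    if isinstance(value, dict) and set(value) == {"env"}:
        name = value["env"]
        if name not in env:
            raise KeyError(f"Environment variable '{name}' is not set")
        return cast(env[name])
    # Anything else is treated as a literal and returned unchanged.
    return value
```

Passing `environ` explicitly keeps the sketch easy to test; for instance `resolve_source({"env": "RETRIES"}, cast=int, environ={"RETRIES": "3"})` yields the integer from the fake environment.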

## Pythonic Configuration System

### `Config` { .api }

**Module:** `dagster._config.pythonic_config`
**Type:** Base class

Pydantic-based configuration class for type-safe, IDE-friendly configuration.

```python
import os
from typing import List

from dagster import Config, asset, op
from pydantic import Field

class DatabaseConfig(Config):
    """Database configuration using Pydantic."""
    host: str = Field(description="Database host")
    port: int = Field(default=5432, description="Database port")
    database: str = Field(description="Database name")
    username: str = Field(description="Database username")
    password: str = Field(description="Database password")
    pool_size: int = Field(default=10, description="Connection pool size")
    ssl_enabled: bool = Field(default=True, description="Enable SSL")

    # Environment variable integration (empty string when the variable is unset)
    api_key: str = Field(default_factory=lambda: os.getenv("DB_API_KEY", ""))

class ProcessingConfig(Config):
    """Processing configuration."""
    batch_size: int = Field(default=1000, gt=0, description="Batch size")
    parallel_workers: int = Field(default=4, ge=1, le=16)
    timeout_seconds: float = Field(default=30.0, gt=0)
    tags: List[str] = Field(default_factory=list)
    debug_mode: bool = Field(default=False)

    # Derived value computed from other fields
    @property
    def total_capacity(self) -> int:
        return self.batch_size * self.parallel_workers

@op
def process_data(config: ProcessingConfig) -> str:
    """Op with Pythonic configuration."""
    # Full IDE support and type checking
    batch_size = config.batch_size  # IDE knows this is int
    workers = config.parallel_workers

    # Access computed properties
    capacity = config.total_capacity

    # Pydantic validation ensures all constraints are met
    return f"Processing with {workers} workers, batch_size {batch_size}"

@asset
def analytics_data(config: ProcessingConfig) -> dict:
    """Asset with typed configuration."""
    if config.debug_mode:
        print(f"Debug: Processing {config.batch_size} records")

    # Configuration is fully typed and validated
    return {"processed": True, "batch_size": config.batch_size}
```

### `ConfigurableResource` { .api }

**Module:** `dagster._config.pythonic_config`
**Type:** Base class

Configurable resource using Pydantic configuration.

```python
import requests
from dagster import ConfigurableResource, Definitions, asset
from pydantic import Field

class DatabaseResource(ConfigurableResource):
    """Database resource with configuration."""
    host: str = Field(description="Database host")
    port: int = Field(default=5432)
    database: str = Field(description="Database name")
    username: str = Field(description="Username")
    password: str = Field(description="Password")
    pool_size: int = Field(default=10, ge=1, le=100)

    def get_connection(self):
        """Build the connection string from the configured fields."""
        return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"

    def query(self, sql: str) -> list:
        """Execute query."""
        conn = self.get_connection()
        # Execute query logic
        return []

class APIResource(ConfigurableResource):
    """API client resource."""
    base_url: str = Field(description="API base URL")
    api_key: str = Field(description="API key for authentication")
    timeout: float = Field(default=30.0, gt=0)
    retries: int = Field(default=3, ge=0)

    def get_client(self) -> requests.Session:
        """Get configured HTTP client."""
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {self.api_key}"})
        return session

    def fetch_data(self, endpoint: str) -> dict:
        """Fetch data from API endpoint."""
        client = self.get_client()
        response = client.get(f"{self.base_url}/{endpoint}", timeout=self.timeout)
        return response.json()

# Usage with assets: the parameter name matches the resource key
@asset
def user_data(database: DatabaseResource) -> list:
    """Fetch user data using database resource."""
    return database.query("SELECT * FROM users")

@asset
def external_data(api_client: APIResource) -> dict:
    """Fetch external data using API resource."""
    return api_client.fetch_data("users")

# Resource definitions in Definitions
defs = Definitions(
    assets=[user_data, external_data],
    resources={
        "database": DatabaseResource(
            host="localhost",
            database="mydb",
            username="user",
            password="password"
        ),
        "api_client": APIResource(
            base_url="https://api.example.com",
            api_key="secret-key"
        )
    }
)
```

### `ResourceDependency` { .api }

**Module:** `dagster._config.pythonic_config`
**Type:** Configuration utility

Type annotation that declares a dependency of one configurable resource on another. The dependency is satisfied by passing the other resource instance when constructing the dependent resource.

```python
import json
from typing import Optional

from dagster import ConfigurableResource, Definitions, ResourceDependency, asset

class CacheResource(ConfigurableResource):
    """Cache resource."""
    host: str
    port: int = 6379

    def get(self, key: str) -> Optional[str]:
        return None  # Redis logic

    def set(self, key: str, value: str) -> None:
        pass  # Redis logic

class DatabaseService(ConfigurableResource):
    """Database service with cache dependency."""
    connection_string: str
    cache: ResourceDependency[CacheResource]  # Resource dependency

    def get_user(self, user_id: str) -> Optional[dict]:
        # Try cache first
        cached = self.cache.get(f"user:{user_id}")
        if cached:
            return json.loads(cached)

        # Fetch from database
        user = self.fetch_from_db(user_id)

        # Cache result
        if user:
            self.cache.set(f"user:{user_id}", json.dumps(user))

        return user

    def fetch_from_db(self, user_id: str) -> Optional[dict]:
        # Database fetch logic
        return {"id": user_id, "name": "John"}

@asset
def user_asset(database: DatabaseService) -> Optional[dict]:
    """Example asset that uses the service."""
    return database.get_user("42")

# Resource configuration with dependencies
cache = CacheResource(host="localhost")

defs = Definitions(
    assets=[user_asset],
    resources={
        "cache": cache,
        "database": DatabaseService(
            connection_string="postgresql://localhost/db",
            cache=cache  # Pass the cache resource instance itself
        )
    }
)
```

## Configuration Mapping

### `@config_mapping` { .api }

**Module:** `dagster._core.definitions.decorators.config_mapping_decorator`
**Type:** Function decorator

Define a function that transforms a simplified, outer config into the full run config of a job or graph.

```python
from dagster import config_mapping, op, job, String, Int

@op(config_schema={"name": String, "count": Int})
def parametrized_op(context):
    name = context.op_config["name"]
    count = context.op_config["count"]
    return f"Hello {name} x{count}"

@config_mapping(
    config_schema={"user": String},  # Simplified input config
    receive_processed_config_values=True
)
def simple_config_mapping(config):
    """Map simple config to the job's full run config."""
    return {
        "ops": {
            "parametrized_op": {
                "config": {
                    "name": config["user"],
                    "count": 3  # Default value
                }
            }
        }
    }

@job(config=simple_config_mapping)
def mapped_job():
    parametrized_op()

# Usage with simplified config
result = mapped_job.execute_in_process(
    run_config={
        "user": "Alice"  # Simple config mapped to complex op config
    }
)
```

### `ConfigMapping` { .api }

**Module:** `dagster._core.definitions.config`
**Type:** Class

Configuration mapping for programmatic config transformation.

```python
from dagster import ConfigMapping, Int, Permissive, job, op

# Example op whose config matches what the mapping produces
@op(config_schema={"processed_value": Int, "metadata": Permissive()})
def my_op(context):
    return context.op_config["processed_value"]

# Programmatic config mapping
def transform_config(config_value):
    """Transform external config to internal format."""
    return {
        "ops": {
            "my_op": {
                "config": {
                    "processed_value": config_value["input"] * 2,
                    "metadata": {"source": "mapping"}
                }
            }
        }
    }

config_mapping = ConfigMapping(
    config_fn=transform_config,
    config_schema={"input": Int}
)

@job(config=config_mapping)
def configured_job():
    my_op()
```

## Configuration Utilities

### Configuration Loading

#### `config_from_files` { .api }

**Module:** `dagster._core.definitions.utils`
**Type:** Function

Load run configuration from one or more YAML files and merge the results into a single dictionary. Entries may be file paths or glob patterns.

```python
from dagster import config_from_files

# Load from single file
config = config_from_files(["config.yaml"])

# Load and merge multiple files
config = config_from_files([
    "base_config.yaml",
    "env_config.yaml",
    "secrets.yaml"
])

# Use in job execution (my_job is assumed to be a job defined elsewhere)
result = my_job.execute_in_process(run_config=config)
```
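
Merging several config files amounts to a recursive dictionary merge. The sketch below shows one plausible merge in plain Python, assuming nested mappings are merged key by key and later entries win on conflicts; `deep_merge` and `merge_configs` are hypothetical helpers, so confirm the precedence rules against the installed Dagster version.

```python
from typing import Any, Dict, Iterable


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `override` into a copy of `base`, recursing into nested dicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value  # scalars and lists are replaced wholesale
    return merged


def merge_configs(configs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold a sequence of config dicts into one, left to right."""
    result: Dict[str, Any] = {}
    for config in configs:
        result = deep_merge(result, config)
    return result
```

Under these assumptions, a base file that sets `batch_size` and `debug` combined with an environment file that only overrides `debug` keeps the base `batch_size`.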

#### `config_from_yaml_strings` { .api }

**Module:** `dagster._core.definitions.utils`
**Type:** Function

Load configuration from YAML strings.

```python
from dagster import config_from_yaml_strings

yaml_config = """
ops:
  my_op:
    config:
      batch_size: 1000
      debug: true
resources:
  database:
    config:
      host: localhost
      port: 5432
"""

config = config_from_yaml_strings([yaml_config])
```

### `RunConfig` { .api }

**Module:** `dagster._core.definitions.run_config`
**Type:** Class

Type-safe run configuration builder.

```python
from dagster import RunConfig, job, op, Config

class MyOpConfig(Config):
    value: int
    name: str

@op
def my_op(config: MyOpConfig):
    return config.value * 2

@job
def my_job():
    my_op()

# Type-safe run config
run_config = RunConfig(
    ops={
        "my_op": MyOpConfig(value=42, name="test")
    }
    # Pass resources={...} as well when the job has configurable resources
)

# Execute in process with the type-safe config
result = my_job.execute_in_process(run_config=run_config)
```

## Advanced Configuration Patterns

### Conditional Configuration

```python
import os
from typing import Literal, Union

from dagster import Config, op
from pydantic import Field

class DevelopmentConfig(Config):
    mode: Literal["development"] = "development"
    debug: bool = True
    database_url: str = "sqlite:///dev.db"

class ProductionConfig(Config):
    mode: Literal["production"] = "production"
    debug: bool = False
    database_url: str = Field(default_factory=lambda: os.getenv("DATABASE_URL", ""))
    ssl_required: bool = True

class AppConfig(Config):
    # Discriminated union: the `mode` value selects the variant
    environment: Union[DevelopmentConfig, ProductionConfig] = Field(discriminator="mode")

@op
def app_op(config: AppConfig):
    """Op with conditional configuration."""
    env = config.environment
    if isinstance(env, DevelopmentConfig):
        # Development-specific logic
        if env.debug:
            print("Debug mode enabled")
    elif isinstance(env, ProductionConfig):
        # Production-specific logic
        if env.ssl_required:
            print("SSL validation enabled")

    return f"Running in {env.mode} mode"
```

### Nested Resource Configuration

```python
from typing import Literal, Optional

from dagster import Config, ConfigurableResource

# S3Client, LocalStorage, and Database below are placeholders for your own
# client classes; they are not provided by Dagster.

class StorageConfig(Config):
    type: Literal["s3", "gcs", "local"]
    bucket: Optional[str] = None
    path: str = "/tmp"

class DatabaseConfig(Config):
    host: str
    port: int = 5432
    ssl: bool = True

class AppResourceConfig(Config):
    storage: StorageConfig
    database: DatabaseConfig
    cache_ttl: int = 3600

class ApplicationResource(ConfigurableResource):
    config: AppResourceConfig

    def get_storage_client(self):
        if self.config.storage.type == "s3":
            return S3Client(bucket=self.config.storage.bucket)
        elif self.config.storage.type == "local":
            return LocalStorage(path=self.config.storage.path)
        # "gcs" is declared above but has no client in this example
        raise NotImplementedError(self.config.storage.type)

    def get_database(self):
        return Database(
            host=self.config.database.host,
            port=self.config.database.port,
            ssl=self.config.database.ssl
        )
```

Together, these mechanisms provide type-safe, validated, and environment-aware parameterization of Dagster definitions. Pythonic configuration built on Pydantic gives IDE support and runtime validation, while the schema-based approach remains available for dictionary-style configuration and existing code.

For resource definitions and dependency injection, see [Storage and I/O](./storage-io.md). For execution contexts that receive configuration, see [Execution and Contexts](./execution-contexts.md).