# Storage and I/O Management

This document covers Dagster's storage and I/O management system, including I/O managers, input managers, file management, and built-in storage backends. The I/O system provides pluggable storage for asset and operation outputs with automatic serialization, deserialization, and metadata tracking.

## I/O Manager System

I/O managers handle the storage and retrieval of asset and operation outputs, providing a clean abstraction over different storage backends.

### IOManager Interface

#### `IOManager` { .api }

**Module:** `dagster._core.storage.io_manager`
**Type:** Abstract base class

Base interface for I/O managers that handle asset and operation output storage.

```python
import os
import pickle

import pandas as pd

from dagster import InputContext, IOManager, OutputContext, io_manager

class CustomIOManager(IOManager):
    """Custom I/O manager implementation."""

    def __init__(self, base_path: str = "/tmp/dagster"):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)

    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output object."""
        # Generate file path from context
        if context.has_asset_key:
            # For assets, use the asset key path
            path_parts = context.asset_key.path
            file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For ops, use the step key and output name
            file_path = os.path.join(self.base_path, f"{context.step_key}_{context.name}")

        # Create directory if needed
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Handle different object types
        if isinstance(obj, pd.DataFrame):
            file_path += ".parquet"
            obj.to_parquet(file_path)
            context.log.info(f"Stored DataFrame with {len(obj)} rows to {file_path}")
        else:
            file_path += ".pkl"
            with open(file_path, "wb") as f:
                pickle.dump(obj, f)
            context.log.info(f"Stored object to {file_path}")

        # Add metadata about storage
        context.add_output_metadata({
            "file_path": file_path,
            "file_size_bytes": os.path.getsize(file_path),
            "storage_type": "local_filesystem"
        })

    def load_input(self, context: InputContext):
        """Load input object."""
        # Generate file path from context
        if context.has_asset_key:
            path_parts = context.asset_key.path
            base_file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For op outputs, derive the path from the upstream output
            upstream_context = context.upstream_output
            if upstream_context.has_asset_key:
                path_parts = upstream_context.asset_key.path
                base_file_path = os.path.join(self.base_path, *path_parts)
            else:
                base_file_path = os.path.join(
                    self.base_path,
                    f"{upstream_context.step_key}_{upstream_context.name}"
                )

        # Try different file extensions
        if os.path.exists(base_file_path + ".parquet"):
            file_path = base_file_path + ".parquet"
            obj = pd.read_parquet(file_path)
            context.log.info(f"Loaded DataFrame with {len(obj)} rows from {file_path}")
            return obj
        elif os.path.exists(base_file_path + ".pkl"):
            file_path = base_file_path + ".pkl"
            with open(file_path, "rb") as f:
                obj = pickle.load(f)
            context.log.info(f"Loaded object from {file_path}")
            return obj
        else:
            raise FileNotFoundError(f"No file found at {base_file_path}")

# Create an I/O manager resource
@io_manager(config_schema={"base_path": str})
def custom_io_manager(context):
    return CustomIOManager(context.resource_config["base_path"])
```
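
The custom manager above can then be bound as the default `io_manager` resource so that asset outputs flow through it. A minimal wiring sketch; the asset names and base path here are illustrative:

```python
from dagster import Definitions, asset
import pandas as pd

@asset
def raw_events() -> pd.DataFrame:
    # Illustrative asset; stored by CustomIOManager as parquet
    return pd.DataFrame({"event": ["click", "view"], "count": [3, 5]})

@asset
def event_summary(raw_events: pd.DataFrame) -> dict:
    # Loaded back via CustomIOManager.load_input, stored as pickle
    return {"total": int(raw_events["count"].sum())}

defs = Definitions(
    assets=[raw_events, event_summary],
    resources={
        # "io_manager" is the default resource key used for asset outputs
        "io_manager": custom_io_manager.configured({"base_path": "/tmp/dagster-custom"}),
    },
)
```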

### I/O Manager Decorator

#### `@io_manager` { .api }

**Module:** `dagster._core.storage.io_manager`
**Type:** Function decorator

Create an I/O manager resource from a function.

```python
import pickle
from io import BytesIO

import pandas as pd
from botocore.exceptions import ClientError

from dagster import Field, InputContext, IOManager, OutputContext, String, io_manager

@io_manager(
    config_schema={
        "bucket_name": Field(String, description="S3 bucket name"),
        "prefix": Field(String, default_value="dagster-storage", description="S3 key prefix")
    },
    required_resource_keys={"s3"}
)
def s3_io_manager(init_context) -> IOManager:
    """S3-based I/O manager. The boto3 S3 client is supplied by the required "s3" resource."""

    class S3IOManager(IOManager):
        def __init__(self, bucket_name: str, prefix: str, s3_client):
            self.bucket_name = bucket_name
            self.prefix = prefix
            self.s3_client = s3_client

        def handle_output(self, context: OutputContext, obj) -> None:
            # Generate S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                key_parts = [self.prefix, f"{context.step_key}_{context.name}"]

            s3_key = "/".join(key_parts)

            # Store different types appropriately
            if isinstance(obj, pd.DataFrame):
                # Use parquet for DataFrames
                s3_key += ".parquet"
                buffer = BytesIO()
                obj.to_parquet(buffer)
                buffer.seek(0)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue()
                )

                context.log.info(f"Stored DataFrame to s3://{self.bucket_name}/{s3_key}")
            else:
                # Use pickle for other objects
                s3_key += ".pkl"
                buffer = BytesIO()
                pickle.dump(obj, buffer)
                buffer.seek(0)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue()
                )

            # Add metadata
            context.add_output_metadata({
                "s3_bucket": self.bucket_name,
                "s3_key": s3_key,
                "s3_uri": f"s3://{self.bucket_name}/{s3_key}"
            })

        def load_input(self, context: InputContext):
            # Generate S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                upstream_context = context.upstream_output
                if upstream_context.has_asset_key:
                    key_parts = [self.prefix] + list(upstream_context.asset_key.path)
                else:
                    key_parts = [self.prefix, f"{upstream_context.step_key}_{upstream_context.name}"]

            base_s3_key = "/".join(key_parts)

            # Try parquet first, then fall back to pickle
            try:
                s3_key = base_s3_key + ".parquet"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pd.read_parquet(BytesIO(response["Body"].read()))
                context.log.info(f"Loaded DataFrame from s3://{self.bucket_name}/{s3_key}")
                return obj
            except ClientError:
                s3_key = base_s3_key + ".pkl"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pickle.load(BytesIO(response["Body"].read()))
                context.log.info(f"Loaded object from s3://{self.bucket_name}/{s3_key}")
                return obj

    # Get configuration and resources
    bucket_name = init_context.resource_config["bucket_name"]
    prefix = init_context.resource_config["prefix"]
    s3_client = init_context.resources.s3

    return S3IOManager(bucket_name, prefix, s3_client)
```

### Built-in I/O Managers

#### `fs_io_manager` { .api }

**Module:** `dagster._core.storage.fs_io_manager`
**Type:** ResourceDefinition

Built-in filesystem I/O manager for local storage.

```python
import numpy as np
import pandas as pd

from dagster import Definitions, asset, fs_io_manager

@asset
def sales_data() -> pd.DataFrame:
    """Generate sales data."""
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "amount": np.random.randint(100, 1000, 100)
    })

@asset
def monthly_sales(sales_data: pd.DataFrame) -> pd.DataFrame:
    """Aggregate sales by month."""
    return sales_data.groupby(sales_data["date"].dt.to_period("M"))[["amount"]].sum()

# Use the filesystem I/O manager as the default "io_manager" resource
defs = Definitions(
    assets=[sales_data, monthly_sales],
    resources={
        "io_manager": fs_io_manager.configured({
            "base_dir": "/tmp/dagster-storage"  # Storage directory
        })
    }
)

# Alternate configuration pointing at a shared warehouse directory
filesystem_io = fs_io_manager.configured({
    "base_dir": "/data/warehouse"
})
```
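
Individual assets can also opt out of the default manager by naming a different I/O manager resource key. A brief sketch; the `scratch_io` key and asset names are illustrative:

```python
from dagster import Definitions, asset, fs_io_manager, mem_io_manager
import pandas as pd

@asset(io_manager_key="scratch_io")  # stored by whatever manager is bound to "scratch_io"
def scratch_table() -> pd.DataFrame:
    return pd.DataFrame({"x": [1, 2, 3]})

@asset  # uses the default "io_manager" resource
def persisted_table(scratch_table: pd.DataFrame) -> pd.DataFrame:
    return scratch_table * 10

defs = Definitions(
    assets=[scratch_table, persisted_table],
    resources={
        "io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster-storage"}),
        "scratch_io": mem_io_manager,  # in-memory, only valid for in-process runs
    },
)
```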

#### `mem_io_manager` { .api }

**Module:** `dagster._core.storage.mem_io_manager`
**Type:** ResourceDefinition

Built-in in-memory I/O manager for testing and development.

```python
from dagster import asset, materialize_to_memory

@asset
def in_memory_data() -> dict:
    return {"key": "value", "count": 42}

@asset
def processed_memory_data(in_memory_data: dict) -> dict:
    return {**in_memory_data, "processed": True}

# materialize_to_memory swaps an in-memory I/O manager in for every
# I/O manager key, so nothing is written to disk
result = materialize_to_memory([in_memory_data, processed_memory_data])

# Access values directly from memory
data = result.output_for_node("in_memory_data")
processed = result.output_for_node("processed_memory_data")
```
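
`mem_io_manager` can also be bound explicitly when a set of definitions should keep outputs in process memory. A small sketch; the asset name is illustrative:

```python
from dagster import Definitions, asset, mem_io_manager

@asset
def scratch_values() -> list:
    return [1, 2, 3]

# Outputs stay in process memory, so this is only suitable for in-process
# execution such as tests or local experiments.
defs = Definitions(
    assets=[scratch_values],
    resources={"io_manager": mem_io_manager},
)
```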

#### `UPathIOManager` { .api }

**Module:** `dagster._core.storage.upath_io_manager`
**Type:** Class

Universal path I/O manager supporting multiple storage backends via UPath. Subclasses implement `dump_to_path` and `load_from_path`; the base class handles path construction and partition handling.

```python
import pickle

import pandas as pd
from pydantic import Field
from upath import UPath  # from the universal-pathlib package

from dagster import (
    ConfigurableIOManagerFactory, Definitions, InputContext, OutputContext, UPathIOManager, asset,
)

class PickleUPathIOManager(UPathIOManager):
    """Stores any output as a pickle file under the configured base path."""

    extension: str = ".pkl"

    def dump_to_path(self, context: OutputContext, obj, path: UPath):
        with path.open("wb") as f:
            pickle.dump(obj, f)

    def load_from_path(self, context: InputContext, path: UPath):
        with path.open("rb") as f:
            return pickle.load(f)

class CloudStorageIOManager(ConfigurableIOManagerFactory):
    """Universal storage I/O manager; the base path selects the backend."""

    base_path: str = Field(description="Base path (supports s3://, gs://, az://, or local paths)")

    def create_io_manager(self, context) -> PickleUPathIOManager:
        return PickleUPathIOManager(base_path=UPath(self.base_path))

# Usage with different storage backends
s3_io_manager = CloudStorageIOManager(base_path="s3://my-bucket/dagster-storage")
gcs_io_manager = CloudStorageIOManager(base_path="gs://my-bucket/dagster-storage")
azure_io_manager = CloudStorageIOManager(base_path="az://my-container/dagster-storage")
local_io_manager = CloudStorageIOManager(base_path="/tmp/dagster-storage")

# Assets automatically work with any backend
@asset(io_manager_key="cloud_storage")
def cloud_data() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

defs = Definitions(
    assets=[cloud_data],
    resources={
        "cloud_storage": s3_io_manager  # Switch between storage backends easily
    }
)
```

### Custom I/O Manager Patterns

#### Type-Specific I/O Manager

```python
import json
import os
import pickle

import numpy as np
import pandas as pd

from dagster import InputContext, IOManager, OutputContext, io_manager

class TypedIOManager(IOManager):
    """I/O manager with type-specific storage strategies."""

    def __init__(self, base_path: str):
        self.base_path = base_path
        self.type_handlers = {
            pd.DataFrame: self._handle_dataframe,
            np.ndarray: self._handle_numpy_array,
            dict: self._handle_dict,
        }

    def handle_output(self, context: OutputContext, obj) -> None:
        # Find the appropriate handler for the runtime type
        handler = None
        for type_cls, type_handler in self.type_handlers.items():
            if isinstance(obj, type_cls):
                handler = type_handler
                break

        if handler:
            handler(context, obj, mode="write")
        else:
            # Fallback to pickle (lists, custom classes, etc.)
            self._handle_pickle(context, obj, mode="write")

    def load_input(self, context: InputContext):
        # Determine the expected type from context
        expected_type = context.dagster_type.typing_type if context.dagster_type else None

        # Try type-specific loading
        for type_cls, type_handler in self.type_handlers.items():
            if isinstance(expected_type, type) and issubclass(expected_type, type_cls):
                return type_handler(context, None, mode="read")

        # Fallback to pickle
        return self._handle_pickle(context, None, mode="read")

    def _handle_dataframe(self, context, obj, mode):
        path = self._get_path(context) + ".parquet"
        if mode == "write":
            obj.to_parquet(path)
            context.add_output_metadata({"format": "parquet", "rows": len(obj)})
        else:
            return pd.read_parquet(path)

    def _handle_numpy_array(self, context, obj, mode):
        path = self._get_path(context) + ".npy"
        if mode == "write":
            np.save(path, obj)
            context.add_output_metadata({"format": "numpy", "shape": str(obj.shape)})
        else:
            return np.load(path)

    def _handle_dict(self, context, obj, mode):
        path = self._get_path(context) + ".json"
        if mode == "write":
            with open(path, "w") as f:
                json.dump(obj, f, indent=2)
            context.add_output_metadata({"format": "json", "keys": list(obj.keys())})
        else:
            with open(path, "r") as f:
                return json.load(f)

    def _handle_pickle(self, context, obj, mode):
        path = self._get_path(context) + ".pkl"
        if mode == "write":
            with open(path, "wb") as f:
                pickle.dump(obj, f)
            context.add_output_metadata({"format": "pickle"})
        else:
            with open(path, "rb") as f:
                return pickle.load(f)

    def _get_path(self, context):
        if context.has_asset_key:
            path_parts = context.asset_key.path
        elif isinstance(context, OutputContext):
            path_parts = [f"{context.step_key}_{context.name}"]
        else:
            # Input context: derive the path from the upstream output
            upstream = context.upstream_output
            path_parts = (
                list(upstream.asset_key.path)
                if upstream.has_asset_key
                else [f"{upstream.step_key}_{upstream.name}"]
            )

        full_path = os.path.join(self.base_path, *path_parts)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        return full_path

@io_manager(config_schema={"base_path": str})
def typed_io_manager(context):
    return TypedIOManager(context.resource_config["base_path"])
```

## Input Manager System

Input managers provide specialized loading logic for specific inputs, complementing I/O managers.

### InputManager Interface

#### `InputManager` { .api }

**Module:** `dagster._core.storage.input_manager`
**Type:** Abstract base class

Base interface for input managers that handle specialized input loading.

```python
import pandas as pd
import requests

from dagster import AssetIn, Definitions, InputContext, InputManager, asset, input_manager

class APIInputManager(InputManager):
    """Input manager for loading data from APIs."""

    def __init__(self, api_base_url: str, api_key: str):
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from the API based on context."""
        # Use the asset key to determine the endpoint
        if context.has_asset_key:
            endpoint = context.asset_key.path[-1]  # Last part of the asset key
        else:
            endpoint = context.name

        # Build the API URL
        url = f"{self.api_base_url}/{endpoint}"

        # Add partition filtering if needed
        params = {}
        if context.has_asset_partitions:
            # For partitioned inputs, filter by partition
            partition_keys = list(context.asset_partition_keys)
            params["partitions"] = ",".join(partition_keys)
            context.log.info(f"Loading partitions: {partition_keys}")

        # Fetch data
        context.log.info(f"Fetching data from {url}")
        response = self.session.get(url, params=params)
        response.raise_for_status()

        # Convert to DataFrame
        data = response.json()
        df = pd.DataFrame(data)

        context.log.info(f"Loaded {len(df)} records from API")
        return df

@input_manager(
    config_schema={
        "api_base_url": str,
        "api_key": str
    }
)
def api_input_manager(context):
    """Load an input via an APIInputManager built from resource config.

    The function decorated with @input_manager is itself the loading function:
    it receives the InputContext and returns the loaded value.
    """
    manager = APIInputManager(
        api_base_url=context.resource_config["api_base_url"],
        api_key=context.resource_config["api_key"]
    )
    return manager.load_input(context)

# Usage in assets: route a specific input through the input manager
@asset(ins={"users_api_data": AssetIn(input_manager_key="api_loader")})
def external_users(users_api_data: pd.DataFrame) -> pd.DataFrame:
    """Asset whose input is loaded from the external API."""
    # "users_api_data" refers to data managed outside Dagster; the input
    # manager is responsible for loading it
    return users_api_data.dropna()

defs = Definitions(
    assets=[external_users],
    resources={
        "api_loader": api_input_manager.configured({
            "api_base_url": "https://api.example.com/v1",
            "api_key": "secret-api-key"
        })
    }
)
```

### Specialized Input Managers

#### Database Input Manager

```python
import pandas as pd

from dagster import InputContext, InputManager, input_manager

class DatabaseInputManager(InputManager):
    """Input manager for loading data from databases."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from a database table."""

        # Derive the table name from the asset key or input metadata
        if context.has_asset_key:
            table_name = "_".join(context.asset_key.path)
        elif context.metadata and "table_name" in context.metadata:
            table_name = context.metadata["table_name"]
        else:
            table_name = context.name

        # Build the query; table and partition values are assumed trusted here,
        # so parameterize the query if they can come from untrusted input
        query = f"SELECT * FROM {table_name}"

        # Add partition filtering for time-partitioned data
        if context.has_asset_partitions:
            partition_keys = list(context.asset_partition_keys)
            # Assuming date-based partitions
            if len(partition_keys) == 1:
                query += f" WHERE date = '{partition_keys[0]}'"
            else:
                date_list = "','".join(partition_keys)
                query += f" WHERE date IN ('{date_list}')"

        context.log.info(f"Executing query: {query}")

        # Execute the query
        df = pd.read_sql(query, self.connection_string)

        context.log.info(f"Loaded {len(df)} records from {table_name}")
        return df

@input_manager(config_schema={"connection_string": str})
def database_input_manager(context):
    # The decorated function performs the load itself, delegating to the class
    manager = DatabaseInputManager(context.resource_config["connection_string"])
    return manager.load_input(context)
```
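
As with the API loader above, the database loader is attached to a specific asset input via `AssetIn`. A brief wiring sketch; the asset, column, and resource names are illustrative:

```python
from dagster import AssetIn, Definitions, asset
import pandas as pd

@asset(ins={"orders": AssetIn(input_manager_key="db_loader")})
def order_totals(orders: pd.DataFrame) -> pd.DataFrame:
    # "orders" is read from the table derived from its asset key
    return orders.groupby("customer_id", as_index=False)["amount"].sum()

defs = Definitions(
    assets=[order_totals],
    resources={
        "db_loader": database_input_manager.configured(
            {"connection_string": "postgresql://localhost/mydb"}
        )
    },
)
```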

## File Management System

### FileHandle and LocalFileHandle

#### `FileHandle` { .api }

**Module:** `dagster._core.storage.file_manager`
**Type:** Abstract base class

Abstract file handle for managing file references.

```python
import json

import pandas as pd

# LocalFileHandle is the handle type returned by local_file_manager
from dagster import FileHandle, LocalFileHandle, asset

class CustomFileHandle(FileHandle):
    """Custom file handle implementation."""

    def __init__(self, file_path: str, file_manager):
        self.file_path = file_path
        self.file_manager = file_manager

    @property
    def path_desc(self) -> str:
        """Description of the file path."""
        return f"custom://{self.file_path}"

# Usage with file-based assets
@asset(required_resource_keys={"file_manager"})
def file_based_asset(context) -> FileHandle:
    """Asset that produces a file handle."""

    # Generate file content
    data = {"key": "value", "timestamp": pd.Timestamp.now().isoformat()}

    # Get the file manager from resources
    file_manager = context.resources.file_manager

    # Write the serialized bytes and get a handle back
    file_handle = file_manager.write_data(json.dumps(data).encode("utf-8"))
    context.log.info(f"Created file: {file_handle.path_desc}")
    return file_handle

@asset(required_resource_keys={"file_manager"})
def process_file(context, file_based_asset: FileHandle) -> dict:
    """Asset that processes a file handle."""

    # Read the file through its handle
    file_manager = context.resources.file_manager

    with file_manager.read(file_based_asset, mode="rb") as file_obj:
        data = json.load(file_obj)

    context.log.info(f"Processed file: {file_based_asset.path_desc}")

    return {"processed": True, "original_data": data}
```

#### `local_file_manager` { .api }

**Module:** `dagster._core.storage.file_manager`
**Type:** ResourceDefinition

Built-in local file manager for file-based operations.

```python
import json
import os

from dagster import Definitions, FileHandle, asset, local_file_manager, resource
from dagster._core.storage.file_manager import LocalFileManager

@resource(config_schema={"base_dir": str})
def custom_file_manager(context):
    """Custom file manager rooted at a specific directory."""
    base_dir = context.resource_config["base_dir"]
    os.makedirs(base_dir, exist_ok=True)
    return LocalFileManager(base_dir)

@asset(required_resource_keys={"file_manager"})
def config_file(context) -> FileHandle:
    """Asset that creates a configuration file."""

    config_data = {
        "database_url": "postgresql://localhost/mydb",
        "api_endpoints": ["https://api1.com", "https://api2.com"],
        "settings": {"timeout": 30, "retries": 3}
    }

    # Use the file manager to create a managed file from the serialized bytes
    file_manager = context.resources.file_manager
    file_handle = file_manager.write_data(json.dumps(config_data, indent=2).encode("utf-8"))

    context.log.info(f"Created config file: {file_handle.path_desc}")
    return file_handle

defs = Definitions(
    assets=[config_file, process_file],
    resources={
        "file_manager": local_file_manager.configured({
            "base_dir": "/tmp/dagster-files"
        })
    }
)
```

## Asset Value Loading

### AssetValueLoader

#### `AssetValueLoader` { .api }

**Module:** `dagster._core.storage.asset_value_loader`
**Type:** Class

Utility for loading materialized asset values outside of execution context. Instances are obtained from a `Definitions` object rather than constructed directly.

```python
import pandas as pd

from dagster import AssetKey, DagsterInstance, Definitions, asset, materialize

@asset
def upstream_data() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

@asset
def downstream_result(upstream_data: pd.DataFrame) -> dict:
    return {"total": upstream_data["value"].sum(), "count": len(upstream_data)}

defs = Definitions(assets=[upstream_data, downstream_result])

# Materialize the assets
instance = DagsterInstance.ephemeral()
result = materialize([upstream_data, downstream_result], instance=instance)

# Load a single asset value after materialization
upstream_value = defs.load_asset_value(AssetKey("upstream_data"), instance=instance)
print(f"Upstream data: {upstream_value}")

# Load multiple asset values with a shared AssetValueLoader, which keeps
# resources such as I/O managers initialized across loads
with defs.get_asset_value_loader(instance=instance) as loader:
    for asset_key in [AssetKey("upstream_data"), AssetKey("downstream_result")]:
        value = loader.load_asset_value(asset_key)
        print(f"Asset {asset_key.to_user_string()}: {value}")

    # For partitioned assets, pass a partition key (illustrative asset key)
    # partitioned_value = loader.load_asset_value(
    #     AssetKey("partitioned_asset"), partition_key="2023-01-01"
    # )
```

## Advanced Storage Patterns

### Multi-Backend I/O Manager

```python
from typing import Dict

import pandas as pd
from pydantic import Field, PrivateAttr

from dagster import ConfigurableIOManager, InputContext, OutputContext, asset

class MultiBackendIOManager(ConfigurableIOManager):
    """I/O manager supporting multiple storage backends."""

    default_backend: str = "local"  # one of "local", "s3", "gcs"
    backends: Dict[str, Dict[str, str]] = Field(
        default_factory=dict,
        description="Backend-specific configurations"
    )

    # Concrete per-backend managers, built once per run
    _backend_managers: Dict[str, object] = PrivateAttr(default_factory=dict)

    def setup_for_execution(self, context) -> None:
        """Initialize backend-specific managers.

        CustomIOManager is the local manager defined earlier in this document;
        S3IOManager stands in for an S3-backed IOManager like the one shown above.
        """
        if "local" in self.backends or self.default_backend == "local":
            self._backend_managers["local"] = CustomIOManager(
                self.backends.get("local", {}).get("base_dir", "/tmp/dagster")
            )

        if "s3" in self.backends or self.default_backend == "s3":
            s3_config = self.backends.get("s3", {})
            self._backend_managers["s3"] = S3IOManager(
                bucket_name=s3_config.get("bucket_name"),
                prefix=s3_config.get("prefix", "dagster")
            )

        # A "gcs" manager could be registered here in the same way.

    def handle_output(self, context: OutputContext, obj) -> None:
        """Route the output to the appropriate backend."""
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]

        context.log.info(f"Storing output using {backend} backend")
        manager.handle_output(context, obj)

    def load_input(self, context: InputContext):
        """Route input loading to the appropriate backend."""
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]

        context.log.info(f"Loading input using {backend} backend")
        return manager.load_input(context)

    def _get_backend_for_context(self, context) -> str:
        """Determine the appropriate backend for a context."""
        # Check definition metadata for an explicit backend preference
        if context.metadata and "storage_backend" in context.metadata:
            return context.metadata["storage_backend"]

        # Otherwise route based on the asset key path
        if context.has_asset_key:
            path_parts = context.asset_key.path
            if "raw" in path_parts:
                return "local"  # Raw data stored locally
            elif "processed" in path_parts:
                return "s3"  # Processed data in S3
            elif "ml_models" in path_parts:
                return "gcs"  # ML models in GCS

        return self.default_backend

# Usage with routing configuration
multi_backend_io = MultiBackendIOManager(
    default_backend="s3",
    backends={
        "local": {"base_dir": "/tmp/dagster"},
        "s3": {"bucket_name": "my-data-bucket", "prefix": "dagster-storage"},
        "gcs": {"bucket_name": "my-ml-bucket", "prefix": "models"}
    }
)

@asset(metadata={"storage_backend": "local"})
def raw_data() -> pd.DataFrame:
    """Asset stored locally."""
    return pd.DataFrame({"raw": [1, 2, 3]})

@asset(metadata={"storage_backend": "s3"})
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset stored in S3."""
    return raw_data * 2

@asset(key_prefix=["ml_models"])  # Routed to GCS because "ml_models" is in the asset key path
def trained_model(processed_data: pd.DataFrame) -> dict:
    """Model stored in GCS."""
    return {"model_type": "linear", "trained": True}
```
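
To put the router in front of every asset output, it must be bound as the `io_manager` resource. A minimal wiring sketch under the same assumptions as above:

```python
from dagster import Definitions

defs = Definitions(
    assets=[raw_data, processed_data, trained_model],
    resources={
        # All asset outputs flow through the router, which then picks a backend
        # per asset from metadata, asset key path, or the configured default.
        "io_manager": multi_backend_io,
    },
)
```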

### Versioned Storage I/O Manager

```python
import os
import pickle
import shutil

import pandas as pd

from dagster import InputContext, IOManager, OutputContext

class VersionedIOManager(IOManager):
    """I/O manager with automatic versioning."""

    def __init__(self, base_path: str, enable_versioning: bool = True):
        self.base_path = base_path
        self.enable_versioning = enable_versioning

    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output with versioning."""
        base_dir = self._get_base_path(context)

        if self.enable_versioning:
            # Create a version based on run ID and timestamp
            version = f"{context.run_id}_{int(pd.Timestamp.now().timestamp())}"
            versioned_dir = os.path.join(base_dir, f"v_{version}")

            # Store the versioned copy
            self._store_object(os.path.join(versioned_dir, "data"), obj, context)

            # Create/update the "current" symlink
            current_path = os.path.join(base_dir, "current")
            if os.path.islink(current_path):
                os.unlink(current_path)
            elif os.path.isdir(current_path):
                shutil.rmtree(current_path)

            os.symlink(versioned_dir, current_path)

            context.add_output_metadata({
                "version": version,
                "versioned_path": versioned_dir,
                "current_path": current_path
            })
        else:
            # Direct storage without versioning
            self._store_object(os.path.join(base_dir, "data"), obj, context)

    def load_input(self, context: InputContext):
        """Load input, preferring the current version."""
        base_dir = self._get_base_path(context)

        # Try to load from "current" first
        current_path = os.path.join(base_dir, "current")
        if os.path.exists(current_path):
            context.log.info(f"Loading current version from {current_path}")
            return self._load_object(os.path.join(current_path, "data"), context)

        # Fall back to the unversioned layout
        if os.path.exists(base_dir):
            return self._load_object(os.path.join(base_dir, "data"), context)

        raise FileNotFoundError(f"No data found at {base_dir}")

    def _get_base_path(self, context) -> str:
        """Resolve a per-output directory from the asset key or step/output name."""
        if context.has_asset_key:
            path_parts = list(context.asset_key.path)
        elif isinstance(context, OutputContext):
            path_parts = [f"{context.step_key}_{context.name}"]
        else:
            upstream = context.upstream_output
            path_parts = (
                list(upstream.asset_key.path)
                if upstream.has_asset_key
                else [f"{upstream.step_key}_{upstream.name}"]
            )
        return os.path.join(self.base_path, *path_parts)

    def _store_object(self, path: str, obj, context) -> None:
        """Store an object to a specific path (extension chosen by type)."""
        os.makedirs(os.path.dirname(path), exist_ok=True)

        if isinstance(obj, pd.DataFrame):
            obj.to_parquet(f"{path}.parquet")
        else:
            with open(f"{path}.pkl", "wb") as f:
                pickle.dump(obj, f)

    def _load_object(self, path: str, context):
        """Load an object from a specific path."""
        if os.path.exists(f"{path}.parquet"):
            return pd.read_parquet(f"{path}.parquet")
        elif os.path.exists(f"{path}.pkl"):
            with open(f"{path}.pkl", "rb") as f:
                return pickle.load(f)
        else:
            raise FileNotFoundError(f"No data file found at {path}")
```
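
Like the other custom managers, the versioned manager would be exposed through an `@io_manager` factory before it can be bound as a resource. A brief sketch; the asset name and base path are illustrative:

```python
from dagster import Definitions, asset, io_manager
import pandas as pd

@io_manager(config_schema={"base_path": str, "enable_versioning": bool})
def versioned_io_manager(init_context):
    return VersionedIOManager(
        base_path=init_context.resource_config["base_path"],
        enable_versioning=init_context.resource_config["enable_versioning"],
    )

@asset
def daily_report() -> pd.DataFrame:
    # Each materialization is written to a new v_<run>_<timestamp> directory,
    # with "current" pointing at the latest version.
    return pd.DataFrame({"metric": ["sales"], "value": [100]})

defs = Definitions(
    assets=[daily_report],
    resources={
        "io_manager": versioned_io_manager.configured(
            {"base_path": "/tmp/dagster-versioned", "enable_versioning": True}
        )
    },
)
```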

This comprehensive storage and I/O system provides flexible, pluggable storage for all Dagster computations, with built-in support for multiple backends, type-aware serialization, versioning, and metadata tracking. The system scales from simple local development to complex multi-cloud production deployments.

For configuration of I/O managers and resources, see [Configuration System](./configuration.md). For contexts that use I/O managers, see [Execution and Contexts](./execution-contexts.md).