A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
—
This document covers Dagster's storage and I/O management system, including I/O managers, input managers, file management, and built-in storage backends. The I/O system provides pluggable storage for asset and operation outputs with automatic serialization, deserialization, and metadata tracking.
I/O managers handle the storage and retrieval of asset and operation outputs, providing a clean abstraction over different storage backends.
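Before the individual APIs below, here is a minimal sketch of the contract (asset names and paths are illustrative): an I/O manager implements handle_output and load_input, and is attached to assets through the "io_manager" resource key in Definitions.

from dagster import Definitions, IOManager, InputContext, OutputContext, asset, io_manager
import os
import pickle

class PickleIOManager(IOManager):
    """Minimal I/O manager: pickle each output to a file named after its asset key."""

    def __init__(self, base_dir: str):
        self.base_dir = base_dir

    def handle_output(self, context: OutputContext, obj) -> None:
        os.makedirs(self.base_dir, exist_ok=True)
        with open(os.path.join(self.base_dir, *context.asset_key.path), "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, context: InputContext):
        with open(os.path.join(self.base_dir, *context.asset_key.path), "rb") as f:
            return pickle.load(f)

@io_manager(config_schema={"base_dir": str})
def pickle_io_manager(init_context):
    return PickleIOManager(init_context.resource_config["base_dir"])

@asset
def numbers() -> list:
    return [1, 2, 3]

@asset
def doubled(numbers: list) -> list:
    # "numbers" is loaded via PickleIOManager.load_input
    return [n * 2 for n in numbers]

defs = Definitions(
    assets=[numbers, doubled],
    resources={"io_manager": pickle_io_manager.configured({"base_dir": "/tmp/dagster-quickstart"})},
)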
IOManager { .api }
Module: dagster._core.storage.io_manager
Type: Abstract base class
Base interface for I/O managers that handle asset and operation output storage.
from dagster import IOManager, InputContext, OutputContext, io_manager
import pandas as pd
import pickle
import os
class CustomIOManager(IOManager):
"""Custom I/O manager implementation."""
def __init__(self, base_path: str = "/tmp/dagster"):
self.base_path = base_path
os.makedirs(base_path, exist_ok=True)
def handle_output(self, context: OutputContext, obj) -> None:
"""Store output object."""
# Generate file path from context
if context.asset_key:
# For assets, use asset key path
path_parts = context.asset_key.path
file_path = os.path.join(self.base_path, *path_parts)
else:
# For ops, use step key and output name
file_path = os.path.join(self.base_path, f"{context.step_key}_{context.name}")
# Create directory if needed
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Handle different object types
if isinstance(obj, pd.DataFrame):
file_path += ".parquet"
obj.to_parquet(file_path)
context.log.info(f"Stored DataFrame with {len(obj)} rows to {file_path}")
else:
file_path += ".pkl"
with open(file_path, "wb") as f:
pickle.dump(obj, f)
context.log.info(f"Stored object to {file_path}")
# Add metadata about storage
context.add_output_metadata({
"file_path": file_path,
"file_size_bytes": os.path.getsize(file_path),
"storage_type": "local_filesystem"
})
def load_input(self, context: InputContext):
"""Load input object."""
# Generate file path from context
if context.asset_key:
path_parts = context.asset_key.path
base_file_path = os.path.join(self.base_path, *path_parts)
else:
# For op outputs, need to determine path from upstream
upstream_context = context.upstream_output
if upstream_context.asset_key:
path_parts = upstream_context.asset_key.path
base_file_path = os.path.join(self.base_path, *path_parts)
else:
base_file_path = os.path.join(
self.base_path,
f"{upstream_context.step_key}_{upstream_context.name}"
)
# Try different file extensions
if os.path.exists(base_file_path + ".parquet"):
file_path = base_file_path + ".parquet"
obj = pd.read_parquet(file_path)
context.log.info(f"Loaded DataFrame with {len(obj)} rows from {file_path}")
return obj
elif os.path.exists(base_file_path + ".pkl"):
file_path = base_file_path + ".pkl"
with open(file_path, "rb") as f:
obj = pickle.load(f)
context.log.info(f"Loaded object from {file_path}")
return obj
else:
raise FileNotFoundError(f"No file found at {base_file_path}")
# Create I/O manager resource
@io_manager(config_schema={"base_path": str})
def custom_io_manager(context):
base_path = context.resource_config.get("base_path", "/tmp/dagster")
    return CustomIOManager(base_path)

@io_manager { .api }
Module: dagster._core.storage.io_manager
Type: Function decorator
Create an I/O manager resource from a function.
from dagster import io_manager, IOManager, InputContext, OutputContext, Field, String
from io import BytesIO
from botocore.exceptions import ClientError
import boto3
import pandas as pd
import pickle
@io_manager(
config_schema={
"bucket_name": Field(String, description="S3 bucket name"),
"prefix": Field(String, default_value="dagster-storage", description="S3 key prefix")
},
required_resource_keys={"s3"}
)
def s3_io_manager(init_context) -> IOManager:
"""S3-based I/O manager."""
class S3IOManager(IOManager):
def __init__(self, bucket_name: str, prefix: str, s3_client):
self.bucket_name = bucket_name
self.prefix = prefix
self.s3_client = s3_client
def handle_output(self, context: OutputContext, obj) -> None:
# Generate S3 key
if context.asset_key:
key_parts = [self.prefix] + list(context.asset_key.path)
else:
key_parts = [self.prefix, f"{context.step_key}_{context.name}"]
s3_key = "/".join(key_parts)
# Store different types appropriately
if isinstance(obj, pd.DataFrame):
# Use parquet for DataFrames
s3_key += ".parquet"
buffer = BytesIO()
obj.to_parquet(buffer)
buffer.seek(0)
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=s3_key,
Body=buffer.getvalue()
)
context.log.info(f"Stored DataFrame to s3://{self.bucket_name}/{s3_key}")
else:
# Use pickle for other objects
s3_key += ".pkl"
buffer = BytesIO()
pickle.dump(obj, buffer)
buffer.seek(0)
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=s3_key,
Body=buffer.getvalue()
)
# Add metadata
context.add_output_metadata({
"s3_bucket": self.bucket_name,
"s3_key": s3_key,
"s3_uri": f"s3://{self.bucket_name}/{s3_key}"
})
def load_input(self, context: InputContext):
# Generate S3 key
if context.asset_key:
key_parts = [self.prefix] + list(context.asset_key.path)
else:
upstream_context = context.upstream_output
if upstream_context.asset_key:
key_parts = [self.prefix] + list(upstream_context.asset_key.path)
else:
key_parts = [self.prefix, f"{upstream_context.step_key}_{upstream_context.name}"]
base_s3_key = "/".join(key_parts)
# Try parquet first, then pickle
try:
s3_key = base_s3_key + ".parquet"
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
obj = pd.read_parquet(BytesIO(response['Body'].read()))
context.log.info(f"Loaded DataFrame from s3://{self.bucket_name}/{s3_key}")
return obj
except ClientError:
s3_key = base_s3_key + ".pkl"
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
obj = pickle.load(BytesIO(response['Body'].read()))
context.log.info(f"Loaded object from s3://{self.bucket_name}/{s3_key}")
return obj
# Get configuration and resources
bucket_name = init_context.resource_config["bucket_name"]
prefix = init_context.resource_config["prefix"]
s3_client = init_context.resources.s3
    return S3IOManager(bucket_name, prefix, s3_client)

fs_io_manager { .api }
Module: dagster._core.storage.fs_io_manager
Type: ResourceDefinition
Built-in filesystem I/O manager for local storage.
from dagster import asset, fs_io_manager, Definitions
import numpy as np
import pandas as pd
@asset
def sales_data() -> pd.DataFrame:
"""Generate sales data."""
return pd.DataFrame({
"date": pd.date_range("2023-01-01", periods=100),
"amount": np.random.randint(100, 1000, 100)
})
@asset
def monthly_sales(sales_data: pd.DataFrame) -> pd.DataFrame:
"""Aggregate sales by month."""
return sales_data.groupby(sales_data["date"].dt.to_period("M")).sum()
# Use filesystem I/O manager
defs = Definitions(
assets=[sales_data, monthly_sales],
resources={
"io_manager": fs_io_manager.configured({
"base_dir": "/tmp/dagster-storage" # Storage directory
})
}
)
# fs_io_manager pickles outputs to files under base_dir (its only config option)
filesystem_io = fs_io_manager.configured({
    "base_dir": "/data/warehouse"
})

mem_io_manager { .api }
Module: dagster._core.storage.mem_io_manager
Type: ResourceDefinition
Built-in in-memory I/O manager for testing and development.
from dagster import asset, mem_io_manager, materialize_to_memory
@asset
def in_memory_data() -> dict:
return {"key": "value", "count": 42}
@asset
def processed_memory_data(in_memory_data: dict) -> dict:
return {**in_memory_data, "processed": True}
# Materialize with memory I/O manager
result = materialize_to_memory([in_memory_data, processed_memory_data])
# Access values directly from memory
data = result.output_for_node("in_memory_data")
processed = result.output_for_node("processed_memory_data")

UPathIOManager { .api }
Module: dagster._core.storage.upath_io_manager
Type: Class
Universal path I/O manager supporting multiple storage backends via UPath.
from dagster import Definitions, InputContext, OutputContext, UPathIOManager, asset, io_manager
from upath import UPath
import pandas as pd
import pickle

# UPathIOManager is abstract: subclasses implement dump_to_path/load_from_path,
# and the same logic then works for local paths, s3://, gs://, az://, and other
# fsspec-backed URIs
class PickleUPathIOManager(UPathIOManager):
    extension: str = ".pkl"

    def dump_to_path(self, context: OutputContext, obj, path: UPath) -> None:
        with path.open("wb") as f:
            pickle.dump(obj, f)

    def load_from_path(self, context: InputContext, path: UPath):
        with path.open("rb") as f:
            return pickle.load(f)

@io_manager(config_schema={"base_path": str})
def cloud_storage_io_manager(init_context):
    """Universal cloud storage I/O manager."""
    return PickleUPathIOManager(base_path=UPath(init_context.resource_config["base_path"]))

# Usage with different storage backends
s3_io_manager = cloud_storage_io_manager.configured({"base_path": "s3://my-bucket/dagster-storage"})
gcs_io_manager = cloud_storage_io_manager.configured({"base_path": "gs://my-bucket/dagster-storage"})
azure_io_manager = cloud_storage_io_manager.configured({"base_path": "az://my-container/dagster-storage"})
local_io_manager = cloud_storage_io_manager.configured({"base_path": "/tmp/dagster-storage"})

# Assets automatically work with any backend
@asset(io_manager_key="cloud_storage")
def cloud_data() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

defs = Definitions(
    assets=[cloud_data],
    resources={
        "cloud_storage": s3_io_manager  # Switch between storage backends easily
    }
)

The next example sketches an I/O manager that chooses a storage format based on each output's Python type.

from dagster import IOManager, InputContext, OutputContext, io_manager
import os
import pickle
import pandas as pd
import numpy as np
import json
class TypedIOManager(IOManager):
"""I/O manager with type-specific storage strategies."""
def __init__(self, base_path: str):
self.base_path = base_path
self.type_handlers = {
pd.DataFrame: self._handle_dataframe,
np.ndarray: self._handle_numpy_array,
dict: self._handle_dict,
            # lists and other unlisted types fall back to the pickle handler below
}
def handle_output(self, context: OutputContext, obj) -> None:
obj_type = type(obj)
# Find appropriate handler
handler = None
for type_cls, type_handler in self.type_handlers.items():
if isinstance(obj, type_cls):
handler = type_handler
break
if handler:
handler(context, obj, mode="write")
else:
# Fallback to pickle
self._handle_pickle(context, obj, mode="write")
def load_input(self, context: InputContext):
# Determine expected type from context
if context.dagster_type:
expected_type = context.dagster_type.typing_type
else:
expected_type = None
# Try type-specific loading
for type_cls, type_handler in self.type_handlers.items():
if expected_type and issubclass(expected_type, type_cls):
return type_handler(context, None, mode="read")
# Fallback to pickle
return self._handle_pickle(context, None, mode="read")
def _handle_dataframe(self, context, obj, mode):
path = self._get_path(context) + ".parquet"
if mode == "write":
obj.to_parquet(path)
context.add_output_metadata({"format": "parquet", "rows": len(obj)})
else:
return pd.read_parquet(path)
def _handle_numpy_array(self, context, obj, mode):
path = self._get_path(context) + ".npy"
if mode == "write":
np.save(path, obj)
context.add_output_metadata({"format": "numpy", "shape": obj.shape})
else:
return np.load(path)
def _handle_dict(self, context, obj, mode):
path = self._get_path(context) + ".json"
if mode == "write":
with open(path, "w") as f:
json.dump(obj, f, indent=2)
context.add_output_metadata({"format": "json", "keys": list(obj.keys())})
else:
with open(path, "r") as f:
return json.load(f)
    def _handle_pickle(self, context, obj, mode):
        """Fallback handler: pickle anything without a dedicated handler."""
        path = self._get_path(context) + ".pkl"
        if mode == "write":
            with open(path, "wb") as f:
                pickle.dump(obj, f)
            context.add_output_metadata({"format": "pickle"})
        else:
            with open(path, "rb") as f:
                return pickle.load(f)

    def _get_path(self, context):
if context.asset_key:
path_parts = context.asset_key.path
else:
if hasattr(context, 'step_key'):
path_parts = [f"{context.step_key}_{context.name}"]
else:
# Input context
upstream = context.upstream_output
path_parts = list(upstream.asset_key.path) if upstream.asset_key else [f"{upstream.step_key}_{upstream.name}"]
return os.path.join(self.base_path, *path_parts)
@io_manager(config_schema={"base_path": str})
def typed_io_manager(context):
    return TypedIOManager(context.resource_config["base_path"])

Input managers provide specialized loading logic for specific inputs, complementing I/O managers.
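For op-based jobs, an input manager can also supply values for unconnected inputs. A minimal sketch follows (op, job, and resource-key names here are illustrative): the decorated function returns the loaded value directly, and In(input_manager_key=...) tells Dagster to use that manager for the input.

from dagster import In, InputContext, input_manager, job, op

@input_manager
def static_events_loader(context: InputContext):
    # The decorated function returns the value to pass into the op input
    return [1, 2, None, 4]

@op(ins={"raw_events": In(input_manager_key="events_loader")})
def clean_events(raw_events):
    # raw_events is supplied by the input manager, not by an upstream op
    return [e for e in raw_events if e is not None]

@job(resource_defs={"events_loader": static_events_loader})
def events_job():
    clean_events()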
InputManager { .api }
Module: dagster._core.storage.input_manager
Type: Abstract base class
Base interface for input managers that handle specialized input loading.
from dagster import AssetIn, Definitions, InputManager, InputContext, asset, input_manager
import pandas as pd
import requests
class APIInputManager(InputManager):
"""Input manager for loading data from APIs."""
def __init__(self, api_base_url: str, api_key: str):
self.api_base_url = api_base_url
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Bearer {api_key}"})
def load_input(self, context: InputContext) -> pd.DataFrame:
"""Load data from API based on context."""
# Use asset key to determine endpoint
if context.asset_key:
endpoint = context.asset_key.path[-1] # Last part of asset key
else:
endpoint = context.name
# Build API URL
url = f"{self.api_base_url}/{endpoint}"
# Add partition filtering if needed
params = {}
if hasattr(context, 'asset_partition_keys') and context.asset_partition_keys:
# For partitioned inputs, filter by partition
partition_keys = list(context.asset_partition_keys)
params["partitions"] = ",".join(partition_keys)
context.log.info(f"Loading partitions: {partition_keys}")
# Fetch data
context.log.info(f"Fetching data from {url}")
response = self.session.get(url, params=params)
response.raise_for_status()
# Convert to DataFrame
data = response.json()
df = pd.DataFrame(data)
context.log.info(f"Loaded {len(df)} records from API")
return df
@input_manager(
config_schema={
"api_base_url": str,
"api_key": str
}
)
def api_input_manager(context):
"""Create API input manager from configuration."""
return APIInputManager(
api_base_url=context.resource_config["api_base_url"],
api_key=context.resource_config["api_key"]
)
# Usage in assets
@asset(ins={"users_api_data": AssetIn(input_manager_key="api_loader")})
def external_users(context, users_api_data: pd.DataFrame) -> pd.DataFrame:
"""Asset loading from external API."""
# users_api_data is loaded via API input manager
return users_api_data.dropna()
defs = Definitions(
assets=[external_users],
resources={
"api_loader": api_input_manager.configured({
"api_base_url": "https://api.example.com/v1",
"api_key": "secret-api-key"
})
}
)

class DatabaseInputManager(InputManager):
"""Input manager for loading data from databases."""
def __init__(self, connection_string: str):
self.connection_string = connection_string
def load_input(self, context: InputContext) -> pd.DataFrame:
"""Load data from database table."""
# Derive table name from asset key or input metadata
if context.asset_key:
table_name = "_".join(context.asset_key.path)
elif context.metadata and "table_name" in context.metadata:
table_name = context.metadata["table_name"]
else:
table_name = context.name
# Build query
query = f"SELECT * FROM {table_name}"
# Add partition filtering for time-partitioned data
if hasattr(context, 'asset_partition_keys') and context.asset_partition_keys:
partition_keys = list(context.asset_partition_keys)
# Assuming date-based partitions
if len(partition_keys) == 1:
query += f" WHERE date = '{partition_keys[0]}'"
else:
date_list = "','".join(partition_keys)
query += f" WHERE date IN ('{date_list}')"
context.log.info(f"Executing query: {query}")
# Execute query
df = pd.read_sql(query, self.connection_string)
context.log.info(f"Loaded {len(df)} records from {table_name}")
return df
@input_manager(
config_schema={"connection_string": str},
required_resource_keys={"database"}
)
def database_input_manager(context):
connection_string = context.resource_config["connection_string"]
    return DatabaseInputManager(connection_string)

FileHandle { .api }
Module: dagster._core.storage.file_manager
Type: Abstract base class
Abstract file handle for managing file references.
from dagster import FileHandle, LocalFileHandle, asset, resource
import json
import pandas as pd
class CustomFileHandle(FileHandle):
"""Custom file handle implementation."""
def __init__(self, file_path: str, file_manager):
self.file_path = file_path
self.file_manager = file_manager
@property
def path_desc(self) -> str:
"""Description of file path."""
return f"custom://{self.file_path}"
# Usage with file-based assets
@asset(required_resource_keys={"file_manager"})
def file_based_asset(context) -> FileHandle:
"""Asset that produces a file handle."""
# Generate file content
data = {"key": "value", "timestamp": pd.Timestamp.now().isoformat()}
# Get file manager from resources
file_manager = context.resources.file_manager
# Write file and get handle
    file_handle = file_manager.write_data(json.dumps(data).encode("utf-8"))
    context.log.info(f"Created file: {file_handle.path_desc}")
    return file_handle
@asset(required_resource_keys={"file_manager"})
def process_file(context, file_based_asset: FileHandle) -> dict:
"""Asset that processes a file handle."""
# Read file through handle
file_manager = context.resources.file_manager
    data = json.loads(file_manager.read_data(file_based_asset))
context.log.info(f"Processed file: {file_based_asset.path_desc}")
return {"processed": True, "original_data": data}local_file_manager { .api }Module: dagster._core.storage.file_manager
Type: ResourceDefinition
Built-in local file manager for file-based operations.
from dagster import Definitions, FileHandle, asset, local_file_manager, resource
from dagster._core.storage.file_manager import LocalFileManager
import json
import os
@resource(config_schema={"base_dir": str})
def custom_file_manager(context):
"""Custom file manager with specific directory."""
base_dir = context.resource_config["base_dir"]
os.makedirs(base_dir, exist_ok=True)
return LocalFileManager(base_dir)
@asset(required_resource_keys={"file_manager"})
def config_file(context) -> FileHandle:
"""Asset that creates a configuration file."""
config_data = {
"database_url": "postgresql://localhost/mydb",
"api_endpoints": ["https://api1.com", "https://api2.com"],
"settings": {"timeout": 30, "retries": 3}
}
# Use file manager to create file
file_manager = context.resources.file_manager
    # Write the serialized config through the file manager; it returns a
    # handle to the managed copy
    file_handle = file_manager.write_data(json.dumps(config_data, indent=2).encode("utf-8"))
context.log.info(f"Created config file: {file_handle.path_desc}")
return file_handle
defs = Definitions(
assets=[config_file, process_file],
resources={
"file_manager": local_file_manager.configured({
"base_dir": "/tmp/dagster-files"
})
}
)

AssetValueLoader { .api }
Module: dagster._core.storage.asset_value_loader
Type: Class
Utility for loading materialized asset values outside of execution context.
from dagster import AssetKey, DagsterInstance, Definitions, asset, materialize
import pandas as pd
@asset
def upstream_data() -> pd.DataFrame:
return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
@asset
def downstream_result(upstream_data: pd.DataFrame) -> dict:
return {"total": upstream_data["value"].sum(), "count": len(upstream_data)}
# Materialize assets
instance = DagsterInstance.ephemeral()
defs = Definitions(assets=[upstream_data, downstream_result])
result = materialize([upstream_data, downstream_result], instance=instance)

# Load asset values after materialization; Definitions.get_asset_value_loader
# returns an AssetValueLoader that reuses I/O manager resources across loads
with defs.get_asset_value_loader(instance=instance) as loader:
    # Load a specific asset value
    upstream_value = loader.load_asset_value(AssetKey("upstream_data"))
    print(f"Upstream data: {upstream_value}")

    # Load with a partition key (for partitioned assets)
    # partitioned_value = loader.load_asset_value(
    #     AssetKey("partitioned_asset"),
    #     partition_key="2023-01-01",
    # )

    # Load multiple asset values
    for asset_key in [AssetKey("upstream_data"), AssetKey("downstream_result")]:
        print(f"Asset {asset_key}: {loader.load_asset_value(asset_key)}")

The multi-backend manager below routes each output to a storage backend chosen from context metadata or the asset key.

from dagster import ConfigurableIOManager, InputContext, OutputContext
from pydantic import Field
from typing import Dict, Any, Literal
class MultiBackendIOManager(ConfigurableIOManager):
"""I/O manager supporting multiple storage backends."""
default_backend: Literal["local", "s3", "gcs"] = "local"
backends: Dict[str, Any] = Field(
default_factory=dict,
description="Backend-specific configurations"
)
def setup_for_execution(self, context) -> "MultiBackendIOManager":
"""Initialize backend-specific managers."""
self._backend_managers = {}
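        # LocalIOManager and S3IOManager stand in for concrete IOManager
        # implementations (for example, the custom managers sketched earlier
        # in this document); they are not Dagster built-ins.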
if "local" in self.backends or self.default_backend == "local":
self._backend_managers["local"] = LocalIOManager(
self.backends.get("local", {}).get("base_dir", "/tmp/dagster")
)
if "s3" in self.backends or self.default_backend == "s3":
s3_config = self.backends.get("s3", {})
self._backend_managers["s3"] = S3IOManager(
bucket_name=s3_config.get("bucket_name"),
prefix=s3_config.get("prefix", "dagster")
)
return self
def handle_output(self, context: OutputContext, obj) -> None:
"""Route output to appropriate backend."""
# Determine backend from context metadata or default
backend = self._get_backend_for_context(context)
manager = self._backend_managers[backend]
context.log.info(f"Storing output using {backend} backend")
manager.handle_output(context, obj)
def load_input(self, context: InputContext):
"""Route input loading to appropriate backend."""
backend = self._get_backend_for_context(context)
manager = self._backend_managers[backend]
context.log.info(f"Loading input using {backend} backend")
return manager.load_input(context)
def _get_backend_for_context(self, context) -> str:
"""Determine appropriate backend for context."""
# Check context metadata for backend preference
if hasattr(context, 'metadata') and context.metadata:
if "storage_backend" in context.metadata:
return context.metadata["storage_backend"]
# Check asset group for backend routing
if hasattr(context, 'asset_key') and context.asset_key:
# Route based on asset key pattern
path_parts = context.asset_key.path
if "raw" in path_parts:
return "local" # Raw data stored locally
elif "processed" in path_parts:
return "s3" # Processed data in S3
elif "ml_models" in path_parts:
return "gcs" # ML models in GCS
return self.default_backend
# Usage with routing configuration
multi_backend_io = MultiBackendIOManager(
default_backend="s3",
backends={
"local": {"base_dir": "/tmp/dagster"},
"s3": {"bucket_name": "my-data-bucket", "prefix": "dagster-storage"},
"gcs": {"bucket_name": "my-ml-bucket", "prefix": "models"}
}
)
@asset(metadata={"storage_backend": "local"})
def raw_data() -> pd.DataFrame:
"""Asset stored locally."""
return pd.DataFrame({"raw": [1, 2, 3]})
@asset(metadata={"storage_backend": "s3"})
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
"""Asset stored in S3."""
return raw_data * 2
@asset(key_prefix="ml_models")  # "ml_models" in the asset key path routes this to GCS
def trained_model(processed_data: pd.DataFrame) -> dict:
"""Model stored in GCS."""
return {"model_type": "linear", "trained": True}class VersionedIOManager(IOManager):
"""I/O manager with automatic versioning."""
def __init__(self, base_path: str, enable_versioning: bool = True):
self.base_path = base_path
self.enable_versioning = enable_versioning
def handle_output(self, context: OutputContext, obj) -> None:
"""Store output with versioning."""
base_file_path = self._get_base_path(context)
if self.enable_versioning:
# Create version based on run ID and timestamp
version = f"{context.run_id}_{int(pd.Timestamp.now().timestamp())}"
            versioned_dir = os.path.join(base_file_path, f"v_{version}")

            # Store the versioned copy in its own directory
            self._store_object(os.path.join(versioned_dir, "data"), obj, context)

            # Create/update the "current" symlink to point at the latest version
            current_path = os.path.join(base_file_path, "current")
            if os.path.lexists(current_path):
                if os.path.islink(current_path):
                    os.unlink(current_path)
                else:
                    shutil.rmtree(current_path)
            os.symlink(versioned_dir, current_path)

            context.add_output_metadata({
                "version": version,
                "versioned_path": versioned_dir,
                "current_path": current_path
            })
else:
# Direct storage without versioning
self._store_object(base_file_path, obj, context)
def load_input(self, context: InputContext):
"""Load input, preferring current version."""
base_file_path = self._get_base_path(context)
        # Try the "current" version directory first
        current_path = os.path.join(base_file_path, "current")
        if os.path.exists(current_path):
            context.log.info(f"Loading current version from {current_path}")
            return self._load_object(os.path.join(current_path, "data"), context)
        # Fall back to the unversioned path; _load_object raises if nothing is stored there
        return self._load_object(base_file_path, context)
def _store_object(self, path: str, obj, context) -> None:
"""Store object to specific path."""
os.makedirs(os.path.dirname(path), exist_ok=True)
if isinstance(obj, pd.DataFrame):
obj.to_parquet(f"{path}.parquet")
else:
with open(f"{path}.pkl", "wb") as f:
pickle.dump(obj, f)
def _load_object(self, path: str, context):
"""Load object from specific path."""
if os.path.exists(f"{path}.parquet"):
return pd.read_parquet(f"{path}.parquet")
elif os.path.exists(f"{path}.pkl"):
with open(f"{path}.pkl", "rb") as f:
return pickle.load(f)
else:
raise FileNotFoundError(f"No data file found at {path}")This comprehensive storage and I/O system provides flexible, pluggable storage for all Dagster computations, with built-in support for multiple backends, type-aware serialization, versioning, and metadata tracking. The system scales from simple local development to complex multi-cloud production deployments.
For configuration of I/O managers and resources, see Configuration System. For contexts that use I/O managers, see Execution and Contexts.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster