
Storage and I/O Management

This document covers Dagster's storage and I/O management system, including I/O managers, input managers, file management, and built-in storage backends. The I/O system provides pluggable storage for asset and operation outputs with automatic serialization, deserialization, and metadata tracking.

I/O Manager System

I/O managers handle the storage and retrieval of asset and operation outputs, providing a clean abstraction over different storage backends.

IOManager Interface

IOManager { .api }

Module: dagster._core.storage.io_manager
Type: Abstract base class

Base interface for I/O managers that handle asset and operation output storage.

from dagster import IOManager, InputContext, OutputContext, io_manager
import pandas as pd
import pickle
import os

class CustomIOManager(IOManager):
    """Custom I/O manager implementation."""
    
    def __init__(self, base_path: str = "/tmp/dagster"):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)
    
    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output object."""
        # Generate file path from context (asset_key raises when absent,
        # so check has_asset_key first)
        if context.has_asset_key:
            # For assets, use the asset key path
            path_parts = context.asset_key.path
            file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For ops, use the step key and output name
            file_path = os.path.join(self.base_path, f"{context.step_key}_{context.name}")
        
        # Create directory if needed
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        
        # Handle different object types
        if isinstance(obj, pd.DataFrame):
            file_path += ".parquet"
            obj.to_parquet(file_path)
            context.log.info(f"Stored DataFrame with {len(obj)} rows to {file_path}")
        else:
            file_path += ".pkl"
            with open(file_path, "wb") as f:
                pickle.dump(obj, f)
            context.log.info(f"Stored object to {file_path}")
        
        # Add metadata about storage
        context.add_output_metadata({
            "file_path": file_path,
            "file_size_bytes": os.path.getsize(file_path),
            "storage_type": "local_filesystem"
        })
    
    def load_input(self, context: InputContext):
        """Load input object."""
        # Generate file path from context
        if context.has_asset_key:
            path_parts = context.asset_key.path
            base_file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For op outputs, derive the path from the upstream output
            upstream_context = context.upstream_output
            if upstream_context.has_asset_key:
                path_parts = upstream_context.asset_key.path
                base_file_path = os.path.join(self.base_path, *path_parts)
            else:
                base_file_path = os.path.join(
                    self.base_path, 
                    f"{upstream_context.step_key}_{upstream_context.name}"
                )
        
        # Try different file extensions
        if os.path.exists(base_file_path + ".parquet"):
            file_path = base_file_path + ".parquet"
            obj = pd.read_parquet(file_path)
            context.log.info(f"Loaded DataFrame with {len(obj)} rows from {file_path}")
            return obj
        elif os.path.exists(base_file_path + ".pkl"):
            file_path = base_file_path + ".pkl"
            with open(file_path, "rb") as f:
                obj = pickle.load(f)
            context.log.info(f"Loaded object from {file_path}")
            return obj
        else:
            raise FileNotFoundError(f"No file found at {base_file_path}")

# Create I/O manager resource
@io_manager(config_schema={"base_path": str})
def custom_io_manager(context):
    base_path = context.resource_config.get("base_path", "/tmp/dagster")
    return CustomIOManager(base_path)
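
To put the custom manager to work, bind it under the io_manager resource key, which is the default key assets and ops resolve against. A minimal wiring sketch (the asset here is illustrative):

from dagster import Definitions, asset

@asset
def example_numbers() -> pd.DataFrame:
    return pd.DataFrame({"n": [1, 2, 3]})

defs = Definitions(
    assets=[example_numbers],
    resources={
        "io_manager": custom_io_manager.configured({"base_path": "/tmp/dagster"})
    },
)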

I/O Manager Decorator

@io_manager { .api }

Module: dagster._core.storage.io_manager
Type: Function decorator

Create an I/O manager resource from a function.

from dagster import io_manager, IOManager, InputContext, OutputContext, Field, String
from botocore.exceptions import ClientError
from io import BytesIO
import pickle
import pandas as pd

@io_manager(
    config_schema={
        "bucket_name": Field(String, description="S3 bucket name"),
        "prefix": Field(String, default_value="dagster-storage", description="S3 key prefix")
    },
    required_resource_keys={"s3"}
)
def s3_io_manager(init_context) -> IOManager:
    """S3-based I/O manager."""
    
    class S3IOManager(IOManager):
        def __init__(self, bucket_name: str, prefix: str, s3_client):
            self.bucket_name = bucket_name
            self.prefix = prefix
            self.s3_client = s3_client
        
        def handle_output(self, context: OutputContext, obj) -> None:
            # Generate S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                key_parts = [self.prefix, f"{context.step_key}_{context.name}"]
            
            s3_key = "/".join(key_parts)
            
            # Store different types appropriately
            if isinstance(obj, pd.DataFrame):
                # Use parquet for DataFrames
                s3_key += ".parquet"
                buffer = BytesIO()
                obj.to_parquet(buffer)
                buffer.seek(0)
                
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue()
                )
                
                context.log.info(f"Stored DataFrame to s3://{self.bucket_name}/{s3_key}")
            else:
                # Use pickle for other objects
                s3_key += ".pkl"
                buffer = BytesIO()
                pickle.dump(obj, buffer)
                buffer.seek(0)
                
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue()
                )
            
            # Add metadata
            context.add_output_metadata({
                "s3_bucket": self.bucket_name,
                "s3_key": s3_key,
                "s3_uri": f"s3://{self.bucket_name}/{s3_key}"
            })
        
        def load_input(self, context: InputContext):
            # Generate S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                upstream_context = context.upstream_output
                if upstream_context.has_asset_key:
                    key_parts = [self.prefix] + list(upstream_context.asset_key.path)
                else:
                    key_parts = [self.prefix, f"{upstream_context.step_key}_{upstream_context.name}"]
            
            base_s3_key = "/".join(key_parts)
            
            # Try parquet first, then pickle
            try:
                s3_key = base_s3_key + ".parquet"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pd.read_parquet(BytesIO(response['Body'].read()))
                context.log.info(f"Loaded DataFrame from s3://{self.bucket_name}/{s3_key}")
                return obj
            except ClientError:
                s3_key = base_s3_key + ".pkl"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pickle.load(BytesIO(response['Body'].read()))
                context.log.info(f"Loaded object from s3://{self.bucket_name}/{s3_key}")
                return obj
    
    # Get configuration and resources
    bucket_name = init_context.resource_config["bucket_name"]
    prefix = init_context.resource_config["prefix"]
    s3_client = init_context.resources.s3
    
    return S3IOManager(bucket_name, prefix, s3_client)
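
Wiring this manager also requires providing the s3 resource it declares via required_resource_keys. A hedged sketch, assuming the s3_resource from the dagster-aws package and an illustrative asset:

from dagster import Definitions, asset
from dagster_aws.s3 import s3_resource

@asset
def report_table() -> pd.DataFrame:
    return pd.DataFrame({"total": [42]})

defs = Definitions(
    assets=[report_table],
    resources={
        "s3": s3_resource,  # satisfies required_resource_keys={"s3"}
        "io_manager": s3_io_manager.configured({
            "bucket_name": "my-data-bucket",
            "prefix": "dagster-storage"
        })
    },
)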

Built-in I/O Managers

fs_io_manager { .api }

Module: dagster._core.storage.fs_io_manager
Type: ResourceDefinition

Built-in filesystem I/O manager for local storage.

from dagster import asset, fs_io_manager, Definitions
import pandas as pd
import numpy as np

@asset
def sales_data() -> pd.DataFrame:
    """Generate sales data."""
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "amount": np.random.randint(100, 1000, 100)
    })

@asset
def monthly_sales(sales_data: pd.DataFrame) -> pd.DataFrame:
    """Aggregate sales by month."""
    return sales_data.groupby(sales_data["date"].dt.to_period("M")).sum()

# Use filesystem I/O manager
defs = Definitions(
    assets=[sales_data, monthly_sales],
    resources={
        "io_manager": fs_io_manager.configured({
            "base_dir": "/tmp/dagster-storage"  # Storage directory
        })
    }
)

# fs_io_manager accepts a single base_dir option; when it is not set,
# outputs land in the Dagster instance's local artifact storage
filesystem_io = fs_io_manager.configured({
    "base_dir": "/data/warehouse"
})
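
fs_io_manager writes one pickled file per output, at a path derived from the asset key under base_dir. The example above would produce roughly the following layout (exact paths can vary by Dagster version, and op outputs are keyed by run and step instead):

# /tmp/dagster-storage/
# ├── sales_data        <- pickled DataFrame
# └── monthly_sales     <- pickled DataFrame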

mem_io_manager { .api }

Module: dagster._core.storage.mem_io_manager
Type: ResourceDefinition

Built-in in-memory I/O manager for testing and development.

from dagster import asset, mem_io_manager, materialize_to_memory

@asset
def in_memory_data() -> dict:
    return {"key": "value", "count": 42}

@asset
def processed_memory_data(in_memory_data: dict) -> dict:
    return {**in_memory_data, "processed": True}

# Materialize with memory I/O manager
result = materialize_to_memory([in_memory_data, processed_memory_data])

# Access values directly from memory
data = result.output_for_node("in_memory_data")
processed = result.output_for_node("processed_memory_data")
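
mem_io_manager can also be bound explicitly when materializing, which keeps every output in memory for the duration of the run. A minimal sketch:

from dagster import materialize, mem_io_manager

result = materialize(
    [in_memory_data, processed_memory_data],
    resources={"io_manager": mem_io_manager},
)
assert result.success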

UPathIOManager { .api }

Module: dagster._core.storage.upath_io_manager
Type: Class

Universal path I/O manager supporting multiple storage backends via UPath.

from upath import UPath
from dagster import UPathIOManager, InputContext, OutputContext
import pickle

class PickleUPathIOManager(UPathIOManager):
    """Pickle-based I/O manager for any fsspec-backed storage."""
    
    extension = ".pkl"  # appended to generated paths
    
    # UPathIOManager is abstract: subclasses supply dump_to_path and
    # load_from_path, and the base class handles path generation,
    # partitions, and metadata
    def dump_to_path(self, context: OutputContext, obj, path: UPath) -> None:
        with path.open("wb") as f:
            pickle.dump(obj, f)
    
    def load_from_path(self, context: InputContext, path: UPath):
        with path.open("rb") as f:
            return pickle.load(f)

# The same subclass works against different storage backends
s3_io_manager = PickleUPathIOManager(base_path=UPath("s3://my-bucket/dagster-storage"))
gcs_io_manager = PickleUPathIOManager(base_path=UPath("gs://my-bucket/dagster-storage"))
azure_io_manager = PickleUPathIOManager(base_path=UPath("az://my-container/dagster-storage"))
local_io_manager = PickleUPathIOManager(base_path=UPath("/tmp/dagster-storage"))

# Assets automatically work with any backend
@asset(io_manager_key="cloud_storage")
def cloud_data() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

defs = Definitions(
    assets=[cloud_data],
    resources={
        "cloud_storage": s3_io_manager  # Switch between storage backends easily
    }
)

Custom I/O Manager Patterns

Type-Specific I/O Manager

from dagster import IOManager, InputContext, OutputContext, io_manager
import pandas as pd
import numpy as np
import json
import os
import pickle

class TypedIOManager(IOManager):
    """I/O manager with type-specific storage strategies."""
    
    def __init__(self, base_path: str):
        self.base_path = base_path
        self.type_handlers = {
            pd.DataFrame: self._handle_dataframe,
            np.ndarray: self._handle_numpy_array,
            dict: self._handle_dict,
            list: self._handle_list
        }
    
    def handle_output(self, context: OutputContext, obj) -> None:
        # Find the handler registered for this object's type
        handler = None
        for type_cls, type_handler in self.type_handlers.items():
            if isinstance(obj, type_cls):
                handler = type_handler
                break
        
        if handler:
            handler(context, obj, mode="write")
        else:
            # Fallback to pickle
            self._handle_pickle(context, obj, mode="write")
    
    def load_input(self, context: InputContext):
        # Determine the expected type from the context, if declared
        expected_type = (
            context.dagster_type.typing_type if context.dagster_type else None
        )
        
        # Try type-specific loading (guard against non-class typing hints
        # such as Any, which issubclass rejects)
        if isinstance(expected_type, type):
            for type_cls, type_handler in self.type_handlers.items():
                if issubclass(expected_type, type_cls):
                    return type_handler(context, None, mode="read")
        
        # Fallback to pickle
        return self._handle_pickle(context, None, mode="read")
    
    def _handle_dataframe(self, context, obj, mode):
        path = self._get_path(context) + ".parquet"
        if mode == "write":
            obj.to_parquet(path)
            context.add_output_metadata({"format": "parquet", "rows": len(obj)})
        else:
            return pd.read_parquet(path)
    
    def _handle_numpy_array(self, context, obj, mode):
        path = self._get_path(context) + ".npy"
        if mode == "write":
            np.save(path, obj)
            context.add_output_metadata({"format": "numpy", "shape": obj.shape})
        else:
            return np.load(path)
    
    def _handle_dict(self, context, obj, mode):
        path = self._get_path(context) + ".json"
        if mode == "write":
            with open(path, "w") as f:
                json.dump(obj, f, indent=2)
            context.add_output_metadata({"format": "json", "keys": list(obj.keys())})
        else:
            with open(path, "r") as f:
                return json.load(f)
    
    def _handle_list(self, context, obj, mode):
        # Registered in type_handlers above; stores lists as JSON
        path = self._get_path(context) + ".json"
        if mode == "write":
            with open(path, "w") as f:
                json.dump(obj, f, indent=2)
            context.add_output_metadata({"format": "json", "length": len(obj)})
        else:
            with open(path, "r") as f:
                return json.load(f)
    
    def _handle_pickle(self, context, obj, mode):
        # Fallback used when no type-specific handler matches
        path = self._get_path(context) + ".pkl"
        if mode == "write":
            with open(path, "wb") as f:
                pickle.dump(obj, f)
            context.add_output_metadata({"format": "pickle"})
        else:
            with open(path, "rb") as f:
                return pickle.load(f)
    
    def _get_path(self, context):
        if context.has_asset_key:
            path_parts = context.asset_key.path
        elif hasattr(context, "step_key"):
            # Output context for an op
            path_parts = [f"{context.step_key}_{context.name}"]
        else:
            # Input context: derive the path from the upstream output
            upstream = context.upstream_output
            path_parts = (
                list(upstream.asset_key.path)
                if upstream.has_asset_key
                else [f"{upstream.step_key}_{upstream.name}"]
            )
        
        path = os.path.join(self.base_path, *path_parts)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

@io_manager(config_schema={"base_path": str})
def typed_io_manager(context):
    return TypedIOManager(context.resource_config["base_path"])
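
Bound as the default I/O manager, the typed manager then picks a storage format per output automatically. An illustrative sketch (the asset names are made up):

from dagster import Definitions, asset
import numpy as np
import pandas as pd

@asset
def feature_frame() -> pd.DataFrame:  # stored as .parquet
    return pd.DataFrame({"x": [1.0, 2.0]})

@asset
def weight_vector() -> np.ndarray:  # stored as .npy
    return np.array([0.1, 0.9])

@asset
def run_summary() -> dict:  # stored as .json
    return {"status": "ok"}

defs = Definitions(
    assets=[feature_frame, weight_vector, run_summary],
    resources={
        "io_manager": typed_io_manager.configured({"base_path": "/tmp/typed"})
    },
)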

Input Manager System

Input managers provide specialized loading logic for specific inputs, complementing I/O managers.

InputManager Interface

InputManager { .api }

Module: dagster._core.storage.input_manager
Type: Abstract base class

Base interface for input managers that handle specialized input loading.

from dagster import AssetIn, Definitions, InputManager, InputContext, asset, input_manager
import pandas as pd
import requests

class APIInputManager(InputManager):
    """Input manager for loading data from APIs."""
    
    def __init__(self, api_base_url: str, api_key: str):
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})
    
    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from API based on context."""
        # Use the asset key to determine the endpoint
        if context.has_asset_key:
            endpoint = context.asset_key.path[-1]  # Last part of asset key
        else:
            endpoint = context.name
        
        # Build API URL
        url = f"{self.api_base_url}/{endpoint}"
        
        # Add partition filtering if needed
        params = {}
        if context.has_asset_partitions:
            # For partitioned inputs, filter by partition
            partition_keys = list(context.asset_partition_keys)
            params["partitions"] = ",".join(partition_keys)
            context.log.info(f"Loading partitions: {partition_keys}")
        
        # Fetch data
        context.log.info(f"Fetching data from {url}")
        response = self.session.get(url, params=params)
        response.raise_for_status()
        
        # Convert to DataFrame
        data = response.json()
        df = pd.DataFrame(data)
        
        context.log.info(f"Loaded {len(df)} records from API")
        return df

@input_manager(
    config_schema={
        "api_base_url": str,
        "api_key": str
    }
)
def api_input_manager(context):
    """Create API input manager from configuration."""
    return APIInputManager(
        api_base_url=context.resource_config["api_base_url"],
        api_key=context.resource_config["api_key"]
    )

# Usage in assets: attach the input manager to a specific input via AssetIn
@asset(ins={"users_api_data": AssetIn(input_manager_key="api_loader")})
def external_users(context, users_api_data: pd.DataFrame) -> pd.DataFrame:
    """Asset loading from external API."""
    # users_api_data is loaded via API input manager
    return users_api_data.dropna()

defs = Definitions(
    assets=[external_users],
    resources={
        "api_loader": api_input_manager.configured({
            "api_base_url": "https://api.example.com/v1",
            "api_key": "secret-api-key"
        })
    }
)
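
The loading logic can also be exercised outside a run with dagster's build_input_context helper. A hedged test sketch; in a real test the HTTP session would be stubbed so no network call is made, and the users asset key is an assumption of this example:

from dagster import AssetKey, build_input_context

manager = APIInputManager(
    api_base_url="https://api.example.com/v1",
    api_key="test-key",
)
context = build_input_context(asset_key=AssetKey("users"))
df = manager.load_input(context)
assert not df.empty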

Specialized Input Managers

Database Input Manager

class DatabaseInputManager(InputManager):
    """Input manager for loading data from databases."""
    
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
    
    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from database table."""
        
        # Derive the table name from the asset key or input metadata
        if context.has_asset_key:
            table_name = "_".join(context.asset_key.path)
        elif context.metadata and "table_name" in context.metadata:
            table_name = context.metadata["table_name"]
        else:
            table_name = context.name
        
        # Build query (illustrative; use parameterized queries in production)
        query = f"SELECT * FROM {table_name}"
        
        # Add partition filtering for time-partitioned data
        if context.has_asset_partitions:
            partition_keys = list(context.asset_partition_keys)
            # Assuming date-based partitions
            if len(partition_keys) == 1:
                query += f" WHERE date = '{partition_keys[0]}'"
            else:
                date_list = "','".join(partition_keys)
                query += f" WHERE date IN ('{date_list}')"
        
        context.log.info(f"Executing query: {query}")
        
        # Execute query
        df = pd.read_sql(query, self.connection_string)
        
        context.log.info(f"Loaded {len(df)} records from {table_name}")
        return df

@input_manager(config_schema={"connection_string": str})
def database_input_manager(context):
    connection_string = context.resource_config["connection_string"]
    return DatabaseInputManager(connection_string)
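
As with the API loader, the database input manager is attached to a specific input through AssetIn. A sketch assuming an upstream orders table named after the asset key:

from dagster import AssetIn, Definitions, asset
import pandas as pd

@asset(ins={"orders": AssetIn(input_manager_key="db_loader")})
def order_totals(orders: pd.DataFrame) -> pd.DataFrame:
    return orders.groupby("customer_id", as_index=False)["amount"].sum()

defs = Definitions(
    assets=[order_totals],
    resources={
        "db_loader": database_input_manager.configured({
            "connection_string": "postgresql://localhost/warehouse"
        })
    },
)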

File Management System

FileHandle and LocalFileHandle

FileHandle { .api }

Module: dagster._core.storage.file_manager
Type: Abstract base class

Abstract file handle for managing file references.

from dagster import FileHandle, asset, resource
import pandas as pd
import json
import os

class CustomFileHandle(FileHandle):
    """Custom file handle implementation."""
    
    def __init__(self, file_path: str, file_manager):
        self.file_path = file_path
        self.file_manager = file_manager
    
    @property
    def path_desc(self) -> str:
        """Description of file path."""
        return f"custom://{self.file_path}"

# Usage with file-based assets
@asset(required_resource_keys={"file_manager"})
def file_based_asset(context) -> FileHandle:
    """Asset that produces a file handle."""
    
    # Generate file content
    data = {"key": "value", "timestamp": pd.Timestamp.now().isoformat()}
    
    # Get the file manager from resources
    file_manager = context.resources.file_manager
    
    # write_data takes bytes and returns a FileHandle
    file_handle = file_manager.write_data(json.dumps(data).encode("utf-8"))
    context.log.info(f"Created file: {file_handle.path_desc}")
    return file_handle

@asset(required_resource_keys={"file_manager"})
def process_file(context, file_based_asset: FileHandle) -> dict:
    """Asset that processes a file handle."""
    
    # Read the file contents back through the manager (read_data returns bytes)
    file_manager = context.resources.file_manager
    data = json.loads(file_manager.read_data(file_based_asset).decode("utf-8"))
    
    context.log.info(f"Processed file: {file_based_asset.path_desc}")
    
    return {"processed": True, "original_data": data}

local_file_manager { .api }

Module: dagster._core.storage.file_manager
Type: ResourceDefinition

Built-in local file manager for file-based operations.

from dagster import local_file_manager, resource, Definitions
from dagster._core.storage.file_manager import LocalFileManager
import json
import os

@resource(config_schema={"base_dir": str})
def custom_file_manager(context):
    """Custom file manager rooted at a specific directory."""
    base_dir = context.resource_config["base_dir"]
    os.makedirs(base_dir, exist_ok=True)
    return LocalFileManager(base_dir)

@asset(required_resource_keys={"file_manager"})
def config_file(context) -> FileHandle:
    """Asset that creates a configuration file."""
    
    config_data = {
        "database_url": "postgresql://localhost/mydb",
        "api_endpoints": ["https://api1.com", "https://api2.com"],
        "settings": {"timeout": 30, "retries": 3}
    }
    
    # Persist the config through the file manager (write_data takes bytes)
    file_manager = context.resources.file_manager
    file_handle = file_manager.write_data(
        json.dumps(config_data, indent=2).encode("utf-8")
    )
    
    context.log.info(f"Created config file: {file_handle.path_desc}")
    return file_handle

defs = Definitions(
    assets=[file_based_asset, process_file, config_file],
    resources={
        "file_manager": local_file_manager.configured({
            "base_dir": "/tmp/dagster-files"
        })
    }
)

Asset Value Loading

AssetValueLoader

AssetValueLoader { .api }

Module: dagster._core.storage.asset_value_loader
Type: Class

Utility for loading materialized asset values outside of an execution context. Instances are obtained from Definitions.get_asset_value_loader rather than constructed directly.

from dagster import AssetKey, DagsterInstance, Definitions, asset, materialize
import pandas as pd

@asset
def upstream_data() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

@asset
def downstream_result(upstream_data: pd.DataFrame) -> dict:
    return {"total": upstream_data["value"].sum(), "count": len(upstream_data)}

# Materialize assets against an ephemeral instance
instance = DagsterInstance.ephemeral()
defs = Definitions(assets=[upstream_data, downstream_result])
result = materialize([upstream_data, downstream_result], instance=instance)

# Load asset values after materialization; using the loader as a context
# manager lets it reuse resource initialization across multiple loads
with defs.get_asset_value_loader(instance=instance) as loader:
    # Load specific asset values
    upstream_value = loader.load_asset_value(AssetKey("upstream_data"))
    print(f"Upstream data: {upstream_value}")
    
    downstream_value = loader.load_asset_value(AssetKey("downstream_result"))
    print(f"Downstream result: {downstream_value}")
    
    # For partitioned assets, pass a partition key
    # loader.load_asset_value(AssetKey("partitioned_asset"), partition_key="2023-01-01")

Advanced Storage Patterns

Multi-Backend I/O Manager

from dagster import ConfigurableIOManager, InputContext, OutputContext
from pydantic import Field, PrivateAttr
from typing import Any, Dict, Literal

class MultiBackendIOManager(ConfigurableIOManager):
    """I/O manager supporting multiple storage backends."""
    
    default_backend: Literal["local", "s3", "gcs"] = "local"
    backends: Dict[str, Any] = Field(
        default_factory=dict,
        description="Backend-specific configurations"
    )
    
    # Private runtime state, excluded from the config schema
    _backend_managers: Dict[str, Any] = PrivateAttr(default_factory=dict)
    
    def setup_for_execution(self, context) -> None:
        """Initialize backend-specific managers."""
        self._backend_managers = {}
        
        if "local" in self.backends or self.default_backend == "local":
            self._backend_managers["local"] = LocalIOManager(
                self.backends.get("local", {}).get("base_dir", "/tmp/dagster")
            )
        
        if "s3" in self.backends or self.default_backend == "s3":
            s3_config = self.backends.get("s3", {})
            self._backend_managers["s3"] = S3IOManager(
                bucket_name=s3_config.get("bucket_name"),
                prefix=s3_config.get("prefix", "dagster")
            )
        
        # A "gcs" branch would follow the same pattern with a GCS-backed manager
    
    def handle_output(self, context: OutputContext, obj) -> None:
        """Route output to appropriate backend."""
        # Determine backend from context metadata or default
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]
        
        context.log.info(f"Storing output using {backend} backend")
        manager.handle_output(context, obj)
    
    def load_input(self, context: InputContext):
        """Route input loading to appropriate backend."""
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]
        
        context.log.info(f"Loading input using {backend} backend")
        return manager.load_input(context)
    
    def _get_backend_for_context(self, context) -> str:
        """Determine appropriate backend for context."""
        # Check context metadata for backend preference
        if hasattr(context, 'metadata') and context.metadata:
            if "storage_backend" in context.metadata:
                return context.metadata["storage_backend"]
        
        # Check the asset key for backend routing
        if getattr(context, "has_asset_key", False):
            # Route based on asset key pattern
            path_parts = context.asset_key.path
            if "raw" in path_parts:
                return "local"  # Raw data stored locally
            elif "processed" in path_parts:
                return "s3"     # Processed data in S3
            elif "ml_models" in path_parts:
                return "gcs"    # ML models in GCS
        
        return self.default_backend

# Usage with routing configuration
multi_backend_io = MultiBackendIOManager(
    default_backend="s3",
    backends={
        "local": {"base_dir": "/tmp/dagster"},
        "s3": {"bucket_name": "my-data-bucket", "prefix": "dagster-storage"},
        "gcs": {"bucket_name": "my-ml-bucket", "prefix": "models"}
    }
)

@asset(metadata={"storage_backend": "local"})
def raw_data() -> pd.DataFrame:
    """Asset stored locally."""
    return pd.DataFrame({"raw": [1, 2, 3]})

@asset(metadata={"storage_backend": "s3"})  
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset stored in S3."""
    return raw_data * 2

@asset(key_prefix="ml_models")  # "ml_models" in the asset key path routes to GCS
def trained_model(processed_data: pd.DataFrame) -> dict:
    """Model stored in GCS."""
    return {"model_type": "linear", "trained": True}

Versioned Storage I/O Manager

from dagster import IOManager, InputContext, OutputContext
import os
import pickle
import time
import pandas as pd

class VersionedIOManager(IOManager):
    """I/O manager with automatic versioning."""
    
    def __init__(self, base_path: str, enable_versioning: bool = True):
        self.base_path = base_path
        self.enable_versioning = enable_versioning
    
    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output with versioning."""
        base_file_path = self._get_base_path(context)
        
        if self.enable_versioning:
            # Version each write by run ID and timestamp
            version = f"{context.run_id}_{int(time.time())}"
            versioned_path = f"{base_file_path}/v_{version}"
            
            # Store the versioned copy; _store_object returns the full
            # file path including the extension it chose
            stored_path = self._store_object(versioned_path, obj, context)
            
            # Point a "current" symlink (with a matching extension) at the
            # newest version so loads never need to know the version
            extension = os.path.splitext(stored_path)[1]
            current_path = f"{base_file_path}/current{extension}"
            if os.path.lexists(current_path):
                os.unlink(current_path)
            os.symlink(stored_path, current_path)
            
            context.add_output_metadata({
                "version": version,
                "versioned_path": stored_path,
                "current_path": current_path
            })
        else:
            # Direct storage without versioning
            self._store_object(base_file_path, obj, context)
    
    def load_input(self, context: InputContext):
        """Load input, preferring the current version."""
        base_file_path = self._get_base_path(context)
        
        # Try the "current" symlink written by handle_output first
        try:
            return self._load_object(f"{base_file_path}/current", context)
        except FileNotFoundError:
            pass
        
        # Fall back to the unversioned path
        return self._load_object(base_file_path, context)
    
    def _get_base_path(self, context) -> str:
        """Directory path derived from the asset key (asset-focused; op
        outputs would need the step-key logic from the earlier examples)."""
        return os.path.join(self.base_path, *context.asset_key.path)
    
    def _store_object(self, path: str, obj, context) -> str:
        """Store object, returning the full file path written."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        
        if isinstance(obj, pd.DataFrame):
            file_path = f"{path}.parquet"
            obj.to_parquet(file_path)
        else:
            file_path = f"{path}.pkl"
            with open(file_path, "wb") as f:
                pickle.dump(obj, f)
        return file_path
    
    def _load_object(self, path: str, context):
        """Load object from a specific path."""
        if os.path.exists(f"{path}.parquet"):
            return pd.read_parquet(f"{path}.parquet")
        elif os.path.exists(f"{path}.pkl"):
            with open(f"{path}.pkl", "rb") as f:
                return pickle.load(f)
        else:
            raise FileNotFoundError(f"No data file found at {path}")
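
A factory sketch for binding the versioned manager, mirroring the custom_io_manager pattern above (the config field names here are assumptions of this example):

from dagster import Field, io_manager

@io_manager(config_schema={
    "base_path": str,
    "enable_versioning": Field(bool, default_value=True)
})
def versioned_io_manager(context):
    return VersionedIOManager(
        base_path=context.resource_config["base_path"],
        enable_versioning=context.resource_config["enable_versioning"],
    )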

This comprehensive storage and I/O system provides flexible, pluggable storage for all Dagster computations, with built-in support for multiple backends, type-aware serialization, versioning, and metadata tracking. The system scales from simple local development to complex multi-cloud production deployments.

For configuration of I/O managers and resources, see Configuration System. For contexts that use I/O managers, see Execution and Contexts.
