# Storage and I/O Management

This document covers Dagster's storage and I/O management system, including I/O managers, input managers, file management, and built-in storage backends. The I/O system provides pluggable storage for asset and operation outputs with automatic serialization, deserialization, and metadata tracking.

## I/O Manager System

I/O managers handle the storage and retrieval of asset and operation outputs, providing a clean abstraction over different storage backends.

### IOManager Interface

#### `IOManager` { .api }

**Module:** `dagster._core.storage.io_manager`
**Type:** Abstract base class

Base interface for I/O managers that handle asset and operation output storage.

```python
from dagster import IOManager, InputContext, OutputContext, io_manager
import pandas as pd
import pickle
import os

class CustomIOManager(IOManager):
    """Custom I/O manager implementation."""

    def __init__(self, base_path: str = "/tmp/dagster"):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)

    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output object."""
        # Generate the file path from the context
        if context.has_asset_key:
            # For assets, use the asset key path
            path_parts = context.asset_key.path
            file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For ops, use the step key and output name
            file_path = os.path.join(self.base_path, f"{context.step_key}_{context.name}")

        # Create the directory if needed
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Handle different object types
        if isinstance(obj, pd.DataFrame):
            file_path += ".parquet"
            obj.to_parquet(file_path)
            context.log.info(f"Stored DataFrame with {len(obj)} rows to {file_path}")
        else:
            file_path += ".pkl"
            with open(file_path, "wb") as f:
                pickle.dump(obj, f)
            context.log.info(f"Stored object to {file_path}")

        # Add metadata about storage
        context.add_output_metadata({
            "file_path": file_path,
            "file_size_bytes": os.path.getsize(file_path),
            "storage_type": "local_filesystem",
        })

    def load_input(self, context: InputContext):
        """Load input object."""
        # Generate the file path from the context
        if context.has_asset_key:
            path_parts = context.asset_key.path
            base_file_path = os.path.join(self.base_path, *path_parts)
        else:
            # For op outputs, determine the path from the upstream output
            upstream_context = context.upstream_output
            if upstream_context.has_asset_key:
                path_parts = upstream_context.asset_key.path
                base_file_path = os.path.join(self.base_path, *path_parts)
            else:
                base_file_path = os.path.join(
                    self.base_path,
                    f"{upstream_context.step_key}_{upstream_context.name}"
                )

        # Try different file extensions
        if os.path.exists(base_file_path + ".parquet"):
            file_path = base_file_path + ".parquet"
            obj = pd.read_parquet(file_path)
            context.log.info(f"Loaded DataFrame with {len(obj)} rows from {file_path}")
            return obj
        elif os.path.exists(base_file_path + ".pkl"):
            file_path = base_file_path + ".pkl"
            with open(file_path, "rb") as f:
                obj = pickle.load(f)
            context.log.info(f"Loaded object from {file_path}")
            return obj
        else:
            raise FileNotFoundError(f"No file found at {base_file_path}")

# Create an I/O manager resource
@io_manager(config_schema={"base_path": str})
def custom_io_manager(context):
    base_path = context.resource_config.get("base_path", "/tmp/dagster")
    return CustomIOManager(base_path)
```
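
To use the manager, bind it to the `io_manager` resource key (or a custom key) on a `Definitions` object. A minimal sketch, with an illustrative asset name:

```python
from dagster import Definitions, asset
import pandas as pd

@asset
def example_numbers() -> pd.DataFrame:
    """Illustrative asset whose output is stored by the custom I/O manager."""
    return pd.DataFrame({"n": [1, 2, 3]})

defs = Definitions(
    assets=[example_numbers],
    resources={
        # "io_manager" is the default key assets use for output storage
        "io_manager": custom_io_manager.configured({"base_path": "/tmp/dagster"}),
    },
)
```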

### I/O Manager Decorator

#### `@io_manager` { .api }

**Module:** `dagster._core.storage.io_manager`
**Type:** Function decorator

Create an I/O manager resource from a function.

```python
from io import BytesIO
import pickle

import pandas as pd
from botocore.exceptions import ClientError
from dagster import Field, InputContext, IOManager, OutputContext, String, io_manager

@io_manager(
    config_schema={
        "bucket_name": Field(String, description="S3 bucket name"),
        "prefix": Field(String, default_value="dagster-storage", description="S3 key prefix"),
    },
    required_resource_keys={"s3"},
)
def s3_io_manager(init_context) -> IOManager:
    """S3-based I/O manager."""

    class S3IOManager(IOManager):
        def __init__(self, bucket_name: str, prefix: str, s3_client):
            self.bucket_name = bucket_name
            self.prefix = prefix
            self.s3_client = s3_client

        def handle_output(self, context: OutputContext, obj) -> None:
            # Generate the S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                key_parts = [self.prefix, f"{context.step_key}_{context.name}"]

            s3_key = "/".join(key_parts)

            # Store different types appropriately
            if isinstance(obj, pd.DataFrame):
                # Use Parquet for DataFrames
                s3_key += ".parquet"
                buffer = BytesIO()
                obj.to_parquet(buffer)
                buffer.seek(0)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue(),
                )

                context.log.info(f"Stored DataFrame to s3://{self.bucket_name}/{s3_key}")
            else:
                # Use pickle for other objects
                s3_key += ".pkl"
                buffer = BytesIO()
                pickle.dump(obj, buffer)
                buffer.seek(0)

                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=s3_key,
                    Body=buffer.getvalue(),
                )

            # Add metadata
            context.add_output_metadata({
                "s3_bucket": self.bucket_name,
                "s3_key": s3_key,
                "s3_uri": f"s3://{self.bucket_name}/{s3_key}",
            })

        def load_input(self, context: InputContext):
            # Generate the S3 key
            if context.has_asset_key:
                key_parts = [self.prefix] + list(context.asset_key.path)
            else:
                upstream_context = context.upstream_output
                if upstream_context.has_asset_key:
                    key_parts = [self.prefix] + list(upstream_context.asset_key.path)
                else:
                    key_parts = [self.prefix, f"{upstream_context.step_key}_{upstream_context.name}"]

            base_s3_key = "/".join(key_parts)

            # Try Parquet first, then pickle
            try:
                s3_key = base_s3_key + ".parquet"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pd.read_parquet(BytesIO(response["Body"].read()))
                context.log.info(f"Loaded DataFrame from s3://{self.bucket_name}/{s3_key}")
                return obj
            except ClientError:
                s3_key = base_s3_key + ".pkl"
                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
                obj = pickle.load(BytesIO(response["Body"].read()))
                context.log.info(f"Loaded object from s3://{self.bucket_name}/{s3_key}")
                return obj

    # Get configuration and resources
    bucket_name = init_context.resource_config["bucket_name"]
    prefix = init_context.resource_config["prefix"]
    s3_client = init_context.resources.s3

    return S3IOManager(bucket_name, prefix, s3_client)
```
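
Because the manager declares `required_resource_keys={"s3"}`, an S3 client resource has to be supplied alongside it. A sketch, assuming the `s3_resource` from the `dagster-aws` integration provides the boto3 client and using an illustrative asset:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import s3_resource  # boto3-backed S3 client resource
import pandas as pd

@asset
def s3_backed_table() -> pd.DataFrame:
    """Illustrative asset whose output lands in S3 via the manager above."""
    return pd.DataFrame({"value": [1, 2, 3]})

defs = Definitions(
    assets=[s3_backed_table],
    resources={
        "s3": s3_resource,
        "io_manager": s3_io_manager.configured({
            "bucket_name": "my-data-bucket",
            "prefix": "dagster-storage",
        }),
    },
)
```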

### Built-in I/O Managers

#### `fs_io_manager` { .api }

**Module:** `dagster._core.storage.fs_io_manager`
**Type:** ResourceDefinition

Built-in filesystem I/O manager for local storage.

```python
from dagster import asset, fs_io_manager, Definitions
import numpy as np
import pandas as pd

@asset
def sales_data() -> pd.DataFrame:
    """Generate sales data."""
    return pd.DataFrame({
        "date": pd.date_range("2023-01-01", periods=100),
        "amount": np.random.randint(100, 1000, 100),
    })

@asset
def monthly_sales(sales_data: pd.DataFrame) -> pd.DataFrame:
    """Aggregate sales by month."""
    return sales_data.groupby(sales_data["date"].dt.to_period("M"))[["amount"]].sum()

# Use the filesystem I/O manager
defs = Definitions(
    assets=[sales_data, monthly_sales],
    resources={
        "io_manager": fs_io_manager.configured({
            "base_dir": "/tmp/dagster-storage"  # Storage directory
        })
    },
)

# A separately configured filesystem I/O manager, e.g. for a dedicated resource key
warehouse_io = fs_io_manager.configured({
    "base_dir": "/data/warehouse",
})
```
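
Assets can also opt into a different storage location by naming an I/O manager key explicitly. A brief sketch reusing the `warehouse_io` definition above (the asset name is illustrative):

```python
from dagster import Definitions, asset, fs_io_manager
import pandas as pd

@asset(io_manager_key="warehouse_io")
def warehouse_table(monthly_sales: pd.DataFrame) -> pd.DataFrame:
    """Stored under /data/warehouse instead of the default base_dir."""
    return monthly_sales.reset_index()

defs = Definitions(
    assets=[sales_data, monthly_sales, warehouse_table],
    resources={
        "io_manager": fs_io_manager.configured({"base_dir": "/tmp/dagster-storage"}),
        "warehouse_io": warehouse_io,
    },
)
```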

#### `mem_io_manager` { .api }

**Module:** `dagster._core.storage.mem_io_manager`
**Type:** ResourceDefinition

Built-in in-memory I/O manager for testing and development.

```python
from dagster import asset, materialize_to_memory, mem_io_manager

@asset
def in_memory_data() -> dict:
    return {"key": "value", "count": 42}

@asset
def processed_memory_data(in_memory_data: dict) -> dict:
    return {**in_memory_data, "processed": True}

# Materialize with the in-memory I/O manager (applied automatically by materialize_to_memory)
result = materialize_to_memory([in_memory_data, processed_memory_data])

# Access values directly from memory
data = result.output_for_node("in_memory_data")
processed = result.output_for_node("processed_memory_data")
```
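
The manager can also be passed explicitly when using the regular `materialize` entry point, which is roughly what `materialize_to_memory` does for you; a sketch:

```python
from dagster import materialize, mem_io_manager

# Bind mem_io_manager as the default io_manager so nothing touches disk
result = materialize(
    [in_memory_data, processed_memory_data],
    resources={"io_manager": mem_io_manager},
)
assert result.success
```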

#### `UPathIOManager` { .api }

**Module:** `dagster._core.storage.upath_io_manager`
**Type:** Class

Universal path I/O manager supporting multiple storage backends via UPath. It is an abstract base class: a concrete subclass defines how a single object is written to and read from a path.

```python
import pandas as pd
from dagster import ConfigurableIOManagerFactory, Definitions, InputContext, OutputContext, UPathIOManager, asset
from pydantic import Field
from upath import UPath

class PandasParquetUPathIOManager(UPathIOManager):
    """Concrete UPathIOManager that stores DataFrames as Parquet at any UPath."""

    extension: str = ".parquet"

    def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath) -> None:
        with path.open("wb") as f:
            obj.to_parquet(f)

    def load_from_path(self, context: InputContext, path: UPath) -> pd.DataFrame:
        with path.open("rb") as f:
            return pd.read_parquet(f)

class CloudStorageIOManager(ConfigurableIOManagerFactory):
    """Universal cloud storage I/O manager factory."""

    base_path: str = Field(description="Base path (supports s3://, gs://, etc.)")

    def create_io_manager(self, context) -> UPathIOManager:
        return PandasParquetUPathIOManager(base_path=UPath(self.base_path))

# Usage with different storage backends
s3_storage = CloudStorageIOManager(base_path="s3://my-bucket/dagster-storage")
gcs_storage = CloudStorageIOManager(base_path="gs://my-bucket/dagster-storage")
azure_storage = CloudStorageIOManager(base_path="az://my-container/dagster-storage")
local_storage = CloudStorageIOManager(base_path="/tmp/dagster-storage")

# Assets automatically work with any backend
@asset(io_manager_key="cloud_storage")
def cloud_data() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

defs = Definitions(
    assets=[cloud_data],
    resources={
        "cloud_storage": s3_storage  # Switch between storage backends easily
    },
)
```

### Custom I/O Manager Patterns

#### Type-Specific I/O Manager

```python
import json
import os
import pickle

import numpy as np
import pandas as pd
from dagster import InputContext, IOManager, OutputContext, io_manager

class TypedIOManager(IOManager):
    """I/O manager with type-specific storage strategies."""

    def __init__(self, base_path: str):
        self.base_path = base_path
        self.type_handlers = {
            pd.DataFrame: self._handle_dataframe,
            np.ndarray: self._handle_numpy_array,
            dict: self._handle_dict,
            list: self._handle_list,
        }

    def handle_output(self, context: OutputContext, obj) -> None:
        # Find the appropriate handler for the runtime type
        handler = None
        for type_cls, type_handler in self.type_handlers.items():
            if isinstance(obj, type_cls):
                handler = type_handler
                break

        if handler:
            handler(context, obj, mode="write")
        else:
            # Fallback to pickle
            self._handle_pickle(context, obj, mode="write")

    def load_input(self, context: InputContext):
        # Determine the expected type from the context
        expected_type = context.dagster_type.typing_type if context.dagster_type else None

        # Try type-specific loading
        for type_cls, type_handler in self.type_handlers.items():
            if isinstance(expected_type, type) and issubclass(expected_type, type_cls):
                return type_handler(context, None, mode="read")

        # Fallback to pickle
        return self._handle_pickle(context, None, mode="read")

    def _handle_dataframe(self, context, obj, mode):
        path = self._get_path(context) + ".parquet"
        if mode == "write":
            obj.to_parquet(path)
            context.add_output_metadata({"format": "parquet", "rows": len(obj)})
        else:
            return pd.read_parquet(path)

    def _handle_numpy_array(self, context, obj, mode):
        path = self._get_path(context) + ".npy"
        if mode == "write":
            np.save(path, obj)
            context.add_output_metadata({"format": "numpy", "shape": list(obj.shape)})
        else:
            return np.load(path)

    def _handle_dict(self, context, obj, mode):
        path = self._get_path(context) + ".json"
        if mode == "write":
            with open(path, "w") as f:
                json.dump(obj, f, indent=2)
            context.add_output_metadata({"format": "json", "keys": list(obj.keys())})
        else:
            with open(path, "r") as f:
                return json.load(f)

    def _handle_list(self, context, obj, mode):
        path = self._get_path(context) + ".json"
        if mode == "write":
            with open(path, "w") as f:
                json.dump(obj, f, indent=2)
            context.add_output_metadata({"format": "json", "length": len(obj)})
        else:
            with open(path, "r") as f:
                return json.load(f)

    def _handle_pickle(self, context, obj, mode):
        path = self._get_path(context) + ".pkl"
        if mode == "write":
            with open(path, "wb") as f:
                pickle.dump(obj, f)
            context.add_output_metadata({"format": "pickle"})
        else:
            with open(path, "rb") as f:
                return pickle.load(f)

    def _get_path(self, context):
        if context.has_asset_key:
            path_parts = list(context.asset_key.path)
        elif isinstance(context, OutputContext):
            path_parts = [f"{context.step_key}_{context.name}"]
        else:
            # Input context: resolve the path from the upstream output
            upstream = context.upstream_output
            path_parts = (
                list(upstream.asset_key.path)
                if upstream.has_asset_key
                else [f"{upstream.step_key}_{upstream.name}"]
            )

        path = os.path.join(self.base_path, *path_parts)
        # Ensure the parent directory exists (harmless on read)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

@io_manager(config_schema={"base_path": str})
def typed_io_manager(context):
    return TypedIOManager(context.resource_config["base_path"])
```
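
Dispatch in `load_input` relies on the declared input and return types, so assets should be annotated. A brief sketch wiring `typed_io_manager` (asset names are illustrative):

```python
from dagster import Definitions, asset
import numpy as np
import pandas as pd

@asset
def feature_frame() -> pd.DataFrame:
    return pd.DataFrame({"x": [1.0, 2.0, 3.0]})

@asset
def feature_matrix(feature_frame: pd.DataFrame) -> np.ndarray:
    return feature_frame.to_numpy()

defs = Definitions(
    assets=[feature_frame, feature_matrix],
    resources={
        "io_manager": typed_io_manager.configured({"base_path": "/tmp/dagster-typed"}),
    },
)
```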

## Input Manager System

Input managers provide specialized loading logic for specific inputs, complementing I/O managers.
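
Input managers are keyed per input rather than per output. A minimal sketch of attaching one to an op input via `In(input_manager_key=...)`; the names and the CSV path are illustrative:

```python
from dagster import In, InputContext, input_manager, job, op
import pandas as pd

@input_manager
def csv_loader(context: InputContext) -> pd.DataFrame:
    # Load the input from a fixed location instead of an upstream output
    return pd.read_csv("/tmp/reference_data.csv")  # illustrative path

@op(ins={"reference_data": In(input_manager_key="csv_loader")})
def enrich(reference_data: pd.DataFrame) -> pd.DataFrame:
    return reference_data.dropna()

@job(resource_defs={"csv_loader": csv_loader})
def enrich_job():
    enrich()
```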

### InputManager Interface

#### `InputManager` { .api }

**Module:** `dagster._core.storage.input_manager`
**Type:** Abstract base class

Base interface for input managers that handle specialized input loading.

```python
import pandas as pd
import requests
from dagster import AssetIn, Definitions, InputContext, InputManager, asset, input_manager

class APIInputManager(InputManager):
    """Input manager for loading data from APIs."""

    def __init__(self, api_base_url: str, api_key: str):
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from the API based on the context."""
        # Use the asset key to determine the endpoint
        if context.has_asset_key:
            endpoint = context.asset_key.path[-1]  # Last part of the asset key
        else:
            endpoint = context.name

        # Build the API URL
        url = f"{self.api_base_url}/{endpoint}"

        # Add partition filtering if needed
        params = {}
        if context.has_asset_partitions:
            # For partitioned inputs, filter by partition
            partition_keys = list(context.asset_partition_keys)
            params["partitions"] = ",".join(partition_keys)
            context.log.info(f"Loading partitions: {partition_keys}")

        # Fetch data
        context.log.info(f"Fetching data from {url}")
        response = self.session.get(url, params=params)
        response.raise_for_status()

        # Convert to DataFrame
        data = response.json()
        df = pd.DataFrame(data)

        context.log.info(f"Loaded {len(df)} records from API")
        return df

@input_manager(
    config_schema={
        "api_base_url": str,
        "api_key": str,
    }
)
def api_input_manager(context: InputContext) -> pd.DataFrame:
    """Load an input through an APIInputManager built from resource configuration."""
    manager = APIInputManager(
        api_base_url=context.resource_config["api_base_url"],
        api_key=context.resource_config["api_key"],
    )
    return manager.load_input(context)

# Usage in assets: route a specific input through the input manager
@asset(ins={"users_api_data": AssetIn(input_manager_key="api_loader")})
def external_users(context, users_api_data: pd.DataFrame) -> pd.DataFrame:
    """Asset whose input is loaded from an external API."""
    # users_api_data is loaded via the API input manager
    return users_api_data.dropna()

defs = Definitions(
    assets=[external_users],
    resources={
        "api_loader": api_input_manager.configured({
            "api_base_url": "https://api.example.com/v1",
            "api_key": "secret-api-key",
        })
    },
)
```

### Specialized Input Managers

#### Database Input Manager

```python
import pandas as pd
from dagster import InputContext, InputManager, input_manager

class DatabaseInputManager(InputManager):
    """Input manager for loading data from databases."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    def load_input(self, context: InputContext) -> pd.DataFrame:
        """Load data from a database table."""

        # Derive the table name from the asset key or input metadata
        if context.has_asset_key:
            table_name = "_".join(context.asset_key.path)
        elif context.metadata and "table_name" in context.metadata:
            table_name = context.metadata["table_name"]
        else:
            table_name = context.name

        # Build the query
        query = f"SELECT * FROM {table_name}"

        # Add partition filtering for time-partitioned data. Partition keys come from
        # the partitions definition; use parameterized queries if they could contain
        # untrusted characters.
        if context.has_asset_partitions:
            partition_keys = list(context.asset_partition_keys)
            # Assuming date-based partitions
            if len(partition_keys) == 1:
                query += f" WHERE date = '{partition_keys[0]}'"
            else:
                date_list = "','".join(partition_keys)
                query += f" WHERE date IN ('{date_list}')"

        context.log.info(f"Executing query: {query}")

        # Execute the query
        df = pd.read_sql(query, self.connection_string)

        context.log.info(f"Loaded {len(df)} records from {table_name}")
        return df

@input_manager(config_schema={"connection_string": str})
def database_input_manager(context: InputContext) -> pd.DataFrame:
    """Load an input through a DatabaseInputManager built from resource configuration."""
    manager = DatabaseInputManager(context.resource_config["connection_string"])
    return manager.load_input(context)
```

## File Management System

### FileHandle and LocalFileHandle

#### `FileHandle` { .api }

**Module:** `dagster._core.storage.file_manager`
**Type:** Abstract base class

Abstract file handle for managing file references.

```python
import json

import pandas as pd
from dagster import FileHandle, asset

class CustomFileHandle(FileHandle):
    """Custom file handle implementation."""

    def __init__(self, file_path: str, file_manager):
        self.file_path = file_path
        self.file_manager = file_manager

    @property
    def path_desc(self) -> str:
        """Description of the file path."""
        return f"custom://{self.file_path}"

# Usage with file-based assets
@asset(required_resource_keys={"file_manager"})
def file_based_asset(context) -> FileHandle:
    """Asset that produces a file handle."""

    # Generate file content
    data = {"key": "value", "timestamp": pd.Timestamp.now().isoformat()}

    # Get the file manager from resources
    file_manager = context.resources.file_manager

    # Write the serialized data and get a handle back
    file_handle = file_manager.write_data(json.dumps(data).encode("utf-8"))
    context.log.info(f"Created file: {file_handle.path_desc}")
    return file_handle

@asset(required_resource_keys={"file_manager"})
def process_file(context, file_based_asset: FileHandle) -> dict:
    """Asset that processes a file handle."""

    # Read the file through the handle
    file_manager = context.resources.file_manager

    with file_manager.read(file_based_asset, mode="rb") as file_obj:
        data = json.load(file_obj)

    context.log.info(f"Processed file: {file_based_asset.path_desc}")

    return {"processed": True, "original_data": data}
```

#### `local_file_manager` { .api }

**Module:** `dagster._core.storage.file_manager`
**Type:** ResourceDefinition

Built-in local file manager for file-based operations.

```python
import json
import os
import tempfile

from dagster import Definitions, FileHandle, asset, local_file_manager, resource
from dagster._core.storage.file_manager import LocalFileManager

@resource(config_schema={"base_dir": str})
def custom_file_manager(context):
    """Custom file manager rooted at a specific directory."""
    base_dir = context.resource_config["base_dir"]
    os.makedirs(base_dir, exist_ok=True)
    return LocalFileManager(base_dir)

@asset(required_resource_keys={"file_manager"})
def config_file(context) -> FileHandle:
    """Asset that creates a configuration file."""

    config_data = {
        "database_url": "postgresql://localhost/mydb",
        "api_endpoints": ["https://api1.com", "https://api2.com"],
        "settings": {"timeout": 30, "retries": 3},
    }

    # Use the file manager to create the managed file
    file_manager = context.resources.file_manager

    # Create a temporary file
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(config_data, f, indent=2)
        temp_path = f.name

    # Copy it into the managed location and get a handle back
    with open(temp_path, "rb") as f:
        file_handle = file_manager.write(f, ext="json")

    context.log.info(f"Created config file: {file_handle.path_desc}")
    return file_handle

defs = Definitions(
    assets=[config_file, process_file],
    resources={
        "file_manager": local_file_manager.configured({
            "base_dir": "/tmp/dagster-files"
        })
    },
)
```

## Asset Value Loading

### AssetValueLoader

#### `AssetValueLoader` { .api }

**Module:** `dagster._core.storage.asset_value_loader`
**Type:** Class

Utility for loading materialized asset values outside of an execution context. Instances are obtained from a `Definitions` object via `get_asset_value_loader` rather than constructed directly.

```python
from dagster import AssetKey, DagsterInstance, Definitions, asset, materialize
import pandas as pd

@asset
def upstream_data() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

@asset
def downstream_result(upstream_data: pd.DataFrame) -> dict:
    return {"total": upstream_data["value"].sum(), "count": len(upstream_data)}

defs = Definitions(assets=[upstream_data, downstream_result])

# Materialize the assets
instance = DagsterInstance.ephemeral()
result = materialize([upstream_data, downstream_result], instance=instance)

# Load asset values after materialization; the AssetValueLoader is obtained
# from the Definitions object and reuses I/O manager resources across loads.
with defs.get_asset_value_loader(instance=instance) as loader:
    # Load a specific asset value
    upstream_value = loader.load_asset_value(AssetKey("upstream_data"))
    print(f"Upstream data: {upstream_value}")

    # Load with a partition key (for partitioned assets)
    # partitioned_value = loader.load_asset_value(
    #     AssetKey("partitioned_asset"),
    #     partition_key="2023-01-01",
    # )

    # Load multiple asset values
    asset_keys = [AssetKey("upstream_data"), AssetKey("downstream_result")]
    values = {key: loader.load_asset_value(key) for key in asset_keys}

    for asset_key, value in values.items():
        print(f"Asset {asset_key}: {value}")
```
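
For a single value, the loader context manager can be skipped: `Definitions.load_asset_value` is a one-shot convenience. A sketch reusing the objects above:

```python
# Loads one materialized value, creating and disposing the loader internally
total = defs.load_asset_value(AssetKey("downstream_result"), instance=instance)
print(total)
```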

## Advanced Storage Patterns

### Multi-Backend I/O Manager

```python
from typing import Dict, Literal

import pandas as pd
from dagster import ConfigurableIOManager, InputContext, OutputContext, asset
from pydantic import Field, PrivateAttr

class MultiBackendIOManager(ConfigurableIOManager):
    """I/O manager supporting multiple storage backends."""

    default_backend: Literal["local", "s3", "gcs"] = "local"
    backends: Dict[str, Dict[str, str]] = Field(
        default_factory=dict,
        description="Backend-specific configurations",
    )

    _backend_managers: dict = PrivateAttr(default_factory=dict)

    def setup_for_execution(self, context) -> None:
        """Initialize backend-specific managers.

        LocalIOManager, S3IOManager, and GCSIOManager are assumed helper classes,
        analogous to the managers defined earlier in this document.
        """
        if "local" in self.backends or self.default_backend == "local":
            self._backend_managers["local"] = LocalIOManager(
                self.backends.get("local", {}).get("base_dir", "/tmp/dagster")
            )

        if "s3" in self.backends or self.default_backend == "s3":
            s3_config = self.backends.get("s3", {})
            self._backend_managers["s3"] = S3IOManager(
                bucket_name=s3_config.get("bucket_name"),
                prefix=s3_config.get("prefix", "dagster"),
            )

        if "gcs" in self.backends or self.default_backend == "gcs":
            gcs_config = self.backends.get("gcs", {})
            self._backend_managers["gcs"] = GCSIOManager(
                bucket_name=gcs_config.get("bucket_name"),
                prefix=gcs_config.get("prefix", "dagster"),
            )

    def handle_output(self, context: OutputContext, obj) -> None:
        """Route output storage to the appropriate backend."""
        # Determine the backend from context metadata or fall back to the default
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]

        context.log.info(f"Storing output using {backend} backend")
        manager.handle_output(context, obj)

    def load_input(self, context: InputContext):
        """Route input loading to the appropriate backend."""
        backend = self._get_backend_for_context(context)
        manager = self._backend_managers[backend]

        context.log.info(f"Loading input using {backend} backend")
        return manager.load_input(context)

    def _get_backend_for_context(self, context) -> str:
        """Determine the appropriate backend for a context."""
        # Check context metadata for a backend preference
        if context.metadata and "storage_backend" in context.metadata:
            return context.metadata["storage_backend"]

        # Route based on the asset key path
        if context.has_asset_key:
            path_parts = context.asset_key.path
            if "raw" in path_parts:
                return "local"  # Raw data stored locally
            elif "processed" in path_parts:
                return "s3"  # Processed data in S3
            elif "ml_models" in path_parts:
                return "gcs"  # ML models in GCS

        return self.default_backend

# Usage with routing configuration
multi_backend_io = MultiBackendIOManager(
    default_backend="s3",
    backends={
        "local": {"base_dir": "/tmp/dagster"},
        "s3": {"bucket_name": "my-data-bucket", "prefix": "dagster-storage"},
        "gcs": {"bucket_name": "my-ml-bucket", "prefix": "models"},
    },
)

@asset(metadata={"storage_backend": "local"})
def raw_data() -> pd.DataFrame:
    """Asset stored locally."""
    return pd.DataFrame({"raw": [1, 2, 3]})

@asset(metadata={"storage_backend": "s3"})
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Asset stored in S3."""
    return raw_data * 2

@asset(key_prefix="ml_models")  # Routed to GCS via the "ml_models" key prefix
def trained_model(processed_data: pd.DataFrame) -> dict:
    """Model stored in GCS."""
    return {"model_type": "linear", "trained": True}
```

### Versioned Storage I/O Manager

```python
import os
import pickle
import shutil

import pandas as pd
from dagster import InputContext, IOManager, OutputContext

class VersionedIOManager(IOManager):
    """I/O manager with automatic versioning."""

    def __init__(self, base_path: str, enable_versioning: bool = True):
        self.base_path = base_path
        self.enable_versioning = enable_versioning

    def handle_output(self, context: OutputContext, obj) -> None:
        """Store output with versioning."""
        base_dir = self._get_base_path(context)

        if self.enable_versioning:
            # Create a version based on the run ID and a timestamp
            version = f"{context.run_id}_{int(pd.Timestamp.now().timestamp())}"
            versioned_dir = os.path.join(base_dir, f"v_{version}")

            # Store the versioned copy
            self._store_object(os.path.join(versioned_dir, "data"), obj, context)

            # Create or update the "current" symlink
            current_link = os.path.join(base_dir, "current")
            if os.path.islink(current_link):
                os.unlink(current_link)
            elif os.path.isdir(current_link):
                shutil.rmtree(current_link)

            os.symlink(versioned_dir, current_link)

            context.add_output_metadata({
                "version": version,
                "versioned_path": versioned_dir,
                "current_path": current_link,
            })
        else:
            # Direct storage without versioning
            self._store_object(os.path.join(base_dir, "data"), obj, context)

    def load_input(self, context: InputContext):
        """Load input, preferring the current version."""
        base_dir = self._get_base_path(context)

        # Try to load from "current" first
        current_path = os.path.join(base_dir, "current", "data")
        if os.path.exists(f"{current_path}.parquet") or os.path.exists(f"{current_path}.pkl"):
            context.log.info(f"Loading current version from {current_path}")
            return self._load_object(current_path, context)

        # Fall back to the unversioned path
        direct_path = os.path.join(base_dir, "data")
        if os.path.exists(f"{direct_path}.parquet") or os.path.exists(f"{direct_path}.pkl"):
            return self._load_object(direct_path, context)

        raise FileNotFoundError(f"No data found under {base_dir}")

    def _store_object(self, path: str, obj, context) -> None:
        """Store an object at a specific path prefix."""
        os.makedirs(os.path.dirname(path), exist_ok=True)

        if isinstance(obj, pd.DataFrame):
            obj.to_parquet(f"{path}.parquet")
        else:
            with open(f"{path}.pkl", "wb") as f:
                pickle.dump(obj, f)

    def _load_object(self, path: str, context):
        """Load an object from a specific path prefix."""
        if os.path.exists(f"{path}.parquet"):
            return pd.read_parquet(f"{path}.parquet")
        elif os.path.exists(f"{path}.pkl"):
            with open(f"{path}.pkl", "rb") as f:
                return pickle.load(f)
        else:
            raise FileNotFoundError(f"No data file found at {path}")

    def _get_base_path(self, context) -> str:
        """Derive the per-asset or per-step base directory."""
        if context.has_asset_key:
            path_parts = list(context.asset_key.path)
        elif isinstance(context, OutputContext):
            path_parts = [f"{context.step_key}_{context.name}"]
        else:
            upstream = context.upstream_output
            path_parts = (
                list(upstream.asset_key.path)
                if upstream.has_asset_key
                else [f"{upstream.step_key}_{upstream.name}"]
            )
        return os.path.join(self.base_path, *path_parts)
```

This storage and I/O system provides flexible, pluggable storage for all Dagster computations, with support for multiple backends, type-aware serialization, versioning patterns, and metadata tracking. It scales from simple local development to multi-cloud production deployments.

For configuration of I/O managers and resources, see [Configuration System](./configuration.md). For contexts that use I/O managers, see [Execution and Contexts](./execution-contexts.md).