or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compute-resources.md container-images.md core-application-client.md function-decorators-helpers.md index.md infrastructure-services.md runtime-utilities.md scheduling-reliability.md storage-data.md utility-classes.md web-api-integration.md

docs/storage-data.md

0

# Storage & Data

1

2

Modal provides comprehensive storage solutions for persisting data across function calls, including volumes, network file systems, key-value stores, queues, and cloud bucket mounts. These storage primitives enable building stateful applications while maintaining the benefits of serverless architecture.

3

4

## Capabilities

5

6

### Volume - Persistent Networked File System

7

8

Persistent networked file system storage that provides POSIX-like file operations and can be mounted to multiple functions simultaneously.

9

10

```python { .api }

11

class Volume:

12

@classmethod

13

def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Volume":

14

"""Load a Volume by its unique name"""

15

16

@classmethod

17

def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Volume":

18

"""Create a persistent Volume with given name"""

19

20

@classmethod

21

def ephemeral(cls, **kwargs) -> "Volume":

22

"""Create an ephemeral Volume that's deleted when not in use"""

23

24

def listdir(self, path: str) -> list[FileEntry]:

25

"""List files and directories at path"""

26

27

def iterdir(self, path: str) -> AsyncIterator[FileEntry]:

28

"""Async iterator over files and directories at path"""

29

30

def put_file(

31

self,

32

local_file: Union[str, Path, BinaryIO],

33

remote_path: str,

34

*,

35

progress: Optional[bool] = None

36

) -> None:

37

"""Upload a local file to the volume"""

38

39

def put_directory(

40

self,

41

local_path: Union[str, Path],

42

remote_path: str,

43

*,

44

pattern: Optional[str] = None,

45

progress: Optional[bool] = None

46

) -> None:

47

"""Upload a local directory to the volume"""

48

49

def get_file(self, remote_path: str, local_file: Union[str, Path, BinaryIO]) -> None:

50

"""Download a file from the volume to local storage"""

51

52

def remove_file(self, path: str, *, recursive: bool = False) -> None:

53

"""Remove a file or directory from the volume"""

54

55

def exists(self, path: str) -> bool:

56

"""Check if a path exists in the volume"""

57

58

def reload(self) -> None:

59

"""Reload the volume to get latest state"""

60

61

class FileEntry:

62

"""Represents a file or directory entry in a volume"""

63

path: str

64

type: FileEntryType # FILE, DIRECTORY, SYMLINK, etc.

65

mtime: int # Modified time as Unix timestamp

66

size: int # Size in bytes

67

68

class FileEntryType:

69

"""Type of file entry"""

70

FILE: int

71

DIRECTORY: int

72

SYMLINK: int

73

FIFO: int

74

SOCKET: int

75

```

76

77

#### Usage Examples

78

79

```python

80

import modal

81

82

app = modal.App()

83

84

# Create a persistent volume

85

volume = modal.Volume.persist("my-data-volume")

86

87

@app.function(volumes={"/data": volume})

88

def process_data():

89

# Files are accessible at /data mount point

90

with open("/data/input.txt", "r") as f:

91

content = f.read()

92

93

# Process data and save results

94

with open("/data/output.txt", "w") as f:

95

f.write(f"Processed: {content}")

96

97

# Upload files to volume from local machine

98

@app.local_entrypoint()

99

def upload_data():

100

volume.put_file("local_data.txt", "/input.txt")

101

volume.put_directory("./datasets", "/datasets")

102

103

# List volume contents

104

for entry in volume.listdir("/"):

105

print(f"{entry.path}: {entry.type.name}, {entry.size} bytes")

106

```

107

108

### NetworkFileSystem - Shared Networked Storage

109

110

Shared networked file system that allows multiple functions to read and write files concurrently with better performance for frequent access patterns.

111

112

```python { .api }

113

class NetworkFileSystem:

114

@classmethod

115

def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "NetworkFileSystem":

116

"""Load a NetworkFileSystem by its unique name"""

117

118

@classmethod

119

def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "NetworkFileSystem":

120

"""Create a persistent NetworkFileSystem with given name"""

121

122

@classmethod

123

def ephemeral(cls, **kwargs) -> "NetworkFileSystem":

124

"""Create an ephemeral NetworkFileSystem"""

125

```

126

127

#### Usage Examples

128

129

```python

130

import json
import os

import modal

131

132

app = modal.App()

133

nfs = modal.NetworkFileSystem.persist("shared-cache")

134

135

@app.function(network_file_systems={"/cache": nfs})

136

def worker_function(task_id: str):

137

cache_file = f"/cache/task_{task_id}.json"

138

139

# Check if cached result exists

140

if os.path.exists(cache_file):

141

with open(cache_file, "r") as f:

142

return json.load(f)

143

144

# Process and cache result

145

result = expensive_computation(task_id)

146

with open(cache_file, "w") as f:

147

json.dump(result, f)

148

149

return result

150

```

151

152

### Dict - Persistent Key-Value Store

153

154

Distributed key-value store for sharing data between functions with automatic serialization and deserialization.

155

156

```python { .api }

157

class Dict:

158

@classmethod

159

def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Dict":

160

"""Load a Dict by its unique name"""

161

162

@classmethod

163

def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Dict":

164

"""Create a persistent Dict with given name"""

165

166

@classmethod

167

def ephemeral(cls, **kwargs) -> "Dict":

168

"""Create an ephemeral Dict"""

169

170

def get(self, key: str, default: Any = None) -> Any:

171

"""Get value by key, returns default if key doesn't exist"""

172

173

def __getitem__(self, key: str) -> Any:

174

"""Get value by key using dict[key] syntax"""

175

176

def put(self, key: str, value: Any) -> None:

177

"""Put key-value pair"""

178

179

def __setitem__(self, key: str, value: Any) -> None:

180

"""Set value using dict[key] = value syntax"""

181

182

def pop(self, key: str, default: Any = None) -> Any:

183

"""Remove and return value for key"""

184

185

def __delitem__(self, key: str) -> None:

186

"""Delete key using del dict[key] syntax"""

187

188

def update(self, mapping: Mapping[str, Any]) -> None:

189

"""Update multiple key-value pairs"""

190

191

def clear(self) -> None:

192

"""Remove all items from the dict"""

193

194

def len(self) -> int:

195

"""Get number of items in the dict"""

196

197

def __len__(self) -> int:

198

"""Get number of items using len(dict) syntax"""

199

200

def contains(self, key: str) -> bool:

201

"""Check if key exists in dict"""

202

203

def __contains__(self, key: str) -> bool:

204

"""Check if key exists using 'key in dict' syntax"""

205

206

def keys(self) -> list[str]:

207

"""Get all keys as a list"""

208

209

def values(self) -> list[Any]:

210

"""Get all values as a list"""

211

212

def items(self) -> list[tuple[str, Any]]:

213

"""Get all key-value pairs as list of tuples"""

214

215

def iterate_keys(self) -> AsyncIterator[str]:

216

"""Async iterator over all keys"""

217

218

def iterate_values(self) -> AsyncIterator[Any]:

219

"""Async iterator over all values"""

220

221

def iterate_items(self) -> AsyncIterator[tuple[str, Any]]:

222

"""Async iterator over all key-value pairs"""

223

224

class DictInfo:

225

"""Information about a Dict object"""

226

name: Optional[str]

227

created_at: datetime

228

created_by: Optional[str]

229

```

230

231

#### Usage Examples

232

233

```python

234

import time

import modal

235

236

app = modal.App()

237

shared_dict = modal.Dict.persist("config-store")

238

239

@app.function()

240

def setup_config():

241

# Store configuration data

242

shared_dict["database_url"] = "postgresql://..."

243

shared_dict["api_keys"] = {"service_a": "key1", "service_b": "key2"}

244

shared_dict["feature_flags"] = {"new_feature": True, "beta_mode": False}

245

246

@app.function()

247

def worker_task():

248

# Access shared configuration

249

db_url = shared_dict["database_url"]

250

api_keys = shared_dict.get("api_keys", {})

251

252

# Check feature flag

253

if shared_dict.get("feature_flags", {}).get("new_feature", False):

254

return use_new_algorithm()

255

else:

256

return use_legacy_algorithm()

257

258

@app.function()

259

def analytics_function():

260

# Update metrics

261

current_count = shared_dict.get("request_count", 0)

262

shared_dict["request_count"] = current_count + 1

263

264

# Store processing results

265

results = shared_dict.get("daily_results", [])

266

results.append({"timestamp": time.time(), "processed": 100})

267

shared_dict["daily_results"] = results

268

```

269

270

### Queue - Distributed Task Queue

271

272

Distributed queue for asynchronous task processing with automatic serialization and FIFO ordering.

273

274

```python { .api }

275

class Queue:

276

@classmethod

277

def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Queue":

278

"""Load a Queue by its unique name"""

279

280

@classmethod

281

def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Queue":

282

"""Create a persistent Queue with given name"""

283

284

@classmethod

285

def ephemeral(cls, **kwargs) -> "Queue":

286

"""Create an ephemeral Queue"""

287

288

def put(self, item: Any, *, block: bool = True) -> None:

289

"""Put an item into the queue"""

290

291

def put_many(self, items: list[Any], *, block: bool = True) -> None:

292

"""Put multiple items into the queue"""

293

294

def get(self, *, block: bool = True, timeout: Optional[float] = None) -> Any:

295

"""Get an item from the queue"""

296

297

def get_many(self, n: int, *, block: bool = True, timeout: Optional[float] = None) -> list[Any]:

298

"""Get multiple items from the queue"""

299

300

def iterate(self, *, timeout: Optional[float] = None) -> AsyncIterator[Any]:

301

"""Async iterator over queue items"""

302

303

def len(self) -> int:

304

"""Get approximate number of items in queue"""

305

306

def __len__(self) -> int:

307

"""Get approximate number of items using len(queue) syntax"""

308

309

class QueueInfo:

310

"""Information about a Queue object"""

311

name: Optional[str]

312

created_at: datetime

313

created_by: Optional[str]

314

```

315

316

#### Usage Examples

317

318

```python

319

import queue

import modal

320

321

app = modal.App()

322

task_queue = modal.Queue.persist("work-queue")

323

324

# Producer function

325

@app.function()

326

def generate_tasks():

327

tasks = [{"id": i, "data": f"task_{i}"} for i in range(100)]

328

task_queue.put_many(tasks)

329

print(f"Added {len(tasks)} tasks to queue")

330

331

# Consumer function

332

@app.function()

333

def process_tasks():

334

while True:

335

try:

336

# Get task with timeout

337

task = task_queue.get(timeout=30)

338

339

# Process the task

340

result = expensive_operation(task["data"])

341

print(f"Processed task {task['id']}: {result}")

342

343

except queue.Empty:

344

print("No more tasks, worker stopping")

345

break

346

347

# Batch consumer

348

@app.function()

349

def batch_processor():

350

# Process tasks in batches for efficiency

351

for batch in task_queue.iterate():

352

tasks = task_queue.get_many(10) # Get up to 10 tasks

353

results = [process_single_task(task) for task in tasks]

354

print(f"Processed batch of {len(results)} tasks")

355

```

356

357

### CloudBucketMount - Cloud Storage Integration

358

359

Mount cloud storage buckets (S3, GCS, Azure) as file systems within Modal functions.

360

361

```python { .api }

362

class CloudBucketMount:

363

@classmethod

364

def from_s3_bucket(

365

cls,

366

bucket_name: str,

367

*,

368

key_prefix: str = "",

369

secret: Optional["Secret"] = None,

370

read_only: bool = True

371

) -> "CloudBucketMount":

372

"""Mount an S3 bucket"""

373

374

@classmethod

375

def from_gcs_bucket(

376

cls,

377

bucket_name: str,

378

*,

379

key_prefix: str = "",

380

secret: Optional["Secret"] = None,

381

read_only: bool = True

382

) -> "CloudBucketMount":

383

"""Mount a Google Cloud Storage bucket"""

384

385

@classmethod

386

def from_azure_blob_storage(

387

cls,

388

account_name: str,

389

container_name: str,

390

*,

391

key_prefix: str = "",

392

secret: Optional["Secret"] = None,

393

read_only: bool = True

394

) -> "CloudBucketMount":

395

"""Mount an Azure Blob Storage container"""

396

```

397

398

#### Usage Examples

399

400

```python

401

import json
import os
import shutil

import modal

402

403

app = modal.App()

404

405

# Mount S3 bucket

406

s3_mount = modal.CloudBucketMount.from_s3_bucket(

407

"my-data-bucket",

408

secret=modal.Secret.from_name("aws-credentials"),

409

read_only=False

410

)

411

412

@app.function(cloud_bucket_mounts={"/s3-data": s3_mount})

413

def process_s3_data():

414

# Access S3 files as local files

415

with open("/s3-data/input/data.csv", "r") as f:

416

data = f.read()

417

418

# Process data

419

result = analyze_data(data)

420

421

# Write results back to S3

422

with open("/s3-data/output/results.json", "w") as f:

423

json.dump(result, f)

424

425

# Mount GCS bucket

426

gcs_mount = modal.CloudBucketMount.from_gcs_bucket(

427

"my-gcs-bucket",

428

secret=modal.Secret.from_name("gcp-credentials")

429

)

430

431

@app.function(cloud_bucket_mounts={"/gcs-data": gcs_mount})

432

def backup_to_gcs():

433

# Read local data

434

local_files = os.listdir("/tmp/data")

435

436

# Copy to GCS through mount

437

for filename in local_files:

438

shutil.copy(f"/tmp/data/{filename}", f"/gcs-data/backup/{filename}")

439

```

440

441

## Storage Patterns

442

443

### Data Pipeline with Multiple Storage Types

444

445

```python

446

import json
import os

import modal

447

448

app = modal.App()

449

450

# Different storage for different use cases

451

raw_data_volume = modal.Volume.persist("raw-data") # Large file storage

452

processed_cache = modal.NetworkFileSystem.persist("cache") # Fast shared access

453

config_dict = modal.Dict.persist("pipeline-config") # Configuration

454

task_queue = modal.Queue.persist("processing-queue") # Task coordination

455

456

@app.function(

457

volumes={"/raw": raw_data_volume},

458

network_file_systems={"/cache": processed_cache}

459

)

460

def data_pipeline():

461

# Get configuration

462

batch_size = config_dict.get("batch_size", 100)

463

464

# Process raw data files

465

for filename in os.listdir("/raw/input"):

466

# Check cache first

467

cache_key = f"/cache/processed_{filename}"

468

if os.path.exists(cache_key):

469

continue # Already processed

470

471

# Process file

472

with open(f"/raw/input/{filename}", "r") as f:

473

data = f.read()

474

475

result = process_data_file(data)

476

477

# Cache result

478

with open(cache_key, "w") as f:

479

json.dump(result, f)

480

481

# Queue downstream tasks

482

task_queue.put({"type": "notify", "file": filename, "result": result})

483

```

484

485

### Shared State Between Functions

486

487

```python

488

import modal

489

490

app = modal.App()

491

shared_state = modal.Dict.persist("worker-state")

492

493

@app.function()

494

def coordinator():

495

# Initialize shared state

496

shared_state["active_workers"] = 0

497

shared_state["total_processed"] = 0

498

shared_state["status"] = "starting"

499

500

# Start workers

501

for i in range(5):

502

worker.spawn(f"worker-{i}")

503

504

@app.function()

505

def worker(worker_id: str):

506

# Register worker

507

current_workers = shared_state.get("active_workers", 0)

508

shared_state["active_workers"] = current_workers + 1

509

510

try:

511

# Do work

512

for task in get_tasks():

513

result = process_task(task)

514

515

# Update shared counters

516

total = shared_state.get("total_processed", 0)

517

shared_state["total_processed"] = total + 1

518

519

finally:

520

# Unregister worker

521

current_workers = shared_state.get("active_workers", 0)

522

shared_state["active_workers"] = max(0, current_workers - 1)

523

524

# Check if all workers done

525

if shared_state["active_workers"] == 0:

526

shared_state["status"] = "completed"

527

```