0
# Storage & Data
1
2
Modal provides comprehensive storage solutions for persisting data across function calls, including volumes, network file systems, key-value stores, queues, and cloud bucket mounts. These storage primitives enable building stateful applications while maintaining the benefits of serverless architecture.
3
4
## Capabilities
5
6
### Volume - Persistent Networked File System
7
8
Persistent networked file system storage that provides POSIX-like file operations and can be mounted to multiple functions simultaneously.
9
10
```python { .api }
11
class Volume:
12
@classmethod
13
def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Volume":
14
"""Load a Volume by its unique name"""
15
16
@classmethod
17
def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Volume":
18
"""Create a persistent Volume with given name"""
19
20
@classmethod
21
def ephemeral(cls, **kwargs) -> "Volume":
22
"""Create an ephemeral Volume that's deleted when not in use"""
23
24
def listdir(self, path: str) -> list[FileEntry]:
25
"""List files and directories at path"""
26
27
def iterdir(self, path: str) -> AsyncIterator[FileEntry]:
28
"""Async iterator over files and directories at path"""
29
30
def put_file(
31
self,
32
local_file: Union[str, Path, BinaryIO],
33
remote_path: str,
34
*,
35
progress: Optional[bool] = None
36
) -> None:
37
"""Upload a local file to the volume"""
38
39
def put_directory(
40
self,
41
local_path: Union[str, Path],
42
remote_path: str,
43
*,
44
pattern: Optional[str] = None,
45
progress: Optional[bool] = None
46
) -> None:
47
"""Upload a local directory to the volume"""
48
49
def get_file(self, remote_path: str, local_file: Union[str, Path, BinaryIO]) -> None:
50
"""Download a file from the volume to local storage"""
51
52
def remove_file(self, path: str, *, recursive: bool = False) -> None:
53
"""Remove a file or directory from the volume"""
54
55
def exists(self, path: str) -> bool:
56
"""Check if a path exists in the volume"""
57
58
def reload(self) -> None:
59
"""Reload the volume to get latest state"""
60
61
class FileEntry:
62
"""Represents a file or directory entry in a volume"""
63
path: str
64
type: FileEntryType # FILE, DIRECTORY, SYMLINK, etc.
65
mtime: int # Modified time as Unix timestamp
66
size: int # Size in bytes
67
68
class FileEntryType:
69
"""Type of file entry"""
70
FILE: int
71
DIRECTORY: int
72
SYMLINK: int
73
FIFO: int
74
SOCKET: int
75
```
76
77
#### Usage Examples
78
79
```python
80
import modal
81
82
app = modal.App()
83
84
# Create a persistent volume
85
volume = modal.Volume.persist("my-data-volume")
86
87
@app.function(volumes={"/data": volume})
88
def process_data():
89
# Files are accessible at /data mount point
90
with open("/data/input.txt", "r") as f:
91
content = f.read()
92
93
# Process data and save results
94
with open("/data/output.txt", "w") as f:
95
f.write(f"Processed: {content}")
96
97
# Upload files to volume from local machine
98
@app.local_entrypoint()
99
def upload_data():
100
volume.put_file("local_data.txt", "/input.txt")
101
volume.put_directory("./datasets", "/datasets")
102
103
# List volume contents
104
for entry in volume.listdir("/"):
105
print(f"{entry.path}: {entry.type.name}, {entry.size} bytes")
106
```
107
108
### NetworkFileSystem - Shared Networked Storage
109
110
Shared networked file system that allows multiple functions to read and write files concurrently with better performance for frequent access patterns.
111
112
```python { .api }
113
class NetworkFileSystem:
114
@classmethod
115
def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "NetworkFileSystem":
116
"""Load a NetworkFileSystem by its unique name"""
117
118
@classmethod
119
def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "NetworkFileSystem":
120
"""Create a persistent NetworkFileSystem with given name"""
121
122
@classmethod
123
def ephemeral(cls, **kwargs) -> "NetworkFileSystem":
124
"""Create an ephemeral NetworkFileSystem"""
125
```
126
127
#### Usage Examples
128
129
```python
130
import modal
131
132
app = modal.App()
133
nfs = modal.NetworkFileSystem.persist("shared-cache")
134
135
@app.function(network_file_systems={"/cache": nfs})
136
def worker_function(task_id: str):
137
cache_file = f"/cache/task_{task_id}.json"
138
139
# Check if cached result exists
140
if os.path.exists(cache_file):
141
with open(cache_file, "r") as f:
142
return json.load(f)
143
144
# Process and cache result
145
result = expensive_computation(task_id)
146
with open(cache_file, "w") as f:
147
json.dump(result, f)
148
149
return result
150
```
151
152
### Dict - Persistent Key-Value Store
153
154
Distributed key-value store for sharing data between functions with automatic serialization and deserialization.
155
156
```python { .api }
157
class Dict:
158
@classmethod
159
def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Dict":
160
"""Load a Dict by its unique name"""
161
162
@classmethod
163
def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Dict":
164
"""Create a persistent Dict with given name"""
165
166
@classmethod
167
def ephemeral(cls, **kwargs) -> "Dict":
168
"""Create an ephemeral Dict"""
169
170
def get(self, key: str, default: Any = None) -> Any:
171
"""Get value by key, returns default if key doesn't exist"""
172
173
def __getitem__(self, key: str) -> Any:
174
"""Get value by key using dict[key] syntax"""
175
176
def put(self, key: str, value: Any) -> None:
177
"""Put key-value pair"""
178
179
def __setitem__(self, key: str, value: Any) -> None:
180
"""Set value using dict[key] = value syntax"""
181
182
def pop(self, key: str, default: Any = None) -> Any:
183
"""Remove and return value for key"""
184
185
def __delitem__(self, key: str) -> None:
186
"""Delete key using del dict[key] syntax"""
187
188
def update(self, mapping: Mapping[str, Any]) -> None:
189
"""Update multiple key-value pairs"""
190
191
def clear(self) -> None:
192
"""Remove all items from the dict"""
193
194
def len(self) -> int:
195
"""Get number of items in the dict"""
196
197
def __len__(self) -> int:
198
"""Get number of items using len(dict) syntax"""
199
200
def contains(self, key: str) -> bool:
201
"""Check if key exists in dict"""
202
203
def __contains__(self, key: str) -> bool:
204
"""Check if key exists using 'key in dict' syntax"""
205
206
def keys(self) -> list[str]:
207
"""Get all keys as a list"""
208
209
def values(self) -> list[Any]:
210
"""Get all values as a list"""
211
212
def items(self) -> list[tuple[str, Any]]:
213
"""Get all key-value pairs as list of tuples"""
214
215
def iterate_keys(self) -> AsyncIterator[str]:
216
"""Async iterator over all keys"""
217
218
def iterate_values(self) -> AsyncIterator[Any]:
219
"""Async iterator over all values"""
220
221
def iterate_items(self) -> AsyncIterator[tuple[str, Any]]:
222
"""Async iterator over all key-value pairs"""
223
224
class DictInfo:
225
"""Information about a Dict object"""
226
name: Optional[str]
227
created_at: datetime
228
created_by: Optional[str]
229
```
230
231
#### Usage Examples
232
233
```python
234
import modal
235
236
app = modal.App()
237
shared_dict = modal.Dict.persist("config-store")
238
239
@app.function()
240
def setup_config():
241
# Store configuration data
242
shared_dict["database_url"] = "postgresql://..."
243
shared_dict["api_keys"] = {"service_a": "key1", "service_b": "key2"}
244
shared_dict["feature_flags"] = {"new_feature": True, "beta_mode": False}
245
246
@app.function()
247
def worker_task():
248
# Access shared configuration
249
db_url = shared_dict["database_url"]
250
api_keys = shared_dict.get("api_keys", {})
251
252
# Check feature flag
253
if shared_dict.get("feature_flags", {}).get("new_feature", False):
254
return use_new_algorithm()
255
else:
256
return use_legacy_algorithm()
257
258
@app.function()
259
def analytics_function():
260
# Update metrics
261
current_count = shared_dict.get("request_count", 0)
262
shared_dict["request_count"] = current_count + 1
263
264
# Store processing results
265
results = shared_dict.get("daily_results", [])
266
results.append({"timestamp": time.time(), "processed": 100})
267
shared_dict["daily_results"] = results
268
```
269
270
### Queue - Distributed Task Queue
271
272
Distributed queue for asynchronous task processing with automatic serialization and FIFO ordering.
273
274
```python { .api }
275
class Queue:
276
@classmethod
277
def from_name(cls, label: str, *, environment_name: Optional[str] = None) -> "Queue":
278
"""Load a Queue by its unique name"""
279
280
@classmethod
281
def persist(cls, label: str, *, environment_name: Optional[str] = None) -> "Queue":
282
"""Create a persistent Queue with given name"""
283
284
@classmethod
285
def ephemeral(cls, **kwargs) -> "Queue":
286
"""Create an ephemeral Queue"""
287
288
def put(self, item: Any, *, block: bool = True) -> None:
289
"""Put an item into the queue"""
290
291
def put_many(self, items: list[Any], *, block: bool = True) -> None:
292
"""Put multiple items into the queue"""
293
294
def get(self, *, block: bool = True, timeout: Optional[float] = None) -> Any:
295
"""Get an item from the queue"""
296
297
def get_many(self, n: int, *, block: bool = True, timeout: Optional[float] = None) -> list[Any]:
298
"""Get multiple items from the queue"""
299
300
def iterate(self, *, timeout: Optional[float] = None) -> AsyncIterator[Any]:
301
"""Async iterator over queue items"""
302
303
def len(self) -> int:
304
"""Get approximate number of items in queue"""
305
306
def __len__(self) -> int:
307
"""Get approximate number of items using len(queue) syntax"""
308
309
class QueueInfo:
310
"""Information about a Queue object"""
311
name: Optional[str]
312
created_at: datetime
313
created_by: Optional[str]
314
```
315
316
#### Usage Examples
317
318
```python
319
import modal
320
321
app = modal.App()
322
task_queue = modal.Queue.persist("work-queue")
323
324
# Producer function
325
@app.function()
326
def generate_tasks():
327
tasks = [{"id": i, "data": f"task_{i}"} for i in range(100)]
328
task_queue.put_many(tasks)
329
print(f"Added {len(tasks)} tasks to queue")
330
331
# Consumer function
332
@app.function()
333
def process_tasks():
334
while True:
335
try:
336
# Get task with timeout
337
task = task_queue.get(timeout=30)
338
339
# Process the task
340
result = expensive_operation(task["data"])
341
print(f"Processed task {task['id']}: {result}")
342
343
except queue.Empty:
344
print("No more tasks, worker stopping")
345
break
346
347
# Batch consumer
348
@app.function()
349
def batch_processor():
350
# Process tasks in batches for efficiency
351
    while len(task_queue) > 0:
        tasks = task_queue.get_many(10)  # Get up to 10 tasks
353
results = [process_single_task(task) for task in tasks]
354
print(f"Processed batch of {len(results)} tasks")
355
```
356
357
### CloudBucketMount - Cloud Storage Integration
358
359
Mount cloud storage buckets (S3, GCS, Azure) as file systems within Modal functions.
360
361
```python { .api }
362
class CloudBucketMount:
363
@classmethod
364
def from_s3_bucket(
365
cls,
366
bucket_name: str,
367
*,
368
key_prefix: str = "",
369
secret: Optional["Secret"] = None,
370
read_only: bool = True
371
) -> "CloudBucketMount":
372
"""Mount an S3 bucket"""
373
374
@classmethod
375
def from_gcs_bucket(
376
cls,
377
bucket_name: str,
378
*,
379
key_prefix: str = "",
380
secret: Optional["Secret"] = None,
381
read_only: bool = True
382
) -> "CloudBucketMount":
383
"""Mount a Google Cloud Storage bucket"""
384
385
@classmethod
386
def from_azure_blob_storage(
387
cls,
388
account_name: str,
389
container_name: str,
390
*,
391
key_prefix: str = "",
392
secret: Optional["Secret"] = None,
393
read_only: bool = True
394
) -> "CloudBucketMount":
395
"""Mount an Azure Blob Storage container"""
396
```
397
398
#### Usage Examples
399
400
```python
401
import modal
402
403
app = modal.App()
404
405
# Mount S3 bucket
406
s3_mount = modal.CloudBucketMount.from_s3_bucket(
407
"my-data-bucket",
408
secret=modal.Secret.from_name("aws-credentials"),
409
read_only=False
410
)
411
412
@app.function(cloud_bucket_mounts={"/s3-data": s3_mount})
413
def process_s3_data():
414
# Access S3 files as local files
415
with open("/s3-data/input/data.csv", "r") as f:
416
data = f.read()
417
418
# Process data
419
result = analyze_data(data)
420
421
# Write results back to S3
422
with open("/s3-data/output/results.json", "w") as f:
423
json.dump(result, f)
424
425
# Mount GCS bucket
426
gcs_mount = modal.CloudBucketMount.from_gcs_bucket(
427
"my-gcs-bucket",
428
secret=modal.Secret.from_name("gcp-credentials")
429
)
430
431
@app.function(cloud_bucket_mounts={"/gcs-data": gcs_mount})
432
def backup_to_gcs():
433
# Read local data
434
local_files = os.listdir("/tmp/data")
435
436
# Copy to GCS through mount
437
for filename in local_files:
438
        shutil.copy(f"/tmp/data/{filename}", f"/gcs-data/backup/{filename}")
439
```
440
441
## Storage Patterns
442
443
### Data Pipeline with Multiple Storage Types
444
445
```python
446
import modal
447
448
app = modal.App()
449
450
# Different storage for different use cases
451
raw_data_volume = modal.Volume.persist("raw-data") # Large file storage
452
processed_cache = modal.NetworkFileSystem.persist("cache") # Fast shared access
453
config_dict = modal.Dict.persist("pipeline-config") # Configuration
454
task_queue = modal.Queue.persist("processing-queue") # Task coordination
455
456
@app.function(
457
volumes={"/raw": raw_data_volume},
458
network_file_systems={"/cache": processed_cache}
459
)
460
def data_pipeline():
461
# Get configuration
462
batch_size = config_dict.get("batch_size", 100)
463
464
# Process raw data files
465
for filename in os.listdir("/raw/input"):
466
# Check cache first
467
        cache_key = f"/cache/processed_{filename}.json"
468
if os.path.exists(cache_key):
469
continue # Already processed
470
471
# Process file
472
        with open(f"/raw/input/{filename}", "r") as f:
473
data = f.read()
474
475
result = process_data_file(data)
476
477
# Cache result
478
with open(cache_key, "w") as f:
479
json.dump(result, f)
480
481
# Queue downstream tasks
482
task_queue.put({"type": "notify", "file": filename, "result": result})
483
```
484
485
### Shared State Between Functions
486
487
```python
488
import modal
489
490
app = modal.App()
491
shared_state = modal.Dict.persist("worker-state")
492
493
@app.function()
494
def coordinator():
495
# Initialize shared state
496
shared_state["active_workers"] = 0
497
shared_state["total_processed"] = 0
498
shared_state["status"] = "starting"
499
500
# Start workers
501
for i in range(5):
502
worker.spawn(f"worker-{i}")
503
504
@app.function()
505
def worker(worker_id: str):
506
# Register worker
507
current_workers = shared_state.get("active_workers", 0)
508
shared_state["active_workers"] = current_workers + 1
509
510
try:
511
# Do work
512
for task in get_tasks():
513
result = process_task(task)
514
515
# Update shared counters
516
total = shared_state.get("total_processed", 0)
517
shared_state["total_processed"] = total + 1
518
519
finally:
520
# Unregister worker
521
current_workers = shared_state.get("active_workers", 0)
522
shared_state["active_workers"] = max(0, current_workers - 1)
523
524
# Check if all workers done
525
if shared_state["active_workers"] == 0:
526
shared_state["status"] = "completed"
527
```