0
# Object Store
1
2
Scalable object storage built on JetStream with metadata support, automatic chunking for large objects, content addressing, and efficient streaming capabilities for binary data and files.
3
4
## Capabilities
5
6
### Object Storage Operations
7
8
Core operations for storing and retrieving objects with metadata and content integrity.
9
10
```python { .api }
11
class ObjectStore:
12
async def put(
13
self,
14
name: str,
15
data: bytes,
16
description: str = None,
17
filename: str = None,
18
headers: Dict[str, str] = None,
19
**kwargs
20
) -> ObjectInfo:
21
"""
22
Store object with optional metadata.
23
24
Parameters:
25
- name: Object name/key
26
- data: Object data as bytes
27
- description: Object description
28
- filename: Original filename
29
- headers: Custom headers/metadata
30
31
Returns:
32
Object information with metadata
33
34
Raises:
35
- ObjectAlreadyExists: Object already exists for create operations
36
- BadObjectMetaError: Invalid metadata
37
"""
38
39
async def get(self, name: str) -> bytes:
40
"""
41
Retrieve complete object data.
42
43
Parameters:
44
- name: Object name to retrieve
45
46
Returns:
47
Object data as bytes
48
49
Raises:
50
- ObjectNotFoundError: Object does not exist
51
- ObjectDeletedError: Object was deleted
52
"""
53
54
async def get_info(
55
self,
56
name: str,
57
show_deleted: bool = False
58
) -> ObjectInfo:
59
"""
60
Get object metadata and information.
61
62
Parameters:
63
- name: Object name
64
- show_deleted: Include deleted objects
65
66
Returns:
67
Object information with metadata
68
69
Raises:
70
- ObjectNotFoundError: Object does not exist
71
- ObjectDeletedError: Object was deleted and show_deleted=False
72
"""
73
74
async def delete(self, name: str) -> bool:
75
"""
76
Delete object from store.
77
78
Parameters:
79
- name: Object name to delete
80
81
Returns:
82
True if object was deleted
83
84
Raises:
85
- ObjectNotFoundError: Object does not exist
86
"""
87
```
88
89
#### Usage Examples
90
91
```python
92
import asyncio
93
import nats
94
95
async def main():
96
nc = await nats.connect()
97
js = nc.jetstream()
98
99
# Get or create object store
100
os = await js.object_store("file-storage")
101
102
# Store a file
103
with open("document.pdf", "rb") as f:
104
file_data = f.read()
105
106
obj_info = await os.put(
107
name="documents/report-2024.pdf",
108
data=file_data,
109
description="Quarterly report 2024",
110
filename="report-2024.pdf",
111
headers={
112
"Content-Type": "application/pdf",
113
"Author": "Alice Johnson",
114
"Department": "Sales"
115
}
116
)
117
118
print(f"Stored object: {obj_info.name}")
119
print(f"Size: {obj_info.size} bytes")
120
print(f"Chunks: {obj_info.chunks}")
121
122
# Retrieve object info
123
info = await os.get_info("documents/report-2024.pdf")
124
print(f"Created: {info.mtime}")
125
print(f"Digest: {info.digest}")
126
127
# Retrieve object data
128
retrieved_data = await os.get("documents/report-2024.pdf")
129
print(f"Retrieved {len(retrieved_data)} bytes")
130
131
# Verify integrity
132
assert retrieved_data == file_data
133
134
# Delete object
135
await os.delete("documents/report-2024.pdf")
136
```
137
138
### Object Listing and Management
139
140
List and manage objects in the store.
141
142
```python { .api }
143
class ObjectStore:
144
async def list(self, **kwargs) -> List[ObjectInfo]:
145
"""
146
List all objects in store.
147
148
Returns:
149
List of object information entries
150
"""
151
152
async def status(self) -> ObjectStoreStatus:
153
"""
154
Get object store status and statistics.
155
156
Returns:
157
Store status with usage information
158
"""
159
160
async def seal(self) -> bool:
161
"""
162
Seal object store (make read-only).
163
164
Returns:
165
True if store was sealed
166
"""
167
```
168
169
#### Usage Examples
170
171
```python
172
# List all objects
173
objects = await os.list()
174
for obj in objects:
175
print(f"Object: {obj.name}")
176
print(f" Size: {obj.size} bytes")
177
print(f" Modified: {obj.mtime}")
178
print(f" Description: {obj.description}")
179
180
# Get store statistics
181
status = await os.status()
182
print(f"Store: {status.bucket}")
183
print(f"Objects: {status.count}")
184
print(f"Total size: {status.size} bytes")
185
186
# Seal store to prevent modifications
187
await os.seal()
188
print("Store is now read-only")
189
```
190
191
### Object Metadata Management
192
193
Update and manage object metadata without changing content.
194
195
```python { .api }
196
class ObjectStore:
197
async def update_meta(self, name: str, meta: ObjectMeta) -> ObjectInfo:
198
"""
199
Update object metadata without changing content.
200
201
Parameters:
202
- name: Object name
203
- meta: Updated metadata
204
205
Returns:
206
Updated object information
207
208
Raises:
209
- ObjectNotFoundError: Object does not exist
210
- BadObjectMetaError: Invalid metadata
211
"""
212
```
213
214
#### Usage Examples
215
216
```python
217
from nats.js.api import ObjectMeta
218
219
# Update object metadata
220
new_meta = ObjectMeta(
221
name="documents/report-2024.pdf",
222
description="Updated quarterly report 2024",
223
headers={
224
"Content-Type": "application/pdf",
225
"Author": "Alice Johnson",
226
"Department": "Sales",
227
"Status": "Final",
228
"Version": "2.0"
229
}
230
)
231
232
updated_info = await os.update_meta("documents/report-2024.pdf", new_meta)
233
print(f"Updated metadata for {updated_info.name}")
234
```
235
236
### Object Watching
237
238
Monitor object store for changes in real-time.
239
240
```python { .api }
241
class ObjectStore:
242
async def watch(self, **kwargs) -> AsyncIterator[ObjectInfo]:
243
"""
244
Watch object store for changes.
245
246
Returns:
247
Async iterator yielding object info for changes
248
"""
249
```
250
251
#### Usage Examples
252
253
```python
254
# Watch for object store changes
255
async def watch_objects():
256
async for obj_info in os.watch():
257
if obj_info.deleted:
258
print(f"Object deleted: {obj_info.name}")
259
else:
260
print(f"Object changed: {obj_info.name}")
261
print(f" Size: {obj_info.size} bytes")
262
print(f" Modified: {obj_info.mtime}")
263
264
# Run watcher
265
await watch_objects()
266
```
267
268
### JetStream Integration
269
270
Create and manage object stores through JetStream context.
271
272
```python { .api }
273
class JetStreamContext:
274
async def object_store(self, bucket: str) -> ObjectStore:
275
"""
276
Get existing object store.
277
278
Parameters:
279
- bucket: Bucket name
280
281
Returns:
282
ObjectStore instance
283
284
Raises:
285
- BucketNotFoundError: Bucket does not exist
286
"""
287
288
async def create_object_store(
289
self,
290
config: ObjectStoreConfig = None,
291
**params
292
) -> ObjectStore:
293
"""
294
Create new object store.
295
296
Parameters:
297
- config: Complete bucket configuration
298
- **params: Individual configuration parameters
299
300
Returns:
301
ObjectStore instance
302
303
Raises:
304
- BadBucketError: Invalid configuration
305
"""
306
307
async def delete_object_store(self, bucket: str) -> bool:
308
"""
309
Delete object store and all objects.
310
311
Parameters:
312
- bucket: Bucket name to delete
313
314
Returns:
315
True if bucket was deleted
316
"""
317
```
318
319
#### Usage Examples
320
321
```python
322
from nats.js.api import ObjectStoreConfig
323
from datetime import timedelta
324
325
# Create object store with configuration
326
os_config = ObjectStoreConfig(
327
bucket="media-files",
328
description="Media file storage",
329
max_bytes=10*1024*1024*1024, # 10GB
330
storage="file",
331
replicas=3,
332
ttl=timedelta(days=365) # 1 year retention
333
)
334
335
os = await js.create_object_store(config=os_config)
336
337
# Create simple store with parameters
338
os = await js.create_object_store(
339
bucket="temp-files",
340
max_bytes=1024*1024*1024, # 1GB
341
ttl=timedelta(hours=24)
342
)
343
344
# Get existing store
345
os = await js.object_store("media-files")
346
347
# Delete store
348
await js.delete_object_store("old-files")
349
```
350
351
## Data Types
352
353
```python { .api }
354
from dataclasses import dataclass
355
from typing import Optional, Dict, List
356
from datetime import datetime, timedelta
357
358
@dataclass
359
class ObjectInfo:
360
"""Object information and metadata."""
361
name: str
362
description: Optional[str]
363
headers: Optional[Dict[str, str]]
364
size: int
365
chunks: int
366
digest: str
367
deleted: bool
368
mtime: datetime
369
nuid: str
370
bucket: str
371
links: Optional[List[str]] = None
372
373
@dataclass
374
class ObjectMeta:
375
"""Object metadata for updates."""
376
name: str
377
description: Optional[str] = None
378
headers: Optional[Dict[str, str]] = None
379
options: Optional[ObjectMetaOptions] = None
380
381
@dataclass
382
class ObjectMetaOptions:
383
"""Object metadata options."""
384
link: Optional[ObjectLink] = None
385
chunk_size: Optional[int] = None
386
max_chunk_size: int = 128 * 1024 # 128KB
387
388
@dataclass
389
class ObjectLink:
390
"""Object link configuration."""
391
bucket: str
392
name: str
393
394
@dataclass
395
class ObjectStoreStatus:
396
"""Object store status and statistics."""
397
bucket: str
398
description: Optional[str]
399
ttl: Optional[timedelta]
400
storage: str
401
replicas: int
402
sealed: bool
403
size: int
404
count: int
405
backing_store: str # "JetStream"
406
407
@dataclass
408
class ObjectStoreConfig:
409
"""Object store configuration."""
410
bucket: str
411
description: Optional[str] = None
412
ttl: Optional[timedelta] = None
413
max_bytes: int = -1
414
storage: str = "file" # "file", "memory"
415
replicas: int = 1
416
placement: Optional[Placement] = None
417
metadata: Optional[Dict[str, str]] = None
418
```
419
420
## Constants
421
422
```python { .api }
423
# Object store templates
424
OBJ_ALL_CHUNKS_PRE_TEMPLATE = "OBJ_ALL_CHUNKS"
425
OBJ_ALL_META_PRE_TEMPLATE = "OBJ_ALL_META"
426
OBJ_STREAM_TEMPLATE = "OBJ_STREAM"
427
428
# Chunk size limits
429
DEFAULT_CHUNK_SIZE = 128 * 1024 # 128KB
430
MAX_CHUNK_SIZE = 1024 * 1024 # 1MB
431
432
# Validation patterns
433
VALID_BUCKET_RE = r"^[a-zA-Z0-9_-]+$"
434
```
435
436
## Advanced Usage
437
438
### Large File Handling
439
440
Object store automatically chunks large files for efficient storage and transfer.
441
442
```python
443
# Store large file (automatically chunked)
444
large_file_data = b"x" * (10 * 1024 * 1024) # 10MB
445
obj_info = await os.put("large-file.bin", large_file_data)
446
print(f"File stored in {obj_info.chunks} chunks")
447
448
# Retrieve works transparently
449
retrieved = await os.get("large-file.bin")
450
assert len(retrieved) == len(large_file_data)
451
```
452
453
### Content Integrity
454
455
All objects are stored with SHA-256 digests for content verification.
456
457
```python
458
# Object info includes content digest
459
info = await os.get_info("document.pdf")
460
print(f"Content digest: {info.digest}")
461
462
# Verify content manually if needed
463
import hashlib
464
data = await os.get("document.pdf")
465
computed_digest = hashlib.sha256(data).hexdigest()
466
assert computed_digest == info.digest.replace("SHA-256=", "")
467
```
468
469
### Object Links
470
471
Create references between objects without duplicating data.
472
473
```python
474
from nats.js.api import ObjectMeta, ObjectLink
475
476
# Create link to existing object
477
link_meta = ObjectMeta(
478
name="documents/current-report.pdf",
479
description="Link to current quarterly report",
480
options=ObjectMetaOptions(
481
link=ObjectLink(
482
bucket="file-storage",
483
name="documents/report-2024.pdf"
484
)
485
)
486
)
487
488
await os.update_meta("documents/current-report.pdf", link_meta)
489
```