0
# Object Store
1
2
NATS Object Store provides file-like storage for larger payloads with metadata, chunking, compression, and linking capabilities built on JetStream streams.
3
4
## Capabilities
5
6
### Object Store Access
7
8
Create and access Object Stores through JetStream views.
9
10
```typescript { .api }
11
/**
12
* Get or create an Object Store
13
* @param name - Object store bucket name
14
* @param opts - Object store configuration options
15
* @returns Promise resolving to ObjectStore instance
16
*/
17
os(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore>;
18
19
interface Views {
20
kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
21
os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
22
}
23
24
interface ObjectStoreOptions {
25
/** Bucket description */
26
description?: string;
27
/** Maximum object size in bytes */
28
max_object_size?: number;
29
/** Storage type (file or memory) */
30
storage?: StorageType;
31
/** Number of replicas for HA */
32
replicas?: number;
33
/** Object TTL in nanoseconds */
34
ttl?: number;
35
/** Enable compression */
36
compression?: StoreCompression;
37
/** Bucket placement constraints */
38
placement?: Placement;
39
/** Custom republish configuration */
40
republish?: Republish;
41
/** Mirror another object store */
42
mirror?: StreamSource;
43
/** Source other object stores */
44
sources?: StreamSource[];
45
}
46
```
47
48
**Usage Examples:**
49
50
```typescript
51
import { connect, StorageType, StoreCompression } from "nats";
52
53
const nc = await connect();
54
const js = nc.jetstream();
55
56
// Create or get Object Store
57
const os = await js.views.os("file-storage");
58
59
// Create Object Store with options
60
const os = await js.views.os("media-store", {
61
description: "Media file storage",
62
max_object_size: 10 * 1024 * 1024, // 10MB max per object
63
storage: StorageType.File,
64
compression: StoreCompression.S2,
65
replicas: 3,
66
ttl: 30 * 24 * 60 * 60 * 1000 * 1000 * 1000 // 30 days in nanoseconds
67
});
68
```
69
70
### Basic Object Operations
71
72
Store, retrieve, and manage objects with metadata.
73
74
```typescript { .api }
75
interface ObjectStore {
76
/**
77
* Get information about an object
78
* @param name - Object name
79
* @returns Promise resolving to object info or null if not found
80
*/
81
info(name: string): Promise<ObjectInfo | null>;
82
83
/**
84
* Store an object with metadata
85
* @param meta - Object metadata including name and optional properties
86
* @param payload - Object data as async iterable of byte chunks
87
* @returns Promise resolving to object information
88
*/
89
put(meta: ObjectStoreMeta, payload: AsyncIterable<Uint8Array>): Promise<ObjectInfo>;
90
91
/**
92
* Retrieve an object
93
* @param name - Object name
94
* @returns Promise resolving to object result or null if not found
95
*/
96
get(name: string): Promise<ObjectResult | null>;
97
98
/**
99
* Delete an object
100
* @param name - Object name
101
* @returns Promise resolving to true if deleted
102
*/
103
delete(name: string): Promise<boolean>;
104
105
/**
106
* List all objects in the store
107
* @returns Promise resolving to async iterator of object info
108
*/
109
list(): Promise<QueuedIterator<ObjectInfo>>;
110
}
111
112
interface ObjectStoreMeta {
113
/** Object name (required) */
114
name: string;
115
/** Object description */
116
description?: string;
117
/** Object MIME type */
118
mime?: string;
119
/** Maximum chunk size for large objects */
120
max_chunk_size?: number;
121
/** Custom metadata headers */
122
headers?: MsgHdrs;
123
/** Object options */
124
options?: ObjectStoreMetaOptions;
125
}
126
127
interface ObjectStoreMetaOptions {
128
/** Link to another object instead of storing data */
129
link?: ObjectStoreLink;
130
/** Chunk size for splitting large objects */
131
chunk_size?: number;
132
}
133
134
interface ObjectInfo {
135
/** Object store bucket name */
136
bucket: string;
137
/** Object name */
138
name: string;
139
/** Object description */
140
description?: string;
141
/** Object MIME type */
142
mime?: string;
143
/** Object size in bytes */
144
size: number;
145
/** Number of chunks */
146
chunks: number;
147
/** Object digest/checksum */
148
digest: string;
149
/** Object creation time */
150
mtime: Date;
151
/** Custom headers */
152
headers?: MsgHdrs;
153
/** True if object is deleted */
154
deleted: boolean;
155
/** Link information if this is a link */
156
link?: ObjectStoreLink;
157
}
158
159
interface ObjectResult {
160
/** Object information */
161
info: ObjectInfo;
162
/** Object data as async iterator */
163
data: AsyncIterable<Uint8Array>;
164
/** Close/cleanup function */
165
close?: () => void;
166
}
167
168
interface ObjectStoreLink {
169
/** Linked object store bucket */
170
bucket?: string;
171
/** Linked object name */
172
name: string;
173
}
174
```
175
176
**Usage Examples:**
177
178
```typescript
179
import { connect, headers } from "nats";
180
import { readFileSync, createReadStream } from "fs";
181
182
const nc = await connect();
183
const js = nc.jetstream();
184
const os = await js.views.os("documents");
185
186
// Store a file from buffer
187
const fileData = readFileSync("document.pdf");
188
async function* bufferToChunks(buffer: Uint8Array, chunkSize = 64 * 1024) {
189
for (let i = 0; i < buffer.length; i += chunkSize) {
190
yield buffer.slice(i, i + chunkSize);
191
}
192
}
193
194
const meta = {
195
name: "document.pdf",
196
description: "Important document",
197
mime: "application/pdf"
198
};
199
200
const info = await os.put(meta, bufferToChunks(fileData));
201
console.log(`Stored object: ${info.name}, size: ${info.size} bytes, chunks: ${info.chunks}`);
202
203
// Store with custom headers
204
const hdrs = headers();
205
hdrs.set("author", "John Doe");
206
hdrs.set("version", "1.0");
207
208
const metaWithHeaders = {
209
name: "report.json",
210
mime: "application/json",
211
headers: hdrs
212
};
213
214
const jsonData = JSON.stringify({ report: "data" });
215
const info2 = await os.put(metaWithHeaders, bufferToChunks(new TextEncoder().encode(jsonData)));
216
217
// Retrieve object
218
const result = await os.get("document.pdf");
219
if (result) {
220
console.log(`Object: ${result.info.name}, MIME: ${result.info.mime}`);
221
222
// Read all chunks
223
const chunks: Uint8Array[] = [];
224
for await (const chunk of result.data) {
225
chunks.push(chunk);
226
}
227
228
// Reconstruct complete file
229
const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
230
const completeFile = new Uint8Array(totalLength);
231
let offset = 0;
232
for (const chunk of chunks) {
233
completeFile.set(chunk, offset);
234
offset += chunk.length;
235
}
236
237
console.log(`Reconstructed file size: ${completeFile.length} bytes`);
238
}
239
240
// Get object information only
241
const objInfo = await os.info("document.pdf");
242
if (objInfo) {
243
console.log(`Object info: ${objInfo.name}, size: ${objInfo.size}, modified: ${objInfo.mtime}`);
244
}
245
```
246
247
### Object Listing and Watching
248
249
List objects and monitor changes in the object store.
250
251
```typescript { .api }
252
interface ObjectStore {
253
/**
254
* List all objects in the store
255
* @returns Promise resolving to async iterator of object info
256
*/
257
list(): Promise<QueuedIterator<ObjectInfo>>;
258
259
/**
260
* Watch for object changes
261
* @param opts - Watch options
262
* @returns Promise resolving to async iterator of object info
263
*/
264
watch(opts?: Partial<ObjectStoreMetaOptions>): Promise<QueuedIterator<ObjectInfo>>;
265
266
/**
267
* Get object store status and statistics
268
* @returns Promise resolving to object store status
269
*/
270
status(): Promise<ObjectStoreStatus>;
271
272
/**
273
* Destroy object store (delete underlying stream)
274
* @returns Promise resolving to true if destroyed
275
*/
276
destroy(): Promise<boolean>;
277
}
278
279
interface ObjectStoreStatus {
280
/** Store bucket name */
281
bucket: string;
282
/** Number of objects */
283
size: number;
284
/** Total bytes stored */
285
bytes: number;
286
/** Store description */
287
description?: string;
288
/** TTL setting */
289
ttl: number;
290
/** Storage type */
291
storage: StorageType;
292
/** Number of replicas */
293
replicas: number;
294
/** Compression setting */
295
compression: StoreCompression;
296
/** Underlying stream info */
297
streamInfo: StreamInfo;
298
/** Backing stream name */
299
backingStore: string;
300
}
301
302
interface QueuedIterator<T> {
303
next(): Promise<IteratorResult<T>>;
304
stop(): void;
305
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
306
}
307
```
308
309
**Usage Examples:**
310
311
```typescript
312
import { connect } from "nats";
313
314
const nc = await connect();
315
const js = nc.jetstream();
316
const os = await js.views.os("media-files");
317
318
// List all objects
319
console.log("Objects in store:");
320
const objectList = await os.list();
321
for await (const obj of objectList) {
322
console.log(`- ${obj.name} (${obj.size} bytes, ${obj.mime || 'unknown type'})`);
323
if (obj.deleted) {
324
console.log(` [DELETED]`);
325
}
326
if (obj.link) {
327
console.log(` [LINK to ${obj.link.bucket}/${obj.link.name}]`);
328
}
329
}
330
331
// Watch for object changes
332
const watcher = await os.watch();
333
(async () => {
334
for await (const obj of watcher) {
335
if (obj.deleted) {
336
console.log(`Object deleted: ${obj.name}`);
337
} else {
338
console.log(`Object updated: ${obj.name} (${obj.size} bytes)`);
339
}
340
}
341
})();
342
343
// Get store statistics
344
const status = await os.status();
345
console.log(`Store: ${status.bucket}`);
346
console.log(`Objects: ${status.size}`);
347
console.log(`Total bytes: ${status.bytes}`);
348
console.log(`Storage: ${status.storage}`);
349
console.log(`Compression: ${status.compression}`);
350
```
351
352
### Large File Handling
353
354
Handle large files with streaming and chunking.
355
356
```typescript
357
// Store large file with streaming
358
async function storeFile(os: ObjectStore, filePath: string, objectName: string) {
359
const fs = require('fs');
360
const path = require('path');
361
362
const stats = fs.statSync(filePath);
363
const stream = fs.createReadStream(filePath, { highWaterMark: 64 * 1024 }); // 64KB chunks
364
365
// Convert Node.js stream to async iterable
366
async function* streamToAsyncIterable(stream: any) {
367
for await (const chunk of stream) {
368
yield chunk;
369
}
370
}
371
372
const meta = {
373
name: objectName,
374
description: `File: ${path.basename(filePath)}`,
375
mime: getMimeType(filePath), // Your MIME type detection
376
max_chunk_size: 64 * 1024
377
};
378
379
const info = await os.put(meta, streamToAsyncIterable(stream));
380
console.log(`Stored ${filePath} as ${objectName}: ${info.size} bytes in ${info.chunks} chunks`);
381
return info;
382
}
383
384
// Retrieve and save large file
385
async function retrieveFile(os: ObjectStore, objectName: string, outputPath: string) {
386
const fs = require('fs');
387
388
const result = await os.get(objectName);
389
if (!result) {
390
throw new Error(`Object ${objectName} not found`);
391
}
392
393
const writeStream = fs.createWriteStream(outputPath);
394
395
for await (const chunk of result.data) {
396
writeStream.write(chunk);
397
}
398
399
writeStream.end();
400
console.log(`Retrieved ${objectName} to ${outputPath}: ${result.info.size} bytes`);
401
}
402
403
// Usage
404
const os = await js.views.os("large-files");
405
await storeFile(os, "/path/to/large-video.mp4", "video-2023-01.mp4");
406
await retrieveFile(os, "video-2023-01.mp4", "/path/to/downloaded-video.mp4");
407
```
408
409
### Object Linking
410
411
Create links between objects and stores.
412
413
```typescript
414
// Create object link
415
async function createObjectLink(os: ObjectStore, linkName: string, targetBucket: string, targetObject: string) {
416
const meta: ObjectStoreMeta = {
417
name: linkName,
418
description: `Link to ${targetBucket}/${targetObject}`,
419
options: {
420
link: {
421
bucket: targetBucket,
422
name: targetObject
423
}
424
}
425
};
426
427
// Empty async iterable for link (no actual data)
428
const emptyData = async function* (): AsyncIterable<Uint8Array> {}();
429
430
const info = await os.put(meta, emptyData);
431
console.log(`Created link: ${linkName} -> ${targetBucket}/${targetObject}`);
432
return info;
433
}
434
435
// Follow object links
436
async function resolveObjectLink(os: ObjectStore, objectName: string): Promise<ObjectResult | null> {
437
const info = await os.info(objectName);
438
if (!info) return null;
439
440
if (info.link) {
441
// This is a link, resolve it
442
if (info.link.bucket && info.link.bucket !== info.bucket) {
443
// Link to different bucket - would need to access that bucket
444
console.log(`Link points to different bucket: ${info.link.bucket}/${info.link.name}`);
445
return null;
446
} else {
447
// Link within same bucket
448
return await os.get(info.link.name);
449
}
450
} else {
451
// Regular object
452
return await os.get(objectName);
453
}
454
}
455
456
// Usage
457
const os = await js.views.os("documents");
458
await createObjectLink(os, "latest-report", "documents", "report-2023-12.pdf");
459
460
const result = await resolveObjectLink(os, "latest-report");
461
if (result) {
462
console.log(`Resolved link to actual object: ${result.info.name}`);
463
}
464
```
465
466
### Object Store Management
467
468
Administrative operations for object store lifecycle.
469
470
```typescript
471
// Object store utilities
472
class ObjectStoreManager {
473
constructor(private js: JetStreamClient) {}
474
475
async createStore(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore> {
476
return await this.js.views.os(name, opts);
477
}
478
479
async listStores(): Promise<ObjectStoreStatus[]> {
480
const jsm = await this.js.getConnection().jetstreamManager();
481
const stores: ObjectStoreStatus[] = [];
482
483
const storeList = jsm.streams.listObjectStores();
484
for await (const store of storeList) {
485
stores.push(store);
486
}
487
488
return stores;
489
}
490
491
async cloneStore(sourceName: string, targetName: string): Promise<ObjectStore> {
492
const source = await this.js.views.os(sourceName);
493
const sourceStatus = await source.status();
494
495
// Create target store with same configuration
496
const target = await this.js.views.os(targetName, {
497
description: `Clone of ${sourceName}`,
498
storage: sourceStatus.storage,
499
compression: sourceStatus.compression,
500
replicas: sourceStatus.replicas
501
});
502
503
// Copy all objects
504
const objectList = await source.list();
505
for await (const objInfo of objectList) {
506
if (!objInfo.deleted) {
507
const result = await source.get(objInfo.name);
508
if (result) {
509
const meta: ObjectStoreMeta = {
510
name: objInfo.name,
511
description: objInfo.description,
512
mime: objInfo.mime,
513
headers: objInfo.headers
514
};
515
516
await target.put(meta, result.data);
517
console.log(`Copied: ${objInfo.name}`);
518
}
519
}
520
}
521
522
return target;
523
}
524
525
async getStoreMetrics(storeName: string): Promise<{
526
objects: number;
527
totalSize: number;
528
avgSize: number;
529
mimeTypes: Record<string, number>;
530
}> {
531
const os = await this.js.views.os(storeName);
532
const objectList = await os.list();
533
534
let objects = 0;
535
let totalSize = 0;
536
const mimeTypeCounts: Record<string, number> = {};
537
538
for await (const obj of objectList) {
539
if (!obj.deleted) {
540
objects++;
541
totalSize += obj.size;
542
543
const mime = obj.mime || 'unknown';
544
mimeTypeCounts[mime] = (mimeTypeCounts[mime] || 0) + 1;
545
}
546
}
547
548
return {
549
objects,
550
totalSize,
551
avgSize: objects > 0 ? totalSize / objects : 0,
552
mimeTypes: mimeTypeCounts
553
};
554
}
555
}
556
557
// Usage
558
const nc = await connect();
559
const js = nc.jetstream();
560
const manager = new ObjectStoreManager(js);
561
562
// List all object stores
563
const stores = await manager.listStores();
564
for (const store of stores) {
565
console.log(`Store: ${store.bucket}, Objects: ${store.size}, Size: ${store.bytes} bytes`);
566
}
567
568
// Get detailed metrics
569
const metrics = await manager.getStoreMetrics("media-files");
570
console.log(`Objects: ${metrics.objects}`);
571
console.log(`Total size: ${metrics.totalSize} bytes`);
572
console.log(`Average size: ${metrics.avgSize} bytes`);
573
console.log(`MIME types:`, metrics.mimeTypes);
574
575
// Clone a store
576
const clonedStore = await manager.cloneStore("production-files", "backup-files");
577
```
578
579
## Types
580
581
```typescript { .api }
582
enum StorageType {
583
File = "file",
584
Memory = "memory"
585
}
586
587
enum StoreCompression {
588
None = "none",
589
S2 = "s2"
590
}
591
592
interface StreamSource {
593
name: string;
594
opt_start_seq?: number;
595
opt_start_time?: string;
596
filter_subject?: string;
597
external?: ExternalStream;
598
}
599
600
interface ExternalStream {
601
api: string;
602
deliver?: string;
603
}
604
605
interface Placement {
606
cluster?: string;
607
tags?: string[];
608
}
609
610
interface Republish {
611
src?: string;
612
dest: string;
613
headers_only?: boolean;
614
}
615
616
interface StreamInfo {
617
config: StreamConfig;
618
state: StreamState;
619
created: Date;
620
ts: Date;
621
alternates?: StreamAlternate[];
622
}
623
624
interface StreamState {
625
messages: number;
626
bytes: number;
627
first_seq: number;
628
first_ts: Date;
629
last_seq: number;
630
last_ts: Date;
631
num_subjects?: number;
632
subjects?: Record<string, number>;
633
num_deleted?: number;
634
deleted?: number[];
635
lost?: LostStreamData;
636
consumer_count: number;
637
}
638
639
interface LostStreamData {
640
msgs?: number[];
641
bytes?: number;
642
}
643
```