Node.js client for NATS, a lightweight, high-performance cloud native messaging system
—
Skill evaluation status:
- Best practices: Pending — "does it follow best practices?" has not yet been reviewed.
- Impact: Pending — no eval scenarios have been run.
- Risk: Pending — the risk profile of this skill has not yet been assessed.
NATS Object Store provides file-like storage for larger payloads with metadata, chunking, compression, and linking capabilities built on JetStream streams.
Create and access Object Stores through JetStream views.
/**
 * Get or create an Object Store
 * (shown standalone for documentation; this is the same contract as
 * `Views.os` declared below)
 * @param name - Object store bucket name
 * @param opts - Object store configuration options
 * @returns Promise resolving to ObjectStore instance
 */
os(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore>;

/** JetStream "views": higher-level abstractions built on top of streams. */
interface Views {
  /** Get or create a Key-Value store bucket. */
  kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
  /** Get or create an Object Store bucket. */
  os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
}
/**
 * Configuration for an Object Store bucket. All fields are optional;
 * server defaults apply when omitted.
 */
interface ObjectStoreOptions {
  /** Bucket description */
  description?: string;
  /** Maximum object size in bytes */
  max_object_size?: number;
  /** Storage type (file or memory) */
  storage?: StorageType;
  /** Number of replicas for HA */
  replicas?: number;
  /** Object TTL in nanoseconds (JetStream uses nanoseconds, not ms) */
  ttl?: number;
  /** Enable compression */
  compression?: StoreCompression;
  /** Bucket placement constraints */
  placement?: Placement;
  /** Custom republish configuration */
  republish?: Republish;
  /** Mirror another object store */
  mirror?: StreamSource;
  /** Source other object stores */
  sources?: StreamSource[];
}

Usage Examples:
import { connect, StorageType, StoreCompression } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Create or get an Object Store with server defaults.
// (Renamed from `os` — the original example declared `const os` twice in the
// same scope, which is a redeclaration error.)
const fileStore = await js.views.os("file-storage");

// Create an Object Store with explicit options.
const mediaStore = await js.views.os("media-store", {
  description: "Media file storage",
  max_object_size: 10 * 1024 * 1024, // 10MB max per object
  storage: StorageType.File,
  compression: StoreCompression.S2,
  replicas: 3,
  ttl: 30 * 24 * 60 * 60 * 1000 * 1000 * 1000 // 30 days in nanoseconds
});

Store, retrieve, and manage objects with metadata.
/**
 * Core read/write operations on an Object Store. Objects are stored as
 * chunked messages on a backing JetStream stream.
 */
interface ObjectStore {
  /**
   * Get information about an object
   * @param name - Object name
   * @returns Promise resolving to object info or null if not found
   */
  info(name: string): Promise<ObjectInfo | null>;
  /**
   * Store an object with metadata
   * @param meta - Object metadata including name and optional properties
   * @param payload - Object data as async iterable of byte chunks
   * @returns Promise resolving to object information
   */
  put(meta: ObjectStoreMeta, payload: AsyncIterable<Uint8Array>): Promise<ObjectInfo>;
  /**
   * Retrieve an object
   * @param name - Object name
   * @returns Promise resolving to object result or null if not found
   */
  get(name: string): Promise<ObjectResult | null>;
  /**
   * Delete an object
   * @param name - Object name
   * @returns Promise resolving to true if deleted
   * NOTE(review): ObjectInfo.deleted and the list() example below suggest
   * deletion is a soft marker (deleted entries can still appear in listings) —
   * confirm against the client version in use.
   */
  delete(name: string): Promise<boolean>;
  /**
   * List all objects in the store
   * @returns Promise resolving to async iterator of object info
   */
  list(): Promise<QueuedIterator<ObjectInfo>>;
}
/** Metadata supplied by the caller when storing an object via put(). */
interface ObjectStoreMeta {
  /** Object name (required) */
  name: string;
  /** Object description */
  description?: string;
  /** Object MIME type */
  mime?: string;
  /** Maximum chunk size for large objects */
  max_chunk_size?: number;
  /** Custom metadata headers */
  headers?: MsgHdrs;
  /** Object options */
  options?: ObjectStoreMetaOptions;
}

/** Advanced per-object options, nested under ObjectStoreMeta.options. */
interface ObjectStoreMetaOptions {
  /** Link to another object instead of storing data */
  link?: ObjectStoreLink;
  /** Chunk size for splitting large objects */
  chunk_size?: number;
}
/** Server-reported information about a stored object. */
interface ObjectInfo {
  /** Object store bucket name */
  bucket: string;
  /** Object name */
  name: string;
  /** Object description */
  description?: string;
  /** Object MIME type */
  mime?: string;
  /** Object size in bytes */
  size: number;
  /** Number of chunks */
  chunks: number;
  /** Object digest/checksum */
  digest: string;
  /** Object creation time */
  mtime: Date;
  /** Custom headers */
  headers?: MsgHdrs;
  /** True if object is deleted */
  deleted: boolean;
  /** Link information if this is a link */
  link?: ObjectStoreLink;
}

/** Result of get(): object metadata plus its payload as a chunk stream. */
interface ObjectResult {
  /** Object information */
  info: ObjectInfo;
  /** Object data as async iterator (consume fully, or call close()) */
  data: AsyncIterable<Uint8Array>;
  /** Close/cleanup function */
  close?: () => void;
}

/**
 * Target of an object link. When `bucket` is omitted the link targets an
 * object in its own bucket (see the resolveObjectLink example below).
 */
interface ObjectStoreLink {
  /** Linked object store bucket */
  bucket?: string;
  /** Linked object name */
  name: string;
}

Usage Examples:
import { connect, headers } from "nats";
import { readFileSync } from "fs"; // createReadStream removed — it was imported but never used in this example

const nc = await connect();
const js = nc.jetstream();
const os = await js.views.os("documents");

// Store a file from an in-memory buffer.
const fileData = readFileSync("document.pdf");

// Split a buffer into fixed-size chunks.
// subarray() returns views into the buffer instead of copying each chunk.
async function* bufferToChunks(buffer: Uint8Array, chunkSize = 64 * 1024) {
  for (let i = 0; i < buffer.length; i += chunkSize) {
    yield buffer.subarray(i, i + chunkSize);
  }
}

const meta = {
  name: "document.pdf",
  description: "Important document",
  mime: "application/pdf"
};
const info = await os.put(meta, bufferToChunks(fileData));
console.log(`Stored object: ${info.name}, size: ${info.size} bytes, chunks: ${info.chunks}`);

// Store with custom headers attached as object metadata.
const hdrs = headers();
hdrs.set("author", "John Doe");
hdrs.set("version", "1.0");
const metaWithHeaders = {
  name: "report.json",
  mime: "application/json",
  headers: hdrs
};
const jsonData = JSON.stringify({ report: "data" });
const info2 = await os.put(metaWithHeaders, bufferToChunks(new TextEncoder().encode(jsonData)));

// Retrieve an object and reassemble its chunk stream into one buffer.
const result = await os.get("document.pdf");
if (result) {
  console.log(`Object: ${result.info.name}, MIME: ${result.info.mime}`);
  // Read all chunks
  const chunks: Uint8Array[] = [];
  for await (const chunk of result.data) {
    chunks.push(chunk);
  }
  // Reconstruct complete file
  const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0);
  const completeFile = new Uint8Array(totalLength);
  let offset = 0;
  for (const chunk of chunks) {
    completeFile.set(chunk, offset);
    offset += chunk.length;
  }
  console.log(`Reconstructed file size: ${completeFile.length} bytes`);
}

// Fetch object metadata only (no payload transfer).
const objInfo = await os.info("document.pdf");
if (objInfo) {
  console.log(`Object info: ${objInfo.name}, size: ${objInfo.size}, modified: ${objInfo.mtime}`);
}

List objects and monitor changes in the object store.
/** Listing, watching, status, and lifecycle operations on an Object Store. */
interface ObjectStore {
  /**
   * List all objects in the store
   * @returns Promise resolving to async iterator of object info
   */
  list(): Promise<QueuedIterator<ObjectInfo>>;
  /**
   * Watch for object changes
   * @param opts - Watch options
   * @returns Promise resolving to async iterator of object info
   * NOTE(review): `Partial<ObjectStoreMetaOptions>` (link/chunk_size) looks
   * wrong for a watch parameter — those are put-time options. nats.js `watch`
   * takes options such as ignoreDeletes/includeHistory; confirm against the
   * client version in use.
   */
  watch(opts?: Partial<ObjectStoreMetaOptions>): Promise<QueuedIterator<ObjectInfo>>;
  /**
   * Get object store status and statistics
   * @returns Promise resolving to object store status
   */
  status(): Promise<ObjectStoreStatus>;
  /**
   * Destroy object store (delete underlying stream)
   * @returns Promise resolving to true if destroyed
   */
  destroy(): Promise<boolean>;
}
/** Status and statistics for an Object Store bucket. */
interface ObjectStoreStatus {
  /** Store bucket name */
  bucket: string;
  /**
   * Number of objects
   * NOTE(review): official nats.js docs describe `size` as the bucket size in
   * bytes — confirm whether "number of objects" is accurate for this client.
   */
  size: number;
  /** Total bytes stored */
  bytes: number;
  /** Store description */
  description?: string;
  /** TTL setting */
  ttl: number;
  /** Storage type */
  storage: StorageType;
  /** Number of replicas */
  replicas: number;
  /** Compression setting */
  compression: StoreCompression;
  /** Underlying stream info */
  streamInfo: StreamInfo;
  /** Backing stream name */
  backingStore: string;
}

/** Async iterator returned by list()/watch(); stop() ends iteration early. */
interface QueuedIterator<T> {
  /** Fetch the next item. */
  next(): Promise<IteratorResult<T>>;
  /** Stop iterating and release resources. */
  stop(): void;
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

Usage Examples:
import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();
const os = await js.views.os("media-files");

// Enumerate every object currently tracked by the store, flagging
// soft-deleted entries and links.
console.log("Objects in store:");
for await (const entry of await os.list()) {
  console.log(`- ${entry.name} (${entry.size} bytes, ${entry.mime || 'unknown type'})`);
  if (entry.deleted) {
    console.log(` [DELETED]`);
  }
  if (entry.link) {
    console.log(` [LINK to ${entry.link.bucket}/${entry.link.name}]`);
  }
}

// Monitor the store for changes in the background.
const watcher = await os.watch();
(async () => {
  for await (const change of watcher) {
    const message = change.deleted
      ? `Object deleted: ${change.name}`
      : `Object updated: ${change.name} (${change.size} bytes)`;
    console.log(message);
  }
})();

// Report store-level statistics.
const status = await os.status();
console.log(`Store: ${status.bucket}`);
console.log(`Objects: ${status.size}`);
console.log(`Total bytes: ${status.bytes}`);
console.log(`Storage: ${status.storage}`);
console.log(`Compression: ${status.compression}`);

Handle large files with streaming and chunking.
/**
 * Stream a file from disk into the object store in 64 KB chunks.
 * @param os - Destination object store
 * @param filePath - Path of the local file to upload
 * @param objectName - Name to store the object under
 * @returns Promise resolving to the stored object's info
 */
async function storeFile(os: ObjectStore, filePath: string, objectName: string) {
  const fs = require('fs');
  const path = require('path');
  // (Removed an unused `fs.statSync(filePath)` call — its result was never
  // read and it blocked the event loop for no benefit.)
  // 64 KB highWaterMark so the stream yields chunks matching max_chunk_size below.
  const stream = fs.createReadStream(filePath, { highWaterMark: 64 * 1024 });
  // Node.js Readable streams are already async iterable; this wrapper just
  // makes the conversion explicit for readers.
  async function* streamToAsyncIterable(stream: any) {
    for await (const chunk of stream) {
      yield chunk;
    }
  }
  const meta = {
    name: objectName,
    description: `File: ${path.basename(filePath)}`,
    mime: getMimeType(filePath), // Your MIME type detection
    max_chunk_size: 64 * 1024
  };
  const info = await os.put(meta, streamToAsyncIterable(stream));
  console.log(`Stored ${filePath} as ${objectName}: ${info.size} bytes in ${info.chunks} chunks`);
  return info;
}
// Retrieve and save large file
/**
 * Download an object to a local file, honoring write backpressure and
 * waiting for the file to be fully flushed before resolving.
 * @param os - Source object store
 * @param objectName - Name of the object to download
 * @param outputPath - Local path to write the file to
 * @throws Error if the object does not exist
 */
async function retrieveFile(os: ObjectStore, objectName: string, outputPath: string) {
  const fs = require('fs');
  const result = await os.get(objectName);
  if (!result) {
    throw new Error(`Object ${objectName} not found`);
  }
  const writeStream = fs.createWriteStream(outputPath);
  for await (const chunk of result.data) {
    // Respect backpressure: if the internal buffer is full, wait for 'drain'
    // instead of queueing unbounded memory.
    if (!writeStream.write(chunk)) {
      await new Promise((resolve) => writeStream.once('drain', resolve));
    }
  }
  // The original called end() and returned immediately, resolving before the
  // data was flushed to disk. Wait for the stream to finish (and surface
  // write errors) before reporting success.
  await new Promise<void>((resolve, reject) => {
    writeStream.once('error', reject);
    writeStream.end(() => resolve());
  });
  console.log(`Retrieved ${objectName} to ${outputPath}: ${result.info.size} bytes`);
}
// Usage
const os = await js.views.os("large-files");
// Upload a local file using the streaming helper above.
await storeFile(os, "/path/to/large-video.mp4", "video-2023-01.mp4");
await retrieveFile(os, "video-2023-01.mp4", "/path/to/downloaded-video.mp4");

Create links between objects and stores.
// Create a named link entry that points at another object rather than
// storing its own payload.
async function createObjectLink(os: ObjectStore, linkName: string, targetBucket: string, targetObject: string) {
  const meta: ObjectStoreMeta = {
    name: linkName,
    description: `Link to ${targetBucket}/${targetObject}`,
    options: {
      link: { bucket: targetBucket, name: targetObject }
    }
  };
  // A link carries no data of its own, so supply an empty chunk stream.
  async function* noChunks(): AsyncIterable<Uint8Array> {}
  const info = await os.put(meta, noChunks());
  console.log(`Created link: ${linkName} -> ${targetBucket}/${targetObject}`);
  return info;
}
// Follow object links
/**
 * Fetch an object, transparently following a same-bucket link if the named
 * entry is one. Cross-bucket links are reported but not resolved here.
 */
async function resolveObjectLink(os: ObjectStore, objectName: string): Promise<ObjectResult | null> {
  const info = await os.info(objectName);
  if (!info) return null;
  if (!info.link) {
    // Plain object — fetch it directly.
    return await os.get(objectName);
  }
  const target = info.link;
  if (target.bucket && target.bucket !== info.bucket) {
    // Link to different bucket - would need to access that bucket
    console.log(`Link points to different bucket: ${target.bucket}/${target.name}`);
    return null;
  }
  // Same-bucket link: fetch the linked object.
  return await os.get(target.name);
}
// Usage
const os = await js.views.os("documents");
// "latest-report" becomes an alias for the concrete monthly report object.
await createObjectLink(os, "latest-report", "documents", "report-2023-12.pdf");
const result = await resolveObjectLink(os, "latest-report");
if (result) {
  console.log(`Resolved link to actual object: ${result.info.name}`);
}

Administrative operations for object store lifecycle.
// Object store utilities
/**
 * Administrative helper bundling common Object Store lifecycle tasks:
 * creation, discovery, cloning, and metrics collection.
 */
class ObjectStoreManager {
  constructor(private js: JetStreamClient) {}

  /** Thin wrapper over js.views.os (creates the bucket if it is missing). */
  async createStore(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore> {
    return await this.js.views.os(name, opts);
  }

  /**
   * Enumerate the status of every object store.
   * NOTE(review): `getConnection()` and `streams.listObjectStores()` are not
   * part of the documented nats.js JetStreamClient/JetStreamManager surface —
   * confirm these helpers exist in the client version in use.
   */
  async listStores(): Promise<ObjectStoreStatus[]> {
    const jsm = await this.js.getConnection().jetstreamManager();
    const stores: ObjectStoreStatus[] = [];
    const storeList = jsm.streams.listObjectStores();
    for await (const store of storeList) {
      stores.push(store);
    }
    return stores;
  }

  /**
   * Copy every live (non-deleted) object from one store into a new store
   * that mirrors the source's storage/compression/replica settings.
   */
  async cloneStore(sourceName: string, targetName: string): Promise<ObjectStore> {
    const source = await this.js.views.os(sourceName);
    const sourceStatus = await source.status();
    // Create target store with same configuration
    const target = await this.js.views.os(targetName, {
      description: `Clone of ${sourceName}`,
      storage: sourceStatus.storage,
      compression: sourceStatus.compression,
      replicas: sourceStatus.replicas
    });
    // Copy all objects
    const objectList = await source.list();
    for await (const objInfo of objectList) {
      if (!objInfo.deleted) {
        const result = await source.get(objInfo.name);
        if (result) {
          const meta: ObjectStoreMeta = {
            name: objInfo.name,
            description: objInfo.description,
            mime: objInfo.mime,
            headers: objInfo.headers
          };
          // Chunks are streamed straight from source.get() into target.put(),
          // so the whole object is never buffered in memory.
          await target.put(meta, result.data);
          console.log(`Copied: ${objInfo.name}`);
        }
      }
    }
    return target;
  }

  /** Aggregate object count, byte totals, and a MIME-type histogram. */
  async getStoreMetrics(storeName: string): Promise<{
    objects: number;
    totalSize: number;
    avgSize: number;
    mimeTypes: Record<string, number>;
  }> {
    const os = await this.js.views.os(storeName);
    const objectList = await os.list();
    let objects = 0;
    let totalSize = 0;
    const mimeTypeCounts: Record<string, number> = {};
    for await (const obj of objectList) {
      if (!obj.deleted) { // skip soft-deleted entries
        objects++;
        totalSize += obj.size;
        const mime = obj.mime || 'unknown';
        mimeTypeCounts[mime] = (mimeTypeCounts[mime] || 0) + 1;
      }
    }
    return {
      objects,
      totalSize,
      avgSize: objects > 0 ? totalSize / objects : 0, // avoid div-by-zero for empty stores
      mimeTypes: mimeTypeCounts
    };
  }
}
// Usage
const nc = await connect();
const js = nc.jetstream();
const manager = new ObjectStoreManager(js);

// List all object stores
const stores = await manager.listStores();
for (const store of stores) {
  console.log(`Store: ${store.bucket}, Objects: ${store.size}, Size: ${store.bytes} bytes`);
}

// Get detailed metrics
const metrics = await manager.getStoreMetrics("media-files");
console.log(`Objects: ${metrics.objects}`);
console.log(`Total size: ${metrics.totalSize} bytes`);
console.log(`Average size: ${metrics.avgSize} bytes`);
console.log(`MIME types:`, metrics.mimeTypes);
// Clone a store
const clonedStore = await manager.cloneStore("production-files", "backup-files");

// (The statement above and the enum below were fused onto a single line in
// the original, which is invalid — they are separated here.)
/** Backing storage medium for the Object Store's underlying stream. */
enum StorageType {
  File = "file",
  Memory = "memory"
}
/** Payload compression applied by the backing stream. */
enum StoreCompression {
  None = "none",
  S2 = "s2"
}

/** Source definition used by `mirror`/`sources` in ObjectStoreOptions. */
interface StreamSource {
  /** Name of the source stream. */
  name: string;
  /** Start replicating at this sequence number. */
  opt_start_seq?: number;
  /** Start replicating at this timestamp (string-encoded). */
  opt_start_time?: string;
  /** Only replicate messages matching this subject filter. */
  filter_subject?: string;
  /** Set when the source lives in another account/domain. */
  external?: ExternalStream;
}

/** Reference to a stream in another account or JetStream domain. */
interface ExternalStream {
  /** API prefix subject of the external JetStream. */
  api: string;
  /** Delivery subject prefix. */
  deliver?: string;
}

/** Placement constraints for the backing stream. */
interface Placement {
  /** Target cluster name. */
  cluster?: string;
  /** Server tags to match. */
  tags?: string[];
}

/** Republish configuration for the backing stream. */
interface Republish {
  /** Source subject filter. */
  src?: string;
  /** Destination subject to republish to. */
  dest: string;
  /** Republish only headers, not payloads. */
  headers_only?: boolean;
}

/** Snapshot of the backing stream's configuration and state. */
interface StreamInfo {
  config: StreamConfig;
  state: StreamState;
  /** Stream creation time. */
  created: Date;
  /** Server timestamp of this snapshot. */
  ts: Date;
  alternates?: StreamAlternate[];
}

/** Message/byte counters and sequence bounds for a stream. */
interface StreamState {
  messages: number;
  bytes: number;
  first_seq: number;
  first_ts: Date;
  last_seq: number;
  last_ts: Date;
  num_subjects?: number;
  /** Per-subject message counts, when requested. */
  subjects?: Record<string, number>;
  num_deleted?: number;
  /** Sequence numbers of deleted messages, when requested. */
  deleted?: number[];
  lost?: LostStreamData;
  consumer_count: number;
}

/** Data the server reports as lost from the stream. */
interface LostStreamData {
  /** Sequence numbers of lost messages. */
  msgs?: number[];
  /** Total bytes lost. */
  bytes?: number;
}