CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-nats

Node.js client for NATS, a lightweight, high-performance cloud native messaging system

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/object-store.md

Object Store

NATS Object Store provides file-like storage for larger payloads with metadata, chunking, compression, and linking capabilities built on JetStream streams.

Capabilities

Object Store Access

Create and access Object Stores through JetStream views.

/**
 * Get or create an Object Store.
 *
 * Declared as the `os` member of the `Views` interface and called in the
 * usage examples as `js.views.os(name, opts)`.
 *
 * @param name - Object store bucket name
 * @param opts - Object store configuration options; typed as `Partial`, so
 *   every field of `ObjectStoreOptions` may be omitted
 * @returns Promise resolving to ObjectStore instance
 */
os(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore>;

/**
 * Factory functions for the higher-level views. The usage examples reach this
 * object through `js.views`, where `js` comes from `nc.jetstream()`.
 */
interface Views {
  /** Resolves to a `KV` for `name`; `opts` may be any subset of `KvOptions`. */
  kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
  /** Resolves to an `ObjectStore` for bucket `name`; `opts` may be any subset of `ObjectStoreOptions`. */
  os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
}

/**
 * Bucket-level configuration accepted by `views.os(name, opts)`.
 * Every field is optional; the caller passes a `Partial` of this type.
 */
interface ObjectStoreOptions {
  /** Bucket description */
  description?: string;
  /** Maximum object size in bytes */
  max_object_size?: number;
  /** Storage type (file or memory) */
  storage?: StorageType;
  /** Number of replicas for HA */
  replicas?: number;
  /** Object TTL in nanoseconds (e.g. 30 days = 30 * 24 * 60 * 60 * 1000 * 1000 * 1000) */
  ttl?: number;
  /** Enable compression by selecting a `StoreCompression` value */
  compression?: StoreCompression;
  /** Bucket placement constraints */
  placement?: Placement;
  /** Custom republish configuration */
  republish?: Republish;
  /** Mirror another object store */
  mirror?: StreamSource;
  /** Source other object stores */
  sources?: StreamSource[];
}

Usage Examples:

import { connect, StorageType, StoreCompression } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Create or get Object Store with default settings
const os = await js.views.os("file-storage");

// Create Object Store with options. It gets its own binding because a second
// `const os` in the same scope is a redeclaration error.
const mediaStore = await js.views.os("media-store", {
  description: "Media file storage",
  max_object_size: 10 * 1024 * 1024, // 10MB max per object
  storage: StorageType.File,
  compression: StoreCompression.S2,
  replicas: 3,
  ttl: 30 * 24 * 60 * 60 * 1000 * 1000 * 1000 // 30 days in nanoseconds
});

Basic Object Operations

Store, retrieve, and manage objects with metadata.

/**
 * Per-object operations of an object store bucket, as resolved by
 * `js.views.os(...)` in the usage examples.
 *
 * NOTE(review): payloads are typed here as `AsyncIterable<Uint8Array>`; the
 * installed `nats` release may expect `ReadableStream<Uint8Array>` instead —
 * TODO confirm against the package's own typings before relying on this.
 */
interface ObjectStore {
  /**
   * Get information about an object without reading its data
   * @param name - Object name
   * @returns Promise resolving to object info or null if not found
   */
  info(name: string): Promise<ObjectInfo | null>;

  /**
   * Store an object with metadata
   * @param meta - Object metadata including name and optional properties
   * @param payload - Object data as async iterable of byte chunks
   * @returns Promise resolving to object information
   */
  put(meta: ObjectStoreMeta, payload: AsyncIterable<Uint8Array>): Promise<ObjectInfo>;

  /**
   * Retrieve an object (its info plus its data)
   * @param name - Object name
   * @returns Promise resolving to object result or null if not found
   */
  get(name: string): Promise<ObjectResult | null>;

  /**
   * Delete an object
   * @param name - Object name
   * @returns Promise resolving to true if deleted
   */
  delete(name: string): Promise<boolean>;

  /**
   * List all objects in the store
   * @returns Promise resolving to async iterator of object info; the examples
   *   consume it with `for await`
   */
  list(): Promise<QueuedIterator<ObjectInfo>>;
}

/**
 * Metadata passed as the first argument of `ObjectStore.put`.
 * Only `name` is required.
 */
interface ObjectStoreMeta {
  /** Object name (required) */
  name: string;
  /** Object description */
  description?: string;
  /** Object MIME type */
  mime?: string;
  /**
   * Maximum chunk size for large objects.
   * NOTE(review): `ObjectStoreMetaOptions.chunk_size` looks like an overlapping
   * setting — confirm which one the client honours.
   */
  max_chunk_size?: number;
  /** Custom metadata headers */
  headers?: MsgHdrs;
  /** Object options (link target, chunk size) */
  options?: ObjectStoreMetaOptions;
}

/** Optional per-object settings carried in `ObjectStoreMeta.options`. */
interface ObjectStoreMetaOptions {
  /** Link to another object instead of storing data */
  link?: ObjectStoreLink;
  /** Chunk size for splitting large objects */
  chunk_size?: number;
}

/**
 * Description of a stored object, resolved by `info` and `put` and yielded by
 * `list` and `watch`.
 */
interface ObjectInfo {
  /** Object store bucket name */
  bucket: string;
  /** Object name */
  name: string;
  /** Object description */
  description?: string;
  /** Object MIME type */
  mime?: string;
  /** Object size in bytes */
  size: number;
  /** Number of chunks */
  chunks: number;
  /** Object digest/checksum */
  digest: string;
  /**
   * Object timestamp.
   * NOTE(review): originally documented as creation time, but the usage
   * example prints it as "modified" — confirm which it is.
   */
  mtime: Date;
  /** Custom headers */
  headers?: MsgHdrs;
  /** True if object is deleted */
  deleted: boolean;
  /** Link information if this is a link */
  link?: ObjectStoreLink;
}

/** Value resolved by `ObjectStore.get`: the object's info together with its data. */
interface ObjectResult {
  /** Object information */
  info: ObjectInfo;
  /** Object data as async iterator; the examples read it with `for await` */
  data: AsyncIterable<Uint8Array>;
  /** Close/cleanup function (optional; none of the examples here call it) */
  close?: () => void;
}

/** Target of a link entry, used in `ObjectStoreMetaOptions.link` and `ObjectInfo.link`. */
interface ObjectStoreLink {
  /** Linked object store bucket; the link-resolution example treats an absent value as "same bucket" */
  bucket?: string;
  /** Linked object name */
  name: string;
}

Usage Examples:

import { connect, headers } from "nats";
import { readFileSync, createReadStream } from "fs";

// Connect with default options and open the "documents" bucket
// (`views.os` is documented as get-or-create).
const nc = await connect();
const js = nc.jetstream();
const os = await js.views.os("documents");

// Store a file from buffer: readFileSync loads the whole file into memory in
// one piece; it is split into chunks by bufferToChunks below.
const fileData = readFileSync("document.pdf");
/**
 * Split a byte buffer into consecutive chunks of at most `chunkSize` bytes.
 *
 * Chunks are views over `buffer` (`subarray`), not copies, so the source must
 * not be mutated until the chunks have been consumed. `subarray` is used
 * instead of `slice` because it behaves the same for `Uint8Array` and Node
 * `Buffer` inputs and avoids a copy per chunk.
 *
 * @param buffer - Bytes to split; an empty buffer yields no chunks
 * @param chunkSize - Maximum chunk length in bytes; must be a positive integer (default 64 KiB)
 * @returns Async generator yielding the chunks in order
 * @throws RangeError on first iteration when `chunkSize` is not a positive
 *   integer (0 or a negative value would loop forever, NaN would drop the
 *   data, and a fractional value would produce overlapping chunks)
 */
async function* bufferToChunks(
  buffer: Uint8Array,
  chunkSize = 64 * 1024
): AsyncGenerator<Uint8Array, void, unknown> {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }
  for (let i = 0; i < buffer.length; i += chunkSize) {
    yield buffer.subarray(i, i + chunkSize);
  }
}

// Metadata describing the PDF being uploaded
const meta = { name: "document.pdf", description: "Important document", mime: "application/pdf" };

const info = await os.put(meta, bufferToChunks(fileData));
console.log(`Stored object: ${info.name}, size: ${info.size} bytes, chunks: ${info.chunks}`);

// Attach custom headers to a second object
const hdrs = headers();
hdrs.set("author", "John Doe");
hdrs.set("version", "1.0");

const metaWithHeaders = { name: "report.json", mime: "application/json", headers: hdrs };

const jsonData = JSON.stringify({ report: "data" });
const info2 = await os.put(
  metaWithHeaders,
  bufferToChunks(new TextEncoder().encode(jsonData))
);

// Fetch the PDF back and rebuild it in memory
const result = await os.get("document.pdf");
if (result) {
  console.log(`Object: ${result.info.name}, MIME: ${result.info.mime}`);

  // Collect the chunks, counting bytes as they arrive
  const pieces: Uint8Array[] = [];
  let byteCount = 0;
  for await (const piece of result.data) {
    pieces.push(piece);
    byteCount += piece.length;
  }

  // Copy every chunk into one contiguous array
  const completeFile = new Uint8Array(byteCount);
  let cursor = 0;
  for (const piece of pieces) {
    completeFile.set(piece, cursor);
    cursor += piece.length;
  }

  console.log(`Reconstructed file size: ${completeFile.length} bytes`);
}

// Metadata lookup only; the object data is not read
const objInfo = await os.info("document.pdf");
if (objInfo) {
  console.log(`Object info: ${objInfo.name}, size: ${objInfo.size}, modified: ${objInfo.mtime}`);
}

Object Listing and Watching

List objects and monitor changes in the object store.

/**
 * Listing, watching and lifecycle operations of an object store bucket.
 * `list` is repeated here from the basic-operations listing of the same interface.
 */
interface ObjectStore {
  /**
   * List all objects in the store
   * @returns Promise resolving to async iterator of object info
   */
  list(): Promise<QueuedIterator<ObjectInfo>>;

  /**
   * Watch for object changes
   * @param opts - Watch options.
   *   NOTE(review): typed as `Partial<ObjectStoreMetaOptions>` (link / chunk_size),
   *   which does not look like a set of watch options — confirm the intended type.
   * @returns Promise resolving to async iterator of object info; the example
   *   distinguishes deletions through `ObjectInfo.deleted`
   */
  watch(opts?: Partial<ObjectStoreMetaOptions>): Promise<QueuedIterator<ObjectInfo>>;

  /**
   * Get object store status and statistics
   * @returns Promise resolving to object store status
   */
  status(): Promise<ObjectStoreStatus>;

  /**
   * Destroy object store (delete underlying stream)
   * @returns Promise resolving to true if destroyed
   */
  destroy(): Promise<boolean>;
}

/** Bucket-level status resolved by `ObjectStore.status()`. */
interface ObjectStoreStatus {
  /** Store bucket name */
  bucket: string;
  /** Number of objects */
  size: number;
  /** Total bytes stored */
  bytes: number;
  /** Store description */
  description?: string;
  /** TTL setting (presumably nanoseconds, like `ObjectStoreOptions.ttl` — TODO confirm) */
  ttl: number;
  /** Storage type */
  storage: StorageType;
  /** Number of replicas */
  replicas: number;
  /** Compression setting */
  compression: StoreCompression;
  /** Underlying stream info */
  streamInfo: StreamInfo;
  /** Backing stream name */
  backingStore: string;
}

/**
 * Async-iterable handle resolved by `list()` and `watch()`; the examples
 * consume it with `for await`.
 */
interface QueuedIterator<T> {
  /** Resolve to the next iteration result. */
  next(): Promise<IteratorResult<T>>;
  /** Stop the iterator (behaviour is not shown in this file; presumably ends a pending `for await`). */
  stop(): void;
  /** Makes the iterator usable in `for await ... of`. */
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();
const os = await js.views.os("media-files");

// List all objects
console.log("Objects in store:");
const objectList = await os.list();
for await (const obj of objectList) {
  console.log(`- ${obj.name} (${obj.size} bytes, ${obj.mime || 'unknown type'})`);
  if (obj.deleted) {
    console.log(`  [DELETED]`);
  }
  if (obj.link) {
    // `link.bucket` is optional; an omitted bucket means the link targets the
    // object's own bucket, so fall back to it instead of printing "undefined".
    console.log(`  [LINK to ${obj.link.bucket ?? obj.bucket}/${obj.link.name}]`);
  }
}

// Watch for object changes in the background. The loop runs detached, so its
// promise gets an explicit rejection handler; otherwise a failure inside the
// watcher would surface as an unhandled rejection.
const watcher = await os.watch();
(async () => {
  for await (const obj of watcher) {
    if (obj.deleted) {
      console.log(`Object deleted: ${obj.name}`);
    } else {
      console.log(`Object updated: ${obj.name} (${obj.size} bytes)`);
    }
  }
})().catch((err: unknown) => {
  console.error("Object watcher failed:", err);
});

// Get store statistics
const status = await os.status();
console.log(`Store: ${status.bucket}`);
console.log(`Objects: ${status.size}`);
console.log(`Total bytes: ${status.bytes}`);
console.log(`Storage: ${status.storage}`);
console.log(`Compression: ${status.compression}`);

Large File Handling

Handle large files with streaming and chunking.

/**
 * Stream a file from disk into the object store in 64KB reads.
 *
 * @param os - Destination object store
 * @param filePath - Path of the file to upload; must be a regular file
 * @param objectName - Name the object is stored under
 * @returns The object info resolved by `os.put`
 * @throws Error when `filePath` is not a regular file; rejects with the
 *   underlying error when the file cannot be stat'ed or read, or when `os.put` fails
 */
async function storeFile(os: ObjectStore, filePath: string, objectName: string) {
  // Dynamic imports instead of `require`: the surrounding examples are ES
  // modules (static `import`, top-level `await`), where `require` is undefined.
  const fs = await import('fs');
  const path = await import('path');

  // Fail fast, without blocking the event loop, before contacting the store.
  const stats = await fs.promises.stat(filePath);
  if (!stats.isFile()) {
    throw new Error(`${filePath} is not a regular file`);
  }

  // A Node.js read stream is already async-iterable, so it is passed to `put`
  // directly; no wrapper generator is needed.
  const stream = fs.createReadStream(filePath, { highWaterMark: 64 * 1024 }); // 64KB chunks

  const meta = {
    name: objectName,
    description: `File: ${path.basename(filePath)}`,
    mime: getMimeType(filePath), // Your MIME type detection
    max_chunk_size: 64 * 1024
  };

  try {
    const info = await os.put(meta, stream);
    console.log(`Stored ${filePath} as ${objectName}: ${info.size} bytes in ${info.chunks} chunks`);
    return info;
  } finally {
    // Release the file descriptor even when `put` rejects before draining the
    // stream; a no-op once the stream has ended.
    stream.destroy();
  }
}

/**
 * Download an object and write it to `outputPath`, resolving only after the
 * data has been flushed to the file.
 *
 * @param os - Source object store
 * @param objectName - Name of the object to download
 * @param outputPath - Destination file path (created or overwritten)
 * @throws Error when the object does not exist; rejects with the underlying
 *   error when reading the object data or writing the file fails
 */
async function retrieveFile(os: ObjectStore, objectName: string, outputPath: string) {
  // Dynamic imports instead of `require`: the surrounding examples are ES
  // modules (static `import`, top-level `await`), where `require` is undefined.
  const { createWriteStream } = await import('fs');
  const { Readable } = await import('stream');
  const { pipeline } = await import('stream/promises');

  const result = await os.get(objectName);
  if (!result) {
    throw new Error(`Object ${objectName} not found`);
  }

  // `pipeline` honours backpressure, propagates errors from either side, and
  // resolves once the write stream has finished. Calling `write()` in a loop
  // followed by an un-awaited `end()` would return (and report success) while
  // data was still buffered. The output file is opened only after the object
  // is known to exist, so a missing object leaves no empty file behind.
  await pipeline(Readable.from(result.data), createWriteStream(outputPath));

  console.log(`Retrieved ${objectName} to ${outputPath}: ${result.info.size} bytes`);
}

// Usage
// `js` is assumed to be a JetStream client obtained as in the earlier
// examples (`nc.jetstream()`); it is not defined in this snippet.
const os = await js.views.os("large-files");
await storeFile(os, "/path/to/large-video.mp4", "video-2023-01.mp4");
await retrieveFile(os, "video-2023-01.mp4", "/path/to/downloaded-video.mp4");

Object Linking

Create links between objects and stores.

/**
 * Store a link entry named `linkName` that points at
 * `targetBucket/targetObject`.
 *
 * @param os - Object store that receives the link entry
 * @param linkName - Name of the link object
 * @param targetBucket - Bucket of the linked object
 * @param targetObject - Name of the linked object
 * @returns The object info resolved by `os.put`
 */
async function createObjectLink(os: ObjectStore, linkName: string, targetBucket: string, targetObject: string) {
  const target = { bucket: targetBucket, name: targetObject };
  const linkMeta: ObjectStoreMeta = {
    name: linkName,
    description: `Link to ${targetBucket}/${targetObject}`,
    options: { link: target }
  };

  // A link carries no payload: give `put` a generator that completes without
  // yielding anything.
  async function* noPayload(): AsyncIterable<Uint8Array> {}

  const stored = await os.put(linkMeta, noPayload());
  console.log(`Created link: ${linkName} -> ${targetBucket}/${targetObject}`);
  return stored;
}

/**
 * Fetch an object, following one level of link when the entry is a link
 * inside the same bucket.
 *
 * @param os - Object store to read from
 * @param objectName - Name of the object or link to resolve
 * @returns The object result; `null` when the entry does not exist or when it
 *   links into a different bucket (that case is only logged)
 */
async function resolveObjectLink(os: ObjectStore, objectName: string): Promise<ObjectResult | null> {
  const info = await os.info(objectName);
  if (!info) {
    return null;
  }

  const link = info.link;
  if (!link) {
    // Regular object: fetch it under its own name
    return os.get(objectName);
  }

  // A link with no (or an empty) bucket counts as pointing into this bucket
  const crossBucket = Boolean(link.bucket) && link.bucket !== info.bucket;
  if (crossBucket) {
    // Resolving this would need access to the other bucket
    console.log(`Link points to different bucket: ${link.bucket}/${link.name}`);
    return null;
  }

  return os.get(link.name);
}

// Usage
// `js` is assumed to be a JetStream client obtained as in the earlier
// examples (`nc.jetstream()`); it is not defined in this snippet.
const os = await js.views.os("documents");
// The link lives in the same bucket ("documents") as its target.
await createObjectLink(os, "latest-report", "documents", "report-2023-12.pdf");

// Resolves to the target object, or null when the link cannot be followed.
const result = await resolveObjectLink(os, "latest-report");
if (result) {
  console.log(`Resolved link to actual object: ${result.info.name}`);
}

Object Store Management

Administrative operations for object store lifecycle.

/**
 * Convenience helpers built on a JetStream client: create, enumerate, clone
 * and summarise object stores.
 */
class ObjectStoreManager {
  constructor(private js: JetStreamClient) {}

  /**
   * Get or create the store `name`.
   * @param name - Bucket name
   * @param opts - Optional bucket configuration
   */
  async createStore(name: string, opts?: Partial<ObjectStoreOptions>): Promise<ObjectStore> {
    const store = await this.js.views.os(name, opts);
    return store;
  }

  /**
   * Collect the status of every object store reported by the JetStream manager.
   * @returns All statuses, in the order they were yielded
   */
  async listStores(): Promise<ObjectStoreStatus[]> {
    const connection = this.js.getConnection();
    const manager = await connection.jetstreamManager();

    const collected: ObjectStoreStatus[] = [];
    for await (const status of manager.streams.listObjectStores()) {
      collected.push(status);
    }
    return collected;
  }

  /**
   * Create `targetName` with the source's storage, compression and replica
   * settings, then copy every non-deleted object into it.
   * @param sourceName - Bucket to copy from
   * @param targetName - Bucket to create and fill
   * @returns The target store
   */
  async cloneStore(sourceName: string, targetName: string): Promise<ObjectStore> {
    const origin = await this.js.views.os(sourceName);
    const { storage, compression, replicas } = await origin.status();

    // Target inherits the source's configuration
    const copy = await this.js.views.os(targetName, {
      description: `Clone of ${sourceName}`,
      storage,
      compression,
      replicas
    });

    // Transfer objects one at a time, skipping tombstones and vanished entries
    for await (const entry of await origin.list()) {
      if (entry.deleted) {
        continue;
      }
      const fetched = await origin.get(entry.name);
      if (!fetched) {
        continue;
      }

      const meta: ObjectStoreMeta = {
        name: entry.name,
        description: entry.description,
        mime: entry.mime,
        headers: entry.headers
      };
      await copy.put(meta, fetched.data);
      console.log(`Copied: ${entry.name}`);
    }

    return copy;
  }

  /**
   * Summarise the non-deleted objects of a store.
   * @param storeName - Bucket to inspect
   * @returns Object count, total and average size in bytes, and a count per
   *   MIME type (objects without one are counted under 'unknown')
   */
  async getStoreMetrics(storeName: string): Promise<{
    objects: number;
    totalSize: number;
    avgSize: number;
    mimeTypes: Record<string, number>;
  }> {
    const store = await this.js.views.os(storeName);

    const tally: Record<string, number> = {};
    let liveCount = 0;
    let byteSum = 0;

    for await (const entry of await store.list()) {
      if (entry.deleted) {
        continue;
      }
      liveCount += 1;
      byteSum += entry.size;

      const kind = entry.mime || 'unknown';
      tally[kind] = (tally[kind] || 0) + 1;
    }

    // Avoid dividing by zero for an empty store
    const avgSize = liveCount > 0 ? byteSum / liveCount : 0;
    return { objects: liveCount, totalSize: byteSum, avgSize, mimeTypes: tally };
  }
}

// Usage
// `connect` must be imported from "nats" as in the earlier examples; this
// snippet does not repeat the import.
const nc = await connect();
const js = nc.jetstream();
const manager = new ObjectStoreManager(js);

// List all object stores
const stores = await manager.listStores();
for (const store of stores) {
  console.log(`Store: ${store.bucket}, Objects: ${store.size}, Size: ${store.bytes} bytes`);
}

// Get detailed metrics (deleted objects are excluded from the counts)
const metrics = await manager.getStoreMetrics("media-files");
console.log(`Objects: ${metrics.objects}`);
console.log(`Total size: ${metrics.totalSize} bytes`);
console.log(`Average size: ${metrics.avgSize} bytes`);
console.log(`MIME types:`, metrics.mimeTypes);

// Clone a store: creates "backup-files" and copies every non-deleted object into it
const clonedStore = await manager.cloneStore("production-files", "backup-files");

Types

/** Storage backend of a bucket; used by `ObjectStoreOptions.storage` and `ObjectStoreStatus.storage`. */
enum StorageType {
  /** File storage; string value "file" */
  File = "file",
  /** Memory storage; string value "memory" */
  Memory = "memory"
}

/** Compression setting of a bucket; used by `ObjectStoreOptions.compression` and `ObjectStoreStatus.compression`. */
enum StoreCompression {
  /** No compression; string value "none" */
  None = "none",
  /** S2 compression; string value "s2" */
  S2 = "s2"
}

/**
 * Describes another store to mirror or source from; used by
 * `ObjectStoreOptions.mirror` and `ObjectStoreOptions.sources`.
 * Only `name` is required. Field meanings are not spelled out in this file;
 * the names suggest a start position (`opt_start_seq` / `opt_start_time`) and
 * a subject filter — TODO confirm against the JetStream documentation.
 */
interface StreamSource {
  name: string;
  opt_start_seq?: number;
  opt_start_time?: string;
  filter_subject?: string;
  external?: ExternalStream;
}

/**
 * Optional `external` part of a `StreamSource`.
 * NOTE(review): presumably the API and delivery subject prefixes of a stream
 * in another account or domain — confirm against the JetStream documentation.
 */
interface ExternalStream {
  api: string;
  deliver?: string;
}

/** Bucket placement constraints, as used by `ObjectStoreOptions.placement`; both fields are optional. */
interface Placement {
  /** Cluster name (presumably the cluster the bucket should be placed in — TODO confirm) */
  cluster?: string;
  /** Tags (presumably server tags the placement must match — TODO confirm) */
  tags?: string[];
}

/**
 * Custom republish configuration, as used by `ObjectStoreOptions.republish`.
 * Only `dest` is required. The names suggest a source subject, a destination
 * subject and a headers-only switch — TODO confirm against the JetStream
 * documentation.
 */
interface Republish {
  src?: string;
  dest: string;
  headers_only?: boolean;
}

/**
 * Information about the stream underlying a bucket, exposed as
 * `ObjectStoreStatus.streamInfo`. `StreamConfig` and `StreamAlternate` are
 * referenced here but not declared in this document.
 */
interface StreamInfo {
  config: StreamConfig;
  state: StreamState;
  created: Date;
  ts: Date;
  alternates?: StreamAlternate[];
}

/**
 * State counters of a stream, exposed as `StreamInfo.state`.
 * Field meanings are not described in this file; the names suggest message
 * and byte totals, first/last sequence numbers with timestamps, per-subject
 * counts, deleted sequences and a consumer count — TODO confirm against the
 * JetStream documentation.
 */
interface StreamState {
  messages: number;
  bytes: number;
  first_seq: number;
  first_ts: Date;
  last_seq: number;
  last_ts: Date;
  num_subjects?: number;
  subjects?: Record<string, number>;
  num_deleted?: number;
  deleted?: number[];
  lost?: LostStreamData;
  consumer_count: number;
}

/**
 * Optional `lost` part of `StreamState`; both fields are optional.
 * NOTE(review): presumably the sequences and byte count of lost messages —
 * confirm against the JetStream documentation.
 */
interface LostStreamData {
  msgs?: number[];
  bytes?: number;
}

docs

connection.md

index.md

jetstream.md

kv-store.md

messaging.md

object-store.md

services.md

tile.json