CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-nats

Node.js client for NATS, a lightweight, high-performance cloud native messaging system

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/kv-store.md

Key-Value Store

NATS Key-Value Store provides a high-level abstraction for storing and retrieving key-value data with history tracking, watch capabilities, and conflict resolution built on JetStream streams.

Capabilities

KV Store Access

Create and access Key-Value stores through JetStream views.

/**
 * Get or create a Key-Value store.
 *
 * Exposed through the JetStream client's views (see the `Views` interface),
 * i.e. invoked as `js.views.kv(name, opts)` in the usage examples.
 *
 * @param name - KV bucket name
 * @param opts - Optional KV configuration; any subset of `KvOptions`
 * @returns Promise resolving to KV store instance
 */
kv(name: string, opts?: Partial<KvOptions>): Promise<KV>;

/** Factory functions for the higher-level stores built on JetStream. */
interface Views {
  /** Get or create the Key-Value bucket called `name` */
  kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
  /** Get or create the object store called `name` */
  os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
}

/** Configuration accepted when creating or binding to a KV bucket; every field is optional. */
interface KvOptions {
  /** Maximum number of history entries per key (default: 1) */
  history?: number;
  /**
   * Time-to-live for entries.
   * NOTE(review): this was documented as nanoseconds, but nats.js documents
   * `ttl` in milliseconds (the client converts it to nanoseconds for the
   * backing stream) — confirm against the installed version.
   */
  ttl?: number;
  /** Maximum size of values in bytes */
  max_value_size?: number;
  /** Maximum total bucket size in bytes */
  max_bucket_size?: number;
  /** Number of replicas for HA (default: 1) */
  replicas?: number;
  /** Bucket description */
  description?: string;
  /** Storage type (file or memory) */
  storage?: StorageType;
  /** Enable compression */
  compression?: boolean;
  /** Bucket placement constraints */
  placement?: Placement;
  /** Custom republish configuration */
  republish?: Republish;
  /** Mirror another KV bucket */
  mirror?: StreamSource;
  /** Source other KV buckets */
  sources?: StreamSource[];
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Create or get a KV store with default settings
const prefsKv = await js.views.kv("user-preferences");

// Create a KV store with explicit options.
// Each store gets its own binding: declaring `const kv` twice in the same
// scope is a compile error.
const sessionKv = await js.views.kv("session-cache", {
  history: 5,                  // Keep 5 versions per key
  ttl: 60 * 60 * 1000,         // 1 hour TTL (nats.js takes ttl in milliseconds)
  max_value_size: 1024 * 1024, // 1MB max value size
  description: "User session cache"
});

// Bind to a bucket this code only reads from. kv() returns the same KV
// interface as above; restricting to reads is up to the caller.
const roKv = await js.views.kv("readonly-config");

Basic Operations

Core key-value operations for storing and retrieving data.

/**
 * Basic read/write operations of a Key-Value store.
 *
 * `interface KV` is declared in several parts in this document (basic
 * operations here; history/watch and management operations further down).
 * TypeScript merges same-named interface declarations into a single type.
 */
interface KV {
  /**
   * Get value for key
   * @param key - Key to retrieve
   * @returns Promise resolving to KV entry or null if not found
   */
  get(key: string): Promise<KvEntry | null>;

  /**
   * Set value for key
   * @param key - Key to set
   * @param value - Value as bytes
   * @param opts - Put options (see `KvPutOptions`)
   * @returns Promise resolving to revision number
   */
  put(key: string, value: Uint8Array, opts?: Partial<KvPutOptions>): Promise<number>;

  /**
   * Create key only if it doesn't exist
   * @param key - Key to create
   * @param value - Initial value
   * @returns Promise resolving to revision number or throws if key exists
   */
  create(key: string, value: Uint8Array): Promise<number>;

  /**
   * Update key only if revision matches (compare-and-set).
   * The usage examples treat a rejection as a conflict with a concurrent writer.
   * @param key - Key to update
   * @param value - New value
   * @param revision - Expected current revision
   * @returns Promise resolving to new revision number
   */
  update(key: string, value: Uint8Array, revision: number): Promise<number>;

  /**
   * Delete key (soft delete, keeps in history)
   * @param key - Key to delete
   * @param opts - Delete options (see `KvDeleteOptions`)
   */
  delete(key: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

  /**
   * Purge key (hard delete, removes from history)
   * @param key - Key to purge completely
   */
  purge(key: string): Promise<void>;
}

/** A single revision of a key, as returned by `get`, `history` and `watch`. */
interface KvEntry {
  /** Bucket name */
  bucket: string;
  /** Entry key */
  key: string;
  /** Entry value as bytes */
  value: Uint8Array;
  /** Entry revision number */
  revision: number;
  /** Entry creation timestamp */
  created: Date;
  /** JetStream sequence number */
  sequence: number;
  /**
   * NOTE(review): previously documented as "True if entry represents a delete
   * operation", which does not fit a numeric field. In nats.js `delta` is the
   * number of entries still pending behind this one — confirm. Use
   * `operation` to detect deletes.
   */
  delta: number;
  /** Entry operation type; "DEL" and "PURGE" mark delete operations */
  operation: "PUT" | "DEL" | "PURGE";
}

/** Options for `KV.put`. */
interface KvPutOptions {
  /**
   * Previous revision for conditional update.
   * NOTE(review): published nats.js typings name this option `previousSeq` —
   * confirm which name the targeted version accepts.
   */
  previousRevision?: number;
}

/** Options for `KV.delete`. */
interface KvDeleteOptions {
  /**
   * Previous revision for conditional delete.
   * NOTE(review): published nats.js typings name this option `previousSeq` —
   * confirm which name the targeted version accepts.
   */
  previousRevision?: number;
}

Usage Examples:

import { connect, StringCodec, JSONCodec } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("app-config");
const sc = StringCodec();
const jc = JSONCodec();

// Basic put/get operations
await kv.put("api.url", sc.encode("https://api.example.com"));
const entry = await kv.get("api.url");
if (entry) {
  console.log(`API URL: ${sc.decode(entry.value)}`);
  console.log(`Revision: ${entry.revision}`);
}

// JSON data storage
const config = { timeout: 30, retries: 3 };
await kv.put("service.config", jc.encode(config));

// Conditional operations
try {
  // Create only if key doesn't exist
  await kv.create("counter", sc.encode("1"));
} catch (err) {
  // create() rejects when the key is already present
  console.log("Key already exists");
}

// Update with revision check (optimistic locking)
const current = await kv.get("counter");
// Only increment a live value: a delete/purge marker has no number to parse
if (current && current.operation === "PUT") {
  // Explicit radix so the stored text is always read as decimal
  const newValue = Number.parseInt(sc.decode(current.value), 10) + 1;
  await kv.update("counter", sc.encode(newValue.toString()), current.revision);
}

// Delete operations
await kv.delete("temp.data");  // Soft delete (in history)
await kv.purge("secret.key");  // Hard delete (removed completely)

History and Watching

Track key changes over time and monitor real-time updates.

/**
 * History and watch operations of a Key-Value store.
 * Merged by TypeScript with the other `interface KV` declarations in this document.
 */
interface KV {
  /**
   * Get history of changes for a key
   * @param key - Key to get history for
   * @returns Promise resolving to async iterator of KV entries
   */
  history(key: string): Promise<QueuedIterator<KvEntry>>;

  /**
   * Watch for changes to keys
   * @param opts - Watch options including key filters
   * @returns Promise resolving to async iterator of KV entries
   */
  watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;

  /**
   * Iterate key names only (no values)
   * @param opts - Watch options, e.g. a `key` pattern to filter on
   * @returns Promise resolving to async iterator of key names
   */
  keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
}

/** Options accepted by `KV.watch` and `KV.keys`; every field is optional. */
interface KvWatchOptions {
  /** Key pattern to watch (supports wildcards) */
  key?: string;
  /** Which entries the watch delivers (see `KvWatchInclude`) */
  include?: KvWatchInclude;
  /** Resume from specific revision */
  resumeFromRevision?: number;
  /** Only watch for new updates */
  updatesOnly?: boolean;
  /**
   * NOTE(review): previously documented as "Headers to include with watch",
   * which does not fit a boolean; presumably requests header-only delivery
   * (entries without their values) — confirm.
   */
  headers_only?: boolean;
  /** Ignore deletes in watch stream */
  ignore_deletes?: boolean;
}

/** Selects which entries a watch delivers; used as `KvWatchOptions.include`. */
enum KvWatchInclude {
  /** Include all entries */
  AllHistory = "all_history",
  /** Include only updates after resume point */
  UpdatesOnly = "updates_only",
  /** Include last value per key */
  LastPerKey = "last_per_key"
}

/**
 * Async iterator returned by `history`, `watch` and `keys`.
 * Can be consumed with `for await (...)` or by calling `next()` directly.
 */
interface QueuedIterator<T> {
  /** Get next item from iterator */
  next(): Promise<IteratorResult<T>>;
  /** Stop the iterator */
  stop(): void;
  /** Async iterator interface (enables `for await`) */
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}

Usage Examples:

import { connect, StringCodec, KvWatchInclude } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("user-sessions");
const sc = StringCodec();

// Watch all changes to KV store.
// The loop runs in the background, so failures are reported through
// .catch() instead of surfacing as unhandled promise rejections.
const watcher = await kv.watch();
(async () => {
  for await (const entry of watcher) {
    console.log(`Key: ${entry.key}, Operation: ${entry.operation}`);
    if (entry.operation === "PUT") {
      console.log(`Value: ${sc.decode(entry.value)}`);
    }
  }
})().catch((err) => console.error("KV watcher failed:", err));

// Watch specific key pattern
const userWatcher = await kv.watch({
  key: "user.*",
  include: KvWatchInclude.UpdatesOnly
});

(async () => {
  for await (const entry of userWatcher) {
    // Delete/purge markers have no value to decode; only report real updates
    if (entry.operation !== "PUT") continue;
    console.log(`User ${entry.key} updated: ${sc.decode(entry.value)}`);
  }
})().catch((err) => console.error("User watcher failed:", err));

// Watch for key changes only
const keyWatcher = await kv.keys({ key: "config.*" });
(async () => {
  for await (const key of keyWatcher) {
    console.log(`Config key changed: ${key}`);
  }
})().catch((err) => console.error("Key watcher failed:", err));

// Get key history
const history = await kv.history("user.123");
console.log("History for user.123:");
for await (const entry of history) {
  console.log(`Revision ${entry.revision}: ${sc.decode(entry.value)} (${entry.created})`);
}

// Resume watching from specific revision
const resumeWatcher = await kv.watch({
  key: "events.*",
  resumeFromRevision: 1000,
  include: KvWatchInclude.UpdatesOnly
});

Management Operations

Administrative operations for managing KV store lifecycle and status.

/**
 * Management operations of a Key-Value store.
 * Merged by TypeScript with the other `interface KV` declarations in this document.
 */
interface KV {
  /**
   * Get KV store status and statistics
   * @returns Promise resolving to KV status information
   */
  status(): Promise<KvStatus>;

  /**
   * Close KV store (cleanup resources)
   */
  close(): Promise<void>;

  /**
   * Destroy KV store (delete underlying stream)
   * @returns Promise resolving to true if destroyed
   */
  destroy(): Promise<boolean>;
}

/**
 * Read-only interface with subset of KV operations: lookups, history,
 * watching and status, with none of the mutating methods.
 */
interface RoKV {
  /** Get the entry for a key, or null if not found */
  get(key: string): Promise<KvEntry | null>;
  /** Get history of changes for a key */
  history(key: string): Promise<QueuedIterator<KvEntry>>;
  /** Watch for changes to keys */
  watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
  /** Iterate key names only */
  keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
  /** Get KV store status and statistics */
  status(): Promise<KvStatus>;
}

/** Status and configuration snapshot of a KV bucket, as returned by `KV.status()`. */
interface KvStatus {
  /** Bucket name */
  bucket: string;
  /** Number of entries */
  values: number;
  /**
   * History setting of the bucket (presumably the per-key history limit
   * configured through `KvOptions.history` — confirm)
   */
  history: number;
  /** TTL setting */
  ttl: number;
  /**
   * NOTE(review): previously documented as "Bucket size in bytes", which does
   * not fit a string field; presumably the location (e.g. cluster) hosting
   * the bucket — confirm.
   */
  bucket_location?: string;
  /** Underlying stream info */
  streamInfo: StreamInfo;
  /** Compression enabled */
  compression: boolean;
  /** Storage type */
  storage: StorageType;
  /** Number of replicas */
  replicas: number;
  /** Name of the backing store implementation (a string, not the stream object itself) */
  backingStore: string;
}

/**
 * Limits that can apply to a KV bucket; every field is optional.
 * NOTE(review): units for the TTL fields are not stated here — confirm against the library.
 */
interface KvLimits {
  /** Maximum keys */
  max_keys?: number;
  /** Maximum history per key */
  max_history?: number;
  /** Maximum value size */
  max_value_size?: number;
  /** Maximum bucket size */
  max_bucket_size?: number;
  /** Minimum TTL */
  min_ttl?: number;
  /** Maximum TTL */
  max_ttl?: number;
}

Usage Examples:

import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("metrics");

// Check KV store status
const status = await kv.status();
console.log(`Bucket: ${status.bucket}`);
console.log(`Values: ${status.values}`);
console.log(`History: ${status.history}`);
console.log(`TTL: ${status.ttl}ms`); // nats.js reports the bucket TTL in milliseconds
console.log(`Storage: ${status.storage}`);
console.log(`Replicas: ${status.replicas}`);

// Monitor bucket statistics.
// Keep the timer handle so polling can be stopped before the store is destroyed.
const monitor = setInterval(async () => {
  try {
    const current = await kv.status();
    console.log(`KV entries: ${current.values}, Stream messages: ${current.streamInfo.state.messages}`);
  } catch (err) {
    // An async timer callback has no caller to reject to, so report the
    // failure here rather than leaking an unhandled rejection.
    console.error("Failed to read KV status:", err);
  }
}, 10000);

// Cleanup operations
clearInterval(monitor); // Stop polling first; status() would fail on a destroyed store
await kv.close();       // Close and cleanup resources
await kv.destroy();     // Delete the entire KV store

Codecs for KV Operations

Type-safe encoding/decoding for KV values.

/** Converts between a typed value `T` and the raw bytes stored in the KV bucket. */
interface KvCodec<T> {
  /** Serialize a value to bytes */
  encode(value: T): Uint8Array;
  /** Deserialize bytes back into a value */
  decode(data: Uint8Array): T;
}

/** A bundle of ready-made codecs for common value types. */
interface KvCodecs {
  /** String codec */
  strings: KvCodec<string>;
  /** JSON codec; call with the value type to get a typed codec */
  json<T = unknown>(): KvCodec<T>;
  /** Binary codec (pass-through) */
  binary: KvCodec<Uint8Array>;
}

/**
 * Built-in KV codecs.
 * NOTE(review): shown as a declaration-only API sketch (no initializer); the
 * name suggests pass-through ("no-op") encoding — confirm against the library.
 */
const NoopKvCodecs: KvCodecs;

/**
 * Base64 key encoding codec: maps key names to and from an encoded string form.
 * Shown as a declaration-only API sketch (type shape without an implementation).
 */
const Base64KeyCodec: {
  /** Encode a key name */
  encode(key: string): string;
  /** Decode a previously encoded key name */
  decode(encoded: string): string;
};

Usage Examples:

import { connect, JSONCodec, StringCodec } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("typed-data");

// Shape of the JSON document stored per user
interface UserPrefs {
  theme: string;
  notifications: boolean;
  language: string;
}

// One codec per value type
const prefsCodec = JSONCodec<UserPrefs>();
const textCodec = StringCodec();

// --- Typed JSON round trip ---
const prefsKey = "user.123.prefs";
const initialPrefs: UserPrefs = { theme: "dark", notifications: true, language: "en" };
await kv.put(prefsKey, prefsCodec.encode(initialPrefs));

const prefsEntry = await kv.get(prefsKey);
if (prefsEntry !== null) {
  // decode() yields a UserPrefs, so the fields below are typed
  const { theme, notifications } = prefsCodec.decode(prefsEntry.value);
  console.log(`Theme: ${theme}, Notifications: ${notifications}`);
}

// --- Plain string round trip ---
const versionKey = "app.version";
await kv.put(versionKey, textCodec.encode("1.2.3"));

const versionEntry = await kv.get(versionKey);
if (versionEntry !== null) {
  console.log(`App version: ${textCodec.decode(versionEntry.value)}`);
}

Advanced Patterns

Atomic Operations

Implement atomic updates and conflict resolution.

/**
 * Atomically increment a counter stored as a decimal string.
 *
 * Uses optimistic concurrency: read the current entry, then write the
 * incremented value with `update` (guarded by the revision just read) or
 * with `create` when the key has no entry. A rejected write is treated as a
 * conflict with a concurrent writer and the read-modify-write cycle repeats.
 *
 * @param kv - KV store holding the counter
 * @param key - Counter key
 * @param maxRetries - Maximum number of attempts before giving up (default: 10);
 *   bounds the loop so a persistent failure cannot spin forever
 * @returns Promise resolving to the counter value after the increment
 * @throws The last write error once all attempts have failed, or an Error
 *   when the stored value is not an integer
 */
async function incrementCounter(kv: KV, key: string, maxRetries = 10): Promise<number> {
  const sc = StringCodec();
  let lastError: unknown = new Error("Max retries exceeded");

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const current = await kv.get(key);

    // A delete/purge marker carries no usable value: restart the count at 0
    // but still write against the marker's revision below.
    const isTombstone = current?.operation === "DEL" || current?.operation === "PURGE";
    const currentValue = current && !isTombstone
      ? Number.parseInt(sc.decode(current.value), 10)
      : 0;
    if (Number.isNaN(currentValue)) {
      // Refuse to overwrite foreign data with the string "NaN"
      throw new Error(`Value stored at "${key}" is not an integer`);
    }

    const newValue = currentValue + 1;
    const encoded = sc.encode(newValue.toString());

    try {
      if (current) {
        await kv.update(key, encoded, current.revision);
      } else {
        await kv.create(key, encoded);
      }
      return newValue;
    } catch (err) {
      // Conflict detected (or another write failure): remember it and retry
      lastError = err;
    }
  }

  throw lastError;
}

/**
 * Read-modify-write a key with optimistic concurrency and bounded retries.
 *
 * Each attempt reads the current entry, derives the new value with
 * `updateFn`, and writes it with `update` (guarded by the revision just read)
 * or with `create` when the key has no entry. A rejected write is retried
 * after an exponentially growing delay.
 *
 * @param kv - KV store to operate on
 * @param key - Key to update
 * @param updateFn - Computes the new value from the current one; receives
 *   `null` when the key is missing or its latest entry is a delete/purge marker
 * @param codec - Codec used to decode the stored bytes and encode the result
 * @param maxRetries - Maximum number of attempts (default: 10)
 * @param baseDelayMs - Delay before the first retry in milliseconds; doubles
 *   after every failed attempt (default: 100)
 * @returns Promise resolving to the revision number of the written entry
 * @throws The error of the last failed write, or "Max retries exceeded" when
 *   `maxRetries` allows no attempt at all. Errors thrown by `updateFn` or
 *   `codec` propagate immediately and are not retried.
 */
async function conditionalUpdate<T>(
  kv: KV,
  key: string,
  updateFn: (current: T | null) => T,
  codec: KvCodec<T>,
  maxRetries = 10,
  baseDelayMs = 100
): Promise<number> {
  for (let i = 0; i < maxRetries; i++) {
    const current = await kv.get(key);

    // A delete/purge marker has no payload worth decoding (a JSON codec would
    // throw on it): present it to updateFn as "no value", but still write
    // against the marker's revision below.
    const isTombstone = current?.operation === "DEL" || current?.operation === "PURGE";
    const currentValue = current && !isTombstone ? codec.decode(current.value) : null;
    const encoded = codec.encode(updateFn(currentValue));

    try {
      if (current) {
        return await kv.update(key, encoded, current.revision);
      } else {
        return await kv.create(key, encoded);
      }
    } catch (err) {
      if (i === maxRetries - 1) throw err;
      // Wait with exponential backoff
      await new Promise<void>((resolve) => setTimeout(resolve, Math.pow(2, i) * baseDelayMs));
    }
  }
  throw new Error("Max retries exceeded");
}

Distributed Locking

Implement distributed locks using KV operations.

/**
 * Distributed lock built on a single KV key.
 *
 * The lock is held by whoever managed to write their holder name to
 * `lockKey`. Ownership changes only through revision-checked writes, so two
 * contenders can never both succeed on the same revision.
 */
class DistributedLock {
  /**
   * @param kv - KV store that holds the lock entry
   * @param lockKey - Key representing the lock
   * @param ttl - Age in milliseconds after which an existing lock entry is
   *   considered expired and may be taken over
   */
  constructor(private kv: KV, private lockKey: string, private ttl: number) {}

  /**
   * Try to acquire the lock, retrying until `timeout` elapses.
   *
   * @param holder - Identifier written into the lock entry
   * @param timeout - Maximum time to keep trying, in milliseconds (default: 5000)
   * @returns Promise resolving to true once the lock is held, or false if the
   *   timeout elapsed first
   */
  async acquire(holder: string, timeout = 5000): Promise<boolean> {
    const sc = StringCodec();
    const deadline = Date.now() + timeout;

    while (Date.now() < deadline) {
      try {
        // Try to create lock entry
        await this.kv.create(this.lockKey, sc.encode(holder));

        // Set up TTL renewal
        this.renewLock(holder);
        return true;
      } catch (err) {
        // Lock entry exists: it can be taken over if it was released (the
        // soft delete in release() leaves a delete marker behind) or expired
        const current = await this.kv.get(this.lockKey);
        const released = current?.operation === "DEL" || current?.operation === "PURGE";
        const expired = current
          ? Date.now() - current.created.getTime() > this.ttl
          : false;

        if (current && (released || expired)) {
          try {
            // The revision check makes the takeover atomic against other contenders
            await this.kv.update(this.lockKey, sc.encode(holder), current.revision);
            this.renewLock(holder);
            return true;
          } catch (updateErr) {
            // Someone else got it, continue loop
          }
        }

        // Wait before retry, but never sleep past the deadline
        const remaining = deadline - Date.now();
        if (remaining <= 0) break;
        await new Promise<void>((resolve) => setTimeout(resolve, Math.min(100, remaining)));
      }
    }

    return false;
  }

  /**
   * Release the lock if it is currently held by `holder`.
   *
   * @param holder - Identifier that was passed to `acquire`
   * @returns Promise resolving to true if the lock was released, false if it
   *   is not held by `holder` or the release failed
   */
  async release(holder: string): Promise<boolean> {
    try {
      const current = await this.kv.get(this.lockKey);
      if (!current || current.operation === "DEL" || current.operation === "PURGE") {
        return false; // Nothing is held
      }
      if (StringCodec().decode(current.value) !== holder) {
        return false; // Held by someone else
      }
      // Conditional delete (option name as declared by KvDeleteOptions in this
      // document): if the lock changed hands after the read above, the delete
      // is rejected instead of removing the new owner's lock.
      await this.kv.delete(this.lockKey, { previousRevision: current.revision });
      return true;
    } catch (err) {
      // Best effort: a failed release is reported as "not released"
      return false;
    }
  }

  /**
   * Placeholder for lock renewal; intentionally does nothing in this example.
   *
   * @param holder - Identifier of the current lock owner
   */
  private renewLock(holder: string): void {
    // Implementation would periodically update the lock entry
    // to maintain ownership while the holder is active
  }
}

docs

connection.md

index.md

jetstream.md

kv-store.md

messaging.md

object-store.md

services.md

tile.json