Node.js client for NATS, a lightweight, high-performance cloud native messaging system
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
NATS Key-Value Store provides a high-level abstraction for storing and retrieving key-value data with history tracking, watch capabilities, and conflict resolution built on JetStream streams.
Create and access Key-Value stores through JetStream views.
/**
* Get or create a Key-Value store.
* Reached through the JetStream views accessor, e.g. `js.views.kv(name, opts)`.
* @param name - KV bucket name
* @param opts - Optional KV configuration; any subset of KvOptions may be supplied
* @returns Promise resolving to KV store instance
*/
kv(name: string, opts?: Partial<KvOptions>): Promise<KV>;
/** Factory functions for the high-level JetStream views; reached as `js.views` in the usage examples. */
interface Views {
/** Get or create the Key-Value bucket called `name`; `opts` may hold any subset of KvOptions. */
kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
/** Same call shape as `kv`, but resolves to an ObjectStore (ObjectStore and ObjectStoreOptions are declared outside this section). */
os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
}
/**
* Configuration accepted when getting or creating a KV bucket.
* Every field is optional; callers pass a Partial<KvOptions> to `kv()`.
*/
interface KvOptions {
/** Maximum number of history entries per key (default: 1) */
history?: number;
/**
* Time-to-live for entries in nanoseconds.
* NOTE(review): nats.js documents the KV ttl option in milliseconds — confirm the unit before relying on this.
*/
ttl?: number;
/** Maximum size of values in bytes */
max_value_size?: number;
/** Maximum total bucket size in bytes */
max_bucket_size?: number;
/** Number of replicas for HA (default: 1) */
replicas?: number;
/** Bucket description */
description?: string;
/** Storage type (file or memory) */
storage?: StorageType;
/** Enable compression */
compression?: boolean;
/** Bucket placement constraints */
placement?: Placement;
/** Custom republish configuration */
republish?: Republish;
/** Mirror another KV bucket */
mirror?: StreamSource;
/** Source other KV buckets */
sources?: StreamSource[];
}Usage Examples:
import { connect } from "nats";

// Connect with default options and obtain a JetStream context.
const nc = await connect();
const js = nc.jetstream();

// Create or get KV store
const kv = await js.views.kv("user-preferences");

// Create KV with options.
// Bound to its own name: declaring a second `const kv` in the same scope is a compile error.
const sessionKv = await js.views.kv("session-cache", {
  history: 5, // Keep 5 versions per key
  // NOTE(review): this page documents ttl in nanoseconds, while nats.js documents
  // the KV ttl option in milliseconds — confirm the unit before copying this value.
  ttl: 60 * 60 * 1000 * 1000 * 1000, // 1 hour TTL in nanoseconds
  max_value_size: 1024 * 1024, // 1MB max value size
  description: "User session cache"
});

// Read-only KV access
const roKv = await js.views.kv("readonly-config");
// Core key-value operations for storing and retrieving data.
/**
* Core read/write operations of a KV bucket.
* Writes resolve to the revision number assigned to the stored entry, which
* `update` (and the `previousRevision` options) use for optimistic concurrency.
*/
interface KV {
/**
* Get value for key
* @param key - Key to retrieve
* @returns Promise resolving to KV entry or null if not found
* NOTE(review): a soft-deleted key may resolve to its delete marker
* (an entry whose `operation` is "DEL") rather than null — confirm before treating any non-null result as a live value.
*/
get(key: string): Promise<KvEntry | null>;
/**
* Set value for key
* @param key - Key to set
* @param value - Value as bytes
* @param opts - Put options
* @returns Promise resolving to revision number
*/
put(key: string, value: Uint8Array, opts?: Partial<KvPutOptions>): Promise<number>;
/**
* Create key only if it doesn't exist
* @param key - Key to create
* @param value - Initial value
* @returns Promise resolving to revision number or throws if key exists
*/
create(key: string, value: Uint8Array): Promise<number>;
/**
* Update key only if revision matches
* @param key - Key to update
* @param value - New value
* @param revision - Expected current revision
* @returns Promise resolving to new revision number
*/
update(key: string, value: Uint8Array, revision: number): Promise<number>;
/**
* Delete key (soft delete, keeps in history)
* @param key - Key to delete
* @param opts - Delete options
*/
delete(key: string, opts?: Partial<KvDeleteOptions>): Promise<void>;
/**
* Purge key (hard delete, removes from history)
* @param key - Key to purge completely
*/
purge(key: string): Promise<void>;
}
/** A single stored revision of a key, as resolved by `get` and yielded by `history`/`watch`. */
interface KvEntry {
/** Bucket name */
bucket: string;
/** Entry key */
key: string;
/** Entry value as bytes */
value: Uint8Array;
/** Entry revision number */
revision: number;
/** Entry creation timestamp */
created: Date;
/** JetStream sequence number */
sequence: number;
/**
* NOTE(review): the earlier comment described this as a boolean delete flag, but the field is a number.
* Presumably the number of entries between this one and the latest — confirm against the nats.js KvEntry docs.
* Use `operation` to detect deletes.
*/
delta: number;
/** Entry operation type: "PUT" for a stored value, "DEL"/"PURGE" for delete markers */
operation: "PUT" | "DEL" | "PURGE";
}
/** Options for `put`. */
interface KvPutOptions {
/** Previous revision for conditional update: the revision the key is expected to be at when the write is applied */
previousRevision?: number;
}
/** Options for `delete`. */
interface KvDeleteOptions {
/** Previous revision for conditional delete: the revision the key is expected to be at when the delete is applied */
previousRevision?: number;
}Usage Examples:
import { connect, StringCodec, JSONCodec } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("app-config");
const sc = StringCodec();
const jc = JSONCodec();

// Basic put/get operations
await kv.put("api.url", sc.encode("https://api.example.com"));
const entry = await kv.get("api.url");
if (entry) {
  console.log(`API URL: ${sc.decode(entry.value)}`);
  console.log(`Revision: ${entry.revision}`);
}

// JSON data storage
const config = { timeout: 30, retries: 3 };
await kv.put("service.config", jc.encode(config));

// Conditional operations
try {
  // Create only if key doesn't exist
  await kv.create("counter", sc.encode("1"));
} catch (err) {
  // Any failure of create() ends up here; the example assumes the cause is an existing key.
  console.log("Key already exists");
}

// Update with revision check (optimistic locking)
const current = await kv.get("counter");
if (current) {
  // Always pass the radix so the stored text is parsed as base 10.
  const newValue = Number.parseInt(sc.decode(current.value), 10) + 1;
  await kv.update("counter", sc.encode(newValue.toString()), current.revision);
}

// Delete operations
await kv.delete("temp.data"); // Soft delete (in history)
await kv.purge("secret.key"); // Hard delete (removed completely)
// Track key changes over time and monitor real-time updates.
/** History and watch operations of a KV bucket; each resolves to a QueuedIterator that can be consumed with `for await`. */
interface KV {
/**
* Get history of changes for a key
* @param key - Key to get history for
* @returns Promise resolving to async iterator of KV entries
*/
history(key: string): Promise<QueuedIterator<KvEntry>>;
/**
* Watch for changes to keys
* @param opts - Watch options including key filters
* @returns Promise resolving to async iterator of KV entries
*/
watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
/**
* Watch for key name changes only (yields key names instead of full entries)
* @param opts - Watch options
* @returns Promise resolving to async iterator of key names
*/
keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
}
/** Options accepted by `watch` and `keys`; all fields are optional. */
interface KvWatchOptions {
/** Key pattern to watch (supports wildcards, e.g. "user.*") */
key?: string;
/** Which entries the watcher delivers (see KvWatchInclude) */
include?: KvWatchInclude;
/** Resume from specific revision */
resumeFromRevision?: number;
/** Only watch for new updates */
updatesOnly?: boolean;
/**
* NOTE(review): the earlier comment read "Headers to include with watch", but the field is a boolean.
* Presumably it asks for entries to be delivered without their values — confirm.
*/
headers_only?: boolean;
/** Ignore deletes in watch stream */
ignore_deletes?: boolean;
}
/** Values for KvWatchOptions.include. */
enum KvWatchInclude {
/** Include all entries */
AllHistory = "all_history",
/** Include only updates after resume point */
UpdatesOnly = "updates_only",
/** Include last value per key */
LastPerKey = "last_per_key"
}
/** Iterator type resolved by `history`, `watch` and `keys`; consumable with `for await` or by calling `next()` manually. */
interface QueuedIterator<T> {
/** Get next item from iterator */
next(): Promise<IteratorResult<T>>;
/** Stop the iterator */
stop(): void;
/** Async iterator interface */
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}Usage Examples:
import { connect, StringCodec, KvWatchInclude } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("user-sessions");
const sc = StringCodec();

// The watch loops below run detached from the main flow, so their failures are
// reported here instead of surfacing as unhandled promise rejections.
const reportWatchError = (err: unknown): void => {
  console.error("KV watch failed:", err);
};

// Watch all changes to KV store
const watcher = await kv.watch();
(async () => {
  for await (const entry of watcher) {
    console.log(`Key: ${entry.key}, Operation: ${entry.operation}`);
    if (entry.operation === "PUT") {
      console.log(`Value: ${sc.decode(entry.value)}`);
    }
  }
})().catch(reportWatchError);

// Watch specific key pattern
const userWatcher = await kv.watch({
  key: "user.*",
  include: KvWatchInclude.UpdatesOnly
});
(async () => {
  for await (const entry of userWatcher) {
    // Only PUT entries are updates; DEL/PURGE markers are reported as removals.
    if (entry.operation !== "PUT") {
      console.log(`User ${entry.key} removed (${entry.operation})`);
      continue;
    }
    console.log(`User ${entry.key} updated: ${sc.decode(entry.value)}`);
  }
})().catch(reportWatchError);

// Watch for key changes only
const keyWatcher = await kv.keys({ key: "config.*" });
(async () => {
  for await (const key of keyWatcher) {
    console.log(`Config key changed: ${key}`);
  }
})().catch(reportWatchError);

// Get key history
const history = await kv.history("user.123");
console.log("History for user.123:");
for await (const entry of history) {
  console.log(`Revision ${entry.revision}: ${sc.decode(entry.value)} (${entry.created})`);
}

// Resume watching from specific revision
const resumeWatcher = await kv.watch({
  key: "events.*",
  resumeFromRevision: 1000,
  include: KvWatchInclude.UpdatesOnly
});
// Administrative operations for managing KV store lifecycle and status.
/** Administrative operations of a KV bucket: inspecting status and releasing or removing the store. */
interface KV {
/**
* Get KV store status and statistics
* @returns Promise resolving to KV status information
*/
status(): Promise<KvStatus>;
/**
* Close KV store (cleanup resources)
*/
close(): Promise<void>;
/**
* Destroy KV store (delete underlying stream)
* @returns Promise resolving to true if destroyed
*/
destroy(): Promise<boolean>;
}
/**
* Read-only interface with subset of KV operations.
* Declares only get/history/watch/keys/status; none of the write or lifecycle
* methods (put, create, update, delete, purge, close, destroy) are part of it.
*/
interface RoKV {
/** Same signature as KV.get */
get(key: string): Promise<KvEntry | null>;
/** Same signature as KV.history */
history(key: string): Promise<QueuedIterator<KvEntry>>;
/** Same signature as KV.watch */
watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
/** Same signature as KV.keys */
keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
/** Same signature as KV.status */
status(): Promise<KvStatus>;
}
/** Status snapshot of a KV bucket, as resolved by `status()`. */
interface KvStatus {
/** Bucket name */
bucket: string;
/** Number of entries */
values: number;
/** History setting of the bucket (presumably the per-key history depth configured via KvOptions.history — confirm) */
history: number;
/** TTL setting (the usage example prints it with an `ns` suffix; NOTE(review): confirm the unit) */
ttl: number;
/**
* NOTE(review): the earlier comment read "Bucket size in bytes", but this is an optional string
* named bucket_location — presumably where the bucket is placed; confirm.
*/
bucket_location?: string;
/** Underlying stream info (the usage example reads `streamInfo.state.messages` from it) */
streamInfo: StreamInfo;
/** Compression enabled */
compression: boolean;
/** Storage type */
storage: StorageType;
/** Number of replicas */
replicas: number;
/** Backing stream (typed as a string; presumably an identifier of the backing store — confirm) */
backingStore: string;
}
/**
* Optional limits for KV buckets.
* NOTE(review): no other declaration in this section references KvLimits, and the
* TTL fields state no unit — confirm where it is consumed and which unit applies.
*/
interface KvLimits {
/** Maximum keys */
max_keys?: number;
/** Maximum history per key */
max_history?: number;
/** Maximum value size */
max_value_size?: number;
/** Maximum bucket size */
max_bucket_size?: number;
/** Minimum TTL */
min_ttl?: number;
/** Maximum TTL */
max_ttl?: number;
}Usage Examples:
import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("metrics");

// Check KV store status
const status = await kv.status();
console.log(`Bucket: ${status.bucket}`);
console.log(`Values: ${status.values}`);
console.log(`History: ${status.history}`);
console.log(`TTL: ${status.ttl}ns`);
console.log(`Storage: ${status.storage}`);
console.log(`Replicas: ${status.replicas}`);

// Monitor bucket statistics
// The timer handle is kept so polling can be stopped before the store is released.
const monitor = setInterval(async () => {
  try {
    const snapshot = await kv.status();
    console.log(`KV entries: ${snapshot.values}, Stream messages: ${snapshot.streamInfo.state.messages}`);
  } catch (err) {
    // A rejected status() inside a timer callback would otherwise be an unhandled rejection.
    console.error("Failed to read KV status:", err);
  }
}, 10000);

// Cleanup operations (run these when shutting down)
clearInterval(monitor); // Stop polling first so no status() call hits a closed or destroyed store
await kv.close(); // Close and cleanup resources
await kv.destroy(); // Delete the entire KV store
// Type-safe encoding/decoding for KV values.
/** Converts between an application value of type T and the byte payload stored in the KV bucket. */
interface KvCodec<T> {
/** Serialize `value` to bytes */
encode(value: T): Uint8Array;
/** Deserialize bytes back into a T */
decode(data: Uint8Array): T;
}
/** A bundle of ready-made value codecs. */
interface KvCodecs {
/** String codec */
strings: KvCodec<string>;
/** JSON codec (a factory: call it with the value type to get a typed codec) */
json<T = unknown>(): KvCodec<T>;
/** Binary codec (pass-through) */
binary: KvCodec<Uint8Array>;
}
/**
* Built-in KV codecs.
* NOTE(review): the name suggests pass-through behaviour — confirm what each codec in this bundle actually does.
*/
const NoopKvCodecs: KvCodecs;
/**
* Base64 key encoding codec.
* Unlike KvCodec, both directions work on strings: it transforms key names, not values.
*/
const Base64KeyCodec: {
/** Encode a key name */
encode(key: string): string;
/** Recover the key name from its encoded form */
decode(encoded: string): string;
};Usage Examples:
import { connect, JSONCodec, StringCodec } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("typed-data");

// Type-safe JSON operations
interface UserPrefs {
  theme: string;
  notifications: boolean;
  language: string;
}

const jsonCodec = JSONCodec<UserPrefs>();
const userPrefs: UserPrefs = { theme: "dark", notifications: true, language: "en" };

// Store typed JSON data
const encodedPrefs = jsonCodec.encode(userPrefs);
await kv.put("user.123.prefs", encodedPrefs);

// Retrieve and decode typed data
const entry = await kv.get("user.123.prefs");
if (entry) {
  // Decoded value is typed as UserPrefs, so the fields can be destructured directly.
  const { theme, notifications } = jsonCodec.decode(entry.value);
  console.log(`Theme: ${theme}, Notifications: ${notifications}`);
}

// String operations
const stringCodec = StringCodec();
const encodedVersion = stringCodec.encode("1.2.3");
await kv.put("app.version", encodedVersion);

const versionEntry = await kv.get("app.version");
if (versionEntry) {
  console.log(`App version: ${stringCodec.decode(versionEntry.value)}`);
}
// Implement atomic updates and conflict resolution.
// Atomic counter increment
/**
 * Atomically increments the integer counter stored under `key`.
 *
 * Uses optimistic concurrency: the new value is written with `update` against
 * the revision that was read (or with `create` when the key is absent), and the
 * whole read/modify/write cycle is retried when the write is rejected.
 *
 * @param kv - KV store holding the counter
 * @param key - Key of the counter; an absent or deleted key counts as 0
 * @param maxRetries - Maximum number of write attempts (default: 100, minimum: 1)
 * @returns Promise resolving to the value that was written
 * @throws Error when the stored value is not an integer
 * @throws The last write error once all attempts have failed
 */
async function incrementCounter(kv: KV, key: string, maxRetries = 100): Promise<number> {
  const codec = StringCodec(); // one codec instance for all attempts
  const attempts = Math.max(1, Math.floor(maxRetries) || 1);
  let lastError: unknown;

  for (let attempt = 0; attempt < attempts; attempt++) {
    const current = await kv.get(key);

    // A delete/purge marker has no usable value: start counting from 0 again.
    const isDeleteMarker =
      current != null && (current.operation === "DEL" || current.operation === "PURGE");

    let currentValue = 0;
    if (current && !isDeleteMarker) {
      currentValue = Number.parseInt(codec.decode(current.value), 10);
      if (Number.isNaN(currentValue)) {
        // Failing loudly beats writing "NaN" back; this is not a conflict, so it is not retried.
        throw new Error(`Value stored under "${key}" is not an integer`);
      }
    }

    const newValue = currentValue + 1;
    const payload = codec.encode(newValue.toString());
    try {
      if (current) {
        // Also covers delete markers: the write is conditioned on the marker's revision.
        await kv.update(key, payload, current.revision);
      } else {
        await kv.create(key, payload);
      }
      return newValue;
    } catch (err) {
      // Most likely a concurrent writer won the race; re-read and try again.
      lastError = err;
    }
  }

  // Bounded retries: a persistent failure (not a conflict) must not spin forever.
  throw lastError;
}
// Conditional updates with retry logic
/**
 * Applies `updateFn` to the current value of `key` and writes the result back
 * with a revision check, retrying with exponential backoff when the write is rejected.
 *
 * @param kv - KV store to update
 * @param key - Key to update
 * @param updateFn - Computes the new value; receives null when the key is absent or deleted
 * @param codec - Codec used to decode the stored bytes and encode the new value
 * @param maxRetries - Maximum number of write attempts (default: 10)
 * @returns Promise resolving to the revision number of the written entry
 * @throws The write error of the final attempt, or "Max retries exceeded" when
 *         `maxRetries` allows no attempt at all
 */
async function conditionalUpdate<T>(
  kv: KV,
  key: string,
  updateFn: (current: T | null) => T,
  codec: KvCodec<T>,
  maxRetries = 10
): Promise<number> {
  const baseDelayMs = 100;
  // Cap the backoff: 2^attempt * 100ms grows to minutes after a dozen retries.
  const maxDelayMs = 5000;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const current = await kv.get(key);

    // Delete/purge markers carry no value; decoding them could throw (e.g. JSON).
    const hasLiveValue =
      current != null && current.operation !== "DEL" && current.operation !== "PURGE";
    const currentValue = hasLiveValue ? codec.decode(current.value) : null;

    // Encoded outside the try block: an encoding failure is not a conflict
    // and would fail identically on every retry.
    const payload = codec.encode(updateFn(currentValue));

    try {
      if (current) {
        // Also covers delete markers: the write is conditioned on the marker's revision.
        return await kv.update(key, payload, current.revision);
      }
      return await kv.create(key, payload);
    } catch (err) {
      if (attempt === maxRetries - 1) throw err;
      // Wait with exponential backoff
      const delayMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw new Error("Max retries exceeded");
}
// Implement distributed locks using KV operations.
/**
 * Distributed lock built on KV compare-and-set operations.
 *
 * The lock is a single KV entry whose value is the holder's name. It is taken
 * with `create` (succeeds only when the key does not exist) and can be taken
 * over with a revision-checked `update` once the entry is older than `ttl`
 * milliseconds or is a delete marker.
 */
class DistributedLock {
  /** Delay between acquisition attempts, in milliseconds. */
  private static readonly RETRY_DELAY_MS = 100;

  /** Codec for the holder name stored as the lock value (created once per lock). */
  private readonly codec = StringCodec();

  /**
   * @param kv - KV store that holds the lock entry
   * @param lockKey - Key of the lock entry
   * @param ttl - Age in milliseconds after which an existing lock entry counts as expired
   */
  constructor(private kv: KV, private lockKey: string, private ttl: number) {}

  /**
   * Tries to acquire the lock until `timeout` elapses.
   *
   * @param holder - Name recorded as the lock owner
   * @param timeout - Maximum time to keep trying, in milliseconds (default: 5000)
   * @returns Promise resolving to true when the lock was acquired, false on timeout
   */
  async acquire(holder: string, timeout = 5000): Promise<boolean> {
    const deadline = Date.now() + timeout;
    const payload = this.codec.encode(holder);

    while (Date.now() < deadline) {
      try {
        // Try to create lock entry
        await this.kv.create(this.lockKey, payload);
        // Set up TTL renewal
        this.renewLock(holder);
        return true;
      } catch (err) {
        // Lock exists, check if it is free (delete marker) or expired
        const current = await this.kv.get(this.lockKey);
        if (current && this.canTakeOver(current)) {
          try {
            // Revision check: only one contender can win the takeover.
            await this.kv.update(this.lockKey, payload, current.revision);
            this.renewLock(holder);
            return true;
          } catch (updateErr) {
            // Someone else got it, continue loop
          }
        }
        // Wait before retry, but never sleep past the caller's deadline.
        const remaining = deadline - Date.now();
        if (remaining <= 0) break;
        const delayMs = Math.min(DistributedLock.RETRY_DELAY_MS, remaining);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
    return false;
  }

  /**
   * Releases the lock if it is currently held by `holder`.
   *
   * @param holder - Name of the owner releasing the lock
   * @returns Promise resolving to true when the lock entry was deleted, false when
   *          the lock is absent, owned by someone else, changed concurrently, or the
   *          KV operation failed
   */
  async release(holder: string): Promise<boolean> {
    try {
      const current = await this.kv.get(this.lockKey);
      if (!current || this.isDeleteMarker(current)) {
        return false;
      }
      if (this.codec.decode(current.value) !== holder) {
        return false;
      }
      // Conditional delete: if another holder took the lock over after the read
      // above, the revision no longer matches and their lock is left untouched.
      await this.kv.delete(this.lockKey, { previousRevision: current.revision });
      return true;
    } catch (err) {
      return false;
    }
  }

  /** True when `entry` is a delete/purge marker rather than a stored holder name. */
  private isDeleteMarker(entry: { operation?: string }): boolean {
    return entry.operation === "DEL" || entry.operation === "PURGE";
  }

  /** True when the existing lock entry is free (delete marker) or older than `ttl`. */
  private canTakeOver(entry: { created: Date; operation?: string }): boolean {
    return this.isDeleteMarker(entry) || Date.now() - entry.created.getTime() > this.ttl;
  }

  private renewLock(holder: string): void {
    // Implementation would periodically update the lock entry
    // to maintain ownership while the holder is active
  }
}