0
# Key-Value Store
1
2
NATS Key-Value Store provides a high-level abstraction for storing and retrieving key-value data with history tracking, watch capabilities, and conflict resolution built on JetStream streams.
3
4
## Capabilities
5
6
### KV Store Access
7
8
Create and access Key-Value stores through JetStream views.
9
10
```typescript { .api }
11
/**
12
* Get or create a Key-Value store
13
* @param name - KV bucket name
14
* @param opts - KV configuration options
15
* @returns Promise resolving to KV store instance
16
*/
17
kv(name: string, opts?: Partial<KvOptions>): Promise<KV>;
18
19
interface Views {
20
kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;
21
os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;
22
}
23
24
interface KvOptions {
25
/** Maximum number of history entries per key (default: 1) */
26
history?: number;
27
/** Time-to-live for entries in nanoseconds */
28
ttl?: number;
29
/** Maximum size of values in bytes */
30
max_value_size?: number;
31
/** Maximum total bucket size in bytes */
32
max_bucket_size?: number;
33
/** Number of replicas for HA (default: 1) */
34
replicas?: number;
35
/** Bucket description */
36
description?: string;
37
/** Storage type (file or memory) */
38
storage?: StorageType;
39
/** Enable compression */
40
compression?: boolean;
41
/** Bucket placement constraints */
42
placement?: Placement;
43
/** Custom republish configuration */
44
republish?: Republish;
45
/** Mirror another KV bucket */
46
mirror?: StreamSource;
47
/** Source other KV buckets */
48
sources?: StreamSource[];
49
}
50
```
51
52
**Usage Examples:**
53
54
```typescript
55
import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();

// Create or get KV store
const kv = await js.views.kv("user-preferences");

// Create KV with options
// (a separate binding: `kv` is already declared above and cannot be redeclared)
const sessionKv = await js.views.kv("session-cache", {
  history: 5, // Keep 5 versions per key
  ttl: 60 * 60 * 1000 * 1000 * 1000, // 1 hour TTL in nanoseconds
  max_value_size: 1024 * 1024, // 1MB max value size
  description: "User session cache"
});

// Read-only KV access
const roKv = await js.views.kv("readonly-config");
73
```
74
75
### Basic Operations
76
77
Core key-value operations for storing and retrieving data.
78
79
```typescript { .api }
80
interface KV {
81
/**
82
* Get value for key
83
* @param key - Key to retrieve
84
* @returns Promise resolving to KV entry or null if not found
85
*/
86
get(key: string): Promise<KvEntry | null>;
87
88
/**
89
* Set value for key
90
* @param key - Key to set
91
* @param value - Value as bytes
92
* @param opts - Put options
93
* @returns Promise resolving to revision number
94
*/
95
put(key: string, value: Uint8Array, opts?: Partial<KvPutOptions>): Promise<number>;
96
97
/**
98
* Create key only if it doesn't exist
99
* @param key - Key to create
100
* @param value - Initial value
101
* @returns Promise resolving to revision number or throws if key exists
102
*/
103
create(key: string, value: Uint8Array): Promise<number>;
104
105
/**
106
* Update key only if revision matches
107
* @param key - Key to update
108
* @param value - New value
109
* @param revision - Expected current revision
110
* @returns Promise resolving to new revision number
111
*/
112
update(key: string, value: Uint8Array, revision: number): Promise<number>;
113
114
/**
115
* Delete key (soft delete, keeps in history)
116
* @param key - Key to delete
117
* @param opts - Delete options
118
*/
119
delete(key: string, opts?: Partial<KvDeleteOptions>): Promise<void>;
120
121
/**
122
* Purge key (hard delete, removes from history)
123
* @param key - Key to purge completely
124
*/
125
purge(key: string): Promise<void>;
126
}
127
128
/** A single entry of a key, as returned by `get()` and yielded by `history()` / `watch()`. */
interface KvEntry {
129
/** Bucket name */
130
bucket: string;
131
/** Entry key */
132
key: string;
133
/** Entry value as bytes */
134
value: Uint8Array;
135
/** Entry revision number */
136
revision: number;
137
/** Entry creation timestamp */
138
created: Date;
139
/** JetStream sequence number */
140
sequence: number;
141
/** Numeric delta for this entry; this is NOT a delete flag (use `operation` to detect deletes). NOTE(review): presumably the number of entries still pending behind this one - confirm against the client library */
142
delta: number;
143
/** Entry operation type: "PUT" for a stored value, "DEL" / "PURGE" for delete and purge markers */
144
operation: "PUT" | "DEL" | "PURGE";
145
}
146
147
interface KvPutOptions {
148
/** Previous revision for conditional update */
149
previousRevision?: number;
150
}
151
152
interface KvDeleteOptions {
153
/** Previous revision for conditional delete */
154
previousRevision?: number;
155
}
156
```
157
158
**Usage Examples:**
159
160
```typescript
161
import { connect, StringCodec, JSONCodec } from "nats";
162
163
const nc = await connect();
164
const js = nc.jetstream();
165
const kv = await js.views.kv("app-config");
166
const sc = StringCodec();
167
const jc = JSONCodec();
168
169
// Basic put/get operations
170
await kv.put("api.url", sc.encode("https://api.example.com"));
171
const entry = await kv.get("api.url");
172
if (entry) {
173
console.log(`API URL: ${sc.decode(entry.value)}`);
174
console.log(`Revision: ${entry.revision}`);
175
}
176
177
// JSON data storage
178
const config = { timeout: 30, retries: 3 };
179
await kv.put("service.config", jc.encode(config));
180
181
// Conditional operations
182
try {
183
// Create only if key doesn't exist
184
await kv.create("counter", sc.encode("1"));
185
} catch (err) {
186
console.log("Key already exists");
187
}
188
189
// Update with revision check (optimistic locking)
190
const current = await kv.get("counter");
191
if (current) {
192
const newValue = parseInt(sc.decode(current.value)) + 1;
193
await kv.update("counter", sc.encode(newValue.toString()), current.revision);
194
}
195
196
// Delete operations
197
await kv.delete("temp.data"); // Soft delete (in history)
198
await kv.purge("secret.key"); // Hard delete (removed completely)
199
```
200
201
### History and Watching
202
203
Track key changes over time and monitor real-time updates.
204
205
```typescript { .api }
206
interface KV {
207
/**
208
* Get history of changes for a key
209
* @param key - Key to get history for
210
* @returns Promise resolving to async iterator of KV entries
211
*/
212
history(key: string): Promise<QueuedIterator<KvEntry>>;
213
214
/**
215
* Watch for changes to keys
216
* @param opts - Watch options including key filters
217
* @returns Promise resolving to async iterator of KV entries
218
*/
219
watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
220
221
/**
222
* Watch for key name changes only
223
* @param opts - Watch options
224
* @returns Promise resolving to async iterator of key names
225
*/
226
keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
227
}
228
229
/** Options accepted by `watch()` and `keys()`; every field is optional. */
interface KvWatchOptions {
229
/** Key pattern to watch (supports wildcards) */
230
key?: string;
231
/** Which existing entries to deliver (see KvWatchInclude) */
232
include?: KvWatchInclude;
233
/** Resume from specific revision */
234
resumeFromRevision?: number;
235
/** Only watch for new updates */
236
updatesOnly?: boolean;
237
/** Boolean flag, not a list of headers. NOTE(review): presumably delivers entries with headers only (no values) - confirm */
238
headers_only?: boolean;
239
/** Ignore deletes in watch stream */
240
ignore_deletes?: boolean;
241
}
243
244
enum KvWatchInclude {
245
/** Include all entries */
246
AllHistory = "all_history",
247
/** Include only updates after resume point */
248
UpdatesOnly = "updates_only",
249
/** Include last value per key */
250
LastPerKey = "last_per_key"
251
}
252
253
interface QueuedIterator<T> {
254
/** Get next item from iterator */
255
next(): Promise<IteratorResult<T>>;
256
/** Stop the iterator */
257
stop(): void;
258
/** Async iterator interface */
259
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
260
}
261
```
262
263
**Usage Examples:**
264
265
```typescript
266
import { connect, StringCodec, KvWatchInclude } from "nats";
267
268
const nc = await connect();
269
const js = nc.jetstream();
270
const kv = await js.views.kv("user-sessions");
271
const sc = StringCodec();
272
273
// Watch all changes to KV store
274
const watcher = await kv.watch();
275
(async () => {
276
for await (const entry of watcher) {
277
console.log(`Key: ${entry.key}, Operation: ${entry.operation}`);
278
if (entry.operation === "PUT") {
279
console.log(`Value: ${sc.decode(entry.value)}`);
280
}
281
}
282
})();
283
284
// Watch specific key pattern
285
const userWatcher = await kv.watch({
286
key: "user.*",
287
include: KvWatchInclude.UpdatesOnly
288
});
289
290
(async () => {
291
for await (const entry of userWatcher) {
292
console.log(`User ${entry.key} updated: ${sc.decode(entry.value)}`);
293
}
294
})();
295
296
// Watch for key changes only
297
const keyWatcher = await kv.keys({ key: "config.*" });
298
(async () => {
299
for await (const key of keyWatcher) {
300
console.log(`Config key changed: ${key}`);
301
}
302
})();
303
304
// Get key history
305
const history = await kv.history("user.123");
306
console.log("History for user.123:");
307
for await (const entry of history) {
308
console.log(`Revision ${entry.revision}: ${sc.decode(entry.value)} (${entry.created})`);
309
}
310
311
// Resume watching from specific revision
312
const resumeWatcher = await kv.watch({
313
key: "events.*",
314
resumeFromRevision: 1000,
315
include: KvWatchInclude.UpdatesOnly
316
});
317
```
318
319
### Management Operations
320
321
Administrative operations for managing KV store lifecycle and status.
322
323
```typescript { .api }
324
interface KV {
325
/**
326
* Get KV store status and statistics
327
* @returns Promise resolving to KV status information
328
*/
329
status(): Promise<KvStatus>;
330
331
/**
332
* Close KV store (cleanup resources)
333
*/
334
close(): Promise<void>;
335
336
/**
337
* Destroy KV store (delete underlying stream)
338
* @returns Promise resolving to true if destroyed
339
*/
340
destroy(): Promise<boolean>;
341
}
342
343
interface RoKV {
344
/** Read-only interface with subset of KV operations */
345
get(key: string): Promise<KvEntry | null>;
346
history(key: string): Promise<QueuedIterator<KvEntry>>;
347
watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;
348
keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;
349
status(): Promise<KvStatus>;
350
}
351
352
/** Status information returned by `status()`. */
interface KvStatus {
353
/** Bucket name */
354
bucket: string;
355
/** Number of entries */
356
values: number;
357
/** History setting of the bucket. NOTE(review): presumably the maximum number of history entries kept per key, mirroring KvOptions.history - confirm */
358
history: number;
359
/** TTL setting */
360
ttl: number;
361
/** Bucket location as a string (not a size in bytes). NOTE(review): exact meaning not shown here - confirm */
362
bucket_location?: string;
363
/** Underlying stream info */
364
streamInfo: StreamInfo;
365
/** Compression enabled */
366
compression: boolean;
367
/** Storage type */
368
storage: StorageType;
369
/** Number of replicas */
370
replicas: number;
371
/** Backing store identifier as a string. NOTE(review): presumably names the backing technology or stream - confirm */
372
backingStore: string;
373
}
374
375
interface KvLimits {
376
/** Maximum keys */
377
max_keys?: number;
378
/** Maximum history per key */
379
max_history?: number;
380
/** Maximum value size */
381
max_value_size?: number;
382
/** Maximum bucket size */
383
max_bucket_size?: number;
384
/** Minimum TTL */
385
min_ttl?: number;
386
/** Maximum TTL */
387
max_ttl?: number;
388
}
389
```
390
391
**Usage Examples:**
392
393
```typescript
394
import { connect } from "nats";

const nc = await connect();
const js = nc.jetstream();
const kv = await js.views.kv("metrics");

// Check KV store status
const status = await kv.status();
console.log(`Bucket: ${status.bucket}`);
console.log(`Values: ${status.values}`);
console.log(`History: ${status.history}`);
console.log(`TTL: ${status.ttl}ns`);
console.log(`Storage: ${status.storage}`);
console.log(`Replicas: ${status.replicas}`);

// Monitor bucket statistics
const monitor = setInterval(async () => {
  try {
    const current = await kv.status();
    console.log(`KV entries: ${current.values}, Stream messages: ${current.streamInfo.state.messages}`);
  } catch (err) {
    // A rejected status() inside a timer callback would otherwise go unhandled
    console.error("Failed to read KV status:", err);
  }
}, 10000);

// Cleanup operations
clearInterval(monitor); // Stop polling before the store goes away
await kv.close(); // Close and cleanup resources
await kv.destroy(); // Delete the entire KV store
418
```
419
420
### Codecs for KV Operations
421
422
Type-safe encoding/decoding for KV values.
423
424
```typescript { .api }
425
interface KvCodec<T> {
426
encode(value: T): Uint8Array;
427
decode(data: Uint8Array): T;
428
}
429
430
interface KvCodecs {
431
/** String codec */
432
strings: KvCodec<string>;
433
/** JSON codec */
434
json<T = unknown>(): KvCodec<T>;
435
/** Binary codec (pass-through) */
436
binary: KvCodec<Uint8Array>;
437
}
438
439
/** Built-in KV codecs */
440
const NoopKvCodecs: KvCodecs;
441
442
/** Base64 key encoding codec */
443
const Base64KeyCodec: {
444
encode(key: string): string;
445
decode(encoded: string): string;
446
};
447
```
448
449
**Usage Examples:**
450
451
```typescript
452
import { connect, JSONCodec, StringCodec } from "nats";
453
454
const nc = await connect();
455
const js = nc.jetstream();
456
const kv = await js.views.kv("typed-data");
457
458
// Type-safe JSON operations
459
interface UserPrefs {
460
theme: string;
461
notifications: boolean;
462
language: string;
463
}
464
465
const jsonCodec = JSONCodec<UserPrefs>();
466
const userPrefs: UserPrefs = {
467
theme: "dark",
468
notifications: true,
469
language: "en"
470
};
471
472
// Store typed JSON data
473
await kv.put("user.123.prefs", jsonCodec.encode(userPrefs));
474
475
// Retrieve and decode typed data
476
const entry = await kv.get("user.123.prefs");
477
if (entry) {
478
const prefs = jsonCodec.decode(entry.value); // Type is UserPrefs
479
console.log(`Theme: ${prefs.theme}, Notifications: ${prefs.notifications}`);
480
}
481
482
// String operations
483
const stringCodec = StringCodec();
484
await kv.put("app.version", stringCodec.encode("1.2.3"));
485
486
const versionEntry = await kv.get("app.version");
487
if (versionEntry) {
488
const version = stringCodec.decode(versionEntry.value);
489
console.log(`App version: ${version}`);
490
}
491
```
492
493
## Advanced Patterns
494
495
### Atomic Operations
496
497
Implement atomic updates and conflict resolution.
498
499
```typescript
500
/**
 * Atomic counter increment.
 *
 * Reads the current entry and writes `value + 1` guarded by the revision that
 * was read (or via `create` when the key is missing), so a concurrent writer
 * makes the write fail instead of being silently overwritten. A failed write
 * is retried with a fresh read, up to `maxRetries` attempts.
 *
 * @param kv - KV store holding the counter
 * @param key - Key of the counter
 * @param maxRetries - Maximum number of read/write attempts before giving up (default: 10)
 * @returns Promise resolving to the value that was written
 * @throws Error if the stored value is not an integer, or if every attempt failed
 */
async function incrementCounter(kv: KV, key: string, maxRetries = 10): Promise<number> {
  const sc = StringCodec();
  let lastError: unknown;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const current = await kv.get(key);

    // A delete/purge marker carries no usable value: restart the count at 0,
    // but still write against the marker's revision below.
    const operation = current?.operation;
    const isMarker = operation === "DEL" || operation === "PURGE";
    const currentValue = current && !isMarker ? parseInt(sc.decode(current.value), 10) : 0;
    if (Number.isNaN(currentValue)) {
      // Not a conflict: retrying cannot fix a corrupt value, so fail loudly
      throw new Error(`Value stored at "${key}" is not an integer`);
    }
    const newValue = currentValue + 1;

    try {
      if (current) {
        await kv.update(key, sc.encode(newValue.toString()), current.revision);
      } else {
        await kv.create(key, sc.encode(newValue.toString()));
      }
      return newValue;
    } catch (err) {
      // Most likely a conflict with a concurrent writer: re-read and retry
      lastError = err;
    }
  }

  const reason = lastError instanceof Error ? lastError.message : String(lastError);
  throw new Error(`Failed to increment "${key}" after ${maxRetries} attempts: ${reason}`);
}
520
521
/**
 * Conditional update with retry logic (optimistic concurrency).
 *
 * Reads the current entry, derives the next value with `updateFn`, and writes
 * it guarded by the revision that was read (or via `create` when the key is
 * missing). A failed write is retried after an exponential backoff.
 *
 * @param kv - KV store to operate on
 * @param key - Key to update
 * @param updateFn - Computes the next value; receives `null` when the key is
 *   missing or its latest entry is a delete/purge marker
 * @param codec - Codec used to decode the stored bytes and encode the new value
 * @param maxRetries - Maximum number of attempts (default: 10)
 * @returns Promise resolving to the revision number returned by the write
 * @throws The error of the last failed write, or "Max retries exceeded" when
 *   `maxRetries` allows no attempt at all
 */
async function conditionalUpdate<T>(
  kv: KV,
  key: string,
  updateFn: (current: T | null) => T,
  codec: KvCodec<T>,
  maxRetries = 10
): Promise<number> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const current = await kv.get(key);

    // Delete/purge markers have no value to decode; present them as "no value"
    const operation = current?.operation;
    const hasValue = current != null && operation !== "DEL" && operation !== "PURGE";
    const currentValue = hasValue ? codec.decode(current.value) : null;

    // Computed outside the try: an updateFn/encode failure is not a write
    // conflict, so retrying it would only delay the error.
    const encoded = codec.encode(updateFn(currentValue));

    try {
      if (current) {
        return await kv.update(key, encoded, current.revision);
      }
      return await kv.create(key, encoded);
    } catch (err) {
      if (attempt === maxRetries - 1) throw err;
      // Wait with exponential backoff (100ms, 200ms, 400ms, ...)
      await new Promise(resolve => setTimeout(resolve, 2 ** attempt * 100));
    }
  }
  throw new Error("Max retries exceeded");
}
548
```
549
550
### Distributed Locking
551
552
Implement distributed locks using KV operations.
553
554
```typescript
555
/**
 * Distributed lock stored as a single KV entry whose value is the holder name.
 *
 * Acquisition relies on `create` (succeeds only when the key does not exist)
 * and on revision-guarded `update` to take over a released or expired entry.
 */
class DistributedLock {
  /**
   * @param kv - KV store that holds the lock entry
   * @param lockKey - Key used as the lock
   * @param ttl - Age in milliseconds after which a lock entry counts as expired
   */
  constructor(private kv: KV, private lockKey: string, private ttl: number) {}

  /**
   * Try to acquire the lock, polling until it is obtained or `timeout` elapses.
   *
   * @param holder - Identifier written as the lock value
   * @param timeout - Maximum time to keep trying, in milliseconds (default: 5000)
   * @returns Promise resolving to true when the lock was acquired, false on timeout
   */
  async acquire(holder: string, timeout = 5000): Promise<boolean> {
    const sc = StringCodec();
    const deadline = Date.now() + timeout;

    while (Date.now() < deadline) {
      try {
        // Try to create lock entry
        await this.kv.create(this.lockKey, sc.encode(holder));

        // Set up TTL renewal
        this.renewLock(holder);
        return true;
      } catch (err) {
        // Lock entry exists: it may be taken over when it was released
        // (delete/purge marker) or when it is older than the TTL.
        const current = await this.kv.get(this.lockKey);
        if (
          current &&
          (this.isReleased(current.operation) || Date.now() - current.created.getTime() > this.ttl)
        ) {
          try {
            // Guarded by the revision just read, so only one contender wins
            await this.kv.update(this.lockKey, sc.encode(holder), current.revision);
            this.renewLock(holder);
            return true;
          } catch (updateErr) {
            // Someone else got it, continue loop
          }
        }

        // Wait before retry, without sleeping past the deadline
        const remaining = deadline - Date.now();
        if (remaining > 0) {
          await new Promise(resolve => setTimeout(resolve, Math.min(100, remaining)));
        }
      }
    }

    return false;
  }

  /**
   * Release the lock if it is currently held by `holder`.
   *
   * @param holder - Identifier that was used to acquire the lock
   * @returns Promise resolving to true when the lock entry was deleted, false
   *   when it is not held by `holder` or the delete failed
   */
  async release(holder: string): Promise<boolean> {
    try {
      const current = await this.kv.get(this.lockKey);
      if (!current || this.isReleased(current.operation)) {
        return false;
      }
      if (StringCodec().decode(current.value) !== holder) {
        return false;
      }
      // Delete only the revision that was inspected: if the lock changed hands
      // between the read and the delete, the delete fails instead of removing
      // another holder's lock.
      await this.kv.delete(this.lockKey, { previousRevision: current.revision });
      return true;
    } catch (err) {
      return false;
    }
  }

  /** True when the entry is a delete/purge marker, i.e. nobody holds the lock. */
  private isReleased(operation: string | undefined): boolean {
    return operation === "DEL" || operation === "PURGE";
  }

  private renewLock(holder: string): void {
    // Placeholder (intentionally empty): an implementation would periodically
    // update the lock entry to maintain ownership while the holder is active.
    // Until then a lock silently becomes eligible for takeover after `ttl`.
  }
}
609
```