or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection.mdindex.mdjetstream.mdkv-store.mdmessaging.mdobject-store.mdservices.md

kv-store.mddocs/

0

# Key-Value Store

1

2

NATS Key-Value Store provides a high-level abstraction for storing and retrieving key-value data with history tracking, watch capabilities, and conflict resolution built on JetStream streams.

3

4

## Capabilities

5

6

### KV Store Access

7

8

Create and access Key-Value stores through JetStream views.

9

10

```typescript { .api }

11

/**

12

* Get or create a Key-Value store

13

* @param name - KV bucket name

14

* @param opts - KV configuration options

15

* @returns Promise resolving to KV store instance

16

*/

17

kv(name: string, opts?: Partial<KvOptions>): Promise<KV>;

18

19

interface Views {

20

kv: (name: string, opts?: Partial<KvOptions>) => Promise<KV>;

21

os: (name: string, opts?: Partial<ObjectStoreOptions>) => Promise<ObjectStore>;

22

}

23

24

interface KvOptions {

25

/** Maximum number of history entries per key (default: 1) */

26

history?: number;

27

/** Time-to-live for entries in nanoseconds */

28

ttl?: number;

29

/** Maximum size of values in bytes */

30

max_value_size?: number;

31

/** Maximum total bucket size in bytes */

32

max_bucket_size?: number;

33

/** Number of replicas for HA (default: 1) */

34

replicas?: number;

35

/** Bucket description */

36

description?: string;

37

/** Storage type (file or memory) */

38

storage?: StorageType;

39

/** Enable compression */

40

compression?: boolean;

41

/** Bucket placement constraints */

42

placement?: Placement;

43

/** Custom republish configuration */

44

republish?: Republish;

45

/** Mirror another KV bucket */

46

mirror?: StreamSource;

47

/** Source other KV buckets */

48

sources?: StreamSource[];

49

}

50

```

51

52

**Usage Examples:**

53

54

```typescript

55

import { connect } from "nats";

56

57

const nc = await connect();

58

const js = nc.jetstream();

59

60

// Create or get KV store

61

const kv = await js.views.kv("user-preferences");

62

63

// Create KV with options

64

const kv = await js.views.kv("session-cache", {

65

history: 5, // Keep 5 versions per key

66

ttl: 60 * 60 * 1000 * 1000 * 1000, // 1 hour TTL in nanoseconds

67

max_value_size: 1024 * 1024, // 1MB max value size

68

description: "User session cache"

69

});

70

71

// Read-only KV access

72

const roKv = await js.views.kv("readonly-config");

73

```

74

75

### Basic Operations

76

77

Core key-value operations for storing and retrieving data.

78

79

```typescript { .api }

80

interface KV {

81

/**

82

* Get value for key

83

* @param key - Key to retrieve

84

* @returns Promise resolving to KV entry or null if not found

85

*/

86

get(key: string): Promise<KvEntry | null>;

87

88

/**

89

* Set value for key

90

* @param key - Key to set

91

* @param value - Value as bytes

92

* @param opts - Put options

93

* @returns Promise resolving to revision number

94

*/

95

put(key: string, value: Uint8Array, opts?: Partial<KvPutOptions>): Promise<number>;

96

97

/**

98

* Create key only if it doesn't exist

99

* @param key - Key to create

100

* @param value - Initial value

101

* @returns Promise resolving to revision number or throws if key exists

102

*/

103

create(key: string, value: Uint8Array): Promise<number>;

104

105

/**

106

* Update key only if revision matches

107

* @param key - Key to update

108

* @param value - New value

109

* @param revision - Expected current revision

110

* @returns Promise resolving to new revision number

111

*/

112

update(key: string, value: Uint8Array, revision: number): Promise<number>;

113

114

/**

115

* Delete key (soft delete, keeps in history)

116

* @param key - Key to delete

117

* @param opts - Delete options

118

*/

119

delete(key: string, opts?: Partial<KvDeleteOptions>): Promise<void>;

120

121

/**

122

* Purge key (hard delete, removes from history)

123

* @param key - Key to purge completely

124

*/

125

purge(key: string): Promise<void>;

126

}

127

128

interface KvEntry {

129

/** Bucket name */

130

bucket: string;

131

/** Entry key */

132

key: string;

133

/** Entry value as bytes */

134

value: Uint8Array;

135

/** Entry revision number */

136

revision: number;

137

/** Entry creation timestamp */

138

created: Date;

139

/** JetStream sequence number */

140

sequence: number;

141

/** True if entry represents a delete operation */

142

delta: number;

143

/** Entry operation type */

144

operation: "PUT" | "DEL" | "PURGE";

145

}

146

147

interface KvPutOptions {

148

/** Previous revision for conditional update */

149

previousRevision?: number;

150

}

151

152

interface KvDeleteOptions {

153

/** Previous revision for conditional delete */

154

previousRevision?: number;

155

}

156

```

157

158

**Usage Examples:**

159

160

```typescript

161

import { connect, StringCodec, JSONCodec } from "nats";

162

163

const nc = await connect();

164

const js = nc.jetstream();

165

const kv = await js.views.kv("app-config");

166

const sc = StringCodec();

167

const jc = JSONCodec();

168

169

// Basic put/get operations

170

await kv.put("api.url", sc.encode("https://api.example.com"));

171

const entry = await kv.get("api.url");

172

if (entry) {

173

console.log(`API URL: ${sc.decode(entry.value)}`);

174

console.log(`Revision: ${entry.revision}`);

175

}

176

177

// JSON data storage

178

const config = { timeout: 30, retries: 3 };

179

await kv.put("service.config", jc.encode(config));

180

181

// Conditional operations

182

try {

183

// Create only if key doesn't exist

184

await kv.create("counter", sc.encode("1"));

185

} catch (err) {

186

console.log("Key already exists");

187

}

188

189

// Update with revision check (optimistic locking)

190

const current = await kv.get("counter");

191

if (current) {

192

const newValue = parseInt(sc.decode(current.value)) + 1;

193

await kv.update("counter", sc.encode(newValue.toString()), current.revision);

194

}

195

196

// Delete operations

197

await kv.delete("temp.data"); // Soft delete (in history)

198

await kv.purge("secret.key"); // Hard delete (removed completely)

199

```

200

201

### History and Watching

202

203

Track key changes over time and monitor real-time updates.

204

205

```typescript { .api }

206

interface KV {

207

/**

208

* Get history of changes for a key

209

* @param key - Key to get history for

210

* @returns Promise resolving to async iterator of KV entries

211

*/

212

history(key: string): Promise<QueuedIterator<KvEntry>>;

213

214

/**

215

* Watch for changes to keys

216

* @param opts - Watch options including key filters

217

* @returns Promise resolving to async iterator of KV entries

218

*/

219

watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;

220

221

/**

222

* Watch for key name changes only

223

* @param opts - Watch options

224

* @returns Promise resolving to async iterator of key names

225

*/

226

keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;

227

}

228

229

interface KvWatchOptions {

230

/** Key pattern to watch (supports wildcards) */

231

key?: string;

232

/** Include historical entries */

233

include?: KvWatchInclude;

234

/** Resume from specific revision */

235

resumeFromRevision?: number;

236

/** Only watch for new updates */

237

updatesOnly?: boolean;

238

/** Headers to include with watch */

239

headers_only?: boolean;

240

/** Ignore deletes in watch stream */

241

ignore_deletes?: boolean;

242

}

243

244

enum KvWatchInclude {

245

/** Include all entries */

246

AllHistory = "all_history",

247

/** Include only updates after resume point */

248

UpdatesOnly = "updates_only",

249

/** Include last value per key */

250

LastPerKey = "last_per_key"

251

}

252

253

interface QueuedIterator<T> {

254

/** Get next item from iterator */

255

next(): Promise<IteratorResult<T>>;

256

/** Stop the iterator */

257

stop(): void;

258

/** Async iterator interface */

259

[Symbol.asyncIterator](): AsyncIterableIterator<T>;

260

}

261

```

262

263

**Usage Examples:**

264

265

```typescript

266

import { connect, StringCodec, KvWatchInclude } from "nats";

267

268

const nc = await connect();

269

const js = nc.jetstream();

270

const kv = await js.views.kv("user-sessions");

271

const sc = StringCodec();

272

273

// Watch all changes to KV store

274

const watcher = await kv.watch();

275

(async () => {

276

for await (const entry of watcher) {

277

console.log(`Key: ${entry.key}, Operation: ${entry.operation}`);

278

if (entry.operation === "PUT") {

279

console.log(`Value: ${sc.decode(entry.value)}`);

280

}

281

}

282

})();

283

284

// Watch specific key pattern

285

const userWatcher = await kv.watch({

286

key: "user.*",

287

include: KvWatchInclude.UpdatesOnly

288

});

289

290

(async () => {

291

for await (const entry of userWatcher) {

292

console.log(`User ${entry.key} updated: ${sc.decode(entry.value)}`);

293

}

294

})();

295

296

// Watch for key changes only

297

const keyWatcher = await kv.keys({ key: "config.*" });

298

(async () => {

299

for await (const key of keyWatcher) {

300

console.log(`Config key changed: ${key}`);

301

}

302

})();

303

304

// Get key history

305

const history = await kv.history("user.123");

306

console.log("History for user.123:");

307

for await (const entry of history) {

308

console.log(`Revision ${entry.revision}: ${sc.decode(entry.value)} (${entry.created})`);

309

}

310

311

// Resume watching from specific revision

312

const resumeWatcher = await kv.watch({

313

key: "events.*",

314

resumeFromRevision: 1000,

315

include: KvWatchInclude.UpdatesOnly

316

});

317

```

318

319

### Management Operations

320

321

Administrative operations for managing KV store lifecycle and status.

322

323

```typescript { .api }

324

interface KV {

325

/**

326

* Get KV store status and statistics

327

* @returns Promise resolving to KV status information

328

*/

329

status(): Promise<KvStatus>;

330

331

/**

332

* Close KV store (cleanup resources)

333

*/

334

close(): Promise<void>;

335

336

/**

337

* Destroy KV store (delete underlying stream)

338

* @returns Promise resolving to true if destroyed

339

*/

340

destroy(): Promise<boolean>;

341

}

342

343

interface RoKV {

344

/** Read-only interface with subset of KV operations */

345

get(key: string): Promise<KvEntry | null>;

346

history(key: string): Promise<QueuedIterator<KvEntry>>;

347

watch(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<KvEntry>>;

348

keys(opts?: Partial<KvWatchOptions>): Promise<QueuedIterator<string>>;

349

status(): Promise<KvStatus>;

350

}

351

352

interface KvStatus {

353

/** Bucket name */

354

bucket: string;

355

/** Number of entries */

356

values: number;

357

/** Bucket configuration */

358

history: number;

359

/** TTL setting */

360

ttl: number;

361

/** Bucket size in bytes */

362

bucket_location?: string;

363

/** Underlying stream info */

364

streamInfo: StreamInfo;

365

/** Compression enabled */

366

compression: boolean;

367

/** Storage type */

368

storage: StorageType;

369

/** Number of replicas */

370

replicas: number;

371

/** Backing stream */

372

backingStore: string;

373

}

374

375

interface KvLimits {

376

/** Maximum keys */

377

max_keys?: number;

378

/** Maximum history per key */

379

max_history?: number;

380

/** Maximum value size */

381

max_value_size?: number;

382

/** Maximum bucket size */

383

max_bucket_size?: number;

384

/** Minimum TTL */

385

min_ttl?: number;

386

/** Maximum TTL */

387

max_ttl?: number;

388

}

389

```

390

391

**Usage Examples:**

392

393

```typescript

394

import { connect } from "nats";

395

396

const nc = await connect();

397

const js = nc.jetstream();

398

const kv = await js.views.kv("metrics");

399

400

// Check KV store status

401

const status = await kv.status();

402

console.log(`Bucket: ${status.bucket}`);

403

console.log(`Values: ${status.values}`);

404

console.log(`History: ${status.history}`);

405

console.log(`TTL: ${status.ttl}ns`);

406

console.log(`Storage: ${status.storage}`);

407

console.log(`Replicas: ${status.replicas}`);

408

409

// Monitor bucket statistics

410

setInterval(async () => {

411

const status = await kv.status();

412

console.log(`KV entries: ${status.values}, Stream messages: ${status.streamInfo.state.messages}`);

413

}, 10000);

414

415

// Cleanup operations

416

await kv.close(); // Close and cleanup resources

417

await kv.destroy(); // Delete the entire KV store

418

```

419

420

### Codecs for KV Operations

421

422

Type-safe encoding/decoding for KV values.

423

424

```typescript { .api }

425

interface KvCodec<T> {

426

encode(value: T): Uint8Array;

427

decode(data: Uint8Array): T;

428

}

429

430

interface KvCodecs {

431

/** String codec */

432

strings: KvCodec<string>;

433

/** JSON codec */

434

json<T = unknown>(): KvCodec<T>;

435

/** Binary codec (pass-through) */

436

binary: KvCodec<Uint8Array>;

437

}

438

439

/** Built-in KV codecs */

440

const NoopKvCodecs: KvCodecs;

441

442

/** Base64 key encoding codec */

443

const Base64KeyCodec: {

444

encode(key: string): string;

445

decode(encoded: string): string;

446

};

447

```

448

449

**Usage Examples:**

450

451

```typescript

452

import { connect, JSONCodec, StringCodec } from "nats";

453

454

const nc = await connect();

455

const js = nc.jetstream();

456

const kv = await js.views.kv("typed-data");

457

458

// Type-safe JSON operations

459

interface UserPrefs {

460

theme: string;

461

notifications: boolean;

462

language: string;

463

}

464

465

const jsonCodec = JSONCodec<UserPrefs>();

466

const userPrefs: UserPrefs = {

467

theme: "dark",

468

notifications: true,

469

language: "en"

470

};

471

472

// Store typed JSON data

473

await kv.put("user.123.prefs", jsonCodec.encode(userPrefs));

474

475

// Retrieve and decode typed data

476

const entry = await kv.get("user.123.prefs");

477

if (entry) {

478

const prefs = jsonCodec.decode(entry.value); // Type is UserPrefs

479

console.log(`Theme: ${prefs.theme}, Notifications: ${prefs.notifications}`);

480

}

481

482

// String operations

483

const stringCodec = StringCodec();

484

await kv.put("app.version", stringCodec.encode("1.2.3"));

485

486

const versionEntry = await kv.get("app.version");

487

if (versionEntry) {

488

const version = stringCodec.decode(versionEntry.value);

489

console.log(`App version: ${version}`);

490

}

491

```

492

493

## Advanced Patterns

494

495

### Atomic Operations

496

497

Implement atomic updates and conflict resolution.

498

499

```typescript

500

// Atomic counter increment

501

async function incrementCounter(kv: KV, key: string): Promise<number> {

502

while (true) {

503

const current = await kv.get(key);

504

const currentValue = current ? parseInt(StringCodec().decode(current.value)) : 0;

505

const newValue = currentValue + 1;

506

507

try {

508

if (current) {

509

await kv.update(key, StringCodec().encode(newValue.toString()), current.revision);

510

} else {

511

await kv.create(key, StringCodec().encode(newValue.toString()));

512

}

513

return newValue;

514

} catch (err) {

515

// Conflict detected, retry

516

continue;

517

}

518

}

519

}

520

521

// Conditional updates with retry logic

522

async function conditionalUpdate<T>(

523

kv: KV,

524

key: string,

525

updateFn: (current: T | null) => T,

526

codec: KvCodec<T>,

527

maxRetries = 10

528

): Promise<number> {

529

for (let i = 0; i < maxRetries; i++) {

530

const current = await kv.get(key);

531

const currentValue = current ? codec.decode(current.value) : null;

532

const newValue = updateFn(currentValue);

533

534

try {

535

if (current) {

536

return await kv.update(key, codec.encode(newValue), current.revision);

537

} else {

538

return await kv.create(key, codec.encode(newValue));

539

}

540

} catch (err) {

541

if (i === maxRetries - 1) throw err;

542

// Wait with exponential backoff

543

await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 100));

544

}

545

}

546

throw new Error("Max retries exceeded");

547

}

548

```

549

550

### Distributed Locking

551

552

Implement distributed locks using KV operations.

553

554

```typescript

555

class DistributedLock {

556

constructor(private kv: KV, private lockKey: string, private ttl: number) {}

557

558

async acquire(holder: string, timeout = 5000): Promise<boolean> {

559

const deadline = Date.now() + timeout;

560

561

while (Date.now() < deadline) {

562

try {

563

// Try to create lock entry

564

await this.kv.create(this.lockKey, StringCodec().encode(holder));

565

566

// Set up TTL renewal

567

this.renewLock(holder);

568

return true;

569

} catch (err) {

570

// Lock exists, check if expired

571

const current = await this.kv.get(this.lockKey);

572

if (current && Date.now() - current.created.getTime() > this.ttl) {

573

// Lock expired, try to acquire it

574

try {

575

await this.kv.update(this.lockKey, StringCodec().encode(holder), current.revision);

576

this.renewLock(holder);

577

return true;

578

} catch (updateErr) {

579

// Someone else got it, continue loop

580

}

581

}

582

583

// Wait before retry

584

await new Promise(resolve => setTimeout(resolve, 100));

585

}

586

}

587

588

return false;

589

}

590

591

async release(holder: string): Promise<boolean> {

592

try {

593

const current = await this.kv.get(this.lockKey);

594

if (current && StringCodec().decode(current.value) === holder) {

595

await this.kv.delete(this.lockKey);

596

return true;

597

}

598

return false;

599

} catch (err) {

600

return false;

601

}

602

}

603

604

private renewLock(holder: string): void {

605

// Implementation would periodically update the lock entry

606

// to maintain ownership while the holder is active

607

}

608

}

609

```