or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.md error-handling.md index.md jetstream-management.md jetstream.md key-value-store.md message-handling.md microservices.md object-store.md

docs/key-value-store.md

0

# Key-Value Store

1

2

Distributed key-value storage built on JetStream streams. Provides atomic operations, conditional updates, history tracking, and watch capabilities for stateful applications.

3

4

## Capabilities

5

6

### Key-Value Operations

7

8

Core operations for storing and retrieving key-value pairs with atomic guarantees.

9

10

```python { .api }

11

class KeyValue:

12

async def get(

13

self,

14

key: str,

15

revision: Optional[int] = None,

16

validate_keys: bool = True

17

) -> Entry:

18

"""

19

Get value by key, optionally at specific revision.

20

21

Parameters:

22

- key: Key to retrieve

23

- revision: Specific revision number (optional)

- validate_keys: Validate key format

24

25

Returns:

26

Key-value entry with metadata

27

28

Raises:

29

- KeyNotFoundError: Key does not exist

30

- KeyDeletedError: Key was deleted

31

"""

32

33

async def put(

34

self,

35

key: str,

36

value: bytes,

37

validate_keys: bool = True

38

) -> int:

39

"""

40

Store key-value pair.

41

42

Parameters:

43

- key: Key to store

44

- value: Value data as bytes

45

- validate_keys: Validate key format

46

47

Returns:

48

New revision number

49

"""

50

51

async def update(

52

self,

53

key: str,

54

value: bytes,

55

last: Optional[int] = None,

56

validate_keys: bool = True

57

) -> int:

58

"""

59

Update key-value pair with conditional revision check.

60

61

Parameters:

62

- key: Key to update

63

- value: New value data as bytes

64

- last: Expected current revision for conditional update

65

- validate_keys: Validate key format

66

67

Returns:

68

New revision number

69

70

Raises:

71

- KeyWrongLastSequenceError: Revision mismatch for conditional update

72

"""

73

74

async def create(

75

self,

76

key: str,

77

value: bytes,

78

validate_keys: bool = True

79

) -> int:

80

"""

81

Create new key-value pair, fails if key exists.

82

83

Parameters:

84

- key: Key to create

85

- value: Value data as bytes

- validate_keys: Validate key format

86

87

Returns:

88

Revision number of created entry

89

90

Raises:

91

- KeyWrongLastSequenceError: Key already exists

92

"""

93

94

async def update(self, key: str, value: bytes, revision: int) -> int:

95

"""

96

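Update existing key-value pair with expected revision. NOTE: this entry repeats the `update` method already listed earlier in this class, where the expected revision is the optional `last` parameter.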

97

98

Parameters:

99

- key: Key to update

100

- value: New value data

101

- revision: Expected current revision

102

103

Returns:

104

New revision number

105

106

Raises:

107

- KeyWrongLastSequenceError: Revision mismatch

108

- KeyNotFoundError: Key does not exist

109

"""

110

111

async def delete(

112

self,

113

key: str,

114

last: Optional[int] = None,

115

validate_keys: bool = True

116

) -> bool:

117

"""

118

Delete key, optionally only if its current revision matches the expected one.

119

120

Parameters:

121

- key: Key to delete

122

- last: Expected current revision for conditional delete

123

- validate_keys: Validate key format

124

125

Returns:

126

True if key was deleted

127

128

Raises:

129

- KeyWrongLastSequenceError: Revision mismatch for conditional delete

130

- KeyNotFoundError: Key does not exist

131

"""

132

```

133

134

#### Usage Examples

135

136

```python

137

import asyncio

138

import nats

139

140

async def main():

141

nc = await nats.connect()

142

js = nc.jetstream()

143

144

# Get existing key-value store (the bucket must already exist)

145

kv = await js.key_value("user-sessions")

146

147

# Store user session

148

session_data = b'{"user_id": 123, "login_time": "2024-01-01T10:00:00Z"}'

149

revision = await kv.put("session:user123", session_data)

150

print(f"Stored session at revision {revision}")

151

152

# Retrieve session

153

entry = await kv.get("session:user123")

154

print(f"Session data: {entry.value.decode()}")

155

print(f"Created at: {entry.created}")

156

157

# Conditional update

158

try:

159

updated_data = b'{"user_id": 123, "last_activity": "2024-01-01T11:00:00Z"}'

160

new_revision = await kv.update("session:user123", updated_data, entry.revision)

161

print(f"Updated to revision {new_revision}")

162

except KeyWrongLastSequenceError:

163

print("Session was modified by another process")

164

165

# Create-only operation

166

try:

167

await kv.create("session:user456", b'{"user_id": 456}')

168

print("Created new session")

169

except KeyWrongLastSequenceError:

170

print("Session already exists")

171

172

# Delete session

173

await kv.delete("session:user123")

174

```

175

176

### History and Versioning

177

178

Access key history and manage versioning.

179

180

```python { .api }

181

class KeyValue:

182

async def history(self, key: str) -> List[Entry]:

183

"""

184

Get complete history for key.

185

186

Parameters:

187

- key: Key to get history for

188

189

Returns:

190

List of entries in chronological order

191

192

Raises:

193

- NoKeysError: Key has no history

194

"""

195

196

async def purge(self, key: str) -> bool:

197

"""

198

Purge key: remove its current value and all of its history, leaving only a purge marker.

199

200

Parameters:

201

- key: Key to purge

202

203

Returns:

204

True if key was purged

205

"""

206

207

async def purge_deletes(self, olderthan: int = 30*60) -> bool:

208

"""

209

Purge deleted keys older than specified time.

210

211

Parameters:

212

- olderthan: Age threshold in seconds (default 30 minutes)

213

214

Returns:

215

True if purge completed

216

"""

217

```

218

219

#### Usage Examples

220

221

```python

222

# Get key history

223

history = await kv.history("session:user123")

224

for entry in history:

225

if entry.operation == "PUT":

226

print(f"Revision {entry.revision}: {entry.value.decode()}")

227

elif entry.operation == "DEL":

228

print(f"Revision {entry.revision}: DELETED")

229

230

# Purge the key together with all of its revisions

231

await kv.purge("session:user123")

232

233

# Clean up old deleted keys

234

await kv.purge_deletes(olderthan=24*60*60) # 24 hours

235

```

236

237

### Key Listing and Filtering

238

239

List and filter keys in the store.

240

241

```python { .api }

242

class KeyValue:

243

async def keys(self, filters: List[str] = None, **kwargs) -> List[str]:

244

"""

245

List keys in store with optional filtering.

246

247

Parameters:

248

- filters: List of subject filters (wildcard patterns)

249

250

Returns:

251

List of key names matching filters

252

253

Raises:

254

- NoKeysError: No keys found

255

"""

256

```

257

258

#### Usage Examples

259

260

```python

261

# List all keys

262

all_keys = await kv.keys()

263

print(f"Total keys: {len(all_keys)}")

264

265

# List keys with pattern

266

session_keys = await kv.keys(filters=["session:*"])

267

for key in session_keys:

268

print(f"Session key: {key}")

269

270

# List user-specific keys

271

user_keys = await kv.keys(filters=["session:user123*", "profile:user123*"])

272

```

273

274

### Watching for Changes

275

276

Monitor key-value store for changes in real-time.

277

278

```python { .api }

279

class KeyValue:

280

async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]:

281

"""

282

Watch a specific key, or all keys matching a wildcard pattern, for changes.

283

284

Parameters:

285

- key: Key to watch (supports wildcards)

286

287

Returns:

288

Async iterator yielding entries for changes

289

"""

290

291

async def watchall(self, **kwargs) -> AsyncIterator[Entry]:

292

"""

293

Watch all keys in store for changes.

294

295

Returns:

296

Async iterator yielding entries for all changes

297

"""

298

```

299

300

#### Usage Examples

301

302

```python

303

# Watch specific key

304

async def watch_user_session():

305

async for entry in kv.watch("session:user123"):

306

if entry.operation == "PUT":

307

print(f"Session updated: {entry.value.decode()}")

308

elif entry.operation == "DEL":

309

print("Session deleted")

310

311

# Watch all sessions

312

async def watch_all_sessions():

313

async for entry in kv.watch("session:*"):

314

print(f"Session change: {entry.key} -> {entry.operation}")

315

316

# Watch entire store

317

async def watch_store():

318

async for entry in kv.watchall():

319

print(f"Store change: {entry.key} = {entry.operation}")

320

321

# Run watchers concurrently

322

await asyncio.gather(

323

watch_user_session(),

324

watch_all_sessions(),

325

watch_store()

326

)

327

```

328

329

### Bucket Management

330

331

Get bucket status and statistics.

332

333

```python { .api }

334

class KeyValue:

335

async def status(self) -> BucketStatus:

336

"""

337

Get key-value bucket status and statistics.

338

339

Returns:

340

Bucket status with metadata and statistics

341

"""

342

```

343

344

#### Usage Examples

345

346

```python

347

# Get bucket information

348

status = await kv.status()

349

print(f"Bucket: {status.bucket}")

350

print(f"Values: {status.values}")

351

print(f"History: {status.history}")

352

print(f"TTL: {status.ttl}")

353

354

# Monitor bucket size

355

if status.bytes > 1024*1024*100: # 100MB

356

print("Bucket is getting large, consider cleanup")

357

```

358

359

### JetStream Integration

360

361

Create and manage key-value stores through JetStream context.

362

363

```python { .api }

364

class JetStreamContext:

365

async def key_value(self, bucket: str) -> KeyValue:

366

"""

367

Get existing key-value store.

368

369

Parameters:

370

- bucket: Bucket name

371

372

Returns:

373

KeyValue store instance

374

375

Raises:

376

- BucketNotFoundError: Bucket does not exist

377

"""

378

379

async def create_key_value(

380

self,

381

config: KeyValueConfig = None,

382

**params

383

) -> KeyValue:

384

"""

385

Create new key-value store.

386

387

Parameters:

388

- config: Complete bucket configuration

389

- **params: Individual configuration parameters

390

391

Returns:

392

KeyValue store instance

393

394

Raises:

395

- BadBucketError: Invalid configuration

396

"""

397

398

async def delete_key_value(self, bucket: str) -> bool:

399

"""

400

Delete key-value store and all data.

401

402

Parameters:

403

- bucket: Bucket name to delete

404

405

Returns:

406

True if bucket was deleted

407

"""

408

```

409

410

#### Usage Examples

411

412

```python

413

from nats.js.api import KeyValueConfig

414

from datetime import timedelta

415

416

# Create key-value store with configuration

417

kv_config = KeyValueConfig(

418

bucket="user-preferences",

419

description="User preference storage",

420

max_value_size=1024*1024, # 1MB per value

421

history=5, # Keep 5 versions

422

ttl=timedelta(days=30), # 30 day TTL

423

max_bytes=1024*1024*1024, # 1GB total

424

storage="file",

425

replicas=3

426

)

427

428

kv = await js.create_key_value(config=kv_config)

429

430

# Create simple store with parameters

431

kv = await js.create_key_value(

432

bucket="cache",

433

history=1,

434

ttl=timedelta(hours=1)

435

)

436

437

# Get existing store

438

kv = await js.key_value("user-preferences")

439

440

# Delete store

441

await js.delete_key_value("old-cache")

442

```

443

444

## Data Types

445

446

```python { .api }

447

from dataclasses import dataclass

448

from typing import Optional

449

from datetime import datetime, timedelta

450

451

@dataclass

452

class Entry:

453

"""Key-value store entry."""

454

key: str

455

value: bytes

456

revision: int

457

created: datetime

458

delta: int

459

operation: str # "PUT", "DEL", "PURGE"

460

bucket: str

461

462

@dataclass

463

class BucketStatus:

464

"""Key-value bucket status."""

465

bucket: str

466

values: int

467

history: int

468

ttl: Optional[timedelta]

469

bytes: int

470

backing_store: str # "JetStream"

471

472

@dataclass

473

class KeyValueConfig:

474

"""Key-value bucket configuration."""

475

bucket: str

476

description: Optional[str] = None

477

max_value_size: int = -1

478

history: int = 1

479

ttl: Optional[timedelta] = None

480

max_bytes: int = -1

481

storage: str = "file" # "file", "memory"

482

replicas: int = 1

483

placement: Optional[Placement] = None

484

republish: Optional[RePublish] = None

485

mirror: Optional[StreamSource] = None

486

sources: Optional[List[StreamSource]] = None

487

metadata: Optional[Dict[str, str]] = None

488

```

489

490

## Constants

491

492

```python { .api }

493

# Key-Value operation types

494

KV_OP = "KV-Operation"

495

KV_DEL = "DEL"

496

KV_PURGE = "PURGE"

497

498

# Maximum history entries

499

KV_MAX_HISTORY = 64

500

501

# Default values

502

DEFAULT_KV_HISTORY = 1

503

DEFAULT_KV_REPLICAS = 1

504

```