0
# Key-Value Store
1
2
Distributed key-value storage built on JetStream streams. Provides atomic operations, conditional updates, history tracking, and watch capabilities for stateful applications.
3
4
## Capabilities
5
6
### Key-Value Operations
7
8
Core operations for storing and retrieving key-value pairs with atomic guarantees.
9
10
```python { .api }
11
class KeyValue:
12
async def get(
13
self,
14
key: str,
15
revision: Optional[int] = None,
16
validate_keys: bool = True
17
) -> Entry:
18
"""
19
Get value by key, optionally at specific revision.
20
21
Parameters:
22
- key: Key to retrieve
23
- revision: Specific revision number (optional)
24
25
Returns:
26
Key-value entry with metadata
27
28
Raises:
29
- KeyNotFoundError: Key does not exist
30
- KeyDeletedError: Key was deleted
31
"""
32
33
async def put(
34
self,
35
key: str,
36
value: bytes,
37
validate_keys: bool = True
38
) -> int:
39
"""
40
Store key-value pair.
41
42
Parameters:
43
- key: Key to store
44
- value: Value data as bytes
45
- validate_keys: Validate key format
46
47
Returns:
48
New revision number
49
"""
50
51
async def update(
52
self,
53
key: str,
54
value: bytes,
55
last: Optional[int] = None,
56
validate_keys: bool = True
57
) -> int:
58
"""
59
Update key-value pair with conditional revision check.
60
61
Parameters:
62
- key: Key to update
63
- value: New value data as bytes
64
- last: Expected current revision for conditional update
65
- validate_keys: Validate key format
66
67
Returns:
68
New revision number
69
70
Raises:
71
- KeyWrongLastSequenceError: Revision mismatch for conditional update
72
"""
73
74
async def create(
75
self,
76
key: str,
77
value: bytes,
78
validate_keys: bool = True
79
) -> int:
80
"""
81
Create new key-value pair, fails if key exists.
82
83
Parameters:
84
- key: Key to create
85
- value: Value data as bytes
86
87
Returns:
88
Revision number of created entry
89
90
Raises:
91
- KeyWrongLastSequenceError: Key already exists
92
"""
93
94
async def update(self, key: str, value: bytes, revision: int) -> int:
95
"""
96
Update existing key-value pair with expected revision.
97
98
Parameters:
99
- key: Key to update
100
- value: New value data
101
- revision: Expected current revision
102
103
Returns:
104
New revision number
105
106
Raises:
107
- KeyWrongLastSequenceError: Revision mismatch
108
- KeyNotFoundError: Key does not exist
109
"""
110
111
async def delete(
112
self,
113
key: str,
114
last: Optional[int] = None,
115
validate_keys: bool = True
116
) -> bool:
117
"""
118
Delete key with optional conditional delete.
119
120
Parameters:
121
- key: Key to delete
122
- last: Expected current revision for conditional delete
123
- validate_keys: Validate key format
124
125
Returns:
126
True if key was deleted
127
128
Raises:
129
- KeyWrongLastSequenceError: Revision mismatch for conditional delete
130
- KeyNotFoundError: Key does not exist
131
"""
132
```
133
134
#### Usage Examples
135
136
```python
137
import asyncio
138
import nats
139
140
async def main():
141
nc = await nats.connect()
142
js = nc.jetstream()
143
144
# Get or create key-value store
145
kv = await js.key_value("user-sessions")
146
147
# Store user session
148
session_data = b'{"user_id": 123, "login_time": "2024-01-01T10:00:00Z"}'
149
revision = await kv.put("session:user123", session_data)
150
print(f"Stored session at revision {revision}")
151
152
# Retrieve session
153
entry = await kv.get("session:user123")
154
print(f"Session data: {entry.value.decode()}")
155
print(f"Created at: {entry.created}")
156
157
# Conditional update
158
try:
159
updated_data = b'{"user_id": 123, "last_activity": "2024-01-01T11:00:00Z"}'
160
new_revision = await kv.update("session:user123", updated_data, entry.revision)
161
print(f"Updated to revision {new_revision}")
162
except KeyWrongLastSequenceError:
163
print("Session was modified by another process")
164
165
# Create-only operation
166
try:
167
await kv.create("session:user456", b'{"user_id": 456}')
168
print("Created new session")
169
except KeyWrongLastSequenceError:
170
print("Session already exists")
171
172
# Delete session
173
await kv.delete("session:user123")
174
```
175
176
### History and Versioning
177
178
Access key history and manage versioning.
179
180
```python { .api }
181
class KeyValue:
182
async def history(self, key: str) -> List[Entry]:
183
"""
184
Get complete history for key.
185
186
Parameters:
187
- key: Key to get history for
188
189
Returns:
190
List of entries in chronological order
191
192
Raises:
193
- KeyNotFoundError: Key has no history
194
"""
195
196
async def purge(self, key: str) -> bool:
197
"""
198
Purge all history for key (keeps current value).
199
200
Parameters:
201
- key: Key to purge history for
202
203
Returns:
204
True if history was purged
205
"""
206
207
async def purge_deletes(self, olderthan: int = 30*60) -> bool:
208
"""
209
Purge deleted keys older than specified time.
210
211
Parameters:
212
- olderthan: Age threshold in seconds (default 30 minutes)
213
214
Returns:
215
True if purge completed
216
"""
217
```
218
219
#### Usage Examples
220
221
```python
222
# Get key history
223
history = await kv.history("session:user123")
224
for entry in history:
225
if entry.operation == "PUT":
226
print(f"Revision {entry.revision}: {entry.value.decode()}")
227
elif entry.operation == "DEL":
228
print(f"Revision {entry.revision}: DELETED")
229
230
# Purge old versions but keep current
231
await kv.purge("session:user123")
232
233
# Clean up old deleted keys
234
await kv.purge_deletes(olderthan=24*60*60) # 24 hours
235
```
236
237
### Key Listing and Filtering
238
239
List and filter keys in the store.
240
241
```python { .api }
242
class KeyValue:
243
async def keys(self, filters: List[str] = None, **kwargs) -> List[str]:
244
"""
245
List keys in store with optional filtering.
246
247
Parameters:
248
- filters: List of subject filters (wildcard patterns)
249
250
Returns:
251
List of key names matching filters
252
253
Raises:
254
- NoKeysError: No keys found
255
"""
256
```
257
258
#### Usage Examples
259
260
```python
261
# List all keys
262
all_keys = await kv.keys()
263
print(f"Total keys: {len(all_keys)}")
264
265
# List keys with pattern
266
session_keys = await kv.keys(filters=["session:*"])
267
for key in session_keys:
268
print(f"Session key: {key}")
269
270
# List user-specific keys
271
user_keys = await kv.keys(filters=["session:user123*", "profile:user123*"])
272
```
273
274
### Watching for Changes
275
276
Monitor key-value store for changes in real-time.
277
278
```python { .api }
279
class KeyValue:
280
async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]:
281
"""
282
Watch specific key for changes.
283
284
Parameters:
285
- key: Key to watch (supports wildcards)
286
287
Returns:
288
Async iterator yielding entries for changes
289
"""
290
291
async def watchall(self, **kwargs) -> AsyncIterator[Entry]:
292
"""
293
Watch all keys in store for changes.
294
295
Returns:
296
Async iterator yielding entries for all changes
297
"""
298
```
299
300
#### Usage Examples
301
302
```python
303
# Watch specific key
304
async def watch_user_session():
305
async for entry in kv.watch("session:user123"):
306
if entry.operation == "PUT":
307
print(f"Session updated: {entry.value.decode()}")
308
elif entry.operation == "DEL":
309
print("Session deleted")
310
311
# Watch all sessions
312
async def watch_all_sessions():
313
async for entry in kv.watch("session:*"):
314
print(f"Session change: {entry.key} -> {entry.operation}")
315
316
# Watch entire store
317
async def watch_store():
318
async for entry in kv.watchall():
319
print(f"Store change: {entry.key} = {entry.operation}")
320
321
# Run watchers concurrently
322
await asyncio.gather(
323
watch_user_session(),
324
watch_all_sessions(),
325
watch_store()
326
)
327
```
328
329
### Bucket Management
330
331
Get bucket status and statistics.
332
333
```python { .api }
334
class KeyValue:
335
async def status(self) -> BucketStatus:
336
"""
337
Get key-value bucket status and statistics.
338
339
Returns:
340
Bucket status with metadata and statistics
341
"""
342
```
343
344
#### Usage Examples
345
346
```python
347
# Get bucket information
348
status = await kv.status()
349
print(f"Bucket: {status.bucket}")
350
print(f"Values: {status.values}")
351
print(f"History: {status.history}")
352
print(f"TTL: {status.ttl}")
353
354
# Monitor bucket size
355
if status.bytes > 1024*1024*100: # 100MB
356
print("Bucket is getting large, consider cleanup")
357
```
358
359
### JetStream Integration
360
361
Create and manage key-value stores through JetStream context.
362
363
```python { .api }
364
class JetStreamContext:
365
async def key_value(self, bucket: str) -> KeyValue:
366
"""
367
Get existing key-value store.
368
369
Parameters:
370
- bucket: Bucket name
371
372
Returns:
373
KeyValue store instance
374
375
Raises:
376
- BucketNotFoundError: Bucket does not exist
377
"""
378
379
async def create_key_value(
380
self,
381
config: KeyValueConfig = None,
382
**params
383
) -> KeyValue:
384
"""
385
Create new key-value store.
386
387
Parameters:
388
- config: Complete bucket configuration
389
- **params: Individual configuration parameters
390
391
Returns:
392
KeyValue store instance
393
394
Raises:
395
- BadBucketError: Invalid configuration
396
"""
397
398
async def delete_key_value(self, bucket: str) -> bool:
399
"""
400
Delete key-value store and all data.
401
402
Parameters:
403
- bucket: Bucket name to delete
404
405
Returns:
406
True if bucket was deleted
407
"""
408
```
409
410
#### Usage Examples
411
412
```python
413
from nats.js.api import KeyValueConfig
414
from datetime import timedelta
415
416
# Create key-value store with configuration
417
kv_config = KeyValueConfig(
418
bucket="user-preferences",
419
description="User preference storage",
420
max_value_size=1024*1024, # 1MB per value
421
history=5, # Keep 5 versions
422
ttl=timedelta(days=30), # 30 day TTL
423
max_bytes=1024*1024*1024, # 1GB total
424
storage="file",
425
replicas=3
426
)
427
428
kv = await js.create_key_value(config=kv_config)
429
430
# Create simple store with parameters
431
kv = await js.create_key_value(
432
bucket="cache",
433
history=1,
434
ttl=timedelta(hours=1)
435
)
436
437
# Get existing store
438
kv = await js.key_value("user-preferences")
439
440
# Delete store
441
await js.delete_key_value("old-cache")
442
```
443
444
## Data Types
445
446
```python { .api }
447
from dataclasses import dataclass
448
from typing import Optional
449
from datetime import datetime, timedelta
450
451
@dataclass
452
class Entry:
453
"""Key-value store entry."""
454
key: str
455
value: bytes
456
revision: int
457
created: datetime
458
delta: int
459
operation: str # "PUT", "DEL", "PURGE"
460
bucket: str
461
462
@dataclass
463
class BucketStatus:
464
"""Key-value bucket status."""
465
bucket: str
466
values: int
467
history: int
468
ttl: Optional[timedelta]
469
bytes: int
470
backing_store: str # "JetStream"
471
472
@dataclass
473
class KeyValueConfig:
474
"""Key-value bucket configuration."""
475
bucket: str
476
description: Optional[str] = None
477
max_value_size: int = -1
478
history: int = 1
479
ttl: Optional[timedelta] = None
480
max_bytes: int = -1
481
storage: str = "file" # "file", "memory"
482
replicas: int = 1
483
placement: Optional[Placement] = None
484
republish: Optional[RePublish] = None
485
mirror: Optional[StreamSource] = None
486
sources: Optional[List[StreamSource]] = None
487
metadata: Optional[Dict[str, str]] = None
488
```
489
490
## Constants
491
492
```python { .api }
493
# Key-Value operation types
494
KV_OP = "KV-Operation"
495
KV_DEL = "DEL"
496
KV_PURGE = "PURGE"
497
498
# Maximum history entries
499
KV_MAX_HISTORY = 64
500
501
# Default values
502
DEFAULT_KV_HISTORY = 1
503
DEFAULT_KV_REPLICAS = 1
504
```