An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Distributed key-value storage built on JetStream streams. Provides atomic operations, conditional updates, history tracking, and watch capabilities for stateful applications.
Core operations for storing and retrieving key-value pairs with atomic guarantees.
class KeyValue:
    async def get(
        self,
        key: str,
        revision: Optional[int] = None,
        validate_keys: bool = True
    ) -> Entry:
        """
        Get value by key, optionally at specific revision.

        Parameters:
        - key: Key to retrieve
        - revision: Specific revision number (optional)
        - validate_keys: Validate key format

        Returns:
        Key-value entry with metadata

        Raises:
        - KeyNotFoundError: Key does not exist
        - KeyDeletedError: Key was deleted
        """

    async def put(
        self,
        key: str,
        value: bytes,
        validate_keys: bool = True
    ) -> int:
        """
        Store key-value pair.

        Parameters:
        - key: Key to store
        - value: Value data as bytes
        - validate_keys: Validate key format

        Returns:
        New revision number
        """

    async def update(
        self,
        key: str,
        value: bytes,
        last: Optional[int] = None,
        validate_keys: bool = True
    ) -> int:
        """
        Update key-value pair with conditional revision check.

        Parameters:
        - key: Key to update
        - value: New value data as bytes
        - last: Expected current revision for conditional update
        - validate_keys: Validate key format

        Returns:
        New revision number

        Raises:
        - KeyWrongLastSequenceError: Revision mismatch for conditional update
        """

    async def create(
        self,
        key: str,
        value: bytes,
        validate_keys: bool = True
    ) -> int:
        """
        Create new key-value pair, fails if key exists.

        Parameters:
        - key: Key to create
        - value: Value data as bytes
        - validate_keys: Validate key format

        Returns:
        Revision number of created entry

        Raises:
        - KeyWrongLastSequenceError: Key already exists
        """

    # NOTE(review): this is a second definition named `update`. In a Python
    # class body the later definition replaces the earlier one, so this
    # signature (required `revision`, no `validate_keys`) would shadow the
    # `last=`-based variant above. Confirm which one matches the library and
    # drop the other.
    async def update(self, key: str, value: bytes, revision: int) -> int:
        """
        Update existing key-value pair with expected revision.

        Parameters:
        - key: Key to update
        - value: New value data
        - revision: Expected current revision

        Returns:
        New revision number

        Raises:
        - KeyWrongLastSequenceError: Revision mismatch
        - KeyNotFoundError: Key does not exist
        """

    async def delete(
        self,
        key: str,
        last: Optional[int] = None,
        validate_keys: bool = True
    ) -> bool:
        """
        Delete key with optional conditional delete.

        Parameters:
        - key: Key to delete
        - last: Expected current revision for conditional delete
        - validate_keys: Validate key format

        Returns:
        True if key was deleted

        Raises:
        - KeyWrongLastSequenceError: Revision mismatch for conditional delete
        - KeyNotFoundError: Key does not exist
        """


import asyncio
import nats
async def main():
nc = await nats.connect()
js = nc.jetstream()
# Get or create key-value store
kv = await js.key_value("user-sessions")
# Store user session
session_data = b'{"user_id": 123, "login_time": "2024-01-01T10:00:00Z"}'
revision = await kv.put("session:user123", session_data)
print(f"Stored session at revision {revision}")
# Retrieve session
entry = await kv.get("session:user123")
print(f"Session data: {entry.value.decode()}")
print(f"Created at: {entry.created}")
# Conditional update
try:
updated_data = b'{"user_id": 123, "last_activity": "2024-01-01T11:00:00Z"}'
new_revision = await kv.update("session:user123", updated_data, entry.revision)
print(f"Updated to revision {new_revision}")
except KeyWrongLastSequenceError:
print("Session was modified by another process")
# Create-only operation
try:
await kv.create("session:user456", b'{"user_id": 456}')
print("Created new session")
except KeyWrongLastSequenceError:
print("Session already exists")
# Delete session
await kv.delete("session:user123")Access key history and manage versioning.
class KeyValue:
    async def history(self, key: str) -> List[Entry]:
        """
        Get complete history for key.

        Parameters:
        - key: Key to get history for

        Returns:
        List of entries in chronological order

        Raises:
        - KeyNotFoundError: Key has no history
        """

    async def purge(self, key: str) -> bool:
        """
        Purge all history for key (keeps current value).

        NOTE(review): in NATS key-value stores a purge is expected to remove
        the key's value together with its revisions and leave only a "PURGE"
        marker entry, so "keeps current value" looks wrong — confirm against
        the library before relying on it.

        Parameters:
        - key: Key to purge history for

        Returns:
        True if history was purged
        """

    async def purge_deletes(self, olderthan: int = 30*60) -> bool:
        """
        Purge deleted keys older than specified time.

        Parameters:
        - olderthan: Age threshold in seconds (default 30 minutes)

        Returns:
        True if purge completed
        """


# Get key history
history = await kv.history("session:user123")
for entry in history:
if entry.operation == "PUT":
print(f"Revision {entry.revision}: {entry.value.decode()}")
elif entry.operation == "DEL":
print(f"Revision {entry.revision}: DELETED")
# Purge old versions but keep current
await kv.purge("session:user123")
# Clean up old deleted keys
await kv.purge_deletes(olderthan=24*60*60) # 24 hours

List and filter keys in the store.
class KeyValue:
    async def keys(self, filters: Optional[List[str]] = None, **kwargs) -> List[str]:
        """
        List keys in store with optional filtering.

        Parameters:
        - filters: List of subject filters (wildcard patterns); omit to list every key
        - **kwargs: Additional keyword options (not described here)

        Returns:
        List of key names matching filters

        Raises:
        - NoKeysError: No keys found
        """


# List all keys
all_keys = await kv.keys()
print(f"Total keys: {len(all_keys)}")
# List keys with pattern
session_keys = await kv.keys(filters=["session:*"])
for key in session_keys:
print(f"Session key: {key}")
# List user-specific keys
user_keys = await kv.keys(filters=["session:user123*", "profile:user123*"])

Monitor key-value store for changes in real-time.
class KeyValue:
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]:
        """
        Watch specific key for changes.

        Parameters:
        - key: Key to watch (supports wildcards)
        - **kwargs: Additional keyword options (not described here)

        Returns:
        Async iterator yielding entries for changes
        """

    async def watchall(self, **kwargs) -> AsyncIterator[Entry]:
        """
        Watch all keys in store for changes.

        Parameters:
        - **kwargs: Additional keyword options (not described here)

        Returns:
        Async iterator yielding entries for all changes
        """


# Watch specific key
async def watch_user_session():
    """Print every update or delete seen on the "session:user123" key."""
    # NOTE(review): watch() is declared "async def"; if it is a coroutine that
    # returns the watcher rather than an async generator, this needs
    # "await kv.watch(...)" before iterating — confirm against the library.
    async for entry in kv.watch("session:user123"):
        if entry.operation == "PUT":
            print(f"Session updated: {entry.value.decode()}")
        elif entry.operation == "DEL":
            print("Session deleted")
# Watch all sessions
async def watch_all_sessions():
    """Print the key and operation of every change matching "session:*"."""
    # NOTE(review): confirm the "session:*" pattern matches as intended (NATS
    # wildcards normally apply to whole "."-separated tokens) and whether
    # watch() must be awaited before iterating.
    async for entry in kv.watch("session:*"):
        print(f"Session change: {entry.key} -> {entry.operation}")
# Watch entire store
async def watch_store():
    """Print the key and operation of every change in the whole store."""
    # NOTE(review): as with watch(), confirm whether watchall() has to be
    # awaited before it can be iterated.
    async for entry in kv.watchall():
        print(f"Store change: {entry.key} = {entry.operation}")
# Run watchers concurrently
await asyncio.gather(
watch_user_session(),
watch_all_sessions(),
watch_store()
)

Get bucket status and statistics.
class KeyValue:
    async def status(self) -> BucketStatus:
        """
        Get key-value bucket status and statistics.

        Returns:
        Bucket status with metadata and statistics
        """


# Get bucket information
status = await kv.status()
print(f"Bucket: {status.bucket}")
print(f"Values: {status.values}")
print(f"History: {status.history}")
print(f"TTL: {status.ttl}")
# Monitor bucket size
if status.bytes > 1024*1024*100: # 100MB
    print("Bucket is getting large, consider cleanup")

Create and manage key-value stores through JetStream context.
class JetStreamContext:
    async def key_value(self, bucket: str) -> KeyValue:
        """
        Get existing key-value store.

        Parameters:
        - bucket: Bucket name

        Returns:
        KeyValue store instance

        Raises:
        - BucketNotFoundError: Bucket does not exist
        """

    async def create_key_value(
        self,
        config: Optional[KeyValueConfig] = None,
        **params
    ) -> KeyValue:
        """
        Create new key-value store.

        Parameters:
        - config: Complete bucket configuration
        - **params: Individual configuration parameters

        Returns:
        KeyValue store instance

        Raises:
        - BadBucketError: Invalid configuration
        """

    async def delete_key_value(self, bucket: str) -> bool:
        """
        Delete key-value store and all data.

        Parameters:
        - bucket: Bucket name to delete

        Returns:
        True if bucket was deleted
        """


from nats.js.api import KeyValueConfig
from datetime import timedelta
# Create key-value store with configuration
kv_config = KeyValueConfig(
bucket="user-preferences",
description="User preference storage",
max_value_size=1024*1024, # 1MB per value
history=5, # Keep 5 versions
ttl=timedelta(days=30), # 30 day TTL
max_bytes=1024*1024*1024, # 1GB total
storage="file",
replicas=3
)
kv = await js.create_key_value(config=kv_config)
# Create simple store with parameters
kv = await js.create_key_value(
bucket="cache",
history=1,
ttl=timedelta(hours=1)
)
# Get existing store
kv = await js.key_value("user-preferences")
# Delete store
await js.delete_key_value("old-cache")from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timedelta
@dataclass
class Entry:
    """Key-value store entry.

    This is the record type named in the return annotations of
    `KeyValue.get`, `KeyValue.history` and the watch methods.
    """

    key: str
    value: bytes
    revision: int
    created: datetime
    delta: int
    operation: str  # "PUT", "DEL", "PURGE"
    bucket: str
@dataclass
class BucketStatus:
    """Key-value bucket status.

    This is the record type named in the return annotation of
    `KeyValue.status`.
    """

    bucket: str
    values: int
    history: int
    ttl: Optional[timedelta]
    bytes: int
    backing_store: str  # "JetStream"
@dataclass
class KeyValueConfig:
    """Key-value bucket configuration.

    Only `bucket` is required; every other field has a default.
    """

    bucket: str
    description: Optional[str] = None
    max_value_size: int = -1
    history: int = 1
    ttl: Optional[timedelta] = None
    max_bytes: int = -1
    storage: str = "file"  # "file", "memory"
    replicas: int = 1
    placement: Optional[Placement] = None
    republish: Optional[RePublish] = None
    mirror: Optional[StreamSource] = None
    sources: Optional[List[StreamSource]] = None
    metadata: Optional[Dict[str, str]] = None


# Key-Value operation types
# NOTE(review): KV_OP does not look like an operation value in the same sense
# as KV_DEL / KV_PURGE — presumably it names the field that carries them;
# confirm against the library.
KV_OP = "KV-Operation"
KV_DEL = "DEL"
KV_PURGE = "PURGE"
# Maximum history entries
KV_MAX_HISTORY = 64
# Default values
DEFAULT_KV_HISTORY = 1
DEFAULT_KV_REPLICAS = 1

Install with Tessl CLI
npx tessl i tessl/pypi-nats-py