CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

docs/key-value-store.md

Key-Value Store

Distributed key-value storage built on JetStream streams. Provides atomic operations, conditional updates, history tracking, and watch capabilities for stateful applications.

Capabilities

Key-Value Operations

Core operations for storing and retrieving key-value pairs with atomic guarantees.

class KeyValue:
    """Core read/write operations on a single key-value bucket."""

    async def get(
        self,
        key: str,
        revision: Optional[int] = None,
        validate_keys: bool = True
    ) -> "Entry":
        """
        Get value by key, optionally at specific revision.

        Parameters:
        - key: Key to retrieve
        - revision: Specific revision number (optional)
        - validate_keys: Validate key format

        Returns:
        Key-value entry with metadata

        Raises:
        - KeyNotFoundError: Key does not exist
        - KeyDeletedError: Key was deleted
        """

    async def put(
        self,
        key: str,
        value: bytes,
        validate_keys: bool = True
    ) -> int:
        """
        Store key-value pair.

        Parameters:
        - key: Key to store
        - value: Value data as bytes
        - validate_keys: Validate key format

        Returns:
        New revision number
        """

    async def create(
        self,
        key: str,
        value: bytes,
        validate_keys: bool = True
    ) -> int:
        """
        Create new key-value pair, fails if key exists.

        Parameters:
        - key: Key to create
        - value: Value data as bytes
        - validate_keys: Validate key format

        Returns:
        Revision number of created entry

        Raises:
        - KeyWrongLastSequenceError: Key already exists
        """

    # NOTE: `update` used to be listed twice in this class; in Python the later
    # definition replaces the earlier one, which hid `last` and
    # `validate_keys`. Only the full signature is kept. The expected revision
    # may still be passed positionally: update(key, value, entry.revision).
    async def update(
        self,
        key: str,
        value: bytes,
        last: Optional[int] = None,
        validate_keys: bool = True
    ) -> int:
        """
        Update key-value pair with conditional revision check.

        Parameters:
        - key: Key to update
        - value: New value data as bytes
        - last: Expected current revision for conditional update
        - validate_keys: Validate key format

        Returns:
        New revision number

        Raises:
        - KeyWrongLastSequenceError: Revision mismatch for conditional update
        """

    async def delete(
        self,
        key: str,
        last: Optional[int] = None,
        validate_keys: bool = True
    ) -> bool:
        """
        Delete key with optional conditional delete.

        Parameters:
        - key: Key to delete
        - last: Expected current revision for conditional delete
        - validate_keys: Validate key format

        Returns:
        True if key was deleted

        Raises:
        - KeyWrongLastSequenceError: Revision mismatch for conditional delete
        - KeyNotFoundError: Key does not exist
        """

Usage Examples

import asyncio

import nats
from nats.js.errors import KeyWrongLastSequenceError


async def main():
    """Walk through put, get, conditional update, create and delete on one bucket."""
    nc = await nats.connect()
    try:
        js = nc.jetstream()

        # Look up the existing key-value store. key_value() does not create
        # the bucket; it raises BucketNotFoundError when the bucket is missing
        # (use create_key_value() to create one).
        kv = await js.key_value("user-sessions")

        # NOTE(review): nats-py validates keys by default and appears to accept
        # only [-/_=.a-zA-Z0-9]; keys containing ':' may be rejected. Confirm,
        # and consider '.'-separated keys such as "session.user123".

        # Store user session
        session_data = b'{"user_id": 123, "login_time": "2024-01-01T10:00:00Z"}'
        revision = await kv.put("session:user123", session_data)
        print(f"Stored session at revision {revision}")

        # Retrieve session
        entry = await kv.get("session:user123")
        print(f"Session data: {entry.value.decode()}")
        print(f"Created at: {entry.created}")

        # Conditional update: only succeeds if nobody wrote the key since we
        # read `entry`.
        try:
            updated_data = b'{"user_id": 123, "last_activity": "2024-01-01T11:00:00Z"}'
            new_revision = await kv.update("session:user123", updated_data, entry.revision)
            print(f"Updated to revision {new_revision}")
        except KeyWrongLastSequenceError:
            print("Session was modified by another process")

        # Create-only operation
        try:
            await kv.create("session:user456", b'{"user_id": 456}')
            print("Created new session")
        except KeyWrongLastSequenceError:
            print("Session already exists")

        # Delete session
        await kv.delete("session:user123")
    finally:
        # Always release the connection, even if an operation above failed.
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

History and Versioning

Access key history and manage versioning.

class KeyValue:
    async def history(self, key: str) -> List[Entry]:
        """
        Get the recorded revisions for a key.

        Parameters:
        - key: Key to get history for

        Returns:
        List of entries in chronological order

        Raises:
        - KeyNotFoundError: Key has no history

        NOTE(review): presumably the number of entries returned is bounded by
        the bucket's `history` setting - confirm.
        """
    
    async def purge(self, key: str) -> bool:
        """
        Purge all history for key (keeps current value).

        NOTE(review): upstream nats-py appears to remove the current value as
        well and leave only a purge marker - confirm the "keeps current value"
        wording before relying on it.

        Parameters:
        - key: Key to purge history for

        Returns:
        True if history was purged
        """
    
    async def purge_deletes(self, olderthan: int = 30*60) -> bool:
        """
        Purge deleted keys older than specified time.

        Parameters:
        - olderthan: Age threshold in seconds (default 30*60 = 1800 seconds,
          i.e. 30 minutes)

        Returns:
        True if purge completed
        """

Usage Examples

# Get key history
history = await kv.history("session:user123")
for entry in history:
    if entry.operation == "PUT":
        print(f"Revision {entry.revision}: {entry.value.decode()}")
    elif entry.operation == "DEL":
        print(f"Revision {entry.revision}: DELETED")

# Purge old versions but keep current
await kv.purge("session:user123")

# Clean up old deleted keys
await kv.purge_deletes(olderthan=24*60*60)  # 24 hours

Key Listing and Filtering

List and filter keys in the store.

class KeyValue:
    async def keys(self, filters: Optional[List[str]] = None, **kwargs) -> List[str]:
        """
        List keys in store with optional filtering.

        Parameters:
        - filters: List of subject filters (wildcard patterns); None (the
          default) applies no filter
        - **kwargs: Additional keyword options; their meaning is not described
          here (NOTE(review): confirm against the nats-py reference)

        Returns:
        List of key names matching filters

        Raises:
        - NoKeysError: No keys found
        """

Usage Examples

# List all keys
all_keys = await kv.keys()
print(f"Total keys: {len(all_keys)}")

# List keys with pattern
session_keys = await kv.keys(filters=["session:*"])
for key in session_keys:
    print(f"Session key: {key}")

# List user-specific keys
user_keys = await kv.keys(filters=["session:user123*", "profile:user123*"])

Watching for Changes

Monitor key-value store for changes in real-time.

class KeyValue:
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]:
        """
        Watch specific key for changes.

        This is a coroutine: await it to obtain the iterator, e.g.
        `async for entry in await kv.watch(key): ...`.

        Parameters:
        - key: Key to watch (supports wildcards)
        - **kwargs: Additional keyword options; their meaning is not described
          here (NOTE(review): confirm against the nats-py reference)

        Returns:
        Async iterator yielding entries for changes
        """
    
    async def watchall(self, **kwargs) -> AsyncIterator[Entry]:
        """
        Watch all keys in store for changes.

        This is a coroutine: await it to obtain the iterator, e.g.
        `async for entry in await kv.watchall(): ...`.

        Parameters:
        - **kwargs: Additional keyword options; their meaning is not described
          here (NOTE(review): confirm against the nats-py reference)

        Returns:
        Async iterator yielding entries for all changes
        """

Usage Examples

# Watch specific key
async def watch_user_session():
    async for entry in kv.watch("session:user123"):
        if entry.operation == "PUT":
            print(f"Session updated: {entry.value.decode()}")
        elif entry.operation == "DEL":
            print("Session deleted")

# Watch all sessions
async def watch_all_sessions():
    async for entry in kv.watch("session:*"):
        print(f"Session change: {entry.key} -> {entry.operation}")

# Watch entire store
async def watch_store():
    async for entry in kv.watchall():
        print(f"Store change: {entry.key} = {entry.operation}")

# Run watchers concurrently
await asyncio.gather(
    watch_user_session(),
    watch_all_sessions(),
    watch_store()
)

Bucket Management

Get bucket status and statistics.

class KeyValue:
    async def status(self) -> BucketStatus:
        """
        Get key-value bucket status and statistics.

        Returns:
        Bucket status with metadata and statistics (BucketStatus fields:
        bucket, values, history, ttl, bytes, backing_store)
        """

Usage Examples

# Get bucket information
status = await kv.status()
print(f"Bucket: {status.bucket}")
print(f"Values: {status.values}")
print(f"History: {status.history}")
print(f"TTL: {status.ttl}")

# Monitor bucket size
if status.bytes > 1024*1024*100:  # 100MB
    print("Bucket is getting large, consider cleanup")

JetStream Integration

Create and manage key-value stores through JetStream context.

class JetStreamContext:
    async def key_value(self, bucket: str) -> KeyValue:
        """
        Get existing key-value store.

        This only looks up a bucket; it does not create one.

        Parameters:
        - bucket: Bucket name

        Returns:
        KeyValue store instance

        Raises:
        - BucketNotFoundError: Bucket does not exist
        """
    
    async def create_key_value(
        self,
        config: Optional[KeyValueConfig] = None,
        **params
    ) -> KeyValue:
        """
        Create new key-value store.

        Pass either a complete `config` object or individual settings as
        keyword arguments (e.g. bucket="cache", history=1).

        Parameters:
        - config: Complete bucket configuration (default None)
        - **params: Individual configuration parameters

        Returns:
        KeyValue store instance

        Raises:
        - BadBucketError: Invalid configuration
        """
    
    async def delete_key_value(self, bucket: str) -> bool:
        """
        Delete key-value store and all data.

        Parameters:
        - bucket: Bucket name to delete

        Returns:
        True if bucket was deleted
        """

Usage Examples

from nats.js.api import KeyValueConfig
from datetime import timedelta

# Create key-value store with configuration
kv_config = KeyValueConfig(
    bucket="user-preferences",
    description="User preference storage",
    max_value_size=1024*1024,  # 1MB per value
    history=5,  # Keep 5 versions
    ttl=timedelta(days=30),  # 30 day TTL
    max_bytes=1024*1024*1024,  # 1GB total
    storage="file",
    replicas=3
)

kv = await js.create_key_value(config=kv_config)

# Create simple store with parameters
kv = await js.create_key_value(
    bucket="cache",
    history=1,
    ttl=timedelta(hours=1)
)

# Get existing store
kv = await js.key_value("user-preferences")

# Delete store
await js.delete_key_value("old-cache")

Data Types

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Dict, List, Optional

if TYPE_CHECKING:
    # Needed only for annotations; guarding the import keeps this snippet
    # executable without pulling in the JetStream API module at runtime.
    from nats.js.api import Placement, RePublish, StreamSource

@dataclass
class Entry:
    """Key-value store entry."""
    key: str
    value: bytes
    revision: int
    created: datetime
    delta: int
    operation: str  # "PUT", "DEL", "PURGE"
    bucket: str

@dataclass
class BucketStatus:
    """Key-value bucket status."""
    bucket: str
    values: int
    history: int
    ttl: Optional[timedelta]
    bytes: int
    backing_store: str  # "JetStream"

@dataclass
class KeyValueConfig:
    """Key-value bucket configuration.

    Only `bucket` is required; every other field has a default.
    """
    bucket: str
    description: Optional[str] = None
    max_value_size: int = -1
    history: int = 1
    ttl: Optional[timedelta] = None
    max_bytes: int = -1
    storage: str = "file"  # "file", "memory"
    replicas: int = 1
    # Quoted so the names resolve lazily (see the TYPE_CHECKING import above).
    placement: Optional["Placement"] = None
    republish: Optional["RePublish"] = None
    mirror: Optional["StreamSource"] = None
    sources: Optional[List["StreamSource"]] = None
    metadata: Optional[Dict[str, str]] = None

Constants

# Key-Value operation markers
# NOTE(review): KV_OP looks like the message header name that carries the
# operation, with KV_DEL / KV_PURGE as its possible values - confirm.
KV_OP = "KV-Operation"
KV_DEL = "DEL"
KV_PURGE = "PURGE"

# Maximum history entries
KV_MAX_HISTORY = 64

# Default values (same as the KeyValueConfig defaults for history and replicas)
DEFAULT_KV_HISTORY = 1
DEFAULT_KV_REPLICAS = 1

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json