An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support
—
Scalable object storage built on JetStream with metadata support, automatic chunking for large objects, content addressing, and efficient streaming capabilities for binary data and files.
Core operations for storing and retrieving objects with metadata and content integrity.
class ObjectStore:
    """Core operations for storing and retrieving objects with metadata
    and content integrity.

    Backed by JetStream; large objects are chunked automatically.
    """

    async def put(
        self,
        name: str,
        data: bytes,
        description: Optional[str] = None,
        filename: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> ObjectInfo:
        """
        Store object with optional metadata.

        Parameters:
        - name: Object name/key
        - data: Object data as bytes
        - description: Object description
        - filename: Original filename
        - headers: Custom headers/metadata

        Returns:
        Object information with metadata

        Raises:
        - ObjectAlreadyExists: Object already exists for create operations
        - BadObjectMetaError: Invalid metadata
        """

    async def get(self, name: str) -> bytes:
        """
        Retrieve complete object data.

        Parameters:
        - name: Object name to retrieve

        Returns:
        Object data as bytes

        Raises:
        - ObjectNotFoundError: Object does not exist
        - ObjectDeletedError: Object was deleted
        """

    async def get_info(
        self,
        name: str,
        show_deleted: bool = False
    ) -> ObjectInfo:
        """
        Get object metadata and information.

        Parameters:
        - name: Object name
        - show_deleted: Include deleted objects

        Returns:
        Object information with metadata

        Raises:
        - ObjectNotFoundError: Object does not exist
        - ObjectDeletedError: Object was deleted and show_deleted=False
        """

    async def delete(self, name: str) -> bool:
        """
        Delete object from store.

        Parameters:
        - name: Object name to delete

        Returns:
        True if object was deleted

        Raises:
        - ObjectNotFoundError: Object does not exist
        """
import asyncio
import nats
async def main():
nc = await nats.connect()
js = nc.jetstream()
# Get or create object store
os = await js.object_store("file-storage")
# Store a file
with open("document.pdf", "rb") as f:
file_data = f.read()
obj_info = await os.put(
name="documents/report-2024.pdf",
data=file_data,
description="Quarterly report 2024",
filename="report-2024.pdf",
headers={
"Content-Type": "application/pdf",
"Author": "Alice Johnson",
"Department": "Sales"
}
)
print(f"Stored object: {obj_info.name}")
print(f"Size: {obj_info.size} bytes")
print(f"Chunks: {obj_info.chunks}")
# Retrieve object info
info = await os.get_info("documents/report-2024.pdf")
print(f"Created: {info.mtime}")
print(f"Digest: {info.digest}")
# Retrieve object data
retrieved_data = await os.get("documents/report-2024.pdf")
print(f"Retrieved {len(retrieved_data)} bytes")
# Verify integrity
assert retrieved_data == file_data
# Delete object
await os.delete("documents/report-2024.pdf")List and manage objects in the store.
class ObjectStore:
    """Listing and store-level management operations."""

    async def list(self, **kwargs) -> List[ObjectInfo]:
        """
        List all objects in store.

        Returns:
        List of object information entries
        """

    async def status(self) -> ObjectStoreStatus:
        """
        Get object store status and statistics.

        Returns:
        Store status with usage information
        """

    async def seal(self) -> bool:
        """
        Seal object store (make read-only).

        Returns:
        True if store was sealed
        """
# List all objects
objects = await os.list()
for obj in objects:
    print(f"Object: {obj.name}")
    print(f" Size: {obj.size} bytes")
    print(f" Modified: {obj.mtime}")
    print(f" Description: {obj.description}")
# Get store statistics
status = await os.status()
print(f"Store: {status.bucket}")
print(f"Objects: {status.count}")
print(f"Total size: {status.size} bytes")
# Seal store to prevent modifications — irreversible for this bucket
await os.seal()
print("Store is now read-only")
Update and manage object metadata without changing content.
class ObjectStore:
    """Metadata management without touching object content."""

    async def update_meta(self, name: str, meta: ObjectMeta) -> ObjectInfo:
        """
        Update object metadata without changing content.

        Parameters:
        - name: Object name
        - meta: Updated metadata

        Returns:
        Updated object information

        Raises:
        - ObjectNotFoundError: Object does not exist
        - BadObjectMetaError: Invalid metadata
        """
from nats.js.api import ObjectMeta
# Update object metadata (content and digest are untouched)
new_meta = ObjectMeta(
    name="documents/report-2024.pdf",
    description="Updated quarterly report 2024",
    headers={
        "Content-Type": "application/pdf",
        "Author": "Alice Johnson",
        "Department": "Sales",
        "Status": "Final",
        "Version": "2.0"
    }
)
updated_info = await os.update_meta("documents/report-2024.pdf", new_meta)
print(f"Updated metadata for {updated_info.name}")
Monitor object store for changes in real-time.
class ObjectStore:
    """Real-time change notifications for the store."""

    async def watch(self, **kwargs) -> AsyncIterator[ObjectInfo]:
        """
        Watch object store for changes.

        Returns:
        Async iterator yielding object info for changes
        """
# Watch for object store changes
async def watch_objects():
    """Print each change event; deletions are flagged via obj_info.deleted."""
    async for obj_info in os.watch():
        if obj_info.deleted:
            print(f"Object deleted: {obj_info.name}")
        else:
            print(f"Object changed: {obj_info.name}")
            print(f" Size: {obj_info.size} bytes")
            print(f" Modified: {obj_info.mtime}")
# Run watcher
await watch_objects()
Create and manage object stores through JetStream context.
class JetStreamContext:
    """Bucket lifecycle operations for object stores."""

    async def object_store(self, bucket: str) -> ObjectStore:
        """
        Get existing object store.

        Parameters:
        - bucket: Bucket name

        Returns:
        ObjectStore instance

        Raises:
        - BucketNotFoundError: Bucket does not exist
        """

    async def create_object_store(
        self,
        config: Optional[ObjectStoreConfig] = None,
        **params
    ) -> ObjectStore:
        """
        Create new object store.

        Parameters:
        - config: Complete bucket configuration
        - **params: Individual configuration parameters

        Returns:
        ObjectStore instance

        Raises:
        - BadBucketError: Invalid configuration
        """

    async def delete_object_store(self, bucket: str) -> bool:
        """
        Delete object store and all objects.

        Parameters:
        - bucket: Bucket name to delete

        Returns:
        True if bucket was deleted
        """
from nats.js.api import ObjectStoreConfig
from datetime import timedelta
# Create object store with configuration
os_config = ObjectStoreConfig(
    bucket="media-files",
    description="Media file storage",
    max_bytes=10*1024*1024*1024, # 10GB
    storage="file",
    replicas=3,
    ttl=timedelta(days=365) # 1 year retention
)
os = await js.create_object_store(config=os_config)
# Create simple store with parameters (no config object needed)
os = await js.create_object_store(
    bucket="temp-files",
    max_bytes=1024*1024*1024, # 1GB
    ttl=timedelta(hours=24)
)
# Get existing store
os = await js.object_store("media-files")
# Delete store — removes the bucket and every object in it
await js.delete_object_store("old-files")
from dataclasses import dataclass
from typing import Optional, Dict, List
from datetime import datetime, timedelta


# NOTE: ObjectLink and ObjectMetaOptions are declared before the classes that
# reference them — dataclass field annotations are evaluated when the class
# body runs, so the original ordering raised NameError at import time.
@dataclass
class ObjectLink:
    """Object link configuration."""
    bucket: str  # bucket that holds the linked object
    name: str  # name of the linked object


@dataclass
class ObjectMetaOptions:
    """Object metadata options."""
    link: Optional[ObjectLink] = None
    chunk_size: Optional[int] = None
    max_chunk_size: int = 128 * 1024  # 128KB


@dataclass
class ObjectInfo:
    """Object information and metadata."""
    name: str
    description: Optional[str]
    headers: Optional[Dict[str, str]]
    size: int  # total size in bytes
    chunks: int  # number of stored chunks
    digest: str  # content digest for integrity checks
    deleted: bool
    mtime: datetime
    nuid: str
    bucket: str
    links: Optional[List[str]] = None


@dataclass
class ObjectMeta:
    """Object metadata for updates."""
    name: str
    description: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    options: Optional[ObjectMetaOptions] = None


@dataclass
class ObjectStoreStatus:
    """Object store status and statistics."""
    bucket: str
    description: Optional[str]
    ttl: Optional[timedelta]
    storage: str
    replicas: int
    sealed: bool
    size: int  # total bytes used
    count: int  # number of objects
    backing_store: str  # "JetStream"


@dataclass
class ObjectStoreConfig:
    """Object store configuration."""
    bucket: str
    description: Optional[str] = None
    ttl: Optional[timedelta] = None
    max_bytes: int = -1  # -1 means unlimited
    storage: str = "file"  # "file", "memory"
    replicas: int = 1
    # Placement is declared elsewhere in the package; a string forward
    # reference keeps this module importable without that name in scope.
    placement: Optional["Placement"] = None
    metadata: Optional[Dict[str, str]] = None
# Object store templates
# Subject/stream name templates used by the object store layer
OBJ_ALL_CHUNKS_PRE_TEMPLATE = "OBJ_ALL_CHUNKS"
OBJ_ALL_META_PRE_TEMPLATE = "OBJ_ALL_META"
OBJ_STREAM_TEMPLATE = "OBJ_STREAM"
# Chunk size limits
DEFAULT_CHUNK_SIZE = 128 * 1024 # 128KB
MAX_CHUNK_SIZE = 1024 * 1024 # 1MB
# Validation patterns (allowed bucket-name characters)
VALID_BUCKET_RE = r"^[a-zA-Z0-9_-]+$"
Object store automatically chunks large files for efficient storage and transfer.
# Store large file (automatically chunked)
large_file_data = b"x" * (10 * 1024 * 1024) # 10MB
obj_info = await os.put("large-file.bin", large_file_data)
print(f"File stored in {obj_info.chunks} chunks")
# Retrieve works transparently — chunks are reassembled by the client
retrieved = await os.get("large-file.bin")
assert len(retrieved) == len(large_file_data)
All objects are stored with SHA-256 digests for content verification.
# Object info includes content digest
info = await os.get_info("document.pdf")
print(f"Content digest: {info.digest}")
# Verify content manually if needed
import hashlib
data = await os.get("document.pdf")
computed_digest = hashlib.sha256(data).hexdigest()
assert computed_digest == info.digest.replace("SHA-256=", "")Create references between objects without duplicating data.
from nats.js.api import ObjectMeta, ObjectLink
# Create link to existing object
link_meta = ObjectMeta(
name="documents/current-report.pdf",
description="Link to current quarterly report",
options=ObjectMetaOptions(
link=ObjectLink(
bucket="file-storage",
name="documents/report-2024.pdf"
)
)
)
await os.update_meta("documents/current-report.pdf", link_meta)Install with Tessl CLI
npx tessl i tessl/pypi-nats-py