CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nats-py

An asyncio Python client for the NATS messaging system with JetStream, microservices, and key-value store support

Pending
Overview
Eval results
Files

object-store.mddocs/

Object Store

Scalable object storage built on JetStream with metadata support, automatic chunking for large objects, content addressing, and efficient streaming capabilities for binary data and files.

Capabilities

Object Storage Operations

Core operations for storing and retrieving objects with metadata and content integrity.

class ObjectStore:
    async def put(
        self,
        name: str,
        data: bytes,
        description: str = None,
        filename: str = None,
        headers: Dict[str, str] = None,
        **kwargs
    ) -> ObjectInfo:
        """
        Store object with optional metadata.
        
        Parameters:
        - name: Object name/key
        - data: Object data as bytes
        - description: Object description
        - filename: Original filename
        - headers: Custom headers/metadata
        
        Returns:
        Object information with metadata
        
        Raises:
        - ObjectAlreadyExists: Object already exists for create operations
        - BadObjectMetaError: Invalid metadata
        """
    
    async def get(self, name: str) -> bytes:
        """
        Retrieve complete object data.
        
        Parameters:
        - name: Object name to retrieve
        
        Returns:
        Object data as bytes
        
        Raises:
        - ObjectNotFoundError: Object does not exist
        - ObjectDeletedError: Object was deleted
        """
    
    async def get_info(
        self,
        name: str,
        show_deleted: bool = False
    ) -> ObjectInfo:
        """
        Get object metadata and information.
        
        Parameters:
        - name: Object name
        - show_deleted: Include deleted objects
        
        Returns:
        Object information with metadata
        
        Raises:
        - ObjectNotFoundError: Object does not exist
        - ObjectDeletedError: Object was deleted and show_deleted=False
        """
    
    async def delete(self, name: str) -> bool:
        """
        Delete object from store.
        
        Parameters:
        - name: Object name to delete
        
        Returns:
        True if object was deleted
        
        Raises:
        - ObjectNotFoundError: Object does not exist
        """

Usage Examples

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()
    
    # Get or create object store
    os = await js.object_store("file-storage")
    
    # Store a file
    with open("document.pdf", "rb") as f:
        file_data = f.read()
    
    obj_info = await os.put(
        name="documents/report-2024.pdf",
        data=file_data,
        description="Quarterly report 2024",
        filename="report-2024.pdf",
        headers={
            "Content-Type": "application/pdf",
            "Author": "Alice Johnson",
            "Department": "Sales"
        }
    )
    
    print(f"Stored object: {obj_info.name}")
    print(f"Size: {obj_info.size} bytes")
    print(f"Chunks: {obj_info.chunks}")
    
    # Retrieve object info
    info = await os.get_info("documents/report-2024.pdf")
    print(f"Created: {info.mtime}")
    print(f"Digest: {info.digest}")
    
    # Retrieve object data
    retrieved_data = await os.get("documents/report-2024.pdf")
    print(f"Retrieved {len(retrieved_data)} bytes")
    
    # Verify integrity
    assert retrieved_data == file_data
    
    # Delete object
    await os.delete("documents/report-2024.pdf")

Object Listing and Management

List and manage objects in the store.

class ObjectStore:
    async def list(self, **kwargs) -> List[ObjectInfo]:
        """
        List all objects in store.
        
        Returns:
        List of object information entries
        """
    
    async def status(self) -> ObjectStoreStatus:
        """
        Get object store status and statistics.
        
        Returns:
        Store status with usage information
        """
    
    async def seal(self) -> bool:
        """
        Seal object store (make read-only).
        
        Returns:
        True if store was sealed
        """

Usage Examples

# List all objects
objects = await os.list()
for obj in objects:
    print(f"Object: {obj.name}")
    print(f"  Size: {obj.size} bytes")
    print(f"  Modified: {obj.mtime}")
    print(f"  Description: {obj.description}")

# Get store statistics
status = await os.status()
print(f"Store: {status.bucket}")
print(f"Objects: {status.count}")
print(f"Total size: {status.size} bytes")

# Seal store to prevent modifications
await os.seal()
print("Store is now read-only")

Object Metadata Management

Update and manage object metadata without changing content.

class ObjectStore:
    async def update_meta(self, name: str, meta: ObjectMeta) -> ObjectInfo:
        """
        Update object metadata without changing content.
        
        Parameters:
        - name: Object name
        - meta: Updated metadata
        
        Returns:
        Updated object information
        
        Raises:
        - ObjectNotFoundError: Object does not exist
        - BadObjectMetaError: Invalid metadata
        """

Usage Examples

from nats.js.api import ObjectMeta

# Update object metadata
new_meta = ObjectMeta(
    name="documents/report-2024.pdf",
    description="Updated quarterly report 2024",
    headers={
        "Content-Type": "application/pdf",
        "Author": "Alice Johnson",
        "Department": "Sales",
        "Status": "Final",
        "Version": "2.0"
    }
)

updated_info = await os.update_meta("documents/report-2024.pdf", new_meta)
print(f"Updated metadata for {updated_info.name}")

Object Watching

Monitor object store for changes in real-time.

class ObjectStore:
    async def watch(self, **kwargs) -> AsyncIterator[ObjectInfo]:
        """
        Watch object store for changes.
        
        Returns:
        Async iterator yielding object info for changes
        """

Usage Examples

# Watch for object store changes
async def watch_objects():
    async for obj_info in os.watch():
        if obj_info.deleted:
            print(f"Object deleted: {obj_info.name}")
        else:
            print(f"Object changed: {obj_info.name}")
            print(f"  Size: {obj_info.size} bytes")
            print(f"  Modified: {obj_info.mtime}")

# Run watcher
await watch_objects()

JetStream Integration

Create and manage object stores through JetStream context.

class JetStreamContext:
    async def object_store(self, bucket: str) -> ObjectStore:
        """
        Get existing object store.
        
        Parameters:
        - bucket: Bucket name
        
        Returns:
        ObjectStore instance
        
        Raises:
        - BucketNotFoundError: Bucket does not exist
        """
    
    async def create_object_store(
        self,
        config: ObjectStoreConfig = None,
        **params
    ) -> ObjectStore:
        """
        Create new object store.
        
        Parameters:
        - config: Complete bucket configuration
        - **params: Individual configuration parameters
        
        Returns:
        ObjectStore instance
        
        Raises:
        - BadBucketError: Invalid configuration
        """
    
    async def delete_object_store(self, bucket: str) -> bool:
        """
        Delete object store and all objects.
        
        Parameters:
        - bucket: Bucket name to delete
        
        Returns:
        True if bucket was deleted
        """

Usage Examples

from nats.js.api import ObjectStoreConfig
from datetime import timedelta

# Create object store with configuration
os_config = ObjectStoreConfig(
    bucket="media-files",
    description="Media file storage",
    max_bytes=10*1024*1024*1024,  # 10GB
    storage="file",
    replicas=3,
    ttl=timedelta(days=365)  # 1 year retention
)

os = await js.create_object_store(config=os_config)

# Create simple store with parameters
os = await js.create_object_store(
    bucket="temp-files",
    max_bytes=1024*1024*1024,  # 1GB
    ttl=timedelta(hours=24)
)

# Get existing store
os = await js.object_store("media-files")

# Delete store
await js.delete_object_store("old-files")

Data Types

from dataclasses import dataclass
from typing import Optional, Dict, List
from datetime import datetime, timedelta

@dataclass
class ObjectInfo:
    """Object information and metadata."""
    name: str
    description: Optional[str]
    headers: Optional[Dict[str, str]]
    size: int
    chunks: int
    digest: str
    deleted: bool
    mtime: datetime
    nuid: str
    bucket: str
    links: Optional[List[str]] = None

@dataclass
class ObjectMeta:
    """Object metadata for updates."""
    name: str
    description: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    options: Optional[ObjectMetaOptions] = None

@dataclass
class ObjectMetaOptions:
    """Object metadata options."""
    link: Optional[ObjectLink] = None
    chunk_size: Optional[int] = None
    max_chunk_size: int = 128 * 1024  # 128KB

@dataclass
class ObjectLink:
    """Object link configuration."""
    bucket: str
    name: str

@dataclass
class ObjectStoreStatus:
    """Object store status and statistics."""
    bucket: str
    description: Optional[str]
    ttl: Optional[timedelta]
    storage: str
    replicas: int
    sealed: bool
    size: int
    count: int
    backing_store: str  # "JetStream"

@dataclass
class ObjectStoreConfig:
    """Object store configuration."""
    bucket: str
    description: Optional[str] = None
    ttl: Optional[timedelta] = None
    max_bytes: int = -1
    storage: str = "file"  # "file", "memory"
    replicas: int = 1
    placement: Optional[Placement] = None
    metadata: Optional[Dict[str, str]] = None

Constants

# Object store templates
OBJ_ALL_CHUNKS_PRE_TEMPLATE = "OBJ_ALL_CHUNKS"
OBJ_ALL_META_PRE_TEMPLATE = "OBJ_ALL_META"
OBJ_STREAM_TEMPLATE = "OBJ_STREAM"

# Chunk size limits
DEFAULT_CHUNK_SIZE = 128 * 1024  # 128KB
MAX_CHUNK_SIZE = 1024 * 1024     # 1MB

# Validation patterns
VALID_BUCKET_RE = r"^[a-zA-Z0-9_-]+$"

Advanced Usage

Large File Handling

Object store automatically chunks large files for efficient storage and transfer.

# Store large file (automatically chunked)
large_file_data = b"x" * (10 * 1024 * 1024)  # 10MB
obj_info = await os.put("large-file.bin", large_file_data)
print(f"File stored in {obj_info.chunks} chunks")

# Retrieve works transparently
retrieved = await os.get("large-file.bin")
assert len(retrieved) == len(large_file_data)

Content Integrity

All objects are stored with SHA-256 digests for content verification.

# Object info includes content digest
info = await os.get_info("document.pdf")
print(f"Content digest: {info.digest}")

# Verify content manually if needed
import hashlib
data = await os.get("document.pdf")
computed_digest = hashlib.sha256(data).hexdigest()
assert computed_digest == info.digest.replace("SHA-256=", "")

Object Links

Create references between objects without duplicating data.

from nats.js.api import ObjectMeta, ObjectLink

# Create link to existing object
link_meta = ObjectMeta(
    name="documents/current-report.pdf",
    description="Link to current quarterly report",
    options=ObjectMetaOptions(
        link=ObjectLink(
            bucket="file-storage",
            name="documents/report-2024.pdf"
        )
    )
)

await os.update_meta("documents/current-report.pdf", link_meta)

Install with Tessl CLI

npx tessl i tessl/pypi-nats-py

docs

core-client.md

error-handling.md

index.md

jetstream-management.md

jetstream.md

key-value-store.md

message-handling.md

microservices.md

object-store.md

tile.json