CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-couchbase

Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-operations.mddocs/

Asynchronous Operations

Asyncio-based asynchronous operations for high-performance, non-blocking applications. Provides the same API surface as synchronous operations with async/await support for improved concurrency and scalability.

Capabilities

Asynchronous Cluster Operations

Asyncio-compatible cluster connection and management.

class ACluster:
    def __init__(self, connection_string: str, options: ClusterOptions = None):
        """
        Create asynchronous cluster instance.

        Args:
            connection_string (str): Connection string
            options (ClusterOptions, optional): Cluster options
        """

    async def bucket(self, bucket_name: str) -> ABucket:
        """
        Get asynchronous bucket reference.

        Args:
            bucket_name (str): Bucket name

        Returns:
            ABucket: Asynchronous bucket instance
        """

    async def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
        """
        Execute N1QL query asynchronously.

        Args:
            statement (str): N1QL query statement
            options (QueryOptions, optional): Query options

        Returns:
            QueryResult: Async query results iterator
        """

    async def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
        """
        Execute Analytics query asynchronously.

        Args:
            statement (str): Analytics query statement
            options (AnalyticsOptions, optional): Analytics options

        Returns:
            AnalyticsResult: Async analytics results iterator
        """

    async def search_query(self, index: str, query: SearchQuery, options: SearchOptions = None) -> SearchResult:
        """
        Execute search query asynchronously.

        Args:
            index (str): Search index name
            query (SearchQuery): Search query
            options (SearchOptions, optional): Search options

        Returns:
            SearchResult: Async search results iterator
        """

    async def ping(self, options: PingOptions = None) -> PingResult:
        """
        Ping cluster services asynchronously.

        Args:
            options (PingOptions, optional): Ping options

        Returns:
            PingResult: Connectivity status
        """

    async def diagnostics(self, options: DiagnosticsOptions = None) -> DiagnosticsResult:
        """
        Get cluster diagnostics asynchronously.

        Args:
            options (DiagnosticsOptions, optional): Diagnostic options

        Returns:
            DiagnosticsResult: Cluster health information
        """

    async def close(self) -> None:
        """Close cluster connection and cleanup resources."""

Asynchronous Document Operations

Async key-value operations for document management.

class AsyncCBCollection:
    async def get(self, key: str, options: GetOptions = None) -> AsyncGetResult:
        """
        Retrieve document asynchronously.

        Args:
            key (str): Document key
            options (GetOptions, optional): Retrieval options

        Returns:
            AsyncGetResult: Document content and metadata
        """

    async def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> AsyncMutationResult:
        """
        Upsert document asynchronously.

        Args:
            key (str): Document key
            value (Any): Document content
            options (UpsertOptions, optional): Upsert options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def insert(self, key: str, value: Any, options: InsertOptions = None) -> AsyncMutationResult:
        """
        Insert document asynchronously.

        Args:
            key (str): Document key
            value (Any): Document content
            options (InsertOptions, optional): Insert options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def replace(self, key: str, value: Any, options: ReplaceOptions = None) -> AsyncMutationResult:
        """
        Replace document asynchronously.

        Args:
            key (str): Document key
            value (Any): New document content
            options (ReplaceOptions, optional): Replace options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def remove(self, key: str, options: RemoveOptions = None) -> AsyncMutationResult:
        """
        Remove document asynchronously.

        Args:
            key (str): Document key
            options (RemoveOptions, optional): Remove options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def exists(self, key: str, options: ExistsOptions = None) -> ExistsResult:
        """
        Check document existence asynchronously.

        Args:
            key (str): Document key
            options (ExistsOptions, optional): Existence check options

        Returns:
            ExistsResult: Existence status
        """

    async def touch(self, key: str, expiry: timedelta, options: TouchOptions = None) -> AsyncMutationResult:
        """
        Update document expiration asynchronously.

        Args:
            key (str): Document key
            expiry (timedelta): New expiration time
            options (TouchOptions, optional): Touch options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def get_and_touch(self, key: str, expiry: timedelta, options: GetAndTouchOptions = None) -> AsyncGetResult:
        """
        Get and touch document asynchronously.

        Args:
            key (str): Document key
            expiry (timedelta): New expiration time
            options (GetAndTouchOptions, optional): Operation options

        Returns:
            AsyncGetResult: Document content with updated expiry
        """

    async def get_and_lock(self, key: str, lock_time: timedelta, options: GetAndLockOptions = None) -> AsyncGetResult:
        """
        Get and lock document asynchronously.

        Args:
            key (str): Document key
            lock_time (timedelta): Lock duration
            options (GetAndLockOptions, optional): Lock options

        Returns:
            AsyncGetResult: Document content with lock
        """

    async def unlock(self, key: str, cas: int, options: UnlockOptions = None) -> None:
        """
        Unlock document asynchronously.

        Args:
            key (str): Document key
            cas (int): CAS value from get_and_lock
            options (UnlockOptions, optional): Unlock options
        """

Asynchronous Subdocument Operations

Async subdocument operations for efficient partial document updates.

class AsyncCBCollection:
    async def lookup_in(self, key: str, spec: List[Spec], options: LookupInOptions = None) -> LookupInResult:
        """
        Perform subdocument lookups asynchronously.

        Args:
            key (str): Document key
            spec (List[Spec]): Lookup specifications
            options (LookupInOptions, optional): Lookup options

        Returns:
            LookupInResult: Lookup results
        """

    async def mutate_in(self, key: str, spec: List[Spec], options: MutateInOptions = None) -> AsyncMutateInResult:
        """
        Perform subdocument mutations asynchronously.

        Args:
            key (str): Document key
            spec (List[Spec]): Mutation specifications
            options (MutateInOptions, optional): Mutation options

        Returns:
            AsyncMutateInResult: Mutation results
        """

Asynchronous Binary Operations

Async binary data and counter operations.

class AsyncBinaryCollection:
    async def append(self, key: str, value: bytes, options: AppendOptions = None) -> AsyncMutationResult:
        """
        Append binary data asynchronously.

        Args:
            key (str): Document key
            value (bytes): Data to append
            options (AppendOptions, optional): Append options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def prepend(self, key: str, value: bytes, options: PrependOptions = None) -> AsyncMutationResult:
        """
        Prepend binary data asynchronously.

        Args:
            key (str): Document key
            value (bytes): Data to prepend
            options (PrependOptions, optional): Prepend options

        Returns:
            AsyncMutationResult: Operation result
        """

    async def increment(self, key: str, options: IncrementOptions = None) -> CounterResult:
        """
        Increment counter asynchronously.

        Args:
            key (str): Counter key
            options (IncrementOptions, optional): Increment options

        Returns:
            CounterResult: New counter value
        """

    async def decrement(self, key: str, options: DecrementOptions = None) -> CounterResult:
        """
        Decrement counter asynchronously.

        Args:
            key (str): Counter key
            options (DecrementOptions, optional): Decrement options

        Returns:
            CounterResult: New counter value
        """

Asynchronous Management Operations

Async administrative operations for cluster management.

class AUserManager:
    async def upsert_user(self, user: User, options: UpsertUserOptions = None) -> None:
        """Create or update user asynchronously."""

    async def drop_user(self, username: str, options: DropUserOptions = None) -> None:
        """Delete user asynchronously."""

    async def get_user(self, username: str, options: GetUserOptions = None) -> UserAndMetadata:
        """Get user information asynchronously."""

    async def get_all_users(self, options: GetAllUsersOptions = None) -> List[UserAndMetadata]:
        """Get all users asynchronously."""

class ABucketManager:
    async def create_bucket(self, settings: CreateBucketSettings, options: CreateBucketOptions = None) -> None:
        """Create bucket asynchronously."""

    async def update_bucket(self, settings: BucketSettings, options: UpdateBucketOptions = None) -> None:
        """Update bucket asynchronously."""

    async def drop_bucket(self, bucket_name: str, options: DropBucketOptions = None) -> None:
        """Delete bucket asynchronously."""

    async def get_bucket(self, bucket_name: str, options: GetBucketOptions = None) -> BucketSettings:
        """Get bucket settings asynchronously."""

    async def get_all_buckets(self, options: GetAllBucketsOptions = None) -> Dict[str, BucketSettings]:
        """Get all bucket settings asynchronously."""

class ACollectionManager:
    async def create_scope(self, scope_name: str, options: CreateScopeOptions = None) -> None:
        """Create scope asynchronously."""

    async def drop_scope(self, scope_name: str, options: DropScopeOptions = None) -> None:
        """Delete scope asynchronously."""

    async def create_collection(self, collection_spec: CollectionSpec, options: CreateCollectionOptions = None) -> None:
        """Create collection asynchronously."""

    async def drop_collection(self, collection_spec: CollectionSpec, options: DropCollectionOptions = None) -> None:
        """Delete collection asynchronously."""

    async def get_all_scopes(self, options: GetAllScopesOptions = None) -> List[ScopeSpec]:
        """Get all scopes asynchronously."""

class AQueryIndexManager:
    async def create_index(self, bucket_name: str, index_name: str, keys: List[str], options: CreateQueryIndexOptions = None) -> None:
        """Create N1QL index asynchronously."""

    async def drop_index(self, bucket_name: str, index_name: str, options: DropQueryIndexOptions = None) -> None:
        """Drop N1QL index asynchronously."""

    async def get_all_indexes(self, bucket_name: str, options: GetAllQueryIndexesOptions = None) -> List[QueryIndex]:
        """Get all indexes asynchronously."""

    async def build_deferred_indexes(self, bucket_name: str, options: BuildDeferredQueryIndexOptions = None) -> None:
        """Build deferred indexes asynchronously."""

Async Result Types

class AsyncGetResult:
    @property
    def content_as(self) -> ContentProxy:
        """Access document content with type conversion."""

    @property
    def cas(self) -> int:
        """Document CAS value."""

    @property
    def expiry_time(self) -> datetime:
        """Document expiration time (if requested)."""

class AsyncMutationResult:
    @property
    def cas(self) -> int:
        """New CAS value after mutation."""

    @property
    def mutation_token(self) -> MutationToken:
        """Mutation token for consistency."""

class AsyncMutateInResult:
    def content_as(self, index: int, target_type: type):
        """Get content of mutation operation at index."""

    @property
    def cas(self) -> int:
        """New document CAS value."""

    @property
    def mutation_token(self) -> MutationToken:
        """Mutation token for consistency."""

Usage Examples

Basic Async Connection

import asyncio
from acouchbase.cluster import ACluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

async def main():
    # Connect to cluster
    cluster = ACluster("couchbase://localhost", 
                      ClusterOptions(PasswordAuthenticator("user", "pass")))
    
    # Get bucket and collection
    bucket = await cluster.bucket("travel-sample")
    collection = bucket.default_collection()
    
    # Document operations
    doc = {"name": "Alice", "age": 30}
    result = await collection.upsert("user::async", doc)
    print(f"CAS: {result.cas}")
    
    get_result = await collection.get("user::async")
    print(f"Document: {get_result.content_as[dict]}")
    
    # Close connection
    await cluster.close()

# Run async function
asyncio.run(main())

Async Bulk Operations

async def bulk_operations(collection):
    # Prepare documents
    docs = {
        f"user::{i}": {"id": i, "name": f"User {i}", "active": True}
        for i in range(100)
    }
    
    # Bulk upsert using asyncio.gather for concurrency
    tasks = [
        collection.upsert(key, doc)
        for key, doc in docs.items()
    ]
    
    results = await asyncio.gather(*tasks)
    print(f"Upserted {len(results)} documents")
    
    # Bulk get
    keys = list(docs.keys())
    get_tasks = [collection.get(key) for key in keys]
    get_results = await asyncio.gather(*get_tasks, return_exceptions=True)
    
    successful = [r for r in get_results if not isinstance(r, Exception)]
    print(f"Retrieved {len(successful)} documents")

Async Query Operations

async def query_operations(cluster):
    # Simple query
    query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"
    result = await cluster.query(query)
    
    async for row in result:
        print(f"User: {row['name']}, Age: {row['age']}")
    
    # Parameterized query
    from couchbase.options import QueryOptions
    
    query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"
    options = QueryOptions(type="user", min_age=25)
    result = await cluster.query(query, options)
    
    # Collect all results
    users = []
    async for row in result:
        users.append(row)
    
    print(f"Found {len(users)} users")
    
    # Get metadata
    metadata = result.metadata()
    print(f"Query took: {metadata.metrics.elapsed_time}")

Async Subdocument Operations

import couchbase.subdocument as SD

async def subdoc_operations(collection):
    # Setup document
    doc = {
        "name": "John",
        "stats": {"views": 0, "likes": 0},
        "tags": ["user", "active"]
    }
    await collection.upsert("user::subdoc", doc)
    
    # Async subdocument mutations
    await collection.mutate_in("user::subdoc", [
        SD.replace("name", "Johnny"),
        SD.increment("stats.views", 1),
        SD.array_append("tags", "premium")
    ])
    
    # Async subdocument lookups
    result = await collection.lookup_in("user::subdoc", [
        SD.get("name"),
        SD.get("stats"),
        SD.count("tags")
    ])
    
    name = result.content_as(0, str)
    stats = result.content_as(1, dict)
    tag_count = result.content_as(2, int)
    
    print(f"Name: {name}")
    print(f"Stats: {stats}")
    print(f"Tag count: {tag_count}")

Async Management Operations

async def management_operations(cluster):
    # User management
    user_mgr = cluster.users()
    
    from couchbase.management.users import User, Role
    
    user = User(
        username="async_user",
        display_name="Async User",
        password="secure_pass",
        roles=[Role("bucket_admin", bucket="travel-sample")]
    )
    
    await user_mgr.upsert_user(user)
    
    # Get user info
    user_info = await user_mgr.get_user("async_user")
    print(f"Created user: {user_info.user.display_name}")
    
    # Bucket management
    bucket_mgr = cluster.buckets()
    
    from couchbase.management.buckets import BucketSettings, BucketType
    
    settings = BucketSettings(
        name="async-bucket",
        bucket_type=BucketType.COUCHBASE,
        ram_quota_mb=128
    )
    
    await bucket_mgr.create_bucket(settings)
    print("Created async bucket")
    
    # List all buckets
    all_buckets = await bucket_mgr.get_all_buckets()
    for name, settings in all_buckets.items():
        print(f"Bucket: {name}")

Error Handling in Async Operations

from couchbase.exceptions import DocumentNotFoundException, CouchbaseException

async def error_handling_example(collection):
    try:
        # This will fail
        result = await collection.get("nonexistent-key")
    except DocumentNotFoundException:
        print("Document not found")
    except CouchbaseException as e:
        print(f"Couchbase error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
    # Using asyncio.gather with error handling
    keys = ["key1", "key2", "nonexistent-key", "key4"]
    tasks = [collection.get(key) for key in keys]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error getting {keys[i]}: {result}")
        else:
            print(f"Got {keys[i]}: {result.content_as[dict]}")

Context Manager Support

async def context_manager_example():
    async with ACluster("couchbase://localhost", 
                       ClusterOptions(PasswordAuthenticator("user", "pass"))) as cluster:
        bucket = await cluster.bucket("travel-sample")
        collection = bucket.default_collection()
        
        result = await collection.get("some-key")
        print(f"Document: {result.content_as[dict]}")
        
        # Cluster automatically closed when exiting context

Twisted Framework Support

The Couchbase SDK supports Twisted framework for asynchronous operations using deferreds.

Basic Twisted Operations

class TxCluster:
    def __init__(self, connection_string: str, options: ClusterOptions = None):
        """
        Initialize Twisted cluster connection.

        Args:
            connection_string (str): Connection string
            options (ClusterOptions, optional): Cluster options
        """

    def bucket(self, bucket_name: str) -> Deferred[TxBucket]:
        """Get bucket reference (returns Deferred)."""

    def query(self, statement: str, options: QueryOptions = None) -> Deferred[QueryResult]:
        """Execute N1QL query (returns Deferred)."""

class TxCollection:
    def get(self, key: str, options: GetOptions = None) -> Deferred[GetResult]:
        """Get document (returns Deferred)."""

    def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> Deferred[MutationResult]:
        """Upsert document (returns Deferred)."""

Twisted Usage Examples

from twisted.internet import reactor, defer
from txcouchbase.cluster import TxCluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

@defer.inlineCallbacks
def twisted_example():
    try:
        # Connect to cluster
        auth = PasswordAuthenticator("username", "password")
        cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))
        
        # Get bucket and collection
        bucket = yield cluster.bucket("travel-sample")
        collection = bucket.default_collection()
        
        # Document operations with deferreds
        doc = {"name": "Alice", "age": 25}
        result = yield collection.upsert("user::alice", doc)
        print(f"Upsert CAS: {result.cas}")
        
        # Retrieve document
        get_result = yield collection.get("user::alice")
        print(f"Document: {get_result.content_as[dict]}")
        
        # Query with deferreds
        query_result = yield cluster.query(
            "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"
        )
        
        for row in query_result:
            print(f"User: {row}")
            
    except Exception as e:
        print(f"Error: {e}")
    finally:
        reactor.stop()

if __name__ == "__main__":
    reactor.callWhenRunning(twisted_example)
    reactor.run()

Deferred Chaining

from twisted.internet import defer

def deferred_chaining_example():
    auth = PasswordAuthenticator("username", "password")
    cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))
    
    def on_bucket_ready(bucket):
        collection = bucket.default_collection()
        return collection.get("some-key")
    
    def on_document_retrieved(result):
        print(f"Got document: {result.content_as[dict]}")
        return result
    
    def on_error(failure):
        print(f"Operation failed: {failure}")
    
    # Chain deferred operations
    d = cluster.bucket("travel-sample")
    d.addCallback(on_bucket_ready)
    d.addCallback(on_document_retrieved)
    d.addErrback(on_error)
    
    return d

Install with Tessl CLI

npx tessl i tessl/pypi-couchbase

docs

analytics-operations.md

async-operations.md

cluster-operations.md

document-operations.md

index.md

management-operations.md

n1ql-queries.md

search-operations.md

subdocument-operations.md

view-operations.md

tile.json