Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asyncio-based asynchronous operations for high-performance, non-blocking applications. Provides the same API surface as synchronous operations with async/await support for improved concurrency and scalability.
Asyncio-compatible cluster connection and management.
class ACluster:
def __init__(self, connection_string: str, options: ClusterOptions = None):
"""
Create asynchronous cluster instance.
Args:
connection_string (str): Connection string
options (ClusterOptions, optional): Cluster options
"""
async def bucket(self, bucket_name: str) -> ABucket:
"""
Get asynchronous bucket reference.
Args:
bucket_name (str): Bucket name
Returns:
ABucket: Asynchronous bucket instance
"""
async def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
"""
Execute N1QL query asynchronously.
Args:
statement (str): N1QL query statement
options (QueryOptions, optional): Query options
Returns:
QueryResult: Async query results iterator
"""
async def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
"""
Execute Analytics query asynchronously.
Args:
statement (str): Analytics query statement
options (AnalyticsOptions, optional): Analytics options
Returns:
AnalyticsResult: Async analytics results iterator
"""
async def search_query(self, index: str, query: SearchQuery, options: SearchOptions = None) -> SearchResult:
"""
Execute search query asynchronously.
Args:
index (str): Search index name
query (SearchQuery): Search query
options (SearchOptions, optional): Search options
Returns:
SearchResult: Async search results iterator
"""
async def ping(self, options: PingOptions = None) -> PingResult:
"""
Ping cluster services asynchronously.
Args:
options (PingOptions, optional): Ping options
Returns:
PingResult: Connectivity status
"""
async def diagnostics(self, options: DiagnosticsOptions = None) -> DiagnosticsResult:
"""
Get cluster diagnostics asynchronously.
Args:
options (DiagnosticsOptions, optional): Diagnostic options
Returns:
DiagnosticsResult: Cluster health information
"""
async def close(self) -> None:
"""Close cluster connection and cleanup resources."""Async key-value operations for document management.
class AsyncCBCollection:
async def get(self, key: str, options: GetOptions = None) -> AsyncGetResult:
"""
Retrieve document asynchronously.
Args:
key (str): Document key
options (GetOptions, optional): Retrieval options
Returns:
AsyncGetResult: Document content and metadata
"""
async def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> AsyncMutationResult:
"""
Upsert document asynchronously.
Args:
key (str): Document key
value (Any): Document content
options (UpsertOptions, optional): Upsert options
Returns:
AsyncMutationResult: Operation result
"""
async def insert(self, key: str, value: Any, options: InsertOptions = None) -> AsyncMutationResult:
"""
Insert document asynchronously.
Args:
key (str): Document key
value (Any): Document content
options (InsertOptions, optional): Insert options
Returns:
AsyncMutationResult: Operation result
"""
async def replace(self, key: str, value: Any, options: ReplaceOptions = None) -> AsyncMutationResult:
"""
Replace document asynchronously.
Args:
key (str): Document key
value (Any): New document content
options (ReplaceOptions, optional): Replace options
Returns:
AsyncMutationResult: Operation result
"""
async def remove(self, key: str, options: RemoveOptions = None) -> AsyncMutationResult:
"""
Remove document asynchronously.
Args:
key (str): Document key
options (RemoveOptions, optional): Remove options
Returns:
AsyncMutationResult: Operation result
"""
async def exists(self, key: str, options: ExistsOptions = None) -> ExistsResult:
"""
Check document existence asynchronously.
Args:
key (str): Document key
options (ExistsOptions, optional): Existence check options
Returns:
ExistsResult: Existence status
"""
async def touch(self, key: str, expiry: timedelta, options: TouchOptions = None) -> AsyncMutationResult:
"""
Update document expiration asynchronously.
Args:
key (str): Document key
expiry (timedelta): New expiration time
options (TouchOptions, optional): Touch options
Returns:
AsyncMutationResult: Operation result
"""
async def get_and_touch(self, key: str, expiry: timedelta, options: GetAndTouchOptions = None) -> AsyncGetResult:
"""
Get and touch document asynchronously.
Args:
key (str): Document key
expiry (timedelta): New expiration time
options (GetAndTouchOptions, optional): Operation options
Returns:
AsyncGetResult: Document content with updated expiry
"""
async def get_and_lock(self, key: str, lock_time: timedelta, options: GetAndLockOptions = None) -> AsyncGetResult:
"""
Get and lock document asynchronously.
Args:
key (str): Document key
lock_time (timedelta): Lock duration
options (GetAndLockOptions, optional): Lock options
Returns:
AsyncGetResult: Document content with lock
"""
async def unlock(self, key: str, cas: int, options: UnlockOptions = None) -> None:
"""
Unlock document asynchronously.
Args:
key (str): Document key
cas (int): CAS value from get_and_lock
options (UnlockOptions, optional): Unlock options
"""Async subdocument operations for efficient partial document updates.
class AsyncCBCollection:
async def lookup_in(self, key: str, spec: List[Spec], options: LookupInOptions = None) -> LookupInResult:
"""
Perform subdocument lookups asynchronously.
Args:
key (str): Document key
spec (List[Spec]): Lookup specifications
options (LookupInOptions, optional): Lookup options
Returns:
LookupInResult: Lookup results
"""
async def mutate_in(self, key: str, spec: List[Spec], options: MutateInOptions = None) -> AsyncMutateInResult:
"""
Perform subdocument mutations asynchronously.
Args:
key (str): Document key
spec (List[Spec]): Mutation specifications
options (MutateInOptions, optional): Mutation options
Returns:
AsyncMutateInResult: Mutation results
"""Async binary data and counter operations.
class AsyncBinaryCollection:
async def append(self, key: str, value: bytes, options: AppendOptions = None) -> AsyncMutationResult:
"""
Append binary data asynchronously.
Args:
key (str): Document key
value (bytes): Data to append
options (AppendOptions, optional): Append options
Returns:
AsyncMutationResult: Operation result
"""
async def prepend(self, key: str, value: bytes, options: PrependOptions = None) -> AsyncMutationResult:
"""
Prepend binary data asynchronously.
Args:
key (str): Document key
value (bytes): Data to prepend
options (PrependOptions, optional): Prepend options
Returns:
AsyncMutationResult: Operation result
"""
async def increment(self, key: str, options: IncrementOptions = None) -> CounterResult:
"""
Increment counter asynchronously.
Args:
key (str): Counter key
options (IncrementOptions, optional): Increment options
Returns:
CounterResult: New counter value
"""
async def decrement(self, key: str, options: DecrementOptions = None) -> CounterResult:
"""
Decrement counter asynchronously.
Args:
key (str): Counter key
options (DecrementOptions, optional): Decrement options
Returns:
CounterResult: New counter value
"""Async administrative operations for cluster management.
class AUserManager:
async def upsert_user(self, user: User, options: UpsertUserOptions = None) -> None:
"""Create or update user asynchronously."""
async def drop_user(self, username: str, options: DropUserOptions = None) -> None:
"""Delete user asynchronously."""
async def get_user(self, username: str, options: GetUserOptions = None) -> UserAndMetadata:
"""Get user information asynchronously."""
async def get_all_users(self, options: GetAllUsersOptions = None) -> List[UserAndMetadata]:
"""Get all users asynchronously."""
class ABucketManager:
async def create_bucket(self, settings: CreateBucketSettings, options: CreateBucketOptions = None) -> None:
"""Create bucket asynchronously."""
async def update_bucket(self, settings: BucketSettings, options: UpdateBucketOptions = None) -> None:
"""Update bucket asynchronously."""
async def drop_bucket(self, bucket_name: str, options: DropBucketOptions = None) -> None:
"""Delete bucket asynchronously."""
async def get_bucket(self, bucket_name: str, options: GetBucketOptions = None) -> BucketSettings:
"""Get bucket settings asynchronously."""
async def get_all_buckets(self, options: GetAllBucketsOptions = None) -> Dict[str, BucketSettings]:
"""Get all bucket settings asynchronously."""
class ACollectionManager:
async def create_scope(self, scope_name: str, options: CreateScopeOptions = None) -> None:
"""Create scope asynchronously."""
async def drop_scope(self, scope_name: str, options: DropScopeOptions = None) -> None:
"""Delete scope asynchronously."""
async def create_collection(self, collection_spec: CollectionSpec, options: CreateCollectionOptions = None) -> None:
"""Create collection asynchronously."""
async def drop_collection(self, collection_spec: CollectionSpec, options: DropCollectionOptions = None) -> None:
"""Delete collection asynchronously."""
async def get_all_scopes(self, options: GetAllScopesOptions = None) -> List[ScopeSpec]:
"""Get all scopes asynchronously."""
class AQueryIndexManager:
async def create_index(self, bucket_name: str, index_name: str, keys: List[str], options: CreateQueryIndexOptions = None) -> None:
"""Create N1QL index asynchronously."""
async def drop_index(self, bucket_name: str, index_name: str, options: DropQueryIndexOptions = None) -> None:
"""Drop N1QL index asynchronously."""
async def get_all_indexes(self, bucket_name: str, options: GetAllQueryIndexesOptions = None) -> List[QueryIndex]:
"""Get all indexes asynchronously."""
async def build_deferred_indexes(self, bucket_name: str, options: BuildDeferredQueryIndexOptions = None) -> None:
"""Build deferred indexes asynchronously."""class AsyncGetResult:
@property
def content_as(self) -> ContentProxy:
"""Access document content with type conversion."""
@property
def cas(self) -> int:
"""Document CAS value."""
@property
def expiry_time(self) -> datetime:
"""Document expiration time (if requested)."""
class AsyncMutationResult:
@property
def cas(self) -> int:
"""New CAS value after mutation."""
@property
def mutation_token(self) -> MutationToken:
"""Mutation token for consistency."""
class AsyncMutateInResult:
def content_as(self, index: int, target_type: type):
"""Get content of mutation operation at index."""
@property
def cas(self) -> int:
"""New document CAS value."""
@property
def mutation_token(self) -> MutationToken:
"""Mutation token for consistency."""import asyncio
from acouchbase.cluster import ACluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
async def main():
# Connect to cluster
cluster = ACluster("couchbase://localhost",
ClusterOptions(PasswordAuthenticator("user", "pass")))
# Get bucket and collection
bucket = await cluster.bucket("travel-sample")
collection = bucket.default_collection()
# Document operations
doc = {"name": "Alice", "age": 30}
result = await collection.upsert("user::async", doc)
print(f"CAS: {result.cas}")
get_result = await collection.get("user::async")
print(f"Document: {get_result.content_as[dict]}")
# Close connection
await cluster.close()
# Run async function
asyncio.run(main())async def bulk_operations(collection):
# Prepare documents
docs = {
f"user::{i}": {"id": i, "name": f"User {i}", "active": True}
for i in range(100)
}
# Bulk upsert using asyncio.gather for concurrency
tasks = [
collection.upsert(key, doc)
for key, doc in docs.items()
]
results = await asyncio.gather(*tasks)
print(f"Upserted {len(results)} documents")
# Bulk get
keys = list(docs.keys())
get_tasks = [collection.get(key) for key in keys]
get_results = await asyncio.gather(*get_tasks, return_exceptions=True)
successful = [r for r in get_results if not isinstance(r, Exception)]
print(f"Retrieved {len(successful)} documents")async def query_operations(cluster):
# Simple query
query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"
result = await cluster.query(query)
async for row in result:
print(f"User: {row['name']}, Age: {row['age']}")
# Parameterized query
from couchbase.options import QueryOptions
query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"
options = QueryOptions(type="user", min_age=25)
result = await cluster.query(query, options)
# Collect all results
users = []
async for row in result:
users.append(row)
print(f"Found {len(users)} users")
# Get metadata
metadata = result.metadata()
print(f"Query took: {metadata.metrics.elapsed_time}")import couchbase.subdocument as SD
async def subdoc_operations(collection):
# Setup document
doc = {
"name": "John",
"stats": {"views": 0, "likes": 0},
"tags": ["user", "active"]
}
await collection.upsert("user::subdoc", doc)
# Async subdocument mutations
await collection.mutate_in("user::subdoc", [
SD.replace("name", "Johnny"),
SD.increment("stats.views", 1),
SD.array_append("tags", "premium")
])
# Async subdocument lookups
result = await collection.lookup_in("user::subdoc", [
SD.get("name"),
SD.get("stats"),
SD.count("tags")
])
name = result.content_as(0, str)
stats = result.content_as(1, dict)
tag_count = result.content_as(2, int)
print(f"Name: {name}")
print(f"Stats: {stats}")
print(f"Tag count: {tag_count}")async def management_operations(cluster):
# User management
user_mgr = cluster.users()
from couchbase.management.users import User, Role
user = User(
username="async_user",
display_name="Async User",
password="secure_pass",
roles=[Role("bucket_admin", bucket="travel-sample")]
)
await user_mgr.upsert_user(user)
# Get user info
user_info = await user_mgr.get_user("async_user")
print(f"Created user: {user_info.user.display_name}")
# Bucket management
bucket_mgr = cluster.buckets()
from couchbase.management.buckets import BucketSettings, BucketType
settings = BucketSettings(
name="async-bucket",
bucket_type=BucketType.COUCHBASE,
ram_quota_mb=128
)
await bucket_mgr.create_bucket(settings)
print("Created async bucket")
# List all buckets
all_buckets = await bucket_mgr.get_all_buckets()
for name, settings in all_buckets.items():
print(f"Bucket: {name}")from couchbase.exceptions import DocumentNotFoundException, CouchbaseException
async def error_handling_example(collection):
try:
# This will fail
result = await collection.get("nonexistent-key")
except DocumentNotFoundException:
print("Document not found")
except CouchbaseException as e:
print(f"Couchbase error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
# Using asyncio.gather with error handling
keys = ["key1", "key2", "nonexistent-key", "key4"]
tasks = [collection.get(key) for key in keys]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Error getting {keys[i]}: {result}")
else:
print(f"Got {keys[i]}: {result.content_as[dict]}")async def context_manager_example():
async with ACluster("couchbase://localhost",
ClusterOptions(PasswordAuthenticator("user", "pass"))) as cluster:
bucket = await cluster.bucket("travel-sample")
collection = bucket.default_collection()
result = await collection.get("some-key")
print(f"Document: {result.content_as[dict]}")
# Cluster automatically closed when exiting contextThe Couchbase SDK supports Twisted framework for asynchronous operations using deferreds.
class TxCluster:
def __init__(self, connection_string: str, options: ClusterOptions = None):
"""
Initialize Twisted cluster connection.
Args:
connection_string (str): Connection string
options (ClusterOptions, optional): Cluster options
"""
def bucket(self, bucket_name: str) -> Deferred[TxBucket]:
"""Get bucket reference (returns Deferred)."""
def query(self, statement: str, options: QueryOptions = None) -> Deferred[QueryResult]:
"""Execute N1QL query (returns Deferred)."""
class TxCollection:
def get(self, key: str, options: GetOptions = None) -> Deferred[GetResult]:
"""Get document (returns Deferred)."""
def upsert(self, key: str, value: Any, options: UpsertOptions = None) -> Deferred[MutationResult]:
"""Upsert document (returns Deferred)."""from twisted.internet import reactor, defer
from txcouchbase.cluster import TxCluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
@defer.inlineCallbacks
def twisted_example():
try:
# Connect to cluster
auth = PasswordAuthenticator("username", "password")
cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))
# Get bucket and collection
bucket = yield cluster.bucket("travel-sample")
collection = bucket.default_collection()
# Document operations with deferreds
doc = {"name": "Alice", "age": 25}
result = yield collection.upsert("user::alice", doc)
print(f"Upsert CAS: {result.cas}")
# Retrieve document
get_result = yield collection.get("user::alice")
print(f"Document: {get_result.content_as[dict]}")
# Query with deferreds
query_result = yield cluster.query(
"SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"
)
for row in query_result:
print(f"User: {row}")
except Exception as e:
print(f"Error: {e}")
finally:
reactor.stop()
if __name__ == "__main__":
reactor.callWhenRunning(twisted_example)
reactor.run()from twisted.internet import defer
def deferred_chaining_example():
auth = PasswordAuthenticator("username", "password")
cluster = TxCluster("couchbase://localhost", ClusterOptions(auth))
def on_bucket_ready(bucket):
collection = bucket.default_collection()
return collection.get("some-key")
def on_document_retrieved(result):
print(f"Got document: {result.content_as[dict]}")
return result
def on_error(failure):
print(f"Operation failed: {failure}")
# Chain deferred operations
d = cluster.bucket("travel-sample")
d.addCallback(on_bucket_ready)
d.addCallback(on_document_retrieved)
d.addErrback(on_error)
return dInstall with Tessl CLI
npx tessl i tessl/pypi-couchbase