Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cluster-level operations for connecting to Couchbase, managing authentication, and accessing cluster-wide services. The Cluster class serves as the main entry point for all database operations.
Establish connections to Couchbase clusters with various configuration options and authentication methods.
class Cluster:
def __init__(self, connection_string: str, options: ClusterOptions = None):
"""
Create a new Cluster instance.
Args:
connection_string (str): Connection string (e.g., "couchbase://localhost")
options (ClusterOptions, optional): Cluster configuration options
Raises:
AuthenticationException: If authentication fails
NetworkException: If connection cannot be established
"""
class ClusterOptions:
def __init__(self, authenticator: Authenticator = None,
timeout_options: ClusterTimeoutOptions = None,
tracing_options: ClusterTracingOptions = None):
"""
Cluster configuration options.
Args:
authenticator (Authenticator, optional): Authentication mechanism
timeout_options (ClusterTimeoutOptions, optional): Timeout configuration
tracing_options (ClusterTracingOptions, optional): Tracing configuration
"""
class ClusterTimeoutOptions:
def __init__(self, kv_timeout: timedelta = None,
query_timeout: timedelta = None,
search_timeout: timedelta = None,
analytics_timeout: timedelta = None,
management_timeout: timedelta = None):
"""
Timeout configuration for various operations.
Args:
kv_timeout (timedelta, optional): Key-value operation timeout
query_timeout (timedelta, optional): N1QL query timeout
search_timeout (timedelta, optional): Search query timeout
analytics_timeout (timedelta, optional): Analytics query timeout
management_timeout (timedelta, optional): Management operation timeout
"""

Various authentication mechanisms for secure cluster access.
class Authenticator:
"""Base class for all authenticators."""
class PasswordAuthenticator(Authenticator):
def __init__(self, username: str, password: str):
"""
Username/password authentication.
Args:
username (str): RBAC username
password (str): User password
"""
class CertificateAuthenticator(Authenticator):
def __init__(self, cert_path: str, key_path: str,
trust_store_path: str = None):
"""
Certificate-based authentication.
Args:
cert_path (str): Path to client certificate
key_path (str): Path to client private key
trust_store_path (str, optional): Path to trust store
"""

Access to individual buckets within the cluster.
class Cluster:
def bucket(self, bucket_name: str) -> Bucket:
"""
Get a bucket reference.
Args:
bucket_name (str): Name of the bucket
Returns:
Bucket: Bucket instance for data operations
Raises:
BucketNotFoundException: If bucket doesn't exist
"""
def wait_until_ready(self, timeout: timedelta, options: WaitUntilReadyOptions = None) -> None:
"""
Wait until cluster services are ready for operations.
Args:
timeout (timedelta): Maximum time to wait
options (WaitUntilReadyOptions, optional): Wait options
Raises:
TimeoutException: If cluster is not ready within timeout
"""

Cluster health monitoring and diagnostic operations.
class Cluster:
def ping(self, options: PingOptions = None) -> PingResult:
"""
Ping cluster services to check connectivity.
Args:
options (PingOptions, optional): Ping operation options
Returns:
PingResult: Connectivity status for each service
"""
def diagnostics(self, options: DiagnosticsOptions = None) -> DiagnosticsResult:
"""
Get cluster diagnostic information.
Args:
options (DiagnosticsOptions, optional): Diagnostic options
Returns:
DiagnosticsResult: Detailed cluster health information
"""
class PingOptions:
def __init__(self, timeout: timedelta = None,
service_types: List[ServiceType] = None):
"""
Options for ping operations.
Args:
timeout (timedelta, optional): Operation timeout
service_types (List[ServiceType], optional): Services to ping
"""
class DiagnosticsOptions:
def __init__(self, report_id: str = None):
"""
Options for diagnostics operations.
Args:
report_id (str, optional): Custom report identifier
"""
class PingResult:
@property
def id(self) -> str: ...
@property
def services(self) -> Dict[ServiceType, List[PingResultEndpoint]]: ...
class DiagnosticsResult:
@property
def id(self) -> str: ...
@property
def services(self) -> Dict[ServiceType, List[DiagnosticsEndpoint]]: ...

Cluster-level query operations for N1QL, Analytics, and Search.
class Cluster:
def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
"""
Execute N1QL query.
Args:
statement (str): N1QL query statement
options (QueryOptions, optional): Query execution options
Returns:
QueryResult: Query results iterator
Raises:
QueryException: If query execution fails
"""
def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
"""
Execute Analytics query.
Args:
statement (str): Analytics query statement
options (AnalyticsOptions, optional): Analytics execution options
Returns:
AnalyticsResult: Analytics results iterator
Raises:
AnalyticsException: If analytics execution fails
"""
def search_query(self, index: str, query: SearchQuery, options: SearchOptions = None) -> SearchResult:
"""
Execute full-text search query.
Args:
index (str): Search index name
query (SearchQuery): Search query object
options (SearchOptions, optional): Search execution options
Returns:
SearchResult: Search results iterator
Raises:
SearchException: If search execution fails
"""

Access to various management interfaces for administrative operations.
class Cluster:
def buckets(self) -> BucketManager:
"""Get bucket management interface."""
def users(self) -> UserManager:
"""Get user management interface."""
def query_indexes(self) -> QueryIndexManager:
"""Get N1QL index management interface."""
def analytics_indexes(self) -> AnalyticsIndexManager:
"""Get Analytics index management interface."""
def search_indexes(self) -> SearchIndexManager:
"""Get Search index management interface."""
def eventing_functions(self) -> EventingFunctionManager:
"""Get Eventing function management interface."""

from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
# Simple connection
cluster = Cluster("couchbase://localhost",
ClusterOptions(PasswordAuthenticator("user", "pass")))
# With timeout configuration
from datetime import timedelta
timeout_opts = ClusterTimeoutOptions(
kv_timeout=timedelta(seconds=10),
query_timeout=timedelta(seconds=30)
)
cluster = Cluster("couchbase://localhost",
ClusterOptions(
PasswordAuthenticator("user", "pass"),
timeout_options=timeout_opts
))

from couchbase.auth import CertificateAuthenticator
auth = CertificateAuthenticator(
cert_path="/path/to/cert.pem",
key_path="/path/to/key.pem",
trust_store_path="/path/to/ca.pem"
)
cluster = Cluster("couchbases://localhost", ClusterOptions(auth))

# Basic ping
ping_result = cluster.ping()
print(f"Ping ID: {ping_result.id}")
for service_type, endpoints in ping_result.services.items():
print(f"{service_type}: {len(endpoints)} endpoints")
# Detailed diagnostics
diag_result = cluster.diagnostics()
print(f"Diagnostics: {diag_result.id}")

enum ServiceType:
KV = "kv"
QUERY = "n1ql"
SEARCH = "fts"
ANALYTICS = "cbas"
MANAGEMENT = "mgmt"
VIEWS = "views"
EVENTING = "eventing"
class WaitUntilReadyOptions:
def __init__(self, service_types: List[ServiceType] = None,
desired_state: ClusterState = ClusterState.ONLINE):
"""
Options for wait_until_ready operation.
Args:
service_types (List[ServiceType], optional): Services to wait for
desired_state (ClusterState, optional): Target cluster state
"""
enum ClusterState:
ONLINE = "online"
DEGRADED = "degraded"
OFFLINE = "offline"

Install with Tessl CLI
npx tessl i tessl/pypi-couchbase