Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
SQL++ (N1QL) query execution provides SQL-like query capabilities for JSON documents in Couchbase. Support for complex queries, joins, aggregations, and analytics operations across document collections.
Execute N1QL queries with various options and consistency levels.
class Cluster:
def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
"""
Execute N1QL query.
Args:
statement (str): N1QL query statement
options (QueryOptions, optional): Query execution options
Returns:
QueryResult: Query results iterator
Raises:
QueryException: If query execution fails
TimeoutException: If query times out
"""
class QueryOptions:
def __init__(self, timeout: timedelta = None,
scan_consistency: QueryScanConsistency = None,
consistent_with: MutationState = None,
adhoc: bool = True,
client_context_id: str = None,
max_parallelism: int = None,
pipeline_batch: int = None,
pipeline_cap: int = None,
scan_cap: int = None,
scan_wait: timedelta = None,
readonly: bool = None,
profile: QueryProfile = None,
metrics: bool = False,
raw: Dict[str, Any] = None,
**kwargs):
"""
N1QL query execution options.
Args:
timeout (timedelta, optional): Query timeout
scan_consistency (QueryScanConsistency, optional): Consistency level
consistent_with (MutationState, optional): Consistency token
adhoc (bool): Whether query is ad-hoc (default: True)
client_context_id (str, optional): Client context identifier
max_parallelism (int, optional): Maximum query parallelism
pipeline_batch (int, optional): Pipeline batch size
pipeline_cap (int, optional): Pipeline capacity
scan_cap (int, optional): Scan capacity
scan_wait (timedelta, optional): Scan wait time
readonly (bool, optional): Read-only query flag
profile (QueryProfile, optional): Profiling level
metrics (bool): Include query metrics
raw (Dict[str, Any], optional): Raw query options
**kwargs: Named parameters for parameterized queries
"""Access query results and execution metadata.
class QueryResult:
def __iter__(self) -> Iterator[dict]:
"""Iterate over query result rows."""
def metadata(self) -> QueryMetaData:
"""Get query execution metadata."""
def rows(self) -> List[dict]:
"""Get all result rows as list."""
class QueryMetaData:
@property
def request_id(self) -> str:
"""Query request identifier."""
@property
def client_context_id(self) -> str:
"""Client context identifier."""
@property
def status(self) -> QueryStatus:
"""Query execution status."""
@property
def signature(self) -> dict:
"""Query result signature."""
@property
def profile(self) -> dict:
"""Query execution profile (if enabled)."""
@property
def metrics(self) -> QueryMetrics:
"""Query execution metrics (if enabled)."""
@property
def warnings(self) -> List[QueryWarning]:
"""Query execution warnings."""
class QueryMetrics:
@property
def elapsed_time(self) -> timedelta:
"""Total query execution time."""
@property
def execution_time(self) -> timedelta:
"""Query execution time."""
@property
def result_count(self) -> int:
"""Number of result rows."""
@property
def result_size(self) -> int:
"""Size of results in bytes."""
@property
def mutation_count(self) -> int:
"""Number of mutations (for DML queries)."""
@property
def sort_count(self) -> int:
"""Number of sorts performed."""
@property
def error_count(self) -> int:
"""Number of errors encountered."""
@property
def warning_count(self) -> int:
"""Number of warnings generated."""
class QueryWarning:
@property
def code(self) -> int:
"""Warning code."""
@property
def message(self) -> str:
"""Warning message."""Support for prepared statements and parameterized queries for security and performance.
class QueryOptions:
def named_parameters(self, **params) -> QueryOptions:
"""
Set named parameters for query.
Args:
**params: Named parameter values
Returns:
QueryOptions: Options with parameters set
"""
def positional_parameters(self, *params) -> QueryOptions:
"""
Set positional parameters for query.
Args:
*params: Positional parameter values
Returns:
QueryOptions: Options with parameters set
"""Control query consistency levels for different use cases.
class QueryScanConsistency:
NOT_BOUNDED = "not_bounded" # Fastest, may return stale data
REQUEST_PLUS = "request_plus" # Consistent with mutations
class MutationState:
def __init__(self, *mutation_tokens):
"""
Mutation state for consistency.
Args:
*mutation_tokens: Mutation tokens to be consistent with
"""
def add(self, *mutation_tokens) -> MutationState:
"""Add mutation tokens to state."""# Basic SELECT
query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"
result = cluster.query(query)
# With parameters
query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"
result = cluster.query(query, QueryOptions(type="user", min_age=21))
# With consistency
options = QueryOptions(scan_consistency=QueryScanConsistency.REQUEST_PLUS)
result = cluster.query("SELECT * FROM `travel-sample` WHERE id = 'user123'", options)# INSERT
query = "INSERT INTO `travel-sample` (KEY, VALUE) VALUES ('user::456', {'name': 'Alice', 'age': 30})"
result = cluster.query(query)
# UPDATE
query = "UPDATE `travel-sample` SET age = age + 1 WHERE type = 'user' AND name = $name"
result = cluster.query(query, QueryOptions(name="John"))
# DELETE
query = "DELETE FROM `travel-sample` WHERE type = 'user' AND age < $min_age"
result = cluster.query(query, QueryOptions(min_age=18))# JOIN operations
query = """
SELECT u.name, p.title
FROM `travel-sample` u
JOIN `travel-sample` p ON KEYS u.preferred_posts
WHERE u.type = 'user' AND p.type = 'post'
"""
# Aggregation
query = """
SELECT category, COUNT(*) as count, AVG(rating) as avg_rating
FROM `travel-sample`
WHERE type = 'product'
GROUP BY category
HAVING COUNT(*) > 10
ORDER BY avg_rating DESC
"""
# Subqueries
query = """
SELECT name, age
FROM `travel-sample`
WHERE type = 'user' AND age > (
SELECT AVG(age) FROM `travel-sample` WHERE type = 'user'
)
"""from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import QueryOptions, QueryScanConsistency
cluster = Cluster("couchbase://localhost",
ClusterOptions(PasswordAuthenticator("user", "pass")))
# Simple query
query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"
result = cluster.query(query)
for row in result:
print(f"Name: {row['name']}, Age: {row['age']}")
# Get metadata
metadata = result.metadata()
print(f"Query took: {metadata.metrics.elapsed_time}")
print(f"Result count: {metadata.metrics.result_count}")# Named parameters
query = "SELECT * FROM `travel-sample` WHERE type = $doc_type AND age BETWEEN $min_age AND $max_age"
options = QueryOptions(doc_type="user", min_age=25, max_age=35)
result = cluster.query(query, options)
# Positional parameters
query = "SELECT * FROM `travel-sample` WHERE type = ? AND city = ?"
options = QueryOptions().positional_parameters("user", "San Francisco")
result = cluster.query(query, options)# Use prepared statements for frequently executed queries
query = "SELECT * FROM `travel-sample` WHERE type = $type"
options = QueryOptions(adhoc=False, type="user") # adhoc=False enables preparation
result = cluster.query(query, options)from couchbase.mutation_state import MutationState
# Perform mutation
doc = {"name": "Bob", "age": 28}
mutation_result = collection.upsert("user::789", doc)
# Query with consistency
mutation_state = MutationState(mutation_result.mutation_token)
options = QueryOptions(consistent_with=mutation_state)
query = "SELECT * FROM `travel-sample` WHERE META().id = 'user::789'"
result = cluster.query(query, options)from couchbase.options import QueryProfile
# Enable profiling and metrics
options = QueryOptions(
profile=QueryProfile.TIMINGS,
metrics=True
)
result = cluster.query("SELECT * FROM `travel-sample` LIMIT 100", options)
metadata = result.metadata()
# Access metrics
print(f"Execution time: {metadata.metrics.execution_time}")
print(f"Result size: {metadata.metrics.result_size} bytes")
# Access profile
if metadata.profile:
print(f"Profile data: {metadata.profile}")
# Check warnings
for warning in metadata.warnings:
print(f"Warning {warning.code}: {warning.message}")from couchbase.exceptions import QueryException, TimeoutException
try:
result = cluster.query("SELECT * FROM `nonexistent-bucket`")
for row in result:
print(row)
except QueryException as e:
print(f"Query failed: {e}")
if hasattr(e, 'context'):
print(f"Query: {e.context.statement}")
print(f"Error code: {e.context.error_code}")
except TimeoutException:
print("Query timed out")Install with Tessl CLI
npx tessl i tessl/pypi-couchbase