CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-couchbase

Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

n1ql-queries.mddocs/

N1QL Queries

SQL++ (N1QL) query execution provides SQL-like query capabilities for JSON documents in Couchbase. Support for complex queries, joins, aggregations, and analytics operations across document collections.

Capabilities

Basic Query Execution

Execute N1QL queries with various options and consistency levels.

class Cluster:
    def query(self, statement: str, options: QueryOptions = None) -> QueryResult:
        """
        Execute N1QL query.

        Args:
            statement (str): N1QL query statement
            options (QueryOptions, optional): Query execution options

        Returns:
            QueryResult: Query results iterator

        Raises:
            QueryException: If query execution fails
            TimeoutException: If query times out
        """

class QueryOptions:
    def __init__(self, timeout: timedelta = None,
                 scan_consistency: QueryScanConsistency = None,
                 consistent_with: MutationState = None,
                 adhoc: bool = True,
                 client_context_id: str = None,
                 max_parallelism: int = None,
                 pipeline_batch: int = None,
                 pipeline_cap: int = None,
                 scan_cap: int = None,
                 scan_wait: timedelta = None,
                 readonly: bool = None,
                 profile: QueryProfile = None,
                 metrics: bool = False,
                 raw: Dict[str, Any] = None,
                 **kwargs):
        """
        N1QL query execution options.

        Args:
            timeout (timedelta, optional): Query timeout
            scan_consistency (QueryScanConsistency, optional): Consistency level
            consistent_with (MutationState, optional): Consistency token
            adhoc (bool): Whether query is ad-hoc (default: True)
            client_context_id (str, optional): Client context identifier
            max_parallelism (int, optional): Maximum query parallelism  
            pipeline_batch (int, optional): Pipeline batch size
            pipeline_cap (int, optional): Pipeline capacity
            scan_cap (int, optional): Scan capacity
            scan_wait (timedelta, optional): Scan wait time
            readonly (bool, optional): Read-only query flag
            profile (QueryProfile, optional): Profiling level
            metrics (bool): Include query metrics
            raw (Dict[str, Any], optional): Raw query options
            **kwargs: Named parameters for parameterized queries
        """

Query Results and Metadata

Access query results and execution metadata.

class QueryResult:
    def __iter__(self) -> Iterator[dict]:
        """Iterate over query result rows."""

    def metadata(self) -> QueryMetaData:
        """Get query execution metadata."""

    def rows(self) -> List[dict]:
        """Get all result rows as list."""

class QueryMetaData:
    @property
    def request_id(self) -> str:
        """Query request identifier."""

    @property
    def client_context_id(self) -> str:
        """Client context identifier."""

    @property
    def status(self) -> QueryStatus:
        """Query execution status."""

    @property
    def signature(self) -> dict:
        """Query result signature."""

    @property
    def profile(self) -> dict:
        """Query execution profile (if enabled)."""

    @property
    def metrics(self) -> QueryMetrics:
        """Query execution metrics (if enabled)."""

    @property
    def warnings(self) -> List[QueryWarning]:
        """Query execution warnings."""

class QueryMetrics:
    @property
    def elapsed_time(self) -> timedelta:
        """Total query execution time."""

    @property
    def execution_time(self) -> timedelta:
        """Query execution time."""

    @property
    def result_count(self) -> int:
        """Number of result rows."""

    @property
    def result_size(self) -> int:
        """Size of results in bytes."""

    @property
    def mutation_count(self) -> int:
        """Number of mutations (for DML queries)."""

    @property
    def sort_count(self) -> int:
        """Number of sorts performed."""

    @property
    def error_count(self) -> int:
        """Number of errors encountered."""

    @property
    def warning_count(self) -> int:
        """Number of warnings generated."""

class QueryWarning:
    @property
    def code(self) -> int:
        """Warning code."""

    @property
    def message(self) -> str:
        """Warning message."""

Parameterized Queries

Support for prepared statements and parameterized queries for security and performance.

class QueryOptions:
    def named_parameters(self, **params) -> QueryOptions:
        """
        Set named parameters for query.

        Args:
            **params: Named parameter values

        Returns:
            QueryOptions: Options with parameters set
        """

    def positional_parameters(self, *params) -> QueryOptions:
        """
        Set positional parameters for query.

        Args:
            *params: Positional parameter values

        Returns:
            QueryOptions: Options with parameters set
        """

Consistency Control

Control query consistency levels for different use cases.

class QueryScanConsistency:
    NOT_BOUNDED = "not_bounded"      # Fastest, may return stale data
    REQUEST_PLUS = "request_plus"    # Consistent with mutations

class MutationState:
    def __init__(self, *mutation_tokens):
        """
        Mutation state for consistency.

        Args:
            *mutation_tokens: Mutation tokens to be consistent with
        """

    def add(self, *mutation_tokens) -> MutationState:
        """Add mutation tokens to state."""

Query Types and Patterns

Data Retrieval Queries

# Basic SELECT
query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 10"
result = cluster.query(query)

# With parameters
query = "SELECT * FROM `travel-sample` WHERE type = $type AND age > $min_age"
result = cluster.query(query, QueryOptions(type="user", min_age=21))

# With consistency
options = QueryOptions(scan_consistency=QueryScanConsistency.REQUEST_PLUS)
result = cluster.query("SELECT * FROM `travel-sample` WHERE id = 'user123'", options)

Data Modification Queries

# INSERT
query = "INSERT INTO `travel-sample` (KEY, VALUE) VALUES ('user::456', {'name': 'Alice', 'age': 30})"
result = cluster.query(query)

# UPDATE
query = "UPDATE `travel-sample` SET age = age + 1 WHERE type = 'user' AND name = $name"
result = cluster.query(query, QueryOptions(name="John"))

# DELETE
query = "DELETE FROM `travel-sample` WHERE type = 'user' AND age < $min_age"
result = cluster.query(query, QueryOptions(min_age=18))

Complex Queries

# JOIN operations
query = """
SELECT u.name, p.title
FROM `travel-sample` u
JOIN `travel-sample` p ON KEYS u.preferred_posts
WHERE u.type = 'user' AND p.type = 'post'
"""

# Aggregation
query = """
SELECT category, COUNT(*) as count, AVG(rating) as avg_rating
FROM `travel-sample`
WHERE type = 'product'
GROUP BY category
HAVING COUNT(*) > 10
ORDER BY avg_rating DESC
"""

# Subqueries
query = """
SELECT name, age
FROM `travel-sample`
WHERE type = 'user' AND age > (
    SELECT AVG(age) FROM `travel-sample` WHERE type = 'user'
)
"""

Usage Examples

Basic Query Execution

from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import QueryOptions, QueryScanConsistency

cluster = Cluster("couchbase://localhost", 
                 ClusterOptions(PasswordAuthenticator("user", "pass")))

# Simple query
query = "SELECT name, age FROM `travel-sample` WHERE type = 'user' LIMIT 5"
result = cluster.query(query)

for row in result:
    print(f"Name: {row['name']}, Age: {row['age']}")

# Get metadata
metadata = result.metadata()
print(f"Query took: {metadata.metrics.elapsed_time}")
print(f"Result count: {metadata.metrics.result_count}")

Parameterized Queries

# Named parameters
query = "SELECT * FROM `travel-sample` WHERE type = $doc_type AND age BETWEEN $min_age AND $max_age"
options = QueryOptions(doc_type="user", min_age=25, max_age=35)
result = cluster.query(query, options)

# Positional parameters
query = "SELECT * FROM `travel-sample` WHERE type = ? AND city = ?"
options = QueryOptions().positional_parameters("user", "San Francisco")
result = cluster.query(query, options)

Prepared Statements

# Use prepared statements for frequently executed queries
query = "SELECT * FROM `travel-sample` WHERE type = $type"
options = QueryOptions(adhoc=False, type="user")  # adhoc=False enables preparation
result = cluster.query(query, options)

Consistency Control

from couchbase.mutation_state import MutationState

# Perform mutation
doc = {"name": "Bob", "age": 28}
mutation_result = collection.upsert("user::789", doc)

# Query with consistency
mutation_state = MutationState(mutation_result.mutation_token)
options = QueryOptions(consistent_with=mutation_state)
query = "SELECT * FROM `travel-sample` WHERE META().id = 'user::789'"
result = cluster.query(query, options)

Query Profiling and Metrics

from couchbase.options import QueryProfile

# Enable profiling and metrics
options = QueryOptions(
    profile=QueryProfile.TIMINGS,
    metrics=True
)

result = cluster.query("SELECT * FROM `travel-sample` LIMIT 100", options)
metadata = result.metadata()

# Access metrics
print(f"Execution time: {metadata.metrics.execution_time}")
print(f"Result size: {metadata.metrics.result_size} bytes")

# Access profile
if metadata.profile:
    print(f"Profile data: {metadata.profile}")

# Check warnings
for warning in metadata.warnings:
    print(f"Warning {warning.code}: {warning.message}")

Error Handling

from couchbase.exceptions import QueryException, TimeoutException

try:
    result = cluster.query("SELECT * FROM `nonexistent-bucket`")
    for row in result:
        print(row)
except QueryException as e:
    print(f"Query failed: {e}")
    if hasattr(e, 'context'):
        print(f"Query: {e.context.statement}")
        print(f"Error code: {e.context.error_code}")
except TimeoutException:
    print("Query timed out")

Install with Tessl CLI

npx tessl i tessl/pypi-couchbase

docs

analytics-operations.md

async-operations.md

cluster-operations.md

document-operations.md

index.md

management-operations.md

n1ql-queries.md

search-operations.md

subdocument-operations.md

view-operations.md

tile.json