CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-couchbase

Python Client for Couchbase providing comprehensive database operations including key-value, N1QL queries, search, analytics, and cluster management

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

analytics-operations.mddocs/

Analytics Operations

Analytics query execution for complex data analysis and reporting. Supports large-scale analytical workloads with integration to external data sources and advanced SQL++ analytics capabilities.

Capabilities

Analytics Query Execution

Execute analytics queries with various options and consistency levels.

class Cluster:
    def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
        """
        Execute Analytics query.

        Args:
            statement (str): Analytics query statement
            options (AnalyticsOptions, optional): Analytics execution options

        Returns:
            AnalyticsResult: Analytics results iterator

        Raises:
            AnalyticsException: If analytics execution fails
            TimeoutException: If query times out
        """

class AnalyticsOptions:
    def __init__(self, timeout: timedelta = None,
                 scan_consistency: AnalyticsScanConsistency = None,
                 client_context_id: str = None,
                 priority: bool = False,
                 readonly: bool = None,
                 raw: Dict[str, Any] = None,
                 **kwargs):
        """
        Analytics query execution options.

        Args:
            timeout (timedelta, optional): Query timeout
            scan_consistency (AnalyticsScanConsistency, optional): Consistency level
            client_context_id (str, optional): Client context identifier
            priority (bool): High priority query flag
            readonly (bool, optional): Read-only query flag
            raw (Dict[str, Any], optional): Raw analytics options
            **kwargs: Named parameters for parameterized queries
        """

    def named_parameters(self, **params) -> AnalyticsOptions:
        """
        Set named parameters for query.

        Args:
            **params: Named parameter values

        Returns:
            AnalyticsOptions: Options with parameters set
        """

    def positional_parameters(self, *params) -> AnalyticsOptions:
        """
        Set positional parameters for query.

        Args:
            *params: Positional parameter values

        Returns:
            AnalyticsOptions: Options with parameters set
        """

Analytics Results and Metadata

Access analytics results and execution metadata.

class AnalyticsResult:
    def __iter__(self) -> Iterator[dict]:
        """Iterate over analytics result rows."""

    def metadata(self) -> AnalyticsMetaData:
        """Get analytics execution metadata."""

    def rows(self) -> List[dict]:
        """Get all result rows as list."""

class AnalyticsMetaData:
    @property
    def request_id(self) -> str:
        """Analytics request identifier."""

    @property
    def client_context_id(self) -> str:
        """Client context identifier."""

    @property
    def status(self) -> AnalyticsStatus:
        """Analytics execution status."""

    @property
    def signature(self) -> dict:
        """Analytics result signature."""

    @property
    def metrics(self) -> AnalyticsMetrics:
        """Analytics execution metrics."""

    @property
    def warnings(self) -> List[AnalyticsWarning]:
        """Analytics execution warnings."""

class AnalyticsMetrics:
    @property
    def elapsed_time(self) -> timedelta:
        """Total analytics execution time."""

    @property
    def execution_time(self) -> timedelta:
        """Analytics execution time."""

    @property
    def result_count(self) -> int:
        """Number of result rows."""

    @property
    def result_size(self) -> int:
        """Size of results in bytes."""

    @property
    def processed_objects(self) -> int:
        """Number of objects processed."""

    @property
    def error_count(self) -> int:
        """Number of errors encountered."""

    @property
    def warning_count(self) -> int:
        """Number of warnings generated."""

class AnalyticsWarning:
    @property
    def code(self) -> int:
        """Warning code."""

    @property
    def message(self) -> str:
        """Warning message."""

class AnalyticsStatus:
    RUNNING = "running"
    SUCCESS = "success" 
    ERRORS = "errors"
    COMPLETED = "completed"
    STOPPED = "stopped"
    TIMEOUT = "timeout"
    CLOSED = "closed"
    FATAL = "fatal"

class AnalyticsScanConsistency:
    NOT_BOUNDED = "not_bounded"    # Fastest, may return stale data
    REQUEST_PLUS = "request_plus"  # Consistent with mutations

Analytics Index Management

Manage analytics indexes, datasets, and external links.

class AnalyticsIndexManager:
    def create_dataverse(self, dataverse_name: str, options: CreateAnalyticsDataverseOptions = None) -> None:
        """
        Create analytics dataverse.

        Args:
            dataverse_name (str): Dataverse name
            options (CreateAnalyticsDataverseOptions, optional): Creation options

        Raises:
            DataverseExistsException: If dataverse already exists
        """

    def drop_dataverse(self, dataverse_name: str, options: DropAnalyticsDataverseOptions = None) -> None:
        """
        Drop analytics dataverse.

        Args:
            dataverse_name (str): Dataverse name
            options (DropAnalyticsDataverseOptions, optional): Drop options

        Raises:
            DataverseNotFoundException: If dataverse doesn't exist
        """

    def create_dataset(self, dataset_name: str, bucket_name: str, options: CreateAnalyticsDatasetOptions = None) -> None:
        """
        Create analytics dataset.

        Args:
            dataset_name (str): Dataset name
            bucket_name (str): Source bucket name
            options (CreateAnalyticsDatasetOptions, optional): Creation options

        Raises:
            DatasetExistsException: If dataset already exists
        """

    def drop_dataset(self, dataset_name: str, options: DropAnalyticsDatasetOptions = None) -> None:
        """
        Drop analytics dataset.

        Args:
            dataset_name (str): Dataset name
            options (DropAnalyticsDatasetOptions, optional): Drop options

        Raises:
            DatasetNotFoundException: If dataset doesn't exist
        """

    def create_index(self, index_name: str, dataset_name: str, fields: Dict[str, AnalyticsDataType], options: CreateAnalyticsIndexOptions = None) -> None:
        """
        Create analytics index.

        Args:
            index_name (str): Index name
            dataset_name (str): Dataset name
            fields (Dict[str, AnalyticsDataType]): Index fields and types
            options (CreateAnalyticsIndexOptions, optional): Creation options

        Raises:
            IndexExistsException: If index already exists
        """

    def drop_index(self, index_name: str, dataset_name: str, options: DropAnalyticsIndexOptions = None) -> None:
        """
        Drop analytics index.

        Args:
            index_name (str): Index name
            dataset_name (str): Dataset name
            options (DropAnalyticsIndexOptions, optional): Drop options

        Raises:
            IndexNotFoundException: If index doesn't exist
        """

    def get_all_datasets(self, options: GetAllAnalyticsDatasetsOptions = None) -> List[AnalyticsDataset]:
        """
        Get all analytics datasets.

        Args:
            options (GetAllAnalyticsDatasetsOptions, optional): Retrieval options

        Returns:
            List[AnalyticsDataset]: All datasets
        """

    def get_all_indexes(self, options: GetAllAnalyticsIndexesOptions = None) -> List[AnalyticsIndex]:
        """
        Get all analytics indexes.

        Args:
            options (GetAllAnalyticsIndexesOptions, optional): Retrieval options

        Returns:
            List[AnalyticsIndex]: All indexes
        """

    def connect_link(self, options: ConnectAnalyticsLinkOptions = None) -> None:
        """
        Connect analytics link.

        Args:
            options (ConnectAnalyticsLinkOptions, optional): Connection options
        """

    def disconnect_link(self, options: DisconnectAnalyticsLinkOptions = None) -> None:
        """
        Disconnect analytics link.

        Args:
            options (DisconnectAnalyticsLinkOptions, optional): Disconnection options
        """

    def create_link(self, link: AnalyticsLink, options: CreateAnalyticsLinkOptions = None) -> None:
        """
        Create external analytics link.

        Args:
            link (AnalyticsLink): Link configuration
            options (CreateAnalyticsLinkOptions, optional): Creation options

        Raises:
            LinkExistsException: If link already exists
        """

    def replace_link(self, link: AnalyticsLink, options: ReplaceAnalyticsLinkOptions = None) -> None:
        """
        Replace external analytics link.

        Args:
            link (AnalyticsLink): New link configuration
            options (ReplaceAnalyticsLinkOptions, optional): Replace options

        Raises:
            LinkNotFoundException: If link doesn't exist
        """

    def drop_link(self, link_name: str, dataverse_name: str, options: DropAnalyticsLinkOptions = None) -> None:
        """
        Drop external analytics link.

        Args:
            link_name (str): Link name
            dataverse_name (str): Dataverse name
            options (DropAnalyticsLinkOptions, optional): Drop options

        Raises:
            LinkNotFoundException: If link doesn't exist
        """

    def get_links(self, options: GetAnalyticsLinksOptions = None) -> List[AnalyticsLink]:
        """
        Get all analytics links.

        Args:
            options (GetAnalyticsLinksOptions, optional): Retrieval options

        Returns:
            List[AnalyticsLink]: All analytics links
        """

Analytics Schema Types

class AnalyticsDataset:
    @property
    def name(self) -> str:
        """Dataset name."""

    @property
    def dataverse_name(self) -> str:
        """Dataverse containing the dataset."""

    @property
    def link_name(self) -> str:
        """Link name for external datasets."""

    @property
    def bucket_name(self) -> str:
        """Source bucket name."""

class AnalyticsIndex:
    @property
    def name(self) -> str:
        """Index name."""

    @property
    def dataset_name(self) -> str:
        """Dataset containing the index."""

    @property
    def dataverse_name(self) -> str:
        """Dataverse containing the index."""

    @property
    def is_primary(self) -> bool:
        """Whether this is a primary index."""

class AnalyticsLink:
    """Base class for analytics links."""

class CouchbaseRemoteAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, hostname: str,
                 username: str, password: str = None,
                 encryption: AnalyticsEncryptionLevel = None):
        """
        Remote Couchbase cluster link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            hostname (str): Remote cluster hostname
            username (str): Remote cluster username
            password (str, optional): Remote cluster password
            encryption (AnalyticsEncryptionLevel, optional): Encryption level
        """

class S3ExternalAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, access_key_id: str,
                 secret_access_key: str, region: str,
                 service_endpoint: str = None):
        """
        Amazon S3 external link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            access_key_id (str): AWS access key ID
            secret_access_key (str): AWS secret access key
            region (str): AWS region
            service_endpoint (str, optional): Custom S3 endpoint
        """

class AzureBlobExternalAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, connection_string: str = None,
                 account_name: str = None, account_key: str = None,
                 shared_access_signature: str = None, blob_endpoint: str = None):
        """
        Azure Blob Storage external link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            connection_string (str, optional): Azure connection string
            account_name (str, optional): Azure account name
            account_key (str, optional): Azure account key
            shared_access_signature (str, optional): Azure SAS token
            blob_endpoint (str, optional): Azure blob endpoint
        """

class AnalyticsDataType:
    STRING = "string"
    INT64 = "int64"
    DOUBLE = "double"
    BOOLEAN = "boolean"
    DATETIME = "datetime"
    UUID = "uuid"

class AnalyticsEncryptionLevel:
    NONE = "none"
    HALF = "half"
    FULL = "full"

class AnalyticsLinkType:
    COUCHBASE_REMOTE = "couchbase"
    S3_EXTERNAL = "s3"
    AZURE_EXTERNAL = "azureblob"

Usage Examples

Basic Analytics Queries

from couchbase.options import AnalyticsOptions, AnalyticsScanConsistency

# Simple analytics query
query = """
SELECT country, COUNT(*) as hotel_count 
FROM `travel-sample` 
WHERE type = 'hotel' 
GROUP BY country 
ORDER BY hotel_count DESC
LIMIT 10
"""

result = cluster.analytics_query(query)

for row in result:
    print(f"{row['country']}: {row['hotel_count']} hotels")

# Get metadata
metadata = result.metadata()
print(f"Query took: {metadata.metrics.elapsed_time}")
print(f"Processed: {metadata.metrics.processed_objects} objects")

Parameterized Analytics Queries

# Named parameters
query = """
SELECT name, country, reviews.ratings.Overall as rating
FROM `travel-sample` 
WHERE type = $doc_type AND country = $country_name
AND reviews.ratings.Overall >= $min_rating
ORDER BY rating DESC
"""

options = AnalyticsOptions(
    doc_type="hotel",
    country_name="United States", 
    min_rating=4.0
)

result = cluster.analytics_query(query, options)

for row in result:
    print(f"{row['name']} ({row['country']}): {row['rating']}")

# Positional parameters
query2 = """
SELECT AVG(reviews.ratings.Overall) as avg_rating
FROM `travel-sample`
WHERE type = ? AND country = ?
"""

options2 = AnalyticsOptions().positional_parameters("hotel", "France")
result2 = cluster.analytics_query(query2, options2)

for row in result2:
    print(f"Average rating: {row['avg_rating']}")

Complex Analytics Queries

# Join operation
query = """
SELECT h.name as hotel_name, h.city, r.content as review_text, r.ratings.Overall as rating
FROM `travel-sample` h
UNNEST h.reviews r
WHERE h.type = 'hotel' 
AND h.country = 'United Kingdom'
AND r.ratings.Overall >= 4
ORDER BY r.ratings.Overall DESC, h.name
LIMIT 20
"""

result = cluster.analytics_query(query)

for row in result:
    print(f"{row['hotel_name']} ({row['city']}): {row['rating']}")
    print(f"  Review: {row['review_text'][:100]}...")

# Aggregation with window functions
query2 = """
SELECT country, city, name, reviews.ratings.Overall as rating,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY reviews.ratings.Overall DESC) as rank
FROM `travel-sample`
WHERE type = 'hotel' AND reviews.ratings.Overall IS NOT MISSING
QUALIFY rank <= 3
ORDER BY country, rank
"""

result2 = cluster.analytics_query(query2)

current_country = None
for row in result2:
    if row['country'] != current_country:
        current_country = row['country']
        print(f"\nTop hotels in {current_country}:")
    print(f"  {row['rank']}. {row['name']} ({row['city']}): {row['rating']}")

Analytics with Consistency

# Perform document updates
doc = {"type": "hotel", "name": "Analytics Test Hotel", "country": "TestLand"}
mutation_result = collection.upsert("hotel::analytics_test", doc)

# Query with consistency
from couchbase.mutation_state import MutationState

mutation_state = MutationState(mutation_result.mutation_token)
options = AnalyticsOptions(scan_consistency=AnalyticsScanConsistency.REQUEST_PLUS)

query = "SELECT * FROM `travel-sample` WHERE name = 'Analytics Test Hotel'"
result = cluster.analytics_query(query, options)

for row in result:
    print(f"Found: {row['name']}")

Analytics Index Management

from couchbase.management.analytics import AnalyticsIndexManager, AnalyticsDataType

analytics_mgr = cluster.analytics_indexes()

# Create dataverse
analytics_mgr.create_dataverse("travel_analytics")

# Create dataset
analytics_mgr.create_dataset("hotels", "travel-sample", 
                           CreateAnalyticsDatasetOptions(dataverse_name="travel_analytics"))

# Create index
index_fields = {
    "country": AnalyticsDataType.STRING,
    "city": AnalyticsDataType.STRING,
    "rating": AnalyticsDataType.DOUBLE
}

analytics_mgr.create_index("hotel_location_idx", "hotels", index_fields,
                         CreateAnalyticsIndexOptions(dataverse_name="travel_analytics"))

# List all datasets
datasets = analytics_mgr.get_all_datasets()
for dataset in datasets:
    print(f"Dataset: {dataset.name} in {dataset.dataverse_name}")

# List all indexes
indexes = analytics_mgr.get_all_indexes()
for index in indexes:
    print(f"Index: {index.name} on {index.dataset_name}")

External Data Links

from couchbase.management.analytics import S3ExternalAnalyticsLink, AzureBlobExternalAnalyticsLink

analytics_mgr = cluster.analytics_indexes()

# Create S3 external link
s3_link = S3ExternalAnalyticsLink(
    name="s3_data_link",
    dataverse="external_data", 
    access_key_id="AKIAIOSFODNN7EXAMPLE",
    secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region="us-west-2"
)

analytics_mgr.create_link(s3_link)

# Create Azure Blob link
azure_link = AzureBlobExternalAnalyticsLink(
    name="azure_data_link",
    dataverse="external_data",
    account_name="mystorageaccount",
    account_key="myaccountkey"
)

analytics_mgr.create_link(azure_link)

# Query external data
external_query = """
SELECT *
FROM EXTERNAL `s3://my-bucket/data/*.json` 
USING `s3_data_link`
LIMIT 10
"""

result = cluster.analytics_query(external_query)
for row in result:
    print(row)

Error Handling

from couchbase.exceptions import AnalyticsException, TimeoutException

try:
    query = "SELECT * FROM `nonexistent-bucket`"
    result = cluster.analytics_query(query)
    for row in result:
        print(row)
except AnalyticsException as e:
    print(f"Analytics query failed: {e}")
    if hasattr(e, 'context'):
        print(f"Query: {e.context.statement}")
        print(f"Error details: {e.context.errors}")
except TimeoutException:
    print("Analytics query timed out")

# Handle warnings
try:
    query = "SELECT * FROM `travel-sample` WHERE deprecated_field IS NOT MISSING"
    result = cluster.analytics_query(query)
    
    metadata = result.metadata()
    if metadata.warnings:
        for warning in metadata.warnings:
            print(f"Warning {warning.code}: {warning.message}")
            
    for row in result:
        print(row)
except AnalyticsException as e:
    print(f"Analytics error: {e}")

Performance Monitoring

# Query with detailed metrics
options = AnalyticsOptions(
    client_context_id="performance_test_001",
    priority=True  # High priority query
)

query = """
SELECT country, AVG(reviews.ratings.Overall) as avg_rating,
       COUNT(*) as hotel_count
FROM `travel-sample`  
WHERE type = 'hotel' AND reviews.ratings.Overall IS NOT MISSING
GROUP BY country
HAVING COUNT(*) >= 10
ORDER BY avg_rating DESC
"""

result = cluster.analytics_query(query, options)

# Process results
results_list = []
for row in result:
    results_list.append(row)

# Analyze performance
metadata = result.metadata()
metrics = metadata.metrics

print(f"Analytics Performance Report:")
print(f"  Request ID: {metadata.request_id}")
print(f"  Total Time: {metrics.elapsed_time}")
print(f"  Execution Time: {metrics.execution_time}")  
print(f"  Objects Processed: {metrics.processed_objects}")
print(f"  Result Count: {metrics.result_count}")
print(f"  Result Size: {metrics.result_size} bytes")
print(f"  Warnings: {metrics.warning_count}")
print(f"  Errors: {metrics.error_count}")

print(f"\nTop 5 Countries by Hotel Rating:")
for i, row in enumerate(results_list[:5]):
    print(f"  {i+1}. {row['country']}: {row['avg_rating']:.2f} ({row['hotel_count']} hotels)")

Install with Tessl CLI

npx tessl i tessl/pypi-couchbase

docs

analytics-operations.md

async-operations.md

cluster-operations.md

document-operations.md

index.md

management-operations.md

n1ql-queries.md

search-operations.md

subdocument-operations.md

view-operations.md

tile.json