Python Client for Couchbase providing comprehensive database operations, including key-value access, N1QL queries, search, analytics, and cluster management.

Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Analytics query execution for complex data analysis and reporting. Supports large-scale analytical workloads with integration to external data sources and advanced SQL++ analytics capabilities.
Execute analytics queries with various options and consistency levels.
class Cluster:
    # Entry point for analytics operations against a connected cluster.
    def analytics_query(self, statement: str, options: AnalyticsOptions = None) -> AnalyticsResult:
        """
        Execute Analytics query.

        Args:
            statement (str): Analytics query statement
            options (AnalyticsOptions, optional): Analytics execution options

        Returns:
            AnalyticsResult: Analytics results iterator

        Raises:
            AnalyticsException: If analytics execution fails
            TimeoutException: If query times out
        """
class AnalyticsOptions:
    """Options controlling how an analytics query is executed."""

    def __init__(self, timeout: timedelta = None,
                 scan_consistency: AnalyticsScanConsistency = None,
                 client_context_id: str = None,
                 priority: bool = False,
                 readonly: bool = None,
                 raw: Dict[str, Any] = None,
                 **kwargs):
        """
        Analytics query execution options.

        Args:
            timeout (timedelta, optional): Query timeout
            scan_consistency (AnalyticsScanConsistency, optional): Consistency level
            client_context_id (str, optional): Client context identifier
            priority (bool): High priority query flag
            readonly (bool, optional): Read-only query flag
            raw (Dict[str, Any], optional): Raw analytics options
            **kwargs: Named parameters for parameterized queries
        """

    def named_parameters(self, **params) -> AnalyticsOptions:
        """
        Set named parameters for query.

        Args:
            **params: Named parameter values

        Returns:
            AnalyticsOptions: Options with parameters set
        """

    def positional_parameters(self, *params) -> AnalyticsOptions:
        """
        Set positional parameters for query.

        Args:
            *params: Positional parameter values

        Returns:
            AnalyticsOptions: Options with parameters set
        """

# Access analytics results and execution metadata.
class AnalyticsResult:
    """Iterable handle over the rows produced by an analytics query."""

    def __iter__(self) -> Iterator[dict]:
        """Iterate over analytics result rows."""

    def metadata(self) -> AnalyticsMetaData:
        """Get analytics execution metadata."""

    def rows(self) -> List[dict]:
        """Get all result rows as list."""
class AnalyticsMetaData:
    """Server-reported metadata describing an analytics query execution."""

    @property
    def request_id(self) -> str:
        """Analytics request identifier."""

    @property
    def client_context_id(self) -> str:
        """Client context identifier."""

    @property
    def status(self) -> AnalyticsStatus:
        """Analytics execution status."""

    @property
    def signature(self) -> dict:
        """Analytics result signature."""

    @property
    def metrics(self) -> AnalyticsMetrics:
        """Analytics execution metrics."""

    @property
    def warnings(self) -> List[AnalyticsWarning]:
        """Analytics execution warnings."""
class AnalyticsMetrics:
    """Timing and volume metrics for a single analytics query."""

    @property
    def elapsed_time(self) -> timedelta:
        """Total analytics execution time."""

    @property
    def execution_time(self) -> timedelta:
        """Analytics execution time."""

    @property
    def result_count(self) -> int:
        """Number of result rows."""

    @property
    def result_size(self) -> int:
        """Size of results in bytes."""

    @property
    def processed_objects(self) -> int:
        """Number of objects processed."""

    @property
    def error_count(self) -> int:
        """Number of errors encountered."""

    @property
    def warning_count(self) -> int:
        """Number of warnings generated."""
class AnalyticsWarning:
    """A non-fatal warning emitted by the analytics service."""

    @property
    def code(self) -> int:
        """Warning code."""

    @property
    def message(self) -> str:
        """Warning message."""
class AnalyticsStatus:
    """String constants describing the server-side state of an analytics request."""
    RUNNING = "running"
    SUCCESS = "success"
    ERRORS = "errors"
    COMPLETED = "completed"
    STOPPED = "stopped"
    TIMEOUT = "timeout"
    CLOSED = "closed"
    FATAL = "fatal"
class AnalyticsScanConsistency:
    """Consistency levels for analytics scans."""
    NOT_BOUNDED = "not_bounded"    # Fastest, may return stale data
    REQUEST_PLUS = "request_plus"  # Consistent with mutations

# Manage analytics indexes, datasets, and external links.
class AnalyticsIndexManager:
    """Management interface for analytics dataverses, datasets, indexes, and links."""

    def create_dataverse(self, dataverse_name: str, options: CreateAnalyticsDataverseOptions = None) -> None:
        """
        Create analytics dataverse.

        Args:
            dataverse_name (str): Dataverse name
            options (CreateAnalyticsDataverseOptions, optional): Creation options

        Raises:
            DataverseExistsException: If dataverse already exists
        """

    def drop_dataverse(self, dataverse_name: str, options: DropAnalyticsDataverseOptions = None) -> None:
        """
        Drop analytics dataverse.

        Args:
            dataverse_name (str): Dataverse name
            options (DropAnalyticsDataverseOptions, optional): Drop options

        Raises:
            DataverseNotFoundException: If dataverse doesn't exist
        """

    def create_dataset(self, dataset_name: str, bucket_name: str, options: CreateAnalyticsDatasetOptions = None) -> None:
        """
        Create analytics dataset.

        Args:
            dataset_name (str): Dataset name
            bucket_name (str): Source bucket name
            options (CreateAnalyticsDatasetOptions, optional): Creation options

        Raises:
            DatasetExistsException: If dataset already exists
        """

    def drop_dataset(self, dataset_name: str, options: DropAnalyticsDatasetOptions = None) -> None:
        """
        Drop analytics dataset.

        Args:
            dataset_name (str): Dataset name
            options (DropAnalyticsDatasetOptions, optional): Drop options

        Raises:
            DatasetNotFoundException: If dataset doesn't exist
        """

    def create_index(self, index_name: str, dataset_name: str, fields: Dict[str, AnalyticsDataType], options: CreateAnalyticsIndexOptions = None) -> None:
        """
        Create analytics index.

        Args:
            index_name (str): Index name
            dataset_name (str): Dataset name
            fields (Dict[str, AnalyticsDataType]): Index fields and types
            options (CreateAnalyticsIndexOptions, optional): Creation options

        Raises:
            IndexExistsException: If index already exists
        """

    def drop_index(self, index_name: str, dataset_name: str, options: DropAnalyticsIndexOptions = None) -> None:
        """
        Drop analytics index.

        Args:
            index_name (str): Index name
            dataset_name (str): Dataset name
            options (DropAnalyticsIndexOptions, optional): Drop options

        Raises:
            IndexNotFoundException: If index doesn't exist
        """

    def get_all_datasets(self, options: GetAllAnalyticsDatasetsOptions = None) -> List[AnalyticsDataset]:
        """
        Get all analytics datasets.

        Args:
            options (GetAllAnalyticsDatasetsOptions, optional): Retrieval options

        Returns:
            List[AnalyticsDataset]: All datasets
        """

    def get_all_indexes(self, options: GetAllAnalyticsIndexesOptions = None) -> List[AnalyticsIndex]:
        """
        Get all analytics indexes.

        Args:
            options (GetAllAnalyticsIndexesOptions, optional): Retrieval options

        Returns:
            List[AnalyticsIndex]: All indexes
        """

    def connect_link(self, options: ConnectAnalyticsLinkOptions = None) -> None:
        """
        Connect analytics link.

        Args:
            options (ConnectAnalyticsLinkOptions, optional): Connection options
        """

    def disconnect_link(self, options: DisconnectAnalyticsLinkOptions = None) -> None:
        """
        Disconnect analytics link.

        Args:
            options (DisconnectAnalyticsLinkOptions, optional): Disconnection options
        """

    def create_link(self, link: AnalyticsLink, options: CreateAnalyticsLinkOptions = None) -> None:
        """
        Create external analytics link.

        Args:
            link (AnalyticsLink): Link configuration
            options (CreateAnalyticsLinkOptions, optional): Creation options

        Raises:
            LinkExistsException: If link already exists
        """

    def replace_link(self, link: AnalyticsLink, options: ReplaceAnalyticsLinkOptions = None) -> None:
        """
        Replace external analytics link.

        Args:
            link (AnalyticsLink): New link configuration
            options (ReplaceAnalyticsLinkOptions, optional): Replace options

        Raises:
            LinkNotFoundException: If link doesn't exist
        """

    def drop_link(self, link_name: str, dataverse_name: str, options: DropAnalyticsLinkOptions = None) -> None:
        """
        Drop external analytics link.

        Args:
            link_name (str): Link name
            dataverse_name (str): Dataverse name
            options (DropAnalyticsLinkOptions, optional): Drop options

        Raises:
            LinkNotFoundException: If link doesn't exist
        """

    def get_links(self, options: GetAnalyticsLinksOptions = None) -> List[AnalyticsLink]:
        """
        Get all analytics links.

        Args:
            options (GetAnalyticsLinksOptions, optional): Retrieval options

        Returns:
            List[AnalyticsLink]: All analytics links
        """

class AnalyticsDataset:
    """Descriptor of an analytics dataset as reported by the server."""

    @property
    def name(self) -> str:
        """Dataset name."""

    @property
    def dataverse_name(self) -> str:
        """Dataverse containing the dataset."""

    @property
    def link_name(self) -> str:
        """Link name for external datasets."""

    @property
    def bucket_name(self) -> str:
        """Source bucket name."""
class AnalyticsIndex:
    """Descriptor of an analytics index as reported by the server."""

    @property
    def name(self) -> str:
        """Index name."""

    @property
    def dataset_name(self) -> str:
        """Dataset containing the index."""

    @property
    def dataverse_name(self) -> str:
        """Dataverse containing the index."""

    @property
    def is_primary(self) -> bool:
        """Whether this is a primary index."""
class AnalyticsLink:
    """Base class for analytics links."""
class CouchbaseRemoteAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, hostname: str,
                 username: str, password: str = None,
                 encryption: AnalyticsEncryptionLevel = None):
        """
        Remote Couchbase cluster link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            hostname (str): Remote cluster hostname
            username (str): Remote cluster username
            password (str, optional): Remote cluster password
            encryption (AnalyticsEncryptionLevel, optional): Encryption level
        """
class S3ExternalAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, access_key_id: str,
                 secret_access_key: str, region: str,
                 service_endpoint: str = None):
        """
        Amazon S3 external link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            access_key_id (str): AWS access key ID
            secret_access_key (str): AWS secret access key
            region (str): AWS region
            service_endpoint (str, optional): Custom S3 endpoint
        """
class AzureBlobExternalAnalyticsLink(AnalyticsLink):
    def __init__(self, name: str, dataverse: str, connection_string: str = None,
                 account_name: str = None, account_key: str = None,
                 shared_access_signature: str = None, blob_endpoint: str = None):
        """
        Azure Blob Storage external link.

        Args:
            name (str): Link name
            dataverse (str): Dataverse name
            connection_string (str, optional): Azure connection string
            account_name (str, optional): Azure account name
            account_key (str, optional): Azure account key
            shared_access_signature (str, optional): Azure SAS token
            blob_endpoint (str, optional): Azure blob endpoint
        """
class AnalyticsDataType:
    """Field data types accepted when creating an analytics index."""
    STRING = "string"
    INT64 = "int64"
    DOUBLE = "double"
    BOOLEAN = "boolean"
    DATETIME = "datetime"
    UUID = "uuid"
class AnalyticsEncryptionLevel:
    """Encryption levels for remote Couchbase analytics links."""
    NONE = "none"
    HALF = "half"
    FULL = "full"
class AnalyticsLinkType:
    """Identifiers for the supported analytics link kinds."""
    COUCHBASE_REMOTE = "couchbase"
    S3_EXTERNAL = "s3"
    AZURE_EXTERNAL = "azureblob"

from couchbase.options import AnalyticsOptions, AnalyticsScanConsistency
# Simple analytics query
query = """
SELECT country, COUNT(*) as hotel_count
FROM `travel-sample`
WHERE type = 'hotel'
GROUP BY country
ORDER BY hotel_count DESC
LIMIT 10
"""
result = cluster.analytics_query(query)
for row in result:
    print(f"{row['country']}: {row['hotel_count']} hotels")

# Get metadata after the result has been fully iterated
metadata = result.metadata()
print(f"Query took: {metadata.metrics.elapsed_time}")
print(f"Processed: {metadata.metrics.processed_objects} objects")

# Named parameters
# Named parameters are supplied as keyword arguments to AnalyticsOptions
# (see AnalyticsOptions.__init__ **kwargs) and referenced with $name.
query = """
SELECT name, country, reviews.ratings.Overall as rating
FROM `travel-sample`
WHERE type = $doc_type AND country = $country_name
  AND reviews.ratings.Overall >= $min_rating
ORDER BY rating DESC
"""
options = AnalyticsOptions(
    doc_type="hotel",
    country_name="United States",
    min_rating=4.0
)
result = cluster.analytics_query(query, options)
for row in result:
    print(f"{row['name']} ({row['country']}): {row['rating']}")

# Positional parameters bind to ? placeholders in order
query2 = """
SELECT AVG(reviews.ratings.Overall) as avg_rating
FROM `travel-sample`
WHERE type = ? AND country = ?
"""
options2 = AnalyticsOptions().positional_parameters("hotel", "France")
result2 = cluster.analytics_query(query2, options2)
for row in result2:
    print(f"Average rating: {row['avg_rating']}")

# Join operation
# UNNEST flattens each hotel's embedded reviews array into one row per review.
query = """
SELECT h.name as hotel_name, h.city, r.content as review_text, r.ratings.Overall as rating
FROM `travel-sample` h
UNNEST h.reviews r
WHERE h.type = 'hotel'
  AND h.country = 'United Kingdom'
  AND r.ratings.Overall >= 4
ORDER BY r.ratings.Overall DESC, h.name
LIMIT 20
"""
result = cluster.analytics_query(query)
for row in result:
    print(f"{row['hotel_name']} ({row['city']}): {row['rating']}")
    print(f"  Review: {row['review_text'][:100]}...")

# Aggregation with window functions
# NOTE(review): QUALIFY is not part of standard SQL++ — confirm the target
# Analytics service version supports it before relying on this query.
query2 = """
SELECT country, city, name, reviews.ratings.Overall as rating,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY reviews.ratings.Overall DESC) as rank
FROM `travel-sample`
WHERE type = 'hotel' AND reviews.ratings.Overall IS NOT MISSING
QUALIFY rank <= 3
ORDER BY country, rank
"""
result2 = cluster.analytics_query(query2)
current_country = None
for row in result2:
    # Print a heading each time the partition (country) changes.
    if row['country'] != current_country:
        current_country = row['country']
        print(f"\nTop hotels in {current_country}:")
    print(f"  {row['rank']}. {row['name']} ({row['city']}): {row['rating']}")

# Perform document updates
# Perform document updates
doc = {"type": "hotel", "name": "Analytics Test Hotel", "country": "TestLand"}
mutation_result = collection.upsert("hotel::analytics_test", doc)

# Query with consistency.
# REQUEST_PLUS makes the analytics query wait until it observes all mutations
# made before the request, so the upsert above is visible to the query.
# (The original example also built a MutationState from the mutation token,
# but never passed it anywhere — AnalyticsOptions here only takes
# scan_consistency — so that dead code has been removed.)
options = AnalyticsOptions(scan_consistency=AnalyticsScanConsistency.REQUEST_PLUS)
query = "SELECT * FROM `travel-sample` WHERE name = 'Analytics Test Hotel'"
result = cluster.analytics_query(query, options)
for row in result:
    print(f"Found: {row['name']}")

from couchbase.management.analytics import AnalyticsIndexManager, AnalyticsDataType
# Obtain the analytics management interface from the cluster.
analytics_mgr = cluster.analytics_indexes()

# Create dataverse
analytics_mgr.create_dataverse("travel_analytics")

# Create dataset backed by the travel-sample bucket
analytics_mgr.create_dataset("hotels", "travel-sample",
                             CreateAnalyticsDatasetOptions(dataverse_name="travel_analytics"))

# Create index over selected fields (name -> AnalyticsDataType)
index_fields = {
    "country": AnalyticsDataType.STRING,
    "city": AnalyticsDataType.STRING,
    "rating": AnalyticsDataType.DOUBLE
}
analytics_mgr.create_index("hotel_location_idx", "hotels", index_fields,
                           CreateAnalyticsIndexOptions(dataverse_name="travel_analytics"))

# List all datasets
datasets = analytics_mgr.get_all_datasets()
for dataset in datasets:
    print(f"Dataset: {dataset.name} in {dataset.dataverse_name}")

# List all indexes
indexes = analytics_mgr.get_all_indexes()
for index in indexes:
    print(f"Index: {index.name} on {index.dataset_name}")

from couchbase.management.analytics import S3ExternalAnalyticsLink, AzureBlobExternalAnalyticsLink
analytics_mgr = cluster.analytics_indexes()

# Create S3 external link (credentials shown are AWS documentation examples)
s3_link = S3ExternalAnalyticsLink(
    name="s3_data_link",
    dataverse="external_data",
    access_key_id="AKIAIOSFODNN7EXAMPLE",
    secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region="us-west-2"
)
analytics_mgr.create_link(s3_link)

# Create Azure Blob link authenticated by account name + key
azure_link = AzureBlobExternalAnalyticsLink(
    name="azure_data_link",
    dataverse="external_data",
    account_name="mystorageaccount",
    account_key="myaccountkey"
)
analytics_mgr.create_link(azure_link)

# Query external data
# NOTE(review): confirm this FROM EXTERNAL ... USING syntax against the
# Analytics service documentation for external collections.
external_query = """
SELECT *
FROM EXTERNAL `s3://my-bucket/data/*.json`
USING `s3_data_link`
LIMIT 10
"""
result = cluster.analytics_query(external_query)
for row in result:
    print(row)

from couchbase.exceptions import AnalyticsException, TimeoutException
# Error handling: catch the specific analytics/timeout exceptions, narrowest first.
try:
    query = "SELECT * FROM `nonexistent-bucket`"
    result = cluster.analytics_query(query)
    for row in result:
        print(row)
except AnalyticsException as e:
    print(f"Analytics query failed: {e}")
    # Error context (statement, server errors) is attached when available.
    if hasattr(e, 'context'):
        print(f"Query: {e.context.statement}")
        print(f"Error details: {e.context.errors}")
except TimeoutException:
    print("Analytics query timed out")

# Handle warnings.
# Metadata is read AFTER iterating the rows — consistent with the other
# examples in this file, where result.metadata() is only consulted once the
# result has been consumed.
try:
    query = "SELECT * FROM `travel-sample` WHERE deprecated_field IS NOT MISSING"
    result = cluster.analytics_query(query)
    for row in result:
        print(row)
    metadata = result.metadata()
    if metadata.warnings:
        for warning in metadata.warnings:
            print(f"Warning {warning.code}: {warning.message}")
except AnalyticsException as e:
    print(f"Analytics error: {e}")

# Query with detailed metrics
# Query with detailed metrics: tag the request with a context id so it can be
# correlated in server logs, and mark it high priority.
options = AnalyticsOptions(
    client_context_id="performance_test_001",
    priority=True  # High priority query
)
query = """
SELECT country, AVG(reviews.ratings.Overall) as avg_rating,
       COUNT(*) as hotel_count
FROM `travel-sample`
WHERE type = 'hotel' AND reviews.ratings.Overall IS NOT MISSING
GROUP BY country
HAVING COUNT(*) >= 10
ORDER BY avg_rating DESC
"""
result = cluster.analytics_query(query, options)

# Process results (materialize so we can both report metrics and re-read rows)
results_list = []
for row in result:
    results_list.append(row)

# Analyze performance from the post-execution metadata
metadata = result.metadata()
metrics = metadata.metrics
# Plain strings here — the originals used f-string prefixes with no
# placeholders (lint F541); output is byte-identical.
print("Analytics Performance Report:")
print(f"  Request ID: {metadata.request_id}")
print(f"  Total Time: {metrics.elapsed_time}")
print(f"  Execution Time: {metrics.execution_time}")
print(f"  Objects Processed: {metrics.processed_objects}")
print(f"  Result Count: {metrics.result_count}")
print(f"  Result Size: {metrics.result_size} bytes")
print(f"  Warnings: {metrics.warning_count}")
print(f"  Errors: {metrics.error_count}")
print("\nTop 5 Countries by Hotel Rating:")
for i, row in enumerate(results_list[:5]):
    print(f"  {i+1}. {row['country']}: {row['avg_rating']:.2f} ({row['hotel_count']} hotels)")

# Install with Tessl CLI
npx tessl i tessl/pypi-couchbase