Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Database-level operations for managing containers, users, and throughput configuration within an Azure Cosmos DB database. The DatabaseProxy provides the interface for all database-scoped operations.
Read database properties and metadata.
def read(self, populate_query_metrics: bool = None, **kwargs):
"""
Read database properties.
Parameters:
- populate_query_metrics: Include query metrics in response
- session_token: Session token for consistency
Returns:
Database properties as dictionary
"""

Create, list, query, and manage containers within the database.
def create_container(self, id: str, partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, unique_key_policy: dict = None, conflict_resolution_policy: dict = None, analytical_storage_ttl: int = None, vector_embedding_policy: dict = None, full_text_policy: dict = None, computed_properties: list = None, **kwargs):
"""
Create a new container.
Parameters:
- id: Container ID
- partition_key: Partition key configuration
- indexing_policy: Indexing policy configuration
- default_ttl: Default time-to-live in seconds
- populate_query_metrics: Include query metrics
- offer_throughput: Provisioned throughput in RU/s or ThroughputProperties
- unique_key_policy: Unique key constraints
- conflict_resolution_policy: Conflict resolution policy
- analytical_storage_ttl: Analytical store TTL in seconds
- vector_embedding_policy: Vector embedding policy configuration
- full_text_policy: Full text search policy configuration
- computed_properties: List of computed properties
- session_token: Session token
Returns:
ContainerProxy for the created container
Raises:
CosmosResourceExistsError: If container already exists
"""
def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, unique_key_policy: dict = None, conflict_resolution_policy: dict = None, analytical_storage_ttl: int = None, vector_embedding_policy: dict = None, full_text_policy: dict = None, computed_properties: list = None, **kwargs):
"""
Create container if it doesn't exist, otherwise return existing.
Parameters: Same as create_container
Returns:
ContainerProxy for the container
"""
def get_container_client(self, container: Union[str, ContainerProxy]) -> ContainerProxy:
"""
Get a container client for operations on a specific container.
Parameters:
- container: Container ID or ContainerProxy instance
Returns:
ContainerProxy instance
"""
def list_containers(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
"""
List all containers in the database.
Parameters:
- max_item_count: Maximum number of items to return
- populate_query_metrics: Include query metrics
- session_token: Session token for consistency
Returns:
Iterable of container items
"""
def query_containers(self, query: str = None, parameters: list = None, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
"""
Query containers using SQL-like syntax.
Parameters:
- query: SQL query string
- parameters: Query parameters as [{"name": "@param", "value": value}]
- max_item_count: Maximum items per page
- populate_query_metrics: Include query metrics
- session_token: Session token for consistency
Returns:
Iterable of query results
"""
def replace_container(self, container: Union[str, ContainerProxy], partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, conflict_resolution_policy: dict = None, populate_query_metrics: bool = None, **kwargs):
"""
Replace container configuration.
Parameters:
- container: Container ID or ContainerProxy instance
- partition_key: Partition key configuration
- indexing_policy: Indexing policy configuration
- default_ttl: Default time-to-live
- conflict_resolution_policy: Conflict resolution policy
- populate_query_metrics: Include query metrics
- session_token: Session token
- etag: ETag for conditional operations
- match_condition: Match condition for conditional operations
Returns:
ContainerProxy for the replaced container
"""
def delete_container(self, container: Union[str, ContainerProxy], populate_query_metrics: bool = None, **kwargs):
"""
Delete a container.
Parameters:
- container: Container ID or ContainerProxy instance
- populate_query_metrics: Include query metrics
- session_token: Session token
- etag: ETag for conditional operations
- match_condition: Match condition for conditional operations
Raises:
CosmosResourceNotFoundError: If container doesn't exist
"""

Manage users within the database for permission-based access control.
def list_users(self, max_item_count: int = None, **kwargs):
"""
List users in the database.
Parameters:
- max_item_count: Maximum number of users to return
- session_token: Session token for consistency
Returns:
Iterable of user items
"""
def query_users(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
"""
Query users using SQL-like syntax.
Parameters:
- query: SQL query string
- parameters: Query parameters
- max_item_count: Maximum items per page
- session_token: Session token for consistency
Returns:
Iterable of query results
"""
def get_user_client(self, user: Union[str, UserProxy]) -> UserProxy:
"""
Get a user client for user operations.
Parameters:
- user: User ID or UserProxy instance
Returns:
UserProxy instance
"""
def create_user(self, body: dict, **kwargs):
"""
Create a new user.
Parameters:
- body: User properties dictionary with 'id' field
- session_token: Session token for consistency
Returns:
UserProxy for the created user
Raises:
CosmosResourceExistsError: If user already exists
"""
def upsert_user(self, body: dict, **kwargs):
"""
Create or replace a user.
Parameters:
- body: User properties dictionary with 'id' field
- session_token: Session token for consistency
Returns:
UserProxy for the user
"""
def replace_user(self, user: Union[str, UserProxy], body: dict, **kwargs):
"""
Replace user properties.
Parameters:
- user: User ID or UserProxy instance
- body: Updated user properties
- session_token: Session token
- etag: ETag for conditional operations
- match_condition: Match condition for conditional operations
Returns:
UserProxy for the replaced user
"""
def delete_user(self, user: Union[str, UserProxy], **kwargs):
"""
Delete a user.
Parameters:
- user: User ID or UserProxy instance
- session_token: Session token
- etag: ETag for conditional operations
- match_condition: Match condition for conditional operations
Raises:
CosmosResourceNotFoundError: If user doesn't exist
"""

Manage provisioned throughput and auto-scaling for the database.
def get_throughput(self, **kwargs) -> ThroughputProperties:
"""
Get current throughput settings for the database.
Parameters:
- session_token: Session token for consistency
Returns:
ThroughputProperties with current throughput configuration
Raises:
CosmosResourceNotFoundError: If throughput not configured
"""
def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
"""
Replace throughput settings for the database.
Parameters:
- throughput: New throughput configuration
- session_token: Session token for consistency
Returns:
ThroughputProperties with updated configuration
"""
def read_offer(self, **kwargs):
"""
Read throughput offer (deprecated, use get_throughput).
Returns:
Offer properties
"""

from azure.cosmos import PartitionKey, IndexingMode
# Get database client
database = client.get_database_client("ProductCatalog")
# Create container with partition key
container = database.create_container_if_not_exists(
id="Products",
partition_key=PartitionKey(path="/category"),
offer_throughput=1000
)
# Create container with advanced configuration
indexing_policy = {
"indexingMode": IndexingMode.Consistent,
"automatic": True,
"includedPaths": [{"path": "/*"}],
"excludedPaths": [{"path": "/description/*"}]
}
unique_key_policy = {
"uniqueKeys": [
{"paths": ["/sku"]},
{"paths": ["/name", "/category"]}
]
}
container = database.create_container(
id="UniqueProducts",
partition_key=PartitionKey(path="/category", version=2),
indexing_policy=indexing_policy,
unique_key_policy=unique_key_policy,
default_ttl=3600, # 1 hour TTL
offer_throughput=400
)
# List containers
containers = list(database.list_containers())
for cont in containers:
    print(f"Container: {cont['id']}")
# Query containers
results = list(database.query_containers(
query="SELECT * FROM c WHERE c.defaultTtl > 0"
))

from azure.cosmos import ThroughputProperties
from azure.cosmos.exceptions import CosmosResourceNotFoundError
# Get current throughput
try:
    throughput = database.get_throughput()
    print(f"Current throughput: {throughput.offer_throughput} RU/s")
    if throughput.auto_scale_max_throughput:
        print(f"Auto-scale max: {throughput.auto_scale_max_throughput} RU/s")
except CosmosResourceNotFoundError:
    print("No throughput configured at database level")
# Update throughput
new_throughput = ThroughputProperties(offer_throughput=800)
database.replace_throughput(new_throughput)
# Configure auto-scaling
auto_scale_throughput = ThroughputProperties(
auto_scale_max_throughput=4000,
auto_scale_increment_percent=10
)
database.replace_throughput(auto_scale_throughput)

# Create a user
user_def = {"id": "appUser1"}
user = database.create_user(user_def)
# List users
users = list(database.list_users())
for u in users:
    print(f"User: {u['id']}")
# Get user client for permission management
user_client = database.get_user_client("appUser1")
# Query users
results = list(database.query_users(
query="SELECT * FROM users u WHERE u.id = @userid",
parameters=[{"name": "@userid", "value": "appUser1"}]
))

# Read database properties
db_properties = database.read()
print(f"Database ID: {db_properties['id']}")
print(f"ETag: {db_properties['_etag']}")
print(f"Self Link: {db_properties['_self']}")

Install with Tessl CLI
npx tessl i tessl/pypi-azure-cosmos