Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Microsoft Azure Cosmos Client Library for Python provides access to Azure Cosmos DB, a globally distributed, multi-model NoSQL database service. This library enables developers to perform CRUD operations on JSON documents, manage databases and containers, execute queries with SQL-like syntax, and leverage advanced features like stored procedures, triggers, and batch operations.
pip install azure-cosmosfrom azure.cosmos import CosmosClient, ConsistencyLevel, PartitionKeyFor async operations:
from azure.cosmos.aio import CosmosClientCommon data types and configuration:
from azure.cosmos import (
ThroughputProperties,
DatabaseAccount,
ProxyConfiguration,
SSLConfiguration
)from azure.cosmos import CosmosClient, ConsistencyLevel, PartitionKey
# Initialize client with account endpoint and key
client = CosmosClient(
url="https://your-account.documents.azure.com:443/",
credential="your-account-key",
consistency_level=ConsistencyLevel.Session
)
# Get or create database
database = client.create_database_if_not_exists(id="ToDoDatabase")
# Get or create container
container = database.create_container_if_not_exists(
id="ToDoContainer",
partition_key=PartitionKey(path="/category"),
offer_throughput=400
)
# Create an item
item = {
"id": "item1",
"category": "personal",
"description": "Grocery shopping",
"isComplete": False
}
container.create_item(body=item)
# Query items
items = list(container.query_items(
query="SELECT * FROM c WHERE c.category = @category",
parameters=[{"name": "@category", "value": "personal"}],
enable_cross_partition_query=True
))
# Read a specific item
item = container.read_item(item="item1", partition_key="personal")
# Update an item
item["isComplete"] = True
container.replace_item(item=item["id"], body=item)
# Delete an item
container.delete_item(item="item1", partition_key="personal")Azure Cosmos DB client follows a hierarchical structure:
Both synchronous and asynchronous versions are available, with the async API providing the same functionality with coroutine-based methods.
Account-level operations including database management, authentication, and client configuration. Provides entry points to all Azure Cosmos DB functionality.
class CosmosClient:
def __init__(self, url: str, credential: str, consistency_level: ConsistencyLevel = None, **kwargs): ...
def create_database(self, id: str, populate_query_metrics: bool = None, offer_throughput: int = None, **kwargs): ...
def get_database_client(self, database: str) -> DatabaseProxy: ...
def list_databases(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs): ...
def get_database_account(**kwargs) -> DatabaseAccount: ...Database-level operations including container management, user management, and throughput configuration. Provides access to all resources within a database.
class DatabaseProxy:
def read(self, populate_query_metrics: bool = None, **kwargs): ...
def create_container(self, id: str, partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, **kwargs): ...
def get_container_client(self, container: str) -> ContainerProxy: ...
def list_containers(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs): ...
def get_throughput(**kwargs) -> ThroughputProperties: ...Container-level operations including item CRUD operations, querying, batch operations, change feed processing, and conflict resolution.
class ContainerProxy:
def create_item(self, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs): ...
def read_item(self, item: str, partition_key: str, populate_query_metrics: bool = None, **kwargs): ...
def query_items(self, query: str = None, parameters: list = None, partition_key: str = None, enable_cross_partition_query: bool = None, **kwargs): ...
def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs): ...
def query_items_change_feed(**kwargs): ...Stored procedures, triggers, and user-defined functions management. Enables server-side logic execution and advanced data processing scenarios.
class ScriptsProxy:
def create_stored_procedure(self, body: dict, **kwargs): ...
def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, enable_script_logging: bool = None, **kwargs): ...
def create_trigger(self, body: dict, **kwargs): ...
def create_user_defined_function(self, body: dict, **kwargs): ...User management and permission-based access control for fine-grained security in multi-tenant applications.
class UserProxy:
def read(**kwargs): ...
def create_permission(self, body: dict, **kwargs): ...
def list_permissions(self, max_item_count: int = None, **kwargs): ...
def get_permission(self, permission: str, **kwargs): ...Asynchronous versions of all client operations for high-performance, non-blocking applications. All async classes mirror their synchronous counterparts.
class CosmosClient: # async version
async def create_database(self, id: str, **kwargs): ...
async def close(self): ...class PartitionKey(dict):
def __init__(self, path: Union[str, List[str]], kind: str = "Hash", version: int = 2): ...
# Supports both single path: PartitionKey(path="/category")
# And multiple paths: PartitionKey(path=["/category", "/id"], kind="MultiHash")
class ThroughputProperties:
offer_throughput: Optional[int]
auto_scale_max_throughput: Optional[int]
auto_scale_increment_percent: Optional[int]
properties: Optional[Dict[str, Any]]
instant_maximum_throughput: Optional[int]
soft_allowed_maximum_throughput: Optional[int]
class ProxyConfiguration:
Host: str
Port: int
Username: str
Password: str
class SSLConfiguration:
SSLKeyFile: str
SSLCertFile: str
SSLCaCerts: strclass ConsistencyLevel:
Strong: str
BoundedStaleness: str
Session: str
Eventual: str
ConsistentPrefix: strclass CosmosDict(dict):
def get_response_headers(self) -> dict: ...
class CosmosList(list):
def get_response_headers(self) -> dict: ...class IndexKind:
Hash: str
Range: str
MultiHash: str
class IndexingMode:
Consistent: str
NoIndex: str
class DataType:
Number: str
String: str
Point: str
LineString: str
Polygon: str
MultiPolygon: strclass PermissionMode:
Read: str
All: str
class TriggerType:
Pre: str
Post: str
class TriggerOperation:
All: str
Create: str
Update: str
Delete: str
Replace: strclass Priority:
High: str
Low: str
class CosmosBatchOperationError(Exception):
"""Exception raised when batch operations fail."""
pass
class CosmosResourceExistsError(Exception):
"""Exception raised when trying to create a resource that already exists."""
pass
class CosmosResourceNotFoundError(Exception):
"""Exception raised when trying to access a resource that doesn't exist."""
pass
class CosmosAccessConditionFailedError(Exception):
"""Exception raised when conditional operations fail."""
pass