Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Container-level operations for managing JSON documents, including CRUD operations, querying, batch processing, change feed monitoring, and conflict resolution. The ContainerProxy provides the interface for all document and container-scoped operations.
Read container properties and configuration.
def read(self, populate_query_metrics: bool = None, **kwargs):
    """Read this container's properties and configuration from the service.

    Parameters:
    - populate_query_metrics: Include query metrics in the response.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        Container properties as a dictionary.
    """
@property
def is_system_key(self) -> bool:
    """Whether the container uses a system-generated partition key.

    Returns:
        True if the partition key is system-generated, False otherwise.
    """
@property
def scripts(self) -> ScriptsProxy:
    """Scripts proxy for stored procedures, triggers, and UDFs.

    Returns:
        ScriptsProxy instance scoped to this container.
    """

Create, read, update, and delete JSON documents within the container.
def create_item(self, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """Create a new item in the container.

    Parameters:
    - body: Item data as a dictionary (must include an 'id' field unless
      automatic id generation is enabled).
    - populate_query_metrics: Include query metrics in the response.
    - pre_trigger_include: Pre-trigger to execute with the operation.
    - post_trigger_include: Post-trigger to execute with the operation.

    Keyword options (via **kwargs):
    - enable_automatic_id_generation: Auto-generate an id when missing.
    - session_token: Session token for session consistency.
    - initial_headers: Custom request headers.

    Returns:
        The created item as a dictionary.

    Raises:
        CosmosResourceExistsError: If an item with the same id already
            exists in the same logical partition.
    """
def read_item(self, item: str, partition_key: str, populate_query_metrics: bool = None, **kwargs):
    """Read a single item by id and partition key (point read).

    Parameters:
    - item: Item id.
    - partition_key: Partition key value of the item.
    - populate_query_metrics: Include query metrics in the response.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        The item as a dictionary.

    Raises:
        CosmosResourceNotFoundError: If the item doesn't exist.
    """
def upsert_item(self, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """Create the item if absent, otherwise replace it (upsert).

    Parameters:
    - body: Item data as a dictionary (must include an 'id' field unless
      automatic id generation is enabled).
    - populate_query_metrics: Include query metrics in the response.
    - pre_trigger_include: Pre-trigger to execute with the operation.
    - post_trigger_include: Post-trigger to execute with the operation.

    Keyword options (via **kwargs):
    - enable_automatic_id_generation: Auto-generate an id when missing.
    - session_token: Session token for session consistency.
    - initial_headers: Custom request headers.

    Returns:
        The upserted item as a dictionary.
    """
def replace_item(self, item: str, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """Replace an existing item with new content.

    Parameters:
    - item: Item id, or an item dictionary containing an 'id' field.
    - body: The full replacement item data.
    - populate_query_metrics: Include query metrics in the response.
    - pre_trigger_include: Pre-trigger to execute with the operation.
    - post_trigger_include: Post-trigger to execute with the operation.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - etag: ETag for conditional (optimistic-concurrency) operations.
    - match_condition: Match condition used together with etag.

    Returns:
        The replaced item as a dictionary.

    Raises:
        CosmosResourceNotFoundError: If the item doesn't exist.
        CosmosAccessConditionFailedError: If the conditional check fails.
    """
def patch_item(self, item: str, partition_key: str, patch_operations: list, filter_predicate: str = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """Apply a list of partial-update (patch) operations to an item.

    Parameters:
    - item: Item id.
    - partition_key: Partition key value of the item.
    - patch_operations: List of patch operations, e.g.
      {"op": "replace", "path": "/price", "value": 1.0}.
    - filter_predicate: Optional SQL predicate; the patch is applied only
      when the predicate matches the current document.
    - pre_trigger_include: Pre-trigger to execute with the operation.
    - post_trigger_include: Post-trigger to execute with the operation.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - etag: ETag for conditional operations.
    - match_condition: Match condition used together with etag.

    Returns:
        The patched item as a dictionary.
    """
def delete_item(self, item: str, partition_key: str, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """Delete a single item.

    Parameters:
    - item: Item id, or an item dictionary containing an 'id' field.
    - partition_key: Partition key value of the item.
    - populate_query_metrics: Include query metrics in the response.
    - pre_trigger_include: Pre-trigger to execute with the operation.
    - post_trigger_include: Post-trigger to execute with the operation.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - etag: ETag for conditional operations.
    - match_condition: Match condition used together with etag.

    Raises:
        CosmosResourceNotFoundError: If the item doesn't exist.
        CosmosAccessConditionFailedError: If the conditional check fails.
    """
def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):
    """Delete every item belonging to a single logical partition.

    Parameters:
    - partition_key: Partition key value identifying the partition.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        CosmosList with deletion results.
    """

Execute multiple operations atomically within a single partition.
def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
    """Execute a transactional batch of operations within one partition.

    All operations succeed or fail together (atomic within the partition).

    Parameters:
    - batch_operations: List of batch operation objects; every operation
      must target the same logical partition.
    - partition_key: Partition key value shared by all operations.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - enable_automatic_id_generation: Auto-generate ids for create
      operations missing an 'id'.

    Returns:
        CosmosList with one result per operation.

    Raises:
        CosmosBatchOperationError: If any operation in the batch fails
            (the whole batch is rolled back).
    """

Query items using SQL-like syntax with support for cross-partition queries.
def query_items(self, query: str = None, parameters: list = None, partition_key: str = None, enable_cross_partition_query: bool = None, max_item_count: int = None, enable_scan_in_query: bool = None, populate_query_metrics: bool = None, **kwargs):
    """Query items using Cosmos DB SQL syntax.

    Parameters:
    - query: SQL query string.
    - parameters: Query parameters as [{"name": "@param", "value": value}].
    - partition_key: Restrict the query to a single partition.
    - enable_cross_partition_query: Allow the query to span partitions
      (required when no partition_key is supplied and the container has
      more than one partition).
    - max_item_count: Maximum items returned per page.
    - enable_scan_in_query: Allow scans when the index can't serve the query.
    - populate_query_metrics: Include query metrics in the response.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - initial_headers: Custom request headers.
    - max_integrated_cache_staleness: Integrated-cache staleness tolerance.

    Returns:
        Iterable of query results (lazily paged).
    """
def read_all_items(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
    """Read all items in the container.

    Parameters:
    - max_item_count: Maximum items returned per page.
    - populate_query_metrics: Include query metrics in the response.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - initial_headers: Custom request headers.

    Returns:
        Iterable over every item in the container (lazily paged).
    """

Monitor changes to items in the container using change feed.
def query_items_change_feed(self, **kwargs):
    """Query the change feed for item creations and updates.

    Keyword options (via **kwargs):
    - partition_key_range_id: Limit to a specific partition key range.
    - feed_range: Limit to a specific feed range.
    - is_start_from_beginning: Start from the beginning of the change feed.
    - continuation: Continuation token to resume from a previous read.
    - max_item_count: Maximum items returned per page.
    - start_time: Point in time to start reading changes from.
    - session_token: Session token for session consistency.

    Returns:
        Iterable of changed items; the pager carries a continuation token
        for resuming later reads.
    """

Execute multiple operations atomically within a single partition.
def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
    """Execute a transactional batch of operations atomically.

    NOTE(review): this entry duplicates the execute_item_batch
    documentation that appears earlier in this file — consider
    consolidating the two sections.

    Parameters:
    - batch_operations: List of batch operations; every operation must
      target the same logical partition.
    - partition_key: Partition key value shared by all operations.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - initial_headers: Custom request headers.

    Returns:
        CosmosList with batch operation results.

    Raises:
        CosmosBatchOperationError: If the batch fails (it is rolled back
            as a unit).
    """

Work with feed ranges for parallel processing and fine-grained control.
def read_feed_ranges(self, **kwargs):
    """Get the feed ranges of the container (for parallel processing).

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        List of feed range dictionaries.
    """
def feed_range_from_partition_key(self, partition_key: str):
    """Get the feed range that contains the given partition key.

    Parameters:
    - partition_key: Partition key value.

    Returns:
        Feed range dictionary covering that partition key.
    """
def is_feed_range_subset(self, parent_feed_range: dict, child_feed_range: dict) -> bool:
    """Check whether one feed range is contained within another.

    Parameters:
    - parent_feed_range: Candidate containing (parent) feed range.
    - child_feed_range: Candidate contained (child) feed range.

    Returns:
        True if child_feed_range is a subset of parent_feed_range.
    """

Manage session tokens for session consistency.
def get_latest_session_token(self, partition_key: str, **kwargs) -> str:
    """Get the latest session token observed for a partition.

    Parameters:
    - partition_key: Partition key value.

    Returns:
        Session token string, suitable for passing as session_token on
        subsequent requests.
    """

Handle conflicts in multi-region scenarios.
def list_conflicts(self, max_item_count: int = None, **kwargs):
    """List replication conflicts recorded in the container.

    Parameters:
    - max_item_count: Maximum conflicts returned per page.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        Iterable of conflict items.
    """
def query_conflicts(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """Query conflicts using Cosmos DB SQL syntax.

    Parameters:
    - query: SQL query string.
    - parameters: Query parameters as [{"name": "@param", "value": value}].
    - max_item_count: Maximum items returned per page.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        Iterable of query results.
    """
def get_conflict(self, conflict: str, partition_key: str, **kwargs):
    """Get a single conflict by id.

    Parameters:
    - conflict: Conflict id.
    - partition_key: Partition key value of the conflicted item.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        The conflict item.
    """
def delete_conflict(self, conflict: str, **kwargs):
    """Delete a recorded conflict.

    Parameters:
    - conflict: Conflict id, or a conflict dictionary.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.
    - etag: ETag for conditional operations.
    - match_condition: Match condition used together with etag.
    """

Manage container-level throughput and auto-scaling.
def get_throughput(self, **kwargs) -> ThroughputProperties:
    """Get the current throughput settings for the container.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        ThroughputProperties describing the current throughput
        configuration (manual or autoscale).

    Raises:
        CosmosResourceNotFoundError: If no dedicated throughput is
            configured on this container.
    """
def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
    """Replace the container's throughput settings.

    Parameters:
    - throughput: New throughput configuration.

    Keyword options (via **kwargs):
    - session_token: Session token for session consistency.

    Returns:
        ThroughputProperties reflecting the updated configuration.
    """
def read_offer(self, **kwargs):
    """Read the throughput offer for this container.

    Deprecated: use get_throughput() instead.

    Returns:
        Offer properties.
    """

# Get container client
container = database.get_container_client("Products")

# Create an item. The body must include an 'id'; these examples pass the
# category as the partition key, so the container is presumably
# partitioned on /category — TODO confirm against the container config.
product = {
    "id": "product1",
    "name": "Laptop",
    "category": "Electronics",
    "price": 999.99,
    "inStock": True
}
created_item = container.create_item(body=product)
print(f"Created item: {created_item['id']}")

# Read an item (point read: id + partition key)
item = container.read_item(item="product1", partition_key="Electronics")
print(f"Item name: {item['name']}")

# Update an item (full replacement of the document body)
item["price"] = 899.99
updated_item = container.replace_item(item=item["id"], body=item)

# Upsert (create if absent, otherwise replace)
new_product = {
    "id": "product2",
    "name": "Mouse",
    "category": "Electronics",
    "price": 29.99
}
container.upsert_item(body=new_product)

# Delete an item
container.delete_item(item="product1", partition_key="Electronics")

# Patch operations for partial updates
# Each patch op modifies one JSON path without replacing the whole document.
patch_ops = [
    {"op": "replace", "path": "/price", "value": 799.99},
    {"op": "add", "path": "/tags", "value": ["sale", "featured"]},
    {"op": "remove", "path": "/oldField"}
]
patched_item = container.patch_item(
    item="product2",
    partition_key="Electronics",
    patch_operations=patch_ops
)

# Simple query
# Cross-partition query: required when no partition_key is specified
items = list(container.query_items(
    query="SELECT * FROM c WHERE c.category = 'Electronics'",
    enable_cross_partition_query=True
))

# Parameterized query (avoids string-building the SQL by hand)
items = list(container.query_items(
    query="SELECT * FROM c WHERE c.price BETWEEN @min AND @max",
    parameters=[
        {"name": "@min", "value": 100},
        {"name": "@max", "value": 1000}
    ],
    enable_cross_partition_query=True
))

# Query scoped to a single partition (cheaper than cross-partition)
items = list(container.query_items(
    query="SELECT * FROM c WHERE c.inStock = true",
    partition_key="Electronics"
))

# Read all items; pages of up to 100 items are fetched lazily
for item in container.read_all_items(max_item_count=100):
    print(f"Item: {item['id']}")

# Batch operations within same partition
# All operations in a transactional batch must target the same logical
# partition; the partition key is supplied once at execute time.
# The azure-cosmos SDK expects each operation as a tuple:
#   (operation_type, args_tuple[, kwargs_dict])
# e.g. ("create", (body,)) or ("delete", (item_id,)) — not a dict.
batch_operations = [
    ("create", ({"id": "batch1", "category": "Electronics", "name": "Item 1"},)),
    ("create", ({"id": "batch2", "category": "Electronics", "name": "Item 2"},)),
    ("delete", ("old_item",)),
]
try:
    results = container.execute_item_batch(
        batch_operations=batch_operations,
        partition_key="Electronics"
    )
    print(f"Batch completed successfully: {len(results)} operations")
except CosmosBatchOperationError as e:
    # The whole batch is rolled back; e identifies the failing operation.
    print(f"Batch failed: {e}")

# Read change feed from beginning
changes = container.query_items_change_feed(is_start_from_beginning=True)
for change in changes:
    if "_lsn" in change:  # _lsn marks a real change document
        print(f"Changed item: {change.get('id', 'N/A')}")

# Read the change feed page by page, persisting a continuation token.
# NOTE: the returned pager has no get_continuation() method; the token
# is exposed through by_page() as pager.continuation_token.
continuation = None
while True:
    changes = container.query_items_change_feed(
        continuation=continuation,
        max_item_count=10
    )
    pager = changes.by_page()
    try:
        page = next(pager)
    except StopIteration:
        break
    items = list(page)
    if not items:
        break
    for item in items:
        print(f"Change: {item['id']}")
    # Token for resuming from where this page left off
    continuation = pager.continuation_token

# Get feed ranges for parallel processing
feed_ranges = container.read_feed_ranges()
print(f"Container has {len(feed_ranges)} feed ranges")

# Process each feed range in parallel — feed ranges partition the
# container's key space, so each worker reads a disjoint slice.
import concurrent.futures

def process_feed_range(feed_range):
    # Reads the full change history for one feed range.
    changes = container.query_items_change_feed(
        feed_range=feed_range,
        is_start_from_beginning=True
    )
    return list(changes)

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_feed_range, fr) for fr in feed_ranges]
    for future in concurrent.futures.as_completed(futures):
        changes = future.result()
        print(f"Processed {len(changes)} changes from feed range")

Install with Tessl CLI
npx tessl i tessl/pypi-azure-cosmos