CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-cosmos

Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/container-operations.md

Container Operations

Container-level operations for managing JSON documents, including CRUD operations, querying, batch processing, change feed monitoring, and conflict resolution. The ContainerProxy provides the interface for all document and container-scoped operations.

Capabilities

Container Information

Read container properties and configuration.

def read(self, populate_query_metrics: bool = None, **kwargs):
    """
    Fetch the properties and configuration of this container.

    Parameters:
    - populate_query_metrics: Whether query metrics are included in the response
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    Dictionary holding the container properties
    """

@property
def is_system_key(self) -> bool:
    """
    Report whether the container uses a system-generated partition key.

    Returns:
    True when the partition key is a system key, otherwise False
    """

@property
def scripts(self) -> ScriptsProxy:
    """
    Access the scripts proxy (stored procedures, triggers, and UDFs).

    Returns:
    The ScriptsProxy instance bound to this container
    """

Item Operations

Create, read, update, and delete JSON documents within the container.

def create_item(self, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """
    Create a new item in the container.

    Parameters:
    - body: Item data as dictionary; must include an 'id' field unless
      enable_automatic_id_generation is set
    - populate_query_metrics: Include query metrics
    - pre_trigger_include: Pre-trigger to execute
    - post_trigger_include: Post-trigger to execute
    - enable_automatic_id_generation: Auto-generate the 'id' when the body
      does not provide one (keyword argument)
    - session_token: Session token for consistency (keyword argument)
    - initial_headers: Custom headers (keyword argument)

    Returns:
    Created item as dictionary

    Raises:
    CosmosResourceExistsError: If item with same ID already exists
    """

def read_item(self, item: str, partition_key: str, populate_query_metrics: bool = None, **kwargs):
    """
    Fetch a single item identified by its ID and partition key.

    Parameters:
    - item: ID of the item to read
    - partition_key: Value of the item's partition key
    - populate_query_metrics: Whether query metrics are included
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    The item as a dictionary

    Raises:
    CosmosResourceNotFoundError: When the item does not exist
    """

def upsert_item(self, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """
    Create or replace an item.

    Parameters:
    - body: Item data as dictionary (must include 'id' field)
    - populate_query_metrics: Include query metrics
    - pre_trigger_include: Pre-trigger to execute
    - post_trigger_include: Post-trigger to execute
    - enable_automatic_id_generation: Auto-generate ID if missing
      (NOTE(review): the SDK reference documents this option for create_item
      only; confirm upsert_item honors it before relying on it, and always
      supply 'id' in the body)
    - session_token: Session token for consistency
    - initial_headers: Custom headers

    Returns:
    Upserted item as dictionary
    """

def replace_item(self, item: str, body: dict, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """
    Overwrite an item that already exists.

    Parameters:
    - item: The item's ID, or an item dictionary carrying an 'id' field
    - body: New content for the item
    - populate_query_metrics: Whether query metrics are included
    - pre_trigger_include: Name of the pre-trigger to run
    - post_trigger_include: Name of the post-trigger to run
    - session_token: Session token used for consistency (keyword argument)
    - etag: ETag used for conditional operations (keyword argument)
    - match_condition: Match condition used for conditional operations (keyword argument)

    Returns:
    The replaced item as a dictionary

    Raises:
    CosmosResourceNotFoundError: When the item does not exist
    CosmosAccessConditionFailedError: When the conditional operation fails
    """

def patch_item(self, item: str, partition_key: str, patch_operations: list, filter_predicate: str = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """
    Apply a list of patch operations to one item.

    Parameters:
    - item: ID of the item to patch
    - partition_key: Value of the item's partition key
    - patch_operations: The patch operations to apply, as a list
    - filter_predicate: Condition that filters whether the patch applies
    - pre_trigger_include: Name of the pre-trigger to run
    - post_trigger_include: Name of the post-trigger to run
    - session_token: Session token used for consistency (keyword argument)
    - etag: ETag used for conditional operations (keyword argument)
    - match_condition: Match condition used for conditional operations (keyword argument)

    Returns:
    The patched item as a dictionary
    """

def delete_item(self, item: str, partition_key: str, populate_query_metrics: bool = None, pre_trigger_include: str = None, post_trigger_include: str = None, **kwargs):
    """
    Remove an item from the container.

    Parameters:
    - item: The item's ID, or an item dictionary carrying an 'id' field
    - partition_key: Value of the item's partition key
    - populate_query_metrics: Whether query metrics are included
    - pre_trigger_include: Name of the pre-trigger to run
    - post_trigger_include: Name of the post-trigger to run
    - session_token: Session token used for consistency (keyword argument)
    - etag: ETag used for conditional operations (keyword argument)
    - match_condition: Match condition used for conditional operations (keyword argument)

    Raises:
    CosmosResourceNotFoundError: When the item does not exist
    CosmosAccessConditionFailedError: When the conditional operation fails
    """

def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):
    """
    Delete all items in a specific partition.

    Parameters:
    - partition_key: Partition key value
    - session_token: Session token for consistency (keyword argument)

    Returns:
    None (the SDK reference documents no result payload for this operation)
    """

Batch Operations

Execute multiple operations atomically within a single partition.

def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
    """
    Execute a batch of operations atomically within a partition.

    Parameters:
    - batch_operations: List of batch operations, each a tuple of
      (operation_type, args) or (operation_type, args, kwargs), for example
      ("create", ({"id": "item1"},)) or ("delete", ("item1",))
    - partition_key: Partition key value (all operations must use same partition)
    - session_token: Session token for consistency
    - enable_automatic_id_generation: Auto-generate IDs for create operations

    Returns:
    CosmosList with results for each operation

    Raises:
    CosmosBatchOperationError: If any operation in the batch fails
    """

Querying Operations

Query items using SQL-like syntax with support for cross-partition queries.

def query_items(self, query: str = None, parameters: list = None, partition_key: str = None, enable_cross_partition_query: bool = None, max_item_count: int = None, enable_scan_in_query: bool = None, populate_query_metrics: bool = None, **kwargs):
    """
    Run a SQL query against the items of the container.

    Parameters:
    - query: The SQL query text
    - parameters: Values for the query, given as [{"name": "@param", "value": value}]
    - partition_key: A single partition key to restrict the query to
    - enable_cross_partition_query: Allow the query to span partitions
    - max_item_count: Upper bound on items per page
    - enable_scan_in_query: Allow scan operations
    - populate_query_metrics: Whether query metrics are included
    - session_token: Session token used for consistency (keyword argument)
    - initial_headers: Custom headers (keyword argument)
    - max_integrated_cache_staleness: Tolerated cache staleness (keyword argument)

    Returns:
    An iterable over the query results
    """

def read_all_items(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
    """
    List every item stored in the container.

    Parameters:
    - max_item_count: Upper bound on items per page
    - populate_query_metrics: Whether query metrics are included
    - session_token: Session token used for consistency (keyword argument)
    - initial_headers: Custom headers (keyword argument)

    Returns:
    An iterable over all items
    """

Change Feed Operations

Monitor changes to items in the container using change feed.

def query_items_change_feed(self, **kwargs):
    """
    Read item modifications from the container's change feed.

    Parameters (all keyword arguments):
    - partition_key_range_id: A specific partition key range to read
    - feed_range: A specific feed range to read
    - is_start_from_beginning: Read from the beginning of the change feed
    - continuation: Continuation token used for pagination
    - max_item_count: Upper bound on items per page
    - start_time: Point in time at which to start reading the change feed
    - session_token: Session token used for consistency

    Returns:
    Iterable of changed items with continuation token
    """

Batch Operations

Execute multiple operations atomically within a single partition.

def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):
    """
    Execute a batch of operations atomically.

    Parameters:
    - batch_operations: List of batch operation tuples, each of the form
      (operation_type, args) or (operation_type, args, kwargs), for example
      ("create", ({"id": "item1"},)) or ("delete", ("item1",))
    - partition_key: Partition key value (all operations must be in same partition)
    - session_token: Session token for consistency
    - initial_headers: Custom headers

    Returns:
    CosmosList with batch operation results

    Raises:
    CosmosBatchOperationError: If batch operation fails
    """

Feed Range Operations

Work with feed ranges for parallel processing and fine-grained control.

def read_feed_ranges(self, **kwargs):
    """
    Retrieve the feed ranges of the container.

    Parameters:
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    List of feed range dictionaries
    """

def feed_range_from_partition_key(self, partition_key: str):
    """
    Look up the feed range that contains the given partition key.

    Parameters:
    - partition_key: Value of the partition key

    Returns:
    The feed range as a dictionary
    """

def is_feed_range_subset(self, parent_feed_range: dict, child_feed_range: dict) -> bool:
    """
    Tell whether one feed range is a subset of another.

    Parameters:
    - parent_feed_range: The feed range expected to contain the other
    - child_feed_range: The feed range expected to be contained

    Returns:
    True when the child range is a subset of the parent range
    """

Session Token Management

Manage session tokens for session consistency.

def get_latest_session_token(self, partition_key: str, **kwargs) -> str:
    """
    Get the latest session token for a partition.

    Parameters:
    - partition_key: Partition key value

    Returns:
    Session token string

    NOTE(review): the azure-cosmos reference documents this method as taking
    feed_ranges_to_session_tokens (a list of (feed_range, session_token)
    pairs) and target_feed_range rather than a partition key. Confirm the
    signature against the installed SDK version before use.
    """

Conflict Resolution

Handle conflicts in multi-region scenarios.

def list_conflicts(self, max_item_count: int = None, **kwargs):
    """
    Enumerate the conflicts recorded in the container.

    Parameters:
    - max_item_count: Upper bound on the conflicts returned
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    An iterable over conflict items
    """

def query_conflicts(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Run a SQL query against the container's conflicts.

    Parameters:
    - query: The SQL query text
    - parameters: Values for the query
    - max_item_count: Upper bound on items per page
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    An iterable over the query results
    """

def get_conflict(self, conflict: str, partition_key: str, **kwargs):
    """
    Fetch one conflict.

    Parameters:
    - conflict: ID of the conflict
    - partition_key: Value of the partition key
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    The conflict item
    """

def delete_conflict(self, conflict: str, **kwargs):
    """
    Delete a conflict.

    Parameters:
    - conflict: Conflict ID or conflict dictionary
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    NOTE(review): the azure-cosmos reference lists partition_key as a
    required argument of ContainerProxy.delete_conflict; it is not shown in
    this signature. Confirm against the installed SDK version and pass
    partition_key=... when calling.
    """

Throughput Management

Manage container-level throughput and auto-scaling.

def get_throughput(self, **kwargs) -> ThroughputProperties:
    """
    Retrieve the throughput settings currently applied to the container.

    Parameters:
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    ThroughputProperties describing the current throughput configuration

    Raises:
    CosmosResourceNotFoundError: When no throughput is configured
    """

def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
    """
    Swap the container's throughput settings for new ones.

    Parameters:
    - throughput: The throughput configuration to apply
    - session_token: Session token used for consistency (keyword argument)

    Returns:
    ThroughputProperties describing the updated configuration
    """

def read_offer(self, **kwargs):
    """
    Fetch the throughput offer. Deprecated: use get_throughput instead.

    Returns:
    The offer properties
    """

Usage Examples

Basic Item Operations

# Obtain a client bound to the "Products" container
container = database.get_container_client("Products")

# Insert a new document; "category" matches the partition key used below
product = dict(
    id="product1",
    name="Laptop",
    category="Electronics",
    price=999.99,
    inStock=True,
)

created_item = container.create_item(body=product)
print(f"Created item: {created_item['id']}")

# Fetch the document back by ID and partition key
item = container.read_item(item="product1", partition_key="Electronics")
print(f"Item name: {item['name']}")

# Change a field locally, then write the whole document back
item["price"] = 899.99
item_id = item["id"]
updated_item = container.replace_item(item=item_id, body=item)

# Upsert: creates the document, or replaces it when the ID already exists
new_product = dict(
    id="product2",
    name="Mouse",
    category="Electronics",
    price=29.99,
)
container.upsert_item(body=new_product)

# Remove the first document
container.delete_item(item="product1", partition_key="Electronics")

Patch Operations

# Partial update: each entry changes one path of the stored document
replace_price = {"op": "replace", "path": "/price", "value": 799.99}
add_tags = {"op": "add", "path": "/tags", "value": ["sale", "featured"]}
drop_old_field = {"op": "remove", "path": "/oldField"}
patch_ops = [replace_price, add_tags, drop_old_field]

# Apply all three operations to "product2" in one call
patched_item = container.patch_item(
    item="product2",
    partition_key="Electronics",
    patch_operations=patch_ops,
)

Querying

# Plain query spanning every partition
category_query = "SELECT * FROM c WHERE c.category = 'Electronics'"
items = list(container.query_items(
    query=category_query,
    enable_cross_partition_query=True,
))

# Query with bound parameters instead of inlined values
price_query = "SELECT * FROM c WHERE c.price BETWEEN @min AND @max"
price_bounds = [
    {"name": "@min", "value": 100},
    {"name": "@max", "value": 1000},
]
items = list(container.query_items(
    query=price_query,
    parameters=price_bounds,
    enable_cross_partition_query=True,
))

# Query restricted to a single partition
in_stock_query = "SELECT * FROM c WHERE c.inStock = true"
items = list(container.query_items(
    query=in_stock_query,
    partition_key="Electronics",
))

# Walk every item, fetched in pages of up to 100
all_items = container.read_all_items(max_item_count=100)
for item in all_items:
    print(f"Item: {item['id']}")

Batch Operations

# Batch operations within same partition.
# Each operation is a tuple: (operation_type, args) or
# (operation_type, args, kwargs). For "create" the args tuple holds the item
# body; for "delete" it holds the item ID. The partition key is supplied once,
# on the execute_item_batch call, and applies to every operation.
batch_operations = [
    ("create", ({"id": "batch1", "category": "Electronics", "name": "Item 1"},)),
    ("create", ({"id": "batch2", "category": "Electronics", "name": "Item 2"},)),
    ("delete", ("old_item",)),
]

try:
    results = container.execute_item_batch(
        batch_operations=batch_operations,
        partition_key="Electronics"
    )
    print(f"Batch completed successfully: {len(results)} operations")
except CosmosBatchOperationError as e:
    # The batch is atomic: if any operation fails, none are applied
    print(f"Batch failed: {e}")

Change Feed

# Read change feed from beginning
changes = container.query_items_change_feed(is_start_from_beginning=True)

for change in changes:
    if "_lsn" in change:  # Not end of changes
        print(f"Changed item: {change.get('id', 'N/A')}")

# Read change feed with continuation
continuation = None
while True:
    changes = container.query_items_change_feed(
        continuation=continuation,
        max_item_count=10
    )

    items = list(changes)
    if not items:
        break

    for item in items:
        print(f"Change: {item['id']}")

    # Get continuation for next batch. The returned pager has no
    # get_continuation() method; the SDK samples read the token from the
    # 'etag' header of the last response instead.
    continuation = container.client_connection.last_response_headers["etag"]

Feed Ranges

# Get feed ranges for parallel processing.
# Materialize the result first: read_feed_ranges may return a lazy iterable,
# which supports neither len() nor being iterated a second time.
feed_ranges = list(container.read_feed_ranges())
print(f"Container has {len(feed_ranges)} feed ranges")

# Process each feed range in parallel
import concurrent.futures

def process_feed_range(feed_range):
    """Read every change in one feed range, starting from the beginning."""
    changes = container.query_items_change_feed(
        feed_range=feed_range,
        is_start_from_beginning=True
    )
    return list(changes)

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_feed_range, fr) for fr in feed_ranges]

    # as_completed yields futures in completion order, not submission order
    for future in concurrent.futures.as_completed(futures):
        changes = future.result()
        print(f"Processed {len(changes)} changes from feed range")

Install with Tessl CLI

npx tessl i tessl/pypi-azure-cosmos

docs

async-operations.md

client-operations.md

container-operations.md

database-operations.md

index.md

script-operations.md

user-management.md

tile.json