CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-cosmos

Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/script-operations.md

Script Operations

Server-side script operations including stored procedures, triggers, and user-defined functions (UDFs). The ScriptsProxy enables execution of custom JavaScript code within the Azure Cosmos DB engine for advanced data processing and business logic.

Capabilities

Stored Procedures

Server-side procedures written in JavaScript that execute within the database engine with transactional guarantees.

def create_stored_procedure(self, body: dict, **kwargs):
    """
    Register a new stored procedure in the container.

    Parameters:
    - body: Definition dict holding the procedure 'id' and its 'body' (the JavaScript source)
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the stored procedure that was created

    Raises:
    CosmosResourceExistsError: When the stored procedure already exists
    """

def list_stored_procedures(self, max_item_count: int = None, **kwargs):
    """
    Enumerate every stored procedure in the container.

    Parameters:
    - max_item_count: Upper bound on the number of procedures returned
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the stored procedure items
    """

def query_stored_procedures(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Run a SQL-syntax query over the stored procedures.

    Parameters:
    - query: The SQL query text
    - parameters: Parameters for the query
    - max_item_count: Upper bound on the number of items in each page
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the query results
    """

def get_stored_procedure(self, sproc: str, **kwargs):
    """
    Fetch the definition of a stored procedure.

    Parameters:
    - sproc: ID of the stored procedure
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the stored procedure

    Raises:
    CosmosResourceNotFoundError: When the stored procedure does not exist
    """

def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):
    """
    Overwrite the definition of a stored procedure.

    Parameters:
    - sproc: ID of the stored procedure
    - body: The new stored procedure definition
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Returns:
    Properties of the stored procedure after the update
    """

def delete_stored_procedure(self, sproc: str, **kwargs):
    """
    Remove a stored procedure.

    Parameters:
    - sproc: ID of the stored procedure
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Raises:
    CosmosResourceNotFoundError: When the stored procedure does not exist
    """

def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, enable_script_logging: bool = None, **kwargs):
    """
    Invoke a stored procedure.

    Parameters:
    - sproc: ID of the stored procedure
    - partition_key: Partition key value that sets the execution context
    - params: Arguments handed to the stored procedure
    - enable_script_logging: Turn on console.log output inside the stored procedure
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    The result produced by the stored procedure execution

    Raises:
    CosmosHttpResponseError: When the execution fails
    """

Triggers

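Pre-triggers and post-triggers run before or after an item operation. They do not run on their own: each request must name the triggers it wants executed, as the usage example below does with `pre_trigger_include` and `post_trigger_include`.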

def create_trigger(self, body: dict, **kwargs):
    """
    Register a new trigger.

    Parameters:
    - body: Definition dict holding 'id', 'body' (JavaScript), 'triggerType' and 'triggerOperation'
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the trigger that was created

    Raises:
    CosmosResourceExistsError: When the trigger already exists
    """

def list_triggers(self, max_item_count: int = None, **kwargs):
    """
    Enumerate every trigger in the container.

    Parameters:
    - max_item_count: Upper bound on the number of triggers returned
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the trigger items
    """

def query_triggers(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Run a SQL-syntax query over the triggers.

    Parameters:
    - query: The SQL query text
    - parameters: Parameters for the query
    - max_item_count: Upper bound on the number of items in each page
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the query results
    """

def get_trigger(self, trigger: str, **kwargs):
    """
    Fetch the definition of a trigger.

    Parameters:
    - trigger: ID of the trigger
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the trigger

    Raises:
    CosmosResourceNotFoundError: When the trigger does not exist
    """

def replace_trigger(self, trigger: str, body: dict, **kwargs):
    """
    Overwrite the definition of a trigger.

    Parameters:
    - trigger: ID of the trigger
    - body: The new trigger definition
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Returns:
    Properties of the trigger after the update
    """

def delete_trigger(self, trigger: str, **kwargs):
    """
    Remove a trigger.

    Parameters:
    - trigger: ID of the trigger
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Raises:
    CosmosResourceNotFoundError: When the trigger does not exist
    """

User Defined Functions

Custom JavaScript functions that can be used in queries for complex calculations and data transformations.

def create_user_defined_function(self, body: dict, **kwargs):
    """
    Register a new user-defined function (UDF).

    Parameters:
    - body: Definition dict holding the UDF 'id' and its 'body' (the JavaScript source)
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the UDF that was created

    Raises:
    CosmosResourceExistsError: When the UDF already exists
    """

def list_user_defined_functions(self, max_item_count: int = None, **kwargs):
    """
    Enumerate every user-defined function in the container.

    Parameters:
    - max_item_count: Upper bound on the number of UDFs returned
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the UDF items
    """

def query_user_defined_functions(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Run a SQL-syntax query over the user-defined functions.

    Parameters:
    - query: The SQL query text
    - parameters: Parameters for the query
    - max_item_count: Upper bound on the number of items in each page
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    An iterable over the query results
    """

def get_user_defined_function(self, udf: str, **kwargs):
    """
    Fetch the definition of a user-defined function.

    Parameters:
    - udf: ID of the UDF
    - session_token: Session token used for consistency (supplied as a keyword argument)

    Returns:
    Properties of the UDF

    Raises:
    CosmosResourceNotFoundError: When the UDF does not exist
    """

def replace_user_defined_function(self, udf: str, body: dict, **kwargs):
    """
    Overwrite the definition of a user-defined function.

    Parameters:
    - udf: ID of the UDF
    - body: The new UDF definition
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Returns:
    Properties of the UDF after the update
    """

def delete_user_defined_function(self, udf: str, **kwargs):
    """
    Remove a user-defined function.

    Parameters:
    - udf: ID of the UDF
    - session_token: Session token used for consistency (supplied as a keyword argument)
    - etag: ETag used for conditional operations (supplied as a keyword argument)
    - match_condition: Match condition used for conditional operations (supplied as a keyword argument)

    Raises:
    CosmosResourceNotFoundError: When the UDF does not exist
    """

Usage Examples

Stored Procedures

# Script operations are reached through the container's scripts proxy
scripts = container.scripts

# Definition of a stored procedure that inserts a list of items one after another
create_items_sproc = {
    "id": "createItemsProc",
    "body": """
        function createItemsProc(items) {
            var context = getContext();
            var collection = context.getCollection();
            var response = context.getResponse();
            
            if (!items || items.length === 0) {
                throw new Error('Items array is required');
            }
            
            var created = 0;
            
            function createNextItem(index) {
                if (index >= items.length) {
                    response.setBody({ created: created });
                    return;
                }
                
                var item = items[index];
                var isAccepted = collection.createDocument(
                    collection.getSelfLink(),
                    item,
                    function(err, doc) {
                        if (err) throw err;
                        created++;
                        createNextItem(index + 1);
                    }
                );
                
                if (!isAccepted) {
                    response.setBody({ created: created, stopped: true });
                }
            }
            
            createNextItem(0);
        }
    """
}

# Register the definition with the container
scripts.create_stored_procedure(create_items_sproc)

# Run the procedure; the whole item list is passed as its single argument
new_items = [
    {"id": "item1", "category": "Electronics", "name": "Phone"},
    {"id": "item2", "category": "Electronics", "name": "Tablet"}
]

outcome = scripts.execute_stored_procedure(
    sproc="createItemsProc",
    partition_key="Electronics",
    params=[new_items],
    enable_script_logging=True
)
print(f"Created {outcome['created']} items")

# Print the id of every stored procedure in the container
registered_sprocs = list(scripts.list_stored_procedures())
for sproc in registered_sprocs:
    print(f"Stored Procedure: {sproc['id']}")

Triggers

# Pre-trigger that validates an item before it is created
pre_trigger = {
    "id": "validateItem",
    "triggerType": "Pre",
    "triggerOperation": "Create",
    "body": """
        function validateItem() {
            var context = getContext();
            var request = context.getRequest();
            var body = request.getBody();
            
            // Validate required fields
            if (!body.name || body.name.length === 0) {
                throw new Error('Item must have a name');
            }
            
            if (!body.category) {
                throw new Error('Item must have a category');
            }
            
            // Auto-generate timestamp
            body.createdAt = new Date().toISOString();
            
            request.setBody(body);
        }
    """
}

scripts.create_trigger(pre_trigger)

# Post-trigger that logs the id of the created item
post_trigger = {
    "id": "logCreation",
    "triggerType": "Post",
    "triggerOperation": "Create",
    "body": """
        function logCreation() {
            var context = getContext();
            var response = context.getResponse();
            var body = response.getBody();
            
            console.log('Item created:', body.id);
        }
    """
}

scripts.create_trigger(post_trigger)

# Create an item and name the triggers to run for this request.
# The item carries both 'name' and 'category' because validateItem rejects
# any item that is missing either field.
item = {"id": "triggered_item", "category": "Electronics", "name": "Laptop"}
created_item = container.create_item(
    body=item,
    pre_trigger_include="validateItem",
    post_trigger_include="logCreation"
)

User Defined Functions

# Definition of a UDF that multiplies a price by a tax rate
tax_udf = {
    "id": "calculateTax",
    "body": """
        function calculateTax(price, taxRate) {
            if (typeof price !== 'number' || typeof taxRate !== 'number') {
                return 0;
            }
            return price * taxRate;
        }
    """
}

scripts.create_user_defined_function(tax_udf)

# Call the UDF from a query through the udf. prefix
tax_query = """
    SELECT 
        c.id,
        c.name,
        c.price,
        udf.calculateTax(c.price, 0.08) as tax,
        (c.price + udf.calculateTax(c.price, 0.08)) as totalPrice
    FROM c 
    WHERE c.category = 'Electronics'
"""

priced_items = list(container.query_items(
    query=tax_query,
    enable_cross_partition_query=True
))

for row in priced_items:
    print(f"{row['name']}: ${row['price']} + ${row['tax']} = ${row['totalPrice']}")

# Print the id of every UDF in the container
registered_udfs = list(scripts.list_user_defined_functions())
for entry in registered_udfs:
    print(f"UDF: {entry['id']}")

Advanced Stored Procedure with Error Handling

# Stored procedure for batch updates with per-item error reporting.
# A failed replace is recorded in 'errors' and processing moves on to the next
# update; the callback does not re-throw, so a failure does not abort the run.
batch_sproc = {
    "id": "batchUpdateProc",
    "body": """
        function batchUpdateProc(updates) {
            var context = getContext();
            var collection = context.getCollection();
            var response = context.getResponse();
            
            // Fail with a clear message instead of a TypeError on updates.length
            if (!Array.isArray(updates)) {
                throw new Error('Updates array is required');
            }
            
            var updated = 0;
            var errors = [];
            
            function updateNextItem(index) {
                if (index >= updates.length) {
                    response.setBody({ 
                        updated: updated, 
                        errors: errors 
                    });
                    return;
                }
                
                var update = updates[index];
                
                var isAccepted = collection.replaceDocument(
                    update.link,
                    update.item,
                    function(err, doc) {
                        if (err) {
                            errors.push({
                                index: index,
                                error: err.message
                            });
                        } else {
                            updated++;
                        }
                        updateNextItem(index + 1);
                    }
                );
                
                if (!isAccepted) {
                    response.setBody({ 
                        updated: updated, 
                        stopped: true,
                        errors: errors 
                    });
                }
            }
            
            updateNextItem(0);
        }
    """
}

scripts.create_stored_procedure(batch_sproc)

# Execute the batch update; each entry pairs a document link with its replacement
updates = [
    {
        "link": "dbs/MyDB/colls/MyContainer/docs/item1",
        "item": {"id": "item1", "category": "Electronics", "price": 299.99}
    },
    {
        "link": "dbs/MyDB/colls/MyContainer/docs/item2",
        "item": {"id": "item2", "category": "Electronics", "price": 199.99}
    }
]

result = scripts.execute_stored_procedure(
    sproc="batchUpdateProc",
    partition_key="Electronics",
    params=[updates]
)

# 'errors' is read with .get() so a response without that key still prints
print(f"Updated: {result['updated']}, Errors: {len(result.get('errors', []))}")

Install with Tessl CLI

npx tessl i tessl/pypi-azure-cosmos

docs

async-operations.md

client-operations.md

container-operations.md

database-operations.md

index.md

script-operations.md

user-management.md

tile.json