Microsoft Azure Cosmos Client Library for Python providing access to Azure Cosmos DB SQL API operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Server-side script operations including stored procedures, triggers, and user-defined functions (UDFs). The ScriptsProxy enables execution of custom JavaScript code within the Azure Cosmos DB engine for advanced data processing and business logic.
Server-side procedures written in JavaScript that execute within the database engine with transactional guarantees.
def create_stored_procedure(self, body: dict, **kwargs):
    """
    Create a new stored procedure.

    Parameters:
    - body: Stored procedure definition with 'id' and 'body' (JavaScript code)

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Stored procedure properties

    Raises:
    CosmosResourceExistsError: If stored procedure already exists
    """
def list_stored_procedures(self, max_item_count: int = None, **kwargs):
    """
    List all stored procedures in the container.

    Parameters:
    - max_item_count: Maximum number of procedures to return

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of stored procedure items
    """
def query_stored_procedures(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Query stored procedures using SQL syntax.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters
    - max_item_count: Maximum items per page

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of query results
    """
def get_stored_procedure(self, sproc: str, **kwargs):
    """
    Get stored procedure definition.

    Parameters:
    - sproc: Stored procedure ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Stored procedure properties

    Raises:
    CosmosResourceNotFoundError: If stored procedure doesn't exist
    """
def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):
    """
    Replace stored procedure definition.

    Parameters:
    - sproc: Stored procedure ID
    - body: Updated stored procedure definition

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Returns:
    Updated stored procedure properties
    """
def delete_stored_procedure(self, sproc: str, **kwargs):
    """
    Delete a stored procedure.

    Parameters:
    - sproc: Stored procedure ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Raises:
    CosmosResourceNotFoundError: If stored procedure doesn't exist
    """
def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, enable_script_logging: bool = None, **kwargs):
    """
    Execute a stored procedure.

    Parameters:
    - sproc: Stored procedure ID
    - partition_key: Partition key value for the execution context
    - params: Parameters to pass to the stored procedure
    - enable_script_logging: Enable console.log output in stored procedure

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Stored procedure execution result

    Raises:
    CosmosHttpResponseError: If execution fails
    """

Pre-triggers and post-triggers that execute automatically before or after item operations.
def create_trigger(self, body: dict, **kwargs):
    """
    Create a new trigger.

    Parameters:
    - body: Trigger definition with 'id', 'body' (JavaScript), 'triggerType', 'triggerOperation'

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Trigger properties

    Raises:
    CosmosResourceExistsError: If trigger already exists
    """
def list_triggers(self, max_item_count: int = None, **kwargs):
    """
    List all triggers in the container.

    Parameters:
    - max_item_count: Maximum number of triggers to return

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of trigger items
    """
def query_triggers(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Query triggers using SQL syntax.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters
    - max_item_count: Maximum items per page

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of query results
    """
def get_trigger(self, trigger: str, **kwargs):
    """
    Get trigger definition.

    Parameters:
    - trigger: Trigger ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Trigger properties

    Raises:
    CosmosResourceNotFoundError: If trigger doesn't exist
    """
def replace_trigger(self, trigger: str, body: dict, **kwargs):
    """
    Replace trigger definition.

    Parameters:
    - trigger: Trigger ID
    - body: Updated trigger definition

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Returns:
    Updated trigger properties
    """
def delete_trigger(self, trigger: str, **kwargs):
    """
    Delete a trigger.

    Parameters:
    - trigger: Trigger ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Raises:
    CosmosResourceNotFoundError: If trigger doesn't exist
    """

Custom JavaScript functions that can be used in queries for complex calculations and data transformations.
def create_user_defined_function(self, body: dict, **kwargs):
    """
    Create a new user-defined function.

    Parameters:
    - body: UDF definition with 'id' and 'body' (JavaScript code)

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    UDF properties

    Raises:
    CosmosResourceExistsError: If UDF already exists
    """
def list_user_defined_functions(self, max_item_count: int = None, **kwargs):
    """
    List all user-defined functions in the container.

    Parameters:
    - max_item_count: Maximum number of UDFs to return

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of UDF items
    """
def query_user_defined_functions(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
    """
    Query user-defined functions using SQL syntax.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters
    - max_item_count: Maximum items per page

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    Iterable of query results
    """
def get_user_defined_function(self, udf: str, **kwargs):
    """
    Get user-defined function definition.

    Parameters:
    - udf: UDF ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency

    Returns:
    UDF properties

    Raises:
    CosmosResourceNotFoundError: If UDF doesn't exist
    """
def replace_user_defined_function(self, udf: str, body: dict, **kwargs):
    """
    Replace user-defined function definition.

    Parameters:
    - udf: UDF ID
    - body: Updated UDF definition

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Returns:
    Updated UDF properties
    """
def delete_user_defined_function(self, udf: str, **kwargs):
    """
    Delete a user-defined function.

    Parameters:
    - udf: UDF ID

    Keyword options (passed through **kwargs):
    - session_token: Session token for consistency
    - etag: ETag for conditional operations
    - match_condition: Match condition for conditional operations

    Raises:
    CosmosResourceNotFoundError: If UDF doesn't exist
    """

# Get scripts proxy
# Example: register a stored procedure that bulk-creates items, run it,
# then enumerate the stored procedures on the container.
# `container` is expected to be an existing container client.
scripts = container.scripts

# Create a stored procedure
# The JavaScript source is the value of "body"; it is sent to the service as-is.
sproc_definition = {
    "id": "createItemsProc",
    "body": """
function createItemsProc(items) {
var context = getContext();
var collection = context.getCollection();
var response = context.getResponse();
if (!items || items.length === 0) {
throw new Error('Items array is required');
}
var created = 0;
function createNextItem(index) {
if (index >= items.length) {
response.setBody({ created: created });
return;
}
var item = items[index];
var isAccepted = collection.createDocument(
collection.getSelfLink(),
item,
function(err, doc) {
if (err) throw err;
created++;
createNextItem(index + 1);
}
);
if (!isAccepted) {
response.setBody({ created: created, stopped: true });
}
}
createNextItem(0);
}
""",
}

# Create the stored procedure
scripts.create_stored_procedure(sproc_definition)

# Execute the stored procedure
items_to_create = [
    {"id": "item1", "category": "Electronics", "name": "Phone"},
    {"id": "item2", "category": "Electronics", "name": "Tablet"},
]
# NOTE(review): partition_key matches the items' "category" value, so this
# presumably assumes the container is partitioned on /category - confirm.
result = scripts.execute_stored_procedure(
    sproc="createItemsProc",
    partition_key="Electronics",
    # The procedure takes one argument (the item array), hence the outer list.
    params=[items_to_create],
    enable_script_logging=True,
)
print(f"Created {result['created']} items")

# List stored procedures
# Iterate the returned iterable directly; no need to build a throwaway list.
for proc in scripts.list_stored_procedures():
    print(f"Stored Procedure: {proc['id']}")

# Create a pre-trigger for validation
# Example: register a validating pre-trigger and a logging post-trigger, then
# create an item with both triggers attached.
# `scripts` and `container` are expected to exist already.
pre_trigger = {
    "id": "validateItem",
    "triggerType": "Pre",
    "triggerOperation": "Create",
    "body": """
function validateItem() {
var context = getContext();
var request = context.getRequest();
var body = request.getBody();
// Validate required fields
if (!body.name || body.name.length === 0) {
throw new Error('Item must have a name');
}
if (!body.category) {
throw new Error('Item must have a category');
}
// Auto-generate timestamp
body.createdAt = new Date().toISOString();
request.setBody(body);
}
""",
}
scripts.create_trigger(pre_trigger)

# Create a post-trigger for logging
post_trigger = {
    "id": "logCreation",
    "triggerType": "Post",
    "triggerOperation": "Create",
    "body": """
function logCreation() {
var context = getContext();
var response = context.getResponse();
var body = response.getBody();
console.log('Item created:', body.id);
}
""",
}
scripts.create_trigger(post_trigger)

# Create item with triggers
# The item must carry both "name" and "category": the validateItem pre-trigger
# above throws when either is missing, which would make create_item fail.
item = {"id": "triggered_item", "name": "Triggered Item", "category": "Electronics"}
created_item = container.create_item(
    body=item,
    pre_trigger_include="validateItem",
    post_trigger_include="logCreation",
)

# Create a UDF for tax calculation
# Example: register a tax-calculation UDF, call it from a SQL query, then
# enumerate the UDFs on the container.
# `scripts` and `container` are expected to exist already.
udf_definition = {
    "id": "calculateTax",
    "body": """
function calculateTax(price, taxRate) {
if (typeof price !== 'number' || typeof taxRate !== 'number') {
return 0;
}
return price * taxRate;
}
""",
}
scripts.create_user_defined_function(udf_definition)

# Use UDF in a query
# Inside the query the function is addressed through the `udf.` prefix.
query = """
SELECT
c.id,
c.name,
c.price,
udf.calculateTax(c.price, 0.08) as tax,
(c.price + udf.calculateTax(c.price, 0.08)) as totalPrice
FROM c
WHERE c.category = 'Electronics'
"""
# Iterate the query result directly; no need to build a throwaway list.
for item in container.query_items(
    query=query,
    enable_cross_partition_query=True,
):
    print(f"{item['name']}: ${item['price']} + ${item['tax']} = ${item['totalPrice']}")

# List UDFs
for udf in scripts.list_user_defined_functions():
    print(f"UDF: {udf['id']}")

# Stored procedure for batch processing with rollback
batch_sproc = {
"id": "batchUpdateProc",
"body": """
function batchUpdateProc(updates) {
var context = getContext();
var collection = context.getCollection();
var response = context.getResponse();
var updated = 0;
var errors = [];
function updateNextItem(index) {
if (index >= updates.length) {
response.setBody({
updated: updated,
errors: errors
});
return;
}
var update = updates[index];
var isAccepted = collection.replaceDocument(
update.link,
update.item,
function(err, doc) {
if (err) {
errors.push({
index: index,
error: err.message
});
} else {
updated++;
}
updateNextItem(index + 1);
}
);
if (!isAccepted) {
response.setBody({
updated: updated,
stopped: true,
errors: errors
});
}
}
updateNextItem(0);
}
"""
}
scripts.create_stored_procedure(batch_sproc)
# Execute batch update
updates = [
{
"link": "dbs/MyDB/colls/MyContainer/docs/item1",
"item": {"id": "item1", "category": "Electronics", "price": 299.99}
},
{
"link": "dbs/MyDB/colls/MyContainer/docs/item2",
"item": {"id": "item2", "category": "Electronics", "price": 199.99}
}
]
result = scripts.execute_stored_procedure(
sproc="batchUpdateProc",
partition_key="Electronics",
params=[updates]
)
print(f"Updated: {result['updated']}, Errors: {len(result.get('errors', []))}")Install with Tessl CLI
npx tessl i tessl/pypi-azure-cosmos