Azure Cosmos DB partition keys, consistency levels, change feed, SDK patterns
50
40%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Passed
No known issues
Optimize this skill with Tessl
npx tessl skill review --optimize ./skills/azure-cosmosdb/SKILL.mdChoose partition key wisely, design for your access patterns, understand consistency tradeoffs.
Cosmos DB distributes data across partitions. Your partition key choice determines scalability, performance, and cost. Design for even distribution and query efficiency.
| API | Use Case |
|---|---|
| NoSQL (Core) | Document database, most flexible |
| MongoDB | MongoDB wire protocol compatible |
| PostgreSQL | Distributed PostgreSQL (Citus) |
| Apache Cassandra | Wide-column store |
| Apache Gremlin | Graph database |
| Table | Key-value (Azure Table Storage compatible) |
This skill focuses on NoSQL (Core) API - the most common choice.
| Concept | Description |
|---|---|
| Container | Collection of items (like a table) |
| Item | Single document/record (JSON) |
| Partition Key | Determines data distribution |
| Logical Partition | Items with same partition key |
| Physical Partition | Storage unit (max 50GB, 10K RU/s) |
| RU (Request Unit) | Throughput currency |
// High cardinality, even distribution, used in queries
// E-commerce: userId for user data
{ "id": "order-123", "userId": "user-456", ... } // PK: /userId
// Multi-tenant: tenantId
{ "id": "doc-1", "tenantId": "tenant-abc", ... } // PK: /tenantId
// IoT: deviceId for telemetry
{ "id": "reading-1", "deviceId": "device-789", ... } // PK: /deviceId
// Logs: synthetic key (date + category)
{ "id": "log-1", "partitionKey": "2024-01-15_errors", ... } // PK: /partitionKey// For multi-level distribution (e.g., tenant → user)
// Container created with: /tenantId, /userId
{
"id": "order-123",
"tenantId": "acme-corp",
"userId": "user-456",
"items": [...]
}
// Query within tenant and user efficiently// Avoid:
// - Low cardinality (status, type, boolean)
// - Monotonically increasing (timestamp, auto-increment)
// - Frequently updated fields
// - Fields not used in queries
// Bad: Only 3 values → hot partitions
{ "status": "pending" | "completed" | "cancelled" }
// Bad: All writes go to latest partition
{ "timestamp": "2024-01-15T10:30:00Z" }npm install @azure/cosmos// lib/cosmosdb.ts
import { CosmosClient, Database, Container } from '@azure/cosmos';
const endpoint = process.env.COSMOS_ENDPOINT!;
const key = process.env.COSMOS_KEY!;
const databaseId = process.env.COSMOS_DATABASE!;
const client = new CosmosClient({ endpoint, key });
// Or with connection string
// const client = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!);
export const database: Database = client.database(databaseId);
export function getContainer(containerId: string): Container {
return database.container(containerId);
}// types/cosmos.ts
export interface BaseItem {
id: string;
_ts?: number; // Auto-generated timestamp
_etag?: string; // For optimistic concurrency
}
export interface User extends BaseItem {
userId: string; // Partition key
email: string;
name: string;
createdAt: string;
updatedAt: string;
}
export interface Order extends BaseItem {
userId: string; // Partition key
orderId: string;
items: OrderItem[];
total: number;
status: 'pending' | 'paid' | 'shipped' | 'delivered';
createdAt: string;
}
export interface OrderItem {
productId: string;
name: string;
quantity: number;
price: number;
}import { getContainer } from './cosmosdb';
import { User } from './types';
const usersContainer = getContainer('users');
async function createUser(data: Omit<User, 'id' | 'createdAt' | 'updatedAt'>): Promise<User> {
const now = new Date().toISOString();
const user: User = {
id: crypto.randomUUID(),
...data,
createdAt: now,
updatedAt: now
};
const { resource } = await usersContainer.items.create(user);
return resource as User;
}// Most efficient read - requires id AND partition key
async function getUser(userId: string, id: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(id, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}
// If id equals partition key value
async function getUserById(userId: string): Promise<User | null> {
try {
const { resource } = await usersContainer.item(userId, userId).read<User>();
return resource || null;
} catch (error: any) {
if (error.code === 404) return null;
throw error;
}
}// Query within partition (efficient)
async function getUserOrders(userId: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
})
.fetchAll();
return resources;
}
// Cross-partition query (use sparingly)
async function getOrdersByStatus(status: string): Promise<Order[]> {
const ordersContainer = getContainer('orders');
const { resources } = await ordersContainer.items
.query<Order>({
query: 'SELECT * FROM c WHERE c.status = @status',
parameters: [{ name: '@status', value: status }]
})
.fetchAll();
return resources;
}
// Paginated query
async function getOrdersPaginated(
userId: string,
pageSize: number = 10,
continuationToken?: string
): Promise<{ items: Order[]; continuationToken?: string }> {
const ordersContainer = getContainer('orders');
const queryIterator = ordersContainer.items.query<Order>(
{
query: 'SELECT * FROM c WHERE c.userId = @userId ORDER BY c.createdAt DESC',
parameters: [{ name: '@userId', value: userId }]
},
{
maxItemCount: pageSize,
continuationToken
}
);
const { resources, continuationToken: nextToken } = await queryIterator.fetchNext();
return {
items: resources,
continuationToken: nextToken
};
}// Replace entire item
async function updateUser(userId: string, id: string, updates: Partial<User>): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
const { resource } = await usersContainer.item(id, userId).replace(updated);
return resource as User;
}
// Partial update (patch operations)
async function patchUser(userId: string, id: string, operations: any[]): Promise<User> {
const { resource } = await usersContainer.item(id, userId).patch(operations);
return resource as User;
}
// Usage:
await patchUser('user-123', 'user-123', [
{ op: 'set', path: '/name', value: 'New Name' },
{ op: 'set', path: '/updatedAt', value: new Date().toISOString() },
{ op: 'incr', path: '/loginCount', value: 1 }
]);async function deleteUser(userId: string, id: string): Promise<void> {
await usersContainer.item(id, userId).delete();
}async function updateUserWithETag(
userId: string,
id: string,
updates: Partial<User>,
etag: string
): Promise<User> {
const existing = await getUser(userId, id);
if (!existing) throw new Error('User not found');
const updated: User = {
...existing,
...updates,
updatedAt: new Date().toISOString()
};
try {
const { resource } = await usersContainer.item(id, userId).replace(updated, {
accessCondition: { type: 'IfMatch', condition: etag }
});
return resource as User;
} catch (error: any) {
if (error.code === 412) {
throw new Error('Document was modified by another process');
}
throw error;
}
}| Level | Guarantees | Latency | Use Case |
|---|---|---|---|
| Strong | Linearizable reads | Highest | Financial, inventory |
| Bounded Staleness | Consistent within bounds | High | Leaderboards, counters |
| Session | Read your writes | Medium | User sessions (default) |
| Consistent Prefix | Ordered reads | Low | Social feeds |
| Eventual | No ordering guarantee | Lowest | Analytics, logs |
// Override default consistency
const { resource } = await usersContainer.item(id, userId).read<User>({
consistencyLevel: 'Strong'
});
// For queries
const { resources } = await container.items.query(
{ query: 'SELECT * FROM c' },
{ consistencyLevel: 'BoundedStaleness' }
).fetchAll();async function createOrderWithItems(userId: string, order: Order, items: any[]): Promise<void> {
const ordersContainer = getContainer('orders');
const operations = [
{ operationType: 'Create' as const, resourceBody: order },
...items.map(item => ({
operationType: 'Create' as const,
resourceBody: { ...item, userId, orderId: order.orderId }
}))
];
const { result } = await ordersContainer.items.batch(operations, userId);
// Check if any operation failed
if (result.some(r => r.statusCode >= 400)) {
throw new Error('Batch operation failed');
}
}// For large-scale imports (not transactional)
async function bulkImportUsers(users: User[]): Promise<void> {
const operations = users.map(user => ({
operationType: 'Create' as const,
resourceBody: user,
partitionKey: user.userId
}));
// Process in chunks
const chunkSize = 100;
for (let i = 0; i < operations.length; i += chunkSize) {
const chunk = operations.slice(i, i + chunkSize);
await usersContainer.items.bulk(chunk);
}
}import { ChangeFeedStartFrom } from '@azure/cosmos';
async function processChangeFeed(): Promise<void> {
const container = getContainer('orders');
const changeFeedIterator = container.items.changeFeed({
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
});
while (changeFeedIterator.hasMoreResults) {
const { result: items, statusCode } = await changeFeedIterator.fetchNext();
if (statusCode === 304) {
// No new changes
await sleep(1000);
continue;
}
for (const item of items) {
console.log('Changed item:', item);
// Process the change...
}
}
}
// For production, use Change Feed Processor with lease containerasync function startChangeFeedProcessor(): Promise<void> {
const sourceContainer = getContainer('orders');
const leaseContainer = getContainer('leases');
const changeFeedProcessor = sourceContainer.items.changeFeed
.for(item => {
// Process each change
console.log('Processing:', item);
})
.withLeaseContainer(leaseContainer)
.build();
await changeFeedProcessor.start();
}pip install azure-cosmos# cosmos_db.py
import os
from azure.cosmos import CosmosClient, PartitionKey
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from typing import Optional, List
from datetime import datetime
import uuid
# Initialize client
endpoint = os.environ['COSMOS_ENDPOINT']
key = os.environ['COSMOS_KEY']
database_name = os.environ['COSMOS_DATABASE']
client = CosmosClient(endpoint, key)
database = client.get_database_client(database_name)
def get_container(container_name: str):
return database.get_container_client(container_name)
# CRUD Operations
users_container = get_container('users')
def create_user(email: str, name: str, user_id: str = None) -> dict:
user_id = user_id or str(uuid.uuid4())
now = datetime.utcnow().isoformat()
user = {
'id': user_id,
'userId': user_id, # Partition key
'email': email,
'name': name,
'createdAt': now,
'updatedAt': now
}
return users_container.create_item(user)
def get_user(user_id: str) -> Optional[dict]:
try:
return users_container.read_item(item=user_id, partition_key=user_id)
except CosmosResourceNotFoundError:
return None
def query_users(email_domain: str) -> List[dict]:
query = "SELECT * FROM c WHERE CONTAINS(c.email, @domain)"
parameters = [{'name': '@domain', 'value': email_domain}]
return list(users_container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
def update_user(user_id: str, **updates) -> dict:
user = get_user(user_id)
if not user:
raise ValueError('User not found')
user.update(updates)
user['updatedAt'] = datetime.utcnow().isoformat()
return users_container.replace_item(item=user_id, body=user)
def delete_user(user_id: str) -> None:
users_container.delete_item(item=user_id, partition_key=user_id)
# Paginated query
def get_users_paginated(page_size: int = 10, continuation_token: str = None):
query = "SELECT * FROM c ORDER BY c.createdAt DESC"
items = users_container.query_items(
query=query,
enable_cross_partition_query=True,
max_item_count=page_size,
continuation_token=continuation_token
)
page = items.by_page()
results = list(next(page))
return {
'items': results,
'continuation_token': page.continuation_token
}{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{ "path": "/userId/?" },
{ "path": "/status/?" },
{ "path": "/createdAt/?" }
],
"excludedPaths": [
{ "path": "/content/*" },
{ "path": "/_etag/?" }
],
"compositeIndexes": [
[
{ "path": "/userId", "order": "ascending" },
{ "path": "/createdAt", "order": "descending" }
]
]
}await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
indexingPolicy: {
indexingMode: 'consistent',
includedPaths: [
{ path: '/userId/?' },
{ path: '/status/?' },
{ path: '/createdAt/?' }
],
excludedPaths: [
{ path: '/*' } // Exclude all by default
]
}
});// Container level
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
throughput: 1000 // RU/s
});
// Scale throughput
const container = database.container('orders');
await container.throughput.replace(2000);await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] },
maxThroughput: 10000 // Auto-scales 10% to 100%
});// No throughput configuration needed
// Pay per request (good for dev/test, intermittent workloads)
await database.containers.createIfNotExists({
id: 'orders',
partitionKey: { paths: ['/userId'] }
// No throughput = serverless
});# Azure CLI
az cosmosdb create --name myaccount --resource-group mygroup
az cosmosdb sql database create --account-name myaccount --name mydb --resource-group mygroup
az cosmosdb sql container create \
--account-name myaccount \
--database-name mydb \
--name orders \
--partition-key-path /userId \
--throughput 400
# Query
az cosmosdb sql query --account-name myaccount --database-name mydb \
--container-name orders --query "SELECT * FROM c"
# Keys
az cosmosdb keys list --name myaccount --resource-group mygroup
az cosmosdb keys list --name myaccount --resource-group mygroup --type connection-strings| Strategy | Impact |
|---|---|
| Right partition key | Avoid hot partitions (wasted RUs) |
| Index only what you query | Reduce write RU cost |
| Use point reads | 1 RU vs 3+ RU for queries |
| Serverless for dev/test | Pay per request |
| Autoscale for production | Scale down during low traffic |
| TTL for temporary data | Auto-delete old items |
// Enable TTL on container
await database.containers.createIfNotExists({
id: 'sessions',
partitionKey: { paths: ['/userId'] },
defaultTtl: 3600 // 1 hour
});
// Per-item TTL
const session = {
id: 'session-123',
userId: 'user-456',
ttl: 1800 // Override: 30 minutes
};d4ddb03
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.