0
# Database Operations
1
2
Database-level operations for managing containers, users, and throughput configuration within an Azure Cosmos DB database. The DatabaseProxy provides the interface for all database-scoped operations.
3
4
## Capabilities
5
6
### Database Information
7
8
Read database properties and metadata.
9
10
```python { .api }
11
def read(self, populate_query_metrics: bool = None, **kwargs):
12
"""
13
Read database properties.
14
15
Parameters:
16
- populate_query_metrics: Include query metrics in response
17
- session_token: Session token for consistency
18
19
Returns:
20
Database properties as dictionary
21
"""
22
```
23
24
### Container Management
25
26
Create, list, query, and manage containers within the database.
27
28
```python { .api }
29
def create_container(self, id: str, partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, unique_key_policy: dict = None, conflict_resolution_policy: dict = None, analytical_storage_ttl: int = None, vector_embedding_policy: dict = None, full_text_policy: dict = None, computed_properties: list = None, **kwargs):
30
"""
31
Create a new container.
32
33
Parameters:
34
- id: Container ID
35
- partition_key: Partition key configuration
36
- indexing_policy: Indexing policy configuration
37
- default_ttl: Default time-to-live in seconds
38
- populate_query_metrics: Include query metrics
39
- offer_throughput: Provisioned throughput in RU/s or ThroughputProperties
40
- unique_key_policy: Unique key constraints
41
- conflict_resolution_policy: Conflict resolution policy
42
- analytical_storage_ttl: Analytical store TTL in seconds
43
- vector_embedding_policy: Vector embedding policy configuration
44
- full_text_policy: Full text search policy configuration
45
- computed_properties: List of computed properties
46
- session_token: Session token
47
48
Returns:
49
ContainerProxy for the created container
50
51
Raises:
52
CosmosResourceExistsError: If container already exists
53
"""
54
55
def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, unique_key_policy: dict = None, conflict_resolution_policy: dict = None, analytical_storage_ttl: int = None, vector_embedding_policy: dict = None, full_text_policy: dict = None, computed_properties: list = None, **kwargs):
56
"""
57
Create a container if it doesn't already exist; otherwise return the existing container.
58
59
Parameters: Same as create_container
60
61
Returns:
62
ContainerProxy for the container
63
"""
64
65
def get_container_client(self, container: Union[str, ContainerProxy]) -> ContainerProxy:
66
"""
67
Get a container client for operations on a specific container.
68
69
Parameters:
70
- container: Container ID or ContainerProxy instance
71
72
Returns:
73
ContainerProxy instance
74
"""
75
76
def list_containers(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
77
"""
78
List all containers in the database.
79
80
Parameters:
81
- max_item_count: Maximum number of items to return
82
- populate_query_metrics: Include query metrics
83
- session_token: Session token for consistency
84
85
Returns:
86
Iterable of container items
87
"""
88
89
def query_containers(self, query: str = None, parameters: list = None, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):
90
"""
91
Query containers using SQL-like syntax.
92
93
Parameters:
94
- query: SQL query string
95
- parameters: Query parameters as [{"name": "@param", "value": value}]
96
- max_item_count: Maximum items per page
97
- populate_query_metrics: Include query metrics
98
- session_token: Session token for consistency
99
100
Returns:
101
Iterable of query results
102
"""
103
104
def replace_container(self, container: Union[str, ContainerProxy], partition_key: PartitionKey, indexing_policy: dict = None, default_ttl: int = None, conflict_resolution_policy: dict = None, populate_query_metrics: bool = None, **kwargs):
105
"""
106
Replace container configuration.
107
108
Parameters:
109
- container: Container ID or ContainerProxy instance
110
- partition_key: Partition key configuration
111
- indexing_policy: Indexing policy configuration
112
- default_ttl: Default time-to-live
113
- conflict_resolution_policy: Conflict resolution policy
114
- populate_query_metrics: Include query metrics
115
- session_token: Session token
116
- etag: ETag for conditional operations
117
- match_condition: Match condition for conditional operations
118
119
Returns:
120
ContainerProxy for the replaced container
121
"""
122
123
def delete_container(self, container: Union[str, ContainerProxy], populate_query_metrics: bool = None, **kwargs):
124
"""
125
Delete a container.
126
127
Parameters:
128
- container: Container ID or ContainerProxy instance
129
- populate_query_metrics: Include query metrics
130
- session_token: Session token
131
- etag: ETag for conditional operations
132
- match_condition: Match condition for conditional operations
133
134
Raises:
135
CosmosResourceNotFoundError: If container doesn't exist
136
"""
137
```
138
139
### User Management
140
141
Manage users within the database for permission-based access control.
142
143
```python { .api }
144
def list_users(self, max_item_count: int = None, **kwargs):
145
"""
146
List users in the database.
147
148
Parameters:
149
- max_item_count: Maximum number of users to return
150
- session_token: Session token for consistency
151
152
Returns:
153
Iterable of user items
154
"""
155
156
def query_users(self, query: str, parameters: list = None, max_item_count: int = None, **kwargs):
157
"""
158
Query users using SQL-like syntax.
159
160
Parameters:
161
- query: SQL query string
162
- parameters: Query parameters
163
- max_item_count: Maximum items per page
164
- session_token: Session token for consistency
165
166
Returns:
167
Iterable of query results
168
"""
169
170
def get_user_client(self, user: Union[str, UserProxy]) -> UserProxy:
171
"""
172
Get a user client for user operations.
173
174
Parameters:
175
- user: User ID or UserProxy instance
176
177
Returns:
178
UserProxy instance
179
"""
180
181
def create_user(self, body: dict, **kwargs):
182
"""
183
Create a new user.
184
185
Parameters:
186
- body: User properties dictionary with 'id' field
187
- session_token: Session token for consistency
188
189
Returns:
190
UserProxy for the created user
191
192
Raises:
193
CosmosResourceExistsError: If user already exists
194
"""
195
196
def upsert_user(self, body: dict, **kwargs):
197
"""
198
Create or replace a user.
199
200
Parameters:
201
- body: User properties dictionary with 'id' field
202
- session_token: Session token for consistency
203
204
Returns:
205
UserProxy for the user
206
"""
207
208
def replace_user(self, user: Union[str, UserProxy], body: dict, **kwargs):
209
"""
210
Replace user properties.
211
212
Parameters:
213
- user: User ID or UserProxy instance
214
- body: Updated user properties
215
- session_token: Session token
216
- etag: ETag for conditional operations
217
- match_condition: Match condition for conditional operations
218
219
Returns:
220
UserProxy for the replaced user
221
"""
222
223
def delete_user(self, user: Union[str, UserProxy], **kwargs):
224
"""
225
Delete a user.
226
227
Parameters:
228
- user: User ID or UserProxy instance
229
- session_token: Session token
230
- etag: ETag for conditional operations
231
- match_condition: Match condition for conditional operations
232
233
Raises:
234
CosmosResourceNotFoundError: If user doesn't exist
235
"""
236
```
237
238
### Throughput Management
239
240
Manage provisioned throughput and auto-scaling for the database.
241
242
```python { .api }
243
def get_throughput(self, **kwargs) -> ThroughputProperties:
244
"""
245
Get current throughput settings for the database.
246
247
Parameters:
248
- session_token: Session token for consistency
249
250
Returns:
251
ThroughputProperties with current throughput configuration
252
253
Raises:
254
CosmosResourceNotFoundError: If throughput not configured
255
"""
256
257
def replace_throughput(self, throughput: ThroughputProperties, **kwargs):
258
"""
259
Replace throughput settings for the database.
260
261
Parameters:
262
- throughput: New throughput configuration
263
- session_token: Session token for consistency
264
265
Returns:
266
ThroughputProperties with updated configuration
267
"""
268
269
def read_offer(self, **kwargs):
270
"""
271
Read throughput offer (deprecated, use get_throughput).
272
273
Returns:
274
Offer properties
275
"""
276
```
277
278
## Usage Examples
279
280
### Container Management
281
282
```python
283
from azure.cosmos import PartitionKey, IndexingMode
284
285
# Get database client
286
database = client.get_database_client("ProductCatalog")
287
288
# Create container with partition key
289
container = database.create_container_if_not_exists(
290
id="Products",
291
partition_key=PartitionKey(path="/category"),
292
offer_throughput=1000
293
)
294
295
# Create container with advanced configuration
296
indexing_policy = {
297
"indexingMode": IndexingMode.Consistent,
298
"automatic": True,
299
"includedPaths": [{"path": "/*"}],
300
"excludedPaths": [{"path": "/description/*"}]
301
}
302
303
unique_key_policy = {
304
"uniqueKeys": [
305
{"paths": ["/sku"]},
306
{"paths": ["/name", "/category"]}
307
]
308
}
309
310
container = database.create_container(
311
id="UniqueProducts",
312
partition_key=PartitionKey(path="/category", version=2),
313
indexing_policy=indexing_policy,
314
unique_key_policy=unique_key_policy,
315
default_ttl=3600, # 1 hour TTL
316
offer_throughput=400
317
)
318
319
# List containers
320
containers = list(database.list_containers())
321
for cont in containers:
322
print(f"Container: {cont['id']}")
323
324
# Query containers
325
results = list(database.query_containers(
326
query="SELECT * FROM c WHERE c.defaultTtl > 0"
327
))
328
```
329
330
### Throughput Management
331
332
```python
333
from azure.cosmos import ThroughputProperties
from azure.cosmos.exceptions import CosmosResourceNotFoundError
334
335
# Get current throughput
336
try:
337
throughput = database.get_throughput()
338
print(f"Current throughput: {throughput.offer_throughput} RU/s")
339
340
if throughput.auto_scale_max_throughput:
341
print(f"Auto-scale max: {throughput.auto_scale_max_throughput} RU/s")
342
except CosmosResourceNotFoundError:
343
print("No throughput configured at database level")
344
345
# Update throughput
346
new_throughput = ThroughputProperties(offer_throughput=800)
347
database.replace_throughput(new_throughput)
348
349
# Configure auto-scaling
350
auto_scale_throughput = ThroughputProperties(
351
auto_scale_max_throughput=4000,
352
auto_scale_increment_percent=10
353
)
354
database.replace_throughput(auto_scale_throughput)
355
```
356
357
### User Management
358
359
```python
360
# Create a user
361
user_def = {"id": "appUser1"}
362
user = database.create_user(user_def)
363
364
# List users
365
users = list(database.list_users())
366
for u in users:
367
print(f"User: {u['id']}")
368
369
# Get user client for permission management
370
user_client = database.get_user_client("appUser1")
371
372
# Query users
373
results = list(database.query_users(
374
query="SELECT * FROM users u WHERE u.id = @userid",
375
parameters=[{"name": "@userid", "value": "appUser1"}]
376
))
377
```
378
379
### Database Properties
380
381
```python
382
# Read database properties
383
db_properties = database.read()
384
print(f"Database ID: {db_properties['id']}")
385
print(f"ETag: {db_properties['_etag']}")
386
print(f"Self Link: {db_properties['_self']}")
387
```