# Azure Cosmos DB

Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs. Provides full CRUD operations with support for multiple consistency levels and partitioning strategies.

## Capabilities

### Cosmos DB Hook

Primary interface for Azure Cosmos DB operations, providing authenticated connections and database functionality.

```python { .api }
class AzureCosmosDBHook(BaseHook):
    """
    Hook for Azure Cosmos DB operations.

    Provides methods for database management, collection operations, and document
    manipulation across all Cosmos DB APIs (SQL, MongoDB, Cassandra, Gremlin, Table).
    """

    def get_conn(self) -> CosmosClient:
        """Get an authenticated Azure Cosmos DB client."""

    def does_database_exist(self, database_name: str) -> bool:
        """
        Check whether a database exists.

        Args:
            database_name (str): Name of the database to check

        Returns:
            bool: True if the database exists, False otherwise
        """

    def create_database(self, database_name: str) -> None:
        """
        Create a new database.

        Args:
            database_name (str): Name of the database to create
        """

    def delete_database(self, database_name: str) -> None:
        """
        Delete a database and all of its collections.

        Args:
            database_name (str): Name of the database to delete
        """

    def does_collection_exist(self, collection_name: str, database_name: str) -> bool:
        """
        Check whether a collection exists in the specified database.

        Args:
            collection_name (str): Name of the collection to check
            database_name (str): Name of the database containing the collection

        Returns:
            bool: True if the collection exists, False otherwise
        """

    def create_collection(
        self,
        collection_name: str,
        database_name: str | None = None,
        partition_key: str | None = None,
        throughput: int | None = None,
        **kwargs
    ) -> None:
        """
        Create a new collection in the database.

        Args:
            collection_name (str): Name of the collection to create
            database_name (str | None): Name of the database (uses the connection default if None)
            partition_key (str | None): Partition key for the collection
            throughput (int | None): Provisioned throughput (RU/s)
            **kwargs: Additional collection configuration options
        """

    def delete_collection(
        self,
        collection_name: str,
        database_name: str | None = None
    ) -> None:
        """
        Delete a collection from the database.

        Args:
            collection_name (str): Name of the collection to delete
            database_name (str | None): Name of the database (uses the connection default if None)
        """

    def upsert_document(
        self,
        document: dict,
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> dict:
        """
        Insert or update a document in the collection.

        Args:
            document (dict): Document data to upsert
            database_name (str | None): Name of the database (uses the connection default if None)
            collection_name (str | None): Name of the collection (uses the connection default if None)
            **kwargs: Additional upsert options

        Returns:
            dict: Upserted document with metadata
        """

    def insert_documents(
        self,
        documents: list[dict],
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> list[dict]:
        """
        Insert multiple documents into the collection.

        Args:
            documents (list[dict]): List of documents to insert
            database_name (str | None): Name of the database (uses the connection default if None)
            collection_name (str | None): Name of the collection (uses the connection default if None)
            **kwargs: Additional insert options

        Returns:
            list[dict]: List of inserted documents with metadata
        """

    def delete_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> None:
        """
        Delete a document from the collection.

        Args:
            document_id (str): ID of the document to delete
            database_name (str | None): Name of the database (uses the connection default if None)
            collection_name (str | None): Name of the collection (uses the connection default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional delete options
        """

    def get_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> dict:
        """
        Retrieve a document by ID.

        Args:
            document_id (str): ID of the document to retrieve
            database_name (str | None): Name of the database (uses the connection default if None)
            collection_name (str | None): Name of the collection (uses the connection default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional retrieval options

        Returns:
            dict: Retrieved document
        """

    def get_documents(
        self,
        sql_string: str | None = None,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> list[dict]:
        """
        Query documents using SQL syntax, or retrieve all documents.

        Args:
            sql_string (str | None): SQL query string (None retrieves all documents)
            database_name (str | None): Name of the database (uses the connection default if None)
            collection_name (str | None): Name of the collection (uses the connection default if None)
            partition_key: Partition key value to filter by
            **kwargs: Additional query options (parameters, max_item_count, etc.)

        Returns:
            list[dict]: List of documents matching the query
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Cosmos DB connection.

        Returns:
            tuple[bool, str]: (success, message) indicating connection status
        """
```

### Document Insert Operator

Operator for inserting documents into Azure Cosmos DB collections.

```python { .api }
class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Insert documents into an Azure Cosmos DB collection.

    Supports inserting a single document or multiple documents, with
    automatic collection creation if needed.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document: dict | list[dict],
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize the Cosmos DB document insert operator.

        Args:
            database_name (str): Name of the Cosmos DB database
            collection_name (str): Name of the collection to insert into
            document (dict | list[dict]): Document(s) to insert
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """
```

### Document Existence Sensor

Sensor that waits for a document to exist in Azure Cosmos DB.

```python { .api }
class AzureCosmosDocumentSensor(BaseSensorOperator):
    """
    Sensor that waits for a document to exist in Azure Cosmos DB.

    Monitors a collection for the existence of a specific document,
    identified by ID or by query criteria.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document_id: str | None = None,
        sql_string: str | None = None,
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize the Cosmos DB document sensor.

        Args:
            database_name (str): Name of the Cosmos DB database
            collection_name (str): Name of the collection to monitor
            document_id (str | None): Specific document ID to wait for
            sql_string (str | None): SQL query to check for documents
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """

    def poke(self, context: dict) -> bool:
        """Check whether the document exists."""
```

## Usage Examples

### Basic Document Operations

```python
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

# Initialize the hook
cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

# Create a database
cosmos_hook.create_database('my-database')

# Create a collection with a partition key
cosmos_hook.create_collection(
    collection_name='users',
    database_name='my-database',
    partition_key='/userId',
    throughput=400
)

# Insert a document
user_doc = {
    'id': 'user-123',
    'userId': 'user-123',
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30
}

result = cosmos_hook.upsert_document(
    document=user_doc,
    database_name='my-database',
    collection_name='users'
)

# Query documents
users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.age > 25",
    database_name='my-database',
    collection_name='users'
)

# Get a specific document
user = cosmos_hook.get_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)

# Delete a document
cosmos_hook.delete_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)
```

### Batch Document Operations

```python
# Insert multiple documents
users_batch = [
    {'id': 'user-1', 'userId': 'user-1', 'name': 'Alice'},
    {'id': 'user-2', 'userId': 'user-2', 'name': 'Bob'},
    {'id': 'user-3', 'userId': 'user-3', 'name': 'Charlie'}
]

results = cosmos_hook.insert_documents(
    documents=users_batch,
    database_name='my-database',
    collection_name='users'
)

# Query with parameters
users_in_city = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.city = @city",
    database_name='my-database',
    collection_name='users',
    parameters=[{'name': '@city', 'value': 'New York'}]
)
```
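The `parameters` list format shown above (`{'name': '@name', 'value': ...}`) is tedious to build by hand for more than a couple of values. A small helper can derive it from a plain dict; `build_parameters` below is a hypothetical convenience function, not part of the hook's API:

```python
def build_parameters(values: dict) -> list[dict]:
    """Convert a plain dict such as {'city': 'New York'} into the
    Cosmos SQL parameter format [{'name': '@city', 'value': 'New York'}]."""
    return [{'name': f'@{key}', 'value': value} for key, value in values.items()]

# Build the parameter list for a two-value query
params = build_parameters({'city': 'New York', 'min_age': 25})
# params == [{'name': '@city', 'value': 'New York'},
#            {'name': '@min_age', 'value': 25}]
```

The result can be passed directly as the `parameters` keyword argument in the query examples above.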

### Using in Airflow DAGs

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator
from airflow.providers.microsoft.azure.sensors.cosmos import AzureCosmosDocumentSensor
from datetime import datetime, timedelta

dag = DAG(
    'cosmos_db_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Cosmos DB operations',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1)
)

# Insert a daily summary document
insert_summary = AzureCosmosInsertDocumentOperator(
    task_id='insert_daily_summary',
    database_name='analytics',
    collection_name='daily_summaries',
    document={
        'id': '{{ ds }}',
        'date': '{{ ds }}',
        'summary_type': 'daily',
        'metrics': {
            'users_active': 1000,
            'orders_count': 50,
            'revenue': 5000.00
        }
    },
    azure_cosmos_conn_id='cosmos_default',
    dag=dag
)

# Wait for a processing-completion indicator
wait_for_completion = AzureCosmosDocumentSensor(
    task_id='wait_for_processing_complete',
    database_name='analytics',
    collection_name='processing_status',
    sql_string="SELECT * FROM c WHERE c.date = '{{ ds }}' AND c.status = 'completed'",
    timeout=1800,      # 30 minutes
    poke_interval=60,  # check every minute
    dag=dag
)

insert_summary >> wait_for_completion
```

### Advanced Querying

```python
# Complex query with aggregation
monthly_stats = cosmos_hook.get_documents(
    sql_string="""
        SELECT
            COUNT(1) AS total_orders,
            SUM(c.amount) AS total_revenue,
            AVG(c.amount) AS avg_order_value
        FROM c
        WHERE c.order_date >= @start_date
          AND c.order_date < @end_date
    """,
    database_name='ecommerce',
    collection_name='orders',
    parameters=[
        {'name': '@start_date', 'value': '2024-01-01'},
        {'name': '@end_date', 'value': '2024-02-01'}
    ]
)

# Cross-partition query; extra kwargs are passed through to the SDK
all_users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c",
    database_name='my-database',
    collection_name='users',
    enable_cross_partition_query=True,
    max_item_count=100
)
```

## Connection Configuration

Azure Cosmos DB connections support multiple authentication methods and API types.

**Connection Type**: `azure_cosmos`

**Required Fields**:
- `endpoint_uri`: Cosmos DB account endpoint URI
- `database_name`: Default database name (can be overridden per call)

**Authentication Options**:
- **Primary/Secondary Key**: Use the account key
- **Service Principal**: Use client credentials
- **Managed Identity**: Use an Azure managed identity

**Connection Fields**:
- `master_key`: Cosmos DB account primary or secondary key
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID

**Optional Configuration**:
- `collection_name`: Default collection name
- `consistency_level`: Default consistency level (Strong, BoundedStaleness, Session, ConsistentPrefix, Eventual)
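
One way to supply these fields is Airflow's environment-variable connection mechanism with the JSON format. The sketch below is an assumption about field mapping (endpoint carried in `login`, account key in `password`, defaults in `extra`); verify against your provider version's connection documentation before relying on it:

```shell
# Hypothetical azure_cosmos connection via env var (Airflow JSON connection format).
# Replace the endpoint and <master-key> placeholder with your account's values.
export AIRFLOW_CONN_AZURE_COSMOS_DEFAULT='{
    "conn_type": "azure_cosmos",
    "login": "https://my-account.documents.azure.com:443/",
    "password": "<master-key>",
    "extra": {
        "database_name": "my-database",
        "collection_name": "users"
    }
}'
```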

## Error Handling

```python { .api }
# Common Cosmos DB exceptions
class CosmosDBException(AirflowException):
    """Base exception for Cosmos DB operations."""

class DocumentNotFound(CosmosDBException):
    """Raised when a document is not found."""

class CollectionNotFound(CosmosDBException):
    """Raised when a collection is not found."""

class DatabaseNotFound(CosmosDBException):
    """Raised when a database is not found."""

class PartitionKeyMismatch(CosmosDBException):
    """Raised when the partition key doesn't match."""
```
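
A common pattern is to treat a missing document as `None` rather than a task failure. The sketch below is self-contained for illustration: it re-declares `DocumentNotFound` as a plain `Exception` subclass (in Airflow it derives from `AirflowException` as shown above), and both `get_document_or_none` and `FakeHook` are hypothetical helpers, not part of the provider:

```python
class CosmosDBException(Exception):
    """Standalone stand-in for the AirflowException-based class above."""

class DocumentNotFound(CosmosDBException):
    """Raised when a document is not found."""

def get_document_or_none(hook, document_id, **kwargs):
    """Return the document, or None if it does not exist."""
    try:
        return hook.get_document(document_id=document_id, **kwargs)
    except DocumentNotFound:
        return None

# Minimal fake hook to exercise the helper without a live Cosmos DB account
class FakeHook:
    def get_document(self, document_id, **kwargs):
        if document_id != 'user-123':
            raise DocumentNotFound(f"{document_id} not found")
        return {'id': 'user-123', 'name': 'John Doe'}

doc = get_document_or_none(FakeHook(), 'user-123')      # found -> dict
missing = get_document_or_none(FakeHook(), 'no-such-id')  # missing -> None
```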

## Performance Considerations

The Cosmos DB integration supports:

- **Partitioning**: Efficient partition key usage for scale
- **Throughput Management**: Configurable RU/s provisioning
- **Batch Operations**: Bulk document operations for efficiency
- **Query Optimization**: Parameterized queries and indexing hints
- **Consistency Levels**: Configurable consistency vs. performance trade-offs
- **Cross-Partition Queries**: Support for queries spanning partitions
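
Bulk inserts amortize request overhead, but very large payloads can hit request-size or throughput limits, so chunking the batch is a common precaution. `chunked` below is a hypothetical helper, and the batch size is workload-dependent, not a fixed Cosmos DB constant:

```python
from itertools import islice

def chunked(documents, batch_size=100):
    """Yield successive batches of at most batch_size documents."""
    it = iter(documents)
    while batch := list(islice(it, batch_size)):
        yield batch

docs = [{'id': str(i)} for i in range(250)]
batches = list(chunked(docs, batch_size=100))
# 250 documents -> batches of 100, 100, and 50
```

Each batch can then be passed to `insert_documents` in turn, optionally with a pause between batches to stay under the provisioned RU/s.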

Azure Cosmos DB integration provides comprehensive NoSQL database capabilities with global distribution, multiple APIs, and automatic scaling, suitable for mission-critical applications.