# Azure Cosmos DB

Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs. Provides full CRUD operations with support for multiple consistency levels and partitioning strategies.

## Capabilities

### Cosmos DB Hook

Primary interface for Azure Cosmos DB operations, providing authenticated connections and database functionality.

```python { .api }
class AzureCosmosDBHook(BaseHook):
    """
    Hook for Azure Cosmos DB operations.

    Provides methods for database management, collection operations, and document
    manipulation across all Cosmos DB APIs (SQL, MongoDB, Cassandra, Gremlin, Table).
    """

    def get_conn(self) -> CosmosClient:
        """Get authenticated Azure Cosmos DB client."""

    def does_database_exist(self, database_name: str) -> bool:
        """
        Check if a database exists.

        Args:
            database_name (str): Name of database to check

        Returns:
            bool: True if database exists, False otherwise
        """

    def create_database(self, database_name: str) -> None:
        """
        Create a new database.

        Args:
            database_name (str): Name of database to create
        """

    def delete_database(self, database_name: str) -> None:
        """
        Delete a database and all its collections.

        Args:
            database_name (str): Name of database to delete
        """

    def does_collection_exist(self, collection_name: str, database_name: str) -> bool:
        """
        Check if a collection exists in the specified database.

        Args:
            collection_name (str): Name of collection to check
            database_name (str): Name of database containing collection

        Returns:
            bool: True if collection exists, False otherwise
        """

    def create_collection(
        self,
        collection_name: str,
        database_name: str | None = None,
        partition_key: str | None = None,
        throughput: int | None = None,
        **kwargs
    ) -> None:
        """
        Create a new collection in the database.

        Args:
            collection_name (str): Name of collection to create
            database_name (str): Name of database (uses default if None)
            partition_key (str): Partition key for the collection
            throughput (int): Provisioned throughput (RU/s)
            **kwargs: Additional collection configuration options
        """

    def delete_collection(
        self,
        collection_name: str,
        database_name: str | None = None
    ) -> None:
        """
        Delete a collection from the database.

        Args:
            collection_name (str): Name of collection to delete
            database_name (str): Name of database (uses default if None)
        """

    def upsert_document(
        self,
        document: dict,
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> dict:
        """
        Insert or update a document in the collection.

        Args:
            document (dict): Document data to upsert
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            **kwargs: Additional upsert options

        Returns:
            dict: Upserted document with metadata
        """

    def insert_documents(
        self,
        documents: list[dict],
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> list[dict]:
        """
        Insert multiple documents into the collection.

        Args:
            documents (list[dict]): List of documents to insert
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            **kwargs: Additional insert options

        Returns:
            list[dict]: List of inserted documents with metadata
        """

    def delete_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> None:
        """
        Delete a document from the collection.

        Args:
            document_id (str): ID of document to delete
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional delete options
        """

    def get_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> dict:
        """
        Retrieve a document by ID.

        Args:
            document_id (str): ID of document to retrieve
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional retrieval options

        Returns:
            dict: Retrieved document
        """

    def get_documents(
        self,
        sql_string: str | None = None,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> list[dict]:
        """
        Query documents using SQL syntax or retrieve all documents.

        Args:
            sql_string (str): SQL query string (None retrieves all)
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value to filter by
            **kwargs: Additional query options (parameters, max_item_count, etc.)

        Returns:
            list[dict]: List of documents matching query
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Cosmos DB connection.

        Returns:
            tuple[bool, str]: (success, message) indicating connection status
        """
```
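
The existence checks and `test_connection` helper are not exercised in the usage examples below; here is a minimal sketch, assuming an Airflow connection named `cosmos_default` exists and using only the methods documented above:

```python
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

# Connection ID 'cosmos_default' is an assumption for this sketch.
cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

# Verify connectivity before doing any work.
ok, message = cosmos_hook.test_connection()
if not ok:
    raise RuntimeError(f"Cosmos DB connection failed: {message}")

# Create the database and collection only if they are missing.
if not cosmos_hook.does_database_exist('my-database'):
    cosmos_hook.create_database('my-database')
if not cosmos_hook.does_collection_exist('users', 'my-database'):
    cosmos_hook.create_collection('users', 'my-database', partition_key='/userId')
```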

### Document Insert Operator

Operator for inserting documents into Azure Cosmos DB collections.

```python { .api }
class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Insert documents into Azure Cosmos DB collection.

    Supports inserting single documents or multiple documents with
    automatic collection creation if needed.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document: dict | list[dict],
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize Cosmos DB document insert operator.

        Args:
            database_name (str): Name of Cosmos DB database
            collection_name (str): Name of collection to insert into
            document (dict | list[dict]): Document(s) to insert
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """
```
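
Because `document` accepts either a single dict or a list, a batch insert through the operator could look like the sketch below; the task ID, database, collection, and documents are illustrative, and the task is assumed to be defined inside a `with DAG(...):` block:

```python
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator

# Assumed to be defined inside a `with DAG(...):` block (omitted here).
insert_events = AzureCosmosInsertDocumentOperator(
    task_id='insert_events',
    database_name='events',            # illustrative database
    collection_name='raw_events',      # illustrative collection
    document=[                         # list form inserts multiple documents
        {'id': 'evt-1', 'type': 'click'},
        {'id': 'evt-2', 'type': 'view'},
    ],
    azure_cosmos_conn_id='azure_cosmos_default',
)
```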

### Document Existence Sensor

Sensor that waits for a document to exist in Azure Cosmos DB.

```python { .api }
class AzureCosmosDocumentSensor(BaseSensorOperator):
    """
    Sensor that waits for a document to exist in Azure Cosmos DB.

    Monitors a collection for the existence of a specific document
    by ID or query criteria.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document_id: str | None = None,
        sql_string: str | None = None,
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize Cosmos DB document sensor.

        Args:
            database_name (str): Name of Cosmos DB database
            collection_name (str): Name of collection to monitor
            document_id (str): Specific document ID to wait for
            sql_string (str): SQL query to check for documents
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """

    def poke(self, context: dict) -> bool:
        """Check if the document exists."""
```
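
The DAG example below uses the `sql_string` form; waiting on a specific document ID is a sketch along these lines, with IDs and intervals illustrative and the task assumed to live inside a `with DAG(...):` block:

```python
from airflow.providers.microsoft.azure.sensors.cosmos import AzureCosmosDocumentSensor

# Assumed to be defined inside a `with DAG(...):` block (omitted here).
wait_for_user = AzureCosmosDocumentSensor(
    task_id='wait_for_user_document',
    database_name='my-database',
    collection_name='users',
    document_id='user-123',            # poll until this document exists
    azure_cosmos_conn_id='azure_cosmos_default',
    poke_interval=30,                  # check every 30 seconds
    timeout=600,                       # give up after 10 minutes
)
```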

## Usage Examples

### Basic Document Operations

```python
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

# Initialize hook
cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

# Create database
cosmos_hook.create_database('my-database')

# Create collection with partition key
cosmos_hook.create_collection(
    collection_name='users',
    database_name='my-database',
    partition_key='/userId',
    throughput=400
)

# Insert a document
user_doc = {
    'id': 'user-123',
    'userId': 'user-123',
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30
}

result = cosmos_hook.upsert_document(
    document=user_doc,
    database_name='my-database',
    collection_name='users'
)

# Query documents
users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.age > 25",
    database_name='my-database',
    collection_name='users'
)

# Get specific document
user = cosmos_hook.get_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)

# Delete document
cosmos_hook.delete_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)
```

### Batch Document Operations

```python
# Insert multiple documents
users_batch = [
    {'id': 'user-1', 'userId': 'user-1', 'name': 'Alice'},
    {'id': 'user-2', 'userId': 'user-2', 'name': 'Bob'},
    {'id': 'user-3', 'userId': 'user-3', 'name': 'Charlie'}
]

results = cosmos_hook.insert_documents(
    documents=users_batch,
    database_name='my-database',
    collection_name='users'
)

# Query with parameters
users_in_city = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.city = @city",
    database_name='my-database',
    collection_name='users',
    parameters=[{'name': '@city', 'value': 'New York'}]
)
```

358

359

### Using in Airflow DAGs

360

361

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator
from airflow.providers.microsoft.azure.sensors.cosmos import AzureCosmosDocumentSensor
from datetime import datetime, timedelta

dag = DAG(
    'cosmos_db_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Cosmos DB operations',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1)
)

# Insert daily summary document
insert_summary = AzureCosmosInsertDocumentOperator(
    task_id='insert_daily_summary',
    database_name='analytics',
    collection_name='daily_summaries',
    document={
        'id': '{{ ds }}',
        'date': '{{ ds }}',
        'summary_type': 'daily',
        'metrics': {
            'users_active': 1000,
            'orders_count': 50,
            'revenue': 5000.00
        }
    },
    azure_cosmos_conn_id='cosmos_default',
    dag=dag
)

# Wait for processing completion indicator
wait_for_completion = AzureCosmosDocumentSensor(
    task_id='wait_for_processing_complete',
    database_name='analytics',
    collection_name='processing_status',
    sql_string="SELECT * FROM c WHERE c.date = '{{ ds }}' AND c.status = 'completed'",
    azure_cosmos_conn_id='cosmos_default',
    timeout=1800,       # 30 minutes
    poke_interval=60,   # Check every minute
    dag=dag
)

insert_summary >> wait_for_completion
```

411

412

### Advanced Querying

413

414

```python
# Complex query with aggregation
monthly_stats = cosmos_hook.get_documents(
    sql_string="""
        SELECT
            COUNT(1) as total_orders,
            SUM(c.amount) as total_revenue,
            AVG(c.amount) as avg_order_value
        FROM c
        WHERE c.order_date >= @start_date
          AND c.order_date < @end_date
    """,
    database_name='ecommerce',
    collection_name='orders',
    parameters=[
        {'name': '@start_date', 'value': '2024-01-01'},
        {'name': '@end_date', 'value': '2024-02-01'}
    ]
)

# Cross-partition query with a page-size hint
all_users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c",
    database_name='my-database',
    collection_name='users',
    enable_cross_partition=True,
    max_item_count=100
)
```

## Connection Configuration

Azure Cosmos DB connections support multiple authentication methods and API types.

**Connection Type**: `azure_cosmos`

**Required Fields**:
- `endpoint_uri`: Cosmos DB account endpoint URI
- `database_name`: Default database name (can be overridden)

**Authentication Options**:
- **Primary/Secondary Key**: Use account key
- **Service Principal**: Use client credentials
- **Managed Identity**: Use Azure managed identity

**Connection Fields**:
- `master_key`: Cosmos DB account primary/secondary key
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID

**Optional Configuration**:
- `collection_name`: Default collection name
- `consistency_level`: Default consistency level (Strong, BoundedStaleness, Session, ConsistentPrefix, Eventual)
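
As a sketch of how these fields might be wired into an Airflow connection: the exact split between the standard connection slots and the `extra` JSON depends on the provider version, so treat the field placement below as illustrative.

```python
import json
from airflow.models import Connection

# Illustrative key-based connection; endpoint and key values are placeholders.
cosmos_conn = Connection(
    conn_id="cosmos_default",
    conn_type="azure_cosmos",
    login="https://my-account.documents.azure.com:443/",  # endpoint_uri
    password="<master_key>",                              # primary/secondary key
    extra=json.dumps({
        "database_name": "my-database",    # default database
        "collection_name": "users",        # default collection (optional)
    }),
)

# The URI form can be exported as an AIRFLOW_CONN_<CONN_ID> environment variable.
print(cosmos_conn.get_uri())
```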

## Error Handling

```python { .api }
# Common Cosmos DB exceptions
class CosmosDBException(AirflowException):
    """Base exception for Cosmos DB operations."""

class DocumentNotFound(CosmosDBException):
    """Raised when a document is not found."""

class CollectionNotFound(CosmosDBException):
    """Raised when a collection is not found."""

class DatabaseNotFound(CosmosDBException):
    """Raised when a database is not found."""

class PartitionKeyMismatch(CosmosDBException):
    """Raised when partition key doesn't match."""
```
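
A sketch of tolerating a missing document: since the import path for the classes above is not shown here, the example catches `AirflowException`, which the documented `CosmosDBException` hierarchy derives from; narrow the except clause to `DocumentNotFound` where that class is importable.

```python
from airflow.exceptions import AirflowException
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

try:
    user = cosmos_hook.get_document(
        document_id='user-123',
        database_name='my-database',
        collection_name='users',
        partition_key='user-123',
    )
except AirflowException:  # narrow to DocumentNotFound if the provider exposes it
    user = None  # treat a missing document as "not found" rather than failing the task
```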

## Performance Considerations

The Cosmos DB integration supports:

- **Partitioning**: Efficient partition key usage for scale
- **Throughput Management**: Configurable RU/s provisioning
- **Batch Operations**: Bulk document operations for efficiency
- **Query Optimization**: Parameterized queries and indexing hints
- **Consistency Levels**: Configurable consistency vs. performance trade-offs
- **Cross-Partition Queries**: Support for queries spanning partitions

The Azure Cosmos DB integration provides comprehensive NoSQL database capabilities with global distribution, multiple APIs, and automatic scaling, making it suitable for mission-critical applications.