docs/async-operations.md

# Async Operations

1

2

Asynchronous operations for high-performance, non-blocking Azure Cosmos DB applications. The async API provides coroutine-based versions of all synchronous operations with additional connection management capabilities.

3

4

## Capabilities

5

6

### Async Client Operations

7

8

Asynchronous version of CosmosClient with additional connection lifecycle management.

9

10

```python { .api }

11

class CosmosClient:

12

def __init__(self, url: str, credential: Union[str, Dict[str, Any], TokenCredential], consistency_level: str = None, **kwargs):

13

"""

14

Initialize async CosmosClient.

15

16

Parameters: Same as synchronous CosmosClient

17

"""

18

19

@classmethod

20

def from_connection_string(cls, conn_str: str, credential: str = None, consistency_level: str = None, **kwargs):

21

"""

22

Create async CosmosClient from connection string.

23

24

Parameters: Same as synchronous version

25

26

Returns:

27

Async CosmosClient instance

28

"""

29

30

async def create_database(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):

31

"""

32

Async version of create_database.

33

34

Parameters: Same as synchronous version

35

36

Returns:

37

Async DatabaseProxy for the created database

38

"""

39

40

async def create_database_if_not_exists(self, id: str, populate_query_metrics: bool = None, offer_throughput: Union[int, ThroughputProperties] = None, **kwargs):

41

"""

42

Async version of create_database_if_not_exists.

43

"""

44

45

def get_database_client(self, database: str):

46

"""

47

Get async database client (non-async method).

48

49

Returns:

50

Async DatabaseProxy instance

51

"""

52

53

def list_databases(self, max_item_count: int = None, populate_query_metrics: bool = None, **kwargs):

54

"""

55

Async version of list_databases.

56

"""

57

58

def query_databases(self, query: str = None, parameters: list = None, **kwargs):

59

"""

60

Async version of query_databases.

61

"""

62

63

async def delete_database(self, database: str, populate_query_metrics: bool = None, **kwargs):

64

"""

65

Async version of delete_database.

66

"""

67

68

async def get_database_account(self, **kwargs):

69

"""

70

Async version of get_database_account.

71

"""

72

73

async def close(self):

74

"""

75

Close the client and clean up resources.

76

77

This method should be called when the client is no longer needed

78

to properly clean up connections and resources.

79

"""

80

```

81

82

### Async Database Operations

83

84

Asynchronous database-level operations.

85

86

```python { .api }

87

class DatabaseProxy:

88

async def read(self, populate_query_metrics: bool = None, **kwargs):

89

"""

90

Async version of read database properties.

91

"""

92

93

async def create_container(self, id: str, partition_key: PartitionKey, **kwargs):

94

"""

95

Async version of create_container.

96

97

Returns:

98

Async ContainerProxy for the created container

99

"""

100

101

async def create_container_if_not_exists(self, id: str, partition_key: PartitionKey, **kwargs):

102

"""

103

Async version of create_container_if_not_exists.

104

"""

105

106

def get_container_client(self, container: str):

107

"""

108

Get async container client (non-async method).

109

110

Returns:

111

Async ContainerProxy instance

112

"""

113

114

def list_containers(self, max_item_count: int = None, **kwargs):

115

"""

116

Async version of list_containers.

117

"""

118

119

def query_containers(self, query: str = None, parameters: list = None, **kwargs):

120

"""

121

Async version of query_containers.

122

"""

123

124

async def replace_container(self, container: str, partition_key: PartitionKey, **kwargs):

125

"""

126

Async version of replace_container.

127

"""

128

129

async def delete_container(self, container: str, **kwargs):

130

"""

131

Async version of delete_container.

132

"""

133

134

async def get_throughput(self, **kwargs):

135

"""

136

Async version of get_throughput.

137

"""

138

139

async def replace_throughput(self, throughput: ThroughputProperties, **kwargs):

140

"""

141

Async version of replace_throughput.

142

"""

143

```

144

145

### Async Container Operations

146

147

Asynchronous container-level operations for items and queries.

148

149

```python { .api }

150

class ContainerProxy:

151

async def read(self, populate_query_metrics: bool = None, **kwargs):

152

"""

153

Async version of read container properties.

154

"""

155

156

async def create_item(self, body: dict, **kwargs):

157

"""

158

Async version of create_item.

159

"""

160

161

async def read_item(self, item: str, partition_key: str, **kwargs):

162

"""

163

Async version of read_item.

164

"""

165

166

async def upsert_item(self, body: dict, **kwargs):

167

"""

168

Async version of upsert_item.

169

"""

170

171

async def replace_item(self, item: str, body: dict, **kwargs):

172

"""

173

Async version of replace_item.

174

"""

175

176

async def patch_item(self, item: str, partition_key: str, patch_operations: list, **kwargs):

177

"""

178

Async version of patch_item.

179

"""

180

181

async def delete_item(self, item: str, partition_key: str, **kwargs):

182

"""

183

Async version of delete_item.

184

"""

185

186

async def delete_all_items_by_partition_key(self, partition_key: str, **kwargs):

187

"""

188

Async version of delete_all_items_by_partition_key.

189

"""

190

191

def query_items(self, query: str = None, parameters: list = None, **kwargs):

192

"""

193

Async version of query_items.

194

195

Returns:

196

Async iterable of query results

197

"""

198

199

def read_all_items(self, max_item_count: int = None, **kwargs):

200

"""

201

Async version of read_all_items.

202

203

Returns:

204

Async iterable of all items

205

"""

206

207

def query_items_change_feed(self, **kwargs):

208

"""

209

Async version of query_items_change_feed.

210

"""

211

212

async def execute_item_batch(self, batch_operations: list, partition_key: str, **kwargs):

213

"""

214

Async version of execute_item_batch.

215

"""

216

217

@property

218

def scripts(self):

219

"""

220

Get async scripts proxy (non-async property).

221

222

Returns:

223

Async ScriptsProxy instance

224

"""

225

```

226

227

### Async Script Operations

228

229

Asynchronous script operations for stored procedures, triggers, and UDFs.

230

231

```python { .api }

232

class ScriptsProxy:

233

async def create_stored_procedure(self, body: dict, **kwargs):

234

"""

235

Async version of create_stored_procedure.

236

"""

237

238

async def execute_stored_procedure(self, sproc: str, partition_key: str = None, params: list = None, **kwargs):

239

"""

240

Async version of execute_stored_procedure.

241

"""

242

243

def list_stored_procedures(self, max_item_count: int = None, **kwargs):

244

"""

245

Async version of list_stored_procedures.

246

"""

247

248

async def get_stored_procedure(self, sproc: str, **kwargs):

249

"""

250

Async version of get_stored_procedure.

251

"""

252

253

async def replace_stored_procedure(self, sproc: str, body: dict, **kwargs):

254

"""

255

Async version of replace_stored_procedure.

256

"""

257

258

async def delete_stored_procedure(self, sproc: str, **kwargs):

259

"""

260

Async version of delete_stored_procedure.

261

"""

262

263

async def create_trigger(self, body: dict, **kwargs):

264

"""

265

Async version of create_trigger.

266

"""

267

268

async def create_user_defined_function(self, body: dict, **kwargs):

269

"""

270

Async version of create_user_defined_function.

271

"""

272

```

273

274

### Async User Management

275

276

Asynchronous user and permission operations.

277

278

```python { .api }

279

class UserProxy:

280

async def read(self, **kwargs):

281

"""

282

Async version of read user properties.

283

"""

284

285

async def create_permission(self, body: dict, **kwargs):

286

"""

287

Async version of create_permission.

288

"""

289

290

def list_permissions(self, max_item_count: int = None, **kwargs):

291

"""

292

Async version of list_permissions.

293

"""

294

295

async def get_permission(self, permission: str, **kwargs):

296

"""

297

Async version of get_permission.

298

"""

299

300

async def replace_permission(self, permission: str, body: dict, **kwargs):

301

"""

302

Async version of replace_permission.

303

"""

304

305

async def delete_permission(self, permission: str, **kwargs):

306

"""

307

Async version of delete_permission.

308

"""

309

```

310

311

## Usage Examples

312

313

### Basic Async Operations

314

315

```python

316

import asyncio

317

from azure.cosmos.aio import CosmosClient

318

from azure.cosmos import ConsistencyLevel, PartitionKey

319

320

async def basic_async_operations():

321

# Initialize async client

322

async with CosmosClient(

323

url="https://myaccount.documents.azure.com:443/",

324

credential="myaccountkey==",

325

consistency_level=ConsistencyLevel.Session

326

) as client:

327

328

# Create database

329

database = await client.create_database_if_not_exists(

330

id="AsyncDatabase",

331

offer_throughput=400

332

)

333

334

# Create container

335

container = await database.create_container_if_not_exists(

336

id="AsyncContainer",

337

partition_key=PartitionKey(path="/category"),

338

offer_throughput=400

339

)

340

341

# Prepare item payloads

342

items_to_create = [

343

{"id": f"item{i}", "category": "electronics", "name": f"Product {i}"}

344

for i in range(10)

345

]

346

347

# Create items concurrently

348

create_tasks = [

349

container.create_item(item) for item in items_to_create

350

]

351

created_items = await asyncio.gather(*create_tasks, return_exceptions=True)

352

353

successful_creates = [item for item in created_items if not isinstance(item, Exception)]

354

print(f"Successfully created {len(successful_creates)} items")

355

356

# Query items

357

query = "SELECT * FROM c WHERE c.category = @category"

358

parameters = [{"name": "@category", "value": "electronics"}]

359

360

items = []

361

async for item in container.query_items(

362

query=query,

363

parameters=parameters,

364

enable_cross_partition_query=True

365

):

366

items.append(item)

367

368

print(f"Queried {len(items)} items")

369

370

# Run the async function

371

asyncio.run(basic_async_operations())

372

```

373

374

### Context Manager Pattern

375

376

```python

377

async def context_manager_example():

378

"""Demonstrate proper resource management with async context manager."""

379

380

async with CosmosClient(

381

url="https://myaccount.documents.azure.com:443/",

382

credential="myaccountkey=="

383

) as client:

384

385

database = client.get_database_client("MyDatabase")

386

container = database.get_container_client("MyContainer")

387

388

# Perform operations

389

await container.create_item({

390

"id": "async_item",

391

"category": "test",

392

"data": "async operation"

393

})

394

395

item = await container.read_item(

396

item="async_item",

397

partition_key="test"

398

)

399

print(f"Read item: {item['id']}")

400

401

# Client is automatically closed when exiting context manager

402

print("Client resources cleaned up")

403

404

asyncio.run(context_manager_example())

405

```

406

407

### High-Performance Bulk Operations

408

409

```python

410

import asyncio
import time

413

414

async def bulk_operations_example():

415

"""Demonstrate high-performance bulk operations with async."""

416

417

async with CosmosClient(

418

url="https://myaccount.documents.azure.com:443/",

419

credential="myaccountkey=="

420

) as client:

421

422

database = client.get_database_client("BulkDatabase")

423

container = database.get_container_client("BulkContainer")

424

425

# Generate large number of items

426

num_items = 1000

427

items = [

428

{

429

"id": f"bulk_item_{i}",

430

"category": f"category_{i % 10}",

431

"data": f"Data for item {i}",

432

"timestamp": time.time()

433

}

434

for i in range(num_items)

435

]

436

437

print(f"Creating {num_items} items...")

438

start_time = time.time()

439

440

# Process in batches to avoid overwhelming the service

441

batch_size = 50

442

batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

443

444

for batch in batches:

445

# Create batch concurrently

446

tasks = [container.upsert_item(item) for item in batch]

447

results = await asyncio.gather(*tasks, return_exceptions=True)

448

449

# Handle any errors

450

errors = [r for r in results if isinstance(r, Exception)]

451

if errors:

452

print(f"Batch had {len(errors)} errors")

453

454

end_time = time.time()

455

print(f"Bulk operation completed in {end_time - start_time:.2f} seconds")

456

457

# Query performance test

458

print("Testing query performance...")

459

start_time = time.time()

460

461

query_tasks = []

462

for category_id in range(10):

463

query = "SELECT COUNT(1) as count FROM c WHERE c.category = @category"

464

parameters = [{"name": "@category", "value": f"category_{category_id}"}]

465

466

task = container.query_items(

467

query=query,

468

parameters=parameters,

469

enable_cross_partition_query=True

470

)

471

query_tasks.append(task)

472

473

# Execute queries concurrently

474

query_results = await asyncio.gather(*[

475

collect_async_iterable(query_task) for query_task in query_tasks

476

])

477

478

end_time = time.time()

479

print(f"Concurrent queries completed in {end_time - start_time:.2f} seconds")

480

481

for i, results in enumerate(query_results):

482

if results:

483

print(f"Category {i}: {results[0]['count']} items")

484

485

async def collect_async_iterable(async_iterable):

486

"""Helper to collect results from async iterable."""

487

results = []

488

async for item in async_iterable:

489

results.append(item)

490

return results

491

492

asyncio.run(bulk_operations_example())

493

```

494

495

### Change Feed Processing with Async

496

497

```python

498

async def change_feed_processor():

499

"""Process change feed asynchronously with high throughput."""

500

501

async with CosmosClient(

502

url="https://myaccount.documents.azure.com:443/",

503

credential="myaccountkey=="

504

) as client:

505

506

database = client.get_database_client("ChangeDatabase")

507

container = database.get_container_client("ChangeContainer")

508

509

# Get feed ranges for parallel processing

510

feed_ranges = await container.read_feed_ranges()

511

print(f"Processing {len(feed_ranges)} feed ranges")

512

513

async def process_feed_range(feed_range, range_id):

514

"""Process a single feed range."""

515

continuation = None

516

processed_count = 0

517

518

while True:

519

try:

520

changes = container.query_items_change_feed(

521

feed_range=feed_range,

522

continuation=continuation,

523

is_start_from_beginning=True,

524

max_item_count=100

525

)

526

527

batch_changes = []

528

async for change in changes:

529

if "_lsn" in change: # Valid change record

530

batch_changes.append(change)

531

532

if not batch_changes:

533

break

534

535

# Process changes (simulate work)

536

await asyncio.sleep(0.1) # Simulate processing time

537

processed_count += len(batch_changes)

538

539

print(f"Range {range_id}: Processed {len(batch_changes)} changes "

540

f"(total: {processed_count})")

541

542

# Get continuation for next batch

543

continuation = changes.get_continuation()

544

if not continuation:

545

break

546

547

except Exception as e:

548

print(f"Error processing range {range_id}: {e}")

549

break

550

551

return processed_count

552

553

# Process all feed ranges concurrently

554

tasks = [

555

process_feed_range(feed_range, i)

556

for i, feed_range in enumerate(feed_ranges)

557

]

558

559

results = await asyncio.gather(*tasks)

560

total_processed = sum(results)

561

print(f"Total changes processed: {total_processed}")

562

563

asyncio.run(change_feed_processor())

564

```

565

566

### Error Handling and Retry Logic

567

568

```python

569

import asyncio
import random
import time
from azure.cosmos.exceptions import CosmosHttpResponseError

572

573

async def resilient_async_operations():

574

"""Demonstrate error handling and retry logic in async operations."""

575

576

async def retry_operation(operation, max_retries=3, delay=1.0):

577

"""Generic retry wrapper for async operations."""

578

for attempt in range(max_retries):

579

try:

580

return await operation()

581

except CosmosHttpResponseError as e:

582

if e.status_code == 429: # Rate limiting

583

wait_time = delay * (2 ** attempt) + random.uniform(0, 1)

584

print(f"Rate limited, waiting {wait_time:.2f}s (attempt {attempt + 1})")

585

await asyncio.sleep(wait_time)

586

else:

587

print(f"HTTP error {e.status_code}: {e.message}")

588

if attempt == max_retries - 1:

589

raise

590

except Exception as e:

591

print(f"Unexpected error: {e}")

592

if attempt == max_retries - 1:

593

raise

594

595

raise Exception(f"Operation failed after {max_retries} attempts")

596

597

async with CosmosClient(

598

url="https://myaccount.documents.azure.com:443/",

599

credential="myaccountkey=="

600

) as client:

601

602

database = client.get_database_client("ResilientDatabase")

603

container = database.get_container_client("ResilientContainer")

604

605

# Example: Resilient item creation

606

async def create_item_operation():

607

return await container.create_item({

608

"id": f"resilient_item_{random.randint(1, 1000)}",

609

"category": "test",

610

"timestamp": time.time()

611

})

612

613

# Use retry wrapper

614

try:

615

item = await retry_operation(create_item_operation)

616

print(f"Successfully created item: {item['id']}")

617

except Exception as e:

618

print(f"Failed to create item after retries: {e}")

619

620

# Parallel operations with individual error handling

621

async def safe_create_item(item_data):

622

"""Safely create an item with error handling."""

623

try:

624

return await retry_operation(

625

lambda: container.create_item(item_data)

626

)

627

except Exception as e:

628

print(f"Failed to create item {item_data['id']}: {e}")

629

return None

630

631

# Create multiple items with resilience

632

items_to_create = [

633

{"id": f"safe_item_{i}", "category": "batch", "data": f"Data {i}"}

634

for i in range(10)

635

]

636

637

tasks = [safe_create_item(item) for item in items_to_create]

638

results = await asyncio.gather(*tasks)

639

640

successful_items = [item for item in results if item is not None]

641

print(f"Successfully created {len(successful_items)} out of {len(items_to_create)} items")

642

643

asyncio.run(resilient_async_operations())

644

```