or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, batch-operations.md, entity-data-types.md, error-handling.md, index.md, security-access-control.md, service-management.md, table-operations.md

docs/async-operations.md

0

# Async Operations

1

2

Complete asynchronous operation support using async/await patterns for non-blocking I/O, providing identical functionality to synchronous clients with improved performance for concurrent scenarios.

3

4

## Capabilities

5

6

### Async Service Client

7

8

Asynchronous version of TableServiceClient for account-level operations with non-blocking I/O.

9

10

```python { .api }

11

from azure.data.tables.aio import TableServiceClient

12

13

class TableServiceClient:

14

def __init__(

15

self,

16

endpoint: str,

17

credential=None,

18

*,

19

audience: str = None,

20

api_version: str = None,

21

**kwargs

22

):

23

"""

24

Initialize async TableServiceClient.

25

26

Parameters identical to synchronous version.

27

All network operations are async and non-blocking.

28

"""

29

30

@classmethod

31

def from_connection_string(

32

cls,

33

conn_str: str,

34

**kwargs

35

) -> "TableServiceClient":

36

"""Create async client from connection string."""

37

38

async def create_table(self, table_name: str, **kwargs) -> "TableClient":

39

"""Create table asynchronously."""

40

41

async def create_table_if_not_exists(self, table_name: str, **kwargs) -> "TableClient":

42

"""Create table if not exists asynchronously."""

43

44

async def delete_table(self, table_name: str, **kwargs) -> None:

45

"""Delete table asynchronously."""

46

47

def list_tables(self, **kwargs) -> AsyncItemPaged[TableItem]:

48

"""List tables with async iteration."""

49

50

def query_tables(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableItem]:

51

"""Query tables with async iteration."""

52

53

async def get_service_properties(self, **kwargs) -> Dict[str, object]:

54

"""Get service properties asynchronously."""

55

56

async def set_service_properties(self, **kwargs) -> None:

57

"""Set service properties asynchronously."""

58

59

async def get_service_stats(self, **kwargs) -> Dict[str, object]:

60

"""Get service statistics asynchronously."""

61

62

def get_table_client(self, table_name: str, **kwargs) -> "TableClient":

63

"""Get async TableClient for specific table."""

64

65

async def close(self) -> None:

66

"""Close the client and cleanup resources."""

67

68

async def __aenter__(self) -> "TableServiceClient":

69

"""Async context manager entry."""

70

71

async def __aexit__(self, *args) -> None:

72

"""Async context manager exit."""

73

```

74

75

#### Usage Example

76

77

```python

78

import asyncio

79

from azure.data.tables.aio import TableServiceClient

80

81

async def async_service_operations():

82

"""Demonstrate async service client operations."""

83

84

# Initialize async service client

85

async with TableServiceClient.from_connection_string(conn_str) as service_client:

86

87

# Create multiple tables concurrently

88

table_names = ["customers", "orders", "products", "inventory"]

89

90

# Create all tables in parallel

91

create_tasks = [

92

service_client.create_table_if_not_exists(name)

93

for name in table_names

94

]

95

96

table_clients = await asyncio.gather(*create_tasks)

97

print(f"Created {len(table_clients)} tables concurrently")

98

99

# List all tables asynchronously

100

print("Tables in account:")

101

async for table in service_client.list_tables():

102

print(f" - {table.name}")

103

104

# Query tables with filter

105

print("Tables starting with 'c':")

106

async for table in service_client.query_tables("TableName ge 'c' and TableName lt 'd'"):

107

print(f" - {table.name}")

108

109

# Get service properties

110

properties = await service_client.get_service_properties()

111

logging_enabled = getattr(properties.get('analytics_logging'), 'read', False)

112

print(f"Read logging enabled: {logging_enabled}")

113

114

# Run async operations

115

asyncio.run(async_service_operations())

116

```

117

118

### Async Table Client

119

120

Asynchronous version of TableClient for table-specific operations with non-blocking entity operations.

121

122

```python { .api }

123

from azure.data.tables.aio import TableClient

124

125

class TableClient:

126

def __init__(

127

self,

128

endpoint: str,

129

table_name: str,

130

*,

131

credential=None,

132

**kwargs

133

):

134

"""Initialize async TableClient."""

135

136

@classmethod

137

def from_connection_string(

138

cls,

139

conn_str: str,

140

table_name: str,

141

**kwargs

142

) -> "TableClient":

143

"""Create async client from connection string."""

144

145

@classmethod

146

def from_table_url(

147

cls,

148

table_url: str,

149

*,

150

credential=None,

151

**kwargs

152

) -> "TableClient":

153

"""Create async client from table URL."""

154

155

async def create_table(self, **kwargs) -> TableItem:

156

"""Create table asynchronously."""

157

158

async def delete_table(self, **kwargs) -> None:

159

"""Delete table asynchronously."""

160

161

async def create_entity(

162

self,

163

entity: Union[TableEntity, Mapping[str, Any]],

164

**kwargs

165

) -> Dict[str, Any]:

166

"""Create entity asynchronously."""

167

168

async def get_entity(

169

self,

170

partition_key: str,

171

row_key: str,

172

**kwargs

173

) -> TableEntity:

174

"""Get entity asynchronously."""

175

176

async def update_entity(

177

self,

178

entity: Union[TableEntity, Mapping[str, Any]],

179

**kwargs

180

) -> Dict[str, Any]:

181

"""Update entity asynchronously."""

182

183

async def upsert_entity(

184

self,

185

entity: Union[TableEntity, Mapping[str, Any]],

186

**kwargs

187

) -> Dict[str, Any]:

188

"""Upsert entity asynchronously."""

189

190

async def delete_entity(

191

self,

192

partition_key: str = None,

193

row_key: str = None,

194

**kwargs

195

) -> None:

196

"""Delete entity asynchronously."""

197

198

def list_entities(self, **kwargs) -> AsyncItemPaged[TableEntity]:

199

"""List entities with async iteration."""

200

201

def query_entities(self, query_filter: str, **kwargs) -> AsyncItemPaged[TableEntity]:

202

"""Query entities with async iteration."""

203

204

async def submit_transaction(

205

self,

206

operations: Iterable,

207

**kwargs

208

) -> List[Mapping[str, Any]]:

209

"""Submit batch transaction asynchronously."""

210

211

async def get_table_access_policy(self, **kwargs) -> Dict[str, Optional[TableAccessPolicy]]:

212

"""Get access policies asynchronously."""

213

214

async def set_table_access_policy(

215

self,

216

signed_identifiers: Mapping[str, Optional[TableAccessPolicy]],

217

**kwargs

218

) -> None:

219

"""Set access policies asynchronously."""

220

221

async def close(self) -> None:

222

"""Close client and cleanup resources."""

223

224

async def __aenter__(self) -> "TableClient":

225

"""Async context manager entry."""

226

227

async def __aexit__(self, *args) -> None:

228

"""Async context manager exit."""

229

```

230

231

#### Usage Example

232

233

```python

234

import asyncio

235

from azure.data.tables.aio import TableClient

236

from azure.data.tables import TableEntity

237

238

async def async_table_operations():

239

"""Demonstrate async table client operations."""

240

241

async with TableClient.from_connection_string(conn_str, "customers") as table_client:

242

243

# Create multiple entities concurrently

244

entities = [

245

TableEntity(

246

PartitionKey="vip",

247

RowKey=f"customer-{i:03d}",

248

Name=f"Customer {i}",

249

Email=f"customer{i}@example.com",

250

VipLevel="Gold"

251

)

252

for i in range(1, 11)

253

]

254

255

# Create all entities in parallel

256

create_tasks = [

257

table_client.create_entity(entity)

258

for entity in entities

259

]

260

261

results = await asyncio.gather(*create_tasks, return_exceptions=True)

262

successful_creates = sum(1 for r in results if not isinstance(r, Exception))

263

print(f"Created {successful_creates}/{len(entities)} entities concurrently")

264

265

# Query entities asynchronously

266

print("VIP customers:")

267

async for entity in table_client.query_entities("PartitionKey eq 'vip'"):

268

print(f" - {entity['Name']} ({entity['Email']})")

269

270

# Update multiple entities concurrently

271

update_tasks = []

272

async for entity in table_client.query_entities("PartitionKey eq 'vip'"):

273

entity["VipLevel"] = "Platinum"

274

entity["LastUpdated"] = "2023-12-15"

275

update_tasks.append(table_client.update_entity(entity))

276

277

if len(update_tasks) >= 5: # Stop after queuing 5 updates (demo limit, not true batching)

278

break

279

280

update_results = await asyncio.gather(*update_tasks)

281

print(f"Updated {len(update_results)} entities to Platinum level")

282

283

asyncio.run(async_table_operations())

284

```

285

286

### Async Iteration

287

288

Handle large result sets efficiently using async iteration patterns.

289

290

```python { .api }

291

class AsyncItemPaged:

292

"""

293

Async iterator for paged results from Azure Tables.

294

295

Supports async iteration over large result sets

296

with automatic paging and efficient memory usage.

297

"""

298

299

def __aiter__(self) -> AsyncIterator[T]:

300

"""Return async iterator."""

301

302

async def __anext__(self) -> T:

303

"""Get next item asynchronously."""

304

305

def by_page(self, continuation_token: Optional[str] = None) -> AsyncIterator[AsyncIterator[T]]:

306

"""Iterate by pages for batch processing."""

307

```

308

309

#### Usage Example

310

311

```python

312

import asyncio

313

from azure.data.tables.aio import TableClient

314

315

async def async_iteration_patterns():

316

"""Demonstrate various async iteration patterns."""

317

318

async with TableClient.from_connection_string(conn_str, "large_dataset") as table_client:

319

320

# Basic async iteration

321

print("Processing all entities:")

322

entity_count = 0

323

async for entity in table_client.list_entities():

324

entity_count += 1

325

if entity_count % 1000 == 0:

326

print(f" Processed {entity_count} entities...")

327

328

print(f"Total entities processed: {entity_count}")

329

330

# Page-by-page processing for memory efficiency

331

print("Processing by pages:")

332

page_count = 0

333

total_entities = 0

334

335

async for page in table_client.list_entities(results_per_page=500).by_page():

336

page_count += 1

337

page_entities = []

338

339

async for entity in page:

340

page_entities.append(entity)

341

342

total_entities += len(page_entities)

343

print(f" Page {page_count}: {len(page_entities)} entities")

344

345

# Process page batch (e.g., bulk operations)

346

await process_entity_batch(page_entities)

347

348

print(f"Processed {total_entities} entities across {page_count} pages")

349

350

# Filtered async iteration with query

351

print("Processing filtered results:")

352

async for entity in table_client.query_entities(

353

"PartitionKey eq 'active' and Status eq 'pending'"

354

):

355

await process_pending_entity(entity)

356

357

async def process_entity_batch(entities):

358

"""Process a batch of entities asynchronously."""

359

# Simulate async processing

360

await asyncio.sleep(0.1)

361

print(f" Processed batch of {len(entities)} entities")

362

363

async def process_pending_entity(entity):

364

"""Process individual pending entity."""

365

# Simulate async processing

366

await asyncio.sleep(0.01)

367

368

asyncio.run(async_iteration_patterns())

369

```

370

371

### Concurrent Operations

372

373

Leverage async capabilities for high-performance concurrent operations.

374

375

#### Concurrent Entity Operations

376

377

```python

378

import asyncio

379

from azure.data.tables.aio import TableClient

380

from azure.data.tables import TableEntity

381

382

async def concurrent_entity_operations():

383

"""Demonstrate concurrent entity operations for high throughput."""

384

385

async with TableClient.from_connection_string(conn_str, "high_throughput") as table_client:

386

387

# Concurrent creates with rate limiting

388

semaphore = asyncio.Semaphore(10) # Limit concurrent operations

389

390

async def create_entity_with_limit(entity_data):

391

async with semaphore:

392

entity = TableEntity(**entity_data)

393

return await table_client.create_entity(entity)

394

395

# Generate entity data

396

entity_data_list = [

397

{

398

"PartitionKey": f"partition-{i // 100}",

399

"RowKey": f"entity-{i:06d}",

400

"Value": i,

401

"Category": f"cat-{i % 10}",

402

"Active": i % 2 == 0

403

}

404

for i in range(1000)

405

]

406

407

# Create entities concurrently with rate limiting

408

print("Creating entities with concurrency control...")

409

start_time = asyncio.get_running_loop().time()

410

411

create_tasks = [

412

create_entity_with_limit(data)

413

for data in entity_data_list

414

]

415

416

results = await asyncio.gather(*create_tasks, return_exceptions=True)

417

418

end_time = asyncio.get_running_loop().time()

419

successful_creates = sum(1 for r in results if not isinstance(r, Exception))

420

421

print(f"Created {successful_creates}/{len(entity_data_list)} entities")

422

print(f"Time: {end_time - start_time:.2f}s")

423

print(f"Throughput: {successful_creates / (end_time - start_time):.1f} entities/sec")

424

425

asyncio.run(concurrent_entity_operations())

426

```

427

428

#### Async Batch Processing

429

430

```python

431

import asyncio

432

from azure.data.tables.aio import TableClient

433

from typing import List, Dict, Any

434

435

async def async_batch_processing():

436

"""Process multiple batches concurrently across partitions."""

437

438

async with TableClient.from_connection_string(conn_str, "batch_demo") as table_client:

439

440

# Prepare batch data for different partitions

441

partition_batches = {

442

"batch-1": [

443

{"PartitionKey": "batch-1", "RowKey": f"item-{i}", "Value": i}

444

for i in range(50)

445

],

446

"batch-2": [

447

{"PartitionKey": "batch-2", "RowKey": f"item-{i}", "Value": i}

448

for i in range(50)

449

],

450

"batch-3": [

451

{"PartitionKey": "batch-3", "RowKey": f"item-{i}", "Value": i}

452

for i in range(50)

453

]

454

}

455

456

async def process_partition_batch(partition_key: str, entities: List[Dict]):

457

"""Process a single partition batch."""

458

operations = [("create", entity) for entity in entities]

459

460

try:

461

result = await table_client.submit_transaction(operations)

462

print(f"Partition {partition_key}: {len(result)} entities created")

463

return len(result)

464

except Exception as e:

465

print(f"Partition {partition_key} failed: {e}")

466

return 0

467

468

# Process all partition batches concurrently

469

batch_tasks = [

470

process_partition_batch(partition_key, entities)

471

for partition_key, entities in partition_batches.items()

472

]

473

474

results = await asyncio.gather(*batch_tasks)

475

total_processed = sum(results)

476

477

print(f"Concurrent batch processing completed: {total_processed} total entities")

478

479

asyncio.run(async_batch_processing())

480

```

481

482

### Error Handling in Async Context

483

484

Handle exceptions and errors properly in asynchronous operations.

485

486

#### Async Error Handling Patterns

487

488

```python

489

import asyncio

490

from azure.data.tables.aio import TableClient

491

from azure.data.tables import TableTransactionError

492

from azure.core.exceptions import ResourceNotFoundError, ServiceRequestError

493

494

async def async_error_handling():

495

"""Demonstrate error handling patterns in async operations."""

496

497

async with TableClient.from_connection_string(conn_str, "error_demo") as table_client:

498

499

# Retry with exponential backoff (async version)

500

async def async_retry_with_backoff(coro_func, max_retries=3):

501

for attempt in range(max_retries):

502

try:

503

return await coro_func()

504

except ServiceRequestError as e:

505

if attempt == max_retries - 1:

506

raise

507

508

delay = 2 ** attempt # Exponential backoff

509

print(f"Attempt {attempt + 1} failed, retrying in {delay}s")

510

await asyncio.sleep(delay)

511

512

# Concurrent operations with individual error handling

513

async def safe_create_entity(entity_data):

514

try:

515

return await table_client.create_entity(entity_data)

516

except Exception as e:

517

print(f"Failed to create entity {entity_data.get('RowKey')}: {e}")

518

return None

519

520

# Process entities with error isolation

521

entities = [

522

{"PartitionKey": "safe", "RowKey": f"item-{i}", "Value": i}

523

for i in range(20)

524

]

525

526

# Add some problematic entities

527

entities.extend([

528

{"PartitionKey": "safe", "RowKey": "item-5", "Value": 999}, # Duplicate

529

{"PartitionKey": "safe"}, # Missing RowKey

530

])

531

532

# Create all entities concurrently with error handling

533

create_tasks = [safe_create_entity(entity) for entity in entities]

534

results = await asyncio.gather(*create_tasks)

535

536

successful_creates = [r for r in results if r is not None]

537

print(f"Successfully created {len(successful_creates)} entities")

538

539

# Batch operations with error recovery

540

async def robust_batch_operation(operations):

541

try:

542

return await table_client.submit_transaction(operations)

543

except TableTransactionError as e:

544

print(f"Batch failed at operation {e.index}: {e.message}")

545

546

# Execute operations individually as fallback

547

results = []

548

for i, (op_type, entity) in enumerate(operations):

549

if i == e.index:

550

results.append(None) # Skip failed operation

551

continue

552

553

try:

554

if op_type == "create":

555

result = await table_client.create_entity(entity)

556

elif op_type == "update":

557

result = await table_client.update_entity(entity)

558

# Add other operation types as needed

559

560

results.append(result)

561

except Exception as individual_error:

562

print(f"Individual operation {i} also failed: {individual_error}")

563

results.append(None)

564

565

return results

566

567

# Test batch with error recovery

568

batch_operations = [

569

("create", {"PartitionKey": "batch", "RowKey": f"item-{i}", "Value": i})

570

for i in range(10)

571

]

572

573

batch_results = await robust_batch_operation(batch_operations)

574

successful_batch = [r for r in batch_results if r is not None]

575

print(f"Batch processing: {len(successful_batch)} operations succeeded")

576

577

asyncio.run(async_error_handling())

578

```

579

580

### Context Management

581

582

Proper resource management using async context managers.

583

584

#### Resource Management Patterns

585

586

```python

587

import asyncio

588

from azure.data.tables.aio import TableServiceClient, TableClient

589

590

async def resource_management_patterns():

591

"""Demonstrate proper async resource management."""

592

593

# Pattern 1: Service client context manager

594

async with TableServiceClient.from_connection_string(conn_str) as service_client:

595

596

# Create table client from service client

597

table_client = service_client.get_table_client("managed_table")

598

599

# Use table client within service client context

600

async with table_client:

601

await table_client.create_table()

602

603

entity = {"PartitionKey": "test", "RowKey": "001", "Value": "data"}

604

await table_client.create_entity(entity)

605

606

async for entity in table_client.list_entities():

607

print(f"Entity: {entity['RowKey']}")

608

609

# Pattern 2: Multiple concurrent clients with proper cleanup

610

async def managed_concurrent_operations():

611

clients = []

612

try:

613

# Create multiple clients

614

for i in range(3):

615

client = TableClient.from_connection_string(conn_str, f"table_{i}")

616

clients.append(client)

617

618

# Use clients concurrently

619

create_tasks = [

620

client.create_table()

621

for client in clients

622

]

623

624

await asyncio.gather(*create_tasks, return_exceptions=True)

625

626

# Perform operations

627

operation_tasks = []

628

for i, client in enumerate(clients):

629

entity = {"PartitionKey": "concurrent", "RowKey": f"item-{i}", "Value": i}

630

operation_tasks.append(client.create_entity(entity))

631

632

await asyncio.gather(*operation_tasks)

633

634

finally:

635

# Ensure all clients are properly closed

636

close_tasks = [client.close() for client in clients]

637

await asyncio.gather(*close_tasks, return_exceptions=True)

638

639

await managed_concurrent_operations()

640

641

# Pattern 3: Long-lived client with proper lifecycle

642

class AsyncTableManager:

643

def __init__(self, connection_string: str, table_name: str):

644

self.connection_string = connection_string

645

self.table_name = table_name

646

self.client = None

647

648

async def start(self):

649

"""Initialize the async client."""

650

self.client = TableClient.from_connection_string(

651

self.connection_string,

652

self.table_name

653

)

654

await self.client.create_table()

655

656

async def stop(self):

657

"""Cleanup the async client."""

658

if self.client:

659

await self.client.close()

660

661

async def process_entities(self, entities):

662

"""Process entities with the managed client."""

663

if not self.client:

664

raise RuntimeError("Manager not started")

665

666

tasks = [

667

self.client.create_entity(entity)

668

for entity in entities

669

]

670

671

return await asyncio.gather(*tasks, return_exceptions=True)

672

673

# Use the managed client

674

manager = AsyncTableManager(conn_str, "managed_operations")

675

try:

676

await manager.start()

677

678

entities = [

679

{"PartitionKey": "managed", "RowKey": f"item-{i}", "Value": i}

680

for i in range(10)

681

]

682

683

results = await manager.process_entities(entities)

684

successful = sum(1 for r in results if not isinstance(r, Exception))

685

print(f"Managed processing: {successful}/{len(entities)} entities")

686

687

finally:

688

await manager.stop()

689

690

asyncio.run(resource_management_patterns())

691

```

692

693

## Performance Considerations

694

695

### Async Best Practices

696

697

1. **Connection Pooling**: Async clients automatically manage connection pools

698

2. **Concurrency Limits**: Use semaphores to control concurrent operations

699

3. **Resource Management**: Always use context managers or explicit cleanup

700

4. **Error Isolation**: Handle exceptions in concurrent operations individually

701

5. **Batch Optimization**: Process multiple partitions concurrently; each transaction must target a single partition and contain at most 100 operations

702

6. **Memory Management**: Use async iteration for large result sets

703

704

### Example High-Performance Pattern

705

706

```python

707

import asyncio

708

from azure.data.tables.aio import TableClient

709

from contextlib import AsyncExitStack

710

711

async def high_performance_pattern():

712

"""Optimized pattern for high-throughput async operations."""

713

714

# Configuration

715

MAX_CONCURRENT_OPERATIONS = 50

716

BATCH_SIZE = 100

717

TABLE_COUNT = 5

718

719

semaphore = asyncio.Semaphore(MAX_CONCURRENT_OPERATIONS)

720

721

async with AsyncExitStack() as stack:

722

# Create multiple table clients

723

clients = []

724

for i in range(TABLE_COUNT):

725

client = await stack.enter_async_context(

726

TableClient.from_connection_string(conn_str, f"perf_table_{i}")

727

)

728

clients.append(client)

729

730

# High-throughput entity creation

731

async def create_entity_throttled(client, entity):

732

async with semaphore:

733

return await client.create_entity(entity)

734

735

# Generate workload across multiple tables

736

all_tasks = []

737

for client_idx, client in enumerate(clients):

738

for batch_idx in range(10): # 10 batches per table

739

entities = [

740

{

741

"PartitionKey": f"perf-{batch_idx}",

742

"RowKey": f"item-{i:06d}",

743

"TableIndex": client_idx,

744

"BatchIndex": batch_idx,

745

"Value": i

746

}

747

for i in range(BATCH_SIZE)

748

]

749

750

# Add tasks for this batch

751

for entity in entities:

752

task = create_entity_throttled(client, entity)

753

all_tasks.append(task)

754

755

# Execute all operations concurrently

756

print(f"Executing {len(all_tasks)} operations across {TABLE_COUNT} tables...")

757

start_time = asyncio.get_running_loop().time()

758

759

results = await asyncio.gather(*all_tasks, return_exceptions=True)

760

761

end_time = asyncio.get_running_loop().time()

762

successful = sum(1 for r in results if not isinstance(r, Exception))

763

764

print(f"Completed: {successful}/{len(all_tasks)} operations")

765

print(f"Time: {end_time - start_time:.2f}s")

766

print(f"Throughput: {successful / (end_time - start_time):.1f} ops/sec")

767

768

# Run high-performance example

769

asyncio.run(high_performance_pattern())

770

```