or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdauthentication.mdindex.mdmessage-operations.mdmodels-config.mdqueue-operations.mdqueue-service.md

async-operations.mddocs/

0

# Async Operations

1

2

Asynchronous implementations of all queue and message operations for high-performance applications requiring non-blocking I/O. The async clients provide the same functionality as their synchronous counterparts with async/await support.

3

4

## Capabilities

5

6

### Async Client Creation

7

8

Create async clients using the aio module with identical authentication methods as synchronous clients.

9

10

```python { .api }

11

from azure.storage.queue.aio import QueueServiceClient, QueueClient

12

13

class QueueServiceClient: # from azure.storage.queue.aio

14

def __init__(

15

self,

16

account_url: str,

17

credential=None,

18

**kwargs

19

):

20

"""

21

Create async QueueServiceClient.

22

23

Parameters:

24

- account_url: Queue service endpoint URL

25

- credential: Authentication credential

26

"""

27

28

@classmethod

29

def from_connection_string(

30

cls,

31

conn_str: str,

32

credential=None,

33

**kwargs

34

) -> 'QueueServiceClient':

35

"""Create async client from connection string."""

36

37

class QueueClient: # from azure.storage.queue.aio

38

def __init__(

39

self,

40

account_url: str,

41

queue_name: str,

42

credential=None,

43

**kwargs

44

):

45

"""

46

Create async QueueClient.

47

48

Parameters:

49

- account_url: Queue service endpoint URL

50

- queue_name: Target queue name

51

- credential: Authentication credential

52

"""

53

54

@classmethod

55

def from_queue_url(cls, queue_url: str, credential=None, **kwargs) -> 'QueueClient': ...

56

57

@classmethod

58

def from_connection_string(cls, conn_str: str, queue_name: str, credential=None, **kwargs) -> 'QueueClient': ...

59

```

60

61

### Async Service Operations

62

63

Asynchronous account-level operations for queue management and service configuration.

64

65

```python { .api }

66

# QueueServiceClient async methods

67

async def list_queues(

68

self,

69

name_starts_with: Optional[str] = None,

70

include_metadata: bool = False,

71

**kwargs

72

) -> AsyncItemPaged[QueueProperties]:

73

"""

74

Asynchronously list queues with automatic pagination.

75

76

Parameters:

77

- name_starts_with: Filter by queue name prefix

78

- include_metadata: Include queue metadata

79

80

Returns:

81

AsyncItemPaged for iterating over QueueProperties

82

"""

83

84

async def create_queue(

85

self,

86

name: str,

87

metadata: Optional[Dict[str, str]] = None,

88

**kwargs

89

) -> QueueClient:

90

"""

91

Asynchronously create a queue.

92

93

Parameters:

94

- name: Queue name

95

- metadata: Optional metadata

96

97

Returns:

98

QueueClient for the created queue

99

"""

100

101

async def delete_queue(

102

self,

103

queue: Union[str, QueueProperties],

104

**kwargs

105

) -> None:

106

"""

107

Asynchronously delete a queue.

108

109

Parameters:

110

- queue: Queue name or QueueProperties

111

"""

112

113

async def get_service_properties(self, **kwargs) -> Dict[str, Any]:

114

"""Get service properties asynchronously."""

115

116

async def set_service_properties(self, **kwargs) -> None:

117

"""Set service properties asynchronously."""

118

119

async def get_service_stats(self, **kwargs) -> Dict[str, Any]:

120

"""Get service statistics asynchronously."""

121

```

122

123

### Async Queue Operations

124

125

Asynchronous queue-specific operations for properties, metadata, and access policies.

126

127

```python { .api }

128

# QueueClient async methods

129

async def create_queue(self, *, metadata: Optional[Dict[str, str]] = None, **kwargs) -> None:

130

"""Asynchronously create the queue."""

131

132

async def delete_queue(self, **kwargs) -> None:

133

"""Asynchronously delete the queue."""

134

135

async def get_queue_properties(self, **kwargs) -> QueueProperties:

136

"""Asynchronously get queue properties."""

137

138

async def set_queue_metadata(

139

self,

140

metadata: Optional[Dict[str, str]] = None,

141

**kwargs

142

) -> Dict[str, Any]:

143

"""Asynchronously set queue metadata."""

144

145

async def get_queue_access_policy(self, **kwargs) -> Dict[str, AccessPolicy]:

146

"""Asynchronously get queue access policies."""

147

148

async def set_queue_access_policy(

149

self,

150

signed_identifiers: Dict[str, AccessPolicy],

151

**kwargs

152

) -> None:

153

"""Asynchronously set queue access policies."""

154

```

155

156

### Async Message Operations

157

158

Asynchronous message operations for sending, receiving, updating, and deleting messages.

159

160

```python { .api }

161

async def send_message(

162

self,

163

content: Any,

164

*,

165

visibility_timeout: Optional[int] = None,

166

time_to_live: Optional[int] = None,

167

**kwargs

168

) -> QueueMessage:

169

"""

170

Asynchronously send a message to the queue.

171

172

Parameters:

173

- content: Message content

174

- visibility_timeout: Seconds before message becomes visible

175

- time_to_live: Message expiration in seconds

176

177

Returns:

178

QueueMessage with send details

179

"""

180

181

async def receive_message(

182

self,

183

*,

184

visibility_timeout: Optional[int] = None,

185

**kwargs

186

) -> Optional[QueueMessage]:

187

"""

188

Asynchronously receive a single message.

189

190

Parameters:

191

- visibility_timeout: Message invisibility duration

192

193

Returns:

194

QueueMessage if available, None if queue empty

195

"""

196

197

def receive_messages(

198

self,

199

*,

200

messages_per_page: Optional[int] = None,

201

visibility_timeout: Optional[int] = None,

202

max_messages: Optional[int] = None,

203

**kwargs

204

) -> AsyncItemPaged[QueueMessage]:

205

"""

206

Get async iterator for receiving multiple messages.

207

208

Parameters:

209

- messages_per_page: Messages per page (1-32)

210

- visibility_timeout: Message invisibility duration

211

- max_messages: Maximum total messages

212

213

Returns:

214

AsyncItemPaged for iterating over QueueMessage objects

215

"""

216

217

async def update_message(

218

self,

219

message: Union[QueueMessage, str],

220

pop_receipt: Optional[str] = None,

221

content: Optional[Any] = None,

222

*,

223

visibility_timeout: Optional[int] = None,

224

**kwargs

225

) -> QueueMessage:

226

"""

227

Asynchronously update message content and/or visibility.

228

229

Parameters:

230

- message: QueueMessage object or message ID

231

- pop_receipt: Message pop receipt (required if message is ID)

232

- content: New message content

233

- visibility_timeout: New visibility timeout

234

235

Returns:

236

Updated QueueMessage

237

"""

238

239

async def peek_messages(

240

self,

241

max_messages: Optional[int] = None,

242

**kwargs

243

) -> List[QueueMessage]:

244

"""

245

Asynchronously peek at messages without dequeuing.

246

247

Parameters:

248

- max_messages: Maximum messages to peek (1-32)

249

250

Returns:

251

List of QueueMessage objects

252

"""

253

254

async def delete_message(

255

self,

256

message: Union[QueueMessage, str],

257

pop_receipt: Optional[str] = None,

258

**kwargs

259

) -> None:

260

"""

261

Asynchronously delete a message.

262

263

Parameters:

264

- message: QueueMessage object or message ID

265

- pop_receipt: Message pop receipt (required if message is ID)

266

"""

267

268

async def clear_messages(self, **kwargs) -> None:

269

"""Asynchronously clear all messages from the queue."""

270

```

271

272

## Usage Examples

273

274

### Basic Async Client Usage

275

276

```python

277

import asyncio

278

from azure.storage.queue.aio import QueueServiceClient, QueueClient

279

280

async def basic_async_operations():

281

# Create async service client

282

service_client = QueueServiceClient.from_connection_string(conn_str)

283

284

try:

285

# Create queue asynchronously

286

queue_client = await service_client.create_queue("async-queue")

287

288

# Send message asynchronously

289

message = await queue_client.send_message("Hello async world!")

290

print(f"Sent message: {message.id}")

291

292

# Receive message asynchronously

293

received = await queue_client.receive_message()

294

if received:

295

print(f"Received: {received.content}")

296

await queue_client.delete_message(received)

297

298

# Clean up

299

await service_client.delete_queue("async-queue")

300

301

finally:

302

# Close client connections

303

await service_client.close()

304

305

# Run async function

306

asyncio.run(basic_async_operations())

307

```

308

309

### Async Message Processing

310

311

```python

312

import asyncio

313

from azure.storage.queue.aio import QueueClient

314

315

async def process_messages_async():

316

queue_client = QueueClient.from_connection_string(conn_str, "work-queue")

317

318

try:

319

while True:

320

# Receive messages asynchronously

321

messages = queue_client.receive_messages(messages_per_page=10)

322

323

batch = []

324

async for message in messages:

325

batch.append(message)

326

if len(batch) >= 10:

327

break

328

329

if not batch:

330

print("No messages available")

331

break

332

333

# Process messages concurrently

334

tasks = [process_single_message_async(queue_client, msg) for msg in batch]

335

await asyncio.gather(*tasks, return_exceptions=True)

336

337

finally:

338

await queue_client.close()

339

340

async def process_single_message_async(queue_client, message):

341

try:

342

# Simulate async processing

343

await asyncio.sleep(0.1)

344

print(f"Processed message: {message.content}")

345

346

# Delete message after successful processing

347

await queue_client.delete_message(message)

348

349

except Exception as e:

350

print(f"Failed to process message {message.id}: {e}")

351

# Message will become visible again after timeout

352

353

asyncio.run(process_messages_async())

354

```

355

356

### Async Batch Operations

357

358

```python

359

import asyncio

360

from azure.storage.queue.aio import QueueClient

361

362

async def batch_send_messages():

363

queue_client = QueueClient.from_connection_string(conn_str, "batch-queue")

364

365

try:

366

# Send multiple messages concurrently

367

messages_to_send = [f"Message {i}" for i in range(100)]

368

369

tasks = [

370

queue_client.send_message(content)

371

for content in messages_to_send

372

]

373

374

# Send all messages concurrently

375

results = await asyncio.gather(*tasks, return_exceptions=True)

376

377

successful = sum(1 for r in results if not isinstance(r, Exception))

378

failed = len(results) - successful

379

380

print(f"Sent {successful} messages successfully, {failed} failed")

381

382

finally:

383

await queue_client.close()

384

385

asyncio.run(batch_send_messages())

386

```

387

388

### Async with Context Managers

389

390

```python

391

import asyncio

392

from azure.storage.queue.aio import QueueServiceClient

393

394

async def context_manager_usage():

395

# Use async context manager for automatic cleanup

396

async with QueueServiceClient.from_connection_string(conn_str) as service_client:

397

# List queues asynchronously

398

queues = service_client.list_queues(include_metadata=True)

399

400

async for queue in queues:

401

print(f"Queue: {queue.name}, Messages: {queue.approximate_message_count}")

402

403

# Get queue client and process messages

404

async with service_client.get_queue_client(queue.name) as queue_client:

405

# Peek at messages

406

messages = await queue_client.peek_messages(max_messages=5)

407

print(f" Found {len(messages)} messages")

408

409

asyncio.run(context_manager_usage())

410

```

411

412

### Async Error Handling

413

414

```python

415

import asyncio

416

from azure.storage.queue.aio import QueueClient

417

from azure.core.exceptions import ResourceNotFoundError, HttpResponseError

418

419

async def async_error_handling():

420

queue_client = QueueClient.from_connection_string(conn_str, "error-test-queue")

421

422

try:

423

# Try to receive from non-existent queue

424

message = await queue_client.receive_message()

425

426

except ResourceNotFoundError:

427

print("Queue doesn't exist, creating it...")

428

await queue_client.create_queue()

429

430

except HttpResponseError as e:

431

print(f"HTTP error occurred: {e.status_code} - {e.error_code}")

432

433

except Exception as e:

434

print(f"Unexpected error: {type(e).__name__}: {e}")

435

436

finally:

437

await queue_client.close()

438

439

asyncio.run(async_error_handling())

440

```

441

442

### High-Performance Async Producer-Consumer

443

444

```python

445

import asyncio

446

from azure.storage.queue.aio import QueueClient

447

448

class AsyncQueueProducer:

449

def __init__(self, connection_string: str, queue_name: str):

450

self.client = QueueClient.from_connection_string(connection_string, queue_name)

451

452

async def produce_messages(self, message_count: int):

453

try:

454

tasks = []

455

for i in range(message_count):

456

task = self.client.send_message(f"Message {i}")

457

tasks.append(task)

458

459

# Send in batches to avoid overwhelming the service

460

if len(tasks) >= 50:

461

await asyncio.gather(*tasks)

462

tasks = []

463

464

# Send remaining messages

465

if tasks:

466

await asyncio.gather(*tasks)

467

468

finally:

469

await self.client.close()

470

471

class AsyncQueueConsumer:

472

def __init__(self, connection_string: str, queue_name: str):

473

self.client = QueueClient.from_connection_string(connection_string, queue_name)

474

475

async def consume_messages(self, max_messages: int = None):

476

processed = 0

477

try:

478

while max_messages is None or processed < max_messages:

479

# Receive batch of messages

480

messages = self.client.receive_messages(messages_per_page=32)

481

482

batch = []

483

async for message in messages:

484

batch.append(message)

485

486

if not batch:

487

break

488

489

# Process messages concurrently

490

tasks = [self._process_message(msg) for msg in batch]

491

await asyncio.gather(*tasks, return_exceptions=True)

492

493

processed += len(batch)

494

495

finally:

496

await self.client.close()

497

498

async def _process_message(self, message):

499

try:

500

# Simulate processing

501

await asyncio.sleep(0.01)

502

print(f"Processed: {message.content}")

503

504

# Delete after processing

505

await self.client.delete_message(message)

506

507

except Exception as e:

508

print(f"Failed to process {message.id}: {e}")

509

510

async def run_producer_consumer():

511

# Run producer and consumer concurrently

512

producer = AsyncQueueProducer(conn_str, "perf-test-queue")

513

consumer = AsyncQueueConsumer(conn_str, "perf-test-queue")

514

515

await asyncio.gather(

516

producer.produce_messages(1000),

517

consumer.consume_messages(1000)

518

)

519

520

asyncio.run(run_producer_consumer())

521

```

522

523

## Types

524

525

### Async-Specific Types

526

527

```python { .api }

528

from azure.core.async_paging import AsyncItemPaged

529

from azure.core.paging import ItemPaged

530

from typing import AsyncIterator, AsyncContextManager

531

532

# Async pagination support (async versions return AsyncItemPaged)

533

AsyncItemPaged[QueueProperties] # For async list_queues()

534

AsyncItemPaged[QueueMessage] # For async receive_messages()

535

536

# Sync versions return ItemPaged for comparison:

537

ItemPaged[QueueProperties] # For sync list_queues()

538

ItemPaged[QueueMessage] # For sync receive_messages()

539

540

# Context manager protocols

541

AsyncContextManager[QueueServiceClient] # For automatic resource cleanup

542

AsyncContextManager[QueueClient] # For automatic resource cleanup

543

544

# Async iterator protocols

545

AsyncIterator[QueueProperties] # From async list_queues().by_page()

546

AsyncIterator[QueueMessage] # From async receive_messages()

547

```

548

549

### Installation Requirements

550

551

```bash

552

# For async support, install with aio extra:

553

pip install azure-storage-queue[aio]

554

555

# This installs additional dependencies:

556

# - azure-core[aio]>=1.30.0

557

# - aiohttp (for async HTTP operations)

558

```

559

560

### Performance Considerations

561

562

```python { .api }

563

# Async performance best practices:

564

565

# 1. Use connection pooling (automatically handled by azure-core)

566

# 2. Batch operations when possible (send/receive multiple messages)

567

# 3. Use asyncio.gather() for concurrent operations

568

# 4. Close clients explicitly or use async context managers

569

# 5. Limit concurrent operations to avoid overwhelming the service

570

571

# Recommended concurrency limits:

572

RECOMMENDED_MAX_CONCURRENT_SENDS = 100

573

RECOMMENDED_MAX_CONCURRENT_RECEIVES = 50

574

RECOMMENDED_BATCH_SIZE = 32 # Maximum messages per receive operation

575

```