0
# Async Operations
1
2
Asynchronous implementations of all queue and message operations for high-performance applications requiring non-blocking I/O. The async clients provide the same functionality as their synchronous counterparts with async/await support.
3
4
## Capabilities
5
6
### Async Client Creation
7
8
Create async clients using the aio module with identical authentication methods as synchronous clients.
9
10
```python { .api }
11
from azure.storage.queue.aio import QueueServiceClient, QueueClient
12
13
class QueueServiceClient: # from azure.storage.queue.aio
14
def __init__(
15
self,
16
account_url: str,
17
credential=None,
18
**kwargs
19
):
20
"""
21
Create async QueueServiceClient.
22
23
Parameters:
24
- account_url: Queue service endpoint URL
25
- credential: Authentication credential
26
"""
27
28
@classmethod
29
def from_connection_string(
30
cls,
31
conn_str: str,
32
credential=None,
33
**kwargs
34
) -> 'QueueServiceClient':
35
"""Create async client from connection string."""
36
37
class QueueClient: # from azure.storage.queue.aio
38
def __init__(
39
self,
40
account_url: str,
41
queue_name: str,
42
credential=None,
43
**kwargs
44
):
45
"""
46
Create async QueueClient.
47
48
Parameters:
49
- account_url: Queue service endpoint URL
50
- queue_name: Target queue name
51
- credential: Authentication credential
52
"""
53
54
@classmethod
55
def from_queue_url(cls, queue_url: str, credential=None, **kwargs) -> 'QueueClient': ...
56
57
@classmethod
58
def from_connection_string(cls, conn_str: str, queue_name: str, credential=None, **kwargs) -> 'QueueClient': ...
59
```
60
61
### Async Service Operations
62
63
Asynchronous account-level operations for queue management and service configuration.
64
65
```python { .api }
66
# QueueServiceClient async methods
67
async def list_queues(
68
self,
69
name_starts_with: Optional[str] = None,
70
include_metadata: bool = False,
71
**kwargs
72
) -> AsyncItemPaged[QueueProperties]:
73
"""
74
Asynchronously list queues with automatic pagination.
75
76
Parameters:
77
- name_starts_with: Filter by queue name prefix
78
- include_metadata: Include queue metadata
79
80
Returns:
81
AsyncItemPaged for iterating over QueueProperties
82
"""
83
84
async def create_queue(
85
self,
86
name: str,
87
metadata: Optional[Dict[str, str]] = None,
88
**kwargs
89
) -> QueueClient:
90
"""
91
Asynchronously create a queue.
92
93
Parameters:
94
- name: Queue name
95
- metadata: Optional metadata
96
97
Returns:
98
QueueClient for the created queue
99
"""
100
101
async def delete_queue(
102
self,
103
queue: Union[str, QueueProperties],
104
**kwargs
105
) -> None:
106
"""
107
Asynchronously delete a queue.
108
109
Parameters:
110
- queue: Queue name or QueueProperties
111
"""
112
113
async def get_service_properties(self, **kwargs) -> Dict[str, Any]:
114
"""Get service properties asynchronously."""
115
116
async def set_service_properties(self, **kwargs) -> None:
117
"""Set service properties asynchronously."""
118
119
async def get_service_stats(self, **kwargs) -> Dict[str, Any]:
120
"""Get service statistics asynchronously."""
121
```
122
123
### Async Queue Operations
124
125
Asynchronous queue-specific operations for properties, metadata, and access policies.
126
127
```python { .api }
128
# QueueClient async methods
129
async def create_queue(self, *, metadata: Optional[Dict[str, str]] = None, **kwargs) -> None:
130
"""Asynchronously create the queue."""
131
132
async def delete_queue(self, **kwargs) -> None:
133
"""Asynchronously delete the queue."""
134
135
async def get_queue_properties(self, **kwargs) -> QueueProperties:
136
"""Asynchronously get queue properties."""
137
138
async def set_queue_metadata(
139
self,
140
metadata: Optional[Dict[str, str]] = None,
141
**kwargs
142
) -> Dict[str, Any]:
143
"""Asynchronously set queue metadata."""
144
145
async def get_queue_access_policy(self, **kwargs) -> Dict[str, AccessPolicy]:
146
"""Asynchronously get queue access policies."""
147
148
async def set_queue_access_policy(
149
self,
150
signed_identifiers: Dict[str, AccessPolicy],
151
**kwargs
152
) -> None:
153
"""Asynchronously set queue access policies."""
154
```
155
156
### Async Message Operations
157
158
Asynchronous message operations for sending, receiving, updating, and deleting messages.
159
160
```python { .api }
161
async def send_message(
162
self,
163
content: Any,
164
*,
165
visibility_timeout: Optional[int] = None,
166
time_to_live: Optional[int] = None,
167
**kwargs
168
) -> QueueMessage:
169
"""
170
Asynchronously send a message to the queue.
171
172
Parameters:
173
- content: Message content
174
- visibility_timeout: Seconds before message becomes visible
175
- time_to_live: Message expiration in seconds
176
177
Returns:
178
QueueMessage with send details
179
"""
180
181
async def receive_message(
182
self,
183
*,
184
visibility_timeout: Optional[int] = None,
185
**kwargs
186
) -> Optional[QueueMessage]:
187
"""
188
Asynchronously receive a single message.
189
190
Parameters:
191
- visibility_timeout: Message invisibility duration
192
193
Returns:
194
QueueMessage if available, None if queue empty
195
"""
196
197
def receive_messages(
198
self,
199
*,
200
messages_per_page: Optional[int] = None,
201
visibility_timeout: Optional[int] = None,
202
max_messages: Optional[int] = None,
203
**kwargs
204
) -> AsyncItemPaged[QueueMessage]:
205
"""
206
Get async iterator for receiving multiple messages.
207
208
Parameters:
209
- messages_per_page: Messages per page (1-32)
210
- visibility_timeout: Message invisibility duration
211
- max_messages: Maximum total messages
212
213
Returns:
214
AsyncItemPaged for iterating over QueueMessage objects
215
"""
216
217
async def update_message(
218
self,
219
message: Union[QueueMessage, str],
220
pop_receipt: Optional[str] = None,
221
content: Optional[Any] = None,
222
*,
223
visibility_timeout: Optional[int] = None,
224
**kwargs
225
) -> QueueMessage:
226
"""
227
Asynchronously update message content and/or visibility.
228
229
Parameters:
230
- message: QueueMessage object or message ID
231
- pop_receipt: Message pop receipt (required if message is ID)
232
- content: New message content
233
- visibility_timeout: New visibility timeout
234
235
Returns:
236
Updated QueueMessage
237
"""
238
239
async def peek_messages(
240
self,
241
max_messages: Optional[int] = None,
242
**kwargs
243
) -> List[QueueMessage]:
244
"""
245
Asynchronously peek at messages without dequeuing.
246
247
Parameters:
248
- max_messages: Maximum messages to peek (1-32)
249
250
Returns:
251
List of QueueMessage objects
252
"""
253
254
async def delete_message(
255
self,
256
message: Union[QueueMessage, str],
257
pop_receipt: Optional[str] = None,
258
**kwargs
259
) -> None:
260
"""
261
Asynchronously delete a message.
262
263
Parameters:
264
- message: QueueMessage object or message ID
265
- pop_receipt: Message pop receipt (required if message is ID)
266
"""
267
268
async def clear_messages(self, **kwargs) -> None:
269
"""Asynchronously clear all messages from the queue."""
270
```
271
272
## Usage Examples
273
274
### Basic Async Client Usage
275
276
```python
277
import asyncio
278
from azure.storage.queue.aio import QueueServiceClient, QueueClient
279
280
async def basic_async_operations():
281
# Create async service client
282
service_client = QueueServiceClient.from_connection_string(conn_str)
283
284
try:
285
# Create queue asynchronously
286
queue_client = await service_client.create_queue("async-queue")
287
288
# Send message asynchronously
289
message = await queue_client.send_message("Hello async world!")
290
print(f"Sent message: {message.id}")
291
292
# Receive message asynchronously
293
received = await queue_client.receive_message()
294
if received:
295
print(f"Received: {received.content}")
296
await queue_client.delete_message(received)
297
298
# Clean up
299
await service_client.delete_queue("async-queue")
300
301
finally:
302
# Close client connections
303
await service_client.close()
304
305
# Run async function
306
asyncio.run(basic_async_operations())
307
```
308
309
### Async Message Processing
310
311
```python
312
import asyncio
313
from azure.storage.queue.aio import QueueClient
314
315
async def process_messages_async():
316
queue_client = QueueClient.from_connection_string(conn_str, "work-queue")
317
318
try:
319
while True:
320
# Receive messages asynchronously
321
messages = queue_client.receive_messages(messages_per_page=10)
322
323
batch = []
324
async for message in messages:
325
batch.append(message)
326
if len(batch) >= 10:
327
break
328
329
if not batch:
330
print("No messages available")
331
break
332
333
# Process messages concurrently
334
tasks = [process_single_message_async(queue_client, msg) for msg in batch]
335
await asyncio.gather(*tasks, return_exceptions=True)
336
337
finally:
338
await queue_client.close()
339
340
async def process_single_message_async(queue_client, message):
341
try:
342
# Simulate async processing
343
await asyncio.sleep(0.1)
344
print(f"Processed message: {message.content}")
345
346
# Delete message after successful processing
347
await queue_client.delete_message(message)
348
349
except Exception as e:
350
print(f"Failed to process message {message.id}: {e}")
351
# Message will become visible again after timeout
352
353
asyncio.run(process_messages_async())
354
```
355
356
### Async Batch Operations
357
358
```python
359
import asyncio
360
from azure.storage.queue.aio import QueueClient
361
362
async def batch_send_messages():
363
queue_client = QueueClient.from_connection_string(conn_str, "batch-queue")
364
365
try:
366
# Send multiple messages concurrently
367
messages_to_send = [f"Message {i}" for i in range(100)]
368
369
tasks = [
370
queue_client.send_message(content)
371
for content in messages_to_send
372
]
373
374
# Send all messages concurrently
375
results = await asyncio.gather(*tasks, return_exceptions=True)
376
377
successful = sum(1 for r in results if not isinstance(r, Exception))
378
failed = len(results) - successful
379
380
print(f"Sent {successful} messages successfully, {failed} failed")
381
382
finally:
383
await queue_client.close()
384
385
asyncio.run(batch_send_messages())
386
```
387
388
### Async with Context Managers
389
390
```python
391
import asyncio
392
from azure.storage.queue.aio import QueueServiceClient
393
394
async def context_manager_usage():
395
# Use async context manager for automatic cleanup
396
async with QueueServiceClient.from_connection_string(conn_str) as service_client:
397
# List queues asynchronously
398
queues = service_client.list_queues(include_metadata=True)
399
400
async for queue in queues:
401
print(f"Queue: {queue.name}, Messages: {queue.approximate_message_count}")
402
403
# Get queue client and process messages
404
async with service_client.get_queue_client(queue.name) as queue_client:
405
# Peek at messages
406
messages = await queue_client.peek_messages(max_messages=5)
407
print(f" Found {len(messages)} messages")
408
409
asyncio.run(context_manager_usage())
410
```
411
412
### Async Error Handling
413
414
```python
415
import asyncio
416
from azure.storage.queue.aio import QueueClient
417
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
418
419
async def async_error_handling():
420
queue_client = QueueClient.from_connection_string(conn_str, "error-test-queue")
421
422
try:
423
# Try to receive from non-existent queue
424
message = await queue_client.receive_message()
425
426
except ResourceNotFoundError:
427
print("Queue doesn't exist, creating it...")
428
await queue_client.create_queue()
429
430
except HttpResponseError as e:
431
print(f"HTTP error occurred: {e.status_code} - {e.error_code}")
432
433
except Exception as e:
434
print(f"Unexpected error: {type(e).__name__}: {e}")
435
436
finally:
437
await queue_client.close()
438
439
asyncio.run(async_error_handling())
440
```
441
442
### High-Performance Async Producer-Consumer
443
444
```python
445
import asyncio
446
from azure.storage.queue.aio import QueueClient
447
448
class AsyncQueueProducer:
449
def __init__(self, connection_string: str, queue_name: str):
450
self.client = QueueClient.from_connection_string(connection_string, queue_name)
451
452
async def produce_messages(self, message_count: int):
453
try:
454
tasks = []
455
for i in range(message_count):
456
task = self.client.send_message(f"Message {i}")
457
tasks.append(task)
458
459
# Send in batches to avoid overwhelming the service
460
if len(tasks) >= 50:
461
await asyncio.gather(*tasks)
462
tasks = []
463
464
# Send remaining messages
465
if tasks:
466
await asyncio.gather(*tasks)
467
468
finally:
469
await self.client.close()
470
471
class AsyncQueueConsumer:
472
def __init__(self, connection_string: str, queue_name: str):
473
self.client = QueueClient.from_connection_string(connection_string, queue_name)
474
475
async def consume_messages(self, max_messages: int = None):
476
processed = 0
477
try:
478
while max_messages is None or processed < max_messages:
479
# Receive batch of messages
480
messages = self.client.receive_messages(messages_per_page=32)
481
482
batch = []
483
async for message in messages:
484
batch.append(message)
485
486
if not batch:
487
break
488
489
# Process messages concurrently
490
tasks = [self._process_message(msg) for msg in batch]
491
await asyncio.gather(*tasks, return_exceptions=True)
492
493
processed += len(batch)
494
495
finally:
496
await self.client.close()
497
498
async def _process_message(self, message):
499
try:
500
# Simulate processing
501
await asyncio.sleep(0.01)
502
print(f"Processed: {message.content}")
503
504
# Delete after processing
505
await self.client.delete_message(message)
506
507
except Exception as e:
508
print(f"Failed to process {message.id}: {e}")
509
510
async def run_producer_consumer():
511
# Run producer and consumer concurrently
512
producer = AsyncQueueProducer(conn_str, "perf-test-queue")
513
consumer = AsyncQueueConsumer(conn_str, "perf-test-queue")
514
515
await asyncio.gather(
516
producer.produce_messages(1000),
517
consumer.consume_messages(1000)
518
)
519
520
asyncio.run(run_producer_consumer())
521
```
522
523
## Types
524
525
### Async-Specific Types
526
527
```python { .api }
528
from azure.core.async_paging import AsyncItemPaged
529
from azure.core.paging import ItemPaged
530
from typing import AsyncIterator, AsyncContextManager
531
532
# Async pagination support (async versions return AsyncItemPaged)
533
AsyncItemPaged[QueueProperties] # For async list_queues()
534
AsyncItemPaged[QueueMessage] # For async receive_messages()
535
536
# Sync versions return ItemPaged for comparison:
537
ItemPaged[QueueProperties] # For sync list_queues()
538
ItemPaged[QueueMessage] # For sync receive_messages()
539
540
# Context manager protocols
541
AsyncContextManager[QueueServiceClient] # For automatic resource cleanup
542
AsyncContextManager[QueueClient] # For automatic resource cleanup
543
544
# Async iterator protocols
545
AsyncIterator[QueueProperties] # From async list_queues().by_page()
546
AsyncIterator[QueueMessage] # From async receive_messages()
547
```
548
549
### Installation Requirements
550
551
```bash
552
# For async support, install with aio extra:
553
pip install azure-storage-queue[aio]
554
555
# This installs additional dependencies:
556
# - azure-core[aio]>=1.30.0
557
# - aiohttp (for async HTTP operations)
558
```
559
560
### Performance Considerations
561
562
```python { .api }
563
# Async performance best practices:
564
565
# 1. Use connection pooling (automatically handled by azure-core)
566
# 2. Batch operations when possible (send/receive multiple messages)
567
# 3. Use asyncio.gather() for concurrent operations
568
# 4. Close clients explicitly or use async context managers
569
# 5. Limit concurrent operations to avoid overwhelming the service
570
571
# Recommended concurrency limits:
572
RECOMMENDED_MAX_CONCURRENT_SENDS = 100
573
RECOMMENDED_MAX_CONCURRENT_RECEIVES = 50
574
RECOMMENDED_BATCH_SIZE = 32 # Maximum messages per receive operation
575
```