Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asynchronous implementations of all queue and message operations for high-performance applications requiring non-blocking I/O. The async clients provide the same functionality as their synchronous counterparts with async/await support.
Create async clients using the aio module with identical authentication methods as synchronous clients.
from azure.storage.queue.aio import QueueServiceClient, QueueClient
class QueueServiceClient: # from azure.storage.queue.aio
def __init__(
self,
account_url: str,
credential=None,
**kwargs
):
"""
Create async QueueServiceClient.
Parameters:
- account_url: Queue service endpoint URL
- credential: Authentication credential
"""
@classmethod
def from_connection_string(
cls,
conn_str: str,
credential=None,
**kwargs
) -> 'QueueServiceClient':
"""Create async client from connection string."""
class QueueClient: # from azure.storage.queue.aio
def __init__(
self,
account_url: str,
queue_name: str,
credential=None,
**kwargs
):
"""
Create async QueueClient.
Parameters:
- account_url: Queue service endpoint URL
- queue_name: Target queue name
- credential: Authentication credential
"""
@classmethod
def from_queue_url(cls, queue_url: str, credential=None, **kwargs) -> 'QueueClient': ...
@classmethod
def from_connection_string(cls, conn_str: str, queue_name: str, credential=None, **kwargs) -> 'QueueClient': ...Asynchronous account-level operations for queue management and service configuration.
# QueueServiceClient async methods
async def list_queues(
self,
name_starts_with: Optional[str] = None,
include_metadata: bool = False,
**kwargs
) -> AsyncItemPaged[QueueProperties]:
"""
Asynchronously list queues with automatic pagination.
Parameters:
- name_starts_with: Filter by queue name prefix
- include_metadata: Include queue metadata
Returns:
AsyncItemPaged for iterating over QueueProperties
"""
async def create_queue(
self,
name: str,
metadata: Optional[Dict[str, str]] = None,
**kwargs
) -> QueueClient:
"""
Asynchronously create a queue.
Parameters:
- name: Queue name
- metadata: Optional metadata
Returns:
QueueClient for the created queue
"""
async def delete_queue(
self,
queue: Union[str, QueueProperties],
**kwargs
) -> None:
"""
Asynchronously delete a queue.
Parameters:
- queue: Queue name or QueueProperties
"""
async def get_service_properties(self, **kwargs) -> Dict[str, Any]:
"""Get service properties asynchronously."""
async def set_service_properties(self, **kwargs) -> None:
"""Set service properties asynchronously."""
async def get_service_stats(self, **kwargs) -> Dict[str, Any]:
"""Get service statistics asynchronously."""Asynchronous queue-specific operations for properties, metadata, and access policies.
# QueueClient async methods
async def create_queue(self, *, metadata: Optional[Dict[str, str]] = None, **kwargs) -> None:
"""Asynchronously create the queue."""
async def delete_queue(self, **kwargs) -> None:
"""Asynchronously delete the queue."""
async def get_queue_properties(self, **kwargs) -> QueueProperties:
"""Asynchronously get queue properties."""
async def set_queue_metadata(
self,
metadata: Optional[Dict[str, str]] = None,
**kwargs
) -> Dict[str, Any]:
"""Asynchronously set queue metadata."""
async def get_queue_access_policy(self, **kwargs) -> Dict[str, AccessPolicy]:
"""Asynchronously get queue access policies."""
async def set_queue_access_policy(
self,
signed_identifiers: Dict[str, AccessPolicy],
**kwargs
) -> None:
"""Asynchronously set queue access policies."""Asynchronous message operations for sending, receiving, updating, and deleting messages.
async def send_message(
self,
content: Any,
*,
visibility_timeout: Optional[int] = None,
time_to_live: Optional[int] = None,
**kwargs
) -> QueueMessage:
"""
Asynchronously send a message to the queue.
Parameters:
- content: Message content
- visibility_timeout: Seconds before message becomes visible
- time_to_live: Message expiration in seconds
Returns:
QueueMessage with send details
"""
async def receive_message(
self,
*,
visibility_timeout: Optional[int] = None,
**kwargs
) -> Optional[QueueMessage]:
"""
Asynchronously receive a single message.
Parameters:
- visibility_timeout: Message invisibility duration
Returns:
QueueMessage if available, None if queue empty
"""
def receive_messages(
self,
*,
messages_per_page: Optional[int] = None,
visibility_timeout: Optional[int] = None,
max_messages: Optional[int] = None,
**kwargs
) -> AsyncItemPaged[QueueMessage]:
"""
Get async iterator for receiving multiple messages.
Parameters:
- messages_per_page: Messages per page (1-32)
- visibility_timeout: Message invisibility duration
- max_messages: Maximum total messages
Returns:
AsyncItemPaged for iterating over QueueMessage objects
"""
async def update_message(
self,
message: Union[QueueMessage, str],
pop_receipt: Optional[str] = None,
content: Optional[Any] = None,
*,
visibility_timeout: Optional[int] = None,
**kwargs
) -> QueueMessage:
"""
Asynchronously update message content and/or visibility.
Parameters:
- message: QueueMessage object or message ID
- pop_receipt: Message pop receipt (required if message is ID)
- content: New message content
- visibility_timeout: New visibility timeout
Returns:
Updated QueueMessage
"""
async def peek_messages(
self,
max_messages: Optional[int] = None,
**kwargs
) -> List[QueueMessage]:
"""
Asynchronously peek at messages without dequeuing.
Parameters:
- max_messages: Maximum messages to peek (1-32)
Returns:
List of QueueMessage objects
"""
async def delete_message(
self,
message: Union[QueueMessage, str],
pop_receipt: Optional[str] = None,
**kwargs
) -> None:
"""
Asynchronously delete a message.
Parameters:
- message: QueueMessage object or message ID
- pop_receipt: Message pop receipt (required if message is ID)
"""
async def clear_messages(self, **kwargs) -> None:
"""Asynchronously clear all messages from the queue."""import asyncio
from azure.storage.queue.aio import QueueServiceClient, QueueClient
async def basic_async_operations():
# Create async service client
service_client = QueueServiceClient.from_connection_string(conn_str)
try:
# Create queue asynchronously
queue_client = await service_client.create_queue("async-queue")
# Send message asynchronously
message = await queue_client.send_message("Hello async world!")
print(f"Sent message: {message.id}")
# Receive message asynchronously
received = await queue_client.receive_message()
if received:
print(f"Received: {received.content}")
await queue_client.delete_message(received)
# Clean up
await service_client.delete_queue("async-queue")
finally:
# Close client connections
await service_client.close()
# Run async function
asyncio.run(basic_async_operations())import asyncio
from azure.storage.queue.aio import QueueClient
async def process_messages_async():
queue_client = QueueClient.from_connection_string(conn_str, "work-queue")
try:
while True:
# Receive messages asynchronously
messages = queue_client.receive_messages(messages_per_page=10)
batch = []
async for message in messages:
batch.append(message)
if len(batch) >= 10:
break
if not batch:
print("No messages available")
break
# Process messages concurrently
tasks = [process_single_message_async(queue_client, msg) for msg in batch]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
await queue_client.close()
async def process_single_message_async(queue_client, message):
try:
# Simulate async processing
await asyncio.sleep(0.1)
print(f"Processed message: {message.content}")
# Delete message after successful processing
await queue_client.delete_message(message)
except Exception as e:
print(f"Failed to process message {message.id}: {e}")
# Message will become visible again after timeout
asyncio.run(process_messages_async())import asyncio
from azure.storage.queue.aio import QueueClient
async def batch_send_messages():
queue_client = QueueClient.from_connection_string(conn_str, "batch-queue")
try:
# Send multiple messages concurrently
messages_to_send = [f"Message {i}" for i in range(100)]
tasks = [
queue_client.send_message(content)
for content in messages_to_send
]
# Send all messages concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for r in results if not isinstance(r, Exception))
failed = len(results) - successful
print(f"Sent {successful} messages successfully, {failed} failed")
finally:
await queue_client.close()
asyncio.run(batch_send_messages())import asyncio
from azure.storage.queue.aio import QueueServiceClient
async def context_manager_usage():
# Use async context manager for automatic cleanup
async with QueueServiceClient.from_connection_string(conn_str) as service_client:
# List queues asynchronously
queues = service_client.list_queues(include_metadata=True)
async for queue in queues:
print(f"Queue: {queue.name}, Messages: {queue.approximate_message_count}")
# Get queue client and process messages
async with service_client.get_queue_client(queue.name) as queue_client:
# Peek at messages
messages = await queue_client.peek_messages(max_messages=5)
print(f" Found {len(messages)} messages")
asyncio.run(context_manager_usage())import asyncio
from azure.storage.queue.aio import QueueClient
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
async def async_error_handling():
queue_client = QueueClient.from_connection_string(conn_str, "error-test-queue")
try:
# Try to receive from non-existent queue
message = await queue_client.receive_message()
except ResourceNotFoundError:
print("Queue doesn't exist, creating it...")
await queue_client.create_queue()
except HttpResponseError as e:
print(f"HTTP error occurred: {e.status_code} - {e.error_code}")
except Exception as e:
print(f"Unexpected error: {type(e).__name__}: {e}")
finally:
await queue_client.close()
asyncio.run(async_error_handling())import asyncio
from azure.storage.queue.aio import QueueClient
class AsyncQueueProducer:
def __init__(self, connection_string: str, queue_name: str):
self.client = QueueClient.from_connection_string(connection_string, queue_name)
async def produce_messages(self, message_count: int):
try:
tasks = []
for i in range(message_count):
task = self.client.send_message(f"Message {i}")
tasks.append(task)
# Send in batches to avoid overwhelming the service
if len(tasks) >= 50:
await asyncio.gather(*tasks)
tasks = []
# Send remaining messages
if tasks:
await asyncio.gather(*tasks)
finally:
await self.client.close()
class AsyncQueueConsumer:
def __init__(self, connection_string: str, queue_name: str):
self.client = QueueClient.from_connection_string(connection_string, queue_name)
async def consume_messages(self, max_messages: int = None):
processed = 0
try:
while max_messages is None or processed < max_messages:
# Receive batch of messages
messages = self.client.receive_messages(messages_per_page=32)
batch = []
async for message in messages:
batch.append(message)
if not batch:
break
# Process messages concurrently
tasks = [self._process_message(msg) for msg in batch]
await asyncio.gather(*tasks, return_exceptions=True)
processed += len(batch)
finally:
await self.client.close()
async def _process_message(self, message):
try:
# Simulate processing
await asyncio.sleep(0.01)
print(f"Processed: {message.content}")
# Delete after processing
await self.client.delete_message(message)
except Exception as e:
print(f"Failed to process {message.id}: {e}")
async def run_producer_consumer():
# Run producer and consumer concurrently
producer = AsyncQueueProducer(conn_str, "perf-test-queue")
consumer = AsyncQueueConsumer(conn_str, "perf-test-queue")
await asyncio.gather(
producer.produce_messages(1000),
consumer.consume_messages(1000)
)
asyncio.run(run_producer_consumer())from azure.core.async_paging import AsyncItemPaged
from azure.core.paging import ItemPaged
from typing import AsyncIterator, AsyncContextManager
# Async pagination support (async versions return AsyncItemPaged)
AsyncItemPaged[QueueProperties] # For async list_queues()
AsyncItemPaged[QueueMessage] # For async receive_messages()
# Sync versions return ItemPaged for comparison:
ItemPaged[QueueProperties] # For sync list_queues()
ItemPaged[QueueMessage] # For sync receive_messages()
# Context manager protocols
AsyncContextManager[QueueServiceClient] # For automatic resource cleanup
AsyncContextManager[QueueClient] # For automatic resource cleanup
# Async iterator protocols
AsyncIterator[QueueProperties] # From async list_queues().by_page()
AsyncIterator[QueueMessage] # From async receive_messages()# For async support, install with aio extra:
pip install azure-storage-queue[aio]
# This installs additional dependencies:
# - azure-core[aio]>=1.30.0
# - aiohttp (for async HTTP operations)# Async performance best practices:
# 1. Use connection pooling (automatically handled by azure-core)
# 2. Batch operations when possible (send/receive multiple messages)
# 3. Use asyncio.gather() for concurrent operations
# 4. Close clients explicitly or use async context managers
# 5. Limit concurrent operations to avoid overwhelming the service
# Recommended concurrency limits:
RECOMMENDED_MAX_CONCURRENT_SENDS = 100
RECOMMENDED_MAX_CONCURRENT_RECEIVES = 50
RECOMMENDED_BATCH_SIZE = 32 # Maximum messages per receive operationInstall with Tessl CLI
npx tessl i tessl/pypi-azure-storage-queue