CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-storage-queue

Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/async-operations.md

Async Operations

Asynchronous implementations of all queue and message operations for high-performance applications requiring non-blocking I/O. The async clients provide the same functionality as their synchronous counterparts with async/await support.

Capabilities

Async Client Creation

Create async clients from the aio module using the same authentication methods as the synchronous clients.

from azure.storage.queue.aio import QueueServiceClient, QueueClient

class QueueServiceClient:  # from azure.storage.queue.aio
    """Async account-level client; usable as an async context manager."""

    def __init__(self, account_url: str, credential=None, **kwargs):
        """
        Create async QueueServiceClient.

        Parameters:
        - account_url: Queue service endpoint URL
        - credential: Authentication credential
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        credential=None,
        **kwargs
    ) -> 'QueueServiceClient':
        """Create async client from connection string."""

    def get_queue_client(
        self,
        queue: 'Union[str, QueueProperties]',
        **kwargs
    ) -> 'QueueClient':
        """
        Get an async QueueClient for one queue (plain call, not awaited).

        Parameters:
        - queue: Queue name or QueueProperties

        Returns:
        QueueClient for the given queue
        """

    async def close(self) -> None:
        """Close the client's connections; must be awaited."""

    async def __aenter__(self) -> 'QueueServiceClient':
        """Enter `async with`; yields this client."""

    async def __aexit__(self, *args) -> None:
        """Leave `async with`; closes the client."""

class QueueClient:  # from azure.storage.queue.aio
    """Async client scoped to a single queue; usable as an async context manager."""

    def __init__(self, account_url: str, queue_name: str, credential=None, **kwargs):
        """
        Create async QueueClient.

        Parameters:
        - account_url: Queue service endpoint URL
        - queue_name: Target queue name
        - credential: Authentication credential
        """

    @classmethod
    def from_queue_url(cls, queue_url: str, credential=None, **kwargs) -> 'QueueClient':
        """Create async client from a queue URL."""

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        queue_name: str,
        credential=None,
        **kwargs
    ) -> 'QueueClient':
        """Create async client from a connection string and queue name."""

    async def close(self) -> None:
        """Close the client's connections; must be awaited."""

    async def __aenter__(self) -> 'QueueClient':
        """Enter `async with`; yields this client."""

    async def __aexit__(self, *args) -> None:
        """Leave `async with`; closes the client."""

Async Service Operations

Asynchronous account-level operations for queue management and service configuration.

# QueueServiceClient async methods
def list_queues(
    self,
    name_starts_with: Optional[str] = None,
    include_metadata: bool = False,
    **kwargs
) -> AsyncItemPaged[QueueProperties]:
    """
    List queues with automatic pagination.

    This is a plain (non-coroutine) method: call it without `await` and
    consume the returned pager with `async for`.

    Parameters:
    - name_starts_with: Filter by queue name prefix
    - include_metadata: Include queue metadata

    Returns:
    AsyncItemPaged for iterating over QueueProperties
    """

async def create_queue(self, name: str, metadata: Optional[Dict[str, str]] = None, **kwargs) -> QueueClient:
    """
    Asynchronously create a new queue.

    Parameters:
    - name: Name of the queue to create
    - metadata: Metadata to attach to the queue, if any

    Returns:
    A QueueClient for the queue that was created
    """

async def delete_queue(self, queue: Union[str, QueueProperties], **kwargs) -> None:
    """
    Asynchronously remove a queue.

    Parameters:
    - queue: The queue to delete, given as its name or its QueueProperties
    """

async def get_service_properties(self, **kwargs) -> Dict[str, Any]:
    """
    Asynchronously fetch the service properties.

    Returns:
    Dict of service properties
    """

async def set_service_properties(self, **kwargs) -> None:
    """
    Asynchronously update the service properties.

    The settings to change are passed as keyword arguments.
    """

async def get_service_stats(self, **kwargs) -> Dict[str, Any]:
    """
    Asynchronously fetch the service statistics.

    Returns:
    Dict of service statistics
    """

Async Queue Operations

Asynchronous queue-specific operations for properties, metadata, and access policies.

# QueueClient async methods
async def create_queue(
    self,
    *,
    metadata: Optional[Dict[str, str]] = None,
    **kwargs
) -> None:
    """
    Asynchronously create the queue this client points at.

    Parameters:
    - metadata: Optional metadata (keyword-only)
    """

async def delete_queue(self, **kwargs) -> None:
    """Asynchronously delete the queue this client points at."""

async def get_queue_properties(self, **kwargs) -> QueueProperties:
    """
    Asynchronously fetch the queue's properties.

    Returns:
    QueueProperties for this queue
    """

async def set_queue_metadata(self, metadata: Optional[Dict[str, str]] = None, **kwargs) -> Dict[str, Any]:
    """
    Asynchronously set the queue's metadata.

    Parameters:
    - metadata: Metadata name/value pairs to store on the queue
    """

async def get_queue_access_policy(self, **kwargs) -> Dict[str, AccessPolicy]:
    """
    Asynchronously fetch the queue's access policies.

    Returns:
    Dict mapping string keys to AccessPolicy objects
    """

async def set_queue_access_policy(self, signed_identifiers: Dict[str, AccessPolicy], **kwargs) -> None:
    """
    Asynchronously set the queue's access policies.

    Parameters:
    - signed_identifiers: Dict mapping string keys to AccessPolicy objects
    """

Async Message Operations

Asynchronous message operations for sending, receiving, updating, and deleting messages.

async def send_message(
    self,
    content: Any,
    *,
    visibility_timeout: Optional[int] = None,
    time_to_live: Optional[int] = None,
    **kwargs
) -> QueueMessage:
    """
    Asynchronously enqueue one message.

    Parameters:
    - content: Body of the message
    - visibility_timeout: Delay, in seconds, before the message becomes visible
    - time_to_live: Seconds until the message expires

    Returns:
    QueueMessage carrying the send details
    """

async def receive_message(self, *, visibility_timeout: Optional[int] = None, **kwargs) -> Optional[QueueMessage]:
    """
    Asynchronously receive at most one message.

    Parameters:
    - visibility_timeout: How long the message stays invisible

    Returns:
    A QueueMessage when one is available, otherwise None (queue empty)
    """

def receive_messages(
    self,
    *,
    messages_per_page: Optional[int] = None,
    visibility_timeout: Optional[int] = None,
    max_messages: Optional[int] = None,
    **kwargs
) -> AsyncItemPaged[QueueMessage]:
    """
    Build an async iterator that receives multiple messages.

    Parameters:
    - messages_per_page: Page size, from 1 to 32 messages
    - visibility_timeout: How long received messages stay invisible
    - max_messages: Upper bound on the total number of messages

    Returns:
    AsyncItemPaged that yields QueueMessage objects
    """

async def update_message(
    self,
    message: Union[QueueMessage, str],
    pop_receipt: Optional[str] = None,
    content: Optional[Any] = None,
    *,
    visibility_timeout: Optional[int] = None,
    **kwargs
) -> QueueMessage:
    """
    Asynchronously change a message's content, its visibility, or both.

    Parameters:
    - message: The QueueMessage itself, or its message ID
    - pop_receipt: Pop receipt of the message (required when message is an ID)
    - content: Replacement message content
    - visibility_timeout: Replacement visibility timeout

    Returns:
    The updated QueueMessage
    """

async def peek_messages(self, max_messages: Optional[int] = None, **kwargs) -> List[QueueMessage]:
    """
    Asynchronously look at messages without dequeuing them.

    Parameters:
    - max_messages: How many messages to peek at most (1-32)

    Returns:
    List of QueueMessage objects
    """

async def delete_message(self, message: Union[QueueMessage, str], pop_receipt: Optional[str] = None, **kwargs) -> None:
    """
    Asynchronously remove a message from the queue.

    Parameters:
    - message: The QueueMessage itself, or its message ID
    - pop_receipt: Pop receipt of the message (required when message is an ID)
    """

async def clear_messages(self, **kwargs) -> None:
    """Asynchronously remove every message from the queue."""

Usage Examples

Basic Async Client Usage

import asyncio
from azure.storage.queue.aio import QueueServiceClient, QueueClient

async def basic_async_operations():
    """Create a queue, send and receive one message, then delete the queue."""
    service = QueueServiceClient.from_connection_string(conn_str)

    try:
        # create_queue hands back a QueueClient for the new queue
        queue = await service.create_queue("async-queue")

        sent = await queue.send_message("Hello async world!")
        print(f"Sent message: {sent.id}")

        # receive_message yields None when the queue is empty
        received = await queue.receive_message()
        if received:
            print(f"Received: {received.content}")
            await queue.delete_message(received)

        # Remove the queue created above
        await service.delete_queue("async-queue")

    finally:
        # Always release the client's connections, even on failure
        await service.close()

# Run async function
asyncio.run(basic_async_operations())

Async Message Processing

import asyncio
from azure.storage.queue.aio import QueueClient

async def process_messages_async():
    """Drain "work-queue" in batches of up to 10, processing each batch concurrently."""
    batch_size = 10
    client = QueueClient.from_connection_string(conn_str, "work-queue")

    try:
        while True:
            # Collect up to one batch from a fresh pager
            pending = []
            async for msg in client.receive_messages(messages_per_page=batch_size):
                pending.append(msg)
                if len(pending) >= batch_size:
                    break

            if not pending:
                print("No messages available")
                break

            # Fan the batch out; exceptions are returned by gather, not raised
            await asyncio.gather(
                *[process_single_message_async(client, msg) for msg in pending],
                return_exceptions=True,
            )

    finally:
        await client.close()

async def process_single_message_async(queue_client, message):
    """
    Process one message and delete it from the queue on success.

    Parameters:
    - queue_client: Client used to delete the message
    - message: Message to process (its content and id are read)

    Any exception is reported with print() and swallowed.
    """
    try:
        # Stand-in for real asynchronous work
        await asyncio.sleep(0.1)
        print(f"Processed message: {message.content}")

        # Only reached when processing succeeded
        await queue_client.delete_message(message)

    except Exception as exc:
        print(f"Failed to process message {message.id}: {exc}")
        # Not deleted: the message will become visible again after timeout

asyncio.run(process_messages_async())

Async Batch Operations

import asyncio
from azure.storage.queue.aio import QueueClient

async def batch_send_messages():
    """Send 100 messages to "batch-queue" concurrently and report the outcome."""
    client = QueueClient.from_connection_string(conn_str, "batch-queue")

    try:
        # One send per message, all in flight at once; failures come back
        # as exception objects instead of being raised
        outcomes = await asyncio.gather(
            *(client.send_message(f"Message {i}") for i in range(100)),
            return_exceptions=True,
        )

        successful = len([o for o in outcomes if not isinstance(o, Exception)])
        failed = len(outcomes) - successful

        print(f"Sent {successful} messages successfully, {failed} failed")

    finally:
        await client.close()

asyncio.run(batch_send_messages())

Async with Context Managers

import asyncio
from azure.storage.queue.aio import QueueServiceClient

async def context_manager_usage():
    """List every queue and report its message count and a peek at its messages."""
    # Use async context manager for automatic cleanup
    async with QueueServiceClient.from_connection_string(conn_str) as service_client:
        # list_queues() is called without await; the pager is consumed with async for
        async for queue in service_client.list_queues(include_metadata=True):
            async with service_client.get_queue_client(queue.name) as queue_client:
                # Items from list_queues() do not carry a message count
                # (approximate_message_count is None on them), so read it
                # from the queue's own properties instead.
                properties = await queue_client.get_queue_properties()
                print(f"Queue: {queue.name}, Messages: {properties.approximate_message_count}")

                # Peek at messages
                messages = await queue_client.peek_messages(max_messages=5)
                print(f"  Found {len(messages)} messages")

asyncio.run(context_manager_usage())

Async Error Handling

import asyncio
from azure.storage.queue.aio import QueueClient
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError

async def async_error_handling():
    """Receive from a queue that may not exist and handle each failure class."""
    queue_client = QueueClient.from_connection_string(conn_str, "error-test-queue")

    try:
        # Try to receive from non-existent queue; the result is not needed here
        await queue_client.receive_message()

    except ResourceNotFoundError:
        print("Queue doesn't exist, creating it...")
        await queue_client.create_queue()

    except HttpResponseError as e:
        # NOTE(review): error_code appears to be attached by the storage
        # layer rather than defined on HttpResponseError itself, so read it
        # defensively to keep this handler from raising AttributeError.
        error_code = getattr(e, "error_code", None)
        print(f"HTTP error occurred: {e.status_code} - {error_code}")

    except Exception as e:
        print(f"Unexpected error: {type(e).__name__}: {e}")

    finally:
        await queue_client.close()

asyncio.run(async_error_handling())

High-Performance Async Producer-Consumer

import asyncio
from typing import Optional

from azure.storage.queue.aio import QueueClient

class AsyncQueueProducer:
    """Sends numbered messages to one queue in concurrent batches of 50."""

    def __init__(self, connection_string: str, queue_name: str):
        self.client = QueueClient.from_connection_string(connection_string, queue_name)

    async def produce_messages(self, message_count: int):
        """
        Send "Message 0" .. "Message {message_count - 1}", then close the client.

        Parameters:
        - message_count: Number of messages to send
        """
        batch_size = 50  # cap on sends in flight, to avoid overwhelming the service
        try:
            for start in range(0, message_count, batch_size):
                stop = min(start + batch_size, message_count)
                sends = [self.client.send_message(f"Message {i}") for i in range(start, stop)]
                await asyncio.gather(*sends)
        finally:
            # The client is closed whether or not every send succeeded
            await self.client.close()

class AsyncQueueConsumer:
    """Receives messages from one queue and processes them in concurrent batches."""

    def __init__(self, connection_string: str, queue_name: str):
        self.client = QueueClient.from_connection_string(connection_string, queue_name)

    async def consume_messages(self, max_messages: Optional[int] = None):
        """
        Process messages until the queue is empty or the limit is reached,
        then close the client.

        Parameters:
        - max_messages: Maximum number of messages to process; None means
          keep going until a receive comes back empty
        """
        processed = 0
        try:
            while max_messages is None or processed < max_messages:
                # Ask for no more than one page (32 messages) and no more than
                # the caller's remaining budget, so max_messages is honoured
                # and the number of concurrent tasks stays bounded.
                if max_messages is None:
                    limit = 32
                else:
                    limit = min(32, max_messages - processed)

                messages = self.client.receive_messages(
                    messages_per_page=limit,
                    max_messages=limit,
                )
                batch = [message async for message in messages]

                if not batch:
                    break

                # Process messages concurrently; _process_message reports its
                # own failures, so exceptions are collected rather than raised
                tasks = [self._process_message(msg) for msg in batch]
                await asyncio.gather(*tasks, return_exceptions=True)

                processed += len(batch)

        finally:
            await self.client.close()

    async def _process_message(self, message):
        """Process one message and delete it; failures are printed and swallowed."""
        try:
            # Simulate processing
            await asyncio.sleep(0.01)
            print(f"Processed: {message.content}")

            # Delete after processing
            await self.client.delete_message(message)

        except Exception as e:
            print(f"Failed to process {message.id}: {e}")

async def run_producer_consumer():
    """Run a producer and a consumer against "perf-test-queue" at the same time."""
    queue_name = "perf-test-queue"
    message_total = 1000

    producer = AsyncQueueProducer(conn_str, queue_name)
    consumer = AsyncQueueConsumer(conn_str, queue_name)

    # NOTE(review): the consumer stops at its first empty receive, so if it
    # polls before the producer has sent anything it exits immediately and
    # messages are left on the queue — confirm this is acceptable.
    await asyncio.gather(
        producer.produce_messages(message_total),
        consumer.consume_messages(message_total),
    )

asyncio.run(run_producer_consumer())

Types

Async-Specific Types

from azure.core.async_paging import AsyncItemPaged
from azure.core.paging import ItemPaged
from typing import AsyncIterator, AsyncContextManager

# Async pagination support (async versions return AsyncItemPaged)
AsyncItemPaged[QueueProperties]     # For async list_queues()
AsyncItemPaged[QueueMessage]        # For async receive_messages()

# Sync versions return ItemPaged for comparison:
ItemPaged[QueueProperties]          # For sync list_queues()
ItemPaged[QueueMessage]             # For sync receive_messages()

# Context manager protocols
AsyncContextManager[QueueServiceClient]  # For automatic resource cleanup
AsyncContextManager[QueueClient]         # For automatic resource cleanup

# Async iterator protocols
AsyncIterator[QueueProperties]      # From `async for` over async list_queues(); by_page() yields pages of these
AsyncIterator[QueueMessage]         # From `async for` over async receive_messages()

Installation Requirements

# For async support, install with aio extra
# (quoted so shells such as zsh do not glob-expand the brackets):
pip install "azure-storage-queue[aio]"

# This installs additional dependencies:
# - azure-core[aio]>=1.30.0
# - aiohttp (for async HTTP operations)

Performance Considerations

# Async performance best practices:

# 1. Use connection pooling (automatically handled by azure-core)
# 2. Batch operations when possible (send/receive multiple messages)
# 3. Use asyncio.gather() for concurrent operations
# 4. Close clients explicitly or use async context managers
# 5. Limit concurrent operations to avoid overwhelming the service

# Recommended concurrency limits:
# NOTE(review): the two concurrency values below look like rules of thumb
# rather than service-enforced limits — confirm before relying on them.
RECOMMENDED_MAX_CONCURRENT_SENDS = 100
RECOMMENDED_MAX_CONCURRENT_RECEIVES = 50
# Matches the 1-32 range documented for messages_per_page
RECOMMENDED_BATCH_SIZE = 32  # Maximum messages per receive operation

Install with Tessl CLI

npx tessl i tessl/pypi-azure-storage-queue

docs

async-operations.md

authentication.md

index.md

message-operations.md

models-config.md

queue-operations.md

queue-service.md

tile.json