tessl/pypi-azure-storage-queue

Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.

Message Operations

Operations on individual queue messages: sending, receiving, peeking, updating, and deleting. Together they form the core of queue-based messaging patterns.

Capabilities

Message Sending

Send messages to the queue with optional visibility delays and expiration settings.

def send_message(
    self,
    content: Any,
    *,
    visibility_timeout: Optional[int] = None,
    time_to_live: Optional[int] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> QueueMessage:
    """
    Send a message to the queue.
    
    Parameters:
    - content: Message content (string, bytes, or object for encoding)
    - visibility_timeout: Seconds before message becomes visible (0-604800, default: 0)
    - time_to_live: Message expiration in seconds (any positive value, or -1 to never expire; default: 604800, i.e. 7 days)
    - timeout: Request timeout in seconds
    
    Returns:
    QueueMessage with ID, pop_receipt, insertion time, and expiration time
    
    Raises:
    HttpResponseError: If the service rejects the message, e.g. because it exceeds the 64KB limit
    """

Message Receiving

Receive messages from the queue with automatic visibility timeout management.

def receive_message(
    self,
    *,
    visibility_timeout: Optional[int] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> Optional[QueueMessage]:
    """
    Receive a single message from the queue.
    
    Parameters:
    - visibility_timeout: Seconds message stays invisible (1-604800, default: 30)
    - timeout: Request timeout in seconds
    
    Returns:
    QueueMessage if available, None if queue is empty
    """

def receive_messages(
    self,
    *,
    messages_per_page: Optional[int] = None,
    visibility_timeout: Optional[int] = None,
    max_messages: Optional[int] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> ItemPaged[QueueMessage]:
    """
    Receive multiple messages from the queue with pagination.
    
    Parameters:
    - messages_per_page: Messages per page (1-32, default: 1)
    - visibility_timeout: Seconds messages stay invisible (1-604800, default: 30)
    - max_messages: Maximum total messages to receive
    - timeout: Request timeout in seconds
    
    Returns:
    ItemPaged iterator yielding QueueMessage objects
    """

Message Peeking

Peek at messages without changing their visibility or dequeue count.

def peek_messages(
    self,
    max_messages: Optional[int] = None,
    *,
    timeout: Optional[int] = None,
    **kwargs
) -> List[QueueMessage]:
    """
    Peek at messages without dequeuing them.
    
    Parameters:
    - max_messages: Maximum messages to peek (1-32, default: 1)
    - timeout: Request timeout in seconds
    
    Returns:
    List of QueueMessage objects (without pop_receipt)
    
    Note:
    Peeked messages carry no pop_receipt, so they cannot be deleted or updated until they are received
    """

Message Updates

Update message content and visibility timeout for received messages.

def update_message(
    self,
    message: Union[QueueMessage, str],
    pop_receipt: Optional[str] = None,
    content: Optional[Any] = None,
    *,
    visibility_timeout: Optional[int] = None,
    timeout: Optional[int] = None,
    **kwargs
) -> QueueMessage:
    """
    Update message content and/or visibility timeout.
    
    Parameters:
    - message: QueueMessage object or message ID
    - pop_receipt: Message pop receipt (required if message is string ID)
    - content: New message content (None to keep current content)
    - visibility_timeout: New visibility timeout in seconds, relative to now (0-604800)
    - timeout: Request timeout in seconds
    
    Returns:
    QueueMessage with updated pop_receipt and next_visible_on time
    
    Raises:
    ResourceNotFoundError: If message not found or pop_receipt invalid
    """

Message Deletion

Delete individual messages or clear all messages from the queue.

def delete_message(
    self,
    message: Union[QueueMessage, str],
    pop_receipt: Optional[str] = None,
    *,
    timeout: Optional[int] = None,
    **kwargs
) -> None:
    """
    Delete a message from the queue.
    
    Parameters:
    - message: QueueMessage object or message ID
    - pop_receipt: Message pop receipt (required if message is string ID)
    - timeout: Request timeout in seconds
    
    Raises:
    ResourceNotFoundError: If message not found or pop_receipt invalid
    """

def clear_messages(
    self,
    *,
    timeout: Optional[int] = None,
    **kwargs
) -> None:
    """
    Clear all messages from the queue.
    
    Parameters:
    - timeout: Request timeout in seconds
    
    Note:
    This operation cannot be undone
    """

Usage Examples

Basic Message Operations

import time

from azure.storage.queue import QueueClient

# conn_str is assumed to hold the storage account connection string
queue_client = QueueClient.from_connection_string(conn_str, "myqueue")

# Send a message
message = queue_client.send_message("Hello, World!")
print(f"Message sent with ID: {message.id}")

# Send message with visibility delay
delayed_message = queue_client.send_message(
    "Delayed message",
    visibility_timeout=300,  # Visible after 5 minutes
    time_to_live=3600        # Expires after 1 hour
)

# Receive and process messages
messages = queue_client.receive_messages(max_messages=5)
for message in messages:
    print(f"Processing: {message.content}")
    
    # Simulate processing time
    time.sleep(2)
    
    # Delete message after processing
    queue_client.delete_message(message)

Message Visibility Management

# Receive message with custom visibility timeout
message = queue_client.receive_message(visibility_timeout=120)  # 2 minutes

if message:
    try:
        # Process message
        result = process_message(message.content)
        
        # If processing successful, delete message
        queue_client.delete_message(message)
        
    except Exception as e:
        # If processing fails, keep the message hidden for 5 more minutes, then retry
        updated_message = queue_client.update_message(
            message,
            visibility_timeout=300  # Becomes visible again in 5 minutes
        )
        print(f"Message processing failed ({e}), will retry. New pop_receipt: {updated_message.pop_receipt}")
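
Rather than a fixed retry delay, the delay can grow with each failed attempt by reading the message's `dequeue_count`. The helper below is a small sketch of exponential backoff; `retry_delay`, `base_seconds`, and `max_seconds` are hypothetical names and values, not SDK features.

```python
from typing import Optional


def retry_delay(
    dequeue_count: Optional[int],
    base_seconds: int = 30,
    max_seconds: int = 3600,
) -> int:
    """Return base_seconds * 2**(attempts - 1), capped at max_seconds."""
    # Treat a missing or zero count as the first attempt.
    attempts = max(dequeue_count or 1, 1)
    return min(base_seconds * 2 ** (attempts - 1), max_seconds)
```

In the failure branch above this could be used as `queue_client.update_message(message, visibility_timeout=retry_delay(message.dequeue_count))`, so repeated failures wait progressively longer before the message reappears.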

Message Content Updates

# Receive message
message = queue_client.receive_message()

if message:
    # Update message content and extend visibility
    updated_message = queue_client.update_message(
        message,
        content="Updated message content",
        visibility_timeout=180  # Stay invisible for 3 minutes from now
    )
    
    print(f"Message updated. New pop_receipt: {updated_message.pop_receipt}")
    
    # Process and delete
    queue_client.delete_message(updated_message)

Peek Operations

# Peek at messages without dequeuing
peeked_messages = queue_client.peek_messages(max_messages=10)

print(f"Found {len(peeked_messages)} messages in queue:")
for i, message in enumerate(peeked_messages, 1):
    print(f"{i}. ID: {message.id}, Content: {message.content}")
    print(f"   Inserted: {message.inserted_on}, Expires: {message.expires_on}")
    print(f"   Dequeue count: {message.dequeue_count}")

Batch Processing

def process_messages_in_batches():
    while True:
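        # Receive one batch of up to 32 messages; without max_messages,
        # iteration keeps fetching pages until the queue is empty
        messages = list(queue_client.receive_messages(messages_per_page=32, max_messages=32))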
        
        if not messages:
            print("No more messages to process")
            break
            
        print(f"Processing batch of {len(messages)} messages")
        
        for message in messages:
            try:
                # Process message
                process_single_message(message.content)
                
                # Delete on success
                queue_client.delete_message(message)
                
            except Exception as e:
                print(f"Failed to process message {message.id}: {e}")
                # Message will become visible again after timeout
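
A message that fails every time will keep reappearing after each timeout. One common way to break the cycle is to divert it to a separate queue once its `dequeue_count` passes a threshold. The sketch below assumes a second client, here called `poison_client`, pointing at a hypothetical side queue; `handle_or_quarantine` and `max_attempts` are likewise illustrative names.

```python
from typing import Any, Callable


def handle_or_quarantine(
    queue_client: Any,
    poison_client: Any,
    message: Any,
    handler: Callable[[Any], None],
    max_attempts: int = 5,
) -> str:
    """Process one received message, diverting it after too many attempts.

    Returns "done", "retry", or "quarantined".
    """
    if (message.dequeue_count or 0) > max_attempts:
        # Copy the payload to the side queue first, then remove the original.
        poison_client.send_message(message.content)
        queue_client.delete_message(message)
        return "quarantined"
    try:
        handler(message.content)
    except Exception:
        # Leave the message alone; it reappears when its visibility timeout ends.
        return "retry"
    queue_client.delete_message(message)
    return "done"
```

Sending to the side queue before deleting means a crash between the two calls can duplicate the message but never lose it.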

Error Handling

from azure.core.exceptions import HttpResponseError, ResourceNotFoundError

try:
    # Send large message
    large_content = "x" * 70000  # > 64KB limit
    queue_client.send_message(large_content)
    
except HttpResponseError as e:
    # Oversized messages are rejected by the service
    print(f"Send failed (message must be ≤ 64KB): {e}")

try:
    # Try to delete message with invalid pop receipt
    queue_client.delete_message("invalid-message-id", "invalid-pop-receipt")
    
except ResourceNotFoundError:
    print("Message not found or pop receipt is invalid")

Types

Message Types

class QueueMessage:
    id: str                                    # Message GUID
    content: Any                              # Message content (decoded by policy)
    pop_receipt: Optional[str]                # Receipt for delete/update operations
    inserted_on: Optional[datetime]           # UTC insertion timestamp
    expires_on: Optional[datetime]            # UTC expiration timestamp  
    dequeue_count: Optional[int]              # Number of times message was dequeued
    next_visible_on: Optional[datetime]       # UTC time when message becomes visible
    
    @classmethod
    def _from_generated(cls, generated) -> 'QueueMessage': ...

Pagination Types

from azure.core.paging import ItemPaged

# ItemPaged[QueueMessage] provides:
# - Iteration over messages
# - Automatic pagination handling  
# - by_page() method for page-by-page access
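
The `by_page()` method mentioned above makes it possible to work with one page of results at a time instead of a flat stream. A minimal sketch, assuming only that the object passed in exposes `by_page()` as `ItemPaged` does; `iter_batches` is a hypothetical helper name.

```python
from typing import Any, Iterator, List


def iter_batches(paged: Any) -> Iterator[List[Any]]:
    """Yield each page of an ItemPaged-style iterable as a list."""
    for page in paged.by_page():
        batch = list(page)
        if batch:
            yield batch
```

For example, `for batch in iter_batches(queue_client.receive_messages(messages_per_page=32, max_messages=96)): ...` would hand the caller lists of up to 32 messages. Because the generator is lazy, the next page is requested only when the caller asks for the next batch.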

Message Size Limits

# Message size constraints:
MAX_MESSAGE_SIZE_BYTES = 65536        # 64KB maximum
MAX_MESSAGES_PER_GET = 32             # Maximum messages per receive operation
MAX_PEEK_MESSAGES = 32                # Maximum messages per peek operation
MAX_VISIBILITY_TIMEOUT = 604800       # 7 days maximum visibility timeout
DEFAULT_TIME_TO_LIVE = 604800         # 7 days default message lifetime (any positive value or -1 is accepted)
