Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive message handling operations including sending, receiving, updating, peeking, and deletion. These operations form the core functionality for queue-based messaging patterns.
Send messages to the queue with optional visibility delays and expiration settings.
def send_message(
self,
content: Any,
*,
visibility_timeout: Optional[int] = None,
time_to_live: Optional[int] = None,
timeout: Optional[int] = None,
**kwargs
) -> QueueMessage:
"""
Send a message to the queue.
Parameters:
- content: Message content (string, bytes, or object for encoding)
- visibility_timeout: Seconds before message becomes visible (0-604800, default: 0)
- time_to_live: Message expiration in seconds (1-604800, default: 604800)
- timeout: Request timeout in seconds
Returns:
QueueMessage with ID, pop_receipt, insertion time, and expiration time
Raises:
RequestTooLargeError: If message exceeds 64KB limit
"""Receive messages from the queue with automatic visibility timeout management.
def receive_message(
self,
*,
visibility_timeout: Optional[int] = None,
timeout: Optional[int] = None,
**kwargs
) -> Optional[QueueMessage]:
"""
Receive a single message from the queue.
Parameters:
- visibility_timeout: Seconds message stays invisible (1-43200, default: 30)
- timeout: Request timeout in seconds
Returns:
QueueMessage if available, None if queue is empty
"""
def receive_messages(
self,
*,
messages_per_page: Optional[int] = None,
visibility_timeout: Optional[int] = None,
max_messages: Optional[int] = None,
timeout: Optional[int] = None,
**kwargs
) -> ItemPaged[QueueMessage]:
"""
Receive multiple messages from the queue with pagination.
Parameters:
- messages_per_page: Messages per page (1-32, default: 1)
- visibility_timeout: Seconds messages stay invisible (1-43200, default: 30)
- max_messages: Maximum total messages to receive
- timeout: Request timeout in seconds
Returns:
ItemPaged iterator yielding QueueMessage objects
"""Peek at messages without changing their visibility or dequeue count.
def peek_messages(
self,
max_messages: Optional[int] = None,
*,
timeout: Optional[int] = None,
**kwargs
) -> List[QueueMessage]:
"""
Peek at messages without dequeuing them.
Parameters:
- max_messages: Maximum messages to peek (1-32, default: 1)
- timeout: Request timeout in seconds
Returns:
List of QueueMessage objects (without pop_receipt)
Note:
Peeked messages cannot be deleted or updated
"""Update message content and visibility timeout for received messages.
def update_message(
self,
message: Union[QueueMessage, str],
pop_receipt: Optional[str] = None,
content: Optional[Any] = None,
*,
visibility_timeout: Optional[int] = None,
timeout: Optional[int] = None,
**kwargs
) -> QueueMessage:
"""
Update message content and/or visibility timeout.
Parameters:
- message: QueueMessage object or message ID
- pop_receipt: Message pop receipt (required if message is string ID)
- content: New message content (None to keep current content)
- visibility_timeout: New visibility timeout in seconds (0-43200)
- timeout: Request timeout in seconds
Returns:
QueueMessage with updated pop_receipt and next_visible_on time
Raises:
ResourceNotFoundError: If message not found or pop_receipt invalid
"""Delete individual messages or clear all messages from the queue.
def delete_message(
self,
message: Union[QueueMessage, str],
pop_receipt: Optional[str] = None,
*,
timeout: Optional[int] = None,
**kwargs
) -> None:
"""
Delete a message from the queue.
Parameters:
- message: QueueMessage object or message ID
- pop_receipt: Message pop receipt (required if message is string ID)
- timeout: Request timeout in seconds
Raises:
ResourceNotFoundError: If message not found or pop_receipt invalid
"""
def clear_messages(
self,
*,
timeout: Optional[int] = None,
**kwargs
) -> None:
"""
Clear all messages from the queue.
Parameters:
- timeout: Request timeout in seconds
Note:
This operation cannot be undone
"""from azure.storage.queue import QueueClient
queue_client = QueueClient.from_connection_string(conn_str, "myqueue")
# Send a message
message = queue_client.send_message("Hello, World!")
print(f"Message sent with ID: {message.id}")
# Send message with visibility delay
delayed_message = queue_client.send_message(
"Delayed message",
visibility_timeout=300, # Visible after 5 minutes
time_to_live=3600 # Expires after 1 hour
)
# Receive and process messages
messages = queue_client.receive_messages(max_messages=5)
for message in messages:
print(f"Processing: {message.content}")
# Simulate processing time
import time
time.sleep(2)
# Delete message after processing
queue_client.delete_message(message)# Receive message with custom visibility timeout
message = queue_client.receive_message(visibility_timeout=120) # 2 minutes
if message:
try:
# Process message
result = process_message(message.content)
# If processing successful, delete message
queue_client.delete_message(message)
except Exception as e:
# If processing fails, extend visibility timeout for retry
updated_message = queue_client.update_message(
message,
visibility_timeout=300 # Make visible again in 5 minutes
)
print(f"Message processing failed, will retry. New pop_receipt: {updated_message.pop_receipt}")# Receive message
message = queue_client.receive_message()
if message:
# Update message content and extend visibility
updated_message = queue_client.update_message(
message,
content="Updated message content",
visibility_timeout=180 # Extend visibility by 3 minutes
)
print(f"Message updated. New pop_receipt: {updated_message.pop_receipt}")
# Process and delete
queue_client.delete_message(updated_message)# Peek at messages without dequeuing
peeked_messages = queue_client.peek_messages(max_messages=10)
print(f"Found {len(peeked_messages)} messages in queue:")
for i, message in enumerate(peeked_messages, 1):
print(f"{i}. ID: {message.id}, Content: {message.content}")
print(f" Inserted: {message.inserted_on}, Expires: {message.expires_on}")
print(f" Dequeue count: {message.dequeue_count}")def process_messages_in_batches():
while True:
# Receive batch of messages
messages = list(queue_client.receive_messages(messages_per_page=32))
if not messages:
print("No more messages to process")
break
print(f"Processing batch of {len(messages)} messages")
for message in messages:
try:
# Process message
process_single_message(message.content)
# Delete on success
queue_client.delete_message(message)
except Exception as e:
print(f"Failed to process message {message.id}: {e}")
# Message will become visible again after timeoutfrom azure.core.exceptions import ResourceNotFoundError, RequestTooLargeError
try:
# Send large message
large_content = "x" * 70000 # > 64KB limit
queue_client.send_message(large_content)
except RequestTooLargeError:
print("Message too large, must be ≤ 64KB")
try:
# Try to delete message with invalid pop receipt
queue_client.delete_message("invalid-message-id", "invalid-pop-receipt")
except ResourceNotFoundError:
print("Message not found or pop receipt is invalid")class QueueMessage:
id: str # Message GUID
content: Any # Message content (decoded by policy)
pop_receipt: Optional[str] # Receipt for delete/update operations
inserted_on: Optional[datetime] # UTC insertion timestamp
expires_on: Optional[datetime] # UTC expiration timestamp
dequeue_count: Optional[int] # Number of times message was dequeued
next_visible_on: Optional[datetime] # UTC time when message becomes visible
@classmethod
def _from_generated(cls, generated) -> 'QueueMessage': ...from azure.core.paging import ItemPaged
from typing import Iterator
# ItemPaged[QueueMessage] provides:
# - Iteration over messages
# - Automatic pagination handling
# - by_page() method for page-by-page access# Message size constraints:
MAX_MESSAGE_SIZE_BYTES = 65536 # 64KB maximum
MAX_MESSAGES_PER_GET = 32 # Maximum messages per receive operation
MAX_PEEK_MESSAGES = 32 # Maximum messages per peek operation
MAX_VISIBILITY_TIMEOUT = 43200 # 12 hours maximum visibility timeout
MAX_TIME_TO_LIVE = 604800 # 7 days maximum message lifetimeInstall with Tessl CLI
npx tessl i tessl/pypi-azure-storage-queue