Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Core data structures, retry policies, message encoding options, service configuration models, and error handling components for azure-storage-queue operations.
Custom policies for encoding and decoding message content to handle different data types and serialization requirements.
class TextBase64EncodePolicy:
    """Message encode policy that Base64-encodes text content before sending."""

    def encode(self, content: str) -> str:
        """Encode text content to a Base64 string.

        Parameters:
        - content: Text string to encode.

        Returns:
            Base64 encoded string (ASCII-safe for queue transport).

        Raises:
            TypeError: If content is not a str.
        """
        import base64
        if not isinstance(content, str):
            raise TypeError("Message content must be a str for TextBase64EncodePolicy.")
        return base64.b64encode(content.encode("utf-8")).decode("utf-8")
class TextBase64DecodePolicy:
    """Message decode policy that Base64-decodes received content to text."""

    def decode(self, content: str, response) -> str:
        """Decode Base64 content to text.

        Parameters:
        - content: Base64 encoded string.
        - response: HTTP response object; carried for error context only
          and not otherwise inspected here.

        Returns:
            Decoded text string (UTF-8).
        """
        import base64
        return base64.b64decode(content).decode("utf-8")
class BinaryBase64EncodePolicy:
    """Message encode policy that Base64-encodes binary content before sending."""

    def encode(self, content: bytes) -> str:
        """Encode binary content to a Base64 string.

        Parameters:
        - content: Binary data to encode.

        Returns:
            Base64 encoded string.

        Raises:
            TypeError: If content is not bytes-like.
        """
        import base64
        if not isinstance(content, (bytes, bytearray)):
            raise TypeError("Message content must be bytes for BinaryBase64EncodePolicy.")
        return base64.b64encode(bytes(content)).decode("utf-8")
class BinaryBase64DecodePolicy:
    """Message decode policy that Base64-decodes received content to bytes."""

    def decode(self, content: str, response) -> bytes:
        """Decode Base64 content to binary data.

        Parameters:
        - content: Base64 encoded string.
        - response: HTTP response object; carried for error context only
          and not otherwise inspected here.

        Returns:
            Decoded binary data.
        """
        import base64
        return base64.b64decode(content)

# Configurable retry mechanisms for handling transient failures and network issues.
class ExponentialRetry:
    """Exponential backoff retry policy with random jitter.

    Per the Azure SDK convention the wait before retry N grows as
    initial_backoff + increment_base ** N, plus random jitter.
    """

    def __init__(
        self,
        initial_backoff: int = 15,
        increment_base: int = 3,
        retry_total: int = 3,
        retry_to_secondary: bool = False,
        random_jitter_range: int = 3,
        **kwargs
    ):
        """Create an exponential retry policy.

        Parameters:
        - initial_backoff: Initial backoff interval in seconds.
        - increment_base: Base for the exponential growth of the backoff.
        - retry_total: Maximum number of retry attempts.
        - retry_to_secondary: Whether to retry against the secondary endpoint.
        - random_jitter_range: Random jitter range in seconds (0..range added).
        - kwargs: Additional options forwarded by the pipeline; ignored here.
        """
        self.initial_backoff = initial_backoff
        self.increment_base = increment_base
        self.retry_total = retry_total
        self.retry_to_secondary = retry_to_secondary
        self.random_jitter_range = random_jitter_range
class LinearRetry:
    """Linear (fixed-interval) backoff retry policy with random jitter."""

    def __init__(
        self,
        backoff: int = 15,
        retry_total: int = 3,
        retry_to_secondary: bool = False,
        random_jitter_range: int = 3,
        **kwargs
    ):
        """Create a linear retry policy.

        Parameters:
        - backoff: Fixed backoff interval in seconds between attempts.
        - retry_total: Maximum number of retry attempts.
        - retry_to_secondary: Whether to retry against the secondary endpoint.
        - random_jitter_range: Random jitter range in seconds (0..range added).
        - kwargs: Additional options forwarded by the pipeline; ignored here.
        """
        self.backoff = backoff
        self.retry_total = retry_total
        self.retry_to_secondary = retry_to_secondary
        self.random_jitter_range = random_jitter_range

# Models for configuring storage service properties including analytics, metrics, and CORS policies.
class QueueAnalyticsLogging:
    """Azure Storage Analytics logging configuration.

    Attributes:
    - version: Analytics version (default "1.0").
    - delete: Log delete requests.
    - read: Log read requests.
    - write: Log write requests.
    - retention_policy: RetentionPolicy governing how long logs are kept.
    """

    def __init__(self, **kwargs):
        self.version = kwargs.get('version', '1.0')
        self.delete = kwargs.get('delete', False)
        self.read = kwargs.get('read', False)
        self.write = kwargs.get('write', False)
        # Fall back to a disabled retention policy when none is supplied.
        self.retention_policy = kwargs.get('retention_policy') or RetentionPolicy()

    @classmethod
    def _from_generated(cls, generated) -> 'QueueAnalyticsLogging':
        # Internal: build from an autorest-generated model. Left unimplemented
        # in this capability spec.
        ...
class Metrics:
    """Storage Analytics metrics configuration.

    Attributes:
    - version: Analytics version (default "1.0").
    - enabled: Whether metrics collection is enabled.
    - include_apis: Include per-API operation statistics (optional).
    - retention_policy: RetentionPolicy governing how long metrics are kept.
    """

    def __init__(self, **kwargs):
        self.version = kwargs.get('version', '1.0')
        self.enabled = kwargs.get('enabled', False)
        self.include_apis = kwargs.get('include_apis')
        # Fall back to a disabled retention policy when none is supplied.
        self.retention_policy = kwargs.get('retention_policy') or RetentionPolicy()

    @classmethod
    def _from_generated(cls, generated) -> 'Metrics':
        # Internal: build from an autorest-generated model. Left unimplemented
        # in this capability spec.
        ...
class RetentionPolicy:
    """Data retention policy for Storage Analytics logs and metrics.

    Attributes:
    - enabled: Whether the retention policy is enabled.
    - days: Number of days to retain data (1-365); required when enabled.
    """

    def __init__(self, enabled: bool = False, days=None):
        if enabled and days is None:
            raise ValueError("If the retention policy is enabled, 'days' must be specified.")
        if days is not None and not 1 <= days <= 365:
            # Service-imposed bound for analytics retention.
            raise ValueError("'days' must be between 1 and 365.")
        self.enabled = enabled
        self.days = days

    @classmethod
    def _from_generated(cls, generated) -> 'RetentionPolicy':
        # Internal: build from an autorest-generated model. Left unimplemented
        # in this capability spec.
        ...
class CorsRule:
    """Cross-Origin Resource Sharing (CORS) rule for the queue service.

    Attributes:
    - allowed_origins: Comma-separated allowed origin domains.
    - allowed_methods: Comma-separated allowed HTTP methods.
    - max_age_in_seconds: Preflight response cache duration (default 0).
    - exposed_headers: Comma-separated response headers to expose (default "").
    - allowed_headers: Comma-separated allowed request headers (default "").
    """

    def __init__(self, allowed_origins, allowed_methods, **kwargs):
        self.allowed_origins = allowed_origins
        self.allowed_methods = allowed_methods
        self.max_age_in_seconds = kwargs.get('max_age_in_seconds', 0)
        self.exposed_headers = kwargs.get('exposed_headers', '')
        self.allowed_headers = kwargs.get('allowed_headers', '')

    @staticmethod
    def _to_generated(rules):
        # Internal: convert a list of CorsRule to generated models. Left
        # unimplemented in this capability spec.
        ...

    @classmethod
    def _from_generated(cls, generated) -> 'CorsRule':
        # Internal: build from an autorest-generated model. Left unimplemented
        # in this capability spec.
        ...

# Location routing and comprehensive error code definitions for storage operations.
class LocationMode:
    """Location mode for routing requests against geo-redundant storage."""

    PRIMARY = "primary"      # Route requests to the primary location.
    SECONDARY = "secondary"  # Route requests to the secondary location.
class StorageErrorCode:
    """Error code strings returned by the Azure Storage service.

    Values mirror the service's REST error codes (PascalCase strings) so they
    can be compared directly against ``error_code`` on raised HTTP errors.
    """

    # Queue-specific error codes
    INVALID_MARKER = "InvalidMarker"              # Invalid marker value in a list operation.
    MESSAGE_NOT_FOUND = "MessageNotFound"         # Specified message does not exist.
    MESSAGE_TOO_LARGE = "MessageTooLarge"         # Message exceeds maximum size limit.
    POP_RECEIPT_MISMATCH = "PopReceiptMismatch"   # Pop receipt does not match message.
    QUEUE_ALREADY_EXISTS = "QueueAlreadyExists"   # Queue already exists.
    QUEUE_BEING_DELETED = "QueueBeingDeleted"     # Queue is currently being deleted.
    QUEUE_DISABLED = "QueueDisabled"              # Queue is disabled.
    QUEUE_NOT_EMPTY = "QueueNotEmpty"             # Queue contains messages.
    QUEUE_NOT_FOUND = "QueueNotFound"             # Specified queue does not exist.
    # Common error codes shared across storage services
    INVALID_AUTHENTICATION_INFO = "InvalidAuthenticationInfo"  # Authentication info is invalid.
    INVALID_RESOURCE_NAME = "InvalidResourceName"              # Resource name is invalid.
    REQUEST_BODY_TOO_LARGE = "RequestBodyTooLarge"             # Request body exceeds size limit.
    INVALID_HEADER_VALUE = "InvalidHeaderValue"                # Header value is invalid.
    MISSING_REQUIRED_HEADER = "MissingRequiredHeader"          # Required header is missing.
    UNSUPPORTED_HEADER = "UnsupportedHeader"                   # Header is not supported.
    INVALID_QUERY_PARAMETER_VALUE = "InvalidQueryParameterValue"            # Query parameter value invalid.
    OUT_OF_RANGE_QUERY_PARAMETER_VALUE = "OutOfRangeQueryParameterValue"    # Query parameter out of range.
    INVALID_URI = "InvalidUri"                                 # Request URI is invalid.
    INVALID_HTTP_VERB = "InvalidHttpVerb"                      # HTTP verb is not supported.
    EMPTY_METADATA_KEY = "EmptyMetadataKey"                    # Metadata key is empty.
    REQUEST_URL_FAILED_TO_PARSE = "RequestUrlFailedToParse"    # Request URL could not be parsed.
    INVALID_XML_DOCUMENT = "InvalidXmlDocument"                # XML document is invalid.
    INVALID_XML_NODE_VALUE = "InvalidXmlNodeValue"             # XML node value is invalid.
    MISSING_REQUIRED_XML_NODE = "MissingRequiredXmlNode"       # Required XML node is missing.
    UNSUPPORTED_XML_NODE = "UnsupportedXmlNode"                # XML node is not supported.
# Example: sending structured binary payloads with Base64 policies.
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy
import json
import pickle

# Create a client with custom encoding policies.
queue_client = QueueClient.from_connection_string(
    conn_str,
    "myqueue",
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)

# Send JSON data as binary.
data = {"user_id": 12345, "action": "purchase", "amount": 99.99}
json_bytes = json.dumps(data).encode('utf-8')
queue_client.send_message(json_bytes)

# Send pickled Python objects.
# NOTE(review): pickle is unsafe to load from untrusted sources — only do
# this when every producer and consumer of the queue is trusted.
python_obj = {"complex": [1, 2, 3], "nested": {"key": "value"}}
pickled_data = pickle.dumps(python_obj)
queue_client.send_message(pickled_data)

# Receive and decode.
message = queue_client.receive_message()
if message:
    # message.content will be bytes due to BinaryBase64DecodePolicy.
    decoded_data = json.loads(message.content.decode('utf-8'))
    print(f"Received: {decoded_data}")
# Example: configuring retry policies on service and queue clients.
from azure.storage.queue import QueueServiceClient, ExponentialRetry, LinearRetry

# Configure exponential backoff.
exponential_retry = ExponentialRetry(
    initial_backoff=10,       # Start with a 10 second delay.
    increment_base=2,         # Double the delay each retry.
    retry_total=5,            # Maximum 5 retry attempts.
    retry_to_secondary=True,  # Try the secondary endpoint if available.
    random_jitter_range=5     # Add 0-5 seconds of random jitter.
)
service_client = QueueServiceClient.from_connection_string(
    conn_str,
    retry_policy=exponential_retry
)

# Configure linear backoff.
linear_retry = LinearRetry(
    backoff=30,               # Fixed 30 second delay between retries.
    retry_total=3,            # Maximum 3 retry attempts.
    random_jitter_range=10    # Add 0-10 seconds of random jitter.
)
queue_client = service_client.get_queue_client(
    "myqueue",
    retry_policy=linear_retry
)
# Example: configuring analytics logging, metrics, and CORS for the service.
from azure.storage.queue import (
    QueueServiceClient, QueueAnalyticsLogging, Metrics,
    RetentionPolicy, CorsRule
)

service_client = QueueServiceClient.from_connection_string(conn_str)

# Configure comprehensive logging.
logging_config = QueueAnalyticsLogging(
    version="1.0",
    delete=True,   # Log delete operations.
    read=True,     # Log read operations.
    write=True,    # Log write operations.
    retention_policy=RetentionPolicy(enabled=True, days=30)
)

# Configure detailed metrics.
metrics_config = Metrics(
    version="1.0",
    enabled=True,
    include_apis=True,  # Include per-API statistics.
    retention_policy=RetentionPolicy(enabled=True, days=90)
)

# Configure CORS for web applications.
cors_rule = CorsRule(
    allowed_origins="https://myapp.example.com,https://admin.example.com",
    allowed_methods="GET,POST,PUT,DELETE",
    max_age_in_seconds=3600,
    exposed_headers="x-ms-request-id,x-ms-version",
    allowed_headers="x-ms-client-request-id,x-custom-header"
)

# Apply the service configuration.
service_client.set_service_properties(
    analytics_logging=logging_config,
    hour_metrics=metrics_config,
    minute_metrics=metrics_config,
    cors=[cors_rule]
)

# Client-side encryption support for securing message content before transmission to Azure Storage.
class CustomerProvidedEncryptionKey:
    """Customer-provided encryption key for client-side encryption.

    Attributes:
    - key_value: Base64-encoded encryption key string.
    - key_hash: Base64-encoded SHA256 hash of the raw key bytes.
    """

    def __init__(self, key_value, key_hash=None):
        """Create a customer-provided encryption key.

        Parameters:
        - key_value: The encryption key (Base64 string, or raw bytes which
          will be Base64-encoded for storage).
        - key_hash: Base64-encoded SHA256 hash of the raw key; automatically
          calculated from key_value when not provided.
        """
        import base64
        import hashlib
        if isinstance(key_value, (bytes, bytearray)):
            raw = bytes(key_value)
            key_value = base64.b64encode(raw).decode('utf-8')
        else:
            raw = base64.b64decode(key_value)
        self.key_value = key_value
        self.key_hash = key_hash or base64.b64encode(
            hashlib.sha256(raw).digest()
        ).decode('utf-8')
class EncryptionScope:
    """Encryption scope for server-side encryption.

    Attributes:
    - encryption_scope: Name of the encryption scope.
    """

    def __init__(self, encryption_scope: str):
        """Create an encryption scope.

        Parameters:
        - encryption_scope: Name of the encryption scope.
        """
        self.encryption_scope = encryption_scope

# Policies for handling raw message content without encoding transformations.
class NoEncodePolicy:
    """Encode policy that passes message content through unchanged."""

    def encode(self, content):
        """Return content without encoding.

        Parameters:
        - content: Message content to process.

        Returns:
            The content, unmodified.
        """
        return content
class NoDecodePolicy:
    """Decode policy that passes received message content through unchanged."""

    def decode(self, content, response):
        """Return content without decoding.

        Parameters:
        - content: Message content to process.
        - response: HTTP response object; not inspected by this policy.

        Returns:
            The content, unmodified.
        """
        return content
# Example: sending messages with a customer-provided encryption key.
from azure.storage.queue import QueueClient, CustomerProvidedEncryptionKey
import base64
import os

# Generate a 256-bit encryption key.
key_bytes = os.urandom(32)
key_base64 = base64.b64encode(key_bytes).decode('utf-8')

# Create the encryption key object (key hash is auto-calculated).
encryption_key = CustomerProvidedEncryptionKey(key_value=key_base64)

# Create a client with encryption.
queue_client = QueueClient.from_connection_string(
    conn_str,
    "encrypted-queue",
    customer_provided_encryption_key=encryption_key
)

# Send an encrypted message.
encrypted_message = queue_client.send_message("Sensitive data content")
print(f"Encrypted message sent: {encrypted_message.id}")

# Receive and decrypt the message (requires the same encryption key).
message = queue_client.receive_message()
if message:
    print(f"Decrypted content: {message.content}")
    queue_client.delete_message(message)
# Example: handling raw message content with no-op encoding policies.
from azure.storage.queue import QueueClient, NoEncodePolicy, NoDecodePolicy
import json

# Create a client with no encoding policies for raw data handling.
queue_client = QueueClient.from_connection_string(
    conn_str,
    "raw-data-queue",
    message_encode_policy=NoEncodePolicy(),
    message_decode_policy=NoDecodePolicy()
)

# Send raw JSON data.
raw_data = {"timestamp": "2024-01-01T12:00:00Z", "value": 42}
json_string = json.dumps(raw_data)
queue_client.send_message(json_string)

# Receive raw data.
message = queue_client.receive_message()
if message:
    # message.content will be the raw JSON string.
    data = json.loads(message.content)
    print(f"Received data: {data}")
    queue_client.delete_message(message)
# Example: handling service errors using StorageErrorCode comparisons.
from azure.storage.queue import QueueClient, StorageErrorCode
from azure.core.exceptions import (
    ResourceExistsError, ResourceNotFoundError,
    ClientAuthenticationError, HttpResponseError
)

queue_client = QueueClient.from_connection_string(conn_str, "myqueue")

# Creating a queue that already exists is a benign, expected condition.
try:
    queue_client.create_queue()
except ResourceExistsError as e:
    if e.error_code == StorageErrorCode.QUEUE_ALREADY_EXISTS:
        print("Queue already exists, continuing...")
    else:
        raise

# Competing consumers: the message may vanish between receive and delete.
try:
    message = queue_client.receive_message()
    if message:
        # Process message.
        queue_client.delete_message(message)
except HttpResponseError as e:
    if e.error_code == StorageErrorCode.MESSAGE_NOT_FOUND:
        print("Message was already processed by another consumer")
    elif e.error_code == StorageErrorCode.POP_RECEIPT_MISMATCH:
        print("Message pop receipt is invalid, likely timed out")
    else:
        print(f"Unexpected error: {e.error_code}")
        raise

# Messages over the service limit (64 KiB) are rejected by the service.
try:
    large_message = "x" * 100000  # > 64KB
    queue_client.send_message(large_message)
except HttpResponseError as e:
    if e.error_code == StorageErrorCode.REQUEST_BODY_TOO_LARGE:
        print("Message too large, splitting into smaller parts...")
        # Handle large message splitting logic.
    else:
        raise
# Example: routing reads to the secondary endpoint of a geo-redundant account.
from azure.storage.queue import QueueServiceClient, LocationMode

# Create a client with secondary read access.
service_client = QueueServiceClient.from_connection_string(
    conn_str,
    location_mode=LocationMode.SECONDARY  # Read from the secondary endpoint.
)

# For read operations when the primary is unavailable.
try:
    properties = service_client.get_service_properties()
except Exception:
    # Fall back to a secondary read.
    secondary_client = QueueServiceClient.from_connection_string(
        conn_str,
        location_mode=LocationMode.SECONDARY
    )
    properties = secondary_client.get_service_properties()
# Shared typing aliases and service-imposed configuration limits.
from typing import Optional, Dict, List, Any, Union, Callable
from datetime import datetime
from azure.core.paging import ItemPaged

# Message encoding function signatures.
EncodeFunctionType = Callable[[Any], str]
DecodeFunctionType = Callable[[str, Any], Any]

# Configuration value constraints (service-imposed limits).
MIN_RETENTION_DAYS = 1
MAX_RETENTION_DAYS = 365       # Analytics retention: 1-365 days.
MIN_CORS_MAX_AGE = 1
MAX_CORS_MAX_AGE = 2147483647  # Max signed 32-bit integer.
MIN_RETRY_BACKOFF = 0
MAX_RETRY_BACKOFF = 3600
MIN_RETRY_TOTAL = 0
MAX_RETRY_TOTAL = 10

# Analytics and metrics configuration unions.
AnalyticsConfigType = Union[QueueAnalyticsLogging, Dict[str, Any], None]
MetricsConfigType = Union[Metrics, Dict[str, Any], None]
CorsConfigType = Union[List[CorsRule], List[Dict[str, Any]], None]

# Encryption types.
EncryptionKeyType = Union[CustomerProvidedEncryptionKey, str, bytes, None]
EncryptionScopeType = Union[EncryptionScope, str, None]

# Encoding policy unions.
EncodePolicy = Union[TextBase64EncodePolicy, BinaryBase64EncodePolicy, NoEncodePolicy]
DecodePolicy = Union[TextBase64DecodePolicy, BinaryBase64DecodePolicy, NoDecodePolicy]

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-azure-storage-queue