tessl/pypi-azure-storage-queue

Microsoft Azure Queue Storage Client Library for Python providing comprehensive message queuing capabilities for distributed applications.

Data Models and Configuration

Core data structures, retry policies, message encoding options, service configuration models, and error handling components for azure-storage-queue operations.

Capabilities

Message Encoding Policies

Custom policies for encoding and decoding message content to handle different data types and serialization requirements.
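
To make the difference between the text and binary policies in this section concrete, the following sketch reproduces the transformations with the standard library only. It illustrates the encoding scheme and is not the library's implementation; the helper names (`encode_text`, `decode_text`, `encode_binary`, `decode_binary`) are hypothetical.

```python
import base64


def encode_text(content: str) -> str:
    """Text -> UTF-8 bytes -> base64 string, as a text policy is expected to do."""
    return base64.b64encode(content.encode("utf-8")).decode("utf-8")


def decode_text(content: str) -> str:
    """Base64 string -> UTF-8 bytes -> text."""
    return base64.b64decode(content.encode("utf-8")).decode("utf-8")


def encode_binary(content: bytes) -> str:
    """Raw bytes -> base64 string, as a binary policy is expected to do."""
    return base64.b64encode(content).decode("utf-8")


def decode_binary(content: str) -> bytes:
    """Base64 string -> raw bytes."""
    return base64.b64decode(content.encode("utf-8"))


# Both variants put a plain ASCII string on the wire
text_wire = encode_text("héllo")
binary_wire = encode_binary(b"\x00\x01\xff")
```

The only difference between the two pairs is the type on the application side: text policies accept and return `str`, binary policies accept and return `bytes`.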

class TextBase64EncodePolicy:
    """Base64 encoding for text message content."""
    
    def encode(self, content: str) -> str:
        """
        Encode text content to base64.
        
        Parameters:
        - content: Text string to encode
        
        Returns:  
        Base64 encoded string
        """

class TextBase64DecodePolicy:
    """Base64 decoding for text message content."""
    
    def decode(self, content: str, response) -> str:
        """
        Decode base64 content to text.
        
        Parameters:
        - content: Base64 encoded string
        - response: HTTP response object
        
        Returns:
        Decoded text string
        """

class BinaryBase64EncodePolicy:
    """Base64 encoding for binary message content."""
    
    def encode(self, content: bytes) -> str:
        """
        Encode binary content to base64.
        
        Parameters:
        - content: Binary data to encode
        
        Returns:
        Base64 encoded string
        """

class BinaryBase64DecodePolicy:
    """Base64 decoding for binary message content."""
    
    def decode(self, content: str, response) -> bytes:
        """
        Decode base64 content to binary.
        
        Parameters:
        - content: Base64 encoded string
        - response: HTTP response object
        
        Returns:
        Decoded binary data
        """

Retry Policies

Configurable retry mechanisms for handling transient failures and network issues.

class ExponentialRetry:
    """Exponential backoff retry policy with jitter."""
    
    def __init__(
        self,
        initial_backoff: int = 15,
        increment_base: int = 3,
        retry_total: int = 3,
        retry_to_secondary: bool = False,
        random_jitter_range: int = 3,
        **kwargs
    ):
        """
        Create exponential retry policy.
        
        Parameters:
        - initial_backoff: Initial backoff interval in seconds
        - increment_base: Base raised to the retry count to grow the backoff on later retries
        - retry_total: Maximum number of retry attempts
        - retry_to_secondary: Whether to retry against the secondary endpoint
        - random_jitter_range: Range in seconds by which the backoff is randomized
        """
    
    initial_backoff: int
    increment_base: int
    random_jitter_range: int
    retry_total: int
    retry_to_secondary: bool

class LinearRetry:
    """Linear backoff retry policy with jitter."""
    
    def __init__(
        self,
        backoff: int = 15,
        retry_total: int = 3,
        retry_to_secondary: bool = False,
        random_jitter_range: int = 3,
        **kwargs
    ):
        """
        Create linear retry policy.
        
        Parameters:
        - backoff: Fixed backoff interval in seconds
        - retry_total: Maximum number of retry attempts  
        - retry_to_secondary: Whether to retry to secondary endpoint
        - random_jitter_range: Random jitter range in seconds
        """
    
    backoff: int
    random_jitter_range: int
    retry_total: int
    retry_to_secondary: bool
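
The two policies differ in how the delay grows between attempts. The sketch below computes the nominal delay before jitter under one stated assumption: the exponential policy waits `initial_backoff` before the first retry and `initial_backoff + increment_base ** n` before later retries (with `n` counted from 0), while the linear policy always waits `backoff`. The function names are hypothetical and jitter is left out so the numbers are deterministic; check the installed library's source before relying on exact values.

```python
from typing import List


def exponential_delays(
    initial_backoff: int = 15, increment_base: int = 3, retry_total: int = 3
) -> List[int]:
    """Nominal delays (seconds) per retry, assuming initial + base ** count."""
    delays = []
    for count in range(retry_total):
        # Assumption: the first retry uses the initial backoff unchanged
        extra = 0 if count == 0 else increment_base ** count
        delays.append(initial_backoff + extra)
    return delays


def linear_delays(backoff: int = 15, retry_total: int = 3) -> List[int]:
    """Nominal delays (seconds) per retry for a fixed backoff."""
    return [backoff] * retry_total


print(exponential_delays())  # defaults from the signature above
print(linear_delays())
```

Under this assumption the exponential delay grows additively from the initial value and does not simply multiply on each attempt, which matters when estimating the worst-case time a call can block.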

Service Configuration Models

Models for configuring storage service properties including analytics, metrics, and CORS policies.

class QueueAnalyticsLogging:
    """Azure Storage Analytics logging configuration."""
    
    version: str                          # Analytics version (default: "1.0")
    delete: bool                          # Log delete requests
    read: bool                           # Log read requests
    write: bool                          # Log write requests
    retention_policy: RetentionPolicy    # Log retention settings
    
    @classmethod
    def _from_generated(cls, generated) -> 'QueueAnalyticsLogging': ...

class Metrics:  
    """Storage Analytics metrics configuration."""
    
    version: str                          # Analytics version (default: "1.0")
    enabled: bool                         # Whether metrics collection is enabled
    include_apis: Optional[bool]          # Include API operation statistics
    retention_policy: RetentionPolicy     # Metrics retention settings
    
    @classmethod
    def _from_generated(cls, generated) -> 'Metrics': ...

class RetentionPolicy:
    """Data retention policy for logs and metrics."""
    
    enabled: bool                         # Whether retention policy is enabled
    days: Optional[int]                   # Number of days to retain data (1-365)
    
    @classmethod
    def _from_generated(cls, generated) -> 'RetentionPolicy': ...

class CorsRule:
    """Cross-Origin Resource Sharing (CORS) rule."""
    
    allowed_origins: str                  # Comma-separated allowed origin domains
    allowed_methods: str                  # Comma-separated allowed HTTP methods  
    max_age_in_seconds: int              # Preflight response cache duration
    exposed_headers: str                  # Comma-separated response headers to expose
    allowed_headers: str                  # Comma-separated allowed request headers
    
    @staticmethod
    def _to_generated(rules: Optional[List['CorsRule']]) -> Optional[List]: ...
    
    @classmethod
    def _from_generated(cls, generated) -> 'CorsRule': ...
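
The CORS attributes above are held as comma-separated strings. As a small illustration of that representation, the sketch below converts between Python lists and the comma-separated form using only the standard library; `to_csv` and `from_csv` are hypothetical helpers, not part of azure-storage-queue.

```python
from typing import List


def to_csv(values: List[str]) -> str:
    """Join a list of values into the comma-separated form."""
    return ",".join(v.strip() for v in values)


def from_csv(value: str) -> List[str]:
    """Split a comma-separated string back into a list, dropping empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


origins = to_csv(["https://myapp.example.com", "https://admin.example.com"])
methods = from_csv("GET, POST,PUT")
```

One pitfall worth knowing: joining a plain string instead of a list iterates over its characters, so `",".join("GET")` produces `"G,E,T"`. Keeping values in lists until the last step avoids this.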

Location and Error Models

Location routing and comprehensive error code definitions for storage operations.

class LocationMode:
    """Location mode for geo-redundant storage requests."""
    
    PRIMARY: str = "primary"              # Route requests to primary location
    SECONDARY: str = "secondary"          # Route requests to secondary location

class StorageErrorCode:
    """Error codes returned by Azure Storage service."""
    
    # Queue-specific error codes
    INVALID_MARKER: str                   # Invalid marker value in list operation
    MESSAGE_NOT_FOUND: str               # Specified message does not exist
    MESSAGE_TOO_LARGE: str               # Message exceeds maximum size limit
    POP_RECEIPT_MISMATCH: str            # Pop receipt does not match message
    QUEUE_ALREADY_EXISTS: str            # Queue already exists
    QUEUE_BEING_DELETED: str             # Queue is currently being deleted
    QUEUE_DISABLED: str                  # Queue is disabled
    QUEUE_NOT_EMPTY: str                 # Queue contains messages
    QUEUE_NOT_FOUND: str                 # Specified queue does not exist
    
    # Additional common error codes
    INVALID_AUTHENTICATION_INFO: str      # Authentication information is invalid
    INVALID_RESOURCE_NAME: str           # Resource name is invalid
    REQUEST_BODY_TOO_LARGE: str          # Request body exceeds size limit
    INVALID_HEADER_VALUE: str            # Header value is invalid
    MISSING_REQUIRED_HEADER: str         # Required header is missing
    UNSUPPORTED_HEADER: str              # Header is not supported
    INVALID_QUERY_PARAMETER_VALUE: str   # Query parameter value is invalid
    OUT_OF_RANGE_QUERY_PARAMETER_VALUE: str  # Query parameter value is out of range
    INVALID_URI: str                     # Request URI is invalid
    INVALID_HTTP_VERB: str               # HTTP verb is not supported
    EMPTY_METADATA_KEY: str              # Metadata key is empty
    REQUEST_URL_FAILED_TO_PARSE: str     # Request URL could not be parsed
    INVALID_XML_DOCUMENT: str            # XML document is invalid
    INVALID_XML_NODE_VALUE: str          # XML node value is invalid
    MISSING_REQUIRED_XML_NODE: str       # Required XML node is missing
    UNSUPPORTED_XML_NODE: str            # XML node is not supported

Usage Examples

Custom Message Encoding

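import json
import os
import pickle

from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy

# conn_str is used throughout these examples; reading it from this
# environment variable is an assumption, adjust it to your setup
conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

# Create client with custom encoding policies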
queue_client = QueueClient.from_connection_string(
    conn_str,
    "myqueue",
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)

# Send JSON data as binary
data = {"user_id": 12345, "action": "purchase", "amount": 99.99}
json_bytes = json.dumps(data).encode('utf-8')
queue_client.send_message(json_bytes)

# Send pickled Python objects
# (only unpickle messages from producers you trust: pickle.loads can execute arbitrary code)
python_obj = {"complex": [1, 2, 3], "nested": {"key": "value"}}
pickled_data = pickle.dumps(python_obj)
queue_client.send_message(pickled_data)

# Receive and decode
message = queue_client.receive_message()
if message:
    # message.content will be bytes due to BinaryBase64DecodePolicy
    decoded_data = json.loads(message.content.decode('utf-8'))
    print(f"Received: {decoded_data}")

Retry Policy Configuration

from azure.storage.queue import QueueClient, QueueServiceClient, ExponentialRetry, LinearRetry

# Configure exponential backoff
exponential_retry = ExponentialRetry(
    initial_backoff=10,      # Base delay of 10 seconds
    increment_base=2,        # Later retries add growing powers of 2 to the delay
    retry_total=5,           # Maximum 5 retry attempts
    retry_to_secondary=True, # Try secondary endpoint if available
    random_jitter_range=5    # Randomize each delay by up to 5 seconds
)

service_client = QueueServiceClient.from_connection_string(
    conn_str,
    retry_policy=exponential_retry
)

# Configure linear backoff
linear_retry = LinearRetry(
    backoff=30,              # Fixed 30 second delay between retries
    retry_total=3,           # Maximum 3 retry attempts
    random_jitter_range=10   # Randomize each delay by up to 10 seconds
)

# Pass the policy when the client is constructed; clients obtained from
# service_client.get_queue_client() share the service client's pipeline
queue_client = QueueClient.from_connection_string(
    conn_str,
    "myqueue",
    retry_policy=linear_retry
)

Service Properties Configuration

from azure.storage.queue import (
    QueueServiceClient, QueueAnalyticsLogging, Metrics, 
    RetentionPolicy, CorsRule
)

service_client = QueueServiceClient.from_connection_string(conn_str)

# Configure comprehensive logging
logging_config = QueueAnalyticsLogging(
    version="1.0",
    delete=True,     # Log delete operations
    read=True,       # Log read operations  
    write=True,      # Log write operations
    retention_policy=RetentionPolicy(enabled=True, days=30)
)

# Configure detailed metrics
metrics_config = Metrics(
    version="1.0",
    enabled=True,
    include_apis=True,   # Include per-API statistics
    retention_policy=RetentionPolicy(enabled=True, days=90)
)

# Configure CORS for web applications
# (pass lists; the rule stores them as comma-separated strings)
cors_rule = CorsRule(
    allowed_origins=["https://myapp.example.com", "https://admin.example.com"],
    allowed_methods=["GET", "POST", "PUT", "DELETE"],
    max_age_in_seconds=3600,
    exposed_headers=["x-ms-request-id", "x-ms-version"],
    allowed_headers=["x-ms-client-request-id", "x-custom-header"]
)

# Apply service configuration
service_client.set_service_properties(
    analytics_logging=logging_config,
    hour_metrics=metrics_config,
    minute_metrics=metrics_config,
    cors=[cors_rule]
)

Client-Side Encryption

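Client-side encryption support for securing message content before transmission to Azure Storage. The queue clients also accept `require_encryption` and `key_encryption_key` settings for client-side encryption; confirm that the classes below are exported by your installed version of azure-storage-queue before relying on them.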

class CustomerProvidedEncryptionKey:
    """Customer-provided encryption key for client-side encryption."""
    
    def __init__(
        self,
        key_value: Union[str, bytes],
        key_hash: Optional[str] = None
    ):
        """
        Create customer-provided encryption key.
        
        Parameters:
        - key_value: The encryption key value (base64 string or bytes)
        - key_hash: SHA256 hash of the key (auto-calculated if not provided)
        """
    
    key_value: str
    key_hash: str

class EncryptionScope:
    """Encryption scope for server-side encryption."""
    
    def __init__(self, encryption_scope: str):
        """
        Create encryption scope.
        
        Parameters:
        - encryption_scope: Name of the encryption scope
        """
    
    encryption_scope: str

No-Encoding Policies

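Policies for handling raw message content without encoding transformations. A client created without explicit `message_encode_policy` and `message_decode_policy` arguments applies no encoding by default.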

class NoEncodePolicy:
    """Policy that performs no encoding on message content."""
    
    def encode(self, content: Any) -> Any:
        """
        Return content without encoding.
        
        Parameters:
        - content: Message content to process
        
        Returns:
        Unmodified content
        """

class NoDecodePolicy:
    """Policy that performs no decoding on message content."""
    
    def decode(self, content: Any, response) -> Any:
        """
        Return content without decoding.
        
        Parameters:
        - content: Message content to process
        - response: HTTP response object
        
        Returns:
        Unmodified content
        """

Client-Side Encryption Usage

from azure.storage.queue import QueueClient, CustomerProvidedEncryptionKey
import base64
import os

# Generate encryption key
key_bytes = os.urandom(32)  # 256-bit key
key_base64 = base64.b64encode(key_bytes).decode('utf-8')

# Create encryption key object
encryption_key = CustomerProvidedEncryptionKey(key_value=key_base64)

# Create client with encryption
queue_client = QueueClient.from_connection_string(
    conn_str,
    "encrypted-queue",
    customer_provided_encryption_key=encryption_key
)

# Send encrypted message
encrypted_message = queue_client.send_message("Sensitive data content")
print(f"Encrypted message sent: {encrypted_message.id}")

# Receive and decrypt message (requires same encryption key)
message = queue_client.receive_message()
if message:
    print(f"Decrypted content: {message.content}")
    queue_client.delete_message(message)

No-Encoding Policy Usage

from azure.storage.queue import QueueClient
import json

# Create a client that sends and receives raw content.
# Passing None for both policies leaves message content unencoded,
# which is also the behavior when the arguments are omitted.
queue_client = QueueClient.from_connection_string(
    conn_str,
    "raw-data-queue",
    message_encode_policy=None,
    message_decode_policy=None
)

# Send raw JSON data
raw_data = {"timestamp": "2024-01-01T12:00:00Z", "value": 42}
json_string = json.dumps(raw_data)
queue_client.send_message(json_string)

# Receive raw data
message = queue_client.receive_message()
if message:
    # message.content will be the raw JSON string
    data = json.loads(message.content)
    print(f"Received data: {data}")
    queue_client.delete_message(message)

Error Handling Patterns

from azure.storage.queue import QueueClient, StorageErrorCode
from azure.core.exceptions import (
    ResourceExistsError, ResourceNotFoundError, 
    ClientAuthenticationError, HttpResponseError
)

queue_client = QueueClient.from_connection_string(conn_str, "myqueue")

try:
    queue_client.create_queue()
    
except ResourceExistsError as e:
    if e.error_code == StorageErrorCode.QUEUE_ALREADY_EXISTS:
        print("Queue already exists, continuing...")
    else:
        raise

try:
    message = queue_client.receive_message()
    if message:
        # Process message
        queue_client.delete_message(message)
        
except HttpResponseError as e:
    if e.error_code == StorageErrorCode.MESSAGE_NOT_FOUND:
        print("Message was already processed by another consumer")
    elif e.error_code == StorageErrorCode.POP_RECEIPT_MISMATCH:
        print("Message pop receipt is invalid, likely timed out")
    else:
        print(f"Unexpected error: {e.error_code}")
        raise

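try:
    large_message = "x" * 100000  # about 98 KiB, above the 64 KiB message limit
    queue_client.send_message(large_message)
    
except HttpResponseError as e:
    # Depending on how the limit is exceeded, the service may report either code
    if e.error_code in (
        StorageErrorCode.REQUEST_BODY_TOO_LARGE,
        StorageErrorCode.MESSAGE_TOO_LARGE,
    ):
        print("Message too large, splitting into smaller parts...")
        # Handle large message splitting logic
    else:
        raise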
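
The last handler above relies on the service rejecting an oversized message. A size check before sending avoids that round trip. The sketch below assumes the 64 KiB per-message limit mentioned in the example and accounts for the growth that base64 encoding adds (4 output characters for every 3 input bytes); the constant and helper names are hypothetical, and the service may count additional request overhead.

```python
import base64
import math
from typing import List

MAX_MESSAGE_BYTES = 64 * 1024  # assumed per-message limit


def base64_size(raw_length: int) -> int:
    """Length of the base64 text produced for raw_length input bytes."""
    return 4 * math.ceil(raw_length / 3)


def fits_in_message(payload: bytes, base64_encoded: bool = True) -> bool:
    """Check whether the payload stays within the assumed limit."""
    size = base64_size(len(payload)) if base64_encoded else len(payload)
    return size <= MAX_MESSAGE_BYTES


def split_payload(payload: bytes, base64_encoded: bool = True) -> List[bytes]:
    """Cut the payload into chunks that each fit in one message."""
    # With base64, only 3/4 of the limit is available for raw bytes
    chunk = (MAX_MESSAGE_BYTES // 4) * 3 if base64_encoded else MAX_MESSAGE_BYTES
    return [payload[i:i + chunk] for i in range(0, len(payload), chunk)]


parts = split_payload(b"x" * 100000)
```

Reassembling split messages on the consumer side needs an ordering scheme (for example a sequence number inside each part), which is outside the scope of this sketch.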

Location Mode Usage

from azure.storage.queue import QueueServiceClient, LocationMode

# Create the default client, which targets the primary endpoint
service_client = QueueServiceClient.from_connection_string(conn_str)

# For read operations when primary is unavailable
try:
    properties = service_client.get_service_properties()
except Exception:
    # Fallback to secondary read (requires a geo-redundant account with read access)
    secondary_client = QueueServiceClient.from_connection_string(
        conn_str,
        location_mode=LocationMode.SECONDARY  # Read from secondary endpoint
    )
    properties = secondary_client.get_service_properties()

Types

Complete Type Definitions

from typing import Optional, Dict, List, Any, Union, Callable
from datetime import datetime
from azure.core.paging import ItemPaged

# Message encoding function signatures
EncodeFunctionType = Callable[[Any], str]
DecodeFunctionType = Callable[[str, Any], Any]

# Configuration value constraints
MIN_RETENTION_DAYS = 1
MAX_RETENTION_DAYS = 365
MIN_CORS_MAX_AGE = 1
MAX_CORS_MAX_AGE = 2147483647
MIN_RETRY_BACKOFF = 0
MAX_RETRY_BACKOFF = 3600
MIN_RETRY_TOTAL = 0
MAX_RETRY_TOTAL = 10

# Analytics and metrics type unions
AnalyticsConfigType = Union[QueueAnalyticsLogging, Dict[str, Any], None]
MetricsConfigType = Union[Metrics, Dict[str, Any], None]
CorsConfigType = Union[List[CorsRule], List[Dict[str, Any]], None]

# Encryption types
EncryptionKeyType = Union[CustomerProvidedEncryptionKey, str, bytes, None]
EncryptionScopeType = Union[EncryptionScope, str, None]

# Encoding policy types
EncodePolicy = Union[TextBase64EncodePolicy, BinaryBase64EncodePolicy, NoEncodePolicy]
DecodePolicy = Union[TextBase64DecodePolicy, BinaryBase64DecodePolicy, NoDecodePolicy]
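
The constraint constants listed above can be enforced before a request is built. The following is a minimal sketch of such a check; `check_range` is a hypothetical helper, and the constants are copied from the listing above so the block runs on its own.

```python
MIN_RETENTION_DAYS = 1
MAX_RETENTION_DAYS = 365
MIN_RETRY_TOTAL = 0
MAX_RETRY_TOTAL = 10


def check_range(name: str, value: int, low: int, high: int) -> int:
    """Return value unchanged if low <= value <= high, else raise ValueError."""
    if not low <= value <= high:
        raise ValueError(f"{name} must be between {low} and {high}, got {value}")
    return value


# Validate settings up front so mistakes surface before any network call
days = check_range("days", 30, MIN_RETENTION_DAYS, MAX_RETENTION_DAYS)
retries = check_range("retry_total", 5, MIN_RETRY_TOTAL, MAX_RETRY_TOTAL)
```

Failing early with a descriptive message is usually easier to debug than a generic error returned by the service.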

Install with Tessl CLI

npx tessl i tessl/pypi-azure-storage-queue
