CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

docs/message-types.md

Message Classes and Types

Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.

Capabilities

ServiceBusMessage

Message class for creating messages to send to Service Bus entities.

class ServiceBusMessage:
    """Message class for creating messages to send to Service Bus entities."""

    def __init__(
        self,
        body: Optional[Union[str, bytes]],
        *,
        application_properties: Optional[Dict[Union[str, bytes], PrimitiveTypes]] = None,
        session_id: Optional[str] = None,
        message_id: Optional[str] = None,
        scheduled_enqueue_time_utc: Optional[datetime] = None,
        time_to_live: Optional[timedelta] = None,
        content_type: Optional[str] = None,
        correlation_id: Optional[str] = None,
        subject: Optional[str] = None,
        partition_key: Optional[str] = None,
        to: Optional[str] = None,
        reply_to: Optional[str] = None,
        reply_to_session_id: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Create a Service Bus message.

        `body` is the only positional parameter. Everything after the bare
        `*` must be passed by keyword and defaults to None.

        Parameters:
        - body: Message content (string or bytes; the annotation also permits None)
        - application_properties: Custom application properties dictionary
        - session_id: Session identifier for session-enabled entities
        - message_id: Unique message identifier
        - scheduled_enqueue_time_utc: UTC time to schedule message delivery
        - time_to_live: Message expiration duration
        - content_type: MIME content type
        - correlation_id: Correlation identifier for message correlation
        - subject: Message subject or label
        - partition_key: Partitioning key for partitioned entities
        - to: Destination address
        - reply_to: Reply-to address for response messages
        - reply_to_session_id: Session ID for reply messages
        - **kwargs: Additional keyword arguments (not described in this listing)
        """

ServiceBusMessage Properties

Access and modify message properties and metadata.

# No setter is listed for `body`, so it is shown here as read-only.
@property
def body(self) -> Any:
    """
    The message body content.

    Returns:
        Message body as provided during creation.
    """

# No setter is listed for `body_type`.
@property
def body_type(self) -> AmqpMessageBodyType:
    """
    The AMQP message body type.

    Returns:
        An AmqpMessageBodyType enum value.
    """

@property
def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTypes]]:
    """
    Custom application-specific properties.

    Returns:
        Dictionary of application properties, or None.
    """

# Setter: accepts a properties dictionary or None.
@application_properties.setter
def application_properties(self, value: Optional[Dict[Union[str, bytes], PrimitiveTypes]]) -> None: ...

@property
def session_id(self) -> Optional[str]:
    """
    Session identifier for session-enabled entities.

    Returns:
        Session ID string, or None.
    """

# Setter: accepts a string or None.
@session_id.setter
def session_id(self, value: Optional[str]) -> None: ...

@property
def message_id(self) -> Optional[str]:
    """
    Unique message identifier.

    Returns:
        Message ID string, or None.
    """

# Setter: accepts a string or None.
@message_id.setter
def message_id(self, value: Optional[str]) -> None: ...

@property
def scheduled_enqueue_time_utc(self) -> Optional[datetime]:
    """
    UTC time when the message should be enqueued.

    Returns:
        Scheduled enqueue time, or None for immediate delivery.
    """

# Setter: accepts a datetime or None.
@scheduled_enqueue_time_utc.setter
def scheduled_enqueue_time_utc(self, value: Optional[datetime]) -> None: ...

@property
def time_to_live(self) -> Optional[timedelta]:
    """
    Message time-to-live duration.

    Returns:
        Time-to-live duration, or None for no expiration.
    """

# Setter: accepts a timedelta or None.
@time_to_live.setter
def time_to_live(self, value: Optional[timedelta]) -> None: ...

@property
def content_type(self) -> Optional[str]:
    """
    MIME content type of the message body.

    Returns:
        Content type string, or None.
    """

# Setter: accepts a string or None.
@content_type.setter
def content_type(self, value: Optional[str]) -> None: ...

@property
def correlation_id(self) -> Optional[str]:
    """
    Correlation identifier for message correlation patterns.

    Returns:
        Correlation ID string, or None.
    """

# Setter: accepts a string or None.
@correlation_id.setter
def correlation_id(self, value: Optional[str]) -> None: ...

@property
def subject(self) -> Optional[str]:
    """
    Message subject or label.

    Returns:
        Subject string, or None.
    """

# Setter: accepts a string or None.
@subject.setter
def subject(self, value: Optional[str]) -> None: ...

@property
def partition_key(self) -> Optional[str]:
    """
    Partitioning key for partitioned entities.

    Returns:
        Partition key string, or None.
    """

# Setter: accepts a string or None.
@partition_key.setter
def partition_key(self, value: Optional[str]) -> None: ...

@property
def to(self) -> Optional[str]:
    """
    Destination address.

    Returns:
        Destination address string, or None.
    """

# Setter: accepts a string or None.
@to.setter
def to(self, value: Optional[str]) -> None: ...

@property
def reply_to(self) -> Optional[str]:
    """
    Reply-to address for response messages.

    Returns:
        Reply-to address string, or None.
    """

# Setter: accepts a string or None.
@reply_to.setter
def reply_to(self, value: Optional[str]) -> None: ...

@property
def reply_to_session_id(self) -> Optional[str]:
    """
    Session ID for reply messages.

    Returns:
        Reply-to session ID string, or None.
    """

# Setter: accepts a string or None.
@reply_to_session_id.setter
def reply_to_session_id(self, value: Optional[str]) -> None: ...

# No setter is listed for `raw_amqp_message`.
@property
def raw_amqp_message(self) -> AmqpAnnotatedMessage:
    """
    Underlying AMQP message for advanced scenarios.

    Returns:
        The AmqpAnnotatedMessage object.
    """

Usage Example

from datetime import datetime, timedelta, timezone

from azure.servicebus import ServiceBusMessage

# Create a basic message
message = ServiceBusMessage("Hello, Service Bus!")

# Create a message with properties
message = ServiceBusMessage(
    body="Order processing request",
    application_properties={
        "order_id": "12345",
        "priority": "high",
        "customer_id": "customer_abc"
    },
    session_id="order-session-12345",
    message_id="msg-001",
    content_type="application/json",
    correlation_id="corr-123",
    subject="OrderProcessing",
    time_to_live=timedelta(hours=24)
)

# Schedule a message for future delivery.
# datetime.utcnow() is deprecated since Python 3.12 and returns a naive
# datetime, so build an explicitly timezone-aware UTC timestamp instead.
future_message = ServiceBusMessage(
    "This will be delivered in 1 hour",
    scheduled_enqueue_time_utc=datetime.now(timezone.utc) + timedelta(hours=1)
)

# Modify properties after creation (the ISO string now carries a +00:00 offset)
message.application_properties["processed_at"] = datetime.now(timezone.utc).isoformat()
message.subject = "UpdatedSubject"

ServiceBusReceivedMessage

Message class for messages received from Service Bus entities (extends ServiceBusMessage).

class ServiceBusReceivedMessage(ServiceBusMessage):
    """Message class for messages received from Service Bus entities."""

    # Inherits all properties from ServiceBusMessage.
    # Every property below is listed without a setter.

    @property
    def dead_letter_error_description(self) -> Optional[str]:
        """
        Error description if the message was dead lettered.

        Returns:
            Error description string, or None.
        """

    @property
    def dead_letter_reason(self) -> Optional[str]:
        """
        Reason if the message was dead lettered.

        Returns:
            Dead letter reason string, or None.
        """

    @property
    def dead_letter_source(self) -> Optional[str]:
        """
        Source entity name if the message was dead lettered.

        Returns:
            Source entity name string, or None.
        """

    @property
    def state(self) -> ServiceBusMessageState:
        """
        Current state of the message.

        Returns:
            ServiceBusMessageState enum value (ACTIVE, DEFERRED, SCHEDULED).
        """

    @property
    def delivery_count(self) -> Optional[int]:
        """
        Number of times this message has been delivered.

        Returns:
            Delivery count, or None.
        """

    @property
    def enqueued_sequence_number(self) -> Optional[int]:
        """
        Sequence number assigned when the message was enqueued.

        Returns:
            Enqueued sequence number, or None.
        """

    @property
    def enqueued_time_utc(self) -> Optional[datetime]:
        """
        UTC time when the message was enqueued.

        Returns:
            Enqueue time, or None.
        """

    @property
    def expires_at_utc(self) -> Optional[datetime]:
        """
        UTC time when the message expires.

        Returns:
            Expiration time, or None.
        """

    @property
    def sequence_number(self) -> Optional[int]:
        """
        Unique sequence number assigned by Service Bus.

        Returns:
            Sequence number, or None.
        """

    @property
    def lock_token(self) -> Optional[Union[uuid.UUID, str]]:
        """
        Lock token for PEEK_LOCK receive mode.

        Returns:
            Lock token as a UUID or string, or None.
        """

    @property
    def locked_until_utc(self) -> Optional[datetime]:
        """
        UTC time when the message lock expires.

        Returns:
            Lock expiration time, or None.
        """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusMessageState

client = ServiceBusClient.from_connection_string("your_connection_string")

# Use the client as a context manager so its connection is closed when
# the block exits, even if receiving or settling raises.
with client:
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=5)

        for message in messages:
            # str(message) renders the payload as text; message.body can be an
            # iterable of byte chunks, which would not print usefully.
            print(f"Message body: {str(message)}")
            print(f"Sequence number: {message.sequence_number}")
            print(f"Delivery count: {message.delivery_count}")
            print(f"Enqueued at: {message.enqueued_time_utc}")
            print(f"Expires at: {message.expires_at_utc}")
            print(f"Lock expires at: {message.locked_until_utc}")
            print(f"State: {message.state}")

            # Check application properties
            if message.application_properties:
                for key, value in message.application_properties.items():
                    print(f"  {key}: {value}")

            # Check if message has been redelivered
            if message.delivery_count and message.delivery_count > 1:
                print(f"Message has been redelivered {message.delivery_count} times")

            # Check message state
            if message.state == ServiceBusMessageState.DEFERRED:
                print("Message is deferred")
            elif message.state == ServiceBusMessageState.SCHEDULED:
                print("Message is scheduled")

            receiver.complete_message(message)

ServiceBusMessageBatch

Batch container for efficient sending of multiple messages.

class ServiceBusMessageBatch:
    """Batch container for efficient sending of multiple messages."""

    @property
    def max_size_in_bytes(self) -> int:
        """
        Maximum size of the batch in bytes.

        Returns:
            Maximum batch size.
        """

    @property
    def size_in_bytes(self) -> int:
        """
        Current size of the batch in bytes.

        Returns:
            Current batch size.
        """

    def add_message(self, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]) -> None:
        """
        Add a message to the batch.

        Parameters:
        - message: ServiceBusMessage or AmqpAnnotatedMessage to add

        Raises:
        - ValueError: If message would exceed batch size limit
          NOTE(review): azure-servicebus 7.x appears to raise
          MessageSizeExceededError (a ValueError subclass) here — confirm.
        """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusMessage

client = ServiceBusClient.from_connection_string("your_connection_string")

# Use the client as a context manager so its connection is closed when
# the block exits, even if sending raises.
with client:
    with client.get_queue_sender("my-queue") as sender:
        # Create a message batch
        batch = sender.create_message_batch()

        # Add messages to the batch
        for i in range(100):
            message = ServiceBusMessage(f"Batch message {i}")
            try:
                batch.add_message(message)
            except ValueError:
                # Batch is full, send it and create a new one
                print(f"Sending batch with {batch.size_in_bytes} bytes")
                sender.send_messages(batch)

                # Create new batch and add current message
                batch = sender.create_message_batch()
                batch.add_message(message)

        # Send remaining messages in the batch
        if batch.size_in_bytes > 0:
            print(f"Sending final batch with {batch.size_in_bytes} bytes")
            sender.send_messages(batch)

AMQP Message Types

Low-level AMQP message types for advanced scenarios.

class AmqpAnnotatedMessage:
    """
    Low-level AMQP message representation.

    Provides direct access to AMQP message structure for advanced scenarios
    where fine-grained control over message properties is needed.

    Only getters are listed; every section other than `body` and
    `body_type` is annotated as optional and may be None.
    """
    # Message payload; its concrete type is not specified here (Any).
    @property
    def body(self) -> Any: ...
    # Which AmqpMessageBodyType the payload uses.
    @property
    def body_type(self) -> AmqpMessageBodyType: ...
    # Properties section as an AmqpMessageProperties object.
    @property
    def properties(self) -> Optional[AmqpMessageProperties]: ...
    # Application properties dictionary.
    @property
    def application_properties(self) -> Optional[Dict]: ...
    # Message annotations dictionary.
    @property
    def annotations(self) -> Optional[Dict]: ...
    # Delivery annotations dictionary.
    @property
    def delivery_annotations(self) -> Optional[Dict]: ...
    # Header section as an AmqpMessageHeader object.
    @property
    def header(self) -> Optional[AmqpMessageHeader]: ...
    # Footer dictionary.
    @property
    def footer(self) -> Optional[Dict]: ...

class AmqpMessageBodyType(Enum):
    """Enumerates the body kinds an AMQP message can carry."""

    # Each member's value is the lowercase form of its name.
    DATA = "data"
    SEQUENCE = "sequence"
    VALUE = "value"

class AmqpMessageProperties:
    """
    AMQP message properties section.

    Only getters are listed, and every field is annotated as optional.
    """
    # Identifier fields may be str or bytes.
    @property
    def message_id(self) -> Optional[Union[str, bytes]]: ...
    @property
    def user_id(self) -> Optional[bytes]: ...
    @property
    def to(self) -> Optional[Union[str, bytes]]: ...
    @property
    def subject(self) -> Optional[str]: ...
    @property
    def reply_to(self) -> Optional[Union[str, bytes]]: ...
    @property
    def correlation_id(self) -> Optional[Union[str, bytes]]: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def content_encoding(self) -> Optional[str]: ...
    # Integer timestamps; NOTE(review): unit and epoch are not stated here — confirm.
    @property
    def absolute_expiry_time(self) -> Optional[int]: ...
    @property
    def creation_time(self) -> Optional[int]: ...
    # Group fields; NOTE(review): presumably these map to the session-related
    # fields on ServiceBusMessage — confirm.
    @property
    def group_id(self) -> Optional[str]: ...
    @property
    def group_sequence(self) -> Optional[int]: ...
    @property
    def reply_to_group_id(self) -> Optional[str]: ...

class AmqpMessageHeader:
    """
    AMQP message header section.

    Only getters are listed, and every field is annotated as optional.
    """
    @property
    def durable(self) -> Optional[bool]: ...
    @property
    def priority(self) -> Optional[int]: ...
    # Integer duration; NOTE(review): the unit is not stated here — confirm.
    @property
    def time_to_live(self) -> Optional[int]: ...
    @property
    def first_acquirer(self) -> Optional[bool]: ...
    @property
    def delivery_count(self) -> Optional[int]: ...

Message State and Type Enums

Enumerations for message states and types.

class ServiceBusMessageState(int, Enum):
    """States a Service Bus message can be in; members also behave as ints."""

    # Message is active in queue/subscription.
    ACTIVE = 0
    # Message has been deferred for later processing.
    DEFERRED = 1
    # Message is scheduled for future delivery.
    SCHEDULED = 2

Auto Lock Renewal

Automatic lock renewal for messages during long processing.

class AutoLockRenewer:
    def register(
        self,
        receiver: ServiceBusReceiver,
        renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
        max_lock_renewal_duration: Optional[float] = None,
        on_lock_renew_failure: Optional[LockRenewFailureCallback] = None
    ) -> None:
        """
        Register a received message or a session for automatic lock renewal.

        This is the azure-servicebus 7.x signature: the receiver is the first
        argument, and the former `timeout` parameter is named
        `max_lock_renewal_duration`.

        Parameters:
        - receiver: ServiceBusReceiver the message or session was obtained from
        - renewable: ServiceBusReceivedMessage or ServiceBusSession to auto-renew
        - max_lock_renewal_duration: Maximum renewal duration in seconds; when
          None, the duration given to the AutoLockRenewer constructor applies
        - on_lock_renew_failure: Optional callback invoked when the lock on the
          renewable is lost

        Raises:
        - ValueError: If the renewable cannot be auto-renewed with this receiver
        - ServiceBusError: If the renewer has already been closed
        """

Usage Example

import time

from azure.servicebus import ServiceBusClient, AutoLockRenewer

def process_long_running_message(message):
    # Simulate long processing time
    time.sleep(60)  # 1 minute processing
    # str(message) renders the payload as text; message.body can be an
    # iterable of byte chunks, which would not format usefully.
    return f"Processed: {str(message)}"

client = ServiceBusClient.from_connection_string("your_connection_string")
auto_renewer = AutoLockRenewer(max_lock_renewal_duration=300)  # 5 minutes

try:
    # Close the client as well as the receiver when the work is done.
    with client:
        with client.get_queue_receiver("my-queue") as receiver:
            messages = receiver.receive_messages(max_message_count=5)

            for message in messages:
                # Register message for auto-renewal. In azure-servicebus 7.x
                # register() takes the receiver first, and the duration is
                # passed as max_lock_renewal_duration (there is no `timeout`).
                auto_renewer.register(receiver, message, max_lock_renewal_duration=300)

                try:
                    result = process_long_running_message(message)
                    print(result)
                    receiver.complete_message(message)
                except Exception as e:
                    print(f"Error processing message: {e}")
                    receiver.abandon_message(message)

finally:
    auto_renewer.close()

Type Definitions

Common type aliases used throughout the SDK.

# Union types for message parameters: a single message of either kind,
# or a list mixing both kinds.
MessageTypes = Union[
    ServiceBusMessage, 
    AmqpAnnotatedMessage, 
    List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]
]

# Primitive types for application properties (the value type of the
# application_properties dictionaries; None is permitted)
PrimitiveTypes = Union[int, float, str, bool, bytes, None]

# Session filter type (a plain alias of ServiceBusSessionFilter in this listing)
NextAvailableSessionType = ServiceBusSessionFilter

# Lock renewal callback type: called with the renewable (session or received
# message) and an optional exception; returns nothing
LockRenewFailureCallback = Callable[
    [Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]], 
    None
]

# Azure credential types (from azure-core).
# Aliased to Any in this listing; the trailing comments name the real classes.
TokenCredential = Any        # azure.core.credentials.TokenCredential
AzureSasCredential = Any     # azure.core.credentials.AzureSasCredential
AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredential

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json