Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall
score
92%
Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.
Message class for creating messages to send to Service Bus entities.
class ServiceBusMessage:
def __init__(
self,
body: Optional[Union[str, bytes]],
*,
application_properties: Optional[Dict[Union[str, bytes], PrimitiveTypes]] = None,
session_id: Optional[str] = None,
message_id: Optional[str] = None,
scheduled_enqueue_time_utc: Optional[datetime] = None,
time_to_live: Optional[timedelta] = None,
content_type: Optional[str] = None,
correlation_id: Optional[str] = None,
subject: Optional[str] = None,
partition_key: Optional[str] = None,
to: Optional[str] = None,
reply_to: Optional[str] = None,
reply_to_session_id: Optional[str] = None,
**kwargs
):
"""
Create a Service Bus message.
Parameters:
- body: Message content (string or bytes)
- application_properties: Custom application properties dictionary
- session_id: Session identifier for session-enabled entities
- message_id: Unique message identifier
- scheduled_enqueue_time_utc: UTC time to schedule message delivery
- time_to_live: Message expiration duration
- content_type: MIME content type
- correlation_id: Correlation identifier for message correlation
- subject: Message subject or label
- partition_key: Partitioning key for partitioned entities
- to: Destination address
- reply_to: Reply-to address for response messages
- reply_to_session_id: Session ID for reply messages
"""Access and modify message properties and metadata.
@property
def body(self) -> Any:
"""
The message body content.
Returns:
Message body as provided during creation
"""
@property
def body_type(self) -> AmqpMessageBodyType:
"""
The AMQP message body type.
Returns:
AmqpMessageBodyType enum value
"""
@property
def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTypes]]:
"""
Custom application-specific properties.
Returns:
Dictionary of application properties or None
"""
@application_properties.setter
def application_properties(self, value: Optional[Dict[Union[str, bytes], PrimitiveTypes]]) -> None: ...
@property
def session_id(self) -> Optional[str]:
"""
Session identifier for session-enabled entities.
Returns:
Session ID string or None
"""
@session_id.setter
def session_id(self, value: Optional[str]) -> None: ...
@property
def message_id(self) -> Optional[str]:
"""
Unique message identifier.
Returns:
Message ID string or None
"""
@message_id.setter
def message_id(self, value: Optional[str]) -> None: ...
@property
def scheduled_enqueue_time_utc(self) -> Optional[datetime]:
"""
UTC time when message should be enqueued.
Returns:
Scheduled enqueue time or None for immediate delivery
"""
@scheduled_enqueue_time_utc.setter
def scheduled_enqueue_time_utc(self, value: Optional[datetime]) -> None: ...
@property
def time_to_live(self) -> Optional[timedelta]:
"""
Message time-to-live duration.
Returns:
Time-to-live duration or None for no expiration
"""
@time_to_live.setter
def time_to_live(self, value: Optional[timedelta]) -> None: ...
@property
def content_type(self) -> Optional[str]:
"""
MIME content type of the message body.
Returns:
Content type string or None
"""
@content_type.setter
def content_type(self, value: Optional[str]) -> None: ...
@property
def correlation_id(self) -> Optional[str]:
"""
Correlation identifier for message correlation patterns.
Returns:
Correlation ID string or None
"""
@correlation_id.setter
def correlation_id(self, value: Optional[str]) -> None: ...
@property
def subject(self) -> Optional[str]:
"""
Message subject or label.
Returns:
Subject string or None
"""
@subject.setter
def subject(self, value: Optional[str]) -> None: ...
@property
def partition_key(self) -> Optional[str]:
"""
Partitioning key for partitioned entities.
Returns:
Partition key string or None
"""
@partition_key.setter
def partition_key(self, value: Optional[str]) -> None: ...
@property
def to(self) -> Optional[str]:
"""
Destination address.
Returns:
Destination address string or None
"""
@to.setter
def to(self, value: Optional[str]) -> None: ...
@property
def reply_to(self) -> Optional[str]:
"""
Reply-to address for response messages.
Returns:
Reply-to address string or None
"""
@reply_to.setter
def reply_to(self, value: Optional[str]) -> None: ...
@property
def reply_to_session_id(self) -> Optional[str]:
"""
Session ID for reply messages.
Returns:
Reply-to session ID string or None
"""
@reply_to_session_id.setter
def reply_to_session_id(self, value: Optional[str]) -> None: ...
@property
def raw_amqp_message(self) -> AmqpAnnotatedMessage:
"""
Underlying AMQP message for advanced scenarios.
Returns:
AmqpAnnotatedMessage object
"""from azure.servicebus import ServiceBusMessage
from datetime import datetime, timedelta
# Create a basic message
message = ServiceBusMessage("Hello, Service Bus!")
# Create a message with properties
message = ServiceBusMessage(
body="Order processing request",
application_properties={
"order_id": "12345",
"priority": "high",
"customer_id": "customer_abc"
},
session_id="order-session-12345",
message_id="msg-001",
content_type="application/json",
correlation_id="corr-123",
subject="OrderProcessing",
time_to_live=timedelta(hours=24)
)
# Schedule a message for future delivery
future_message = ServiceBusMessage(
"This will be delivered in 1 hour",
scheduled_enqueue_time_utc=datetime.utcnow() + timedelta(hours=1)
)
# Modify properties after creation
message.application_properties["processed_at"] = datetime.utcnow().isoformat()
message.subject = "UpdatedSubject"Message class for messages received from Service Bus entities (extends ServiceBusMessage).
class ServiceBusReceivedMessage(ServiceBusMessage):
# Inherits all properties from ServiceBusMessage
@property
def dead_letter_error_description(self) -> Optional[str]:
"""
Error description if message was dead lettered.
Returns:
Error description string or None
"""
@property
def dead_letter_reason(self) -> Optional[str]:
"""
Reason if message was dead lettered.
Returns:
Dead letter reason string or None
"""
@property
def dead_letter_source(self) -> Optional[str]:
"""
Source entity name if message was dead lettered.
Returns:
Source entity name string or None
"""
@property
def state(self) -> ServiceBusMessageState:
"""
Current state of the message.
Returns:
ServiceBusMessageState enum value (ACTIVE, DEFERRED, SCHEDULED)
"""
@property
def delivery_count(self) -> Optional[int]:
"""
Number of times this message has been delivered.
Returns:
Delivery count or None
"""
@property
def enqueued_sequence_number(self) -> Optional[int]:
"""
Sequence number assigned when message was enqueued.
Returns:
Enqueued sequence number or None
"""
@property
def enqueued_time_utc(self) -> Optional[datetime]:
"""
UTC time when message was enqueued.
Returns:
Enqueue time or None
"""
@property
def expires_at_utc(self) -> Optional[datetime]:
"""
UTC time when message expires.
Returns:
Expiration time or None
"""
@property
def sequence_number(self) -> Optional[int]:
"""
Unique sequence number assigned by Service Bus.
Returns:
Sequence number or None
"""
@property
def lock_token(self) -> Optional[Union[uuid.UUID, str]]:
"""
Lock token for PEEK_LOCK receive mode.
Returns:
Lock token UUID/string or None
"""
@property
def locked_until_utc(self) -> Optional[datetime]:
"""
UTC time when message lock expires.
Returns:
Lock expiration time or None
"""from azure.servicebus import ServiceBusClient, ServiceBusMessageState
client = ServiceBusClient.from_connection_string("your_connection_string")
with client.get_queue_receiver("my-queue") as receiver:
messages = receiver.receive_messages(max_message_count=5)
for message in messages:
print(f"Message body: {message.body}")
print(f"Sequence number: {message.sequence_number}")
print(f"Delivery count: {message.delivery_count}")
print(f"Enqueued at: {message.enqueued_time_utc}")
print(f"Expires at: {message.expires_at_utc}")
print(f"Lock expires at: {message.locked_until_utc}")
print(f"State: {message.state}")
# Check application properties
if message.application_properties:
for key, value in message.application_properties.items():
print(f" {key}: {value}")
# Check if message has been redelivered
if message.delivery_count and message.delivery_count > 1:
print(f"Message has been redelivered {message.delivery_count} times")
# Check message state
if message.state == ServiceBusMessageState.DEFERRED:
print("Message is deferred")
elif message.state == ServiceBusMessageState.SCHEDULED:
print("Message is scheduled")
receiver.complete_message(message)Batch container for efficient sending of multiple messages.
class ServiceBusMessageBatch:
@property
def max_size_in_bytes(self) -> int:
"""
Maximum size of the batch in bytes.
Returns:
Maximum batch size
"""
@property
def size_in_bytes(self) -> int:
"""
Current size of the batch in bytes.
Returns:
Current batch size
"""
def add_message(self, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]) -> None:
"""
Add a message to the batch.
Parameters:
- message: ServiceBusMessage or AmqpAnnotatedMessage to add
Raises:
- ValueError: If message would exceed batch size limit
"""from azure.servicebus import ServiceBusClient, ServiceBusMessage
client = ServiceBusClient.from_connection_string("your_connection_string")
with client.get_queue_sender("my-queue") as sender:
# Create a message batch
batch = sender.create_message_batch()
# Add messages to the batch
for i in range(100):
message = ServiceBusMessage(f"Batch message {i}")
try:
batch.add_message(message)
except ValueError:
# Batch is full, send it and create a new one
print(f"Sending batch with {batch.size_in_bytes} bytes")
sender.send_messages(batch)
# Create new batch and add current message
batch = sender.create_message_batch()
batch.add_message(message)
# Send remaining messages in the batch
if batch.size_in_bytes > 0:
print(f"Sending final batch with {batch.size_in_bytes} bytes")
sender.send_messages(batch)Low-level AMQP message types for advanced scenarios.
class AmqpAnnotatedMessage:
"""
Low-level AMQP message representation.
Provides direct access to AMQP message structure for advanced scenarios
where fine-grained control over message properties is needed.
"""
@property
def body(self) -> Any: ...
@property
def body_type(self) -> AmqpMessageBodyType: ...
@property
def properties(self) -> Optional[AmqpMessageProperties]: ...
@property
def application_properties(self) -> Optional[Dict]: ...
@property
def annotations(self) -> Optional[Dict]: ...
@property
def delivery_annotations(self) -> Optional[Dict]: ...
@property
def header(self) -> Optional[AmqpMessageHeader]: ...
@property
def footer(self) -> Optional[Dict]: ...
class AmqpMessageBodyType(Enum):
"""AMQP message body types."""
DATA = "data"
SEQUENCE = "sequence"
VALUE = "value"
class AmqpMessageProperties:
"""AMQP message properties section."""
@property
def message_id(self) -> Optional[Union[str, bytes]]: ...
@property
def user_id(self) -> Optional[bytes]: ...
@property
def to(self) -> Optional[Union[str, bytes]]: ...
@property
def subject(self) -> Optional[str]: ...
@property
def reply_to(self) -> Optional[Union[str, bytes]]: ...
@property
def correlation_id(self) -> Optional[Union[str, bytes]]: ...
@property
def content_type(self) -> Optional[str]: ...
@property
def content_encoding(self) -> Optional[str]: ...
@property
def absolute_expiry_time(self) -> Optional[int]: ...
@property
def creation_time(self) -> Optional[int]: ...
@property
def group_id(self) -> Optional[str]: ...
@property
def group_sequence(self) -> Optional[int]: ...
@property
def reply_to_group_id(self) -> Optional[str]: ...
class AmqpMessageHeader:
"""AMQP message header section."""
@property
def durable(self) -> Optional[bool]: ...
@property
def priority(self) -> Optional[int]: ...
@property
def time_to_live(self) -> Optional[int]: ...
@property
def first_acquirer(self) -> Optional[bool]: ...
@property
def delivery_count(self) -> Optional[int]: ...Enumerations for message states and types.
class ServiceBusMessageState(int, Enum):
"""
Message states in Service Bus.
"""
ACTIVE = 0 # Message is active in queue/subscription
DEFERRED = 1 # Message has been deferred for later processing
SCHEDULED = 2 # Message is scheduled for future deliveryAutomatic lock renewal for messages during long processing.
class AutoLockRenewer:
def register(
self,
renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
timeout: float = 300
) -> None:
"""
Register a message for automatic lock renewal.
Parameters:
- renewable: ServiceBusReceivedMessage to auto-renew
- timeout: Maximum renewal duration in seconds
Raises:
- ValueError: If message is already registered or invalid
"""from azure.servicebus import ServiceBusClient, AutoLockRenewer
import time
def process_long_running_message(message):
# Simulate long processing time
time.sleep(60) # 1 minute processing
return f"Processed: {message.body}"
client = ServiceBusClient.from_connection_string("your_connection_string")
auto_renewer = AutoLockRenewer(max_lock_renewal_duration=300) # 5 minutes
try:
with client.get_queue_receiver("my-queue") as receiver:
messages = receiver.receive_messages(max_message_count=5)
for message in messages:
# Register message for auto-renewal
auto_renewer.register(message, timeout=300)
try:
result = process_long_running_message(message)
print(result)
receiver.complete_message(message)
except Exception as e:
print(f"Error processing message: {e}")
receiver.abandon_message(message)
finally:
auto_renewer.close()Common type aliases used throughout the SDK.
# Union types for message parameters
MessageTypes = Union[
ServiceBusMessage,
AmqpAnnotatedMessage,
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]
]
# Primitive types for application properties
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
# Session filter type
NextAvailableSessionType = ServiceBusSessionFilter
# Lock renewal callback type
LockRenewFailureCallback = Callable[
[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]],
None
]
# Azure credential types (from azure-core)
TokenCredential = Any # azure.core.credentials.TokenCredential
AzureSasCredential = Any # azure.core.credentials.AzureSasCredential
AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredentialInstall with Tessl CLI
npx tessl i tessl/pypi-azure-servicebusdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10