Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall
score
92%
Enumeration values and constants for configuring Service Bus behavior including receive modes, message states, transport types, and entity sub-queues.
Configure how messages are received from queues and subscriptions.
class ServiceBusReceiveMode(str, Enum):
"""
Message receive modes for controlling message acknowledgment behavior.
"""
PEEK_LOCK = "peeklock"
"""
Messages are locked for processing and must be explicitly completed, abandoned,
deferred, or dead-lettered. Provides at-least-once delivery guarantee.
"""
RECEIVE_AND_DELETE = "receiveanddelete"
"""
Messages are automatically deleted when received. Provides at-most-once
delivery with potential message loss if processing fails.
"""from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode
client = ServiceBusClient.from_connection_string("your_connection_string")
# Peek-lock mode (default) - messages must be explicitly completed
with client.get_queue_receiver("my-queue", receive_mode=ServiceBusReceiveMode.PEEK_LOCK) as receiver:
messages = receiver.receive_messages(max_message_count=5)
for message in messages:
try:
process_message(message)
receiver.complete_message(message) # Must explicitly complete
except Exception:
receiver.abandon_message(message) # Or abandon for retry
# Receive-and-delete mode - messages are automatically removed
with client.get_queue_receiver("my-queue", receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE) as receiver:
messages = receiver.receive_messages(max_message_count=5)
for message in messages:
# Message is already deleted from queue - no settlement needed
process_message(message)Indicate the current state of messages in the Service Bus.
class ServiceBusMessageState(int, Enum):
"""
States that a message can be in within Service Bus.
"""
ACTIVE = 0
"""
Message is active and available for processing in the queue or subscription.
"""
DEFERRED = 1
"""
Message has been deferred for later processing and must be retrieved
using its sequence number.
"""
SCHEDULED = 2
"""
Message is scheduled for delivery at a future time and is not yet
available for processing.
"""from azure.servicebus import ServiceBusClient, ServiceBusMessageState
client = ServiceBusClient.from_connection_string("your_connection_string")
with client.get_queue_receiver("my-queue") as receiver:
messages = receiver.receive_messages(max_message_count=10)
for message in messages:
print(f"Message: {message.body}")
print(f"State: {message.state}")
if message.state == ServiceBusMessageState.ACTIVE:
print("Message is active and ready for processing")
receiver.complete_message(message)
elif message.state == ServiceBusMessageState.DEFERRED:
print("Message was previously deferred")
# Handle deferred message processing
elif message.state == ServiceBusMessageState.SCHEDULED:
print("Message is scheduled for future delivery")
# This shouldn't typically appear in received messagesSpecify sub-queues for accessing dead letter and transfer dead letter messages.
class ServiceBusSubQueue(str, Enum):
"""
Sub-queue types for accessing special message queues.
"""
DEAD_LETTER = "deadletter"
"""
Dead letter sub-queue containing messages that could not be processed
successfully and have exceeded the maximum delivery count or expired.
"""
TRANSFER_DEAD_LETTER = "transferdeadletter"
"""
Transfer dead letter sub-queue containing messages that could not be
forwarded to their destination due to forwarding failures.
"""from azure.servicebus import ServiceBusClient, ServiceBusSubQueue
client = ServiceBusClient.from_connection_string("your_connection_string")
# Access dead letter messages from a queue
with client.get_queue_receiver("my-queue", sub_queue=ServiceBusSubQueue.DEAD_LETTER) as dlq_receiver:
dead_messages = dlq_receiver.receive_messages(max_message_count=10)
for message in dead_messages:
print(f"Dead letter message: {message.body}")
print(f"Dead letter reason: {message.dead_letter_reason}")
print(f"Dead letter error: {message.dead_letter_error_description}")
print(f"Delivery count: {message.delivery_count}")
# Process dead letter message or resubmit to main queue
if should_reprocess(message):
resubmit_to_main_queue(message)
dlq_receiver.complete_message(message)
# Access transfer dead letter messages
with client.get_queue_receiver("my-queue", sub_queue=ServiceBusSubQueue.TRANSFER_DEAD_LETTER) as tdlq_receiver:
transfer_dead_messages = tdlq_receiver.receive_messages(max_message_count=5)
for message in transfer_dead_messages:
print(f"Transfer failed message: {message.body}")
print(f"Original destination: {message.to}")
# Handle transfer failures
tdlq_receiver.complete_message(message)Constants for session-based message processing.
class ServiceBusSessionFilter(Enum):
"""
Filter options for session selection.
"""
NEXT_AVAILABLE = 0
"""
Select the next available session that has messages waiting.
"""
# Convenience constant
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter = ServiceBusSessionFilter.NEXT_AVAILABLEfrom azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION
client = ServiceBusClient.from_connection_string("your_connection_string")
# Connect to next available session
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
if receiver.session:
print(f"Connected to session: {receiver.session.session_id}")
messages = receiver.receive_messages(max_message_count=10)
for message in messages:
print(f"Session message: {message.body}")
receiver.complete_message(message)
else:
print("No available sessions with messages")
# You can also connect to a specific session by ID
with client.get_queue_receiver("my-session-queue", session_id="specific-session-123") as receiver:
# Process messages from this specific session
passConfigure the underlying transport protocol for Service Bus connections.
class TransportType(Enum):
"""
Transport protocol options for Service Bus connections.
"""
Amqp = "Amqp"
"""
Standard AMQP 1.0 protocol over TCP. Provides the best performance
and feature support.
"""
AmqpOverWebsocket = "AmqpOverWebsocket"
"""
AMQP 1.0 protocol over WebSocket. Useful in environments with
firewall restrictions that block standard AMQP ports.
"""from azure.servicebus import ServiceBusClient, TransportType
# Use standard AMQP (default)
client = ServiceBusClient.from_connection_string(
"your_connection_string",
transport_type=TransportType.Amqp
)
# Use AMQP over WebSocket (for restrictive network environments)
client_websocket = ServiceBusClient.from_connection_string(
"your_connection_string",
transport_type=TransportType.AmqpOverWebsocket
)
# Both clients work the same way
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(ServiceBusMessage("Hello via AMQP"))
with client_websocket.get_queue_sender("my-queue") as sender:
sender.send_messages(ServiceBusMessage("Hello via WebSocket"))Enumerations for entity properties in administrative operations.
class EntityStatus(str, Enum):
"""
Status values for Service Bus entities.
"""
ACTIVE = "Active"
"""Entity is active and operational."""
DISABLED = "Disabled"
"""Entity is disabled and not processing messages."""
SEND_DISABLED = "SendDisabled"
"""Entity cannot receive new messages but can deliver existing ones."""
RECEIVE_DISABLED = "ReceiveDisabled"
"""Entity cannot deliver messages but can receive new ones."""
class EntityAvailabilityStatus(str, Enum):
"""
Availability status for Service Bus entities.
"""
AVAILABLE = "Available"
"""Entity is available for operations."""
LIMITED = "Limited"
"""Entity has limited availability."""
RENAMING = "Renaming"
"""Entity is being renamed."""
RESTORING = "Restoring"
"""Entity is being restored from backup."""
UNKNOWN = "Unknown"
"""Entity availability status is unknown."""
class AccessRights(str, Enum):
"""
Access rights for authorization rules.
"""
MANAGE = "Manage"
"""Full management access including create, read, update, delete operations."""
SEND = "Send"
"""Permission to send messages to the entity."""
LISTEN = "Listen"
"""Permission to receive messages from the entity."""
class MessagingSku(str, Enum):
"""
Service Bus messaging tier/SKU options.
"""
BASIC = "Basic"
"""Basic tier with limited features."""
STANDARD = "Standard"
"""Standard tier with full feature set."""
PREMIUM = "Premium"
"""Premium tier with enhanced performance and isolation."""
class NamespaceType(str, Enum):
"""
Service Bus namespace types.
"""
MESSAGING = "Messaging"
"""Standard messaging namespace."""
NOTIFICATION_HUB = "NotificationHub"
"""Notification hub namespace."""
MIXED = "Mixed"
"""Mixed namespace supporting multiple services."""
EVENT_HUB = "EventHub"
"""Event hub namespace."""
RELAY = "Relay"
"""Relay namespace."""from azure.servicebus.management import (
ServiceBusAdministrationClient,
EntityStatus,
AccessRights,
MessagingSku
)
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
# Create a queue with specific status
queue = admin_client.create_queue(
"my-queue",
status=EntityStatus.ACTIVE,
max_delivery_count=5
)
# Check namespace information
namespace_info = admin_client.get_namespace_properties()
print(f"Namespace SKU: {namespace_info.messaging_sku}")
if namespace_info.messaging_sku == MessagingSku.PREMIUM:
print("Using premium namespace with enhanced features")
elif namespace_info.messaging_sku == MessagingSku.STANDARD:
print("Using standard namespace")
# Create authorization rule with specific rights
from azure.servicebus.management import AuthorizationRule
auth_rule = AuthorizationRule(
key_name="MyAppRule",
primary_key="generated_key",
secondary_key="backup_key",
rights=[AccessRights.SEND, AccessRights.LISTEN]
)Detailed breakdown of message counts in entities.
class MessageCountDetails:
"""
Detailed message count information for Service Bus entities.
"""
@property
def active_message_count(self) -> int:
"""Number of active messages."""
@property
def dead_letter_message_count(self) -> int:
"""Number of dead letter messages."""
@property
def scheduled_message_count(self) -> int:
"""Number of scheduled messages."""
@property
def transfer_dead_letter_message_count(self) -> int:
"""Number of transfer dead letter messages."""
@property
def transfer_message_count(self) -> int:
"""Number of messages being transferred."""Available API versions for management operations.
class ApiVersion(str, Enum):
"""
Supported Service Bus management API versions.
"""
V2017_04 = "2017-04"
"""API version from April 2017."""
V2021_05 = "2021-05"
"""API version from May 2021 (default)."""
# Default API version used by the SDK
DEFAULT_VERSION: ApiVersion = ApiVersion.V2021_05from azure.servicebus.management import ServiceBusAdministrationClient, ApiVersion
# Use specific API version
admin_client = ServiceBusAdministrationClient.from_connection_string(
"your_connection_string",
api_version=ApiVersion.V2021_05
)
# Use default API version (recommended)
admin_client = ServiceBusAdministrationClient.from_connection_string(
"your_connection_string"
)Common import patterns for constants and enums.
# Import specific enums
from azure.servicebus import (
ServiceBusReceiveMode,
ServiceBusMessageState,
ServiceBusSubQueue,
TransportType,
NEXT_AVAILABLE_SESSION
)
# Import management enums
from azure.servicebus.management import (
EntityStatus,
EntityAvailabilityStatus,
AccessRights,
MessagingSku,
NamespaceType,
ApiVersion
)
# Use in client creation
client = ServiceBusClient.from_connection_string(
"your_connection_string",
transport_type=TransportType.AmqpOverWebsocket
)
# Use in receiver creation
receiver = client.get_queue_receiver(
"my-queue",
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
sub_queue=ServiceBusSubQueue.DEAD_LETTER,
session_id=NEXT_AVAILABLE_SESSION
)Install with Tessl CLI
npx tessl i tessl/pypi-azure-servicebusdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10