Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
92
Pending
Does it follow best practices?
Impact
92%
1.01x
Average score across 10 eval scenarios
Pending
The risk profile of this skill
Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications. This library enables building distributed applications with reliable message delivery, publish-subscribe patterns, and complex messaging workflows using queues, topics, and subscriptions with advanced features like message sessions, dead letter queues, scheduled messages, and transaction support.
pip install azure-servicebus
from azure.servicebus import ServiceBusClient, ServiceBusMessage
For asynchronous operations:
from azure.servicebus.aio import ServiceBusClient, ServiceBusSender, ServiceBusReceiver
For management operations:
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.servicebus.aio.management import ServiceBusAdministrationClient
For exception handling:
from azure.servicebus.exceptions import (
ServiceBusError,
ServiceBusConnectionError,
MessageLockLostError,
SessionLockLostError
)
For utility functions:
from azure.servicebus import (
parse_connection_string,
ServiceBusConnectionStringProperties,
AutoLockRenewer
)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
# Create client from connection string
client = ServiceBusClient.from_connection_string("your_connection_string")
# Send a message to a queue
with client.get_queue_sender("queue_name") as sender:
    message = ServiceBusMessage("Hello, Service Bus!")
    sender.send_messages(message)
# Receive messages from a queue
with client.get_queue_receiver("queue_name") as receiver:
    messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
    for message in messages:
        print(f"Received: {message}")
        receiver.complete_message(message)
client.close()
Azure Service Bus provides a comprehensive messaging platform with the following key components:
The library supports both synchronous and asynchronous patterns, automatic connection management, retry policies, and built-in error handling, making it suitable for building cloud-native and hybrid distributed applications.
Core client functionality for establishing connections and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.
class ServiceBusClient:
def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
@classmethod
def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusClient': ...
def get_queue_sender(self, queue_name: str, **kwargs) -> 'ServiceBusSender': ...
def get_queue_receiver(self, queue_name: str, **kwargs) -> 'ServiceBusReceiver': ...
def get_topic_sender(self, topic_name: str, **kwargs) -> 'ServiceBusSender': ...
def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs) -> 'ServiceBusReceiver': ...
def close(self) -> None: ...
Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.
class ServiceBusSender:
def send_messages(self, message, **kwargs) -> None: ...
def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
def create_message_batch(self, max_size_in_bytes=None) -> 'ServiceBusMessageBatch': ...
class ServiceBusReceiver:
def receive_messages(self, max_message_count=1, max_wait_time=None) -> List['ServiceBusReceivedMessage']: ...
def peek_messages(self, max_message_count=1, **kwargs) -> List['ServiceBusReceivedMessage']: ...
def complete_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def abandon_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def defer_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def dead_letter_message(self, message: 'ServiceBusReceivedMessage', reason=None, error_description=None) -> None: ...
Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.
class ServiceBusSession:
@property
def session_id(self) -> str: ...
@property
def locked_until_utc(self) -> Optional[datetime]: ...
def get_state(self, **kwargs) -> bytes: ...
def set_state(self, state, **kwargs) -> None: ...
def renew_lock(self, **kwargs) -> datetime: ...
Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.
class ServiceBusAdministrationClient:
def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
@classmethod
def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusAdministrationClient': ...
# Queue Management
def create_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
def get_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
def delete_queue(self, queue_name: str, **kwargs) -> None: ...
def list_queues(self, **kwargs): ...
# Topic Management
def create_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
def get_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
def delete_topic(self, topic_name: str, **kwargs) -> None: ...
# Subscription Management
def create_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> 'SubscriptionProperties': ...
def delete_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> None: ...
Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.
class ServiceBusMessage:
def __init__(self, body, **kwargs): ...
@property
def body(self): ...
@property
def application_properties(self) -> Optional[Dict]: ...
@property
def session_id(self) -> Optional[str]: ...
@property
def message_id(self) -> Optional[str]: ...
@property
def scheduled_enqueue_time_utc(self) -> Optional[datetime]: ...
class ServiceBusReceivedMessage(ServiceBusMessage):
@property
def delivery_count(self) -> Optional[int]: ...
@property
def enqueued_time_utc(self) -> Optional[datetime]: ...
@property
def sequence_number(self) -> Optional[int]: ...
@property
def lock_token(self) -> Optional[Union[uuid.UUID, str]]: ...
@property
def locked_until_utc(self) -> Optional[datetime]: ...
class ServiceBusMessageBatch:
@property
def max_size_in_bytes(self) -> int: ...
@property
def size_in_bytes(self) -> int: ...
def add_message(self, message) -> None: ...
Enumeration values and constants for configuring Service Bus behavior including receive modes, message states, transport types, and entity sub-queues.
class ServiceBusReceiveMode(str, Enum):
PEEK_LOCK = "peeklock"
RECEIVE_AND_DELETE = "receiveanddelete"
class ServiceBusMessageState(int, Enum):
ACTIVE = 0
DEFERRED = 1
SCHEDULED = 2
class ServiceBusSubQueue(str, Enum):
DEAD_LETTER = "deadletter"
TRANSFER_DEAD_LETTER = "transferdeadletter"
class TransportType(Enum):
Amqp = "Amqp"
AmqpOverWebsocket = "AmqpOverWebsocket"
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter
Automatic lock renewal for messages and sessions to prevent lock expiration during long processing operations. Essential for production scenarios requiring extended message processing time.
class AutoLockRenewer:
def __init__(
self,
max_lock_renewal_duration: float = 300,
on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
executor: Optional[ThreadPoolExecutor] = None,
max_workers: Optional[int] = None
): ...
def register(
self,
receiver: ServiceBusReceiver,
renewable: Union[ServiceBusReceivedMessage, ServiceBusSession],
max_lock_renewal_duration: Optional[float] = None
) -> None: ...
def close(self, wait: bool = True) -> None: ...
def __enter__(self) -> 'AutoLockRenewer': ...
def __exit__(self, *args) -> None: ...
Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.
class ServiceBusError(AzureError):
def __init__(self, message, **kwargs): ...
# Connection and Communication Errors
class ServiceBusConnectionError(ServiceBusError): ...
class ServiceBusCommunicationError(ServiceBusError): ...
class ServiceBusAuthenticationError(ServiceBusError): ...
class ServiceBusAuthorizationError(ServiceBusError): ...
# Message Processing Errors
class MessageAlreadySettled(ValueError): ...
class MessageLockLostError(ServiceBusError): ...
class MessageNotFoundError(ServiceBusError): ...
class MessageSizeExceededError(ServiceBusError, ValueError): ...
# Session Errors
class SessionLockLostError(ServiceBusError): ...
class SessionCannotBeLockedError(ServiceBusError): ...
# Auto Lock Renewal Errors
class AutoLockRenewFailed(ServiceBusError): ...
class AutoLockRenewTimeout(ServiceBusError): ...
# Service Errors
class MessagingEntityNotFoundError(ServiceBusError): ...
class MessagingEntityDisabledError(ServiceBusError): ...
class ServiceBusQuotaExceededError(ServiceBusError): ...
class ServiceBusServerBusyError(ServiceBusError): ...
class OperationTimeoutError(ServiceBusError): ...
# Type aliases for common parameter types
MessageTypes = Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
NextAvailableSessionType = ServiceBusSessionFilter
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]], None]
# Credential types (from azure-core and azure-identity)
TokenCredential = Any # azure.core.credentials.TokenCredential
AzureSasCredential = Any # azure.core.credentials.AzureSasCredential
AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredential
# Utility functions
def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties: ...
# Connection string properties
class ServiceBusConnectionStringProperties:
@property
def endpoint(self) -> str: ...
@property
def entity_path(self) -> Optional[str]: ...
@property
def shared_access_key_name(self) -> Optional[str]: ...
@property
def shared_access_key(self) -> Optional[str]: ...
@property
def shared_access_signature(self) -> Optional[str]: ...
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10