# Azure Service Bus

The Microsoft Azure Service Bus client library for Python provides messaging for enterprise applications. It lets distributed applications exchange messages reliably over queues, topics, and subscriptions using point-to-point and publish-subscribe patterns, and it supports message sessions, dead-letter queues, scheduled messages, and transactions.

## Package Information

- **Package Name**: azure-servicebus
- **Language**: Python
- **Installation**: `pip install azure-servicebus`

## Core Imports

```python
from azure.servicebus import ServiceBusClient, ServiceBusMessage
```

For asynchronous operations:

```python
from azure.servicebus.aio import ServiceBusClient, ServiceBusSender, ServiceBusReceiver
```
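
For management operations:

```python
# Synchronous administration client
from azure.servicebus.management import ServiceBusAdministrationClient

# Asynchronous counterpart. It has the same class name, so alias it (or import
# only one of the two) to avoid shadowing the synchronous client.
from azure.servicebus.aio.management import (
    ServiceBusAdministrationClient as AsyncServiceBusAdministrationClient,
)
```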

For exception handling:

```python
from azure.servicebus.exceptions import (
    ServiceBusError,
    ServiceBusConnectionError,
    MessageLockLostError,
    SessionLockLostError,
)
```

For utility functions:

```python
from azure.servicebus import (
    parse_connection_string,
    ServiceBusConnectionStringProperties,
    AutoLockRenewer,
)
```

## Basic Usage

```python
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Create client from connection string
client = ServiceBusClient.from_connection_string("your_connection_string")

# Send a message to a queue
with client.get_queue_sender("queue_name") as sender:
    message = ServiceBusMessage("Hello, Service Bus!")
    sender.send_messages(message)

# Receive messages from a queue
with client.get_queue_receiver("queue_name") as receiver:
    messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
    for message in messages:
        print(f"Received: {message}")
        receiver.complete_message(message)

client.close()
```

## Architecture

The library is organized around the following key components:

- **ServiceBusClient**: Main entry point for creating senders and receivers
- **ServiceBusSender**: Sends messages to queues or topics, with batching and scheduling
- **ServiceBusReceiver**: Receives messages and settles them (complete, abandon, defer, or dead-letter)
- **ServiceBusSession**: Manages stateful messaging sessions for ordered processing
- **ServiceBusAdministrationClient**: Creates and manages queues, topics, subscriptions, and rules

The library offers both synchronous and asynchronous clients, manages connections automatically, and includes retry policies and built-in error handling, which makes it suitable for cloud-native and hybrid distributed applications.
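
As a rough illustration of the asynchronous pattern mentioned above, the sketch below receives one batch and completes each message. The helper names `receive_and_complete` and `main` are hypothetical, and the helper only assumes a receiver shaped like `azure.servicebus.aio.ServiceBusReceiver`, whose `receive_messages` and `complete_message` are coroutines.

```python
import asyncio


async def receive_and_complete(receiver, max_message_count=10, max_wait_time=5):
    """Receive one batch from an async receiver and complete every message.

    `receiver` is expected to behave like azure.servicebus.aio.ServiceBusReceiver.
    Returns the string form of each received message.
    """
    messages = await receiver.receive_messages(
        max_message_count=max_message_count, max_wait_time=max_wait_time
    )
    bodies = []
    for message in messages:
        bodies.append(str(message))
        await receiver.complete_message(message)
    return bodies


async def main(conn_str, queue_name):
    # Imported here so the helper above can be exercised without the SDK installed.
    from azure.servicebus.aio import ServiceBusClient

    async with ServiceBusClient.from_connection_string(conn_str) as client:
        async with client.get_queue_receiver(queue_name) as receiver:
            for body in await receive_and_complete(receiver):
                print(f"Received: {body}")


# Example call (placeholder values):
# asyncio.run(main("your_connection_string", "queue_name"))
```

Both the client and the receiver are used as async context managers here, so their connections are closed even if processing raises.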

## Capabilities

### Client Management

Core client functionality for establishing connections and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.

```python { .api }
class ServiceBusClient:
    def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusClient': ...
    def get_queue_sender(self, queue_name: str, **kwargs) -> 'ServiceBusSender': ...
    def get_queue_receiver(self, queue_name: str, **kwargs) -> 'ServiceBusReceiver': ...
    def get_topic_sender(self, topic_name: str, **kwargs) -> 'ServiceBusSender': ...
    def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs) -> 'ServiceBusReceiver': ...
    def close(self) -> None: ...
```

[Client Management](./client-management.md)
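
The two construction paths listed above (connection string versus namespace plus credential) could be wrapped as in the following sketch. `build_client` is a hypothetical helper, and using `DefaultAzureCredential` assumes the separate `azure-identity` package is installed; any other token credential would work the same way.

```python
def build_client(conn_str=None, fully_qualified_namespace=None):
    """Return a ServiceBusClient from a connection string or a namespace.

    Exactly one of the two arguments must be given.
    """
    if (conn_str is None) == (fully_qualified_namespace is None):
        raise ValueError("pass exactly one of conn_str or fully_qualified_namespace")

    # Deferred import: argument validation works even without the SDK installed.
    from azure.servicebus import ServiceBusClient

    if conn_str is not None:
        return ServiceBusClient.from_connection_string(conn_str)

    # Assumption: azure-identity is installed. DefaultAzureCredential is one of
    # several credential implementations that could be passed here.
    from azure.identity import DefaultAzureCredential

    return ServiceBusClient(fully_qualified_namespace, DefaultAzureCredential())
```

The returned client can be used as a context manager, for example `with build_client(conn_str="your_connection_string") as client: ...`, which closes it on exit.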

### Message Operations

Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.

```python { .api }
class ServiceBusSender:
    def send_messages(self, message, **kwargs) -> None: ...
    def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
    def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
    def create_message_batch(self, max_size_in_bytes=None) -> 'ServiceBusMessageBatch': ...

class ServiceBusReceiver:
    def receive_messages(self, max_message_count=1, max_wait_time=None) -> List['ServiceBusReceivedMessage']: ...
    def peek_messages(self, max_message_count=1, **kwargs) -> List['ServiceBusReceivedMessage']: ...
    def complete_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def abandon_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def defer_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def dead_letter_message(self, message: 'ServiceBusReceivedMessage', reason=None, error_description=None) -> None: ...
```

[Message Operations](./message-operations.md)
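
One way the settlement methods above might be combined is sketched below: complete on success, abandon on failure so the message is redelivered, and dead-letter once a message has failed too often. `process_batch`, the `max_deliveries` threshold, and the `"ProcessingFailed"` reason string are illustrative assumptions, not part of the library.

```python
def process_batch(receiver, handler, max_deliveries=3, max_message_count=10, max_wait_time=5):
    """Receive one batch and settle each message according to the handler outcome.

    Returns a dict counting completed, abandoned, and dead-lettered messages.
    """
    counts = {"completed": 0, "abandoned": 0, "dead_lettered": 0}
    messages = receiver.receive_messages(
        max_message_count=max_message_count, max_wait_time=max_wait_time
    )
    for message in messages:
        try:
            handler(message)
        except Exception as exc:
            # delivery_count is Optional in the API; treat a missing value as 0.
            if (message.delivery_count or 0) >= max_deliveries:
                receiver.dead_letter_message(
                    message,
                    reason="ProcessingFailed",
                    error_description=str(exc),
                )
                counts["dead_lettered"] += 1
            else:
                # Release the lock so the message can be delivered again.
                receiver.abandon_message(message)
                counts["abandoned"] += 1
        else:
            receiver.complete_message(message)
            counts["completed"] += 1
    return counts
```

Settlement only applies in peek-lock mode; in receive-and-delete mode the message is removed as soon as it is received, so there is nothing left to settle.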

### Session Management

Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.

```python { .api }
class ServiceBusSession:
    @property
    def session_id(self) -> str: ...
    @property
    def locked_until_utc(self) -> Optional[datetime]: ...
    def get_state(self, **kwargs) -> bytes: ...
    def set_state(self, state, **kwargs) -> None: ...
    def renew_lock(self, **kwargs) -> datetime: ...
```

[Session Management](./session-management.md)
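
Session state is an opaque byte payload, so the application decides how to encode it. The sketch below keeps a simple integer counter in the state; `bump_session_counter` and the UTF-8 integer encoding are assumptions made for illustration. In the SDK the session object is available as `receiver.session` on a receiver opened with a `session_id`.

```python
def bump_session_counter(session):
    """Increment an integer counter kept in the session state and return it.

    `session` is expected to behave like ServiceBusSession.
    """
    raw = session.get_state()  # bytes; empty or None when no state was set yet
    count = int(raw.decode("utf-8")) if raw else 0
    count += 1
    session.set_state(str(count).encode("utf-8"))
    return count
```

A receiver for a specific session could be opened with `client.get_queue_receiver("queue_name", session_id="order-42")`, where `"order-42"` is a placeholder, and then passed along as `bump_session_counter(receiver.session)`.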

### Administrative Operations

Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.

```python { .api }
class ServiceBusAdministrationClient:
    def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusAdministrationClient': ...

    # Queue Management
    def create_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
    def get_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
    def delete_queue(self, queue_name: str, **kwargs) -> None: ...
    def list_queues(self, **kwargs): ...

    # Topic Management
    def create_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
    def get_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
    def delete_topic(self, topic_name: str, **kwargs) -> None: ...

    # Subscription Management
    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> 'SubscriptionProperties': ...
    def delete_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> None: ...
```

[Administrative Operations](./administrative-operations.md)
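
A common administrative task is creating a queue only when it is missing. The following is a minimal sketch using `list_queues` and `create_queue` from the listing above; `ensure_queue` is a hypothetical helper, and the check-then-create sequence is not atomic, so a concurrent creator could still win the race.

```python
def ensure_queue(admin_client, queue_name, **queue_options):
    """Create `queue_name` if no queue with that name exists.

    Returns True when the queue was created and False when it already existed.
    Extra keyword arguments are forwarded to create_queue().
    """
    # list_queues() yields QueueProperties objects, each exposing `.name`.
    existing = {queue.name for queue in admin_client.list_queues()}
    if queue_name in existing:
        return False
    admin_client.create_queue(queue_name, **queue_options)
    return True
```

For example, `ensure_queue(admin_client, "orders", max_delivery_count=5)` would create a queue named `orders` (a placeholder) with a custom delivery limit. Listing every queue is wasteful on large namespaces; calling `get_queue` and handling the not-found error is a reasonable alternative.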

### Message Classes and Types

Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.

```python { .api }
class ServiceBusMessage:
    def __init__(self, body, **kwargs): ...
    @property
    def body(self): ...
    @property
    def application_properties(self) -> Optional[Dict]: ...
    @property
    def session_id(self) -> Optional[str]: ...
    @property
    def message_id(self) -> Optional[str]: ...
    @property
    def scheduled_enqueue_time_utc(self) -> Optional[datetime]: ...

class ServiceBusReceivedMessage(ServiceBusMessage):
    @property
    def delivery_count(self) -> Optional[int]: ...
    @property
    def enqueued_time_utc(self) -> Optional[datetime]: ...
    @property
    def sequence_number(self) -> Optional[int]: ...
    @property
    def lock_token(self) -> Optional[Union[uuid.UUID, str]]: ...
    @property
    def locked_until_utc(self) -> Optional[datetime]: ...

class ServiceBusMessageBatch:
    @property
    def max_size_in_bytes(self) -> int: ...
    @property
    def size_in_bytes(self) -> int: ...
    def add_message(self, message) -> None: ...
```

[Message Classes and Types](./message-types.md)
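
`ServiceBusMessageBatch.add_message` rejects a message that would push the batch past its size limit. The sketch below, with the hypothetical helper `send_in_batches`, relies on that to split a stream of messages across several batches. It catches `ValueError` because `MessageSizeExceededError` is declared as a `ValueError` subclass in the exception listing.

```python
def send_in_batches(sender, messages):
    """Send `messages` using as few batches as possible; return the batch count."""
    batch = sender.create_message_batch()
    pending = 0  # messages added to the current batch
    sent_batches = 0
    for message in messages:
        try:
            batch.add_message(message)
        except ValueError:
            # The batch is full (MessageSizeExceededError subclasses ValueError).
            if pending == 0:
                raise  # this single message does not fit even in an empty batch
            sender.send_messages(batch)
            sent_batches += 1
            batch = sender.create_message_batch()
            pending = 0
            batch.add_message(message)
        pending += 1
    if pending:
        # Flush whatever is left in the final, partially filled batch.
        sender.send_messages(batch)
        sent_batches += 1
    return sent_batches
```

With the real SDK the `messages` argument would be an iterable of `ServiceBusMessage` objects and `sender` a `ServiceBusSender` obtained from the client.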

### Constants and Enums

Enumeration values and constants for configuring Service Bus behavior including receive modes, message states, transport types, and entity sub-queues.

```python { .api }
class ServiceBusReceiveMode(str, Enum):
    PEEK_LOCK = "peeklock"
    RECEIVE_AND_DELETE = "receiveanddelete"

class ServiceBusMessageState(int, Enum):
    ACTIVE = 0
    DEFERRED = 1
    SCHEDULED = 2

class ServiceBusSubQueue(str, Enum):
    DEAD_LETTER = "deadletter"
    TRANSFER_DEAD_LETTER = "transferdeadletter"

class TransportType(Enum):
    Amqp = "Amqp"
    AmqpOverWebsocket = "AmqpOverWebsocket"

NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter
```

[Constants and Enums](./constants-enums.md)
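
The sub-queue values are typically passed as the `sub_queue` keyword when creating a receiver. The sketch below reads from a queue's dead-letter sub-queue; `drain_dead_letters` is a hypothetical helper, and it passes the string value `"deadletter"` on the assumption that the receiver factory accepts either the enum member or its string value.

```python
def drain_dead_letters(client, queue_name, max_message_count=10, max_wait_time=5):
    """Return (body, reason, description) for dead-lettered messages, completing each."""
    drained = []
    # "deadletter" is the value of ServiceBusSubQueue.DEAD_LETTER; the enum
    # member itself can be passed instead of the string.
    with client.get_queue_receiver(queue_name, sub_queue="deadletter") as receiver:
        messages = receiver.receive_messages(
            max_message_count=max_message_count, max_wait_time=max_wait_time
        )
        for message in messages:
            drained.append(
                (
                    str(message),
                    message.dead_letter_reason,
                    message.dead_letter_error_description,
                )
            )
            # Completing removes the message from the dead-letter sub-queue.
            receiver.complete_message(message)
    return drained
```

In real code it is usually worth logging or re-submitting the drained messages before completing them, since completion is irreversible.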

### Lock Renewal

Automatic lock renewal for messages and sessions to prevent lock expiration during long processing operations. Essential for production scenarios requiring extended message processing time.

```python { .api }
class AutoLockRenewer:
    def __init__(
        self,
        max_lock_renewal_duration: float = 300,
        on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
        executor: Optional[ThreadPoolExecutor] = None,
        max_workers: Optional[int] = None
    ): ...

    def register(
        self,
        receiver: ServiceBusReceiver,
        renewable: Union[ServiceBusReceivedMessage, ServiceBusSession],
        max_lock_renewal_duration: Optional[float] = None
    ) -> None: ...

    def close(self, wait: bool = True) -> None: ...
    def __enter__(self) -> 'AutoLockRenewer': ...
    def __exit__(self, *args) -> None: ...
```
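
A typical use of the renewer is to register each message right after it is received and before slow processing starts. The sketch below shows that ordering; `process_with_renewal` and its default of 120 seconds are illustrative assumptions, and `renewer` is expected to behave like `AutoLockRenewer`.

```python
def process_with_renewal(receiver, renewer, handler, max_lock_renewal_duration=120):
    """Receive messages, keep each lock alive while `handler` runs, then complete."""
    processed = 0
    for message in receiver.receive_messages(max_message_count=5, max_wait_time=5):
        # Ask the renewer to keep renewing this message's lock in the background
        # for up to max_lock_renewal_duration seconds.
        renewer.register(
            receiver, message, max_lock_renewal_duration=max_lock_renewal_duration
        )
        handler(message)  # potentially slow work
        receiver.complete_message(message)
        processed += 1
    return processed
```

Since `AutoLockRenewer` is a context manager, a caller might write `with AutoLockRenewer() as renewer, client.get_queue_receiver("queue_name") as receiver:` and call the helper inside that block, so the renewer's worker threads are shut down on exit.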

### Exception Handling

Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.

```python { .api }
class ServiceBusError(AzureError):
    def __init__(self, message, **kwargs): ...

# Connection and Communication Errors
class ServiceBusConnectionError(ServiceBusError): ...
class ServiceBusCommunicationError(ServiceBusError): ...
class ServiceBusAuthenticationError(ServiceBusError): ...
class ServiceBusAuthorizationError(ServiceBusError): ...

# Message Processing Errors
class MessageAlreadySettled(ValueError): ...
class MessageLockLostError(ServiceBusError): ...
class MessageNotFoundError(ServiceBusError): ...
class MessageSizeExceededError(ServiceBusError, ValueError): ...

# Session Errors
class SessionLockLostError(ServiceBusError): ...
class SessionCannotBeLockedError(ServiceBusError): ...

# Auto Lock Renewal Errors
class AutoLockRenewFailed(ServiceBusError): ...
class AutoLockRenewTimeout(ServiceBusError): ...

# Service Errors
class MessagingEntityNotFoundError(ServiceBusError): ...
class MessagingEntityDisabledError(ServiceBusError): ...
class ServiceBusQuotaExceededError(ServiceBusError): ...
class ServiceBusServerBusyError(ServiceBusError): ...
class OperationTimeoutError(ServiceBusError): ...
```

[Exception Handling](./exception-handling.md)
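
The hierarchy makes it possible to treat some errors as transient and let the rest propagate. The sketch below is a generic application-level retry with exponential backoff; `call_with_retry` is a hypothetical helper, and which exception classes count as transient is the caller's decision. The library applies its own retry policy internally, so a wrapper like this is only a second line of defense.

```python
import time


def call_with_retry(operation, transient_errors, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `operation()`; on a transient error wait and retry with exponential backoff.

    `transient_errors` is a tuple of exception classes considered retryable.
    Any other exception, or the last transient one, is raised to the caller.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except transient_errors:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

A caller might pass `(ServiceBusConnectionError, ServiceBusServerBusyError, OperationTimeoutError)` as `transient_errors`, as one plausible choice, while letting errors such as `MessageLockLostError` propagate, since retrying the same settlement call cannot recover a lost lock.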

## Types

```python { .api }
# Type aliases for common parameter types
MessageTypes = Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
NextAvailableSessionType = ServiceBusSessionFilter
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]], None]

# Credential types (from azure-core and azure-identity)
TokenCredential = Any  # azure.core.credentials.TokenCredential
AzureSasCredential = Any  # azure.core.credentials.AzureSasCredential
AzureNamedKeyCredential = Any  # azure.core.credentials.AzureNamedKeyCredential

# Utility functions
def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties: ...

# Connection string properties
class ServiceBusConnectionStringProperties:
    @property
    def endpoint(self) -> str: ...
    @property
    def entity_path(self) -> Optional[str]: ...
    @property
    def shared_access_key_name(self) -> Optional[str]: ...
    @property
    def shared_access_key(self) -> Optional[str]: ...
    @property
    def shared_access_signature(self) -> Optional[str]: ...
```