0
# Message Operations
1
2
This section covers sending and receiving messages, including batch processing, message scheduling, and the settlement patterns used for reliable message processing.
3
4
## Capabilities
5
6
### Message Sending
7
8
Send messages to queues and topics with support for single messages, message lists, and batches.
9
10
```python { .api }
11
class ServiceBusSender:
12
@property
13
def fully_qualified_namespace(self) -> str:
14
"""The fully qualified namespace URL of the Service Bus."""
15
16
@property
17
def entity_name(self) -> str:
18
"""The name of the entity (queue or topic) that the sender sends to."""
19
20
def send_messages(
21
self,
22
message: Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]],
23
*,
24
timeout: Optional[float] = None,
25
**kwargs
26
) -> None:
27
"""
28
Send messages to the Service Bus entity.
29
30
Parameters:
31
- message: Single message, list of messages, or message batch to send
32
- timeout: Operation timeout in seconds
33
34
Raises:
35
- MessageSizeExceededError: If message exceeds size limits
36
- ServiceBusError: For other Service Bus related errors
37
"""
38
```
39
40
#### Usage Example
41
42
```python
43
from azure.servicebus import ServiceBusClient, ServiceBusMessage
44
45
client = ServiceBusClient.from_connection_string("your_connection_string")
46
47
with client.get_queue_sender("my-queue") as sender:
48
# Send a single message
49
message = ServiceBusMessage("Hello World")
50
sender.send_messages(message)
51
52
# Send multiple messages
53
messages = [
54
ServiceBusMessage("Message 1"),
55
ServiceBusMessage("Message 2"),
56
ServiceBusMessage("Message 3")
57
]
58
sender.send_messages(messages)
59
60
# Send a message batch (more efficient for large volumes)
61
batch = sender.create_message_batch()
62
for i in range(10):
63
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
64
sender.send_messages(batch)
65
```
66
67
### Message Scheduling
68
69
Schedule messages for future delivery with precise timing control.
70
71
```python { .api }
72
def schedule_messages(
73
self,
74
messages: Union[ServiceBusMessage, List[ServiceBusMessage]],
75
schedule_time_utc: datetime,
76
*,
77
timeout: Optional[float] = None,
78
**kwargs
79
) -> List[int]:
80
"""
81
Schedule messages for future delivery.
82
83
Parameters:
84
- messages: Single message or list of messages to schedule
85
- schedule_time_utc: UTC datetime when messages should be delivered
86
- timeout: Operation timeout in seconds
87
88
Returns:
89
List of sequence numbers for the scheduled messages
90
91
Raises:
92
- ServiceBusError: For Service Bus related errors
93
"""
94
95
def cancel_scheduled_messages(
96
self,
97
sequence_numbers: Union[int, List[int]],
98
*,
99
timeout: Optional[float] = None,
100
**kwargs
101
) -> None:
102
"""
103
Cancel previously scheduled messages.
104
105
Parameters:
106
- sequence_numbers: Single sequence number or list of sequence numbers to cancel
107
- timeout: Operation timeout in seconds
108
109
Raises:
110
- MessageNotFoundError: If scheduled message is not found
111
- ServiceBusError: For other Service Bus related errors
112
"""
113
```
114
115
#### Usage Example
116
117
```python
118
from datetime import datetime, timedelta
119
from azure.servicebus import ServiceBusClient, ServiceBusMessage
120
121
client = ServiceBusClient.from_connection_string("your_connection_string")
122
123
with client.get_queue_sender("my-queue") as sender:
124
# Schedule a message for 1 hour from now
125
future_time = datetime.utcnow() + timedelta(hours=1)
126
message = ServiceBusMessage("This message will be delivered in 1 hour")
127
128
sequence_numbers = sender.schedule_messages(message, future_time)
129
print(f"Scheduled message with sequence number: {sequence_numbers[0]}")
130
131
# Cancel the scheduled message if needed
132
sender.cancel_scheduled_messages(sequence_numbers[0])
133
```
134
135
### Message Batching
136
137
Create and manage message batches for efficient bulk sending.
138
139
```python { .api }
140
def create_message_batch(
141
self,
142
max_size_in_bytes: Optional[int] = None
143
) -> ServiceBusMessageBatch:
144
"""
145
Create an empty message batch.
146
147
Parameters:
148
- max_size_in_bytes: Maximum size of the batch in bytes (uses service limit if None)
149
150
Returns:
151
Empty ServiceBusMessageBatch instance
152
153
Raises:
154
- ValueError: If max_size_in_bytes is invalid
155
"""
156
```
157
158
### Message Receiving
159
160
Receive and process messages from queues and subscriptions with various modes and options.
161
162
```python { .api }
163
class ServiceBusReceiver:
164
@property
165
def fully_qualified_namespace(self) -> str:
166
"""The fully qualified namespace URL of the Service Bus."""
167
168
@property
169
def entity_path(self) -> str:
170
"""The full path of the entity (queue or subscription) that the receiver receives from."""
171
172
@property
173
def session(self) -> Optional[ServiceBusSession]:
174
"""The session object if this is a session-enabled receiver, None otherwise."""
175
176
def receive_messages(
177
self,
178
max_message_count: int = 1,
179
max_wait_time: Optional[float] = None
180
) -> List[ServiceBusReceivedMessage]:
181
"""
182
Receive messages from the Service Bus entity.
183
184
Parameters:
185
- max_message_count: Maximum number of messages to receive (1-256)
186
- max_wait_time: Maximum time to wait for messages in seconds
187
188
Returns:
189
List of ServiceBusReceivedMessage objects
190
191
Raises:
192
- ServiceBusError: For Service Bus related errors
193
"""
194
```
195
196
#### Usage Example
197
198
```python
199
from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode
200
201
client = ServiceBusClient.from_connection_string("your_connection_string")
202
203
with client.get_queue_receiver("my-queue") as receiver:
204
# Receive up to 10 messages, wait up to 5 seconds
205
messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
206
207
for message in messages:
208
print(f"Received: {message.body}")
209
# Process the message
210
try:
211
# Your message processing logic here
212
process_message(message)
213
214
# Complete the message to remove it from the queue
215
receiver.complete_message(message)
216
except Exception as e:
217
print(f"Error processing message: {e}")
218
# Abandon the message to make it available for redelivery
219
receiver.abandon_message(message)
220
```
221
222
### Message Peeking
223
224
Peek at messages without receiving them (messages remain in the queue/subscription).
225
226
```python { .api }
227
def peek_messages(
228
self,
229
max_message_count: int = 1,
230
*,
231
sequence_number: int = 0,
232
timeout: Optional[float] = None,
233
**kwargs
234
) -> List[ServiceBusReceivedMessage]:
235
"""
236
Peek at messages without receiving them.
237
238
Parameters:
239
- max_message_count: Maximum number of messages to peek (1-32)
240
- sequence_number: Starting sequence number for peeking
241
- timeout: Operation timeout in seconds
242
243
Returns:
244
List of ServiceBusReceivedMessage objects (without locks)
245
246
Raises:
247
- ServiceBusError: For Service Bus related errors
248
"""
249
```
250
251
### Message Settlement
252
253
Settle a received message, that is, tell the service how processing ended, by completing, abandoning, deferring, or dead-lettering it.
254
255
```python { .api }
256
def complete_message(self, message: ServiceBusReceivedMessage) -> None:
257
"""
258
Complete a message, removing it from the queue/subscription.
259
260
Parameters:
261
- message: The message to complete
262
263
Raises:
264
- MessageAlreadySettled: If message was already settled
265
- MessageLockLostError: If message lock has expired
266
- ServiceBusError: For other Service Bus related errors
267
"""
268
269
def abandon_message(self, message: ServiceBusReceivedMessage) -> None:
270
"""
271
Abandon a message, making it available for redelivery.
272
273
Parameters:
274
- message: The message to abandon
275
276
Raises:
277
- MessageAlreadySettled: If message was already settled
278
- MessageLockLostError: If message lock has expired
279
- ServiceBusError: For other Service Bus related errors
280
"""
281
282
def defer_message(self, message: ServiceBusReceivedMessage) -> None:
283
"""
284
Defer a message for later processing.
285
286
Parameters:
287
- message: The message to defer
288
289
Raises:
290
- MessageAlreadySettled: If message was already settled
291
- MessageLockLostError: If message lock has expired
292
- ServiceBusError: For other Service Bus related errors
293
"""
294
295
def dead_letter_message(
296
self,
297
message: ServiceBusReceivedMessage,
298
reason: Optional[str] = None,
299
error_description: Optional[str] = None
300
) -> None:
301
"""
302
Move a message to the dead letter sub-queue.
303
304
Parameters:
305
- message: The message to dead letter
306
- reason: Reason for dead lettering
307
- error_description: Description of the error
308
309
Raises:
310
- MessageAlreadySettled: If message was already settled
311
- MessageLockLostError: If message lock has expired
312
- ServiceBusError: For other Service Bus related errors
313
"""
314
```
315
316
### Message Lock Management
317
318
Renew message locks so that they do not expire while a message is still being processed.
319
320
```python { .api }
321
def renew_message_lock(
322
self,
323
message: ServiceBusReceivedMessage,
324
*,
325
timeout: Optional[float] = None,
326
**kwargs
327
) -> datetime:
328
"""
329
Renew the lock on a message.
330
331
Parameters:
332
- message: The message whose lock should be renewed
333
- timeout: Operation timeout in seconds
334
335
Returns:
336
New lock expiration time
337
338
Raises:
339
- MessageAlreadySettled: If message was already settled
340
- MessageLockLostError: If message lock has expired
341
- ServiceBusError: For other Service Bus related errors
342
"""
343
```
344
345
### Iterator Support
346
347
ServiceBusReceiver supports iteration for continuous message processing.
348
349
```python { .api }
350
def __iter__(self) -> Iterator[ServiceBusReceivedMessage]:
351
"""
352
Iterate over messages from the receiver.
353
354
Returns:
355
Iterator that yields ServiceBusReceivedMessage objects
356
"""
357
358
def __next__(self) -> ServiceBusReceivedMessage:
359
"""
360
Get the next message from the receiver.
361
362
Returns:
363
Next ServiceBusReceivedMessage
364
365
Raises:
366
StopIteration: When no more messages are available
367
"""
368
```
369
370
#### Usage Example
371
372
```python
373
with client.get_queue_receiver("my-queue") as receiver:
374
# Iterate over messages as they arrive
375
for message in receiver:
376
print(f"Received: {message.body}")
377
try:
378
process_message(message)
379
receiver.complete_message(message)
380
except Exception as e:
381
print(f"Error: {e}")
382
receiver.abandon_message(message)
383
384
# Stop iterating once a message has been delivered 10 or more times
385
if message.delivery_count and message.delivery_count >= 10:
386
break
387
```
388
389
### Context Manager Support
390
391
All senders and receivers support the context manager protocol for automatic resource cleanup.
392
393
```python { .api }
394
# ServiceBusSender
395
def __enter__(self) -> ServiceBusSender: ...
396
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
397
398
# ServiceBusReceiver
399
def __enter__(self) -> ServiceBusReceiver: ...
400
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
401
402
def close(self) -> None:
403
"""Close the sender/receiver and release resources."""
404
```
405
406
## Asynchronous Message Operations
407
408
For asynchronous operations, use the async versions of ServiceBusSender and ServiceBusReceiver from the `azure.servicebus.aio` module.
409
410
```python { .api }
411
from azure.servicebus.aio import ServiceBusSender, ServiceBusReceiver
412
413
# All methods have async equivalents
414
class ServiceBusSender:
415
async def send_messages(self, message, **kwargs) -> None: ...
416
async def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
417
async def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
418
async def close(self) -> None: ...
419
async def __aenter__(self) -> ServiceBusSender: ...
420
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
421
422
class ServiceBusReceiver:
423
async def receive_messages(self, max_message_count=1, max_wait_time=None) -> List[ServiceBusReceivedMessage]: ...
424
async def peek_messages(self, max_message_count=1, **kwargs) -> List[ServiceBusReceivedMessage]: ...
425
async def complete_message(self, message: ServiceBusReceivedMessage) -> None: ...
426
async def abandon_message(self, message: ServiceBusReceivedMessage) -> None: ...
427
async def defer_message(self, message: ServiceBusReceivedMessage) -> None: ...
428
async def dead_letter_message(self, message: ServiceBusReceivedMessage, reason=None, error_description=None) -> None: ...
429
async def renew_message_lock(self, message: ServiceBusReceivedMessage, **kwargs) -> datetime: ...
430
async def close(self) -> None: ...
431
async def __aenter__(self) -> ServiceBusReceiver: ...
432
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
433
def __aiter__(self) -> AsyncIterator[ServiceBusReceivedMessage]: ...
434
async def __anext__(self) -> ServiceBusReceivedMessage: ...
435
```
436
437
#### Usage Example
438
439
```python
440
import asyncio
441
from azure.servicebus.aio import ServiceBusClient
442
from azure.servicebus import ServiceBusMessage
443
444
async def send_and_receive():
445
async with ServiceBusClient.from_connection_string("your_connection_string") as client:
446
# Send messages asynchronously
447
async with client.get_queue_sender("my-queue") as sender:
448
messages = [ServiceBusMessage(f"Message {i}") for i in range(5)]
449
await sender.send_messages(messages)
450
451
# Receive messages asynchronously
452
async with client.get_queue_receiver("my-queue") as receiver:
453
async for message in receiver:
454
print(f"Received: {message.body}")
455
await receiver.complete_message(message)
456
457
asyncio.run(send_and_receive())
458
```