0
# Message Classes and Types
1
2
Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.
3
4
## Capabilities
5
6
### ServiceBusMessage
7
8
Message class for creating messages to send to Service Bus entities.
9
10
```python { .api }
11
class ServiceBusMessage:
12
def __init__(
13
self,
14
body: Optional[Union[str, bytes]],
15
*,
16
application_properties: Optional[Dict[Union[str, bytes], PrimitiveTypes]] = None,
17
session_id: Optional[str] = None,
18
message_id: Optional[str] = None,
19
scheduled_enqueue_time_utc: Optional[datetime] = None,
20
time_to_live: Optional[timedelta] = None,
21
content_type: Optional[str] = None,
22
correlation_id: Optional[str] = None,
23
subject: Optional[str] = None,
24
partition_key: Optional[str] = None,
25
to: Optional[str] = None,
26
reply_to: Optional[str] = None,
27
reply_to_session_id: Optional[str] = None,
28
**kwargs
29
):
30
"""
31
Create a Service Bus message.
32
33
Parameters:
34
- body: Message content (string or bytes)
35
- application_properties: Custom application properties dictionary
36
- session_id: Session identifier for session-enabled entities
37
- message_id: Unique message identifier
38
- scheduled_enqueue_time_utc: UTC time to schedule message delivery
39
- time_to_live: Message expiration duration
40
- content_type: MIME content type
41
- correlation_id: Correlation identifier for message correlation
42
- subject: Message subject or label
43
- partition_key: Partitioning key for partitioned entities
44
- to: Destination address
45
- reply_to: Reply-to address for response messages
46
- reply_to_session_id: Session ID for reply messages
47
"""
48
```
49
50
### ServiceBusMessage Properties
51
52
Access and modify message properties and metadata.
53
54
```python { .api }
55
@property
56
def body(self) -> Any:
57
"""
58
The message body content.
59
60
Returns:
61
Message body as provided during creation
62
"""
63
64
@property
65
def body_type(self) -> AmqpMessageBodyType:
66
"""
67
The AMQP message body type.
68
69
Returns:
70
AmqpMessageBodyType enum value
71
"""
72
73
@property
74
def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTypes]]:
75
"""
76
Custom application-specific properties.
77
78
Returns:
79
Dictionary of application properties or None
80
"""
81
82
@application_properties.setter
83
def application_properties(self, value: Optional[Dict[Union[str, bytes], PrimitiveTypes]]) -> None: ...
84
85
@property
86
def session_id(self) -> Optional[str]:
87
"""
88
Session identifier for session-enabled entities.
89
90
Returns:
91
Session ID string or None
92
"""
93
94
@session_id.setter
95
def session_id(self, value: Optional[str]) -> None: ...
96
97
@property
98
def message_id(self) -> Optional[str]:
99
"""
100
Unique message identifier.
101
102
Returns:
103
Message ID string or None
104
"""
105
106
@message_id.setter
107
def message_id(self, value: Optional[str]) -> None: ...
108
109
@property
110
def scheduled_enqueue_time_utc(self) -> Optional[datetime]:
111
"""
112
UTC time when message should be enqueued.
113
114
Returns:
115
Scheduled enqueue time or None for immediate delivery
116
"""
117
118
@scheduled_enqueue_time_utc.setter
119
def scheduled_enqueue_time_utc(self, value: Optional[datetime]) -> None: ...
120
121
@property
122
def time_to_live(self) -> Optional[timedelta]:
123
"""
124
Message time-to-live duration.
125
126
Returns:
127
Time-to-live duration or None for no expiration
128
"""
129
130
@time_to_live.setter
131
def time_to_live(self, value: Optional[timedelta]) -> None: ...
132
133
@property
134
def content_type(self) -> Optional[str]:
135
"""
136
MIME content type of the message body.
137
138
Returns:
139
Content type string or None
140
"""
141
142
@content_type.setter
143
def content_type(self, value: Optional[str]) -> None: ...
144
145
@property
146
def correlation_id(self) -> Optional[str]:
147
"""
148
Correlation identifier for message correlation patterns.
149
150
Returns:
151
Correlation ID string or None
152
"""
153
154
@correlation_id.setter
155
def correlation_id(self, value: Optional[str]) -> None: ...
156
157
@property
158
def subject(self) -> Optional[str]:
159
"""
160
Message subject or label.
161
162
Returns:
163
Subject string or None
164
"""
165
166
@subject.setter
167
def subject(self, value: Optional[str]) -> None: ...
168
169
@property
170
def partition_key(self) -> Optional[str]:
171
"""
172
Partitioning key for partitioned entities.
173
174
Returns:
175
Partition key string or None
176
"""
177
178
@partition_key.setter
179
def partition_key(self, value: Optional[str]) -> None: ...
180
181
@property
182
def to(self) -> Optional[str]:
183
"""
184
Destination address.
185
186
Returns:
187
Destination address string or None
188
"""
189
190
@to.setter
191
def to(self, value: Optional[str]) -> None: ...
192
193
@property
194
def reply_to(self) -> Optional[str]:
195
"""
196
Reply-to address for response messages.
197
198
Returns:
199
Reply-to address string or None
200
"""
201
202
@reply_to.setter
203
def reply_to(self, value: Optional[str]) -> None: ...
204
205
@property
206
def reply_to_session_id(self) -> Optional[str]:
207
"""
208
Session ID for reply messages.
209
210
Returns:
211
Reply-to session ID string or None
212
"""
213
214
@reply_to_session_id.setter
215
def reply_to_session_id(self, value: Optional[str]) -> None: ...
216
217
@property
218
def raw_amqp_message(self) -> AmqpAnnotatedMessage:
219
"""
220
Underlying AMQP message for advanced scenarios.
221
222
Returns:
223
AmqpAnnotatedMessage object
224
"""
225
```
226
227
#### Usage Example
228
229
```python
230
from azure.servicebus import ServiceBusMessage
231
from datetime import datetime, timedelta
232
233
# Create a basic message
234
message = ServiceBusMessage("Hello, Service Bus!")
235
236
# Create a message with properties
237
message = ServiceBusMessage(
238
body="Order processing request",
239
application_properties={
240
"order_id": "12345",
241
"priority": "high",
242
"customer_id": "customer_abc"
243
},
244
session_id="order-session-12345",
245
message_id="msg-001",
246
content_type="application/json",
247
correlation_id="corr-123",
248
subject="OrderProcessing",
249
time_to_live=timedelta(hours=24)
250
)
251
252
# Schedule a message for future delivery
253
future_message = ServiceBusMessage(
254
"This will be delivered in 1 hour",
255
scheduled_enqueue_time_utc=datetime.utcnow() + timedelta(hours=1)
256
)
257
258
# Modify properties after creation
259
message.application_properties["processed_at"] = datetime.utcnow().isoformat()
260
message.subject = "UpdatedSubject"
261
```
262
263
### ServiceBusReceivedMessage
264
265
Message class for messages received from Service Bus entities (extends ServiceBusMessage).
266
267
```python { .api }
268
class ServiceBusReceivedMessage(ServiceBusMessage):
269
# Inherits all properties from ServiceBusMessage
270
271
@property
272
def dead_letter_error_description(self) -> Optional[str]:
273
"""
274
Error description if message was dead lettered.
275
276
Returns:
277
Error description string or None
278
"""
279
280
@property
281
def dead_letter_reason(self) -> Optional[str]:
282
"""
283
Reason if message was dead lettered.
284
285
Returns:
286
Dead letter reason string or None
287
"""
288
289
@property
290
def dead_letter_source(self) -> Optional[str]:
291
"""
292
Source entity name if message was dead lettered.
293
294
Returns:
295
Source entity name string or None
296
"""
297
298
@property
299
def state(self) -> ServiceBusMessageState:
300
"""
301
Current state of the message.
302
303
Returns:
304
ServiceBusMessageState enum value (ACTIVE, DEFERRED, SCHEDULED)
305
"""
306
307
@property
308
def delivery_count(self) -> Optional[int]:
309
"""
310
Number of times this message has been delivered.
311
312
Returns:
313
Delivery count or None
314
"""
315
316
@property
317
def enqueued_sequence_number(self) -> Optional[int]:
318
"""
319
Sequence number assigned when message was enqueued.
320
321
Returns:
322
Enqueued sequence number or None
323
"""
324
325
@property
326
def enqueued_time_utc(self) -> Optional[datetime]:
327
"""
328
UTC time when message was enqueued.
329
330
Returns:
331
Enqueue time or None
332
"""
333
334
@property
335
def expires_at_utc(self) -> Optional[datetime]:
336
"""
337
UTC time when message expires.
338
339
Returns:
340
Expiration time or None
341
"""
342
343
@property
344
def sequence_number(self) -> Optional[int]:
345
"""
346
Unique sequence number assigned by Service Bus.
347
348
Returns:
349
Sequence number or None
350
"""
351
352
@property
353
def lock_token(self) -> Optional[Union[uuid.UUID, str]]:
354
"""
355
Lock token for PEEK_LOCK receive mode.
356
357
Returns:
358
Lock token UUID/string or None
359
"""
360
361
@property
362
def locked_until_utc(self) -> Optional[datetime]:
363
"""
364
UTC time when message lock expires.
365
366
Returns:
367
Lock expiration time or None
368
"""
369
```
370
371
#### Usage Example
372
373
```python
374
from azure.servicebus import ServiceBusClient, ServiceBusMessageState
375
376
client = ServiceBusClient.from_connection_string("your_connection_string")
377
378
with client.get_queue_receiver("my-queue") as receiver:
379
messages = receiver.receive_messages(max_message_count=5)
380
381
for message in messages:
382
print(f"Message body: {message.body}")
383
print(f"Sequence number: {message.sequence_number}")
384
print(f"Delivery count: {message.delivery_count}")
385
print(f"Enqueued at: {message.enqueued_time_utc}")
386
print(f"Expires at: {message.expires_at_utc}")
387
print(f"Lock expires at: {message.locked_until_utc}")
388
print(f"State: {message.state}")
389
390
# Check application properties
391
if message.application_properties:
392
for key, value in message.application_properties.items():
393
print(f" {key}: {value}")
394
395
# Check if message has been redelivered
396
if message.delivery_count and message.delivery_count > 1:
397
print(f"Message has been redelivered {message.delivery_count} times")
398
399
# Check message state
400
if message.state == ServiceBusMessageState.DEFERRED:
401
print("Message is deferred")
402
elif message.state == ServiceBusMessageState.SCHEDULED:
403
print("Message is scheduled")
404
405
receiver.complete_message(message)
406
```
407
408
### ServiceBusMessageBatch
409
410
Batch container for efficient sending of multiple messages.
411
412
```python { .api }
413
class ServiceBusMessageBatch:
414
@property
415
def max_size_in_bytes(self) -> int:
416
"""
417
Maximum size of the batch in bytes.
418
419
Returns:
420
Maximum batch size
421
"""
422
423
@property
424
def size_in_bytes(self) -> int:
425
"""
426
Current size of the batch in bytes.
427
428
Returns:
429
Current batch size
430
"""
431
432
def add_message(self, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]) -> None:
433
"""
434
Add a message to the batch.
435
436
Parameters:
437
- message: ServiceBusMessage or AmqpAnnotatedMessage to add
438
439
Raises:
440
- ValueError: If message would exceed batch size limit
441
"""
442
```
443
444
#### Usage Example
445
446
```python
447
from azure.servicebus import ServiceBusClient, ServiceBusMessage
448
449
client = ServiceBusClient.from_connection_string("your_connection_string")
450
451
with client.get_queue_sender("my-queue") as sender:
452
# Create a message batch
453
batch = sender.create_message_batch()
454
455
# Add messages to the batch
456
for i in range(100):
457
message = ServiceBusMessage(f"Batch message {i}")
458
try:
459
batch.add_message(message)
460
except ValueError:
461
# Batch is full, send it and create a new one
462
print(f"Sending batch with {batch.size_in_bytes} bytes")
463
sender.send_messages(batch)
464
465
# Create new batch and add current message
466
batch = sender.create_message_batch()
467
batch.add_message(message)
468
469
# Send remaining messages in the batch
470
if batch.size_in_bytes > 0:
471
print(f"Sending final batch with {batch.size_in_bytes} bytes")
472
sender.send_messages(batch)
473
```
474
475
### AMQP Message Types
476
477
Low-level AMQP message types for advanced scenarios.
478
479
```python { .api }
480
class AmqpAnnotatedMessage:
481
"""
482
Low-level AMQP message representation.
483
484
Provides direct access to AMQP message structure for advanced scenarios
485
where fine-grained control over message properties is needed.
486
"""
487
@property
488
def body(self) -> Any: ...
489
@property
490
def body_type(self) -> AmqpMessageBodyType: ...
491
@property
492
def properties(self) -> Optional[AmqpMessageProperties]: ...
493
@property
494
def application_properties(self) -> Optional[Dict]: ...
495
@property
496
def annotations(self) -> Optional[Dict]: ...
497
@property
498
def delivery_annotations(self) -> Optional[Dict]: ...
499
@property
500
def header(self) -> Optional[AmqpMessageHeader]: ...
501
@property
502
def footer(self) -> Optional[Dict]: ...
503
504
class AmqpMessageBodyType(Enum):
505
"""AMQP message body types."""
506
DATA = "data"
507
SEQUENCE = "sequence"
508
VALUE = "value"
509
510
class AmqpMessageProperties:
511
"""AMQP message properties section."""
512
@property
513
def message_id(self) -> Optional[Union[str, bytes]]: ...
514
@property
515
def user_id(self) -> Optional[bytes]: ...
516
@property
517
def to(self) -> Optional[Union[str, bytes]]: ...
518
@property
519
def subject(self) -> Optional[str]: ...
520
@property
521
def reply_to(self) -> Optional[Union[str, bytes]]: ...
522
@property
523
def correlation_id(self) -> Optional[Union[str, bytes]]: ...
524
@property
525
def content_type(self) -> Optional[str]: ...
526
@property
527
def content_encoding(self) -> Optional[str]: ...
528
@property
529
def absolute_expiry_time(self) -> Optional[int]: ...
530
@property
531
def creation_time(self) -> Optional[int]: ...
532
@property
533
def group_id(self) -> Optional[str]: ...
534
@property
535
def group_sequence(self) -> Optional[int]: ...
536
@property
537
def reply_to_group_id(self) -> Optional[str]: ...
538
539
class AmqpMessageHeader:
540
"""AMQP message header section."""
541
@property
542
def durable(self) -> Optional[bool]: ...
543
@property
544
def priority(self) -> Optional[int]: ...
545
@property
546
def time_to_live(self) -> Optional[int]: ...
547
@property
548
def first_acquirer(self) -> Optional[bool]: ...
549
@property
550
def delivery_count(self) -> Optional[int]: ...
551
```
552
553
### Message State and Type Enums
554
555
Enumerations for message states and types.
556
557
```python { .api }
558
class ServiceBusMessageState(int, Enum):
559
"""
560
Message states in Service Bus.
561
"""
562
ACTIVE = 0 # Message is active in queue/subscription
563
DEFERRED = 1 # Message has been deferred for later processing
564
SCHEDULED = 2 # Message is scheduled for future delivery
565
```
566
567
### Auto Lock Renewal
568
569
Automatic lock renewal for messages during long processing.
570
571
```python { .api }
572
class AutoLockRenewer:
573
def register(
574
self,
575
renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
576
timeout: float = 300
577
) -> None:
578
"""
579
Register a message for automatic lock renewal.
580
581
Parameters:
582
- renewable: ServiceBusReceivedMessage to auto-renew
583
- timeout: Maximum renewal duration in seconds
584
585
Raises:
586
- ValueError: If message is already registered or invalid
587
"""
588
```
589
590
#### Usage Example
591
592
```python
593
from azure.servicebus import ServiceBusClient, AutoLockRenewer
594
import time
595
596
def process_long_running_message(message):
597
# Simulate long processing time
598
time.sleep(60) # 1 minute processing
599
return f"Processed: {message.body}"
600
601
client = ServiceBusClient.from_connection_string("your_connection_string")
602
auto_renewer = AutoLockRenewer(max_lock_renewal_duration=300) # 5 minutes
603
604
try:
605
with client.get_queue_receiver("my-queue") as receiver:
606
messages = receiver.receive_messages(max_message_count=5)
607
608
for message in messages:
609
# Register message for auto-renewal
610
auto_renewer.register(message, timeout=300)
611
612
try:
613
result = process_long_running_message(message)
614
print(result)
615
receiver.complete_message(message)
616
except Exception as e:
617
print(f"Error processing message: {e}")
618
receiver.abandon_message(message)
619
620
finally:
621
auto_renewer.close()
622
```
623
624
## Type Definitions
625
626
Common type aliases used throughout the SDK.
627
628
```python { .api }
629
# Union types for message parameters
630
MessageTypes = Union[
631
ServiceBusMessage,
632
AmqpAnnotatedMessage,
633
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]
634
]
635
636
# Primitive types for application properties
637
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
638
639
# Session filter type
640
NextAvailableSessionType = ServiceBusSessionFilter
641
642
# Lock renewal callback type
643
LockRenewFailureCallback = Callable[
644
[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]],
645
None
646
]
647
648
# Azure credential types (from azure-core)
649
TokenCredential = Any # azure.core.credentials.TokenCredential
650
AzureSasCredential = Any # azure.core.credentials.AzureSasCredential
651
AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredential
652
```