0
# Message Management
1
2
Core message classes for creating, manipulating, and processing AMQP messages including support for different body types, properties, headers, annotations, and batch operations.
3
4
## Capabilities
5
6
### Message Class
7
8
The core AMQP message representation that encapsulates all message data including body, properties, headers, and annotations.
9
10
```python { .api }
11
class Message:
12
def __init__(self, body=None, properties=None, application_properties=None,
13
annotations=None, header=None, msg_format=None, encoding='UTF-8',
14
body_type=None, footer=None, delivery_annotations=None):
15
"""
16
Create an AMQP message.
17
18
Parameters:
19
- body: Message body (str, bytes, list, or dict)
20
- properties (MessageProperties): Standard AMQP message properties
21
- application_properties (dict): Custom application-specific properties
22
- annotations (dict): Message annotations (broker-specific metadata)
23
- header (MessageHeader): Message header with delivery information
24
- msg_format (int): Message format identifier
25
- encoding (str): Text encoding for string data
26
- body_type (MessageBodyType): Type of message body (Data, Value, Sequence)
27
- footer (dict): Message footer annotations
28
- delivery_annotations (dict): Delivery-specific annotations
29
"""
30
```
31
32
**Key Properties:**
33
34
```python { .api }
35
# Message content access
36
@property
37
def data: bytes # Binary message data
38
@property
39
def value: any # Decoded message value
40
@property
41
def sequence: list # Sequence message data
42
43
# Message metadata
44
@property
45
def properties: MessageProperties # Standard message properties
46
@property
47
def application_properties: dict # Custom properties
48
@property
49
def header: MessageHeader # Message header
50
@property
51
def annotations: dict # Message annotations
52
@property
53
def delivery_annotations: dict # Delivery annotations
54
@property
55
def footer: dict # Message footer
56
57
# Message state
58
@property
59
def settled: bool # Whether message is settled
60
@property
61
def state: MessageState # Current message state (enum)
62
@property
63
def idle_time: int # Time message has been idle
64
@property
65
def retries: int # Number of retry attempts
66
@property
67
def delivery_no: int # Delivery sequence number
68
@property
69
def delivery_tag: bytes # Delivery tag identifier
70
@property
71
def message_annotations: dict # Alias for annotations property
72
```
73
74
**Key Methods:**
75
76
```python { .api }
77
def get_data(self):
78
"""Get the message body as bytes."""
79
80
def get_message(self):
81
"""Get the underlying C message object."""
82
83
def accept(self):
84
"""
85
Accept the message (acknowledge successful processing).
86
87
Returns:
88
bool: True if settlement succeeded
89
"""
90
91
def reject(self, condition=None, description=None, info=None):
92
"""
93
Reject the message (indicate processing failure).
94
95
Parameters:
96
- condition (str): Error condition code
97
- description (str): Error description
98
- info (dict): Additional error information
99
100
Returns:
101
bool: True if settlement succeeded
102
"""
103
104
def release(self):
105
"""
106
Release the message (return to queue for redelivery).
107
108
Returns:
109
bool: True if settlement succeeded
110
"""
111
112
def modify(self, failed, deliverable, annotations=None):
113
"""
114
Modify message state and annotations.
115
116
Parameters:
117
- failed (bool): Whether message processing failed
118
- deliverable (bool): Whether message is deliverable
119
- annotations (dict): Additional annotations to add
120
"""
121
122
def gather(self):
123
"""Gather multi-part message data."""
124
125
def get_message_encoded_size(self):
126
"""
127
Get the encoded size of the message.
128
129
Returns:
130
int: Message size in bytes
131
"""
132
133
def encode_message(self):
134
"""
135
Encode message to AMQP wire format.
136
137
Returns:
138
bytearray: Encoded message data
139
"""
140
141
@classmethod
142
def decode_from_bytes(cls, data, encoding='UTF-8'):
143
"""
144
Decode message from raw AMQP bytes.
145
146
Parameters:
147
- data (bytes): Raw AMQP message data
148
- encoding (str): Text encoding for string data
149
150
Returns:
151
Message: Decoded message instance
152
"""
153
```
154
155
**Usage Examples:**
156
157
```python
158
from uamqp import Message
159
from uamqp.message import MessageProperties, MessageHeader
160
161
# Simple text message
162
message = Message("Hello World")
163
164
# Message with properties
165
properties = MessageProperties(
166
message_id="msg-123",
167
content_type="text/plain",
168
reply_to="response-queue"
169
)
170
message = Message("Hello World", properties=properties)
171
172
# Message with custom application properties
173
app_props = {"priority": "high", "source": "sensor-1"}
174
message = Message("sensor data", application_properties=app_props)
175
176
# Binary message
177
binary_data = b'\x00\x01\x02\x03'
178
message = Message(binary_data)
179
180
# JSON message (will be serialized)
181
json_data = {"temperature": 23.5, "humidity": 65}
182
message = Message(json_data)
183
```
184
185
### BatchMessage Class
186
187
Optimized message class for high-throughput scenarios that can contain multiple individual messages in a single batch.
188
189
```python { .api }
190
class BatchMessage:
191
def __init__(self, data=None, properties=None, application_properties=None,
192
annotations=None, header=None, multi_messages=False, encoding='UTF-8'):
193
"""
194
Create a batch message for high-throughput scenarios.
195
196
Parameters:
197
- data: Batch data (list of messages or encoded batch data)
198
- properties (MessageProperties): Batch-level message properties
199
- application_properties (dict): Custom batch properties
200
- annotations (dict): Batch annotations
201
- header (MessageHeader): Batch header
202
- multi_messages (bool): Whether data contains multiple distinct messages
203
- encoding (str): Text encoding
204
"""
205
```
206
207
**Key Properties:**
208
209
```python { .api }
210
@property
211
def batch_format: int # Batch message format identifier
212
@property
213
def max_message_length: int # Maximum individual message length
214
@property
215
def data: bytes # Batch data
216
```
217
218
**Key Methods:**
219
220
```python { .api }
221
def gather(self):
222
"""Gather and prepare batch data for transmission."""
223
```
224
225
**Usage Example:**
226
227
```python
228
from uamqp import BatchMessage, Message
229
230
# Create individual messages
231
messages = [
232
Message("Message 1"),
233
Message("Message 2"),
234
Message("Message 3")
235
]
236
237
# Create batch message
238
batch = BatchMessage(messages, multi_messages=True)
239
240
# Send batch message
241
from uamqp import SendClient
242
with SendClient(target) as client:
243
client.queue_message(batch)
244
client.send_all_messages()
245
```
246
247
### MessageProperties Class
248
249
Container for standard AMQP message properties following the AMQP 1.0 specification.
250
251
```python { .api }
252
class MessageProperties:
253
def __init__(self, message_id=None, user_id=None, to=None, subject=None,
254
reply_to=None, correlation_id=None, content_type=None,
255
content_encoding=None, absolute_expiry_time=None,
256
creation_time=None, group_id=None, group_sequence=None,
257
reply_to_group_id=None):
258
"""
259
Standard AMQP message properties.
260
261
Parameters:
262
- message_id: Unique message identifier
263
- user_id: Identity of user responsible for producing message
264
- to: Address of node the message is being sent to
265
- subject: Application-specific subject
266
- reply_to: Address of node to send replies to
267
- correlation_id: Application correlation identifier
268
- content_type: MIME content type
269
- content_encoding: MIME content encoding
270
- absolute_expiry_time: Absolute expiry time
271
- creation_time: Message creation timestamp
272
- group_id: Group identifier
273
- group_sequence: Position within group
274
- reply_to_group_id: Group identifier for replies
275
"""
276
277
def get_properties_obj(self):
278
"""
279
Get underlying C properties reference.
280
281
Returns:
282
uamqp.c_uamqp.cProperties: C properties object
283
"""
284
```
285
286
**Usage Example:**
287
288
```python
289
from uamqp.message import MessageProperties
290
import datetime
291
292
properties = MessageProperties(
293
message_id="order-12345",
294
content_type="application/json",
295
reply_to="order-responses",
296
correlation_id="req-98765",
297
creation_time=datetime.datetime.utcnow(),
298
subject="order-created"
299
)
300
301
message = Message(order_data, properties=properties)
302
```
303
304
### MessageHeader Class
305
306
Container for AMQP message header information including delivery details and quality of service settings.
307
308
```python { .api }
309
class MessageHeader:
310
def __init__(self, durable=None, priority=None, time_to_live=None,
311
first_acquirer=None, delivery_count=None):
312
"""
313
AMQP message header information.
314
315
Parameters:
316
- durable (bool): Whether message should survive broker restart
317
- priority (int): Message priority (0-255)
318
- time_to_live (int): Message TTL in milliseconds
319
- first_acquirer (bool): Whether this is first attempt to acquire
320
- delivery_count (int): Number of delivery attempts
321
"""
322
```
323
324
**Key Properties:**
325
326
```python { .api }
327
@property
328
def delivery_count: int # Number of delivery attempts
329
@property
330
def time_to_live: int # TTL in milliseconds
331
@property
332
def ttl: int # Alias for time_to_live
333
@property
334
def durable: bool # Message durability
335
@property
336
def first_acquirer: bool # First acquisition flag
337
@property
338
def priority: int # Message priority (0-255)
339
340
def get_header_obj(self):
341
"""
342
Get underlying C header reference.
343
344
Returns:
345
uamqp.c_uamqp.cHeader: C header object
346
"""
347
```
348
349
**Usage Example:**
350
351
```python
352
from uamqp.message import MessageHeader
353
354
# High priority, durable message with 1 hour TTL
355
header = MessageHeader(
356
durable=True,
357
priority=255,
358
time_to_live=3600000 # 1 hour in milliseconds
359
)
360
361
message = Message("Important data", header=header)
362
```
363
364
### Message Body Types
365
366
Different message body formats supported by AMQP 1.0.
367
368
```python { .api }
369
class MessageBody:
370
"""Base class for message bodies."""
371
@property
372
def type: int # Body type identifier
373
@property
374
def data: any # Body data
375
376
class DataBody(MessageBody):
377
"""Binary data message body."""
378
def append(self, data):
379
"""Append binary data to body."""
380
381
def __len__(self):
382
"""Get number of data segments."""
383
384
def __getitem__(self, index):
385
"""Get data segment by index."""
386
387
def __str__(self):
388
"""String representation of data body."""
389
390
def __bytes__(self):
391
"""Bytes representation of data body."""
392
393
class ValueBody(MessageBody):
394
"""Single encoded value message body."""
395
def set(self, value):
396
"""Set the body value."""
397
398
def __str__(self):
399
"""String representation of value body."""
400
401
def __bytes__(self):
402
"""Bytes representation of value body."""
403
404
class SequenceBody(MessageBody):
405
"""Sequence of structured data message body."""
406
def append(self, data):
407
"""Append data to sequence."""
408
409
def __len__(self):
410
"""Get number of sequence items."""
411
412
def __getitem__(self, index):
413
"""Get sequence item by index."""
414
415
def __str__(self):
416
"""String representation of sequence body."""
417
418
def __bytes__(self):
419
"""Bytes representation of sequence body."""
420
```
421
422
**Usage Example:**
423
424
```python
425
from uamqp.message import DataBody, ValueBody, SequenceBody
426
427
# Binary data body
428
data_body = DataBody()
429
data_body.append(b"binary data chunk 1")
430
data_body.append(b"binary data chunk 2")
431
432
# Single value body
433
value_body = ValueBody()
434
value_body.set({"key": "value", "number": 42})
435
436
# Sequence body
437
sequence_body = SequenceBody()
438
sequence_body.append("item 1")
439
sequence_body.append("item 2")
440
sequence_body.append({"structured": "data"})
441
```
442
443
### MessageBodyType Enum
444
445
Message body type constants that define how message data is structured:
446
447
```python { .api }
448
class MessageBodyType:
449
"""Message body type identifiers."""
450
Data = 0x75 # Binary data body
451
Value = 0x77 # Single encoded value body
452
Sequence = 0x76 # Sequence of structured data body
453
```
454
455
## Message Settlement
456
457
AMQP message settlement patterns for reliable message processing:
458
459
```python
460
# Accept message (successful processing)
461
message.accept()
462
463
# Reject message (processing failed, don't retry)
464
message.reject(
465
condition="processing-error",
466
description="Invalid data format"
467
)
468
469
# Release message (temporary failure, allow retry)
470
message.release()
471
472
# Modify message (change delivery state/annotations)
473
message.modify(
474
failed=True,
475
deliverable=False,
476
annotations={"retry_count": 3}
477
)
478
```