0
# Message Properties & Types
1
2
AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control in RabbitMQ.
3
4
## Capabilities
5
6
### Message Properties
7
8
AMQP BasicProperties for message metadata and routing information.
9
10
```python { .api }
11
class BasicProperties:
12
"""AMQP message properties."""
13
14
def __init__(self, content_type=None, content_encoding=None, headers=None,
15
delivery_mode=None, priority=None, correlation_id=None,
16
reply_to=None, expiration=None, message_id=None,
17
timestamp=None, type=None, user_id=None, app_id=None,
18
cluster_id=None):
19
"""
20
Create message properties.
21
22
Parameters:
23
- content_type (str): MIME content type (e.g., 'application/json')
24
- content_encoding (str): Content encoding (e.g., 'utf-8')
25
- headers (dict): Application-specific headers
26
- delivery_mode (int): Delivery mode (1=transient, 2=persistent)
27
- priority (int): Message priority (0-9)
28
- correlation_id (str): Correlation identifier for RPC
29
- reply_to (str): Queue name for RPC replies
30
- expiration (str): Message expiration time in milliseconds
31
- message_id (str): Unique message identifier
32
- timestamp (int): Message timestamp (Unix time)
33
- type (str): Message type identifier
34
- user_id (str): User ID that published the message
35
- app_id (str): Application identifier
36
- cluster_id (str): Cluster identifier
37
"""
38
39
def decode(self, encoded_data):
40
"""
41
Decode properties from bytes.
42
43
Parameters:
44
- encoded_data (bytes): Encoded property data
45
"""
46
47
def encode(self):
48
"""
49
Encode properties to bytes.
50
51
Returns:
52
- bytes: Encoded property data
53
"""
54
55
@property
56
def content_type(self) -> str:
57
"""MIME content type."""
58
59
@property
60
def content_encoding(self) -> str:
61
"""Content encoding."""
62
63
@property
64
def headers(self) -> dict:
65
"""Application headers dictionary."""
66
67
@property
68
def delivery_mode(self) -> int:
69
"""Delivery mode (1=transient, 2=persistent)."""
70
71
@property
72
def priority(self) -> int:
73
"""Message priority (0-9)."""
74
75
@property
76
def correlation_id(self) -> str:
77
"""Correlation ID for request/reply patterns."""
78
79
@property
80
def reply_to(self) -> str:
81
"""Reply queue name."""
82
83
@property
84
def expiration(self) -> str:
85
"""Message expiration in milliseconds."""
86
87
@property
88
def message_id(self) -> str:
89
"""Unique message identifier."""
90
91
@property
92
def timestamp(self) -> int:
93
"""Message timestamp (Unix time)."""
94
95
@property
96
def type(self) -> str:
97
"""Message type identifier."""
98
99
@property
100
def user_id(self) -> str:
101
"""User ID of message publisher."""
102
103
@property
104
def app_id(self) -> str:
105
"""Application identifier."""
106
107
@property
108
def cluster_id(self) -> str:
109
"""Cluster identifier."""
110
```
111
112
### Delivery Mode
113
114
Message persistence modes for durability control.
115
116
```python { .api }
117
from enum import Enum
118
119
class DeliveryMode(Enum):
120
"""Message delivery mode enumeration."""
121
122
Transient = 1 # Non-persistent messages (default)
123
Persistent = 2 # Persistent messages (survive broker restart)
124
```
125
126
### Exchange Types
127
128
AMQP exchange types for message routing patterns.
129
130
```python { .api }
131
from enum import Enum
132
133
class ExchangeType(Enum):
134
"""Exchange type enumeration."""
135
136
direct = 'direct' # Direct routing by routing key
137
fanout = 'fanout' # Broadcast to all bound queues
138
headers = 'headers' # Route based on header attributes
139
topic = 'topic' # Pattern-based routing with wildcards
140
```
141
142
### Protocol Constants
143
144
AMQP protocol constants and frame definitions.
145
146
```python { .api }
147
# Protocol version
148
PROTOCOL_VERSION = (0, 9, 1)
149
PORT = 5672
150
151
# Frame types
152
FRAME_METHOD = 1
153
FRAME_HEADER = 2
154
FRAME_BODY = 3
155
FRAME_HEARTBEAT = 8
156
157
# Frame size limits
158
FRAME_MAX_SIZE = 131072
159
FRAME_MIN_SIZE = 4096
160
FRAME_HEADER_SIZE = 7
161
FRAME_END_SIZE = 1
162
FRAME_END = 206
163
164
# Delivery mode constants
165
PERSISTENT_DELIVERY_MODE = 2
166
167
# AMQP reply codes
168
REPLY_SUCCESS = 200
169
CONTENT_TOO_LARGE = 311
170
NO_ROUTE = 312
171
NO_CONSUMERS = 313
172
CONNECTION_FORCED = 320
173
INVALID_PATH = 402
174
ACCESS_REFUSED = 403
175
NOT_FOUND = 404
176
PRECONDITION_FAILED = 406
177
FRAME_ERROR = 501
178
COMMAND_INVALID = 503
179
CHANNEL_ERROR = 504
180
NOT_ALLOWED = 530
181
NOT_IMPLEMENTED = 540
182
INTERNAL_ERROR = 541
183
```
184
185
## Usage Examples
186
187
### Basic Message Properties
188
189
```python
190
import pika
191
192
properties = pika.BasicProperties(
193
content_type='application/json',
194
delivery_mode=2, # Persistent message
195
headers={'source': 'web-app', 'version': '1.0'}
196
)
197
198
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
199
channel = connection.channel()
200
201
channel.basic_publish(
202
exchange='',
203
routing_key='data_queue',
204
body='{"message": "Hello World"}',
205
properties=properties
206
)
207
208
connection.close()
209
```
210
211
### Using Delivery Mode Enum
212
213
```python
214
import pika
215
216
# Persistent message using enum
217
properties = pika.BasicProperties(
218
delivery_mode=pika.DeliveryMode.Persistent.value
219
)
220
221
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
222
channel = connection.channel()
223
224
channel.basic_publish(
225
exchange='',
226
routing_key='persistent_queue',
227
body='This message will survive broker restart',
228
properties=properties
229
)
230
231
connection.close()
232
```
233
234
### RPC Pattern with Properties
235
236
```python
237
import pika
238
import uuid
239
240
class RpcClient:
241
def __init__(self):
242
self.connection = pika.BlockingConnection(
243
pika.ConnectionParameters('localhost')
244
)
245
self.channel = self.connection.channel()
246
247
# Declare callback queue
248
result = self.channel.queue_declare(queue='', exclusive=True)
249
self.callback_queue = result.method.queue
250
251
self.channel.basic_consume(
252
queue=self.callback_queue,
253
on_message_callback=self.on_response,
254
auto_ack=True
255
)
256
257
def on_response(self, ch, method, props, body):
258
if self.corr_id == props.correlation_id:
259
self.response = body
260
261
def call(self, message):
262
self.response = None
263
self.corr_id = str(uuid.uuid4())
264
265
# Publish with reply properties
266
self.channel.basic_publish(
267
exchange='',
268
routing_key='rpc_queue',
269
properties=pika.BasicProperties(
270
reply_to=self.callback_queue,
271
correlation_id=self.corr_id,
272
content_type='text/plain'
273
),
274
body=message
275
)
276
277
# Wait for response
278
while self.response is None:
279
self.connection.process_data_events()
280
281
return self.response
282
283
# Usage
284
rpc = RpcClient()
285
response = rpc.call("Hello RPC")
286
print(f"Response: {response.decode()}")
287
```
288
289
### Message Expiration
290
291
```python
292
import pika
293
294
# Message expires after 30 seconds
295
properties = pika.BasicProperties(
296
expiration='30000', # 30 seconds in milliseconds
297
delivery_mode=2
298
)
299
300
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
301
channel = connection.channel()
302
303
channel.basic_publish(
304
exchange='',
305
routing_key='temp_queue',
306
body='This message expires in 30 seconds',
307
properties=properties
308
)
309
310
connection.close()
311
```
312
313
### Headers-Based Routing
314
315
```python
316
import pika
317
318
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
319
channel = connection.channel()
320
321
# Declare headers exchange
322
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
323
324
# Publish with custom headers
325
properties = pika.BasicProperties(
326
headers={
327
'format': 'json',
328
'source': 'api',
329
'priority': 'high',
330
'x-match': 'all' # Match all headers
331
}
332
)
333
334
channel.basic_publish(
335
exchange='headers_exchange',
336
routing_key='', # Ignored for headers exchange
337
body='{"data": "headers routing example"}',
338
properties=properties
339
)
340
341
connection.close()
342
```
343
344
### Topic Exchange with Properties
345
346
```python
347
import pika
348
349
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
350
channel = connection.channel()
351
352
# Declare topic exchange
353
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
354
355
# Publish messages with different routing keys and properties
356
messages = [
357
('app.error.database', 'Database connection failed', 'high'),
358
('app.warning.cache', 'Cache miss rate high', 'medium'),
359
('app.info.startup', 'Application started', 'low')
360
]
361
362
for routing_key, message, priority in messages:
363
properties = pika.BasicProperties(
364
priority={'high': 9, 'medium': 5, 'low': 1}[priority],
365
timestamp=int(time.time()),
366
type='log_message',
367
headers={'level': routing_key.split('.')[1]}
368
)
369
370
channel.basic_publish(
371
exchange='topic_logs',
372
routing_key=routing_key,
373
body=message,
374
properties=properties
375
)
376
377
connection.close()
378
```
379
380
### Message Metadata Inspection
381
382
```python
383
import pika
384
385
def callback(ch, method, properties, body):
386
print(f"Message: {body.decode()}")
387
print(f"Content Type: {properties.content_type}")
388
print(f"Delivery Mode: {properties.delivery_mode}")
389
print(f"Priority: {properties.priority}")
390
print(f"Headers: {properties.headers}")
391
print(f"Timestamp: {properties.timestamp}")
392
print(f"Message ID: {properties.message_id}")
393
394
ch.basic_ack(delivery_tag=method.delivery_tag)
395
396
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
397
channel = connection.channel()
398
399
channel.queue_declare(queue='inspect_queue')
400
channel.basic_consume(queue='inspect_queue', on_message_callback=callback)
401
402
print('Waiting for messages...')
403
channel.start_consuming()
404
```
405
406
### Custom Application Properties
407
408
```python
409
import pika
410
import json
411
import time
412
413
# Custom message with application-specific properties
414
properties = pika.BasicProperties(
415
content_type='application/json',
416
content_encoding='utf-8',
417
delivery_mode=2,
418
priority=5,
419
correlation_id='req-12345',
420
message_id=f'msg-{int(time.time())}',
421
timestamp=int(time.time()),
422
type='user_event',
423
user_id='user123',
424
app_id='web-service-v1.0',
425
headers={
426
'event_type': 'user_signup',
427
'version': '2.1',
428
'source_ip': '192.168.1.100',
429
'user_agent': 'Mozilla/5.0...'
430
}
431
)
432
433
message_data = {
434
'user_id': 'user123',
435
'event': 'signup',
436
'timestamp': time.time()
437
}
438
439
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
440
channel = connection.channel()
441
442
channel.basic_publish(
443
exchange='user_events',
444
routing_key='signup',
445
body=json.dumps(message_data),
446
properties=properties
447
)
448
449
connection.close()
450
```