0
# Message Queues
1
2
POSIX named message queues provide reliable, priority-based message passing between processes. Messages are ordered by priority (highest first) and FIFO within the same priority level. Message queues support both blocking and non-blocking operations, timeouts, and asynchronous notifications.
3
4
Note: Message queue support varies by platform. Check the `MESSAGE_QUEUES_SUPPORTED` constant before using message queue functionality. macOS does not support POSIX message queues.
5
6
## Capabilities
7
8
### Message Queue Creation and Management
9
10
Create and manage named POSIX message queues with configurable capacity, message size limits, and access permissions.
11
12
```python { .api }
13
class MessageQueue:
14
def __init__(self, name, flags=0, mode=0o600, max_messages=QUEUE_MESSAGES_MAX_DEFAULT,
15
max_message_size=QUEUE_MESSAGE_SIZE_MAX_DEFAULT, read=True, write=True):
16
"""
17
Create or open a named message queue.
18
19
Parameters:
20
- name: str or None. If None, a random name is chosen. If str, should start with '/' (e.g., '/my_queue')
21
- flags: int, creation flags (O_CREAT, O_EXCL, O_CREX)
22
- mode: int, permissions (octal, default 0o600)
23
- max_messages: int, maximum messages in queue (default: QUEUE_MESSAGES_MAX_DEFAULT)
24
- max_message_size: int, maximum message size in bytes (default: QUEUE_MESSAGE_SIZE_MAX_DEFAULT)
25
- read: bool, whether this handle can receive messages
26
- write: bool, whether this handle can send messages
27
28
Notes:
29
- max_messages and max_message_size are ignored when opening existing queue
30
- Default values may be quite small (e.g., 10 messages) on some systems
31
- Consider providing explicit values for production use
32
- read/write permissions only affect this handle, not other handles to same queue
33
"""
34
```
35
36
### Message Operations
37
38
Send and receive messages with priority support and timeout handling.
39
40
```python { .api }
41
def send(self, message, timeout=None, priority=0):
42
"""
43
Send a message to the queue.
44
45
Parameters:
46
- message: bytes or str, message content (can contain embedded NULLs)
47
- timeout: None, 0, or positive float
48
- None: Block indefinitely until message sent
49
- 0: Non-blocking, raises BusyError if queue full
50
- > 0: Wait up to timeout seconds, raises BusyError if timeout expires
51
- priority: int, message priority (0 = lowest, higher values = higher priority, max = QUEUE_PRIORITY_MAX)
52
53
Raises:
54
- BusyError: When timeout expires or non-blocking call cannot proceed
55
- ValueError: When message exceeds max_message_size
56
"""
57
58
def receive(self, timeout=None):
59
"""
60
Receive a message from the queue.
61
62
Parameters:
63
- timeout: None, 0, or positive float (same semantics as send())
64
65
Returns:
66
tuple: (message, priority) where message is bytes and priority is int
67
68
Messages are received in priority order (highest priority first),
69
and FIFO order within the same priority level.
70
71
Raises:
72
- BusyError: When timeout expires or non-blocking call finds empty queue
73
"""
74
```
75
76
### Asynchronous Notifications
77
78
Request notifications when the queue transitions from empty to non-empty.
79
80
```python { .api }
81
def request_notification(self, notification=None):
82
"""
83
Request or cancel notification when queue becomes non-empty.
84
85
Parameters:
86
- notification: None, int, or tuple
87
- None: Cancel any existing notification request
88
- int: Signal number to send when queue becomes non-empty
89
- tuple: (function, param) to call function(param) in new thread when queue becomes non-empty
90
91
Notes:
92
- Only one notification request per queue system-wide
93
- OS delivers at most one notification per request
94
- Must call again for subsequent notifications
95
- Fails with BusyError if another process has pending notification request
96
97
Raises:
98
- BusyError: When another process already has notification request pending
99
"""
100
```
101
102
### Resource Management
103
104
Close queue handles and clean up queue resources.
105
106
```python { .api }
107
def fileno(self):
108
"""
109
Returns the message queue descriptor.
110
111
Returns:
112
int: Message queue descriptor (same as the mqd property)
113
114
This method allows MessageQueue objects to be used with functions
115
that expect file-like objects with a fileno() method.
116
"""
117
118
def close(self):
119
"""
120
Close this handle to the message queue.
121
122
Must be called explicitly - not automatically called on garbage collection.
123
Other handles to the same queue remain valid.
124
"""
125
126
def unlink(self):
127
"""
128
Request destruction of the message queue.
129
130
Actual destruction is postponed until all handles are closed.
131
After unlinking, new open() calls with the same name create a new queue.
132
"""
133
134
def __str__(self):
135
"""
136
String representation of the message queue.
137
138
Returns:
139
str: Human-readable representation including name and current message count
140
"""
141
142
def __repr__(self):
143
"""
144
Detailed string representation for debugging.
145
146
Returns:
147
str: Technical representation with class name and key attributes
148
"""
149
```
150
151
### Message Queue Properties
152
153
Access queue metadata, capacity limits, and current state.
154
155
```python { .api }
156
@property
157
def name(self):
158
"""
159
The name provided in the constructor.
160
161
Returns:
162
str: Message queue name
163
"""
164
165
@property
166
def mode(self):
167
"""
168
The mode (permissions) provided in the constructor.
169
170
Returns:
171
int: File mode/permissions (e.g., 0o600)
172
"""
173
174
@property
175
def mqd(self):
176
"""
177
Message queue descriptor representing the queue.
178
179
Returns:
180
int: Message queue descriptor (platform-specific handle)
181
"""
182
183
@property
184
def block(self):
185
"""
186
Whether send() and receive() operations may block.
187
188
Returns:
189
bool: True if operations may block, False if they raise BusyError instead
190
"""
191
192
@block.setter
193
def block(self, value):
194
"""
195
Set blocking behavior for send() and receive() operations.
196
197
Parameters:
198
- value: bool, True to allow blocking, False to raise BusyError instead
199
"""
200
201
@property
202
def max_messages(self):
203
"""
204
Maximum number of messages the queue can hold.
205
206
Returns:
207
int: Maximum message count
208
"""
209
210
@property
211
def max_message_size(self):
212
"""
213
Maximum size of individual messages in bytes.
214
215
Returns:
216
int: Maximum message size
217
"""
218
219
@property
220
def current_messages(self):
221
"""
222
Current number of messages in the queue.
223
224
Returns:
225
int: Current message count
226
"""
227
```
228
229
### Module Function
230
231
Convenience function for unlinking message queues by name.
232
233
```python { .api }
234
def unlink_message_queue(name):
235
"""
236
Convenience function to unlink a message queue by name.
237
238
Parameters:
239
- name: str, message queue name (e.g., '/my_queue')
240
241
Equivalent to opening the queue and calling unlink(), but more convenient
242
when you only need to remove an existing queue.
243
"""
244
```
245
246
## Usage Examples
247
248
### Basic Message Queue Usage
249
250
```python
251
import posix_ipc
252
253
# Check if message queues are supported
254
if not posix_ipc.MESSAGE_QUEUES_SUPPORTED:
255
print("Message queues not supported on this platform")
256
exit(1)
257
258
# Create a message queue
259
mq = posix_ipc.MessageQueue('/my_queue', posix_ipc.O_CREAT)
260
261
# Send a message
262
message = b'Hello, message queue!'
263
mq.send(message)
264
265
# Receive the message
266
received_message, priority = mq.receive()
267
print(f"Received: {received_message.decode()}, Priority: {priority}")
268
269
# Clean up
270
mq.close()
271
mq.unlink()
272
```
273
274
### Priority-Based Messaging
275
276
```python
277
import posix_ipc
278
279
mq = posix_ipc.MessageQueue('/priority_queue', posix_ipc.O_CREAT)
280
281
# Send messages with different priorities
282
mq.send(b'Low priority message', priority=1)
283
mq.send(b'High priority message', priority=10)
284
mq.send(b'Medium priority message', priority=5)
285
286
# Receive messages (highest priority first)
287
for i in range(3):
288
message, priority = mq.receive()
289
print(f"Received (priority {priority}): {message.decode()}")
290
291
mq.close()
292
mq.unlink()
293
```
294
295
### Non-Blocking and Timeout Operations
296
297
```python
298
import posix_ipc
299
300
mq = posix_ipc.MessageQueue('/timeout_queue', posix_ipc.O_CREAT)
301
302
# Non-blocking send (queue empty, should succeed)
303
try:
304
mq.send(b'Non-blocking message', timeout=0)
305
print("Message sent immediately")
306
except posix_ipc.BusyError:
307
print("Queue full, message not sent")
308
309
# Non-blocking receive
310
try:
311
message, priority = mq.receive(timeout=0)
312
print(f"Message received immediately: {message.decode()}")
313
except posix_ipc.BusyError:
314
print("Queue empty, no message received")
315
316
# Timeout receive
317
try:
318
message, priority = mq.receive(timeout=2.0)
319
print(f"Message received within timeout: {message.decode()}")
320
except posix_ipc.BusyError:
321
print("Timeout expired, no message received")
322
323
mq.close()
324
mq.unlink()
325
```
326
327
### Using Block Property
328
329
```python
330
import posix_ipc
331
332
mq = posix_ipc.MessageQueue('/block_queue', posix_ipc.O_CREAT)
333
334
# Enable non-blocking mode
335
mq.block = False
336
337
try:
338
# This will raise BusyError immediately if queue is empty
339
message, priority = mq.receive()
340
except posix_ipc.BusyError:
341
print("Queue empty (non-blocking mode)")
342
343
# Re-enable blocking mode
344
mq.block = True
345
346
mq.close()
347
mq.unlink()
348
```
349
350
### Asynchronous Notifications with Signals
351
352
```python
353
import posix_ipc
354
import signal
355
import os
356
import time
357
358
# Signal handler
359
def message_arrived(signum, frame):
360
print(f"Signal {signum} received - message available!")
361
362
# Set up signal handler
363
signal.signal(signal.SIGUSR1, message_arrived)
364
365
mq = posix_ipc.MessageQueue('/notify_queue', posix_ipc.O_CREAT)
366
367
# Request notification via signal
368
mq.request_notification(signal.SIGUSR1)
369
370
print("Waiting for message notification...")
371
372
# In another process, send a message to trigger notification
373
if os.fork() == 0: # Child process
374
time.sleep(2) # Wait a bit
375
child_mq = posix_ipc.MessageQueue('/notify_queue')
376
child_mq.send(b'Notification test message')
377
child_mq.close()
378
exit(0)
379
380
# Parent process waits for signal
381
time.sleep(5)
382
383
# Receive the message
384
message, priority = mq.receive()
385
print(f"Received: {message.decode()}")
386
387
mq.close()
388
mq.unlink()
389
```
390
391
### Asynchronous Notifications with Function Callback
392
393
```python
394
import posix_ipc
395
import threading
396
import time
397
398
def notification_callback(queue_name):
399
print(f"Callback called for queue: {queue_name}")
400
# Could process message here or signal main thread
401
402
mq = posix_ipc.MessageQueue('/callback_queue', posix_ipc.O_CREAT)
403
404
# Request notification via callback
405
mq.request_notification((notification_callback, '/callback_queue'))
406
407
print("Waiting for message notification...")
408
409
# Send message from another thread
410
def send_message():
411
time.sleep(2)
412
sender_mq = posix_ipc.MessageQueue('/callback_queue')
413
sender_mq.send(b'Callback test message')
414
sender_mq.close()
415
416
threading.Thread(target=send_message).start()
417
418
# Wait for notification and process message
419
time.sleep(5)
420
message, priority = mq.receive()
421
print(f"Received: {message.decode()}")
422
423
mq.close()
424
mq.unlink()
425
```
426
427
### Queue Capacity and Limits
428
429
```python
430
import posix_ipc
431
432
# Create queue with specific limits
433
mq = posix_ipc.MessageQueue('/capacity_queue', posix_ipc.O_CREAT,
434
max_messages=5, max_message_size=256)
435
436
print(f"Max messages: {mq.max_messages}")
437
print(f"Max message size: {mq.max_message_size}")
438
print(f"Current messages: {mq.current_messages}")
439
440
# Fill the queue to capacity
441
for i in range(mq.max_messages):
442
mq.send(f'Message {i}'.encode())
443
print(f"Sent message {i}, queue has {mq.current_messages} messages")
444
445
# Try to send one more (should block or fail depending on timeout)
446
try:
447
mq.send(b'Overflow message', timeout=0)
448
except posix_ipc.BusyError:
449
print("Queue full - cannot send more messages")
450
451
mq.close()
452
mq.unlink()
453
```
454
455
### Producer-Consumer Pattern
456
457
```python
458
import posix_ipc
459
import os
460
import time
461
import json
462
463
def producer():
464
mq = posix_ipc.MessageQueue('/producer_consumer', posix_ipc.O_CREAT)
465
466
for i in range(10):
467
data = {'id': i, 'timestamp': time.time(), 'value': i * 10}
468
message = json.dumps(data).encode()
469
mq.send(message, priority=i % 3) # Vary priority
470
print(f"Produced: {data}")
471
time.sleep(0.5)
472
473
# Send termination message
474
mq.send(b'STOP', priority=99)
475
mq.close()
476
477
def consumer():
478
mq = posix_ipc.MessageQueue('/producer_consumer')
479
480
while True:
481
message, priority = mq.receive()
482
483
if message == b'STOP':
484
print("Consumer stopping")
485
break
486
487
data = json.loads(message.decode())
488
print(f"Consumed (priority {priority}): {data}")
489
time.sleep(0.2)
490
491
mq.close()
492
mq.unlink()
493
494
# Run producer and consumer in separate processes
495
if os.fork() == 0: # Child - consumer
496
consumer()
497
else: # Parent - producer
498
producer()
499
os.wait() # Wait for child to finish
500
```