POSIX IPC primitives (semaphores, shared memory and message queues) for Python
—
POSIX named message queues provide reliable, priority-based message passing between processes. Messages are ordered by priority (highest first) and FIFO within the same priority level. Message queues support both blocking and non-blocking operations, timeouts, and asynchronous notifications.
Note: Message queue support varies by platform; macOS, for example, does not support POSIX message queues. Check the `posix_ipc.MESSAGE_QUEUES_SUPPORTED` constant before using any message queue functionality.
Create and manage named POSIX message queues with configurable capacity, message size limits, and access permissions.
class MessageQueue:
    def __init__(self, name, flags=0, mode=0o600, max_messages=QUEUE_MESSAGES_MAX_DEFAULT,
                 max_message_size=QUEUE_MESSAGE_SIZE_MAX_DEFAULT, read=True, write=True):
        """
        Create or open a named message queue.

        Parameters:
        - name: str or None. If None, a random name is chosen. If str, it should
          start with '/' (e.g., '/my_queue')
        - flags: int, creation flags (O_CREAT, O_EXCL, O_CREX); default 0
        - mode: int, permissions (octal, default 0o600)
        - max_messages: int, maximum number of messages the queue can hold
          (default: QUEUE_MESSAGES_MAX_DEFAULT)
        - max_message_size: int, maximum size of a single message in bytes
          (default: QUEUE_MESSAGE_SIZE_MAX_DEFAULT)
        - read: bool, whether this handle can receive messages (default True)
        - write: bool, whether this handle can send messages (default True)

        Notes:
        - max_messages and max_message_size are ignored when opening an
          existing queue
        - The default limits may be quite small (e.g., 10 messages) on some
          systems; consider providing explicit values for production use
        - read/write only affect this handle, not other handles to the same queue
        """

Send and receive messages with priority support and timeout handling.
def send(self, message, timeout=None, priority=0):
    """
    Send a message to the queue.

    Parameters:
    - message: bytes or str, message content (can contain embedded NULLs)
    - timeout: None, 0, or positive float
        - None: Block indefinitely until the message is sent
        - 0: Non-blocking; raises BusyError if the queue is full
        - > 0: Wait up to timeout seconds; raises BusyError if the timeout expires
    - priority: int, message priority (0 = lowest, higher values = higher
      priority, max = QUEUE_PRIORITY_MAX)

    Raises:
    - BusyError: When the timeout expires or a non-blocking call cannot proceed
    - ValueError: When the message exceeds max_message_size
    """
def receive(self, timeout=None):
    """
    Receive a message from the queue.

    Parameters:
    - timeout: None, 0, or positive float (same semantics as send())
        - None: Block indefinitely until a message is available
        - 0: Non-blocking; raises BusyError if the queue is empty
        - > 0: Wait up to timeout seconds; raises BusyError if the timeout expires

    Returns:
    tuple: (message, priority) where message is bytes and priority is int.
    Messages are received in priority order (highest priority first),
    and in FIFO order within the same priority level.

    Raises:
    - BusyError: When the timeout expires or a non-blocking call finds the
      queue empty
    """

Request notifications when the queue transitions from empty to non-empty.
def request_notification(self, notification=None):
    """
    Request or cancel notification when the queue becomes non-empty.

    Parameters:
    - notification: None, int, or tuple
        - None: Cancel any existing notification request
        - int: Signal number to send when the queue becomes non-empty
        - tuple: (function, param); function(param) is called in a new thread
          when the queue becomes non-empty

    Notes:
    - Only one notification request can exist per queue, system-wide
    - The OS delivers at most one notification per request; call this method
      again to receive subsequent notifications

    Raises:
    - BusyError: When another process already has a notification request pending
    """

Close queue handles and clean up queue resources.
def fileno(self):
    """
    Return the message queue descriptor.

    Returns:
    int: Message queue descriptor (same value as the mqd property)

    This method allows MessageQueue objects to be passed to functions
    that expect file-like objects with a fileno() method.
    """
def close(self):
    """
    Close this handle to the message queue.

    Must be called explicitly; it is not called automatically when the
    object is garbage collected. Other handles to the same queue remain valid.
    """
def unlink(self):
    """
    Request destruction of the message queue.

    Actual destruction is postponed until all handles are closed.
    After unlinking, creating a queue with the same name yields a new queue.
    """
def __str__(self):
    """
    Return a string representation of the message queue.

    Returns:
    str: Human-readable representation including the name and the current
    message count
    """
def __repr__(self):
    """
    Return a detailed string representation for debugging.

    Returns:
    str: Technical representation with the class name and key attributes
    """

Access queue metadata, capacity limits, and current state.
@property
def name(self):
    """
    The name provided in the constructor.

    Returns:
    str: Message queue name
    """
@property
def mode(self):
    """
    The mode (permissions) provided in the constructor.

    Returns:
    int: File mode/permissions (e.g., 0o600)
    """
@property
def mqd(self):
    """
    The message queue descriptor representing the queue.

    Returns:
    int: Message queue descriptor (platform-specific handle)
    """
@property
def block(self):
    """
    Whether send() and receive() operations may block.

    Returns:
    bool: True if operations may block, False if they raise BusyError instead
    """
@block.setter
def block(self, value):
    """
    Set the blocking behavior of send() and receive().

    Parameters:
    - value: bool, True to allow blocking, False to raise BusyError instead
      of blocking
    """
@property
def max_messages(self):
    """
    Maximum number of messages the queue can hold.

    Returns:
    int: Maximum message count
    """
@property
def max_message_size(self):
    """
    Maximum size of an individual message, in bytes.

    Returns:
    int: Maximum message size
    """
@property
def current_messages(self):
    """
    Number of messages currently in the queue.

    Returns:
    int: Current message count
    """

Convenience function for unlinking message queues by name.
def unlink_message_queue(name):
    """
    Unlink a message queue by name.

    Parameters:
    - name: str, message queue name (e.g., '/my_queue')

    Equivalent to opening the queue and calling unlink(), but more convenient
    when you only need to remove an existing queue.
    """

import posix_ipc
# Message queues are not available on every platform, so check before use.
if not posix_ipc.MESSAGE_QUEUES_SUPPORTED:
    print("Message queues not supported on this platform")
    # Raise SystemExit rather than calling the site-provided exit(), which is
    # intended for the interactive interpreter and may be absent in scripts.
    raise SystemExit(1)

# Create the message queue (O_CREAT opens it if it already exists)
mq = posix_ipc.MessageQueue('/my_queue', posix_ipc.O_CREAT)
try:
    # Send a message
    message = b'Hello, message queue!'
    mq.send(message)

    # Receive the message; receive() returns a (message, priority) tuple
    received_message, priority = mq.receive()
    print(f"Received: {received_message.decode()}, Priority: {priority}")
finally:
    # Clean up even if send/receive raised, so the named queue does not
    # outlive the script.
    mq.close()
    mq.unlink()

import posix_ipc
mq = posix_ipc.MessageQueue('/priority_queue', posix_ipc.O_CREAT)

# Queue three messages, deliberately out of priority order
outgoing = (
    (b'Low priority message', 1),
    (b'High priority message', 10),
    (b'Medium priority message', 5),
)
for payload, level in outgoing:
    mq.send(payload, priority=level)

# Drain the queue: messages come back highest priority first
for _ in outgoing:
    message, priority = mq.receive()
    print(f"Received (priority {priority}): {message.decode()}")

mq.close()
mq.unlink()

import posix_ipc
mq = posix_ipc.MessageQueue('/timeout_queue', posix_ipc.O_CREAT)

# timeout=0 makes send() non-blocking (queue empty, should succeed)
try:
    mq.send(b'Non-blocking message', timeout=0)
except posix_ipc.BusyError:
    print("Queue full, message not sent")
else:
    print("Message sent immediately")

# timeout=0 makes receive() non-blocking as well
try:
    message, priority = mq.receive(timeout=0)
except posix_ipc.BusyError:
    print("Queue empty, no message received")
else:
    print(f"Message received immediately: {message.decode()}")

# Wait at most two seconds for another message to arrive
try:
    message, priority = mq.receive(timeout=2.0)
except posix_ipc.BusyError:
    print("Timeout expired, no message received")
else:
    print(f"Message received within timeout: {message.decode()}")

mq.close()
mq.unlink()

import posix_ipc
queue_name = '/block_queue'
mq = posix_ipc.MessageQueue(queue_name, posix_ipc.O_CREAT)

# Switch the handle to non-blocking mode
mq.block = False

try:
    # With block=False an empty queue raises BusyError straight away
    received = mq.receive()
except posix_ipc.BusyError:
    print("Queue empty (non-blocking mode)")

# Restore the default blocking behavior
mq.block = True

mq.close()
mq.unlink()

import posix_ipc
import signal
import os
import time


def message_arrived(signum, frame):
    """Signal handler: report that the queue notification fired."""
    print(f"Signal {signum} received - message available!")


# Install the handler before requesting the notification, so the signal is
# never delivered without a handler in place.
signal.signal(signal.SIGUSR1, message_arrived)

mq = posix_ipc.MessageQueue('/notify_queue', posix_ipc.O_CREAT)

# Request notification via signal
mq.request_notification(signal.SIGUSR1)
print("Waiting for message notification...")

# In another process, send a message to trigger notification
child_pid = os.fork()
if child_pid == 0:  # Child process
    time.sleep(2)  # Wait a bit
    child_mq = posix_ipc.MessageQueue('/notify_queue')
    child_mq.send(b'Notification test message')
    child_mq.close()
    # Leave with os._exit(): a normal exit in a forked child would flush the
    # stdio buffer inherited from the parent (duplicating the "Waiting..."
    # line when stdout is not a terminal) and run the parent's exit handlers.
    os._exit(0)

# Parent process waits for signal
time.sleep(5)

# Receive the message
message, priority = mq.receive()
print(f"Received: {message.decode()}")

# Reap the child so it does not linger as a zombie
os.waitpid(child_pid, 0)

mq.close()
mq.unlink()

import posix_ipc
import threading
import time


def notification_callback(queue_name):
    # Receives the param that was registered with request_notification()
    print(f"Callback called for queue: {queue_name}")
    # Could process message here or signal main thread


mq = posix_ipc.MessageQueue('/callback_queue', posix_ipc.O_CREAT)

# Register (function, param): notification_callback('/callback_queue') is
# called when the queue becomes non-empty
callback_request = (notification_callback, '/callback_queue')
mq.request_notification(callback_request)
print("Waiting for message notification...")


def send_message():
    # Runs in a helper thread: pause, then post one message via its own handle
    time.sleep(2)
    sender_mq = posix_ipc.MessageQueue('/callback_queue')
    sender_mq.send(b'Callback test message')
    sender_mq.close()


sender_thread = threading.Thread(target=send_message)
sender_thread.start()

# Wait for notification and process message
time.sleep(5)
message, priority = mq.receive()
print(f"Received: {message.decode()}")

mq.close()
mq.unlink()

import posix_ipc
# Open the queue with explicit capacity limits
mq = posix_ipc.MessageQueue(
    '/capacity_queue',
    posix_ipc.O_CREAT,
    max_messages=5,
    max_message_size=256,
)

print(f"Max messages: {mq.max_messages}")
print(f"Max message size: {mq.max_message_size}")
print(f"Current messages: {mq.current_messages}")

# Send exactly as many messages as the queue can hold
capacity = mq.max_messages
for i in range(capacity):
    payload = f'Message {i}'.encode()
    mq.send(payload)
    print(f"Sent message {i}, queue has {mq.current_messages} messages")

# One more would not fit; timeout=0 turns that into an immediate BusyError
try:
    mq.send(b'Overflow message', timeout=0)
except posix_ipc.BusyError:
    print("Queue full - cannot send more messages")

mq.close()
mq.unlink()

import posix_ipc
import os
import time
import json
def producer():
mq = posix_ipc.MessageQueue('/producer_consumer', posix_ipc.O_CREAT)
for i in range(10):
data = {'id': i, 'timestamp': time.time(), 'value': i * 10}
message = json.dumps(data).encode()
mq.send(message, priority=i % 3) # Vary priority
print(f"Produced: {data}")
time.sleep(0.5)
# Send termination message
mq.send(b'STOP', priority=99)
mq.close()
def consumer():
mq = posix_ipc.MessageQueue('/producer_consumer')
while True:
message, priority = mq.receive()
if message == b'STOP':
print("Consumer stopping")
break
data = json.loads(message.decode())
print(f"Consumed (priority {priority}): {data}")
time.sleep(0.2)
mq.close()
mq.unlink()
# Run producer and consumer in separate processes
if os.fork() == 0: # Child - consumer
consumer()
else: # Parent - producer
producer()
os.wait() # Wait for child to finishInstall with Tessl CLI
npx tessl i tessl/pypi-posix-ipc