
tessl/pypi-posix-ipc

POSIX IPC primitives (semaphores, shared memory and message queues) for Python


Message Queues

POSIX named message queues provide reliable, priority-based message passing between processes. Messages are delivered highest priority first, and in FIFO order within the same priority level. Message queues support blocking and non-blocking operations, timeouts, and asynchronous notifications.
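
The ordering rule can be illustrated without touching the operating system. The sketch below is a toy in-process model built on `heapq`; `ModelQueue` is a hypothetical name used only for illustration and is not part of posix_ipc. It mimics only the delivery order (highest priority first, FIFO among equal priorities), not blocking, capacity limits, or inter-process behaviour.

```python
import heapq
import itertools


class ModelQueue:
    """Toy model of POSIX message queue ordering (hypothetical, not posix_ipc)."""

    def __init__(self):
        self._heap = []
        # Monotonic counter used as a tie-breaker so equal priorities stay FIFO.
        self._arrival = itertools.count()

    def send(self, message, priority=0):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._arrival), message))

    def receive(self):
        neg_priority, _, message = heapq.heappop(self._heap)
        return message, -neg_priority


q = ModelQueue()
q.send(b"low", priority=1)
q.send(b"first high", priority=10)
q.send(b"second high", priority=10)

order = [q.receive() for _ in range(3)]
print(order)
```

Both priority-10 messages come out before the priority-1 message, and they keep the order in which they were sent.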

Note: Message queue support varies by platform. Check the MESSAGE_QUEUES_SUPPORTED constant before using message queue functionality. macOS does not support POSIX message queues.

Capabilities

Message Queue Creation and Management

Create and manage named POSIX message queues with configurable capacity, message size limits, and access permissions.

class MessageQueue:
    def __init__(self, name, flags=0, mode=0o600, max_messages=QUEUE_MESSAGES_MAX_DEFAULT, 
                 max_message_size=QUEUE_MESSAGE_SIZE_MAX_DEFAULT, read=True, write=True):
        """
        Create or open a named message queue.
        
        Parameters:
        - name: str or None. If None, a random name is chosen. If str, should start with '/' (e.g., '/my_queue')
        - flags: int, creation flags (O_CREAT, O_EXCL, O_CREX)
        - mode: int, permissions (octal, default 0o600)
        - max_messages: int, maximum messages in queue (default: QUEUE_MESSAGES_MAX_DEFAULT)
        - max_message_size: int, maximum message size in bytes (default: QUEUE_MESSAGE_SIZE_MAX_DEFAULT)
        - read: bool, whether this handle can receive messages
        - write: bool, whether this handle can send messages
        
        Notes:
        - max_messages and max_message_size are ignored when opening existing queue
        - Default values may be quite small (e.g., 10 messages) on some systems
        - Consider providing explicit values for production use
        - read/write permissions only affect this handle, not other handles to same queue
        """
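
Because a string name should start with '/', it can help to normalise names before passing them to the constructor. The helper below is a hypothetical sketch (`queue_name` is not a posix_ipc function). It also rejects any further slashes, which is an assumption based on Linux naming rules rather than something stated on this page.

```python
def queue_name(label):
    """Return a '/'-prefixed queue name built from label, or raise ValueError."""
    name = label if label.startswith("/") else "/" + label
    # Assumption (Linux rule): one leading slash, at least one more character,
    # and no additional slashes after the first.
    if len(name) < 2 or "/" in name[1:]:
        raise ValueError(f"invalid queue name: {label!r}")
    return name


print(queue_name("jobs"))
print(queue_name("/jobs"))
```

The result could then be passed as the `name` argument, for example `posix_ipc.MessageQueue(queue_name("jobs"), posix_ipc.O_CREAT)`.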

Message Operations

Send and receive messages with priority support and timeout handling.

def send(self, message, timeout=None, priority=0):
    """
    Send a message to the queue.
    
    Parameters:
    - message: bytes or str, message content (can contain embedded NULLs)
    - timeout: None, 0, or positive float
      - None: Block indefinitely until message sent
      - 0: Non-blocking, raises BusyError if queue full
      - > 0: Wait up to timeout seconds, raises BusyError if timeout expires
    - priority: int, message priority (0 = lowest, higher values = higher priority, max = QUEUE_PRIORITY_MAX)
    
    Raises:
    - BusyError: When timeout expires or non-blocking call cannot proceed
    - ValueError: When message exceeds max_message_size
    """
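
Since `send()` raises ValueError for a message larger than `max_message_size`, structured payloads can be checked before sending. The following is a minimal sketch with a hypothetical helper, `encode_payload`, that serialises to compact JSON and performs the same size check up front so the error message can name the actual sizes.

```python
import json


def encode_payload(obj, max_message_size):
    """Serialize obj to compact UTF-8 JSON, checking that it fits in one message."""
    data = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    if len(data) > max_message_size:
        raise ValueError(
            f"payload is {len(data)} bytes; queue accepts at most {max_message_size}"
        )
    return data


payload = encode_payload({"id": 1, "value": 10}, max_message_size=256)
print(payload)
```

With a real queue the limit would come from the queue itself, for example `mq.send(encode_payload(data, mq.max_message_size))`.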

def receive(self, timeout=None):
    """
    Receive a message from the queue.
    
    Parameters:
    - timeout: None, 0, or positive float (same semantics as send())
    
    Returns:
    tuple: (message, priority) where message is bytes and priority is int
    
    Messages are received in priority order (highest priority first),
    and FIFO order within the same priority level.
    
    Raises:
    - BusyError: When timeout expires or non-blocking call finds empty queue
    """
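
A common use of `timeout=0` is to collect everything currently queued without blocking. The helper below is a hypothetical sketch (`drain` is not part of posix_ipc). It takes the exception class as a parameter so the function itself has no import dependency; with the real library that argument would be `posix_ipc.BusyError`.

```python
def drain(mq, busy_error):
    """Receive every message currently queued, without blocking.

    mq is any object with receive(timeout=...) returning (message, priority);
    busy_error is the exception raised when the queue is empty.
    """
    received = []
    while True:
        try:
            received.append(mq.receive(timeout=0))
        except busy_error:
            return received  # queue is empty
```

Usage with an open queue would look like `for message, priority in drain(mq, posix_ipc.BusyError): ...`. The returned list follows the queue's own delivery order.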

Asynchronous Notifications

Request notifications when the queue transitions from empty to non-empty.

def request_notification(self, notification=None):
    """
    Request or cancel notification when queue becomes non-empty.
    
    Parameters:
    - notification: None, int, or tuple
      - None: Cancel any existing notification request
      - int: Signal number to send when queue becomes non-empty
      - tuple: (function, param) to call function(param) in new thread when queue becomes non-empty
    
    Notes:
    - Only one notification request per queue system-wide
    - OS delivers at most one notification per request
    - Must call again for subsequent notifications
    - Fails with BusyError if another process has pending notification request
    
    Raises:
    - BusyError: When another process already has notification request pending
    """
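
Because each request produces at most one notification, a long-running listener has to re-register itself. One way to do that is sketched below, using a hypothetical factory `make_notification_handler` (not a posix_ipc API). The callback re-registers before draining the queue, so a message that arrives while it is draining is either picked up by the loop or triggers a fresh notification once the queue has been emptied. The exception class is passed in; with the real library it would be `posix_ipc.BusyError`.

```python
def make_notification_handler(mq, busy_error, handle):
    """Build a callback that re-arms the notification, then drains the queue."""

    def on_notify(param):
        # Re-register first: each request yields at most one notification.
        mq.request_notification((on_notify, param))
        while True:
            try:
                message, priority = mq.receive(timeout=0)
            except busy_error:
                break  # queue is empty; the next arrival triggers on_notify again
            handle(message, priority)

    return on_notify
```

To arm it the first time, build the handler with `handler = make_notification_handler(mq, posix_ipc.BusyError, print)` and call `mq.request_notification((handler, None))`. The callback runs in a new thread, so `handle` should be thread-safe.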

Resource Management

Close queue handles and clean up queue resources.

def fileno(self):
    """
    Returns the message queue descriptor.
    
    Returns:
    int: Message queue descriptor (same as the mqd property)
    
    This method allows MessageQueue objects to be used with functions
    that expect file-like objects with a fileno() method.
    """

def close(self):
    """
    Close this handle to the message queue.
    
    Must be called explicitly - not automatically called on garbage collection.
    Other handles to the same queue remain valid.
    """
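
Since `close()` is not called on garbage collection, wrapping a handle in a context manager guarantees cleanup even when an exception is raised. The sketch below uses a hypothetical helper, `managed`, which works with any object exposing `close()` and `unlink()`. It unlinks before closing, which relies on the documented behaviour that destruction is postponed until all handles are closed.

```python
from contextlib import contextmanager


@contextmanager
def managed(mq, unlink=False):
    """Yield mq, then close it (and optionally unlink it) even on error."""
    try:
        yield mq
    finally:
        try:
            if unlink:
                mq.unlink()
        finally:
            mq.close()  # always release this handle
```

Usage would look like `with managed(posix_ipc.MessageQueue('/my_queue', posix_ipc.O_CREAT), unlink=True) as mq: mq.send(b'hello')`.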

def unlink(self):
    """
    Request destruction of the message queue.
    
    Actual destruction is postponed until all handles are closed.
    After unlinking, constructing a MessageQueue with the same name and O_CREAT creates a new queue.
    """

def __str__(self):
    """
    String representation of the message queue.
    
    Returns:
    str: Human-readable representation including name and current message count
    """

def __repr__(self):
    """
    Detailed string representation for debugging.
    
    Returns:
    str: Technical representation with class name and key attributes
    """

Message Queue Properties

Access queue metadata, capacity limits, and current state.

@property
def name(self):
    """
    The name provided in the constructor.
    
    Returns:
    str: Message queue name
    """

@property
def mode(self):
    """
    The mode (permissions) provided in the constructor.
    
    Returns:
    int: File mode/permissions (e.g., 0o600)
    """

@property
def mqd(self):
    """
    Message queue descriptor representing the queue.
    
    Returns:
    int: Message queue descriptor (platform-specific handle)
    """

@property
def block(self):
    """
    Whether send() and receive() operations may block.
    
    Returns:
    bool: True if operations may block, False if they raise BusyError instead
    """

@block.setter
def block(self, value):
    """
    Set blocking behavior for send() and receive() operations.
    
    Parameters:
    - value: bool, True to allow blocking, False to raise BusyError instead
    """

@property
def max_messages(self):
    """
    Maximum number of messages the queue can hold.
    
    Returns:
    int: Maximum message count
    """

@property
def max_message_size(self):
    """
    Maximum size of individual messages in bytes.
    
    Returns:
    int: Maximum message size
    """

@property
def current_messages(self):
    """
    Current number of messages in the queue.
    
    Returns:
    int: Current message count
    """

Module Function

Convenience function for unlinking message queues by name.

def unlink_message_queue(name):
    """
    Convenience function to unlink a message queue by name.
    
    Parameters:
    - name: str, message queue name (e.g., '/my_queue')
    
    Equivalent to opening the queue and calling unlink(), but more convenient
    when you only need to remove an existing queue.
    """

Usage Examples

Basic Message Queue Usage

import posix_ipc

# Check if message queues are supported
if not posix_ipc.MESSAGE_QUEUES_SUPPORTED:
    # SystemExit with a string prints it to stderr and exits with status 1
    raise SystemExit("Message queues not supported on this platform")

# Create a message queue
mq = posix_ipc.MessageQueue('/my_queue', posix_ipc.O_CREAT)

# Send a message
message = b'Hello, message queue!'
mq.send(message)

# Receive the message
received_message, priority = mq.receive()
print(f"Received: {received_message.decode()}, Priority: {priority}")

# Clean up
mq.close()
mq.unlink()

Priority-Based Messaging

import posix_ipc

mq = posix_ipc.MessageQueue('/priority_queue', posix_ipc.O_CREAT)

# Send messages with different priorities
mq.send(b'Low priority message', priority=1)
mq.send(b'High priority message', priority=10)  
mq.send(b'Medium priority message', priority=5)

# Receive messages (highest priority first)
for i in range(3):
    message, priority = mq.receive()
    print(f"Received (priority {priority}): {message.decode()}")

mq.close()
mq.unlink()

Non-Blocking and Timeout Operations

import posix_ipc

mq = posix_ipc.MessageQueue('/timeout_queue', posix_ipc.O_CREAT)

# Non-blocking send (queue empty, should succeed)
try:
    mq.send(b'Non-blocking message', timeout=0)
    print("Message sent immediately")
except posix_ipc.BusyError:
    print("Queue full, message not sent")

# Non-blocking receive
try:
    message, priority = mq.receive(timeout=0)
    print(f"Message received immediately: {message.decode()}")
except posix_ipc.BusyError:
    print("Queue empty, no message received")

# Timeout receive
try:
    message, priority = mq.receive(timeout=2.0)
    print(f"Message received within timeout: {message.decode()}")
except posix_ipc.BusyError:
    print("Timeout expired, no message received")

mq.close()
mq.unlink()

Using Block Property

import posix_ipc

mq = posix_ipc.MessageQueue('/block_queue', posix_ipc.O_CREAT)

# Enable non-blocking mode
mq.block = False

try:
    # This will raise BusyError immediately if queue is empty
    message, priority = mq.receive()
except posix_ipc.BusyError:
    print("Queue empty (non-blocking mode)")

# Re-enable blocking mode
mq.block = True

mq.close()
mq.unlink()

Asynchronous Notifications with Signals

import posix_ipc
import signal
import os
import time

# Signal handler
def message_arrived(signum, frame):
    print(f"Signal {signum} received - message available!")

# Set up signal handler
signal.signal(signal.SIGUSR1, message_arrived)

mq = posix_ipc.MessageQueue('/notify_queue', posix_ipc.O_CREAT)

# Request notification via signal
mq.request_notification(signal.SIGUSR1)

print("Waiting for message notification...")

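# In another process, send a message to trigger notification
if os.fork() == 0:  # Child process
    time.sleep(2)  # Give the parent time to start waiting
    child_mq = posix_ipc.MessageQueue('/notify_queue')
    child_mq.send(b'Notification test message')
    child_mq.close()
    # Exit immediately so the child does not flush stdio buffers inherited from the parent
    os._exit(0)

# Parent process waits for the signal, then reaps the child
time.sleep(5)
os.wait()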

# Receive the message
message, priority = mq.receive()
print(f"Received: {message.decode()}")

mq.close()
mq.unlink()

Asynchronous Notifications with Function Callback

import posix_ipc
import threading
import time

def notification_callback(queue_name):
    print(f"Callback called for queue: {queue_name}")
    # Could process message here or signal main thread

mq = posix_ipc.MessageQueue('/callback_queue', posix_ipc.O_CREAT)

# Request notification via callback
mq.request_notification((notification_callback, '/callback_queue'))

print("Waiting for message notification...")

# Send message from another thread
def send_message():
    time.sleep(2)
    sender_mq = posix_ipc.MessageQueue('/callback_queue')
    sender_mq.send(b'Callback test message')
    sender_mq.close()

threading.Thread(target=send_message).start()

# Wait for notification and process message
time.sleep(5)
message, priority = mq.receive()
print(f"Received: {message.decode()}")

mq.close()
mq.unlink()

Queue Capacity and Limits

import posix_ipc

# Create queue with specific limits
mq = posix_ipc.MessageQueue('/capacity_queue', posix_ipc.O_CREAT, 
                           max_messages=5, max_message_size=256)

print(f"Max messages: {mq.max_messages}")
print(f"Max message size: {mq.max_message_size}")
print(f"Current messages: {mq.current_messages}")

# Fill the queue to capacity
for i in range(mq.max_messages):
    mq.send(f'Message {i}'.encode())
    print(f"Sent message {i}, queue has {mq.current_messages} messages")

# Try to send one more (timeout=0 raises BusyError instead of blocking)
try:
    mq.send(b'Overflow message', timeout=0)
except posix_ipc.BusyError:
    print("Queue full - cannot send more messages")

mq.close()
mq.unlink()

Producer-Consumer Pattern

import posix_ipc
import os
import time
import json

def producer():
    mq = posix_ipc.MessageQueue('/producer_consumer', posix_ipc.O_CREAT)
    
    for i in range(10):
        data = {'id': i, 'timestamp': time.time(), 'value': i * 10}
        message = json.dumps(data).encode()
        mq.send(message, priority=i % 3)  # Vary priority
        print(f"Produced: {data}")
        time.sleep(0.5)
    
    # Send termination message at the lowest priority so it cannot
    # overtake data messages that are still waiting in the queue
    mq.send(b'STOP', priority=0)
    mq.close()

def consumer():  
    mq = posix_ipc.MessageQueue('/producer_consumer')
    
    while True:
        message, priority = mq.receive()
        
        if message == b'STOP':
            print("Consumer stopping")
            break
            
        data = json.loads(message.decode())
        print(f"Consumed (priority {priority}): {data}")
        time.sleep(0.2)
    
    mq.close()
    mq.unlink()

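# Create the queue before forking so the consumer cannot try to open it
# before it exists
posix_ipc.MessageQueue('/producer_consumer', posix_ipc.O_CREAT).close()

# Run producer and consumer in separate processes
if os.fork() == 0:  # Child - consumer
    consumer()
else:  # Parent - producer
    producer()
    os.wait()  # Wait for child to finish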

Install with Tessl CLI

npx tessl i tessl/pypi-posix-ipc
