POSIX IPC primitives (semaphores, shared memory and message queues) for Python
—
POSIX named semaphores provide process synchronization and resource counting functionality. They maintain a non-negative integer value that can be atomically incremented (released) and decremented (acquired), making them ideal for controlling access to shared resources and coordinating between processes.
Create and manage named POSIX semaphores with configurable initial values, permissions, and creation flags.
class Semaphore:
def __init__(self, name, flags=0, mode=0o600, initial_value=0):
"""
Create or open a named semaphore.
Parameters:
- name: str or None. If None, a random name is chosen. If str, should start with '/' (e.g., '/my_semaphore')
- flags: int, creation flags (O_CREAT, O_EXCL, or O_CREX)
- mode: int, permissions (octal, default 0o600)
- initial_value: int, initial semaphore value (ignored if opening existing semaphore)
"""

Acquire (decrement) and release (increment) semaphore values with optional timeout support.
def acquire(self, timeout=None):
"""
Wait for and acquire the semaphore (decrement its value).
Parameters:
- timeout: None, 0, or positive float
- None: Block indefinitely until semaphore available
- 0: Non-blocking, raises BusyError if not immediately available
- > 0: Wait up to timeout seconds, raises BusyError if timeout expires
(Note: timeout > 0 treated as infinite on platforms without sem_timedwait support)
Raises:
- BusyError: When timeout expires or non-blocking call cannot proceed
- SignalError: When interrupted by signal (other than KeyboardInterrupt)
"""
def release(self):
"""
Release the semaphore (increment its value).
Allows waiting processes to proceed.
"""

Properly close and clean up semaphore resources.
def close(self):
"""
Close the semaphore handle for the current process.
Must be called explicitly - not automatically called on garbage collection.
The semaphore can be reopened after closing (assuming it still exists).
"""
def unlink(self):
"""
Mark the semaphore for destruction.
If other processes have the semaphore open, destruction is postponed until
all processes close it. Once unlinked, new open() calls with the same name
refer to a new semaphore.
"""

Access semaphore metadata and current state.
@property
def name(self):
"""
The name provided in the constructor.
Returns:
str: Semaphore name
"""
@property
def mode(self):
"""
The mode (permissions) provided in the constructor.
Returns:
int: File mode/permissions (e.g., 0o600)
"""
@property
def value(self):
"""
Current integer value of the semaphore.
Returns:
int: Current semaphore value
Note: Not available on macOS (raises AttributeError). Check
SEMAPHORE_VALUE_SUPPORTED constant before using.
"""

Use semaphores with Python's `with` statement for automatic acquire/release.
def __enter__(self):
"""
Context manager entry - calls acquire().
Returns:
self: The semaphore instance
"""
def __exit__(self, exc_type, exc_value, traceback):
"""
Context manager exit - calls release().
Parameters:
- exc_type: Exception type (if any)
- exc_value: Exception value (if any)
- traceback: Exception traceback (if any)
Always calls release() regardless of whether an exception occurred.
"""
def __str__(self):
"""
String representation of the semaphore.
Returns:
str: Human-readable representation including name and current value (if available)
"""
def __repr__(self):
"""
Detailed string representation for debugging.
Returns:
str: Technical representation with class name and key attributes
"""

# Context manager automatically handles acquire/release
with posix_ipc.Semaphore('/my_semaphore') as sem:
# Critical section - semaphore is acquired
pass
# Semaphore is automatically released when exiting the with block

Convenience function for unlinking semaphores by name.
def unlink_semaphore(name):
"""
Convenience function to unlink a semaphore by name.
Parameters:
- name: str, semaphore name (e.g., '/my_semaphore')
Equivalent to opening the semaphore and calling unlink(), but more convenient
when you only need to remove an existing semaphore.
"""

import posix_ipc
# Create a binary semaphore (mutex)
mutex = posix_ipc.Semaphore('/my_mutex', posix_ipc.O_CREAT, initial_value=1)
# Critical section with manual acquire/release
mutex.acquire()
try:
# Protected code here
print("In critical section")
finally:
mutex.release()
# Clean up
mutex.close()
mutex.unlink()

import posix_ipc
# Create semaphore
sem = posix_ipc.Semaphore('/my_sem', posix_ipc.O_CREAT, initial_value=1)
# Use context manager for automatic acquire/release
with sem:
print("Semaphore acquired automatically")
# Semaphore is released when exiting the with block
# Clean up
sem.close()
sem.unlink()

import posix_ipc
sem = posix_ipc.Semaphore('/timeout_sem', posix_ipc.O_CREAT, initial_value=0)
# Non-blocking attempt
try:
sem.acquire(timeout=0)
print("Semaphore acquired immediately")
except posix_ipc.BusyError:
print("Semaphore not available")
# Timeout attempt (if platform supports it)
if posix_ipc.SEMAPHORE_TIMEOUT_SUPPORTED:
try:
sem.acquire(timeout=5.0) # Wait up to 5 seconds
print("Semaphore acquired within timeout")
except posix_ipc.BusyError:
print("Timeout expired")
else:
print("Timeout not supported on this platform")
sem.close()
sem.unlink()

import posix_ipc
# Create a counting semaphore (e.g., for connection pool with 5 connections)
pool_sem = posix_ipc.Semaphore('/connection_pool', posix_ipc.O_CREAT, initial_value=5)
# Acquire a resource
pool_sem.acquire()
print("Resource acquired")
# Check current count (if supported)
if posix_ipc.SEMAPHORE_VALUE_SUPPORTED:
print(f"Resources remaining: {pool_sem.value}")
# Release the resource
pool_sem.release()
pool_sem.close()
pool_sem.unlink()

Install with Tessl CLI
npx tessl i tessl/pypi-posix-ipc