Cross-platform file locking library that provides reliable file locking mechanisms across Windows, Linux, Unix, and macOS systems
—
Bounded semaphores for limiting concurrent processes accessing shared resources, with support for named semaphores across process boundaries. These classes help coordinate multiple processes by allowing only a specified number to access a resource simultaneously.
NamedBoundedSemaphore is the recommended semaphore implementation; it provides named bounded semaphores for cross-process resource coordination.
class NamedBoundedSemaphore:
"""
Named bounded semaphore for limiting concurrent processes.
Parameters:
- maximum: Maximum number of concurrent processes allowed
- name: Semaphore name for cross-process coordination (auto-generated if None)
- filename_pattern: Pattern for lock filenames (default: '{name}.{number:02d}.lock')
- directory: Directory for lock files (default: system temp directory)
- timeout: Timeout when trying to acquire semaphore (default: 5.0)
- check_interval: Check interval while waiting (default: 0.25)
- fail_when_locked: Fail immediately if no slots available (default: True)
"""
def __init__(self, maximum: int, name: str | None = None,
filename_pattern: str = '{name}.{number:02d}.lock',
directory: str = tempfile.gettempdir(), timeout: float | None = 5.0,
check_interval: float | None = 0.25, fail_when_locked: bool | None = True) -> None: ...
def acquire(self, timeout: float | None = None, check_interval: float | None = None,
fail_when_locked: bool | None = None) -> Lock | None:
"""
Acquire a semaphore slot.
Returns:
- Lock object for the acquired slot, or None if acquisition failed
Raises:
- AlreadyLocked: If no slots available and fail_when_locked=True
"""
def release(self) -> None:
"""Release the currently held semaphore slot"""
def get_filenames(self) -> typing.Sequence[pathlib.Path]:
"""Get all possible lock filenames for this semaphore"""
def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
"""Get lock filenames in random order (for load balancing)"""
def __enter__(self) -> Lock:
"""Context manager entry - acquire semaphore slot"""
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit - release semaphore slot"""
The original semaphore implementation, deprecated in favor of NamedBoundedSemaphore.
class BoundedSemaphore:
"""
DEPRECATED: Use NamedBoundedSemaphore instead.
Bounded semaphore with potential naming conflicts between unrelated processes.
"""
def __init__(self, maximum: int, name: str = 'bounded_semaphore',
filename_pattern: str = '{name}.{number:02d}.lock',
directory: str = tempfile.gettempdir(), **kwargs) -> None: ...
Basic semaphore usage for limiting concurrent processes:
import portalocker
# Allow maximum 3 concurrent processes
with portalocker.NamedBoundedSemaphore(3, name='database_workers') as lock:
# Only 3 processes can be in this block simultaneously
print("Processing database batch...")
process_database_batch()
print("Batch completed")
# Semaphore slot released automatically
Manual semaphore management:
import portalocker
# Create semaphore for 2 concurrent downloaders
semaphore = portalocker.NamedBoundedSemaphore(2, name='file_downloaders')
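lock = None
try:
# Try to acquire a slot; fail_when_locked=False makes acquire() return None
# instead of raising AlreadyLocked when no slot frees up within the timeout
lock = semaphore.acquire(timeout=10.0, fail_when_locked=False)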
if lock:
print("Starting download...")
download_large_file()
print("Download completed")
else:
print("No download slots available")
finally:
if lock:
semaphore.release()
Resource pool management:
import portalocker
import time
def worker_process(worker_id: int):
"""Worker that processes items with limited concurrency"""
# Limit to 5 concurrent workers across all processes
semaphore = portalocker.NamedBoundedSemaphore(
maximum=5,
name='worker_pool',
timeout=30.0 # Wait up to 30 seconds for a slot
)
try:
with semaphore:
print(f"Worker {worker_id} starting...")
# Simulate work that should be limited
process_cpu_intensive_task()
print(f"Worker {worker_id} completed")
except portalocker.AlreadyLocked:
print(f"Worker {worker_id} could not get a slot - too many workers running")
# Start multiple worker processes
for i in range(10):
worker_process(i)
Custom lock file locations and patterns:
import portalocker
# Custom semaphore with specific lock file pattern
semaphore = portalocker.NamedBoundedSemaphore(
maximum=4,
name='video_processors',
filename_pattern='video_{name}_slot_{number:03d}.lock',
directory='/var/lock/myapp',
timeout=60.0
)
with semaphore:
# Lock files will be created like:
# /var/lock/myapp/video_video_processors_slot_000.lock
# /var/lock/myapp/video_video_processors_slot_001.lock
# etc.
process_video_file()
Cross-process coordination:
import portalocker
import multiprocessing
import os
import time
def worker_function(worker_id):
"""Function to run in separate processes"""
# Same semaphore name ensures coordination across processes
with portalocker.NamedBoundedSemaphore(2, name='shared_resource') as lock:
print(f"Process {os.getpid()} (worker {worker_id}) acquired resource")
# Simulate resource usage
time.sleep(2)
print(f"Process {os.getpid()} (worker {worker_id}) releasing resource")
# Start multiple processes - only 2 will run concurrently
processes = []
for i in range(6):
p = multiprocessing.Process(target=worker_function, args=(i,))
p.start()
processes.append(p)
# Wait for all processes to complete
for p in processes:
p.join()
Fail-fast behavior for non-blocking semaphores:
import portalocker
# Create semaphore that fails immediately if no slots available
semaphore = portalocker.NamedBoundedSemaphore(
maximum=1,
name='singleton_process',
fail_when_locked=True,
timeout=0 # Don't wait
)
try:
with semaphore:
# This will only succeed if no other instance is running
print("Running singleton process...")
run_singleton_task()
except portalocker.AlreadyLocked:
print("Another instance is already running. Exiting.")
exit(1)
Semaphores automatically randomize lock file order to distribute load:
import portalocker
# The semaphore will try lock files in random order
# to avoid all processes competing for the same slot
semaphore = portalocker.NamedBoundedSemaphore(10, name='load_balanced_workers')
with semaphore:
# Processes will naturally distribute across available slots
process_work_item()
Semaphores use the same exception types as other portalocker components:
import portalocker
try:
with portalocker.NamedBoundedSemaphore(3, name='workers') as lock:
do_work()
except portalocker.AlreadyLocked:
print("All semaphore slots are currently in use")
except portalocker.LockException as e:
print(f"Semaphore error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
To migrate from deprecated BoundedSemaphore to NamedBoundedSemaphore:
# Old (deprecated)
semaphore = portalocker.BoundedSemaphore(5, name='my_workers')
# New (recommended)
semaphore = portalocker.NamedBoundedSemaphore(5, name='my_workers')
# The interface is identical, just change the class name
import pathlib
import tempfile
import typing
# Filename types
Filename = typing.Union[str, pathlib.Path]
# Default values
DEFAULT_TIMEOUT = 5.0
DEFAULT_CHECK_INTERVAL = 0.25
# Semaphore uses Lock objects internally
from portalocker.utils import Lock
Install with Tessl CLI
npx tessl i tessl/pypi-portalocker