tessl/pypi-portalocker

Cross-platform file locking library that provides reliable file locking mechanisms across Windows, Linux, Unix, and macOS systems

Semaphores and Resource Management

Bounded semaphores for limiting concurrent processes accessing shared resources, with support for named semaphores across process boundaries. These classes help coordinate multiple processes by allowing only a specified number to access a resource simultaneously.

Capabilities

NamedBoundedSemaphore Class

The recommended semaphore implementation that provides named bounded semaphores for cross-process resource coordination.

class NamedBoundedSemaphore:
    """
    Named bounded semaphore for limiting concurrent processes.
    
    Parameters:
    - maximum: Maximum number of concurrent processes allowed
    - name: Semaphore name for cross-process coordination (auto-generated if None)
    - filename_pattern: Pattern for lock filenames (default: '{name}.{number:02d}.lock')
    - directory: Directory for lock files (default: system temp directory)
    - timeout: Timeout when trying to acquire semaphore (default: 5.0)
    - check_interval: Check interval while waiting (default: 0.25)
    - fail_when_locked: Raise AlreadyLocked instead of returning None when no slot becomes free within the timeout (default: True)
    """
    
    def __init__(self, maximum: int, name: str | None = None, 
                 filename_pattern: str = '{name}.{number:02d}.lock',
                 directory: str = tempfile.gettempdir(), timeout: float | None = 5.0,
                 check_interval: float | None = 0.25, fail_when_locked: bool | None = True) -> None: ...
    
    def acquire(self, timeout: float | None = None, check_interval: float | None = None,
                fail_when_locked: bool | None = None) -> Lock | None:
        """
        Acquire a semaphore slot.
        
        Returns:
        - Lock object for the acquired slot, or None if acquisition failed
        
        Raises:
        - AlreadyLocked: If no slots available and fail_when_locked=True
        """
    
    def release(self) -> None:
        """Release the currently held semaphore slot"""
    
    def get_filenames(self) -> typing.Sequence[pathlib.Path]:
        """Get all possible lock filenames for this semaphore"""
    
    def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
        """Get lock filenames in random order (for load balancing)"""
    
    def __enter__(self) -> Lock:
        """Context manager entry - acquire semaphore slot"""
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - release semaphore slot"""
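
To make the slot model concrete, the following is a minimal sketch of how a bounded semaphore can be built from a fixed set of lock files: each slot is one file, and acquiring the semaphore means taking an exclusive, non-blocking lock on any one of them. It is an illustration only, not portalocker's implementation. It assumes a POSIX system (it uses `fcntl.flock`), and `try_acquire_slot` and `release_slot` are hypothetical helper names.

```python
import fcntl
import pathlib
import tempfile


def try_acquire_slot(directory, name, maximum):
    """Return an open, exclusively locked file for the first free slot, or None."""
    for number in range(maximum):
        path = pathlib.Path(directory) / f'{name}.{number:02d}.lock'
        handle = open(path, 'a')  # creates the slot file if it does not exist
        try:
            # Non-blocking exclusive lock: raises OSError if the slot is taken
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            handle.close()
            continue
        return handle
    return None


def release_slot(handle):
    """Unlock and close a slot file returned by try_acquire_slot."""
    fcntl.flock(handle, fcntl.LOCK_UN)
    handle.close()


with tempfile.TemporaryDirectory() as tmp:
    first = try_acquire_slot(tmp, 'demo', maximum=2)
    second = try_acquire_slot(tmp, 'demo', maximum=2)
    third = try_acquire_slot(tmp, 'demo', maximum=2)
    got_third = third is not None    # both slots are held, so this fails

    release_slot(first)
    fourth = try_acquire_slot(tmp, 'demo', maximum=2)
    got_fourth = fourth is not None  # a slot was freed, so this succeeds

    release_slot(second)
    release_slot(fourth)

print(got_third, got_fourth)
```

Because the lock belongs to the open file, the operating system drops it when the holding process exits, so a crashed worker does not leave a slot permanently occupied.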

BoundedSemaphore Class (Deprecated)

The original semaphore implementation, deprecated in favor of NamedBoundedSemaphore.

class BoundedSemaphore:
    """
    DEPRECATED: Use NamedBoundedSemaphore instead.
    
    Bounded semaphore with potential naming conflicts between unrelated processes.
    """
    
    def __init__(self, maximum: int, name: str = 'bounded_semaphore',
                 filename_pattern: str = '{name}.{number:02d}.lock', 
                 directory: str = tempfile.gettempdir(), **kwargs) -> None: ...

Usage Examples

Basic semaphore usage for limiting concurrent processes:

import portalocker

# Allow maximum 3 concurrent processes
with portalocker.NamedBoundedSemaphore(3, name='database_workers') as lock:
    # Only 3 processes can be in this block simultaneously
    print("Processing database batch...")
    process_database_batch()
    print("Batch completed")
# Semaphore slot released automatically

Manual semaphore management:

import portalocker

# Create semaphore for 2 concurrent downloaders  
semaphore = portalocker.NamedBoundedSemaphore(2, name='file_downloaders')

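lock = None
try:
    # Try to acquire a slot. With fail_when_locked=False a failed attempt
    # returns None instead of raising AlreadyLocked.
    lock = semaphore.acquire(timeout=10.0, fail_when_locked=False)
    if lock:
        print("Starting download...")
        download_large_file()
        print("Download completed")
    else:
        print("No download slots available")
finally:
    # Release only if a slot was actually acquired
    if lock:
        semaphore.release()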
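
The `timeout` and `check_interval` arguments describe a retry loop: attempt to take a slot, and if none is free, sleep for `check_interval` seconds and try again until `timeout` seconds have passed. A rough sketch of that pattern follows. `poll_until` and `try_once` are hypothetical names, and portalocker's internal loop may differ in detail.

```python
import time


def poll_until(try_once, timeout, check_interval):
    """Call try_once() until it returns a non-None value or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = try_once()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None  # gave up: nothing became available in time
        time.sleep(check_interval)


# Demo: a stand-in "acquire" that only succeeds on its third attempt
attempts = []


def flaky_acquire():
    attempts.append(1)
    return 'slot' if len(attempts) >= 3 else None


result = poll_until(flaky_acquire, timeout=1.0, check_interval=0.01)
timed_out = poll_until(lambda: None, timeout=0.05, check_interval=0.01)
print(result, len(attempts), timed_out)
```

With `timeout=0` this loop makes exactly one attempt, which matches the "don't wait" usage.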

Resource pool management:

import portalocker
import time

def worker_process(worker_id: int):
    """Worker that processes items with limited concurrency"""
    
    # Limit to 5 concurrent workers across all processes
    semaphore = portalocker.NamedBoundedSemaphore(
        maximum=5, 
        name='worker_pool',
        timeout=30.0  # Wait up to 30 seconds for a slot
    )
    
    try:
        with semaphore:
            print(f"Worker {worker_id} starting...")
            
            # Simulate work that should be limited
            process_cpu_intensive_task()
            
            print(f"Worker {worker_id} completed")
            
    except portalocker.AlreadyLocked:
        print(f"Worker {worker_id} could not get a slot - too many workers running")

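# Call the worker ten times. These calls run one after another in this
# process; in a real deployment each worker would run in its own process.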
for i in range(10):
    worker_process(i)

Custom lock file locations and patterns:

import portalocker

# Custom semaphore with specific lock file pattern
semaphore = portalocker.NamedBoundedSemaphore(
    maximum=4,
    name='video_processors', 
    filename_pattern='video_{name}_slot_{number:03d}.lock',
    directory='/var/lock/myapp',
    timeout=60.0
)

with semaphore:
    # Lock files will be created like:
    # /var/lock/myapp/video_video_processors_slot_000.lock
    # /var/lock/myapp/video_video_processors_slot_001.lock
    # etc.
    process_video_file()
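
The file names in the comments above follow from applying `str.format` to `filename_pattern` with the semaphore's `name` and a slot `number`. A small sketch, assuming slots are numbered from 0 to `maximum - 1` as those comments suggest; `slot_filenames` is a hypothetical helper, not part of portalocker:

```python
import pathlib


def slot_filenames(directory, pattern, name, maximum):
    """Expand a filename pattern into one path per semaphore slot."""
    return [
        pathlib.Path(directory) / pattern.format(name=name, number=number)
        for number in range(maximum)
    ]


paths = slot_filenames(
    directory='/var/lock/myapp',
    pattern='video_{name}_slot_{number:03d}.lock',
    name='video_processors',
    maximum=4,
)
for path in paths:
    print(path)
```

Note that the pattern's literal `video_` prefix and the `{name}` field both contribute to the result, which is why `video_` appears twice in each file name.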

Cross-process coordination:

import portalocker
import multiprocessing
import os
import time

def worker_function(worker_id):
    """Function to run in separate processes"""
    
    # Same semaphore name ensures coordination across processes
    with portalocker.NamedBoundedSemaphore(2, name='shared_resource') as lock:
        print(f"Process {os.getpid()} (worker {worker_id}) acquired resource")
        
        # Simulate resource usage
        time.sleep(2)
        
        print(f"Process {os.getpid()} (worker {worker_id}) releasing resource")

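# The __main__ guard is needed on platforms that spawn child processes
# (for example Windows), where the module is re-imported in each child
if __name__ == '__main__':
    # Start multiple processes - only 2 will hold the resource concurrently
    processes = []
    for i in range(6):
        p = multiprocessing.Process(target=worker_function, args=(i,))
        p.start()
        processes.append(p)

    # Wait for all processes to complete
    for p in processes:
        p.join()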

Fail-fast behavior for non-blocking semaphores:

import portalocker

# Create semaphore that fails immediately if no slots available
semaphore = portalocker.NamedBoundedSemaphore(
    maximum=1,
    name='singleton_process',
    fail_when_locked=True,
    timeout=0  # Don't wait
)

try:
    with semaphore:
        # This will only succeed if no other instance is running
        print("Running singleton process...")
        run_singleton_task()
except portalocker.AlreadyLocked:
    print("Another instance is already running. Exiting.")
    raise SystemExit(1)

Load Balancing

Semaphores automatically randomize lock file order to distribute load:

import portalocker

# The semaphore will try lock files in random order
# to avoid all processes competing for the same slot
semaphore = portalocker.NamedBoundedSemaphore(10, name='load_balanced_workers')

with semaphore:
    # Processes will naturally distribute across available slots
    process_work_item()
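
Trying slots in a shuffled order means that processes starting at the same moment are less likely to all contend for slot 0 first. A minimal sketch of that idea using `random.shuffle`; `shuffled_slots` is a hypothetical helper and the file names are made up for illustration:

```python
import pathlib
import random


def shuffled_slots(filenames, rng=None):
    """Return a shuffled copy of the slot filenames, leaving the input untouched."""
    rng = rng or random.Random()
    result = list(filenames)
    rng.shuffle(result)
    return result


slots = [pathlib.Path(f'load_balanced_workers.{n:02d}.lock') for n in range(10)]
order = shuffled_slots(slots, rng=random.Random(42))

# Same set of slot files, visited in a different order
print([path.name for path in order])
```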

Error Handling

Semaphores use the same exception types as other portalocker components:

import portalocker

try:
    with portalocker.NamedBoundedSemaphore(3, name='workers') as lock:
        do_work()
except portalocker.AlreadyLocked:
    print("All semaphore slots are currently in use")
except portalocker.LockException as e:
    print(f"Semaphore error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
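
The order of the `except` clauses above matters because, in portalocker, `AlreadyLocked` is a subclass of `LockException`: a handler for the base class listed first would also catch the more specific error. The sketch below uses stand-in classes (not the real portalocker ones) so that it runs without the library installed.

```python
class LockException(Exception):
    """Stand-in for portalocker.LockException."""


class AlreadyLocked(LockException):
    """Stand-in for portalocker.AlreadyLocked."""


def classify(exc):
    """Mirror the handler order used above: most specific exception first."""
    try:
        raise exc
    except AlreadyLocked:
        return 'all slots in use'
    except LockException:
        return 'semaphore error'
    except Exception:
        return 'unexpected error'


results = [
    classify(AlreadyLocked()),
    classify(LockException()),
    classify(ValueError()),
]
print(results)
```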

Migration from BoundedSemaphore

To migrate from deprecated BoundedSemaphore to NamedBoundedSemaphore:

# Old (deprecated)
semaphore = portalocker.BoundedSemaphore(5, name='my_workers')

# New (recommended) 
semaphore = portalocker.NamedBoundedSemaphore(5, name='my_workers')

# The interface is identical; only the class name changes

Type Definitions

import pathlib
import tempfile
import typing

# Filename types
Filename = typing.Union[str, pathlib.Path]

# Default values
DEFAULT_TIMEOUT = 5.0
DEFAULT_CHECK_INTERVAL = 0.25

# Semaphore uses Lock objects internally
from portalocker.utils import Lock
