CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

locking-concurrency.mddocs/

Task Locking and Concurrency

Task locking mechanisms, concurrency control, and synchronization features to prevent duplicate task execution and manage shared resources. These features ensure proper coordination in multi-worker environments.

Capabilities

Task Locking

Prevent multiple workers from executing the same critical section simultaneously.

def lock_task(self, lock_name):
    """
    Create a task lock for coordinating access to shared resources.
    
    Parameters:
    - lock_name (str): Name of the lock
    
    Returns:
    TaskLock: Lock instance that can be used as context manager or decorator
    """

def is_locked(self, lock_name):
    """
    Check if a named lock is currently held.
    
    Parameters:
    - lock_name (str): Name of the lock to check
    
    Returns:
    bool: True if lock is currently held
    """

def flush_locks(self, *names):
    """
    Remove specified locks or all locks if no names given.
    
    Parameters:
    - *names: Lock names to remove (optional, removes all if empty)
    
    Returns:
    set: Names of locks that were removed
    """

TaskLock Class

Context manager and decorator for task synchronization.

class TaskLock:
    def __init__(self, huey, name):
        """
        Initialize task lock.
        
        Parameters:
        - huey: Huey instance
        - name (str): Lock name
        """
    
    def is_locked(self):
        """
        Check if this lock is currently held.
        
        Returns:
        bool: True if lock is held
        """
    
    def clear(self):
        """
        Force release this lock.
        
        Returns:
        bool: True if lock was held and released
        """
    
    def __call__(self, fn):
        """
        Use lock as a decorator.
        
        Parameters:
        - fn: Function to wrap with lock
        
        Returns:
        Wrapped function that acquires lock before execution
        """
    
    def __enter__(self):
        """
        Acquire lock (context manager entry).
        
        Raises:
        TaskLockedException: If lock cannot be acquired
        """
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Release lock (context manager exit).
        """

Lock Exception Handling

Exception raised when lock cannot be acquired.

class TaskLockedException(HueyException):
    """
    Exception raised when a task cannot acquire a required lock.
    
    This exception is raised when:
    - A task decorated with @lock cannot acquire the lock
    - A context manager lock cannot be acquired
    - A task tries to acquire a lock that's already held
    """

Usage Examples

Basic Task Locking

from huey import RedisHuey
from huey.exceptions import TaskLockedException

huey = RedisHuey('locking-app')

@huey.task()
def update_user_count():
    # Use lock as context manager
    with huey.lock_task('user_count_update'):
        # Only one worker can execute this block at a time
        current_count = get_user_count()
        new_count = recalculate_user_count()
        update_user_count_in_db(new_count)
        return new_count

# Multiple workers can enqueue this task, but only one executes at a time
result1 = update_user_count()
result2 = update_user_count()  # Will wait for first to complete

Lock as Decorator

# Create a reusable lock
user_stats_lock = huey.lock_task('user_stats')

@huey.task()
@user_stats_lock
def update_user_stats(user_id):
    # This entire function is protected by the lock
    stats = calculate_user_stats(user_id)
    save_user_stats(user_id, stats)
    return stats

# Alternative: inline decorator
@huey.task()
@huey.lock_task('report_generation')
def generate_daily_report():
    # Generate report logic
    return "Report generated"

Fine-grained Locking

@huey.task()
def process_user_data(user_id):
    # Use user-specific locks
    lock_name = f'user_{user_id}_processing'
    
    try:
        with huey.lock_task(lock_name):
            # Process user data
            data = load_user_data(user_id)
            processed = process_data(data)
            save_processed_data(user_id, processed)
            return f"Processed user {user_id}"
    except TaskLockedException:
        # Another worker is already processing this user
        return f"User {user_id} already being processed"

# Each user can be processed independently
results = []
for user_id in [1, 2, 3, 1, 2]:  # Note: duplicates
    result = process_user_data(user_id)
    results.append(result)

Lock Status Monitoring

@huey.task()
def monitor_locks():
    # Check specific locks
    critical_locks = ['database_backup', 'user_count_update', 'report_generation']
    
    lock_status = {}
    for lock_name in critical_locks:
        is_locked = huey.is_locked(lock_name)
        lock_status[lock_name] = is_locked
        
    return lock_status

@huey.task()
def emergency_unlock():
    # Force release all locks (use with caution!)
    released = huey.flush_locks()
    return f"Released locks: {released}"

@huey.task()
def unlock_specific_locks():
    # Release specific locks
    released = huey.flush_locks('stale_lock_1', 'stale_lock_2')
    return f"Released locks: {released}"

Conditional Locking

@huey.task()
def conditional_processing(resource_id):
    lock_name = f'resource_{resource_id}'
    
    # Check if already locked before attempting
    if huey.is_locked(lock_name):
        return f"Resource {resource_id} is busy, skipping"
    
    try:
        with huey.lock_task(lock_name):
            # Process resource
            result = process_resource(resource_id)
            return result
    except TaskLockedException:
        # Lock was acquired between check and acquisition
        return f"Resource {resource_id} became busy"

Lock with Timeout Pattern

import time
from contextlib import contextmanager

@contextmanager
def timed_lock(huey_instance, lock_name, timeout=30, check_interval=0.5):
    """Custom lock with timeout capability."""
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            with huey_instance.lock_task(lock_name):
                yield
                return
        except TaskLockedException:
            time.sleep(check_interval)
    
    raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout} seconds")

@huey.task()
def process_with_timeout(data):
    try:
        with timed_lock(huey, 'critical_resource', timeout=60):
            # Process data with timeout
            result = expensive_processing(data)
            return result
    except TimeoutError as e:
        return f"Processing failed: {e}"

Database Connection Pooling with Locks

import sqlite3
import threading

# Shared resource that needs protection
db_connections = {}
connection_lock = threading.Lock()

@huey.task()
def database_task(query, db_name='default'):
    # Use lock to coordinate database access
    lock_name = f'db_access_{db_name}'
    
    with huey.lock_task(lock_name):
        # Get or create database connection
        if db_name not in db_connections:
            db_connections[db_name] = sqlite3.connect(f'{db_name}.db')
        
        conn = db_connections[db_name]
        cursor = conn.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        conn.commit()
        
        return f"Query executed: {len(result)} rows"

Distributed Lock Patterns

@huey.task()
def singleton_task():
    """Ensure only one instance of this task runs across all workers."""
    lock_name = 'singleton_task_global'
    
    try:
        with huey.lock_task(lock_name):
            # This code runs on only one worker globally
            perform_singleton_operation()
            return "Singleton task completed"
    except TaskLockedException:
        return "Singleton task already running"

@huey.task()
def batch_processor(batch_id):
    """Process batches with coordination between workers."""
    # Lock the entire batch
    batch_lock = f'batch_{batch_id}'
    
    try:
        with huey.lock_task(batch_lock):
            items = get_batch_items(batch_id)
            
            # Process items with item-level locks for fine-grained control
            results = []
            for item_id in items:
                item_lock = f'item_{item_id}'
                try:
                    with huey.lock_task(item_lock):
                        result = process_item(item_id)
                        results.append(result)
                except TaskLockedException:
                    results.append(f"Item {item_id} locked")
            
            return f"Batch {batch_id}: {len(results)} items processed"
    except TaskLockedException:
        return f"Batch {batch_id} already being processed"

Lock Cleanup and Maintenance

@huey.periodic_task(crontab(minute='*/10'))  # Every 10 minutes
def cleanup_stale_locks():
    """Periodic cleanup of potentially stale locks."""
    # In production, you might want to track lock timestamps
    # and clean up locks that are older than expected task duration
    
    # For now, just report on current locks
    # (Manual cleanup would require custom lock tracking)
    
    # Check critical locks
    critical_locks = ['database_backup', 'report_generation']
    stale_locks = []
    
    for lock_name in critical_locks:
        if huey.is_locked(lock_name):
            # In real implementation, check if lock is truly stale
            # based on timestamps or other criteria
            stale_locks.append(lock_name)
    
    if stale_locks:
        return f"Warning: Long-running locks detected: {stale_locks}"
    else:
        return "All locks appear healthy"

@huey.task()
def force_unlock_emergency(lock_names):
    """Emergency lock release (use with extreme caution)."""
    if not isinstance(lock_names, list):
        lock_names = [lock_names]
    
    released = huey.flush_locks(*lock_names)
    return f"Emergency unlock completed. Released: {released}"

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json