huey, a little task queue — a lightweight task-queue library for Python offering asynchronous execution and comprehensive task-management features
—
Task locking mechanisms, concurrency control, and synchronization features to prevent duplicate task execution and manage shared resources. These features ensure proper coordination in multi-worker environments.
These locks prevent multiple workers from executing the same critical section simultaneously.
def lock_task(self, lock_name):
    """
    Return a TaskLock coordinating access to a shared resource.

    The returned lock may be used either as a context manager or as a
    decorator on a task function.

    Parameters:
    - lock_name (str): name identifying the lock

    Returns:
    TaskLock: lock instance usable as a context manager or decorator
    """
def is_locked(self, lock_name):
    """
    Report whether the named lock is currently held.

    Parameters:
    - lock_name (str): name of the lock to inspect

    Returns:
    bool: True when the lock is currently held
    """
def flush_locks(self, *names):
    """
    Clear the given locks, or every lock when no names are supplied.

    Parameters:
    - *names: lock names to remove (optional; removes all locks if empty)

    Returns:
    set: names of the locks that were removed
    """

# Context manager and decorator for task synchronization.
class TaskLock:
    """Context manager and decorator for synchronizing task execution."""

    def __init__(self, huey, name):
        """
        Initialize the lock.

        Parameters:
        - huey: Huey instance that owns the lock storage
        - name (str): lock name
        """

    def is_locked(self):
        """
        Report whether this lock is currently held.

        Returns:
        bool: True when the lock is held
        """

    def clear(self):
        """
        Force-release this lock.

        Returns:
        bool: True when the lock was held and has been released
        """

    def __call__(self, fn):
        """
        Use the lock as a decorator.

        Parameters:
        - fn: function to wrap so it acquires the lock before running

        Returns:
        Wrapped function that acquires the lock before execution
        """

    def __enter__(self):
        """
        Acquire the lock (context-manager entry).

        Raises:
        TaskLockedException: when the lock cannot be acquired
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock (context-manager exit)."""

# Exception raised when lock cannot be acquired.
class TaskLockedException(HueyException):
    """
    Raised when a task cannot acquire a required lock.

    This exception is raised when:
    - a task decorated with a lock cannot acquire that lock
    - a context-manager lock cannot be acquired on entry
    - a task attempts to take a lock that is already held
    """

from huey import RedisHuey
from huey.exceptions import TaskLockedException

huey = RedisHuey('locking-app')

@huey.task()
def update_user_count():
    """Recalculate and persist the user count, serialized by a task lock."""
    # Use lock as context manager
    with huey.lock_task('user_count_update'):
        # Only one worker can execute this block at a time
        current_count = get_user_count()
        new_count = recalculate_user_count()
        update_user_count_in_db(new_count)
        return new_count

# Multiple workers can enqueue this task, but only one executes at a time.
# NOTE: if a second worker starts while the lock is held, lock_task raises
# TaskLockedException — it does NOT block and wait for the first to finish.
result1 = update_user_count()
result2 = update_user_count()

# Create a reusable lock
user_stats_lock = huey.lock_task('user_stats')
@huey.task()
@user_stats_lock
def update_user_stats(user_id):
# This entire function is protected by the lock
stats = calculate_user_stats(user_id)
save_user_stats(user_id, stats)
return stats
# Alternative: inline decorator
@huey.task()
@huey.lock_task('report_generation')
def generate_daily_report():
# Generate report logic
return "Report generated"@huey.task()
def process_user_data(user_id):
# Use user-specific locks
lock_name = f'user_{user_id}_processing'
try:
with huey.lock_task(lock_name):
# Process user data
data = load_user_data(user_id)
processed = process_data(data)
save_processed_data(user_id, processed)
return f"Processed user {user_id}"
except TaskLockedException:
# Another worker is already processing this user
return f"User {user_id} already being processed"
# Each user can be processed independently
results = []
for user_id in [1, 2, 3, 1, 2]: # Note: duplicates
result = process_user_data(user_id)
results.append(result)@huey.task()
def monitor_locks():
# Check specific locks
critical_locks = ['database_backup', 'user_count_update', 'report_generation']
lock_status = {}
for lock_name in critical_locks:
is_locked = huey.is_locked(lock_name)
lock_status[lock_name] = is_locked
return lock_status
@huey.task()
def emergency_unlock():
# Force release all locks (use with caution!)
released = huey.flush_locks()
return f"Released locks: {released}"
@huey.task()
def unlock_specific_locks():
# Release specific locks
released = huey.flush_locks('stale_lock_1', 'stale_lock_2')
return f"Released locks: {released}"@huey.task()
def conditional_processing(resource_id):
    """Process a resource only when its lock looks free; skip otherwise."""
    lock_name = f'resource_{resource_id}'
    # Cheap pre-check — NOT race-free on its own (see the except clause)
    if huey.is_locked(lock_name):
        return f"Resource {resource_id} is busy, skipping"
    try:
        with huey.lock_task(lock_name):
            result = process_resource(resource_id)
            return result
    except TaskLockedException:
        # The lock was taken between the check and the acquisition
        return f"Resource {resource_id} became busy"

import time
from contextlib import contextmanager

@contextmanager
def timed_lock(huey_instance, lock_name, timeout=30, check_interval=0.5):
    """
    Acquire a huey task lock, retrying until *timeout* seconds elapse.

    Parameters:
    - huey_instance: Huey instance providing lock_task()
    - lock_name (str): name of the lock to acquire
    - timeout (float): maximum seconds to spend retrying acquisition
    - check_interval (float): seconds to sleep between attempts

    Raises:
    - TimeoutError: if the lock cannot be acquired within *timeout* seconds

    Bug fixed vs. the previous version: the ``yield`` used to sit inside
    the ``try``/``except TaskLockedException`` retry block, so a
    TaskLockedException raised by the caller's protected body was
    swallowed and the generator re-yielded (RuntimeError). Now only lock
    *acquisition* is retried; exceptions from the body propagate.
    Uses time.monotonic() so wall-clock adjustments cannot skew the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        lock = huey_instance.lock_task(lock_name)
        try:
            lock.__enter__()  # attempt acquisition only
        except TaskLockedException:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout} seconds")
            time.sleep(check_interval)
            continue
        try:
            yield
        finally:
            # TaskLock.__exit__ releases unconditionally; it never
            # suppresses exceptions from the protected body.
            lock.__exit__(None, None, None)
        return
@huey.task()
def process_with_timeout(data):
    """Run expensive_processing under 'critical_resource', waiting up to 60s."""
    try:
        with timed_lock(huey, 'critical_resource', timeout=60):
            # Process data while holding the lock
            outcome = expensive_processing(data)
            return outcome
    except TimeoutError as e:
        return f"Processing failed: {e}"

import sqlite3
import threading
# Shared resource that needs protection
db_connections = {}
connection_lock = threading.Lock()
@huey.task()
def database_task(query, db_name='default'):
# Use lock to coordinate database access
lock_name = f'db_access_{db_name}'
with huey.lock_task(lock_name):
# Get or create database connection
if db_name not in db_connections:
db_connections[db_name] = sqlite3.connect(f'{db_name}.db')
conn = db_connections[db_name]
cursor = conn.cursor()
cursor.execute(query)
result = cursor.fetchall()
conn.commit()
return f"Query executed: {len(result)} rows"@huey.task()
def singleton_task():
"""Ensure only one instance of this task runs across all workers."""
lock_name = 'singleton_task_global'
try:
with huey.lock_task(lock_name):
# This code runs on only one worker globally
perform_singleton_operation()
return "Singleton task completed"
except TaskLockedException:
return "Singleton task already running"
@huey.task()
def batch_processor(batch_id):
"""Process batches with coordination between workers."""
# Lock the entire batch
batch_lock = f'batch_{batch_id}'
try:
with huey.lock_task(batch_lock):
items = get_batch_items(batch_id)
# Process items with item-level locks for fine-grained control
results = []
for item_id in items:
item_lock = f'item_{item_id}'
try:
with huey.lock_task(item_lock):
result = process_item(item_id)
results.append(result)
except TaskLockedException:
results.append(f"Item {item_id} locked")
return f"Batch {batch_id}: {len(results)} items processed"
except TaskLockedException:
return f"Batch {batch_id} already being processed"@huey.periodic_task(crontab(minute='*/10')) # Every 10 minutes
def cleanup_stale_locks():
    """Periodic check for locks that may have gone stale."""
    # In production you would track lock timestamps and clear locks older
    # than the expected task duration; manual cleanup would require custom
    # lock tracking. Here we only report which critical locks are held.
    critical_locks = ['database_backup', 'report_generation']
    stale_locks = [name for name in critical_locks if huey.is_locked(name)]
    if stale_locks:
        return f"Warning: Long-running locks detected: {stale_locks}"
    else:
        return "All locks appear healthy"

@huey.task()
def force_unlock_emergency(lock_names):
    """Emergency lock release (use with extreme caution)."""
    # Accept either a single name or a list of names
    if not isinstance(lock_names, list):
        lock_names = [lock_names]
    released = huey.flush_locks(*lock_names)
    return f"Emergency unlock completed. Released: {released}"
Install with the Tessl CLI: `npx tessl i tessl/pypi-huey`