huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features
npx @tessl/cli install tessl/pypi-huey@2.5.0
Huey is a lightweight task queue library for Python that provides a clean and simple API for asynchronous task execution. It supports multiple storage backends, including Redis, SQLite, file-system, and in-memory storage, making it versatile across different deployment scenarios.
pip install huey
from huey import RedisHuey, SqliteHuey, MemoryHuey, FileHuey
from huey import crontab
from huey.exceptions import CancelExecution, RetryTask
from huey import RedisHuey, crontab
# Create a huey instance with Redis storage
huey = RedisHuey('my-app')
# Define a simple task
@huey.task()
def add_numbers(a, b):
return a + b
# Define a periodic task
@huey.periodic_task(crontab(minute='0', hour='2'))
def nightly_report():
generate_nightly_report()
# Enqueue and get results
result = add_numbers(5, 3)
print(result()) # Gets result when ready
# Schedule task for later execution
result = add_numbers.schedule((10, 20), delay=60) # Run in 60 seconds
Huey's architecture consists of several key components:
The primary task queue functionality including task creation, enqueueing, result handling, and basic queue management operations.
class Huey:
def task(self, retries=0, retry_delay=0, priority=None, context=False,
name=None, expires=None, **kwargs): ...
def enqueue(self, task): ...
def dequeue(self): ...
def pending_count(self): ...
def flush(self): ...
Advanced scheduling capabilities including delayed execution, periodic tasks with cron-like syntax, and task pipeline management.
def periodic_task(self, validate_datetime, retries=0, retry_delay=0,
priority=None, context=False, name=None, expires=None, **kwargs): ...
def crontab(minute='*', hour='*', day='*', month='*', day_of_week='*', strict=False): ...
Comprehensive result handling including result storage, retrieval, blocking operations, timeouts, and result groups for batch operations.
class Result:
def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
revoke_on_timeout=False, preserve=False): ...
def is_revoked(self): ...
def revoke(self, revoke_once=True): ...
def reschedule(self, eta=None, delay=None, expires=None, priority=None,
preserve_pipeline=True): ...
Task lifecycle management including pre/post execution hooks, startup/shutdown hooks, signal handling, and task pipeline chaining.
def pre_execute(self, name=None): ...
def post_execute(self, name=None): ...
def on_startup(self, name=None): ...
def on_shutdown(self, name=None): ...
def signal(self, *signals): ...
Multiple storage backend implementations with their specific configurations and capabilities for different deployment scenarios.
class RedisHuey(Huey): ...
class SqliteHuey(Huey): ...
class MemoryHuey(Huey): ...
class FileHuey(Huey): ...
class PriorityRedisHuey(Huey): ...
Task locking mechanisms, concurrency control, and synchronization features to prevent duplicate task execution and manage shared resources.
class TaskLock:
def is_locked(self): ...
def clear(self): ...
def lock_task(self, lock_name): ...
def is_locked(self, lock_name): ...
Exception classes for task control flow, error handling, and task execution management.
class CancelExecution(Exception):
def __init__(self, retry=None, *args, **kwargs): ...
class RetryTask(Exception):
def __init__(self, msg=None, eta=None, delay=None, *args, **kwargs): ...
class Task:
def __init__(self, args=None, kwargs=None, id=None, eta=None, retries=None,
retry_delay=None, priority=None, expires=None,
on_complete=None, on_error=None, expires_resolved=None): ...
@property
def data(self): ...
def create_id(self): ...
def resolve_expires(self, utc=True): ...
def extend_data(self, data): ...
def execute(self): ...
def then(self, task, *args, **kwargs): ...
def error(self, task, *args, **kwargs): ...
class TaskWrapper:
def schedule(self, args=None, kwargs=None, eta=None, delay=None,
priority=None, retries=None, retry_delay=None, expires=None,
id=None): ...
def map(self, it): ...
def s(self, *args, **kwargs): ...
def call_local(self, *args, **kwargs): ...
class Consumer:
def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
backoff=1.15, max_delay=10.0, scheduler_interval=1,
worker_type='thread', check_worker_health=True,
health_check_interval=10, flush_locks=False,
extra_locks=None): ...
def start(self): ...
def stop(self, graceful=False): ...
def run(self): ...
def flush_locks(self, *names): ...