tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/huey@2.5.x

To install, run

npx @tessl/cli install tessl/pypi-huey@2.5.0


Huey

Huey is a lightweight task queue library for Python that provides a small, clean API for asynchronous task execution. It ships with storage backends for Redis, SQLite, the file system, and in-memory storage.

Package Information

  • Package Name: huey
  • Language: Python
  • Installation: pip install huey

Core Imports

from huey import RedisHuey, SqliteHuey, MemoryHuey, FileHuey
from huey import crontab
from huey.exceptions import CancelExecution, RetryTask

Basic Usage

from huey import RedisHuey, crontab

# Create a huey instance with Redis storage
huey = RedisHuey('my-app')

# Define a simple task
@huey.task()
def add_numbers(a, b):
    return a + b

# Define a periodic task
@huey.periodic_task(crontab(minute='0', hour='2'))
def nightly_report():
    generate_nightly_report()  # placeholder for application code; not defined here

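# Enqueue the task; the call returns a Result handle immediately
result = add_numbers(5, 3)
# Block until a running consumer has stored the return value.
# A plain result() is non-blocking and returns None if the task has not finished.
print(result(blocking=True))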

# Schedule task for later execution
result = add_numbers.schedule((10, 20), delay=60)  # Run in 60 seconds

Architecture

Huey's architecture consists of several key components:

  • Huey Instance: Central coordinator managing task queues, storage, and execution
  • Storage Backends: Pluggable storage implementations (Redis, SQLite, file, memory)
  • Task Decorators: Function decorators that convert regular functions into queueable tasks
  • Consumer Process: Separate worker process that executes queued tasks
  • Result Handling: Optional result storage and retrieval system
  • Scheduling System: Support for delayed and periodic task execution

Capabilities

Core Task Queue Operations

The primary task queue functionality including task creation, enqueueing, result handling, and basic queue management operations.

class Huey:
    def task(self, retries=0, retry_delay=0, priority=None, context=False, 
             name=None, expires=None, **kwargs): ...
    def enqueue(self, task): ...
    def dequeue(self): ...
    def pending_count(self): ...
    def flush(self): ...

See also: Core Task Queue (core-task-queue.md)

Task Scheduling and Periodic Tasks

Advanced scheduling capabilities including delayed execution, periodic tasks with cron-like syntax, and task pipeline management.

class Huey:
    def periodic_task(self, validate_datetime, retries=0, retry_delay=0,
                      priority=None, context=False, name=None, expires=None, **kwargs): ...

def crontab(minute='*', hour='*', day='*', month='*', day_of_week='*', strict=False): ...

See also: Scheduling (scheduling.md)

Result Management

Comprehensive result handling including result storage, retrieval, blocking operations, timeouts, and result groups for batch operations.

class Result:
    def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
            revoke_on_timeout=False, preserve=False): ...
    def is_revoked(self): ...
    def revoke(self, revoke_once=True): ...
    def reschedule(self, eta=None, delay=None, expires=None, priority=None,
                   preserve_pipeline=True): ...

See also: Result Management (result-management.md)

Task Lifecycle and Hooks

Task lifecycle management including pre/post execution hooks, startup/shutdown hooks, signal handling, and task pipeline chaining.

class Huey:
    def pre_execute(self, name=None): ...
    def post_execute(self, name=None): ...
    def on_startup(self, name=None): ...
    def on_shutdown(self, name=None): ...
    def signal(self, *signals): ...

See also: Task Lifecycle (task-lifecycle.md)

Storage Backends

Multiple storage backend implementations with their specific configurations and capabilities for different deployment scenarios.

class RedisHuey(Huey): ...
class SqliteHuey(Huey): ...
class MemoryHuey(Huey): ...
class FileHuey(Huey): ...
class PriorityRedisHuey(Huey): ...

See also: Storage Backends (storage-backends.md)

Task Locking and Concurrency

Task locking mechanisms, concurrency control, and synchronization features to prevent duplicate task execution and manage shared resources.

class TaskLock:
    def is_locked(self): ...
    def clear(self): ...

class Huey:
    def lock_task(self, lock_name): ...
    def is_locked(self, lock_name): ...

See also: Locking and Concurrency (locking-concurrency.md)

Exception Handling

Exception classes for task control flow, error handling, and task execution management.

class CancelExecution(Exception):
    def __init__(self, retry=None, *args, **kwargs): ...

class RetryTask(Exception):
    def __init__(self, msg=None, eta=None, delay=None, *args, **kwargs): ...

See also: Exception Handling (exception-handling.md)

Types

class Task:
    def __init__(self, args=None, kwargs=None, id=None, eta=None, retries=None,
                 retry_delay=None, priority=None, expires=None,
                 on_complete=None, on_error=None, expires_resolved=None): ...
    
    @property
    def data(self): ...
    
    def create_id(self): ...
    def resolve_expires(self, utc=True): ...
    def extend_data(self, data): ...
    def execute(self): ...
    def then(self, task, *args, **kwargs): ...
    def error(self, task, *args, **kwargs): ...

class TaskWrapper:
    def schedule(self, args=None, kwargs=None, eta=None, delay=None,
                 priority=None, retries=None, retry_delay=None, expires=None,
                 id=None): ...
    def map(self, it): ...
    def s(self, *args, **kwargs): ...
    def call_local(self, *args, **kwargs): ...

class Consumer:
    def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
                 backoff=1.15, max_delay=10.0, scheduler_interval=1,
                 worker_type='thread', check_worker_health=True,
                 health_check_interval=10, flush_locks=False,
                 extra_locks=None): ...
    def start(self): ...
    def stop(self, graceful=False): ...
    def run(self): ...
    def flush_locks(self, *names): ...