CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

core-task-queue.mddocs/

Core Task Queue Operations

The primary task queue functionality that forms the foundation of Huey's asynchronous task execution system. These operations handle task creation, enqueueing, result handling, and basic queue management.

Capabilities

Task Creation and Decoration

Convert regular Python functions into asynchronous tasks that can be queued and executed by worker processes.

def task(self, retries=0, retry_delay=0, priority=None, context=False, 
         name=None, expires=None, **kwargs):
    """
    Decorator to convert a function into a task.
    
    Parameters:
    - retries (int): Number of times to retry failed tasks (default: 0)
    - retry_delay (int): Delay in seconds between retries (default: 0)
    - priority (int): Task priority, higher values processed first (optional)
    - context (bool): Pass task instance as 'task' keyword argument (default: False)
    - name (str): Custom task name (default: function name)
    - expires (datetime/int): Task expiration time or seconds from now (optional)
    - **kwargs: Additional task configuration options
    
    Returns:
    TaskWrapper instance that can be called to enqueue tasks
    """

def context_task(self, obj, as_argument=False, **kwargs):
    """
    Decorator for tasks that require context management.
    
    Parameters:
    - obj: Context manager object to use
    - as_argument (bool): Pass context as first argument (default: False)
    - **kwargs: Additional task configuration options
    
    Returns:
    Task decorator function
    """

Task Enqueueing and Execution

Add tasks to the queue and manage their execution lifecycle.

def enqueue(self, task):
    """
    Add a task to the execution queue.
    
    Parameters:
    - task (Task): Task instance to enqueue
    
    Returns:
    Result instance if results are enabled, None otherwise
    """

def dequeue(self):
    """
    Remove and return the next task from the queue.
    
    Returns:
    Task instance or None if queue is empty
    """

def execute(self, task, timestamp=None):
    """
    Execute a task immediately.
    
    Parameters:
    - task (Task): Task to execute
    - timestamp (datetime): Execution timestamp (default: current time)
    
    Returns:
    Task result value
    """

Queue Status and Management

Monitor queue status and perform maintenance operations.

def pending_count(self):
    """
    Get the number of pending tasks in the queue.
    
    Returns:
    int: Number of tasks waiting to be processed
    """

def pending(self, limit=None):
    """
    Get list of pending tasks.
    
    Parameters:
    - limit (int): Maximum number of tasks to return (optional)
    
    Returns:
    List of Task instances
    """

def flush(self):
    """
    Remove all tasks and data from all storage areas.
    
    Returns:
    None
    """

def __len__(self):
    """
    Get queue length (same as pending_count).
    
    Returns:
    int: Number of pending tasks
    """

Data Storage Operations

Low-level key-value storage operations for custom data persistence.

def put(self, key, data):
    """
    Store arbitrary data with a key.
    
    Parameters:
    - key (str): Storage key
    - data: Data to store (will be serialized)
    
    Returns:
    bool: Success status
    """

def get(self, key, peek=False):
    """
    Retrieve data by key.
    
    Parameters:
    - key (str): Storage key
    - peek (bool): Don't remove data if True (default: False)
    
    Returns:
    Stored data or None if not found
    """

def delete(self, key):
    """
    Delete data by key.
    
    Parameters:
    - key (str): Storage key to delete
    
    Returns:
    bool: True if key existed and was deleted
    """

def put_if_empty(self, key, data):
    """
    Store data only if key doesn't exist.
    
    Parameters:
    - key (str): Storage key
    - data: Data to store
    
    Returns:
    bool: True if data was stored, False if key already existed
    """

Task Serialization

Low-level task serialization methods for custom storage implementations and debugging.

def serialize_task(self, task):
    """
    Serialize a task to bytes for storage.
    
    Parameters:
    - task (Task): Task instance to serialize
    
    Returns:
    bytes: Serialized task data
    """

def deserialize_task(self, data):
    """
    Deserialize bytes to a task instance.
    
    Parameters:
    - data (bytes): Serialized task data
    
    Returns:
    Task: Deserialized task instance
    """

Raw Storage Operations

Direct storage access methods for advanced use cases and custom implementations.

def get_raw(self, key, peek=False):
    """
    Get raw data from storage without deserialization.
    
    Parameters:
    - key (str): Storage key
    - peek (bool): Don't remove data if True (default: False)
    
    Returns:
    bytes or None: Raw stored data
    """

def put_result(self, key, data):
    """
    Store result data directly in result storage.
    
    Parameters:
    - key (str): Result key
    - data: Result data to store
    
    Returns:
    bool: Success status
    """

Consumer Management

Create and configure consumer processes for task execution.

def create_consumer(self, **options):
    """
    Create a consumer instance for processing tasks.
    
    Parameters:
    - **options: Consumer configuration options
    
    Returns:
    Consumer instance
    """

Usage Examples

Basic Task Definition and Execution

from huey import RedisHuey

huey = RedisHuey('my-app')

@huey.task()
def send_email(to, subject, body):
    # Send email logic here
    return f"Email sent to {to}"

# Enqueue task
result = send_email('user@example.com', 'Hello', 'Message body')

# Get result (blocks until ready)
message = result()
print(message)  # "Email sent to user@example.com"

Task with Retry Configuration

@huey.task(retries=3, retry_delay=60)
def process_payment(amount, card_token):
    # Payment processing that might fail
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Payment gateway error")
    return f"Processed ${amount}"

# This task will retry up to 3 times with 60-second delays
result = process_payment(100.00, 'tok_123')

Context Task with Resource Management

import sqlite3

@huey.context_task(sqlite3.connect('app.db'), as_argument=True)
def update_user_stats(db_conn, user_id, stats):
    cursor = db_conn.cursor()
    cursor.execute("UPDATE users SET stats=? WHERE id=?", (stats, user_id))
    db_conn.commit()
    return f"Updated user {user_id}"

# Database connection is automatically managed
result = update_user_stats(123, {'views': 50, 'clicks': 5})

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json