CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

task-lifecycle.mddocs/

Task Lifecycle and Hooks

Task lifecycle management including pre/post execution hooks, startup/shutdown hooks, signal handling, and task pipeline chaining. These features enable comprehensive task orchestration and monitoring.

Capabilities

Execution Hooks

Register callbacks that run before and after task execution for monitoring, logging, and custom processing.

def pre_execute(self, name=None):
    """
    Decorator to register pre-execution hook.
    
    Parameters:
    - name (str): Hook name (default: function name)
    
    Returns:
    Decorator function
    
    Hook function signature:
    def hook(task): ...
    
    Hooks can raise CancelExecution to prevent task execution.
    """

def post_execute(self, name=None):
    """
    Decorator to register post-execution hook.
    
    Parameters:
    - name (str): Hook name (default: function name)
    
    Returns:
    Decorator function
    
    Hook function signature:
    def hook(task, task_value, exception): ...
    
    Parameters:
    - task: Task instance that executed
    - task_value: Return value from task (None if exception occurred)
    - exception: Exception instance if task failed (None if successful)
    """

def unregister_pre_execute(self, name):
    """
    Remove a pre-execution hook.
    
    Parameters:
    - name (str or function): Hook name or function to remove
    
    Returns:
    bool: True if hook was removed
    """

def unregister_post_execute(self, name):
    """
    Remove a post-execution hook.
    
    Parameters:
    - name (str or function): Hook name or function to remove
    
    Returns:
    bool: True if hook was removed
    """

Startup and Shutdown Hooks

Register callbacks for consumer process lifecycle events.

def on_startup(self, name=None):
    """
    Decorator to register startup hook.
    
    Parameters:
    - name (str): Hook name (default: function name)
    
    Returns:
    Decorator function
    
    Hook function signature:
    def hook(): ...
    """

def on_shutdown(self, name=None):
    """
    Decorator to register shutdown hook.
    
    Parameters:
    - name (str): Hook name (default: function name)
    
    Returns:
    Decorator function
    
    Hook function signature:
    def hook(): ...
    """

def unregister_on_startup(self, name):
    """
    Remove a startup hook.
    
    Parameters:
    - name (str or function): Hook name or function to remove
    
    Returns:
    bool: True if hook was removed
    """

def unregister_on_shutdown(self, name):
    """
    Remove a shutdown hook.
    
    Parameters:
    - name (str or function): Hook name or function to remove
    
    Returns:
    bool: True if hook was removed
    """

Signal Handling

Register signal handlers for various task execution events.

def signal(self, *signals):
    """
    Decorator to register signal handler.
    
    Parameters:
    - *signals: Signal names to handle
    
    Available signals:
    - SIGNAL_ENQUEUED: Task was added to queue
    - SIGNAL_EXECUTING: Task execution started
    - SIGNAL_COMPLETE: Task completed successfully
    - SIGNAL_ERROR: Task failed with exception
    - SIGNAL_RETRYING: Task is being retried
    - SIGNAL_REVOKED: Task was revoked
    - SIGNAL_CANCELED: Task was canceled
    - SIGNAL_SCHEDULED: Task was added to schedule
    - SIGNAL_LOCKED: Task could not acquire lock
    - SIGNAL_EXPIRED: Task expired before execution
    - SIGNAL_INTERRUPTED: Task was interrupted
    
    Returns:
    Decorator function
    
    Handler function signature:
    def handler(signal, task, *args, **kwargs): ...
    """

def disconnect_signal(self, receiver, *signals):
    """
    Disconnect a signal handler.
    
    Parameters:
    - receiver: Handler function to disconnect
    - *signals: Signal names to disconnect from
    
    Returns:
    None
    """

Task Pipeline and Chaining

Create task chains and pipelines for complex workflows.

def then(self, task, *args, **kwargs):
    """
    Chain another task to run after this one completes successfully.
    
    Parameters:
    - task (TaskWrapper or Task): Task to run next
    - *args: Arguments to pass to next task
    - **kwargs: Keyword arguments to pass to next task
    
    Returns:
    Task: Self for method chaining
    """

def error(self, task, *args, **kwargs):
    """
    Chain another task to run if this one fails.
    
    Parameters:
    - task (TaskWrapper or Task): Task to run on error
    - *args: Arguments to pass to error task
    - **kwargs: Keyword arguments to pass to error task
    
    Returns:
    Task: Self for method chaining
    """

Task Revocation and Control

Control task execution with revocation and restoration capabilities.

def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
    """
    Revoke all instances of a task type.
    
    Parameters:
    - task_class: Task class to revoke
    - revoke_until (datetime): Revoke until specific time (optional)
    - revoke_once (bool): Revoke only next execution (default: False)
    
    Returns:
    None
    """

def restore_all(self, task_class):
    """
    Restore all instances of a revoked task type.
    
    Parameters:
    - task_class: Task class to restore
    
    Returns:
    bool: True if tasks were revoked and restored
    """

def revoke_by_id(self, id, revoke_until=None, revoke_once=False):
    """
    Revoke specific task by ID.
    
    Parameters:
    - id (str): Task ID to revoke
    - revoke_until (datetime): Revoke until specific time (optional)
    - revoke_once (bool): Revoke only this execution (default: False)
    
    Returns:
    None
    """

def restore_by_id(self, id):
    """
    Restore specific task by ID.
    
    Parameters:
    - id (str): Task ID to restore
    
    Returns:
    bool: True if task was revoked and restored
    """

def is_revoked(self, task, timestamp=None, peek=True):
    """
    Check if task or task type is revoked.
    
    Parameters:
    - task: Task instance, task class, or task ID
    - timestamp (datetime): Check time (default: now)
    - peek (bool): Don't consume revocation data (default: True)
    
    Returns:
    bool: True if revoked
    """

Usage Examples

Pre and Post Execution Hooks

from huey import RedisHuey
import logging
import time

huey = RedisHuey('lifecycle-app')

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('task-hooks')

@huey.pre_execute()
def log_task_start(task):
    logger.info(f"Starting task: {task.name} (ID: {task.id})")
    # Could add authentication, resource checks, etc.

@huey.post_execute()
def log_task_complete(task, task_value, exception):
    if exception:
        logger.error(f"Task {task.name} failed: {exception}")
    else:
        logger.info(f"Task {task.name} completed: {task_value}")

@huey.task()
def process_order(order_id):
    time.sleep(2)  # Simulate processing
    return f"Order {order_id} processed"

# Task execution will trigger hooks
result = process_order(12345)

Startup and Shutdown Hooks

import redis

@huey.on_startup()
def initialize_connections():
    logger.info("Consumer starting up - initializing connections")
    # Initialize database connections, cache, etc.
    global redis_client
    redis_client = redis.Redis(host='localhost', port=6379, db=0)

@huey.on_shutdown()
def cleanup_resources():
    logger.info("Consumer shutting down - cleaning up resources")
    # Close connections, save state, etc.
    if 'redis_client' in globals():
        redis_client.close()

@huey.task()
def cache_data(key, value):
    redis_client.set(key, value)
    return f"Cached {key}"

Signal Handling

from huey import signals as S

@huey.signal(S.SIGNAL_ENQUEUED)
def task_enqueued(signal, task):
    logger.info(f"Task enqueued: {task.name}")

@huey.signal(S.SIGNAL_ERROR)
def task_error(signal, task, exception):
    logger.error(f"Task {task.name} failed: {exception}")
    # Could send alerts, update metrics, etc.

@huey.signal(S.SIGNAL_RETRYING)
def task_retrying(signal, task):
    logger.warning(f"Retrying task: {task.name} ({task.retries} retries left)")

@huey.signal(S.SIGNAL_COMPLETE)
def task_complete(signal, task):
    logger.info(f"Task completed: {task.name}")
    # Could update progress tracking, send notifications, etc.

Task Chaining and Pipelines

@huey.task()
def download_file(url):
    # Download file logic
    return f"downloaded_{url.split('/')[-1]}"

@huey.task()
def process_file(filename):
    # Process file logic
    return f"processed_{filename}"

@huey.task()
def cleanup_file(filename):
    # Cleanup logic
    return f"cleaned_{filename}"

@huey.task()
def send_notification(message):
    # Send notification
    return f"notified: {message}"

# Create task pipeline
task = download_file.s("http://example.com/data.csv")
task = task.then(process_file)
task = task.then(cleanup_file)
task = task.then(send_notification, "Processing complete")
task = task.error(send_notification, "Processing failed")

# Enqueue the pipeline
result = huey.enqueue(task)

Task Revocation and Control

@huey.task()
def long_running_task(data):
    # Simulate long-running task
    time.sleep(60)
    return f"Processed {data}"

# Start some tasks
results = []
for i in range(5):
    result = long_running_task(f"data_{i}")
    results.append(result)

# Revoke specific task
results[0].revoke()

# Revoke all instances of a task type
huey.revoke_all(long_running_task.task_class, revoke_once=True)

# Check if task is revoked
if results[1].is_revoked():
    print("Task was revoked")
    results[1].restore()  # Restore if needed

# Revoke task by ID
task_id = results[2].id
huey.revoke_by_id(task_id, revoke_once=True)

Advanced Hook Patterns

# Context-aware hooks
current_user = None

@huey.pre_execute()
def set_task_context(task):
    global current_user
    # Extract user context from task data
    if hasattr(task, 'kwargs') and 'user_id' in task.kwargs:
        current_user = get_user(task.kwargs['user_id'])

@huey.post_execute()
def clear_task_context(task, task_value, exception):
    global current_user
    current_user = None

# Performance monitoring hook
task_times = {}

@huey.pre_execute('performance_monitor')
def start_timer(task):
    task_times[task.id] = time.time()

@huey.post_execute('performance_monitor')
def end_timer(task, task_value, exception):
    if task.id in task_times:
        duration = time.time() - task_times[task.id]
        logger.info(f"Task {task.name} took {duration:.2f} seconds")
        del task_times[task.id]

# Conditional execution hook
@huey.pre_execute()
def check_maintenance_mode(task):
    if is_maintenance_mode() and not task.name.startswith('maintenance_'):
        raise CancelExecution("System in maintenance mode")

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json