CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

exception-handling.mddocs/

Exception Handling

Exception classes for task control flow, error handling, and task execution management. These exceptions provide fine-grained control over task behavior and enable sophisticated error handling patterns.

Capabilities

Task Control Exceptions

Exceptions that control task execution flow and retry behavior.

class CancelExecution(Exception):
    """
    Cancel task execution with optional retry control.
    
    When raised during task execution:
    - Task is marked as canceled
    - Can optionally schedule for retry
    - Pre-execute hooks can raise this to prevent execution
    - Post-execute hooks receive this as the exception parameter
    """
    
    def __init__(self, retry=None, *args, **kwargs):
        """
        Initialize CancelExecution.
        
        Parameters:
        - retry (bool): Whether to retry the task (optional)
                       None: Use task's default retry setting
                       True: Force retry even if retries=0
                       False: Don't retry even if retries>0
        - *args, **kwargs: Additional exception arguments
        """

class RetryTask(Exception):
    """
    Request task retry with optional timing control.
    
    When raised during task execution:
    - Task retry counter is incremented
    - Task can be scheduled for immediate or delayed retry
    - Overrides default retry delay if eta or delay specified
    """
    
    def __init__(self, msg=None, eta=None, delay=None, *args, **kwargs):
        """
        Initialize RetryTask.
        
        Parameters:
        - msg (str): Error message (optional)
        - eta (datetime): Specific time to retry (optional)
        - delay (int/float/timedelta): Delay before retry (optional)
        - *args, **kwargs: Additional exception arguments
        """

Task Execution Exceptions

Exceptions related to task execution failures and constraints.

class TaskException(Exception):
    """
    Exception wrapper for failed task results.
    
    When a task fails and results are retrieved, the original exception
    is wrapped in TaskException with additional metadata.
    """
    
    def __init__(self, metadata=None, *args):
        """
        Initialize TaskException.
        
        Parameters:
        - metadata (dict): Exception metadata including:
                          - 'error': String representation of original error
                          - 'traceback': Full traceback string
                          - 'task_id': ID of failed task
                          - 'retries': Number of retries attempted
        - *args: Additional exception arguments
        """

class TaskLockedException(HueyException):
    """
    Exception raised when task cannot acquire required lock.
    
    Raised when:
    - Task decorated with @lock cannot acquire lock
    - Context manager lock acquisition fails
    - Lock is already held by another worker
    """

class ResultTimeout(HueyException):
    """
    Exception raised when result retrieval times out.
    
    Raised when:
    - Result.get(blocking=True, timeout=X) exceeds timeout
    - Task takes longer than specified timeout to complete
    """

Base Exception Classes

Foundation exception classes for Huey error hierarchy.

class HueyException(Exception):
    """
    Base exception class for all Huey-specific exceptions.
    
    All Huey exceptions inherit from this class, allowing
    for broad exception handling when needed.
    """

class ConfigurationError(HueyException):
    """
    Exception raised for configuration-related errors.
    
    Raised when:
    - Invalid storage backend configuration
    - Missing required dependencies
    - Invalid consumer options
    - Incompatible parameter combinations
    """

Usage Examples

Task Retry Control

from huey import RedisHuey
from huey.exceptions import RetryTask, CancelExecution
import random
import datetime

huey = RedisHuey('exception-app')

@huey.task(retries=3, retry_delay=60)
def unreliable_api_call(endpoint):
    """Task that might fail and needs intelligent retry logic."""
    
    # Simulate API call
    if random.random() < 0.3:  # 30% failure rate
        # Different retry strategies based on error type
        error_type = random.choice(['network', 'rate_limit', 'server_error'])
        
        if error_type == 'network':
            # Network errors: retry immediately
            raise RetryTask("Network error - retrying immediately")
        
        elif error_type == 'rate_limit':
            # Rate limit: wait 5 minutes
            raise RetryTask("Rate limited", delay=300)
        
        elif error_type == 'server_error':
            # Server error: retry at specific time
            retry_time = datetime.datetime.now() + datetime.timedelta(minutes=10)
            raise RetryTask("Server error", eta=retry_time)
    
    return f"Successfully called {endpoint}"

# Task will retry with different strategies based on error type
result = unreliable_api_call('/api/users')

Conditional Task Cancellation

@huey.task()
def conditional_task(user_id, action):
    """Task that may be canceled based on conditions."""
    
    # Check if user is still active
    if not is_user_active(user_id):
        # Cancel without retry
        raise CancelExecution(retry=False, "User is no longer active")
    
    # Check system maintenance
    if is_maintenance_mode():
        # Cancel but retry after maintenance
        raise CancelExecution(retry=True, "System in maintenance mode")
    
    # Proceed with task
    return perform_action(user_id, action)

@huey.pre_execute()
def check_global_conditions(task):
    """Pre-execution hook that can cancel tasks."""
    if is_emergency_mode():
        raise CancelExecution(retry=True, "Emergency mode active")

Exception Handling in Results

from huey.exceptions import TaskException, ResultTimeout

@huey.task()
def risky_calculation(data):
    """Task that might fail with various errors."""
    if data < 0:
        raise ValueError("Negative values not supported")
    elif data > 1000:
        raise OverflowError("Value too large")
    else:
        return data ** 2

# Handle exceptions when retrieving results
result = risky_calculation(-5)

try:
    value = result.get(blocking=True, timeout=30)
    print(f"Result: {value}")
except TaskException as e:
    # Access original exception details
    error_info = e.metadata
    print(f"Task failed: {error_info['error']}")
    print(f"Task ID: {error_info['task_id']}")
    print(f"Retries attempted: {error_info['retries']}")
    print(f"Traceback: {error_info['traceback']}")
except ResultTimeout:
    print("Task took too long to complete")

Lock Exception Handling

from huey.exceptions import TaskLockedException

@huey.task()
def resource_processor(resource_id):
    """Process resource with lock handling."""
    lock_name = f'resource_{resource_id}'
    
    try:
        with huey.lock_task(lock_name):
            # Process the resource
            return process_resource(resource_id)
    except TaskLockedException:
        # Resource is locked, decide what to do
        if is_urgent_processing(resource_id):
            # For urgent tasks, retry after short delay
            raise RetryTask("Resource locked, retrying soon", delay=30)
        else:
            # For normal tasks, cancel without retry
            raise CancelExecution(
                retry=False, 
                f"Resource {resource_id} is busy"
            )

Complex Error Handling Patterns

import logging
from huey.exceptions import *

logger = logging.getLogger('task_errors')

@huey.task(retries=5, retry_delay=120)
def complex_data_processing(data_id):
    """Task with sophisticated error handling."""
    
    try:
        # Load data
        data = load_data(data_id)
        if not data:
            # No point retrying if data doesn't exist
            raise CancelExecution(
                retry=False, 
                f"Data {data_id} not found"
            )
        
        # Validate data
        if not validate_data(data):
            # Data validation failed, might be temporary
            raise RetryTask(
                "Data validation failed", 
                delay=300  # Wait 5 minutes for data to be fixed
            )
        
        # Check system resources
        if not has_sufficient_resources():
            # Resource constraint, retry during off-peak hours
            off_peak = datetime.datetime.now().replace(
                hour=2, minute=0, second=0, microsecond=0
            )
            if off_peak <= datetime.datetime.now():
                off_peak += datetime.timedelta(days=1)
            
            raise RetryTask(
                "Insufficient resources", 
                eta=off_peak
            )
        
        # Process data
        result = process_data(data)
        return result
        
    except ConnectionError as e:
        # Network issues: exponential backoff
        logger.warning(f"Connection error in task {data_id}: {e}")
        raise RetryTask("Connection error", delay=120)
        
    except PermissionError as e:
        # Permission issues: don't retry
        logger.error(f"Permission error in task {data_id}: {e}")
        raise CancelExecution(
            retry=False,
            "Insufficient permissions"
        )
        
    except Exception as e:
        # Unexpected errors: log and retry with increasing delay
        logger.exception(f"Unexpected error in task {data_id}")
        
        # Calculate exponential backoff based on retry count
        # (Note: in real implementation, you'd track retry count)
        delay = min(300 * (2 ** 0), 3600)  # Cap at 1 hour
        raise RetryTask(f"Unexpected error: {e}", delay=delay)

Global Exception Handling

@huey.post_execute()
def global_error_handler(task, task_value, exception):
    """Global exception handler for all tasks."""
    
    if exception is None:
        # Task succeeded
        logger.info(f"Task {task.name} completed successfully")
        return
    
    # Log all exceptions
    logger.error(f"Task {task.name} failed: {exception}")
    
    # Handle specific exception types
    if isinstance(exception, TaskLockedException):
        # Track lock contention
        track_lock_contention(task.name)
        
    elif isinstance(exception, (RetryTask, CancelExecution)):
        # These are handled by Huey automatically
        logger.info(f"Task {task.name} control exception: {exception}")
        
    else:
        # Unexpected exceptions
        send_error_alert(task, exception)

@huey.signal(S.SIGNAL_ERROR)
def error_signal_handler(signal, task, exception):
    """Handle error signals."""
    logger.error(f"Signal {signal}: Task {task.id} error: {exception}")
    
    # Could implement:
    # - Error metrics collection
    # - Alert systems
    # - Automatic task rescheduling
    # - Dead letter queue

Exception-based Task Routing

@huey.task()
def smart_processor(data, task_type='normal'):
    """Task that routes to different handlers based on exceptions."""
    
    try:
        if task_type == 'critical':
            return critical_processing(data)
        else:
            return normal_processing(data)
            
    except ResourceBusyError:
        # Route to different queue/priority
        if task_type == 'critical':
            # High priority retry
            raise RetryTask("Resource busy", delay=10)
        else:
            # Low priority, longer delay
            raise RetryTask("Resource busy", delay=300)
            
    except DataCorruptionError:
        # Try to repair and retry
        try:
            repair_data(data)
            raise RetryTask("Data repaired, retrying", delay=60)
        except RepairFailedError:
            # Can't repair, send to manual review
            send_to_manual_review(data)
            raise CancelExecution(
                retry=False,
                "Data corruption - sent for manual review"
            )

# Different task types with different error handling
critical_result = smart_processor(data, 'critical')
normal_result = smart_processor(data, 'normal')

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json