huey, a little task queue — a lightweight task queue library for Python with asynchronous execution and comprehensive task-management features.

Exception classes for task control flow, error handling, and task execution management. These exceptions provide fine-grained control over task behavior and enable sophisticated error-handling patterns. The first group below covers exceptions that control task execution flow and retry behavior.
class CancelExecution(Exception):
    """
    Cancel task execution with optional retry control.

    When raised during task execution:
    - the task is marked as canceled
    - it can optionally be scheduled for retry
    - pre-execute hooks can raise this to prevent execution
    - post-execute hooks receive this as the exception parameter
    """
    def __init__(self, retry=None, *args, **kwargs):
        """
        Initialize CancelExecution.

        Parameters:
        - retry (bool): whether to retry the task (optional)
              None: use the task's default retry setting
              True: force retry even if retries=0
              False: don't retry even if retries>0
        - *args, **kwargs: additional exception arguments

        Note: ``retry`` is the FIRST positional parameter, so a message
        must be passed after it positionally (collected into *args),
        never as a positional argument following ``retry=...``.
        """
        # The stub discarded ``retry``; store it so the consumer and
        # post-execute hooks can inspect the caller's intent.
        self.retry = retry
        super().__init__(*args, **kwargs)
class RetryTask(Exception):
    """
    Request task retry with optional timing control.

    When raised during task execution:
    - the task retry counter is incremented
    - the task can be scheduled for immediate or delayed retry
    - overrides the default retry delay if eta or delay is specified
    """
    def __init__(self, msg=None, eta=None, delay=None, *args, **kwargs):
        """
        Initialize RetryTask.

        Parameters:
        - msg (str): error message (optional)
        - eta (datetime): specific time to retry (optional)
        - delay (int/float/timedelta): delay before retry (optional)
        - *args, **kwargs: additional exception arguments
        """
        # Persist the scheduling hints so the consumer can honor them;
        # the stub silently dropped both values and the message.
        self.eta = eta
        self.delay = delay
        super().__init__(msg, *args, **kwargs)


# Exceptions related to task execution failures and constraints.
class TaskException(Exception):
    """
    Exception wrapper for failed task results.

    When a task fails and its result is retrieved, the original exception
    is wrapped in TaskException with additional metadata.
    """
    def __init__(self, metadata=None, *args):
        """
        Initialize TaskException.

        Parameters:
        - metadata (dict): exception metadata including:
            - 'error': string representation of the original error
            - 'traceback': full traceback string
            - 'task_id': ID of the failed task
            - 'retries': number of retries attempted
        - *args: additional exception arguments
        """
        # Store metadata (empty dict when omitted) so callers can always
        # subscript ``exc.metadata`` safely, as the usage examples do.
        self.metadata = metadata or {}
        super().__init__(*args)
class TaskLockedException(HueyException):
    """
    Raised when a task cannot acquire a required lock.

    Triggered when:
    - a task decorated with @lock fails to acquire its lock
    - a context-manager lock acquisition fails
    - the lock is already held by another worker

    NOTE(review): ``HueyException`` is defined later in this file; the
    class ordering must be corrected before this module can be imported.
    """
class ResultTimeout(HueyException):
    """
    Raised when blocking result retrieval exceeds its timeout.

    Triggered when:
    - Result.get(blocking=True, timeout=X) exceeds the timeout
    - the task takes longer than the specified timeout to complete
    """


# Foundation exception classes for the Huey error hierarchy.
class HueyException(Exception):
    """
    Root of the Huey-specific exception hierarchy.

    Every Huey exception derives from this class, so a single except
    clause on HueyException handles any Huey error when broad handling
    is desired.
    """
class ConfigurationError(HueyException):
    """
    Raised for configuration-related errors.

    Triggered by:
    - an invalid storage backend configuration
    - missing required dependencies
    - invalid consumer options
    - incompatible parameter combinations
    """


from huey import RedisHuey
from huey.exceptions import RetryTask, CancelExecution
import random
import datetime
# Application instance used by all example tasks below (Redis-backed broker).
huey = RedisHuey('exception-app')
@huey.task(retries=3, retry_delay=60)
def unreliable_api_call(endpoint):
"""Task that might fail and needs intelligent retry logic."""
# Simulate API call
if random.random() < 0.3: # 30% failure rate
# Different retry strategies based on error type
error_type = random.choice(['network', 'rate_limit', 'server_error'])
if error_type == 'network':
# Network errors: retry immediately
raise RetryTask("Network error - retrying immediately")
elif error_type == 'rate_limit':
# Rate limit: wait 5 minutes
raise RetryTask("Rate limited", delay=300)
elif error_type == 'server_error':
# Server error: retry at specific time
retry_time = datetime.datetime.now() + datetime.timedelta(minutes=10)
raise RetryTask("Server error", eta=retry_time)
return f"Successfully called {endpoint}"
# Task will retry with different strategies based on error type
result = unreliable_api_call('/api/users')@huey.task()
def conditional_task(user_id, action):
    """Task that may be canceled based on runtime conditions.

    Parameters:
    - user_id: identifier of the user the action applies to.
    - action: the action to perform.
    """
    # ``retry`` is CancelExecution's first positional parameter; the
    # message must follow it positionally (keyword-before-positional
    # as originally written is a SyntaxError).
    if not is_user_active(user_id):
        # Inactive user: cancel outright, never retry.
        raise CancelExecution(False, "User is no longer active")
    if is_maintenance_mode():
        # Maintenance window: cancel now but re-enqueue afterwards.
        raise CancelExecution(True, "System in maintenance mode")
    # All checks passed: proceed with the task.
    return perform_action(user_id, action)
@huey.pre_execute()
def check_global_conditions(task):
    """Pre-execution hook that can cancel tasks before they run."""
    if is_emergency_mode():
        # Cancel but re-enqueue once the emergency clears; retry is
        # passed positionally (first parameter of CancelExecution).
        raise CancelExecution(True, "Emergency mode active")

from huey.exceptions import TaskException, ResultTimeout
@huey.task()
def risky_calculation(data):
    """Square ``data``; reject out-of-range inputs.

    Raises ValueError for negative input and OverflowError for input
    above 1000.
    """
    # Guard clauses keep the happy path unindented.
    if data < 0:
        raise ValueError("Negative values not supported")
    if data > 1000:
        raise OverflowError("Value too large")
    return data ** 2
# Handle exceptions when retrieving results.
result = risky_calculation(-5)
try:
    value = result.get(blocking=True, timeout=30)
    print(f"Result: {value}")
except TaskException as e:
    # The original exception's details travel in the metadata dict.
    error_info = e.metadata
    print(f"Task failed: {error_info['error']}")
    print(f"Task ID: {error_info['task_id']}")
    print(f"Retries attempted: {error_info['retries']}")
    print(f"Traceback: {error_info['traceback']}")
except ResultTimeout:
    # result.get() exceeded its 30-second timeout.
    print("Task took too long to complete")

from huey.exceptions import TaskLockedException
@huey.task()
def resource_processor(resource_id):
    """Process a resource while holding a per-resource lock.

    Parameters:
    - resource_id: identifier of the resource to process.
    """
    lock_name = f'resource_{resource_id}'
    try:
        with huey.lock_task(lock_name):
            # Lock held: safe to process the resource.
            return process_resource(resource_id)
    except TaskLockedException:
        # Resource is locked by another worker; decide what to do.
        if is_urgent_processing(resource_id):
            # Urgent work: retry after a short delay.
            raise RetryTask("Resource locked, retrying soon", delay=30)
        else:
            # Normal work: cancel without retry. ``retry`` must be passed
            # positionally (CancelExecution's first parameter).
            raise CancelExecution(False, f"Resource {resource_id} is busy")

import logging
# Explicit imports instead of ``import *`` (PEP 8): every name the
# examples below reference is listed, so nothing falls out of scope.
from huey.exceptions import (
    HueyException,
    ConfigurationError,
    CancelExecution,
    RetryTask,
    TaskException,
    TaskLockedException,
    ResultTimeout,
)
logger = logging.getLogger('task_errors')
@huey.task(retries=5, retry_delay=120)
def complex_data_processing(data_id):
"""Task with sophisticated error handling."""
try:
# Load data
data = load_data(data_id)
if not data:
# No point retrying if data doesn't exist
raise CancelExecution(
retry=False,
f"Data {data_id} not found"
)
# Validate data
if not validate_data(data):
# Data validation failed, might be temporary
raise RetryTask(
"Data validation failed",
delay=300 # Wait 5 minutes for data to be fixed
)
# Check system resources
if not has_sufficient_resources():
# Resource constraint, retry during off-peak hours
off_peak = datetime.datetime.now().replace(
hour=2, minute=0, second=0, microsecond=0
)
if off_peak <= datetime.datetime.now():
off_peak += datetime.timedelta(days=1)
raise RetryTask(
"Insufficient resources",
eta=off_peak
)
# Process data
result = process_data(data)
return result
except ConnectionError as e:
# Network issues: exponential backoff
logger.warning(f"Connection error in task {data_id}: {e}")
raise RetryTask("Connection error", delay=120)
except PermissionError as e:
# Permission issues: don't retry
logger.error(f"Permission error in task {data_id}: {e}")
raise CancelExecution(
retry=False,
"Insufficient permissions"
)
except Exception as e:
# Unexpected errors: log and retry with increasing delay
logger.exception(f"Unexpected error in task {data_id}")
# Calculate exponential backoff based on retry count
# (Note: in real implementation, you'd track retry count)
delay = min(300 * (2 ** 0), 3600) # Cap at 1 hour
raise RetryTask(f"Unexpected error: {e}", delay=delay)@huey.post_execute()
def global_error_handler(task, task_value, exception):
    """Post-execute hook applied to every task: log outcomes and route failures."""
    if exception is None:
        # Success path: nothing to route, just record it.
        logger.info(f"Task {task.name} completed successfully")
        return
    # Every failure is logged before type-specific routing.
    logger.error(f"Task {task.name} failed: {exception}")
    if isinstance(exception, TaskLockedException):
        # Lock contention is tracked separately for monitoring.
        track_lock_contention(task.name)
    elif isinstance(exception, (RetryTask, CancelExecution)):
        # Control-flow exceptions are consumed by Huey itself.
        logger.info(f"Task {task.name} control exception: {exception}")
    else:
        # Anything else is genuinely unexpected: raise an alert.
        send_error_alert(task, exception)
@huey.signal(S.SIGNAL_ERROR)
def error_signal_handler(signal, task, exception):
"""Handle error signals."""
logger.error(f"Signal {signal}: Task {task.id} error: {exception}")
# Could implement:
# - Error metrics collection
# - Alert systems
# - Automatic task rescheduling
# - Dead letter queue@huey.task()
def smart_processor(data, task_type='normal'):
    """Task that routes to different handlers based on exceptions.

    Parameters:
    - data: payload to process.
    - task_type (str): 'critical' or 'normal'; selects the handler and
      the retry urgency.

    NOTE(review): ResourceBusyError, DataCorruptionError, and
    RepairFailedError are application-defined exceptions — they must be
    imported for this example to run.
    """
    try:
        if task_type == 'critical':
            return critical_processing(data)
        else:
            return normal_processing(data)
    except ResourceBusyError:
        # Busy resource: retry urgency depends on the task type.
        if task_type == 'critical':
            # High priority: short delay.
            raise RetryTask("Resource busy", delay=10)
        else:
            # Low priority: longer delay.
            raise RetryTask("Resource busy", delay=300)
    except DataCorruptionError:
        # Attempt an in-place repair before giving up.
        try:
            repair_data(data)
            raise RetryTask("Data repaired, retrying", delay=60)
        except RepairFailedError:
            # Can't repair: hand off to humans and cancel without retry
            # (retry is CancelExecution's first positional parameter).
            send_to_manual_review(data)
            raise CancelExecution(False, "Data corruption - sent for manual review")
# Different task types with different error handling.
critical_result = smart_processor(data, 'critical')
normal_result = smart_processor(data, 'normal')

# Install with the Tessl CLI:
#   npx tessl i tessl/pypi-huey