huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features
—
Comprehensive result handling including result storage, retrieval, blocking operations, timeouts, and result groups for batch operations. These features enable robust result tracking and retrieval patterns for asynchronous task execution.
Get task results with various retrieval strategies and timing controls.
class Result:
def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
revoke_on_timeout=False, preserve=False):
"""
Retrieve the task result.
Parameters:
- blocking (bool): Block until result is available (default: False)
- timeout (int/float): Maximum time to wait in seconds (optional)
- backoff (float): Exponential backoff multiplier (default: 1.15)
- max_delay (float): Maximum delay between checks (default: 1.0)
- revoke_on_timeout (bool): Revoke task if timeout occurs (default: False)
- preserve (bool): Don't consume result, allow multiple reads (default: False)
Returns:
Task result value
Raises:
ResultTimeout: If timeout is reached
TaskException: If task failed with an exception
"""
def __call__(self, *args, **kwargs):
"""
Shortcut for get() method.
Returns:
Task result value
"""
def get_raw_result(self, blocking=False, timeout=None, backoff=1.15,
max_delay=1.0, revoke_on_timeout=False, preserve=False):
"""
Get raw result without exception unwrapping.
Parameters:
- blocking (bool): Block until result is available (default: False)
- timeout (int/float): Maximum time to wait in seconds (optional)
- backoff (float): Exponential backoff multiplier (default: 1.15)
- max_delay (float): Maximum delay between checks (default: 1.0)
- revoke_on_timeout (bool): Revoke task on timeout (default: False)
- preserve (bool): Don't consume result (default: False)
Returns:
Raw result value or Error instance for failed tasks
"""Check and control result task execution status.
def is_revoked(self):
    """
    Check if the result's task is revoked.

    Returns:
        bool: True if task is revoked
    """
def revoke(self, revoke_once=True):
    """
    Revoke the result's task.

    Parameters:
    - revoke_once (bool): Revoke only one execution (default: True)

    Returns:
        None
    """
def restore(self):
    """
    Restore a revoked task.

    Returns:
        bool: True if task was revoked and restored
    """
def reset(self):
"""
Reset result state to allow re-reading.
Returns:
None
"""Bulk operations for revoking and restoring multiple tasks by class or ID.
def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
    """
    Revoke all tasks of a specific task class.

    Parameters:
    - task_class (TaskWrapper): Task class to revoke
    - revoke_until (datetime): Revoke tasks until this time (optional)
    - revoke_once (bool): Revoke only one execution per task (default: False)

    Returns:
        int: Number of tasks revoked
        NOTE(review): upstream huey appears to return nothing from this
        method - confirm against the installed version before relying on it.
    """
def restore_all(self, task_class):
    """
    Restore all revoked tasks of a specific task class.

    Parameters:
    - task_class (TaskWrapper): Task class to restore

    Returns:
        int: Number of tasks restored
        NOTE(review): upstream huey appears to return a bool (whether the
        class was revoked) rather than a count - confirm.
    """
def revoke_by_id(self, id, revoke_until=None, revoke_once=False):
    """
    Revoke a task by its ID.

    Parameters:
    - id (str): Task ID to revoke
    - revoke_until (datetime): Revoke task until this time (optional)
    - revoke_once (bool): Revoke only one execution (default: False)

    Returns:
        bool: True if task was found and revoked
        NOTE(review): upstream huey appears to return nothing from this
        method - confirm against the installed version before relying on it.
    """
def restore_by_id(self, id):
"""
Restore a revoked task by its ID.
Parameters:
- id (str): Task ID to restore
Returns:
bool: True if task was found and restored
"""Reschedule tasks associated with results for different execution times.
def reschedule(self, eta=None, delay=None, expires=None, priority=None,
preserve_pipeline=True):
"""
Reschedule the result's task for different execution.
Parameters:
- eta (datetime): New execution time (optional)
- delay (int/float/timedelta): Delay before execution (optional)
- expires (datetime/int): New expiration time (optional)
- priority (int): New priority level (optional)
- preserve_pipeline (bool): Keep task pipeline (default: True)
Returns:
Result: New result instance for rescheduled task
"""Handle multiple results as a single unit for batch operations.
class ResultGroup:
def get(self, *args, **kwargs):
"""
Get all results in the group.
Parameters:
- *args, **kwargs: Passed to individual Result.get() calls
Returns:
List of result values in same order as tasks
"""
def __call__(self, *args, **kwargs):
"""
Shortcut for get() method.
Returns:
List of result values
"""
def __getitem__(self, idx):
"""
Get result by index with blocking.
Parameters:
- idx (int): Result index
Returns:
Individual result value
"""
def __iter__(self):
"""
Iterate over Result instances.
Returns:
Iterator of Result instances
"""
def __len__(self):
"""
Get number of results in group.
Returns:
int: Number of results
"""
def as_completed(self, backoff=1.15, max_delay=1.0):
"""
Iterate over results as they complete.
Parameters:
- backoff (float): Exponential backoff multiplier (default: 1.15)
- max_delay (float): Maximum delay between checks (default: 1.0)
Yields:
Result values as they become available
"""Result-related methods on the main Huey instance.
def result(self, id, blocking=False, timeout=None, backoff=1.15,
           max_delay=1.0, revoke_on_timeout=False, preserve=False):
    """
    Get result by task ID.

    Parameters:
    - id (str): Task ID
    - blocking (bool): Block until result available (default: False)
    - timeout (int/float): Maximum wait time (optional)
    - backoff (float): Exponential backoff multiplier (default: 1.15)
    - max_delay (float): Maximum delay between checks (default: 1.0)
    - revoke_on_timeout (bool): Revoke on timeout (default: False)
    - preserve (bool): Don't consume result (default: False)

    Returns:
        Task result value
    """
def all_results(self):
    """
    Get all stored results.

    Returns:
        List of all result data
        NOTE(review): upstream huey appears to return a mapping keyed by
        task ID rather than a list - confirm before indexing the value.
    """
def result_count(self):
    """
    Get total number of stored results.

    Returns:
        int: Number of results in storage
    """

from huey import RedisHuey
import time
huey = RedisHuey('results-app')
@huey.task()
def calculate_fibonacci(n):
if n <= 1:
return n
return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)
# Enqueue task and get result handle
result = calculate_fibonacci(10)
# Non-blocking check
try:
value = result(blocking=False)
print(f"Result ready: {value}")
except:
print("Result not ready yet")
# Blocking retrieval with timeout
try:
value = result(blocking=True, timeout=30)
print(f"Fibonacci(10) = {value}")
except ResultTimeout:
print("Task took too long")@huey.task()
def process_data(data_chunk):
    """Return the number of items in data_chunk after a simulated delay."""
    # Simulate processing time
    time.sleep(2)
    return len(data_chunk)


# Process multiple chunks
data_chunks = [
    [1, 2, 3], [4, 5, 6], [7, 8, 9],
]
results = [process_data(chunk) for chunk in data_chunks]

# Wait for all results; the timeout applies to each get() call individually
try:
    values = [r.get(blocking=True, timeout=10) for r in results]
except ResultTimeout:
    print("Some tasks timed out")
else:
    print(f"Processed chunks: {values}")

# Using map for batch processing
@huey.task()
def square_number(x):
return x * x
numbers = [1, 2, 3, 4, 5]
result_group = square_number.map(numbers)
# Get all results at once
squared = result_group.get(blocking=True, timeout=30)
print(f"Squared numbers: {squared}")
# Process results as they complete
print("Processing results as completed:")
for value in result_group.as_completed():
print(f"Got result: {value}")@huey.task(retries=2)
def unreliable_task(data):
# Simulate unreliable operation
import random
if random.random() < 0.3:
raise Exception("Random failure")
return f"Processed: {data}"
result = unreliable_task("important_data")
# Check if task was revoked
if result.is_revoked():
print("Task was revoked")
result.restore() # Restore if needed
# Reschedule for later if needed
if some_condition:
new_result = result.reschedule(delay=3600) # Reschedule for 1 hour later
print(f"Rescheduled task: {new_result.id}")@huey.task()
def expensive_computation(params):
    """Return a result string for params after a simulated long computation."""
    # Simulate expensive computation
    time.sleep(10)
    return f"Result for {params}"


result = expensive_computation("dataset_1")

# Block until the task has finished (a non-blocking get() issued right after
# enqueueing would not find a result yet) and preserve the stored value
value1 = result.get(blocking=True, preserve=True)
print(f"First read: {value1}")

# Can read again because preserve=True
value2 = result.get(preserve=True)
print(f"Second read: {value2}")

# Reset and read again
result.reset()
value3 = result.get()
print(f"After reset: {value3}")

# Check result store status
total_results = huey.result_count()
print(f"Total stored results: {total_results}")
# Get specific result by task ID
task_id = "some-task-id"
try:
value = huey.result(task_id, blocking=True, timeout=5)
print(f"Task {task_id} result: {value}")
except ResultTimeout:
print(f"Task {task_id} timed out")
# Get all results (be careful with large result stores)
if total_results < 100: # Only if reasonable number
all_results = huey.all_results()
print(f"All results: {len(all_results)} items")Install with Tessl CLI
npx tessl i tessl/pypi-huey