CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

result-management.mddocs/

Result Management

Comprehensive result handling including result storage, retrieval, blocking operations, timeouts, and result groups for batch operations. These features enable robust result tracking and retrieval patterns for asynchronous task execution.

Capabilities

Result Retrieval

Get task results with various retrieval strategies and timing controls.

class Result:
    def get(self, blocking=False, timeout=None, backoff=1.15, max_delay=1.0,
            revoke_on_timeout=False, preserve=False):
        """
        Retrieve the task result.
        
        Parameters:
        - blocking (bool): Block until result is available (default: False)
        - timeout (int/float): Maximum time to wait in seconds (optional)
        - backoff (float): Exponential backoff multiplier (default: 1.15)
        - max_delay (float): Maximum delay between checks (default: 1.0)
        - revoke_on_timeout (bool): Revoke task if timeout occurs (default: False)
        - preserve (bool): Don't consume result, allow multiple reads (default: False)
        
        Returns:
        Task result value
        
        Raises:
        ResultTimeout: If timeout is reached
        TaskException: If task failed with an exception
        """
    
    def __call__(self, *args, **kwargs):
        """
        Shortcut for get() method.
        
        Returns:
        Task result value
        """
    
    def get_raw_result(self, blocking=False, timeout=None, backoff=1.15,
                       max_delay=1.0, revoke_on_timeout=False, preserve=False):
        """
        Get raw result without exception unwrapping.
        
        Parameters:
        - blocking (bool): Block until result is available (default: False)
        - timeout (int/float): Maximum time to wait in seconds (optional)
        - backoff (float): Exponential backoff multiplier (default: 1.15)
        - max_delay (float): Maximum delay between checks (default: 1.0)
        - revoke_on_timeout (bool): Revoke task on timeout (default: False)
        - preserve (bool): Don't consume result (default: False)
        
        Returns:
        Raw result value or Error instance for failed tasks
        """

Result Status and Control

Check and control result task execution status.

def is_revoked(self):
    """
    Check if the result's task is revoked.
    
    Returns:
    bool: True if task is revoked
    """

def revoke(self, revoke_once=True):
    """
    Revoke the result's task.
    
    Parameters:
    - revoke_once (bool): Revoke only one execution (default: True)
    
    Returns:
    None
    """

def restore(self):
    """
    Restore a revoked task.
    
    Returns:
    bool: True if task was revoked and restored
    """

def reset(self):
    """
    Reset result state to allow re-reading.
    
    Returns:
    None
    """

Bulk Task Revocation and Restoration

Bulk operations for revoking and restoring multiple tasks by class or ID.

def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
    """
    Revoke all tasks of a specific task class.
    
    Parameters:
    - task_class (TaskWrapper): Task class to revoke
    - revoke_until (datetime): Revoke tasks until this time (optional)
    - revoke_once (bool): Revoke only one execution per task (default: False)
    
    Returns:
    int: Number of tasks revoked
    """

def restore_all(self, task_class):
    """
    Restore all revoked tasks of a specific task class.
    
    Parameters:
    - task_class (TaskWrapper): Task class to restore
    
    Returns:
    int: Number of tasks restored
    """

def revoke_by_id(self, id, revoke_until=None, revoke_once=False):
    """
    Revoke a task by its ID.
    
    Parameters:
    - id (str): Task ID to revoke
    - revoke_until (datetime): Revoke task until this time (optional)
    - revoke_once (bool): Revoke only one execution (default: False)
    
    Returns:
    bool: True if task was found and revoked
    """

def restore_by_id(self, id):
    """
    Restore a revoked task by its ID.
    
    Parameters:
    - id (str): Task ID to restore
    
    Returns:
    bool: True if task was found and restored
    """

Result Rescheduling

Reschedule tasks associated with results for different execution times.

def reschedule(self, eta=None, delay=None, expires=None, priority=None,
               preserve_pipeline=True):
    """
    Reschedule the result's task for different execution.
    
    Parameters:
    - eta (datetime): New execution time (optional)
    - delay (int/float/timedelta): Delay before execution (optional)
    - expires (datetime/int): New expiration time (optional)
    - priority (int): New priority level (optional)
    - preserve_pipeline (bool): Keep task pipeline (default: True)
    
    Returns:
    Result: New result instance for rescheduled task
    """

Result Groups

Handle multiple results as a single unit for batch operations.

class ResultGroup:
    def get(self, *args, **kwargs):
        """
        Get all results in the group.
        
        Parameters:
        - *args, **kwargs: Passed to individual Result.get() calls
        
        Returns:
        List of result values in same order as tasks
        """
    
    def __call__(self, *args, **kwargs):
        """
        Shortcut for get() method.
        
        Returns:
        List of result values
        """
    
    def __getitem__(self, idx):
        """
        Get result by index with blocking.
        
        Parameters:
        - idx (int): Result index
        
        Returns:
        Individual result value
        """
    
    def __iter__(self):
        """
        Iterate over Result instances.
        
        Returns:
        Iterator of Result instances
        """
    
    def __len__(self):
        """
        Get number of results in group.
        
        Returns:
        int: Number of results
        """
    
    def as_completed(self, backoff=1.15, max_delay=1.0):
        """
        Iterate over results as they complete.
        
        Parameters:
        - backoff (float): Exponential backoff multiplier (default: 1.15)
        - max_delay (float): Maximum delay between checks (default: 1.0)
        
        Yields:
        Result values as they become available
        """

Huey Result Methods

Result-related methods on the main Huey instance.

def result(self, id, blocking=False, timeout=None, backoff=1.15,
           max_delay=1.0, revoke_on_timeout=False, preserve=False):
    """
    Get result by task ID.
    
    Parameters:
    - id (str): Task ID
    - blocking (bool): Block until result available (default: False)
    - timeout (int/float): Maximum wait time (optional)
    - backoff (float): Exponential backoff multiplier (default: 1.15)
    - max_delay (float): Maximum delay between checks (default: 1.0)
    - revoke_on_timeout (bool): Revoke on timeout (default: False)
    - preserve (bool): Don't consume result (default: False)
    
    Returns:
    Task result value
    """

def all_results(self):
    """
    Get all stored results.
    
    Returns:
    List of all result data
    """

def result_count(self):
    """
    Get total number of stored results.
    
    Returns:
    int: Number of results in storage
    """

Usage Examples

Basic Result Handling

from huey import RedisHuey
import time

huey = RedisHuey('results-app')

@huey.task()
def calculate_fibonacci(n):
    if n <= 1:
        return n
    return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)

# Enqueue task and get result handle
result = calculate_fibonacci(10)

# Non-blocking check
try:
    value = result(blocking=False)
    print(f"Result ready: {value}")
except:
    print("Result not ready yet")

# Blocking retrieval with timeout
try:
    value = result(blocking=True, timeout=30)
    print(f"Fibonacci(10) = {value}")
except ResultTimeout:
    print("Task took too long")

Advanced Result Patterns

@huey.task()
def process_data(data_chunk):
    # Simulate processing time
    time.sleep(2)
    return len(data_chunk)

# Process multiple chunks
data_chunks = [
    [1, 2, 3], [4, 5, 6], [7, 8, 9]
]

results = []
for chunk in data_chunks:
    result = process_data(chunk)
    results.append(result)

# Wait for all results with timeout
try:
    values = [r.get(blocking=True, timeout=10) for r in results]
    print(f"Processed chunks: {values}")
except ResultTimeout:
    print("Some tasks timed out")

Result Groups and Batch Operations

# Using map for batch processing
@huey.task()
def square_number(x):
    return x * x

numbers = [1, 2, 3, 4, 5]
result_group = square_number.map(numbers)

# Get all results at once
squared = result_group.get(blocking=True, timeout=30)
print(f"Squared numbers: {squared}")

# Process results as they complete
print("Processing results as completed:")
for value in result_group.as_completed():
    print(f"Got result: {value}")

Result State Management

@huey.task(retries=2)
def unreliable_task(data):
    # Simulate unreliable operation
    import random
    if random.random() < 0.3:
        raise Exception("Random failure")
    return f"Processed: {data}"

result = unreliable_task("important_data")

# Check if task was revoked
if result.is_revoked():
    print("Task was revoked")
    result.restore()  # Restore if needed

# Reschedule for later if needed
if some_condition:
    new_result = result.reschedule(delay=3600)  # Reschedule for 1 hour later
    print(f"Rescheduled task: {new_result.id}")

Result Preservation and Reuse

@huey.task()
def expensive_computation(params):
    # Simulate expensive computation
    time.sleep(10)
    return f"Result for {params}"

result = expensive_computation("dataset_1")

# Get result but preserve it for later use
value1 = result.get(preserve=True)
print(f"First read: {value1}")

# Can read again because preserve=True
value2 = result.get(preserve=True)
print(f"Second read: {value2}")

# Reset and read again
result.reset()
value3 = result.get()
print(f"After reset: {value3}")

Monitoring Results

# Check result store status
total_results = huey.result_count()
print(f"Total stored results: {total_results}")

# Get specific result by task ID
task_id = "some-task-id"
try:
    value = huey.result(task_id, blocking=True, timeout=5)
    print(f"Task {task_id} result: {value}")
except ResultTimeout:
    print(f"Task {task_id} timed out")

# Get all results (be careful with large result stores)
if total_results < 100:  # Only if reasonable number
    all_results = huey.all_results()
    print(f"All results: {len(all_results)} items")

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json