CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-taskiq-redis

Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments

Overview
Eval results
Files

docs/result-backends.md

Result Backends

TaskIQ-Redis provides async result backends for storing and retrieving task execution results with configurable expiration times. Each backend supports different Redis deployment architectures while maintaining the same API interface.

Capabilities

Standard Redis Result Backend

Async result backend for single Redis instance deployments.

class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        max_connection_pool_size: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis async result backend.
        
        Parameters:
        - redis_url: Redis connection URL
        - keep_results: Don't remove results after reading (default: True)
        - result_ex_time: Result expiration time in seconds
        - result_px_time: Result expiration time in milliseconds
        - max_connection_pool_size: Maximum connections in pool
        - serializer: Custom serializer (default: PickleSerializer)
        - prefix_str: Prefix for Redis keys
        - connection_kwargs: Additional Redis connection arguments
        
        Raises:
        - DuplicateExpireTimeSelectedError: If both ex_time and px_time specified
        - ExpireTimeMustBeMoreThanZeroError: If expiration time <= 0
        """

    async def shutdown(self) -> None:
        """Close Redis connection pool."""

    async def set_result(
        self, 
        task_id: str, 
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """
        Store task result in Redis.
        
        Parameters:
        - task_id: Unique task identifier
        - result: Task execution result to store
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Check if task result is available.
        
        Parameters:
        - task_id: Unique task identifier
        
        Returns:
        - bool: True if result is ready, False otherwise
        """

    async def get_result(
        self, 
        task_id: str, 
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """
        Retrieve task result from Redis.
        
        Parameters:
        - task_id: Unique task identifier
        - with_logs: Include execution logs in result (default: False)
        
        Returns:
        - TaskiqResult: Task execution result
        
        Raises:
        - ResultIsMissingError: If result not found
        """

    async def set_progress(
        self, 
        task_id: str, 
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """
        Store task progress information.
        
        Parameters:
        - task_id: Unique task identifier
        - progress: Task progress information
        """

    async def get_progress(
        self, 
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """
        Retrieve task progress information.
        
        Parameters:
        - task_id: Unique task identifier
        
        Returns:
        - TaskProgress or None: Progress information if available
        """

Usage Example:

from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

# Create result backend with 1 hour expiration
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=3600,  # 1 hour in seconds
    keep_results=True
)

# Use with broker
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    result_backend=backend
)

@broker.task
async def compute_task(x: int, y: int) -> int:
    return x * y

# Execute task and get result
task = await compute_task.kiq(10, 20)
result = await task.wait_result()
print(f"Result: {result.return_value}")  # Result: 200

# Check progress (if task supports it)
progress = await backend.get_progress(task.task_id)
if progress:
    print(f"Progress: {progress.progress}%")

Redis Cluster Result Backend

Async result backend for Redis Cluster deployments.

class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Cluster async result backend.
        
        Parameters similar to RedisAsyncResultBackend but without
        max_connection_pool_size (managed by Redis Cluster client).
        """

    async def shutdown(self) -> None:
        """Close Redis cluster connection."""

    async def set_result(
        self, 
        task_id: str, 
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """Store task result in Redis cluster."""

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if task result is available in Redis cluster."""

    async def get_result(
        self, 
        task_id: str, 
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """Retrieve task result from Redis cluster."""

    async def set_progress(
        self, 
        task_id: str, 
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """Store task progress in Redis cluster."""

    async def get_progress(
        self, 
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """Retrieve task progress from Redis cluster."""

Usage Example:

from taskiq_redis import RedisAsyncClusterResultBackend, RedisStreamClusterBroker

# Create cluster result backend
backend = RedisAsyncClusterResultBackend(
    redis_url="redis://cluster-node1:6379",
    result_ex_time=7200  # 2 hours
)

# Use with cluster broker
broker = RedisStreamClusterBroker(
    url="redis://cluster-node1:6379",
    result_backend=backend
)

Redis Sentinel Result Backend

Async result backend for Redis Sentinel deployments with high availability.

class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Sentinel async result backend.
        
        Parameters:
        - sentinels: List of sentinel (host, port) pairs
        - master_name: Sentinel master name
        - keep_results: Don't remove results after reading (default: True)
        - result_ex_time: Result expiration time in seconds
        - result_px_time: Result expiration time in milliseconds
        - min_other_sentinels: Minimum other sentinels required (default: 0)
        - sentinel_kwargs: Additional sentinel configuration
        - serializer: Custom serializer (default: PickleSerializer)
        - prefix_str: Prefix for Redis keys
        - connection_kwargs: Additional Redis connection arguments
        """

    async def shutdown(self) -> None:
        """Close Redis sentinel connection."""

    async def set_result(
        self, 
        task_id: str, 
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """Store task result in Redis via Sentinel."""

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if task result is available via Sentinel."""

    async def get_result(
        self, 
        task_id: str, 
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """Retrieve task result from Redis via Sentinel."""

    async def set_progress(
        self, 
        task_id: str, 
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """Store task progress via Sentinel."""

    async def get_progress(
        self, 
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """Retrieve task progress via Sentinel."""

Usage Example:

from taskiq_redis import RedisAsyncSentinelResultBackend, RedisStreamSentinelBroker

# Create high-availability result backend
backend = RedisAsyncSentinelResultBackend(
    sentinels=[
        ("sentinel1", 26379),
        ("sentinel2", 26379),
        ("sentinel3", 26379)
    ],
    master_name="mymaster",
    result_ex_time=1800,  # 30 minutes
    min_other_sentinels=1
)

# Use with sentinel broker
broker = RedisStreamSentinelBroker(
    sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],
    master_name="mymaster",
    result_backend=backend
)

@broker.task
async def important_task(data: dict) -> dict:
    # Critical task with HA storage
    return {"processed": data, "success": True}

Result Management

Expiration Strategies

Results can be configured to expire automatically:

# Expire after 1 hour (3600 seconds)
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=3600
)

# Expire after 30 minutes (1800000 milliseconds)
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_px_time=1800000
)

# Keep results indefinitely: no expiration time is set, and
# keep_results=True preserves results even after they are read
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    keep_results=True
)

Progress Tracking

Tasks can report progress during execution:

from taskiq_redis import RedisAsyncResultBackend
from taskiq.depends.progress_tracker import TaskProgress

backend = RedisAsyncResultBackend("redis://localhost:6379")

@broker.task
async def long_running_task(items: List[str]) -> List[str]:
    results = []
    total = len(items)
    
    for i, item in enumerate(items):
        # Process item
        processed = await process_item(item)
        results.append(processed)
        
        # Update progress. NOTE: `task` is not defined inside the task
        # body (it is only created by the `.kiq()` call below) — in a
        # real task, obtain the current task id via taskiq's Context
        # dependency; the example is simplified here.
        progress = TaskProgress(
            progress=int((i + 1) / total * 100),
            message=f"Processed {i + 1}/{total} items"
        )
        await backend.set_progress(task.task_id, progress)
    
    return results

# Monitor progress (requires `import asyncio` for the sleep below)
task = await long_running_task.kiq(["item1", "item2", "item3"])
while not await backend.is_result_ready(task.task_id):
    progress = await backend.get_progress(task.task_id)
    if progress:
        print(f"Progress: {progress.progress}% - {progress.message}")
    await asyncio.sleep(1)

Types

from typing import TypeVar, Optional, Any, List, Tuple, Union
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.abc.serializer import TaskiqSerializer
from taskiq.result import TaskiqResult
from taskiq.depends.progress_tracker import TaskProgress

_ReturnType = TypeVar("_ReturnType")

# Constants
PROGRESS_KEY_SUFFIX: str = "__progress"

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq-redis

docs

brokers.md

index.md

result-backends.md

schedule-sources.md

tile.json