CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

result-backends.mddocs/

Result Backends

Storage systems for persisting task results and progress tracking across distributed environments. Result backends enable result retrieval after task completion and provide durability for long-running distributed workflows.

Capabilities

Result Backend Interface

Abstract base class defining the interface for result storage and retrieval systems.

class AsyncResultBackend:
    """
    Abstract base class for result storage backends.

    Defines the interface for storing and retrieving task results,
    checking result availability, tracking progress of long-running
    tasks, and managing the backend's lifecycle. Method bodies are
    intentionally empty here: concrete backends override them.
    """

    async def startup(self) -> None:
        """Initialize the result backend (connect to storage, etc.)."""

    async def shutdown(self) -> None:
        """Cleanup result backend resources (close connections, etc.)."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """
        Store task result.

        Args:
            task_id: Unique identifier for the task
            result: Task execution result to store
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """
        Retrieve stored task result.

        Args:
            task_id: Unique identifier for the task
            with_logs: Whether to include execution logs in the result

        Returns:
            Stored task result

        Raises:
            ResultGetError: If result cannot be retrieved
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Check if result is available for retrieval.

        Args:
            task_id: Unique identifier for the task

        Returns:
            True if result is ready, False otherwise
        """

    async def delete_result(self, task_id: str) -> None:
        """
        Delete stored result.

        Deleting a result that does not exist is backend-defined;
        implementations should treat it as a no-op where possible.

        Args:
            task_id: Unique identifier for the task
        """

    async def get_progress(
        self,
        task_id: str,
    ) -> Optional[TaskProgress]:
        """
        Get task progress information.

        Args:
            task_id: Unique identifier for the task

        Returns:
            Progress information if available, otherwise None
        """

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress,
    ) -> None:
        """
        Store task progress information.

        Args:
            task_id: Unique identifier for the task
            progress: Progress information to store
        """

Dummy Result Backend

No-operation result backend for development and testing when result persistence is not needed.

class DummyResultBackend(AsyncResultBackend):
    """
    Dummy result backend that doesn't store results.

    Useful for development and testing scenarios where
    result persistence is not required. All operations
    are no-ops.
    """

    def __init__(self) -> None: ...

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """No-op result storage; the result is discarded."""

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """
        Always raises ResultGetError.

        Raises:
            ResultGetError: Always, since no results are stored
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """Always returns False."""
        # NOTE(review): the body shown here is empty, so this stub
        # implicitly returns None (falsy) — confirm the concrete
        # implementation returns False explicitly.

    async def delete_result(self, task_id: str) -> None:
        """No-op result deletion; never raises for unknown task ids."""

In-Memory Result Backend

Built-in result backend that stores results in memory with configurable size limits.

class InmemoryResultBackend(AsyncResultBackend):
    """
    In-memory result backend with size limits.

    Stores results in memory with automatic cleanup
    to prevent memory exhaustion. Suitable for
    development and single-process applications;
    results are lost when the process exits.
    """

    def __init__(self, max_stored_results: int = 100) -> None:
        """
        Initialize in-memory backend.

        Args:
            max_stored_results: Maximum number of results to store.
                                Set to -1 for unlimited storage.
        """

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """Store result in memory with automatic cleanup once the limit is hit."""

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """Retrieve a previously stored result from memory."""

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if a result exists in memory for the given task id."""

    async def delete_result(self, task_id: str) -> None:
        """Remove the result for the given task id from memory."""

Progress Tracking

Progress tracking system for long-running tasks that need to report intermediate status.

class TaskProgress:
    """
    Task progress information container.

    Used to track and report progress of long-running tasks
    including completion fraction, current status, and metadata.
    """

    # NOTE(review): despite the docstring below, this is a fraction
    # (0.0–1.0), not a percent — callers multiply by 100 for display.
    progress: float
    """Progress percentage (0.0 to 1.0)."""

    status: str
    """Human-readable status description."""

    current: Optional[int]
    """Current item being processed."""

    total: Optional[int]
    """Total number of items to process."""

    meta: Dict[str, Any]
    """Additional progress metadata."""

    def __init__(
        self,
        progress: float = 0.0,
        status: str = "PENDING",
        current: Optional[int] = None,
        total: Optional[int] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    def to_dict(self) -> Dict[str, Any]:
        """Convert progress to a dictionary (inverse of from_dict)."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TaskProgress":
        """Create a TaskProgress from its dictionary representation."""

Usage Examples

Basic Result Backend Configuration

from taskiq import InMemoryBroker
from taskiq.result_backends import InmemoryResultBackend

# Create broker with custom result backend
result_backend = InmemoryResultBackend(max_stored_results=500)
broker = InMemoryBroker().with_result_backend(result_backend)

@broker.task
async def compute_fibonacci(n: int) -> int:
    """Compute the n-th Fibonacci number recursively."""
    if n <= 1:
        return n
    return await compute_fibonacci(n-1) + await compute_fibonacci(n-2)

async def main():
    """Enqueue the task and read its stored result."""
    await broker.startup()

    # Execute task; kiq() returns a task handle, not the value
    task = await compute_fibonacci.kiq(10)

    # wait_result() returns a TaskiqResult wrapper (see get_result
    # above); the actual integer lives in .return_value
    result = await task.wait_result()
    print(f"Fibonacci(10) = {result.return_value}")

    await broker.shutdown()

Progress Tracking in Long-Running Tasks

from taskiq import Context, TaskiqDepends
from taskiq.depends.progress_tracker import TaskProgress

@broker.task
async def process_large_dataset(
    dataset_path: str,
    context: Context = TaskiqDepends(),
) -> dict:
    """Process large dataset with progress tracking."""
    # NOTE(review): relies on a module-level `broker` and on
    # load_dataset/process_item helpers defined elsewhere.

    # Load dataset
    items = await load_dataset(dataset_path)
    total_items = len(items)
    processed = 0

    # Initialize progress at 0% so clients see status immediately
    progress = TaskProgress(
        progress=0.0,
        status="Starting processing",
        current=0,
        total=total_items,
        meta={"dataset": dataset_path}
    )
    await broker.result_backend.set_progress(
        context.message.task_id,
        progress
    )

    results = []
    for i, item in enumerate(items):
        # Process item
        result = await process_item(item)
        results.append(result)
        processed += 1

        # Update progress every 10 items to limit backend writes;
        # the final update below covers any remainder.
        if processed % 10 == 0:
            progress = TaskProgress(
                progress=processed / total_items,
                status=f"Processed {processed}/{total_items} items",
                current=processed,
                total=total_items,
                meta={"last_processed": item["id"]}
            )
            await broker.result_backend.set_progress(
                context.message.task_id,
                progress
            )

    # Final progress update (progress=1.0 marks completion)
    progress = TaskProgress(
        progress=1.0,
        status="Processing complete",
        current=total_items,
        total=total_items,
        meta={"total_processed": len(results)}
    )
    await broker.result_backend.set_progress(
        context.message.task_id,
        progress
    )

    return {"processed_count": len(results), "results": results}

# Check progress from client
async def check_task_progress(task_result):
    """Poll a running task's progress until it completes.

    Args:
        task_result: Task handle returned by `.kiq(...)`; exposes
                     `task_id`, `is_ready()` and `wait_result()`.
    """
    task_id = task_result.task_id

    while not await task_result.is_ready():
        progress = await broker.result_backend.get_progress(task_id)
        if progress:
            print(f"Status: {progress.status}")
            print(f"Progress: {progress.progress * 100:.1f}%")
            # Compare against None explicitly: current == 0 is a valid
            # value and must not suppress the item counter.
            if progress.current is not None and progress.total is not None:
                print(f"Items: {progress.current}/{progress.total}")

        await asyncio.sleep(2)

    final_result = await task_result.wait_result()
    print(f"Task completed: {final_result}")

Custom Result Backend Implementation

import json
import aiofiles
from pathlib import Path

class FileResultBackend(AsyncResultBackend):
    """Custom file-based result backend.

    Each result is stored as `<task_id>.json` and each progress
    snapshot as `<task_id>_progress.json` inside `storage_dir`.
    """

    def __init__(self, storage_dir: str = "./task_results"):
        """
        Args:
            storage_dir: Directory for result files; created
                         (including missing parents) if absent.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so nested paths like "./out/results" work
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _get_result_path(self, task_id: str) -> Path:
        """Path of the JSON file holding this task's result."""
        return self.storage_dir / f"{task_id}.json"

    def _get_progress_path(self, task_id: str) -> Path:
        """Path of the JSON file holding this task's progress."""
        return self.storage_dir / f"{task_id}_progress.json"

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """Store result as a JSON file.

        Non-JSON-serializable values are stringified via default=str.
        """
        result_data = {
            "is_err": result.is_err,
            "return_value": result.return_value,
            "execution_time": result.execution_time,
            "labels": result.labels,
            "error": str(result.error) if result.error else None,
        }

        result_path = self._get_result_path(task_id)
        async with aiofiles.open(result_path, 'w') as f:
            await f.write(json.dumps(result_data, default=str))

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """Load result from its JSON file.

        Note: execution logs are not persisted by this backend, so
        `with_logs` has no effect.

        Raises:
            ResultGetError: If no result file exists for the task.
        """
        result_path = self._get_result_path(task_id)

        if not result_path.exists():
            raise ResultGetError(f"Result not found for task {task_id}")

        async with aiofiles.open(result_path, 'r') as f:
            data = json.loads(await f.read())

        # Reconstruct TaskiqResult from the stored fields
        result = TaskiqResult(
            is_err=data["is_err"],
            return_value=data["return_value"],
            execution_time=data["execution_time"],
            labels=data["labels"],
        )

        # Original exception type is lost on serialization; only the
        # message survives, rewrapped as a plain Exception.
        if data["error"]:
            result.error = Exception(data["error"])

        return result

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if the result file exists."""
        return self._get_result_path(task_id).exists()

    async def delete_result(self, task_id: str) -> None:
        """Delete result and progress files if present.

        missing_ok avoids the exists()/unlink() race and makes
        deletion idempotent.
        """
        self._get_result_path(task_id).unlink(missing_ok=True)
        self._get_progress_path(task_id).unlink(missing_ok=True)

# Use custom backend: plug the file backend into a broker exactly
# like a built-in one
custom_backend = FileResultBackend("./my_results")
broker = InMemoryBroker().with_result_backend(custom_backend)

Result Backend with External Storage

# Example configuration for external result backends
# (These would be separate packages like taskiq-redis, taskiq-postgres,
# installed independently of the core taskiq package)

# Redis result backend
from taskiq_redis import RedisResultBackend
redis_backend = RedisResultBackend("redis://localhost:6379/0")

# PostgreSQL result backend  
from taskiq_postgres import PostgresResultBackend
postgres_backend = PostgresResultBackend(
    dsn="postgresql://user:password@localhost/taskiq_results"
)

# MongoDB result backend
from taskiq_mongo import MongoResultBackend
mongo_backend = MongoResultBackend(
    connection_string="mongodb://localhost:27017",
    database="taskiq_results"
)

# Configure broker with external backend; any AsyncResultBackend works
broker = ZeroMQBroker("tcp://localhost:5555").with_result_backend(redis_backend)

Types

class ResultGetError(TaskiqError):
    """Exception raised when a result cannot be retrieved (e.g. not stored yet)."""

class ResultIsReadyError(TaskiqError):
    """Exception raised when checking result readiness fails."""

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json