Distributed task queue with full async support for Python applications
Storage systems for persisting task results and progress tracking across distributed environments. Result backends enable result retrieval after task completion and provide durability for long-running distributed workflows.
Abstract base class defining the interface for result storage and retrieval systems.
class AsyncResultBackend:
    """
    Abstract base class for result storage backends.

    Provides interface for storing and retrieving task results,
    checking result availability, and managing backend lifecycle.

    NOTE(review): this listing shows no ``abc.ABC`` base or
    ``@abstractmethod`` markers, so the "abstract" contract appears to be
    by convention only here -- confirm against the real implementation.
    """

    async def startup(self) -> None:
        """Initialize the result backend (connect to storage, etc.)."""

    async def shutdown(self) -> None:
        """Cleanup result backend resources."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """
        Store task result.

        Args:
            task_id: Unique identifier for the task
            result: Task execution result to store
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """
        Retrieve stored task result.

        Args:
            task_id: Unique identifier for the task
            with_logs: Whether to include execution logs

        Returns:
            Stored task result

        Raises:
            ResultGetError: If result cannot be retrieved
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Check if result is available for retrieval.

        Args:
            task_id: Unique identifier for the task

        Returns:
            True if result is ready, False otherwise
        """

    async def delete_result(self, task_id: str) -> None:
        """
        Delete stored result.

        Args:
            task_id: Unique identifier for the task
        """

    async def get_progress(
        self,
        task_id: str,
    ) -> Optional[TaskProgress]:
        """
        Get task progress information.

        Args:
            task_id: Unique identifier for the task

        Returns:
            Progress information if available, otherwise None
        """

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress,
    ) -> None:
        """
        Store task progress information.

        Args:
            task_id: Unique identifier for the task
            progress: Progress information to store
        """

No-operation result backend for development and testing when result persistence is not needed.
class DummyResultBackend(AsyncResultBackend):
    """
    Dummy result backend that doesn't store results.

    Useful for development and testing scenarios where
    result persistence is not required. All operations
    are no-ops.
    """

    def __init__(self) -> None: ...

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """
        No-op result storage.

        Args:
            task_id: Unique identifier for the task
            result: Task execution result; it is not stored
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """
        Always raises ResultGetError.

        Args:
            task_id: Unique identifier for the task
            with_logs: Whether to include execution logs

        Raises:
            ResultGetError: Always, since no results are stored
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Always returns False.

        Args:
            task_id: Unique identifier for the task
        """

    async def delete_result(self, task_id: str) -> None:
        """
        No-op result deletion.

        Args:
            task_id: Unique identifier for the task
        """

Built-in result backend that stores results in memory with configurable size limits.
class InmemoryResultBackend(AsyncResultBackend):
    """
    In-memory result backend with size limits.

    Stores results in memory with automatic cleanup
    to prevent memory exhaustion. Suitable for
    development and single-process applications.
    """

    def __init__(self, max_stored_results: int = 100) -> None:
        """
        Initialize in-memory backend.

        Args:
            max_stored_results: Maximum number of results to store.
                Set to -1 for unlimited storage.
        """

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """
        Store result in memory with automatic cleanup.

        Args:
            task_id: Unique identifier for the task
            result: Task execution result to store
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """
        Retrieve result from memory.

        Args:
            task_id: Unique identifier for the task
            with_logs: Whether to include execution logs

        NOTE(review): presumably raises ResultGetError when nothing is
        stored for ``task_id``, as the base-class contract describes --
        confirm.
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Check if result exists in memory.

        Args:
            task_id: Unique identifier for the task
        """

    async def delete_result(self, task_id: str) -> None:
        """
        Remove result from memory.

        Args:
            task_id: Unique identifier for the task
        """

Progress tracking system for long-running tasks that need to report intermediate status.
class TaskProgress:
    """
    Task progress information container.

    Used to track and report progress of long-running tasks
    including completion percentage, current status, and metadata.
    """

    # NOTE: despite the word "percentage" below, the value is a fraction in
    # the range 0.0-1.0; the client example in this file multiplies it by
    # 100 before printing.
    progress: float
    """Progress percentage (0.0 to 1.0)."""
    status: str
    """Human-readable status description."""
    current: Optional[int]
    """Current item being processed."""
    total: Optional[int]
    """Total number of items to process."""
    meta: Dict[str, Any]
    """Additional progress metadata."""

    # NOTE(review): ``meta`` defaults to None here while the attribute is
    # annotated as a plain Dict -- presumably None is replaced by an empty
    # dict inside the constructor; confirm.
    def __init__(
        self,
        progress: float = 0.0,
        status: str = "PENDING",
        current: Optional[int] = None,
        total: Optional[int] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    def to_dict(self) -> Dict[str, Any]:
        """Convert progress to dictionary representation."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TaskProgress":
        """Create progress from dictionary representation."""


from taskiq import InMemoryBroker
from taskiq.result_backends import InmemoryResultBackend
# Create broker with custom result backend
# max_stored_results=500 raises the in-memory limit above the constructor's
# default of 100.
result_backend = InmemoryResultBackend(max_stored_results=500)
# The value returned by with_result_backend() is what the rest of this
# example uses as the broker.
broker = InMemoryBroker().with_result_backend(result_backend)
@broker.task
async def compute_fibonacci(n: int) -> int:
    """Compute the n-th Fibonacci number.

    Uses a simple loop rather than double recursion, so the cost grows
    linearly with ``n`` instead of exponentially and no chain of nested
    coroutines is created.

    Args:
        n: Index of the Fibonacci number to compute. Values of 1 or less
            (including negative values) are returned unchanged.

    Returns:
        The n-th Fibonacci number (F(0) = 0, F(1) = 1).
    """
    if n <= 1:
        return n
    previous, current = 0, 1
    # After k iterations ``current`` holds F(k + 1); n - 1 steps reach F(n).
    for _ in range(n - 1):
        previous, current = current, previous + current
    return current
async def main():
    """Run the Fibonacci task once and print the value it returned.

    The broker is shut down even if sending the task or waiting for its
    result raises.
    """
    await broker.startup()
    try:
        # Execute task and store result
        task = await compute_fibonacci.kiq(10)
        # Result is automatically stored in backend; wait_result() hands back
        # the TaskiqResult wrapper, so the computed number is read from
        # ``return_value`` rather than printing the wrapper itself.
        result = await task.wait_result()
        print(f"Fibonacci(10) = {result.return_value}")
    finally:
        await broker.shutdown()


from taskiq import Context, TaskiqDepends
from taskiq.depends.progress_tracker import TaskProgress
@broker.task
async def process_large_dataset(
    dataset_path: str,
    context: Context = TaskiqDepends(),
) -> dict:
    """Process large dataset with progress tracking.

    Publishes a progress snapshot before the first item, after every tenth
    processed item, and once more when everything is done.

    Args:
        dataset_path: Location passed to ``load_dataset``.
        context: Task context; supplies the id of the running task.

    Returns:
        Dict with ``processed_count`` and the list of per-item ``results``.
    """
    task_id = context.message.task_id

    # Load dataset
    items = await load_dataset(dataset_path)
    total_items = len(items)

    async def report(fraction: float, status: str, current: int, meta: dict) -> None:
        """Publish one progress snapshot for this task."""
        await broker.result_backend.set_progress(
            task_id,
            TaskProgress(
                progress=fraction,
                status=status,
                current=current,
                total=total_items,
                meta=meta,
            ),
        )

    # Initialize progress
    await report(0.0, "Starting processing", 0, {"dataset": dataset_path})

    results = []
    # enumerate(start=1) yields the running count directly, replacing the
    # unused zero-based index and the separate hand-maintained counter.
    for processed, item in enumerate(items, start=1):
        # Process item
        results.append(await process_item(item))
        # Update progress every 10 items
        if processed % 10 == 0:
            await report(
                processed / total_items,
                f"Processed {processed}/{total_items} items",
                processed,
                {"last_processed": item["id"]},
            )

    # Final progress update
    await report(
        1.0,
        "Processing complete",
        total_items,
        {"total_processed": len(results)},
    )
    return {"processed_count": len(results), "results": results}
# Check progress from client
async def check_task_progress(task_result):
    """Poll and print a task's progress until its result is ready.

    Args:
        task_result: Handle of a sent task; must expose ``task_id``,
            ``is_ready()`` and ``wait_result()``.
    """
    # Imported locally because this example never imports asyncio at file
    # level, although it needs asyncio.sleep() below.
    import asyncio

    task_id = task_result.task_id
    while not await task_result.is_ready():
        progress = await broker.result_backend.get_progress(task_id)
        if progress is not None:
            print(f"Status: {progress.status}")
            print(f"Progress: {progress.progress * 100:.1f}%")
            # Compare against None instead of relying on truthiness: a count
            # of 0 is a valid snapshot (the producer's first update reports
            # current=0) and must still be printed.
            if progress.current is not None and progress.total is not None:
                print(f"Items: {progress.current}/{progress.total}")
        await asyncio.sleep(2)
    final_result = await task_result.wait_result()
    print(f"Task completed: {final_result}")


import json
import aiofiles
from pathlib import Path
class FileResultBackend(AsyncResultBackend):
    """Custom file-based result backend.

    Every task gets ``<task_id>.json`` for its result and
    ``<task_id>_progress.json`` for its most recent progress snapshot, both
    stored directly inside ``storage_dir``.
    """

    def __init__(self, storage_dir: str = "./task_results"):
        """Create the backend and make sure the storage directory exists.

        Args:
            storage_dir: Directory that will hold the JSON files.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True lets a nested path such as "data/results" be created
        # in one go instead of failing when "data" does not exist yet.
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _get_result_path(self, task_id: str) -> Path:
        """Return the path of the result file for ``task_id``."""
        return self.storage_dir / f"{task_id}.json"

    def _get_progress_path(self, task_id: str) -> Path:
        """Return the path of the progress file for ``task_id``."""
        return self.storage_dir / f"{task_id}_progress.json"

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult,
    ) -> None:
        """Store result as JSON file.

        Args:
            task_id: Unique identifier for the task
            result: Task execution result to store
        """
        payload = {
            "is_err": result.is_err,
            "return_value": result.return_value,
            "execution_time": result.execution_time,
            "labels": result.labels,
            "error": str(result.error) if result.error else None,
        }
        # default=str: values json cannot encode are written as their str()
        # form, so they come back as plain strings when loaded.
        async with aiofiles.open(self._get_result_path(task_id), "w") as f:
            await f.write(json.dumps(payload, default=str))

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = True,
    ) -> TaskiqResult:
        """Load result from JSON file.

        Args:
            task_id: Unique identifier for the task
            with_logs: Accepted for interface compatibility; this backend
                does not persist logs, so the flag has no effect.

        Returns:
            The reconstructed task result.

        Raises:
            ResultGetError: If no result file exists for ``task_id`` or its
                content is not valid JSON.
        """
        result_path = self._get_result_path(task_id)
        # Open directly and translate the failure rather than checking
        # exists() first: the file may vanish between the check and the open.
        try:
            async with aiofiles.open(result_path, "r") as f:
                data = json.loads(await f.read())
        except FileNotFoundError as exc:
            raise ResultGetError(f"Result not found for task {task_id}") from exc
        except json.JSONDecodeError as exc:
            raise ResultGetError(
                f"Result for task {task_id} is not valid JSON"
            ) from exc

        # Reconstruct TaskiqResult
        result = TaskiqResult(
            is_err=data["is_err"],
            return_value=data["return_value"],
            execution_time=data["execution_time"],
            labels=data["labels"],
        )
        # Only the error message was stored, so the original exception type
        # cannot be restored.
        if data.get("error"):
            result.error = Exception(data["error"])
        return result

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if result file exists."""
        return self._get_result_path(task_id).exists()

    async def delete_result(self, task_id: str) -> None:
        """Delete result and progress files.

        Files that are already gone are ignored.
        """
        # missing_ok avoids the exists()/unlink() race of a separate check.
        self._get_result_path(task_id).unlink(missing_ok=True)
        self._get_progress_path(task_id).unlink(missing_ok=True)

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress,
    ) -> None:
        """Store the latest progress snapshot as JSON file.

        Args:
            task_id: Unique identifier for the task
            progress: Progress information to store
        """
        async with aiofiles.open(self._get_progress_path(task_id), "w") as f:
            await f.write(json.dumps(progress.to_dict(), default=str))

    async def get_progress(
        self,
        task_id: str,
    ) -> Optional[TaskProgress]:
        """Load the latest progress snapshot.

        Args:
            task_id: Unique identifier for the task

        Returns:
            Progress information if a snapshot was stored, otherwise None.
        """
        try:
            async with aiofiles.open(self._get_progress_path(task_id), "r") as f:
                raw = await f.read()
        except FileNotFoundError:
            return None
        return TaskProgress.from_dict(json.loads(raw))
# Use custom backend
# FileResultBackend's constructor creates the directory if it is missing.
custom_backend = FileResultBackend("./my_results")
# NOTE: this rebinds the name ``broker`` used by the earlier examples.
broker = InMemoryBroker().with_result_backend(custom_backend)

# Example configuration for external result backends
# (These would be separate packages like taskiq-redis, taskiq-postgres)
# Redis result backend
from taskiq_redis import RedisResultBackend
redis_backend = RedisResultBackend("redis://localhost:6379/0")
# PostgreSQL result backend
# NOTE(review): the DSN below embeds a literal user and password for
# illustration; presumably real deployments load it from configuration --
# confirm.
from taskiq_postgres import PostgresResultBackend
postgres_backend = PostgresResultBackend(
dsn="postgresql://user:password@localhost/taskiq_results"
)
# MongoDB result backend
from taskiq_mongo import MongoResultBackend
mongo_backend = MongoResultBackend(
connection_string="mongodb://localhost:27017",
database="taskiq_results"
)
# Configure broker with external backend
# NOTE(review): ZeroMQBroker is not imported anywhere in this listing --
# presumably it comes from the taskiq package; confirm.
broker = ZeroMQBroker("tcp://localhost:5555").with_result_backend(redis_backend)


class ResultGetError(TaskiqError):
    """Exception raised when result cannot be retrieved."""


class ResultIsReadyError(TaskiqError):
    """Exception raised when checking result readiness fails."""


Install with Tessl CLI
npx tessl i tessl/pypi-taskiq