Distributed task queue with full async support for Python applications
Comprehensive exception hierarchy for handling various error conditions in distributed task processing. Provides specific error types for different failure scenarios to enable precise error handling and debugging.
Core exception hierarchy providing the foundation for all taskiq-specific errors.
class TaskiqError(Exception):
"""
Base exception class for all taskiq-related errors.
All taskiq-specific exceptions inherit from this class,
allowing for broad exception handling when needed.
"""
def __init__(self, message: str = "") -> None:
"""
Initialize taskiq error.
Args:
message: Error description
"""Exceptions related to task result retrieval and management.
class NoResultError(TaskiqError):
"""
Exception raised when no result is available.
Typically raised by task context methods like requeue()
to indicate that no result should be stored for the current
task execution.
"""
class ResultGetError(TaskiqError):
"""
Exception raised when task result cannot be retrieved.
Occurs when attempting to get results from result backend
fails due to storage issues, missing results, or backend errors.
"""
class ResultIsReadyError(TaskiqError):
"""
Exception raised when checking result readiness fails.
Occurs when the result backend cannot determine if a
result is ready for retrieval.
"""
class TaskiqResultTimeoutError(TaskiqError):
"""
Exception raised when waiting for result times out.
Thrown by wait_result() when the specified timeout
is exceeded before the task completes.
"""
def __init__(
self,
task_id: str,
timeout: float,
message: str = "",
) -> None:
"""
Initialize timeout error.
Args:
task_id: ID of the task that timed out
timeout: Timeout duration that was exceeded
message: Additional error description
"""Exceptions related to task execution and processing.
class SendTaskError(TaskiqError):
"""
Exception raised when task cannot be sent to broker.
Occurs when broker fails to accept or queue a task
due to broker unavailability, queue full conditions,
or serialization errors.
"""
class TaskRejectedError(TaskiqError):
"""
Exception raised when task is explicitly rejected.
Thrown by context.reject() to indicate that the
current task should be rejected and not retried.
"""
class UnknownTaskError(TaskiqError):
"""
Exception raised when attempting to execute unknown task.
Occurs when a worker receives a task that is not
registered in the broker's task registry.
"""
def __init__(self, task_name: str, message: str = "") -> None:
"""
Initialize unknown task error.
Args:
task_name: Name of the unknown task
message: Additional error description
"""Exceptions related to security, authentication, and data validation.
class SecurityError(TaskiqError):
"""
Exception raised for security-related issues.
Occurs when security validation fails, such as
unauthorized access attempts, invalid signatures,
or security policy violations.
"""
class TaskBrokerMismatchError(TaskiqError):
"""
Exception raised when task is registered to wrong broker.
Occurs when attempting to register a task that was
already registered to a different broker instance.
"""
def __init__(self, broker: "AsyncBroker", message: str = "") -> None:
"""
Initialize broker mismatch error.
Args:
broker: The broker that owns the task
message: Additional error description
"""Exceptions related to task scheduling and timing.
class ScheduledTaskCancelledError(TaskiqError):
"""
Exception raised when scheduled task is cancelled.
Thrown by schedule sources to prevent execution
of scheduled tasks based on custom conditions.
"""from taskiq import InMemoryBroker, TaskiqError, ResultGetError
broker = InMemoryBroker()
@broker.task
async def risky_operation(value: int) -> int:
"""Task that might fail."""
if value < 0:
raise ValueError("Negative values not supported")
if value > 100:
raise RuntimeError("Value too large")
return value * 2
async def handle_task_execution():
"""Example of handling task execution errors."""
try:
# Execute task
result = await risky_operation.kiq(-5)
value = await result.wait_result(timeout=10.0)
print(f"Success: {value}")
except TaskiqResultTimeoutError as e:
print(f"Task timed out after {e.timeout} seconds")
except ValueError as e:
print(f"Task failed with validation error: {e}")
except RuntimeError as e:
print(f"Task failed with runtime error: {e}")
except TaskiqError as e:
print(f"Taskiq system error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")from taskiq.exceptions import ResultGetError, ResultIsReadyError
async def safe_result_retrieval(task_result):
"""Safely retrieve task result with error handling."""
task_id = task_result.task_id
try:
# Check if result is ready
is_ready = await task_result.is_ready()
if not is_ready:
print("Result not ready yet")
return None
except ResultIsReadyError as e:
print(f"Cannot check result status: {e}")
return None
try:
# Get the result
result = await task_result.wait_result(timeout=30.0)
return result
except ResultGetError as e:
print(f"Failed to retrieve result: {e}")
return None
except TaskiqResultTimeoutError as e:
print(f"Result retrieval timed out: {e}")
return Nonefrom taskiq.exceptions import TaskBrokerMismatchError, UnknownTaskError
broker1 = InMemoryBroker()
broker2 = InMemoryBroker()
@broker1.task
async def my_task(x: int) -> int:
return x * 2
async def handle_registration_errors():
"""Handle task registration issues."""
try:
# This will fail - task already registered to broker1
broker2.register_task(my_task.original_func, "my_task")
except TaskBrokerMismatchError as e:
print(f"Task already registered to different broker: {e}")
# Handle unknown task execution
try:
# Simulate receiving unknown task in worker
unknown_task = broker1.find_task("nonexistent_task")
if unknown_task is None:
raise UnknownTaskError("nonexistent_task")
except UnknownTaskError as e:
print(f"Unknown task requested: {e}")from taskiq import Context, TaskiqDepends
from taskiq.exceptions import NoResultError, TaskRejectedError
@broker.task
async def conditional_task(
data: dict,
context: Context = TaskiqDepends(),
) -> dict:
"""Task with conditional processing and context control."""
# Validate input data
if not data.get("valid", True):
print("Invalid data, rejecting task")
context.reject() # Raises TaskRejectedError
# Check if requeue is needed
if data.get("needs_retry", False):
retry_count = int(context.message.labels.get("retry_count", 0))
if retry_count < 3:
print(f"Requeuing task (attempt {retry_count + 1})")
context.message.labels["retry_count"] = str(retry_count + 1)
await context.requeue() # Raises NoResultError
# Process data normally
return {"processed": data, "status": "success"}
async def handle_context_exceptions():
"""Handle context-related exceptions."""
try:
result = await conditional_task.kiq({"valid": False})
value = await result.wait_result()
except TaskRejectedError:
print("Task was rejected due to invalid data")
except NoResultError:
print("Task was requeued, no result available")from taskiq.exceptions import ScheduledTaskCancelledError
from taskiq.scheduler import ScheduleSource, ScheduledTask
class ConditionalScheduleSource(ScheduleSource):
"""Schedule source with conditional task execution."""
async def pre_send(self, task: ScheduledTask) -> None:
"""Check conditions before task execution."""
# Check system load
if await self._system_overloaded():
raise ScheduledTaskCancelledError(
f"System overloaded, cancelling {task.task_name}"
)
# Check maintenance window
if await self._in_maintenance_window():
raise ScheduledTaskCancelledError(
f"In maintenance window, cancelling {task.task_name}"
)
async def _system_overloaded(self) -> bool:
# Check system metrics
import psutil
return psutil.cpu_percent() > 90
async def _in_maintenance_window(self) -> bool:
# Check if current time is in maintenance window
from datetime import datetime, time
now = datetime.now().time()
return time(2, 0) <= now <= time(4, 0) # 2-4 AM maintenance
# Scheduler will handle ScheduledTaskCancelledError gracefully
schedule_source = ConditionalScheduleSource()
scheduler = TaskiqScheduler(broker, [schedule_source])class DataValidationError(TaskiqError):
"""Custom exception for data validation failures."""
def __init__(self, field: str, value: Any, message: str = "") -> None:
self.field = field
self.value = value
super().__init__(f"Validation failed for {field}={value}: {message}")
class ExternalServiceError(TaskiqError):
"""Custom exception for external service failures."""
def __init__(self, service: str, status_code: int, message: str = "") -> None:
self.service = service
self.status_code = status_code
super().__init__(f"{service} error (HTTP {status_code}): {message}")
@broker.task
async def process_user_data(user_data: dict) -> dict:
"""Task with custom exception handling."""
# Validate required fields
if "email" not in user_data:
raise DataValidationError("email", None, "Email is required")
if not user_data["email"].endswith("@company.com"):
raise DataValidationError(
"email",
user_data["email"],
"Must be company email"
)
# Call external service
try:
response = await external_api_call(user_data)
if response.status_code != 200:
raise ExternalServiceError(
"UserService",
response.status_code,
response.text
)
except httpx.TimeoutException:
raise ExternalServiceError("UserService", 0, "Request timeout")
return {"processed": True, "user_id": response.json()["id"]}
# Handle custom exceptions
async def handle_custom_exceptions():
try:
result = await process_user_data.kiq({"name": "John"})
await result.wait_result()
except DataValidationError as e:
print(f"Data validation failed: {e}")
print(f"Field: {e.field}, Value: {e.value}")
except ExternalServiceError as e:
print(f"External service error: {e}")
print(f"Service: {e.service}, Status: {e.status_code}")Install with Tessl CLI
npx tessl i tessl/pypi-taskiq