CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

docs/exceptions.md

Exceptions

Taskiq provides a comprehensive exception hierarchy for handling error conditions in distributed task processing. Each failure scenario has its own error type, which enables precise error handling and easier debugging.

Capabilities

Base Exception Classes

Core exception hierarchy providing the foundation for all taskiq-specific errors.

class TaskiqError(Exception):
    """
    Base exception class for all taskiq-related errors.

    Every taskiq-specific exception derives from this class, so a single
    ``except TaskiqError`` clause catches all of them.

    Attributes:
        message: Error description passed to the constructor.
    """

    def __init__(self, message: str = "") -> None:
        """
        Initialize taskiq error.

        Args:
            message: Error description
        """
        # Forward the message explicitly. Without this call a message passed
        # by keyword is dropped, and a subclass that formats its own text and
        # hands it to super().__init__(...) would have that text ignored.
        super().__init__(message)
        self.message = message

Result-Related Exceptions

Exceptions related to task result retrieval and management.

class NoResultError(TaskiqError):
    """
    Signal that the current task execution has no result to store.

    Task context methods such as requeue() typically raise it so that
    nothing is saved for the execution that is being abandoned.
    """

class ResultGetError(TaskiqError):
    """
    Signal that a task result could not be retrieved.

    Raised when reading from the result backend fails, for example
    because of storage issues, a missing result, or a backend error.
    """

class ResultIsReadyError(TaskiqError):
    """
    Signal that a result-readiness check failed.

    Raised when the result backend is unable to tell whether a
    result is ready to be retrieved.
    """

class TaskiqResultTimeoutError(TaskiqError):
    """
    Exception raised when waiting for result times out.

    Thrown by wait_result() when the specified timeout
    is exceeded before the task completes.

    Attributes:
        task_id: ID of the task that timed out
        timeout: Timeout duration that was exceeded
        message: Additional error description
    """

    def __init__(
        self,
        task_id: str,
        timeout: float,
        message: str = "",
    ) -> None:
        """
        Initialize timeout error.

        Args:
            task_id: ID of the task that timed out
            timeout: Timeout duration that was exceeded
            message: Additional error description
        """
        super().__init__(message)
        # Keep the constructor arguments on the instance so that handlers
        # can inspect them (e.g. ``e.timeout``) instead of losing them.
        self.task_id = task_id
        self.timeout = timeout
        self.message = message

Task Execution Exceptions

Exceptions related to task execution and processing.

class SendTaskError(TaskiqError):
    """
    Signal that a task could not be sent to the broker.

    Raised when the broker fails to accept or queue a task, for
    example because the broker is unavailable, the queue is full,
    or the task could not be serialized.
    """

class TaskRejectedError(TaskiqError):
    """
    Signal that a task was explicitly rejected.

    context.reject() raises it to mark the current task as
    rejected, meaning it should not be retried.
    """

class UnknownTaskError(TaskiqError):
    """
    Exception raised when attempting to execute unknown task.

    Occurs when a worker receives a task that is not
    registered in the broker's task registry.

    Attributes:
        task_name: Name of the unknown task
        message: Additional error description
    """

    def __init__(self, task_name: str, message: str = "") -> None:
        """
        Initialize unknown task error.

        Args:
            task_name: Name of the unknown task
            message: Additional error description
        """
        super().__init__(message)
        # Retain the offending task name so handlers can report or route on it.
        self.task_name = task_name
        self.message = message

Security and Validation Exceptions

Exceptions related to security checks and to validating which broker a task belongs to.

class SecurityError(TaskiqError):
    """
    Signal a security-related failure.

    Raised when security validation does not pass, for example on
    an unauthorized access attempt, an invalid signature, or a
    security policy violation.
    """

class TaskBrokerMismatchError(TaskiqError):
    """
    Exception raised when task is registered to wrong broker.

    Occurs when attempting to register a task that was
    already registered to a different broker instance.

    Attributes:
        broker: The broker that owns the task
        message: Additional error description
    """

    def __init__(self, broker: "AsyncBroker", message: str = "") -> None:
        """
        Initialize broker mismatch error.

        Args:
            broker: The broker that owns the task
            message: Additional error description
        """
        super().__init__(message)
        # Keep a reference to the owning broker so handlers can identify it.
        self.broker = broker
        self.message = message

Scheduling Exceptions

Exceptions related to task scheduling and timing.

class ScheduledTaskCancelledError(TaskiqError):
    """
    Signal that a scheduled task has been cancelled.

    Schedule sources raise it to stop a scheduled task from being
    executed when a custom condition is not met.
    """

Usage Examples

Basic Exception Handling

from taskiq import InMemoryBroker, TaskiqError, ResultGetError
from taskiq.exceptions import TaskiqResultTimeoutError

# Broker instance that the @broker.task definitions in these examples attach to.
broker = InMemoryBroker()

@broker.task
async def risky_operation(value: int) -> int:
    """Double ``value``, refusing inputs outside the range 0..100.

    Raises:
        ValueError: ``value`` is negative.
        RuntimeError: ``value`` is greater than 100.
    """
    if value < 0:
        raise ValueError("Negative values not supported")
    elif value > 100:
        raise RuntimeError("Value too large")

    doubled = 2 * value
    return doubled

async def handle_task_execution():
    """Example of handling task execution errors.

    Sends ``risky_operation`` with an invalid argument, waits up to ten
    seconds for the result, and prints a message describing the outcome.
    """
    try:
        # Send the task and wait for its result object.
        task = await risky_operation.kiq(-5)
        result = await task.wait_result(timeout=10.0)

        # wait_result() returns a result wrapper rather than the raw value:
        # an exception raised inside the task is stored on the wrapper, not
        # re-raised. Surface it explicitly so the handlers below can run.
        result.raise_for_error()
        print(f"Success: {result.return_value}")

    except TaskiqResultTimeoutError as e:
        print(f"Task timed out after {e.timeout} seconds")

    except ValueError as e:
        print(f"Task failed with validation error: {e}")

    except RuntimeError as e:
        print(f"Task failed with runtime error: {e}")

    except TaskiqError as e:
        print(f"Taskiq system error: {e}")

    except Exception as e:
        # Last-resort boundary for this example; report anything unexpected.
        print(f"Unexpected error: {e}")

Result Backend Error Handling

from taskiq.exceptions import ResultGetError, ResultIsReadyError

async def safe_result_retrieval(task_result):
    """Safely retrieve task result with error handling.

    Args:
        task_result: Task handle exposing awaitable ``is_ready()`` and
            ``wait_result(timeout=...)`` methods.

    Returns:
        Whatever ``wait_result`` returns, or ``None`` when the result is
        not ready yet, its readiness cannot be checked, retrieval fails,
        or the 30-second wait times out.
    """
    # Keep each try body down to the single call that can raise.
    try:
        is_ready = await task_result.is_ready()
    except ResultIsReadyError as e:
        print(f"Cannot check result status: {e}")
        return None

    if not is_ready:
        print("Result not ready yet")
        return None

    try:
        return await task_result.wait_result(timeout=30.0)
    except ResultGetError as e:
        print(f"Failed to retrieve result: {e}")
        return None
    except TaskiqResultTimeoutError as e:
        print(f"Result retrieval timed out: {e}")
        return None

Task Registration Error Handling

from taskiq.exceptions import TaskBrokerMismatchError, UnknownTaskError

# Two separate broker instances, used below to illustrate a broker mismatch.
# NOTE(review): InMemoryBroker is not imported in this snippet — presumably
# carried over from the earlier example; confirm.
broker1 = InMemoryBroker()
broker2 = InMemoryBroker()

@broker1.task
async def my_task(x: int) -> int:
    """Return ``x`` doubled; registered on ``broker1``."""
    doubled = 2 * x
    return doubled

async def handle_registration_errors():
    """Demonstrate handling of broker-mismatch and unknown-task errors."""
    # Register the function behind my_task on the second broker.
    # NOTE(review): the original example expects this to raise
    # TaskBrokerMismatchError because the task belongs to broker1 — confirm
    # against the installed taskiq version.
    try:
        broker2.register_task(my_task.original_func, "my_task")
    except TaskBrokerMismatchError as mismatch:
        print(f"Task already registered to different broker: {mismatch}")

    # Simulate a worker receiving a task name that the broker does not know.
    try:
        if broker1.find_task("nonexistent_task") is None:
            raise UnknownTaskError("nonexistent_task")
    except UnknownTaskError as unknown:
        print(f"Unknown task requested: {unknown}")

Context Exception Handling

from taskiq import Context, TaskiqDepends
from taskiq.exceptions import NoResultError, TaskRejectedError

@broker.task
async def conditional_task(
    data: dict,
    context: Context = TaskiqDepends(),
) -> dict:
    """Process ``data``, rejecting or requeuing the task when it asks for it.

    ``data["valid"]`` (default ``True``) controls rejection, and
    ``data["needs_retry"]`` (default ``False``) requests a requeue, which is
    attempted while the ``retry_count`` label is below three.
    """
    is_valid = data.get("valid", True)
    if not is_valid:
        print("Invalid data, rejecting task")
        # Per this guide, reject() raises TaskRejectedError.
        context.reject()

    wants_retry = data.get("needs_retry", False)
    if wants_retry:
        labels = context.message.labels
        attempts = int(labels.get("retry_count", 0))
        if attempts < 3:
            next_attempt = attempts + 1
            print(f"Requeuing task (attempt {next_attempt})")
            # Labels hold the counter as a string.
            labels["retry_count"] = str(next_attempt)
            # Per this guide, requeue() raises NoResultError.
            await context.requeue()

    # Normal path: hand the data back marked as processed.
    return {"processed": data, "status": "success"}

async def handle_context_exceptions():
    """Send ``conditional_task`` with invalid data and report the outcome."""
    try:
        task = await conditional_task.kiq({"valid": False})
        # Only the exceptions matter here; the result itself is not used.
        await task.wait_result()
    except TaskRejectedError:
        print("Task was rejected due to invalid data")
    except NoResultError:
        print("Task was requeued, no result available")

Scheduling Exception Handling

from taskiq.exceptions import ScheduledTaskCancelledError
from taskiq.scheduler import ScheduleSource, ScheduledTask

class ConditionalScheduleSource(ScheduleSource):
    """Schedule source with conditional task execution.

    Cancels a scheduled task just before it is sent when the host is
    overloaded or the current local time is inside the maintenance window.
    """

    # CPU usage (percent) above which the system counts as overloaded.
    CPU_OVERLOAD_PERCENT = 90
    # Inclusive local-time maintenance window, in whole hours (2-4 AM).
    MAINTENANCE_START_HOUR = 2
    MAINTENANCE_END_HOUR = 4

    async def get_schedules(self) -> list:
        """Return the schedules this source provides (none).

        ScheduleSource declares this method abstract, so it must be
        implemented for the class to be instantiable. This source only
        contributes the pre_send check and defines no schedules itself.
        """
        return []

    async def pre_send(self, task: ScheduledTask) -> None:
        """Check conditions before task execution.

        Args:
            task: The scheduled task that is about to be sent.

        Raises:
            ScheduledTaskCancelledError: The system is overloaded or the
                current time is inside the maintenance window.
        """
        if await self._system_overloaded():
            raise ScheduledTaskCancelledError(
                f"System overloaded, cancelling {task.task_name}"
            )

        if await self._in_maintenance_window():
            raise ScheduledTaskCancelledError(
                f"In maintenance window, cancelling {task.task_name}"
            )

    async def _system_overloaded(self) -> bool:
        """Return True when CPU usage exceeds CPU_OVERLOAD_PERCENT."""
        import psutil

        # NOTE: per the psutil documentation, a non-blocking cpu_percent()
        # call measures usage since the previous call, and the very first
        # call returns a meaningless 0.0 that should be ignored.
        return psutil.cpu_percent() > self.CPU_OVERLOAD_PERCENT

    async def _in_maintenance_window(self) -> bool:
        """Return True when the local time is inside the maintenance window."""
        from datetime import datetime, time

        now = datetime.now().time()
        window_start = time(self.MAINTENANCE_START_HOUR, 0)
        window_end = time(self.MAINTENANCE_END_HOUR, 0)
        return window_start <= now <= window_end

# Wire the conditional source into a scheduler. Per this guide, the scheduler
# handles ScheduledTaskCancelledError gracefully.
# NOTE(review): TaskiqScheduler and broker are not defined in this snippet —
# presumably imported/created elsewhere; confirm.
schedule_source = ConditionalScheduleSource()
scheduler = TaskiqScheduler(broker, [schedule_source])

Custom Exception Classes

class DataValidationError(TaskiqError):
    """Custom exception for data validation failures.

    Attributes:
        field: Name of the field that failed validation.
        value: The offending value.
    """

    # ``value`` is annotated as ``object`` rather than ``Any``: it accepts
    # the same arguments, and ``Any`` is not imported in this example, so
    # using it would raise NameError when the class is defined.
    def __init__(self, field: str, value: object, message: str = "") -> None:
        """
        Initialize validation error.

        Args:
            field: Name of the field that failed validation
            value: The offending value
            message: Additional error description
        """
        self.field = field
        self.value = value
        super().__init__(f"Validation failed for {field}={value}: {message}")

class ExternalServiceError(TaskiqError):
    """Failure reported by, or while calling, an external service.

    The service name and status code are kept on the instance as
    ``service`` and ``status_code``.
    """

    def __init__(self, service: str, status_code: int, message: str = "") -> None:
        description = f"{service} error (HTTP {status_code}): {message}"
        self.service, self.status_code = service, status_code
        super().__init__(description)

@broker.task
async def process_user_data(user_data: dict) -> dict:
    """Task with custom exception handling.

    Args:
        user_data: User record; must contain a company ``email`` address.

    Returns:
        A dict with ``processed`` set to True and the ``user_id`` taken from
        the external service response.

    Raises:
        DataValidationError: ``email`` is missing or is not a company address.
        ExternalServiceError: The external call timed out or returned a
            status code other than 200.
    """
    # Validate required fields
    if "email" not in user_data:
        raise DataValidationError("email", None, "Email is required")

    email = user_data["email"]
    # A non-string value (e.g. None) cannot be a company address; report it
    # as a validation failure instead of crashing on .endswith().
    if not isinstance(email, str) or not email.endswith("@company.com"):
        raise DataValidationError("email", email, "Must be company email")

    # Call external service; only the call itself can time out, so it is the
    # only statement inside the try body.
    try:
        response = await external_api_call(user_data)
    except httpx.TimeoutException as exc:
        # Chain the original timeout so the root cause stays in the traceback.
        raise ExternalServiceError("UserService", 0, "Request timeout") from exc

    if response.status_code != 200:
        raise ExternalServiceError(
            "UserService",
            response.status_code,
            response.text,
        )

    return {"processed": True, "user_id": response.json()["id"]}

# Handle custom exceptions
async def handle_custom_exceptions():
    """Send ``process_user_data`` with incomplete data and report the failure."""
    try:
        task = await process_user_data.kiq({"name": "John"})
        result = await task.wait_result()
        # wait_result() returns a result wrapper; an exception raised inside
        # the task is stored on it rather than re-raised. Surface it
        # explicitly so the handlers below can run.
        result.raise_for_error()

    except DataValidationError as e:
        print(f"Data validation failed: {e}")
        print(f"Field: {e.field}, Value: {e.value}")

    except ExternalServiceError as e:
        print(f"External service error: {e}")
        print(f"Service: {e.service}, Status: {e.status_code}")

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json