"""Distributed task queue with full async support for Python applications.

Task execution system that handles the lifecycle of tasks from creation to
result retrieval. This includes decorated task wrappers, result containers,
execution context, and utilities for working with task outcomes.

Tasks are created by decorating functions with the broker's @task decorator,
which converts them into distributed task objects that can be executed
asynchronously.
"""
class AsyncTaskiqDecoratedTask:
    """
    Decorated task wrapper that enables distributed execution.

    Created automatically when using @broker.task decorator.
    Provides methods for sending tasks to workers and calling locally.
    """

    # Name used to identify and route the task to workers.
    task_name: str
    # Broker this task is sent through; quoted — AsyncBroker is declared
    # elsewhere in the package, not in this stub.
    broker: "AsyncBroker"
    # Custom labels attached to every invocation of this task.
    labels: Dict[str, Any]

    async def kiq(self, *args, **kwargs) -> "AsyncTaskiqTask":
        """
        Send task to broker for distributed execution.

        Args:
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            AsyncTaskiqTask handle for retrieving the result
        """
        # NOTE(review): return type corrected from TaskiqResult to
        # AsyncTaskiqTask for consistency with AsyncKicker.kiq and gather(),
        # which operate on task handles, not results.

    async def __call__(self, *args, **kwargs) -> Any:
        """
        Execute task locally in current process.

        Args:
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            Direct result of task function execution
        """

    def kicker(self) -> "AsyncKicker":
        """
        Get kicker object for advanced task configuration.

        Kicker allows modifying task parameters before sending.

        Returns:
            AsyncKicker instance for this task
        """

    async def schedule_by_cron(
        self,
        source: "ScheduleSource",
        cron: Union[str, "CronSpec"],
        *args,
        **kwargs,
    ) -> "CreatedSchedule":
        """
        Schedule task to run on cron pattern.

        Args:
            source: Schedule source that supports dynamic scheduling
            cron: Cron string or CronSpec instance
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            CreatedSchedule object with schedule details
        """

    async def schedule_by_time(
        self,
        source: "ScheduleSource",
        time: datetime,
        *args,
        **kwargs,
    ) -> "CreatedSchedule":
        """
        Schedule task to run at specific time.

        Args:
            source: Schedule source that supports dynamic scheduling
            time: Specific datetime to run the task
            *args: Positional arguments for the task function
            **kwargs: Keyword arguments for the task function

        Returns:
            CreatedSchedule object with schedule details
        """
class AsyncTaskiqTask:
    """
    Task execution wrapper for handling async task invocation.
    """

    def __init__(
        self,
        task_name: str,
        broker: "AsyncBroker",
        labels: Optional[Dict[str, Any]] = None,
    ) -> None: ...

    async def kiq(self, *args, **kwargs) -> "TaskiqResult":
        """Send task for execution and return result handle."""

    async def is_ready(self) -> bool:
        """
        Check if task result is ready.

        Returns:
            True if task is completed, False otherwise

        Raises:
            ResultIsReadyError: If unable to check task readiness
        """

    async def get_result(self, with_logs: bool = False) -> "TaskiqResult":
        """
        Get task result from result backend.

        Args:
            with_logs: Whether to fetch execution logs

        Returns:
            TaskiqResult with execution outcome

        Raises:
            ResultGetError: If unable to retrieve result
        """

    async def wait_result(
        self,
        check_interval: float = 0.2,
        timeout: float = -1.0,
        with_logs: bool = False,
    ) -> "TaskiqResult":
        """
        Wait for task completion and return result.

        Args:
            check_interval: How often to check for completion (seconds)
            timeout: Maximum time to wait (-1 for no timeout)
            with_logs: Whether to fetch execution logs

        Returns:
            TaskiqResult with execution outcome

        Raises:
            TaskiqResultTimeoutError: If timeout is exceeded
        """

    async def get_progress(self) -> Optional["TaskProgress[Any]"]:
        """
        Get current task execution progress.

        Returns:
            TaskProgress object or None if no progress tracking
        """


# Result objects provide access to task execution outcomes, including return
# values, errors, execution metadata, and status checking.
class TaskiqResult:
    """
    Container for task execution results and metadata.

    Supports both successful results and error conditions,
    along with execution timing and custom labels.
    """

    is_err: bool
    """Whether the task execution resulted in an error."""

    return_value: Any
    """The return value from successful task execution."""

    execution_time: float
    """Task execution time in seconds."""

    labels: Dict[str, Any]
    """Custom labels attached to the task result."""

    error: Optional[BaseException]
    """Exception object if task execution failed."""

    log: Optional[str]
    """Deprecated: Task execution logs (may be removed in future)."""

    async def wait_result(
        self,
        timeout: Optional[float] = None,
        check_interval: float = 0.5,
    ) -> Any:
        """
        Wait for task completion and return the result.

        Args:
            timeout: Maximum time to wait in seconds
            check_interval: How often to check for completion

        Returns:
            The task return value

        Raises:
            TaskiqResultTimeoutError: If timeout is exceeded
            Exception: Any exception raised by the task
        """

    async def is_ready(self) -> bool:
        """
        Check if task result is available.

        Returns:
            True if result is ready, False otherwise
        """

    def __await__(self):
        """Enable direct awaiting of TaskiqResult objects."""

    def raise_for_error(self) -> "TaskiqResult":
        """
        Raise exception if task resulted in error.

        Returns:
            Self if no error occurred

        Raises:
            Exception: The original task exception if is_err is True
        """


# Context objects provide task execution environment information and control
# capabilities during task processing.
class Context:
    """
    Task execution context providing access to message data,
    broker instance, and task control operations.
    """

    message: "TaskiqMessage"
    """The original task message with metadata."""

    broker: "AsyncBroker"
    """Broker instance executing this task."""

    state: "TaskiqState"
    """Shared state object for the broker."""

    def __init__(self, message: "TaskiqMessage", broker: "AsyncBroker") -> None: ...

    async def requeue(self) -> None:
        """
        Requeue the current task for later execution.

        Increments requeue counter and sends task back to broker.
        Always raises NoResultError to prevent result storage.

        Raises:
            NoResultError: Always raised to stop current execution
        """

    def reject(self) -> None:
        """
        Reject the current task and prevent reprocessing.

        Always raises TaskRejectedError to mark task as rejected.

        Raises:
            TaskRejectedError: Always raised to reject the task
        """


# Utility functions for working with multiple task results concurrently.
async def gather(
    *tasks: "AsyncTaskiqTask[Any]",
    timeout: float = -1,
    with_logs: bool = False,
    periodicity: float = 0.1,
) -> "Tuple[TaskiqResult[Any], ...]":
    """
    Wait for multiple task results concurrently.

    Similar to asyncio.gather but works with AsyncTaskiqTask objects.

    Args:
        *tasks: AsyncTaskiqTask objects to wait for
        timeout: Maximum time to wait in seconds, -1 for no timeout
        with_logs: Whether to fetch logs from result backend
        periodicity: How often to check for task completion

    Returns:
        Tuple of TaskiqResult objects in the same order as input tasks

    Raises:
        TaskiqResultTimeoutError: If timeout is exceeded
    """


# Example: basic task execution with an in-memory broker.
from taskiq import InMemoryBroker
broker = InMemoryBroker()
@broker.task
async def calculate_sum(numbers: List[int]) -> int:
"""Calculate sum of numbers with artificial delay."""
await asyncio.sleep(1) # Simulate work
return sum(numbers)
# Execute task
async def main():
await broker.startup()
# Send task for execution
result = await calculate_sum.kiq([1, 2, 3, 4, 5])
# Wait for result
total = await result.wait_result(timeout=10.0)
print(f"Sum: {total}") # Sum: 15
await broker.shutdown()@broker.task
async def risky_operation(value: int) -> int:
    """Double the value; negative input raises ValueError."""
    if value < 0:
        raise ValueError("Negative values not allowed")
    return value * 2


async def handle_results():
    """Demonstrate readiness checks, error handling, and result metadata."""
    result = await risky_operation.kiq(-5)
    # Check if result is ready
    if await result.is_ready():
        try:
            value = await result.wait_result()
            print(f"Success: {value}")
        except ValueError as e:
            print(f"Task failed: {e}")
    # Inspect result metadata
    print(f"Execution time: {result.execution_time}s")
    print(f"Had error: {result.is_err}")
    if result.error:
        print(f"Error type: {type(result.error).__name__}")


# Example: using the execution context inside a task.
from taskiq import Context, TaskiqDepends
@broker.task
async def context_aware_task(
    data: str,
    context: Context = TaskiqDepends(),
) -> str:
    """Task that uses execution context."""
    # Access task metadata
    task_id = context.message.task_id
    requeue_count = context.message.labels.get("X-Taskiq-requeue", "0")
    # Conditional requeue logic
    if data == "retry_me" and int(requeue_count) < 2:
        print(f"Requeuing task {task_id} (attempt {int(requeue_count) + 1})")
        await context.requeue()
    # Reject invalid data
    if data == "invalid":
        context.reject()
    return f"Processed: {data} (ID: {task_id})"


# Example: gathering results from several tasks concurrently.
from taskiq import gather
import asyncio
import random
from typing import Any, Dict, Tuple


@broker.task
async def fetch_data(url: str) -> dict:
    """Fetch data from a URL (simulated)."""
    # Simulate API call
    await asyncio.sleep(random.uniform(0.5, 2.0))
    return {"url": url, "data": f"content from {url}"}


async def fetch_multiple_sources():
    """Run several fetch tasks concurrently and combine their outputs."""
    # Start multiple tasks.  kiq() is a coroutine (async def), so each call
    # must be awaited to obtain the task handle that gather() expects.
    tasks = [
        await fetch_data.kiq("https://api1.example.com"),
        await fetch_data.kiq("https://api2.example.com"),
        await fetch_data.kiq("https://api3.example.com"),
    ]
    # Wait for all results
    results = await gather(*tasks)
    # Process combined results.  gather() yields TaskiqResult objects, so the
    # task's dict lives on .return_value, not on the result object itself.
    all_data = {}
    for result in results:
        payload = result.return_value
        all_data[payload["url"]] = payload["data"]
    return all_data


class TaskiqMessage:
    """Message format for task data and metadata."""

    # Unique identifier assigned to this task invocation.
    task_id: str
    # Registered name of the task function.
    task_name: str
    # Custom labels/metadata attached to the message.
    labels: Dict[str, Any]
    # Positional arguments for the task function.
    args: Tuple[Any, ...]
    # Keyword arguments for the task function.
    kwargs: Dict[str, Any]
class AsyncKicker:
    """Kicker object for advanced task parameter configuration."""

    # NOTE(review): ellipsis bodies — this file is an interface stub; the
    # real implementations live in the taskiq package.
    def __init__(
        self,
        task_name: str,
        broker: "AsyncBroker",
        labels: Dict[str, Any],
        return_type: Optional[Type[Any]] = None,
    ) -> None: ...

    # Send the configured task for execution; returns a task handle.
    async def kiq(self, *args, **kwargs) -> "AsyncTaskiqTask[Any]": ...

    # Register the task on a cron pattern with the given schedule source.
    async def schedule_by_cron(
        self,
        source: "ScheduleSource",
        cron: Union[str, "CronSpec"],
        *args,
        **kwargs,
    ) -> "CreatedSchedule[Any]": ...

    # Register the task to run once at a specific datetime.
    async def schedule_by_time(
        self,
        source: "ScheduleSource",
        time: datetime,
        *args,
        **kwargs,
    ) -> "CreatedSchedule[Any]": ...
class CreatedSchedule:
    """Container for created schedule information."""

    # Unique identifier of the created schedule entry.
    schedule_id: str
    # Schedule source this entry was registered with.
    source: "ScheduleSource"


# NOTE(review): aliasing CronSpec to plain str makes Union[str, CronSpec]
# in the schedule_by_cron signatures redundant; in the real package CronSpec
# appears to be a structured type — confirm against taskiq proper.
CronSpec = str  # Type alias for cron specification strings
class TaskProgress:
    """Progress tracking container for long-running tasks."""

    def __init__(self, current: int, total: int, message: str = "") -> None: ...

    current: int
    """Current progress value."""

    total: int
    """Total expected value."""

    message: str
    """Optional progress message."""


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-taskiq