# Exceptions

Comprehensive exception hierarchy for handling various error conditions in distributed task processing. Provides specific error types for different failure scenarios to enable precise error handling and debugging.

## Capabilities

### Base Exception Classes

Core exception hierarchy providing the foundation for all taskiq-specific errors.

```python { .api }
class TaskiqError(Exception):
    """
    Base exception class for all taskiq-related errors.

    All taskiq-specific exceptions inherit from this class,
    allowing for broad exception handling when needed.
    """

    def __init__(self, message: str = "") -> None:
        """
        Initialize taskiq error.

        Args:
            message: Error description
        """
```
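
Because every error type derives from `TaskiqError`, the order of `except` clauses matters: a handler for a specific subclass has to come before the handler for the base class, or it will never run. The following is a minimal sketch of that ordering. The classes are local stand-ins that mirror the documented hierarchy so the snippet runs without the library installed, and `describe` is a hypothetical helper introduced only for illustration.

```python
# Local stand-ins mirroring the documented hierarchy (assumption: these are
# illustrative only and are not imported from taskiq).
class TaskiqError(Exception):
    """Stand-in for the documented base class."""


class SendTaskError(TaskiqError):
    """Stand-in for one specific subclass."""


def describe(exc: Exception) -> str:
    """Hypothetical helper: classify an exception by raising and catching it."""
    try:
        raise exc
    except SendTaskError:
        # Most specific handler first.
        return "send failure"
    except TaskiqError:
        # The base-class handler catches every other error in the hierarchy.
        return "other taskiq error"
    except Exception:
        # Anything outside the hierarchy ends up here.
        return "unrelated error"


print(describe(SendTaskError("broker down")))
print(describe(TaskiqError("generic")))
print(describe(ValueError("bad input")))
```

If the `except TaskiqError` clause were listed first, it would also swallow `SendTaskError`, and the more specific branch would be unreachable.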

### Result-Related Exceptions

Exceptions related to task result retrieval and management.

```python { .api }
class NoResultError(TaskiqError):
    """
    Exception raised when no result is available.

    Typically raised by task context methods like requeue()
    to indicate that no result should be stored for the current
    task execution.
    """

class ResultGetError(TaskiqError):
    """
    Exception raised when task result cannot be retrieved.

    Occurs when attempting to get results from result backend
    fails due to storage issues, missing results, or backend errors.
    """

class ResultIsReadyError(TaskiqError):
    """
    Exception raised when checking result readiness fails.

    Occurs when the result backend cannot determine if a
    result is ready for retrieval.
    """

class TaskiqResultTimeoutError(TaskiqError):
    """
    Exception raised when waiting for result times out.

    Thrown by wait_result() when the specified timeout
    is exceeded before the task completes.
    """

    def __init__(
        self,
        task_id: str,
        timeout: float,
        message: str = "",
    ) -> None:
        """
        Initialize timeout error.

        Args:
            task_id: ID of the task that timed out
            timeout: Timeout duration that was exceeded
            message: Additional error description
        """
```
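
A timeout does not necessarily mean the task failed; it may simply still be running. One possible way to handle that is to wait again a bounded number of times. The sketch below assumes a local stand-in for `TaskiqResultTimeoutError` that follows the constructor documented above, and a hypothetical helper `wait_with_retries`; neither is part of the library, and `fake_wait` only simulates a slow task.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class TaskiqResultTimeoutError(Exception):
    """Local stand-in following the documented constructor signature."""

    def __init__(self, task_id: str, timeout: float, message: str = "") -> None:
        self.timeout = timeout
        super().__init__(message or f"Task {task_id} timed out after {timeout}s")


async def wait_with_retries(
    wait: Callable[[float], Awaitable[object]],
    timeout: float,
    attempts: int = 3,
) -> object:
    """Hypothetical helper: call `wait(timeout)` up to `attempts` times."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error: Optional[TaskiqResultTimeoutError] = None
    for _ in range(attempts):
        try:
            return await wait(timeout)
        except TaskiqResultTimeoutError as exc:
            last_error = exc  # remember the timeout and wait again
    assert last_error is not None
    raise last_error


async def demo() -> object:
    calls = {"count": 0}

    async def fake_wait(timeout: float) -> object:
        # Simulated waiter: times out twice, then returns a value.
        calls["count"] += 1
        if calls["count"] < 3:
            raise TaskiqResultTimeoutError("task-1", timeout)
        return "done"

    return await wait_with_retries(fake_wait, timeout=0.1)


print(asyncio.run(demo()))
```

With a real task handle, the `wait` argument would be a small wrapper around `wait_result(timeout=...)`. Only timeouts are retried here; any other exception propagates immediately.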

### Task Execution Exceptions

Exceptions related to task execution and processing.

```python { .api }
class SendTaskError(TaskiqError):
    """
    Exception raised when task cannot be sent to broker.

    Occurs when broker fails to accept or queue a task
    due to broker unavailability, queue full conditions,
    or serialization errors.
    """

class TaskRejectedError(TaskiqError):
    """
    Exception raised when task is explicitly rejected.

    Thrown by context.reject() to indicate that the
    current task should be rejected and not retried.
    """

class UnknownTaskError(TaskiqError):
    """
    Exception raised when attempting to execute unknown task.

    Occurs when a worker receives a task that is not
    registered in the broker's task registry.
    """

    def __init__(self, task_name: str, message: str = "") -> None:
        """
        Initialize unknown task error.

        Args:
            task_name: Name of the unknown task
            message: Additional error description
        """
```
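
These three error types call for different reactions, so application code may want to map each one to an action. The sketch below shows one way to do that with a lookup table. The classes are local stand-ins for the documented ones, and both the `POLICY` table and the `choose_action` helper are hypothetical; the chosen actions (retry, drop, alert) are an example policy, not behavior prescribed by the library.

```python
# Local stand-ins mirroring the documented classes (illustrative only).
class TaskiqError(Exception):
    pass


class SendTaskError(TaskiqError):
    pass


class TaskRejectedError(TaskiqError):
    pass


class UnknownTaskError(TaskiqError):
    def __init__(self, task_name: str, message: str = "") -> None:
        self.task_name = task_name
        super().__init__(message or f"Unknown task: {task_name}")


# Hypothetical policy: which action to take for each error type.
POLICY = {
    SendTaskError: "retry",      # the broker may be temporarily unavailable
    TaskRejectedError: "drop",   # rejected tasks should not be retried
    UnknownTaskError: "alert",   # likely a deployment or registration problem
}


def choose_action(exc: BaseException) -> str:
    """Return the configured action for `exc`, or "fail" if none matches."""
    for exc_type, action in POLICY.items():
        if isinstance(exc, exc_type):
            return action
    return "fail"


print(choose_action(SendTaskError("queue full")))
print(choose_action(TaskRejectedError()))
print(choose_action(UnknownTaskError("nonexistent_task")))
print(choose_action(RuntimeError("boom")))
```

Keeping the policy in a table means a new error type needs only one new entry rather than another `except` branch at every call site.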

### Security and Validation Exceptions

Exceptions related to security, authentication, and data validation.

```python { .api }
class SecurityError(TaskiqError):
    """
    Exception raised for security-related issues.

    Occurs when security validation fails, such as
    unauthorized access attempts, invalid signatures,
    or security policy violations.
    """

class TaskBrokerMismatchError(TaskiqError):
    """
    Exception raised when task is registered to wrong broker.

    Occurs when attempting to register a task that was
    already registered to a different broker instance.
    """

    def __init__(self, broker: "AsyncBroker", message: str = "") -> None:
        """
        Initialize broker mismatch error.

        Args:
            broker: The broker that owns the task
            message: Additional error description
        """
```

### Scheduling Exceptions

Exceptions related to task scheduling and timing.

```python { .api }
class ScheduledTaskCancelledError(TaskiqError):
    """
    Exception raised when scheduled task is cancelled.

    Thrown by schedule sources to prevent execution
    of scheduled tasks based on custom conditions.
    """
```

## Usage Examples

### Basic Exception Handling

```python
from taskiq import InMemoryBroker
from taskiq.exceptions import TaskiqError, TaskiqResultTimeoutError

broker = InMemoryBroker()

@broker.task
async def risky_operation(value: int) -> int:
    """Task that might fail."""
    if value < 0:
        raise ValueError("Negative values not supported")
    if value > 100:
        raise RuntimeError("Value too large")
    return value * 2

async def handle_task_execution():
    """Example of handling task execution errors."""
    try:
        # Send the task, then wait for its result
        task = await risky_operation.kiq(-5)
        result = await task.wait_result(timeout=10.0)
        # wait_result() returns a result object rather than raising the
        # task's own exception; re-raise it so the handlers below see it
        result.raise_for_error()
        print(f"Success: {result.return_value}")

    except TaskiqResultTimeoutError as e:
        print(f"Task timed out after {e.timeout} seconds")

    except ValueError as e:
        print(f"Task failed with validation error: {e}")

    except RuntimeError as e:
        print(f"Task failed with runtime error: {e}")

    except TaskiqError as e:
        print(f"Taskiq system error: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")
```

### Result Backend Error Handling

```python
from taskiq.exceptions import (
    ResultGetError,
    ResultIsReadyError,
    TaskiqResultTimeoutError,
)

async def safe_result_retrieval(task_result):
    """Safely retrieve task result with error handling."""
    task_id = task_result.task_id

    try:
        # Check if result is ready
        is_ready = await task_result.is_ready()
        if not is_ready:
            print(f"Result for task {task_id} not ready yet")
            return None

    except ResultIsReadyError as e:
        print(f"Cannot check result status: {e}")
        return None

    try:
        # Get the result
        result = await task_result.wait_result(timeout=30.0)
        return result

    except ResultGetError as e:
        print(f"Failed to retrieve result: {e}")
        return None

    except TaskiqResultTimeoutError as e:
        print(f"Result retrieval timed out: {e}")
        return None
```

### Task Registration Error Handling

```python
from taskiq import InMemoryBroker
from taskiq.exceptions import TaskBrokerMismatchError, UnknownTaskError

broker1 = InMemoryBroker()
broker2 = InMemoryBroker()

@broker1.task
async def my_task(x: int) -> int:
    return x * 2

async def handle_registration_errors():
    """Handle task registration issues."""
    try:
        # This will fail - task already registered to broker1
        broker2.register_task(my_task.original_func, "my_task")

    except TaskBrokerMismatchError as e:
        print(f"Task already registered to different broker: {e}")

    # Handle unknown task execution
    try:
        # Simulate receiving unknown task in worker
        unknown_task = broker1.find_task("nonexistent_task")
        if unknown_task is None:
            raise UnknownTaskError("nonexistent_task")

    except UnknownTaskError as e:
        print(f"Unknown task requested: {e}")
```

### Context Exception Handling

```python
from taskiq import Context, TaskiqDepends
from taskiq.exceptions import NoResultError, TaskRejectedError

# `broker` is the broker instance created in the earlier examples

@broker.task
async def conditional_task(
    data: dict,
    context: Context = TaskiqDepends(),
) -> dict:
    """Task with conditional processing and context control."""

    # Validate input data
    if not data.get("valid", True):
        print("Invalid data, rejecting task")
        context.reject()  # Raises TaskRejectedError

    # Check if requeue is needed
    if data.get("needs_retry", False):
        retry_count = int(context.message.labels.get("retry_count", 0))
        if retry_count < 3:
            print(f"Requeuing task (attempt {retry_count + 1})")
            context.message.labels["retry_count"] = str(retry_count + 1)
            await context.requeue()  # Raises NoResultError

    # Process data normally
    return {"processed": data, "status": "success"}

async def handle_context_exceptions():
    """Handle context-related exceptions."""
    try:
        result = await conditional_task.kiq({"valid": False})
        value = await result.wait_result()

    except TaskRejectedError:
        print("Task was rejected due to invalid data")

    except NoResultError:
        print("Task was requeued, no result available")
```

### Scheduling Exception Handling

```python
from datetime import datetime, time
from typing import List

import psutil
from taskiq import ScheduledTask, ScheduleSource, TaskiqScheduler
from taskiq.exceptions import ScheduledTaskCancelledError

class ConditionalScheduleSource(ScheduleSource):
    """Schedule source with conditional task execution."""

    async def get_schedules(self) -> List[ScheduledTask]:
        """Required by ScheduleSource; this example defines no schedules."""
        return []

    async def pre_send(self, task: ScheduledTask) -> None:
        """Check conditions before task execution."""

        # Check system load
        if await self._system_overloaded():
            raise ScheduledTaskCancelledError(
                f"System overloaded, cancelling {task.task_name}"
            )

        # Check maintenance window
        if await self._in_maintenance_window():
            raise ScheduledTaskCancelledError(
                f"In maintenance window, cancelling {task.task_name}"
            )

    async def _system_overloaded(self) -> bool:
        # Check system metrics
        return psutil.cpu_percent() > 90

    async def _in_maintenance_window(self) -> bool:
        # Check if current time is in maintenance window
        now = datetime.now().time()
        return time(2, 0) <= now <= time(4, 0)  # 2-4 AM maintenance

# Scheduler will handle ScheduledTaskCancelledError gracefully
# (`broker` is the broker instance created in the earlier examples)
schedule_source = ConditionalScheduleSource()
scheduler = TaskiqScheduler(broker, [schedule_source])
```

### Custom Exception Classes

```python
from typing import Any

import httpx
from taskiq.exceptions import TaskiqError

class DataValidationError(TaskiqError):
    """Custom exception for data validation failures."""

    def __init__(self, field: str, value: Any, message: str = "") -> None:
        self.field = field
        self.value = value
        super().__init__(f"Validation failed for {field}={value}: {message}")

class ExternalServiceError(TaskiqError):
    """Custom exception for external service failures."""

    def __init__(self, service: str, status_code: int, message: str = "") -> None:
        self.service = service
        self.status_code = status_code
        super().__init__(f"{service} error (HTTP {status_code}): {message}")

# `broker` is the broker instance created in the earlier examples
@broker.task
async def process_user_data(user_data: dict) -> dict:
    """Task with custom exception handling."""

    # Validate required fields
    if "email" not in user_data:
        raise DataValidationError("email", None, "Email is required")

    if not user_data["email"].endswith("@company.com"):
        raise DataValidationError(
            "email",
            user_data["email"],
            "Must be company email"
        )

    # Call external service
    # (external_api_call is a placeholder for your own HTTP client call)
    try:
        response = await external_api_call(user_data)
        if response.status_code != 200:
            raise ExternalServiceError(
                "UserService",
                response.status_code,
                response.text
            )
    except httpx.TimeoutException as exc:
        raise ExternalServiceError("UserService", 0, "Request timeout") from exc

    return {"processed": True, "user_id": response.json()["id"]}

# Handle custom exceptions
async def handle_custom_exceptions():
    try:
        task = await process_user_data.kiq({"name": "John"})
        result = await task.wait_result()
        # Re-raise the error captured during task execution, if any
        result.raise_for_error()

    except DataValidationError as e:
        print(f"Data validation failed: {e}")
        print(f"Field: {e.field}, Value: {e.value}")

    except ExternalServiceError as e:
        print(f"External service error: {e}")
        print(f"Service: {e.service}, Status: {e.status_code}")
```