or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdevents-state.mdexceptions.mdindex.mdmiddleware.mdresult-backends.mdscheduling.mdtasks-results.md

exceptions.mddocs/

0

# Exceptions

1

2

Comprehensive exception hierarchy for handling various error conditions in distributed task processing. Provides specific error types for different failure scenarios to enable precise error handling and debugging.

3

4

## Capabilities

5

6

### Base Exception Classes

7

8

Core exception hierarchy providing the foundation for all taskiq-specific errors.

9

10

```python { .api }

11

class TaskiqError(Exception):

12

"""

13

Base exception class for all taskiq-related errors.

14

15

All taskiq-specific exceptions inherit from this class,

16

allowing for broad exception handling when needed.

17

"""

18

19

def __init__(self, message: str = "") -> None:

20

"""

21

Initialize taskiq error.

22

23

Args:

24

message: Error description

25

"""

26

```

27

28

### Result-Related Exceptions

29

30

Exceptions related to task result retrieval and management.

31

32

```python { .api }

33

class NoResultError(TaskiqError):

34

"""

35

Exception raised when no result is available.

36

37

Typically raised by task context methods like requeue()

38

to indicate that no result should be stored for the current

39

task execution.

40

"""

41

42

class ResultGetError(TaskiqError):

43

"""

44

Exception raised when task result cannot be retrieved.

45

46

Occurs when attempting to get results from result backend

47

fails due to storage issues, missing results, or backend errors.

48

"""

49

50

class ResultIsReadyError(TaskiqError):

51

"""

52

Exception raised when checking result readiness fails.

53

54

Occurs when the result backend cannot determine if a

55

result is ready for retrieval.

56

"""

57

58

class TaskiqResultTimeoutError(TaskiqError):

59

"""

60

Exception raised when waiting for result times out.

61

62

Thrown by wait_result() when the specified timeout

63

is exceeded before the task completes.

64

"""

65

66

def __init__(

67

self,

68

task_id: str,

69

timeout: float,

70

message: str = "",

71

) -> None:

72

"""

73

Initialize timeout error.

74

75

Args:

76

task_id: ID of the task that timed out

77

timeout: Timeout duration that was exceeded

78

message: Additional error description

79

"""

80

```

81

82

### Task Execution Exceptions

83

84

Exceptions related to task execution and processing.

85

86

```python { .api }

87

class SendTaskError(TaskiqError):

88

"""

89

Exception raised when task cannot be sent to broker.

90

91

Occurs when broker fails to accept or queue a task

92

due to broker unavailability, queue full conditions,

93

or serialization errors.

94

"""

95

96

class TaskRejectedError(TaskiqError):

97

"""

98

Exception raised when task is explicitly rejected.

99

100

Thrown by context.reject() to indicate that the

101

current task should be rejected and not retried.

102

"""

103

104

class UnknownTaskError(TaskiqError):

105

"""

106

Exception raised when attempting to execute unknown task.

107

108

Occurs when a worker receives a task that is not

109

registered in the broker's task registry.

110

"""

111

112

def __init__(self, task_name: str, message: str = "") -> None:

113

"""

114

Initialize unknown task error.

115

116

Args:

117

task_name: Name of the unknown task

118

message: Additional error description

119

"""

120

```

121

122

### Security and Validation Exceptions

123

124

Exceptions related to security, authentication, and data validation.

125

126

```python { .api }

127

class SecurityError(TaskiqError):

128

"""

129

Exception raised for security-related issues.

130

131

Occurs when security validation fails, such as

132

unauthorized access attempts, invalid signatures,

133

or security policy violations.

134

"""

135

136

class TaskBrokerMismatchError(TaskiqError):

137

"""

138

Exception raised when task is registered to wrong broker.

139

140

Occurs when attempting to register a task that was

141

already registered to a different broker instance.

142

"""

143

144

def __init__(self, broker: "AsyncBroker", message: str = "") -> None:

145

"""

146

Initialize broker mismatch error.

147

148

Args:

149

broker: The broker that owns the task

150

message: Additional error description

151

"""

152

```

153

154

### Scheduling Exceptions

155

156

Exceptions related to task scheduling and timing.

157

158

```python { .api }

159

class ScheduledTaskCancelledError(TaskiqError):

160

"""

161

Exception raised when scheduled task is cancelled.

162

163

Thrown by schedule sources to prevent execution

164

of scheduled tasks based on custom conditions.

165

"""

166

```

167

168

## Usage Examples

169

170

### Basic Exception Handling

171

172

```python

173

from taskiq import InMemoryBroker, TaskiqError, ResultGetError

174

175

broker = InMemoryBroker()

176

177

@broker.task

178

async def risky_operation(value: int) -> int:

179

"""Task that might fail."""

180

if value < 0:

181

raise ValueError("Negative values not supported")

182

if value > 100:

183

raise RuntimeError("Value too large")

184

return value * 2

185

186

async def handle_task_execution():

187

"""Example of handling task execution errors."""

188

try:

189

# Execute task

190

result = await risky_operation.kiq(-5)

191

value = await result.wait_result(timeout=10.0)

192

print(f"Success: {value}")

193

194

except TaskiqResultTimeoutError as e:

195

print(f"Task timed out after {e.timeout} seconds")

196

197

except ValueError as e:

198

print(f"Task failed with validation error: {e}")

199

200

except RuntimeError as e:

201

print(f"Task failed with runtime error: {e}")

202

203

except TaskiqError as e:

204

print(f"Taskiq system error: {e}")

205

206

except Exception as e:

207

print(f"Unexpected error: {e}")

208

```

209

210

### Result Backend Error Handling

211

212

```python

213

from taskiq.exceptions import ResultGetError, ResultIsReadyError

214

215

async def safe_result_retrieval(task_result):

216

"""Safely retrieve task result with error handling."""

217

task_id = task_result.task_id

218

219

try:

220

# Check if result is ready

221

is_ready = await task_result.is_ready()

222

if not is_ready:

223

print("Result not ready yet")

224

return None

225

226

except ResultIsReadyError as e:

227

print(f"Cannot check result status: {e}")

228

return None

229

230

try:

231

# Get the result

232

result = await task_result.wait_result(timeout=30.0)

233

return result

234

235

except ResultGetError as e:

236

print(f"Failed to retrieve result: {e}")

237

return None

238

239

except TaskiqResultTimeoutError as e:

240

print(f"Result retrieval timed out: {e}")

241

return None

242

```

243

244

### Task Registration Error Handling

245

246

```python

247

from taskiq.exceptions import TaskBrokerMismatchError, UnknownTaskError

248

249

broker1 = InMemoryBroker()

250

broker2 = InMemoryBroker()

251

252

@broker1.task

253

async def my_task(x: int) -> int:

254

return x * 2

255

256

async def handle_registration_errors():

257

"""Handle task registration issues."""

258

try:

259

# This will fail - task already registered to broker1

260

broker2.register_task(my_task.original_func, "my_task")

261

262

except TaskBrokerMismatchError as e:

263

print(f"Task already registered to different broker: {e}")

264

265

# Handle unknown task execution

266

try:

267

# Simulate receiving unknown task in worker

268

unknown_task = broker1.find_task("nonexistent_task")

269

if unknown_task is None:

270

raise UnknownTaskError("nonexistent_task")

271

272

except UnknownTaskError as e:

273

print(f"Unknown task requested: {e}")

274

```

275

276

### Context Exception Handling

277

278

```python

279

from taskiq import Context, TaskiqDepends

280

from taskiq.exceptions import NoResultError, TaskRejectedError

281

282

@broker.task

283

async def conditional_task(

284

data: dict,

285

context: Context = TaskiqDepends(),

286

) -> dict:

287

"""Task with conditional processing and context control."""

288

289

# Validate input data

290

if not data.get("valid", True):

291

print("Invalid data, rejecting task")

292

context.reject() # Raises TaskRejectedError

293

294

# Check if requeue is needed

295

if data.get("needs_retry", False):

296

retry_count = int(context.message.labels.get("retry_count", 0))

297

if retry_count < 3:

298

print(f"Requeuing task (attempt {retry_count + 1})")

299

context.message.labels["retry_count"] = str(retry_count + 1)

300

await context.requeue() # Raises NoResultError

301

302

# Process data normally

303

return {"processed": data, "status": "success"}

304

305

async def handle_context_exceptions():

306

"""Handle context-related exceptions."""

307

try:

308

result = await conditional_task.kiq({"valid": False})

309

value = await result.wait_result()

310

311

except TaskRejectedError:

312

print("Task was rejected due to invalid data")

313

314

except NoResultError:

315

print("Task was requeued, no result available")

316

```

317

318

### Scheduling Exception Handling

319

320

```python

321

from taskiq.exceptions import ScheduledTaskCancelledError

322

from taskiq.scheduler import ScheduleSource, ScheduledTask

323

324

class ConditionalScheduleSource(ScheduleSource):

325

"""Schedule source with conditional task execution."""

326

327

async def pre_send(self, task: ScheduledTask) -> None:

328

"""Check conditions before task execution."""

329

330

# Check system load

331

if await self._system_overloaded():

332

raise ScheduledTaskCancelledError(

333

f"System overloaded, cancelling {task.task_name}"

334

)

335

336

# Check maintenance window

337

if await self._in_maintenance_window():

338

raise ScheduledTaskCancelledError(

339

f"In maintenance window, cancelling {task.task_name}"

340

)

341

342

async def _system_overloaded(self) -> bool:

343

# Check system metrics

344

import psutil

345

return psutil.cpu_percent() > 90

346

347

async def _in_maintenance_window(self) -> bool:

348

# Check if current time is in maintenance window

349

from datetime import datetime, time

350

now = datetime.now().time()

351

return time(2, 0) <= now <= time(4, 0) # 2-4 AM maintenance

352

353

# Scheduler will handle ScheduledTaskCancelledError gracefully

354

schedule_source = ConditionalScheduleSource()

355

scheduler = TaskiqScheduler(broker, [schedule_source])

356

```

357

358

### Custom Exception Classes

359

360

```python

361

class DataValidationError(TaskiqError):

362

"""Custom exception for data validation failures."""

363

364

def __init__(self, field: str, value: Any, message: str = "") -> None:

365

self.field = field

366

self.value = value

367

super().__init__(f"Validation failed for {field}={value}: {message}")

368

369

class ExternalServiceError(TaskiqError):

370

"""Custom exception for external service failures."""

371

372

def __init__(self, service: str, status_code: int, message: str = "") -> None:

373

self.service = service

374

self.status_code = status_code

375

super().__init__(f"{service} error (HTTP {status_code}): {message}")

376

377

@broker.task

378

async def process_user_data(user_data: dict) -> dict:

379

"""Task with custom exception handling."""

380

381

# Validate required fields

382

if "email" not in user_data:

383

raise DataValidationError("email", None, "Email is required")

384

385

if not user_data["email"].endswith("@company.com"):

386

raise DataValidationError(

387

"email",

388

user_data["email"],

389

"Must be company email"

390

)

391

392

# Call external service

393

try:

394

response = await external_api_call(user_data)

395

if response.status_code != 200:

396

raise ExternalServiceError(

397

"UserService",

398

response.status_code,

399

response.text

400

)

401

except httpx.TimeoutException:

402

raise ExternalServiceError("UserService", 0, "Request timeout")

403

404

return {"processed": True, "user_id": response.json()["id"]}

405

406

# Handle custom exceptions

407

async def handle_custom_exceptions():

408

try:

409

result = await process_user_data.kiq({"name": "John"})

410

await result.wait_result()

411

412

except DataValidationError as e:

413

print(f"Data validation failed: {e}")

414

print(f"Field: {e.field}, Value: {e.value}")

415

416

except ExternalServiceError as e:

417

print(f"External service error: {e}")

418

print(f"Service: {e.service}, Status: {e.status_code}")

419

```