or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

address-endpoints.md async-operations.md authentication.md client-apis.md connection-session.md error-handling.md high-level-messaging.md index.md low-level-protocol.md message-management.md types-constants.md

error-handling.mddocs/

0

# Error Handling

1

2

Comprehensive error handling with custom exceptions, error policies, and retry mechanisms for robust messaging applications that can gracefully handle network issues, authentication failures, and protocol errors.

3

4

## Capabilities

5

6

### Error Policy Configuration

7

8

Configurable error handling policies that define how clients should respond to different types of errors.

9

10

```python { .api }

11

class ErrorPolicy:
    """Policy that decides how a client responds to each category of error."""

    def __init__(self, max_retries=3, on_error=None):
        """
        Error handling policy configuration.

        Parameters:
        - max_retries (int): Maximum number of retry attempts
        - on_error (ErrorAction): Default error action to take
        """

    def on_unrecognized_error(self, error):
        """Handle unrecognized errors."""

    def on_message_error(self, error):
        """Handle message-level errors."""

    def on_link_error(self, error):
        """Handle link-level errors."""

    def on_connection_error(self, error):
        """Handle connection-level errors."""

32

```

33

34

### Error Actions

35

36

Configuration for specific error handling actions including retry behavior and backoff strategies.

37

38

```python { .api }

39

class ErrorAction:
    """Describes what a client should do in response to a single error."""

    def __init__(self, retry=True, backoff=1.0, increment_retries=True):
        """
        Error action specification.

        Parameters:
        - retry (bool): Whether to retry the operation
        - backoff (float): Backoff delay in seconds
        - increment_retries (bool): Whether to increment retry counter
        """

    @property
    def retry(self) -> bool:
        """Whether to retry the failed operation."""

    @property
    def backoff(self) -> float:
        """Backoff delay before retry in seconds."""

    @property
    def increment_retries(self) -> bool:
        """Whether to count this as a retry attempt."""

61

```

62

63

**Usage Example:**

64

65

```python

66

from uamqp import SendClient
from uamqp.errors import ErrorPolicy, ErrorAction


# Custom error policy with exponential backoff
def create_retry_action(attempt):
    """Build a retrying ErrorAction whose delay doubles with each attempt.

    Parameters:
    - attempt (int): Retry attempt number used as the exponent

    Returns:
    - ErrorAction with retry enabled and a backoff of 2 ** attempt seconds,
      capped at 60 seconds
    """
    backoff_delay = min(2.0 ** attempt, 60.0)  # Cap at 60 seconds
    return ErrorAction(retry=True, backoff=backoff_delay)


error_policy = ErrorPolicy(
    max_retries=5,
    on_error=ErrorAction(retry=True, backoff=1.0),
)

# Use with client (target and auth are assumed to be defined by the caller)
client = SendClient(target, auth=auth, error_policy=error_policy)

81

```

82

83

## Exception Hierarchy

84

85

### Base AMQP Errors

86

87

Root exception classes for AMQP-related errors.

88

89

```python { .api }

90

class AMQPError(Exception):
    """Base exception for all AMQP-related errors."""


class AMQPConnectionError(AMQPError):
    """Connection-related errors including network and protocol issues."""


class AMQPClientShutdown(AMQPError):
    """Exception raised when client is shutting down."""


class MessageHandlerError(AMQPError):
    """Errors in message handling and processing."""

101

```

102

103

### Connection Errors

104

105

Errors related to AMQP connection management and lifecycle.

106

107

```python { .api }

108

class ConnectionClose(AMQPConnectionError):
    """Connection was closed by the broker."""


class VendorConnectionClose(ConnectionClose):
    """Vendor-specific connection closure."""

113

```

114

115

### Link Errors

116

117

Errors related to AMQP links (message senders and receivers).

118

119

```python { .api }

120

class LinkDetach(AMQPError):
    """Link was detached by remote peer."""


class VendorLinkDetach(LinkDetach):
    """Vendor-specific link detachment."""


class LinkRedirect(AMQPError):
    """Link redirected to different endpoint."""

128

```

129

130

### Authentication Errors

131

132

Errors related to authentication and authorization.

133

134

```python { .api }

135

class AuthenticationException(AMQPError):
    """General authentication failure."""


class TokenExpired(AuthenticationException):
    """Authentication token has expired."""


class TokenAuthFailure(AuthenticationException):
    """Token authentication specifically failed."""

143

```

144

145

### Message Errors

146

147

Errors related to message processing and state management.

148

149

```python { .api }

150

class MessageResponse(AMQPError):
    """Base class for message response errors."""


class MessageException(MessageResponse):
    """General message processing error."""


class MessageSendFailed(MessageException):
    """Message sending operation failed."""


class MessageContentTooLarge(MessageException):
    """Message exceeds maximum size limit."""

161

```

162

163

### Message Settlement Responses

164

165

Specific responses to message settlement that may be treated as exceptions.

166

167

```python { .api }

168

class MessageAlreadySettled(MessageResponse):
    """Attempt to settle already-settled message."""


class MessageAccepted(MessageResponse):
    """Message was accepted by receiver."""


class MessageRejected(MessageResponse):
    """Message was rejected by receiver."""


class MessageReleased(MessageResponse):
    """Message was released by receiver."""


class MessageModified(MessageResponse):
    """Message was modified by receiver."""

182

```

183

184

### Timeout Errors

185

186

Errors related to operation timeouts.

187

188

```python { .api }

189

class ClientTimeout(AMQPError):
    """Client operation timed out."""

191

```

192

193

## Error Handling Patterns

194

195

### Basic Exception Handling

196

197

Handle common AMQP errors with appropriate recovery strategies.

198

199

```python

200

import time

from uamqp import send_message, receive_message
from uamqp.errors import (
    AMQPConnectionError,
    AuthenticationException,
    MessageSendFailed,
    ClientTimeout,
)


def robust_send_message(target, message, auth, max_attempts=3):
    """Send a message, retrying failures that may be transient.

    Parameters:
    - target: Destination passed through to send_message
    - message: Message passed through to send_message
    - auth: Authentication object passed through to send_message
    - max_attempts (int): Total number of send attempts before giving up

    Returns:
    - The value returned by send_message on the first successful attempt

    Raises:
    - AuthenticationException: immediately, without retrying
    - AMQPConnectionError, MessageSendFailed, ClientTimeout: re-raised
      once the final attempt has failed
    """
    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        try:
            result = send_message(target, message, auth=auth)

        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

        except AuthenticationException as e:
            print(f"Authentication failed: {e}")
            # Don't retry auth failures
            raise

        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(1)

        except ClientTimeout as e:
            print(f"Timeout (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            # No delay here: the next attempt starts immediately

        else:
            print(f"Message sent successfully: {result}")
            return result

238

```

239

240

### Message Settlement Error Handling

241

242

Handle message settlement responses appropriately.

243

244

```python

245

from uamqp.errors import (

246

MessageRejected,

247

MessageReleased,

248

MessageModified,

249

MessageAlreadySettled

250

)

251

252

def process_message_safely(message):
    """Process one received message and settle it according to the outcome.

    Parameters:
    - message: Received message exposing get_data(), accept(), reject()
      and release()

    Returns:
    - The result of process_business_logic when processing succeeds and the
      message is accepted; None when any handled error occurs

    A ValueError rejects the message, settlement responses are only logged,
    and any other exception releases the message for redelivery.
    """
    try:
        # Process message content
        data = message.get_data()
        result = process_business_logic(data)

        # Accept message on successful processing
        message.accept()
        return result

    except ValueError as e:
        # Business logic error - reject message
        print(f"Invalid message data: {e}")
        message.reject(
            condition="invalid-data",
            description=str(e),
        )

    except MessageRejected as e:
        print(f"Message was rejected: {e}")
        # Log for analysis, don't reprocess

    except MessageReleased as e:
        print(f"Message was released: {e}")
        # Message will be redelivered

    except MessageAlreadySettled as e:
        print(f"Message already settled: {e}")
        # Continue processing, message already handled

    except Exception as e:
        # Unexpected error - release message for retry
        print(f"Unexpected error: {e}")
        try:
            message.release()
        except MessageAlreadySettled:
            pass  # Message already handled

289

```

290

291

### Client Error Handling with Policies

292

293

Use error policies for automatic retry behavior.

294

295

```python

296

from uamqp import SendClient
from uamqp.errors import AMQPError, ErrorPolicy, ErrorAction


def create_resilient_client(target, auth):
    """Create a SendClient whose error policy retries connection errors.

    Parameters:
    - target: Destination passed through to SendClient
    - auth: Authentication object passed through to SendClient

    Returns:
    - SendClient configured with an ErrorPolicy of at most 3 retries whose
      connection-error handler refuses to retry authentication problems
    """
    # Define error actions for different scenarios
    retry_action = ErrorAction(retry=True, backoff=2.0)
    no_retry_action = ErrorAction(retry=False)

    # Create custom error policy
    error_policy = ErrorPolicy(max_retries=3)

    # Override specific error handling
    def custom_connection_error_handler(error):
        if "authentication" in str(error).lower():
            return no_retry_action  # Don't retry auth errors
        return retry_action  # Retry other connection errors

    error_policy.on_connection_error = custom_connection_error_handler

    return SendClient(target, auth=auth, error_policy=error_policy)


# Usage
try:
    with create_resilient_client(target, auth) as client:
        client.queue_message(message)
        results = client.send_all_messages()
except AMQPError as e:
    print(f"All retry attempts failed: {e}")

324

```

325

326

### Async Error Handling

327

328

Handle errors in async operations with proper coroutine error management.

329

330

```python

331

import asyncio

from uamqp.async_ops import SendClientAsync
from uamqp.errors import AMQPConnectionError, AMQPError, MessageSendFailed


async def robust_async_send(target, messages, auth):
    """Send a batch of messages asynchronously, retrying up to three times.

    Parameters:
    - target: Destination passed through to SendClientAsync
    - messages: Batch passed to send_message_batch_async
    - auth: Authentication object passed through to SendClientAsync

    Returns:
    - The results of send_message_batch_async from the first successful attempt

    Raises:
    - AMQPConnectionError, MessageSendFailed: re-raised once the final
      attempt has failed
    """
    max_retries = 3

    for attempt in range(max_retries):
        try:
            async with SendClientAsync(target, auth=auth) as client:
                results = await client.send_message_batch_async(messages)
                print(f"Sent {len(results)} messages successfully")
                return results

        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(1)


# Usage in async context
async def main():
    """Run robust_async_send and report a failure that survived all retries."""
    try:
        results = await robust_async_send(target, messages, auth)
    except AMQPError as e:
        print(f"Failed after all retries: {e}")


asyncio.run(main())

365

```

366

367

### Context Manager Error Handling

368

369

Proper resource cleanup with error handling in context managers.

370

371

```python

372

from uamqp import ReceiveClient
from uamqp.errors import AMQPConnectionError, AMQPError, AuthenticationException


class RobustReceiveClient:
    """Context manager that opens a ReceiveClient and always closes it."""

    def __init__(self, source, auth, **kwargs):
        """Store the connection settings; the client is created on entry.

        Parameters:
        - source: Message source passed through to ReceiveClient
        - auth: Authentication object passed through to ReceiveClient
        - **kwargs: Extra keyword arguments forwarded to ReceiveClient
        """
        self.source = source
        self.auth = auth
        self.client_kwargs = kwargs
        self.client = None

    def __enter__(self):
        """Create and open the client, closing it again if opening fails."""
        try:
            self.client = ReceiveClient(self.source, auth=self.auth, **self.client_kwargs)
            self.client.open()
            return self.client
        except AMQPError:
            if self.client:
                try:
                    self.client.close()
                except Exception:
                    pass  # Ignore cleanup errors so the original error propagates
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client and report the error category; never suppresses."""
        if self.client:
            try:
                self.client.close()
            except AMQPError as e:
                print(f"Error during cleanup: {e}")
                # Don't re-raise cleanup errors

        # Handle different exception types. exc_type is None on a clean exit,
        # and issubclass also matches subclasses such as ConnectionClose.
        if exc_type is not None:
            if issubclass(exc_type, AMQPConnectionError):
                print("Connection error occurred, consider retry")
            elif issubclass(exc_type, AuthenticationException):
                print("Authentication failed, check credentials")

        return False  # Don't suppress exceptions


# Usage
try:
    with RobustReceiveClient(source, auth) as client:
        messages = client.receive_message_batch(timeout=30000)
        for message in messages:
            process_message_safely(message)
except AMQPError as e:
    print(f"Client operation failed: {e}")

421

```

422

423

## Debugging and Diagnostics

424

425

### Enable Debug Logging

426

427

Use debug mode to get detailed protocol-level information for troubleshooting.

428

429

```python

430

import logging

from uamqp import SendClient

# Enable uAMQP debug logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('uamqp')
logger.setLevel(logging.DEBUG)

# Use debug mode in clients (target and auth are assumed to be defined by the caller)
client = SendClient(target, auth=auth, debug=True)

439

```

440

441

### Error Information Extraction

442

443

Extract detailed error information for analysis and logging.

444

445

```python

446

def log_error_details(error):
    """Print an error's type and message, plus any AMQP details it carries.

    Parameters:
    - error: Exception instance to describe

    The condition, description and info attributes are printed only when
    the error object has them.
    """
    print(f"Error type: {type(error).__name__}")
    print(f"Error message: {error}")

    # Check for AMQP-specific error details
    for attribute in ("condition", "description", "info"):
        if hasattr(error, attribute):
            print(f"AMQP {attribute}: {getattr(error, attribute)}")

457

458

from uamqp.errors import AMQPError

try:
    # AMQP operation
    pass
except AMQPError as e:
    log_error_details(e)
    # Decide on recovery action based on error details

464

```

465

466

### Health Monitoring

467

468

Implement health checks and monitoring for AMQP connections.

469

470

```python

471

import time

472

from uamqp.errors import AMQPError

473

474

class ConnectionHealthMonitor:
    """Track whether test sends to an AMQP target succeed."""

    def __init__(self, target, auth):
        """Store the connection settings and start with no recorded errors.

        Parameters:
        - target: Destination passed through to send_message
        - auth: Authentication object passed through to send_message
        """
        self.target = target
        self.auth = auth
        self.last_error = None
        self.error_count = 0
        # time.time() of the most recent check_health() call; None until then
        self.last_check = None

    def check_health(self):
        """Send a test message and record the outcome.

        Returns:
        - True when the send succeeds (error tracking is reset),
          False when it raises AMQPError (the error is stored and counted)
        """
        # Simple health check using high-level API
        from uamqp import send_message, Message

        self.last_check = time.time()
        try:
            test_message = Message("health-check")
            send_message(self.target, test_message, auth=self.auth)
        except AMQPError as e:
            self.last_error = e
            self.error_count += 1
            return False

        # Reset error tracking on success
        self.last_error = None
        self.error_count = 0
        return True

    def get_health_status(self):
        """Return a dict summarising the most recent health check.

        Keys: 'healthy' (bool), 'last_error' (str or None),
        'error_count' (int) and 'last_check' (timestamp of the last
        check_health() call, or None if it has never run).
        """
        return {
            'healthy': self.last_error is None,
            'last_error': str(self.last_error) if self.last_error else None,
            'error_count': self.error_count,
            'last_check': self.last_check,
        }

506

507

# Usage

508

monitor = ConnectionHealthMonitor(target, auth)
if not monitor.check_health():
    status = monitor.get_health_status()
    print(f"Connection unhealthy: {status}")

512

```