0
# Error Handling
1
2
Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.
3
4
## Capabilities
5
6
### Core NATS Errors
7
8
Base error classes and connection-related exceptions.
9
10
```python { .api }
11
import asyncio


class Error(Exception):
    """Base NATS error class; every client error below derives from it."""


class ConnectionClosedError(Error):
    """Connection was closed."""


class TimeoutError(Error, asyncio.TimeoutError):
    """Operation timed out.

    Also derives from ``asyncio.TimeoutError``, so generic asyncio timeout
    handlers catch it as well.
    """


class NoRespondersError(Error):
    """No services responded to request."""


class StaleConnectionError(Error):
    """Connection is stale and unusable."""


class OutboundBufferLimitError(Error):
    """Outbound buffer limit exceeded."""


class UnexpectedEOF(StaleConnectionError):
    """Unexpected end of file/connection (a specific kind of stale connection)."""


class FlushTimeoutError(TimeoutError):
    """Flush operation timed out.

    Subclass of ``TimeoutError``: when handling both, catch this one first.
    """
34
```
35
36
#### Usage Examples
37
38
```python
39
import asyncio
40
import nats
41
from nats.errors import (
42
ConnectionClosedError, TimeoutError, NoRespondersError,
43
StaleConnectionError, FlushTimeoutError
44
)
45
46
async def robust_client():
    """Connect, publish, flush and send a request, reporting each NATS failure mode.

    The more specific ``FlushTimeoutError`` is handled before the general
    ``TimeoutError`` so the precise message is printed. The connection is
    closed on every exit path.
    """
    nc = None
    try:
        # Connect with error handling
        nc = await nats.connect("nats://localhost:4222")

        # Publish, then flush so delivery problems surface here
        await nc.publish("test.subject", b"Hello")
        await nc.flush(timeout=5.0)

        # Request with timeout and no-responders handling
        response = await nc.request("api.service", b"request", timeout=2.0)
        print(f"Response: {response.data.decode()}")

    except ConnectionClosedError:
        print("Connection was closed unexpectedly")
    except FlushTimeoutError:
        print("Failed to flush messages within timeout")
    except TimeoutError:
        print("Request timed out")
    except NoRespondersError:
        print("No service available to handle request")
    except StaleConnectionError:
        print("Connection is stale, need to reconnect")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # ``is_closed`` is a property, not a method: calling it would raise
        # TypeError here and hide whatever error got us into this block.
        if nc is not None and not nc.is_closed:
            await nc.close()
75
```
76
77
### Connection and Protocol Errors
78
79
Network and protocol-level error handling.
80
81
```python { .api }
82
class SecureConnRequiredError(Error):
    """The server only accepts a secure connection."""


class SecureConnWantedError(Error):
    """A secure connection is preferred by the server."""


class SecureConnFailedError(Error):
    """The secure connection could not be established."""


class AuthorizationError(Error):
    """Authentication or authorization was rejected."""


class NoServersError(Error):
    """There is no server available to connect to."""


class ProtocolError(Error):
    """Error in the NATS protocol exchange."""


class MaxPayloadError(Error):
    """The message payload is larger than the maximum allowed size."""
102
```
103
104
#### Usage Examples
105
106
```python
107
import ssl

import nats
from nats.errors import (
    SecureConnRequiredError, AuthorizationError, NoServersError,
    MaxPayloadError, ProtocolError
)


async def secure_connection():
    """Connect (falling back to TLS if required), then publish an oversized message.

    Returns:
        The connected client, or ``None`` when authentication fails or no
        server is available.
    """
    try:
        # Try insecure connection first
        nc = await nats.connect("nats://secure-server:4222")

    except SecureConnRequiredError:
        print("Server requires TLS, retrying with secure connection")
        ssl_ctx = ssl.create_default_context()
        nc = await nats.connect("tls://secure-server:4443", tls=ssl_ctx)

    except AuthorizationError:
        print("Authentication failed, check credentials")
        return None

    except NoServersError:
        print("No NATS servers available")
        return None

    # Test large message
    large_message = b"x" * (2 * 1024 * 1024)  # 2MB
    try:
        await nc.publish("test", large_message)

    except MaxPayloadError as e:
        print(f"Message too large: {e}")
        # Split into chunks no larger than the server limit.
        # ``max_payload`` is a property on the client, not a method.
        chunk_size = nc.max_payload
        for index, start in enumerate(range(0, len(large_message), chunk_size)):
            chunk = large_message[start:start + chunk_size]
            await nc.publish(f"test.chunk.{index}", chunk)

    return nc
145
```
146
147
### Subscription and Message Errors
148
149
Subscription and message processing error handling.
150
151
```python { .api }
152
class BadSubscriptionError(Error):
    """Invalid subscription parameters."""


class BadSubjectError(Error):
    """Invalid subject format."""


class SlowConsumerError(Error):
    """Consumer cannot keep up with message rate."""


class InvalidCallbackTypeError(Error):
    """Invalid callback function type."""


class BadTimeoutError(Error):
    """Invalid timeout value."""


class DrainTimeoutError(TimeoutError):
    """Drain operation timed out.

    Derives from the client's ``TimeoutError`` (Core NATS Errors), so a
    generic timeout handler also catches it.
    """


class ConnectionDrainingError(Error):
    """Connection is in draining state."""


class ConnectionReconnectingError(Error):
    """Connection is in reconnecting state."""
175
```
176
177
#### Usage Examples
178
179
```python
180
import asyncio

import nats
from nats.errors import (
    BadSubscriptionError, SlowConsumerError, DrainTimeoutError,
    ConnectionReconnectingError
)


async def robust_subscription():
    """Subscribe to ``events.*``, monitor the backlog, and drain on exit.

    Slow-consumer conditions are detected by the client in the background and
    reported through ``error_cb``; they are not raised out of the monitoring
    loop, so they are handled in the callback.
    """
    async def error_cb(e):
        if isinstance(e, SlowConsumerError):
            print("Consumer is too slow, increase processing capacity")
        else:
            print(f"NATS error: {e}")

    # The drain timeout is a connection option; ``drain()`` takes no arguments.
    nc = await nats.connect(error_cb=error_cb, drain_timeout=30)

    try:
        # Subscribe with error handling
        async def message_handler(msg):
            try:
                await process_message(msg)  # application-defined placeholder
            except Exception as e:
                print(f"Message processing error: {e}")

        sub = await nc.subscribe("events.*", cb=message_handler)

        # Monitor the backlog of buffered, not-yet-processed messages
        while True:
            await asyncio.sleep(10)

            # ``pending_msgs`` is a property, not a method
            if sub.pending_msgs > 10000:
                print("Warning: High pending message count")

    except BadSubscriptionError as e:
        print(f"Invalid subscription: {e}")

    except ConnectionReconnectingError:
        print("Connection is reconnecting, waiting...")
        await asyncio.sleep(5)  # Wait for reconnection

    finally:
        try:
            await nc.drain()
        except DrainTimeoutError:
            print("Drain timed out, forcing close")
            await nc.close()
221
```
222
223
### JetStream Errors
224
225
JetStream-specific error handling for streams and consumers.
226
227
```python { .api }
228
from nats.js.errors import (
229
Error as JSError,
230
APIError,
231
ServiceUnavailableError,
232
ServerError,
233
NotFoundError,
234
BadRequestError,
235
NoStreamResponseError,
236
TooManyStalledMsgsError,
237
FetchTimeoutError,
238
ConsumerSequenceMismatchError
239
)
240
```
241
242
#### Usage Examples
243
244
```python
245
import nats
from nats.js.errors import (
    APIError, NotFoundError, BadRequestError, ServiceUnavailableError,
    FetchTimeoutError, ConsumerSequenceMismatchError
)


async def jetstream_operations():
    """Create a stream, publish to it, pull-consume from it and inspect it.

    The connection is closed on every exit path, including the early return
    when JetStream is unavailable.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()
        jsm = nc.jsm()

        try:
            # Create stream with error handling
            stream_info = await jsm.add_stream(
                name="events",
                subjects=["events.*"],
            )
            print(f"Created stream: {stream_info.config.name}")

        except BadRequestError as e:
            print(f"Invalid stream configuration: {e}")

        except ServiceUnavailableError:
            print("JetStream service unavailable")
            return

        try:
            # Publish to JetStream
            ack = await js.publish("events.test", b"test message")
            print(f"Published message at sequence {ack.seq}")

        except APIError as e:
            print(f"JetStream API error: {e}")

        try:
            # Pull subscribe with error handling
            psub = await js.pull_subscribe("events.*", durable="test-consumer")

            # The batch size keyword of ``fetch`` is ``batch``.
            msgs = await psub.fetch(batch=10, timeout=5.0)
            for msg in msgs:
                await msg.ack()

        except FetchTimeoutError:
            print("No messages available within timeout")

        except ConsumerSequenceMismatchError as e:
            print(f"Consumer sequence mismatch: {e}")
            # Reset consumer or handle sequence gap

        try:
            # Get stream info
            info = await jsm.stream_info("events")
            print(f"Stream has {info.state.messages} messages")

        except NotFoundError:
            print("Stream 'events' not found")
    finally:
        await nc.close()
300
```
301
302
### Key-Value Store Errors
303
304
Key-value store specific error handling.
305
306
```python { .api }
307
from nats.js.errors import (
308
BucketNotFoundError,
309
BadBucketError,
310
KeyValueError,
311
KeyDeletedError,
312
KeyNotFoundError,
313
KeyWrongLastSequenceError,
314
NoKeysError,
315
KeyHistoryTooLargeError,
316
InvalidKeyError,
317
InvalidBucketNameError
318
)
319
```
320
321
#### Usage Examples
322
323
```python
324
import nats
from nats.js.errors import (
    BucketNotFoundError, KeyNotFoundError, KeyDeletedError,
    KeyWrongLastSequenceError, InvalidKeyError
)


async def kv_operations():
    """Open (or create) a KV bucket and exercise get / put / update / key validation.

    The connection is closed on every exit path.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()

        try:
            # Get or create KV store
            kv = await js.key_value("user-sessions")
        except BucketNotFoundError:
            print("Creating new KV bucket")
            kv = await js.create_key_value(bucket="user-sessions")

        try:
            # Get key with error handling
            entry = await kv.get("session:user123")
            print(f"Session data: {entry.value.decode()}")

        except KeyDeletedError:
            # A deleted key has no current value; recreate it so the
            # conditional update below has an entry to read.
            print("Session was deleted")
            await kv.put("session:user123", b'{"new": "session"}')

        except KeyNotFoundError:
            print("Session not found, creating new one")
            await kv.put("session:user123", b'{"new": "session"}')

        updated_data = b'{"updated": "session"}'
        try:
            # Conditional update: succeeds only if the revision is still current
            entry = await kv.get("session:user123")
            await kv.update("session:user123", updated_data, entry.revision)

        except KeyWrongLastSequenceError:
            print("Session was modified by another process")
            # Retry with latest revision
            entry = await kv.get("session:user123")
            await kv.update("session:user123", updated_data, entry.revision)

        try:
            # Validate key format
            await kv.put("invalid key name!", b"data")

        except InvalidKeyError as e:
            print(f"Invalid key format: {e}")
    finally:
        await nc.close()
371
```
372
373
### Object Store Errors
374
375
Object store specific error handling.
376
377
```python { .api }
378
from nats.js.errors import (
379
InvalidObjectNameError,
380
BadObjectMetaError,
381
LinkIsABucketError,
382
DigestMismatchError,
383
ObjectNotFoundError,
384
ObjectDeletedError,
385
ObjectAlreadyExists
386
)
387
```
388
389
#### Usage Examples
390
391
```python
392
import nats
from nats.js.api import ObjectMeta
from nats.js.errors import (
    ObjectNotFoundError, ObjectDeletedError, ObjectAlreadyExists,
    DigestMismatchError, BadObjectMetaError
)


async def object_store_operations(file_data: bytes = b"%PDF-1.4 example"):
    """Store, retrieve and re-describe ``document.pdf`` in the ``file-storage`` bucket.

    Args:
        file_data: Bytes to store; defaults to a small placeholder payload.

    The connection is closed on every exit path.
    """
    nc = await nats.connect()
    try:
        js = nc.jetstream()
        # ``obs`` rather than ``os`` so the standard ``os`` module is not shadowed.
        obs = await js.object_store("file-storage")

        try:
            # Store object with error handling
            obj_info = await obs.put("document.pdf", file_data)
            print(f"Stored object: {obj_info.name}")

        except ObjectAlreadyExists:
            print("Object already exists")
            # ``put`` has no ``replace`` option: remove the old object, then store again.
            await obs.delete("document.pdf")
            obj_info = await obs.put("document.pdf", file_data)

        try:
            # Retrieve object; the payload is in ``result.data``.
            result = await obs.get("document.pdf")
            # Integrity failures are expected to surface as DigestMismatchError
            # (handled below) — confirm against the client version in use.
            print(f"Retrieved {len(result.data)} bytes")

        except ObjectNotFoundError:
            print("Object not found")

        except ObjectDeletedError:
            print("Object was deleted")

        except DigestMismatchError as e:
            print(f"Data corruption detected: {e}")
            # Handle corrupted data

        try:
            # Update metadata
            meta = ObjectMeta(
                name="document.pdf",
                description="Updated document",
            )
            await obs.update_meta("document.pdf", meta)

        except BadObjectMetaError as e:
            print(f"Invalid object metadata: {e}")
    finally:
        await nc.close()
442
```
443
444
### Microservices Errors
445
446
Service framework error handling.
447
448
```python { .api }
449
from nats.micro import ServiceError
450
451
class ServiceError(Exception):
    """Exception carrying a service error ``code`` and human-readable ``description``.

    ``str(err)`` renders as ``"<code>: <description>"``.
    """

    def __init__(self, code: str, description: str):
        self.code, self.description = code, description
        message = "{}: {}".format(code, description)
        super().__init__(message)
457
```
458
459
#### Usage Examples
460
461
```python
462
import json
import logging

from nats.micro import ServiceError, Request

logger = logging.getLogger(__name__)

# ValidationError, AuthenticationError, RateLimitError,
# process_service_request, user_exists and create_user are
# application-defined placeholders in this example.


async def service_handler(request: Request):
    """Process a JSON request, mapping each failure to a structured error response."""
    try:
        # Process request
        data = json.loads(request.data.decode())
        result = await process_service_request(data)

        response = json.dumps(result).encode()
        await request.respond(response)

    except ValidationError as e:
        await request.respond_error(
            code="VALIDATION_ERROR",
            description=str(e),
        )

    except AuthenticationError:
        await request.respond_error(
            code="UNAUTHORIZED",
            description="Authentication required",
        )

    except RateLimitError:
        await request.respond_error(
            code="RATE_LIMITED",
            description="Too many requests",
        )

    except ServiceError as e:
        # Service errors already carry a code and description; send them as-is.
        await request.respond_error(e.code, e.description)

    except Exception:
        # Log with traceback, but do not leak internal details to the caller.
        logger.exception("Unexpected service error")
        await request.respond_error(
            code="INTERNAL_ERROR",
            description="Internal server error",
        )


# Custom service errors
class BusinessLogicError(ServiceError):
    """A request violated a business rule."""

    def __init__(self, message: str):
        super().__init__("BUSINESS_LOGIC_ERROR", message)


class DataValidationError(ServiceError):
    """A request field failed validation; the description names the field."""

    def __init__(self, field: str, message: str):
        super().__init__("VALIDATION_ERROR", f"{field}: {message}")


# Usage in service logic
async def create_user_service(request: Request):
    """Create a user, replying with the user JSON or a structured error."""
    try:
        user_data = json.loads(request.data.decode())

        if not user_data.get("email"):
            raise DataValidationError("email", "Email is required")

        if await user_exists(user_data["email"]):
            raise BusinessLogicError("User already exists")

        user = await create_user(user_data)
        await request.respond(json.dumps(user.to_dict()).encode())

    except ServiceError as e:
        # Reply with the error's own code, as service_handler does, rather
        # than relying on how the framework treats uncaught exceptions.
        await request.respond_error(e.code, e.description)
    except Exception:
        logger.exception("Unexpected error while creating user")
        await request.respond_error("INTERNAL_ERROR", "Internal server error")
530
```
531
532
## Error Handling Patterns
533
534
### Retry Strategies
535
536
Implement robust retry logic for transient errors.
537
538
```python
539
import asyncio
from typing import Any, Awaitable, Callable


async def retry_with_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,),
    base_delay: float = 1.0,
) -> Any:
    """Await ``operation()``, retrying with exponential backoff on transient errors.

    Args:
        operation: Zero-argument callable returning an awaitable; it is called
            once per attempt.
        max_retries: Retries after the first attempt, so ``operation`` runs at
            most ``max_retries + 1`` times. Must be >= 0.
        backoff_factor: Multiplier applied to the delay after each failure.
        exceptions: Exception types treated as transient. Any other exception
            propagates immediately.
        base_delay: Seconds to wait before the first retry. Before retry ``n``
            (0-based) the wait is ``base_delay * backoff_factor ** n``.

    Returns:
        The result of the first successful call.

    Raises:
        ValueError: If ``max_retries`` is negative (the loop would never run).
        The last caught exception, once retries are exhausted.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except exceptions as e:
            if attempt == max_retries:
                raise  # bare raise keeps the original traceback

            wait_time = base_delay * backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed: {e}, retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
560
561
# Usage — ``await`` is only valid inside a coroutine, so the call is wrapped.
from nats.errors import ConnectionClosedError, TimeoutError


async def flaky_operation():
    """Send one request that may fail transiently.

    Assumes ``nc`` is a connected NATS client in scope.
    """
    # Simulate flaky network operation
    response = await nc.request("flaky.service", b"request", timeout=1.0)
    return response


async def run_flaky_operation():
    """Run ``flaky_operation`` with retries; return its response or ``None``."""
    try:
        return await retry_with_backoff(
            flaky_operation,
            max_retries=3,
            exceptions=(TimeoutError, ConnectionClosedError),
        )
    except Exception as e:
        print(f"Operation failed after retries: {e}")
        return None
575
```
576
577
### Circuit Breaker Pattern
578
579
Prevent cascading failures with a circuit breaker.
580
581
```python
582
import time
from enum import Enum


class CircuitState(Enum):
    """Possible states of a CircuitBreaker."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing operation until a cool-down period has passed.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        timeout: Seconds the circuit stays open before one trial call is
            allowed (HALF_OPEN).
    """

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None  # time.monotonic() of the last failure
        self.state = CircuitState.CLOSED

    async def call(self, operation):
        """Await ``operation()`` unless the circuit is open.

        Raises:
            CircuitOpenError: While open and the cool-down has not elapsed.
            Whatever ``operation()`` raises; the failure is recorded first.
        """
        if self.state == CircuitState.OPEN:
            # Monotonic clock: elapsed time is unaffected by wall-clock jumps.
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await operation()
        except Exception:
            self.on_failure()
            raise
        else:
            self.on_success()
            return result

    def on_success(self):
        """Reset the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is reached.

        The count is only reset by ``on_success``, so a failed HALF_OPEN trial
        re-opens the circuit immediately.
        """
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
623
624
# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)


async def protected_service_call():
    """Send a request to "service" through the shared breaker.

    Returns the response, or ``None`` if the call failed or the breaker is
    open. Assumes ``nc`` is a connected NATS client in scope.
    """
    def send_request():
        return nc.request("service", b"data")

    try:
        reply = await circuit_breaker.call(send_request)
    except Exception as err:
        print(f"Service call failed: {err}")
        return None
    return reply
633
```
634
635
## Constants
636
637
```python { .api }
638
# Error categories.
# NATS_ERRORS holds the exception classes themselves; the other lists hold
# class names as strings. InvalidUserCredentialsError and JsonParseError
# appear only here, not in the sections above.
NATS_ERRORS = [
    # Core errors
    Error,
    ConnectionClosedError,
    TimeoutError,
    NoRespondersError,
    StaleConnectionError,
    OutboundBufferLimitError,
    UnexpectedEOF,
    FlushTimeoutError,
    # Connection and protocol errors
    SecureConnRequiredError,
    SecureConnWantedError,
    SecureConnFailedError,
    AuthorizationError,
    NoServersError,
    ProtocolError,
    MaxPayloadError,
    # Subscription and message errors
    BadSubscriptionError,
    BadSubjectError,
    SlowConsumerError,
    InvalidCallbackTypeError,
    BadTimeoutError,
    DrainTimeoutError,
    ConnectionDrainingError,
    ConnectionReconnectingError,
    # Other
    InvalidUserCredentialsError,
    JsonParseError,
]

JETSTREAM_ERRORS = [
    "APIError",
    "ServiceUnavailableError",
    "ServerError",
    "NotFoundError",
    "BadRequestError",
    "NoStreamResponseError",
    "TooManyStalledMsgsError",
    "FetchTimeoutError",
    "ConsumerSequenceMismatchError",
]

KEYVALUE_ERRORS = [
    "BucketNotFoundError",
    "BadBucketError",
    "KeyValueError",
    "KeyDeletedError",
    "KeyNotFoundError",
    "KeyWrongLastSequenceError",
    "NoKeysError",
    "KeyHistoryTooLargeError",
    "InvalidKeyError",
    "InvalidBucketNameError",
]

OBJECTSTORE_ERRORS = [
    "InvalidObjectNameError",
    "BadObjectMetaError",
    "LinkIsABucketError",
    "DigestMismatchError",
    "ObjectNotFoundError",
    "ObjectDeletedError",
    "ObjectAlreadyExists",
]
669
```