or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.mderror-handling.mdindex.mdjetstream-management.mdjetstream.mdkey-value-store.mdmessage-handling.mdmicroservices.mdobject-store.md

error-handling.mddocs/

0

# Error Handling

1

2

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.

3

4

## Capabilities

5

6

### Core NATS Errors

7

8

Base error classes and connection-related exceptions.

9

10

```python { .api }

11

class Error(Exception):

12

"""Base NATS error class."""

13

14

class ConnectionClosedError(Error):

15

"""Connection was closed."""

16

17

class TimeoutError(Error):

18

"""Operation timed out."""

19

20

class NoRespondersError(Error):

21

"""No services responded to request."""

22

23

class StaleConnectionError(Error):

24

"""Connection is stale and unusable."""

25

26

class OutboundBufferLimitError(Error):

27

"""Outbound buffer limit exceeded."""

28

29

class UnexpectedEOF(Error):

30

"""Unexpected end of file/connection."""

31

32

class FlushTimeoutError(Error):

33

"""Flush operation timed out."""

34

```

35

36

#### Usage Examples

37

38

```python

39

import asyncio

40

import nats

41

from nats.errors import (

42

ConnectionClosedError, TimeoutError, NoRespondersError,

43

StaleConnectionError, FlushTimeoutError

44

)

45

46

async def robust_client():

47

nc = None

48

try:

49

# Connect with error handling

50

nc = await nats.connect("nats://localhost:4222")

51

52

# Publish with flush error handling

53

await nc.publish("test.subject", b"Hello")

54

await nc.flush(timeout=5.0)

55

56

# Request with timeout and no responders handling

57

response = await nc.request("api.service", b"request", timeout=2.0)

58

print(f"Response: {response.data.decode()}")

59

60

except ConnectionClosedError:

61

print("Connection was closed unexpectedly")

62

except FlushTimeoutError:

63

print("Failed to flush messages within timeout")

64

except TimeoutError:

65

print("Request timed out")

66

except NoRespondersError:

67

print("No service available to handle request")

68

except StaleConnectionError:

69

print("Connection is stale, need to reconnect")

70

except Exception as e:

71

print(f"Unexpected error: {e}")

72

finally:

73

if nc and not nc.is_closed():

74

await nc.close()

75

```

76

77

### Connection and Protocol Errors

78

79

Network and protocol-level error handling.

80

81

```python { .api }

82

class SecureConnRequiredError(Error):

83

"""Server requires secure connection."""

84

85

class SecureConnWantedError(Error):

86

"""Server prefers secure connection."""

87

88

class SecureConnFailedError(Error):

89

"""Secure connection failed to establish."""

90

91

class AuthorizationError(Error):

92

"""Authentication/authorization failed."""

93

94

class NoServersError(Error):

95

"""No servers available for connection."""

96

97

class ProtocolError(Error):

98

"""NATS protocol error."""

99

100

class MaxPayloadError(Error):

101

"""Message payload exceeds maximum size."""

102

```

103

104

#### Usage Examples

105

106

```python

107

from nats.errors import (

108

SecureConnRequiredError, AuthorizationError, NoServersError,

109

MaxPayloadError, ProtocolError

110

)

111

112

async def secure_connection():

113

try:

114

# Try insecure connection first

115

nc = await nats.connect("nats://secure-server:4222")

116

117

except SecureConnRequiredError:

118

print("Server requires TLS, retrying with secure connection")

119

import ssl

120

ssl_ctx = ssl.create_default_context()

121

nc = await nats.connect("tls://secure-server:4443", tls=ssl_ctx)

122

123

except AuthorizationError:

124

print("Authentication failed, check credentials")

125

return None

126

127

except NoServersError:

128

print("No NATS servers available")

129

return None

130

131

try:

132

# Test large message

133

large_message = b"x" * (2 * 1024 * 1024) # 2MB

134

await nc.publish("test", large_message)

135

136

except MaxPayloadError as e:

137

print(f"Message too large: {e}")

138

# Split into smaller chunks

139

chunk_size = nc.max_payload()

140

for i in range(0, len(large_message), chunk_size):

141

chunk = large_message[i:i + chunk_size]

142

await nc.publish(f"test.chunk.{i//chunk_size}", chunk)

143

144

return nc

145

```

146

147

### Subscription and Message Errors

148

149

Subscription and message processing error handling.

150

151

```python { .api }

152

class BadSubscriptionError(Error):

153

"""Invalid subscription parameters."""

154

155

class BadSubjectError(Error):

156

"""Invalid subject format."""

157

158

class SlowConsumerError(Error):

159

"""Consumer cannot keep up with message rate."""

160

161

class InvalidCallbackTypeError(Error):

162

"""Invalid callback function type."""

163

164

class BadTimeoutError(Error):

165

"""Invalid timeout value."""

166

167

class DrainTimeoutError(Error):

168

"""Drain operation timed out."""

169

170

class ConnectionDrainingError(Error):

171

"""Connection is in draining state."""

172

173

class ConnectionReconnectingError(Error):

174

"""Connection is in reconnecting state."""

175

```

176

177

#### Usage Examples

178

179

```python

180

from nats.errors import (

181

BadSubscriptionError, SlowConsumerError, DrainTimeoutError,

182

ConnectionReconnectingError

183

)

184

185

async def robust_subscription():

186

nc = await nats.connect()

187

188

try:

189

# Subscribe with error handling

190

async def message_handler(msg):

191

try:

192

await process_message(msg)

193

except Exception as e:

194

print(f"Message processing error: {e}")

195

196

sub = await nc.subscribe("events.*", cb=message_handler)

197

198

# Monitor for slow consumer

199

while True:

200

await asyncio.sleep(10)

201

202

if sub.pending_msgs() > 10000:

203

print("Warning: High pending message count")

204

205

except BadSubscriptionError as e:

206

print(f"Invalid subscription: {e}")

207

208

except SlowConsumerError:

209

print("Consumer is too slow, increase processing capacity")

210

211

except ConnectionReconnectingError:

212

print("Connection is reconnecting, waiting...")

213

await asyncio.sleep(5) # Wait for reconnection

214

215

finally:

216

try:

217

await nc.drain(timeout=30)

218

except DrainTimeoutError:

219

print("Drain timed out, forcing close")

220

await nc.close()

221

```

222

223

### JetStream Errors

224

225

JetStream-specific error handling for streams and consumers.

226

227

```python { .api }

228

from nats.js.errors import (

229

Error as JSError,

230

APIError,

231

ServiceUnavailableError,

232

ServerError,

233

NotFoundError,

234

BadRequestError,

235

NoStreamResponseError,

236

TooManyStalledMsgsError,

237

FetchTimeoutError,

238

ConsumerSequenceMismatchError

239

)

240

```

241

242

#### Usage Examples

243

244

```python

245

from nats.js.errors import (

246

APIError, NotFoundError, BadRequestError, ServiceUnavailableError,

247

FetchTimeoutError, ConsumerSequenceMismatchError

248

)

249

250

async def jetstream_operations():

251

nc = await nats.connect()

252

js = nc.jetstream()

253

jsm = nc.jsm()

254

255

try:

256

# Create stream with error handling

257

stream_info = await jsm.add_stream(

258

name="events",

259

subjects=["events.*"]

260

)

261

print(f"Created stream: {stream_info.config.name}")

262

263

except BadRequestError as e:

264

print(f"Invalid stream configuration: {e}")

265

266

except ServiceUnavailableError:

267

print("JetStream service unavailable")

268

return

269

270

try:

271

# Publish to JetStream

272

ack = await js.publish("events.test", b"test message")

273

print(f"Published message at sequence {ack.seq}")

274

275

except APIError as e:

276

print(f"JetStream API error: {e}")

277

278

try:

279

# Pull subscribe with error handling

280

psub = await js.pull_subscribe("events.*", durable="test-consumer")

281

282

msgs = await psub.fetch(batch_size=10, timeout=5.0)

283

for msg in msgs:

284

await msg.ack()

285

286

except FetchTimeoutError:

287

print("No messages available within timeout")

288

289

except ConsumerSequenceMismatchError as e:

290

print(f"Consumer sequence mismatch: {e}")

291

# Reset consumer or handle sequence gap

292

293

try:

294

# Get stream info

295

info = await jsm.stream_info("events")

296

print(f"Stream has {info.state.messages} messages")

297

298

except NotFoundError:

299

print("Stream 'events' not found")

300

```

301

302

### Key-Value Store Errors

303

304

Key-value store specific error handling.

305

306

```python { .api }

307

from nats.js.errors import (

308

BucketNotFoundError,

309

BadBucketError,

310

KeyValueError,

311

KeyDeletedError,

312

KeyNotFoundError,

313

KeyWrongLastSequenceError,

314

NoKeysError,

315

KeyHistoryTooLargeError,

316

InvalidKeyError,

317

InvalidBucketNameError

318

)

319

```

320

321

#### Usage Examples

322

323

```python

324

from nats.js.errors import (

325

BucketNotFoundError, KeyNotFoundError, KeyDeletedError,

326

KeyWrongLastSequenceError, InvalidKeyError

327

)

328

329

async def kv_operations():

330

nc = await nats.connect()

331

js = nc.jetstream()

332

333

try:

334

# Get or create KV store

335

kv = await js.key_value("user-sessions")

336

337

except BucketNotFoundError:

338

print("Creating new KV bucket")

339

kv = await js.create_key_value(bucket="user-sessions")

340

341

try:

342

# Get key with error handling

343

entry = await kv.get("session:user123")

344

print(f"Session data: {entry.value.decode()}")

345

346

except KeyNotFoundError:

347

print("Session not found, creating new one")

348

await kv.put("session:user123", b'{"new": "session"}')

349

350

except KeyDeletedError:

351

print("Session was deleted")

352

353

try:

354

# Conditional update with error handling

355

entry = await kv.get("session:user123")

356

updated_data = b'{"updated": "session"}'

357

await kv.update("session:user123", updated_data, entry.revision)

358

359

except KeyWrongLastSequenceError:

360

print("Session was modified by another process")

361

# Retry with latest revision

362

entry = await kv.get("session:user123")

363

await kv.update("session:user123", updated_data, entry.revision)

364

365

try:

366

# Validate key format

367

await kv.put("invalid key name!", b"data")

368

369

except InvalidKeyError as e:

370

print(f"Invalid key format: {e}")

371

```

372

373

### Object Store Errors

374

375

Object store specific error handling.

376

377

```python { .api }

378

from nats.js.errors import (

379

InvalidObjectNameError,

380

BadObjectMetaError,

381

LinkIsABucketError,

382

DigestMismatchError,

383

ObjectNotFoundError,

384

ObjectDeletedError,

385

ObjectAlreadyExists

386

)

387

```

388

389

#### Usage Examples

390

391

```python

392

from nats.js.errors import (

393

ObjectNotFoundError, ObjectDeletedError, ObjectAlreadyExists,

394

DigestMismatchError, BadObjectMetaError

395

)

396

397

async def object_store_operations():

398

nc = await nats.connect()

399

js = nc.jetstream()

400

401

try:

402

os = await js.object_store("file-storage")

403

404

# Store object with error handling

405

obj_info = await os.put("document.pdf", file_data)

406

print(f"Stored object: {obj_info.name}")

407

408

except ObjectAlreadyExists:

409

print("Object already exists")

410

# Update existing object

411

obj_info = await os.put("document.pdf", file_data, replace=True)

412

413

try:

414

# Retrieve object

415

data = await os.get("document.pdf")

416

417

# Verify integrity

418

if verify_checksum(data, expected_checksum):

419

print("Data integrity verified")

420

421

except ObjectNotFoundError:

422

print("Object not found")

423

424

except ObjectDeletedError:

425

print("Object was deleted")

426

427

except DigestMismatchError as e:

428

print(f"Data corruption detected: {e}")

429

# Handle corrupted data

430

431

try:

432

# Update metadata

433

from nats.js.api import ObjectMeta

434

meta = ObjectMeta(

435

name="document.pdf",

436

description="Updated document"

437

)

438

await os.update_meta("document.pdf", meta)

439

440

except BadObjectMetaError as e:

441

print(f"Invalid object metadata: {e}")

442

```

443

444

### Microservices Errors

445

446

Service framework error handling.

447

448

```python { .api }

449

from nats.micro import ServiceError

450

451

class ServiceError(Exception):

452

"""Service error with code and description."""

453

def __init__(self, code: str, description: str):

454

self.code = code

455

self.description = description

456

super().__init__(f"{code}: {description}")

457

```

458

459

#### Usage Examples

460

461

```python

462

from nats.micro import ServiceError, Request

463

464

async def service_handler(request: Request):

465

try:

466

# Process request

467

data = json.loads(request.data.decode())

468

result = await process_service_request(data)

469

470

response = json.dumps(result).encode()

471

await request.respond(response)

472

473

except ValidationError as e:

474

await request.respond_error(

475

code="VALIDATION_ERROR",

476

description=str(e)

477

)

478

479

except AuthenticationError:

480

await request.respond_error(

481

code="UNAUTHORIZED",

482

description="Authentication required"

483

)

484

485

except RateLimitError:

486

await request.respond_error(

487

code="RATE_LIMITED",

488

description="Too many requests"

489

)

490

491

except ServiceError as e:

492

# Re-raise service errors to be handled by framework

493

await request.respond_error(e.code, e.description)

494

495

except Exception as e:

496

# Log unexpected errors

497

logger.error(f"Unexpected service error: {e}")

498

await request.respond_error(

499

code="INTERNAL_ERROR",

500

description="Internal server error"

501

)

502

503

# Custom service errors

504

class BusinessLogicError(ServiceError):

505

def __init__(self, message: str):

506

super().__init__("BUSINESS_LOGIC_ERROR", message)

507

508

class DataValidationError(ServiceError):

509

def __init__(self, field: str, message: str):

510

super().__init__("VALIDATION_ERROR", f"{field}: {message}")

511

512

# Usage in service logic

513

async def create_user_service(request: Request):

514

try:

515

user_data = json.loads(request.data.decode())

516

517

if not user_data.get("email"):

518

raise DataValidationError("email", "Email is required")

519

520

if await user_exists(user_data["email"]):

521

raise BusinessLogicError("User already exists")

522

523

user = await create_user(user_data)

524

await request.respond(json.dumps(user.to_dict()).encode())

525

526

except ServiceError:

527

raise # Let framework handle service errors

528

except Exception as e:

529

raise ServiceError("INTERNAL_ERROR", str(e))

530

```

531

532

## Error Handling Patterns

533

534

### Retry Strategies

535

536

Implement robust retry logic for transient errors.

537

538

```python

539

import asyncio

540

from typing import Callable, Type

541

542

async def retry_with_backoff(

543

operation: Callable,

544

max_retries: int = 3,

545

backoff_factor: float = 2.0,

546

exceptions: tuple = (Exception,)

547

) -> any:

548

"""Retry operation with exponential backoff."""

549

550

for attempt in range(max_retries + 1):

551

try:

552

return await operation()

553

except exceptions as e:

554

if attempt == max_retries:

555

raise e

556

557

wait_time = backoff_factor ** attempt

558

print(f"Attempt {attempt + 1} failed: {e}, retrying in {wait_time}s")

559

await asyncio.sleep(wait_time)

560

561

# Usage

562

async def flaky_operation():

563

# Simulate flaky network operation

564

response = await nc.request("flaky.service", b"request", timeout=1.0)

565

return response

566

567

try:

568

result = await retry_with_backoff(

569

flaky_operation,

570

max_retries=3,

571

exceptions=(TimeoutError, ConnectionClosedError)

572

)

573

except Exception as e:

574

print(f"Operation failed after retries: {e}")

575

```

576

577

### Circuit Breaker Pattern

578

579

Prevent cascading failures with circuit breaker.

580

581

```python

582

import time

583

from enum import Enum

584

585

class CircuitState(Enum):

586

CLOSED = "closed"

587

OPEN = "open"

588

HALF_OPEN = "half_open"

589

590

class CircuitBreaker:

591

def __init__(self, failure_threshold=5, timeout=60):

592

self.failure_threshold = failure_threshold

593

self.timeout = timeout

594

self.failure_count = 0

595

self.last_failure_time = None

596

self.state = CircuitState.CLOSED

597

598

async def call(self, operation):

599

if self.state == CircuitState.OPEN:

600

if time.time() - self.last_failure_time > self.timeout:

601

self.state = CircuitState.HALF_OPEN

602

else:

603

raise Exception("Circuit breaker is open")

604

605

try:

606

result = await operation()

607

self.on_success()

608

return result

609

except Exception as e:

610

self.on_failure()

611

raise e

612

613

def on_success(self):

614

self.failure_count = 0

615

self.state = CircuitState.CLOSED

616

617

def on_failure(self):

618

self.failure_count += 1

619

self.last_failure_time = time.time()

620

621

if self.failure_count >= self.failure_threshold:

622

self.state = CircuitState.OPEN

623

624

# Usage

625

circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

626

627

async def protected_service_call():

628

try:

629

return await circuit_breaker.call(lambda: nc.request("service", b"data"))

630

except Exception as e:

631

print(f"Service call failed: {e}")

632

return None

633

```

634

635

## Constants

636

637

```python { .api }

638

# Error categories

639

NATS_ERRORS = [

640

Error, ConnectionClosedError, TimeoutError, NoRespondersError,

641

StaleConnectionError, OutboundBufferLimitError, UnexpectedEOF,

642

FlushTimeoutError, SecureConnRequiredError, SecureConnWantedError,

643

SecureConnFailedError, AuthorizationError, NoServersError,

644

ProtocolError, MaxPayloadError, BadSubscriptionError,

645

BadSubjectError, SlowConsumerError, InvalidCallbackTypeError,

646

BadTimeoutError, DrainTimeoutError, ConnectionDrainingError,

647

ConnectionReconnectingError, InvalidUserCredentialsError,

648

JsonParseError

649

]

650

651

JETSTREAM_ERRORS = [

652

"APIError", "ServiceUnavailableError", "ServerError", "NotFoundError",

653

"BadRequestError", "NoStreamResponseError", "TooManyStalledMsgsError",

654

"FetchTimeoutError", "ConsumerSequenceMismatchError"

655

]

656

657

KEYVALUE_ERRORS = [

658

"BucketNotFoundError", "BadBucketError", "KeyValueError",

659

"KeyDeletedError", "KeyNotFoundError", "KeyWrongLastSequenceError",

660

"NoKeysError", "KeyHistoryTooLargeError", "InvalidKeyError",

661

"InvalidBucketNameError"

662

]

663

664

OBJECTSTORE_ERRORS = [

665

"InvalidObjectNameError", "BadObjectMetaError", "LinkIsABucketError",

666

"DigestMismatchError", "ObjectNotFoundError", "ObjectDeletedError",

667

"ObjectAlreadyExists"

668

]

669

```