or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-messaging.md · index.md · lookupd-integration.md · message-handling.md · nsqd-clients.md · utilities-errors.md

docs/utilities-errors.md

0

# Utilities and Error Handling

1

2

Helper utilities, decorators, backoff timers, and comprehensive error handling for building robust NSQ client applications. These components provide the foundation for reliable message processing with proper error recovery and performance optimization.

3

4

## Capabilities

5

6

### BackoffTimer

7

8

Implements exponential backoff algorithm with configurable parameters for managing retry delays and connection recovery strategies.

9

10

```python { .api }

11

class BackoffTimer:

12

def __init__(self, ratio=1, max_interval=None, min_interval=None):

13

"""

14

Initialize exponential backoff timer.

15

16

Parameters:

17

- ratio (float): Backoff multiplier ratio (default: 1)

18

- max_interval (float, optional): Maximum backoff interval in seconds

19

- min_interval (float, optional): Minimum backoff interval in seconds

20

"""

21

22

def is_reset(self):

23

"""

24

Check if timer is at initial state.

25

26

Returns:

27

bool: True if timer has not recorded any failures

28

"""

29

30

def reset(self):

31

"""Reset timer to initial state, clearing all failure history."""

32

33

def success(self):

34

"""Record a successful operation, decreasing failure count."""

35

36

def failure(self):

37

"""Record a failed operation, incrementing failure count."""

38

39

def get_interval(self):

40

"""

41

Calculate current exponential backoff interval.

42

43

Returns a randomized interval within the calculated range, based on the

44

failure count, respecting min/max constraints.

45

46

Returns:

47

float: Backoff interval in seconds

48

"""

49

```

50

51

### Utility Functions

52

53

Address parsing and normalization utilities for NSQ daemon and lookupd connections.

54

55

```python { .api }

56

def normalize_nsqd_address(address):

57

"""

58

Normalize an NSQ daemon address.

59

60

Ensures address has valid host and port components,

61

applying defaults where necessary.

62

63

Parameters:

64

- address (str): Address in 'host:port' format

65

66

Returns:

67

tuple: (host, port) with normalized values

68

"""

69

70

def parse_nsqds(nsqd_tcp_addresses):

71

"""

72

Parse and normalize NSQ daemon TCP addresses.

73

74

Converts various address formats into a standardized

75

set of (host, port) tuples.

76

77

Parameters:

78

- nsqd_tcp_addresses (list): List of address strings

79

80

Returns:

81

set: Set of normalized (host, port) tuples

82

"""

83

84

def parse_lookupds(lookupd_http_addresses):

85

"""

86

Parse lookupd HTTP addresses into client instances.

87

88

Converts address strings into a randomized list of

89

LookupdClient instances for service discovery.

90

91

Parameters:

92

- lookupd_http_addresses (list): List of 'host:port' strings

93

94

Returns:

95

list: List of LookupdClient instances

96

"""

97

```

98

99

### Decorators

100

101

Utility decorators for caching and deprecation warnings.

102

103

```python { .api }

104

def cached_property(func):

105

"""

106

Decorator that converts a function into a lazy cached property.

107

108

Caches the result of the function on first call and returns

109

the cached value on subsequent calls. Useful for expensive

110

computations that don't change.

111

112

Parameters:

113

- func (callable): Function to be cached

114

115

Returns:

116

property: Cached property descriptor

117

"""

118

119

def deprecated(func):

120

"""

121

Decorator that marks a function as deprecated.

122

123

Issues a deprecation warning when the function is called,

124

using the first line of the function's docstring as the

125

warning message.

126

127

Parameters:

128

- func (callable): Function to mark as deprecated

129

130

Returns:

131

callable: Wrapped function with deprecation warning

132

"""

133

```

134

135

### Exception Hierarchy

136

137

Comprehensive exception classes for NSQ-specific error handling.

138

139

```python { .api }

140

class NSQException(Exception):

141

"""Base exception class for all NSQ-related errors."""

142

143

class NSQRequeueMessage(NSQException):

144

"""Exception to trigger message requeuing."""

145

146

class NSQNoConnections(NSQException):

147

"""Exception raised when no NSQ connections are available."""

148

149

class NSQHttpError(NSQException):

150

"""Exception for HTTP-related NSQ errors."""

151

152

class NSQSocketError(NSQException):

153

"""Exception for socket-related NSQ errors."""

154

155

class NSQFrameError(NSQException):

156

"""Exception for NSQ protocol frame errors."""

157

158

class NSQErrorCode(NSQException):

159

"""

160

Base class for NSQ error code exceptions.

161

162

Attributes:

163

- fatal (bool): Whether the error is fatal and requires a connection reset

164

"""

165

166

# Protocol-specific exceptions

167

class NSQInvalid(NSQErrorCode):

168

"""Invalid command or parameter error."""

169

170

class NSQBadBody(NSQErrorCode):

171

"""Invalid message body error."""

172

173

class NSQBadTopic(NSQErrorCode):

174

"""Invalid topic name error."""

175

176

class NSQBadChannel(NSQErrorCode):

177

"""Invalid channel name error."""

178

179

class NSQBadMessage(NSQErrorCode):

180

"""Invalid message format error."""

181

182

class NSQPutFailed(NSQErrorCode):

183

"""Put operation failed error."""

184

185

class NSQPubFailed(NSQErrorCode):

186

"""Publish operation failed error."""

187

188

class NSQMPubFailed(NSQErrorCode):

189

"""Multi-publish operation failed error."""

190

191

class NSQAuthDisabled(NSQErrorCode):

192

"""Authentication disabled error."""

193

194

class NSQAuthFailed(NSQErrorCode):

195

"""Authentication failed error."""

196

197

class NSQUnauthorized(NSQErrorCode):

198

"""Unauthorized operation error."""

199

200

class NSQFinishFailed(NSQErrorCode):

201

"""Message finish operation failed error."""

202

203

class NSQRequeueFailed(NSQErrorCode):

204

"""Message requeue operation failed error."""

205

206

class NSQTouchFailed(NSQErrorCode):

207

"""Message touch operation failed error."""

208

```

209

210

### Error Handling Utilities

211

212

Functions for creating and managing NSQ error instances.

213

214

```python { .api }

215

def make_error():

216

"""

217

Create specific error instances based on NSQ error codes.

218

219

Maps NSQ daemon error codes to appropriate exception classes

220

and creates instances with relevant error information.

221

222

Returns:

223

NSQException: Appropriate exception instance for the error code

224

"""

225

226

# Error code mapping

227

ERROR_CODES = {

228

# Dictionary mapping NSQ error codes to exception classes

229

# Used internally by make_error() function

230

}

231

```

232

233

## Usage Examples

234

235

### Backoff Timer for Retry Logic

236

237

```python

238

import gnsq

239

import time

240

241

def reliable_publisher_with_backoff():

242

"""Publisher with exponential backoff retry logic."""

243

244

producer = gnsq.Producer(['127.0.0.1:4150'])

245

backoff_timer = gnsq.BackoffTimer(

246

ratio=2.0, # Double delay each failure

247

max_interval=60.0, # Max 60 second delay

248

min_interval=0.1 # Min 100ms delay

249

)

250

251

producer.start()

252

253

while True:

254

try:

255

# Attempt to publish message

256

producer.publish('events', 'test message')

257

258

# Success - reset backoff timer

259

backoff_timer.success()

260

if not backoff_timer.is_reset():

261

print("Connection recovered!")

262

backoff_timer.reset()

263

264

time.sleep(1) # Normal operation delay

265

266

except gnsq.NSQNoConnections:

267

# Connection failed - apply backoff

268

backoff_timer.failure()

269

delay = backoff_timer.get_interval()

270

271

print(f"Connection failed, retrying in {delay:.2f}s")

272

time.sleep(delay)

273

274

except Exception as e:

275

print(f"Unexpected error: {e}")

276

time.sleep(5)

277

278

reliable_publisher_with_backoff()

279

```

280

281

### Comprehensive Error Handling

282

283

```python

284

import gnsq

285

286

def robust_message_processor():

287

"""Consumer with comprehensive error handling."""

288

289

consumer = gnsq.Consumer('events', 'processor', '127.0.0.1:4150')

290

291

@consumer.on_message.connect

292

def handle_message(consumer, message):

293

try:

294

# Process the message

295

result = process_event(message.body)

296

message.finish()

297

298

except gnsq.NSQRequeueMessage:

299

# Explicit requeue request

300

message.requeue()

301

302

except gnsq.NSQBadMessage:

303

# Malformed message - don't requeue

304

print(f"Discarding malformed message: {message.id}")

305

message.finish()

306

307

except gnsq.NSQTouchFailed:

308

# Touch operation failed - message may timeout

309

print(f"Failed to extend timeout for message: {message.id}")

310

# Continue processing, accept potential duplicate

311

312

except (gnsq.NSQFinishFailed, gnsq.NSQRequeueFailed) as e:

313

# Message response failed - log but continue

314

print(f"Failed to respond to message {message.id}: {e}")

315

316

except gnsq.NSQSocketError:

317

# Connection issue - will be handled by consumer

318

print("Socket error during message processing")

319

raise # Let consumer handle reconnection

320

321

except Exception as e:

322

# Application error - requeue for retry

323

print(f"Processing error for message {message.id}: {e}")

324

try:

325

message.requeue()

326

except gnsq.NSQRequeueFailed:

327

print("Failed to requeue message - may be redelivered")

328

329

@consumer.on_error.connect

330

def handle_consumer_error(consumer, error):

331

if isinstance(error, gnsq.NSQAuthFailed):

332

print("Authentication failed - check credentials")

333

elif isinstance(error, gnsq.NSQUnauthorized):

334

print("Unauthorized - check permissions")

335

elif isinstance(error, gnsq.NSQHttpError):

336

print(f"HTTP error: {error}")

337

else:

338

print(f"Consumer error: {error}")

339

340

consumer.start()

341

342

robust_message_processor()

343

```

344

345

### Using Utility Functions

346

347

```python

348

import gnsq

349

350

def setup_dynamic_connections():

351

"""Setup connections using utility functions."""

352

353

# Various address formats

354

raw_nsqd_addresses = [

355

'127.0.0.1:4150',

356

'nsqd-2:4150',

357

'192.168.1.100' # Missing port

358

]

359

360

raw_lookupd_addresses = [

361

'127.0.0.1:4161',

362

'lookupd-1:4161'

363

]

364

365

# Normalize and parse addresses

366

nsqd_addresses = gnsq.parse_nsqds(raw_nsqd_addresses)

367

lookupd_clients = gnsq.parse_lookupds(raw_lookupd_addresses)

368

369

print("Normalized NSQD addresses:")

370

for host, port in nsqd_addresses:

371

print(f" - {host}:{port}")

372

373

print("Lookupd clients:")

374

for client in lookupd_clients:

375

print(f" - {client.host}:{client.port}")

376

377

# Create producer with normalized addresses

378

producer_addresses = [f"{host}:{port}" for host, port in nsqd_addresses]

379

producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)

380

381

# Create consumer with lookupd discovery

382

consumer = gnsq.Consumer(

383

'events',

384

'processor',

385

lookupd_http_addresses=raw_lookupd_addresses

386

)

387

388

return producer, consumer

389

390

producer, consumer = setup_dynamic_connections()

391

```

392

393

### Custom Error Handling with Error Codes

394

395

```python

396

import gnsq

397

398

def advanced_error_handling():

399

"""Demonstrate advanced error code handling."""

400

401

producer = gnsq.Producer(['127.0.0.1:4150'])

402

403

try:

404

producer.start()

405

producer.publish('test_topic', 'test message')

406

407

except gnsq.NSQErrorCode as e:

408

# Handle NSQ protocol-specific errors

409

if e.fatal:

410

print(f"Fatal NSQ error: {e} - connection will be reset")

411

# Perform connection cleanup

412

else:

413

print(f"Non-fatal NSQ error: {e} - retrying")

414

415

# Create specific error instance if needed

416

specific_error = gnsq.make_error() # Based on error context

417

418

except gnsq.NSQHttpError as e:

419

print(f"HTTP API error: {e}")

420

# Handle HTTP-specific errors

421

422

except gnsq.NSQSocketError as e:

423

print(f"Socket communication error: {e}")

424

# Handle network-related errors

425

426

except gnsq.NSQException as e:

427

print(f"General NSQ error: {e}")

428

# Handle any other NSQ-related errors

429

430

finally:

431

try:

432

producer.close()

433

producer.join()

434

except Exception as e:

435

print(f"Error during cleanup: {e}")

436

437

advanced_error_handling()

438

```

439

440

### Using Decorators

441

442

```python

443

import gnsq

444

import warnings

445

446

class NSQManager:

447

"""Example class using gnsq decorators."""

448

449

def __init__(self, addresses):

450

self._addresses = addresses

451

self._producer = None

452

453

@gnsq.cached_property

454

def connection_count(self):

455

"""Expensive computation cached after first call."""

456

print("Computing connection count...") # Only runs once

457

return len(self._addresses)

458

459

@gnsq.deprecated

460

def old_publish_method(self, topic, message):

461

"""This method is deprecated. Use new_publish_method instead."""

462

# This will show a deprecation warning when called

463

return self.new_publish_method(topic, message)

464

465

def new_publish_method(self, topic, message):

466

"""New preferred method for publishing."""

467

if not self._producer:

468

self._producer = gnsq.Producer(self._addresses)

469

self._producer.start()

470

471

return self._producer.publish(topic, message)

472

473

# Usage

474

manager = NSQManager(['127.0.0.1:4150'])

475

476

# Cached property - expensive computation only runs once

477

print(manager.connection_count) # Computes and caches

478

print(manager.connection_count) # Returns cached value

479

480

# Deprecated method usage - shows warning

481

with warnings.catch_warnings(record=True) as w:

482

warnings.simplefilter("always")

483

manager.old_publish_method('test', 'message')

484

if w:

485

print(f"Deprecation warning: {w[0].message}")

486

```