0
# Utilities and Error Handling
1
2
Helper utilities, decorators, backoff timers, and comprehensive error handling for building robust NSQ client applications. These components provide the foundation for reliable message processing with proper error recovery and performance optimization.
3
4
## Capabilities
5
6
### BackoffTimer
7
8
Implements exponential backoff algorithm with configurable parameters for managing retry delays and connection recovery strategies.
9
10
```python { .api }
11
class BackoffTimer:
12
def __init__(self, ratio=1, max_interval=None, min_interval=None):
13
"""
14
Initialize exponential backoff timer.
15
16
Parameters:
17
- ratio (float): Backoff multiplier ratio (default: 1)
18
- max_interval (float, optional): Maximum backoff interval in seconds
19
- min_interval (float, optional): Minimum backoff interval in seconds
20
"""
21
22
def is_reset(self):
23
"""
24
Check if timer is at initial state.
25
26
Returns:
27
bool: True if timer has not recorded any failures
28
"""
29
30
def reset(self):
31
"""Reset timer to initial state, clearing all failure history."""
32
33
def success(self):
34
"""Record a successful operation, decreasing failure count."""
35
36
def failure(self):
37
"""Record a failed operation, incrementing failure count."""
38
39
def get_interval(self):
40
"""
41
Calculate current exponential backoff interval.
42
43
Returns randomized interval within calculated range based on
44
failure count, respecting min/max constraints.
45
46
Returns:
47
float: Backoff interval in seconds
48
"""
49
```
50
51
### Utility Functions
52
53
Address parsing and normalization utilities for NSQ daemon and lookupd connections.
54
55
```python { .api }
56
def normalize_nsqd_address(address):
57
"""
58
Normalize an NSQ daemon address.
59
60
Ensures address has valid host and port components,
61
applying defaults where necessary.
62
63
Parameters:
64
- address (str): Address in 'host:port' format
65
66
Returns:
67
tuple: (host, port) with normalized values
68
"""
69
70
def parse_nsqds(nsqd_tcp_addresses):
71
"""
72
Parse and normalize NSQ daemon TCP addresses.
73
74
Converts various address formats into standardized
75
set of (host, port) tuples.
76
77
Parameters:
78
- nsqd_tcp_addresses (list): List of address strings
79
80
Returns:
81
set: Set of normalized (host, port) tuples
82
"""
83
84
def parse_lookupds(lookupd_http_addresses):
85
"""
86
Parse lookupd HTTP addresses into client instances.
87
88
Converts address strings into randomized list of
89
LookupdClient instances for service discovery.
90
91
Parameters:
92
- lookupd_http_addresses (list): List of 'host:port' strings
93
94
Returns:
95
list: List of LookupdClient instances
96
"""
97
```
98
99
### Decorators
100
101
Utility decorators for caching and deprecation warnings.
102
103
```python { .api }
104
def cached_property(func):
105
"""
106
Decorator that converts a function into a lazy cached property.
107
108
Caches the result of the function on first call and returns
109
the cached value on subsequent calls. Useful for expensive
110
computations that don't change.
111
112
Parameters:
113
- func (callable): Function to be cached
114
115
Returns:
116
property: Cached property descriptor
117
"""
118
119
def deprecated(func):
120
"""
121
Decorator that marks a function as deprecated.
122
123
Issues a deprecation warning when the function is called,
124
using the first line of the function's docstring as the
125
warning message.
126
127
Parameters:
128
- func (callable): Function to mark as deprecated
129
130
Returns:
131
callable: Wrapped function with deprecation warning
132
"""
133
```
134
135
### Exception Hierarchy
136
137
Comprehensive exception classes for NSQ-specific error handling.
138
139
```python { .api }
140
class NSQException(Exception):
141
"""Base exception class for all NSQ-related errors."""
142
143
class NSQRequeueMessage(NSQException):
144
"""Exception to trigger message requeuing."""
145
146
class NSQNoConnections(NSQException):
147
"""Exception raised when no NSQ connections are available."""
148
149
class NSQHttpError(NSQException):
150
"""Exception for HTTP-related NSQ errors."""
151
152
class NSQSocketError(NSQException):
153
"""Exception for socket-related NSQ errors."""
154
155
class NSQFrameError(NSQException):
156
"""Exception for NSQ protocol frame errors."""
157
158
class NSQErrorCode(NSQException):
159
"""
160
Base class for NSQ error code exceptions.
161
162
Attributes:
163
- fatal (bool): Whether the error is fatal and requires connection reset
164
"""
165
166
# Protocol-specific exceptions
167
class NSQInvalid(NSQErrorCode):
168
"""Invalid command or parameter error."""
169
170
class NSQBadBody(NSQErrorCode):
171
"""Invalid message body error."""
172
173
class NSQBadTopic(NSQErrorCode):
174
"""Invalid topic name error."""
175
176
class NSQBadChannel(NSQErrorCode):
177
"""Invalid channel name error."""
178
179
class NSQBadMessage(NSQErrorCode):
180
"""Invalid message format error."""
181
182
class NSQPutFailed(NSQErrorCode):
183
"""Put operation failed error."""
184
185
class NSQPubFailed(NSQErrorCode):
186
"""Publish operation failed error."""
187
188
class NSQMPubFailed(NSQErrorCode):
189
"""Multi-publish operation failed error."""
190
191
class NSQAuthDisabled(NSQErrorCode):
192
"""Authentication disabled error."""
193
194
class NSQAuthFailed(NSQErrorCode):
195
"""Authentication failed error."""
196
197
class NSQUnauthorized(NSQErrorCode):
198
"""Unauthorized operation error."""
199
200
class NSQFinishFailed(NSQErrorCode):
201
"""Message finish operation failed error."""
202
203
class NSQRequeueFailed(NSQErrorCode):
204
"""Message requeue operation failed error."""
205
206
class NSQTouchFailed(NSQErrorCode):
207
"""Message touch operation failed error."""
208
```
209
210
### Error Handling Utilities
211
212
Functions for creating and managing NSQ error instances.
213
214
```python { .api }
215
def make_error():
216
"""
217
Create specific error instances based on NSQ error codes.
218
219
Maps NSQ daemon error codes to appropriate exception classes
220
and creates instances with relevant error information.
221
222
Returns:
223
NSQException: Appropriate exception instance for the error code
224
"""
225
226
# Error code mapping
227
ERROR_CODES = {
228
# Dictionary mapping NSQ error codes to exception classes
229
# Used internally by make_error() function
230
}
231
```
232
233
## Usage Examples
234
235
### Backoff Timer for Retry Logic
236
237
```python
238
import gnsq
239
import time
240
241
def reliable_publisher_with_backoff():
242
"""Publisher with exponential backoff retry logic."""
243
244
producer = gnsq.Producer(['127.0.0.1:4150'])
245
backoff_timer = gnsq.BackoffTimer(
246
ratio=2.0, # Double delay each failure
247
max_interval=60.0, # Max 60 second delay
248
min_interval=0.1 # Min 100ms delay
249
)
250
251
producer.start()
252
253
while True:
254
try:
255
# Attempt to publish message
256
producer.publish('events', 'test message')
257
258
# Success - reset backoff timer
259
backoff_timer.success()
260
if not backoff_timer.is_reset():
261
print("Connection recovered!")
262
backoff_timer.reset()
263
264
time.sleep(1) # Normal operation delay
265
266
except gnsq.NSQNoConnections:
267
# Connection failed - apply backoff
268
backoff_timer.failure()
269
delay = backoff_timer.get_interval()
270
271
print(f"Connection failed, retrying in {delay:.2f}s")
272
time.sleep(delay)
273
274
except Exception as e:
275
print(f"Unexpected error: {e}")
276
time.sleep(5)
277
278
reliable_publisher_with_backoff()
279
```
280
281
### Comprehensive Error Handling
282
283
```python
284
import gnsq
285
286
def robust_message_processor():
287
"""Consumer with comprehensive error handling."""
288
289
consumer = gnsq.Consumer('events', 'processor', '127.0.0.1:4150')
290
291
@consumer.on_message.connect
292
def handle_message(consumer, message):
293
try:
294
# Process the message
295
result = process_event(message.body)
296
message.finish()
297
298
except gnsq.NSQRequeueMessage:
299
# Explicit requeue request
300
message.requeue()
301
302
except gnsq.NSQBadMessage:
303
# Malformed message - don't requeue
304
print(f"Discarding malformed message: {message.id}")
305
message.finish()
306
307
except gnsq.NSQTouchFailed:
308
# Touch operation failed - message may timeout
309
print(f"Failed to extend timeout for message: {message.id}")
310
# Continue processing, accept potential duplicate
311
312
except (gnsq.NSQFinishFailed, gnsq.NSQRequeueFailed) as e:
313
# Message response failed - log but continue
314
print(f"Failed to respond to message {message.id}: {e}")
315
316
except gnsq.NSQSocketError:
317
# Connection issue - will be handled by consumer
318
print("Socket error during message processing")
319
raise # Let consumer handle reconnection
320
321
except Exception as e:
322
# Application error - requeue for retry
323
print(f"Processing error for message {message.id}: {e}")
324
try:
325
message.requeue()
326
except gnsq.NSQRequeueFailed:
327
print("Failed to requeue message - may be redelivered")
328
329
@consumer.on_error.connect
330
def handle_consumer_error(consumer, error):
331
if isinstance(error, gnsq.NSQAuthFailed):
332
print("Authentication failed - check credentials")
333
elif isinstance(error, gnsq.NSQUnauthorized):
334
print("Unauthorized - check permissions")
335
elif isinstance(error, gnsq.NSQHttpError):
336
print(f"HTTP error: {error}")
337
else:
338
print(f"Consumer error: {error}")
339
340
consumer.start()
341
342
robust_message_processor()
343
```
344
345
### Using Utility Functions
346
347
```python
348
import gnsq
349
350
def setup_dynamic_connections():
351
"""Setup connections using utility functions."""
352
353
# Various address formats
354
raw_nsqd_addresses = [
355
'127.0.0.1:4150',
356
'nsqd-2:4150',
357
'192.168.1.100' # Missing port
358
]
359
360
raw_lookupd_addresses = [
361
'127.0.0.1:4161',
362
'lookupd-1:4161'
363
]
364
365
# Normalize and parse addresses
366
nsqd_addresses = gnsq.parse_nsqds(raw_nsqd_addresses)
367
lookupd_clients = gnsq.parse_lookupds(raw_lookupd_addresses)
368
369
print("Normalized NSQD addresses:")
370
for host, port in nsqd_addresses:
371
print(f" - {host}:{port}")
372
373
print("Lookupd clients:")
374
for client in lookupd_clients:
375
print(f" - {client.host}:{client.port}")
376
377
# Create producer with normalized addresses
378
producer_addresses = [f"{host}:{port}" for host, port in nsqd_addresses]
379
producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)
380
381
# Create consumer with lookupd discovery
382
consumer = gnsq.Consumer(
383
'events',
384
'processor',
385
lookupd_http_addresses=raw_lookupd_addresses
386
)
387
388
return producer, consumer
389
390
producer, consumer = setup_dynamic_connections()
391
```
392
393
### Custom Error Handling with Error Codes
394
395
```python
396
import gnsq
397
398
def advanced_error_handling():
399
"""Demonstrate advanced error code handling."""
400
401
producer = gnsq.Producer(['127.0.0.1:4150'])
402
403
try:
404
producer.start()
405
producer.publish('test_topic', 'test message')
406
407
except gnsq.NSQErrorCode as e:
408
# Handle NSQ protocol-specific errors
409
if e.fatal:
410
print(f"Fatal NSQ error: {e} - connection will be reset")
411
# Perform connection cleanup
412
else:
413
print(f"Non-fatal NSQ error: {e} - retrying")
414
415
# Create specific error instance if needed
416
specific_error = gnsq.make_error() # Based on error context
417
418
except gnsq.NSQHttpError as e:
419
print(f"HTTP API error: {e}")
420
# Handle HTTP-specific errors
421
422
except gnsq.NSQSocketError as e:
423
print(f"Socket communication error: {e}")
424
# Handle network-related errors
425
426
except gnsq.NSQException as e:
427
print(f"General NSQ error: {e}")
428
# Handle any other NSQ-related errors
429
430
finally:
431
try:
432
producer.close()
433
producer.join()
434
except Exception as e:
435
print(f"Error during cleanup: {e}")
436
437
advanced_error_handling()
438
```
439
440
### Using Decorators
441
442
```python
443
import gnsq
444
import warnings
445
446
class NSQManager:
447
"""Example class using gnsq decorators."""
448
449
def __init__(self, addresses):
450
self._addresses = addresses
451
self._producer = None
452
453
@gnsq.cached_property
454
def connection_count(self):
455
"""Expensive computation cached after first call."""
456
print("Computing connection count...") # Only runs once
457
return len(self._addresses)
458
459
@gnsq.deprecated
460
def old_publish_method(self, topic, message):
461
"""This method is deprecated. Use new_publish_method instead."""
462
# This will show a deprecation warning when called
463
return self.new_publish_method(topic, message)
464
465
def new_publish_method(self, topic, message):
466
"""New preferred method for publishing."""
467
if not self._producer:
468
self._producer = gnsq.Producer(self._addresses)
469
self._producer.start()
470
471
return self._producer.publish(topic, message)
472
473
# Usage
474
manager = NSQManager(['127.0.0.1:4150'])
475
476
# Cached property - expensive computation only runs once
477
print(manager.connection_count) # Computes and caches
478
print(manager.connection_count) # Returns cached value
479
480
# Deprecated method usage - shows warning
481
with warnings.catch_warnings(record=True) as w:
482
warnings.simplefilter("always")
483
manager.old_publish_method('test', 'message')
484
if w:
485
print(f"Deprecation warning: {w[0].message}")
486
```