A gevent-based Python client for the NSQ distributed messaging platform.
—
Helper utilities, decorators, backoff timers, and comprehensive error handling for building robust NSQ client applications. These components provide the foundation for reliable message processing with proper error recovery and performance optimization.
Implements an exponential backoff algorithm with configurable parameters for managing retry delays and connection recovery strategies.
class BackoffTimer:
    """Exponential backoff timer (API stub: signatures and docstrings only)."""

    def __init__(self, ratio=1, max_interval=None, min_interval=None):
        """
        Initialize exponential backoff timer.

        Parameters:
        - ratio (float): Backoff multiplier ratio (default: 1)
        - max_interval (float, optional): Maximum backoff interval in seconds
        - min_interval (float, optional): Minimum backoff interval in seconds
        """

    def is_reset(self):
        """
        Check if timer is at initial state.

        Returns:
        bool: True if timer has not recorded any failures
        """

    def reset(self):
        """Reset timer to initial state, clearing all failure history."""

    def success(self):
        """Record a successful operation, decreasing failure count."""

    def failure(self):
        """Record a failed operation, incrementing failure count."""

    def get_interval(self):
        """
        Calculate current exponential backoff interval.

        Returns randomized interval within calculated range based on
        failure count, respecting min/max constraints.

        Returns:
        float: Backoff interval in seconds
        """


# Address parsing and normalization utilities for NSQ daemon and lookupd connections.
def normalize_nsqd_address(address):
    """
    Normalize an NSQ daemon address.

    Ensures address has valid host and port components,
    applying defaults where necessary.

    Parameters:
    - address (str): Address in 'host:port' format

    Returns:
    tuple: (host, port) with normalized values
    """
def parse_nsqds(nsqd_tcp_addresses):
    """
    Parse and normalize NSQ daemon TCP addresses.

    Converts various address formats into standardized
    set of (host, port) tuples.

    Parameters:
    - nsqd_tcp_addresses (list): List of address strings

    Returns:
    set: Set of normalized (host, port) tuples
    """
def parse_lookupds(lookupd_http_addresses):
    """
    Parse lookupd HTTP addresses into client instances.

    Converts address strings into randomized list of
    LookupdClient instances for service discovery.

    Parameters:
    - lookupd_http_addresses (list): List of 'host:port' strings

    Returns:
    list: List of LookupdClient instances
    """


# Utility decorators for caching and deprecation warnings.
def cached_property(func):
    """
    Decorator that converts a function into a lazy cached property.

    Caches the result of the function on first call and returns
    the cached value on subsequent calls. Useful for expensive
    computations that don't change.

    Parameters:
    - func (callable): Function to be cached

    Returns:
    property: Cached property descriptor
    """
def deprecated(func):
    """
    Decorator that marks a function as deprecated.

    Issues a deprecation warning when the function is called,
    using the first line of the function's docstring as the
    warning message.

    Parameters:
    - func (callable): Function to mark as deprecated

    Returns:
    callable: Wrapped function with deprecation warning
    """


# Comprehensive exception classes for NSQ-specific error handling.
class NSQException(Exception):
    """Base exception class for all NSQ-related errors."""


class NSQRequeueMessage(NSQException):
    """Exception to trigger message requeuing."""


class NSQNoConnections(NSQException):
    """Exception raised when no NSQ connections are available."""


class NSQHttpError(NSQException):
    """Exception for HTTP-related NSQ errors."""


class NSQSocketError(NSQException):
    """Exception for socket-related NSQ errors."""


class NSQFrameError(NSQException):
    """Exception for NSQ protocol frame errors."""


class NSQErrorCode(NSQException):
    """
    Base class for NSQ error code exceptions.

    Attributes:
    - fatal (bool): Whether the error is fatal and requires connection reset
    """


# Protocol-specific exceptions
class NSQInvalid(NSQErrorCode):
    """Invalid command or parameter error."""


class NSQBadBody(NSQErrorCode):
    """Invalid message body error."""


class NSQBadTopic(NSQErrorCode):
    """Invalid topic name error."""


class NSQBadChannel(NSQErrorCode):
    """Invalid channel name error."""


class NSQBadMessage(NSQErrorCode):
    """Invalid message format error."""


class NSQPutFailed(NSQErrorCode):
    """Put operation failed error."""


class NSQPubFailed(NSQErrorCode):
    """Publish operation failed error."""


class NSQMPubFailed(NSQErrorCode):
    """Multi-publish operation failed error."""


class NSQAuthDisabled(NSQErrorCode):
    """Authentication disabled error."""


class NSQAuthFailed(NSQErrorCode):
    """Authentication failed error."""


class NSQUnauthorized(NSQErrorCode):
    """Unauthorized operation error."""


class NSQFinishFailed(NSQErrorCode):
    """Message finish operation failed error."""


class NSQRequeueFailed(NSQErrorCode):
    """Message requeue operation failed error."""


class NSQTouchFailed(NSQErrorCode):
    """Message touch operation failed error."""


# Functions for creating and managing NSQ error instances.
def make_error():
    """
    Create specific error instances based on NSQ error codes.

    Maps NSQ daemon error codes to appropriate exception classes
    and creates instances with relevant error information.

    Returns:
    NSQException: Appropriate exception instance for the error code
    """
    # NOTE(review): the docstring describes selection by error code, but this
    # stub signature takes no arguments - confirm the real parameters against
    # the gnsq source before relying on it.
# Error code mapping
ERROR_CODES = {
    # Dictionary mapping NSQ error codes to exception classes
    # Used internally by make_error() function
}


import gnsq
import time
def reliable_publisher_with_backoff():
    """Publisher with exponential backoff retry logic.

    Publishes a test message to the 'events' topic in an endless loop.
    When no connection is available the loop sleeps for the interval
    supplied by the backoff timer; after a successful publish the timer
    is cleared and the loop sleeps for one second.

    This function never returns.
    """
    producer = gnsq.Producer(['127.0.0.1:4150'])
    backoff_timer = gnsq.BackoffTimer(
        ratio=2.0,          # Backoff multiplier ratio
        max_interval=60.0,  # Max 60 second delay
        min_interval=0.1,   # Min 100ms delay
    )
    producer.start()

    while True:
        try:
            # Attempt to publish message
            producer.publish('events', 'test message')
        except gnsq.NSQNoConnections:
            # Connection failed - apply backoff
            backoff_timer.failure()
            delay = backoff_timer.get_interval()
            print(f"Connection failed, retrying in {delay:.2f}s")
            time.sleep(delay)
        except Exception as e:
            print(f"Unexpected error: {e}")
            time.sleep(5)
        else:
            # Capture the failure state BEFORE recording the success:
            # success() lowers the failure count, so checking is_reset()
            # afterwards would miss a recovery from a single failure.
            recovering = not backoff_timer.is_reset()
            backoff_timer.success()
            if recovering:
                print("Connection recovered!")
                backoff_timer.reset()
            time.sleep(1)  # Normal operation delay


reliable_publisher_with_backoff()
import gnsq
def robust_message_processor():
    """Consumer with comprehensive error handling.

    Creates a consumer for topic 'events' / channel 'processor', registers
    a message handler and an error handler on it, then starts it.
    """
    consumer = gnsq.Consumer('events', 'processor', '127.0.0.1:4150')

    @consumer.on_message.connect
    def handle_message(consumer, message):
        try:
            # Process the message.
            # NOTE(review): process_event is not defined in this example -
            # presumably supplied by the application.
            process_event(message.body)
            message.finish()
        except gnsq.NSQRequeueMessage:
            # Explicit requeue request
            message.requeue()
        except gnsq.NSQBadMessage:
            # Malformed message - don't requeue
            print(f"Discarding malformed message: {message.id}")
            message.finish()
        except gnsq.NSQTouchFailed:
            # Touch operation failed - message may timeout
            print(f"Failed to extend timeout for message: {message.id}")
            # Continue processing, accept potential duplicate
        except (gnsq.NSQFinishFailed, gnsq.NSQRequeueFailed) as e:
            # Message response failed - log but continue
            print(f"Failed to respond to message {message.id}: {e}")
        except gnsq.NSQSocketError:
            # Connection issue - will be handled by consumer
            print("Socket error during message processing")
            raise  # Let consumer handle reconnection
        except Exception as e:
            # Application error - requeue for retry
            print(f"Processing error for message {message.id}: {e}")
            try:
                message.requeue()
            except gnsq.NSQRequeueFailed:
                print("Failed to requeue message - may be redelivered")

    @consumer.on_error.connect
    def handle_consumer_error(consumer, error):
        if isinstance(error, gnsq.NSQAuthFailed):
            print("Authentication failed - check credentials")
        elif isinstance(error, gnsq.NSQUnauthorized):
            print("Unauthorized - check permissions")
        elif isinstance(error, gnsq.NSQHttpError):
            print(f"HTTP error: {error}")
        else:
            print(f"Consumer error: {error}")

    consumer.start()


robust_message_processor()
import gnsq
def setup_dynamic_connections():
    """Setup connections using utility functions.

    Parses raw nsqd and lookupd address strings with the gnsq helpers,
    prints the results, and builds a producer and a consumer from them.

    Returns:
    tuple: (producer, consumer)
    """
    # Various address formats
    raw_nsqd_addresses = [
        '127.0.0.1:4150',
        'nsqd-2:4150',
        '192.168.1.100',  # Missing port
    ]
    raw_lookupd_addresses = [
        '127.0.0.1:4161',
        'lookupd-1:4161',
    ]

    # Normalize and parse addresses
    nsqd_addresses = gnsq.parse_nsqds(raw_nsqd_addresses)
    lookupd_clients = gnsq.parse_lookupds(raw_lookupd_addresses)

    print("Normalized NSQD addresses:")
    for host, port in nsqd_addresses:
        print(f" - {host}:{port}")

    print("Lookupd clients:")
    for client in lookupd_clients:
        print(f" - {client.host}:{client.port}")

    # Create producer with normalized addresses
    producer_addresses = [f"{host}:{port}" for host, port in nsqd_addresses]
    producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)

    # Create consumer with lookupd discovery
    consumer = gnsq.Consumer(
        'events',
        'processor',
        lookupd_http_addresses=raw_lookupd_addresses,
    )
    return producer, consumer


producer, consumer = setup_dynamic_connections()
import gnsq
def advanced_error_handling():
    """Demonstrate advanced error code handling.

    Starts a producer, publishes one message, and shows how the NSQ
    exception types can be handled from most to least specific. The
    producer is closed and joined in all cases.
    """
    producer = gnsq.Producer(['127.0.0.1:4150'])
    try:
        producer.start()
        producer.publish('test_topic', 'test message')
    except gnsq.NSQErrorCode as e:
        # Handle NSQ protocol-specific errors
        if e.fatal:
            print(f"Fatal NSQ error: {e} - connection will be reset")
            # Perform connection cleanup
        else:
            print(f"Non-fatal NSQ error: {e} - retrying")
        # Create specific error instance if needed
        specific_error = gnsq.make_error()  # Based on error context
    except gnsq.NSQHttpError as e:
        print(f"HTTP API error: {e}")
        # Handle HTTP-specific errors
    except gnsq.NSQSocketError as e:
        print(f"Socket communication error: {e}")
        # Handle network-related errors
    except gnsq.NSQException as e:
        print(f"General NSQ error: {e}")
        # Handle any other NSQ-related errors
    finally:
        # Best-effort cleanup: report, but do not propagate, shutdown errors.
        try:
            producer.close()
            producer.join()
        except Exception as e:
            print(f"Error during cleanup: {e}")


advanced_error_handling()
import gnsq
import warnings
class NSQManager:
    """Example class using gnsq decorators."""

    def __init__(self, addresses):
        """Store the nsqd addresses; the producer is created on first publish."""
        self._addresses = addresses
        self._producer = None

    @gnsq.cached_property
    def connection_count(self):
        """Expensive computation cached after first call."""
        print("Computing connection count...")  # Only runs once
        return len(self._addresses)

    @gnsq.deprecated
    def old_publish_method(self, topic, message):
        """This method is deprecated. Use new_publish_method instead."""
        # This will show a deprecation warning when called
        return self.new_publish_method(topic, message)

    def new_publish_method(self, topic, message):
        """New preferred method for publishing."""
        # Explicit None check: only "no producer yet" should trigger
        # creation, independent of how the producer evaluates as a boolean.
        if self._producer is None:
            self._producer = gnsq.Producer(self._addresses)
            self._producer.start()
        return self._producer.publish(topic, message)
# Usage
manager = NSQManager(['127.0.0.1:4150'])

# Cached property - expensive computation only runs once
print(manager.connection_count)  # Computes and caches
print(manager.connection_count)  # Returns cached value

# Deprecated method usage - shows warning
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    manager.old_publish_method('test', 'message')
    if w:
        print(f"Deprecation warning: {w[0].message}")


# Install with Tessl CLI
#   npx tessl i tessl/pypi-gnsq