CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-gnsq

A gevent based python client for the NSQ distributed messaging platform

Pending
Overview
Eval results
Files

docs/utilities-errors.md

Utilities and Error Handling

Helper utilities, decorators, backoff timers, and comprehensive error handling for building robust NSQ client applications. These components provide the foundation for reliable message processing with proper error recovery and performance optimization.

Capabilities

BackoffTimer

Implements an exponential backoff algorithm with configurable parameters for managing retry delays and connection recovery strategies.

class BackoffTimer:
    """
    Exponential backoff timer driven by recorded successes and failures.

    Callers report outcomes through success() and failure(), then call
    get_interval() to learn how long to wait before the next attempt.
    """

    def __init__(self, ratio=1, max_interval=None, min_interval=None):
        """
        Initialize exponential backoff timer.

        Parameters:
        - ratio (float): Backoff multiplier ratio (default: 1)
        - max_interval (float, optional): Maximum backoff interval in seconds
          (default: None)
        - min_interval (float, optional): Minimum backoff interval in seconds
          (default: None)

        NOTE(review): the meaning of a None bound is not stated here --
        presumably "no bound"; confirm against the implementation.
        """

    def is_reset(self) -> bool:
        """
        Check if timer is at initial state.

        Returns:
        bool: True if timer has not recorded any failures
        """

    def reset(self):
        """Reset timer to initial state, clearing all failure history."""

    def success(self):
        """
        Record a successful operation, decreasing failure count.

        Unlike reset(), this is documented as decreasing the count rather
        than clearing the failure history.
        """

    def failure(self):
        """Record a failed operation, incrementing failure count."""

    def get_interval(self) -> float:
        """
        Calculate current exponential backoff interval.

        Returns randomized interval within calculated range based on
        failure count, respecting min/max constraints. Because the value
        is randomized, repeated calls may return different intervals.

        Returns:
        float: Backoff interval in seconds
        """

Utility Functions

Address parsing and normalization utilities for NSQ daemon and lookupd connections.

def normalize_nsqd_address(address) -> tuple:
    """
    Normalize an NSQ daemon address.

    Ensures address has valid host and port components,
    applying defaults where necessary.

    Parameters:
    - address (str): Address in 'host:port' format

    Returns:
    tuple: (host, port) with normalized values

    NOTE(review): the default host/port values and the behavior for an
    address that cannot be parsed are not specified here -- confirm
    against the implementation.
    """

def parse_nsqds(nsqd_tcp_addresses) -> set:
    """
    Parse and normalize NSQ daemon TCP addresses.

    Converts various address formats into standardized
    set of (host, port) tuples. Because the result is a set,
    addresses that normalize to the same (host, port) appear once
    and the iteration order is not defined.

    Parameters:
    - nsqd_tcp_addresses (list): List of address strings

    Returns:
    set: Set of normalized (host, port) tuples

    NOTE(review): which "various address formats" are accepted is not
    listed here -- confirm against the implementation.
    """

def parse_lookupds(lookupd_http_addresses) -> list:
    """
    Parse lookupd HTTP addresses into client instances.

    Converts address strings into randomized list of
    LookupdClient instances for service discovery. The order of the
    returned clients is randomized, so it should not be relied upon
    to match the input order.

    Parameters:
    - lookupd_http_addresses (list): List of 'host:port' strings

    Returns:
    list: List of LookupdClient instances
    """

Decorators

Utility decorators for caching and deprecation warnings.

def cached_property(func):
    """
    Decorator that converts a function into a lazy cached property.

    Caches the result of the function on first call and returns
    the cached value on subsequent calls. Useful for expensive
    computations that don't change. Since the value is computed only
    once, later changes to the inputs are not reflected.

    Parameters:
    - func (callable): Function to be cached

    Returns:
    property: Cached property descriptor

    NOTE(review): where the cached value is stored and whether it can be
    invalidated is not described here -- confirm against the implementation.
    """

def deprecated(func):
    """
    Decorator that marks a function as deprecated.

    Issues a deprecation warning when the function is called,
    using the first line of the function's docstring as the
    warning message. The decorated function should therefore have a
    docstring whose first line explains the deprecation.

    Parameters:
    - func (callable): Function to mark as deprecated

    Returns:
    callable: Wrapped function with deprecation warning

    NOTE(review): the warning category and the behavior for a function
    without a docstring are not stated here -- confirm.
    """

Exception Hierarchy

Comprehensive exception classes for NSQ-specific error handling.

class NSQException(Exception):
    """
    Base exception class for all NSQ-related errors.

    Every other exception in this hierarchy derives from it, so catching
    NSQException catches all of them.
    """

class NSQRequeueMessage(NSQException):
    """Exception signalling that the current message should be requeued."""

class NSQNoConnections(NSQException):
    """Exception raised when no NSQ connections are available."""

class NSQHttpError(NSQException):
    """Exception for HTTP-related NSQ errors."""

class NSQSocketError(NSQException):
    """Exception for socket-related NSQ errors."""

class NSQFrameError(NSQException):
    """Exception for NSQ protocol frame errors."""

class NSQErrorCode(NSQException):
    """
    Base class for NSQ error code exceptions.

    All protocol-specific exceptions below derive from this class.

    Attributes:
    - fatal (bool): Whether the error is fatal and requires connection reset
    """
    # NOTE(review): `fatal` is documented above but not defined in this
    # stub; its default and per-subclass values are not shown -- confirm.

# Protocol-specific exceptions (each is an NSQErrorCode subclass)
class NSQInvalid(NSQErrorCode):
    """Invalid command or parameter error."""

class NSQBadBody(NSQErrorCode):
    """Invalid message body error."""

class NSQBadTopic(NSQErrorCode):
    """Invalid topic name error."""

class NSQBadChannel(NSQErrorCode):
    """Invalid channel name error."""

class NSQBadMessage(NSQErrorCode):
    """Invalid message format error."""

class NSQPutFailed(NSQErrorCode):
    """Put operation failed error."""

class NSQPubFailed(NSQErrorCode):
    """Publish operation failed error."""

class NSQMPubFailed(NSQErrorCode):
    """Multi-publish operation failed error."""

class NSQAuthDisabled(NSQErrorCode):
    """Authentication disabled error."""

class NSQAuthFailed(NSQErrorCode):
    """Authentication failed error."""

class NSQUnauthorized(NSQErrorCode):
    """Unauthorized operation error."""

class NSQFinishFailed(NSQErrorCode):
    """Message finish operation failed error."""

class NSQRequeueFailed(NSQErrorCode):
    """Message requeue operation failed error."""

class NSQTouchFailed(NSQErrorCode):
    """Message touch operation failed error."""

Error Handling Utilities

Functions for creating and managing NSQ error instances.

def make_error():
    """
    Create specific error instances based on NSQ error codes.

    Maps NSQ daemon error codes to appropriate exception classes
    and creates instances with relevant error information.

    NOTE(review): the signature shown here takes no parameters, yet the
    description implies an error code (or error message) input -- confirm
    the real signature before relying on this stub.

    Returns:
    NSQException: Appropriate exception instance for the error code
    """

# Error code mapping
# NOTE(review): the entries are elided in this listing; the empty literal
# below is a placeholder, not the real contents.
ERROR_CODES = {
    # Dictionary mapping NSQ error codes to exception classes
    # Used internally by make_error() function
}

Usage Examples

Backoff Timer for Retry Logic

import gnsq
import time

def reliable_publisher_with_backoff():
    """
    Publish a test message once per second, backing off on connection loss.

    Each loop iteration publishes to the 'events' topic. When no NSQ
    connection is available, the failure is recorded on a BackoffTimer and
    the loop sleeps for the timer's current interval before retrying. Any
    other Exception is printed and followed by a fixed 5 second pause.

    Returns:
    Does not return normally; the loop has no exit condition and only ends
    when an exception that is not an Exception subclass (for example
    KeyboardInterrupt) propagates.
    """

    producer = gnsq.Producer(['127.0.0.1:4150'])
    backoff_timer = gnsq.BackoffTimer(
        ratio=2.0,           # Double delay each failure
        max_interval=60.0,   # Max 60 second delay
        min_interval=0.1     # Min 100ms delay
    )

    producer.start()

    # NOTE(review): gnsq is gevent based; time.sleep only yields to other
    # greenlets when gevent monkey-patching is active -- confirm, or switch
    # to gevent.sleep in real applications.
    while True:
        try:
            # Attempt to publish message. The body is passed as bytes,
            # which is a valid message body on both Python 2 and Python 3.
            producer.publish('events', b'test message')

            # Inspect the timer BEFORE clearing it. Calling success() first
            # can remove the last recorded failure, which would make
            # is_reset() true and hide the recovery from this check.
            if not backoff_timer.is_reset():
                print("Connection recovered!")
                backoff_timer.reset()

            time.sleep(1)  # Normal operation delay

        except gnsq.NSQNoConnections:
            # Connection failed - apply backoff
            backoff_timer.failure()
            delay = backoff_timer.get_interval()

            print(f"Connection failed, retrying in {delay:.2f}s")
            time.sleep(delay)

        except Exception as e:
            # Top-level boundary of the example loop: report and keep going
            print(f"Unexpected error: {e}")
            time.sleep(5)

# Run the example. The function loops forever (`while True` with no break),
# so this call does not return until it is interrupted.
reliable_publisher_with_backoff()

Comprehensive Error Handling

import gnsq

def robust_message_processor():
    """
    Start a consumer on topic 'events' / channel 'processor' with
    per-error handling.

    Registers two handlers on the consumer and then calls consumer.start():
    - handle_message: processes each message and maps each failure type to
      finish / requeue / log-only behavior.
    - handle_consumer_error: prints a diagnostic for consumer-level errors.

    `process_event` is application code and is not defined in this example;
    it must be available in the enclosing module.
    """

    consumer = gnsq.Consumer('events', 'processor', '127.0.0.1:4150')

    @consumer.on_message.connect
    def handle_message(consumer, message):
        """Process one message and acknowledge it according to the outcome."""
        try:
            # Process the message; the return value is not needed here
            process_event(message.body)
            message.finish()

        except gnsq.NSQRequeueMessage:
            # Explicit requeue request
            message.requeue()

        except gnsq.NSQBadMessage:
            # Malformed message - don't requeue
            print(f"Discarding malformed message: {message.id}")
            message.finish()

        except gnsq.NSQTouchFailed:
            # Touch operation failed - message may timeout
            print(f"Failed to extend timeout for message: {message.id}")
            # Continue processing, accept potential duplicate

        except (gnsq.NSQFinishFailed, gnsq.NSQRequeueFailed) as e:
            # Message response failed - log but continue
            print(f"Failed to respond to message {message.id}: {e}")

        except gnsq.NSQSocketError:
            # Connection issue - will be handled by consumer
            print("Socket error during message processing")
            raise  # Let consumer handle reconnection

        except Exception as e:
            # Application error - requeue for retry
            print(f"Processing error for message {message.id}: {e}")
            try:
                message.requeue()
            except gnsq.NSQRequeueFailed:
                print("Failed to requeue message - may be redelivered")

    @consumer.on_error.connect
    def handle_consumer_error(consumer, error):
        """Print a diagnostic that depends on the type of consumer error."""
        if isinstance(error, gnsq.NSQAuthFailed):
            print("Authentication failed - check credentials")
        elif isinstance(error, gnsq.NSQUnauthorized):
            print("Unauthorized - check permissions")
        elif isinstance(error, gnsq.NSQHttpError):
            print(f"HTTP error: {error}")
        else:
            print(f"Consumer error: {error}")

    consumer.start()

# Run the example: builds the consumer, registers both handlers, then
# calls consumer.start().
robust_message_processor()

Using Utility Functions

import gnsq

def setup_dynamic_connections():
    """
    Build a producer and a consumer from raw address lists.

    The nsqd addresses are run through gnsq.parse_nsqds and the lookupd
    addresses through gnsq.parse_lookupds; both results are printed. The
    producer is created from the normalized nsqd addresses, the consumer
    (topic 'events', channel 'processor') from the raw lookupd addresses.

    Returns:
    tuple: (producer, consumer)
    """
    # Inputs in mixed formats; the last nsqd entry has no port
    nsqd_inputs = ['127.0.0.1:4150', 'nsqd-2:4150', '192.168.1.100']
    lookupd_inputs = ['127.0.0.1:4161', 'lookupd-1:4161']

    # Normalization / parsing step
    parsed_nsqds = gnsq.parse_nsqds(nsqd_inputs)
    parsed_lookupds = gnsq.parse_lookupds(lookupd_inputs)

    # Report what was parsed
    print("Normalized NSQD addresses:")
    for host, port in parsed_nsqds:
        print(f"  - {host}:{port}")

    print("Lookupd clients:")
    for client in parsed_lookupds:
        print(f"  - {client.host}:{client.port}")

    # Producer talks to the normalized nsqd addresses directly
    producer = gnsq.Producer(
        nsqd_tcp_addresses=[f"{host}:{port}" for host, port in parsed_nsqds]
    )

    # Consumer discovers nsqd instances through lookupd
    consumer = gnsq.Consumer(
        'events',
        'processor',
        lookupd_http_addresses=lookupd_inputs,
    )

    return producer, consumer

# Run the example and unpack the (producer, consumer) tuple it returns.
producer, consumer = setup_dynamic_connections()

Custom Error Handling with Error Codes

import gnsq

def advanced_error_handling():
    """
    Publish one message and show how to branch on the NSQ exception types.

    Handlers are ordered from most to least specific: protocol error codes
    (NSQErrorCode, inspected through its `fatal` attribute), HTTP errors,
    socket errors, and finally the NSQException base class. The producer is
    always closed and joined in the `finally` block; failures during that
    cleanup are printed rather than raised.
    """

    producer = gnsq.Producer(['127.0.0.1:4150'])

    try:
        producer.start()
        # The body is passed as bytes, which is a valid message body on
        # both Python 2 and Python 3.
        producer.publish('test_topic', b'test message')

    except gnsq.NSQErrorCode as e:
        # Handle NSQ protocol-specific errors. `e` is already the specific
        # error instance, so no additional error object is constructed.
        if e.fatal:
            print(f"Fatal NSQ error: {e} - connection will be reset")
            # Perform connection cleanup
        else:
            print(f"Non-fatal NSQ error: {e} - retrying")

    except gnsq.NSQHttpError as e:
        print(f"HTTP API error: {e}")
        # Handle HTTP-specific errors

    except gnsq.NSQSocketError as e:
        print(f"Socket communication error: {e}")
        # Handle network-related errors

    except gnsq.NSQException as e:
        print(f"General NSQ error: {e}")
        # Handle any other NSQ-related errors

    finally:
        # Best-effort cleanup: report problems but never mask the outcome
        # of the try block above.
        try:
            producer.close()
            producer.join()
        except Exception as e:
            print(f"Error during cleanup: {e}")

# Run the example once; producer cleanup happens in the function's
# `finally` block.
advanced_error_handling()

Using Decorators

import gnsq
import warnings

class NSQManager:
    """
    Example class using gnsq decorators.

    Holds a list of nsqd addresses and creates its producer lazily on the
    first publish.
    """

    def __init__(self, addresses):
        """
        Store the addresses; no connection is made yet.

        Parameters:
        - addresses (list): nsqd addresses handed to gnsq.Producer on the
          first publish
        """
        self._addresses = addresses
        self._producer = None  # created on demand by new_publish_method

    @gnsq.cached_property
    def connection_count(self):
        """Expensive computation cached after first call."""
        print("Computing connection count...")  # Only runs once
        return len(self._addresses)

    @gnsq.deprecated
    def old_publish_method(self, topic, message):
        """This method is deprecated. Use new_publish_method instead."""
        # This will show a deprecation warning when called
        return self.new_publish_method(topic, message)

    def new_publish_method(self, topic, message):
        """
        New preferred method for publishing.

        Creates and starts the producer on first use, then publishes.

        Parameters:
        - topic: topic name forwarded to the producer's publish()
        - message: message body forwarded to the producer's publish()

        Returns:
        Whatever the producer's publish() returns.
        """
        # Compare against None explicitly so the lazy-init check does not
        # depend on the truthiness of the producer object.
        if self._producer is None:
            self._producer = gnsq.Producer(self._addresses)
            self._producer.start()

        return self._producer.publish(topic, message)

# Usage
manager = NSQManager(['127.0.0.1:4150'])

# Cached property - expensive computation only runs once
print(manager.connection_count)  # Computes and caches
print(manager.connection_count)  # Returns cached value

# Deprecated method usage - shows warning
# record=True collects emitted warnings in the list `w` instead of printing them
with warnings.catch_warnings(record=True) as w:
    # "always" turns off the default once-per-location suppression so the
    # warning is captured on every call
    warnings.simplefilter("always")
    # This call really publishes: old_publish_method forwards to
    # new_publish_method, which creates and starts the producer.
    # NOTE(review): the message body is passed as str here -- confirm
    # whether the publish API expects bytes on Python 3.
    manager.old_publish_method('test', 'message')
    if w:
        print(f"Deprecation warning: {w[0].message}")

Install with Tessl CLI

npx tessl i tessl/pypi-gnsq

docs

core-messaging.md

index.md

lookupd-integration.md

message-handling.md

nsqd-clients.md

utilities-errors.md

tile.json