CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-gnsq

A gevent based python client for the NSQ distributed messaging platform

Pending
Overview
Eval results
Files

core-messaging.mddocs/

Core Messaging

High-level producer and consumer classes that provide the primary interface for publishing messages to and consuming messages from NSQ topics. These classes handle connection management and automatic nsqlookupd discovery, and they provide convenient event-driven APIs suitable for most messaging applications.

Capabilities

Producer

Publishes messages to NSQ topics with support for single and batch message publishing, connection pooling, and automatic retry logic.

class Producer:
    """
    Publish messages to NSQ topics.

    Supports single and batch message publishing, connection pooling,
    and automatic retry logic.
    """

    def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs):
        """
        Initialize a Producer for publishing messages to NSQ.

        Parameters:
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses.
          Defaults to an empty list.
        - max_backoff_duration (int): Maximum backoff duration in seconds.
          Defaults to 128.
        - **kwargs: Additional connection parameters (tls_v1, compression, etc.)
        """

    def start(self):
        """Start discovering and listening to connections."""

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds.
          Defaults to None.
        - raise_error (bool): Whether to raise exceptions on timeout.
          Defaults to False.
        """

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True):
        """
        Publish a single message to a topic.

        Parameters:
        - topic (str): Topic name to publish to
        - data (str or bytes): Message data
        - defer (int, optional): Milliseconds to defer message delivery.
          Defaults to None.
        - block (bool): Whether to block until publish completes.
          Defaults to True.
        - timeout (float, optional): Maximum time to wait for publish.
          Defaults to None.
        - raise_error (bool): Whether to raise exceptions on failure.
          Defaults to True.
        """

    def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True):
        """
        Publish multiple messages to a topic in a single operation.

        Parameters:
        - topic (str): Topic name to publish to
        - messages (list): List of message data (str or bytes)
        - block (bool): Whether to block until publish completes.
          Defaults to True.
        - timeout (float, optional): Maximum time to wait for publish.
          Defaults to None.
        - raise_error (bool): Whether to raise exceptions on failure.
          Defaults to True.
        """

    @property
    def is_running(self):
        """bool: Whether the producer is currently active."""

Producer Events

Producers support event signals for monitoring connection and operational status:

# Signal properties available on Producer instances.
# Each property is an event signal used to monitor connection and
# operational status.
@property
def on_response(self): ...  # Emitted on successful responses

@property
def on_error(self): ...     # Emitted on error responses

@property
def on_auth(self): ...      # Emitted on authentication events

@property
def on_close(self): ...     # Emitted when connections close

Consumer

Consumes messages from NSQ topics with support for automatic nsqlookupd discovery, configurable concurrency, message acknowledgment patterns, and comprehensive event handling.

class Consumer:
    """
    Consume messages from NSQ topics.

    Supports automatic nsqlookupd discovery, configurable concurrency,
    message acknowledgment patterns, and event handling.
    """

    def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs):
        """
        Initialize a Consumer for receiving messages from NSQ.

        Parameters:
        - topic (str): Topic name to consume from
        - channel (str): Channel name for this consumer
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses.
          Defaults to an empty list.
        - lookupd_http_addresses (list): List of 'host:port' lookupd addresses.
          Defaults to an empty list.
        - **kwargs: Additional options (max_in_flight, message_timeout, etc.)
        """

    def start(self, block=True):
        """
        Start discovering and listening to connections.

        Parameters:
        - block (bool): Whether to block execution until consumer stops.
          Defaults to True.
        """

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds.
          Defaults to None.
        - raise_error (bool): Whether to raise exceptions on timeout.
          Defaults to False.
        """

    def query_nsqd(self):
        """Connect to specified NSQ daemon TCP addresses."""

    def query_lookupd(self):
        """Query lookup daemon for topic producers."""

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def redistribute_ready_state(self):
        """Trigger redistribution of message processing readiness across connections."""

    @property
    def is_running(self):
        """bool: Whether the consumer is active."""

    @property
    def is_starved(self):
        """bool: Whether connections are starved for messages."""

    @property
    def total_ready_count(self):
        """int: Total ready message count across all connections."""

    @property
    def total_in_flight(self):
        """int: Total number of messages currently being processed."""

Consumer Events

Consumers provide comprehensive event handling for message processing lifecycle:

# Signal properties available on Consumer instances.
# Handlers are attached with `.connect`, e.g. `@consumer.on_message.connect`
# on a `handler(consumer, message)` function, or `@consumer.on_error.connect`
# on a `handler(consumer, error)` function.
@property
def on_message(self): ...      # Emitted when messages are received

@property
def on_response(self): ...     # Emitted on successful responses

@property
def on_error(self): ...        # Emitted on error responses

@property
def on_finish(self): ...       # Emitted when messages are finished

@property
def on_requeue(self): ...      # Emitted when messages are requeued

@property
def on_giving_up(self): ...    # Emitted when giving up on messages

@property
def on_auth(self): ...         # Emitted on authentication events

@property
def on_exception(self): ...    # Emitted on exceptions

@property
def on_close(self): ...        # Emitted when connections close

Reader (Deprecated)

Legacy consumer class with built-in concurrency support. Use the Consumer class instead for new applications.

class Reader:
    """
    Legacy consumer with built-in concurrency support (deprecated).

    Use the Consumer class instead for new applications.

    NOTE(review): the method docstrings below refer to a parent Consumer
    class, but no base class appears in this signature -- confirm the
    inheritance against the library source.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize Reader (deprecated).

        Use the Consumer class instead. Sets up concurrency settings
        and creates a message queue if max_concurrency is specified.

        Parameters:
        - *args, **kwargs: Same as Consumer parameters
        """

    def start(self, *args, **kwargs):
        """
        Start reader with worker threads.

        Spawns worker threads based on the max_concurrency setting
        and calls the parent Consumer start method.
        """

    def handle_message(self, conn, message):
        """
        Handle incoming message.

        Queues the message if max_concurrency is set; otherwise
        calls the parent class message handling directly.
        """

    def publish(self, topic, message):
        """
        Publish message (deprecated).

        Publishes the message to a random NSQ connection.
        Use the Producer class instead for publishing.

        Parameters:
        - topic (str): Topic name
        - message (str or bytes): Message data

        Raises:
        NSQNoConnections: If no connections are available
        """

Usage Examples

Basic Producer Usage

import gnsq

# Create producer with multiple NSQ daemons
producer = gnsq.Producer([
    '127.0.0.1:4150',
    '127.0.0.1:4152',
])

producer.start()

try:
    # Publish single message
    producer.publish('events', 'user_signup:12345')

    # Publish batch of messages
    events = [
        'user_login:12345',
        'page_view:/dashboard',
        'user_logout:12345',
    ]
    producer.multipublish('events', events)
finally:
    # publish/multipublish raise on failure by default (raise_error=True),
    # so shut down in `finally` to avoid leaving connections and workers
    # running when a publish call fails.
    producer.close()
    producer.join()

Consumer with Event Handling

import gnsq

# Create consumer with lookupd discovery
consumer = gnsq.Consumer(
    'events',
    'analytics',
    lookupd_http_addresses=['127.0.0.1:4161'],
)


# Message handler
@consumer.on_message.connect
def handle_message(consumer, message):
    """Decode and process one message, then finish it or requeue it."""
    try:
        # Process the message
        event_data = message.body.decode('utf-8')
        print(f'Processing: {event_data}')

        # Simulate processing work; process_event is a placeholder for
        # application-specific logic.
        process_event(event_data)
    except Exception as e:
        print(f'Error processing message: {e}')
        # Requeue for retry (with exponential backoff)
        message.requeue()
    else:
        # Mark as successfully processed. This lives in `else` so that a
        # failure while finishing is not mistaken for a processing error
        # and followed by a requeue of the same message.
        message.finish()


# Error handler
@consumer.on_error.connect
def handle_error(consumer, error):
    """Report errors emitted by the consumer."""
    print(f'Consumer error: {error}')


# Start consuming (blocks until the consumer stops, since block=True by default)
consumer.start()

Advanced Consumer Configuration

import gevent
import gnsq

# Consumer with advanced options
consumer = gnsq.Consumer(
    'high_volume_topic',
    'worker_pool',
    nsqd_tcp_addresses=['127.0.0.1:4150'],
    max_in_flight=100,          # Process up to 100 messages concurrently
    message_timeout=60000,      # 60 second message timeout
    max_backoff_duration=128,   # Maximum backoff time
    tls_v1=True,                # Enable TLS
    compression='deflate',      # Enable compression
)


@consumer.on_message.connect
def handle_high_volume_message(consumer, message):
    """Hand the message off to a greenlet so the handler returns immediately."""
    # Enable async processing for this message
    message.enable_async()

    # Spawn a greenlet to handle processing
    gevent.spawn(process_message_async, message)


def process_message_async(message):
    """Process one message in its own greenlet, then finish or requeue it.

    heavy_computation, processing_taking_long and save_result are
    placeholders for application-specific logic.
    """
    try:
        # Long-running processing
        result = heavy_computation(message.body)

        # Touch message to extend timeout if needed
        if processing_taking_long():
            message.touch()

        save_result(result)
    except Exception:
        # Requeue with backoff
        message.requeue(backoff=True)
    else:
        # Finish outside the `try` so a failure while finishing does not
        # trigger a requeue of the same message.
        message.finish()


consumer.start()

Install with Tessl CLI

npx tessl i tessl/pypi-gnsq

docs

core-messaging.md

index.md

lookupd-integration.md

message-handling.md

nsqd-clients.md

utilities-errors.md

tile.json