CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gnsq

A gevent based python client for the NSQ distributed messaging platform

Pending
Overview
Eval results
Files

nsqd-clients.mddocs/

NSQ Daemon Clients

Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These clients provide fine-grained control over NSQ operations, administrative functions, and are used internally by the higher-level Producer and Consumer classes.

Capabilities

NsqdTCPClient

Low-level TCP client for NSQ daemon protocol communication. Provides direct access to NSQ's binary protocol for publishing, subscribing, and connection management.

class NsqdTCPClient:
    def connect(self):
        """Establish TCP connection to NSQ daemon."""

    def close_stream(self):
        """Close the TCP connection stream."""

    def send(self):
        """Send command to NSQ daemon over TCP connection."""

    def read_response(self):
        """Read response from NSQ daemon."""

    def subscribe(self):
        """Subscribe to a topic and channel."""

    def publish(self):
        """Publish a message to a topic."""

    def multipublish(self):
        """Publish multiple messages to a topic in batch."""

    def ready(self):
        """Signal readiness to receive messages."""

    def finish(self):
        """Mark a message as finished (FIN command)."""

    def requeue(self):
        """Requeue a message (REQ command)."""

    def touch(self):
        """Touch a message to reset timeout (TOUCH command)."""

    def close(self):
        """Close connection (CLS command)."""

    def nop(self):
        """Send no-operation command (NOP)."""

    def identify(self):
        """Send client identification (IDENTIFY command)."""

    def auth(self):
        """Authenticate with NSQ daemon (AUTH command)."""

NsqdHTTPClient

HTTP client for NSQ daemon administrative operations. Provides RESTful interface for topic/channel management, statistics, and publishing via HTTP.

class NsqdHTTPClient:
    def publish(self):
        """
        Publish a message via HTTP.
        
        HTTP endpoint for publishing single messages to topics.
        Useful for non-persistent connections or web applications.
        """

    def multipublish(self):
        """
        Publish multiple messages via HTTP.
        
        HTTP endpoint for batch publishing multiple messages
        to a topic in a single request.
        """

    def create_topic(self):
        """
        Create a new topic.
        
        Administrative function to create topics on the NSQ daemon.
        Topics are created automatically on first publish, but this
        allows explicit creation.
        """

    def delete_topic(self):
        """
        Delete an existing topic.
        
        Administrative function to delete topics and all associated
        channels and messages from the NSQ daemon.
        """

    def create_channel(self):
        """
        Create a new channel within a topic.
        
        Administrative function to create channels. Channels are
        created automatically when consumers connect, but this
        allows explicit creation.
        """

    def delete_channel(self):
        """
        Delete an existing channel.
        
        Administrative function to delete channels and all
        associated messages from a topic.
        """

    def empty_topic(self):
        """
        Remove all messages from a topic.
        
        Administrative function to clear all messages from
        all channels in a topic without deleting the topic.
        """

    def empty_channel(self):
        """
        Remove all messages from a channel.
        
        Administrative function to clear all messages from
        a specific channel without deleting the channel.
        """

    def pause_topic(self):
        """
        Pause message delivery for a topic.
        
        Administrative function to pause all message delivery
        for all channels in a topic.
        """

    def unpause_topic(self):
        """
        Resume message delivery for a topic.
        
        Administrative function to resume message delivery
        for a previously paused topic.
        """

    def pause_channel(self):
        """
        Pause message delivery for a channel.
        
        Administrative function to pause message delivery
        for a specific channel within a topic.
        """

    def unpause_channel(self):
        """
        Resume message delivery for a channel.
        
        Administrative function to resume message delivery
        for a previously paused channel.
        """

    def stats(self):
        """
        Get NSQ daemon statistics.
        
        Returns comprehensive statistics about topics, channels,
        connections, and message counts from the NSQ daemon.
        
        Returns:
        dict: Statistics data including topics, channels, and metrics
        """

    def ping(self):
        """
        Ping the NSQ daemon.
        
        Health check endpoint to verify NSQ daemon is responding.
        
        Returns:
        str: Response indicating daemon status
        """

    def info(self):
        """
        Get NSQ daemon information.
        
        Returns daemon configuration and version information.
        
        Returns:
        dict: Daemon configuration and version details
        """

Nsqd (Deprecated)

Legacy NSQ daemon client interface. Use NsqdTCPClient or NsqdHTTPClient instead.

class Nsqd:
    def publish(self):
        """Publish message (deprecated - use NsqdTCPClient.publish)."""

    def multipublish(self):
        """Publish multiple messages (deprecated - use NsqdTCPClient.multipublish)."""

Usage Examples

Direct TCP Publishing

import gnsq

# Create TCP client for direct publishing
tcp_client = gnsq.NsqdTCPClient()

# Configure connection parameters
tcp_client.configure(
    address='127.0.0.1',
    port=4150,
    tls_v1=False,
    compression=None
)

# Connect to NSQ daemon
tcp_client.connect()

# Identify client
tcp_client.identify()

# Publish messages directly
tcp_client.publish('events', b'user_action_1')
tcp_client.multipublish('events', [b'event_1', b'event_2', b'event_3'])

# Close connection
tcp_client.close()

Administrative Operations via HTTP

import gnsq

# Create HTTP client for admin operations
http_client = gnsq.NsqdHTTPClient(
    host='127.0.0.1',
    port=4151  # NSQ HTTP port
)

# Create topic and channel
http_client.create_topic('new_events')
http_client.create_channel('new_events', 'processor')

# Get daemon statistics
stats = http_client.stats()
print(f"Topics: {len(stats['topics'])}")
print(f"Total messages: {stats['total_messages']}")

# Publish via HTTP (useful for web applications)
http_client.publish('new_events', 'HTTP published message')

# Administrative pause/resume
http_client.pause_channel('new_events', 'processor')
# ... do maintenance ...
http_client.unpause_channel('new_events', 'processor')

# Cleanup
http_client.empty_topic('new_events')
http_client.delete_topic('new_events')

Low-level Consumer Implementation

import gnsq
import gevent

# Custom consumer using TCP client directly
class CustomConsumer:
    def __init__(self, topic, channel, nsqd_address):
        self.topic = topic
        self.channel = channel
        self.client = gnsq.NsqdTCPClient()
        host, port = nsqd_address.split(':')
        self.client.configure(address=host, port=int(port))
        
    def start(self):
        # Connect and identify
        self.client.connect()
        self.client.identify()
        
        # Subscribe to topic/channel
        self.client.subscribe(self.topic, self.channel)
        
        # Signal ready for messages
        self.client.ready(1)  # Ready for 1 message
        
        # Start message processing loop
        gevent.spawn(self._message_loop)
        
    def _message_loop(self):
        while True:
            # Read response from NSQ
            response = self.client.read_response()
            
            if response['type'] == 'message':
                # Process the message
                message = response['data']
                try:
                    self._handle_message(message)
                    # Send finish command
                    self.client.finish(message['id'])
                except Exception as e:
                    # Send requeue command  
                    self.client.requeue(message['id'])
                
                # Ready for next message
                self.client.ready(1)
                
    def _handle_message(self, message):
        # Custom message processing logic
        print(f"Processing: {message['body']}")
        
# Usage
consumer = CustomConsumer('events', 'custom_processor', '127.0.0.1:4150')
consumer.start()

Monitoring and Health Checks

import gnsq
import time

def monitor_nsq_cluster(nsqd_addresses):
    """Monitor multiple NSQ daemons for health and statistics."""
    
    for address in nsqd_addresses:
        host, port = address.split(':')
        http_port = int(port) + 1  # HTTP port is typically TCP port + 1
        
        try:
            # Create HTTP client for this daemon
            client = gnsq.NsqdHTTPClient(host=host, port=http_port)
            
            # Health check
            ping_response = client.ping()
            print(f"{address}: {ping_response}")
            
            # Get daemon info
            info = client.info()
            print(f"{address}: Version {info['version']}")
            
            # Get statistics
            stats = client.stats()
            print(f"{address}: {len(stats['topics'])} topics, "
                  f"{stats['total_messages']} total messages")
                  
            # Check for unhealthy topics/channels
            for topic in stats['topics']:
                for channel in topic['channels']:
                    if channel['depth'] > 10000:  # High message backlog
                        print(f"WARNING: {topic['name']}/{channel['name']} "
                              f"has {channel['depth']} pending messages")
                        
        except Exception as e:
            print(f"ERROR connecting to {address}: {e}")

# Monitor cluster every 30 seconds
nsqd_cluster = ['127.0.0.1:4150', '127.0.0.1:4152', '127.0.0.1:4154']

while True:
    monitor_nsq_cluster(nsqd_cluster)
    time.sleep(30)

Install with Tessl CLI

npx tessl i tessl/pypi-gnsq

docs

core-messaging.md

index.md

lookupd-integration.md

message-handling.md

nsqd-clients.md

utilities-errors.md

tile.json