CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gnsq

A gevent based python client for the NSQ distributed messaging platform

Pending
Overview
Eval results
Files

lookupd-integration.mddocs/

Lookupd Integration

Client for NSQ lookupd services that provide topic producer discovery and cluster topology information. Lookupd acts as a service discovery mechanism, allowing producers and consumers to find NSQ daemons hosting specific topics without requiring static configuration.

Capabilities

LookupdClient

HTTP client for NSQ lookupd services that enables dynamic discovery of topic producers and cluster topology management.

class LookupdClient:
    """HTTP client for an nsqlookupd service.

    Exposes topic/producer discovery (lookup, topics, channels, nodes)
    and cluster-wide topology management (create/delete topic and
    channel, tombstone, ping, info).
    """

    def lookup(self, topic: str) -> dict:
        """
        Look up producers for a specific topic.

        Queries lookupd to find all NSQ daemons that have the specified
        topic available for production or consumption.

        Parameters:
        - topic (str): Topic name to lookup

        Returns:
        dict: Producer information including addresses and metadata
        """

    def topics(self) -> list:
        """
        Get all topics known to lookupd.

        Returns a list of all topics across all NSQ daemons
        registered with this lookupd instance.

        Returns:
        list: List of topic names
        """

    def channels(self, topic: str) -> list:
        """
        Get all channels for a specific topic.

        Returns channels that exist for the specified topic
        across all registered NSQ daemons.

        Parameters:
        - topic (str): Topic name to get channels for

        Returns:
        list: List of channel names for the topic
        """

    def nodes(self) -> list:
        """
        Get all NSQ daemon nodes registered with lookupd.

        Returns information about all NSQ daemons that have
        registered with this lookupd instance.

        Returns:
        list: List of node information including addresses and metadata
        """

    def create_topic(self, topic: str) -> None:
        """
        Create a topic across the cluster.

        Instructs all registered NSQ daemons to create the
        specified topic if it doesn't exist.

        Parameters:
        - topic (str): Topic name to create
        """

    def delete_topic(self, topic: str) -> None:
        """
        Delete a topic across the cluster.

        Instructs all registered NSQ daemons to delete the
        specified topic and all associated data.

        Parameters:
        - topic (str): Topic name to delete
        """

    def create_channel(self, topic: str, channel: str) -> None:
        """
        Create a channel for a topic across the cluster.

        Instructs all NSQ daemons hosting the topic to create
        the specified channel if it doesn't exist.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to create
        """

    def delete_channel(self, topic: str, channel: str) -> None:
        """
        Delete a channel from a topic across the cluster.

        Instructs all NSQ daemons hosting the topic to delete
        the specified channel and all associated messages.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to delete
        """

    def tombstone_topic(self, topic: str, node: str) -> None:
        """
        Tombstone a topic on a specific node.

        Marks a topic as tombstoned on the specified NSQ daemon,
        preventing new messages from being queued while allowing
        existing messages to be processed.

        Parameters:
        - topic (str): Topic name to tombstone
        - node (str): NSQ daemon node identifier
        """

    def ping(self) -> str:
        """
        Ping the lookupd service.

        Health check to verify lookupd is responding and available.

        Returns:
        str: Response indicating lookupd status
        """

    def info(self) -> dict:
        """
        Get lookupd service information.

        Returns configuration and version information about
        the lookupd service.

        Returns:
        dict: Lookupd configuration and version details
        """

Lookupd (Deprecated)

Legacy lookupd client interface, retained only for backward compatibility. Use LookupdClient for all new code.

class Lookupd:
    """Deprecated legacy lookupd client; use LookupdClient instead."""

    def tombstone_topic_producer(self, topic: str, node: str) -> None:
        """
        Tombstone topic producer (deprecated).

        Use LookupdClient.tombstone_topic() instead.

        Parameters:
        - topic (str): Topic name
        - node (str): Node identifier
        """

Usage Examples

Service Discovery for Producers

import gnsq

# Ask lookupd which nsqd instances currently host the topic.
client = gnsq.LookupdClient(host='127.0.0.1', port=4161)

topic = 'user_events'
info = client.lookup(topic)

# Print each discovered producer and collect its TCP address as we go.
print(f"Producers for {topic}:")
addresses = []
for p in info['producers']:
    addr = f"{p['broadcast_address']}:{p['tcp_port']}"
    print(f"  - {addr}")
    addresses.append(addr)

# Publish one message through the discovered daemons.
writer = gnsq.Producer(nsqd_tcp_addresses=addresses)
writer.start()
writer.publish(topic, 'Hello from discovered producer!')
writer.close()

Dynamic Consumer Configuration

import gnsq

def create_dynamic_consumer(topic, channel, lookupd_addresses):
    """Create consumer with automatic NSQ daemon discovery."""
    # lookupd_http_addresses makes the consumer poll lookupd for the
    # nsqd instances hosting the topic, instead of static configuration.
    reader = gnsq.Consumer(
        topic=topic,
        channel=channel,
        lookupd_http_addresses=lookupd_addresses,
    )

    @reader.on_message.connect
    def _on_message(consumer, message):
        print(f"Processing message from {topic}: {message.body}")
        message.finish()

    return reader

# Create consumer with lookupd discovery; two lookupds for redundancy.
lookupd_endpoints = ['127.0.0.1:4161', '127.0.0.1:4163']
consumer = create_dynamic_consumer('events', 'processor', lookupd_endpoints)

# Consumer will automatically discover and connect to NSQ daemons
consumer.start()

Cluster Topology Management

import gnsq

def manage_cluster_topology(lookupd_host='127.0.0.1', lookupd_port=4161):
    """Manage NSQ cluster topology via lookupd."""
    client = gnsq.LookupdClient(host=lookupd_host, port=lookupd_port)

    # Snapshot of the cluster as this lookupd instance sees it.
    node_list = client.nodes()
    topic_list = client.topics()
    print(f"Cluster has {len(node_list)} nodes and {len(topic_list)} topics")

    # Every registered nsqd, with its broadcast address and version.
    print("\nNSQ Daemons:")
    for n in node_list:
        print(f"  - {n['broadcast_address']}:{n['tcp_port']} (version {n['version']})")

    # Each topic with its channel list (if any).
    print("\nTopics and Channels:")
    for t in topic_list:
        chans = client.channels(t)
        print(f"  - {t}: {', '.join(chans) if chans else 'no channels'}")

    # Demonstrate topology changes: new topic plus one channel.
    new_topic = 'cluster_events'
    client.create_topic(new_topic)
    print(f"\nCreated topic '{new_topic}' across cluster")

    client.create_channel(new_topic, 'analytics')
    print(f"Created channel 'analytics' for topic '{new_topic}'")

    return client

# Run the topology walk-through against the local lookupd.
cluster_manager = manage_cluster_topology()

Health Monitoring and Maintenance

import gnsq
import time

def monitor_cluster_health(lookupd_addresses):
    """Monitor NSQ cluster health via lookupd.

    Best-effort: any failure talking to one lookupd is reported and the
    next address is tried.
    """
    for address in lookupd_addresses:
        host, port = address.split(':')

        try:
            client = gnsq.LookupdClient(host=host, port=int(port))

            # Liveness, version, and registration count for this lookupd.
            print(f"Lookupd {address}: {client.ping()}")

            service_info = client.info()
            print(f"Lookupd {address}: Version {service_info['version']}")

            registered = client.nodes()
            print(f"Lookupd {address}: {len(registered)} registered nodes")

            # Flag nodes that have not re-registered recently.
            # NOTE(review): assumes nodes carry a 'last_update' epoch
            # timestamp — confirm against the lookupd response schema.
            for node in registered:
                last_update = node.get('last_update', 0)
                if time.time() - last_update > 60:  # No update in 60 seconds
                    print(f"WARNING: Node {node['broadcast_address']} "
                          f"last updated {time.time() - last_update}s ago")

        except Exception as e:
            print(f"ERROR connecting to lookupd {address}: {e}")

def cleanup_stale_topics(lookupd_client, max_age_hours=24):
    """Report topics that look abandoned as cleanup candidates.

    A topic is flagged only when it has no channels (nothing consuming)
    AND lookupd reports no producers for it (nothing publishing) — this
    implements the producer-activity check the lookup() call was
    fetched for.

    Parameters:
    - lookupd_client: LookupdClient instance used to query cluster state.
    - max_age_hours (int): Reserved for future age-based policies;
      lookupd does not expose topic age, so it is currently unused.

    Note: deletion is deliberately left commented out — deleting a topic
    destroys all of its queued messages.
    """
    for topic in lookupd_client.topics():
        # Topics with channels have (or had) consumers; skip them.
        channels = lookupd_client.channels(topic)
        if channels:
            continue

        # No channels: also require no active producers before flagging.
        producer_info = lookupd_client.lookup(topic)
        if producer_info.get('producers'):
            continue

        print(f"Topic '{topic}' has no channels - considering for cleanup")
        # Additional logic to determine if topic should be deleted
        # lookupd_client.delete_topic(topic)

# Monitor multiple lookupd instances
lookupd_cluster = ['127.0.0.1:4161', '127.0.0.1:4163']

# Health-check loop: one sweep over every lookupd, then sleep 30s.
# Runs until the process is interrupted (e.g. Ctrl-C).
while True:
    print("=== Cluster Health Check ===")
    monitor_cluster_health(lookupd_cluster)
    print()
    time.sleep(30)

Advanced Service Discovery

import gnsq
import random

class SmartProducer:
    """Producer with intelligent NSQ daemon selection.

    On each publish, queries lookupd for the topic's current producers,
    routes to the least-loaded nsqd, and caches one gnsq.Producer per
    nsqd address for reuse.
    """

    def __init__(self, lookupd_addresses, topic):
        """
        Parameters:
        - lookupd_addresses (list[str]): 'host:port' lookupd HTTP addresses,
          tried in order.
        - topic (str): Topic every publish() goes to.
        """
        self.lookupd_addresses = lookupd_addresses
        self.topic = topic
        self.producers = {}  # nsqd 'host:port' -> started gnsq.Producer

    def _get_best_producer(self):
        """Return a started Producer for the least-loaded nsqd hosting the topic.

        Tries each lookupd in order; per-lookupd failures are printed and
        skipped. Raises RuntimeError when no lookupd yields a producer.
        """
        for lookupd_addr in self.lookupd_addresses:
            try:
                host, port = lookupd_addr.split(':')
                lookupd = gnsq.LookupdClient(host=host, port=int(port))

                # Current topology for this topic.
                producer_info = lookupd.lookup(self.topic)
                nsqd_nodes = producer_info['producers']
                if not nsqd_nodes:
                    continue

                # Prefer the node with the lowest message depth (least
                # loaded). NOTE(review): assumes lookupd responses carry a
                # 'depth' field — missing values are treated as 0.
                best_node = min(nsqd_nodes, key=lambda n: n.get('depth', 0))
                nsqd_addr = f"{best_node['broadcast_address']}:{best_node['tcp_port']}"

                # Create (and start) at most one producer per nsqd.
                if nsqd_addr not in self.producers:
                    producer = gnsq.Producer([nsqd_addr])
                    producer.start()
                    self.producers[nsqd_addr] = producer

                return self.producers[nsqd_addr]

            except Exception as e:
                # Best-effort: a single unhealthy lookupd should not stop
                # discovery via the remaining ones.
                print(f"Failed to get topology from {lookupd_addr}: {e}")
                continue

        # RuntimeError instead of bare Exception: more specific, and still
        # caught by existing callers handling Exception.
        raise RuntimeError("No healthy lookupd instances available")

    def publish(self, message):
        """Publish message using best available producer."""
        producer = self._get_best_producer()
        producer.publish(self.topic, message)

    def close(self):
        """Close and join all cached producers."""
        for producer in self.producers.values():
            producer.close()
            producer.join()

# Usage: each publish transparently routes to the optimal NSQ daemon.
sp = SmartProducer(['127.0.0.1:4161', '127.0.0.1:4163'], 'smart_events')

sp.publish('Event 1')
sp.publish('Event 2')

sp.close()

Install with Tessl CLI

npx tessl i tessl/pypi-gnsq

docs

core-messaging.md

index.md

lookupd-integration.md

message-handling.md

nsqd-clients.md

utilities-errors.md

tile.json