A gevent-based Python client for the NSQ distributed messaging platform
—
Client for NSQ lookupd services that provide topic producer discovery and cluster topology information. Lookupd acts as a service discovery mechanism, allowing producers and consumers to find NSQ daemons hosting specific topics without requiring static configuration.
HTTP client for NSQ lookupd services that enables dynamic discovery of topic producers and cluster topology management.
class LookupdClient:
    """HTTP client for an NSQ lookupd service.

    Only the method signatures and their contracts are listed here; the
    implementations are not part of this reference.
    """

    def lookup(self, topic: str) -> dict:
        """Look up producers for a specific topic.

        Queries lookupd to find all NSQ daemons that have the specified
        topic available for production or consumption.

        Parameters:
        - topic (str): Topic name to look up

        Returns:
        dict: Producer information including addresses and metadata. The
        usage examples in this document read a 'producers' list whose
        entries carry 'broadcast_address' and 'tcp_port'.
        """

    def topics(self) -> list:
        """Get all topics known to lookupd.

        Returns a list of all topics across all NSQ daemons
        registered with this lookupd instance.

        Returns:
        list: List of topic names
        """

    def channels(self, topic: str) -> list:
        """Get all channels for a specific topic.

        Returns channels that exist for the specified topic
        across all registered NSQ daemons.

        Parameters:
        - topic (str): Topic name to get channels for

        Returns:
        list: List of channel names for the topic
        """

    def nodes(self) -> list:
        """Get all NSQ daemon nodes registered with lookupd.

        Returns information about all NSQ daemons that have
        registered with this lookupd instance.

        Returns:
        list: List of node information including addresses and metadata.
        The usage examples in this document read 'broadcast_address',
        'tcp_port' and 'version' from each entry.
        """

    def create_topic(self, topic: str):
        """Create a topic across the cluster.

        Instructs all registered NSQ daemons to create the
        specified topic if it doesn't exist.

        NOTE(review): the nsqlookupd HTTP API describes topic creation as
        adding the topic to lookupd's registry - confirm the cluster-wide
        wording above against upstream documentation.

        Parameters:
        - topic (str): Topic name to create
        """

    def delete_topic(self, topic: str):
        """Delete a topic across the cluster.

        Instructs all registered NSQ daemons to delete the
        specified topic and all associated data.

        Parameters:
        - topic (str): Topic name to delete
        """

    def create_channel(self, topic: str, channel: str):
        """Create a channel for a topic across the cluster.

        Instructs all NSQ daemons hosting the topic to create
        the specified channel if it doesn't exist.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to create
        """

    def delete_channel(self, topic: str, channel: str):
        """Delete a channel from a topic across the cluster.

        Instructs all NSQ daemons hosting the topic to delete
        the specified channel and all associated messages.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to delete
        """

    def tombstone_topic(self, topic: str, node: str):
        """Tombstone a topic on a specific node.

        Marks a topic as tombstoned on the specified NSQ daemon,
        preventing new messages from being queued while allowing
        existing messages to be processed.

        NOTE(review): the nsqlookupd HTTP API describes a tombstone as
        hiding that producer from lookup results for the topic - confirm
        the description above against upstream documentation.

        Parameters:
        - topic (str): Topic name to tombstone
        - node (str): NSQ daemon node identifier (presumably the daemon's
          "host:port" - TODO confirm)
        """

    def ping(self) -> str:
        """Ping the lookupd service.

        Health check to verify lookupd is responding and available.

        Returns:
        str: Response indicating lookupd status
        """

    def info(self) -> dict:
        """Get lookupd service information.

        Returns configuration and version information about
        the lookupd service.

        Returns:
        dict: Lookupd configuration and version details. The usage
        examples in this document read a 'version' entry from it.
        """

Legacy lookupd client interface. Use LookupdClient instead.
class Lookupd:
    """Legacy lookupd client interface; LookupdClient is its replacement."""

    def tombstone_topic_producer(self, topic: str, node: str):
        """Tombstone topic producer (deprecated).

        Use LookupdClient.tombstone_topic() instead.

        Parameters:
        - topic (str): Topic name
        - node (str): Node identifier
        """


import gnsq
# Create a lookupd client for service discovery.
lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

# Find producers for a topic.
topic = 'user_events'
producer_info = lookupd.lookup(topic)

# Build each "host:port" address once and reuse it for both the listing and
# the producer, instead of formatting the same fields twice.
producer_addresses = [
    f"{p['broadcast_address']}:{p['tcp_port']}"
    for p in producer_info['producers']
]

print(f"Producers for {topic}:")
for address in producer_addresses:
    print(f" - {address}")

# Create a producer using the discovered addresses.
producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)
producer.start()
try:
    producer.publish(topic, 'Hello from discovered producer!')
finally:
    # Always release the connections, even when publishing fails; close()
    # followed by join() mirrors the shutdown used by SmartProducer.close().
    producer.close()
    producer.join()

import gnsq
def create_dynamic_consumer(topic, channel, lookupd_addresses):
    """Create a consumer that discovers NSQ daemons through lookupd.

    Parameters:
    - topic (str): Topic to consume from
    - channel (str): Channel to consume on
    - lookupd_addresses (list): lookupd HTTP addresses in "host:port" form

    Returns:
    The gnsq.Consumer with a message handler connected; it is not started.
    """
    # The consumer is given the lookupd addresses and uses them for discovery.
    consumer = gnsq.Consumer(
        topic=topic,
        channel=channel,
        lookupd_http_addresses=lookupd_addresses,
    )

    def handle_message(consumer, message):
        print(f"Processing message from {topic}: {message.body}")
        message.finish()

    # Nothing else references this closure once the function returns.
    # blinker-style signals hold receivers weakly by default, so the handler
    # could be garbage-collected and silently stop receiving messages.
    # Connecting with weak=False makes the signal keep it alive.
    consumer.on_message.connect(handle_message, weak=False)
    return consumer
# Several lookupd instances can be listed for discovery.
lookupd_addresses = [
    '127.0.0.1:4161',
    '127.0.0.1:4163',
]
consumer = create_dynamic_consumer(
    topic='events',
    channel='processor',
    lookupd_addresses=lookupd_addresses,
)

# Once started, the consumer discovers and connects to NSQ daemons itself.
consumer.start()

import gnsq
def manage_cluster_topology(lookupd_host='127.0.0.1', lookupd_port=4161):
    """Print a cluster overview, then create a topic and a channel.

    Lists the nodes and topics reported by lookupd (with each topic's
    channels), creates the 'cluster_events' topic and its 'analytics'
    channel, and returns the lookupd client that was used.

    Parameters:
    - lookupd_host (str): lookupd host to connect to
    - lookupd_port (int): lookupd HTTP port

    Returns:
    The gnsq.LookupdClient created for the given host and port.
    """
    client = gnsq.LookupdClient(host=lookupd_host, port=lookupd_port)

    # Cluster overview
    daemons = client.nodes()
    topic_names = client.topics()
    print(f"Cluster has {len(daemons)} nodes and {len(topic_names)} topics")

    # One line per registered daemon
    print("\nNSQ Daemons:")
    for daemon in daemons:
        endpoint = f"{daemon['broadcast_address']}:{daemon['tcp_port']}"
        print(f" - {endpoint} (version {daemon['version']})")

    # One line per topic, followed by its channels
    print("\nTopics and Channels:")
    for name in topic_names:
        channel_names = client.channels(name)
        if channel_names:
            summary = ', '.join(channel_names)
        else:
            summary = 'no channels'
        print(f" - {name}: {summary}")

    # Register a new topic
    new_topic = 'cluster_events'
    client.create_topic(new_topic)
    print(f"\nCreated topic '{new_topic}' across cluster")

    # Register a channel on that topic
    client.create_channel(new_topic, 'analytics')
    print(f"Created channel 'analytics' for topic '{new_topic}'")

    return client
# Manage the cluster through the default lookupd (127.0.0.1:4161) and keep
# the returned client for further calls.
cluster_manager = manage_cluster_topology()

import gnsq
import time
def monitor_cluster_health(lookupd_addresses):
    """Print a health report for each lookupd instance.

    For every address the function pings lookupd, prints its version and
    the number of registered nodes, and warns about nodes whose
    'last_update' value is more than 60 seconds old. A failure on one
    lookupd (including a malformed address) is printed and does not stop
    the checks for the remaining addresses.

    Parameters:
    - lookupd_addresses (list): lookupd HTTP addresses in "host:port" form
    """
    for address in lookupd_addresses:
        try:
            # Parse inside the try block so a malformed address is reported
            # like any other failure instead of aborting the whole run.
            # rpartition splits on the last colon only, so a host that
            # itself contains colons keeps them.
            host, separator, port = address.rpartition(':')
            if not separator or not host:
                raise ValueError("address must have the form 'host:port'")
            port_number = int(port)

            lookupd = gnsq.LookupdClient(host=host, port=port_number)

            # Health check
            ping_response = lookupd.ping()
            print(f"Lookupd {address}: {ping_response}")

            # Service info
            info = lookupd.info()
            print(f"Lookupd {address}: Version {info['version']}")

            # Registered nodes
            nodes = lookupd.nodes()
            print(f"Lookupd {address}: {len(nodes)} registered nodes")

            # Read the clock once so the staleness test and the printed age
            # always agree.
            now = time.time()
            for node in nodes:
                # assumes last_update is a Unix timestamp in seconds
                # - TODO confirm
                last_update = node.get('last_update')
                if last_update is None:
                    # Without a timestamp the age is unknown; defaulting to
                    # 0 would flag the node as stale since the epoch.
                    continue
                age = now - last_update
                if age > 60:  # No update in 60 seconds
                    print(f"WARNING: Node {node['broadcast_address']} "
                          f"last updated {age}s ago")
        except Exception as e:
            # Deliberately broad: one failing lookupd must not prevent the
            # others from being checked.
            print(f"ERROR connecting to lookupd {address}: {e}")
def cleanup_stale_topics(lookupd_client, max_age_hours=24):
    """Report topics that have no channels as cleanup candidates.

    Nothing is deleted. Each topic without channels is printed and
    collected so the caller can decide what to do with it.

    Parameters:
    - lookupd_client: Client providing topics() and channels(topic)
    - max_age_hours (int): Reserved for an age-based check; not used yet

    Returns:
    list: Names of the topics that currently have no channels
    """
    candidates = []
    for topic in lookupd_client.topics():
        # Only the channel list decides candidacy, so no per-topic producer
        # lookup is issued (its result was never used).
        if lookupd_client.channels(topic):
            continue
        print(f"Topic '{topic}' has no channels - considering for cleanup")
        # Deletion is intentionally left to the caller: once a topic is
        # confirmed stale, use lookupd_client.delete_topic(topic).
        candidates.append(topic)
    return candidates
# Check every lookupd instance in the list, repeating every 30 seconds.
lookupd_cluster = [
    '127.0.0.1:4161',
    '127.0.0.1:4163',
]

while True:
    print("=== Cluster Health Check ===")
    monitor_cluster_health(lookupd_cluster)
    print()
    time.sleep(30)

import gnsq
import random
class SmartProducer:
    """Producer that picks an NSQ daemon through lookupd for each publish.

    Started producers are cached per NSQ daemon address, so a daemon that
    is selected repeatedly reuses the same producer.
    """

    def __init__(self, lookupd_addresses, topic):
        """Store the lookupd addresses and the topic to publish to.

        Parameters:
        - lookupd_addresses (list): lookupd addresses in "host:port" form
        - topic (str): Topic every message is published to
        """
        self.lookupd_addresses = lookupd_addresses
        self.topic = topic
        self.producers = {}  # Started producers keyed by "host:port"

    def _get_best_producer(self):
        """Return a started producer for the preferred NSQ daemon.

        The lookupd addresses are tried in order. The first one that
        reports producers for the topic decides the result; failures and
        empty results move on to the next address.

        Returns:
        The cached or newly started gnsq.Producer for the chosen daemon.

        Raises:
        RuntimeError: If no lookupd address yields a producer
        """
        for lookupd_addr in self.lookupd_addresses:
            try:
                # Split on the last colon only so a host containing colons
                # keeps them.
                host, _, port = lookupd_addr.rpartition(':')
                lookupd = gnsq.LookupdClient(host=host, port=int(port))

                # Current topology for the topic
                nsqd_nodes = lookupd.lookup(self.topic)['producers']
                if not nsqd_nodes:
                    continue

                # Prefer the node with the lowest 'depth'; nodes that do not
                # report one count as 0.
                best_node = min(nsqd_nodes, key=lambda n: n.get('depth', 0))
                nsqd_addr = (
                    f"{best_node['broadcast_address']}:{best_node['tcp_port']}"
                )

                # Create or reuse the producer for this NSQ daemon.
                if nsqd_addr not in self.producers:
                    producer = gnsq.Producer([nsqd_addr])
                    producer.start()
                    self.producers[nsqd_addr] = producer
                return self.producers[nsqd_addr]
            except Exception as e:
                # Deliberately broad: any failure on this lookupd falls
                # through to the next address.
                print(f"Failed to get topology from {lookupd_addr}: {e}")

        raise RuntimeError("No healthy lookupd instances available")

    def publish(self, message):
        """Publish message to the topic using the selected producer.

        Parameters:
        - message: Message body handed to the producer's publish()
        """
        producer = self._get_best_producer()
        producer.publish(self.topic, message)

    def close(self):
        """Close every cached producer and empty the cache."""
        # Remove each entry as it is closed so a later publish() starts a
        # fresh producer instead of reusing a closed one.
        while self.producers:
            _, producer = self.producers.popitem()
            producer.close()
            producer.join()
# Usage
smart_producer = SmartProducer(
    topic='smart_events',
    lookupd_addresses=[
        '127.0.0.1:4161',
        '127.0.0.1:4163',
    ],
)

# Each publish selects an NSQ daemon through lookupd before sending.
smart_producer.publish('Event 1')
smart_producer.publish('Event 2')
smart_producer.close()

Install with Tessl CLI
npx tessl i tessl/pypi-gnsq