A gevent based python client for the NSQ distributed messaging platform
—
Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These clients provide fine-grained control over NSQ operations, administrative functions, and are used internally by the higher-level Producer and Consumer classes.
Low-level TCP client for NSQ daemon protocol communication. Provides direct access to NSQ's binary protocol for publishing, subscribing, and connection management.
class NsqdTCPClient:
def connect(self):
"""Establish TCP connection to NSQ daemon."""
def close_stream(self):
"""Close the TCP connection stream."""
def send(self):
"""Send command to NSQ daemon over TCP connection."""
def read_response(self):
"""Read response from NSQ daemon."""
def subscribe(self):
"""Subscribe to a topic and channel."""
def publish(self):
"""Publish a message to a topic."""
def multipublish(self):
"""Publish multiple messages to a topic in batch."""
def ready(self):
"""Signal readiness to receive messages."""
def finish(self):
"""Mark a message as finished (FIN command)."""
def requeue(self):
"""Requeue a message (REQ command)."""
def touch(self):
"""Touch a message to reset timeout (TOUCH command)."""
def close(self):
"""Close connection (CLS command)."""
def nop(self):
"""Send no-operation command (NOP)."""
def identify(self):
"""Send client identification (IDENTIFY command)."""
def auth(self):
"""Authenticate with NSQ daemon (AUTH command)."""

HTTP client for NSQ daemon administrative operations. Provides a RESTful interface for topic/channel management, statistics, and publishing via HTTP.
class NsqdHTTPClient:
def publish(self):
"""
Publish a message via HTTP.
HTTP endpoint for publishing single messages to topics.
Useful for non-persistent connections or web applications.
"""
def multipublish(self):
"""
Publish multiple messages via HTTP.
HTTP endpoint for batch publishing multiple messages
to a topic in a single request.
"""
def create_topic(self):
"""
Create a new topic.
Administrative function to create topics on the NSQ daemon.
Topics are created automatically on first publish, but this
allows explicit creation.
"""
def delete_topic(self):
"""
Delete an existing topic.
Administrative function to delete topics and all associated
channels and messages from the NSQ daemon.
"""
def create_channel(self):
"""
Create a new channel within a topic.
Administrative function to create channels. Channels are
created automatically when consumers connect, but this
allows explicit creation.
"""
def delete_channel(self):
"""
Delete an existing channel.
Administrative function to delete channels and all
associated messages from a topic.
"""
def empty_topic(self):
"""
Remove all messages from a topic.
Administrative function to clear all messages from
all channels in a topic without deleting the topic.
"""
def empty_channel(self):
"""
Remove all messages from a channel.
Administrative function to clear all messages from
a specific channel without deleting the channel.
"""
def pause_topic(self):
"""
Pause message delivery for a topic.
Administrative function to pause all message delivery
for all channels in a topic.
"""
def unpause_topic(self):
"""
Resume message delivery for a topic.
Administrative function to resume message delivery
for a previously paused topic.
"""
def pause_channel(self):
"""
Pause message delivery for a channel.
Administrative function to pause message delivery
for a specific channel within a topic.
"""
def unpause_channel(self):
"""
Resume message delivery for a channel.
Administrative function to resume message delivery
for a previously paused channel.
"""
def stats(self):
"""
Get NSQ daemon statistics.
Returns comprehensive statistics about topics, channels,
connections, and message counts from the NSQ daemon.
Returns:
dict: Statistics data including topics, channels, and metrics
"""
def ping(self):
"""
Ping the NSQ daemon.
Health check endpoint to verify NSQ daemon is responding.
Returns:
str: Response indicating daemon status
"""
def info(self):
"""
Get NSQ daemon information.
Returns daemon configuration and version information.
Returns:
dict: Daemon configuration and version details
"""

Legacy NSQ daemon client interface. Use NsqdTCPClient or NsqdHTTPClient instead.
class Nsqd:
def publish(self):
"""Publish message (deprecated - use NsqdTCPClient.publish)."""
def multipublish(self):
"""Publish multiple messages (deprecated - use NsqdTCPClient.multipublish)."""

import gnsq
# Build the low-level TCP client used for direct publishing
tcp_client = gnsq.NsqdTCPClient()

# Point it at the local daemon on port 4150, with TLS and compression off
tcp_client.configure(address='127.0.0.1', port=4150, tls_v1=False, compression=None)

# Open the connection, then send the client identification
tcp_client.connect()
tcp_client.identify()

# Publish one message, then a batch of three, to the 'events' topic
tcp_client.publish('events', b'user_action_1')
tcp_client.multipublish(
    'events',
    [b'event_1', b'event_2', b'event_3'],
)

# Close connection
tcp_client.close()

import gnsq
# Build the HTTP client used for administrative operations
http_client = gnsq.NsqdHTTPClient(host='127.0.0.1', port=4151)  # NSQ HTTP port

# Explicitly create a topic and a channel on it
http_client.create_topic('new_events')
http_client.create_channel('new_events', 'processor')

# Fetch daemon statistics and print the headline numbers
stats = http_client.stats()
print(f"Topics: {len(stats['topics'])}")
print(f"Total messages: {stats['total_messages']}")

# Publishing over HTTP (useful for web applications)
http_client.publish('new_events', 'HTTP published message')

# Pause the channel around a maintenance window, then resume delivery
http_client.pause_channel('new_events', 'processor')
# ... do maintenance ...
http_client.unpause_channel('new_events', 'processor')

# Cleanup: drop any remaining messages before the topic is deleted
http_client.empty_topic('new_events')
http_client.delete_topic('new_events')

import gnsq
import gevent
# Custom consumer using TCP client directly
class CustomConsumer:
    """Minimal consumer that drives an ``NsqdTCPClient`` by hand.

    Connects to a single daemon, subscribes to one topic/channel and
    processes messages one at a time: a message is finished when
    ``_handle_message`` returns and requeued when it raises.
    """

    def __init__(self, topic, channel, nsqd_address):
        """Create and configure the TCP client.

        Args:
            topic: Topic name to subscribe to.
            channel: Channel name to subscribe to.
            nsqd_address: Daemon TCP address in ``'host:port'`` form.

        Raises:
            ValueError: If ``nsqd_address`` has no ``:port`` suffix or the
                port is not an integer.
        """
        self.topic = topic
        self.channel = channel
        self.client = gnsq.NsqdTCPClient()
        # Split on the last colon only, so a host that itself contains
        # colons is kept intact.
        host, port = nsqd_address.rsplit(':', 1)
        self.client.configure(address=host, port=int(port))

    def start(self):
        """Connect, subscribe and start the message loop in a greenlet.

        Returns:
            The object returned by ``gevent.spawn`` for the message loop,
            so the caller can wait on it instead of exiting immediately.
        """
        # Connect and identify
        self.client.connect()
        self.client.identify()
        # Subscribe to topic/channel
        self.client.subscribe(self.topic, self.channel)
        # Ask for a single message; the loop re-arms after each one
        self.client.ready(1)
        # Start message processing loop
        return gevent.spawn(self._message_loop)

    def _message_loop(self):
        """Read responses forever, handling one message at a time.

        Responses whose ``'type'`` is not ``'message'`` are ignored.
        """
        while True:
            response = self.client.read_response()
            if response['type'] != 'message':
                continue

            message = response['data']
            try:
                self._handle_message(message)
            except Exception:
                # Handler failed: hand the message back for redelivery.
                self.client.requeue(message['id'])
            else:
                # Only the handler is guarded, so a failing finish is not
                # answered with a requeue of an already-processed message.
                self.client.finish(message['id'])

            # Ready for next message
            self.client.ready(1)

    def _handle_message(self, message):
        """Process one message; override with custom logic.

        Args:
            message: Mapping that provides at least a ``'body'`` entry.
        """
        print(f"Processing: {message['body']}")
# Usage
consumer = CustomConsumer('events', 'custom_processor', '127.0.0.1:4150')
consumer.start()

import gnsq
import time
def monitor_nsq_cluster(nsqd_addresses, http_port_offset=1, backlog_threshold=10000):
    """Monitor multiple NSQ daemons for health and statistics.

    For each daemon this prints the ping response, the version, the topic
    and message totals, and a warning for every channel whose depth is above
    ``backlog_threshold``. A failure on one daemon is reported and does not
    stop the remaining daemons from being checked.

    Args:
        nsqd_addresses: Iterable of daemon TCP addresses as ``'host:port'``.
        http_port_offset: Added to the TCP port to obtain the HTTP port
            (the HTTP port is typically the TCP port + 1).
        backlog_threshold: Channel depth above which a warning is printed.
    """
    for address in nsqd_addresses:
        # Parse separately from the network calls so a malformed entry is
        # reported as such and does not abort the rest of the cluster.
        try:
            host, port = address.rsplit(':', 1)
            http_port = int(port) + http_port_offset
        except ValueError:
            print(f"ERROR invalid address {address!r}: expected 'host:port'")
            continue

        try:
            # Create HTTP client for this daemon
            client = gnsq.NsqdHTTPClient(host=host, port=http_port)

            # Health check
            ping_response = client.ping()
            print(f"{address}: {ping_response}")

            # Get daemon info
            info = client.info()
            print(f"{address}: Version {info['version']}")

            # Get statistics
            stats = client.stats()
            print(f"{address}: {len(stats['topics'])} topics, "
                  f"{stats['total_messages']} total messages")

            # Check for unhealthy topics/channels
            for topic in stats['topics']:
                for channel in topic['channels']:
                    if channel['depth'] > backlog_threshold:  # High message backlog
                        print(f"WARNING: {topic['name']}/{channel['name']} "
                              f"has {channel['depth']} pending messages")
        except Exception as e:
            # Deliberately broad: this is the per-daemon reporting boundary.
            print(f"ERROR connecting to {address}: {e}")
# Monitor cluster every 30 seconds
nsqd_cluster = ['127.0.0.1:4150', '127.0.0.1:4152', '127.0.0.1:4154']
while True:
monitor_nsq_cluster(nsqd_cluster)
time.sleep(30)

Install with Tessl CLI
npx tessl i tessl/pypi-gnsq