A gevent based python client for the NSQ distributed messaging platform
npx @tessl/cli install tessl/pypi-gnsq@1.0.0A gevent-based Python client for the NSQ distributed messaging platform. GNSQ provides high-performance producer and consumer classes with automatic nsqlookupd discovery, connection management, and support for advanced features like TLS encryption, compression (DEFLATE, Snappy), and configurable backoff strategies.
pip install gnsqpip install python-snappy (for Snappy compression support)import gnsqCommon patterns for basic messaging:
from gnsq import Producer, Consumer, MessageFor advanced use cases:
from gnsq import (
Producer, Consumer, Message, BackoffTimer,
NsqdTCPClient, NsqdHTTPClient, LookupdClient
)import gnsq
# Create and start a producer
producer = gnsq.Producer('127.0.0.1:4150')
producer.start()
# Publish a single message
producer.publish('my_topic', 'Hello NSQ!')
# Publish multiple messages at once
messages = ['message1', 'message2', 'message3']
producer.multipublish('my_topic', messages)
# Clean shutdown
producer.close()
producer.join()import gnsq
# Create a consumer for topic and channel
consumer = gnsq.Consumer('my_topic', 'my_channel', '127.0.0.1:4150')
# Set up message handler
@consumer.on_message.connect
def message_handler(consumer, message):
print(f'Received: {message.body}')
# Mark message as successfully processed
message.finish()
# Start consuming messages
consumer.start()
# Or start without blocking to do other work
consumer.start(block=False)
# ... do other work ...
consumer.join() # Wait for completionGNSQ is built around a layered architecture that provides both high-level convenience and low-level control:
This design allows applications to start with simple Producer/Consumer usage while providing access to lower-level components for advanced use cases requiring fine-grained control over NSQ interactions.
High-level producer and consumer classes for publishing and consuming messages from NSQ topics. These classes handle connection management, automatic discovery, and provide convenient APIs for most messaging use cases.
class Producer:
def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs): ...
def start(self): ...
def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True): ...
def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True): ...
class Consumer:
def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs): ...
def start(self, block=True): ...
def connect_to_nsqd(self, address, port): ...Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, and asynchronous processing control.
class Message:
@property
def body: ...
@property
def id: ...
def finish(self): ...
def requeue(self, time_ms=0, backoff=True): ...
def enable_async(self): ...Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These provide fine-grained control over NSQ operations and administrative functions.
class NsqdTCPClient:
def connect(self): ...
def publish(self): ...
def subscribe(self): ...
class NsqdHTTPClient:
def publish(self): ...
def create_topic(self): ...
def stats(self): ...Client for NSQ lookupd services that provide topic producer discovery and cluster topology information.
class LookupdClient:
def lookup(self, topic): ...
def topics(self): ...
def nodes(self): ...Helper utilities, decorators, backoff timers, and comprehensive error handling for robust NSQ client applications.
class BackoffTimer:
def __init__(self, ratio=1, max_interval=None, min_interval=None): ...
def failure(self): ...
def get_interval(self): ...
# Exception hierarchy
class NSQException(Exception): ...
class NSQRequeueMessage(NSQException): ...