A gevent-based Python client for the NSQ distributed messaging platform.

High-level producer and consumer classes provide the primary interface for publishing and consuming messages from NSQ topics. These classes handle connection management and automatic nsqlookupd discovery, and provide convenient event-driven APIs for most messaging applications.

Producer: publishes messages to NSQ topics with support for single and batch message publishing, connection pooling, and automatic retry logic.
class Producer:
    """Publishes messages to NSQ topics.

    Supports single and batch message publishing, connection pooling,
    and automatic retry logic.
    """

    # NOTE(review): the mutable default argument mirrors the documented gnsq
    # signature and is never mutated by these stubs, so it is harmless here.
    def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs):
        """
        Initialize a Producer for publishing messages to NSQ.

        Parameters:
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses
        - max_backoff_duration (int): Maximum backoff duration in seconds
        - **kwargs: Additional connection parameters (tls_v1, compression, etc.)
        """

    def start(self):
        """Start discovering and listening to connections."""

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds
        - raise_error (bool): Whether to raise exceptions on timeout
        """

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True):
        """
        Publish a single message to a topic.

        Parameters:
        - topic (str): Topic name to publish to
        - data (str or bytes): Message data
        - defer (int, optional): Milliseconds to defer message delivery
        - block (bool): Whether to block until publish completes
        - timeout (float, optional): Maximum time to wait for publish
        - raise_error (bool): Whether to raise exceptions on failure
        """

    def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True):
        """
        Publish multiple messages to a topic in a single operation.

        Parameters:
        - topic (str): Topic name to publish to
        - messages (list): List of message data (str or bytes)
        - block (bool): Whether to block until publish completes
        - timeout (float, optional): Maximum time to wait for publish
        - raise_error (bool): Whether to raise exceptions on failure
        """

    @property
    def is_running(self):
        """bool: Check if producer is currently active."""

    # Producers support event signals for monitoring connection and
    # operational status. Signal properties available on Producer instances:

    @property
    def on_response(self):
        """Signal emitted on successful responses."""

    @property
    def on_error(self):
        """Signal emitted on error responses."""

    @property
    def on_auth(self):
        """Signal emitted on authentication events."""

    @property
    def on_close(self):
        """Signal emitted when connections close."""


# Consumes messages from NSQ topics with support for automatic nsqlookupd
# discovery, configurable concurrency, message acknowledgment patterns, and
# comprehensive event handling.
class Consumer:
    """Consumes messages from NSQ topics.

    Supports automatic nsqlookupd discovery, configurable concurrency,
    message acknowledgment patterns, and comprehensive event handling.
    """

    # NOTE(review): the mutable default arguments mirror the documented gnsq
    # signature and are never mutated by these stubs, so they are harmless here.
    def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs):
        """
        Initialize a Consumer for receiving messages from NSQ.

        Parameters:
        - topic (str): Topic name to consume from
        - channel (str): Channel name for this consumer
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses
        - lookupd_http_addresses (list): List of 'host:port' lookupd addresses
        - **kwargs: Additional options (max_in_flight, message_timeout, etc.)
        """

    def start(self, block=True):
        """
        Start discovering and listening to connections.

        Parameters:
        - block (bool): Whether to block execution until consumer stops
        """

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds
        - raise_error (bool): Whether to raise exceptions on timeout
        """

    def query_nsqd(self):
        """Connect to specified NSQ daemon TCP addresses."""

    def query_lookupd(self):
        """Query lookup daemon for topic producers."""

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def redistribute_ready_state(self):
        """Trigger redistribution of message processing readiness across connections."""

    @property
    def is_running(self):
        """bool: Check if consumer is active."""

    @property
    def is_starved(self):
        """bool: Determine if connections are starved for messages."""

    @property
    def total_ready_count(self):
        """int: Total ready message count across all connections."""

    @property
    def total_in_flight(self):
        """int: Total messages currently being processed."""

    # Consumers provide comprehensive event handling for the message
    # processing lifecycle. Signal properties available on Consumer instances:

    @property
    def on_message(self):
        """Signal emitted when messages are received."""

    @property
    def on_response(self):
        """Signal emitted on successful responses."""

    @property
    def on_error(self):
        """Signal emitted on error responses."""

    @property
    def on_finish(self):
        """Signal emitted when messages are finished."""

    @property
    def on_requeue(self):
        """Signal emitted when messages are requeued."""

    @property
    def on_giving_up(self):
        """Signal emitted when giving up on messages."""

    @property
    def on_auth(self):
        """Signal emitted on authentication events."""

    @property
    def on_exception(self):
        """Signal emitted on exceptions."""

    @property
    def on_close(self):
        """Signal emitted when connections close."""


# Legacy consumer class with built-in concurrency support. Use the Consumer
# class instead for new applications.
class Reader:
    """Legacy consumer class with built-in concurrency support.

    Deprecated: use the Consumer class instead for new applications.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize Reader (deprecated).

        Use Consumer class instead. Sets up concurrency settings
        and creates message queue if max_concurrency is specified.

        Parameters:
        - *args, **kwargs: Same as Consumer parameters
        """

    def start(self, *args, **kwargs):
        """
        Start reader with worker threads.

        Spawns worker threads based on max_concurrency setting
        and calls parent Consumer start method.
        """

    def handle_message(self, conn, message):
        """
        Handle incoming message.

        Queues messages if max_concurrency is set, otherwise
        calls parent class message handling directly.
        """

    def publish(self, topic, message):
        """
        Publish message (deprecated).

        Publishes message to a random NSQ connection.
        Use Producer class instead for publishing.

        Parameters:
        - topic (str): Topic name
        - message (str or bytes): Message data

        Raises:
        NSQNoConnections: If no connections are available
        """
# Example: basic producer usage.
import gnsq

# Create producer with multiple NSQ daemons
producer = gnsq.Producer([
    '127.0.0.1:4150',
    '127.0.0.1:4152',
])
producer.start()

# Publish single message
producer.publish('events', 'user_signup:12345')

# Publish batch of messages
events = [
    'user_login:12345',
    'page_view:/dashboard',
    'user_logout:12345',
]
producer.multipublish('events', events)

# Shut down cleanly: close connections, then wait for workers to stop.
producer.close()
producer.join()
# Example: basic consumer usage with a message handler.
import gnsq

# Create consumer with lookupd discovery
consumer = gnsq.Consumer(
    'events',
    'analytics',
    lookupd_http_addresses=['127.0.0.1:4161'],
)


# Message handler
@consumer.on_message.connect
def handle_message(consumer, message):
    try:
        # Process the message
        event_data = message.body.decode('utf-8')
        print(f'Processing: {event_data}')
        # Simulate processing work (process_event is an application-defined helper)
        process_event(event_data)
        # Mark as successfully processed
        message.finish()
    except Exception as e:
        print(f'Error processing message: {e}')
        # Requeue for retry (with exponential backoff)
        message.requeue()


# Error handler
@consumer.on_error.connect
def handle_error(consumer, error):
    print(f'Consumer error: {error}')


# Start consuming
consumer.start()
# Example: high-volume consumer with async (greenlet-based) processing.
import gevent  # required for gevent.spawn below
import gnsq

# Consumer with advanced options
consumer = gnsq.Consumer(
    'high_volume_topic',
    'worker_pool',
    nsqd_tcp_addresses=['127.0.0.1:4150'],
    max_in_flight=100,         # Process up to 100 messages concurrently
    message_timeout=60000,     # 60 second message timeout
    max_backoff_duration=128,  # Maximum backoff time
    tls_v1=True,               # Enable TLS
    compression='deflate',     # Enable compression
)


@consumer.on_message.connect
def handle_high_volume_message(consumer, message):
    # Enable async processing for this message
    message.enable_async()
    # Spawn a greenlet to handle processing
    gevent.spawn(process_message_async, message)


def process_message_async(message):
    # heavy_computation, processing_taking_long, and save_result are
    # application-defined helpers, shown here as placeholders.
    try:
        # Long-running processing
        result = heavy_computation(message.body)
        # Touch message to extend timeout if needed
        if processing_taking_long():
            message.touch()
        save_result(result)
        message.finish()
    except Exception:
        # Requeue with backoff
        message.requeue(backoff=True)


consumer.start()

# Install with the Tessl CLI:
npx tessl i tessl/pypi-gnsq