A gevent based python client for the NSQ distributed messaging platform
—
Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, timeout management, and asynchronous processing control. The Message class is central to NSQ's at-least-once delivery guarantee and provides the interface for controlling message processing lifecycle.
Encapsulates NSQ messages with metadata and provides methods for responding to the NSQ daemon about processing status.
class Message:
@property
def timestamp(self):
"""int: Message timestamp from NSQ daemon."""
@property
def attempts(self):
"""int: Number of times this message has been attempted for processing."""
@property
def id(self):
"""str: Unique message identifier from NSQ."""
@property
def body(self):
"""bytes: Message content/payload."""
def enable_async(self):
"""
Enable asynchronous processing for this message.
Allows the message to be processed in a separate greenlet or thread
while preventing automatic timeout. Must call finish(), requeue(),
or touch() manually when using async mode.
"""
def is_async(self):
"""
Check if asynchronous processing has been enabled.
Returns:
bool: True if async processing is enabled, False otherwise
"""
def has_responded(self):
"""
Check if this message has been responded to.
Returns:
bool: True if finish(), requeue(), or another response has been sent
"""
def finish(self):
"""
Mark message as successfully processed.
Sends FIN command to NSQ daemon indicating successful processing.
Message will not be redelivered. This should be called after
successful processing of the message content.
"""
def requeue(self, time_ms=0, backoff=True):
"""
Requeue message due to processing failure.
Sends REQ command to NSQ daemon to requeue the message for
redelivery after the specified delay.
Parameters:
- time_ms (int): Milliseconds to delay before requeuing (0 for immediate)
- backoff (bool): Whether to apply exponential backoff delay
"""
def touch(self):
"""
Request more time to process the message.
Sends TOUCH command to reset the message timeout, preventing
automatic requeue. Useful for long-running message processing
to avoid timeout-based redelivery.
"""Messages provide event signals for monitoring processing lifecycle:
# Signal properties available on Message instances
@property
def on_finish(self): ... # Emitted after message.finish() is called
@property
def on_requeue(self): ... # Emitted after message.requeue() is called
@property
def on_touch(self): ... # Emitted after message.touch() is calledimport gnsq
consumer = gnsq.Consumer('orders', 'processor', '127.0.0.1:4150')
@consumer.on_message.connect
def process_order(consumer, message):
try:
# Decode message content
order_data = message.body.decode('utf-8')
order = json.loads(order_data)
# Process the order
result = process_order_logic(order)
# Mark as successfully processed
message.finish()
except json.JSONDecodeError:
# Invalid JSON - don't requeue, log error
print(f'Invalid JSON in message {message.id}')
message.finish() # Discard malformed message
except TemporaryError as e:
# Temporary failure - requeue for retry
print(f'Temporary error processing {message.id}: {e}')
message.requeue()
except PermanentError as e:
# Permanent failure - don't requeue
print(f'Permanent error processing {message.id}: {e}')
message.finish() # Discard message
consumer.start()import gnsq
import gevent
consumer = gnsq.Consumer('analytics', 'processor', '127.0.0.1:4150')
@consumer.on_message.connect
def handle_message(consumer, message):
# Enable async processing
message.enable_async()
# Spawn greenlet for background processing
gevent.spawn(process_analytics_async, message)
def process_analytics_async(message):
"""Process analytics message in background greenlet."""
try:
# Decode analytics event
event_data = json.loads(message.body.decode('utf-8'))
# Long-running analytics processing
result = perform_analytics_computation(event_data)
# Check if processing is taking too long
if processing_time > 30: # seconds
message.touch() # Reset timeout
# Store results
store_analytics_result(result)
# Mark as completed
message.finish()
except Exception as e:
print(f'Analytics processing failed: {e}')
# Requeue with exponential backoff
message.requeue(backoff=True)
consumer.start()import gnsq
import time
consumer = gnsq.Consumer(
'long_tasks',
'worker',
'127.0.0.1:4150',
message_timeout=60000 # 60 second timeout
)
@consumer.on_message.connect
def handle_long_task(consumer, message):
message.enable_async()
gevent.spawn(process_long_task, message)
def process_long_task(message):
"""Process task that may take longer than message timeout."""
try:
task_data = json.loads(message.body.decode('utf-8'))
# Start processing
start_time = time.time()
for step in task_data['steps']:
# Process each step
process_step(step)
# Touch message every 30 seconds to prevent timeout
if time.time() - start_time > 30:
message.touch()
start_time = time.time()
# Task completed successfully
message.finish()
except Exception as e:
print(f'Long task failed: {e}')
message.requeue()
consumer.start()import gnsq
import time
consumer = gnsq.Consumer('retryable_tasks', 'worker', '127.0.0.1:4150')
@consumer.on_message.connect
def handle_retryable_task(consumer, message):
try:
# Check attempt count to avoid infinite retries
if message.attempts > 5:
print(f'Message {message.id} exceeded max attempts, discarding')
message.finish()
return
# Process the task
task_data = json.loads(message.body.decode('utf-8'))
result = process_task(task_data)
# Success
message.finish()
except RetryableException as e:
# Calculate delay based on attempt count
delay_ms = min(1000 * (2 ** message.attempts), 60000) # Max 60 seconds
print(f'Retrying message {message.id} after {delay_ms}ms (attempt {message.attempts})')
message.requeue(time_ms=delay_ms, backoff=False)
except NonRetryableException as e:
print(f'Non-retryable error for message {message.id}: {e}')
message.finish()
consumer.start()import gnsq
consumer = gnsq.Consumer('monitored_topic', 'worker', '127.0.0.1:4150')
# Monitor message lifecycle events
@consumer.on_message.connect
def handle_message(consumer, message):
# Set up message-specific event handlers
@message.on_finish.connect
def on_message_finished(message):
print(f'Message {message.id} finished successfully')
@message.on_requeue.connect
def on_message_requeued(message):
print(f'Message {message.id} requeued (attempt {message.attempts})')
@message.on_touch.connect
def on_message_touched(message):
print(f'Message {message.id} timeout extended')
# Process the message
try:
process_message_content(message.body)
message.finish()
except Exception:
message.requeue()
consumer.start()Install with Tessl CLI
npx tessl i tessl/pypi-gnsq