CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-gnsq

A gevent-based Python client for the NSQ distributed messaging platform

Pending
Overview
Eval results
Files

docs/message-handling.md

Message Handling

Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, timeout management, and asynchronous processing control. The Message class is central to NSQ's at-least-once delivery guarantee and provides the interface for controlling the message-processing lifecycle.

Capabilities

Message Class

Encapsulates NSQ messages with metadata and provides methods for responding to the NSQ daemon about processing status.

class Message:
    """A single message received from NSQ, with the calls used to answer it."""

    @property
    def timestamp(self):
        """int: Timestamp the NSQ daemon attached to the message."""

    @property
    def attempts(self):
        """int: How many times processing of this message has been attempted."""

    @property
    def id(self):
        """str: Unique identifier NSQ assigned to the message."""

    @property
    def body(self):
        """bytes: Raw message payload."""

    def enable_async(self):
        """
        Switch this message to asynchronous processing.

        Once enabled, the message may be handed to a separate greenlet or
        thread. The code that takes it over is then responsible for
        answering it with finish() or requeue(), and for calling touch()
        when it needs more time before the message times out.
        """

    def is_async(self):
        """
        Report whether asynchronous processing has been enabled.

        Returns:
        bool: True after enable_async() has been called, False otherwise
        """

    def has_responded(self):
        """
        Report whether a response has already been sent for this message.

        Returns:
        bool: True once finish(), requeue(), or another response has been sent
        """

    def finish(self):
        """
        Acknowledge the message as successfully processed.

        Sends the FIN command to the NSQ daemon, so the message will not
        be redelivered. Call this only after the message content has been
        processed successfully.
        """

    def requeue(self, time_ms=0, backoff=True):
        """
        Hand the message back to NSQ after a processing failure.

        Sends the REQ command to the NSQ daemon so the message is
        redelivered once the requested delay has passed.

        Parameters:
        - time_ms (int): Delay in milliseconds before the message is requeued
          (0 requeues immediately)
        - backoff (bool): Whether backoff should be applied for this failure
        """

    def touch(self):
        """
        Ask NSQ for more time to process the message.

        Sends the TOUCH command, which resets the message timeout and so
        prevents the automatic, timeout-based requeue. Intended for
        long-running processing.
        """

Message Events

Messages provide event signals for monitoring the processing lifecycle:

# Signals exposed as properties on Message instances
@property
def on_finish(self):
    """Signal emitted after message.finish() is called."""


@property
def on_requeue(self):
    """Signal emitted after message.requeue() is called."""


@property
def on_touch(self):
    """Signal emitted after message.touch() is called."""

Usage Examples

Basic Message Processing

import json

import gnsq

consumer = gnsq.Consumer('orders', 'processor', '127.0.0.1:4150')


@consumer.on_message.connect
def process_order(consumer, message):
    """Decode one order message, process it, and answer NSQ.

    The message is finished on success and on failures that a retry cannot
    fix (malformed payload, PermanentError); it is requeued on
    TemporaryError.

    process_order_logic, TemporaryError and PermanentError are not defined
    in this snippet; the application is expected to provide them.
    """
    try:
        # Decode message content
        order_data = message.body.decode('utf-8')
        order = json.loads(order_data)

        # Process the order
        process_order_logic(order)

        # Mark as successfully processed
        message.finish()

    except (UnicodeDecodeError, json.JSONDecodeError):
        # Undecodable bytes or invalid JSON will not parse on a retry
        # either - don't requeue, log error
        print(f'Invalid JSON in message {message.id}')
        message.finish()  # Discard malformed message

    except TemporaryError as e:
        # Temporary failure - requeue for retry
        print(f'Temporary error processing {message.id}: {e}')
        message.requeue()

    except PermanentError as e:
        # Permanent failure - don't requeue
        print(f'Permanent error processing {message.id}: {e}')
        message.finish()  # Discard message


consumer.start()

Asynchronous Message Processing

import json
import time

import gevent
import gnsq

consumer = gnsq.Consumer('analytics', 'processor', '127.0.0.1:4150')


@consumer.on_message.connect
def handle_message(consumer, message):
    """Hand each incoming message to a background greenlet."""
    # Enable async processing so the greenlet can answer the message later
    message.enable_async()

    # Spawn greenlet for background processing
    gevent.spawn(process_analytics_async, message)


def process_analytics_async(message):
    """Process analytics message in background greenlet.

    Finishes the message once the result is stored; on any failure the
    error is printed and the message is requeued unless it has already
    been answered.

    perform_analytics_computation and store_analytics_result are not
    defined in this snippet; the application is expected to provide them.
    """
    try:
        # Decode analytics event
        event_data = json.loads(message.body.decode('utf-8'))

        # Long-running analytics processing, timed with a monotonic clock
        started = time.monotonic()
        result = perform_analytics_computation(event_data)
        processing_time = time.monotonic() - started

        # Check if processing is taking too long
        if processing_time > 30:  # seconds
            message.touch()  # Reset timeout before storing the result

        # Store results
        store_analytics_result(result)

        # Mark as completed
        message.finish()

    except Exception as e:
        print(f'Analytics processing failed: {e}')
        # Requeue with exponential backoff, but only if no response has
        # been sent yet (e.g. the failure happened inside finish())
        if not message.has_responded():
            message.requeue(backoff=True)


consumer.start()

Message Timeout Management

import json
import time

import gevent
import gnsq

# Seconds between touch() calls; kept below the message timeout
TOUCH_INTERVAL = 30

consumer = gnsq.Consumer(
    'long_tasks',
    'worker',
    '127.0.0.1:4150',
    # NOTE(review): confirm this keyword name and its unit against the
    # gnsq Consumer signature
    message_timeout=60000  # 60 second timeout
)


@consumer.on_message.connect
def handle_long_task(consumer, message):
    """Hand each incoming message to a background greenlet."""
    message.enable_async()
    gevent.spawn(process_long_task, message)


def process_long_task(message):
    """Process task that may take longer than message timeout.

    Runs every entry of the decoded payload's 'steps' list through
    process_step (not defined in this snippet) and touches the message
    whenever TOUCH_INTERVAL seconds have passed since the last touch.
    Finishes the message on success and requeues it on any exception.
    """
    try:
        task_data = json.loads(message.body.decode('utf-8'))

        # Monotonic clock: unaffected by system clock adjustments
        last_touch = time.monotonic()

        for step in task_data['steps']:
            # Process each step
            process_step(step)

            # Touch message every 30 seconds to prevent timeout
            if time.monotonic() - last_touch > TOUCH_INTERVAL:
                message.touch()
                last_touch = time.monotonic()

        # Task completed successfully
        message.finish()

    except Exception as e:
        print(f'Long task failed: {e}')
        message.requeue()


consumer.start()

Controlled Requeue Strategy

import json
import time

import gnsq

# Retry policy used by handle_retryable_task
MAX_ATTEMPTS = 5
BASE_DELAY_MS = 1000
MAX_DELAY_MS = 60000  # Max 60 seconds

consumer = gnsq.Consumer('retryable_tasks', 'worker', '127.0.0.1:4150')


@consumer.on_message.connect
def handle_retryable_task(consumer, message):
    """Process one task message with a capped, attempt-based retry delay.

    Messages seen more than MAX_ATTEMPTS times are discarded. A
    RetryableException requeues the message with a delay that doubles per
    attempt up to MAX_DELAY_MS; a NonRetryableException discards it.

    process_task, RetryableException and NonRetryableException are not
    defined in this snippet; the application is expected to provide them.
    """
    try:
        # Check attempt count to avoid infinite retries
        if message.attempts > MAX_ATTEMPTS:
            print(f'Message {message.id} exceeded max attempts, discarding')
            message.finish()
            return

        # Process the task
        task_data = json.loads(message.body.decode('utf-8'))
        process_task(task_data)

        # Success
        message.finish()

    except RetryableException as e:
        # Calculate delay based on attempt count
        delay_ms = min(BASE_DELAY_MS * (2 ** message.attempts), MAX_DELAY_MS)

        print(f'Retrying message {message.id} after {delay_ms}ms (attempt {message.attempts})')
        # The delay is computed here, so backoff is switched off explicitly
        message.requeue(time_ms=delay_ms, backoff=False)

    except NonRetryableException as e:
        print(f'Non-retryable error for message {message.id}: {e}')
        message.finish()


consumer.start()

Message Event Monitoring

import gnsq

consumer = gnsq.Consumer('monitored_topic', 'worker', '127.0.0.1:4150')


# Monitor message lifecycle events
@consumer.on_message.connect
def handle_message(consumer, message):
    """Attach lifecycle receivers to the message, then process it.

    The message is finished when process_message_content (not defined in
    this snippet) returns, and requeued when it raises.
    """
    # Set up message-specific event handlers
    @message.on_finish.connect
    def on_message_finished(message):
        print(f'Message {message.id} finished successfully')

    # Per the gnsq documentation the requeue signal also carries the
    # timeout and backoff values as keyword arguments; accept them so the
    # receiver does not fail when requeue() fires the signal.
    @message.on_requeue.connect
    def on_message_requeued(message, **kwargs):
        print(f'Message {message.id} requeued (attempt {message.attempts})')

    @message.on_touch.connect
    def on_message_touched(message):
        print(f'Message {message.id} timeout extended')

    # Process the message
    try:
        process_message_content(message.body)
        message.finish()
    except Exception:
        message.requeue()


consumer.start()

Install with Tessl CLI

npx tessl i tessl/pypi-gnsq

docs

core-messaging.md

index.md

lookupd-integration.md

message-handling.md

nsqd-clients.md

utilities-errors.md

tile.json