tessl/pypi-gnsq

A gevent based python client for the NSQ distributed messaging platform

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/gnsq@1.0.x

To install, run

npx @tessl/cli install tessl/pypi-gnsq@1.0.0


GNSQ

A gevent-based Python client for the NSQ distributed messaging platform. GNSQ provides high-performance producer and consumer classes with automatic nsqlookupd discovery, connection management, and support for advanced features like TLS encryption, compression (DEFLATE, Snappy), and configurable backoff strategies.

Package Information

  • Package Name: gnsq
  • Language: Python
  • Installation: pip install gnsq
  • Optional Dependencies: pip install python-snappy (for Snappy compression support)

Core Imports

import gnsq

Common patterns for basic messaging:

from gnsq import Producer, Consumer, Message

For advanced use cases:

from gnsq import (
    Producer, Consumer, Message, BackoffTimer,
    NsqdTCPClient, NsqdHTTPClient, LookupdClient
)

Basic Usage

Publishing Messages

import gnsq

# Create and start a producer
producer = gnsq.Producer('127.0.0.1:4150')
producer.start()

# Publish a single message (bodies are passed as bytes)
producer.publish('my_topic', b'Hello NSQ!')

# Publish multiple messages at once
messages = [b'message1', b'message2', b'message3']
producer.multipublish('my_topic', messages)

# Clean shutdown
producer.close()
producer.join()
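
NSQ carries message bodies as raw bytes, so structured payloads have to be serialized before they are published. The following is a minimal sketch, assuming JSON encoding. `encode_body` and `publish_events` are hypothetical helpers, not part of gnsq, and `producer` is any object exposing the `publish` and `multipublish` methods used above.

```python
import json


def encode_body(payload):
    """Serialize a JSON-compatible payload to UTF-8 bytes (hypothetical helper)."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def publish_events(producer, topic, payloads):
    """Publish one payload with publish(), several with multipublish().

    `producer` is assumed to expose publish(topic, data) and
    multipublish(topic, messages), as in the example above.
    Returns the number of messages handed to the producer.
    """
    bodies = [encode_body(p) for p in payloads]
    if not bodies:
        return 0  # nothing to send
    if len(bodies) == 1:
        producer.publish(topic, bodies[0])
    else:
        producer.multipublish(topic, bodies)
    return len(bodies)
```

Batching through `multipublish` sends the whole list in one command, which is usually preferable to a loop of single publishes when several messages are ready at once.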

Consuming Messages

import gnsq

# Create a consumer for topic and channel
consumer = gnsq.Consumer('my_topic', 'my_channel', '127.0.0.1:4150')

# Set up message handler
@consumer.on_message.connect
def message_handler(consumer, message):
    print(f'Received: {message.body}')
    # Mark message as successfully processed
    message.finish()

# Start consuming messages (blocks until the consumer is closed)
consumer.start()

# Alternatively, start without blocking to do other work:
# consumer.start(block=False)
# ... do other work ...
# consumer.join()  # Wait for completion

Architecture

GNSQ is built around a layered architecture that provides both high-level convenience and low-level control:

  • High-level Classes: Producer and Consumer provide the primary interface for most applications
  • Message Abstraction: Message class encapsulates NSQ message handling with async support
  • Protocol Clients: NsqdTCPClient and NsqdHTTPClient handle direct NSQ daemon communication
  • Discovery Service: LookupdClient enables automatic topic producer discovery
  • Connection Management: Automatic connection pooling, retry logic, and backoff strategies
  • Gevent Integration: Non-blocking I/O with gevent for high concurrency

This design allows applications to start with simple Producer/Consumer usage while providing access to lower-level components for advanced use cases requiring fine-grained control over NSQ interactions.

Capabilities

Core Messaging

High-level producer and consumer classes for publishing and consuming messages from NSQ topics. These classes handle connection management, automatic discovery, and provide convenient APIs for most messaging use cases.

class Producer:
    def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs): ...
    def start(self): ...
    def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True): ...
    def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True): ...

class Consumer:
    def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs): ...
    def start(self, block=True): ...
    def connect_to_nsqd(self, address, port): ...


Message Handling

Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, and asynchronous processing control.

class Message:
    @property
    def body(self): ...
    @property
    def id(self): ...
    def finish(self): ...
    def requeue(self, time_ms=0, backoff=True): ...
    def enable_async(self): ...
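
As an illustration of how `finish` and `requeue` might be combined, here is a minimal handler sketch. It assumes message bodies are UTF-8 JSON, which is an assumption of this example rather than anything gnsq requires. `handle_message` and its `process` callback are hypothetical names, and the 5000 ms requeue delay is arbitrary.

```python
import json


def handle_message(consumer, message, process=print):
    """Decode a JSON body, then acknowledge or requeue the message.

    `message` is assumed to expose body (bytes), finish() and requeue(),
    as listed above. `process` is a hypothetical callback standing in for
    application logic. Returns True if the message was finished.
    """
    try:
        payload = json.loads(message.body.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # A malformed body will never parse, so finish it rather than
        # have it redelivered forever.
        message.finish()
        return True
    try:
        process(payload)
    except Exception:
        # Possibly a transient failure: ask for redelivery after 5 seconds.
        message.requeue(time_ms=5000)
        return False
    message.finish()
    return True
```

With a real consumer, a function like this could be registered through `consumer.on_message.connect(handle_message)`, in the same way as the decorator form shown in Basic Usage.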


NSQ Daemon Clients

Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These provide fine-grained control over NSQ operations and administrative functions.

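class NsqdTCPClient:
    def connect(self): ...
    def publish(self, topic, data, defer=None): ...
    def subscribe(self, topic, channel): ...

class NsqdHTTPClient:
    def publish(self, topic, data, defer=None): ...
    def create_topic(self, topic): ...
    def stats(self, topic=None, channel=None, text=False): ...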


Lookupd Integration

Client for NSQ lookupd services that provide topic producer discovery and cluster topology information.

class LookupdClient:
    def lookup(self, topic): ...
    def topics(self): ...
    def nodes(self): ...
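
To make the discovery step concrete, the sketch below turns a lookup result into `host:port` strings that a consumer could connect to. It assumes that `lookup(topic)` returns the decoded JSON body of nsqlookupd's `/lookup` endpoint, in which each entry under `producers` carries `broadcast_address` and `tcp_port`. `nsqd_tcp_addresses` is a hypothetical helper, not part of gnsq.

```python
def nsqd_tcp_addresses(lookup_response):
    """Extract unique 'host:port' strings from a lookup response dict.

    Assumes the nsqlookupd /lookup shape:
    {"channels": [...], "producers": [{"broadcast_address": ..., "tcp_port": ...}]}
    """
    addresses = []
    for producer in lookup_response.get("producers", []):
        address = "{}:{}".format(
            producer["broadcast_address"], producer["tcp_port"]
        )
        # Several lookupd instances may report the same nsqd; keep one copy.
        if address not in addresses:
            addresses.append(address)
    return addresses
```

In most applications this step is unnecessary, because passing `lookupd_http_addresses` to `Consumer` lets it perform discovery itself. The helper is only meant to show what the lookup data contains.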


Utilities and Error Handling

Helper utilities, decorators, backoff timers, and comprehensive error handling for robust NSQ client applications.

class BackoffTimer:
    def __init__(self, ratio=1, max_interval=None, min_interval=None): ...
    def failure(self): ...
    def get_interval(self): ...

# Exception hierarchy
class NSQException(Exception): ...
class NSQRequeueMessage(NSQException): ...
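
One way a backoff timer can be used is to pace retries of a failing operation. The sketch below is a generic retry loop, not gnsq's internal logic. `retry_with_backoff` is a hypothetical helper, `timer` is any object exposing the `failure()` and `get_interval()` methods listed above, and `get_interval()` is assumed to return a delay in seconds.

```python
import time


def retry_with_backoff(operation, timer, max_attempts=5, sleep=time.sleep):
    """Call operation() until it succeeds or max_attempts is exhausted.

    Each failure is reported to `timer`, and the interval it returns is
    used as the delay before the next attempt. The last error is re-raised
    if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as error:  # a real application would catch narrower types
            last_error = error
            timer.failure()
            if attempt < max_attempts:
                sleep(timer.get_interval())
    raise last_error
```

Passing `sleep` as a parameter keeps the loop testable and lets a gevent application substitute `gevent.sleep` so that waiting does not block other greenlets.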
