A gevent based python client for the NSQ distributed messaging platform
npx @tessl/cli install tessl/pypi-gnsq@1.0.00
# GNSQ
1
2
A gevent-based Python client for the NSQ distributed messaging platform. GNSQ provides high-performance producer and consumer classes with automatic nsqlookupd discovery, connection management, and support for advanced features like TLS encryption, compression (DEFLATE, Snappy), and configurable backoff strategies.
3
4
## Package Information
5
6
- **Package Name**: gnsq
7
- **Language**: Python
8
- **Installation**: `pip install gnsq`
9
- **Optional Dependencies**: `pip install python-snappy` (for Snappy compression support)
10
11
## Core Imports
12
13
```python
14
import gnsq
15
```
16
17
Common patterns for basic messaging:
18
19
```python
20
from gnsq import Producer, Consumer, Message
21
```
22
23
For advanced use cases:
24
25
```python
26
from gnsq import (
27
Producer, Consumer, Message, BackoffTimer,
28
NsqdTCPClient, NsqdHTTPClient, LookupdClient
29
)
30
```
31
32
## Basic Usage
33
34
### Publishing Messages
35
36
```python
37
import gnsq
38
39
# Create and start a producer
40
producer = gnsq.Producer('127.0.0.1:4150')
41
producer.start()
42
43
# Publish a single message
44
producer.publish('my_topic', 'Hello NSQ!')
45
46
# Publish multiple messages at once
47
messages = ['message1', 'message2', 'message3']
48
producer.multipublish('my_topic', messages)
49
50
# Clean shutdown
51
producer.close()
52
producer.join()
53
```
54
55
### Consuming Messages
56
57
```python
58
import gnsq
59
60
# Create a consumer for topic and channel
61
consumer = gnsq.Consumer('my_topic', 'my_channel', '127.0.0.1:4150')
62
63
# Set up message handler
64
@consumer.on_message.connect
65
def message_handler(consumer, message):
66
print(f'Received: {message.body}')
67
# Mark message as successfully processed
68
message.finish()
69
70
# Start consuming messages
71
consumer.start()
72
73
# Or start without blocking to do other work
74
consumer.start(block=False)
75
# ... do other work ...
76
consumer.join() # Wait for completion
77
```
78
79
## Architecture
80
81
GNSQ is built around a layered architecture that provides both high-level convenience and low-level control:
82
83
- **High-level Classes**: Producer and Consumer provide the primary interface for most applications
84
- **Message Abstraction**: Message class encapsulates NSQ message handling with async support
85
- **Protocol Clients**: NsqdTCPClient and NsqdHTTPClient handle direct NSQ daemon communication
86
- **Discovery Service**: LookupdClient enables automatic topic producer discovery
87
- **Connection Management**: Automatic connection pooling, retry logic, and backoff strategies
88
- **Gevent Integration**: Non-blocking I/O with gevent for high concurrency
89
90
This design allows applications to start with simple Producer/Consumer usage while providing access to lower-level components for advanced use cases requiring fine-grained control over NSQ interactions.
91
92
## Capabilities
93
94
### Core Messaging
95
96
High-level producer and consumer classes for publishing and consuming messages from NSQ topics. These classes handle connection management, automatic discovery, and provide convenient APIs for most messaging use cases.
97
98
```python { .api }
99
class Producer:
100
def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs): ...
101
def start(self): ...
102
def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True): ...
103
def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True): ...
104
105
class Consumer:
106
def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs): ...
107
def start(self, block=True): ...
108
def connect_to_nsqd(self, address, port): ...
109
```
110
111
[Core Messaging](./core-messaging.md)
112
113
### Message Handling
114
115
Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, and asynchronous processing control.
116
117
```python { .api }
118
class Message:
119
@property
120
def body: ...
121
@property
122
def id: ...
123
def finish(self): ...
124
def requeue(self, time_ms=0, backoff=True): ...
125
def enable_async(self): ...
126
```
127
128
[Message Handling](./message-handling.md)
129
130
### NSQ Daemon Clients
131
132
Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These provide fine-grained control over NSQ operations and administrative functions.
133
134
```python { .api }
135
class NsqdTCPClient:
136
def connect(self): ...
137
def publish(self): ...
138
def subscribe(self): ...
139
140
class NsqdHTTPClient:
141
def publish(self): ...
142
def create_topic(self): ...
143
def stats(self): ...
144
```
145
146
[NSQ Daemon Clients](./nsqd-clients.md)
147
148
### Lookupd Integration
149
150
Client for NSQ lookupd services that provide topic producer discovery and cluster topology information.
151
152
```python { .api }
153
class LookupdClient:
154
def lookup(self, topic): ...
155
def topics(self): ...
156
def nodes(self): ...
157
```
158
159
[Lookupd Integration](./lookupd-integration.md)
160
161
### Utilities and Error Handling
162
163
Helper utilities, decorators, backoff timers, and comprehensive error handling for robust NSQ client applications.
164
165
```python { .api }
166
class BackoffTimer:
167
def __init__(self, ratio=1, max_interval=None, min_interval=None): ...
168
def failure(self): ...
169
def get_interval(self): ...
170
171
# Exception hierarchy
172
class NSQException(Exception): ...
173
class NSQRequeueMessage(NSQException): ...
174
```
175
176
[Utilities and Error Handling](./utilities-errors.md)