0
# NSQ Daemon Clients
1
2
Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These clients provide fine-grained control over NSQ operations, administrative functions, and are used internally by the higher-level Producer and Consumer classes.
3
4
## Capabilities
5
6
### NsqdTCPClient
7
8
Low-level TCP client for NSQ daemon protocol communication. Provides direct access to NSQ's binary protocol for publishing, subscribing, and connection management.
9
10
```python { .api }
11
class NsqdTCPClient:
12
def connect(self):
13
"""Establish TCP connection to NSQ daemon."""
14
15
def close_stream(self):
16
"""Close the TCP connection stream."""
17
18
def send(self):
19
"""Send command to NSQ daemon over TCP connection."""
20
21
def read_response(self):
22
"""Read response from NSQ daemon."""
23
24
def subscribe(self):
25
"""Subscribe to a topic and channel."""
26
27
def publish(self):
28
"""Publish a message to a topic."""
29
30
def multipublish(self):
31
"""Publish multiple messages to a topic in batch."""
32
33
def ready(self):
34
"""Signal readiness to receive messages."""
35
36
def finish(self):
37
"""Mark a message as finished (FIN command)."""
38
39
def requeue(self):
40
"""Requeue a message (REQ command)."""
41
42
def touch(self):
43
"""Touch a message to reset timeout (TOUCH command)."""
44
45
def close(self):
46
"""Close connection (CLS command)."""
47
48
def nop(self):
49
"""Send no-operation command (NOP)."""
50
51
def identify(self):
52
"""Send client identification (IDENTIFY command)."""
53
54
def auth(self):
55
"""Authenticate with NSQ daemon (AUTH command)."""
56
```
57
58
### NsqdHTTPClient
59
60
HTTP client for NSQ daemon administrative operations. Provides RESTful interface for topic/channel management, statistics, and publishing via HTTP.
61
62
```python { .api }
63
class NsqdHTTPClient:
64
def publish(self):
65
"""
66
Publish a message via HTTP.
67
68
HTTP endpoint for publishing single messages to topics.
69
Useful for non-persistent connections or web applications.
70
"""
71
72
def multipublish(self):
73
"""
74
Publish multiple messages via HTTP.
75
76
HTTP endpoint for batch publishing multiple messages
77
to a topic in a single request.
78
"""
79
80
def create_topic(self):
81
"""
82
Create a new topic.
83
84
Administrative function to create topics on the NSQ daemon.
85
Topics are created automatically on first publish, but this
86
allows explicit creation.
87
"""
88
89
def delete_topic(self):
90
"""
91
Delete an existing topic.
92
93
Administrative function to delete topics and all associated
94
channels and messages from the NSQ daemon.
95
"""
96
97
def create_channel(self):
98
"""
99
Create a new channel within a topic.
100
101
Administrative function to create channels. Channels are
102
created automatically when consumers connect, but this
103
allows explicit creation.
104
"""
105
106
def delete_channel(self):
107
"""
108
Delete an existing channel.
109
110
Administrative function to delete channels and all
111
associated messages from a topic.
112
"""
113
114
def empty_topic(self):
115
"""
116
Remove all messages from a topic.
117
118
Administrative function to clear all messages from
119
all channels in a topic without deleting the topic.
120
"""
121
122
def empty_channel(self):
123
"""
124
Remove all messages from a channel.
125
126
Administrative function to clear all messages from
127
a specific channel without deleting the channel.
128
"""
129
130
def pause_topic(self):
131
"""
132
Pause message delivery for a topic.
133
134
Administrative function to pause all message delivery
135
for all channels in a topic.
136
"""
137
138
def unpause_topic(self):
139
"""
140
Resume message delivery for a topic.
141
142
Administrative function to resume message delivery
143
for a previously paused topic.
144
"""
145
146
def pause_channel(self):
147
"""
148
Pause message delivery for a channel.
149
150
Administrative function to pause message delivery
151
for a specific channel within a topic.
152
"""
153
154
def unpause_channel(self):
155
"""
156
Resume message delivery for a channel.
157
158
Administrative function to resume message delivery
159
for a previously paused channel.
160
"""
161
162
def stats(self):
163
"""
164
Get NSQ daemon statistics.
165
166
Returns comprehensive statistics about topics, channels,
167
connections, and message counts from the NSQ daemon.
168
169
Returns:
170
dict: Statistics data including topics, channels, and metrics
171
"""
172
173
def ping(self):
174
"""
175
Ping the NSQ daemon.
176
177
Health check endpoint to verify NSQ daemon is responding.
178
179
Returns:
180
str: Response indicating daemon status
181
"""
182
183
def info(self):
184
"""
185
Get NSQ daemon information.
186
187
Returns daemon configuration and version information.
188
189
Returns:
190
dict: Daemon configuration and version details
191
"""
192
```
193
194
### Nsqd (Deprecated)
195
196
Legacy NSQ daemon client interface. Use NsqdTCPClient or NsqdHTTPClient instead.
197
198
```python { .api }
199
class Nsqd:
200
def publish(self):
201
"""Publish message (deprecated - use NsqdTCPClient.publish)."""
202
203
def multipublish(self):
204
"""Publish multiple messages (deprecated - use NsqdTCPClient.multipublish)."""
205
```
206
207
## Usage Examples
208
209
### Direct TCP Publishing
210
211
```python
212
import gnsq
213
214
# Create TCP client for direct publishing
215
tcp_client = gnsq.NsqdTCPClient()
216
217
# Configure connection parameters
218
tcp_client.configure(
219
address='127.0.0.1',
220
port=4150,
221
tls_v1=False,
222
compression=None
223
)
224
225
# Connect to NSQ daemon
226
tcp_client.connect()
227
228
# Identify client
229
tcp_client.identify()
230
231
# Publish messages directly
232
tcp_client.publish('events', b'user_action_1')
233
tcp_client.multipublish('events', [b'event_1', b'event_2', b'event_3'])
234
235
# Close connection
236
tcp_client.close()
237
```
238
239
### Administrative Operations via HTTP
240
241
```python
242
import gnsq
243
244
# Create HTTP client for admin operations
245
http_client = gnsq.NsqdHTTPClient(
246
host='127.0.0.1',
247
port=4151 # NSQ HTTP port
248
)
249
250
# Create topic and channel
251
http_client.create_topic('new_events')
252
http_client.create_channel('new_events', 'processor')
253
254
# Get daemon statistics
255
stats = http_client.stats()
256
print(f"Topics: {len(stats['topics'])}")
257
print(f"Total messages: {stats['total_messages']}")
258
259
# Publish via HTTP (useful for web applications)
260
http_client.publish('new_events', 'HTTP published message')
261
262
# Administrative pause/resume
263
http_client.pause_channel('new_events', 'processor')
264
# ... do maintenance ...
265
http_client.unpause_channel('new_events', 'processor')
266
267
# Cleanup
268
http_client.empty_topic('new_events')
269
http_client.delete_topic('new_events')
270
```
271
272
### Low-level Consumer Implementation
273
274
```python
275
import gnsq
276
import gevent
277
278
# Custom consumer using TCP client directly
279
class CustomConsumer:
280
def __init__(self, topic, channel, nsqd_address):
281
self.topic = topic
282
self.channel = channel
283
self.client = gnsq.NsqdTCPClient()
284
host, port = nsqd_address.split(':')
285
self.client.configure(address=host, port=int(port))
286
287
def start(self):
288
# Connect and identify
289
self.client.connect()
290
self.client.identify()
291
292
# Subscribe to topic/channel
293
self.client.subscribe(self.topic, self.channel)
294
295
# Signal ready for messages
296
self.client.ready(1) # Ready for 1 message
297
298
# Start message processing loop
299
gevent.spawn(self._message_loop)
300
301
def _message_loop(self):
302
while True:
303
# Read response from NSQ
304
response = self.client.read_response()
305
306
if response['type'] == 'message':
307
# Process the message
308
message = response['data']
309
try:
310
self._handle_message(message)
311
# Send finish command
312
self.client.finish(message['id'])
313
except Exception as e:
314
# Send requeue command
315
self.client.requeue(message['id'])
316
317
# Ready for next message
318
self.client.ready(1)
319
320
def _handle_message(self, message):
321
# Custom message processing logic
322
print(f"Processing: {message['body']}")
323
324
# Usage
325
consumer = CustomConsumer('events', 'custom_processor', '127.0.0.1:4150')
326
consumer.start()
327
```
328
329
### Monitoring and Health Checks
330
331
```python
332
import gnsq
333
import time
334
335
def monitor_nsq_cluster(nsqd_addresses):
336
"""Monitor multiple NSQ daemons for health and statistics."""
337
338
for address in nsqd_addresses:
339
host, port = address.split(':')
340
http_port = int(port) + 1 # HTTP port is typically TCP port + 1
341
342
try:
343
# Create HTTP client for this daemon
344
client = gnsq.NsqdHTTPClient(host=host, port=http_port)
345
346
# Health check
347
ping_response = client.ping()
348
print(f"{address}: {ping_response}")
349
350
# Get daemon info
351
info = client.info()
352
print(f"{address}: Version {info['version']}")
353
354
# Get statistics
355
stats = client.stats()
356
print(f"{address}: {len(stats['topics'])} topics, "
357
f"{stats['total_messages']} total messages")
358
359
# Check for unhealthy topics/channels
360
for topic in stats['topics']:
361
for channel in topic['channels']:
362
if channel['depth'] > 10000: # High message backlog
363
print(f"WARNING: {topic['name']}/{channel['name']} "
364
f"has {channel['depth']} pending messages")
365
366
except Exception as e:
367
print(f"ERROR connecting to {address}: {e}")
368
369
# Monitor cluster every 30 seconds
370
nsqd_cluster = ['127.0.0.1:4150', '127.0.0.1:4152', '127.0.0.1:4154']
371
372
while True:
373
monitor_nsq_cluster(nsqd_cluster)
374
time.sleep(30)
375
```