# Lookupd Integration

Client for NSQ lookupd services that provide topic producer discovery and cluster topology information. Lookupd acts as a service discovery mechanism, allowing producers and consumers to find NSQ daemons hosting specific topics without requiring static configuration.
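
For example, a minimal discovery round trip might look like the sketch below (assuming an nsqlookupd instance on the default HTTP port and an existing topic named `events`):

```python
import gnsq

# Connect to a single nsqlookupd instance (address is an assumption for this sketch)
lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

# Which nsqd instances currently advertise the 'events' topic?
for producer in lookupd.lookup('events')['producers']:
    print(producer['broadcast_address'], producer['tcp_port'])
```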

## Capabilities

### LookupdClient

HTTP client for NSQ lookupd services that enables dynamic discovery of topic producers and cluster topology management.

```python { .api }
class LookupdClient:
    def lookup(self, topic):
        """
        Look up producers for a specific topic.

        Queries lookupd to find all NSQ daemons that have the specified
        topic available for production or consumption.

        Parameters:
        - topic (str): Topic name to look up

        Returns:
        dict: Producer information including addresses and metadata
        """

    def topics(self):
        """
        Get all topics known to lookupd.

        Returns a list of all topics across all NSQ daemons
        registered with this lookupd instance.

        Returns:
        list: List of topic names
        """

    def channels(self, topic):
        """
        Get all channels for a specific topic.

        Returns channels that exist for the specified topic
        across all registered NSQ daemons.

        Parameters:
        - topic (str): Topic name to get channels for

        Returns:
        list: List of channel names for the topic
        """

    def nodes(self):
        """
        Get all NSQ daemon nodes registered with lookupd.

        Returns information about all NSQ daemons that have
        registered with this lookupd instance.

        Returns:
        list: List of node information including addresses and metadata
        """

    def create_topic(self, topic):
        """
        Register a topic in lookupd's registry.

        Adds the topic to this lookupd's registry so consumers can
        discover it before any producer has published to it.

        Parameters:
        - topic (str): Topic name to create
        """

    def delete_topic(self, topic):
        """
        Remove a topic from lookupd's registry.

        Removes the topic and its registrations from this lookupd's
        registry; it does not delete data held by NSQ daemons.

        Parameters:
        - topic (str): Topic name to delete
        """

    def create_channel(self, topic, channel):
        """
        Register a channel for a topic in lookupd's registry.

        Adds the channel to this lookupd's registry so it can be
        discovered before any messages flow through it.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to create
        """

    def delete_channel(self, topic, channel):
        """
        Remove a channel from lookupd's registry.

        Removes the channel registration for the topic from this
        lookupd's registry; messages held by NSQ daemons are not affected.

        Parameters:
        - topic (str): Topic name
        - channel (str): Channel name to delete
        """

    def tombstone_topic(self, topic, node):
        """
        Tombstone a topic producer on a specific node.

        Marks the given node as tombstoned for the topic in lookupd's
        registry, so lookup queries stop returning that node while the
        topic is being removed from it.

        Parameters:
        - topic (str): Topic name to tombstone
        - node (str): Producer node, identified as <broadcast_address>:<http_port>
        """

    def ping(self):
        """
        Ping the lookupd service.

        Health check to verify lookupd is responding and available.

        Returns:
        str: Response indicating lookupd status
        """

    def info(self):
        """
        Get lookupd service information.

        Returns configuration and version information about
        the lookupd service.

        Returns:
        dict: Lookupd configuration and version details
        """
```
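
The administrative endpoints are typically combined when draining a node. The sketch below tombstones a topic on a node that is being decommissioned; the lookupd address, topic, and node are placeholder values for illustration:

```python
import gnsq

lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

topic = 'events'
node = '10.0.0.5:4151'  # hypothetical <broadcast_address>:<http_port> of the nsqd being drained

# Stop lookup() from returning this node for the topic while it is drained
lookupd.tombstone_topic(topic, node)

# Consumers that rediscover via lookupd will no longer connect to the drained node
remaining = lookupd.lookup(topic)['producers']
print(f"{len(remaining)} producer(s) still advertising '{topic}'")
```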

### Lookupd (Deprecated)

Legacy lookupd client interface. Use LookupdClient instead.

```python { .api }
class Lookupd:
    def tombstone_topic_producer(self, topic, node):
        """
        Tombstone topic producer (deprecated).

        Use LookupdClient.tombstone_topic() instead.

        Parameters:
        - topic (str): Topic name
        - node (str): Node identifier
        """
```
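
Migrating is a method rename on the new client; a minimal sketch (topic and node values are placeholders):

```python
import gnsq

lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

# Previously: Lookupd(...).tombstone_topic_producer(topic, node)
lookupd.tombstone_topic('events', '10.0.0.5:4151')
```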

## Usage Examples

### Service Discovery for Producers

```python
import gnsq

# Create lookupd client for service discovery
lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

# Find producers for a topic
topic = 'user_events'
producer_info = lookupd.lookup(topic)

print(f"Producers for {topic}:")
for producer in producer_info['producers']:
    print(f" - {producer['broadcast_address']}:{producer['tcp_port']}")

# Create a producer using the discovered addresses
producer_addresses = [
    f"{p['broadcast_address']}:{p['tcp_port']}"
    for p in producer_info['producers']
]

producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)
producer.start()
producer.publish(topic, b'Hello from discovered producer!')  # message bodies are bytes
producer.close()
```

### Dynamic Consumer Configuration

```python
import gnsq

def create_dynamic_consumer(topic, channel, lookupd_addresses):
    """Create consumer with automatic NSQ daemon discovery."""

    # Consumer will automatically use lookupd for discovery
    consumer = gnsq.Consumer(
        topic=topic,
        channel=channel,
        lookupd_http_addresses=lookupd_addresses
    )

    @consumer.on_message.connect
    def handle_message(consumer, message):
        print(f"Processing message from {topic}: {message.body}")
        message.finish()

    return consumer

# Create consumer with lookupd discovery
lookupd_addresses = ['127.0.0.1:4161', '127.0.0.1:4163']  # Multiple lookupds
consumer = create_dynamic_consumer('events', 'processor', lookupd_addresses)

# Consumer will automatically discover and connect to NSQ daemons
consumer.start()
```

### Cluster Topology Management

```python
import gnsq

def manage_cluster_topology(lookupd_host='127.0.0.1', lookupd_port=4161):
    """Inspect and manage NSQ cluster topology via lookupd."""

    lookupd = gnsq.LookupdClient(host=lookupd_host, port=lookupd_port)

    # Get cluster overview
    nodes = lookupd.nodes()
    topics = lookupd.topics()

    print(f"Cluster has {len(nodes)} nodes and {len(topics)} topics")

    # List all nodes
    print("\nNSQ Daemons:")
    for node in nodes:
        print(f" - {node['broadcast_address']}:{node['tcp_port']} "
              f"(version {node['version']})")

    # List topics and their channels
    print("\nTopics and Channels:")
    for topic in topics:
        channels = lookupd.channels(topic)
        print(f" - {topic}: {', '.join(channels) if channels else 'no channels'}")

    # Register a new topic with lookupd so consumers can discover it
    new_topic = 'cluster_events'
    lookupd.create_topic(new_topic)
    print(f"\nRegistered topic '{new_topic}' with lookupd")

    # Register a channel for the topic
    lookupd.create_channel(new_topic, 'analytics')
    print(f"Registered channel 'analytics' for topic '{new_topic}'")

    return lookupd

# Manage cluster
cluster_manager = manage_cluster_topology()
```

### Health Monitoring and Maintenance

```python
import gnsq
import time

def monitor_cluster_health(lookupd_addresses):
    """Monitor NSQ cluster health via lookupd."""

    for address in lookupd_addresses:
        host, port = address.split(':')

        try:
            lookupd = gnsq.LookupdClient(host=host, port=int(port))

            # Health check
            ping_response = lookupd.ping()
            print(f"Lookupd {address}: {ping_response}")

            # Get service info
            info = lookupd.info()
            print(f"Lookupd {address}: Version {info['version']}")

            # Check registered nodes
            nodes = lookupd.nodes()
            print(f"Lookupd {address}: {len(nodes)} registered nodes")

            # Check for stale nodes (only if the registry exposes a last-update timestamp)
            for node in nodes:
                last_update = node.get('last_update')
                if last_update and time.time() - last_update > 60:  # No update in 60 seconds
                    print(f"WARNING: Node {node['broadcast_address']} "
                          f"last updated {time.time() - last_update:.0f}s ago")

        except Exception as e:
            print(f"ERROR connecting to lookupd {address}: {e}")

def cleanup_stale_topics(lookupd_client):
    """Flag topics that look unused (no producers and no channels)."""

    for topic in lookupd_client.topics():
        # Check whether the topic still has producers or channels registered
        producers = lookupd_client.lookup(topic)['producers']
        channels = lookupd_client.channels(topic)

        if not producers and not channels:
            print(f"Topic '{topic}' has no producers or channels - candidate for cleanup")
            # Additional logic to determine if the topic should be deleted
            # lookupd_client.delete_topic(topic)

# Monitor multiple lookupd instances
lookupd_cluster = ['127.0.0.1:4161', '127.0.0.1:4163']

while True:
    print("=== Cluster Health Check ===")
    monitor_cluster_health(lookupd_cluster)
    print()
    time.sleep(30)
```

### Advanced Service Discovery

```python
import gnsq
import random

class SmartProducer:
    """Producer that discovers NSQ daemons via lookupd and fails over between them."""

    def __init__(self, lookupd_addresses, topic):
        self.lookupd_addresses = lookupd_addresses
        self.topic = topic
        self.producers = {}  # Cache producers by NSQ daemon address

    def _get_producer(self):
        """Pick an available NSQ daemon for publishing."""

        # Try each lookupd until we get topology info
        for lookupd_addr in self.lookupd_addresses:
            try:
                host, port = lookupd_addr.split(':')
                lookupd = gnsq.LookupdClient(host=host, port=int(port))

                # Get current topology
                producer_info = lookupd.lookup(self.topic)
                nsqd_nodes = producer_info['producers']

                if not nsqd_nodes:
                    continue

                # Pick a node at random to spread load
                # (lookup responses don't include queue depth)
                node = random.choice(nsqd_nodes)
                nsqd_addr = f"{node['broadcast_address']}:{node['tcp_port']}"

                # Create or reuse producer for this NSQ daemon
                if nsqd_addr not in self.producers:
                    producer = gnsq.Producer([nsqd_addr])
                    producer.start()
                    self.producers[nsqd_addr] = producer

                return self.producers[nsqd_addr]

            except Exception as e:
                print(f"Failed to get topology from {lookupd_addr}: {e}")
                continue

        raise RuntimeError("No healthy lookupd instances available")

    def publish(self, message):
        """Publish a message using the selected producer."""
        producer = self._get_producer()
        producer.publish(self.topic, message)

    def close(self):
        """Close all producers."""
        for producer in self.producers.values():
            producer.close()
            producer.join()

# Usage
smart_producer = SmartProducer(
    lookupd_addresses=['127.0.0.1:4161', '127.0.0.1:4163'],
    topic='smart_events'
)

# Each publish targets one of the NSQ daemons currently advertising the topic
smart_producer.publish(b'Event 1')
smart_producer.publish(b'Event 2')

smart_producer.close()
```