or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-messaging.mdindex.mdlookupd-integration.mdmessage-handling.mdnsqd-clients.mdutilities-errors.md

lookupd-integration.mddocs/

0

# Lookupd Integration

1

2

Client for NSQ lookupd services that provide topic producer discovery and cluster topology information. Lookupd acts as a service discovery mechanism, allowing producers and consumers to find NSQ daemons hosting specific topics without requiring static configuration.

3

4

## Capabilities

5

6

### LookupdClient

7

8

HTTP client for NSQ lookupd services that enables dynamic discovery of topic producers and cluster topology management.

9

10

```python { .api }

11

class LookupdClient:

12

def lookup(self, topic):

13

"""

14

Look up producers for a specific topic.

15

16

Queries lookupd to find all NSQ daemons that have the specified

17

topic available for production or consumption.

18

19

Parameters:

20

- topic (str): Topic name to lookup

21

22

Returns:

23

dict: Producer information including addresses and metadata

24

"""

25

26

def topics(self):

27

"""

28

Get all topics known to lookupd.

29

30

Returns a list of all topics across all NSQ daemons

31

registered with this lookupd instance.

32

33

Returns:

34

list: List of topic names

35

"""

36

37

def channels(self, topic):

38

"""

39

Get all channels for a specific topic.

40

41

Returns channels that exist for the specified topic

42

across all registered NSQ daemons.

43

44

Parameters:

45

- topic (str): Topic name to get channels for

46

47

Returns:

48

list: List of channel names for the topic

49

"""

50

51

def nodes(self):

52

"""

53

Get all NSQ daemon nodes registered with lookupd.

54

55

Returns information about all NSQ daemons that have

56

registered with this lookupd instance.

57

58

Returns:

59

list: List of node information including addresses and metadata

60

"""

61

62

def create_topic(self, topic):

63

"""

64

Create a topic across the cluster.

65

66

Instructs all registered NSQ daemons to create the

67

specified topic if it doesn't exist.

68

69

Parameters:

70

- topic (str): Topic name to create

71

"""

72

73

def delete_topic(self, topic):

74

"""

75

Delete a topic across the cluster.

76

77

Instructs all registered NSQ daemons to delete the

78

specified topic and all associated data.

79

80

Parameters:

81

- topic (str): Topic name to delete

82

"""

83

84

def create_channel(self, topic, channel):

85

"""

86

Create a channel for a topic across the cluster.

87

88

Instructs all NSQ daemons hosting the topic to create

89

the specified channel if it doesn't exist.

90

91

Parameters:

92

- topic (str): Topic name

93

- channel (str): Channel name to create

94

"""

95

96

def delete_channel(self, topic, channel):

97

"""

98

Delete a channel from a topic across the cluster.

99

100

Instructs all NSQ daemons hosting the topic to delete

101

the specified channel and all associated messages.

102

103

Parameters:

104

- topic (str): Topic name

105

- channel (str): Channel name to delete

106

"""

107

108

def tombstone_topic(self, topic, node):

109

"""

110

Tombstone a topic on a specific node.

111

112

Marks a topic as tombstoned on the specified NSQ daemon,

113

preventing new messages from being queued while allowing

114

existing messages to be processed.

115

116

Parameters:

117

- topic (str): Topic name to tombstone

118

- node (str): NSQ daemon node identifier

119

"""

120

121

def ping(self):

122

"""

123

Ping the lookupd service.

124

125

Health check to verify lookupd is responding and available.

126

127

Returns:

128

str: Response indicating lookupd status

129

"""

130

131

def info(self):

132

"""

133

Get lookupd service information.

134

135

Returns configuration and version information about

136

the lookupd service.

137

138

Returns:

139

dict: Lookupd configuration and version details

140

"""

141

```

142

143

### Lookupd (Deprecated)

144

145

Legacy lookupd client interface. Use LookupdClient instead.

146

147

```python { .api }

148

class Lookupd:

149

def tombstone_topic_producer(self, topic, node):

150

"""

151

Tombstone topic producer (deprecated).

152

153

Use LookupdClient.tombstone_topic() instead.

154

155

Parameters:

156

- topic (str): Topic name

157

- node (str): Node identifier

158

"""

159

```

160

161

## Usage Examples

162

163

### Service Discovery for Producers

164

165

```python

166

import gnsq

167

168

# Create lookupd client for service discovery

169

lookupd = gnsq.LookupdClient(host='127.0.0.1', port=4161)

170

171

# Find producers for a topic

172

topic = 'user_events'

173

producer_info = lookupd.lookup(topic)

174

175

print(f"Producers for {topic}:")

176

for producer in producer_info['producers']:

177

print(f" - {producer['broadcast_address']}:{producer['tcp_port']}")

178

179

# Create producer using discovered addresses

180

producer_addresses = [

181

f"{p['broadcast_address']}:{p['tcp_port']}"

182

for p in producer_info['producers']

183

]

184

185

producer = gnsq.Producer(nsqd_tcp_addresses=producer_addresses)

186

producer.start()

187

producer.publish(topic, 'Hello from discovered producer!')

188

producer.close()

189

```

190

191

### Dynamic Consumer Configuration

192

193

```python

194

import gnsq

195

196

def create_dynamic_consumer(topic, channel, lookupd_addresses):

197

"""Create consumer with automatic NSQ daemon discovery."""

198

199

# Consumer will automatically use lookupd for discovery

200

consumer = gnsq.Consumer(

201

topic=topic,

202

channel=channel,

203

lookupd_http_addresses=lookupd_addresses

204

)

205

206

@consumer.on_message.connect

207

def handle_message(consumer, message):

208

print(f"Processing message from {topic}: {message.body}")

209

message.finish()

210

211

return consumer

212

213

# Create consumer with lookupd discovery

214

lookupd_addresses = ['127.0.0.1:4161', '127.0.0.1:4163'] # Multiple lookupds

215

consumer = create_dynamic_consumer('events', 'processor', lookupd_addresses)

216

217

# Consumer will automatically discover and connect to NSQ daemons

218

consumer.start()

219

```

220

221

### Cluster Topology Management

222

223

```python

224

import gnsq

225

226

def manage_cluster_topology(lookupd_host='127.0.0.1', lookupd_port=4161):

227

"""Manage NSQ cluster topology via lookupd."""

228

229

lookupd = gnsq.LookupdClient(host=lookupd_host, port=lookupd_port)

230

231

# Get cluster overview

232

nodes = lookupd.nodes()

233

topics = lookupd.topics()

234

235

print(f"Cluster has {len(nodes)} nodes and {len(topics)} topics")

236

237

# List all nodes

238

print("\nNSQ Daemons:")

239

for node in nodes:

240

print(f" - {node['broadcast_address']}:{node['tcp_port']} "

241

f"(version {node['version']})")

242

243

# List topics and their channels

244

print("\nTopics and Channels:")

245

for topic in topics:

246

channels = lookupd.channels(topic)

247

print(f" - {topic}: {', '.join(channels) if channels else 'no channels'}")

248

249

# Create new topic across cluster

250

new_topic = 'cluster_events'

251

lookupd.create_topic(new_topic)

252

print(f"\nCreated topic '{new_topic}' across cluster")

253

254

# Create channel for the topic

255

lookupd.create_channel(new_topic, 'analytics')

256

print(f"Created channel 'analytics' for topic '{new_topic}'")

257

258

return lookupd

259

260

# Manage cluster

261

cluster_manager = manage_cluster_topology()

262

```

263

264

### Health Monitoring and Maintenance

265

266

```python

267

import gnsq

268

import time

269

270

def monitor_cluster_health(lookupd_addresses):

271

"""Monitor NSQ cluster health via lookupd."""

272

273

for address in lookupd_addresses:

274

host, port = address.split(':')

275

276

try:

277

lookupd = gnsq.LookupdClient(host=host, port=int(port))

278

279

# Health check

280

ping_response = lookupd.ping()

281

print(f"Lookupd {address}: {ping_response}")

282

283

# Get service info

284

info = lookupd.info()

285

print(f"Lookupd {address}: Version {info['version']}")

286

287

# Check registered nodes

288

nodes = lookupd.nodes()

289

print(f"Lookupd {address}: {len(nodes)} registered nodes")

290

291

# Check for unhealthy nodes

292

for node in nodes:

293

# Node health indicators

294

last_update = node.get('last_update', 0)

295

if time.time() - last_update > 60: # No update in 60 seconds

296

print(f"WARNING: Node {node['broadcast_address']} "

297

f"last updated {time.time() - last_update}s ago")

298

299

except Exception as e:

300

print(f"ERROR connecting to lookupd {address}: {e}")

301

302

def cleanup_stale_topics(lookupd_client, max_age_hours=24):

303

"""Clean up topics that haven't been used recently."""

304

305

topics = lookupd_client.topics()

306

307

for topic in topics:

308

# Get topic producers to check activity

309

producer_info = lookupd_client.lookup(topic)

310

311

# Check if topic has any active channels

312

channels = lookupd_client.channels(topic)

313

314

if not channels:

315

print(f"Topic '{topic}' has no channels - considering for cleanup")

316

# Additional logic to determine if topic should be deleted

317

# lookupd_client.delete_topic(topic)

318

319

# Monitor multiple lookupd instances

320

lookupd_cluster = ['127.0.0.1:4161', '127.0.0.1:4163']

321

322

while True:

323

print("=== Cluster Health Check ===")

324

monitor_cluster_health(lookupd_cluster)

325

print()

326

time.sleep(30)

327

```

328

329

### Advanced Service Discovery

330

331

```python

332

import gnsq

333

import random

334

335

class SmartProducer:

336

"""Producer with intelligent NSQ daemon selection."""

337

338

def __init__(self, lookupd_addresses, topic):

339

self.lookupd_addresses = lookupd_addresses

340

self.topic = topic

341

self.producers = {} # Cache producers by NSQ daemon

342

343

def _get_best_producer(self):

344

"""Select best NSQ daemon for production."""

345

346

# Try each lookupd until we get topology info

347

for lookupd_addr in self.lookupd_addresses:

348

try:

349

host, port = lookupd_addr.split(':')

350

lookupd = gnsq.LookupdClient(host=host, port=int(port))

351

352

# Get current topology

353

producer_info = lookupd.lookup(self.topic)

354

nsqd_nodes = producer_info['producers']

355

356

if not nsqd_nodes:

357

continue

358

359

# Select node with lowest message depth (least loaded)

360

best_node = min(nsqd_nodes, key=lambda n: n.get('depth', 0))

361

nsqd_addr = f"{best_node['broadcast_address']}:{best_node['tcp_port']}"

362

363

# Create or reuse producer for this NSQ daemon

364

if nsqd_addr not in self.producers:

365

producer = gnsq.Producer([nsqd_addr])

366

producer.start()

367

self.producers[nsqd_addr] = producer

368

369

return self.producers[nsqd_addr]

370

371

except Exception as e:

372

print(f"Failed to get topology from {lookupd_addr}: {e}")

373

continue

374

375

raise Exception("No healthy lookupd instances available")

376

377

def publish(self, message):

378

"""Publish message using best available producer."""

379

producer = self._get_best_producer()

380

producer.publish(self.topic, message)

381

382

def close(self):

383

"""Close all producers."""

384

for producer in self.producers.values():

385

producer.close()

386

producer.join()

387

388

# Usage

389

smart_producer = SmartProducer(

390

lookupd_addresses=['127.0.0.1:4161', '127.0.0.1:4163'],

391

topic='smart_events'

392

)

393

394

# Publishes will automatically select optimal NSQ daemon

395

smart_producer.publish('Event 1')

396

smart_producer.publish('Event 2')

397

398

smart_producer.close()

399

```