or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-messaging.mdindex.mdlookupd-integration.mdmessage-handling.mdnsqd-clients.mdutilities-errors.md

nsqd-clients.mddocs/

0

# NSQ Daemon Clients

1

2

Low-level clients for direct communication with NSQ daemons via TCP and HTTP protocols. These clients provide fine-grained control over NSQ operations, administrative functions, and are used internally by the higher-level Producer and Consumer classes.

3

4

## Capabilities

5

6

### NsqdTCPClient

7

8

Low-level TCP client for NSQ daemon protocol communication. Provides direct access to NSQ's binary protocol for publishing, subscribing, and connection management.

9

10

```python { .api }

11

class NsqdTCPClient:

12

def connect(self):

13

"""Establish TCP connection to NSQ daemon."""

14

15

def close_stream(self):

16

"""Close the TCP connection stream."""

17

18

def send(self):

19

"""Send command to NSQ daemon over TCP connection."""

20

21

def read_response(self):

22

"""Read response from NSQ daemon."""

23

24

def subscribe(self):

25

"""Subscribe to a topic and channel."""

26

27

def publish(self):

28

"""Publish a message to a topic."""

29

30

def multipublish(self):

31

"""Publish multiple messages to a topic in batch."""

32

33

def ready(self):

34

"""Signal readiness to receive messages."""

35

36

def finish(self):

37

"""Mark a message as finished (FIN command)."""

38

39

def requeue(self):

40

"""Requeue a message (REQ command)."""

41

42

def touch(self):

43

"""Touch a message to reset timeout (TOUCH command)."""

44

45

def close(self):

46

"""Close connection (CLS command)."""

47

48

def nop(self):

49

"""Send no-operation command (NOP)."""

50

51

def identify(self):

52

"""Send client identification (IDENTIFY command)."""

53

54

def auth(self):

55

"""Authenticate with NSQ daemon (AUTH command)."""

56

```

57

58

### NsqdHTTPClient

59

60

HTTP client for NSQ daemon administrative operations. Provides RESTful interface for topic/channel management, statistics, and publishing via HTTP.

61

62

```python { .api }

63

class NsqdHTTPClient:

64

def publish(self):

65

"""

66

Publish a message via HTTP.

67

68

HTTP endpoint for publishing single messages to topics.

69

Useful for non-persistent connections or web applications.

70

"""

71

72

def multipublish(self):

73

"""

74

Publish multiple messages via HTTP.

75

76

HTTP endpoint for batch publishing multiple messages

77

to a topic in a single request.

78

"""

79

80

def create_topic(self):

81

"""

82

Create a new topic.

83

84

Administrative function to create topics on the NSQ daemon.

85

Topics are created automatically on first publish, but this

86

allows explicit creation.

87

"""

88

89

def delete_topic(self):

90

"""

91

Delete an existing topic.

92

93

Administrative function to delete topics and all associated

94

channels and messages from the NSQ daemon.

95

"""

96

97

def create_channel(self):

98

"""

99

Create a new channel within a topic.

100

101

Administrative function to create channels. Channels are

102

created automatically when consumers connect, but this

103

allows explicit creation.

104

"""

105

106

def delete_channel(self):

107

"""

108

Delete an existing channel.

109

110

Administrative function to delete channels and all

111

associated messages from a topic.

112

"""

113

114

def empty_topic(self):

115

"""

116

Remove all messages from a topic.

117

118

Administrative function to clear all messages from

119

all channels in a topic without deleting the topic.

120

"""

121

122

def empty_channel(self):

123

"""

124

Remove all messages from a channel.

125

126

Administrative function to clear all messages from

127

a specific channel without deleting the channel.

128

"""

129

130

def pause_topic(self):

131

"""

132

Pause message delivery for a topic.

133

134

Administrative function to pause all message delivery

135

for all channels in a topic.

136

"""

137

138

def unpause_topic(self):

139

"""

140

Resume message delivery for a topic.

141

142

Administrative function to resume message delivery

143

for a previously paused topic.

144

"""

145

146

def pause_channel(self):

147

"""

148

Pause message delivery for a channel.

149

150

Administrative function to pause message delivery

151

for a specific channel within a topic.

152

"""

153

154

def unpause_channel(self):

155

"""

156

Resume message delivery for a channel.

157

158

Administrative function to resume message delivery

159

for a previously paused channel.

160

"""

161

162

def stats(self):

163

"""

164

Get NSQ daemon statistics.

165

166

Returns comprehensive statistics about topics, channels,

167

connections, and message counts from the NSQ daemon.

168

169

Returns:

170

dict: Statistics data including topics, channels, and metrics

171

"""

172

173

def ping(self):

174

"""

175

Ping the NSQ daemon.

176

177

Health check endpoint to verify NSQ daemon is responding.

178

179

Returns:

180

str: Response indicating daemon status

181

"""

182

183

def info(self):

184

"""

185

Get NSQ daemon information.

186

187

Returns daemon configuration and version information.

188

189

Returns:

190

dict: Daemon configuration and version details

191

"""

192

```

193

194

### Nsqd (Deprecated)

195

196

Legacy NSQ daemon client interface. Use NsqdTCPClient or NsqdHTTPClient instead.

197

198

```python { .api }

199

class Nsqd:

200

def publish(self):

201

"""Publish message (deprecated - use NsqdTCPClient.publish)."""

202

203

def multipublish(self):

204

"""Publish multiple messages (deprecated - use NsqdTCPClient.multipublish)."""

205

```

206

207

## Usage Examples

208

209

### Direct TCP Publishing

210

211

```python

212

import gnsq

213

214

# Create TCP client for direct publishing

215

tcp_client = gnsq.NsqdTCPClient()

216

217

# Configure connection parameters

218

tcp_client.configure(

219

address='127.0.0.1',

220

port=4150,

221

tls_v1=False,

222

compression=None

223

)

224

225

# Connect to NSQ daemon

226

tcp_client.connect()

227

228

# Identify client

229

tcp_client.identify()

230

231

# Publish messages directly

232

tcp_client.publish('events', b'user_action_1')

233

tcp_client.multipublish('events', [b'event_1', b'event_2', b'event_3'])

234

235

# Close connection

236

tcp_client.close()

237

```

238

239

### Administrative Operations via HTTP

240

241

```python

242

import gnsq

243

244

# Create HTTP client for admin operations

245

http_client = gnsq.NsqdHTTPClient(

246

host='127.0.0.1',

247

port=4151 # NSQ HTTP port

248

)

249

250

# Create topic and channel

251

http_client.create_topic('new_events')

252

http_client.create_channel('new_events', 'processor')

253

254

# Get daemon statistics

255

stats = http_client.stats()

256

print(f"Topics: {len(stats['topics'])}")

257

print(f"Total messages: {stats['total_messages']}")

258

259

# Publish via HTTP (useful for web applications)

260

http_client.publish('new_events', 'HTTP published message')

261

262

# Administrative pause/resume

263

http_client.pause_channel('new_events', 'processor')

264

# ... do maintenance ...

265

http_client.unpause_channel('new_events', 'processor')

266

267

# Cleanup

268

http_client.empty_topic('new_events')

269

http_client.delete_topic('new_events')

270

```

271

272

### Low-level Consumer Implementation

273

274

```python

275

import gnsq

276

import gevent

277

278

# Custom consumer using TCP client directly

279

class CustomConsumer:

280

def __init__(self, topic, channel, nsqd_address):

281

self.topic = topic

282

self.channel = channel

283

self.client = gnsq.NsqdTCPClient()

284

host, port = nsqd_address.split(':')

285

self.client.configure(address=host, port=int(port))

286

287

def start(self):

288

# Connect and identify

289

self.client.connect()

290

self.client.identify()

291

292

# Subscribe to topic/channel

293

self.client.subscribe(self.topic, self.channel)

294

295

# Signal ready for messages

296

self.client.ready(1) # Ready for 1 message

297

298

# Start message processing loop

299

gevent.spawn(self._message_loop)

300

301

def _message_loop(self):

302

while True:

303

# Read response from NSQ

304

response = self.client.read_response()

305

306

if response['type'] == 'message':

307

# Process the message

308

message = response['data']

309

try:

310

self._handle_message(message)

311

# Send finish command

312

self.client.finish(message['id'])

313

except Exception as e:

314

# Send requeue command

315

self.client.requeue(message['id'])

316

317

# Ready for next message

318

self.client.ready(1)

319

320

def _handle_message(self, message):

321

# Custom message processing logic

322

print(f"Processing: {message['body']}")

323

324

# Usage

325

consumer = CustomConsumer('events', 'custom_processor', '127.0.0.1:4150')

326

consumer.start()

327

```

328

329

### Monitoring and Health Checks

330

331

```python

332

import gnsq

333

import time

334

335

def monitor_nsq_cluster(nsqd_addresses):

336

"""Monitor multiple NSQ daemons for health and statistics."""

337

338

for address in nsqd_addresses:

339

host, port = address.split(':')

340

http_port = int(port) + 1 # HTTP port is typically TCP port + 1

341

342

try:

343

# Create HTTP client for this daemon

344

client = gnsq.NsqdHTTPClient(host=host, port=http_port)

345

346

# Health check

347

ping_response = client.ping()

348

print(f"{address}: {ping_response}")

349

350

# Get daemon info

351

info = client.info()

352

print(f"{address}: Version {info['version']}")

353

354

# Get statistics

355

stats = client.stats()

356

print(f"{address}: {len(stats['topics'])} topics, "

357

f"{stats['total_messages']} total messages")

358

359

# Check for unhealthy topics/channels

360

for topic in stats['topics']:

361

for channel in topic['channels']:

362

if channel['depth'] > 10000: # High message backlog

363

print(f"WARNING: {topic['name']}/{channel['name']} "

364

f"has {channel['depth']} pending messages")

365

366

except Exception as e:

367

print(f"ERROR connecting to {address}: {e}")

368

369

# Monitor cluster every 30 seconds

370

nsqd_cluster = ['127.0.0.1:4150', '127.0.0.1:4152', '127.0.0.1:4154']

371

372

while True:

373

monitor_nsq_cluster(nsqd_cluster)

374

time.sleep(30)

375

```