or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-messaging.mdindex.mdlookupd-integration.mdmessage-handling.mdnsqd-clients.mdutilities-errors.md

core-messaging.mddocs/

0

# Core Messaging

1

2

High-level producer and consumer classes that provide the primary interface for publishing and consuming messages from NSQ topics. These classes handle connection management, automatic nsqlookupd discovery, and provide convenient event-driven APIs for most messaging applications.

3

4

## Capabilities

5

6

### Producer

7

8

Publishes messages to NSQ topics with support for single and batch message publishing, connection pooling, and automatic retry logic.

9

10

```python { .api }

11

class Producer:

12

def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs):

13

"""

14

Initialize a Producer for publishing messages to NSQ.

15

16

Parameters:

17

- nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses

18

- max_backoff_duration (int): Maximum backoff duration in seconds

19

- **kwargs: Additional connection parameters (tls_v1, compression, etc.)

20

"""

21

22

def start(self):

23

"""Start discovering and listening to connections."""

24

25

def close(self):

26

"""Immediately close all connections and stop workers."""

27

28

def join(self, timeout=None, raise_error=False):

29

"""

30

Block until all connections close and workers stop.

31

32

Parameters:

33

- timeout (float, optional): Maximum time to wait in seconds

34

- raise_error (bool): Whether to raise exceptions on timeout

35

"""

36

37

def connect_to_nsqd(self, address, port):

38

"""

39

Establish connection to a specific NSQ daemon.

40

41

Parameters:

42

- address (str): NSQ daemon host address

43

- port (int): NSQ daemon port number

44

"""

45

46

def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True):

47

"""

48

Publish a single message to a topic.

49

50

Parameters:

51

- topic (str): Topic name to publish to

52

- data (str or bytes): Message data

53

- defer (int, optional): Milliseconds to defer message delivery

54

- block (bool): Whether to block until publish completes

55

- timeout (float, optional): Maximum time to wait for publish

56

- raise_error (bool): Whether to raise exceptions on failure

57

"""

58

59

def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True):

60

"""

61

Publish multiple messages to a topic in a single operation.

62

63

Parameters:

64

- topic (str): Topic name to publish to

65

- messages (list): List of message data (str or bytes)

66

- block (bool): Whether to block until publish completes

67

- timeout (float, optional): Maximum time to wait for publish

68

- raise_error (bool): Whether to raise exceptions on failure

69

"""

70

71

@property

72

def is_running(self):

73

"""bool: Check if producer is currently active."""

74

```

75

76

#### Producer Events

77

78

Producers support event signals for monitoring connection and operational status:

79

80

```python { .api }

81

# Signal properties available on Producer instances

82

@property

83

def on_response(self): ... # Emitted on successful responses

84

85

@property

86

def on_error(self): ... # Emitted on error responses

87

88

@property

89

def on_auth(self): ... # Emitted on authentication events

90

91

@property

92

def on_close(self): ... # Emitted when connections close

93

```

94

95

### Consumer

96

97

Consumes messages from NSQ topics with support for automatic nsqlookupd discovery, configurable concurrency, message acknowledgment patterns, and comprehensive event handling.

98

99

```python { .api }

100

class Consumer:

101

def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs):

102

"""

103

Initialize a Consumer for receiving messages from NSQ.

104

105

Parameters:

106

- topic (str): Topic name to consume from

107

- channel (str): Channel name for this consumer

108

- nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses

109

- lookupd_http_addresses (list): List of 'host:port' lookupd addresses

110

- **kwargs: Additional options (max_in_flight, message_timeout, etc.)

111

"""

112

113

def start(self, block=True):

114

"""

115

Start discovering and listening to connections.

116

117

Parameters:

118

- block (bool): Whether to block execution until consumer stops

119

"""

120

121

def close(self):

122

"""Immediately close all connections and stop workers."""

123

124

def join(self, timeout=None, raise_error=False):

125

"""

126

Block until all connections close and workers stop.

127

128

Parameters:

129

- timeout (float, optional): Maximum time to wait in seconds

130

- raise_error (bool): Whether to raise exceptions on timeout

131

"""

132

133

def query_nsqd(self):

134

"""Connect to specified NSQ daemon TCP addresses."""

135

136

def query_lookupd(self):

137

"""Query lookup daemon for topic producers."""

138

139

def connect_to_nsqd(self, address, port):

140

"""

141

Establish connection to a specific NSQ daemon.

142

143

Parameters:

144

- address (str): NSQ daemon host address

145

- port (int): NSQ daemon port number

146

"""

147

148

def redistribute_ready_state(self):

149

"""Trigger redistribution of message processing readiness across connections."""

150

151

@property

152

def is_running(self):

153

"""bool: Check if consumer is active."""

154

155

@property

156

def is_starved(self):

157

"""bool: Determine if connections are starved for messages."""

158

159

@property

160

def total_ready_count(self):

161

"""int: Total ready message count across all connections."""

162

163

@property

164

def total_in_flight(self):

165

"""int: Total messages currently being processed."""

166

```

167

168

#### Consumer Events

169

170

Consumers provide comprehensive event handling for message processing lifecycle:

171

172

```python { .api }

173

# Signal properties available on Consumer instances

174

@property

175

def on_message(self): ... # Emitted when messages are received

176

177

@property

178

def on_response(self): ... # Emitted on successful responses

179

180

@property

181

def on_error(self): ... # Emitted on error responses

182

183

@property

184

def on_finish(self): ... # Emitted when messages are finished

185

186

@property

187

def on_requeue(self): ... # Emitted when messages are requeued

188

189

@property

190

def on_giving_up(self): ... # Emitted when giving up on messages

191

192

@property

193

def on_auth(self): ... # Emitted on authentication events

194

195

@property

196

def on_exception(self): ... # Emitted on exceptions

197

198

@property

199

def on_close(self): ... # Emitted when connections close

200

```

201

202

### Reader (Deprecated)

203

204

Legacy consumer class with built-in concurrency support. Use Consumer class instead for new applications.

205

206

```python { .api }

207

class Reader:

208

def __init__(self, *args, **kwargs):

209

"""

210

Initialize Reader (deprecated).

211

212

Use Consumer class instead. Sets up concurrency settings

213

and creates message queue if max_concurrency is specified.

214

215

Parameters:

216

- *args, **kwargs: Same as Consumer parameters

217

"""

218

219

def start(self, *args, **kwargs):

220

"""

221

Start reader with worker threads.

222

223

Spawns worker threads based on max_concurrency setting

224

and calls parent Consumer start method.

225

"""

226

227

def handle_message(self, conn, message):

228

"""

229

Handle incoming message.

230

231

Queues messages if max_concurrency is set, otherwise

232

calls parent class message handling directly.

233

"""

234

235

def publish(self, topic, message):

236

"""

237

Publish message (deprecated).

238

239

Publishes message to a random NSQ connection.

240

Use Producer class instead for publishing.

241

242

Parameters:

243

- topic (str): Topic name

244

- message (str or bytes): Message data

245

246

Raises:

247

NSQNoConnections: If no connections are available

248

"""

249

```

250

251

## Usage Examples

252

253

### Basic Producer Usage

254

255

```python

256

import gnsq

257

258

# Create producer with multiple NSQ daemons

259

producer = gnsq.Producer([

260

'127.0.0.1:4150',

261

'127.0.0.1:4152'

262

])

263

264

producer.start()

265

266

# Publish single message

267

producer.publish('events', 'user_signup:12345')

268

269

# Publish batch of messages

270

events = [

271

'user_login:12345',

272

'page_view:/dashboard',

273

'user_logout:12345'

274

]

275

producer.multipublish('events', events)

276

277

producer.close()

278

producer.join()

279

```

280

281

### Consumer with Event Handling

282

283

```python

284

import gnsq

285

286

# Create consumer with lookupd discovery

287

consumer = gnsq.Consumer(

288

'events',

289

'analytics',

290

lookupd_http_addresses=['127.0.0.1:4161']

291

)

292

293

# Message handler

294

@consumer.on_message.connect

295

def handle_message(consumer, message):

296

try:

297

# Process the message

298

event_data = message.body.decode('utf-8')

299

print(f'Processing: {event_data}')

300

301

# Simulate processing work

302

process_event(event_data)

303

304

# Mark as successfully processed

305

message.finish()

306

307

except Exception as e:

308

print(f'Error processing message: {e}')

309

# Requeue for retry (with exponential backoff)

310

message.requeue()

311

312

# Error handler

313

@consumer.on_error.connect

314

def handle_error(consumer, error):

315

print(f'Consumer error: {error}')

316

317

# Start consuming

318

consumer.start()

319

```

320

321

### Advanced Consumer Configuration

322

323

```python

324

import gnsq

325

326

# Consumer with advanced options

327

consumer = gnsq.Consumer(

328

'high_volume_topic',

329

'worker_pool',

330

nsqd_tcp_addresses=['127.0.0.1:4150'],

331

max_in_flight=100, # Process up to 100 messages concurrently

332

message_timeout=60000, # 60 second message timeout

333

max_backoff_duration=128, # Maximum backoff time

334

tls_v1=True, # Enable TLS

335

compression='deflate' # Enable compression

336

)

337

338

@consumer.on_message.connect

339

def handle_high_volume_message(consumer, message):

340

# Enable async processing for this message

341

message.enable_async()

342

343

# Spawn a greenlet to handle processing

344

gevent.spawn(process_message_async, message)

345

346

def process_message_async(message):

347

try:

348

# Long-running processing

349

result = heavy_computation(message.body)

350

351

# Touch message to extend timeout if needed

352

if processing_taking_long():

353

message.touch()

354

355

save_result(result)

356

message.finish()

357

358

except Exception as e:

359

# Requeue with backoff

360

message.requeue(backoff=True)

361

362

consumer.start()

363

```