# Core Messaging

High-level producer and consumer classes that provide the primary interface for publishing and consuming messages from NSQ topics. These classes handle connection management, automatic nsqlookupd discovery, and provide convenient event-driven APIs for most messaging applications.

## Capabilities

### Producer

Publishes messages to NSQ topics with support for single and batch message publishing, connection pooling, and automatic retry logic.

```python { .api }
class Producer:
    def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128, **kwargs):
        """
        Initialize a Producer for publishing messages to NSQ.

        Parameters:
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses
        - max_backoff_duration (int): Maximum backoff duration in seconds
        - **kwargs: Additional connection parameters (tls_v1, compression, etc.)
        """

    def start(self):
        """Start discovering and listening to connections."""

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds
        - raise_error (bool): Whether to raise exceptions on timeout
        """

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def publish(self, topic, data, defer=None, block=True, timeout=None, raise_error=True):
        """
        Publish a single message to a topic.

        Parameters:
        - topic (str): Topic name to publish to
        - data (str or bytes): Message data
        - defer (int, optional): Milliseconds to defer message delivery
        - block (bool): Whether to block until publish completes
        - timeout (float, optional): Maximum time to wait for publish
        - raise_error (bool): Whether to raise exceptions on failure
        """

    def multipublish(self, topic, messages, block=True, timeout=None, raise_error=True):
        """
        Publish multiple messages to a topic in a single operation.

        Parameters:
        - topic (str): Topic name to publish to
        - messages (list): List of message data (str or bytes)
        - block (bool): Whether to block until publish completes
        - timeout (float, optional): Maximum time to wait for publish
        - raise_error (bool): Whether to raise exceptions on failure
        """

    @property
    def is_running(self):
        """bool: Check if producer is currently active."""
```

#### Producer Events

Producers support event signals for monitoring connection and operational status:

```python { .api }
# Signal properties available on Producer instances
@property
def on_response(self): ...  # Emitted on successful responses

@property
def on_error(self): ...  # Emitted on error responses

@property
def on_auth(self): ...  # Emitted on authentication events

@property
def on_close(self): ...  # Emitted when connections close
```

### Consumer

Consumes messages from NSQ topics with support for automatic nsqlookupd discovery, configurable concurrency, message acknowledgment patterns, and comprehensive event handling.

```python { .api }
class Consumer:
    def __init__(self, topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], **kwargs):
        """
        Initialize a Consumer for receiving messages from NSQ.

        Parameters:
        - topic (str): Topic name to consume from
        - channel (str): Channel name for this consumer
        - nsqd_tcp_addresses (list): List of 'host:port' NSQ daemon addresses
        - lookupd_http_addresses (list): List of 'host:port' lookupd addresses
        - **kwargs: Additional options (max_in_flight, message_timeout, etc.)
        """

    def start(self, block=True):
        """
        Start discovering and listening to connections.

        Parameters:
        - block (bool): Whether to block execution until consumer stops
        """

    def close(self):
        """Immediately close all connections and stop workers."""

    def join(self, timeout=None, raise_error=False):
        """
        Block until all connections close and workers stop.

        Parameters:
        - timeout (float, optional): Maximum time to wait in seconds
        - raise_error (bool): Whether to raise exceptions on timeout
        """

    def query_nsqd(self):
        """Connect to specified NSQ daemon TCP addresses."""

    def query_lookupd(self):
        """Query lookup daemon for topic producers."""

    def connect_to_nsqd(self, address, port):
        """
        Establish connection to a specific NSQ daemon.

        Parameters:
        - address (str): NSQ daemon host address
        - port (int): NSQ daemon port number
        """

    def redistribute_ready_state(self):
        """Trigger redistribution of message processing readiness across connections."""

    @property
    def is_running(self):
        """bool: Check if consumer is currently active."""

    @property
    def is_starved(self):
        """bool: Determine if connections are starved for messages."""

    @property
    def total_ready_count(self):
        """int: Total ready message count across all connections."""

    @property
    def total_in_flight(self):
        """int: Total messages currently being processed."""
```

#### Consumer Events

Consumers provide comprehensive event handling for the message processing lifecycle:

```python { .api }
# Signal properties available on Consumer instances
@property
def on_message(self): ...  # Emitted when messages are received

@property
def on_response(self): ...  # Emitted on successful responses

@property
def on_error(self): ...  # Emitted on error responses

@property
def on_finish(self): ...  # Emitted when messages are finished

@property
def on_requeue(self): ...  # Emitted when messages are requeued

@property
def on_giving_up(self): ...  # Emitted when giving up on messages

@property
def on_auth(self): ...  # Emitted on authentication events

@property
def on_exception(self): ...  # Emitted on exceptions

@property
def on_close(self): ...  # Emitted when connections close
```

### Reader (Deprecated)

Legacy consumer class with built-in concurrency support. Use the Consumer class instead for new applications.

```python { .api }
class Reader:
    def __init__(self, *args, **kwargs):
        """
        Initialize Reader (deprecated).

        Use Consumer class instead. Sets up concurrency settings
        and creates message queue if max_concurrency is specified.

        Parameters:
        - *args, **kwargs: Same as Consumer parameters
        """

    def start(self, *args, **kwargs):
        """
        Start reader with worker threads.

        Spawns worker threads based on max_concurrency setting
        and calls parent Consumer start method.
        """

    def handle_message(self, conn, message):
        """
        Handle incoming message.

        Queues messages if max_concurrency is set, otherwise
        calls parent class message handling directly.
        """

    def publish(self, topic, message):
        """
        Publish message (deprecated).

        Publishes message to a random NSQ connection.
        Use Producer class instead for publishing.

        Parameters:
        - topic (str): Topic name
        - message (str or bytes): Message data

        Raises:
        NSQNoConnections: If no connections are available
        """
```

## Usage Examples

### Basic Producer Usage

```python
import gnsq

# Create producer with multiple NSQ daemons
producer = gnsq.Producer([
    '127.0.0.1:4150',
    '127.0.0.1:4152'
])

producer.start()

# Publish single message
producer.publish('events', 'user_signup:12345')

# Publish batch of messages
events = [
    'user_login:12345',
    'page_view:/dashboard',
    'user_logout:12345'
]
producer.multipublish('events', events)

producer.close()
producer.join()
```

### Consumer with Event Handling

```python
import gnsq

# Create consumer with lookupd discovery
consumer = gnsq.Consumer(
    'events',
    'analytics',
    lookupd_http_addresses=['127.0.0.1:4161']
)

# Message handler
@consumer.on_message.connect
def handle_message(consumer, message):
    try:
        # Process the message
        event_data = message.body.decode('utf-8')
        print(f'Processing: {event_data}')

        # Simulate processing work
        process_event(event_data)

        # Mark as successfully processed
        message.finish()

    except Exception as e:
        print(f'Error processing message: {e}')
        # Requeue for retry (with exponential backoff)
        message.requeue()

# Error handler
@consumer.on_error.connect
def handle_error(consumer, error):
    print(f'Consumer error: {error}')

# Start consuming
consumer.start()
```

### Advanced Consumer Configuration

```python
import gevent
import gnsq

# Consumer with advanced options
consumer = gnsq.Consumer(
    'high_volume_topic',
    'worker_pool',
    nsqd_tcp_addresses=['127.0.0.1:4150'],
    max_in_flight=100,          # Process up to 100 messages concurrently
    message_timeout=60000,      # 60 second message timeout
    max_backoff_duration=128,   # Maximum backoff time
    tls_v1=True,                # Enable TLS
    compression='deflate'       # Enable compression
)

@consumer.on_message.connect
def handle_high_volume_message(consumer, message):
    # Enable async processing for this message
    message.enable_async()

    # Spawn a greenlet to handle processing
    gevent.spawn(process_message_async, message)

def process_message_async(message):
    try:
        # Long-running processing
        result = heavy_computation(message.body)

        # Touch message to extend timeout if needed
        if processing_taking_long():
            message.touch()

        save_result(result)
        message.finish()

    except Exception as e:
        # Requeue with backoff
        message.requeue(backoff=True)

consumer.start()
```