0
# Message Handling
1
2
Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, timeout management, and asynchronous processing control. The Message class is central to NSQ's at-least-once delivery guarantee and provides the interface for controlling the message processing lifecycle.
3
4
## Capabilities
5
6
### Message Class
7
8
Encapsulates a single NSQ message together with its metadata and provides methods for reporting processing status back to the NSQ daemon.
9
10
```python { .api }
11
class Message:
12
@property
13
def timestamp(self):
14
"""int: Timestamp assigned to the message by the NSQ daemon (nanoseconds since the Unix epoch)."""
15
16
@property
17
def attempts(self):
18
"""int: Number of times this message has been attempted for processing."""
19
20
@property
21
def id(self):
22
"""str: Unique message identifier from NSQ."""
23
24
@property
25
def body(self):
26
"""bytes: Message content/payload."""
27
28
def enable_async(self):
29
"""
30
Enable asynchronous processing for this message.
31
32
Allows the message to be processed in a separate greenlet or thread
33
after the handler returns: the consumer will not respond automatically.
34
You must call finish() or requeue() yourself; use touch() to extend the timeout.
35
"""
36
37
def is_async(self):
38
"""
39
Check if asynchronous processing has been enabled.
40
41
Returns:
42
bool: True if async processing is enabled, False otherwise
43
"""
44
45
def has_responded(self):
46
"""
47
Check if this message has been responded to.
48
49
Returns:
50
bool: True if finish(), requeue(), or another response has been sent
51
"""
52
53
def finish(self):
54
"""
55
Mark message as successfully processed.
56
57
Sends FIN command to NSQ daemon indicating successful processing.
58
Message will not be redelivered. This should be called after
59
successful processing of the message content.
60
"""
61
62
def requeue(self, time_ms=0, backoff=True):
63
"""
64
Requeue message due to processing failure.
65
66
Sends REQ command to NSQ daemon to requeue the message for
67
redelivery after the specified delay.
68
69
Parameters:
70
- time_ms (int): Milliseconds to delay before requeuing (0 for immediate)
71
- backoff (bool): Whether the consumer should enter its backoff state (throttle message handling) after this requeue
72
"""
73
74
def touch(self):
75
"""
76
Request more time to process the message.
77
78
Sends TOUCH command to reset the message timeout, preventing
79
automatic requeue. Useful for long-running message processing
80
to avoid timeout-based redelivery.
81
"""
82
```
83
84
### Message Events
85
86
Messages provide event signals for monitoring the processing lifecycle:
87
88
```python { .api }
89
# Signal properties available on Message instances
90
@property
91
def on_finish(self): ... # Emitted after message.finish() is called
92
93
@property
94
def on_requeue(self): ... # Emitted after message.requeue() is called
95
96
@property
97
def on_touch(self): ... # Emitted after message.touch() is called
98
```
99
100
## Usage Examples
101
102
### Basic Message Processing
103
104
```python
105
import gnsq
106
107
consumer = gnsq.Consumer('orders', 'processor', '127.0.0.1:4150')
108
109
@consumer.on_message.connect
110
def process_order(consumer, message):
111
try:
112
# Decode message content
113
order_data = message.body.decode('utf-8')
114
order = json.loads(order_data)
115
116
# Process the order
117
result = process_order_logic(order)
118
119
# Mark as successfully processed
120
message.finish()
121
122
except json.JSONDecodeError:
123
# Invalid JSON - don't requeue, log error
124
print(f'Invalid JSON in message {message.id}')
125
message.finish() # Discard malformed message
126
127
except TemporaryError as e:
128
# Temporary failure - requeue for retry
129
print(f'Temporary error processing {message.id}: {e}')
130
message.requeue()
131
132
except PermanentError as e:
133
# Permanent failure - don't requeue
134
print(f'Permanent error processing {message.id}: {e}')
135
message.finish() # Discard message
136
137
consumer.start()
138
```
139
140
### Asynchronous Message Processing
141
142
```python
143
import gnsq
144
import gevent
145
146
consumer = gnsq.Consumer('analytics', 'processor', '127.0.0.1:4150')
147
148
@consumer.on_message.connect
149
def handle_message(consumer, message):
150
# Enable async processing
151
message.enable_async()
152
153
# Spawn greenlet for background processing
154
gevent.spawn(process_analytics_async, message)
155
156
def process_analytics_async(message):
157
"""Process analytics message in background greenlet."""
158
try:
159
# Decode analytics event
160
event_data = json.loads(message.body.decode('utf-8'))
161
162
# Long-running analytics processing
163
result = perform_analytics_computation(event_data)
164
165
# Check if processing is taking too long
166
if processing_time > 30: # seconds
167
message.touch() # Reset timeout
168
169
# Store results
170
store_analytics_result(result)
171
172
# Mark as completed
173
message.finish()
174
175
except Exception as e:
176
print(f'Analytics processing failed: {e}')
177
# Requeue with exponential backoff
178
message.requeue(backoff=True)
179
180
consumer.start()
181
```
182
183
### Message Timeout Management
184
185
```python
186
import gnsq
187
import time
188
189
consumer = gnsq.Consumer(
190
'long_tasks',
191
'worker',
192
'127.0.0.1:4150',
193
message_timeout=60000 # 60 second timeout
194
)
195
196
@consumer.on_message.connect
197
def handle_long_task(consumer, message):
198
message.enable_async()
199
gevent.spawn(process_long_task, message)
200
201
def process_long_task(message):
202
"""Process task that may take longer than message timeout."""
203
try:
204
task_data = json.loads(message.body.decode('utf-8'))
205
206
# Start processing
207
start_time = time.time()
208
209
for step in task_data['steps']:
210
# Process each step
211
process_step(step)
212
213
# Touch message every 30 seconds to prevent timeout
214
if time.time() - start_time > 30:
215
message.touch()
216
start_time = time.time()
217
218
# Task completed successfully
219
message.finish()
220
221
except Exception as e:
222
print(f'Long task failed: {e}')
223
message.requeue()
224
225
consumer.start()
226
```
227
228
### Controlled Requeue Strategy
229
230
```python
231
import gnsq
232
import time
233
234
consumer = gnsq.Consumer('retryable_tasks', 'worker', '127.0.0.1:4150')
235
236
@consumer.on_message.connect
237
def handle_retryable_task(consumer, message):
238
try:
239
# Check attempt count to avoid infinite retries
240
if message.attempts > 5:
241
print(f'Message {message.id} exceeded max attempts, discarding')
242
message.finish()
243
return
244
245
# Process the task
246
task_data = json.loads(message.body.decode('utf-8'))
247
result = process_task(task_data)
248
249
# Success
250
message.finish()
251
252
except RetryableException as e:
253
# Calculate delay based on attempt count
254
delay_ms = min(1000 * (2 ** message.attempts), 60000) # Max 60 seconds
255
256
print(f'Retrying message {message.id} after {delay_ms}ms (attempt {message.attempts})')
257
message.requeue(time_ms=delay_ms, backoff=False)
258
259
except NonRetryableException as e:
260
print(f'Non-retryable error for message {message.id}: {e}')
261
message.finish()
262
263
consumer.start()
264
```
265
266
### Message Event Monitoring
267
268
```python
269
import gnsq
270
271
consumer = gnsq.Consumer('monitored_topic', 'worker', '127.0.0.1:4150')
272
273
# Monitor message lifecycle events
274
@consumer.on_message.connect
275
def handle_message(consumer, message):
276
# Set up message-specific event handlers
277
@message.on_finish.connect
278
def on_message_finished(message):
279
print(f'Message {message.id} finished successfully')
280
281
@message.on_requeue.connect
282
def on_message_requeued(message):
283
print(f'Message {message.id} requeued (attempt {message.attempts})')
284
285
@message.on_touch.connect
286
def on_message_touched(message):
287
print(f'Message {message.id} timeout extended')
288
289
# Process the message
290
try:
291
process_message_content(message.body)
292
message.finish()
293
except Exception:
294
message.requeue()
295
296
consumer.start()
297
```