or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-messaging.mdindex.mdlookupd-integration.mdmessage-handling.mdnsqd-clients.mdutilities-errors.md

message-handling.mddocs/

0

# Message Handling

1

2

Message objects represent individual messages received from NSQ, providing methods for acknowledgment, requeuing, timeout management, and asynchronous processing control. The Message class is central to NSQ's at-least-once delivery guarantee and provides the interface for controlling message processing lifecycle.

3

4

## Capabilities

5

6

### Message Class

7

8

Encapsulates NSQ messages with metadata and provides methods for responding to the NSQ daemon about processing status.

9

10

```python { .api }

11

class Message:

12

@property

13

def timestamp(self):

14

"""int: Message timestamp from NSQ daemon."""

15

16

@property

17

def attempts(self):

18

"""int: Number of times this message has been attempted for processing."""

19

20

@property

21

def id(self):

22

"""str: Unique message identifier from NSQ."""

23

24

@property

25

def body(self):

26

"""bytes: Message content/payload."""

27

28

def enable_async(self):

29

"""

30

Enable asynchronous processing for this message.

31

32

Allows the message to be processed in a separate greenlet or thread

33

while preventing automatic timeout. Must call finish(), requeue(),

34

or touch() manually when using async mode.

35

"""

36

37

def is_async(self):

38

"""

39

Check if asynchronous processing has been enabled.

40

41

Returns:

42

bool: True if async processing is enabled, False otherwise

43

"""

44

45

def has_responded(self):

46

"""

47

Check if this message has been responded to.

48

49

Returns:

50

bool: True if finish(), requeue(), or another response has been sent

51

"""

52

53

def finish(self):

54

"""

55

Mark message as successfully processed.

56

57

Sends FIN command to NSQ daemon indicating successful processing.

58

Message will not be redelivered. This should be called after

59

successful processing of the message content.

60

"""

61

62

def requeue(self, time_ms=0, backoff=True):

63

"""

64

Requeue message due to processing failure.

65

66

Sends REQ command to NSQ daemon to requeue the message for

67

redelivery after the specified delay.

68

69

Parameters:

70

- time_ms (int): Milliseconds to delay before requeuing (0 for immediate)

71

- backoff (bool): Whether to apply exponential backoff delay

72

"""

73

74

def touch(self):

75

"""

76

Request more time to process the message.

77

78

Sends TOUCH command to reset the message timeout, preventing

79

automatic requeue. Useful for long-running message processing

80

to avoid timeout-based redelivery.

81

"""

82

```

83

84

### Message Events

85

86

Messages provide event signals for monitoring processing lifecycle:

87

88

```python { .api }

89

# Signal properties available on Message instances

90

@property

91

def on_finish(self): ... # Emitted after message.finish() is called

92

93

@property

94

def on_requeue(self): ... # Emitted after message.requeue() is called

95

96

@property

97

def on_touch(self): ... # Emitted after message.touch() is called

98

```

99

100

## Usage Examples

101

102

### Basic Message Processing

103

104

```python

105

import gnsq

106

107

consumer = gnsq.Consumer('orders', 'processor', '127.0.0.1:4150')

108

109

@consumer.on_message.connect

110

def process_order(consumer, message):

111

try:

112

# Decode message content

113

order_data = message.body.decode('utf-8')

114

order = json.loads(order_data)

115

116

# Process the order

117

result = process_order_logic(order)

118

119

# Mark as successfully processed

120

message.finish()

121

122

except json.JSONDecodeError:

123

# Invalid JSON - don't requeue, log error

124

print(f'Invalid JSON in message {message.id}')

125

message.finish() # Discard malformed message

126

127

except TemporaryError as e:

128

# Temporary failure - requeue for retry

129

print(f'Temporary error processing {message.id}: {e}')

130

message.requeue()

131

132

except PermanentError as e:

133

# Permanent failure - don't requeue

134

print(f'Permanent error processing {message.id}: {e}')

135

message.finish() # Discard message

136

137

consumer.start()

138

```

139

140

### Asynchronous Message Processing

141

142

```python

143

import gnsq

144

import gevent

145

146

consumer = gnsq.Consumer('analytics', 'processor', '127.0.0.1:4150')

147

148

@consumer.on_message.connect

149

def handle_message(consumer, message):

150

# Enable async processing

151

message.enable_async()

152

153

# Spawn greenlet for background processing

154

gevent.spawn(process_analytics_async, message)

155

156

def process_analytics_async(message):

157

"""Process analytics message in background greenlet."""

158

try:

159

# Decode analytics event

160

event_data = json.loads(message.body.decode('utf-8'))

161

162

# Long-running analytics processing

163

result = perform_analytics_computation(event_data)

164

165

# Check if processing is taking too long

166

if processing_time > 30: # seconds

167

message.touch() # Reset timeout

168

169

# Store results

170

store_analytics_result(result)

171

172

# Mark as completed

173

message.finish()

174

175

except Exception as e:

176

print(f'Analytics processing failed: {e}')

177

# Requeue with exponential backoff

178

message.requeue(backoff=True)

179

180

consumer.start()

181

```

182

183

### Message Timeout Management

184

185

```python

186

import gnsq

187

import time

188

189

consumer = gnsq.Consumer(

190

'long_tasks',

191

'worker',

192

'127.0.0.1:4150',

193

message_timeout=60000 # 60 second timeout

194

)

195

196

@consumer.on_message.connect

197

def handle_long_task(consumer, message):

198

message.enable_async()

199

gevent.spawn(process_long_task, message)

200

201

def process_long_task(message):

202

"""Process task that may take longer than message timeout."""

203

try:

204

task_data = json.loads(message.body.decode('utf-8'))

205

206

# Start processing

207

start_time = time.time()

208

209

for step in task_data['steps']:

210

# Process each step

211

process_step(step)

212

213

# Touch message every 30 seconds to prevent timeout

214

if time.time() - start_time > 30:

215

message.touch()

216

start_time = time.time()

217

218

# Task completed successfully

219

message.finish()

220

221

except Exception as e:

222

print(f'Long task failed: {e}')

223

message.requeue()

224

225

consumer.start()

226

```

227

228

### Controlled Requeue Strategy

229

230

```python

231

import gnsq

232

import time

233

234

consumer = gnsq.Consumer('retryable_tasks', 'worker', '127.0.0.1:4150')

235

236

@consumer.on_message.connect

237

def handle_retryable_task(consumer, message):

238

try:

239

# Check attempt count to avoid infinite retries

240

if message.attempts > 5:

241

print(f'Message {message.id} exceeded max attempts, discarding')

242

message.finish()

243

return

244

245

# Process the task

246

task_data = json.loads(message.body.decode('utf-8'))

247

result = process_task(task_data)

248

249

# Success

250

message.finish()

251

252

except RetryableException as e:

253

# Calculate delay based on attempt count

254

delay_ms = min(1000 * (2 ** message.attempts), 60000) # Max 60 seconds

255

256

print(f'Retrying message {message.id} after {delay_ms}ms (attempt {message.attempts})')

257

message.requeue(time_ms=delay_ms, backoff=False)

258

259

except NonRetryableException as e:

260

print(f'Non-retryable error for message {message.id}: {e}')

261

message.finish()

262

263

consumer.start()

264

```

265

266

### Message Event Monitoring

267

268

```python

269

import gnsq

270

271

consumer = gnsq.Consumer('monitored_topic', 'worker', '127.0.0.1:4150')

272

273

# Monitor message lifecycle events

274

@consumer.on_message.connect

275

def handle_message(consumer, message):

276

# Set up message-specific event handlers

277

@message.on_finish.connect

278

def on_message_finished(message):

279

print(f'Message {message.id} finished successfully')

280

281

@message.on_requeue.connect

282

def on_message_requeued(message):

283

print(f'Message {message.id} requeued (attempt {message.attempts})')

284

285

@message.on_touch.connect

286

def on_message_touched(message):

287

print(f'Message {message.id} timeout extended')

288

289

# Process the message

290

try:

291

process_message_content(message.body)

292

message.finish()

293

except Exception:

294

message.requeue()

295

296

consumer.start()

297

```