or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdexceptions.mdindex.mdmessage-handling.mdpublisher.mdschedulers.mdschema-service.mdsubscriber.mdtypes.md

message-handling.mddocs/

0

# Message Handling

1

2

Message objects represent individual Pub/Sub messages received by subscribers. They provide access to message data, attributes, and metadata, along with methods for acknowledgment and deadline management.

3

4

## Capabilities

5

6

### Message Structure

7

8

Access message data, attributes, and metadata.

9

10

```python { .api }

11

class Message:

12

"""

13

A representation of a single Pub/Sub message.

14

15

Attributes:

16

- message_id: Unique message identifier

17

- data: Message payload as bytes

18

- attributes: Message attributes as dictionary

19

- publish_time: When message was originally published

20

- delivery_attempt: Number of delivery attempts

21

- ordering_key: Message ordering key (if any)

22

- opentelemetry_data: OpenTelemetry tracing data (if enabled)

23

"""

24

25

@property

26

def message_id(self) -> str:

27

"""

28

Unique message identifier.

29

30

Returns:

31

Message ID string

32

"""

33

34

@property

35

def data(self) -> bytes:

36

"""

37

Message payload data.

38

39

Returns:

40

Message data as bytes

41

"""

42

43

@property

44

def attributes(self) -> MutableMapping[str, str]:

45

"""

46

Message attributes.

47

48

Returns:

49

Dictionary of message attributes

50

"""

51

52

@property

53

def publish_time(self) -> Timestamp:

54

"""

55

Time when message was originally published.

56

57

Returns:

58

Protobuf Timestamp

59

"""

60

61

@property

62

def delivery_attempt(self) -> int:

63

"""

64

Number of times this message has been delivered.

65

66

Returns:

67

Delivery attempt count

68

"""

69

70

@property

71

def ordering_key(self) -> str:

72

"""

73

Message ordering key.

74

75

Returns:

76

Ordering key string, empty if no ordering key

77

"""

78

79

@property

80

def size(self) -> int:

81

"""

82

Size of the underlying message in bytes.

83

84

Returns:

85

Message size in bytes

86

"""

87

88

@property

89

def ack_id(self) -> str:

90

"""

91

Acknowledgment ID used to ack the message.

92

93

Returns:

94

Acknowledgment ID string

95

"""

96

97

@property

98

def opentelemetry_data(self) -> Optional[SubscribeOpenTelemetry]:

99

"""

100

OpenTelemetry tracing data associated with this message.

101

102

Returns:

103

OpenTelemetry data object or None if tracing not enabled

104

"""

105

```

106

107

### Message Acknowledgment

108

109

Acknowledge or negative acknowledge messages to control redelivery.

110

111

```python { .api }

112

def ack(self) -> None:

113

"""

114

Acknowledge the message.

115

116

This tells Pub/Sub that the message was successfully processed

117

and should not be redelivered.

118

"""

119

120

def nack(self) -> None:

121

"""

122

Negative acknowledge the message.

123

124

This tells Pub/Sub that the message was not successfully processed

125

and should be redelivered (subject to retry policies).

126

"""

127

128

def ack_with_response(self) -> Future:

129

"""

130

Acknowledge the message and return response future.

131

132

Returns:

133

Future that resolves when acknowledgment is processed

134

"""

135

136

def nack_with_response(self) -> Future:

137

"""

138

Negative acknowledge the message and return response future.

139

140

Returns:

141

Future that resolves when negative acknowledgment is processed

142

"""

143

```

144

145

### Deadline Management

146

147

Modify message acknowledgment deadlines to extend processing time.

148

149

```python { .api }

150

def modify_ack_deadline(self, seconds: int) -> None:

151

"""

152

Modify the acknowledgment deadline for the message.

153

154

Parameters:

155

- seconds: Number of seconds to extend the deadline

156

Must be between 0 and 600 seconds

157

Use 0 to immediately requeue the message

158

"""

159

```

160

161

### Message Utilities

162

163

Additional methods for message handling and representation.

164

165

```python { .api }

166

def __repr__(self) -> str:

167

"""

168

String representation of the message.

169

170

Returns:

171

Formatted string showing message data, ordering key, and attributes

172

"""

173

```

174

175

## Usage Examples

176

177

### Basic Message Processing

178

179

```python

180

def callback(message):

181

# Access message data

182

data = message.data.decode('utf-8')

183

print(f"Message ID: {message.message_id}")

184

print(f"Data: {data}")

185

186

# Access attributes

187

for key, value in message.attributes.items():

188

print(f"Attribute {key}: {value}")

189

190

# Acknowledge the message

191

message.ack()

192

```

193

194

### Error Handling with Negative Acknowledgment

195

196

```python

197

def callback(message):

198

try:

199

# Process the message

200

process_data(message.data)

201

202

# Acknowledge successful processing

203

message.ack()

204

205

except ProcessingError as e:

206

print(f"Processing failed: {e}")

207

208

# Negative acknowledge to trigger redelivery

209

message.nack()

210

211

except Exception as e:

212

print(f"Unexpected error: {e}")

213

214

# For unexpected errors, still nack to avoid message loss

215

message.nack()

216

```

217

218

### Extended Processing with Deadline Modification

219

220

```python

221

def callback(message):

222

print(f"Starting to process message: {message.message_id}")

223

224

try:

225

# Extend deadline before long processing

226

message.modify_ack_deadline(300) # 5 minutes

227

228

# Perform long-running operation

229

result = long_running_processing(message.data)

230

231

# Additional deadline extension if needed

232

if complex_validation_needed(result):

233

message.modify_ack_deadline(180) # 3 more minutes

234

validate_result(result)

235

236

# Acknowledge after successful processing

237

message.ack()

238

239

except Exception as e:

240

print(f"Processing failed: {e}")

241

message.nack()

242

```

243

244

### Message Metadata Analysis

245

246

```python

247

def callback(message):

248

# Analyze message metadata

249

print(f"Message ID: {message.message_id}")

250

print(f"Publish time: {message.publish_time}")

251

print(f"Delivery attempt: {message.delivery_attempt}")

252

253

if message.ordering_key:

254

print(f"Ordering key: {message.ordering_key}")

255

256

# Check for repeated deliveries

257

if message.delivery_attempt > 1:

258

print(f"Warning: Message redelivered {message.delivery_attempt} times")

259

260

# Consider dead letter queue after too many attempts

261

if message.delivery_attempt > 5:

262

print("Too many delivery attempts, sending to dead letter queue")

263

send_to_dead_letter_queue(message)

264

message.ack()

265

return

266

267

# Process the message

268

try:

269

process_message(message.data)

270

message.ack()

271

except Exception as e:

272

print(f"Processing error: {e}")

273

message.nack()

274

```

275

276

### Attribute-Based Message Routing

277

278

```python

279

def callback(message):

280

# Route messages based on attributes

281

message_type = message.attributes.get('message_type')

282

283

if message_type == 'user_event':

284

handle_user_event(message)

285

elif message_type == 'system_event':

286

handle_system_event(message)

287

elif message_type == 'error_event':

288

handle_error_event(message)

289

else:

290

print(f"Unknown message type: {message_type}")

291

# Still acknowledge unknown message types to avoid redelivery

292

message.ack()

293

294

def handle_user_event(message):

295

user_id = message.attributes.get('user_id')

296

event_data = message.data.decode('utf-8')

297

298

try:

299

process_user_event(user_id, event_data)

300

message.ack()

301

except Exception as e:

302

print(f"Failed to process user event: {e}")

303

message.nack()

304

```

305

306

### Asynchronous Acknowledgment

307

308

```python

309

def callback(message):

310

# Process message asynchronously with response futures

311

try:

312

# Start processing

313

process_message_async(message.data)

314

315

# Acknowledge with response tracking

316

ack_future = message.ack_with_response()

317

ack_future.add_done_callback(lambda f: print(f"Ack completed for {message.message_id}"))

318

319

except Exception as e:

320

print(f"Processing failed: {e}")

321

322

# Negative acknowledge with response tracking

323

nack_future = message.nack_with_response()

324

nack_future.add_done_callback(lambda f: print(f"Nack completed for {message.message_id}"))

325

```