# Message Handling

Message objects represent individual Pub/Sub messages received by subscribers. They provide access to message data, attributes, and metadata, along with methods for acknowledgment and deadline management.

## Capabilities

### Message Structure

Access message data, attributes, and metadata.

```python { .api }
class Message:
    """
    A representation of a single Pub/Sub message.

    Attributes:
    - message_id: Unique message identifier
    - data: Message payload as bytes
    - attributes: Message attributes as dictionary
    - publish_time: When message was originally published
    - delivery_attempt: Number of delivery attempts (None without a dead-letter policy)
    - ordering_key: Message ordering key (if any)
    - opentelemetry_data: OpenTelemetry tracing data (if enabled)
    """

    @property
    def message_id(self) -> str:
        """
        Unique message identifier.

        Returns:
            Message ID string
        """

    @property
    def data(self) -> bytes:
        """
        Message payload data.

        Returns:
            Message data as bytes
        """

    @property
    def attributes(self) -> MutableMapping[str, str]:
        """
        Message attributes.

        Returns:
            Dictionary of message attributes
        """

    @property
    def publish_time(self) -> datetime.datetime:
        """
        Time when message was originally published.

        Returns:
            Publish time as a datetime
        """

    @property
    def delivery_attempt(self) -> Optional[int]:
        """
        Number of times this message has been delivered.

        Returns:
            Delivery attempt count, or None if the subscription
            has no dead-letter policy
        """

    @property
    def ordering_key(self) -> str:
        """
        Message ordering key.

        Returns:
            Ordering key string, empty if no ordering key
        """

    @property
    def size(self) -> int:
        """
        Size of the underlying message in bytes.

        Returns:
            Message size in bytes
        """

    @property
    def ack_id(self) -> str:
        """
        Acknowledgment ID used to ack the message.

        Returns:
            Acknowledgment ID string
        """

    @property
    def opentelemetry_data(self) -> Optional[SubscribeOpenTelemetry]:
        """
        OpenTelemetry tracing data associated with this message.

        Returns:
            OpenTelemetry data object or None if tracing not enabled
        """
```
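The properties above can be read together to build a plain summary of a message. The following is a minimal sketch, assuming the payload is UTF-8 encoded JSON (the source does not say what encoding publishers use). The `summarize` helper and the `SimpleNamespace` stand-in are hypothetical and exist only so the example runs without a Pub/Sub connection.

```python
import json
from types import SimpleNamespace
from typing import Any, Dict


def summarize(message: Any) -> Dict[str, Any]:
    """Collect commonly used message fields into a plain dict.

    Assumes message.data holds UTF-8 encoded JSON (an assumption of this
    sketch, not something the Message class guarantees).
    """
    return {
        "id": message.message_id,
        "payload": json.loads(message.data.decode("utf-8")),
        "attributes": dict(message.attributes),
        # ordering_key is an empty string when no ordering key was set
        "ordered": bool(message.ordering_key),
    }


# Hypothetical stand-in exposing the same property names as Message.
fake_message = SimpleNamespace(
    message_id="123",
    data=b'{"user": "alice"}',
    attributes={"message_type": "user_event"},
    ordering_key="",
)

summary = summarize(fake_message)
print(summary)
```

Copying `attributes` into a `dict` gives the caller a snapshot that is independent of the message object.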

### Message Acknowledgment

Acknowledge or negatively acknowledge (nack) messages to control redelivery.

```python { .api }
def ack(self) -> None:
    """
    Acknowledge the message.

    This tells Pub/Sub that the message was successfully processed
    and should not be redelivered.
    """

def nack(self) -> None:
    """
    Negative acknowledge the message.

    This tells Pub/Sub that the message was not successfully processed
    and should be redelivered (subject to retry policies).
    """

def ack_with_response(self) -> Future:
    """
    Acknowledge the message and return response future.

    Returns:
        Future that resolves when acknowledgment is processed
    """

def nack_with_response(self) -> Future:
    """
    Negative acknowledge the message and return response future.

    Returns:
        Future that resolves when negative acknowledgment is processed
    """
```
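The `*_with_response` variants are useful when the caller needs to know whether the acknowledgment was actually accepted. The sketch below waits on the returned future and reports the outcome. `confirm_ack` and `FakeMessage` are hypothetical names, and the stand-in uses `concurrent.futures.Future` so the example runs without a Pub/Sub connection. With the real client, a failed acknowledgment is expected to surface as an exception from `future.result()`, so the sketch catches `Exception` broadly rather than assuming a specific type.

```python
from concurrent.futures import Future


def confirm_ack(message, timeout: float = 5.0) -> bool:
    """Ack the message and return True only if the ack was confirmed."""
    future = message.ack_with_response()
    try:
        # Blocks until the acknowledgment is processed or the timeout expires.
        future.result(timeout=timeout)
        return True
    except Exception as exc:
        print(f"Ack failed for {message.message_id}: {exc}")
        return False


class FakeMessage:
    """Hypothetical stand-in that mimics ack_with_response()."""

    def __init__(self, message_id: str, fail: bool = False) -> None:
        self.message_id = message_id
        self._fail = fail

    def ack_with_response(self) -> Future:
        future: Future = Future()
        if self._fail:
            future.set_exception(RuntimeError("ack rejected"))
        else:
            future.set_result("SUCCESS")
        return future


ok = confirm_ack(FakeMessage("1"))
failed = confirm_ack(FakeMessage("2", fail=True))
```

Blocking on `result()` inside a subscriber callback ties up a callback thread, so a short timeout or `add_done_callback` may be preferable in high-throughput subscribers.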

### Deadline Management

Modify a message's acknowledgment deadline to allow more processing time.

```python { .api }
def modify_ack_deadline(self, seconds: int) -> None:
    """
    Modify the acknowledgment deadline for the message.

    Parameters:
    - seconds: New deadline in seconds, measured from the time of the call
               Must be between 0 and 600 seconds
               Use 0 to immediately requeue the message
    """
```
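Because the deadline must stay within the 0 to 600 second range described above, a caller that computes the value dynamically may want to clamp it first. This is a minimal sketch of that idea. `set_deadline` and `RecordingMessage` are hypothetical helpers, not part of the library, and the stand-in only records the values it receives.

```python
from typing import List

MAX_ACK_DEADLINE_SECONDS = 600  # upper bound documented for modify_ack_deadline


def set_deadline(message, seconds: int) -> int:
    """Clamp seconds into [0, 600], apply it, and return the value used."""
    clamped = max(0, min(seconds, MAX_ACK_DEADLINE_SECONDS))
    message.modify_ack_deadline(clamped)
    return clamped


class RecordingMessage:
    """Hypothetical stand-in that records requested deadlines."""

    def __init__(self) -> None:
        self.deadlines: List[int] = []

    def modify_ack_deadline(self, seconds: int) -> None:
        self.deadlines.append(seconds)


msg = RecordingMessage()
set_deadline(msg, 900)  # too large, clamped to 600
set_deadline(msg, -5)   # negative, clamped to 0
set_deadline(msg, 120)  # already in range
print(msg.deadlines)
```

Note that clamping a negative value to 0 requeues the message immediately, so callers may prefer to raise an error for negative inputs instead.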

### Message Utilities

Additional methods for message handling and representation.

```python { .api }
def __repr__(self) -> str:
    """
    String representation of the message.

    Returns:
        Formatted string showing message data, ordering key, and attributes
    """
```

## Usage Examples

### Basic Message Processing

```python
def callback(message):
    # Access message data
    data = message.data.decode('utf-8')
    print(f"Message ID: {message.message_id}")
    print(f"Data: {data}")

    # Access attributes
    for key, value in message.attributes.items():
        print(f"Attribute {key}: {value}")

    # Acknowledge the message
    message.ack()
```

### Error Handling with Negative Acknowledgment

```python
# ProcessingError and process_data are application-defined placeholders.
def callback(message):
    try:
        # Process the message
        process_data(message.data)

        # Acknowledge successful processing
        message.ack()

    except ProcessingError as e:
        print(f"Processing failed: {e}")

        # Negative acknowledge to trigger redelivery
        message.nack()

    except Exception as e:
        print(f"Unexpected error: {e}")

        # For unexpected errors, still nack to avoid message loss
        message.nack()
```

### Extended Processing with Deadline Modification

```python
# long_running_processing, complex_validation_needed, and validate_result
# are application-defined placeholders.
def callback(message):
    print(f"Starting to process message: {message.message_id}")

    try:
        # Extend deadline before long processing
        message.modify_ack_deadline(300)  # 5 minutes from now

        # Perform long-running operation
        result = long_running_processing(message.data)

        # Additional deadline extension if needed
        if complex_validation_needed(result):
            message.modify_ack_deadline(180)  # deadline is now 3 minutes from this call
            validate_result(result)

        # Acknowledge after successful processing
        message.ack()

    except Exception as e:
        print(f"Processing failed: {e}")
        message.nack()
```

### Message Metadata Analysis

```python
# send_to_dead_letter_queue and process_message are application-defined placeholders.
def callback(message):
    # Analyze message metadata
    print(f"Message ID: {message.message_id}")
    print(f"Publish time: {message.publish_time}")

    # delivery_attempt is None unless the subscription has a dead-letter policy
    attempt = message.delivery_attempt or 0
    print(f"Delivery attempt: {attempt}")

    if message.ordering_key:
        print(f"Ordering key: {message.ordering_key}")

    # Check for repeated deliveries
    if attempt > 1:
        print(f"Warning: Message delivered {attempt} times")

        # Consider dead letter queue after too many attempts
        if attempt > 5:
            print("Too many delivery attempts, sending to dead letter queue")
            send_to_dead_letter_queue(message)
            message.ack()
            return

    # Process the message
    try:
        process_message(message.data)
        message.ack()
    except Exception as e:
        print(f"Processing error: {e}")
        message.nack()
```

### Attribute-Based Message Routing

```python
def callback(message):
    # Route messages based on attributes
    message_type = message.attributes.get('message_type')

    if message_type == 'user_event':
        handle_user_event(message)
    elif message_type == 'system_event':
        handle_system_event(message)
    elif message_type == 'error_event':
        handle_error_event(message)
    else:
        print(f"Unknown message type: {message_type}")
        # Still acknowledge unknown message types to avoid redelivery
        message.ack()

def handle_user_event(message):
    user_id = message.attributes.get('user_id')
    event_data = message.data.decode('utf-8')

    try:
        process_user_event(user_id, event_data)
        message.ack()
    except Exception as e:
        print(f"Failed to process user event: {e}")
        message.nack()
```
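As the number of message types grows, the `if`/`elif` chain above can be replaced with a lookup table. The following is one possible sketch of that variation, not the only way to structure routing. `HANDLERS`, `route`, `FakeMessage`, and the `handled` list are hypothetical names introduced for illustration, and the handlers only record that they ran.

```python
from typing import Any, Callable, Dict, List

handled: List[str] = []  # records which handler ran, for demonstration only


def handle_user_event(message: Any) -> None:
    handled.append("user")
    message.ack()


def handle_system_event(message: Any) -> None:
    handled.append("system")
    message.ack()


# Maps the 'message_type' attribute to the function that processes it.
HANDLERS: Dict[str, Callable[[Any], None]] = {
    "user_event": handle_user_event,
    "system_event": handle_system_event,
}


def route(message: Any) -> None:
    """Dispatch a message to its handler based on the message_type attribute."""
    message_type = message.attributes.get("message_type")
    handler = HANDLERS.get(message_type)
    if handler is None:
        print(f"Unknown message type: {message_type}")
        # Acknowledge unknown types so they are not redelivered forever.
        message.ack()
        return
    handler(message)


class FakeMessage:
    """Hypothetical stand-in with attributes and an ack() that records the call."""

    def __init__(self, attributes: Dict[str, str]) -> None:
        self.attributes = attributes
        self.acked = False

    def ack(self) -> None:
        self.acked = True


user_msg = FakeMessage({"message_type": "user_event"})
unknown_msg = FakeMessage({"message_type": "other"})
route(user_msg)
route(unknown_msg)
```

Adding a new message type then only requires a new entry in `HANDLERS`, and the routing function itself stays unchanged.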

### Asynchronous Acknowledgment

```python
def callback(message):
    # Process message asynchronously with response futures
    try:
        # Start processing
        process_message_async(message.data)

        # Acknowledge with response tracking
        ack_future = message.ack_with_response()
        ack_future.add_done_callback(lambda f: print(f"Ack completed for {message.message_id}"))

    except Exception as e:
        print(f"Processing failed: {e}")

        # Negative acknowledge with response tracking
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(lambda f: print(f"Nack completed for {message.message_id}"))
```