0
# Exceptions and Error Handling
1
2
Google Cloud Pub/Sub provides specific exception types for different error conditions in publishing and subscribing operations. These exceptions help identify and handle specific failure scenarios appropriately.
3
4
## Capabilities
5
6
### Publisher Exceptions
7
8
Exception types specific to publishing operations and flow control.
9
10
```python { .api }
11
class PublishError(Exception):
    """
    Base exception for publish operation errors.

    Raised when a publish operation fails due to various reasons
    including network issues, authentication failures, or server errors.
    """
    pass
19
20
class MessageTooLargeError(PublishError):
    """
    Exception raised when a message exceeds the maximum size limit.

    The maximum message size for Pub/Sub is 10 MB including the message
    data and all attributes.
    """
    pass
28
29
class PublishToPausedOrderingKeyException(PublishError):
    """
    Exception raised when attempting to publish to a paused ordering key.

    When message ordering is enabled and an error occurs for a specific
    ordering key, that key is paused until explicitly resumed.
    """
    pass
37
38
class FlowControlLimitError(PublishError):
    """
    Exception raised when publisher flow control limits are exceeded.

    This occurs when the configured flow control settings (message limit,
    byte limit) are exceeded and the limit_exceeded_behavior is set to ERROR.
    """
    pass
46
```
47
48
### Subscriber Exceptions
49
50
Exception types specific to subscribing operations and message acknowledgment.
51
52
```python { .api }
53
class AcknowledgeError(Exception):
    """
    Exception raised when message acknowledgment operations fail.

    This can occur during ack(), nack(), or modify_ack_deadline() operations
    when the acknowledgment request cannot be processed by the server.
    """
    pass
61
62
class AcknowledgeStatus(Enum):
    """
    Enumeration of possible acknowledgment status codes.

    Used to indicate the result of acknowledgment operations in
    exactly-once delivery scenarios.
    """

    SUCCESS = "SUCCESS"
    """Acknowledgment was successful."""

    PERMISSION_DENIED = "PERMISSION_DENIED"
    """Insufficient permissions to acknowledge the message."""

    FAILED_PRECONDITION = "FAILED_PRECONDITION"
    """Acknowledgment failed due to precondition failure."""

    INVALID_ACK_ID = "INVALID_ACK_ID"
    """The acknowledgment ID is invalid or expired."""

    OTHER = "OTHER"
    """Other acknowledgment failure."""
84
```
85
86
### General Exceptions
87
88
General exception types used across publisher and subscriber operations.
89
90
```python { .api }
91
class TimeoutError(Exception):
    """
    Exception raised when an operation exceeds its timeout duration.

    This can occur in both publish and subscribe operations when
    the configured timeout is exceeded.
    """
    pass
99
```
100
101
## Usage Examples
102
103
### Publisher Error Handling
104
105
```python
106
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher.exceptions import (
    PublishError,
    MessageTooLargeError,
    PublishToPausedOrderingKeyException,
    FlowControlLimitError,
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# The most specific exception types are listed first; the PublishError
# handler at the end acts as the catch-all for other publish failures.
try:
    # Attempt to publish a large message
    large_data = b"x" * (11 * 1024 * 1024)  # 11MB - exceeds limit
    future = publisher.publish(topic_path, large_data)
    message_id = future.result()

except MessageTooLargeError as e:
    print(f"Message too large: {e}")
    # Handle by splitting message or reducing size

except PublishToPausedOrderingKeyException as e:
    print(f"Ordering key paused: {e}")
    # Resume the ordering key and retry
    publisher.resume_publish(topic_path, "ordering-key")

except FlowControlLimitError as e:
    print(f"Flow control limit exceeded: {e}")
    # Wait or adjust flow control settings

except PublishError as e:
    print(f"General publish error: {e}")
    # Handle general publish failures
139
```
140
141
### Flow Control Error Handling
142
143
```python
144
import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import FlowControlLimitError

# Configure strict flow control
flow_control = types.PublishFlowControl(
    message_limit=100,
    byte_limit=1000000,  # 1MB
    limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
)

publisher_options = types.PublisherOptions(flow_control=flow_control)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")

for i in range(200):  # Try to exceed limits
    try:
        future = publisher.publish(topic_path, f"Message {i}".encode())

    except FlowControlLimitError:
        print(f"Flow control limit hit at message {i}")
        # Wait for some messages to complete
        time.sleep(1)
        # Retry or skip this message (this example skips it)
        continue
170
```
171
172
### Subscriber Error Handling
173
174
```python
175
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def callback(message):
    """Process one received message, then ack it; nack it if processing fails."""
    try:
        # Process the message.
        # NOTE: process_message is not defined in this snippet; supply your
        # own application handler.
        process_message(message.data)

        # Acknowledge the message
        message.ack()

    except AcknowledgeError as e:
        print(f"Failed to acknowledge message {message.message_id}: {e}")
        # Message will be redelivered automatically

    except Exception as e:
        print(f"Processing error: {e}")
        try:
            # Negative acknowledge for redelivery
            message.nack()
        except AcknowledgeError as ack_error:
            print(f"Failed to nack message: {ack_error}")


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
202
```
203
204
### Timeout Handling
205
206
```python
207
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.exceptions import TimeoutError

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

try:
    future = publisher.publish(topic_path, b"Test message")
    # Wait for result with timeout
    message_id = future.result(timeout=30)
    print(f"Published: {message_id}")

except TimeoutError:
    print("Publish operation timed out")
    # Handle timeout - message may still be published

except Exception as e:
    print(f"Publish failed: {e}")
225
```
226
227
### Exactly-Once Delivery Error Handling
228
229
```python
230
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError, AcknowledgeStatus


def callback(message):
    """Process a message and confirm the ack/nack outcome via the returned future."""
    try:
        # Process the message.
        # NOTE: process_message is not defined in this snippet; supply your
        # own application handler.
        result = process_message(message.data)

        # Use ack_with_response for exactly-once delivery
        ack_future = message.ack_with_response()
        ack_result = ack_future.result()

        if ack_result == AcknowledgeStatus.SUCCESS:
            print(f"Successfully processed message {message.message_id}")
        else:
            print(f"Ack failed with status: {ack_result}")
            # Handle based on specific ack status

    except AcknowledgeError as e:
        print(f"Acknowledgment error: {e}")
        # Message will be redelivered

    except Exception as e:
        print(f"Processing error: {e}")
        # Nack the message for redelivery
        nack_future = message.nack_with_response()
        try:
            nack_result = nack_future.result()
            print(f"Message nacked with status: {nack_result}")
        except AcknowledgeError as nack_error:
            print(f"Nack failed: {nack_error}")
261
```
262
263
### Ordering Key Error Recovery
264
265
```python
266
import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import PublishToPausedOrderingKeyException

# Enable message ordering
publisher_options = types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")
ordering_key = "user-123"


def publish_with_retry(topic, data, ordering_key, max_retries=3):
    """
    Publish data with an ordering key, retrying up to max_retries times.

    Returns the value of future.result() from the first successful attempt.
    If the ordering key is paused, it is resumed and the publish is retried
    after an exponential backoff. The exception from the final attempt is
    re-raised to the caller.
    """
    for attempt in range(max_retries):
        try:
            future = publisher.publish(topic, data, ordering_key=ordering_key)
            return future.result()

        except PublishToPausedOrderingKeyException:
            print(f"Ordering key {ordering_key} is paused, resuming...")
            publisher.resume_publish(topic, ordering_key)

            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt

            # Wait before retry (1s, 2s, 4s, ...)
            time.sleep(2 ** attempt)

        except Exception as e:
            print(f"Publish failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise


# Use the retry function
try:
    message_id = publish_with_retry(
        topic_path,
        b"Ordered message",
        ordering_key,
    )
    print(f"Published ordered message: {message_id}")

except Exception as e:
    print(f"Failed to publish after retries: {e}")
309
```