or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · exceptions.md · index.md · message-handling.md · publisher.md · schedulers.md · schema-service.md · subscriber.md · types.md

docs/exceptions.md

0

# Exceptions and Error Handling

1

2

Google Cloud Pub/Sub provides specific exception types for different error conditions in publishing and subscribing operations. These exceptions help identify and handle specific failure scenarios appropriately.

3

4

## Capabilities

5

6

### Publisher Exceptions

7

8

Exception types specific to publishing operations and flow control.

9

10

```python { .api }

11

class PublishError(Exception):
    """Base exception for publish operation errors.

    Raised when a publish operation fails due to various reasons
    including network issues, authentication failures, or server errors.
    """

19

20

class MessageTooLargeError(PublishError):
    """Exception raised when a message exceeds the maximum size limit.

    The maximum message size for Pub/Sub is 10MB including the message
    data and all attributes.
    """

28

29

class PublishToPausedOrderingKeyException(PublishError):
    """Exception raised when attempting to publish to a paused ordering key.

    When message ordering is enabled and an error occurs for a specific
    ordering key, that key is paused until explicitly resumed.
    """

37

38

class FlowControlLimitError(PublishError):
    """Exception raised when publisher flow control limits are exceeded.

    This occurs when the configured flow control settings (message limit,
    byte limit) are exceeded and the limit_exceeded_behavior is set to ERROR.
    """

46

```

47

48

### Subscriber Exceptions

49

50

Exception types specific to subscribing operations and message acknowledgment.

51

52

```python { .api }

53

class AcknowledgeError(Exception):
    """Exception raised when message acknowledgment operations fail.

    This can occur during ack(), nack(), or modify_ack_deadline() operations
    when the acknowledgment request cannot be processed by the server.
    """

61

62

class AcknowledgeStatus(Enum):
    """Enumeration of possible acknowledgment status codes.

    Used to indicate the result of acknowledgment operations in
    exactly-once delivery scenarios.
    """

    SUCCESS = "SUCCESS"
    """Acknowledgment was successful."""

    PERMISSION_DENIED = "PERMISSION_DENIED"
    """Insufficient permissions to acknowledge the message."""

    FAILED_PRECONDITION = "FAILED_PRECONDITION"
    """Acknowledgment failed due to precondition failure."""

    INVALID_ACK_ID = "INVALID_ACK_ID"
    """The acknowledgment ID is invalid or expired."""

    OTHER = "OTHER"
    """Other acknowledgment failure."""

84

```

85

86

### General Exceptions

87

88

General exception types used across publisher and subscriber operations.

89

90

```python { .api }

91

class TimeoutError(Exception):
    """Exception raised when an operation exceeds its timeout duration.

    This can occur in both publish and subscribe operations when
    the configured timeout is exceeded.

    Note: this class shares its name with Python's built-in
    ``TimeoutError``; importing it into a module shadows the built-in there.
    """

99

```

100

101

## Usage Examples

102

103

### Publisher Error Handling

104

105

```python

106

# Publish an oversized payload and handle each publisher failure mode.
# Handlers are ordered from the most specific exception down to the
# PublishError base class so the specific ones are not masked.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher.exceptions import (
    PublishError,
    MessageTooLargeError,
    PublishToPausedOrderingKeyException,
    FlowControlLimitError,
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

try:
    # Attempt to publish a large message
    large_data = b"x" * (11 * 1024 * 1024)  # 11MB - exceeds limit
    future = publisher.publish(topic_path, large_data)
    message_id = future.result()

except MessageTooLargeError as e:
    print(f"Message too large: {e}")
    # Handle by splitting message or reducing size

except PublishToPausedOrderingKeyException as e:
    print(f"Ordering key paused: {e}")
    # Resume the ordering key and retry
    publisher.resume_publish(topic_path, "ordering-key")

except FlowControlLimitError as e:
    print(f"Flow control limit exceeded: {e}")
    # Wait or adjust flow control settings

except PublishError as e:
    print(f"General publish error: {e}")
    # Handle general publish failures

139

```

140

141

### Flow Control Error Handling

142

143

```python

144

# Configure a publisher whose flow control raises instead of blocking,
# then publish enough messages to hit the configured limits.
import time  # required by time.sleep() in the back-off below

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import FlowControlLimitError

# Configure strict flow control
flow_control = types.PublishFlowControl(
    message_limit=100,
    byte_limit=1000000,  # 1MB
    limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
)

publisher_options = types.PublisherOptions(flow_control=flow_control)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")

for i in range(200):  # Try to exceed limits
    try:
        future = publisher.publish(topic_path, f"Message {i}".encode())

    except FlowControlLimitError:
        print(f"Flow control limit hit at message {i}")
        # Wait for some messages to complete
        time.sleep(1)
        # Retry or skip this message
        continue

170

```

171

172

### Subscriber Error Handling

173

174

```python

175

# Subscribe with a callback that acks on success and nacks on failure.
# process_message is a placeholder for application logic; it is not
# defined in this snippet.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def callback(message):
    """Process one received message, then ack it or nack it on failure."""
    try:
        # Process the message
        process_message(message.data)

        # Acknowledge the message
        message.ack()

    except AcknowledgeError as e:
        print(f"Failed to acknowledge message {message.message_id}: {e}")
        # Message will be redelivered automatically

    except Exception as e:
        print(f"Processing error: {e}")
        try:
            # Negative acknowledge for redelivery
            message.nack()
        except AcknowledgeError as ack_error:
            print(f"Failed to nack message: {ack_error}")


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

202

```

203

204

### Timeout Handling

205

206

```python

207

# Publish a message and bound the wait for its result with a timeout.
from google.cloud import pubsub_v1

# NOTE: this import shadows Python's built-in TimeoutError in this module.
from google.cloud.pubsub_v1.exceptions import TimeoutError

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

try:
    future = publisher.publish(topic_path, b"Test message")
    # Wait for result with timeout
    message_id = future.result(timeout=30)
    print(f"Published: {message_id}")

except TimeoutError:
    print("Publish operation timed out")
    # Handle timeout - message may still be published

except Exception as e:
    print(f"Publish failed: {e}")

225

```

226

227

### Exactly-Once Delivery Error Handling

228

229

```python

230

# Callback for exactly-once delivery: wait on the ack/nack futures so the
# outcome of each acknowledgment is known. process_message is a placeholder
# for application logic; it is not defined in this snippet.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError, AcknowledgeStatus


def callback(message):
    """Process one message and report the result of its ack or nack."""
    try:
        # Process the message
        result = process_message(message.data)

        # Use ack_with_response for exactly-once delivery
        ack_future = message.ack_with_response()
        ack_result = ack_future.result()

        if ack_result == AcknowledgeStatus.SUCCESS:
            print(f"Successfully processed message {message.message_id}")
        else:
            print(f"Ack failed with status: {ack_result}")
            # Handle based on specific ack status

    except AcknowledgeError as e:
        print(f"Acknowledgment error: {e}")
        # Message will be redelivered

    except Exception as e:
        print(f"Processing error: {e}")
        # Nack the message for redelivery
        nack_future = message.nack_with_response()
        try:
            nack_result = nack_future.result()
            print(f"Message nacked with status: {nack_result}")
        except AcknowledgeError as nack_error:
            print(f"Nack failed: {nack_error}")

261

```

262

263

### Ordering Key Error Recovery

264

265

```python

266

# Publish with message ordering enabled, resuming a paused ordering key
# and retrying with exponential back-off.
import time  # required by time.sleep() in the back-off below

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import PublishToPausedOrderingKeyException

# Enable message ordering
publisher_options = types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")
ordering_key = "user-123"


def publish_with_retry(topic, data, ordering_key, max_retries=3):
    """Publish ``data`` to ``topic`` under ``ordering_key``, retrying on failure.

    Makes up to ``max_retries`` attempts and returns the value of
    ``future.result()`` from the first attempt that succeeds. If the
    ordering key is paused it is resumed before the next attempt. The
    exception from the final attempt is re-raised.
    """
    for attempt in range(max_retries):
        try:
            future = publisher.publish(topic, data, ordering_key=ordering_key)
            return future.result()

        except PublishToPausedOrderingKeyException:
            print(f"Ordering key {ordering_key} is paused, resuming...")
            publisher.resume_publish(topic, ordering_key)

            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt

            # Wait before retry
            time.sleep(2 ** attempt)

        except Exception as e:
            print(f"Publish failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise


# Use the retry function
try:
    message_id = publish_with_retry(
        topic_path,
        b"Ordered message",
        ordering_key,
    )
    print(f"Published ordered message: {message_id}")

except Exception as e:
    print(f"Failed to publish after retries: {e}")

309

```