Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
Google Cloud Pub/Sub provides specific exception types for different error conditions in publishing and subscribing operations. These exceptions help identify and handle specific failure scenarios appropriately.
Exception types specific to publishing operations and flow control.
class PublishError(Exception):
    """Base exception for publish operation errors.

    Raised when a publish operation fails for reasons such as network
    issues, authentication failures, or server errors.
    """
class MessageTooLargeError(PublishError):
    """Raised when a message exceeds the maximum size limit.

    The maximum message size for Pub/Sub is 10MB, counting the message
    data and all attributes.
    """
class PublishToPausedOrderingKeyException(PublishError):
    """Raised when attempting to publish to a paused ordering key.

    When message ordering is enabled and an error occurs for a specific
    ordering key, that key is paused until explicitly resumed.
    """
class FlowControlLimitError(PublishError):
    """Raised when publisher flow control limits are exceeded.

    This occurs when the configured flow control settings (message limit,
    byte limit) are exceeded and ``limit_exceeded_behavior`` is set to ERROR.
    """


# Exception types specific to subscribing operations and message acknowledgment.
class AcknowledgeError(Exception):
    """Raised when a message acknowledgment operation fails.

    This can occur during ack(), nack(), or modify_ack_deadline() operations
    when the acknowledgment request cannot be processed by the server.
    """
from enum import Enum


class AcknowledgeStatus(Enum):
    """Enumeration of possible acknowledgment status codes.

    Used to indicate the result of acknowledgment operations in
    exactly-once delivery scenarios.
    """

    SUCCESS = "SUCCESS"
    """Acknowledgment was successful."""

    PERMISSION_DENIED = "PERMISSION_DENIED"
    """Insufficient permissions to acknowledge the message."""

    FAILED_PRECONDITION = "FAILED_PRECONDITION"
    """Acknowledgment failed due to precondition failure."""

    INVALID_ACK_ID = "INVALID_ACK_ID"
    """The acknowledgment ID is invalid or expired."""

    OTHER = "OTHER"
    """Other acknowledgment failure."""


# General exception types used across publisher and subscriber operations.
class TimeoutError(Exception):
    """Raised when an operation exceeds its timeout duration.

    This can occur in both publish and subscribe operations when
    the configured timeout is exceeded.

    Note: within any module that defines or imports it, this name shadows
    Python's built-in ``TimeoutError``. As declared here it derives
    directly from ``Exception``, not from the built-in class.
    """
# Example: handling publish errors, most specific exception first.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher.exceptions import (
    PublishError,
    MessageTooLargeError,
    PublishToPausedOrderingKeyException,
    FlowControlLimitError,
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

try:
    # Attempt to publish a large message
    large_data = b"x" * (11 * 1024 * 1024)  # 11MB - exceeds limit
    future = publisher.publish(topic_path, large_data)
    message_id = future.result()
except MessageTooLargeError as e:
    print(f"Message too large: {e}")
    # Handle by splitting message or reducing size
except PublishToPausedOrderingKeyException as e:
    print(f"Ordering key paused: {e}")
    # Resume the ordering key and retry
    # NOTE(review): the publish call above passes no ordering_key, so the
    # key name used here is illustrative only.
    publisher.resume_publish(topic_path, "ordering-key")
except FlowControlLimitError as e:
    print(f"Flow control limit exceeded: {e}")
    # Wait or adjust flow control settings
except PublishError as e:
    # Must come last: the handlers above catch subclasses of PublishError.
    print(f"General publish error: {e}")
    # Handle general publish failures

# Example: publishing under strict flow control limits.
from google.cloud import pubsub_v1
import time  # needed for time.sleep() in the back-off below

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import FlowControlLimitError

# Configure strict flow control
flow_control = types.PublishFlowControl(
    message_limit=100,
    byte_limit=1000000,  # 1MB
    limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
)
publisher_options = types.PublisherOptions(flow_control=flow_control)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
topic_path = publisher.topic_path("my-project", "my-topic")

for i in range(200):  # Try to exceed limits
    try:
        future = publisher.publish(topic_path, f"Message {i}".encode())
    except FlowControlLimitError:
        # NOTE(review): depending on the client library version, this error
        # may be delivered through the returned future (future.result())
        # instead of being raised by publish() itself - confirm.
        print(f"Flow control limit hit at message {i}")
        # Wait for some messages to complete
        time.sleep(1)
        # Retry or skip this message
        continue

# Example: handling acknowledgment errors in a subscriber callback.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def callback(message):
    """Process one received message, then ack it; nack it if processing fails.

    ``process_message`` is a placeholder for application logic and is not
    defined in this snippet.
    """
    try:
        # Process the message
        process_message(message.data)
        # Acknowledge the message
        message.ack()
    except AcknowledgeError as e:
        print(f"Failed to acknowledge message {message.message_id}: {e}")
        # Message will be redelivered automatically
    except Exception as e:
        print(f"Processing error: {e}")
        try:
            # Negative acknowledge for redelivery
            message.nack()
        except AcknowledgeError as ack_error:
            print(f"Failed to nack message: {ack_error}")


# NOTE(review): nothing in this snippet waits on the returned future; a real
# program presumably needs to block on it to keep receiving - confirm.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

# Example: handling a publish timeout.
from google.cloud import pubsub_v1
# This import shadows Python's built-in TimeoutError for the rest of the script.
from google.cloud.pubsub_v1.exceptions import TimeoutError

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

try:
    future = publisher.publish(topic_path, b"Test message")
    # Wait for result with timeout
    message_id = future.result(timeout=30)
    print(f"Published: {message_id}")
except TimeoutError:
    print("Publish operation timed out")
    # Handle timeout - message may still be published
except Exception as e:
    print(f"Publish failed: {e}")

# Example: acknowledgment with responses for exactly-once delivery.
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError, AcknowledgeStatus


def callback(message):
    """Process a message and acknowledge it, checking the ack response.

    On a processing failure the message is nacked and the nack response is
    checked as well. ``process_message`` is a placeholder for application
    logic and is not defined in this snippet.
    """
    try:
        # Process the message
        process_message(message.data)
        # Use ack_with_response for exactly-once delivery
        ack_future = message.ack_with_response()
        ack_result = ack_future.result()
        if ack_result == AcknowledgeStatus.SUCCESS:
            print(f"Successfully processed message {message.message_id}")
        else:
            print(f"Ack failed with status: {ack_result}")
            # Handle based on specific ack status
    except AcknowledgeError as e:
        print(f"Acknowledgment error: {e}")
        # Message will be redelivered
    except Exception as e:
        print(f"Processing error: {e}")
        # Nack the message for redelivery
        nack_future = message.nack_with_response()
        try:
            nack_result = nack_future.result()
            print(f"Message nacked with status: {nack_result}")
        except AcknowledgeError as nack_error:
            print(f"Nack failed: {nack_error}")


# Example: retrying publishes when an ordering key has been paused.
from google.cloud import pubsub_v1
import time  # needed for time.sleep() in publish_with_retry

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import PublishToPausedOrderingKeyException

# Enable message ordering
publisher_options = types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)
topic_path = publisher.topic_path("my-project", "my-topic")
ordering_key = "user-123"


def publish_with_retry(topic, data, ordering_key, max_retries=3):
    """Publish ``data`` with an ordering key, retrying on failure.

    Uses the module-level ``publisher`` client.

    Args:
        topic: Topic path to publish to.
        data: Message payload passed to ``publisher.publish``.
        ordering_key: Ordering key for the message; resumed if found paused.
        max_retries: Maximum number of publish attempts.

    Returns:
        The value of ``future.result()`` for the first successful attempt.
        If ``max_retries`` is less than 1 no attempt is made and ``None``
        is returned.

    Raises:
        PublishToPausedOrderingKeyException: If the key is still paused on
            the final attempt.
        Exception: Any other publish error raised on the final attempt.
    """
    for attempt in range(max_retries):
        try:
            future = publisher.publish(topic, data, ordering_key=ordering_key)
            return future.result()
        except PublishToPausedOrderingKeyException:
            print(f"Ordering key {ordering_key} is paused, resuming...")
            publisher.resume_publish(topic, ordering_key)
            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt
            # Wait before retry (exponential back-off: 1s, 2s, 4s, ...)
            time.sleep(2 ** attempt)
        except Exception as e:
            # Other failures are retried immediately, without a delay.
            print(f"Publish failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise


# Use the retry function
try:
    message_id = publish_with_retry(
        topic_path,
        b"Ordered message",
        ordering_key,
    )
    print(f"Published ordered message: {message_id}")
except Exception as e:
    print(f"Failed to publish after retries: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub