CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

docs/exceptions.md

Exceptions and Error Handling

Google Cloud Pub/Sub provides specific exception types for different error conditions in publishing and subscribing operations. These exceptions help identify and handle specific failure scenarios appropriately.

Capabilities

Publisher Exceptions

Exception types specific to publishing operations and flow control.

class PublishError(Exception):
    """Root exception type for failures of a publish operation.

    Signals that a publish operation did not succeed. Documented causes
    include network issues, authentication failures, and server errors.
    """

class MessageTooLargeError(PublishError):
    """Publish failure caused by a message larger than the size limit.

    Pub/Sub caps a message at 10MB; the message data and all of its
    attributes count toward that limit.
    """

class PublishToPausedOrderingKeyException(PublishError):
    """Publish failure caused by targeting a paused ordering key.

    With message ordering enabled, an error for a given ordering key
    pauses that key; it stays paused until it is explicitly resumed.
    """

class FlowControlLimitError(PublishError):
    """Publish failure caused by exceeding publisher flow control limits.

    Raised when the configured flow control settings (message limit or
    byte limit) are exceeded while limit_exceeded_behavior is ERROR.
    """

Subscriber Exceptions

Exception types specific to subscribing operations and message acknowledgment.

class AcknowledgeError(Exception):
    """Failure of a message acknowledgment operation.

    Covers ack(), nack(), and modify_ack_deadline() calls whose
    acknowledgment request the server could not process.
    """

class AcknowledgeStatus(Enum):
    """Status codes reported for acknowledgment operations.

    These values describe the outcome of an acknowledgment operation in
    exactly-once delivery scenarios.
    """

    # The acknowledgment succeeded.
    SUCCESS = "SUCCESS"

    # The caller lacks the permissions needed to acknowledge the message.
    PERMISSION_DENIED = "PERMISSION_DENIED"

    # The acknowledgment was rejected because a precondition failed.
    FAILED_PRECONDITION = "FAILED_PRECONDITION"

    # The acknowledgment ID is invalid or has expired.
    INVALID_ACK_ID = "INVALID_ACK_ID"

    # Any other acknowledgment failure.
    OTHER = "OTHER"

General Exceptions

General exception types used across publisher and subscriber operations.

class TimeoutError(Exception):
    """An operation ran longer than its timeout duration.

    Applies to both publish and subscribe operations: it is raised when
    the configured timeout is exceeded.
    """

Usage Examples

Publisher Error Handling

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher.exceptions import (
    PublishError,
    MessageTooLargeError,
    PublishToPausedOrderingKeyException,
    FlowControlLimitError
)

# Client and fully-qualified topic name used by the publish attempt below.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# 11MB payload - exceeds the message size limit on purpose.
oversized_payload = b"x" * (11 * 1024 * 1024)

# The specific publisher exceptions are listed before PublishError so that
# each one reaches its own handler rather than the general fallback.
try:
    publish_future = publisher.publish(topic_path, oversized_payload)
    message_id = publish_future.result()

except MessageTooLargeError as e:
    # Recover by splitting the message or reducing its size.
    print(f"Message too large: {e}")

except PublishToPausedOrderingKeyException as e:
    print(f"Ordering key paused: {e}")
    # Resume the ordering key so a retry can go through.
    publisher.resume_publish(topic_path, "ordering-key")

except FlowControlLimitError as e:
    # Recover by waiting or by adjusting the flow control settings.
    print(f"Flow control limit exceeded: {e}")

except PublishError as e:
    # Fallback for any other publish failure.
    print(f"General publish error: {e}")

Flow Control Error Handling

import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import FlowControlLimitError

# Configure strict flow control. With limit_exceeded_behavior set to ERROR,
# exceeding the message or byte limit raises FlowControlLimitError.
flow_control = types.PublishFlowControl(
    message_limit=100,
    byte_limit=1000000,  # 1MB
    limit_exceeded_behavior=types.LimitExceededBehavior.ERROR
)

publisher_options = types.PublisherOptions(flow_control=flow_control)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")

for i in range(200):  # Try to exceed limits
    try:
        future = publisher.publish(topic_path, f"Message {i}".encode())

    except FlowControlLimitError:
        print(f"Flow control limit hit at message {i}")
        # Wait for some messages to complete (requires the `time` import
        # at the top of this snippet).
        time.sleep(1)
        # This example skips the message; retry here if it must be sent.
        continue

Subscriber Error Handling

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError

# Client and fully-qualified subscription name for the streaming pull below.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def _nack_for_redelivery(message):
    """Negatively acknowledge *message*; report a failed nack instead of raising."""
    try:
        message.nack()
    except AcknowledgeError as ack_error:
        print(f"Failed to nack message: {ack_error}")


def callback(message):
    """Process one received message, then ack it, or nack it on failure.

    An AcknowledgeError raised while processing or acking is only reported.
    Any other exception is reported and the message is nacked so that it is
    redelivered.
    """
    try:
        process_message(message.data)
        message.ack()

    except AcknowledgeError as e:
        # No further action here: the message will be redelivered automatically.
        print(f"Failed to acknowledge message {message.message_id}: {e}")

    except Exception as e:
        print(f"Processing error: {e}")
        _nack_for_redelivery(message)


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

Timeout Handling

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.exceptions import TimeoutError

# Client and fully-qualified topic name for the publish call below.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Upper bound, in seconds, on how long to wait for the publish result.
PUBLISH_TIMEOUT_SECONDS = 30

try:
    publish_future = publisher.publish(topic_path, b"Test message")
    # Block until the result arrives or the timeout elapses.
    message_id = publish_future.result(timeout=PUBLISH_TIMEOUT_SECONDS)
    print(f"Published: {message_id}")

except TimeoutError:
    # A timeout is not proof of failure: the message may still be published.
    print("Publish operation timed out")

except Exception as e:
    print(f"Publish failed: {e}")

Exactly-Once Delivery Error Handling

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.exceptions import AcknowledgeError, AcknowledgeStatus

def callback(message):
    """Process a message and acknowledge it, checking the acknowledgment result.

    The message is processed and then acked through ack_with_response(),
    whose result is compared with AcknowledgeStatus.SUCCESS. An
    AcknowledgeError is only reported. Any other exception is reported and
    the message is nacked through nack_with_response().
    """
    try:
        process_message(message.data)

        # ack_with_response() is the acknowledgment call for exactly-once
        # delivery; wait for its future to learn the outcome.
        ack_result = message.ack_with_response().result()

        if ack_result == AcknowledgeStatus.SUCCESS:
            print(f"Successfully processed message {message.message_id}")
        else:
            # Decide how to react based on the specific ack status.
            print(f"Ack failed with status: {ack_result}")

    except AcknowledgeError as e:
        # Nothing more to do here: the message will be redelivered.
        print(f"Acknowledgment error: {e}")

    except Exception as e:
        print(f"Processing error: {e}")
        # Request redelivery; only waiting on the nack result is guarded.
        nack_future = message.nack_with_response()
        try:
            nack_result = nack_future.result()
            print(f"Message nacked with status: {nack_result}")
        except AcknowledgeError as nack_error:
            print(f"Nack failed: {nack_error}")

Ordering Key Error Recovery

import time

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher.exceptions import PublishToPausedOrderingKeyException

# Enable message ordering
publisher_options = types.PublisherOptions(enable_message_ordering=True)
publisher = pubsub_v1.PublisherClient(publisher_options=publisher_options)

topic_path = publisher.topic_path("my-project", "my-topic")
ordering_key = "user-123"

def publish_with_retry(topic, data, ordering_key, max_retries=3):
    """Publish *data* with an ordering key, resuming the key if it is paused.

    Args:
        topic: Topic path to publish to.
        data: Message payload passed to publisher.publish().
        ordering_key: Ordering key for the message; it is resumed whenever
            a publish attempt reports that the key is paused.
        max_retries: Maximum number of publish attempts.

    Returns:
        The result of the publish future from the first successful attempt.
        If max_retries is 0 or less, no attempt is made and None is returned.

    Raises:
        PublishToPausedOrderingKeyException: If the key is still paused on
            the final attempt.
        Exception: Any other publish error raised on the final attempt.
    """
    for attempt in range(max_retries):
        try:
            future = publisher.publish(topic, data, ordering_key=ordering_key)
            return future.result()

        except PublishToPausedOrderingKeyException:
            print(f"Ordering key {ordering_key} is paused, resuming...")
            publisher.resume_publish(topic, ordering_key)

            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt

            # Exponential backoff before the next attempt: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)

        except Exception as e:
            print(f"Publish failed on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise

# Use the retry function
try:
    message_id = publish_with_retry(
        topic_path, 
        b"Ordered message", 
        ordering_key
    )
    print(f"Published ordered message: {message_id}")

except Exception as e:
    print(f"Failed to publish after retries: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json