CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

docs/message-handling.md

Message Handling

Message objects represent individual Pub/Sub messages received by subscribers. They provide access to message data, attributes, and metadata, along with methods for acknowledgment and deadline management.

Capabilities

Message Structure

Access message data, attributes, and metadata.

class Message:
    """
    A representation of a single Pub/Sub message.

    Message objects are received by subscribers and expose the payload,
    attributes, and delivery metadata of one message.

    Attributes:
    - message_id: Unique message identifier
    - data: Message payload as bytes
    - attributes: Message attributes as a string-to-string mapping
    - publish_time: When the message was originally published
    - delivery_attempt: Delivery attempt counter, or None when the
      subscription has no dead-letter policy
    - ordering_key: Message ordering key (empty string if none)
    - size: Size of the underlying message in bytes
    - ack_id: Acknowledgment ID used to ack the message
    - opentelemetry_data: OpenTelemetry tracing data (if enabled)
    """

    @property
    def message_id(self) -> str:
        """
        Unique message identifier.

        Returns:
        Message ID string
        """

    @property
    def data(self) -> bytes:
        """
        Message payload data.

        The payload is raw bytes; decode it (for example with
        ``data.decode("utf-8")``) before treating it as text.

        Returns:
        Message data as bytes
        """

    @property
    def attributes(self) -> MutableMapping[str, str]:
        """
        Message attributes.

        Returns:
        Mapping of attribute names to string values
        """

    @property
    def publish_time(self) -> "datetime.datetime":
        """
        Time when message was originally published.

        Returns:
        ``datetime.datetime`` of the publish time (not a raw protobuf
        Timestamp)
        """

    @property
    def delivery_attempt(self) -> Optional[int]:
        """
        Delivery attempt counter for this message.

        The counter is only tracked when the subscription has a dead-letter
        policy. Without one the value is None, so compare against None
        before doing arithmetic or ordering comparisons on it.

        Returns:
        Delivery attempt count (1 for the first delivery), or None if the
        subscription has no dead-letter policy
        """

    @property
    def ordering_key(self) -> str:
        """
        Message ordering key.

        Returns:
        Ordering key string, empty if no ordering key
        """

    @property
    def size(self) -> int:
        """
        Size of the underlying message in bytes.

        Returns:
        Message size in bytes
        """

    @property
    def ack_id(self) -> str:
        """
        Acknowledgment ID used to ack the message.

        Returns:
        Acknowledgment ID string
        """

    @property
    def opentelemetry_data(self) -> Optional["SubscribeOpenTelemetry"]:
        """
        OpenTelemetry tracing data associated with this message.

        Returns:
        OpenTelemetry data object or None if tracing not enabled
        """

Message Acknowledgment

Acknowledge (ack) or negatively acknowledge (nack) messages to control redelivery.

def ack(self) -> None:
    """
    Mark the message as successfully processed.

    Acknowledging signals to Pub/Sub that handling succeeded, so the
    message should not be delivered again.
    """

def nack(self) -> None:
    """
    Mark the message as not processed.

    A negative acknowledgment signals to Pub/Sub that handling failed,
    so the message should be delivered again (subject to retry policies).
    """

def ack_with_response(self) -> Future:
    """
    Acknowledge the message and hand back a future for the outcome.

    Returns:
    Future that resolves once the acknowledgment has been processed
    """

def nack_with_response(self) -> Future:
    """
    Negatively acknowledge the message and hand back a future for the outcome.

    Returns:
    Future that resolves once the negative acknowledgment has been processed
    """

Deadline Management

Modify message acknowledgment deadlines to extend processing time.

def modify_ack_deadline(self, seconds: int) -> None:
    """
    Set a new acknowledgment deadline for the message.

    The deadline is reset relative to the time of the call; it is not
    added to whatever time was still left. After the call, Pub/Sub waits
    up to ``seconds`` from now before treating the message as unacknowledged.

    Parameters:
    - seconds: New deadline, in seconds from now
                Must be between 0 and 600 seconds
                Use 0 to immediately requeue the message
                Very small non-zero values are discouraged because
                network latency can use them up
    """

Message Utilities

Additional methods for message handling and representation.

def __repr__(self) -> str:
    """
    Build a human-readable description of the message.

    Returns:
    Formatted string showing message data, ordering key, and attributes
    """

Usage Examples

Basic Message Processing

def callback(message):
    """Print a message's ID, decoded payload and attributes, then ack it."""
    # The payload is bytes, so decode it before printing
    text = message.data.decode('utf-8')
    print(f"Message ID: {message.message_id}")
    print(f"Data: {text}")

    # One output line per attribute
    for name, val in message.attributes.items():
        print(f"Attribute {name}: {val}")

    # Everything was handled, so confirm receipt
    message.ack()

Error Handling with Negative Acknowledgment

def callback(message):
    """Process a message, acking on success and nacking on any failure."""
    try:
        # Hand the raw payload to the application logic
        process_data(message.data)

        # Reaching this line means processing succeeded
        message.ack()

    except ProcessingError as err:
        # Known failure from the processing step: report it and ask
        # Pub/Sub to deliver the message again
        print(f"Processing failed: {err}")
        message.nack()

    except Exception as err:
        # Anything else is unexpected; nack as well so the message is
        # redelivered instead of being lost
        print(f"Unexpected error: {err}")
        message.nack()

Extended Processing with Deadline Modification

def callback(message):
    """Run a long job on a message, adjusting its ack deadline along the way."""
    print(f"Starting to process message: {message.message_id}")

    try:
        # Request a 300-second deadline before the slow part begins
        message.modify_ack_deadline(300)

        # The slow part
        outcome = long_running_processing(message.data)

        # Validation is optional; request a 180-second deadline when it runs
        needs_validation = complex_validation_needed(outcome)
        if needs_validation:
            message.modify_ack_deadline(180)
            validate_result(outcome)

        # All steps finished without raising
        message.ack()

    except Exception as err:
        print(f"Processing failed: {err}")
        message.nack()

Message Metadata Analysis

def callback(message):
    """
    Log a message's metadata, divert over-delivered messages, then process it.

    Messages delivered more than five times are passed to
    ``send_to_dead_letter_queue`` and acked. All others are processed and
    acked on success or nacked on failure.
    """
    # delivery_attempt is None when the subscription has no dead-letter
    # policy, so read it once and guard every comparison below
    attempt = message.delivery_attempt

    # Analyze message metadata
    print(f"Message ID: {message.message_id}")
    print(f"Publish time: {message.publish_time}")
    print(f"Delivery attempt: {attempt}")

    if message.ordering_key:
        print(f"Ordering key: {message.ordering_key}")

    # Check for repeated deliveries (skipped when attempts are not tracked;
    # comparing None with an int would raise TypeError)
    if attempt is not None and attempt > 1:
        print(f"Warning: Message redelivered {attempt} times")

        # Consider dead letter queue after too many attempts
        if attempt > 5:
            print("Too many delivery attempts, sending to dead letter queue")
            send_to_dead_letter_queue(message)
            # Ack so Pub/Sub stops redelivering the diverted message
            message.ack()
            return

    # Process the message
    try:
        process_message(message.data)
        message.ack()
    except Exception as e:
        print(f"Processing error: {e}")
        message.nack()

Attribute-Based Message Routing

def callback(message):
    """Dispatch a message to a handler chosen by its 'message_type' attribute."""
    kind = message.attributes.get('message_type')

    # Known types: delegate and stop; each handler does its own ack/nack
    if kind == 'user_event':
        handle_user_event(message)
        return
    if kind == 'system_event':
        handle_system_event(message)
        return
    if kind == 'error_event':
        handle_error_event(message)
        return

    # No handler matched. Ack anyway so the message is not redelivered.
    print(f"Unknown message type: {kind}")
    message.ack()

def handle_user_event(message):
    """Process a user event message; ack on success, nack on failure."""
    # Gather the handler's inputs: the 'user_id' attribute and the decoded payload
    uid = message.attributes.get('user_id')
    payload = message.data.decode('utf-8')

    try:
        process_user_event(uid, payload)
        message.ack()
    except Exception as err:
        print(f"Failed to process user event: {err}")
        message.nack()

Asynchronous Acknowledgment

def callback(message):
    """Start processing a message and track its ack/nack through futures."""

    def report_ack(_future):
        # Runs when the ack future completes
        print(f"Ack completed for {message.message_id}")

    def report_nack(_future):
        # Runs when the nack future completes
        print(f"Nack completed for {message.message_id}")

    try:
        # Start the work
        process_message_async(message.data)

        # Ack and register a reporter on the returned future
        message.ack_with_response().add_done_callback(report_ack)

    except Exception as err:
        print(f"Processing failed: {err}")

        # Nack and register a reporter on the returned future
        message.nack_with_response().add_done_callback(report_nack)

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json