Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
Message objects represent individual Pub/Sub messages received by subscribers. They provide access to message data, attributes, and metadata, along with methods for acknowledgment and deadline management.
Access message data, attributes, and metadata.
class Message:
    """
    A representation of a single Pub/Sub message.

    Instances are handed to subscriber callbacks; every accessor below is a
    read-only property.

    Attributes:
    - message_id: Unique message identifier
    - data: Message payload as bytes
    - attributes: Message attributes as dictionary
    - publish_time: When message was originally published
    - delivery_attempt: Number of delivery attempts
    - ordering_key: Message ordering key (if any)
    - size: Size of the underlying message in bytes
    - ack_id: Acknowledgment ID used to ack the message
    - opentelemetry_data: OpenTelemetry tracing data (if enabled)
    """

    @property
    def message_id(self) -> str:
        """
        Unique message identifier.

        Returns:
        Message ID string
        """

    @property
    def data(self) -> bytes:
        """
        Message payload data.

        Returns:
        Message data as bytes
        """

    @property
    def attributes(self) -> MutableMapping[str, str]:
        """
        Message attributes.

        Returns:
        Dictionary of message attributes
        """

    # NOTE(review): the upstream library appears to return a timezone-aware
    # datetime here rather than a raw protobuf Timestamp - confirm.
    @property
    def publish_time(self) -> Timestamp:
        """
        Time when message was originally published.

        Returns:
        Protobuf Timestamp
        """

    # NOTE(review): upstream documents this value as None when the
    # subscription has no dead-letter policy - confirm, and guard
    # comparisons such as ``delivery_attempt > 1`` accordingly.
    @property
    def delivery_attempt(self) -> int:
        """
        Number of times this message has been delivered.

        Returns:
        Delivery attempt count
        """

    @property
    def ordering_key(self) -> str:
        """
        Message ordering key.

        Returns:
        Ordering key string, empty if no ordering key
        """

    @property
    def size(self) -> int:
        """
        Size of the underlying message in bytes.

        Returns:
        Message size in bytes
        """

    @property
    def ack_id(self) -> str:
        """
        Acknowledgment ID used to ack the message.

        Returns:
        Acknowledgment ID string
        """

    @property
    def opentelemetry_data(self) -> Optional[SubscribeOpenTelemetry]:
        """
        OpenTelemetry tracing data associated with this message.

        Returns:
        OpenTelemetry data object or None if tracing not enabled
        """


# Acknowledge or negative acknowledge messages to control redelivery.
def ack(self) -> None:
    """
    Acknowledge the message.

    This tells Pub/Sub that the message was successfully processed
    and should not be redelivered.
    """
def nack(self) -> None:
    """
    Negative acknowledge the message.

    This tells Pub/Sub that the message was not successfully processed
    and should be redelivered (subject to retry policies).
    """
def ack_with_response(self) -> Future:
    """
    Acknowledge the message and return response future.

    Unlike ``ack()``, the caller gets a handle it can wait on or attach a
    done-callback to.

    Returns:
    Future that resolves when acknowledgment is processed
    """
def nack_with_response(self) -> Future:
    """
    Negative acknowledge the message and return response future.

    Unlike ``nack()``, the caller gets a handle it can wait on or attach a
    done-callback to.

    Returns:
    Future that resolves when negative acknowledgment is processed
    """


# Modify message acknowledgment deadlines to extend processing time.
def modify_ack_deadline(self, seconds: int) -> None:
    """
    Modify the acknowledgment deadline for the message.

    Parameters:
    - seconds: New acknowledgment deadline, in seconds
               Must be between 0 and 600 seconds
               Use 0 to immediately requeue the message

    NOTE(review): upstream describes the value as the deadline to set,
    counted from the time of the call, not an amount added to the previous
    deadline - confirm.
    """


# Additional methods for message handling and representation.
def __repr__(self) -> str:
    """
    String representation of the message.

    Returns:
    Formatted string showing message data, ordering key, and attributes
    """


# ---------------------------------------------------------------------------
# Usage examples.
#
# Each ``callback`` below is an independent example of a subscriber callback;
# they intentionally share the same name. Helpers such as ``process_data`` or
# ``ProcessingError`` are application-defined and are not shown here.
# ---------------------------------------------------------------------------


# Example: reading the payload and attributes.
def callback(message):
    """Print the message ID, decoded payload and attributes, then ack."""
    # Access message data
    data = message.data.decode('utf-8')
    print(f"Message ID: {message.message_id}")
    print(f"Data: {data}")

    # Access attributes
    for key, value in message.attributes.items():
        print(f"Attribute {key}: {value}")

    # Acknowledge the message
    message.ack()


# Example: ack on success, nack on failure.
def callback(message):
    """Process the payload; ack if it succeeds, nack on any exception."""
    try:
        # Process the message
        process_data(message.data)
        # Acknowledge successful processing
        message.ack()
    except ProcessingError as e:
        print(f"Processing failed: {e}")
        # Negative acknowledge to trigger redelivery
        message.nack()
    except Exception as e:
        print(f"Unexpected error: {e}")
        # For unexpected errors, still nack to avoid message loss
        message.nack()


# Example: extending the ack deadline around long-running work.
def callback(message):
    """Push the ack deadline out before slow steps, then ack or nack."""
    print(f"Starting to process message: {message.message_id}")
    try:
        # Extend deadline before long processing
        message.modify_ack_deadline(300)  # 5 minutes

        # Perform long-running operation
        result = long_running_processing(message.data)

        # Additional deadline extension if needed
        if complex_validation_needed(result):
            # The value sets a new deadline rather than adding to the old
            # one (see upstream docs), so this allows 3 minutes from now.
            message.modify_ack_deadline(180)
            validate_result(result)

        # Acknowledge after successful processing
        message.ack()
    except Exception as e:
        print(f"Processing failed: {e}")
        message.nack()


# Example: inspecting metadata and handling repeated deliveries.
def callback(message):
    """Log metadata, divert over-delivered messages, otherwise process."""
    # Analyze message metadata
    print(f"Message ID: {message.message_id}")
    print(f"Publish time: {message.publish_time}")
    print(f"Delivery attempt: {message.delivery_attempt}")
    if message.ordering_key:
        print(f"Ordering key: {message.ordering_key}")

    # Upstream documents delivery_attempt as None when the subscription has
    # no dead-letter policy; comparing None with an int raises TypeError, so
    # normalise it before the threshold checks.
    attempts = message.delivery_attempt or 0

    # Check for repeated deliveries
    if attempts > 1:
        print(f"Warning: Message redelivered {message.delivery_attempt} times")

    # Consider dead letter queue after too many attempts
    if attempts > 5:
        print("Too many delivery attempts, sending to dead letter queue")
        send_to_dead_letter_queue(message)
        # Ack so Pub/Sub stops redelivering the diverted message.
        message.ack()
        return

    # Process the message
    try:
        process_message(message.data)
        message.ack()
    except Exception as e:
        print(f"Processing error: {e}")
        message.nack()


# Example: routing on a message attribute.
def callback(message):
    """Dispatch to a handler chosen by the 'message_type' attribute."""
    # Route messages based on attributes
    message_type = message.attributes.get('message_type')
    if message_type == 'user_event':
        handle_user_event(message)
    elif message_type == 'system_event':
        handle_system_event(message)
    elif message_type == 'error_event':
        handle_error_event(message)
    else:
        print(f"Unknown message type: {message_type}")
        # Still acknowledge unknown message types to avoid redelivery
        message.ack()


def handle_user_event(message):
    """Process a user event; ack on success, nack on any exception."""
    user_id = message.attributes.get('user_id')
    event_data = message.data.decode('utf-8')
    try:
        process_user_event(user_id, event_data)
        message.ack()
    except Exception as e:
        print(f"Failed to process user event: {e}")
        message.nack()


# Example: tracking ack/nack outcomes with response futures.
def callback(message):
    """Process the payload, then ack or nack and report the real outcome."""

    def report(action):
        # Build a done-callback that inspects the finished future instead of
        # assuming success: a resolved future may still carry an error.
        def on_done(future):
            error = future.exception()
            if error is None:
                print(f"{action} completed for {message.message_id}")
            else:
                print(f"{action} failed for {message.message_id}: {error}")

        return on_done

    # Process message asynchronously with response futures
    try:
        # Start processing
        process_message_async(message.data)
    except Exception as e:
        print(f"Processing failed: {e}")
        # Negative acknowledge with response tracking
        nack_future = message.nack_with_response()
        nack_future.add_done_callback(report("Nack"))
    else:
        # Acknowledge with response tracking. Kept outside the try so that a
        # failure while acking is not answered with a contradictory nack.
        ack_future = message.ack_with_response()
        ack_future.add_done_callback(report("Ack"))


# Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub