tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go, providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Receiving messages from Pub/Sub subscriptions with automatic flow control, ack deadline management, and configurable concurrency.
type Subscription struct {
ReceiveSettings ReceiveSettings
// Has unexported fields
}

Represents a Pub/Sub subscription for receiving messages.
Fields:
ReceiveSettings: Configuration for message reception, flow control, and concurrency

func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error

Receives messages from the subscription and invokes the callback function for each message. This method blocks until the context is cancelled or a non-retryable error occurs.
Parameters:
ctx: Context for the operation (cancel to stop receiving)
f: Callback function invoked for each message

Returns: Error (returns nil if context is cancelled normally)
Example:
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Printf("Received: %s\n", msg.Data)
// Process the message...
// Must call Ack() or Nack() within this callback
msg.Ack()
})
if err != nil && !errors.Is(err, context.Canceled) {
log.Fatal(err)
}

type Message struct {
Data []byte
Attributes map[string]string
ID string
PublishTime time.Time
OrderingKey string
DeliveryAttempt *int
}

Represents a received Pub/Sub message.
Fields:
Data: Message payload
Attributes: Key-value metadata
ID: Server-assigned unique message ID
PublishTime: Time the message was published
OrderingKey: Ordering key (if message was published with ordering)
DeliveryAttempt: Delivery attempt count (only set if subscription has dead-letter policy or exactly-once delivery enabled)

func (m *Message) Ack()

Acknowledges the message, indicating successful processing. The message will not be redelivered.
Important: Must be called within the Receive callback, not from a separate goroutine.
Example:
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if err := processMessage(msg); err != nil {
log.Printf("Processing failed: %v", err)
msg.Nack() // Will be redelivered
return
}
msg.Ack() // Successfully processed
})

func (m *Message) Nack()

Negatively acknowledges the message, requesting immediate redelivery.
Important: Must be called within the Receive callback.
Example:
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if !isReady() {
msg.Nack() // Redeliver immediately
return
}
// Process message...
msg.Ack()
})

func (m *Message) AckWithResult() *AckResult
func (m *Message) NackWithResult() *AckResult

Acknowledges or negatively acknowledges the message and returns a result object. This is an EXPERIMENTAL feature for exactly-once delivery scenarios.
Returns: AckResult that can be used to check acknowledgment status
Example:
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
processMessage(msg)
ackResult := msg.AckWithResult()
status, err := ackResult.Get(ctx)
if err != nil {
log.Printf("Ack failed: %v", err)
return
}
if status == pubsub.AcknowledgeStatusSuccess {
// Definitely processed exactly once
}
})

type AckResult struct {
// Has unexported fields
}
func (r *AckResult) Get(ctx context.Context) (AcknowledgeStatus, error)

Result of an acknowledgment operation. EXPERIMENTAL feature.
type AcknowledgeStatus int
const (
AcknowledgeStatusSuccess AcknowledgeStatus = iota
AcknowledgeStatusPermissionDenied
AcknowledgeStatusFailedPrecondition
AcknowledgeStatusInvalidAckID
AcknowledgeStatusOther
)

Status codes for acknowledgment operations.
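As a rough illustration of how a caller might branch on these status codes, the sketch below maps each one to a follow-up action. The `ackStatus` type is a local stand-in for `pubsub.AcknowledgeStatus` so the example compiles without the client library, and the policy in the hypothetical `nextStep` helper is an assumption made for illustration, not behavior the library prescribes.

```go
package main

import "fmt"

// ackStatus is a local stand-in for pubsub.AcknowledgeStatus (an assumption
// for this sketch); the constant order mirrors the listing above.
type ackStatus int

const (
	statusSuccess ackStatus = iota
	statusPermissionDenied
	statusFailedPrecondition
	statusInvalidAckID
	statusOther
)

// nextStep suggests a follow-up action for each status. The mapping is an
// illustrative policy, not something defined by the client library.
func nextStep(s ackStatus) string {
	switch s {
	case statusSuccess:
		return "done"
	case statusPermissionDenied, statusFailedPrecondition:
		return "fix configuration"
	case statusInvalidAckID:
		return "expect redelivery"
	default:
		return "log and investigate"
	}
}

func main() {
	// Print the suggested action for every status in declaration order.
	for s := statusSuccess; s <= statusOther; s++ {
		fmt.Printf("%d -> %s\n", s, nextStep(s))
	}
}
```

In real code the switch would operate on the value returned by `AckResult.Get`, after checking its error.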
type ReceiveSettings struct {
MaxOutstandingMessages int
MaxOutstandingBytes int
MaxExtension time.Duration
MaxExtensionPeriod time.Duration
MinExtensionPeriod time.Duration
NumGoroutines int
Synchronous bool
UseLegacyFlowControl bool
ExactlyOnceDelivery bool
}

Configuration for message reception and flow control.
Fields:
MaxOutstandingMessages: Maximum unacknowledged messages (default: 1000)
MaxOutstandingBytes: Maximum bytes of unacknowledged messages (default: 1GB)
MaxExtension: Maximum duration to auto-extend ack deadlines (default: 60m)
MaxExtensionPeriod: Maximum single extension period (default: 10m)
MinExtensionPeriod: EXPERIMENTAL - Minimum extension period
NumGoroutines: Number of StreamingPull connections (default: 10)
Synchronous: Process messages synchronously (default: false)
UseLegacyFlowControl: EXPERIMENTAL - Use legacy flow control
ExactlyOnceDelivery: EXPERIMENTAL - Enable exactly-once delivery

Example:
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxOutstandingMessages: 500,
MaxOutstandingBytes: 500e6, // 500 MB
MaxExtension: 30 * time.Minute,
NumGoroutines: 5,
}
err := sub.Receive(ctx, messageHandler)

var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
MaxExtension: 60 * time.Minute,
MaxExtensionPeriod: 10 * time.Minute,
NumGoroutines: 10,
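}

The library automatically applies flow control to prevent overwhelming the application: once MaxOutstandingMessages or MaxOutstandingBytes is reached, no further messages are handed to the callback until outstanding ones are acknowledged.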
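The effect of a message-count limit can be modeled with a counting semaphore. The sketch below is a stdlib-only model under stated assumptions, not the library's implementation: `maxInFlight` is a hypothetical helper that runs a handler per message while never letting more than `limit` handlers run at once, and reports the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxInFlight runs handler once per message with at most limit handlers
// executing concurrently, and returns the highest concurrency observed.
func maxInFlight(messages, limit int, handler func(id int)) int {
	sem := make(chan struct{}, limit) // one slot per outstanding message
	var (
		wg            sync.WaitGroup
		mu            sync.Mutex
		current, peak int
	)
	for i := 0; i < messages; i++ {
		sem <- struct{}{} // blocks while limit messages are outstanding
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.Lock()
			current++
			if current > peak {
				peak = current
			}
			mu.Unlock()

			handler(id)

			mu.Lock()
			current--
			mu.Unlock()
			<-sem // models the ack: frees the slot for the next message
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := maxInFlight(20, 3, func(int) { time.Sleep(time.Millisecond) })
	fmt.Println("peak concurrent handlers:", peak)
}
```

Because a slot is taken before a handler starts and returned only after it finishes, the reported peak can never exceed the limit, which is the property MaxOutstandingMessages is described as providing.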
Example with constrained resources:
// Limit to 100 concurrent messages
sub.ReceiveSettings.MaxOutstandingMessages = 100
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// Process message (may be slow)
time.Sleep(1 * time.Second)
msg.Ack()
// Flow control slot now available for next message
})

The library automatically manages acknowledgment deadlines, extending them based on processing speed while a message is outstanding, up to MaxExtension.
Example for long-running processing:
// Allow up to 2 hours for message processing
sub.ReceiveSettings.MaxExtension = 2 * time.Hour
sub.ReceiveSettings.MaxExtensionPeriod = 30 * time.Minute
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// Long-running operation
processForUpTo2Hours(msg.Data)
msg.Ack()
})

Note: For processing exceeding 30 minutes, consider using the low-level Pull API to avoid firewall timeouts on long-lived streams.
By default, the callback function is invoked concurrently by multiple goroutines for maximum throughput:
// Default: 10 StreamingPull connections, each processing messages concurrently
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// This function may be called concurrently
// Ensure thread-safety if accessing shared state
processMessageConcurrently(msg)
msg.Ack()
})

// Reduce to 5 connections
sub.ReceiveSettings.NumGoroutines = 5

// Switch to synchronous (unary) Pull; this alone does not serialize the callback
sub.ReceiveSettings.Synchronous = true
// Cap outstanding messages at 1 so messages are handled one at a time
sub.ReceiveSettings.MaxOutstandingMessages = 1
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// Messages processed sequentially
processMessageSequentially(msg)
msg.Ack()
})

If the subscription has message ordering enabled and messages were published with OrderingKey:
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if msg.OrderingKey != "" {
// Messages with same OrderingKey arrive in publish order
fmt.Printf("Ordered message for key %s: %s\n", msg.OrderingKey, msg.Data)
}
msg.Ack()
})

Messages with the same OrderingKey are delivered in the same order they were published.
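The ordering guarantee is per key: messages sharing a key are handled in order, while different keys may be handled concurrently and in any relative order. The following stdlib-only sketch models that contract under stated assumptions; the `msg` type and `processOrdered` function are hypothetical and do not reflect how the library schedules work internally.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a minimal stand-in for a received message (assumption for this sketch).
type msg struct {
	key  string
	data string
}

// processOrdered handles each key's messages sequentially in publish order,
// while different keys are handled concurrently. It returns the handling
// order recorded per key.
func processOrdered(msgs []msg) map[string][]string {
	queues := make(map[string][]string) // per-key queue, in publish order
	for _, m := range msgs {
		queues[m.key] = append(queues[m.key], m.data)
	}

	out := make(map[string][]string)
	var mu sync.Mutex
	var wg sync.WaitGroup
	for key, queue := range queues {
		wg.Add(1)
		go func(key string, queue []string) {
			defer wg.Done()
			for _, d := range queue { // one key is never processed in parallel
				mu.Lock()
				out[key] = append(out[key], d)
				mu.Unlock()
			}
		}(key, queue)
	}
	wg.Wait()
	return out
}

func main() {
	out := processOrdered([]msg{
		{"user-1", "create"}, {"user-2", "create"},
		{"user-1", "update"}, {"user-1", "delete"},
	})
	fmt.Println(out["user-1"], out["user-2"])
}
```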
Enable exactly-once delivery semantics:
// On subscription creation
sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: true,
})
if err != nil {
log.Fatal(err)
}
// Or update existing subscription
sub.ReceiveSettings.ExactlyOnceDelivery = true
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// DeliveryAttempt is now set
if msg.DeliveryAttempt != nil {
fmt.Printf("Delivery attempt: %d\n", *msg.DeliveryAttempt)
}
processMessage(msg)
// Use AckWithResult for exactly-once guarantees
ackResult := msg.AckWithResult()
status, err := ackResult.Get(ctx)
if err != nil {
log.Printf("Ack failed: %v", err)
return
}
if status == pubsub.AcknowledgeStatusSuccess {
// Definitely acknowledged exactly once
}
})

The normal way to stop receiving is to cancel the context:
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := sub.Receive(ctx, messageHandler)
if err != nil && !errors.Is(err, context.Canceled) {
log.Printf("Receive error: %v", err)
}
}()
// Later: stop receiving
cancel()

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
if err != nil {
if errors.Is(err, context.Canceled) {
// Normal shutdown
} else {
// Unexpected error
log.Fatal(err)
}
}

If the subscription has a dead-letter policy, messages that fail delivery attempts are sent to the dead-letter topic:
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if msg.DeliveryAttempt != nil {
fmt.Printf("Delivery attempt %d\n", *msg.DeliveryAttempt)
// If processing keeps failing, message will eventually
// be sent to dead-letter topic after MaxDeliveryAttempts
}
if err := processMessage(msg); err != nil {
msg.Nack()
return
}
msg.Ack()
})

The library uses Pub/Sub's streaming pull feature.
Always call Ack() or Nack() within the Receive callback:
// CORRECT
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
processMessage(msg)
msg.Ack() // Within callback
})
// INCORRECT - breaks flow control
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
go func() {
processMessage(msg)
msg.Ack() // In separate goroutine - DON'T DO THIS
}()
})

Messages may be redelivered even after Ack():
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// Make processing idempotent
if alreadyProcessed(msg.ID) {
msg.Ack()
return
}
processMessage(msg)
markAsProcessed(msg.ID)
msg.Ack()
})

The callback may be invoked concurrently. Ensure thread-safety:
var mu sync.Mutex
var counter int
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
counter++
mu.Unlock()
msg.Ack()
})

Adjust settings based on your workload:
// High-throughput, fast processing
sub.ReceiveSettings.MaxOutstandingMessages = 5000
sub.ReceiveSettings.NumGoroutines = 20
// Low-throughput, slow processing
sub.ReceiveSettings.MaxOutstandingMessages = 100
sub.ReceiveSettings.NumGoroutines = 2
sub.ReceiveSettings.MaxExtension = 30 * time.Minute