Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Receiving Messages

Receiving messages from Pub/Sub subscriptions with automatic flow control, ack deadline management, and configurable concurrency.

Subscription Type

type Subscription struct {
    ReceiveSettings ReceiveSettings
    // Has unexported fields
}

Represents a Pub/Sub subscription for receiving messages.

Fields:

  • ReceiveSettings: Configuration for message reception, flow control, and concurrency

Receiving Messages

Basic Receive

func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error

Receives messages from the subscription and invokes the callback function for each one. The call blocks until the context is done or a non-retryable error occurs.

Parameters:

  • ctx: Context for the operation (cancel to stop receiving)
  • f: Callback function invoked for each message

Returns: nil once the context is done and all outstanding callbacks have returned; a non-nil error if receiving stops because of a non-retryable failure

Example:

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    fmt.Printf("Received: %s\n", msg.Data)

    // Process the message...

    // Must call Ack() or Nack() within this callback
    msg.Ack()
})
if err != nil && !errors.Is(err, context.Canceled) {
    log.Fatal(err)
}

Message Type

type Message struct {
    Data            []byte
    Attributes      map[string]string
    ID              string
    PublishTime     time.Time
    OrderingKey     string
    DeliveryAttempt *int
}

Represents a received Pub/Sub message.

Fields:

  • Data: Message payload
  • Attributes: Key-value metadata
  • ID: Server-assigned unique message ID
  • PublishTime: Time the message was published
  • OrderingKey: Ordering key (if message was published with ordering)
  • DeliveryAttempt: Delivery attempt count, starting at 1 (set only if the subscription has a dead-letter policy; nil otherwise)

Message Acknowledgment

Acknowledging Messages

func (m *Message) Ack()

Acknowledges the message, indicating successful processing. The message will not be redelivered.

Important: Must be called within the Receive callback, not from a separate goroutine. Otherwise flow control limits such as MaxOutstandingMessages are not respected.

Example:

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    if err := processMessage(msg); err != nil {
        log.Printf("Processing failed: %v", err)
        msg.Nack() // Will be redelivered
        return
    }
    msg.Ack() // Successfully processed
})

Negative Acknowledgment

func (m *Message) Nack()

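Negatively acknowledges the message, indicating that it was not processed. The message is redelivered sooner than if its ack deadline were left to expire; the exact timing depends on the subscription's retry policy.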

Important: Must be called within the Receive callback.

Example:

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    if !isReady() {
        msg.Nack() // Request redelivery
        return
    }

    // Process message...
    msg.Ack()
})

Acknowledgment with Result

func (m *Message) AckWithResult() *AckResult
func (m *Message) NackWithResult() *AckResult

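Acknowledges or negatively acknowledges the message and returns a result object that tracks the outcome of the operation. This is an EXPERIMENTAL feature for exactly-once delivery scenarios; when exactly-once delivery is not enabled on the subscription, the result resolves immediately with a success status.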

Returns: AckResult that can be used to check acknowledgment status

Example:

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    processMessage(msg)

    ackResult := msg.AckWithResult()
    status, err := ackResult.Get(ctx)
    if err != nil {
        log.Printf("Ack failed: %v", err)
        return
    }

    if status == pubsub.AcknowledgeStatusSuccess {
        // The server confirmed the ack; the message will not be redelivered
    }
})

Ack Result

type AckResult struct {
    // Has unexported fields
}

func (r *AckResult) Get(ctx context.Context) (AcknowledgeStatus, error)

Result of an acknowledgment operation. EXPERIMENTAL feature.

Acknowledgment Status

type AcknowledgeStatus int

const (
    AcknowledgeStatusSuccess           AcknowledgeStatus = iota
    AcknowledgeStatusPermissionDenied
    AcknowledgeStatusFailedPrecondition
    AcknowledgeStatusInvalidAckID
    AcknowledgeStatusOther
)

Status codes for acknowledgment operations.

Receive Settings

type ReceiveSettings struct {
    MaxOutstandingMessages int
    MaxOutstandingBytes    int
    MaxExtension           time.Duration
    MaxExtensionPeriod     time.Duration
    MinExtensionPeriod     time.Duration
    NumGoroutines          int
    Synchronous            bool
    UseLegacyFlowControl   bool
}

Configuration for message reception and flow control. Exactly-once delivery is not a receive setting; it is a property of the subscription itself (EnableExactlyOnceDelivery in SubscriptionConfig).

Fields:

  • MaxOutstandingMessages: Maximum unacknowledged messages (default: 1000)
  • MaxOutstandingBytes: Maximum bytes of unacknowledged messages (default: 1GB)
  • MaxExtension: Maximum duration to auto-extend ack deadlines (default: 60m)
  • MaxExtensionPeriod: Maximum single extension period (default: 10m)
  • MinExtensionPeriod: EXPERIMENTAL - Minimum extension period
  • NumGoroutines: Number of StreamingPull connections (default: 10)
  • Synchronous: Use unary Pull instead of StreamingPull (default: false)
  • UseLegacyFlowControl: EXPERIMENTAL - Use legacy flow control

Example:

sub.ReceiveSettings = pubsub.ReceiveSettings{
    MaxOutstandingMessages: 500,
    MaxOutstandingBytes:    500e6, // 500 MB
    MaxExtension:           30 * time.Minute,
    NumGoroutines:          5,
}

err := sub.Receive(ctx, messageHandler)

Default Settings

var DefaultReceiveSettings = ReceiveSettings{
    MaxOutstandingMessages: 1000,
    MaxOutstandingBytes:    1e9,
    MaxExtension:           60 * time.Minute,
    MaxExtensionPeriod:     10 * time.Minute,
    NumGoroutines:          10,
}

Flow Control

The library automatically applies flow control to prevent overwhelming the application:

  1. Message Count: Blocks when MaxOutstandingMessages is reached
  2. Byte Size: Blocks when MaxOutstandingBytes is reached
  3. Messages are delivered to the callback only when within limits
  4. Calling Ack()/Nack() releases flow control

Example with constrained resources:

// Limit to 100 concurrent messages
sub.ReceiveSettings.MaxOutstandingMessages = 100

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // Process message (may be slow)
    time.Sleep(1 * time.Second)
    msg.Ack()
    // Flow control slot now available for next message
})
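The two limits can be pictured as a pair of counters that must both have room before a message is handed to the callback. The following is a simplified model written for illustration, not the library's implementation; `flowController` and its methods are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// flowController is a simplified model of count- and byte-based flow control.
// It is NOT the library's implementation; all names here are illustrative.
type flowController struct {
	mu       sync.Mutex
	cond     *sync.Cond
	maxCount int
	maxBytes int
	count    int
	bytes    int
}

func newFlowController(maxCount, maxBytes int) *flowController {
	fc := &flowController{maxCount: maxCount, maxBytes: maxBytes}
	fc.cond = sync.NewCond(&fc.mu)
	return fc
}

// fits reports whether one more message of the given size stays within both
// limits. The caller must hold fc.mu.
func (fc *flowController) fits(size int) bool {
	return fc.count+1 <= fc.maxCount && fc.bytes+size <= fc.maxBytes
}

// tryAcquire reserves capacity for a message without blocking.
func (fc *flowController) tryAcquire(size int) bool {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	if !fc.fits(size) {
		return false
	}
	fc.count++
	fc.bytes += size
	return true
}

// acquire blocks until the message fits within both limits.
// A message larger than maxBytes would block forever in this model.
func (fc *flowController) acquire(size int) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	for !fc.fits(size) {
		fc.cond.Wait()
	}
	fc.count++
	fc.bytes += size
}

// release frees the capacity; in this model it corresponds to Ack or Nack.
func (fc *flowController) release(size int) {
	fc.mu.Lock()
	fc.count--
	fc.bytes -= size
	fc.mu.Unlock()
	fc.cond.Broadcast()
}

func main() {
	fc := newFlowController(2, 100)
	fmt.Println(fc.tryAcquire(40)) // first message fits
	fmt.Println(fc.tryAcquire(40)) // second message fits
	fmt.Println(fc.tryAcquire(10)) // rejected: count limit reached
	fc.release(40)                 // one message is acked
	fmt.Println(fc.tryAcquire(70)) // rejected: 40+70 exceeds the byte limit
	fmt.Println(fc.tryAcquire(10)) // fits again
}
```

The model shows why a callback that never acks or nacks eventually stalls delivery: capacity is only returned by `release`.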

Ack Deadline Management

The library automatically manages acknowledgment deadlines:

  1. Each message has an ack deadline (time before redelivery)
  2. The library automatically extends deadlines while processing
  3. Extension period adapts based on processing time (99th percentile)
  4. Extensions stop after MaxExtension duration
  5. Call Ack() or Nack() to stop extensions

Automatic Extension

The library extends ack deadlines automatically based on processing speed:

  • Fast processing: Short extension periods
  • Slow processing: Long extension periods
  • Maximum extension period is capped at MaxExtensionPeriod (default: 10m)

Example for long-running processing:

// Allow up to 2 hours for message processing
sub.ReceiveSettings.MaxExtension = 2 * time.Hour
sub.ReceiveSettings.MaxExtensionPeriod = 30 * time.Minute

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // Long-running operation
    processForUpTo2Hours(msg.Data)
    msg.Ack()
})

Note: For processing exceeding 30 minutes, consider using the low-level Pull API to avoid firewall timeouts on long-lived streams.

Concurrency

Concurrent Processing

By default, the callback function is invoked concurrently by multiple goroutines for maximum throughput:

// Default: 10 StreamingPull connections, each processing messages concurrently
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // This function may be called concurrently
    // Ensure thread-safety if accessing shared state
    processMessageConcurrently(msg)
    msg.Ack()
})

Controlling Concurrency

// Reduce to 5 connections
sub.ReceiveSettings.NumGoroutines = 5

Synchronous Processing

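// Synchronous switches Receive from StreamingPull to unary Pull.
// It does not by itself serialize callbacks; to handle one message
// at a time, also limit outstanding messages to 1.
sub.ReceiveSettings.Synchronous = true
sub.ReceiveSettings.MaxOutstandingMessages = 1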

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // Messages processed sequentially
    processMessageSequentially(msg)
    msg.Ack()
})

Message Ordering

If the subscription has message ordering enabled and messages were published with OrderingKey:

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    if msg.OrderingKey != "" {
        // Messages with same OrderingKey arrive in publish order
        fmt.Printf("Ordered message for key %s: %s\n", msg.OrderingKey, msg.Data)
    }
    msg.Ack()
})

Messages with the same OrderingKey are delivered in the same order they were published.

Exactly-Once Delivery

Enable exactly-once delivery semantics:

// On subscription creation
sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    EnableExactlyOnceDelivery: true,
})

// Or enable it on an existing subscription
_, err = sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    EnableExactlyOnceDelivery: true,
})

err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // DeliveryAttempt is set only when the subscription has a dead-letter policy
    if msg.DeliveryAttempt != nil {
        fmt.Printf("Delivery attempt: %d\n", *msg.DeliveryAttempt)
    }

    processMessage(msg)

    // AckWithResult reports whether the server accepted the acknowledgment
    ackResult := msg.AckWithResult()
    status, err := ackResult.Get(ctx)
    if err != nil {
        log.Printf("Ack failed (status %v): %v", status, err)
        return
    }
    // The ack was confirmed; the message will not be redelivered
})

Error Handling

Context Cancellation

The normal way to stop receiving is to cancel the context:

ctx, cancel := context.WithCancel(context.Background())

go func() {
    err := sub.Receive(ctx, messageHandler)
    if err != nil && !errors.Is(err, context.Canceled) {
        log.Printf("Receive error: %v", err)
    }
}()

// Later: stop receiving
cancel()

Handling Receive Errors

err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    msg.Ack()
})

if err != nil {
    if errors.Is(err, context.Canceled) {
        // Normal shutdown
    } else {
        // Unexpected error
        log.Fatal(err)
    }
}

Dead Letter Topics

If the subscription has a dead-letter policy, a message that is not acknowledged within the configured maximum number of delivery attempts is forwarded to the dead-letter topic:

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    if msg.DeliveryAttempt != nil {
        fmt.Printf("Delivery attempt %d\n", *msg.DeliveryAttempt)

        // If processing keeps failing, message will eventually
        // be sent to dead-letter topic after MaxDeliveryAttempts
    }

    if err := processMessage(msg); err != nil {
        msg.Nack()
        return
    }

    msg.Ack()
})
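A handler may want to log or alert when a message is on its final attempt. The helper below is a sketch with hypothetical names; `maxAttempts` is assumed to be the value configured as MaxDeliveryAttempts in the subscription's dead-letter policy:

```go
package main

import "fmt"

// attemptsLeft returns how many deliveries remain after the current one
// before the message is dead-lettered. The second result is false when the
// attempt count is unavailable (nil), e.g. no dead-letter policy is set.
func attemptsLeft(attempt *int, maxAttempts int) (int, bool) {
	if attempt == nil {
		return 0, false
	}
	left := maxAttempts - *attempt
	if left < 0 {
		left = 0
	}
	return left, true
}

func main() {
	third := 3
	if left, ok := attemptsLeft(&third, 5); ok {
		fmt.Println("attempts left:", left) // attempts left: 2
	}
	if _, ok := attemptsLeft(nil, 5); !ok {
		fmt.Println("delivery attempt not tracked")
	}
}
```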

Streaming Pull Behavior

The library uses Pub/Sub's streaming pull feature:

  • Multiple persistent bidirectional gRPC streams (controlled by NumGoroutines)
  • Lower latency than synchronous pull
  • Automatic stream management and reconnection
  • See: https://cloud.google.com/pubsub/docs/pull#streamingpull

Best Practices

Acknowledge Within Callback

Always call Ack() or Nack() within the Receive callback:

// CORRECT
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    processMessage(msg)
    msg.Ack() // Within callback
})

// INCORRECT - breaks flow control
sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    go func() {
        processMessage(msg)
        msg.Ack() // In separate goroutine - DON'T DO THIS
    }()
})

Handle Redelivery

Pub/Sub delivers at least once by default, so a message may be redelivered even after Ack():

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    // Make processing idempotent
    if alreadyProcessed(msg.ID) {
        msg.Ack()
        return
    }

    processMessage(msg)
    markAsProcessed(msg.ID)
    msg.Ack()
})

Thread Safety

The callback may be invoked concurrently. Ensure thread-safety:

var mu sync.Mutex
var counter int

sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    mu.Lock()
    counter++
    mu.Unlock()

    msg.Ack()
})
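For a simple counter, an atomic value avoids the mutex altogether. A small standard-library sketch (requires Go 1.19 or later for `atomic.Int64`); the goroutines stand in for concurrent callback invocations, and `countConcurrently` is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently simulates n concurrent callback invocations that each
// increment a shared counter, and returns the final count.
func countConcurrently(n int) int64 {
	var processed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			processed.Add(1) // safe without a mutex
		}()
	}
	wg.Wait()
	return processed.Load()
}

func main() {
	fmt.Println(countConcurrently(100)) // 100
}
```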

Tune Settings

Adjust settings based on your workload:

// High-throughput, fast processing
sub.ReceiveSettings.MaxOutstandingMessages = 5000
sub.ReceiveSettings.NumGoroutines = 20

// Low-throughput, slow processing
sub.ReceiveSettings.MaxOutstandingMessages = 100
sub.ReceiveSettings.NumGoroutines = 2
sub.ReceiveSettings.MaxExtension = 30 * time.Minute