Describes: golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Publishing Messages

Publishing messages to Pub/Sub topics with automatic batching, flow control, error handling, and optional message ordering.

Topic Type

type Topic struct {
    PublishSettings       PublishSettings
    EnableMessageOrdering bool
    // Has unexported fields
}

Represents a Pub/Sub topic for publishing messages.

Fields:

  • PublishSettings: Configuration for message batching and delivery
  • EnableMessageOrdering: Enable ordered message delivery (requires OrderingKey on messages)

Publishing Methods

Basic Publishing

func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult

Publishes a message to the topic asynchronously. Messages are automatically batched based on PublishSettings.

Parameters:

  • ctx: Context for the operation
  • msg: Message to publish

Returns: PublishResult for retrieving the server-assigned message ID

Example:

msg := &pubsub.Message{
    Data: []byte("Hello, World!"),
    Attributes: map[string]string{
        "timestamp": time.Now().Format(time.RFC3339),
    },
}

result := topic.Publish(ctx, msg)

// Block until published
msgID, err := result.Get(ctx)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Published message with ID: %s\n", msgID)

Publishing with Ordering

To publish messages with ordering guarantees, set EnableMessageOrdering to true on the topic and include an OrderingKey in the message. The Publish() method handles both ordered and unordered publishing.

Requirements:

  • topic.EnableMessageOrdering must be set to true
  • msg.OrderingKey must be set

Note: This is an EXPERIMENTAL feature. Messages with the same ordering key are delivered in the order they were published.

Example:

topic.EnableMessageOrdering = true

msg := &pubsub.Message{
    Data:        []byte("Ordered message"),
    OrderingKey: "user-123",
}

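result := topic.Publish(ctx, msg)
msgID, err := result.Get(ctx)
if err != nil {
    log.Printf("Ordered publish failed: %v", err)
    return
}
fmt.Printf("Published ordered message with ID: %s\n", msgID)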

Resuming Ordered Publishing

func (t *Topic) ResumePublish(orderingKey string)

Resumes publishing for an ordering key after a publish error. When publishing with ordering fails, publishing for that key is paused until ResumePublish is called.

Parameters:

  • orderingKey: The ordering key to resume

Topic Lifecycle

Stopping a Topic

func (t *Topic) Stop()

Stops the topic and flushes all pending messages. Blocks until every pending message has been published or has failed. After Stop() is called, Publish() still returns a PublishResult, but calling Get() on it reports ErrTopicStopped.

Example:

defer topic.Stop()

// Publish messages...
for i := 0; i < 100; i++ {
    // The result is discarded here for brevity; keep it to check for errors.
    topic.Publish(ctx, &pubsub.Message{
        Data: []byte(fmt.Sprintf("Message %d", i)),
    })
}

// Stop will flush all pending messages

Flushing Messages

func (t *Topic) Flush()

Sends all pending messages without stopping the topic, and blocks until they have been sent. This is an EXPERIMENTAL feature.

Publish Result

type PublishResult struct {
    // Has unexported fields
}

Future-like object representing the result of a publish operation.

Getting the Message ID

func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)

Blocks until the publish operation completes and returns the server-assigned message ID.

Parameters:

  • ctx: Context for waiting (can be cancelled)

Returns: Server-assigned message ID and error

Example:

result := topic.Publish(ctx, msg)

// Non-blocking: continue with other work...

// Block when ready to get the ID
msgID, err := result.Get(ctx)
if err != nil {
    log.Printf("Publish failed: %v", err)
    return
}
fmt.Printf("Message ID: %s\n", msgID)

Checking Readiness

func (r *PublishResult) Ready() <-chan struct{}

Returns a channel that closes when the publish operation completes. This is an EXPERIMENTAL feature.

Returns: Channel that closes when ready

Example:

result := topic.Publish(ctx, msg)

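select {
case <-result.Ready():
    // Get returns immediately because the result is ready.
    msgID, err := result.Get(ctx)
    if err != nil {
        log.Printf("Publish failed: %v", err)
        break
    }
    fmt.Printf("Message ID: %s\n", msgID)
case <-time.After(5 * time.Second):
    // Timeout handling
}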

Publish Settings

type PublishSettings struct {
    DelayThreshold            time.Duration
    CountThreshold            int
    ByteThreshold             int
    NumGoroutines             int
    Timeout                   time.Duration
    BufferedByteLimit         int // Deprecated
    FlowControlSettings       FlowControlSettings
    EnableCompression         bool
    CompressionBytesThreshold int
}

Configuration for automatic message batching and publishing behavior.

Fields:

  • DelayThreshold: Maximum time to wait before publishing a non-empty batch (default: 10ms)
  • CountThreshold: Number of messages that triggers a batch (default: 100, max: 1000)
  • ByteThreshold: Total batch size in bytes that triggers a batch (default: 1MB, max: 10MB)
  • NumGoroutines: Number of goroutines used along the publish path
  • Timeout: Maximum time the client spends trying to publish a batch (default: 60s)
  • BufferedByteLimit: Deprecated - superseded by FlowControlSettings.MaxOutstandingBytes
  • FlowControlSettings: Publisher-side flow control limits and the behavior when they are exceeded
  • EnableCompression: EXPERIMENTAL - Enable compression of publish requests
  • CompressionBytesThreshold: EXPERIMENTAL - Minimum request size in bytes for compression (only used when EnableCompression is true)

Example:

topic.PublishSettings = pubsub.PublishSettings{
    DelayThreshold: 10 * time.Millisecond,
    CountThreshold: 50,
    ByteThreshold:  500000, // 500 KB
    Timeout:        30 * time.Second,
}

Default Settings

var DefaultPublishSettings = PublishSettings{
    DelayThreshold:    10 * time.Millisecond,
    CountThreshold:    100,
    ByteThreshold:     1e6,
    Timeout:           60 * time.Second,
    BufferedByteLimit: 10 * MaxPublishRequestBytes,
    FlowControlSettings: FlowControlSettings{
        MaxOutstandingMessages: 1000,
        MaxOutstandingBytes:    -1,
        LimitExceededBehavior:  FlowControlIgnore,
    },
    EnableCompression:         false,
    CompressionBytesThreshold: 240,
}

Message Type

type Message struct {
    Data            []byte
    Attributes      map[string]string
    ID              string
    PublishTime     time.Time
    OrderingKey     string
    DeliveryAttempt *int
}

Represents a Pub/Sub message.

Fields:

  • Data: Message payload (max 10MB including attributes)
  • Attributes: Key-value metadata
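  • ID: Server-assigned message ID; read-only and populated on messages received from a subscription (a publisher obtains the ID from PublishResult.Get())
  • PublishTime: Server-assigned publish time; populated on messages received from a subscription
  • OrderingKey: Key for ordered message delivery
  • DeliveryAttempt: Number of times the message has been delivered; populated only when the subscription has a dead-letter policy, otherwise nil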

Example:

msg := &pubsub.Message{
    Data: []byte("message payload"),
    Attributes: map[string]string{
        "type":      "notification",
        "priority":  "high",
        "timestamp": time.Now().Format(time.RFC3339),
    },
    OrderingKey: "user-456", // Optional, for ordered delivery
}

Errors

var (
    ErrOversizedMessage error // Message exceeds the 10 MB size limit
    ErrTopicStopped     = errors.New("pubsub: Topic.Stop has been called")
)

ErrOversizedMessage: Returned when a message exceeds the 10 MB size limit.

ErrTopicStopped: Reported by PublishResult.Get() when a message is published after Stop() has been called.

Publishing Paused Error

type ErrPublishingPaused struct {
    OrderingKey string
}

func (e ErrPublishingPaused) Error() string

Returned when publishing is paused for an ordering key due to a previous error. Call ResumePublish() to resume.

Flow Control

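Publisher-side flow control is configured through PublishSettings.FlowControlSettings, which limits the number of outstanding messages and bytes. By default the limits are not enforced (LimitExceededBehavior is FlowControlIgnore). When LimitExceededBehavior is set to FlowControlBlock, Publish() blocks until enough outstanding messages have completed to bring usage back under the limits.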

Batch Behavior

Messages are automatically batched based on:

  1. Time: DelayThreshold elapsed since first message in batch
  2. Count: CountThreshold messages accumulated
  3. Size: ByteThreshold bytes accumulated

Whichever condition is met first triggers the batch to be sent.

Asynchronous Publishing

All publishing is asynchronous:

  1. Publish() returns immediately with a PublishResult
  2. Messages are queued and batched in the background
  3. Call Get() on the PublishResult to block until completion
  4. Call Stop() to flush all pending messages before shutting down

Example:

var results []*pubsub.PublishResult

// Queue many messages quickly
for i := 0; i < 1000; i++ {
    result := topic.Publish(ctx, &pubsub.Message{
        Data: []byte(fmt.Sprintf("Message %d", i)),
    })
    results = append(results, result)
}

// Wait for all to complete
for i, result := range results {
    if _, err := result.Get(ctx); err != nil {
        log.Printf("Message %d failed: %v", i, err)
    }
}

Message Ordering

To use message ordering:

  1. Set topic.EnableMessageOrdering = true
  2. Set OrderingKey on messages
  3. Use Publish() with the message containing the ordering key
  4. Handle errors and call ResumePublish() if publishing pauses

Messages with the same OrderingKey are delivered to subscribers in the order they were published, provided the subscription also has message ordering enabled.

Example:

topic.EnableMessageOrdering = true

for i := 0; i < 10; i++ {
    result := topic.Publish(ctx, &pubsub.Message{
        Data:        []byte(fmt.Sprintf("Event %d", i)),
        OrderingKey: "session-abc-123",
    })

    if _, err := result.Get(ctx); err != nil {
        log.Printf("Publish failed: %v", err)
        // Publishing for this key is now paused
        // Fix the issue, then:
        topic.ResumePublish("session-abc-123")
    }
}