tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go, providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Publishing messages to Pub/Sub topics with automatic batching, flow control, error handling, and optional message ordering.
type Topic struct {
	PublishSettings       PublishSettings
	EnableMessageOrdering bool
	// Has unexported fields
}

Represents a Pub/Sub topic for publishing messages.
Fields:
PublishSettings: Configuration for message batching and delivery
EnableMessageOrdering: Enable ordered message delivery (requires OrderingKey on messages)

func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult

Publishes a message to the topic asynchronously. Messages are automatically batched based on PublishSettings.
Parameters:
ctx: Context for the operation
msg: Message to publish

Returns: PublishResult for retrieving the server-assigned message ID
Example:
msg := &pubsub.Message{
	Data: []byte("Hello, World!"),
	Attributes: map[string]string{
		"timestamp": time.Now().Format(time.RFC3339),
	},
}
result := topic.Publish(ctx, msg)
// Block until the publish completes or fails
msgID, err := result.Get(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("Published message with ID: %s\n", msgID)

To publish messages with ordering guarantees, set EnableMessageOrdering to true on the topic and include an OrderingKey in the message. The Publish() method handles both ordered and unordered publishing.
Requirements:
topic.EnableMessageOrdering must be set to true
msg.OrderingKey must be set

Note: This is an EXPERIMENTAL feature. Messages with the same ordering key are delivered in the order they were published.
Example:
topic.EnableMessageOrdering = true
msg := &pubsub.Message{
	Data:        []byte("Ordered message"),
	OrderingKey: "user-123",
}
result := topic.Publish(ctx, msg)
msgID, err := result.Get(ctx)
if err != nil {
	log.Printf("Ordered publish failed: %v", err)
	return
}
fmt.Printf("Published ordered message with ID: %s\n", msgID)

func (t *Topic) ResumePublish(orderingKey string)

Resumes publishing for an ordering key after a publish error. When an ordered publish fails, publishing for that key is paused until ResumePublish is called.
Parameters:
orderingKey: The ordering key to resume

func (t *Topic) Stop()

Stops the topic and flushes all pending messages. Blocks until all pending messages have been published or have failed. After Stop() is called, the PublishResult returned by Publish() reports ErrTopicStopped.
Example:
defer topic.Stop()
// Publish messages...
for i := 0; i < 100; i++ {
	// The result is discarded for brevity; call Get on it to detect failures.
	_ = topic.Publish(ctx, &pubsub.Message{
		Data: []byte(fmt.Sprintf("Message %d", i)),
	})
}
// The deferred Stop flushes all pending messages.

func (t *Topic) Flush()

Flushes all pending messages immediately without stopping the topic. This is an EXPERIMENTAL feature.
type PublishResult struct {
	// Has unexported fields
}

Future-like object representing the result of a publish operation.
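To illustrate what "future-like" means here, the following is a minimal standard-library sketch of a result that is filled in later and can be waited on. The names publishResult, newPublishResult, set, and getWithTimeout are hypothetical and exist only for this illustration; this is a simplified model and not the library's PublishResult implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// publishResult is a hypothetical stand-in for a future-like result.
// It is NOT the library's PublishResult type.
type publishResult struct {
	ready chan struct{} // closed once id and err have been set
	id    string
	err   error
}

func newPublishResult() *publishResult {
	return &publishResult{ready: make(chan struct{})}
}

// set records the outcome and wakes every waiter. It must be called exactly once.
func (r *publishResult) set(id string, err error) {
	r.id, r.err = id, err
	close(r.ready)
}

// Ready returns a channel that is closed when the outcome is available.
func (r *publishResult) Ready() <-chan struct{} { return r.ready }

// Get blocks until the outcome is available or ctx is done.
func (r *publishResult) Get(ctx context.Context) (string, error) {
	select {
	case <-r.ready:
		return r.id, r.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// getWithTimeout bounds the wait by deriving a context with a deadline.
func getWithTimeout(r *publishResult, d time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return r.Get(ctx)
}

func main() {
	r := newPublishResult()
	// Simulate a publish finishing in the background.
	go func() {
		time.Sleep(10 * time.Millisecond)
		r.set("msg-1", nil)
	}()
	id, err := getWithTimeout(r, time.Second)
	fmt.Println(id, err)
}
```

In this model the caller decides how long to wait by choosing the context passed to Get, which is the same shape as the Get(ctx) and Ready() methods described in this section.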
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)

Blocks until the publish operation completes and returns the server-assigned message ID.
Parameters:
ctx: Context for waiting (can be cancelled)

Returns: Server-assigned message ID and error
Example:
result := topic.Publish(ctx, msg)
// Publish does not block: continue with other work...
// Block when ready to get the ID
msgID, err := result.Get(ctx)
if err != nil {
	log.Printf("Publish failed: %v", err)
	return
}
fmt.Printf("Message ID: %s\n", msgID)

func (r *PublishResult) Ready() <-chan struct{}

Returns a channel that closes when the publish operation completes. This is an EXPERIMENTAL feature.
Returns: Channel that closes when ready
Example:
result := topic.Publish(ctx, msg)
select {
case <-result.Ready():
	// The operation has completed, so Get does not need to wait.
	msgID, err := result.Get(ctx)
	if err != nil {
		log.Printf("Publish failed: %v", err)
		break
	}
	fmt.Printf("Message ID: %s\n", msgID)
case <-time.After(5 * time.Second):
	// Timeout handling
}

type PublishSettings struct {
	DelayThreshold            time.Duration
	CountThreshold            int
	ByteThreshold             int
	Timeout                   time.Duration
	BundleByteLimit           int
	BundleCountLimit          int
	EnableCompression         bool
	CompressionBytesThreshold int
}

Configuration for automatic message batching and publishing behavior.
Fields:
DelayThreshold: Maximum time to wait before publishing a batch (default: 1ms)
CountThreshold: Maximum number of messages in a batch (default: 100, max: 1000)
ByteThreshold: Maximum total bytes in a batch (default: 1MB, max: 10MB)
Timeout: Timeout for publish RPC (default: 60s)
BundleByteLimit: EXPERIMENTAL - Maximum bytes per bundle
BundleCountLimit: EXPERIMENTAL - Maximum messages per bundle
EnableCompression: EXPERIMENTAL - Enable message compression
CompressionBytesThreshold: EXPERIMENTAL - Minimum bytes for compression

Example:
topic.PublishSettings = pubsub.PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 50,
	ByteThreshold:  500000, // 500 KB
	Timeout:        30 * time.Second,
}

var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 1 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	Timeout:        60 * time.Second,
}

type Message struct {
	Data            []byte
	Attributes      map[string]string
	ID              string
	PublishTime     time.Time
	OrderingKey     string
	DeliveryAttempt *int
}

Represents a Pub/Sub message.
Fields:
Data: Message payload (max 10MB including attributes)
Attributes: Key-value metadata
ID: Server-assigned message ID (set after publishing)
PublishTime: Server-assigned publish time (set after publishing)
OrderingKey: Key for ordered message delivery
DeliveryAttempt: Number of delivery attempts (set on received messages only when the subscription has a dead-letter policy; otherwise nil)

Example:
msg := &pubsub.Message{
	Data: []byte("message payload"),
	Attributes: map[string]string{
		"type":      "notification",
		"priority":  "high",
		"timestamp": time.Now().Format(time.RFC3339),
	},
	OrderingKey: "user-456", // Optional, for ordered delivery
}

var (
	ErrOversizedMessage error // Message exceeds the 10 MB size limit
	ErrTopicStopped     = errors.New("pubsub: Topic.Stop has been called")
)

ErrOversizedMessage: Returned when a message exceeds the 10 MB size limit.
ErrTopicStopped: Returned when attempting to publish after Stop() is called.
type ErrPublishingPaused struct {
	OrderingKey string
}

func (e ErrPublishingPaused) Error() string

Returned when publishing is paused for an ordering key due to a previous error. Call ResumePublish() to resume.
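The pause-then-resume behavior can be pictured with a toy model that uses only the standard library. Everything here is hypothetical and for illustration only: pausedError is a local mirror of the error shape shown above, and orderedPublisher, publish, resume, and pausedKey are invented names. The sketch shows how a caller might detect the paused state with errors.As and recover the ordering key, assuming the real error is surfaced as a value of the struct type.

```go
package main

import (
	"errors"
	"fmt"
)

// pausedError is a hypothetical local mirror of ErrPublishingPaused.
type pausedError struct {
	OrderingKey string
}

func (e pausedError) Error() string {
	return "publishing paused for ordering key " + e.OrderingKey
}

// orderedPublisher is a toy model of per-key pausing, not the client library.
type orderedPublisher struct {
	paused map[string]bool
}

func newOrderedPublisher() *orderedPublisher {
	return &orderedPublisher{paused: make(map[string]bool)}
}

// publish simulates sending one message. When fail is true the send fails and
// the key is paused, so later publishes for that key are rejected.
func (p *orderedPublisher) publish(key string, fail bool) error {
	if p.paused[key] {
		return pausedError{OrderingKey: key}
	}
	if fail {
		p.paused[key] = true
		return errors.New("simulated publish failure")
	}
	return nil
}

// resume clears the paused state for key.
func (p *orderedPublisher) resume(key string) { delete(p.paused, key) }

// pausedKey extracts the ordering key if err is (or wraps) a pausedError.
func pausedKey(err error) (string, bool) {
	var pe pausedError
	if errors.As(err, &pe) {
		return pe.OrderingKey, true
	}
	return "", false
}

func main() {
	p := newOrderedPublisher()
	_ = p.publish("user-123", true) // the first failure pauses the key
	err := p.publish("user-123", false)
	if key, ok := pausedKey(err); ok {
		fmt.Println("paused:", key)
		p.resume(key)
	}
	fmt.Println(p.publish("user-123", false))
}
```

Other ordering keys are unaffected in this model: only the key whose publish failed is rejected until it is resumed.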
Publishing automatically applies flow control based on PublishSettings. If thresholds are exceeded, Publish() blocks until space is available.
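As a rough mental model of "blocks until space is available", the sketch below limits the number of outstanding items with a buffered channel used as a counting semaphore. The limiter type and its acquire, tryAcquire, and release methods are hypothetical names for this illustration; this is not how the client library is implemented, and the actual limits and behavior depend on the publisher's configuration.

```go
package main

import (
	"context"
	"fmt"
)

// limiter is a hypothetical counting semaphore: each outstanding message
// occupies one slot until it is released.
type limiter struct {
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// acquire blocks until a slot is free or ctx is done.
func (l *limiter) acquire(ctx context.Context) error {
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryAcquire takes a slot only if one is free right now.
func (l *limiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees one previously acquired slot.
func (l *limiter) release() { <-l.slots }

func main() {
	l := newLimiter(2)
	ctx := context.Background()
	_ = l.acquire(ctx)
	_ = l.acquire(ctx)
	fmt.Println(l.tryAcquire()) // false: both slots are in use
	l.release()                 // one message completes
	fmt.Println(l.tryAcquire()) // true: a slot is free again
}
```

A blocking acquire applies backpressure to the producer, while tryAcquire corresponds to rejecting work when the limit is reached; which of the two is appropriate is a design choice for the caller.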
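Messages are automatically batched based on:
DelayThreshold: maximum time to wait before publishing a batch
CountThreshold: maximum number of messages in a batch
ByteThreshold: maximum total bytes in a batch
Whichever condition is met first triggers the batch to be sent.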
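The "whichever comes first" rule can be sketched as a small pure function over the DelayThreshold, CountThreshold, and ByteThreshold settings. The thresholds type, the example values, and flushReason are hypothetical names for this illustration, and the function is a simplified model of the decision rather than the library's batching code.

```go
package main

import (
	"fmt"
	"time"
)

// thresholds mirrors the three batching limits from PublishSettings.
type thresholds struct {
	Delay time.Duration
	Count int
	Bytes int
}

// example holds illustrative values, not the library defaults.
var example = thresholds{Delay: 10 * time.Millisecond, Count: 50, Bytes: 500000}

// flushReason reports which limit, if any, a pending batch has reached.
// It returns "" when the batch should keep waiting for more messages.
func flushReason(t thresholds, count, bytes int, age time.Duration) string {
	switch {
	case count >= t.Count:
		return "count"
	case bytes >= t.Bytes:
		return "bytes"
	case count > 0 && age >= t.Delay:
		return "delay" // only a non-empty batch is sent on the timer
	default:
		return ""
	}
}

func main() {
	fmt.Println(flushReason(example, 50, 1000, 0))
	fmt.Println(flushReason(example, 3, 600000, 0))
	fmt.Println(flushReason(example, 3, 1000, 20*time.Millisecond))
	fmt.Printf("%q\n", flushReason(example, 3, 1000, time.Millisecond))
}
```

Lower thresholds reduce latency at the cost of more, smaller requests; higher thresholds trade latency for throughput.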
All publishing is asynchronous:
Publish() returns immediately with a PublishResult
Call Get() on the PublishResult to block until completion
Call Stop() to flush all pending messages before shutting down

Example:
var results []*pubsub.PublishResult
// Queue many messages quickly
for i := 0; i < 1000; i++ {
	result := topic.Publish(ctx, &pubsub.Message{
		Data: []byte(fmt.Sprintf("Message %d", i)),
	})
	results = append(results, result)
}
// Wait for all to complete
for i, result := range results {
	// The message ID is not needed here, only the error.
	if _, err := result.Get(ctx); err != nil {
		log.Printf("Message %d failed: %v", i, err)
	}
}

To use message ordering:
Set topic.EnableMessageOrdering = true
Set OrderingKey on messages
Call Publish() with the message containing the ordering key
Call ResumePublish() if publishing pauses

Messages with the same OrderingKey are guaranteed to be delivered to subscribers in the same order they were published.
Example:
topic.EnableMessageOrdering = true
for i := 0; i < 10; i++ {
	result := topic.Publish(ctx, &pubsub.Message{
		Data:        []byte(fmt.Sprintf("Event %d", i)),
		OrderingKey: "session-abc-123",
	})
	if _, err := result.Get(ctx); err != nil {
		log.Printf("Publish failed: %v", err)
		// Publishing for this key is now paused
		// Fix the issue, then:
		topic.ResumePublish("session-abc-123")
	}
}