```
tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5
```

Google Cloud Pub/Sub client library for Go, providing high-level, idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
A comprehensive Go client library for Google Cloud Pub/Sub, a fully-managed real-time messaging service that enables asynchronous communication between applications through a publish-subscribe pattern. This library provides high-level, idiomatic Go APIs that abstract the underlying gRPC calls, offering easy-to-use interfaces for publishing messages to topics and receiving messages from subscriptions with automatic batching, flow control, and message lifecycle management.
```
go get cloud.google.com/go/pubsub@v1.50.1
```

```go
import (
    "context"

    "cloud.google.com/go/pubsub"
)
```

For testing:

```go
import "cloud.google.com/go/pubsub/pstest"
```

For low-level gRPC access:

```go
import "cloud.google.com/go/pubsub/apiv1"
```

A minimal end-to-end example:

```go
package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, "my-project-id")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a topic
    topic, err := client.CreateTopic(ctx, "my-topic")
    if err != nil {
        log.Fatal(err)
    }

    // Publish a message
    result := topic.Publish(ctx, &pubsub.Message{
        Data: []byte("Hello, Pub/Sub!"),
        Attributes: map[string]string{
            "origin": "example",
        },
    })

    // Block until the message is sent
    msgID, err := result.Get(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Published message with ID: %s\n", msgID)

    // Create a subscription
    sub, err := client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
        Topic: topic,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Receive messages (Receive blocks until ctx is done or an unrecoverable error occurs)
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        fmt.Printf("Received message: %s\n", msg.Data)
        msg.Ack() // Acknowledge the message
    })
    if err != nil {
        log.Fatal(err)
    }
}
```

The library is organized into several key components, described in the sections that follow.
The library provides three distinct API levels:
- High-level API (pubsub package): Recommended for most use cases with automatic batching and flow control
- Low-level API (pubsub/apiv1 package): Direct access to Pub/Sub RPCs for advanced scenarios
- Test fake (pubsub/pstest package): In-memory fake server for unit testing

Create and manage Pub/Sub clients for accessing topics and subscriptions.

```go
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)
type Client struct {
// Has unexported fields
}
func (c *Client) Close() error
func (c *Client) Project() string
func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic
func (c *Client) Subscription(id string) *Subscription
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
func (c *Client) Topics(ctx context.Context) *TopicIterator
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)
```
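For example, handles for existing resources can be created locally, and the project's topics can be listed with the standard google.golang.org/api/iterator pattern. A minimal sketch (the "my-topic" ID is a placeholder):

```go
import (
    "context"
    "fmt"

    "cloud.google.com/go/pubsub"
    "google.golang.org/api/iterator"
)

func listTopics(ctx context.Context, client *pubsub.Client) error {
    // Topic returns a local handle; no RPC is made until it is used.
    topic := client.Topic("my-topic")

    ok, err := topic.Exists(ctx)
    if err != nil {
        return err
    }
    fmt.Println("topic exists:", ok)

    // Iterate over every topic in the client's project.
    it := client.Topics(ctx)
    for {
        t, err := it.Next()
        if err == iterator.Done {
            return nil
        }
        if err != nil {
            return err
        }
        fmt.Println(t.ID())
    }
}
```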
Publish messages to topics with automatic batching, flow control, and ordering support.

```go
type Topic struct {
PublishSettings PublishSettings
EnableMessageOrdering bool
// Has unexported fields
}
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
func (t *Topic) Stop()
func (t *Topic) Flush()
type PublishSettings struct {
DelayThreshold time.Duration
CountThreshold int
ByteThreshold int
Timeout time.Duration
BundleByteLimit int
BundleCountLimit int
EnableCompression bool
CompressionBytesThreshold int
}
type PublishResult struct {
// Has unexported fields
}
func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)
func (r *PublishResult) Ready() <-chan struct{}
```
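A sketch of tuned, ordered publishing under illustrative settings (the topic ID, batching thresholds, and ordering key are placeholders):

```go
import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/pubsub"
)

func publishOrdered(ctx context.Context, client *pubsub.Client) error {
    topic := client.Topic("my-topic")
    defer topic.Stop() // Flush pending messages and release resources.

    // Tune batching: send a batch after 50ms, 200 messages, or 1 MB.
    topic.PublishSettings.DelayThreshold = 50 * time.Millisecond
    topic.PublishSettings.CountThreshold = 200
    topic.PublishSettings.ByteThreshold = 1e6

    // Ordering requires the topic-side flag plus an OrderingKey on each message.
    topic.EnableMessageOrdering = true

    var results []*pubsub.PublishResult
    for i := 0; i < 10; i++ {
        r := topic.Publish(ctx, &pubsub.Message{
            Data:        []byte(fmt.Sprintf("event-%d", i)),
            OrderingKey: "customer-123",
        })
        results = append(results, r)
    }

    // Wait for all outstanding publishes to complete.
    for _, r := range results {
        if _, err := r.Get(ctx); err != nil {
            return err
        }
    }
    return nil
}
```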
Receive messages from subscriptions with configurable concurrency, flow control, and automatic ack deadline management.

```go
type Subscription struct {
ReceiveSettings ReceiveSettings
// Has unexported fields
}
func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error
type ReceiveSettings struct {
MaxOutstandingMessages int
MaxOutstandingBytes int
MaxExtension time.Duration
MaxExtensionPeriod time.Duration
MinExtensionPeriod time.Duration
NumGoroutines int
Synchronous bool
UseLegacyFlowControl bool
ExactlyOnceDelivery bool
}
type Message struct {
Data []byte
Attributes map[string]string
ID string
PublishTime time.Time
OrderingKey string
DeliveryAttempt *int
}
func (m *Message) Ack()
func (m *Message) Nack()
func (m *Message) AckWithResult() *AckResult
func (m *Message) NackWithResult() *AckResult
```
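A sketch of a bounded receive loop with explicit flow-control limits (the subscription ID, limits, and timeout are illustrative):

```go
import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/pubsub"
)

func pullForAWhile(ctx context.Context, client *pubsub.Client) error {
    sub := client.Subscription("my-subscription")

    // Limit how much work is outstanding at once.
    sub.ReceiveSettings.MaxOutstandingMessages = 100
    sub.ReceiveSettings.MaxOutstandingBytes = 50 * 1024 * 1024
    sub.ReceiveSettings.NumGoroutines = 4

    // Receive blocks until the context is done, so bound it explicitly.
    cctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    return sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        if len(msg.Data) == 0 {
            msg.Nack() // Redeliver messages we cannot handle right now.
            return
        }
        fmt.Printf("got %q (attributes: %v)\n", msg.Data, msg.Attributes)
        msg.Ack()
    })
}
```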
Create, configure, update, and delete topics with support for message retention, schema validation, and ingestion from external sources.

```go
func (t *Topic) ID() string
func (t *Topic) String() string
func (t *Topic) Exists(ctx context.Context) (bool, error)
func (t *Topic) Delete(ctx context.Context) error
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
type TopicConfig struct {
Labels map[string]string
MessageStoragePolicy MessageStoragePolicy
KMSKeyName string
SchemaSettings *SchemaSettings
RetentionDuration time.Duration
State TopicState
IngestionDataSourceSettings *IngestionDataSourceSettings
MessageTransforms []MessageTransform
}
type TopicConfigToUpdate struct {
Labels map[string]string
MessageStoragePolicy *MessageStoragePolicy
RetentionDuration optional.Duration
SchemaSettings *SchemaSettings
IngestionDataSourceSettings *IngestionDataSourceSettings
MessageTransforms []MessageTransform
}
```
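For instance, a topic's labels and retention can be changed with a partial update; only the fields set in TopicConfigToUpdate are modified. A sketch with placeholder values:

```go
import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/pubsub"
)

func tightenRetention(ctx context.Context, client *pubsub.Client) (pubsub.TopicConfig, error) {
    topic := client.Topic("my-topic")

    // Inspect the current configuration.
    cfg, err := topic.Config(ctx)
    if err != nil {
        return pubsub.TopicConfig{}, err
    }
    fmt.Println("current retention:", cfg.RetentionDuration)

    // Apply a partial update: only the fields set here are changed.
    return topic.Update(ctx, pubsub.TopicConfigToUpdate{
        Labels:            map[string]string{"team": "billing"},
        RetentionDuration: 24 * time.Hour,
    })
}
```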
Create, configure, update, and delete subscriptions with support for push/pull delivery, dead letter topics, message filtering, and export to BigQuery or Cloud Storage.

```go
func (s *Subscription) ID() string
func (s *Subscription) String() string
func (s *Subscription) Exists(ctx context.Context) (bool, error)
func (s *Subscription) Delete(ctx context.Context) error
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)
type SubscriptionConfig struct {
Topic *Topic
PushConfig PushConfig
BigQueryConfig BigQueryConfig
CloudStorageConfig CloudStorageConfig
AckDeadline time.Duration
RetainAckedMessages bool
RetentionDuration time.Duration
ExpirationPolicy optional.Duration
Labels map[string]string
EnableMessageOrdering bool
DeadLetterPolicy *DeadLetterPolicy
Filter string
RetryPolicy *RetryPolicy
Detached bool
TopicMessageRetentionDuration time.Duration
State SubscriptionState
EnableExactlyOnceDelivery bool
MessageTransforms []MessageTransform
}
type SubscriptionConfigToUpdate struct {
PushConfig *PushConfig
BigQueryConfig *BigQueryConfig
CloudStorageConfig *CloudStorageConfig
AckDeadline time.Duration
RetainAckedMessages bool
RetentionDuration time.Duration
ExpirationPolicy optional.Duration
DeadLetterPolicy *DeadLetterPolicy
Labels map[string]string
RetryPolicy *RetryPolicy
EnableExactlyOnceDelivery bool
MessageTransforms []MessageTransform
}
```
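A sketch of creating a filtered, exactly-once subscription using only fields listed above (the IDs and filter expression are illustrative):

```go
import (
    "context"
    "time"

    "cloud.google.com/go/pubsub"
)

func createFilteredSub(ctx context.Context, client *pubsub.Client) (*pubsub.Subscription, error) {
    topic := client.Topic("my-topic")

    return client.CreateSubscription(ctx, "billing-events", pubsub.SubscriptionConfig{
        Topic:       topic,
        AckDeadline: 30 * time.Second,
        // Only deliver messages whose attributes match this filter.
        Filter: `attributes.origin = "billing"`,
        // A zero duration means the subscription never expires due to inactivity.
        ExpirationPolicy:          time.Duration(0),
        EnableExactlyOnceDelivery: true,
    })
}
```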
Define and manage message schemas for Protocol Buffer or Avro validation.

```go
func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
type SchemaClient struct {
// Has unexported fields
}
func (c *SchemaClient) Close() error
type SchemaConfig struct {
Name string
Type SchemaType
Definition string
RevisionID string
RevisionCreateTime time.Time
}
type SchemaSettings struct {
Schema string
Encoding SchemaEncoding
FirstRevisionID string
LastRevisionID string
}
type SchemaType int
const (
SchemaTypeUnspecified SchemaType = iota
SchemaProtocolBuffer
SchemaAvro
)
type SchemaEncoding int
```
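A sketch of creating a topic bound to an existing schema; the schema resource name is a placeholder, and EncodingJSON is assumed to be one of the SchemaEncoding values:

```go
import (
    "context"
    "fmt"

    "cloud.google.com/go/pubsub"
)

func createSchemaBackedTopic(ctx context.Context, client *pubsub.Client, projectID string) (*pubsub.Topic, error) {
    // The schema is referenced by its full resource name and must already exist.
    schemaName := fmt.Sprintf("projects/%s/schemas/my-avro-schema", projectID)

    return client.CreateTopicWithConfig(ctx, "orders", &pubsub.TopicConfig{
        SchemaSettings: &pubsub.SchemaSettings{
            Schema:   schemaName,
            Encoding: pubsub.EncodingJSON, // Published messages must be JSON-encoded per the schema.
        },
    })
}
```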
Create snapshots of subscription state and seek to specific points in time or snapshots.

```go
func (c *Client) Snapshot(id string) *Snapshot
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator
func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
type Snapshot struct {
// Has unexported fields
}
func (s *Snapshot) ID() string
func (s *Snapshot) Delete(ctx context.Context) error
func (s *Snapshot) SetLabels(ctx context.Context, labels map[string]string) error
type SnapshotConfig struct {
Snapshot *Snapshot
Topic *Topic
Expiration time.Time
Labels map[string]string
}
```
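A sketch of rewinding a subscription by one hour; redelivery of previously acknowledged messages depends on the retention settings, and the IDs are placeholders:

```go
import (
    "context"
    "time"

    "cloud.google.com/go/pubsub"
)

func replayLastHour(ctx context.Context, client *pubsub.Client) error {
    sub := client.Subscription("my-subscription")

    // Take a snapshot first so the current position can be restored later.
    if _, err := sub.CreateSnapshot(ctx, "before-replay"); err != nil {
        return err
    }

    // Rewind the subscription: retained messages published after this time
    // become eligible for redelivery.
    return sub.SeekToTime(ctx, time.Now().Add(-1*time.Hour))
}
```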
Configure push delivery to HTTP endpoints, export to BigQuery, or write to Cloud Storage.

```go
type PushConfig struct {
Endpoint string
Attributes map[string]string
AuthenticationMethod AuthenticationMethod
Wrapper Wrapper
}
type OIDCToken struct {
ServiceAccountEmail string
Audience string
}
type BigQueryConfig struct {
Table string
UseTopicSchema bool
WriteMetadata bool
DropUnknownFields bool
State BigQueryConfigState
UseTableSchema bool
ServiceAccountEmail string
}
type CloudStorageConfig struct {
Bucket string
FilenamePrefix string
FilenameSuffix string
FilenameDatetimeFormat string
TextConfig *CloudStorageOutputFormatTextConfig
AvroConfig *CloudStorageOutputFormatAvroConfig
MaxDuration time.Duration
MaxBytes int64
MaxMessages int64
State CloudStorageConfigState
ServiceAccountEmail string
}
```
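A sketch of a BigQuery export subscription (the project, dataset, and table names are placeholders):

```go
import (
    "context"

    "cloud.google.com/go/pubsub"
)

func createBigQueryExport(ctx context.Context, client *pubsub.Client) (*pubsub.Subscription, error) {
    topic := client.Topic("my-topic")

    // Messages are written directly to the BigQuery table; no Receive loop is needed.
    return client.CreateSubscription(ctx, "orders-to-bq", pubsub.SubscriptionConfig{
        Topic: topic,
        BigQueryConfig: pubsub.BigQueryConfig{
            Table:          "my-project-id.analytics.orders", // project.dataset.table
            UseTopicSchema: true,
            WriteMetadata:  true,
        },
    })
}
```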
Ingest messages from external sources including AWS Kinesis, Amazon MSK, Azure Event Hubs, Confluent Cloud, and Cloud Storage.

```go
type IngestionDataSourceSettings struct {
Source IngestionDataSource
PlatformLogsSettings *PlatformLogsSettings
}
type IngestionDataSourceCloudStorage struct {
State CloudStorageIngestionState
Bucket string
MinimumObjectCreateTime time.Time
MatchGlob string
TextFormat *IngestionDataSourceCloudStorageTextFormat
AvroFormat *IngestionDataSourceCloudStorageAvroFormat
PubSubAvroFormat *IngestionDataSourceCloudStoragePubSubAvroFormat
}
type IngestionDataSourceAWSKinesis struct {
State AWSKinesisState
StreamARN string
ConsumerARN string
AWSRoleArn string
GcpServiceAccount string
}
type IngestionDataSourceAmazonMSK struct {
State AmazonMSKState
ClusterArn string
TopicName string
BootstrapServers []string
Authentication AmazonMSKAuthentication
}
type IngestionDataSourceAzureEventHubs struct {
State ResourceState
ResourceGroup string
Namespace string
EventHubName string
ConsumerGroup string
AuthenticationMethod AzureEventHubsAuthenticationMethod
}
type IngestionDataSourceConfluentCloud struct {
State ConfluentCloudState
BootstrapServers []string
TopicName string
Authentication ConfluentCloudAuthentication
}
```
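A sketch of a topic that ingests from an AWS Kinesis stream, assuming the concrete source types satisfy the IngestionDataSource interface as pointers (all ARNs and account names are placeholders):

```go
import (
    "context"

    "cloud.google.com/go/pubsub"
)

func createKinesisIngestTopic(ctx context.Context, client *pubsub.Client) (*pubsub.Topic, error) {
    return client.CreateTopicWithConfig(ctx, "kinesis-mirror", &pubsub.TopicConfig{
        IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
            // Assumption: concrete source types are used as pointers here.
            Source: &pubsub.IngestionDataSourceAWSKinesis{
                StreamARN:         "arn:aws:kinesis:us-east-1:111111111111:stream/my-stream",
                ConsumerARN:       "arn:aws:kinesis:us-east-1:111111111111:stream/my-stream/consumer/my-consumer:1234",
                AWSRoleArn:        "arn:aws:iam::111111111111:role/pubsub-ingestion",
                GcpServiceAccount: "ingestion@my-project-id.iam.gserviceaccount.com",
            },
        },
    })
}
```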
Apply JavaScript-based transformations to messages before delivery.

```go
type MessageTransform struct {
Transform Transform
}
type Transform interface {
// Has unexported methods
}
type JavaScriptUDF struct {
FunctionName string
Code string
}
```
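A sketch of a JavaScript transform, assuming JavaScriptUDF satisfies the Transform interface by value (use a pointer if the implementation requires it); the function body is illustrative only:

```go
import "cloud.google.com/go/pubsub"

// redactTransform returns a transform that strips a sensitive attribute.
// Assumption: JavaScriptUDF satisfies the Transform interface by value.
func redactTransform() pubsub.MessageTransform {
    return pubsub.MessageTransform{
        Transform: pubsub.JavaScriptUDF{
            FunctionName: "redact",
            Code: `function redact(message, metadata) {
  delete message.attributes["ssn"];
  return message;
}`,
        },
    }
}
```

The resulting MessageTransform would then be attached through TopicConfig.MessageTransforms or SubscriptionConfig.MessageTransforms.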
Manage Identity and Access Management policies for topics and subscriptions.

```go
func (t *Topic) IAM() *iam.Handle
func (s *Subscription) IAM() *iam.Handle
```

The IAM handle provides methods for getting, setting, and testing IAM policies using the standard cloud.google.com/go/iam package.
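For example, granting a service account the subscriber role on a topic (the member string is a placeholder):

```go
import (
    "context"

    "cloud.google.com/go/pubsub"
)

func grantSubscriber(ctx context.Context, client *pubsub.Client) error {
    handle := client.Topic("my-topic").IAM()

    policy, err := handle.Policy(ctx)
    if err != nil {
        return err
    }
    // Grant a service account permission to subscribe to the topic.
    policy.Add("serviceAccount:reader@my-project-id.iam.gserviceaccount.com",
        "roles/pubsub.subscriber")
    return handle.SetPolicy(ctx, policy)
}
```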
In-memory fake Pub/Sub server for unit testing without connecting to the actual service.

```go
func NewServer(opts ...ServerOption) *Server
func NewServerWithPort(port int, opts ...ServerOption) (*Server, error)
type Server struct {
Addr string
GServer *GServer
}
func (s *Server) Close() error
func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string
func (s *Server) Messages() []*Message
func (s *Server) ClearMessages()
func ValidateFilter(filter string) error
```
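A sketch of a unit test against the fake, assuming a grpc-go version that provides grpc.NewClient (older versions would use grpc.Dial):

```go
import (
    "context"
    "testing"

    "cloud.google.com/go/pubsub"
    "cloud.google.com/go/pubsub/pstest"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func TestPublish(t *testing.T) {
    ctx := context.Background()

    // Start the in-memory fake and point a real client at it.
    srv := pstest.NewServer()
    defer srv.Close()

    conn, err := grpc.NewClient(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    topic, err := client.CreateTopic(ctx, "t")
    if err != nil {
        t.Fatal(err)
    }
    if _, err := topic.Publish(ctx, &pubsub.Message{Data: []byte("hi")}).Get(ctx); err != nil {
        t.Fatal(err)
    }

    // The fake records everything it received.
    if got := len(srv.Messages()); got != 1 {
        t.Fatalf("got %d messages, want 1", got)
    }
}
```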
import "cloud.google.com/go/pubsub/apiv1"
func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)
func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)
func NewSchemaClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)
```
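A sketch of creating a topic through the low-level client; the pubsubpb import path and fully-qualified resource name follow common usage, and the project ID is a placeholder:

```go
import (
    "context"
    "log"

    pubsubapi "cloud.google.com/go/pubsub/apiv1"
    "cloud.google.com/go/pubsub/apiv1/pubsubpb"
)

func createTopicLowLevel(ctx context.Context) {
    client, err := pubsubapi.NewPublisherClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // The low-level client works with raw protobuf requests and fully-qualified names.
    topic, err := client.CreateTopic(ctx, &pubsubpb.Topic{
        Name: "projects/my-project-id/topics/my-topic",
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("created %s", topic.GetName())
}
```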
Constants and default settings exported by the pubsub package:

```go
const (
ScopePubSub = "https://www.googleapis.com/auth/pubsub"
ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

var DefaultPublishSettings = PublishSettings{
DelayThreshold: 1 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
}
var DefaultReceiveSettings = ReceiveSettings{
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
MaxExtension: 60 * time.Minute,
MaxExtensionPeriod: 10 * time.Minute,
NumGoroutines: 10,
}

var (
ErrEmptyProjectID = errors.New("pubsub: project ID is empty")
ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
ErrOversizedMessage error // Message exceeds the 10 MB size limit
ErrTopicStopped = errors.New("pubsub: Topic.Stop has been called")
)

const DetectProjectID = "*detect-project-id*"
```

DetectProjectID is a sentinel value for automatic project ID detection from credentials.
State enumerations such as TopicState, SubscriptionState, BigQueryConfigState, CloudStorageConfigState, and the per-source ingestion states (AWSKinesisState, AmazonMSKState, ConfluentCloudState, CloudStorageIngestionState) are provided for monitoring resource health.
The library provides optional integration with OpenTelemetry for distributed tracing and OpenCensus for metrics collection.

```go
type ClientConfig struct {
PublisherCallOptions *vkit.PublisherCallOptions
SubscriberCallOptions *vkit.SubscriberCallOptions
EnableOpenTelemetryTracing bool
}
```

Multiple OpenCensus metrics are available, including published message counts, publish latency, pull counts, ack/nack counts, and outstanding message/byte gauges.
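A sketch of enabling OpenTelemetry tracing, assuming the pubsub.NewClientWithConfig constructor (not shown in the listing above) accepts this ClientConfig:

```go
import (
    "context"

    "cloud.google.com/go/pubsub"
)

func newTracedClient(ctx context.Context) (*pubsub.Client, error) {
    // Spans are exported through whatever OpenTelemetry tracer provider the
    // process has registered globally.
    return pubsub.NewClientWithConfig(ctx, "my-project-id", &pubsub.ClientConfig{
        EnableOpenTelemetryTracing: true,
    })
}
```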