tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5
Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Topics are named resources to which messages are published. This document covers topic creation, configuration, updates, deletion, IAM management, and listing associated subscriptions.
type Topic struct {
PublishSettings PublishSettings
EnableMessageOrdering bool
// Has unexported fields
}
A reference to a Pub/Sub topic. Topic instances are safe for concurrent use by multiple goroutines.
Fields:
PublishSettings: Settings for message publishing (batching, timeouts, flow control, compression)
EnableMessageOrdering: Enable ordered message delivery using ordering keys
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)
Creates a new topic with default configuration.
Parameters:
ctx: Context for the operation
topicID: Topic ID (must start with letter, 3-255 chars, alphanumeric/dash/underscore/period/tilde/plus/percent)
Returns: Pointer to created Topic and error
Example:
topic, err := client.CreateTopic(ctx, "my-topic")
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
defer topic.Stop()
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
Creates a new topic with custom configuration including labels, schema settings, retention, and ingestion.
Parameters:
ctx: Context for the operation
topicID: Topic ID
tc: Topic configuration
Returns: Pointer to created Topic and error
Example:
topic, err := client.CreateTopicWithConfig(ctx, "my-topic", &pubsub.TopicConfig{
Labels: map[string]string{
"env": "production",
"team": "backend",
},
RetentionDuration: optional.Duration(24 * time.Hour),
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/my-schema",
Encoding: pubsub.EncodingJSON,
},
})
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
type TopicConfig struct {
Labels map[string]string
MessageStoragePolicy MessageStoragePolicy
KMSKeyName string
SchemaSettings *SchemaSettings
RetentionDuration optional.Duration
State TopicState
IngestionDataSourceSettings *IngestionDataSourceSettings
MessageTransforms []MessageTransform
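}
Configuration of a topic. It is supplied to CreateTopicWithConfig when creating a topic and is returned by Config() and Update(). The State field is output only.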
Fields:
Labels: User-defined key-value labels
MessageStoragePolicy: Geographic regions for message storage
KMSKeyName: Cloud KMS key for encryption (format: projects/P/locations/L/keyRings/R/cryptoKeys/K)
SchemaSettings: Schema validation settings
RetentionDuration: Minimum message retention duration (10 minutes to 31 days)
State: Current topic state (active, ingestion error)
IngestionDataSourceSettings: Data ingestion source configuration
MessageTransforms: Message transformation functions
Methods:
func (t *TopicConfig) String() string
func (t *TopicConfig) ID() string
String(): Returns fully qualified topic name
ID(): Returns topic ID within its project
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
Retrieves the current configuration of the topic.
Example:
config, err := topic.Config(ctx)
if err != nil {
log.Fatalf("Failed to get config: %v", err)
}
fmt.Printf("Topic: %s\n", config.String())
fmt.Printf("Retention: %v\n", config.RetentionDuration)
fmt.Printf("Labels: %v\n", config.Labels)
type TopicConfigToUpdate struct {
Labels map[string]string
MessageStoragePolicy *MessageStoragePolicy
RetentionDuration optional.Duration
SchemaSettings *SchemaSettings
IngestionDataSourceSettings *IngestionDataSourceSettings
MessageTransforms []MessageTransform
}
Fields to update on a topic. Only non-nil fields are updated.
Fields:
Labels: Replace current labels (use empty map to clear all)
MessageStoragePolicy: Replace message storage policy (use &MessageStoragePolicy{} to reset)
RetentionDuration: Set retention duration (positive for new value, negative to clear, nil to keep unchanged)
SchemaSettings: Update schema settings (use &SchemaSettings{} to remove schema)
IngestionDataSourceSettings: Update ingestion settings (use &IngestionDataSourceSettings{} to remove)
MessageTransforms: Replace message transforms
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
Updates an existing topic according to the fields set in cfg. Returns the new TopicConfig.
Parameters:
ctx: Context for the operation
cfg: Fields to update
Returns: Updated TopicConfig and error
Example:
// Update labels and retention
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
Labels: map[string]string{
"env": "staging",
},
RetentionDuration: optional.Duration(48 * time.Hour),
})
if err != nil {
log.Fatalf("Failed to update topic: %v", err)
}
// Clear retention duration
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
RetentionDuration: optional.Duration(-1 * time.Nanosecond), // must be a typed time.Duration; an untyped -1 would be stored as an int
})
func (t *Topic) ID() string
func (t *Topic) String() string
ID(): Returns the unique identifier within the topic's project
String(): Returns the fully qualified topic name (format: projects/PROJECT/topics/TOPIC)
Example:
fmt.Printf("Topic ID: %s\n", topic.ID()) // "my-topic"
fmt.Printf("Topic Name: %s\n", topic.String()) // "projects/my-project/topics/my-topic"
func (t *Topic) Exists(ctx context.Context) (bool, error)
Checks whether the topic exists on the server.
Example:
exists, err := topic.Exists(ctx)
if err != nil {
log.Fatalf("Failed to check existence: %v", err)
}
if !exists {
fmt.Println("Topic does not exist")
}
func (t *Topic) Delete(ctx context.Context) error
Deletes the topic. Existing subscriptions to the topic are not deleted; they remain, but are no longer attached to a topic (the service reports their topic as _deleted-topic_).
Example:
if err := topic.Delete(ctx); err != nil {
log.Fatalf("Failed to delete topic: %v", err)
}
type TopicState int
const (
TopicStateUnspecified = iota
TopicStateActive
TopicStateIngestionResourceError
)
Denotes the possible states for a topic:
TopicStateUnspecified: Default/unused value
TopicStateActive: Topic has no persistent errors
TopicStateIngestionResourceError: Ingestion from data source encountered permanent error
func (t *Topic) IAM() *iam.Handle
Returns an IAM handle for managing topic permissions.
Example:
import "cloud.google.com/go/iam"
// Get IAM policy
policy, err := topic.IAM().Policy(ctx)
if err != nil {
log.Fatalf("Failed to get policy: %v", err)
}
// Add a member to a role
policy.Add("user:alice@example.com", iam.Viewer)
policy.Add("serviceAccount:sa@project.iam.gserviceaccount.com", iam.Editor)
// Set the updated policy
if err := topic.IAM().SetPolicy(ctx, policy); err != nil {
log.Fatalf("Failed to set policy: %v", err)
}
// Test permissions
perms := []string{"pubsub.topics.publish", "pubsub.topics.get"}
allowed, err := topic.IAM().TestPermissions(ctx, perms)
if err != nil {
log.Fatalf("Failed to test permissions: %v", err)
}
fmt.Printf("Allowed permissions: %v\n", allowed)
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Returns an iterator for listing subscriptions attached to this topic. Some subscriptions may belong to other projects.
Example:
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate: %v", err)
}
fmt.Printf("Subscription: %s\n", sub.ID())
}
func (c *Client) Topics(ctx context.Context) *TopicIterator
Returns an iterator for listing all topics in the project.
type TopicIterator struct {
// Has unexported fields
}
func (it *TopicIterator) Next() (*Topic, error)
func (it *TopicIterator) NextConfig() (*TopicConfig, error)
Iterator for listing topics:
Next(): Returns next Topic reference (returns iterator.Done when finished)
NextConfig(): Returns next TopicConfig with full configuration (EXPERIMENTAL)
Example:
// List topics with references
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to list topics: %v", err)
}
fmt.Printf("Topic: %s\n", topic.ID())
}
// List topics with full configuration
it2 := client.Topics(ctx)
for {
config, err := it2.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to list topics: %v", err)
}
fmt.Printf("Topic: %s, Labels: %v\n", config.ID(), config.Labels)
}
type MessageStoragePolicy struct {
AllowedPersistenceRegions []string
}
Constrains how messages published to the topic may be stored. Determined when the topic is created based on the policy configured at the project level.
Fields:
AllowedPersistenceRegions: GCP regions where messages may be persisted. If empty, indicates misconfiguration. If nil, uses organization-level Resource Location Restriction policy.
Example:
topic, err := client.CreateTopicWithConfig(ctx, "regional-topic", &pubsub.TopicConfig{
MessageStoragePolicy: pubsub.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"us-east1", "us-west1"},
},
})
type DetachSubscriptionResult struct{}
func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error)
Detaches a subscription from its topic. All retained messages are dropped. Subsequent Pull and StreamingPull requests will return FAILED_PRECONDITION. For push subscriptions, pushes to the endpoint will stop.
Parameters:
ctx: Context for the operation
sub: Full subscription name (format: projects/PROJECT/subscriptions/SUBSCRIPTION)
Example:
subName := "projects/my-project/subscriptions/my-sub"
result, err := client.DetachSubscription(ctx, subName)
if err != nil {
log.Fatalf("Failed to detach subscription: %v", err)
}
func (t *Topic) Stop()
func (t *Topic) Flush()
Methods:
Stop(): Sends all remaining published messages and stops goroutines. Blocks until all outstanding messages have been sent or failed.
Flush(): Blocks until all remaining messages are sent (EXPERIMENTAL)
Example:
// Graceful shutdown
topic.Stop()
// Or flush during operation
topic.Flush()
var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")
Returned when attempting to publish after Stop() has been called.
func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic
Creates references to topics:
Topic(): Reference to topic in client's project
TopicInProject(): Reference to topic in specific project
Example:
// Topic in client's project
topic := client.Topic("my-topic")
// Topic in different project
crossProjectTopic := client.TopicInProject("my-topic", "other-project")
Call Stop() on topics used for publishing to clean up background goroutines.