Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources
—
Topics are named resources to which messages are published. This document covers topic creation, configuration, updates, deletion, IAM management, and listing associated subscriptions.
type Topic struct {
	PublishSettings       PublishSettings
	EnableMessageOrdering bool
	// Has unexported fields
}

A reference to a Pub/Sub topic. Topic instances are safe for concurrent use by multiple goroutines.

Fields:
PublishSettings: Settings for message publishing (batching, timeouts, flow control, compression)
EnableMessageOrdering: Enable ordered message delivery using ordering keys

func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)

Creates a new topic with default configuration.

Parameters:
ctx: Context for the operation
topicID: Topic ID (must start with a letter; 3-255 characters; letters, digits, dash, underscore, period, tilde, plus, or percent)
Returns: Pointer to the created Topic and an error
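The topic ID rule listed in the parameters can be checked on the client before calling CreateTopic. The following is a minimal sketch using only the standard library; validTopicID and topicIDPattern are hypothetical names, not part of the pubsub package, and the service may apply further checks that this pattern does not cover.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicIDPattern encodes the rule quoted above: a leading letter followed by
// letters, digits, or - _ . ~ + %, for 3 to 255 characters in total.
var topicIDPattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9._~+%-]{2,254}$`)

// validTopicID is a hypothetical helper; it is not provided by the library.
func validTopicID(id string) bool {
	return topicIDPattern.MatchString(id)
}

func main() {
	for _, id := range []string{"my-topic", "1topic", "ab"} {
		fmt.Printf("%q valid: %v\n", id, validTopicID(id))
	}
}
```

A check like this only gives earlier feedback; the error returned by CreateTopic remains the authoritative result.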
Example:
topic, err := client.CreateTopic(ctx, "my-topic")
if err != nil {
	log.Fatalf("Failed to create topic: %v", err)
}
defer topic.Stop()

func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)

Creates a new topic with custom configuration, including labels, schema settings, retention, and ingestion.

Parameters:
ctx: Context for the operation
topicID: Topic ID
tc: Topic configuration
Returns: Pointer to the created Topic and an error
Example:
topic, err := client.CreateTopicWithConfig(ctx, "my-topic", &pubsub.TopicConfig{
	Labels: map[string]string{
		"env":  "production",
		"team": "backend",
	},
	RetentionDuration: optional.Duration(24 * time.Hour),
	SchemaSettings: &pubsub.SchemaSettings{
		Schema:   "projects/my-project/schemas/my-schema",
		Encoding: pubsub.EncodingJSON,
	},
})
if err != nil {
	log.Fatalf("Failed to create topic: %v", err)
}

type TopicConfig struct {
	Labels                      map[string]string
	MessageStoragePolicy        MessageStoragePolicy
	KMSKeyName                  string
	SchemaSettings              *SchemaSettings
	RetentionDuration           optional.Duration
	State                       TopicState
	IngestionDataSourceSettings *IngestionDataSourceSettings
	MessageTransforms           []MessageTransform
}

Configuration of a topic. It is passed to CreateTopicWithConfig and returned by Config().

Fields:
Labels: User-defined key-value labels
MessageStoragePolicy: Geographic regions for message storage
KMSKeyName: Cloud KMS key for encryption (format: projects/P/locations/L/keyRings/R/cryptoKeys/K)
SchemaSettings: Schema validation settings
RetentionDuration: Minimum message retention duration (10 minutes to 31 days)
State: Current topic state (active, ingestion error)
IngestionDataSourceSettings: Data ingestion source configuration
MessageTransforms: Message transformation functions

Methods:
func (t *TopicConfig) String() string
func (t *TopicConfig) ID() string

String(): Returns the fully qualified topic name
ID(): Returns the topic ID within its project

func (t *Topic) Config(ctx context.Context) (TopicConfig, error)

Retrieves the current configuration of the topic.
Example:
config, err := topic.Config(ctx)
if err != nil {
	log.Fatalf("Failed to get config: %v", err)
}
fmt.Printf("Topic: %s\n", config.String())
fmt.Printf("Retention: %v\n", config.RetentionDuration)
fmt.Printf("Labels: %v\n", config.Labels)

type TopicConfigToUpdate struct {
	Labels                      map[string]string
	MessageStoragePolicy        *MessageStoragePolicy
	RetentionDuration           optional.Duration
	SchemaSettings              *SchemaSettings
	IngestionDataSourceSettings *IngestionDataSourceSettings
	MessageTransforms           []MessageTransform
}

Fields to update on a topic. Only non-nil fields are updated.

Fields:
Labels: Replace the current labels (use an empty map to clear all)
MessageStoragePolicy: Replace the message storage policy (use &MessageStoragePolicy{} to reset)
RetentionDuration: Set the retention duration (positive for a new value, negative to clear, nil to keep unchanged)
SchemaSettings: Update schema settings (use &SchemaSettings{} to remove the schema)
IngestionDataSourceSettings: Update ingestion settings (use &IngestionDataSourceSettings{} to remove them)
MessageTransforms: Replace message transforms

func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)

Updates an existing topic according to the fields set in cfg. Returns the new TopicConfig.

Parameters:
ctx: Context for the operation
cfg: Fields to update
Returns: Updated TopicConfig and an error
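Example:
// Update labels and retention
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
	Labels: map[string]string{
		"env": "staging",
	},
	RetentionDuration: optional.Duration(48 * time.Hour),
})
if err != nil {
	log.Fatalf("Failed to update topic: %v", err)
}
fmt.Printf("Updated labels: %v\n", config.Labels)

// Clear the retention duration. The value must be a time.Duration;
// optional.Duration(-1) would wrap an int instead.
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
	RetentionDuration: optional.Duration(-1 * time.Second),
})
if err != nil {
	log.Fatalf("Failed to clear retention: %v", err)
}

func (t *Topic) ID() string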
func (t *Topic) String() string

ID(): Returns the unique identifier within the topic's project
String(): Returns the fully qualified topic name (format: projects/PROJECT/topics/TOPIC)

Example:
fmt.Printf("Topic ID: %s\n", topic.ID())       // "my-topic"
fmt.Printf("Topic Name: %s\n", topic.String()) // "projects/my-project/topics/my-topic"

func (t *Topic) Exists(ctx context.Context) (bool, error)

Checks whether the topic exists on the server.
Example:
exists, err := topic.Exists(ctx)
if err != nil {
	log.Fatalf("Failed to check existence: %v", err)
}
if !exists {
	fmt.Println("Topic does not exist")
}

func (t *Topic) Delete(ctx context.Context) error

Deletes the topic. All subscriptions to the topic must be deleted before deleting the topic.
Example:
if err := topic.Delete(ctx); err != nil {
	log.Fatalf("Failed to delete topic: %v", err)
}

type TopicState int

const (
	TopicStateUnspecified = iota
	TopicStateActive
	TopicStateIngestionResourceError
)

Denotes the possible states for a topic:
TopicStateUnspecified: Default value; unused
TopicStateActive: Topic has no persistent errors
TopicStateIngestionResourceError: Ingestion from the data source encountered a permanent error

func (t *Topic) IAM() *iam.Handle

Returns an IAM handle for managing topic permissions.
Example:
import "cloud.google.com/go/iam"

// Get IAM policy
policy, err := topic.IAM().Policy(ctx)
if err != nil {
	log.Fatalf("Failed to get policy: %v", err)
}

// Add a member to a role
policy.Add("user:alice@example.com", iam.Viewer)
policy.Add("serviceAccount:sa@project.iam.gserviceaccount.com", iam.Editor)

// Set the updated policy
if err := topic.IAM().SetPolicy(ctx, policy); err != nil {
	log.Fatalf("Failed to set policy: %v", err)
}

// Test permissions
perms := []string{"pubsub.topics.publish", "pubsub.topics.get"}
allowed, err := topic.IAM().TestPermissions(ctx, perms)
if err != nil {
	log.Fatalf("Failed to test permissions: %v", err)
}
fmt.Printf("Allowed permissions: %v\n", allowed)

func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

Returns an iterator for listing subscriptions attached to this topic. Some subscriptions may belong to other projects.
Example:
it := topic.Subscriptions(ctx)
for {
	sub, err := it.Next()
	if err == iterator.Done {
		break
	}
	if err != nil {
		log.Fatalf("Failed to iterate: %v", err)
	}
	fmt.Printf("Subscription: %s\n", sub.ID())
}

func (c *Client) Topics(ctx context.Context) *TopicIterator

Returns an iterator for listing all topics in the project.
type TopicIterator struct {
	// Has unexported fields
}

func (it *TopicIterator) Next() (*Topic, error)
func (it *TopicIterator) NextConfig() (*TopicConfig, error)

Iterator for listing topics:
Next(): Returns the next Topic reference (returns iterator.Done when finished)
NextConfig(): Returns the next TopicConfig with full configuration (EXPERIMENTAL)

Example:
// List topics with references
it := client.Topics(ctx)
for {
	topic, err := it.Next()
	if err == iterator.Done {
		break
	}
	if err != nil {
		log.Fatalf("Failed to list topics: %v", err)
	}
	fmt.Printf("Topic: %s\n", topic.ID())
}

// List topics with full configuration
it2 := client.Topics(ctx)
for {
	config, err := it2.NextConfig()
	if err == iterator.Done {
		break
	}
	if err != nil {
		log.Fatalf("Failed to list topics: %v", err)
	}
	fmt.Printf("Topic: %s, Labels: %v\n", config.ID(), config.Labels)
}

type MessageStoragePolicy struct {
	AllowedPersistenceRegions []string
}

Constrains how messages published to the topic may be stored. Determined when the topic is created based on the policy configured at the project level.

Fields:
AllowedPersistenceRegions: GCP regions where messages may be persisted. If empty, indicates misconfiguration. If nil, uses the organization-level Resource Location Restriction policy.

Example:
topic, err := client.CreateTopicWithConfig(ctx, "regional-topic", &pubsub.TopicConfig{
	MessageStoragePolicy: pubsub.MessageStoragePolicy{
		AllowedPersistenceRegions: []string{"us-east1", "us-west1"},
	},
})
if err != nil {
	log.Fatalf("Failed to create topic: %v", err)
}
defer topic.Stop()

type DetachSubscriptionResult struct{}

func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error)

Detaches a subscription from its topic. All retained messages are dropped. Subsequent Pull and StreamingPull requests will return FAILED_PRECONDITION. For push subscriptions, pushes to the endpoint will stop.

Parameters:
ctx: Context for the operation
sub: Full subscription name (format: projects/PROJECT/subscriptions/SUBSCRIPTION)

Example:
subName := "projects/my-project/subscriptions/my-sub"
// DetachSubscriptionResult is an empty struct, so only the error is checked.
if _, err := client.DetachSubscription(ctx, subName); err != nil {
	log.Fatalf("Failed to detach subscription: %v", err)
}

func (t *Topic) Stop()
func (t *Topic) Flush()

Methods:
Stop(): Sends all remaining published messages and stops goroutines. Blocks until all outstanding messages have been sent or have failed.
Flush(): Blocks until all remaining messages are sent (EXPERIMENTAL)

Example:
// Graceful shutdown
topic.Stop()

// Or flush during operation
topic.Flush()

var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")

Returned when attempting to publish after Stop() has been called.
func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic

Creates references to topics:
Topic(): Reference to a topic in the client's project
TopicInProject(): Reference to a topic in a specific project

Example:
// Topic in client's project
topic := client.Topic("my-topic")

// Topic in different project
crossProjectTopic := client.TopicInProject("my-topic", "other-project")

Call Stop() on topics used for publishing to clean up background goroutines.