or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

docs

client.mddelivery.mdindex.mdingestion.mdlow-level.mdpublishing.mdreceiving.mdschemas.mdsnapshots.mdsubscriptions.mdtesting.mdtopics.mdtransforms.md
tile.json

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources

topics.mddocs/

Topic Management

Topics are named resources to which messages are published. This document covers topic creation, configuration, updates, deletion, IAM management, and listing associated subscriptions.

Topic Type

type Topic struct {
    PublishSettings      PublishSettings
    EnableMessageOrdering bool
    // Has unexported fields
}

A reference to a Pub/Sub topic. Topic instances are safe for concurrent use by multiple goroutines.

Fields:

  • PublishSettings: Settings for message publishing (batching, timeouts, flow control, compression)
  • EnableMessageOrdering: Enable ordered message delivery using ordering keys

Topic Creation

Create Topic with Default Settings

func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)

Creates a new topic with default configuration.

Parameters:

  • ctx: Context for the operation
  • topicID: Topic ID (must start with letter, 3-255 chars, alphanumeric/dash/underscore/period/tilde/plus/percent)

Returns: Pointer to created Topic and error

Example:

topic, err := client.CreateTopic(ctx, "my-topic")
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}
defer topic.Stop()

Create Topic with Configuration

func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)

Creates a new topic with custom configuration including labels, schema settings, retention, and ingestion.

Parameters:

  • ctx: Context for the operation
  • topicID: Topic ID
  • tc: Topic configuration

Returns: Pointer to created Topic and error

Example:

topic, err := client.CreateTopicWithConfig(ctx, "my-topic", &pubsub.TopicConfig{
    Labels: map[string]string{
        "env": "production",
        "team": "backend",
    },
    RetentionDuration: optional.Duration(24 * time.Hour),
    SchemaSettings: &pubsub.SchemaSettings{
        Schema:   "projects/my-project/schemas/my-schema",
        Encoding: pubsub.EncodingJSON,
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

Topic Configuration

TopicConfig

type TopicConfig struct {
    Labels                      map[string]string
    MessageStoragePolicy        MessageStoragePolicy
    KMSKeyName                  string
    SchemaSettings              *SchemaSettings
    RetentionDuration           optional.Duration
    State                       TopicState
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

Configuration of a topic (read-only output from Config()).

Fields:

  • Labels: User-defined key-value labels
  • MessageStoragePolicy: Geographic regions for message storage
  • KMSKeyName: Cloud KMS key for encryption (format: projects/P/locations/L/keyRings/R/cryptoKeys/K)
  • SchemaSettings: Schema validation settings
  • RetentionDuration: Minimum message retention duration (10 minutes to 31 days)
  • State: Current topic state (active, ingestion error)
  • IngestionDataSourceSettings: Data ingestion source configuration
  • MessageTransforms: Message transformation functions

Methods:

func (t *TopicConfig) String() string
func (t *TopicConfig) ID() string
  • String(): Returns fully qualified topic name
  • ID(): Returns topic ID within its project

Get Topic Configuration

func (t *Topic) Config(ctx context.Context) (TopicConfig, error)

Retrieves the current configuration of the topic.

Example:

config, err := topic.Config(ctx)
if err != nil {
    log.Fatalf("Failed to get config: %v", err)
}
fmt.Printf("Topic: %s\n", config.String())
fmt.Printf("Retention: %v\n", config.RetentionDuration)
fmt.Printf("Labels: %v\n", config.Labels)

Topic Updates

TopicConfigToUpdate

type TopicConfigToUpdate struct {
    Labels                      map[string]string
    MessageStoragePolicy        *MessageStoragePolicy
    RetentionDuration           optional.Duration
    SchemaSettings              *SchemaSettings
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

Fields to update on a topic. Only non-nil fields are updated.

Fields:

  • Labels: Replace current labels (use empty map to clear all)
  • MessageStoragePolicy: Replace message storage policy (use &MessageStoragePolicy{} to reset)
  • RetentionDuration: Set retention duration (positive for new value, negative to clear, nil to keep unchanged)
  • SchemaSettings: Update schema settings (use &SchemaSettings{} to remove schema)
  • IngestionDataSourceSettings: Update ingestion settings (use &IngestionDataSourceSettings{} to remove)
  • MessageTransforms: Replace message transforms

Update Topic

func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)

Updates an existing topic according to the fields set in cfg. Returns the new TopicConfig.

Parameters:

  • ctx: Context for the operation
  • cfg: Fields to update

Returns: Updated TopicConfig and error

Example:

// Update labels and retention
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    Labels: map[string]string{
        "env": "staging",
    },
    RetentionDuration: optional.Duration(48 * time.Hour),
})
if err != nil {
    log.Fatalf("Failed to update topic: %v", err)
}

// Clear retention duration
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
    RetentionDuration: optional.Duration(-1),
})

Topic Information

Topic ID and Name

func (t *Topic) ID() string
func (t *Topic) String() string
  • ID(): Returns the unique identifier within the topic's project
  • String(): Returns the fully qualified topic name (format: projects/PROJECT/topics/TOPIC)

Example:

fmt.Printf("Topic ID: %s\n", topic.ID())         // "my-topic"
fmt.Printf("Topic Name: %s\n", topic.String())   // "projects/my-project/topics/my-topic"

Check Topic Existence

func (t *Topic) Exists(ctx context.Context) (bool, error)

Checks whether the topic exists on the server.

Example:

exists, err := topic.Exists(ctx)
if err != nil {
    log.Fatalf("Failed to check existence: %v", err)
}
if !exists {
    fmt.Println("Topic does not exist")
}

Topic Deletion

func (t *Topic) Delete(ctx context.Context) error

Deletes the topic. All subscriptions to the topic must be deleted before deleting the topic.

Example:

if err := topic.Delete(ctx); err != nil {
    log.Fatalf("Failed to delete topic: %v", err)
}

Topic States

type TopicState int

const (
    TopicStateUnspecified = iota
    TopicStateActive
    TopicStateIngestionResourceError
)

Denotes the possible states for a topic:

  • TopicStateUnspecified: Default/unused value
  • TopicStateActive: Topic has no persistent errors
  • TopicStateIngestionResourceError: Ingestion from data source encountered permanent error

IAM Management

func (t *Topic) IAM() *iam.Handle

Returns an IAM handle for managing topic permissions.

Example:

import "cloud.google.com/go/iam"

// Get IAM policy
policy, err := topic.IAM().Policy(ctx)
if err != nil {
    log.Fatalf("Failed to get policy: %v", err)
}

// Add a member to a role
policy.Add("user:alice@example.com", iam.Viewer)
policy.Add("serviceAccount:sa@project.iam.gserviceaccount.com", iam.Editor)

// Set the updated policy
if err := topic.IAM().SetPolicy(ctx, policy); err != nil {
    log.Fatalf("Failed to set policy: %v", err)
}

// Test permissions
perms := []string{"pubsub.topics.publish", "pubsub.topics.get"}
allowed, err := topic.IAM().TestPermissions(ctx, perms)
if err != nil {
    log.Fatalf("Failed to test permissions: %v", err)
}
fmt.Printf("Allowed permissions: %v\n", allowed)

Listing Topic Subscriptions

func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

Returns an iterator for listing subscriptions attached to this topic. Some subscriptions may belong to other projects.

Example:

it := topic.Subscriptions(ctx)
for {
    sub, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to iterate: %v", err)
    }
    fmt.Printf("Subscription: %s\n", sub.ID())
}

Listing Topics

func (c *Client) Topics(ctx context.Context) *TopicIterator

Returns an iterator for listing all topics in the project.

TopicIterator

type TopicIterator struct {
    // Has unexported fields
}

func (it *TopicIterator) Next() (*Topic, error)
func (it *TopicIterator) NextConfig() (*TopicConfig, error)

Iterator for listing topics:

  • Next(): Returns next Topic reference (returns iterator.Done when finished)
  • NextConfig(): Returns next TopicConfig with full configuration (EXPERIMENTAL)

Example:

// List topics with references
it := client.Topics(ctx)
for {
    topic, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to list topics: %v", err)
    }
    fmt.Printf("Topic: %s\n", topic.ID())
}

// List topics with full configuration
it2 := client.Topics(ctx)
for {
    config, err := it2.NextConfig()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to list topics: %v", err)
    }
    fmt.Printf("Topic: %s, Labels: %v\n", config.ID(), config.Labels)
}

Message Storage Policy

type MessageStoragePolicy struct {
    AllowedPersistenceRegions []string
}

Constrains how messages published to the topic may be stored. Determined when the topic is created based on the policy configured at the project level.

Fields:

  • AllowedPersistenceRegions: GCP regions where messages may be persisted. If empty, indicates misconfiguration. If nil, uses organization-level Resource Location Restriction policy.

Example:

topic, err := client.CreateTopicWithConfig(ctx, "regional-topic", &pubsub.TopicConfig{
    MessageStoragePolicy: pubsub.MessageStoragePolicy{
        AllowedPersistenceRegions: []string{"us-east1", "us-west1"},
    },
})

Detaching Subscriptions

type DetachSubscriptionResult struct{}

func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error)

Detaches a subscription from its topic. All retained messages are dropped. Subsequent Pull and StreamingPull requests will return FAILED_PRECONDITION. For push subscriptions, pushes to the endpoint will stop.

Parameters:

  • ctx: Context for the operation
  • sub: Full subscription name (format: projects/PROJECT/subscriptions/SUBSCRIPTION)

Example:

subName := "projects/my-project/subscriptions/my-sub"
result, err := client.DetachSubscription(ctx, subName)
if err != nil {
    log.Fatalf("Failed to detach subscription: %v", err)
}

Stopping a Topic

func (t *Topic) Stop()
func (t *Topic) Flush()

Methods:

  • Stop(): Sends all remaining published messages and stops goroutines. Blocks until all outstanding messages have been sent or failed.
  • Flush(): Blocks until all remaining messages are sent (EXPERIMENTAL)

Example:

// Graceful shutdown
topic.Stop()

// Or flush during operation
topic.Flush()

Error Handling

var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")

Returned when attempting to publish after Stop() has been called.

Topic Access from Client

func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic

Creates references to topics:

  • Topic(): Reference to topic in client's project
  • TopicInProject(): Reference to topic in specific project

Example:

// Topic in client's project
topic := client.Topic("my-topic")

// Topic in different project
crossProjectTopic := client.TopicInProject("my-topic", "other-project")

Best Practices

  1. Resource Cleanup: Always call Stop() on topics used for publishing to clean up background goroutines
  2. Reuse Topics: Avoid creating many Topic instances for publishing; reuse a single instance
  3. Labels: Use labels for organization, cost tracking, and filtering
  4. Retention: Set retention duration to enable message replay and seeking
  5. Schema Validation: Use schema settings to enforce message structure
  6. IAM: Use least-privilege access with IAM policies
  7. Monitoring: Check topic state regularly when using data ingestion