Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Subscription Management

Subscriptions are named resources representing a stream of messages from a topic. This document covers subscription creation, configuration, updates, deletion, IAM management, and seeking operations.

Subscription Type

type Subscription struct {
    ReceiveSettings ReceiveSettings
    // Has unexported fields
}

A reference to a Pub/Sub subscription. Configure ReceiveSettings before calling Receive().

Fields:

  • ReceiveSettings: Settings for receiving messages (flow control, goroutines, ack deadlines)

Subscription Creation

func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)

Creates a new subscription with the specified configuration.

Parameters:

  • ctx: Context for the operation
  • id: Subscription ID (must start with letter, 3-255 chars)
  • cfg: Subscription configuration including topic, ack deadline, and delivery settings

Returns: A pointer to the created Subscription, or an error if creation fails

Example:

topic := client.Topic("my-topic")

sub, err := client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
    Topic:       topic,
    AckDeadline: 20 * time.Second,
    Labels: map[string]string{
        "env": "production",
    },
})
if err != nil {
    log.Fatalf("Failed to create subscription: %v", err)
}
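
The ID constraints listed above can be checked locally before calling CreateSubscription. The following is a minimal sketch: validateSubscriptionID is a hypothetical helper, not part of the pubsub package, and the allowed character set and the "goog" prefix rule are taken from the general Pub/Sub resource-name rules as an assumption beyond what this page states.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// subscriptionIDPattern encodes: starts with a letter, 3-255 characters,
// and only letters, digits, and the symbols . _ ~ + % - (assumed rule set).
var subscriptionIDPattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9._~+%-]{2,254}$`)

// validateSubscriptionID is a hypothetical pre-flight check for a subscription ID.
func validateSubscriptionID(id string) error {
	if strings.HasPrefix(id, "goog") {
		return errors.New(`subscription ID must not start with "goog"`)
	}
	if !subscriptionIDPattern.MatchString(id) {
		return fmt.Errorf("invalid subscription ID %q: want 3-255 characters starting with a letter", id)
	}
	return nil
}

func main() {
	for _, id := range []string{"my-subscription", "ab", "1abc"} {
		fmt.Printf("%q: %v\n", id, validateSubscriptionID(id))
	}
}
```

The server remains the authority on what it accepts; a local check like this only catches obvious mistakes earlier.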

Subscription Configuration

SubscriptionConfig

type SubscriptionConfig struct {
    Topic                         *Topic
    PushConfig                    PushConfig
    BigQueryConfig                BigQueryConfig
    CloudStorageConfig            CloudStorageConfig
    AckDeadline                   time.Duration
    RetainAckedMessages           bool
    RetentionDuration             time.Duration
    ExpirationPolicy              optional.Duration
    Labels                        map[string]string
    EnableMessageOrdering         bool
    DeadLetterPolicy              *DeadLetterPolicy
    Filter                        string
    RetryPolicy                   *RetryPolicy
    Detached                      bool
    TopicMessageRetentionDuration time.Duration
    State                         SubscriptionState
    EnableExactlyOnceDelivery     bool
    MessageTransforms             []MessageTransform
}

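Describes the configuration of a subscription. It is the input to CreateSubscription() and the value returned by Config() and Update(); fields marked output only are populated by the server.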

Fields:

  • Topic: The topic from which messages are received
  • PushConfig: Push delivery configuration (mutually exclusive with BigQuery/CloudStorage)
  • BigQueryConfig: BigQuery delivery configuration
  • CloudStorageConfig: Cloud Storage delivery configuration
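  • AckDeadline: Default maximum time a subscriber has to acknowledge a message after receiving it (default: 10s, min: 10s, max: 600s). Messages obtained through Receive() have their deadlines extended automatically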
  • RetainAckedMessages: Retain acknowledged messages in backlog
  • RetentionDuration: Message retention in backlog (7 days default, 10 minutes to 7 days)
  • ExpirationPolicy: Subscription expiration policy (31 days default, 0 for never expire)
  • Labels: User-defined key-value labels
  • EnableMessageOrdering: Enable ordered message delivery
  • DeadLetterPolicy: Dead letter topic configuration
  • Filter: Message filter expression (immutable after creation)
  • RetryPolicy: Retry policy for message delivery
  • Detached: Whether subscription is detached from topic (output only)
  • TopicMessageRetentionDuration: Topic's message retention duration (output only)
  • State: Current subscription state
  • EnableExactlyOnceDelivery: Enable exactly-once delivery guarantees
  • MessageTransforms: Message transformation functions

Methods:

func (s *SubscriptionConfig) String() string
func (s *SubscriptionConfig) ID() string

Get Subscription Configuration

func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)

Retrieves the current configuration of the subscription.

Example:

config, err := sub.Config(ctx)
if err != nil {
    log.Fatalf("Failed to get config: %v", err)
}
fmt.Printf("Subscription: %s\n", config.String())
fmt.Printf("Topic: %s\n", config.Topic.ID())
fmt.Printf("AckDeadline: %v\n", config.AckDeadline)
fmt.Printf("RetentionDuration: %v\n", config.RetentionDuration)

Subscription Updates

SubscriptionConfigToUpdate

type SubscriptionConfigToUpdate struct {
    PushConfig                *PushConfig
    BigQueryConfig            *BigQueryConfig
    CloudStorageConfig        *CloudStorageConfig
    AckDeadline               time.Duration
    RetainAckedMessages       optional.Bool
    RetentionDuration         time.Duration
    ExpirationPolicy          optional.Duration
    DeadLetterPolicy          *DeadLetterPolicy
    Labels                    map[string]string
    RetryPolicy               *RetryPolicy
    EnableExactlyOnceDelivery optional.Bool
    MessageTransforms         []MessageTransform
}

Fields to update on a subscription. Only non-nil/non-zero fields are updated.
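
The "zero value means unchanged" convention can be illustrated with a trimmed-down stand-in type. This is only a sketch of the idea: updateRequest and changedFields are hypothetical names that do not exist in the pubsub package, and the real struct carries more fields.

```go
package main

import (
	"fmt"
	"time"
)

// updateRequest is a hypothetical, reduced stand-in for
// pubsub.SubscriptionConfigToUpdate, used only to show the convention.
type updateRequest struct {
	AckDeadline       time.Duration
	RetentionDuration time.Duration
	Labels            map[string]string
}

// changedFields lists the fields that would be sent to the server,
// treating zero durations and a nil map as "leave unchanged".
func changedFields(u updateRequest) []string {
	var fields []string
	if u.AckDeadline != 0 {
		fields = append(fields, "AckDeadline")
	}
	if u.RetentionDuration != 0 {
		fields = append(fields, "RetentionDuration")
	}
	if u.Labels != nil { // an empty but non-nil map still counts as set
		fields = append(fields, "Labels")
	}
	return fields
}

func main() {
	u := updateRequest{AckDeadline: 30 * time.Second}
	fmt.Println(changedFields(u))
}
```

One consequence of this design is that a plain bool or duration cannot express "set this to its zero value", which is why some fields use optional types instead.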

Update Subscription

func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)

Updates an existing subscription according to the fields set in cfg. Returns the new SubscriptionConfig.

Example:

// Update ack deadline and labels
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    AckDeadline: 30 * time.Second,
    Labels: map[string]string{
        "env": "staging",
        "version": "v2",
    },
})
if err != nil {
    log.Fatalf("Failed to update subscription: %v", err)
}

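// Enable dead letter policy
config, err = sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
    DeadLetterPolicy: &pubsub.DeadLetterPolicy{
        DeadLetterTopic:     "projects/my-project/topics/dead-letter",
        MaxDeliveryAttempts: 5,
    },
})
if err != nil {
    log.Fatalf("Failed to set dead letter policy: %v", err)
}
fmt.Printf("Dead letter policy: %+v\n", config.DeadLetterPolicy)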

Subscription Information

Subscription ID and Name

func (s *Subscription) ID() string
func (s *Subscription) String() string

  • ID(): Returns the unique identifier within the subscription's project
  • String(): Returns the fully qualified subscription name

Example:

fmt.Printf("Subscription ID: %s\n", sub.ID())       // "my-sub"
fmt.Printf("Subscription Name: %s\n", sub.String()) // "projects/my-project/subscriptions/my-sub"
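
Going the other way, a fully qualified name can be split back into its project and ID. The sketch below assumes the "projects/PROJECT/subscriptions/ID" layout shown in the example; parseSubscriptionName is a hypothetical helper, not a function of the pubsub package.

```go
package main

import (
	"fmt"
	"strings"
)

// parseSubscriptionName splits "projects/PROJECT/subscriptions/ID" into
// its project and subscription ID. Hypothetical helper for illustration.
func parseSubscriptionName(name string) (project, id string, err error) {
	parts := strings.Split(name, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[2] != "subscriptions" ||
		parts[1] == "" || parts[3] == "" {
		return "", "", fmt.Errorf("malformed subscription name %q", name)
	}
	return parts[1], parts[3], nil
}

func main() {
	project, id, err := parseSubscriptionName("projects/my-project/subscriptions/my-sub")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(project, id)
}
```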

Check Subscription Existence

func (s *Subscription) Exists(ctx context.Context) (bool, error)

Checks whether the subscription exists on the server.

Example:

exists, err := sub.Exists(ctx)
if err != nil {
    log.Fatalf("Failed to check existence: %v", err)
}
if !exists {
    fmt.Println("Subscription does not exist")
}

Subscription Deletion

func (s *Subscription) Delete(ctx context.Context) error

Deletes the subscription.

Example:

if err := sub.Delete(ctx); err != nil {
    log.Fatalf("Failed to delete subscription: %v", err)
}

Subscription States

type SubscriptionState int

const (
    SubscriptionStateUnspecified = iota
    SubscriptionStateActive
    SubscriptionStateResourceError
)

Denotes the possible states for a subscription:

  • SubscriptionStateUnspecified: Default/unused value
  • SubscriptionStateActive: Subscription is active
  • SubscriptionStateResourceError: Subscription cannot receive messages because of an error with the resource to which it pushes messages

IAM Management

func (s *Subscription) IAM() *iam.Handle

Returns an IAM handle for managing subscription permissions.

Example:

// Get IAM policy
policy, err := sub.IAM().Policy(ctx)
if err != nil {
    log.Fatalf("Failed to get policy: %v", err)
}

// Add a member to a role
policy.Add("user:bob@example.com", iam.Viewer)
policy.Add("serviceAccount:worker@project.iam.gserviceaccount.com", "roles/pubsub.subscriber")

// Set the updated policy
if err := sub.IAM().SetPolicy(ctx, policy); err != nil {
    log.Fatalf("Failed to set policy: %v", err)
}

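// Test permissions
perms := []string{"pubsub.subscriptions.consume", "pubsub.subscriptions.get"}
allowed, err := sub.IAM().TestPermissions(ctx, perms)
if err != nil {
    log.Fatalf("Failed to test permissions: %v", err)
}
// allowed holds the subset of perms that the caller actually has
fmt.Printf("Allowed permissions: %v\n", allowed)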

Listing Subscriptions

func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator

Returns an iterator for listing all subscriptions in the project.

SubscriptionIterator

type SubscriptionIterator struct {
    // Has unexported fields
}

func (it *SubscriptionIterator) Next() (*Subscription, error)
func (it *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error)

Iterator for listing subscriptions:

  • Next(): Returns next Subscription reference (returns iterator.Done when finished)
  • NextConfig(): Returns next SubscriptionConfig with full configuration (EXPERIMENTAL)

Example:

// List subscriptions with references
it := client.Subscriptions(ctx)
for {
    sub, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to list subscriptions: %v", err)
    }
    fmt.Printf("Subscription: %s\n", sub.ID())
}

// List subscriptions with full configuration
it2 := client.Subscriptions(ctx)
for {
    config, err := it2.NextConfig()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to list subscriptions: %v", err)
    }
    fmt.Printf("Subscription: %s, Topic: %s\n", config.ID(), config.Topic.ID())
}

Dead Letter Policy

type DeadLetterPolicy struct {
    DeadLetterTopic     string
    MaxDeliveryAttempts int
}

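Configuration for dead lettering. Once delivery of a message has been attempted MaxDeliveryAttempts times without acknowledgement, Pub/Sub forwards it to the dead letter topic. The Pub/Sub service account for the subscription's project must have permission to publish to that topic.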

Fields:

  • DeadLetterTopic: Full topic name (format: projects/PROJECT/topics/TOPIC)
  • MaxDeliveryAttempts: Maximum delivery attempts before dead lettering (5-100; 0 selects the default of 5)

Example:

sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
    Topic:       topic,
    AckDeadline: 20 * time.Second,
    DeadLetterPolicy: &pubsub.DeadLetterPolicy{
        DeadLetterTopic:     "projects/my-project/topics/dead-letters",
        MaxDeliveryAttempts: 10,
    },
})
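
The field constraints above can be verified before the policy is sent to the server. This is a minimal sketch under stated assumptions: validateDeadLetter is a hypothetical helper, and accepting 0 as "use the service default" reflects the API's documented behavior as understood here rather than anything this page states.

```go
package main

import (
	"fmt"
	"strings"
)

// validateDeadLetter checks the topic name format and the attempt range.
// Hypothetical helper; it performs no network calls.
func validateDeadLetter(topic string, maxAttempts int) error {
	parts := strings.Split(topic, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[2] != "topics" ||
		parts[1] == "" || parts[3] == "" {
		return fmt.Errorf("dead letter topic %q is not of the form projects/PROJECT/topics/TOPIC", topic)
	}
	// 0 is assumed to mean "let the service pick its default".
	if maxAttempts != 0 && (maxAttempts < 5 || maxAttempts > 100) {
		return fmt.Errorf("max delivery attempts %d is outside 5-100", maxAttempts)
	}
	return nil
}

func main() {
	fmt.Println(validateDeadLetter("projects/my-project/topics/dead-letters", 10))
	fmt.Println(validateDeadLetter("dead-letters", 10))
	fmt.Println(validateDeadLetter("projects/my-project/topics/dead-letters", 3))
}
```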

Retry Policy

type RetryPolicy struct {
    MinimumBackoff optional.Duration
    MaximumBackoff optional.Duration
}

Retry policy for message delivery. Controls exponential backoff between delivery retries.

Fields:

  • MinimumBackoff: Minimum wait before retry (default: 10s)
  • MaximumBackoff: Maximum wait before retry (default: 600s)

Example:

sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
    Topic:       topic,
    AckDeadline: 20 * time.Second,
    RetryPolicy: &pubsub.RetryPolicy{
        MinimumBackoff: optional.Duration(5 * time.Second),
        MaximumBackoff: optional.Duration(300 * time.Second),
    },
})
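
To get a feel for how the two bounds interact, the sketch below computes an illustrative delay per redelivery attempt. It assumes plain doubling from the minimum, capped at the maximum; the service's actual schedule is not specified on this page, and backoffForAttempt is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backoffForAttempt returns minBackoff doubled (attempt-1) times and capped
// at maxBackoff. The doubling factor is an assumption made for illustration.
func backoffForAttempt(minBackoff, maxBackoff time.Duration, attempt int) time.Duration {
	d := minBackoff
	// Stop doubling as soon as the cap is reached to avoid overflow.
	for i := 1; i < attempt && d < maxBackoff; i++ {
		d *= 2
	}
	if d > maxBackoff {
		d = maxBackoff
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt, backoffForAttempt(5*time.Second, 300*time.Second, attempt))
	}
}
```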

Message Filtering

Subscriptions can filter messages using the Cloud Pub/Sub filter language. Filters are immutable after subscription creation.

Example:

// Create subscription with filter
sub, err := client.CreateSubscription(ctx, "filtered-sub", pubsub.SubscriptionConfig{
    Topic:  topic,
    Filter: `attributes.region = "us-east1" AND attributes.priority = "high"`,
})

// Filter syntax examples:
// - attributes.foo = "bar"
// - attributes.foo != "bar"
// - attributes.foo = "bar" AND attributes.baz = "qux"
// - attributes.foo = "bar" OR attributes.baz = "qux"
// - attributes:foo                      (attribute foo is present)
// - NOT attributes:foo
// - hasPrefix(attributes.foo, "bar")
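
When the filter is an AND of exact attribute matches, the expression can be generated from a map rather than written by hand. This is a minimal sketch: buildAttributeFilter is a hypothetical helper, and it assumes keys are plain identifiers and that values need no escaping beyond Go's %q quoting.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildAttributeFilter joins exact-match clauses with AND. Keys are sorted
// so the output is deterministic. Hypothetical helper for illustration.
func buildAttributeFilter(attrs map[string]string) string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	clauses := make([]string, 0, len(keys))
	for _, k := range keys {
		clauses = append(clauses, fmt.Sprintf("attributes.%s = %q", k, attrs[k]))
	}
	return strings.Join(clauses, " AND ")
}

func main() {
	filter := buildAttributeFilter(map[string]string{
		"region":   "us-east1",
		"priority": "high",
	})
	fmt.Println(filter)
}
```

The resulting string would be passed as the Filter field at creation time, since the filter cannot be changed afterwards.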

Message Ordering

EnableMessageOrdering bool

Enable ordered message delivery using ordering keys. When enabled, messages with the same ordering key are delivered in the order they were published.

Example:

sub, err := client.CreateSubscription(ctx, "ordered-sub", pubsub.SubscriptionConfig{
    Topic:                 topic,
    EnableMessageOrdering: true,
})

Exactly-Once Delivery

EnableExactlyOnceDelivery bool

Enables exactly-once delivery guarantees:

  • Messages are not resent before ack deadline expires
  • Acknowledged messages are not resent

Note: Subscribers may still receive multiple copies of a message if it was published more than once. Pub/Sub treats each published copy as a distinct message with its own message ID.

Example:

sub, err := client.CreateSubscription(ctx, "exactly-once-sub", pubsub.SubscriptionConfig{
    Topic:                     topic,
    EnableExactlyOnceDelivery: true,
})
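
Because republished copies arrive as distinct messages, removing them is left to the application. One possible approach, sketched below, is to track a publisher-supplied key; the deduper type is hypothetical, the choice of key (for example an "event_id" attribute) is an assumption, and the in-memory set grows without bound, so a real system would likely need expiry or external storage.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers keys it has already seen. Hypothetical helper;
// safe for use from multiple goroutines.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]struct{})}
}

// firstTime records key and reports whether it had not been seen before.
func (d *deduper) firstTime(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[key]; ok {
		return false
	}
	d.seen[key] = struct{}{}
	return true
}

func main() {
	d := newDeduper()
	// The keys stand in for a publisher-assigned identifier.
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		if d.firstTime(key) {
			fmt.Println("process", key)
		} else {
			fmt.Println("skip duplicate", key)
		}
	}
}
```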

Subscription Access from Client

func (c *Client) Subscription(id string) *Subscription
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription

Creates references to subscriptions:

  • Subscription(): Reference to subscription in client's project
  • SubscriptionInProject(): Reference to subscription in specific project

Example:

// Subscription in client's project
sub := client.Subscription("my-sub")

// Subscription in different project
crossProjectSub := client.SubscriptionInProject("my-sub", "other-project")

Seeking Operations

See the Snapshots and Seeking documentation for details on:

  • CreateSnapshot(): Create a snapshot from subscription
  • SeekToTime(): Seek to a specific timestamp
  • SeekToSnapshot(): Seek to a snapshot

Best Practices

  1. Ack Deadlines: Set appropriate ack deadlines based on message processing time
  2. Retention: Configure retention to enable message replay
  3. Dead Letter: Use dead letter topics to handle poison messages
  4. Filtering: Use filters to reduce unnecessary message processing
  5. Ordering: Enable message ordering only when necessary (impacts throughput)
  6. Exactly-Once: Use exactly-once delivery for critical workflows that cannot tolerate redelivery of acknowledged messages
  7. Expiration: Set expiration policies to automatically clean up unused subscriptions
  8. Labels: Use labels for organization and cost tracking
  9. IAM: Use least-privilege access with IAM policies
  10. Monitoring: Monitor delivery attempt counts and dead letter topic activity
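
For the first practice, an ack deadline can be derived from observed processing time. The sketch below is a heuristic only: suggestAckDeadline is a hypothetical helper, the 2x headroom factor is an arbitrary assumption, and the 10s and 600s bounds come from the AckDeadline field description.

```go
package main

import (
	"fmt"
	"time"
)

// suggestAckDeadline doubles the observed processing time, rounds up to a
// whole second, and clamps the result to the 10s-600s range.
func suggestAckDeadline(processing time.Duration) time.Duration {
	const (
		minDeadline = 10 * time.Second
		maxDeadline = 600 * time.Second
	)
	d := 2 * processing // headroom factor chosen for illustration
	d = (d + time.Second - 1) / time.Second * time.Second
	if d < minDeadline {
		return minDeadline
	}
	if d > maxDeadline {
		return maxDeadline
	}
	return d
}

func main() {
	for _, p := range []time.Duration{2 * time.Second, 45 * time.Second, 20 * time.Minute} {
		fmt.Printf("processing %v -> ack deadline %v\n", p, suggestAckDeadline(p))
	}
}
```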