tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go, providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Subscriptions are named resources representing a stream of messages from a topic. This document covers subscription creation, configuration, updates, deletion, IAM management, and seeking operations.
type Subscription struct {
ReceiveSettings ReceiveSettings
// Has unexported fields
}A reference to a Pub/Sub subscription. Configure ReceiveSettings before calling Receive().
Fields:
- ReceiveSettings: Settings for receiving messages (flow control, goroutines, ack deadlines)

func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)

Creates a new subscription with the specified configuration.
Parameters:
- ctx: Context for the operation
- id: Subscription ID (must start with a letter, 3-255 characters)
- cfg: Subscription configuration including topic, ack deadline, and delivery settings

Returns: Pointer to the created Subscription, and an error
Example:
topic := client.Topic("my-topic")
sub, err := client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 20 * time.Second,
Labels: map[string]string{
"env": "production",
},
})
if err != nil {
log.Fatalf("Failed to create subscription: %v", err)
}type SubscriptionConfig struct {
Topic *Topic
PushConfig PushConfig
BigQueryConfig BigQueryConfig
CloudStorageConfig CloudStorageConfig
AckDeadline time.Duration
RetainAckedMessages bool
RetentionDuration time.Duration
ExpirationPolicy optional.Duration
Labels map[string]string
EnableMessageOrdering bool
DeadLetterPolicy *DeadLetterPolicy
Filter string
RetryPolicy *RetryPolicy
Detached bool
TopicMessageRetentionDuration time.Duration
State SubscriptionState
EnableExactlyOnceDelivery bool
MessageTransforms []MessageTransform
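}

Configuration of a subscription. It is passed to CreateSubscription to define a new subscription, and returned by Config() and Update() to describe an existing one. Fields marked "output only" are populated by the server and ignored on creation.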
Fields:
- Topic: The topic from which messages are received
- PushConfig: Push delivery configuration (mutually exclusive with BigQuery/CloudStorage)
- BigQueryConfig: BigQuery delivery configuration
- CloudStorageConfig: Cloud Storage delivery configuration
- AckDeadline: Default maximum time a subscriber has to acknowledge a message after receiving it (default: 10s, allowed range: 10s to 600s); messages obtained via Receive() have this deadline extended automatically
- RetainAckedMessages: Retain acknowledged messages in backlog
- RetentionDuration: Message retention in backlog (7 days default, 10 minutes to 7 days)
- ExpirationPolicy: Subscription expiration policy (31 days default, 0 for never expire)
- Labels: User-defined key-value labels
- EnableMessageOrdering: Enable ordered message delivery
- DeadLetterPolicy: Dead letter topic configuration
- Filter: Message filter expression (immutable after creation)
- RetryPolicy: Retry policy for message delivery
- Detached: Whether subscription is detached from topic (output only)
- TopicMessageRetentionDuration: Topic's message retention duration (output only)
- State: Current subscription state (output only)
- EnableExactlyOnceDelivery: Enable exactly-once delivery guarantees
- MessageTransforms: Message transformation functions

Methods:
func (s *SubscriptionConfig) String() string
func (s *SubscriptionConfig) ID() stringfunc (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)Retrieves the current configuration of the subscription.
Example:
config, err := sub.Config(ctx)
if err != nil {
log.Fatalf("Failed to get config: %v", err)
}
fmt.Printf("Subscription: %s\n", config.String())
fmt.Printf("Topic: %s\n", config.Topic.ID())
fmt.Printf("AckDeadline: %v\n", config.AckDeadline)
fmt.Printf("RetentionDuration: %v\n", config.RetentionDuration)type SubscriptionConfigToUpdate struct {
PushConfig *PushConfig
BigQueryConfig *BigQueryConfig
CloudStorageConfig *CloudStorageConfig
AckDeadline time.Duration
RetainAckedMessages optional.Bool
RetentionDuration time.Duration
ExpirationPolicy optional.Duration
DeadLetterPolicy *DeadLetterPolicy
Labels map[string]string
RetryPolicy *RetryPolicy
EnableExactlyOnceDelivery optional.Bool
MessageTransforms []MessageTransform
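}

Fields to update on a subscription. Only fields that are set (non-nil or non-zero) are updated; unset fields keep their current values. A non-nil Labels map replaces the entire existing label set rather than being merged into it.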
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)Updates an existing subscription according to the fields set in cfg. Returns the new SubscriptionConfig.
Example:
// Update ack deadline and labels
config, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
AckDeadline: 30 * time.Second,
Labels: map[string]string{
"env": "staging",
"version": "v2",
},
})
if err != nil {
log.Fatalf("Failed to update subscription: %v", err)
}
// Enable dead letter policy
config, err = sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: "projects/my-project/topics/dead-letter",
MaxDeliveryAttempts: 5,
},
})func (s *Subscription) ID() string
func (s *Subscription) String() string

- ID(): Returns the unique identifier within the subscription's project
- String(): Returns the fully qualified subscription name

Example:
fmt.Printf("Subscription ID: %s\n", sub.ID()) // "my-sub"
fmt.Printf("Subscription Name: %s\n", sub.String()) // "projects/my-project/subscriptions/my-sub"func (s *Subscription) Exists(ctx context.Context) (bool, error)Checks whether the subscription exists on the server.
Example:
exists, err := sub.Exists(ctx)
if err != nil {
log.Fatalf("Failed to check existence: %v", err)
}
if !exists {
fmt.Println("Subscription does not exist")
}func (s *Subscription) Delete(ctx context.Context) errorDeletes the subscription.
Example:
if err := sub.Delete(ctx); err != nil {
log.Fatalf("Failed to delete subscription: %v", err)
}type SubscriptionState int
const (
SubscriptionStateUnspecified = iota
SubscriptionStateActive
SubscriptionStateResourceError
)Denotes the possible states for a subscription:
- SubscriptionStateUnspecified: Default/unused value
- SubscriptionStateActive: Subscription is active
- SubscriptionStateResourceError: Error with the resource to which the subscription pushes messages

func (s *Subscription) IAM() *iam.Handle

Returns an IAM handle for managing subscription permissions.
Example:
// Get IAM policy
policy, err := sub.IAM().Policy(ctx)
if err != nil {
log.Fatalf("Failed to get policy: %v", err)
}
// Add a member to a role
policy.Add("user:bob@example.com", iam.Viewer)
policy.Add("serviceAccount:worker@project.iam.gserviceaccount.com", "roles/pubsub.subscriber")
// Set the updated policy
if err := sub.IAM().SetPolicy(ctx, policy); err != nil {
log.Fatalf("Failed to set policy: %v", err)
}
// Test permissions
perms := []string{"pubsub.subscriptions.consume", "pubsub.subscriptions.get"}
allowed, err := sub.IAM().TestPermissions(ctx, perms)
if err != nil {
log.Fatalf("Failed to test permissions: %v", err)
}func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIteratorReturns an iterator for listing all subscriptions in the project.
type SubscriptionIterator struct {
// Has unexported fields
}
func (it *SubscriptionIterator) Next() (*Subscription, error)
func (it *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error)Iterator for listing subscriptions:
- Next(): Returns the next Subscription reference (returns iterator.Done when finished)
- NextConfig(): Returns the next SubscriptionConfig with full configuration (EXPERIMENTAL; also returns iterator.Done when finished)

Example:
// List subscriptions with references
it := client.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to list subscriptions: %v", err)
}
fmt.Printf("Subscription: %s\n", sub.ID())
}
// List subscriptions with full configuration
it2 := client.Subscriptions(ctx)
for {
config, err := it2.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to list subscriptions: %v", err)
}
fmt.Printf("Subscription: %s, Topic: %s\n", config.ID(), config.Topic.ID())
}type DeadLetterPolicy struct {
DeadLetterTopic string
MaxDeliveryAttempts int
}Configuration for dead letter topics. Messages that cannot be delivered are sent to the dead letter topic.
Fields:
- DeadLetterTopic: Full topic name (format: projects/PROJECT/topics/TOPIC)
- MaxDeliveryAttempts: Maximum delivery attempts before dead lettering (5-100)

Example:
sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 20 * time.Second,
DeadLetterPolicy: &pubsub.DeadLetterPolicy{
DeadLetterTopic: "projects/my-project/topics/dead-letters",
MaxDeliveryAttempts: 10,
},
})type RetryPolicy struct {
MinimumBackoff optional.Duration
MaximumBackoff optional.Duration
}Retry policy for message delivery. Controls exponential backoff between delivery retries.
Fields:
- MinimumBackoff: Minimum wait before retry (default: 10s)
- MaximumBackoff: Maximum wait before retry (default: 600s)

Example:
sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 20 * time.Second,
RetryPolicy: &pubsub.RetryPolicy{
// optional.Duration is an interface type from an internal package; assign a time.Duration directly.
MinimumBackoff: 5 * time.Second,
MaximumBackoff: 300 * time.Second,
},
})Subscriptions can filter messages using the Cloud Pub/Sub filter language. Filters are immutable after subscription creation.
Example:
// Create subscription with filter
sub, err := client.CreateSubscription(ctx, "filtered-sub", pubsub.SubscriptionConfig{
Topic: topic,
Filter: `attributes.region = "us-east1" AND attributes.priority = "high"`,
})
// Filter syntax examples:
// - attributes.foo = "bar"
// - attributes.foo = "bar" AND attributes.baz = "qux"
// - attributes.foo = "bar" OR attributes.baz = "qux"
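// - attributes.foo = "bar" OR attributes.foo = "baz" OR attributes.foo = "qux"  (there is no IN operator; chain OR instead)
// - attributes:foo  (matches messages that have an attribute with key "foo")
// - NOT attributes.foo = "bar"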
// - hasPrefix(attributes.foo, "bar")

EnableMessageOrdering bool

Enable ordered message delivery using ordering keys. When enabled, messages with the same ordering key are delivered in the order they were published.
Example:
sub, err := client.CreateSubscription(ctx, "ordered-sub", pubsub.SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
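})

EnableExactlyOnceDelivery bool

Enables exactly-once delivery guarantees for a message with a given message ID on this subscription:
- The message is not resent to a subscriber before its acknowledgement deadline expires.
- An acknowledged message is not resent to a subscriber.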
Note: Subscribers may still receive multiple copies if messages were published multiple times.
Example:
sub, err := client.CreateSubscription(ctx, "exactly-once-sub", pubsub.SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: true,
})func (c *Client) Subscription(id string) *Subscription
func (c *Client) SubscriptionInProject(id, projectID string) *SubscriptionCreates references to subscriptions:
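- Subscription(): Reference to a subscription in the client's project
- SubscriptionInProject(): Reference to a subscription in a specific project

Creating a reference performs no network call and does not check that the subscription exists.

Example: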
// Subscription in client's project
sub := client.Subscription("my-sub")
// Subscription in different project
crossProjectSub := client.SubscriptionInProject("my-sub", "other-project")

See the Snapshots and Seeking documentation for details on:
- CreateSnapshot(): Create a snapshot from a subscription
- SeekToTime(): Seek to a specific timestamp
- SeekToSnapshot(): Seek to a snapshot