Workspace
tessl
Visibility
Public
Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources

Google Cloud Pub/Sub for Go

A comprehensive Go client library for Google Cloud Pub/Sub, a fully-managed real-time messaging service that enables asynchronous communication between applications through a publish-subscribe pattern. This library provides high-level, idiomatic Go APIs that abstract the underlying gRPC calls, offering easy-to-use interfaces for publishing messages to topics and receiving messages from subscriptions with automatic batching, flow control, and message lifecycle management.

Package Information

  • Package: cloud.google.com/go/pubsub
  • Language: Go
  • Installation: go get cloud.google.com/go/pubsub@v1.50.1
  • Module: cloud.google.com/go/pubsub

Core Imports

import (
    "context"

    "cloud.google.com/go/pubsub"
)

For testing:

import "cloud.google.com/go/pubsub/pstest"

For low-level gRPC access:

import "cloud.google.com/go/pubsub/apiv1"

Basic Usage

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, "my-project-id")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a topic
    topic, err := client.CreateTopic(ctx, "my-topic")
    if err != nil {
        log.Fatal(err)
    }
    defer topic.Stop() // Flush pending messages and stop the topic's publish goroutines

    // Publish a message
    result := topic.Publish(ctx, &pubsub.Message{
        Data: []byte("Hello, Pub/Sub!"),
        Attributes: map[string]string{
            "origin": "example",
        },
    })

    // Block until the message is sent
    msgID, err := result.Get(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Published message with ID: %s\n", msgID)

    // Create a subscription
    sub, err := client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
        Topic: topic,
    })
    if err != nil {
        log.Fatal(err)
    }

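    // Receive messages. Receive blocks until ctx is done or a
    // non-retryable error occurs, so this example runs until interrupted.
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        fmt.Printf("Received message: %s\n", msg.Data)
        msg.Ack() // Acknowledge the message
    })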
    if err != nil {
        log.Fatal(err)
    }
}

Architecture

The library is organized into several key components:

  • Client: Main entry point for interacting with Pub/Sub, scoped to a single project
  • Topic: Represents a named resource to which messages are published
  • Subscription: Represents a named resource for receiving messages from a topic
  • Message: Encapsulates message data, attributes, and acknowledgment controls
  • Publisher: Handles automatic message batching and asynchronous publishing
  • Subscriber: Manages streaming pull connections with automatic ack deadline extensions

The module is split into three packages:

  1. High-level idiomatic API (pubsub package): Recommended for most use cases with automatic batching and flow control
  2. Low-level gRPC API (pubsub/apiv1 package): Direct access to Pub/Sub RPCs for advanced scenarios
  3. Testing utilities (pubsub/pstest package): In-memory fake server for unit testing

Capabilities

Client Management

Create and manage Pub/Sub clients for accessing topics and subscriptions.

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)

type Client struct {
    // Has unexported fields
}

func (c *Client) Close() error
func (c *Client) Project() string
func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic
func (c *Client) Subscription(id string) *Subscription
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
func (c *Client) Topics(ctx context.Context) *TopicIterator
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)

Publishing Messages

Publish messages to topics with automatic batching, flow control, and ordering support.

type Topic struct {
    PublishSettings       PublishSettings
    EnableMessageOrdering bool
    // Has unexported fields
}

func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
func (t *Topic) Stop()
func (t *Topic) Flush()

type PublishSettings struct {
    DelayThreshold            time.Duration
    CountThreshold            int
    ByteThreshold             int
    NumGoroutines             int
    Timeout                   time.Duration
    BufferedByteLimit         int // Deprecated: use FlowControlSettings.MaxOutstandingBytes
    FlowControlSettings       FlowControlSettings
    EnableCompression         bool
    CompressionBytesThreshold int
}

type PublishResult struct {
    // Has unexported fields
}

func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)
func (r *PublishResult) Ready() <-chan struct{}
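
The batching thresholds in PublishSettings work on a "whichever is reached first" basis: a non-empty batch is sent once it holds CountThreshold messages, reaches ByteThreshold bytes, or has waited DelayThreshold. The sketch below models only that decision using the standard library. The names batchSettings, pendingBatch, and shouldFlush are hypothetical and are not part of the pubsub package, whose internal bundler is more involved.

```go
package main

import (
	"fmt"
	"time"
)

// batchSettings mirrors the three batching thresholds of PublishSettings.
// It is a hypothetical stand-in, not the library type.
type batchSettings struct {
	DelayThreshold time.Duration
	CountThreshold int
	ByteThreshold  int
}

// pendingBatch describes messages that are buffered but not yet sent.
type pendingBatch struct {
	Count int
	Bytes int
	Age   time.Duration // time since the first message was buffered
}

// shouldFlush reports whether a non-empty batch has reached any threshold.
func shouldFlush(b pendingBatch, s batchSettings) bool {
	if b.Count == 0 {
		return false
	}
	return b.Count >= s.CountThreshold ||
		b.Bytes >= s.ByteThreshold ||
		b.Age >= s.DelayThreshold
}

func main() {
	s := batchSettings{
		DelayThreshold: 10 * time.Millisecond,
		CountThreshold: 100,
		ByteThreshold:  1e6,
	}
	young := pendingBatch{Count: 3, Bytes: 300, Age: 2 * time.Millisecond}
	old := pendingBatch{Count: 3, Bytes: 300, Age: 12 * time.Millisecond}
	fmt.Println(shouldFlush(young, s), shouldFlush(old, s))
}
```

Lower thresholds favor latency and higher ones favor throughput, which is the trade-off to keep in mind when changing Topic.PublishSettings.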

Receiving Messages

Receive messages from subscriptions with configurable concurrency, flow control, and automatic ack deadline management.

type Subscription struct {
    ReceiveSettings ReceiveSettings
    // Has unexported fields
}

func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error

type ReceiveSettings struct {
    MaxOutstandingMessages int
    MaxOutstandingBytes    int
    MaxExtension           time.Duration
    MaxExtensionPeriod     time.Duration
    MinExtensionPeriod     time.Duration
    NumGoroutines          int
    Synchronous            bool
    UseLegacyFlowControl   bool
}

type Message struct {
    Data            []byte
    Attributes      map[string]string
    ID              string
    PublishTime     time.Time
    OrderingKey     string
    DeliveryAttempt *int
}

func (m *Message) Ack()
func (m *Message) Nack()
func (m *Message) AckWithResult() *AckResult
func (m *Message) NackWithResult() *AckResult
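
MaxOutstandingMessages and MaxOutstandingBytes bound how much unprocessed work a subscriber holds at once. The following is a simplified, standard-library-only model of that accounting. The flowController type and its methods are hypothetical and only illustrate the limits. The real client pauses delivery when a limit is reached, whereas tryAcquire here returns false so the effect is easy to observe.

```go
package main

import (
	"fmt"
	"sync"
)

// flowController is a hypothetical, simplified model of subscriber flow control.
type flowController struct {
	mu          sync.Mutex
	maxMessages int // negative means no limit in this sketch
	maxBytes    int // negative means no limit in this sketch
	messages    int
	bytes       int
}

// tryAcquire reserves room for one message of the given size.
// It returns false when the reservation would exceed either limit.
func (f *flowController) tryAcquire(size int) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.maxMessages >= 0 && f.messages+1 > f.maxMessages {
		return false
	}
	if f.maxBytes >= 0 && f.bytes+size > f.maxBytes {
		return false
	}
	f.messages++
	f.bytes += size
	return true
}

// release frees a reservation once the message has been acked or nacked.
func (f *flowController) release(size int) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.messages--
	f.bytes -= size
}

func main() {
	fc := &flowController{maxMessages: 2, maxBytes: 100}
	fmt.Println(fc.tryAcquire(40)) // first message fits
	fmt.Println(fc.tryAcquire(70)) // would exceed the byte limit
	fmt.Println(fc.tryAcquire(50)) // second message fits
	fmt.Println(fc.tryAcquire(5))  // would exceed the message limit
	fc.release(40)
	fmt.Println(fc.tryAcquire(5)) // room again after a release
}
```

Because room is only returned when a message is acked or nacked, a handler that never calls Ack or Nack eventually stalls delivery once the limits are reached.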

Topic Management

Create, configure, update, and delete topics with support for message retention, schema validation, and ingestion from external sources.

func (t *Topic) ID() string
func (t *Topic) String() string
func (t *Topic) Exists(ctx context.Context) (bool, error)
func (t *Topic) Delete(ctx context.Context) error
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

type TopicConfig struct {
    Labels                      map[string]string
    MessageStoragePolicy        MessageStoragePolicy
    KMSKeyName                  string
    SchemaSettings              *SchemaSettings
    RetentionDuration           optional.Duration
    State                       TopicState
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

type TopicConfigToUpdate struct {
    Labels                      map[string]string
    MessageStoragePolicy        *MessageStoragePolicy
    RetentionDuration           optional.Duration
    SchemaSettings              *SchemaSettings
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

Subscription Management

Create, configure, update, and delete subscriptions with support for push/pull delivery, dead letter topics, message filtering, and export to BigQuery or Cloud Storage.

func (s *Subscription) ID() string
func (s *Subscription) String() string
func (s *Subscription) Exists(ctx context.Context) (bool, error)
func (s *Subscription) Delete(ctx context.Context) error
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)

type SubscriptionConfig struct {
    Topic                           *Topic
    PushConfig                      PushConfig
    BigQueryConfig                  BigQueryConfig
    CloudStorageConfig              CloudStorageConfig
    AckDeadline                     time.Duration
    RetainAckedMessages             bool
    RetentionDuration               time.Duration
    ExpirationPolicy                optional.Duration
    Labels                          map[string]string
    EnableMessageOrdering           bool
    DeadLetterPolicy                *DeadLetterPolicy
    Filter                          string
    RetryPolicy                     *RetryPolicy
    Detached                        bool
    TopicMessageRetentionDuration   time.Duration
    State                           SubscriptionState
    EnableExactlyOnceDelivery       bool
    MessageTransforms               []MessageTransform
}

type SubscriptionConfigToUpdate struct {
    PushConfig                *PushConfig
    BigQueryConfig            *BigQueryConfig
    CloudStorageConfig        *CloudStorageConfig
    AckDeadline               time.Duration
    RetainAckedMessages       optional.Bool
    RetentionDuration         time.Duration
    ExpirationPolicy          optional.Duration
    DeadLetterPolicy          *DeadLetterPolicy
    Labels                    map[string]string
    RetryPolicy               *RetryPolicy
    EnableExactlyOnceDelivery optional.Bool
    MessageTransforms         []MessageTransform
}
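
When a subscription has a DeadLetterPolicy, each delivered message carries a delivery attempt counter (Message.DeliveryAttempt, a *int that is nil when dead lettering is not enabled). A handler can use it for logging or to change behavior on late attempts. The sketch below shows only the nil-safe handling of such a counter. The function describeAttempt and the maxAttempts parameter are hypothetical, and forwarding to the dead letter topic is done by the service on a best-effort basis, not by this code.

```go
package main

import "fmt"

// describeAttempt formats a delivery attempt counter shaped like
// Message.DeliveryAttempt. A nil pointer means dead lettering is not enabled.
// maxAttempts stands in for the configured maximum number of delivery attempts.
func describeAttempt(attempt *int, maxAttempts int) string {
	if attempt == nil {
		return "dead lettering disabled"
	}
	if *attempt >= maxAttempts {
		return fmt.Sprintf("attempt %d of %d: last attempt before dead-lettering", *attempt, maxAttempts)
	}
	return fmt.Sprintf("attempt %d of %d", *attempt, maxAttempts)
}

func main() {
	fmt.Println(describeAttempt(nil, 5))
	for _, n := range []int{1, 5} {
		n := n // take the address of a per-iteration copy
		fmt.Println(describeAttempt(&n, 5))
	}
}
```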

Schema Management

Define and manage message schemas for Protocol Buffer or Avro validation.

func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)

type SchemaClient struct {
    // Has unexported fields
}

func (c *SchemaClient) Close() error

type SchemaConfig struct {
    Name               string
    Type               SchemaType
    Definition         string
    RevisionID         string
    RevisionCreateTime time.Time
}

type SchemaSettings struct {
    Schema          string
    Encoding        SchemaEncoding
    FirstRevisionID string
    LastRevisionID  string
}

type SchemaType int
const (
    SchemaTypeUnspecified SchemaType = iota
    SchemaProtocolBuffer
    SchemaAvro
)

type SchemaEncoding int

Snapshots and Seeking

Create snapshots of subscription state and seek to specific points in time or snapshots.

func (c *Client) Snapshot(id string) *Snapshot
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator

func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error

type Snapshot struct {
    // Has unexported fields
}

func (s *Snapshot) ID() string
func (s *Snapshot) Delete(ctx context.Context) error
func (s *Snapshot) SetLabels(ctx context.Context, labels map[string]string) (*SnapshotConfig, error)

type SnapshotConfig struct {
    *Snapshot
    Topic      *Topic
    Expiration time.Time
    Labels     map[string]string
}
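
Seeking to a time changes which retained messages count as acknowledged: messages published before the target time are treated as acknowledged, and later ones become unacknowledged and are delivered again. The sketch below is a small in-memory model of that rule, not a call into the library. The storedMessage type and the seekToTime and at helpers are hypothetical, and treating a message published exactly at the target time as unacknowledged is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// storedMessage is a hypothetical record of a message retained by a subscription.
type storedMessage struct {
	ID          string
	PublishTime time.Time
	Acked       bool
}

// at builds a timestamp from Unix seconds to keep the example short.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// seekToTime marks messages published before t as acked and all others as
// unacked, regardless of their previous state.
func seekToTime(msgs []storedMessage, t time.Time) {
	for i := range msgs {
		msgs[i].Acked = msgs[i].PublishTime.Before(t)
	}
}

func main() {
	msgs := []storedMessage{
		{ID: "a", PublishTime: at(10), Acked: true},
		{ID: "b", PublishTime: at(20), Acked: true},
		{ID: "c", PublishTime: at(30), Acked: false},
	}
	seekToTime(msgs, at(15)) // replay everything from t=15 onward
	for _, m := range msgs {
		fmt.Printf("%s acked=%v\n", m.ID, m.Acked)
	}
}
```

Replaying already-acknowledged messages this way generally requires that the subscription retains them (see RetainAckedMessages in SubscriptionConfig).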

Delivery Configurations

Configure push delivery to HTTP endpoints, export to BigQuery, or write to Cloud Storage.

type PushConfig struct {
    Endpoint             string
    Attributes           map[string]string
    AuthenticationMethod AuthenticationMethod
    Wrapper              Wrapper
}

type OIDCToken struct {
    ServiceAccountEmail string
    Audience            string
}

type BigQueryConfig struct {
    Table                string
    UseTopicSchema       bool
    WriteMetadata        bool
    DropUnknownFields    bool
    State                BigQueryConfigState
    UseTableSchema       bool
    ServiceAccountEmail  string
}

type CloudStorageConfig struct {
    Bucket                 string
    FilenamePrefix         string
    FilenameSuffix         string
    FilenameDatetimeFormat string
    OutputFormat           isCloudStorageOutputFormat // *CloudStorageOutputFormatTextConfig or *CloudStorageOutputFormatAvroConfig
    MaxDuration            optional.Duration
    MaxBytes               int64
    MaxMessages            int64
    State                  CloudStorageConfigState
    ServiceAccountEmail    string
}
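
With push delivery, the service sends each message to PushConfig.Endpoint as an HTTP POST, and the endpoint acknowledges it by returning a success status. The sketch below shows what a receiving handler might look like using only the standard library. It assumes the default (wrapped) JSON envelope with a base64-encoded data field. The pushEnvelope type, decodePush, handlePush, and the /push path are hypothetical names for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// pushEnvelope models the subset of the push request body used here.
// encoding/json decodes the base64 "data" string into []byte automatically.
type pushEnvelope struct {
	Message struct {
		Data       []byte            `json:"data"`
		Attributes map[string]string `json:"attributes"`
		MessageID  string            `json:"messageId"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodePush parses a push request body.
func decodePush(body []byte) (pushEnvelope, error) {
	var env pushEnvelope
	err := json.Unmarshal(body, &env)
	return env, err
}

// handlePush acknowledges a message with 204; a non-success status asks the
// service to retry delivery.
func handlePush(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	env, err := decodePush(body)
	if err != nil {
		http.Error(w, "bad envelope", http.StatusBadRequest)
		return
	}
	fmt.Printf("message %s: %s\n", env.Message.MessageID, env.Message.Data)
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	// Exercise the handler locally with a sample envelope.
	body := `{"message":{"data":"SGVsbG8=","attributes":{"origin":"example"},"messageId":"1"},"subscription":"projects/p/subscriptions/s"}`
	req := httptest.NewRequest(http.MethodPost, "/push", strings.NewReader(body))
	rec := httptest.NewRecorder()
	handlePush(rec, req)
	fmt.Println("status:", rec.Code)
}
```

A production endpoint would also verify the caller, for example by validating the OIDC token configured through PushConfig.AuthenticationMethod, which this sketch omits.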

Data Ingestion

Ingest messages from external sources including AWS Kinesis, Amazon MSK, Azure Event Hubs, Confluent Cloud, and Cloud Storage.

type IngestionDataSourceSettings struct {
    Source               IngestionDataSource
    PlatformLogsSettings *PlatformLogsSettings
}

type IngestionDataSourceCloudStorage struct {
    State                   CloudStorageIngestionState
    Bucket                  string
    InputFormat             ingestionDataSourceCloudStorageInputFormat // text, Avro, or Pub/Sub Avro format
    MinimumObjectCreateTime time.Time
    MatchGlob               string
}

type IngestionDataSourceAWSKinesis struct {
    State             AWSKinesisState
    StreamARN         string
    ConsumerARN       string
    AWSRoleARN        string
    GCPServiceAccount string
}

type IngestionDataSourceAmazonMSK struct {
    State             AmazonMSKState
    ClusterARN        string
    Topic             string
    AWSRoleARN        string
    GCPServiceAccount string
}

type IngestionDataSourceAzureEventHubs struct {
    State             EventHubsState
    ResourceGroup     string
    Namespace         string
    EventHub          string
    ClientID          string
    TenantID          string
    SubscriptionID    string
    GCPServiceAccount string
}

type IngestionDataSourceConfluentCloud struct {
    State             ConfluentCloudState
    BootstrapServer   string
    ClusterID         string
    Topic             string
    IdentityPoolID    string
    GCPServiceAccount string
}

Message Transforms

Apply JavaScript-based transformations to messages before delivery.

type MessageTransform struct {
    Transform Transform
}

type Transform interface {
    // Has unexported methods
}

type JavaScriptUDF struct {
    FunctionName string
    Code         string
}

IAM and Access Control

Manage Identity and Access Management policies for topics and subscriptions.

func (t *Topic) IAM() *iam.Handle
func (s *Subscription) IAM() *iam.Handle

The IAM handle provides methods for getting, setting, and testing IAM policies using the standard cloud.google.com/go/iam package.

Testing Utilities

In-memory fake Pub/Sub server for unit testing without connecting to the actual service.

func NewServer(opts ...ServerReactorOption) *Server
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server

type Server struct {
    Addr    string
    GServer GServer
    // Has unexported fields
}

func (s *Server) Close() error
func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string
func (s *Server) Messages() []*Message
func (s *Server) ClearMessages()

func ValidateFilter(filter string) error

Low-Level API

Direct access to Pub/Sub gRPC APIs for advanced use cases.

import "cloud.google.com/go/pubsub/apiv1"

func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)
func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)
func NewSchemaClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)

Constants and Configuration

Authentication Scopes

const (
    ScopePubSub        = "https://www.googleapis.com/auth/pubsub"
    ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

Default Settings

var DefaultPublishSettings = PublishSettings{
    DelayThreshold: 10 * time.Millisecond,
    CountThreshold: 100,
    ByteThreshold:  1e6,
    Timeout:        60 * time.Second,
}

var DefaultReceiveSettings = ReceiveSettings{
    MaxOutstandingMessages: 1000,
    MaxOutstandingBytes:    1e9,
    MaxExtension:           60 * time.Minute,
    MaxExtensionPeriod:     0,
    NumGoroutines:          10,
}
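
These defaults also act as fallbacks for fields left at their zero value. The sketch below illustrates the convention, assuming the zero-means-default and negative-means-unlimited behavior that the package documents for MaxOutstandingMessages and MaxOutstandingBytes. The effectiveLimit function is a hypothetical helper written for this illustration and is not part of the library.

```go
package main

import "fmt"

// effectiveLimit resolves a configured limit against a default:
//   - zero falls back to the default,
//   - a negative value disables the limit,
//   - any positive value is used as given.
//
// The second result reports whether a limit applies at all.
func effectiveLimit(configured, def int) (limit int, limited bool) {
	switch {
	case configured == 0:
		return def, true
	case configured < 0:
		return 0, false
	default:
		return configured, true
	}
}

func main() {
	const defaultMaxOutstandingMessages = 1000 // value from DefaultReceiveSettings

	for _, configured := range []int{0, 50, -1} {
		limit, limited := effectiveLimit(configured, defaultMaxOutstandingMessages)
		if !limited {
			fmt.Printf("configured=%d: unlimited\n", configured)
			continue
		}
		fmt.Printf("configured=%d: limit=%d\n", configured, limit)
	}
}
```

One practical consequence is that a limit cannot be set to "zero messages": leaving a field unset and setting it to 0 are indistinguishable.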

Error Variables

var (
    ErrEmptyProjectID                       = errors.New("pubsub: project ID is empty")
    ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
    ErrFlowControllerMaxOutstandingBytes    = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
    ErrOversizedMessage                     = bundler.ErrOversizedItem // Message exceeds the 10 MB size limit
    ErrTopicStopped                         = errors.New("pubsub: Stop has been called for this topic")
)
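
Because these are sentinel values, callers can branch on them with errors.Is instead of comparing message strings. The sketch below shows the pattern with a local stand-in so that it compiles without the module. The names errTopicStopped, publish, and classify are hypothetical, and the wrapping with %w is only there to show that errors.Is still matches through added context.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicStopped is a local stand-in for pubsub.ErrTopicStopped.
var errTopicStopped = errors.New("stand-in: topic stopped")

// publish is a hypothetical function that fails the way a stopped topic
// would, wrapping the sentinel with extra context.
func publish(stopped bool) error {
	if stopped {
		return fmt.Errorf("publish to my-topic: %w", errTopicStopped)
	}
	return nil
}

// classify maps an error to a short action for the caller.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errTopicStopped):
		return "obtain a new topic handle"
	default:
		return "retry or report"
	}
}

func main() {
	fmt.Println(classify(publish(false)))
	fmt.Println(classify(publish(true)))
	fmt.Println(classify(errors.New("network unreachable")))
}
```

In real code the comparison target would be the exported variable, for example errors.Is(err, pubsub.ErrTopicStopped) on the error returned by PublishResult.Get.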

Project ID Detection

const DetectProjectID = "*detect-project-id*"

Pass DetectProjectID as the projectID argument to NewClient to have the client detect the project ID from the credentials in use.

State Enumerations

The library provides various state enumerations for monitoring resource health:

  • TopicState: TopicStateUnspecified, TopicStateActive, TopicStateIngestionResourceError
  • SubscriptionState: SubscriptionStateUnspecified, SubscriptionStateActive, SubscriptionStateResourceError
  • BigQueryConfigState: BigQueryConfigStateUnspecified, BigQueryConfigActive, BigQueryConfigPermissionDenied, BigQueryConfigNotFound, BigQueryConfigSchemaMismatch
  • CloudStorageConfigState: CloudStorageConfigStateUnspecified, CloudStorageConfigActive, CloudStorageConfigPermissionDenied, CloudStorageConfigNotFound
  • AWSKinesisState: Multiple states for Kinesis ingestion health
  • EventHubsState: Multiple states for Event Hubs ingestion health
  • AmazonMSKState: Multiple states for Amazon MSK ingestion health
  • ConfluentCloudState: Multiple states for Confluent Cloud ingestion health

OpenTelemetry and OpenCensus Integration

The library provides optional integration with OpenTelemetry for distributed tracing and OpenCensus for metrics collection.

type ClientConfig struct {
    PublisherCallOptions       *vkit.PublisherCallOptions
    SubscriberCallOptions      *vkit.SubscriberCallOptions
    EnableOpenTelemetryTracing bool
}

Multiple OpenCensus metrics are available including published message counts, publish latency, pull counts, ack/nack counts, and outstanding message/byte gauges.