
tessl/golang-cloud-google-com--go--pubsub

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Google Cloud Pub/Sub for Go

A Go client library for Google Cloud Pub/Sub, a fully managed real-time messaging service that lets applications communicate asynchronously through a publish-subscribe pattern. The library provides high-level, idiomatic Go APIs that abstract the underlying gRPC calls: you publish messages to topics and receive messages from subscriptions, while the library handles batching, flow control, and message lifecycle management.

Package Information

  • Package: cloud.google.com/go/pubsub
  • Language: Go
  • Installation: go get cloud.google.com/go/pubsub@v1.50.1
  • Module: cloud.google.com/go/pubsub

Core Imports

import (
    "context"

    "cloud.google.com/go/pubsub"
)

For testing:

import "cloud.google.com/go/pubsub/pstest"

For low-level gRPC access:

import "cloud.google.com/go/pubsub/apiv1"

Basic Usage

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // Create a Pub/Sub client
    client, err := pubsub.NewClient(ctx, "my-project-id")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a topic
    topic, err := client.CreateTopic(ctx, "my-topic")
    if err != nil {
        log.Fatal(err)
    }
    defer topic.Stop() // Flush pending messages and stop the topic's publish goroutines

    // Publish a message
    result := topic.Publish(ctx, &pubsub.Message{
        Data: []byte("Hello, Pub/Sub!"),
        Attributes: map[string]string{
            "origin": "example",
        },
    })

    // Block until the message is sent
    msgID, err := result.Get(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Published message with ID: %s\n", msgID)

    // Create a subscription
    sub, err := client.CreateSubscription(ctx, "my-subscription", pubsub.SubscriptionConfig{
        Topic: topic,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Receive messages; Receive blocks until ctx is cancelled or a fatal error occurs
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        fmt.Printf("Received message: %s\n", msg.Data)
        msg.Ack() // Acknowledge the message
    })
    if err != nil {
        log.Fatal(err)
    }
}

Architecture

The library is organized into several key components:

  • Client: Main entry point for interacting with Pub/Sub, scoped to a single project
  • Topic: Represents a named resource to which messages are published
  • Subscription: Represents a named resource for receiving messages from a topic
  • Message: Encapsulates message data, attributes, and acknowledgment controls
  • Publisher: Handles automatic message batching and asynchronous publishing
  • Subscriber: Manages streaming pull connections with automatic ack deadline extensions

The functionality is split across three packages:

  1. High-level idiomatic API (pubsub package): Recommended for most use cases with automatic batching and flow control
  2. Low-level gRPC API (pubsub/apiv1 package): Direct access to Pub/Sub RPCs for advanced scenarios
  3. Testing utilities (pubsub/pstest package): In-memory fake server for unit testing

Capabilities

Client Management

Create and manage Pub/Sub clients for accessing topics and subscriptions.

func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error)

type Client struct {
    // Has unexported fields
}

func (c *Client) Close() error
func (c *Client) Project() string
func (c *Client) Topic(id string) *Topic
func (c *Client) TopicInProject(id, projectID string) *Topic
func (c *Client) Subscription(id string) *Subscription
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
func (c *Client) Topics(ctx context.Context) *TopicIterator
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)

Publishing Messages

Publish messages to topics with automatic batching, flow control, and ordering support.

type Topic struct {
    PublishSettings       PublishSettings
    EnableMessageOrdering bool
    // Has unexported fields
}

func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
func (t *Topic) Stop()
func (t *Topic) Flush()

type PublishSettings struct {
    DelayThreshold            time.Duration
    CountThreshold            int
    ByteThreshold             int
    NumGoroutines             int
    Timeout                   time.Duration
    BufferedByteLimit         int // Deprecated in favor of FlowControlSettings
    FlowControlSettings       FlowControlSettings
    EnableCompression         bool
    CompressionBytesThreshold int
}

type PublishResult struct {
    // Has unexported fields
}

func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)
func (r *PublishResult) Ready() <-chan struct{}

Receiving Messages

Receive messages from subscriptions with configurable concurrency, flow control, and automatic ack deadline management.

type Subscription struct {
    ReceiveSettings ReceiveSettings
    // Has unexported fields
}

func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error

type ReceiveSettings struct {
    MaxOutstandingMessages int
    MaxOutstandingBytes    int
    MaxExtension           time.Duration
    MaxExtensionPeriod     time.Duration
    MinExtensionPeriod     time.Duration
    NumGoroutines          int
    Synchronous            bool
    UseLegacyFlowControl   bool
    ExactlyOnceDelivery    bool
}

type Message struct {
    Data            []byte
    Attributes      map[string]string
    ID              string
    PublishTime     time.Time
    OrderingKey     string
    DeliveryAttempt *int
}

func (m *Message) Ack()
func (m *Message) Nack()
func (m *Message) AckWithResult() *AckResult
func (m *Message) NackWithResult() *AckResult

Topic Management

Create, configure, update, and delete topics with support for message retention, schema validation, and ingestion from external sources.

func (t *Topic) ID() string
func (t *Topic) String() string
func (t *Topic) Exists(ctx context.Context) (bool, error)
func (t *Topic) Delete(ctx context.Context) error
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator

type TopicConfig struct {
    Labels                      map[string]string
    MessageStoragePolicy        MessageStoragePolicy
    KMSKeyName                  string
    SchemaSettings              *SchemaSettings
    RetentionDuration           time.Duration
    State                       TopicState
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

type TopicConfigToUpdate struct {
    Labels                      map[string]string
    MessageStoragePolicy        *MessageStoragePolicy
    RetentionDuration           optional.Duration
    SchemaSettings              *SchemaSettings
    IngestionDataSourceSettings *IngestionDataSourceSettings
    MessageTransforms           []MessageTransform
}

Subscription Management

Create, configure, update, and delete subscriptions with support for push/pull delivery, dead letter topics, message filtering, and export to BigQuery or Cloud Storage.

func (s *Subscription) ID() string
func (s *Subscription) String() string
func (s *Subscription) Exists(ctx context.Context) (bool, error)
func (s *Subscription) Delete(ctx context.Context) error
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)

type SubscriptionConfig struct {
    Topic                           *Topic
    PushConfig                      PushConfig
    BigQueryConfig                  BigQueryConfig
    CloudStorageConfig              CloudStorageConfig
    AckDeadline                     time.Duration
    RetainAckedMessages             bool
    RetentionDuration               time.Duration
    ExpirationPolicy                optional.Duration
    Labels                          map[string]string
    EnableMessageOrdering           bool
    DeadLetterPolicy                *DeadLetterPolicy
    Filter                          string
    RetryPolicy                     *RetryPolicy
    Detached                        bool
    TopicMessageRetentionDuration   time.Duration
    State                           SubscriptionState
    EnableExactlyOnceDelivery       bool
    MessageTransforms               []MessageTransform
}

type SubscriptionConfigToUpdate struct {
    PushConfig                *PushConfig
    BigQueryConfig            *BigQueryConfig
    CloudStorageConfig        *CloudStorageConfig
    AckDeadline               time.Duration
    RetainAckedMessages       bool
    RetentionDuration         time.Duration
    ExpirationPolicy          optional.Duration
    DeadLetterPolicy          *DeadLetterPolicy
    Labels                    map[string]string
    RetryPolicy               *RetryPolicy
    EnableExactlyOnceDelivery bool
    MessageTransforms         []MessageTransform
}

Schema Management

Define and manage message schemas for Protocol Buffer or Avro validation.

func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)

type SchemaClient struct {
    // Has unexported fields
}

func (c *SchemaClient) Close() error

type SchemaConfig struct {
    Name               string
    Type               SchemaType
    Definition         string
    RevisionID         string
    RevisionCreateTime time.Time
}

type SchemaSettings struct {
    Schema          string
    Encoding        SchemaEncoding
    FirstRevisionID string
    LastRevisionID  string
}

type SchemaType int
const (
    SchemaTypeUnspecified SchemaType = iota
    SchemaProtocolBuffer
    SchemaAvro
)

type SchemaEncoding int

Snapshots and Seeking

Create snapshots of subscription state and seek to specific points in time or snapshots.

func (c *Client) Snapshot(id string) *Snapshot
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator

func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error

type Snapshot struct {
    // Has unexported fields
}

func (s *Snapshot) ID() string
func (s *Snapshot) Delete(ctx context.Context) error
func (s *Snapshot) SetLabels(ctx context.Context, labels map[string]string) error

type SnapshotConfig struct {
    *Snapshot  // Embedded snapshot handle
    Topic      *Topic
    Expiration time.Time
    Labels     map[string]string
}

Delivery Configurations

Configure push delivery to HTTP endpoints, export to BigQuery, or write to Cloud Storage.

type PushConfig struct {
    Endpoint             string
    Attributes           map[string]string
    AuthenticationMethod AuthenticationMethod
    Wrapper              Wrapper
}

type OIDCToken struct {
    ServiceAccountEmail string
    Audience            string
}

type BigQueryConfig struct {
    Table                string
    UseTopicSchema       bool
    WriteMetadata        bool
    DropUnknownFields    bool
    State                BigQueryConfigState
    UseTableSchema       bool
    ServiceAccountEmail  string
}

type CloudStorageConfig struct {
    Bucket                 string
    FilenamePrefix         string
    FilenameSuffix         string
    FilenameDatetimeFormat string
    TextConfig             *CloudStorageOutputFormatTextConfig
    AvroConfig             *CloudStorageOutputFormatAvroConfig
    MaxDuration            time.Duration
    MaxBytes               int64
    MaxMessages            int64
    State                  CloudStorageConfigState
    ServiceAccountEmail    string
}

Data Ingestion

Ingest messages from external sources including AWS Kinesis, Amazon MSK, Azure Event Hubs, Confluent Cloud, and Cloud Storage.

type IngestionDataSourceSettings struct {
    Source               IngestionDataSource
    PlatformLogsSettings *PlatformLogsSettings
}

type IngestionDataSourceCloudStorage struct {
    State                   CloudStorageIngestionState
    Bucket                  string
    MinimumObjectCreateTime time.Time
    MatchGlob               string
    TextFormat              *IngestionDataSourceCloudStorageTextFormat
    AvroFormat              *IngestionDataSourceCloudStorageAvroFormat
    PubSubAvroFormat        *IngestionDataSourceCloudStoragePubSubAvroFormat
}

type IngestionDataSourceAWSKinesis struct {
    State             AWSKinesisState
    StreamARN         string
    ConsumerARN       string
    AWSRoleArn        string
    GcpServiceAccount string
}

type IngestionDataSourceAmazonMSK struct {
    State            AmazonMSKState
    ClusterArn       string
    TopicName        string
    BootstrapServers []string
    Authentication   AmazonMSKAuthentication
}

type IngestionDataSourceAzureEventHubs struct {
    State                EventHubsState
    ResourceGroup        string
    Namespace            string
    EventHubName         string
    ConsumerGroup        string
    AuthenticationMethod AzureEventHubsAuthenticationMethod
}

type IngestionDataSourceConfluentCloud struct {
    State            ConfluentCloudState
    BootstrapServers []string
    TopicName        string
    Authentication   ConfluentCloudAuthentication
}

Message Transforms

Apply JavaScript-based transformations to messages before delivery.

type MessageTransform struct {
    Transform Transform
}

type Transform interface {
    // Has unexported methods
}

type JavaScriptUDF struct {
    FunctionName string
    Code         string
}

IAM and Access Control

Manage Identity and Access Management policies for topics and subscriptions.

func (t *Topic) IAM() *iam.Handle
func (s *Subscription) IAM() *iam.Handle

The IAM handle provides methods for getting, setting, and testing IAM policies using the standard cloud.google.com/go/iam package.

Testing Utilities

In-memory fake Pub/Sub server for unit testing without connecting to the actual service.

func NewServer(opts ...ServerOption) *Server
func NewServerWithPort(port int, opts ...ServerOption) (*Server, error)

type Server struct {
    Addr    string
    GServer *GServer
}

func (s *Server) Close() error
func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string
func (s *Server) Messages() []*Message
func (s *Server) ClearMessages()

func ValidateFilter(filter string) error

Low-Level API

Direct access to Pub/Sub gRPC APIs for advanced use cases.

import "cloud.google.com/go/pubsub/apiv1"

func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)
func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)
func NewSchemaClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)

Constants and Configuration

Authentication Scopes

const (
    ScopePubSub        = "https://www.googleapis.com/auth/pubsub"
    ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

Default Settings

var DefaultPublishSettings = PublishSettings{
    DelayThreshold: 10 * time.Millisecond,
    CountThreshold: 100,
    ByteThreshold:  1e6,
    Timeout:        60 * time.Second,
}

var DefaultReceiveSettings = ReceiveSettings{
    MaxOutstandingMessages: 1000,
    MaxOutstandingBytes:    1e9,
    MaxExtension:           60 * time.Minute,
    MaxExtensionPeriod:     0, // 0 means no explicit cap per extension
    NumGoroutines:          10,
}

Error Variables

var (
    ErrEmptyProjectID                      = errors.New("pubsub: project ID is empty")
    ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
    ErrFlowControllerMaxOutstandingBytes    = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
    ErrOversizedMessage                     error // Message exceeds the 10 MB size limit
    ErrTopicStopped                         = errors.New("pubsub: Topic.Stop has been called")
)

Project ID Detection

const DetectProjectID = "*detect-project-id*"

Sentinel value for automatic project ID detection from credentials.

State Enumerations

The library provides various state enumerations for monitoring resource health:

  • TopicState: TopicStateUnspecified, TopicStateActive, TopicStateIngestionResourceError
  • SubscriptionState: SubscriptionStateUnspecified, SubscriptionStateActive, SubscriptionStateResourceError
  • BigQueryConfigState: BigQueryConfigStateUnspecified, BigQueryConfigActive, BigQueryConfigPermissionDenied, BigQueryConfigNotFound, BigQueryConfigSchemaMismatch
  • CloudStorageConfigState: CloudStorageConfigStateUnspecified, CloudStorageConfigActive, CloudStorageConfigPermissionDenied, CloudStorageConfigNotFound
  • AWSKinesisState: Multiple states for Kinesis ingestion health
  • EventHubsState: Multiple states for Event Hubs ingestion health
  • AmazonMSKState: Multiple states for Amazon MSK ingestion health
  • ConfluentCloudState: Multiple states for Confluent Cloud ingestion health

OpenTelemetry and OpenCensus Integration

The library provides optional integration with OpenTelemetry for distributed tracing and OpenCensus for metrics collection.

// vkit refers to the cloud.google.com/go/pubsub/apiv1 package.
type ClientConfig struct {
    PublisherCallOptions       *vkit.PublisherCallOptions
    SubscriberCallOptions      *vkit.SubscriberCallOptions
    EnableOpenTelemetryTracing bool
}

Multiple OpenCensus metrics are available including published message counts, publish latency, pull counts, ack/nack counts, and outstanding message/byte gauges.
