or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

docs

client.mddelivery.mdindex.mdingestion.mdlow-level.mdpublishing.mdreceiving.mdschemas.mdsnapshots.mdsubscriptions.mdtesting.mdtopics.mdtransforms.md
tile.json

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources

low-level.mddocs/

Low-Level API

The apiv1 package provides low-level gRPC clients for direct access to Pub/Sub API RPCs. Most users should use the high-level cloud.google.com/go/pubsub package instead.

Package: cloud.google.com/go/pubsub/apiv1

Note: This is a low-level API providing direct access to protocol buffer operations. The high-level API in the parent package is recommended for most use cases.

Client Creation

PublisherClient

func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)
func NewPublisherRESTClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)

Creates a new publisher client using gRPC or REST.

Example:

import "cloud.google.com/go/pubsub/apiv1"

ctx := context.Background()
publisherClient, err := apiv1.NewPublisherClient(ctx)
if err != nil {
    log.Fatalf("Failed to create publisher client: %v", err)
}
defer publisherClient.Close()

SubscriberClient

func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)
func NewSubscriberRESTClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)

Creates a new subscriber client using gRPC or REST.

Example:

subscriberClient, err := apiv1.NewSubscriberClient(ctx)
if err != nil {
    log.Fatalf("Failed to create subscriber client: %v", err)
}
defer subscriberClient.Close()

SchemaClient

func NewSchemaClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)
func NewSchemaRESTClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)

Creates a new schema client using gRPC or REST.

Example:

schemaClient, err := apiv1.NewSchemaClient(ctx)
if err != nil {
    log.Fatalf("Failed to create schema client: %v", err)
}
defer schemaClient.Close()

PublisherClient

type PublisherClient struct {
    // Has unexported fields
}

Low-level gRPC client for publisher operations.

Connection Management

func (c *PublisherClient) Close() error
func (c *PublisherClient) Connection() *grpc.ClientConn
  • Close(): Closes the connection
  • Connection(): Returns the gRPC connection (deprecated)

Topic Operations

func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error
func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator

Example:

import (
    "cloud.google.com/go/pubsub/apiv1"
    pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
)

// Create topic
topic, err := publisherClient.CreateTopic(ctx, &pb.Topic{
    Name: "projects/my-project/topics/my-topic",
    Labels: map[string]string{
        "env": "production",
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

// Get topic
topic, err = publisherClient.GetTopic(ctx, &pb.GetTopicRequest{
    Topic: "projects/my-project/topics/my-topic",
})

// List topics
it := publisherClient.ListTopics(ctx, &pb.ListTopicsRequest{
    Project: "projects/my-project",
})
for {
    topic, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Topic: %s\n", topic.Name)
}

Publishing

func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error)

Publishes messages to a topic.

Example:

resp, err := publisherClient.Publish(ctx, &pb.PublishRequest{
    Topic: "projects/my-project/topics/my-topic",
    Messages: []*pb.PubsubMessage{
        {
            Data: []byte("message 1"),
            Attributes: map[string]string{
                "key": "value",
            },
        },
        {
            Data: []byte("message 2"),
        },
    },
})
if err != nil {
    log.Fatalf("Failed to publish: %v", err)
}
fmt.Printf("Published message IDs: %v\n", resp.MessageIds)

Topic Listings

func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator
func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator

Example:

// List subscriptions for a topic
it := publisherClient.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
    Topic: "projects/my-project/topics/my-topic",
})
for {
    sub, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Subscription: %s\n", sub)
}

Detach Subscription

func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error)

IAM Operations

func (c *PublisherClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error)
func (c *PublisherClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error)
func (c *PublisherClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error)

SubscriberClient

type SubscriberClient struct {
    // Has unexported fields
}

Low-level gRPC client for subscriber operations.

Subscription Operations

func (c *SubscriberClient) CreateSubscription(ctx context.Context, req *pubsubpb.Subscription, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) GetSubscription(ctx context.Context, req *pubsubpb.GetSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) UpdateSubscription(ctx context.Context, req *pubsubpb.UpdateSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) DeleteSubscription(ctx context.Context, req *pubsubpb.DeleteSubscriptionRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ListSubscriptions(ctx context.Context, req *pubsubpb.ListSubscriptionsRequest, opts ...gax.CallOption) *SubscriptionIterator

Example:

// Create subscription
sub, err := subscriberClient.CreateSubscription(ctx, &pb.Subscription{
    Name:  "projects/my-project/subscriptions/my-sub",
    Topic: "projects/my-project/topics/my-topic",
    AckDeadlineSeconds: 10,
})
if err != nil {
    log.Fatalf("Failed to create subscription: %v", err)
}

// Get subscription
sub, err = subscriberClient.GetSubscription(ctx, &pb.GetSubscriptionRequest{
    Subscription: "projects/my-project/subscriptions/my-sub",
})

// Update subscription
sub, err = subscriberClient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{
    Subscription: &pb.Subscription{
        Name: "projects/my-project/subscriptions/my-sub",
        AckDeadlineSeconds: 20,
    },
    UpdateMask: &fieldmaskpb.FieldMask{
        Paths: []string{"ack_deadline_seconds"},
    },
})

Message Operations

func (c *SubscriberClient) Pull(ctx context.Context, req *pubsubpb.PullRequest, opts ...gax.CallOption) (*pubsubpb.PullResponse, error)
func (c *SubscriberClient) StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error)
func (c *SubscriberClient) Acknowledge(ctx context.Context, req *pubsubpb.AcknowledgeRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ModifyAckDeadline(ctx context.Context, req *pubsubpb.ModifyAckDeadlineRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ModifyPushConfig(ctx context.Context, req *pubsubpb.ModifyPushConfigRequest, opts ...gax.CallOption) error

Example:

// Pull messages
pullResp, err := subscriberClient.Pull(ctx, &pb.PullRequest{
    Subscription: "projects/my-project/subscriptions/my-sub",
    MaxMessages:  10,
})
if err != nil {
    log.Fatalf("Failed to pull: %v", err)
}

// Process and acknowledge
for _, msg := range pullResp.ReceivedMessages {
    fmt.Printf("Message: %s\n", msg.Message.Data)

    // Acknowledge message
    err := subscriberClient.Acknowledge(ctx, &pb.AcknowledgeRequest{
        Subscription: "projects/my-project/subscriptions/my-sub",
        AckIds:       []string{msg.AckId},
    })
    if err != nil {
        log.Printf("Failed to ack: %v", err)
    }
}

// Modify ack deadline
err = subscriberClient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
    Subscription:       "projects/my-project/subscriptions/my-sub",
    AckIds:             []string{ackId},
    AckDeadlineSeconds: 30,
})

Snapshot Operations

func (c *SubscriberClient) CreateSnapshot(ctx context.Context, req *pubsubpb.CreateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) GetSnapshot(ctx context.Context, req *pubsubpb.GetSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) UpdateSnapshot(ctx context.Context, req *pubsubpb.UpdateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) DeleteSnapshot(ctx context.Context, req *pubsubpb.DeleteSnapshotRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ListSnapshots(ctx context.Context, req *pubsubpb.ListSnapshotsRequest, opts ...gax.CallOption) *SnapshotIterator

Example:

// Create snapshot
snapshot, err := subscriberClient.CreateSnapshot(ctx, &pb.CreateSnapshotRequest{
    Name:         "projects/my-project/snapshots/my-snapshot",
    Subscription: "projects/my-project/subscriptions/my-sub",
})
if err != nil {
    log.Fatalf("Failed to create snapshot: %v", err)
}

// List snapshots
it := subscriberClient.ListSnapshots(ctx, &pb.ListSnapshotsRequest{
    Project: "projects/my-project",
})
for {
    snapshot, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Snapshot: %s\n", snapshot.Name)
}

Seek Operations

func (c *SubscriberClient) Seek(ctx context.Context, req *pubsubpb.SeekRequest, opts ...gax.CallOption) (*pubsubpb.SeekResponse, error)

Example:

import "google.golang.org/protobuf/types/known/timestamppb"

// Seek to time
_, err := subscriberClient.Seek(ctx, &pb.SeekRequest{
    Subscription: "projects/my-project/subscriptions/my-sub",
    Target: &pb.SeekRequest_Time{
        Time: timestamppb.New(time.Now().Add(-1 * time.Hour)),
    },
})

// Seek to snapshot
_, err = subscriberClient.Seek(ctx, &pb.SeekRequest{
    Subscription: "projects/my-project/subscriptions/my-sub",
    Target: &pb.SeekRequest_Snapshot{
        Snapshot: "projects/my-project/snapshots/my-snapshot",
    },
})

SchemaClient

type SchemaClient struct {
    // Has unexported fields
}

Low-level gRPC client for schema operations.

Schema Operations

func (c *SchemaClient) CreateSchema(ctx context.Context, req *pubsubpb.CreateSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) GetSchema(ctx context.Context, req *pubsubpb.GetSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) ListSchemas(ctx context.Context, req *pubsubpb.ListSchemasRequest, opts ...gax.CallOption) *SchemaIterator
func (c *SchemaClient) DeleteSchema(ctx context.Context, req *pubsubpb.DeleteSchemaRequest, opts ...gax.CallOption) error

Example:

// Create schema
schema, err := schemaClient.CreateSchema(ctx, &pb.CreateSchemaRequest{
    Parent:   "projects/my-project",
    SchemaId: "my-schema",
    Schema: &pb.Schema{
        Type: pb.Schema_AVRO,
        Definition: `{
            "type": "record",
            "name": "Message",
            "fields": [{"name": "text", "type": "string"}]
        }`,
    },
})
if err != nil {
    log.Fatalf("Failed to create schema: %v", err)
}

// Get schema
schema, err = schemaClient.GetSchema(ctx, &pb.GetSchemaRequest{
    Name: "projects/my-project/schemas/my-schema",
    View: pb.SchemaView_FULL,
})

Schema Revision Operations

func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, req *pubsubpb.ListSchemaRevisionsRequest, opts ...gax.CallOption) *SchemaIterator
func (c *SchemaClient) CommitSchema(ctx context.Context, req *pubsubpb.CommitSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) RollbackSchema(ctx context.Context, req *pubsubpb.RollbackSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, req *pubsubpb.DeleteSchemaRevisionRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)

Example:

// Commit new revision
newRevision, err := schemaClient.CommitSchema(ctx, &pb.CommitSchemaRequest{
    Name: "projects/my-project/schemas/my-schema",
    Schema: &pb.Schema{
        Type:       pb.Schema_AVRO,
        Definition: updatedDefinition,
    },
})

// List revisions
it := schemaClient.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{
    Name: "projects/my-project/schemas/my-schema",
    View: pb.SchemaView_FULL,
})

Schema Validation

func (c *SchemaClient) ValidateSchema(ctx context.Context, req *pubsubpb.ValidateSchemaRequest, opts ...gax.CallOption) (*pubsubpb.ValidateSchemaResponse, error)
func (c *SchemaClient) ValidateMessage(ctx context.Context, req *pubsubpb.ValidateMessageRequest, opts ...gax.CallOption) (*pubsubpb.ValidateMessageResponse, error)

Example:

// Validate schema
_, err := schemaClient.ValidateSchema(ctx, &pb.ValidateSchemaRequest{
    Parent: "projects/my-project",
    Schema: &pb.Schema{
        Type:       pb.Schema_AVRO,
        Definition: schemaDefinition,
    },
})

// Validate message
_, err = schemaClient.ValidateMessage(ctx, &pb.ValidateMessageRequest{
    Parent: "projects/my-project",
    SchemaSpec: &pb.ValidateMessageRequest_Name{
        Name: "projects/my-project/schemas/my-schema",
    },
    Message:  messageBytes,
    Encoding: pb.Encoding_JSON,
})

Iterators

TopicIterator

type TopicIterator struct {
    // Has unexported fields
}

func (it *TopicIterator) Next() (*pubsubpb.Topic, error)
func (it *TopicIterator) PageInfo() *iterator.PageInfo

SubscriptionIterator

type SubscriptionIterator struct {
    // Has unexported fields
}

func (it *SubscriptionIterator) Next() (*pubsubpb.Subscription, error)
func (it *SubscriptionIterator) PageInfo() *iterator.PageInfo

SnapshotIterator

type SnapshotIterator struct {
    // Has unexported fields
}

func (it *SnapshotIterator) Next() (*pubsubpb.Snapshot, error)
func (it *SnapshotIterator) PageInfo() *iterator.PageInfo

SchemaIterator

type SchemaIterator struct {
    // Has unexported fields
}

func (it *SchemaIterator) Next() (*pubsubpb.Schema, error)
func (it *PageInfo) *iterator.PageInfo

StringIterator

type StringIterator struct {
    // Has unexported fields
}

func (it *StringIterator) Next() (string, error)
func (it *StringIterator) PageInfo() *iterator.PageInfo

Call Options

PublisherCallOptions

type PublisherCallOptions struct {
    CreateTopic             []gax.CallOption
    UpdateTopic             []gax.CallOption
    Publish                 []gax.CallOption
    GetTopic                []gax.CallOption
    ListTopics              []gax.CallOption
    ListTopicSubscriptions  []gax.CallOption
    ListTopicSnapshots      []gax.CallOption
    DeleteTopic             []gax.CallOption
    DetachSubscription      []gax.CallOption
    GetIamPolicy            []gax.CallOption
    SetIamPolicy            []gax.CallOption
    TestIamPermissions      []gax.CallOption
}

SubscriberCallOptions

type SubscriberCallOptions struct {
    CreateSubscription  []gax.CallOption
    GetSubscription     []gax.CallOption
    UpdateSubscription  []gax.CallOption
    ListSubscriptions   []gax.CallOption
    DeleteSubscription  []gax.CallOption
    ModifyAckDeadline   []gax.CallOption
    Acknowledge         []gax.CallOption
    Pull                []gax.CallOption
    StreamingPull       []gax.CallOption
    ModifyPushConfig    []gax.CallOption
    GetSnapshot         []gax.CallOption
    ListSnapshots       []gax.CallOption
    CreateSnapshot      []gax.CallOption
    UpdateSnapshot      []gax.CallOption
    DeleteSnapshot      []gax.CallOption
    Seek                []gax.CallOption
    GetIamPolicy        []gax.CallOption
    SetIamPolicy        []gax.CallOption
    TestIamPermissions  []gax.CallOption
}

SchemaCallOptions

type SchemaCallOptions struct {
    CreateSchema          []gax.CallOption
    GetSchema             []gax.CallOption
    ListSchemas           []gax.CallOption
    ListSchemaRevisions   []gax.CallOption
    CommitSchema          []gax.CallOption
    RollbackSchema        []gax.CallOption
    DeleteSchemaRevision  []gax.CallOption
    DeleteSchema          []gax.CallOption
    ValidateSchema        []gax.CallOption
    ValidateMessage       []gax.CallOption
    GetIamPolicy          []gax.CallOption
    SetIamPolicy          []gax.CallOption
    TestIamPermissions    []gax.CallOption
}

Utility Functions

func DefaultAuthScopes() []string

Returns the default OAuth scopes for Pub/Sub API.

When to Use Low-Level API

Use the low-level API when you need:

  1. Direct control over protocol buffer messages
  2. Access to features not yet in high-level API
  3. Custom retry or timeout logic per operation
  4. Integration with custom gRPC interceptors
  5. Maximum performance with minimal abstraction

For most use cases, prefer the high-level cloud.google.com/go/pubsub package.

Example: Custom Retry Logic

import (
    "cloud.google.com/go/pubsub/apiv1"
    gax "github.com/googleapis/gax-go/v2"
    "google.golang.org/grpc/codes"
)

publisherClient, _ := apiv1.NewPublisherClient(ctx)

// Custom retry configuration
retryOpts := []gax.CallOption{
    gax.WithRetry(func() gax.Retryer {
        return gax.OnCodes([]codes.Code{
            codes.Unavailable,
            codes.DeadlineExceeded,
        }, gax.Backoff{
            Initial:    100 * time.Millisecond,
            Max:        10 * time.Second,
            Multiplier: 1.3,
        })
    }),
}

// Publish with custom retry
resp, err := publisherClient.Publish(ctx, &pb.PublishRequest{
    Topic:    "projects/my-project/topics/my-topic",
    Messages: messages,
}, retryOpts...)

Best Practices

  1. Prefer High-Level API: Use cloud.google.com/go/pubsub unless you specifically need low-level control
  2. Resource Cleanup: Always close clients with defer client.Close()
  3. Error Handling: Handle gRPC status codes appropriately
  4. Pagination: Use iterator PageInfo() for pagination control
  5. Field Masks: Use field masks in update operations to specify which fields to update
  6. Context: Pass appropriate contexts with timeouts
  7. Call Options: Use call options for custom retry/timeout behavior