tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources
The apiv1 package provides low-level gRPC clients for direct access to Pub/Sub API RPCs. Most users should use the high-level cloud.google.com/go/pubsub package instead.
Package: cloud.google.com/go/pubsub/apiv1
Note: This is a low-level API providing direct access to protocol buffer operations. The high-level API in the parent package is recommended for most use cases.
func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)
func NewPublisherRESTClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error)Creates a new publisher client using gRPC or REST.
Example:
import "cloud.google.com/go/pubsub/apiv1"
ctx := context.Background()
publisherClient, err := apiv1.NewPublisherClient(ctx)
if err != nil {
log.Fatalf("Failed to create publisher client: %v", err)
}
defer publisherClient.Close()func NewSubscriberClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)
func NewSubscriberRESTClient(ctx context.Context, opts ...option.ClientOption) (*SubscriberClient, error)Creates a new subscriber client using gRPC or REST.
Example:
subscriberClient, err := apiv1.NewSubscriberClient(ctx)
if err != nil {
log.Fatalf("Failed to create subscriber client: %v", err)
}
defer subscriberClient.Close()func NewSchemaClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)
func NewSchemaRESTClient(ctx context.Context, opts ...option.ClientOption) (*SchemaClient, error)Creates a new schema client using gRPC or REST.
Example:
schemaClient, err := apiv1.NewSchemaClient(ctx)
if err != nil {
log.Fatalf("Failed to create schema client: %v", err)
}
defer schemaClient.Close()type PublisherClient struct {
// Has unexported fields
}Low-level gRPC client for publisher operations.
func (c *PublisherClient) Close() error
func (c *PublisherClient) Connection() *grpc.ClientConnClose(): Closes the connectionConnection(): Returns the gRPC connection (deprecated)func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error)
func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error
func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIteratorExample:
import (
"cloud.google.com/go/pubsub/apiv1"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
)
// Create topic
topic, err := publisherClient.CreateTopic(ctx, &pb.Topic{
Name: "projects/my-project/topics/my-topic",
Labels: map[string]string{
"env": "production",
},
})
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
// Get topic
topic, err = publisherClient.GetTopic(ctx, &pb.GetTopicRequest{
Topic: "projects/my-project/topics/my-topic",
})
// List topics
it := publisherClient.ListTopics(ctx, &pb.ListTopicsRequest{
Project: "projects/my-project",
})
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Topic: %s\n", topic.Name)
}func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error)Publishes messages to a topic.
Example:
resp, err := publisherClient.Publish(ctx, &pb.PublishRequest{
Topic: "projects/my-project/topics/my-topic",
Messages: []*pb.PubsubMessage{
{
Data: []byte("message 1"),
Attributes: map[string]string{
"key": "value",
},
},
{
Data: []byte("message 2"),
},
},
})
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
fmt.Printf("Published message IDs: %v\n", resp.MessageIds)func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator
func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIteratorExample:
// List subscriptions for a topic
it := publisherClient.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
Topic: "projects/my-project/topics/my-topic",
})
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Subscription: %s\n", sub)
}func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error)func (c *PublisherClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error)
func (c *PublisherClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error)
func (c *PublisherClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error)type SubscriberClient struct {
// Has unexported fields
}Low-level gRPC client for subscriber operations.
func (c *SubscriberClient) CreateSubscription(ctx context.Context, req *pubsubpb.Subscription, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) GetSubscription(ctx context.Context, req *pubsubpb.GetSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) UpdateSubscription(ctx context.Context, req *pubsubpb.UpdateSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.Subscription, error)
func (c *SubscriberClient) DeleteSubscription(ctx context.Context, req *pubsubpb.DeleteSubscriptionRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ListSubscriptions(ctx context.Context, req *pubsubpb.ListSubscriptionsRequest, opts ...gax.CallOption) *SubscriptionIteratorExample:
// Create subscription
sub, err := subscriberClient.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/my-project/subscriptions/my-sub",
Topic: "projects/my-project/topics/my-topic",
AckDeadlineSeconds: 10,
})
if err != nil {
log.Fatalf("Failed to create subscription: %v", err)
}
// Get subscription
sub, err = subscriberClient.GetSubscription(ctx, &pb.GetSubscriptionRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
})
// Update subscription
sub, err = subscriberClient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{
Subscription: &pb.Subscription{
Name: "projects/my-project/subscriptions/my-sub",
AckDeadlineSeconds: 20,
},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: []string{"ack_deadline_seconds"},
},
})func (c *SubscriberClient) Pull(ctx context.Context, req *pubsubpb.PullRequest, opts ...gax.CallOption) (*pubsubpb.PullResponse, error)
func (c *SubscriberClient) StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error)
func (c *SubscriberClient) Acknowledge(ctx context.Context, req *pubsubpb.AcknowledgeRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ModifyAckDeadline(ctx context.Context, req *pubsubpb.ModifyAckDeadlineRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ModifyPushConfig(ctx context.Context, req *pubsubpb.ModifyPushConfigRequest, opts ...gax.CallOption) errorExample:
// Pull messages
pullResp, err := subscriberClient.Pull(ctx, &pb.PullRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
MaxMessages: 10,
})
if err != nil {
log.Fatalf("Failed to pull: %v", err)
}
// Process and acknowledge
for _, msg := range pullResp.ReceivedMessages {
fmt.Printf("Message: %s\n", msg.Message.Data)
// Acknowledge message
err := subscriberClient.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
AckIds: []string{msg.AckId},
})
if err != nil {
log.Printf("Failed to ack: %v", err)
}
}
// Modify ack deadline
err = subscriberClient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
AckIds: []string{ackId},
AckDeadlineSeconds: 30,
})func (c *SubscriberClient) CreateSnapshot(ctx context.Context, req *pubsubpb.CreateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) GetSnapshot(ctx context.Context, req *pubsubpb.GetSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) UpdateSnapshot(ctx context.Context, req *pubsubpb.UpdateSnapshotRequest, opts ...gax.CallOption) (*pubsubpb.Snapshot, error)
func (c *SubscriberClient) DeleteSnapshot(ctx context.Context, req *pubsubpb.DeleteSnapshotRequest, opts ...gax.CallOption) error
func (c *SubscriberClient) ListSnapshots(ctx context.Context, req *pubsubpb.ListSnapshotsRequest, opts ...gax.CallOption) *SnapshotIteratorExample:
// Create snapshot
snapshot, err := subscriberClient.CreateSnapshot(ctx, &pb.CreateSnapshotRequest{
Name: "projects/my-project/snapshots/my-snapshot",
Subscription: "projects/my-project/subscriptions/my-sub",
})
if err != nil {
log.Fatalf("Failed to create snapshot: %v", err)
}
// List snapshots
it := subscriberClient.ListSnapshots(ctx, &pb.ListSnapshotsRequest{
Project: "projects/my-project",
})
for {
snapshot, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Snapshot: %s\n", snapshot.Name)
}func (c *SubscriberClient) Seek(ctx context.Context, req *pubsubpb.SeekRequest, opts ...gax.CallOption) (*pubsubpb.SeekResponse, error)Example:
import "google.golang.org/protobuf/types/known/timestamppb"
// Seek to time
_, err := subscriberClient.Seek(ctx, &pb.SeekRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
Target: &pb.SeekRequest_Time{
Time: timestamppb.New(time.Now().Add(-1 * time.Hour)),
},
})
// Seek to snapshot
_, err = subscriberClient.Seek(ctx, &pb.SeekRequest{
Subscription: "projects/my-project/subscriptions/my-sub",
Target: &pb.SeekRequest_Snapshot{
Snapshot: "projects/my-project/snapshots/my-snapshot",
},
})type SchemaClient struct {
// Has unexported fields
}Low-level gRPC client for schema operations.
func (c *SchemaClient) CreateSchema(ctx context.Context, req *pubsubpb.CreateSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) GetSchema(ctx context.Context, req *pubsubpb.GetSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) ListSchemas(ctx context.Context, req *pubsubpb.ListSchemasRequest, opts ...gax.CallOption) *SchemaIterator
func (c *SchemaClient) DeleteSchema(ctx context.Context, req *pubsubpb.DeleteSchemaRequest, opts ...gax.CallOption) errorExample:
// Create schema
schema, err := schemaClient.CreateSchema(ctx, &pb.CreateSchemaRequest{
Parent: "projects/my-project",
SchemaId: "my-schema",
Schema: &pb.Schema{
Type: pb.Schema_AVRO,
Definition: `{
"type": "record",
"name": "Message",
"fields": [{"name": "text", "type": "string"}]
}`,
},
})
if err != nil {
log.Fatalf("Failed to create schema: %v", err)
}
// Get schema
schema, err = schemaClient.GetSchema(ctx, &pb.GetSchemaRequest{
Name: "projects/my-project/schemas/my-schema",
View: pb.SchemaView_FULL,
})func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, req *pubsubpb.ListSchemaRevisionsRequest, opts ...gax.CallOption) *SchemaIterator
func (c *SchemaClient) CommitSchema(ctx context.Context, req *pubsubpb.CommitSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) RollbackSchema(ctx context.Context, req *pubsubpb.RollbackSchemaRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)
func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, req *pubsubpb.DeleteSchemaRevisionRequest, opts ...gax.CallOption) (*pubsubpb.Schema, error)Example:
// Commit new revision
newRevision, err := schemaClient.CommitSchema(ctx, &pb.CommitSchemaRequest{
Name: "projects/my-project/schemas/my-schema",
Schema: &pb.Schema{
Type: pb.Schema_AVRO,
Definition: updatedDefinition,
},
})
// List revisions
it := schemaClient.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{
Name: "projects/my-project/schemas/my-schema",
View: pb.SchemaView_FULL,
})func (c *SchemaClient) ValidateSchema(ctx context.Context, req *pubsubpb.ValidateSchemaRequest, opts ...gax.CallOption) (*pubsubpb.ValidateSchemaResponse, error)
func (c *SchemaClient) ValidateMessage(ctx context.Context, req *pubsubpb.ValidateMessageRequest, opts ...gax.CallOption) (*pubsubpb.ValidateMessageResponse, error)Example:
// Validate schema
_, err := schemaClient.ValidateSchema(ctx, &pb.ValidateSchemaRequest{
Parent: "projects/my-project",
Schema: &pb.Schema{
Type: pb.Schema_AVRO,
Definition: schemaDefinition,
},
})
// Validate message
_, err = schemaClient.ValidateMessage(ctx, &pb.ValidateMessageRequest{
Parent: "projects/my-project",
SchemaSpec: &pb.ValidateMessageRequest_Name{
Name: "projects/my-project/schemas/my-schema",
},
Message: messageBytes,
Encoding: pb.Encoding_JSON,
})type TopicIterator struct {
// Has unexported fields
}
func (it *TopicIterator) Next() (*pubsubpb.Topic, error)
func (it *TopicIterator) PageInfo() *iterator.PageInfotype SubscriptionIterator struct {
// Has unexported fields
}
func (it *SubscriptionIterator) Next() (*pubsubpb.Subscription, error)
func (it *SubscriptionIterator) PageInfo() *iterator.PageInfotype SnapshotIterator struct {
// Has unexported fields
}
func (it *SnapshotIterator) Next() (*pubsubpb.Snapshot, error)
func (it *SnapshotIterator) PageInfo() *iterator.PageInfotype SchemaIterator struct {
// Has unexported fields
}
func (it *SchemaIterator) Next() (*pubsubpb.Schema, error)
func (it *PageInfo) *iterator.PageInfotype StringIterator struct {
// Has unexported fields
}
func (it *StringIterator) Next() (string, error)
func (it *StringIterator) PageInfo() *iterator.PageInfotype PublisherCallOptions struct {
CreateTopic []gax.CallOption
UpdateTopic []gax.CallOption
Publish []gax.CallOption
GetTopic []gax.CallOption
ListTopics []gax.CallOption
ListTopicSubscriptions []gax.CallOption
ListTopicSnapshots []gax.CallOption
DeleteTopic []gax.CallOption
DetachSubscription []gax.CallOption
GetIamPolicy []gax.CallOption
SetIamPolicy []gax.CallOption
TestIamPermissions []gax.CallOption
}type SubscriberCallOptions struct {
CreateSubscription []gax.CallOption
GetSubscription []gax.CallOption
UpdateSubscription []gax.CallOption
ListSubscriptions []gax.CallOption
DeleteSubscription []gax.CallOption
ModifyAckDeadline []gax.CallOption
Acknowledge []gax.CallOption
Pull []gax.CallOption
StreamingPull []gax.CallOption
ModifyPushConfig []gax.CallOption
GetSnapshot []gax.CallOption
ListSnapshots []gax.CallOption
CreateSnapshot []gax.CallOption
UpdateSnapshot []gax.CallOption
DeleteSnapshot []gax.CallOption
Seek []gax.CallOption
GetIamPolicy []gax.CallOption
SetIamPolicy []gax.CallOption
TestIamPermissions []gax.CallOption
}type SchemaCallOptions struct {
CreateSchema []gax.CallOption
GetSchema []gax.CallOption
ListSchemas []gax.CallOption
ListSchemaRevisions []gax.CallOption
CommitSchema []gax.CallOption
RollbackSchema []gax.CallOption
DeleteSchemaRevision []gax.CallOption
DeleteSchema []gax.CallOption
ValidateSchema []gax.CallOption
ValidateMessage []gax.CallOption
GetIamPolicy []gax.CallOption
SetIamPolicy []gax.CallOption
TestIamPermissions []gax.CallOption
}func DefaultAuthScopes() []stringReturns the default OAuth scopes for Pub/Sub API.
Use the low-level API when you need:
For most use cases, prefer the high-level cloud.google.com/go/pubsub package.
import (
"cloud.google.com/go/pubsub/apiv1"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
)
publisherClient, _ := apiv1.NewPublisherClient(ctx)
// Custom retry configuration
retryOpts := []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable,
codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 10 * time.Second,
Multiplier: 1.3,
})
}),
}
// Publish with custom retry
resp, err := publisherClient.Publish(ctx, &pb.PublishRequest{
Topic: "projects/my-project/topics/my-topic",
Messages: messages,
}, retryOpts...)cloud.google.com/go/pubsub unless you specifically need low-level controldefer client.Close()PageInfo() for pagination control