tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages, with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
The pstest package provides a fake Pub/Sub server for testing. It implements a simplified form of the service suitable for unit tests.
Package: cloud.google.com/go/pubsub/pstest
Note: This package is EXPERIMENTAL and subject to change without notice.
// Server is a fake Pub/Sub server for testing.
type Server struct {
// Addr is the server address, in host:port form.
Addr string
// GServer is the underlying service implementation; it is not intended
// for direct use.
GServer GServer
}
type GServer struct {
// Has unexported fields
}

Fake Pub/Sub server for testing.
Fields:
- Addr: Server address (host:port)
- GServer: Underlying service implementation (not intended for direct use)

func NewServer(opts ...ServerReactorOption) *Server
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server
func NewServerWithAddress(address string, opts ...ServerReactorOption) *Server
func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server

Functions for creating test servers:
- NewServer(): Creates a server at an auto-assigned port
- NewServerWithPort(): Creates a server at a specific port
- NewServerWithAddress(): Creates a server at a specific address (host:port)
- NewServerWithCallback(): Creates a server with a gRPC server callback for registering additional fakes

Example:
import (
"context"
"testing"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestPubSub(t *testing.T) {
ctx := context.Background()
// Start fake server
srv := pstest.NewServer()
defer srv.Close()
// Connect to fake server
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Create client
client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
defer client.Close()
// Use client for testing
topic, err := client.CreateTopic(ctx, "test-topic")
if err != nil {
t.Fatal(err)
}
defer topic.Stop()
}

func (s *Server) Close() error
func (s *Server) Wait()

- Close(): Shuts down the server
- Wait(): Blocks until all server activity completes

func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string

Simulates the Publish RPC with a message. Returns the message ID. Creates the topic if it doesn't exist.
Parameters:
- topic: Topic name (format: projects/*/topics/*)
- data: Message data
- attrs: Message attributes

Returns: Message ID
Example:
messageID := srv.Publish("projects/test-project/topics/test-topic",
[]byte("test message"),
map[string]string{"key": "value"})
fmt.Printf("Published message: %s\n", messageID)

func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string

Simulates the Publish RPC with an ordered message (includes ordering key).
Example:
messageID := srv.PublishOrdered(
"projects/test-project/topics/test-topic",
[]byte("ordered message"),
map[string]string{"key": "value"},
"order-key-1")

type Message struct {
ID string
Data []byte
Attributes map[string]string
PublishTime time.Time
Deliveries int
Acks int
Modacks []Modack
OrderingKey string
Topic string
}

Information about a published message.
Fields:
- ID: Message ID
- Data: Message payload
- Attributes: Message attributes
- PublishTime: Time the message was published
- Deliveries: Number of delivery attempts
- Acks: Number of acknowledgments received
- Modacks: Acknowledgment deadline modifications
- OrderingKey: Ordering key
- Topic: Topic name

type Modack struct {
AckID string
AckDeadline int32
ReceivedAt time.Time
}

Record of an acknowledgment deadline modification.
Fields:
- AckID: Acknowledgment ID
- AckDeadline: New ack deadline in seconds
- ReceivedAt: Time the modification was received

func (s *Server) Messages() []*Message
func (s *Server) Message(id string) *Message

- Messages(): Returns all published messages
- Message(): Returns the message with the given ID (nil if not found)

Example:
// Publish messages
messageID := srv.Publish("projects/p/topics/t", []byte("msg1"), nil)
srv.Publish("projects/p/topics/t", []byte("msg2"), nil)
// Get all messages
messages := srv.Messages()
for _, msg := range messages {
fmt.Printf("Message %s: %s\n", msg.ID, msg.Data)
fmt.Printf(" Deliveries: %d, Acks: %d\n", msg.Deliveries, msg.Acks)
}
// Get specific message
msg := srv.Message(messageID)
if msg != nil {
fmt.Printf("Found message: %s\n", msg.Data)
}

func (s *Server) ClearMessages()

Removes all messages from the server.
Example:
// Clear messages between tests
srv.ClearMessages()

func (s *Server) SetAutoPublishResponse(autoResp bool)

Controls whether to automatically respond to publish requests or use user-added responses.
Example:
// Disable automatic responses
srv.SetAutoPublishResponse(false)

func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)

Adds a response to the publish response queue.
Example:
import pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
// Disable auto-response
srv.SetAutoPublishResponse(false)
// Queue specific responses
srv.AddPublishResponse(&pb.PublishResponse{
MessageIds: []string{"msg-1", "msg-2"},
}, nil)
// Queue error response
srv.AddPublishResponse(nil, errors.New("publish failed"))

func (s *Server) ResetPublishResponses(size int)

Resets the publish response channel with a new buffered channel of the given size.
Example:
srv.ResetPublishResponses(100)

func (s *Server) SetStreamTimeout(d time.Duration)

Sets the timeout for streaming pull connections. Mimics the real service's behavior of closing streams after 30 minutes. Pass zero to never shut down streams.
Example:
// Set 5 minute stream timeout
srv.SetStreamTimeout(5 * time.Minute)

func (s *Server) SetTimeNowFunc(f func() time.Time)

Overrides time.Now() for testing time-dependent behavior.
Example:
fixedTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
srv.SetTimeNowFunc(func() time.Time {
return fixedTime
})
// Publish will use fixed time
srv.Publish("projects/p/topics/t", []byte("msg"), nil)
messages := srv.Messages()
fmt.Printf("Publish time: %v\n", messages[0].PublishTime) // Uses fixedTime

type Reactor interface {
React(interface{}) (handled bool, ret interface{}, err error)
}

Interface for custom request handlers. Allows injection of custom behavior for specific RPC calls.
Methods:
- React(): Handles a request. Returns handled=false to fall through to the next reactor or the original handler.

type ServerReactorOption struct {
FuncName string
Reactor Reactor
}

Configuration option for server reactors.
Fields:
- FuncName: Function name to intercept
- Reactor: Reactor to handle the function

func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption

Creates an option that injects errors into specific functions.
Example:
import (
"google.golang.org/grpc/codes"
)
// Create server that fails on Publish
srv := pstest.NewServer(
pstest.WithErrorInjection("Publish", codes.Internal, "simulated error"),
)
// Publish will fail
client, _ := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
topic := client.Topic("test-topic")
result := topic.Publish(ctx, &pubsub.Message{Data: []byte("test")})
_, err := result.Get(ctx)
// err will be "simulated error"

func ValidateFilter(filter string) error

Validates a subscription filter expression.
Example:
// Valid filter
err := pstest.ValidateFilter(`attributes.region = "us-east1"`)
if err != nil {
t.Errorf("Filter validation failed: %v", err)
}
// Invalid filter
err = pstest.ValidateFilter("invalid syntax")
if err == nil {
t.Error("Expected validation error")
}

func ResetMinAckDeadline()
func SetMinAckDeadline(d time.Duration)

Functions to configure the minimum ack deadline:
- ResetMinAckDeadline(): Resets to the default (10 seconds)
- SetMinAckDeadline(): Sets a custom minimum (must be at least 1 second)

Example:
// Set custom minimum
pstest.SetMinAckDeadline(5 * time.Second)
// Reset to default
pstest.ResetMinAckDeadline()

func TestPubSubWorkflow(t *testing.T) {
ctx := context.Background()
// Create fake server
srv := pstest.NewServer()
defer srv.Close()
// Create client connection
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()
// Create Pub/Sub client
client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
defer client.Close()
// Create topic
topic, err := client.CreateTopic(ctx, "test-topic")
if err != nil {
t.Fatal(err)
}
defer topic.Stop()
// Create subscription
sub, err := client.CreateSubscription(ctx, "test-sub", pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
})
if err != nil {
t.Fatal(err)
}
// Publish messages
for i := 0; i < 5; i++ {
result := topic.Publish(ctx, &pubsub.Message{
Data: []byte(fmt.Sprintf("message-%d", i)),
Attributes: map[string]string{
"index": fmt.Sprintf("%d", i),
},
})
if _, err := result.Get(ctx); err != nil {
t.Fatalf("Failed to publish: %v", err)
}
}
// Verify messages in server
messages := srv.Messages()
if len(messages) != 5 {
t.Errorf("Expected 5 messages, got %d", len(messages))
}
// Receive messages
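// The Receive callback may run on several goroutines at once, so count
// atomically (requires "sync/atomic"), and cancel as soon as every message
// has arrived instead of waiting for the timeout.
var received int32
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
if atomic.AddInt32(&received, 1) == 5 {
cancel()
}
})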
if err != nil && err != context.DeadlineExceeded {
t.Fatalf("Receive error: %v", err)
}
if received != 5 {
t.Errorf("Expected to receive 5 messages, got %d", received)
}
// Check message stats
messages = srv.Messages()
for _, msg := range messages {
if msg.Deliveries < 1 {
t.Errorf("Message %s was not delivered", msg.ID)
}
if msg.Acks < 1 {
t.Errorf("Message %s was not acked", msg.ID)
}
}
}

func TestPublishError(t *testing.T) {
ctx := context.Background()
// Create server with error injection
srv := pstest.NewServer(
pstest.WithErrorInjection("Publish", codes.Internal, "publish failed"),
)
defer srv.Close()
conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
defer client.Close()
topic := client.Topic("test-topic")
defer topic.Stop()
// Publish should fail
result := topic.Publish(ctx, &pubsub.Message{Data: []byte("test")})
_, err = result.Get(ctx)
if err == nil {
// Fatal, not Error: the check below calls err.Error() and would panic on nil.
t.Fatal("Expected publish error, got nil")
}
if !strings.Contains(err.Error(), "publish failed") {
t.Errorf("Expected 'publish failed' error, got: %v", err)
}
}

func TestMessageOrdering(t *testing.T) {
ctx := context.Background()
srv := pstest.NewServer()
defer srv.Close()
// Setup client and topic with ordering
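conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
defer client.Close()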
topic := client.Topic("test-topic")
topic.EnableMessageOrdering = true
defer topic.Stop()
// Publish ordered messages
orderingKey := "key-1"
for i := 0; i < 3; i++ {
srv.PublishOrdered(
"projects/test-project/topics/test-topic",
[]byte(fmt.Sprintf("msg-%d", i)),
nil,
orderingKey,
)
}
// Verify ordering
messages := srv.Messages()
for _, msg := range messages {
if msg.OrderingKey != orderingKey {
t.Errorf("Expected ordering key %s, got %s", orderingKey, msg.OrderingKey)
}
}
}

- srv.Close() and client.Close()
- ClearMessages()
- PublishOrdered()
- SetTimeNowFunc() for time-dependent tests
- ValidateFilter()

The fake server is simplified and may behave differently from the actual service in: