or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1

docs

client.mddelivery.mdindex.mdingestion.mdlow-level.mdpublishing.mdreceiving.mdschemas.mdsnapshots.mdsubscriptions.mdtesting.mdtopics.mdtransforms.md
tile.json

tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources

testing.mddocs/

Testing Utilities

The pstest package provides a fake Pub/Sub server for testing. It implements a simplified form of the service suitable for unit tests.

Package: cloud.google.com/go/pubsub/pstest

Note: This package is EXPERIMENTAL and subject to change without notice.

Server

Server Types

type Server struct {
    Addr    string
    GServer GServer
}

type GServer struct {
    // Has unexported fields
}

Fake Pub/Sub server for testing.

Fields:

  • Addr: Server address (host:port)
  • GServer: Underlying service implementation (not intended for direct use)

Creating Test Servers

func NewServer(opts ...ServerReactorOption) *Server
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server
func NewServerWithAddress(address string, opts ...ServerReactorOption) *Server
func NewServerWithCallback(port int, callback func(*grpc.Server), opts ...ServerReactorOption) *Server

Functions for creating test servers:

  • NewServer(): Creates server at auto-assigned port
  • NewServerWithPort(): Creates server at specific port
  • NewServerWithAddress(): Creates server at specific address (host:port)
  • NewServerWithCallback(): Creates server with gRPC server callback for registering additional fakes

Example:

import (
    "cloud.google.com/go/pubsub"
    "cloud.google.com/go/pubsub/pstest"
    "google.golang.org/api/option"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func TestPubSub(t *testing.T) {
    ctx := context.Background()

    // Start fake server
    srv := pstest.NewServer()
    defer srv.Close()

    // Connect to fake server
    conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    // Create client
    client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    // Use client for testing
    topic, err := client.CreateTopic(ctx, "test-topic")
    if err != nil {
        t.Fatal(err)
    }
}

Server Methods

func (s *Server) Close() error
func (s *Server) Wait()
  • Close(): Shuts down the server
  • Wait(): Blocks until all server activity completes

Publishing Messages

Publish

func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string

Simulates the Publish RPC with a message. Returns the message ID. Creates topic if it doesn't exist.

Parameters:

  • topic: Topic name (format: projects/*/topics/*)
  • data: Message data
  • attrs: Message attributes

Returns: Message ID

Example:

messageID := srv.Publish("projects/test-project/topics/test-topic",
    []byte("test message"),
    map[string]string{"key": "value"})
fmt.Printf("Published message: %s\n", messageID)

PublishOrdered

func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string

Simulates the Publish RPC with an ordered message (includes ordering key).

Example:

messageID := srv.PublishOrdered(
    "projects/test-project/topics/test-topic",
    []byte("ordered message"),
    map[string]string{"key": "value"},
    "order-key-1")

Inspecting Messages

Message

type Message struct {
    ID          string
    Data        []byte
    Attributes  map[string]string
    PublishTime time.Time
    Deliveries  int
    Acks        int
    Modacks     []Modack
    OrderingKey string
    Topic       string
}

Information about a published message.

Fields:

  • ID: Message ID
  • Data: Message payload
  • Attributes: Message attributes
  • PublishTime: Time message was published
  • Deliveries: Number of delivery attempts
  • Acks: Number of acknowledgments received
  • Modacks: Acknowledgment deadline modifications
  • OrderingKey: Ordering key
  • Topic: Topic name

Modack

type Modack struct {
    AckID       string
    AckDeadline int32
    ReceivedAt  time.Time
}

Record of an acknowledgment deadline modification.

Fields:

  • AckID: Acknowledgment ID
  • AckDeadline: New ack deadline in seconds
  • ReceivedAt: Time modification was received

Retrieving Messages

func (s *Server) Messages() []*Message
func (s *Server) Message(id string) *Message
  • Messages(): Returns all published messages
  • Message(): Returns message by ID (nil if not found)

Example:

// Publish messages
srv.Publish("projects/p/topics/t", []byte("msg1"), nil)
srv.Publish("projects/p/topics/t", []byte("msg2"), nil)

// Get all messages
messages := srv.Messages()
for _, msg := range messages {
    fmt.Printf("Message %s: %s\n", msg.ID, msg.Data)
    fmt.Printf("  Deliveries: %d, Acks: %d\n", msg.Deliveries, msg.Acks)
}

// Get specific message
msg := srv.Message(messageID)
if msg != nil {
    fmt.Printf("Found message: %s\n", msg.Data)
}

Clearing Messages

func (s *Server) ClearMessages()

Removes all messages from the server.

Example:

// Clear messages between tests
srv.ClearMessages()

Publish Response Control

SetAutoPublishResponse

func (s *Server) SetAutoPublishResponse(autoResp bool)

Controls whether to automatically respond to publish requests or use user-added responses.

Example:

// Disable automatic responses
srv.SetAutoPublishResponse(false)

AddPublishResponse

func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)

Adds a response to the publish response queue.

Example:

import pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"

// Disable auto-response
srv.SetAutoPublishResponse(false)

// Queue specific responses
srv.AddPublishResponse(&pb.PublishResponse{
    MessageIds: []string{"msg-1", "msg-2"},
}, nil)

// Queue error response
srv.AddPublishResponse(nil, errors.New("publish failed"))

ResetPublishResponses

func (s *Server) ResetPublishResponses(size int)

Resets the publish response channel with a new buffered channel of the given size.

Example:

srv.ResetPublishResponses(100)

Server Configuration

SetStreamTimeout

func (s *Server) SetStreamTimeout(d time.Duration)

Sets the timeout for streaming pull connections. Mimics the real service's behavior of closing streams after 30 minutes. Pass zero to never shut down streams.

Example:

// Set 5 minute stream timeout
srv.SetStreamTimeout(5 * time.Minute)

SetTimeNowFunc

func (s *Server) SetTimeNowFunc(f func() time.Time)

Overrides time.Now() for testing time-dependent behavior.

Example:

fixedTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
srv.SetTimeNowFunc(func() time.Time {
    return fixedTime
})

// Publish will use fixed time
srv.Publish("projects/p/topics/t", []byte("msg"), nil)

messages := srv.Messages()
fmt.Printf("Publish time: %v\n", messages[0].PublishTime) // Uses fixedTime

Reactor System

Reactor Interface

type Reactor interface {
    React(interface{}) (handled bool, ret interface{}, err error)
}

Interface for custom request handlers. Allows injection of custom behavior for specific RPC calls.

Methods:

  • React(): Handles a request. Returns handled=false to fall through to next reactor or original handler.

ServerReactorOption

type ServerReactorOption struct {
    FuncName string
    Reactor  Reactor
}

Configuration option for server reactors.

Fields:

  • FuncName: Function name to intercept
  • Reactor: Reactor to handle the function

Error Injection

func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption

Creates an option that injects errors into specific functions.

Example:

import (
    "google.golang.org/grpc/codes"
)

// Create server that fails on Publish
srv := pstest.NewServer(
    pstest.WithErrorInjection("Publish", codes.Internal, "simulated error"),
)

// Publish will fail
client, _ := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
topic := client.Topic("test-topic")
result := topic.Publish(ctx, &pubsub.Message{Data: []byte("test")})
_, err := result.Get(ctx)
// err will be "simulated error"

Filter Validation

func ValidateFilter(filter string) error

Validates a subscription filter expression.

Example:

// Valid filter
err := pstest.ValidateFilter(`attributes.region = "us-east1"`)
if err != nil {
    t.Errorf("Filter validation failed: %v", err)
}

// Invalid filter
err = pstest.ValidateFilter("invalid syntax")
if err == nil {
    t.Error("Expected validation error")
}

Ack Deadline Configuration

func ResetMinAckDeadline()
func SetMinAckDeadline(d time.Duration)

Functions to configure minimum ack deadline:

  • ResetMinAckDeadline(): Resets to default (1 second)
  • SetMinAckDeadline(): Sets custom minimum

Example:

// Set custom minimum
pstest.SetMinAckDeadline(5 * time.Second)

// Reset to default
pstest.ResetMinAckDeadline()

Complete Testing Example

func TestPubSubWorkflow(t *testing.T) {
    ctx := context.Background()

    // Create fake server
    srv := pstest.NewServer()
    defer srv.Close()

    // Create client connection
    conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    // Create Pub/Sub client
    client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    // Create topic
    topic, err := client.CreateTopic(ctx, "test-topic")
    if err != nil {
        t.Fatal(err)
    }
    defer topic.Stop()

    // Create subscription
    sub, err := client.CreateSubscription(ctx, "test-sub", pubsub.SubscriptionConfig{
        Topic:       topic,
        AckDeadline: 10 * time.Second,
    })
    if err != nil {
        t.Fatal(err)
    }

    // Publish messages
    for i := 0; i < 5; i++ {
        result := topic.Publish(ctx, &pubsub.Message{
            Data: []byte(fmt.Sprintf("message-%d", i)),
            Attributes: map[string]string{
                "index": fmt.Sprintf("%d", i),
            },
        })
        if _, err := result.Get(ctx); err != nil {
            t.Fatalf("Failed to publish: %v", err)
        }
    }

    // Verify messages in server
    messages := srv.Messages()
    if len(messages) != 5 {
        t.Errorf("Expected 5 messages, got %d", len(messages))
    }

    // Receive messages
    var received int
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        received++
        msg.Ack()
    })
    if err != nil && err != context.DeadlineExceeded {
        t.Fatalf("Receive error: %v", err)
    }

    if received != 5 {
        t.Errorf("Expected to receive 5 messages, got %d", received)
    }

    // Check message stats
    messages = srv.Messages()
    for _, msg := range messages {
        if msg.Deliveries < 1 {
            t.Errorf("Message %s was not delivered", msg.ID)
        }
        if msg.Acks < 1 {
            t.Errorf("Message %s was not acked", msg.ID)
        }
    }
}

Testing with Error Injection

func TestPublishError(t *testing.T) {
    ctx := context.Background()

    // Create server with error injection
    srv := pstest.NewServer(
        pstest.WithErrorInjection("Publish", codes.Internal, "publish failed"),
    )
    defer srv.Close()

    conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        t.Fatal(err)
    }
    defer conn.Close()

    client, err := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
    if err != nil {
        t.Fatal(err)
    }
    defer client.Close()

    topic := client.Topic("test-topic")
    defer topic.Stop()

    // Publish should fail
    result := topic.Publish(ctx, &pubsub.Message{Data: []byte("test")})
    _, err = result.Get(ctx)
    if err == nil {
        t.Error("Expected publish error, got nil")
    }
    if !strings.Contains(err.Error(), "publish failed") {
        t.Errorf("Expected 'publish failed' error, got: %v", err)
    }
}

Testing Message Ordering

func TestMessageOrdering(t *testing.T) {
    ctx := context.Background()
    srv := pstest.NewServer()
    defer srv.Close()

    // Setup client and topic with ordering
    conn, _ := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    client, _ := pubsub.NewClient(ctx, "test-project", option.WithGRPCConn(conn))
    topic := client.Topic("test-topic")
    topic.EnableMessageOrdering = true
    defer topic.Stop()

    // Publish ordered messages
    orderingKey := "key-1"
    for i := 0; i < 3; i++ {
        srv.PublishOrdered(
            "projects/test-project/topics/test-topic",
            []byte(fmt.Sprintf("msg-%d", i)),
            nil,
            orderingKey,
        )
    }

    // Verify ordering
    messages := srv.Messages()
    for _, msg := range messages {
        if msg.OrderingKey != orderingKey {
            t.Errorf("Expected ordering key %s, got %s", orderingKey, msg.OrderingKey)
        }
    }
}

Best Practices

  1. Cleanup: Always defer srv.Close() and client.Close()
  2. Isolation: Clear messages between tests with ClearMessages()
  3. Timeouts: Use context timeouts for Receive operations
  4. Verification: Inspect server messages to verify publish behavior
  5. Error Testing: Use error injection to test error handling
  6. Ordering: Test ordered message delivery with PublishOrdered()
  7. Concurrency: Test concurrent publish/subscribe operations
  8. Ack Behavior: Verify acknowledgment and nack behavior
  9. Time Control: Use SetTimeNowFunc() for time-dependent tests
  10. Filters: Validate filter expressions with ValidateFilter()

Limitations

The fake server is simplified and may behave differently from the actual service in:

  • Timing and latency
  • Delivery order (except for ordered messages)
  • Resource limits and quotas
  • Advanced features not fully implemented
  • Error codes and messages