Describes
golangpkg:golang/cloud.google.com/go/pubsub@v1.50.1


tessl/golang-cloud-google-com--go--pubsub

tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources


Schema Management

Schemas define the structure and type of messages published to topics. This document covers schema creation, management, validation, and integration with topics.

SchemaClient

type SchemaClient struct {
    // Has unexported fields
}

func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
func (s *SchemaClient) Close() error

Client for schema management operations, scoped to a single project.

Example:

schemaClient, err := pubsub.NewSchemaClient(ctx, "my-project")
if err != nil {
    log.Fatalf("Failed to create schema client: %v", err)
}
defer schemaClient.Close()

Schema Configuration

SchemaConfig

type SchemaConfig struct {
    Name               string
    Type               SchemaType
    Definition         string
    RevisionID         string
    RevisionCreateTime time.Time
}

Configuration for a schema definition.

Fields:

  • Name: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)
  • Type: Schema type (Protocol Buffer or Avro)
  • Definition: Full schema definition
  • RevisionID: Revision ID of the schema (output only)
  • RevisionCreateTime: Timestamp of revision creation (output only)
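
The Name format above can be built and taken apart with plain string handling. The following is a minimal sketch using only the standard library; schemaResourceName and parseSchemaResourceName are hypothetical helpers for illustration and are not part of the pubsub package. Any name that does not have exactly the four-segment shape is rejected.

```go
package main

import (
	"fmt"
	"strings"
)

// schemaResourceName builds a fully qualified name in the
// projects/PROJECT/schemas/SCHEMA format.
func schemaResourceName(projectID, schemaID string) string {
	return fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID)
}

// parseSchemaResourceName splits a fully qualified name into its project
// and schema IDs. It returns an error for any other shape.
func parseSchemaResourceName(name string) (projectID, schemaID string, err error) {
	parts := strings.Split(name, "/")
	if len(parts) != 4 || parts[0] != "projects" || parts[2] != "schemas" ||
		parts[1] == "" || parts[3] == "" {
		return "", "", fmt.Errorf("invalid schema resource name %q", name)
	}
	return parts[1], parts[3], nil
}

func main() {
	name := schemaResourceName("my-project", "my-schema")
	fmt.Println(name) // projects/my-project/schemas/my-schema

	project, schema, err := parseSchemaResourceName(name)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(project, schema)
}
```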

SchemaType

type SchemaType int

const (
    SchemaTypeUnspecified SchemaType = 0
    SchemaProtocolBuffer  SchemaType = 1
    SchemaAvro            SchemaType = 2
)

Schema definition types:

  • SchemaTypeUnspecified: Default/unused value
  • SchemaProtocolBuffer: Protocol Buffer schema definition
  • SchemaAvro: Apache Avro schema definition
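
When definitions live in files, the schema type can often be chosen from the file extension. The sketch below assumes the common .proto and .avsc conventions, which the library does not enforce. It uses a local schemaType stand-in that mirrors the constant values listed above so that it runs without the pubsub package; schemaTypeForFile is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// schemaType is a local stand-in that mirrors the pubsub.SchemaType values.
type schemaType int

const (
	schemaTypeUnspecified schemaType = 0
	schemaProtocolBuffer  schemaType = 1
	schemaAvro            schemaType = 2
)

// schemaTypeForFile guesses the schema type from a definition file's
// extension. Unknown extensions map to schemaTypeUnspecified.
func schemaTypeForFile(path string) schemaType {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".proto":
		return schemaProtocolBuffer
	case ".avsc":
		return schemaAvro
	default:
		return schemaTypeUnspecified
	}
}

func main() {
	for _, p := range []string{"schemas/message.proto", "schemas/message.avsc", "notes.txt"} {
		fmt.Printf("%s -> %d\n", p, schemaTypeForFile(p))
	}
}
```

In real code the function would return pubsub.SchemaType and the matching pubsub constants instead of the local stand-ins.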

SchemaView

type SchemaView int

const (
    SchemaViewUnspecified SchemaView = 0
    SchemaViewBasic       SchemaView = 1
    SchemaViewFull        SchemaView = 2
)

View of schema object fields to return:

  • SchemaViewUnspecified: Default/unset value
  • SchemaViewBasic: Name and type only (excludes definition)
  • SchemaViewFull: All schema fields

Creating Schemas

func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)

Creates a new schema with the given ID and configuration. A schema's definition cannot be edited in place after creation; commit a new revision with CommitSchema instead.

Parameters:

  • ctx: Context for the operation
  • schemaID: Schema ID (unique within project)
  • s: Schema configuration (Type and Definition required)

Returns: Created SchemaConfig and error

Example with Protocol Buffer:

protoDefinition := `
syntax = "proto3";

message Message {
  string user_id = 1;
  int64 timestamp = 2;
  string message_text = 3;
}
`

schema, err := schemaClient.CreateSchema(ctx, "my-proto-schema", pubsub.SchemaConfig{
    Type:       pubsub.SchemaProtocolBuffer,
    Definition: protoDefinition,
})
if err != nil {
    log.Fatalf("Failed to create schema: %v", err)
}
fmt.Printf("Created schema: %s\n", schema.Name)

Example with Avro:

avroDefinition := `{
  "type": "record",
  "name": "Message",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "message_text", "type": "string"}
  ]
}`

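schema, err := schemaClient.CreateSchema(ctx, "my-avro-schema", pubsub.SchemaConfig{
    Type:       pubsub.SchemaAvro,
    Definition: avroDefinition,
})
if err != nil {
    log.Fatalf("Failed to create schema: %v", err)
}
fmt.Printf("Created schema: %s\n", schema.Name)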

Retrieving Schemas

func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error)

Retrieves a schema configuration by ID.

Parameters:

  • ctx: Context for the operation
  • schemaID: Schema ID
  • view: Schema view (Basic or Full)

Returns: SchemaConfig and error

Example:

// Get full schema
schema, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewFull)
if err != nil {
    log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema: %s\n", schema.Name)
fmt.Printf("Type: %v\n", schema.Type)
fmt.Printf("Definition: %s\n", schema.Definition)

// Get basic info only (name and type; the definition is omitted)
basic, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewBasic)
if err != nil {
    log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema name: %s, type: %v\n", basic.Name, basic.Type)

Listing Schemas

func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator

Returns an iterator for listing all schemas in the project.

SchemaIterator

type SchemaIterator struct {
    // Has unexported fields
}

func (s *SchemaIterator) Next() (*SchemaConfig, error)

Iterator for listing schemas. Call Next() repeatedly until it returns iterator.Done.

Example:

// List all schemas
it := schemaClient.Schemas(ctx, pubsub.SchemaViewFull)
for {
    schema, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to iterate: %v", err)
    }
    fmt.Printf("Schema: %s, Type: %v\n", schema.Name, schema.Type)
}
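
The Next-until-Done loop can be wrapped in a small generic helper when the results are needed as a slice. This is a sketch under stated assumptions: errDone stands in for iterator.Done, and nexter, collect, and sliceIterator are hypothetical names introduced here so that the example runs with the standard library alone.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a stand-in for iterator.Done in this sketch.
var errDone = errors.New("no more items in iterator")

// nexter is any iterator whose Next method has the shape shown above.
type nexter[T any] interface {
	Next() (T, error)
}

// collect drains it into a slice. It stops cleanly on errDone and returns
// any other error together with the items read so far.
func collect[T any](it nexter[T]) ([]T, error) {
	var out []T
	for {
		v, err := it.Next()
		if errors.Is(err, errDone) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
}

// sliceIterator is a fake iterator used only to exercise collect.
type sliceIterator struct {
	items []string
	pos   int
}

func (s *sliceIterator) Next() (string, error) {
	if s.pos >= len(s.items) {
		return "", errDone
	}
	v := s.items[s.pos]
	s.pos++
	return v, nil
}

func main() {
	names, err := collect[string](&sliceIterator{items: []string{"schema-a", "schema-b"}})
	fmt.Println(names, err)
}
```

With the real library, the same loop would compare against iterator.Done and collect *pubsub.SchemaConfig values.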

Schema Revisions

Schemas support versioning through revisions. Each commit creates a new revision.

List Schema Revisions

func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator

Lists all revisions for a schema.

Example:

it := schemaClient.ListSchemaRevisions(ctx, "my-schema", pubsub.SchemaViewFull)
for {
    revision, err := it.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Fatalf("Failed to iterate revisions: %v", err)
    }
    fmt.Printf("Revision: %s, Created: %v\n", revision.RevisionID, revision.RevisionCreateTime)
}

Commit Schema Revision

func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)

Commits a new revision to an existing schema.

Example:

updatedDefinition := `
syntax = "proto3";

message Message {
  string user_id = 1;
  int64 timestamp = 2;
  string message_text = 3;
  string priority = 4;  // New field
}
`

newRevision, err := schemaClient.CommitSchema(ctx, "my-schema", pubsub.SchemaConfig{
    Type:       pubsub.SchemaProtocolBuffer,
    Definition: updatedDefinition,
})
if err != nil {
    log.Fatalf("Failed to commit schema: %v", err)
}
fmt.Printf("New revision: %s\n", newRevision.RevisionID)

Rollback Schema

func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)

Creates a new revision that is a copy of the specified revision.

Example:

rolledBack, err := schemaClient.RollbackSchema(ctx, "my-schema", "previous-revision-id")
if err != nil {
    log.Fatalf("Failed to rollback schema: %v", err)
}
fmt.Printf("New revision created by rollback: %s\n", rolledBack.RevisionID)
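
Because a rollback creates a new revision instead of discarding later ones, the revision list only grows. The toy model below illustrates that behavior in memory. It is not how the service is implemented, and the schemaHistory type and the "rN" revision IDs are invented for this sketch.

```go
package main

import (
	"fmt"
)

// revision is one entry in the toy revision list.
type revision struct {
	ID         string
	Definition string
}

// schemaHistory is an in-memory toy model of a schema's revision list.
type schemaHistory struct {
	revisions []revision
}

// commit appends a new revision holding def and returns it.
func (h *schemaHistory) commit(def string) revision {
	r := revision{ID: fmt.Sprintf("r%d", len(h.revisions)+1), Definition: def}
	h.revisions = append(h.revisions, r)
	return r
}

// rollback appends a new revision that copies the definition of revisionID.
// Existing revisions are left untouched.
func (h *schemaHistory) rollback(revisionID string) (revision, error) {
	for _, r := range h.revisions {
		if r.ID == revisionID {
			return h.commit(r.Definition), nil
		}
	}
	return revision{}, fmt.Errorf("revision %q not found", revisionID)
}

func main() {
	var h schemaHistory
	h.commit("definition v1")
	h.commit("definition v2")

	r, err := h.rollback("r1")
	fmt.Println(r.ID, r.Definition, len(h.revisions), err) // r3 definition v1 3 <nil>
}
```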

Delete Schema Revision

func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)

Deletes a specific schema revision. Returns the oldest remaining revision.

Example:

remaining, err := schemaClient.DeleteSchemaRevision(ctx, "my-schema", "old-revision-id")
if err != nil {
    log.Fatalf("Failed to delete revision: %v", err)
}
fmt.Printf("Oldest remaining revision: %s\n", remaining.RevisionID)

Deleting Schemas

func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error

Deletes a schema and all its revisions.

Example:

if err := schemaClient.DeleteSchema(ctx, "my-schema"); err != nil {
    log.Fatalf("Failed to delete schema: %v", err)
}

Schema Validation

Validate Schema Definition

type ValidateSchemaResult struct{}

func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error)

Validates a schema definition without creating it.

Example:

testSchema := pubsub.SchemaConfig{
    Type: pubsub.SchemaAvro,
    Definition: `{
        "type": "record",
        "name": "Test",
        "fields": [{"name": "field1", "type": "string"}]
    }`,
}

// ValidateSchemaResult has no fields, so only the error matters.
if _, err := schemaClient.ValidateSchema(ctx, testSchema); err != nil {
    log.Printf("Schema is invalid: %v", err)
} else {
    fmt.Println("Schema is valid")
}

Validate Message Against Schema

type ValidateMessageResult struct{}

func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error)
func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error)

Validates a message against a schema.

Methods:

  • ValidateMessageWithConfig(): Validate against a schema config
  • ValidateMessageWithID(): Validate against an existing schema by ID

Example:

// Validate with schema ID
messageBytes := []byte(`{"user_id": "123", "timestamp": 1234567890, "message_text": "Hello"}`)

// ValidateMessageResult has no fields, so only the error matters.
_, err := schemaClient.ValidateMessageWithID(
    ctx,
    messageBytes,
    pubsub.EncodingJSON,
    "my-avro-schema",
)
if err != nil {
    log.Printf("Message is invalid: %v", err)
} else {
    fmt.Println("Message is valid")
}

// Validate with schema config.
// schemaConfig is a pubsub.SchemaConfig with Type and Definition set,
// such as the one passed to CreateSchema.
_, err = schemaClient.ValidateMessageWithConfig(
    ctx,
    messageBytes,
    pubsub.EncodingJSON,
    schemaConfig,
)
if err != nil {
    log.Printf("Message is invalid: %v", err)
}
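
A cheap local check can catch obviously incomplete JSON messages before a validation call is made. The sketch below only verifies that every field named in an Avro record schema appears as a top-level key in the message. It does not check types or nested records, and it is not a substitute for ValidateMessageWithID or ValidateMessageWithConfig. The missingFields helper and the avroRecord type are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroRecord captures only the field names of an Avro record schema.
type avroRecord struct {
	Fields []struct {
		Name string `json:"name"`
	} `json:"fields"`
}

// missingFields returns the schema field names that are absent from the
// top level of the JSON message.
func missingFields(schemaJSON, msg []byte) ([]string, error) {
	var rec avroRecord
	if err := json.Unmarshal(schemaJSON, &rec); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	var payload map[string]json.RawMessage
	if err := json.Unmarshal(msg, &payload); err != nil {
		return nil, fmt.Errorf("parse message: %w", err)
	}
	var missing []string
	for _, f := range rec.Fields {
		if _, ok := payload[f.Name]; !ok {
			missing = append(missing, f.Name)
		}
	}
	return missing, nil
}

func main() {
	schema := []byte(`{
	  "type": "record",
	  "name": "Message",
	  "fields": [
	    {"name": "user_id", "type": "string"},
	    {"name": "timestamp", "type": "long"},
	    {"name": "message_text", "type": "string"}
	  ]
	}`)
	msg := []byte(`{"user_id": "123", "timestamp": 1234567890}`)

	missing, err := missingFields(schema, msg)
	fmt.Println(missing, err) // [message_text] <nil>
}
```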

Schema Settings for Topics

SchemaSettings

type SchemaSettings struct {
    Schema          string
    Encoding        SchemaEncoding
    FirstRevisionID string
    LastRevisionID  string
}

Schema validation settings for a topic.

Fields:

  • Schema: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)
  • Encoding: Encoding type for messages (JSON or Binary)
  • FirstRevisionID: Minimum (inclusive) revision allowed for validation (EXPERIMENTAL)
  • LastRevisionID: Maximum (inclusive) revision allowed for validation (EXPERIMENTAL)

SchemaEncoding

type SchemaEncoding int

const (
    EncodingUnspecified SchemaEncoding = 0
    EncodingJSON        SchemaEncoding = 1
    EncodingBinary      SchemaEncoding = 2
)

Encoding expected for messages:

  • EncodingUnspecified: Default/unused value
  • EncodingJSON: JSON encoding
  • EncodingBinary: Binary encoding (not available for all schema types)

Attach Schema to Topic

Example:

// Create topic with schema validation
topic, err := client.CreateTopicWithConfig(ctx, "validated-topic", &pubsub.TopicConfig{
    SchemaSettings: &pubsub.SchemaSettings{
        Schema:   "projects/my-project/schemas/my-schema",
        Encoding: pubsub.EncodingJSON,
    },
})
if err != nil {
    log.Fatalf("Failed to create topic: %v", err)
}

// Update topic to use schema
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
    SchemaSettings: &pubsub.SchemaSettings{
        Schema:          "projects/my-project/schemas/my-schema",
        Encoding:        pubsub.EncodingJSON,
        FirstRevisionID: "revision-1",
        LastRevisionID:  "revision-5",
    },
})
if err != nil {
    log.Fatalf("Failed to update topic: %v", err)
}
fmt.Printf("Schema settings: %+v\n", config.SchemaSettings)

// Remove schema from topic
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
    SchemaSettings: &pubsub.SchemaSettings{}, // Zero value removes schema
})
if err != nil {
    log.Fatalf("Failed to remove schema: %v", err)
}

Publishing with Schemas

When a topic has schema validation enabled, published messages must conform to the schema.

Example:

// JSON encoding (Avro schema)
message := &pubsub.Message{
    Data: []byte(`{"user_id": "user123", "timestamp": 1234567890, "message_text": "Hello, world!"}`),
}
result := topic.Publish(ctx, message)
id, err := result.Get(ctx)
if err != nil {
    log.Printf("Failed to publish: %v", err)
} else {
    fmt.Printf("Published message: %s\n", id)
}

// Binary encoding (Protocol Buffer schema)
// protoMessage is an instance of the Go type generated from your .proto file.
data, err := proto.Marshal(protoMessage)
if err != nil {
    log.Fatalf("Failed to marshal proto: %v", err)
}
message = &pubsub.Message{
    Data: data,
}
result = topic.Publish(ctx, message)
if _, err := result.Get(ctx); err != nil {
    log.Printf("Failed to publish: %v", err)
}
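
For JSON-encoded messages, the payload is usually easier to produce from a Go struct than from a hand-written string. The sketch below assumes the three-field Avro record used in the examples above; chatMessage and encodePayload are hypothetical names. Fields with Avro union types need the Avro JSON union form, which this sketch does not cover.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chatMessage mirrors the example Avro record. The JSON tags must match
// the schema's field names exactly.
type chatMessage struct {
	UserID      string `json:"user_id"`
	Timestamp   int64  `json:"timestamp"`
	MessageText string `json:"message_text"`
}

// encodePayload returns the JSON bytes to place in a message's Data field.
func encodePayload(m chatMessage) ([]byte, error) {
	return json.Marshal(m)
}

func main() {
	data, err := encodePayload(chatMessage{
		UserID:      "user123",
		Timestamp:   1234567890,
		MessageText: "Hello, world!",
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(string(data))
	// {"user_id":"user123","timestamp":1234567890,"message_text":"Hello, world!"}
}
```

The resulting bytes would be assigned to the Data field of a pubsub.Message, as in the JSON example above.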

Schema Evolution

Schemas evolve through new revisions. The changes below are generally safe for backward compatibility:

Avro Evolution Rules:

  • Add optional fields with defaults
  • Remove fields
  • Change field order
  • Promote types (int to long, float to double)

Protocol Buffer Evolution Rules:

  • Add optional fields
  • Remove fields, but reserve their field numbers so they are never reused
  • Give new fields previously unused field numbers
  • Don't change field types

Example:

// Original schema
originalAvro := `{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"}
  ]
}`

// Evolved schema (backward compatible)
evolvedAvro := `{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}`

// Commit new revision
newRevision, err := schemaClient.CommitSchema(ctx, "user-schema", pubsub.SchemaConfig{
    Type:       pubsub.SchemaAvro,
    Definition: evolvedAvro,
})
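
The Avro rule that new fields should carry defaults can be checked mechanically before a revision is committed. The sketch below compares two record schemas and reports fields that were added without a "default" entry. It covers only flat, top-level records, and addedFieldsWithoutDefault is a hypothetical helper, not a library function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroRecord captures only the parts of a record schema this check needs.
type avroRecord struct {
	Fields []map[string]interface{} `json:"fields"`
}

// addedFieldsWithoutDefault returns the names of fields that appear in
// evolved but not in original and that have no "default" key.
func addedFieldsWithoutDefault(original, evolved []byte) ([]string, error) {
	var oldRec, newRec avroRecord
	if err := json.Unmarshal(original, &oldRec); err != nil {
		return nil, fmt.Errorf("parse original schema: %w", err)
	}
	if err := json.Unmarshal(evolved, &newRec); err != nil {
		return nil, fmt.Errorf("parse evolved schema: %w", err)
	}
	known := make(map[string]bool)
	for _, f := range oldRec.Fields {
		if name, ok := f["name"].(string); ok {
			known[name] = true
		}
	}
	var offending []string
	for _, f := range newRec.Fields {
		name, ok := f["name"].(string)
		if !ok || known[name] {
			continue
		}
		// A key that is present with a null value still counts as a default.
		if _, hasDefault := f["default"]; !hasDefault {
			offending = append(offending, name)
		}
	}
	return offending, nil
}

func main() {
	original := []byte(`{"type":"record","name":"User","fields":[
	  {"name":"id","type":"string"},
	  {"name":"name","type":"string"}]}`)
	evolved := []byte(`{"type":"record","name":"User","fields":[
	  {"name":"id","type":"string"},
	  {"name":"name","type":"string"},
	  {"name":"email","type":["null","string"],"default":null}]}`)

	offending, err := addedFieldsWithoutDefault(original, evolved)
	fmt.Println(len(offending), err) // 0 <nil>
}
```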

Best Practices

  1. Schema Type: Choose Avro for flexible evolution, Protocol Buffers for performance
  2. Validation: Always validate schemas and messages before production use
  3. Evolution: Plan for schema evolution from the start
  4. Revisions: Use revisions to track schema changes over time
  5. Encoding: Use binary encoding for performance, JSON for debugging
  6. Testing: Validate messages against schemas in tests
  7. Rollback: Keep old revisions for rollback capability
  8. Documentation: Record what changed in each revision (SchemaConfig has no comment field, so use comments in the definition or a change log)
  9. Backward Compatibility: Maintain backward compatibility when evolving schemas
  10. Cleanup: Delete unused schemas and revisions to reduce clutter