tessl install tessl/golang-cloud-google-com--go--pubsub@1.50.5

Google Cloud Pub/Sub client library for Go providing high-level, idiomatic APIs for publishing and receiving messages, with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources.
Schemas define the structure and type of messages published to topics. This document covers schema creation, management, validation, and integration with topics.
type SchemaClient struct {
// Has unexported fields
}
func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
func (s *SchemaClient) Close() error

Client for schema management operations, scoped to a single project.
Example:
schemaClient, err := pubsub.NewSchemaClient(ctx, "my-project")
if err != nil {
log.Fatalf("Failed to create schema client: %v", err)
}
defer schemaClient.Close()

type SchemaConfig struct {
Name string
Type SchemaType
Definition string
RevisionID string
RevisionCreateTime time.Time
}

Configuration for a schema definition.
Fields:
- Name: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)
- Type: Schema type (Protocol Buffer or Avro)
- Definition: Full schema definition
- RevisionID: Revision ID of the schema (output only)
- RevisionCreateTime: Timestamp of revision creation (output only)

type SchemaType int
const (
SchemaTypeUnspecified SchemaType = 0
SchemaProtocolBuffer SchemaType = 1
SchemaAvro SchemaType = 2
)

Schema definition types:
- SchemaTypeUnspecified: Default/unused value
- SchemaProtocolBuffer: Protocol Buffer schema definition
- SchemaAvro: Apache Avro schema definition

type SchemaView int
const (
SchemaViewUnspecified SchemaView = 0
SchemaViewBasic SchemaView = 1
SchemaViewFull SchemaView = 2
)

View of schema object fields to return:
- SchemaViewUnspecified: Default/unset value
- SchemaViewBasic: Name and type only (excludes definition)
- SchemaViewFull: All schema fields

func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)

Creates a new schema with the given ID and configuration. An existing schema cannot be modified in place; to change it, commit a new revision with CommitSchema.
Parameters:
- ctx: Context for the operation
- schemaID: Schema ID (unique within project)
- s: Schema configuration (Type and Definition required)

Returns: Created SchemaConfig and error
Example with Protocol Buffer:
protoDefinition := `
syntax = "proto3";
message Message {
string user_id = 1;
int64 timestamp = 2;
string message_text = 3;
}
`
schema, err := schemaClient.CreateSchema(ctx, "my-proto-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaProtocolBuffer,
Definition: protoDefinition,
})
if err != nil {
log.Fatalf("Failed to create schema: %v", err)
}
fmt.Printf("Created schema: %s\n", schema.Name)

Example with Avro:
avroDefinition := `{
"type": "record",
"name": "Message",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "message_text", "type": "string"}
]
}`
schema, err := schemaClient.CreateSchema(ctx, "my-avro-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: avroDefinition,
})

func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error)

Retrieves a schema configuration by ID.
Parameters:
- ctx: Context for the operation
- schemaID: Schema ID
- view: Schema view (Basic or Full)

Returns: SchemaConfig and error
Example:
// Get full schema
schema, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewFull)
if err != nil {
log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema: %s\n", schema.Name)
fmt.Printf("Type: %v\n", schema.Type)
fmt.Printf("Definition: %s\n", schema.Definition)
// Get basic info only (faster)
basic, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewBasic)
if err != nil {
log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema name: %s, type: %v\n", basic.Name, basic.Type)

func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator

Returns an iterator for listing all schemas in the project.
type SchemaIterator struct {
// Has unexported fields
}
func (s *SchemaIterator) Next() (*SchemaConfig, error)

Iterator for listing schemas. Call Next() repeatedly until it returns iterator.Done.
Example:
// List all schemas
it := schemaClient.Schemas(ctx, pubsub.SchemaViewFull)
for {
schema, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate: %v", err)
}
fmt.Printf("Schema: %s, Type: %v\n", schema.Name, schema.Type)
}

Schemas support versioning through revisions. Each commit creates a new revision.

func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator

Lists all revisions for a schema.
Example:
it := schemaClient.ListSchemaRevisions(ctx, "my-schema", pubsub.SchemaViewFull)
for {
revision, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate revisions: %v", err)
}
fmt.Printf("Revision: %s, Created: %v\n", revision.RevisionID, revision.RevisionCreateTime)
}

func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)

Commits a new revision to an existing schema.
Example:
updatedDefinition := `
syntax = "proto3";
message Message {
string user_id = 1;
int64 timestamp = 2;
string message_text = 3;
string priority = 4; // New field
}
`
newRevision, err := schemaClient.CommitSchema(ctx, "my-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaProtocolBuffer,
Definition: updatedDefinition,
})
if err != nil {
log.Fatalf("Failed to commit schema: %v", err)
}
fmt.Printf("New revision: %s\n", newRevision.RevisionID)

func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)

Creates a new revision that is a copy of the specified revision.
Example:
rolledBack, err := schemaClient.RollbackSchema(ctx, "my-schema", "previous-revision-id")
if err != nil {
log.Fatalf("Failed to rollback schema: %v", err)
}
fmt.Printf("Rolled back to: %s\n", rolledBack.RevisionID)

func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)

Deletes a specific schema revision. Returns the oldest remaining revision.
Example:
remaining, err := schemaClient.DeleteSchemaRevision(ctx, "my-schema", "old-revision-id")
if err != nil {
log.Fatalf("Failed to delete revision: %v", err)
}
fmt.Printf("Oldest remaining revision: %s\n", remaining.RevisionID)

func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error

Deletes a schema and all its revisions.
Example:
if err := schemaClient.DeleteSchema(ctx, "my-schema"); err != nil {
log.Fatalf("Failed to delete schema: %v", err)
}

type ValidateSchemaResult struct{}

func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error)

Validates a schema definition without creating it.
Example:
testSchema := pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: `{
"type": "record",
"name": "Test",
"fields": [{"name": "field1", "type": "string"}]
}`,
}
result, err := schemaClient.ValidateSchema(ctx, testSchema)
if err != nil {
log.Printf("Schema is invalid: %v", err)
} else {
fmt.Println("Schema is valid")
}

type ValidateMessageResult struct{}
func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error)
func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error)

Validates a message against a schema.
Methods:
- ValidateMessageWithConfig(): Validate against a schema config
- ValidateMessageWithID(): Validate against an existing schema by ID

Example:
// Validate with schema ID
messageBytes := []byte(`{"user_id": "123", "timestamp": 1234567890, "message_text": "Hello"}`)
result, err := schemaClient.ValidateMessageWithID(
ctx,
messageBytes,
pubsub.EncodingJSON,
"my-avro-schema",
)
if err != nil {
log.Printf("Message is invalid: %v", err)
} else {
fmt.Println("Message is valid")
}
// Validate with schema config
result, err = schemaClient.ValidateMessageWithConfig(
ctx,
messageBytes,
pubsub.EncodingJSON,
schemaConfig,
)

type SchemaSettings struct {
Schema string
Encoding SchemaEncoding
FirstRevisionID string
LastRevisionID string
}

Schema validation settings for a topic.
Fields:
- Schema: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)
- Encoding: Encoding type for messages (JSON or Binary)
- FirstRevisionID: Minimum (inclusive) revision allowed for validation (EXPERIMENTAL)
- LastRevisionID: Maximum (inclusive) revision allowed for validation (EXPERIMENTAL)

type SchemaEncoding int
const (
EncodingUnspecified SchemaEncoding = 0
EncodingJSON SchemaEncoding = 1
EncodingBinary SchemaEncoding = 2
)

Encoding expected for messages:
- EncodingUnspecified: Default/unused value
- EncodingJSON: JSON encoding
- EncodingBinary: Binary encoding (not available for all schema types)

Example:
// Create topic with schema validation
topic, err := client.CreateTopicWithConfig(ctx, "validated-topic", &pubsub.TopicConfig{
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/my-schema",
Encoding: pubsub.EncodingJSON,
},
})
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
// Update topic to use schema
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/my-schema",
Encoding: pubsub.EncodingJSON,
FirstRevisionID: "revision-1",
LastRevisionID: "revision-5",
},
})
// Remove schema from topic
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
SchemaSettings: &pubsub.SchemaSettings{}, // Zero value removes schema
})

When a topic has schema validation enabled, published messages must conform to the schema.
Example:
// JSON encoding (Avro schema)
message := &pubsub.Message{
Data: []byte(`{"user_id": "user123", "timestamp": 1234567890, "message_text": "Hello, world!"}`),
}
result := topic.Publish(ctx, message)
id, err := result.Get(ctx)
if err != nil {
log.Printf("Failed to publish: %v", err)
} else {
fmt.Printf("Published message: %s\n", id)
}
// Binary encoding (Protocol Buffer schema)
// Encode your protobuf message to bytes
data, err := proto.Marshal(protoMessage)
if err != nil {
log.Fatalf("Failed to marshal proto: %v", err)
}
message = &pubsub.Message{
Data: data,
}
result = topic.Publish(ctx, message)

Schemas support evolution while maintaining backward compatibility:
Avro Evolution Rules:
Protocol Buffer Evolution Rules:
Example:
// Original schema
originalAvro := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"}
]
}`
// Evolved schema (backward compatible)
evolvedAvro := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}`
// Commit new revision
newRevision, err := schemaClient.CommitSchema(ctx, "user-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: evolvedAvro,
})