Google Cloud Pub/Sub client library for Go providing high-level idiomatic APIs for publishing and receiving messages with automatic batching, flow control, and support for advanced features including message ordering, schema validation, exactly-once delivery, and ingestion from external data sources
—
Schemas define the structure and type of messages published to topics. This document covers schema creation, management, validation, and integration with topics.
type SchemaClient struct {
// Has unexported fields
}
func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
func (s *SchemaClient) Close() errorClient for schema management operations, scoped to a single project.
Example:
schemaClient, err := pubsub.NewSchemaClient(ctx, "my-project")
if err != nil {
log.Fatalf("Failed to create schema client: %v", err)
}
defer schemaClient.Close()type SchemaConfig struct {
Name string
Type SchemaType
Definition string
RevisionID string
RevisionCreateTime time.Time
}Configuration for a schema definition.
Fields:
Name: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)Type: Schema type (Protocol Buffer or Avro)Definition: Full schema definitionRevisionID: Revision ID of the schema (output only)RevisionCreateTime: Timestamp of revision creation (output only)type SchemaType int
const (
SchemaTypeUnspecified SchemaType = 0
SchemaProtocolBuffer SchemaType = 1
SchemaAvro SchemaType = 2
)Schema definition types:
SchemaTypeUnspecified: Default/unused valueSchemaProtocolBuffer: Protocol Buffer schema definitionSchemaAvro: Apache Avro schema definitiontype SchemaView int
const (
SchemaViewUnspecified SchemaView = 0
SchemaViewBasic SchemaView = 1
SchemaViewFull SchemaView = 2
)View of schema object fields to return:
SchemaViewUnspecified: Default/unset valueSchemaViewBasic: Name and type only (excludes definition)SchemaViewFull: All schema fieldsfunc (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)Creates a new schema with the given ID and configuration. Schemas cannot be updated after creation; use revisions instead.
Parameters:
ctx: Context for the operationschemaID: Schema ID (unique within project)s: Schema configuration (Type and Definition required)Returns: Created SchemaConfig and error
Example with Protocol Buffer:
protoDefinition := `
syntax = "proto3";
message Message {
string user_id = 1;
int64 timestamp = 2;
string message_text = 3;
}
`
schema, err := schemaClient.CreateSchema(ctx, "my-proto-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaProtocolBuffer,
Definition: protoDefinition,
})
if err != nil {
log.Fatalf("Failed to create schema: %v", err)
}
fmt.Printf("Created schema: %s\n", schema.Name)Example with Avro:
avroDefinition := `{
"type": "record",
"name": "Message",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "message_text", "type": "string"}
]
}`
schema, err := schemaClient.CreateSchema(ctx, "my-avro-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: avroDefinition,
})func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error)Retrieves a schema configuration by ID.
Parameters:
ctx: Context for the operationschemaID: Schema IDview: Schema view (Basic or Full)Returns: SchemaConfig and error
Example:
// Get full schema
schema, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewFull)
if err != nil {
log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema: %s\n", schema.Name)
fmt.Printf("Type: %v\n", schema.Type)
fmt.Printf("Definition: %s\n", schema.Definition)
// Get basic info only (faster)
basic, err := schemaClient.Schema(ctx, "my-schema", pubsub.SchemaViewBasic)
if err != nil {
log.Fatalf("Failed to get schema: %v", err)
}
fmt.Printf("Schema name: %s, type: %v\n", basic.Name, basic.Type)func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIteratorReturns an iterator for listing all schemas in the project.
type SchemaIterator struct {
// Has unexported fields
}
func (s *SchemaIterator) Next() (*SchemaConfig, error)Iterator for listing schemas. Call Next() repeatedly until it returns iterator.Done.
Example:
// List all schemas
it := schemaClient.Schemas(ctx, pubsub.SchemaViewFull)
for {
schema, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate: %v", err)
}
fmt.Printf("Schema: %s, Type: %v\n", schema.Name, schema.Type)
}Schemas support versioning through revisions. Each commit creates a new revision.
func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIteratorLists all revisions for a schema.
Example:
it := schemaClient.ListSchemaRevisions(ctx, "my-schema", pubsub.SchemaViewFull)
for {
revision, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("Failed to iterate revisions: %v", err)
}
fmt.Printf("Revision: %s, Created: %v\n", revision.RevisionID, revision.RevisionCreateTime)
}func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)Commits a new revision to an existing schema.
Example:
updatedDefinition := `
syntax = "proto3";
message Message {
string user_id = 1;
int64 timestamp = 2;
string message_text = 3;
string priority = 4; // New field
}
`
newRevision, err := schemaClient.CommitSchema(ctx, "my-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaProtocolBuffer,
Definition: updatedDefinition,
})
if err != nil {
log.Fatalf("Failed to commit schema: %v", err)
}
fmt.Printf("New revision: %s\n", newRevision.RevisionID)func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)Creates a new revision that is a copy of the specified revision.
Example:
rolledBack, err := schemaClient.RollbackSchema(ctx, "my-schema", "previous-revision-id")
if err != nil {
log.Fatalf("Failed to rollback schema: %v", err)
}
fmt.Printf("Rolled back to: %s\n", rolledBack.RevisionID)func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)Deletes a specific schema revision. Returns the oldest remaining revision.
Example:
remaining, err := schemaClient.DeleteSchemaRevision(ctx, "my-schema", "old-revision-id")
if err != nil {
log.Fatalf("Failed to delete revision: %v", err)
}
fmt.Printf("Oldest remaining revision: %s\n", remaining.RevisionID)func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) errorDeletes a schema and all its revisions.
Example:
if err := schemaClient.DeleteSchema(ctx, "my-schema"); err != nil {
log.Fatalf("Failed to delete schema: %v", err)
}type ValidateSchemaResult struct{}
func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error)Validates a schema definition without creating it.
Example:
testSchema := pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: `{
"type": "record",
"name": "Test",
"fields": [{"name": "field1", "type": "string"}]
}`,
}
result, err := schemaClient.ValidateSchema(ctx, testSchema)
if err != nil {
log.Printf("Schema is invalid: %v", err)
} else {
fmt.Println("Schema is valid")
}type ValidateMessageResult struct{}
func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error)
func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error)Validates a message against a schema.
Methods:
ValidateMessageWithConfig(): Validate against a schema configValidateMessageWithID(): Validate against an existing schema by IDExample:
// Validate with schema ID
messageBytes := []byte(`{"user_id": "123", "timestamp": 1234567890, "message_text": "Hello"}`)
result, err := schemaClient.ValidateMessageWithID(
ctx,
messageBytes,
pubsub.EncodingJSON,
"my-avro-schema",
)
if err != nil {
log.Printf("Message is invalid: %v", err)
} else {
fmt.Println("Message is valid")
}
// Validate with schema config
result, err = schemaClient.ValidateMessageWithConfig(
ctx,
messageBytes,
pubsub.EncodingJSON,
schemaConfig,
)type SchemaSettings struct {
Schema string
Encoding SchemaEncoding
FirstRevisionID string
LastRevisionID string
}Schema validation settings for a topic.
Fields:
Schema: Schema resource name (format: projects/PROJECT/schemas/SCHEMA)Encoding: Encoding type for messages (JSON or Binary)FirstRevisionID: Minimum (inclusive) revision allowed for validation (EXPERIMENTAL)LastRevisionID: Maximum (inclusive) revision allowed for validation (EXPERIMENTAL)type SchemaEncoding int
const (
EncodingUnspecified SchemaEncoding = 0
EncodingJSON SchemaEncoding = 1
EncodingBinary SchemaEncoding = 2
)Encoding expected for messages:
EncodingUnspecified: Default/unused valueEncodingJSON: JSON encodingEncodingBinary: Binary encoding (not available for all schema types)Example:
// Create topic with schema validation
topic, err := client.CreateTopicWithConfig(ctx, "validated-topic", &pubsub.TopicConfig{
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/my-schema",
Encoding: pubsub.EncodingJSON,
},
})
if err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
// Update topic to use schema
config, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
SchemaSettings: &pubsub.SchemaSettings{
Schema: "projects/my-project/schemas/my-schema",
Encoding: pubsub.EncodingJSON,
FirstRevisionID: "revision-1",
LastRevisionID: "revision-5",
},
})
// Remove schema from topic
config, err = topic.Update(ctx, pubsub.TopicConfigToUpdate{
SchemaSettings: &pubsub.SchemaSettings{}, // Zero value removes schema
})When a topic has schema validation enabled, published messages must conform to the schema.
Example:
// JSON encoding (Avro schema)
message := &pubsub.Message{
Data: []byte(`{"user_id": "user123", "timestamp": 1234567890, "message_text": "Hello, world!"}`),
}
result := topic.Publish(ctx, message)
id, err := result.Get(ctx)
if err != nil {
log.Printf("Failed to publish: %v", err)
} else {
fmt.Printf("Published message: %s\n", id)
}
// Binary encoding (Protocol Buffer schema)
// Encode your protobuf message to bytes
data, err := proto.Marshal(protoMessage)
if err != nil {
log.Fatalf("Failed to marshal proto: %v", err)
}
message = &pubsub.Message{
Data: data,
}
result = topic.Publish(ctx, message)Schemas support evolution while maintaining backward compatibility:
Avro Evolution Rules:
Protocol Buffer Evolution Rules:
Example:
// Original schema
originalAvro := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"}
]
}`
// Evolved schema (backward compatible)
evolvedAvro := `{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}`
// Commit new revision
newRevision, err := schemaClient.CommitSchema(ctx, "user-schema", pubsub.SchemaConfig{
Type: pubsub.SchemaAvro,
Definition: evolvedAvro,
})Install with Tessl CLI
npx tessl i tessl/golang-cloud-google-com--go--pubsub