Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications
—
The SchemaServiceClient provides functionality for managing Pub/Sub schemas, including schema creation, validation, evolution, and message validation against schemas. Schemas ensure message structure consistency and enable safe schema evolution.
Create and configure a SchemaServiceClient.
# Reference stub: only signatures and docstrings are shown, no implementation.
class SchemaServiceClient:
    def __init__(self, **kwargs):
        """
        Initialize the schema service client.

        Parameters:
        - **kwargs: Additional arguments passed to underlying GAPIC client
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "SchemaServiceClient":
        """
        Create client from service account file.

        Alternate constructor: builds the client from a credentials file
        instead of from keyword arguments alone.

        Parameters:
        - filename: Path to service account JSON file
        - **kwargs: Additional arguments for client initialization

        Returns:
        SchemaServiceClient instance
        """

Create, retrieve, update, and delete schemas.
def create_schema(
    self,
    request: Optional[CreateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    schema_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Create a new schema.

    Parameters:
    - request: The request object for creating a schema
    - parent: Parent project path (e.g., "projects/my-project")
    - schema: Schema definition
    - schema_id: ID for the new schema
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Created Schema object
    """
def get_schema(
    self,
    request: Optional[GetSchemaRequest] = None,
    name: Optional[str] = None,
    view: Optional[SchemaView] = None,
    **kwargs
) -> Schema:
    """
    Get a schema.

    Parameters:
    - request: The request object for getting a schema
    - name: Full schema name (e.g., "projects/my-project/schemas/my-schema")
    - view: Schema view (BASIC or FULL)
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Schema object

    NOTE(review): confirm that the upstream client accepts `view` as a
    keyword argument; it may have to be supplied through `request`.
    """
def list_schemas(
    self,
    request: Optional[ListSchemasRequest] = None,
    parent: Optional[str] = None,
    **kwargs
) -> ListSchemasResponse:
    """
    List schemas in a project.

    Parameters:
    - request: The request object for listing schemas
    - parent: Parent project path
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ListSchemasResponse with schemas

    NOTE(review): the upstream client may return a pager that yields Schema
    objects across pages rather than a bare response object - confirm.
    """
def delete_schema(
    self,
    request: Optional[DeleteSchemaRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a schema.

    Parameters:
    - request: The request object for deleting a schema
    - name: Full schema name to delete
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    None
    """

Validate schemas and messages against schemas.
def validate_schema(
    self,
    request: Optional[ValidateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> ValidateSchemaResponse:
    """
    Validate a schema definition.

    Parameters:
    - request: The request object for validating a schema
    - parent: Parent project path
    - schema: Schema to validate
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ValidateSchemaResponse indicating validation result
    """
def validate_message(
    self,
    request: Optional[ValidateMessageRequest] = None,
    parent: Optional[str] = None,
    name: Optional[str] = None,
    schema: Optional[Schema] = None,
    message: Optional[bytes] = None,
    encoding: Optional[Encoding] = None,
    **kwargs
) -> ValidateMessageResponse:
    """
    Validate a message against a schema.

    The schema is identified either by `name` or by an inline `schema`
    definition.

    Parameters:
    - request: The request object for validating a message
    - parent: Parent project path
    - name: Schema name to validate against
    - schema: Schema definition (alternative to name)
    - message: Message data to validate
    - encoding: Message encoding (JSON or BINARY)
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ValidateMessageResponse indicating validation result

    NOTE(review): the upstream client may accept only `request` for this
    method, with the fields above carried inside it - confirm before
    relying on the individual keyword arguments.
    """

Manage schema revisions and evolution.
def commit_schema(
    self,
    request: Optional[CommitSchemaRequest] = None,
    name: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> Schema:
    """
    Commit a new schema revision.

    Parameters:
    - request: The request object for committing a schema
    - name: Schema name
    - schema: New schema definition
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Updated Schema object
    """
def rollback_schema(
    self,
    request: Optional[RollbackSchemaRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Rollback a schema to a previous revision.

    Parameters:
    - request: The request object for rolling back a schema
    - name: Schema name
    - revision_id: Revision to rollback to
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Rolled back Schema object
    """
def list_schema_revisions(
    self,
    request: Optional[ListSchemaRevisionsRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> ListSchemaRevisionsResponse:
    """
    List all revisions of a schema.

    Parameters:
    - request: The request object for listing schema revisions
    - name: Schema name
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ListSchemaRevisionsResponse with schema revisions

    NOTE(review): the upstream client may return a pager that yields Schema
    revisions across pages rather than a bare response object - confirm.
    """
def delete_schema_revision(
    self,
    request: Optional[DeleteSchemaRevisionRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Delete a specific schema revision.

    Parameters:
    - request: The request object for deleting a schema revision
    - name: Schema name
    - revision_id: Revision to delete
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Updated Schema object

    NOTE(review): `revision_id` may be deprecated upstream in favour of
    encoding the revision in `name` - confirm.
    """


class Schema:
    """
    A schema resource.

    Attributes:
    - name: Schema resource name
    - type: Schema type (AVRO, PROTOCOL_BUFFER)
    - definition: Schema definition string
    - revision_id: Current revision ID
    - revision_create_time: When revision was created
    """
    name: str
    # NOTE(review): the generated Python message may expose this field as
    # `type_` rather than `type` - confirm against the library.
    type: Schema.Type
    definition: str
    revision_id: str
    revision_create_time: Timestamp
class Encoding(Enum):
    """
    Message encoding types for schema validation.

    NOTE(review): the string values shown here look illustrative; confirm
    the actual enum values against the library before comparing on them.
    """
    JSON = "JSON"
    BINARY = "BINARY"
class SchemaView(Enum):
    """
    Schema view options for retrieval.

    NOTE(review): the string values shown here look illustrative; confirm
    the actual enum values against the library before comparing on them.
    """
    BASIC = "BASIC" # Schema name and type only
    FULL = "FULL" # Full schema definition

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
# Create schema service client
schema_client = pubsub_v1.SchemaServiceClient()

# Define an Avro schema
avro_schema_definition = """
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "action", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
"""

# Create schema
parent = schema_client.common_project_path("my-project")
# The generated message names the proto field `type` as `type_` so it does
# not collide with the Python builtin; `type_` works on every library version.
schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=avro_schema_definition,
)
created_schema = schema_client.create_schema(
    parent=parent,
    schema=schema,
    schema_id="user-events-v1",
)
print(f"Created schema: {created_schema.name}")

# Define a Protocol Buffer schema
protobuf_schema_definition = """
syntax = "proto3";
message UserEvent {
string user_id = 1;
string action = 2;
int64 timestamp = 3;
}
"""

# Use dedicated names here: the snippets that follow validate JSON against,
# and commit an Avro revision to, `created_schema`, so the Avro schema bound
# to `schema` / `created_schema` must not be overwritten.
protobuf_schema = types.Schema(
    type_=types.Schema.Type.PROTOCOL_BUFFER,
    definition=protobuf_schema_definition,
)
created_protobuf_schema = schema_client.create_schema(
    parent=parent,
    schema=protobuf_schema,
    schema_id="user-events-protobuf-v1",
)

import json
# Validate a JSON message against Avro schema
message_data = {
    "user_id": "12345",
    "action": "login",
    "timestamp": 1640995200,
}
message_bytes = json.dumps(message_data).encode('utf-8')

# validate_message takes its fields through `request`; the client method has
# no per-field keyword arguments, so passing them directly raises TypeError.
validation_response = schema_client.validate_message(
    request={
        "parent": parent,
        "name": created_schema.name,
        "message": message_bytes,
        "encoding": types.Encoding.JSON,
    }
)
# Reaching this line means the service accepted the message; an invalid
# message is reported by an exception, not by the response body.
print("Message validation successful!")

# Evolve schema by adding a new field
# New optional field with a null default, so existing messages stay valid.
evolved_schema_definition = """
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "action", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "session_id", "type": ["null", "string"], "default": null}
]
}
"""

# The generated message names the proto field `type` as `type_`.
evolved_schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=evolved_schema_definition,
)

# Commit new revision
updated_schema = schema_client.commit_schema(
    name=created_schema.name,
    schema=evolved_schema,
)
print(f"Updated schema to revision: {updated_schema.revision_id}")

# List all schemas in project
# list_schemas returns a pager. Iterate it directly so every page is fetched;
# reading `.schemas` would only expose the first page. The loop variable gets
# its own name so it does not overwrite the `schema` used by other snippets.
schemas_response = schema_client.list_schemas(parent=parent)
for listed_schema in schemas_response:
    print(f"Schema: {listed_schema.name}, Type: {listed_schema.type_}")

# Get specific schema
# `view` is not a flattened keyword on get_schema, so send it in `request`.
retrieved_schema = schema_client.get_schema(
    request={
        "name": created_schema.name,
        "view": types.SchemaView.FULL,
    }
)
print(f"Schema definition: {retrieved_schema.definition}")

# List schema revisions
revisions_response = schema_client.list_schema_revisions(
    name=created_schema.name
)
# As above, iterate the pager itself to walk all pages of revisions.
for revision in revisions_response:
    print(f"Revision: {revision.revision_id}, Created: {revision.revision_create_time}")

# Create topic with schema
publisher_client = pubsub_v1.PublisherClient()
topic_path = publisher_client.topic_path("my-project", "user-events")

# Bind the topic to the schema so the service validates published messages.
schema_settings = types.SchemaSettings(
    schema=created_schema.name,
    encoding=types.Encoding.JSON,
)
topic = types.Topic(
    name=topic_path,
    schema_settings=schema_settings,
)
# create_topic's request *is* the Topic; wrapping it as
# {"name": ..., "topic": topic} is rejected because Topic has no `topic` field.
created_topic = publisher_client.create_topic(request=topic)

# Publish schema-validated message
message_data = {
    "user_id": "67890",
    "action": "purchase",
    "timestamp": 1640998800,
    "session_id": "sess_123",
}
future = publisher_client.publish(
    topic_path,
    json.dumps(message_data).encode('utf-8'),
)
# result() blocks until the publish completes and returns the message ID.
message_id = future.result()
print(f"Published validated message: {message_id}")

from google.api_core import exceptions
try:
    # Attempt to validate invalid message
    invalid_message = json.dumps({"invalid": "structure"}).encode('utf-8')
    # validate_message takes its fields through `request`; it has no
    # per-field keyword arguments, so passing them directly raises TypeError
    # before the service is ever contacted.
    schema_client.validate_message(
        request={
            "parent": parent,
            "name": created_schema.name,
            "message": invalid_message,
            "encoding": types.Encoding.JSON,
        }
    )
except exceptions.InvalidArgument as e:
    print(f"Message validation failed: {e}")
# Creating a schema under an ID that is already taken is expected to fail;
# the failure is caught and reported instead of aborting the script.
try:
    # Attempt to create duplicate schema
    # NOTE(review): `schema` is whatever that name was last bound to by the
    # preceding snippets - confirm it is the intended definition.
    schema_client.create_schema(
        parent=parent,
        schema=schema,
        schema_id="user-events-v1" # Already exists
    )
except exceptions.AlreadyExists as e:
    print(f"Schema already exists: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-pubsub