CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-pubsub

Google Cloud Pub/Sub API client library for reliable, many-to-many, asynchronous messaging between applications

Pending
Overview
Eval results
Files

docs/schema-service.md

Schema Service

The SchemaServiceClient manages Pub/Sub schemas. It creates, retrieves, lists, and deletes them, validates schema definitions, validates messages against a schema, and evolves a schema through revisions. Attaching a schema to a topic lets Pub/Sub enforce a consistent message structure, and revisions let that structure evolve in a controlled way.

Capabilities

Client Initialization

Create and configure a SchemaServiceClient.

class SchemaServiceClient:
    """
    Client for the Pub/Sub schema service (documentation stub).
    """

    def __init__(self, **kwargs):
        """
        Construct a schema service client.

        Parameters:
        - **kwargs: Options handed through unchanged to the underlying
          generated (GAPIC) client
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "SchemaServiceClient":
        """
        Build a client that authenticates with a service account key file.

        Parameters:
        - filename: Filesystem path of the service account JSON key
        - **kwargs: Any further options used when constructing the client

        Returns:
        A new SchemaServiceClient
        """

Schema Management

Create, retrieve, list, and delete schemas. To change an existing schema, commit a new revision (see Schema Evolution).

def create_schema(
    self,
    request: Optional[CreateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    schema_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Register a new schema under a project.

    Either pass a full ``request`` object, or the individual ``parent``,
    ``schema`` and ``schema_id`` fields.

    Parameters:
    - request: CreateSchemaRequest carrying all fields at once
    - parent: Owning project path, e.g. "projects/my-project"
    - schema: The schema to create (its type and definition)
    - schema_id: Identifier for the new schema; it becomes the last segment
      of the schema's resource name ("projects/.../schemas/<schema_id>")
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    The Schema as created by the service
    """

def get_schema(
    self,
    request: Optional[GetSchemaRequest] = None,
    name: Optional[str] = None,
    view: Optional[SchemaView] = None,
    **kwargs
) -> Schema:
    """
    Fetch a single schema by its resource name.

    Parameters:
    - request: The GetSchemaRequest (or an equivalent dict) for the call
    - name: Full schema name (e.g., "projects/my-project/schemas/my-schema")
    - view: Which fields to return: BASIC (name and type, no definition) or
      FULL (every field, including the definition). When unset, the service
      returns the full schema.
      NOTE: the generated GAPIC client flattens only ``name`` as a keyword
      argument; to choose a view, put it in ``request``, e.g.
      ``request={"name": name, "view": SchemaView.FULL}``.
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    Schema object
    """

def list_schemas(
    self,
    request: Optional[ListSchemasRequest] = None,
    parent: Optional[str] = None,
    **kwargs
) -> ListSchemasResponse:
    """
    List schemas in a project.

    Parameters:
    - request: The ListSchemasRequest (or an equivalent dict); use it to set
      options such as ``view`` or ``page_size``
    - parent: Parent project path (e.g., "projects/my-project")
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    A pager wrapping ListSchemasResponse pages. Iterate the returned object
    directly to receive every Schema across all pages; its ``.schemas``
    attribute only reflects the current (first) page. Unless the request
    sets ``view`` to FULL, listed schemas carry their name and type but not
    their definition.
    """

def delete_schema(
    self,
    request: Optional[DeleteSchemaRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> None:
    """
    Remove a schema identified by its resource name.

    Parameters:
    - request: DeleteSchemaRequest carrying the fields at once
    - name: Full resource name of the schema to remove
      (e.g., "projects/my-project/schemas/my-schema")
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    Nothing
    """

Schema Validation

Validate schemas and messages against schemas.

def validate_schema(
    self,
    request: Optional[ValidateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> ValidateSchemaResponse:
    """
    Check that a schema definition is well-formed, without creating it.

    Parameters:
    - request: ValidateSchemaRequest carrying the fields at once
    - parent: Project path the check runs under (e.g., "projects/my-project")
    - schema: The schema whose definition should be checked
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    A ValidateSchemaResponse describing the outcome
    """

def validate_message(
    self,
    request: Optional[ValidateMessageRequest] = None,
    parent: Optional[str] = None,
    name: Optional[str] = None,
    schema: Optional[Schema] = None,
    message: Optional[bytes] = None,
    encoding: Optional[Encoding] = None,
    **kwargs
) -> ValidateMessageResponse:
    """
    Validate a message against a schema.

    The schema is identified in one of two mutually exclusive ways: by the
    ``name`` of an existing schema, or by an inline ``schema`` definition.
    Supply exactly one of them.

    NOTE(review): the generated GAPIC client appears to expose no flattened
    keyword arguments for this RPC, so these fields may have to be passed via
    ``request``, e.g. ``request={"parent": parent, "name": name,
    "message": data, "encoding": Encoding.JSON}``. A request dict works
    either way.

    Parameters:
    - request: The ValidateMessageRequest (or an equivalent dict)
    - parent: Parent project path (e.g., "projects/my-project")
    - name: Name of an existing schema to validate against
    - schema: Inline schema definition (alternative to name)
    - message: Encoded message bytes to validate
    - encoding: How ``message`` is encoded (JSON or BINARY)
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    ValidateMessageResponse when the message conforms.

    Raises:
    google.api_core.exceptions.InvalidArgument when the message does not
    conform to the schema.
    """

Schema Evolution

Evolve a schema over time by committing, rolling back, listing, and deleting its revisions.

def commit_schema(
    self,
    request: Optional[CommitSchemaRequest] = None,
    name: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> Schema:
    """
    Add a new revision to an existing schema.

    Parameters:
    - request: CommitSchemaRequest carrying the fields at once
    - name: Resource name of the schema that receives the revision
    - schema: The new definition to commit as the latest revision
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    The Schema reflecting the newly committed revision
    """

def rollback_schema(
    self,
    request: Optional[RollbackSchemaRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Return a schema to the state of an earlier revision.

    Parameters:
    - request: RollbackSchemaRequest carrying the fields at once
    - name: Resource name of the schema to roll back
    - revision_id: Identifier of the earlier revision to restore
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    The Schema after the rollback
    """

def list_schema_revisions(
    self,
    request: Optional[ListSchemaRevisionsRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> ListSchemaRevisionsResponse:
    """
    List all revisions of a schema.

    Parameters:
    - request: The ListSchemaRevisionsRequest (or an equivalent dict); use it
      to set options such as ``view`` or ``page_size``
    - name: Schema name (e.g., "projects/my-project/schemas/my-schema")
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    A pager wrapping ListSchemaRevisionsResponse pages. Iterate the returned
    object directly to receive every revision (as Schema objects) across all
    pages; its ``.schemas`` attribute only reflects the current (first) page.
    Unless the request sets ``view`` to FULL, revisions are returned without
    their definition.
    """

def delete_schema_revision(
    self,
    request: Optional[DeleteSchemaRevisionRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Remove one revision from a schema's history.

    Parameters:
    - request: DeleteSchemaRevisionRequest carrying the fields at once
    - name: Resource name of the schema that owns the revision
    - revision_id: Identifier of the revision to remove
    - **kwargs: Further keyword arguments accepted by the call

    Returns:
    The Schema after the revision has been removed
    """

Schema Types

class Schema:
    """
    A schema resource.

    Attributes:
    - name: Schema resource name (e.g., "projects/my-project/schemas/my-schema")
    - type_: Schema type (AVRO or PROTOCOL_BUFFER). The generated proto-plus
      message names this field ``type_`` (trailing underscore, because
      ``type`` is a Python builtin), both as a constructor keyword and as an
      attribute, e.g. ``Schema(type_=Schema.Type.AVRO, definition=...)``.
    - definition: Schema definition string (Avro JSON or .proto source)
    - revision_id: Current revision ID
    - revision_create_time: When revision was created
    """

    name: str
    # String annotations: ``Schema`` is not yet bound while its own class
    # body executes, and ``Timestamp`` is not imported in this document, so
    # evaluating either eagerly would raise NameError.
    type_: "Schema.Type"
    definition: str
    revision_id: str
    revision_create_time: "Timestamp"

class Encoding(Enum):
    """
    How message bytes are encoded when they are checked against a schema.

    - JSON: the message payload is JSON text
    - BINARY: the message payload uses the schema's binary encoding
    """

    JSON = "JSON"  # textual JSON payload
    BINARY = "BINARY"  # binary-encoded payload

class SchemaView(Enum):
    """
    How much of a Schema the service returns on retrieval.

    - BASIC: name and type only
    - FULL: every field, including the definition
    """

    # Name and type, without the definition.
    BASIC = "BASIC"
    # The complete schema, definition included.
    FULL = "FULL"

Usage Examples

Creating a Schema

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

# Create schema service client
schema_client = pubsub_v1.SchemaServiceClient()

# Define an Avro schema
avro_schema_definition = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

# Create schema under the project ("projects/my-project")
parent = schema_client.common_project_path("my-project")

# The proto-plus Schema message names its type field ``type_`` (``type`` is a
# Python builtin), so that is the keyword to use.
schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=avro_schema_definition
)

created_schema = schema_client.create_schema(
    parent=parent,
    schema=schema,
    schema_id="user-events-v1"
)

print(f"Created schema: {created_schema.name}")

Protocol Buffer Schema

# Define a Protocol Buffer schema
protobuf_schema_definition = """
syntax = "proto3";

message UserEvent {
  string user_id = 1;
  string action = 2;
  int64 timestamp = 3;
}
"""

# Distinct variable names keep the Avro ``schema`` / ``created_schema`` from
# the previous example bound. The later validation and evolution examples
# validate against, and commit Avro revisions to, ``created_schema``.
protobuf_schema = types.Schema(
    type_=types.Schema.Type.PROTOCOL_BUFFER,
    definition=protobuf_schema_definition
)

created_protobuf_schema = schema_client.create_schema(
    parent=parent,
    schema=protobuf_schema,
    schema_id="user-events-protobuf-v1"
)

Validating Messages

import json

# Validate a JSON message against the Avro schema
message_data = {
    "user_id": "12345",
    "action": "login",
    "timestamp": 1640995200
}

message_bytes = json.dumps(message_data).encode('utf-8')

# ValidateMessage takes its fields through ``request``. ``name`` (an existing
# schema) and ``schema`` (an inline definition) are alternatives; set only one.
validation_response = schema_client.validate_message(
    request={
        "parent": parent,
        "name": created_schema.name,
        "message": message_bytes,
        "encoding": types.Encoding.JSON,
    }
)

# A non-conforming message raises (InvalidArgument), so reaching this line
# means the message validated.
print("Message validation successful!")

Schema Evolution

# Evolve schema by adding a new optional (nullable) field with a null default
evolved_schema_definition = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}
"""

# ``type_`` is the proto-plus name of the Schema type field
evolved_schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=evolved_schema_definition
)

# Commit new revision
updated_schema = schema_client.commit_schema(
    name=created_schema.name,
    schema=evolved_schema
)

print(f"Updated schema to revision: {updated_schema.revision_id}")

Schema Management

# List all schemas in project. list_schemas returns a pager: iterating it
# yields every schema across all result pages, whereas ``.schemas`` would only
# hold the first page. A distinct loop name avoids clobbering ``schema``.
schemas_response = schema_client.list_schemas(parent=parent)

for listed_schema in schemas_response:
    print(f"Schema: {listed_schema.name}, Type: {listed_schema.type_}")

# Get specific schema. Only ``name`` is a flattened argument, so the view
# is passed inside ``request``.
retrieved_schema = schema_client.get_schema(
    request={"name": created_schema.name, "view": types.SchemaView.FULL}
)

print(f"Schema definition: {retrieved_schema.definition}")

# List schema revisions (also paged, so iterate the pager itself)
revisions_response = schema_client.list_schema_revisions(
    name=created_schema.name
)

for revision in revisions_response:
    print(f"Revision: {revision.revision_id}, Created: {revision.revision_create_time}")

Topic with Schema

# Create topic with schema
publisher_client = pubsub_v1.PublisherClient()

topic_path = publisher_client.topic_path("my-project", "user-events")
schema_settings = types.SchemaSettings(
    schema=created_schema.name,
    encoding=types.Encoding.JSON
)

topic = types.Topic(
    name=topic_path,
    schema_settings=schema_settings
)

# CreateTopic's request message *is* the Topic. Topic has no ``topic`` field,
# so wrapping it as {"name": ..., "topic": topic} would be rejected.
created_topic = publisher_client.create_topic(request=topic)

# Publish schema-validated message.
# With JSON encoding, Avro union values use Avro's JSON form: a non-null value
# for the ["null", "string"] field is wrapped as {"string": value}.
message_data = {
    "user_id": "67890",
    "action": "purchase",
    "timestamp": 1640998800,
    "session_id": {"string": "sess_123"}
}

future = publisher_client.publish(
    topic_path,
    json.dumps(message_data).encode('utf-8')
)

message_id = future.result()
print(f"Published validated message: {message_id}")

Error Handling

from google.api_core import exceptions

# Attempt to validate an invalid message; a non-conforming message is
# reported by raising InvalidArgument.
invalid_message = json.dumps({"invalid": "structure"}).encode('utf-8')

try:
    schema_client.validate_message(
        request={
            "parent": parent,
            "name": created_schema.name,
            "message": invalid_message,
            "encoding": types.Encoding.JSON,
        }
    )
except exceptions.InvalidArgument as e:
    print(f"Message validation failed: {e}")

# Attempt to create a duplicate schema. Build the schema explicitly rather
# than reusing ``schema``, which earlier examples may have rebound to a
# different schema.
duplicate_schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=avro_schema_definition
)

try:
    schema_client.create_schema(
        parent=parent,
        schema=duplicate_schema,
        schema_id="user-events-v1"  # Already exists
    )
except exceptions.AlreadyExists as e:
    print(f"Schema already exists: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-pubsub

docs

configuration.md

exceptions.md

index.md

message-handling.md

publisher.md

schedulers.md

schema-service.md

subscriber.md

types.md

tile.json