# Schema Service

The `SchemaServiceClient` manages Pub/Sub schemas. It creates, lists, retrieves, and deletes schemas. It validates schema definitions and checks messages against them. It also manages schema revisions as a schema evolves. Attaching a schema to a topic lets Pub/Sub enforce a consistent message structure, while revisions let that structure evolve safely.

## Capabilities

### Client Initialization

Create and configure a `SchemaServiceClient`.

```python { .api }
class SchemaServiceClient:
    """
    Client for the Pub/Sub Schema Service.

    The client is generated (GAPIC), so its constructor accepts the standard
    GAPIC options, and it provides static helpers for building resource names.
    """

    def __init__(self, **kwargs):
        """
        Initialize the schema service client.

        Parameters:
        - **kwargs: Client options such as ``credentials``, ``transport``,
          ``client_options`` and ``client_info``. With no arguments the
          client uses Application Default Credentials.
        """

    @classmethod
    def from_service_account_file(
        cls,
        filename: str,
        **kwargs
    ) -> "SchemaServiceClient":
        """
        Create a client authenticated with a service account key file.

        ``from_service_account_json`` is an alias for this method.

        Parameters:
        - filename: Path to the service account JSON key file
        - **kwargs: Additional arguments for client initialization

        Returns:
        SchemaServiceClient instance
        """

    @staticmethod
    def common_project_path(project: str) -> str:
        """
        Build a project resource name.

        Parameters:
        - project: Project ID

        Returns:
        "projects/{project}". Use it as the ``parent`` argument of
        create_schema, list_schemas and the validation methods.
        """
        return f"projects/{project}"

    @staticmethod
    def schema_path(project: str, schema: str) -> str:
        """
        Build a schema resource name.

        Parameters:
        - project: Project ID
        - schema: Schema ID

        Returns:
        "projects/{project}/schemas/{schema}"
        """
        return f"projects/{project}/schemas/{schema}"
```

### Schema Management

Create, retrieve, list, and delete schemas.

```python { .api }
def create_schema(
    self,
    request: Optional[CreateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    schema_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Create a new schema in a project.

    Supply either a complete ``request`` or the individual fields below,
    not both.

    Parameters:
    - request: Complete CreateSchemaRequest; alternative to the fields below
    - parent: Project that will own the schema (e.g., "projects/my-project")
    - schema: Schema to create, carrying its type and definition
    - schema_id: ID for the new schema; it becomes the last component of the
      schema's resource name
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    The newly created Schema
    """

def get_schema(
    self,
    request: Optional[GetSchemaRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Get a schema.

    ``name`` is the only flattened argument. To choose a view, put it in the
    request, e.g. ``request={"name": name, "view": SchemaView.BASIC}``.

    Parameters:
    - request: GetSchemaRequest (or equivalent dict); the only way to set
      ``view``
    - name: Full schema name (e.g., "projects/my-project/schemas/my-schema")
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    Schema object. With no view set, every field including the definition
    is returned. BASIC omits the definition.
    """

def list_schemas(
    self,
    request: Optional[ListSchemasRequest] = None,
    parent: Optional[str] = None,
    **kwargs
) -> ListSchemasPager:
    """
    List schemas in a project.

    Parameters:
    - request: ListSchemasRequest (or equivalent dict); use it to set
      ``view``, ``page_size`` or ``page_token``
    - parent: Parent project path (e.g., "projects/my-project")
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    ListSchemasPager. Iterating it yields Schema objects and fetches further
    pages automatically. Reading ``.schemas`` directly returns only the
    first page. Unless ``view`` is FULL, each schema carries its name and
    type but not its definition.
    """

def delete_schema(
    self,
    request: Optional[DeleteSchemaRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> None:
    """
    Delete a schema.

    Supply either a complete ``request`` or ``name``, not both.

    Parameters:
    - request: Complete DeleteSchemaRequest; alternative to ``name``
    - name: Full name of the schema to delete
      (e.g., "projects/my-project/schemas/my-schema")
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    None
    """
```

### Schema Validation

Validate schema definitions, and validate messages against a schema.

```python { .api }
def validate_schema(
    self,
    request: Optional[ValidateSchemaRequest] = None,
    parent: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> ValidateSchemaResponse:
    """
    Check that a schema definition is valid without creating it.

    Parameters:
    - request: Complete ValidateSchemaRequest; alternative to the fields below
    - parent: Parent project path (e.g., "projects/my-project")
    - schema: Schema (type and definition) to validate
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    An empty ValidateSchemaResponse when the definition is valid. An invalid
    definition is reported as an error
    (google.api_core.exceptions.InvalidArgument), not through the response.
    """

def validate_message(
    self,
    request: Optional[ValidateMessageRequest] = None,
    **kwargs
) -> ValidateMessageResponse:
    """
    Validate a message against a schema.

    This method has no flattened arguments. Every field goes in ``request``
    (a ValidateMessageRequest or an equivalent dict).

    Request fields:
    - parent: Project in which to validate (e.g., "projects/my-project")
    - name: Full name of an existing schema to validate against, OR
    - schema: An inline Schema to validate against (set exactly one of
      ``name`` / ``schema``)
    - message: Message data to validate, as bytes
    - encoding: Encoding of ``message`` (Encoding.JSON or Encoding.BINARY)

    Parameters:
    - request: The request object described above
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    An empty ValidateMessageResponse when the message conforms. A
    non-conforming message raises google.api_core.exceptions.InvalidArgument.
    """
```

### Schema Evolution

Commit, list, roll back, and delete schema revisions.

```python { .api }
def commit_schema(
    self,
    request: Optional[CommitSchemaRequest] = None,
    name: Optional[str] = None,
    schema: Optional[Schema] = None,
    **kwargs
) -> Schema:
    """
    Commit a new revision of an existing schema.

    Supply either a complete ``request`` or the individual fields, not both.

    Parameters:
    - request: Complete CommitSchemaRequest; alternative to the fields below
    - name: Full name of the schema receiving the revision
      (e.g., "projects/my-project/schemas/my-schema")
    - schema: Schema carrying the new definition
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    The Schema as of the newly committed revision
    """

def rollback_schema(
    self,
    request: Optional[RollbackSchemaRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Roll a schema back to an earlier revision.

    The rollback is recorded as a new revision whose definition is copied
    from ``revision_id``.

    Parameters:
    - request: Complete RollbackSchemaRequest; alternative to the fields below
    - name: Full name of the schema to roll back
    - revision_id: ID of the earlier revision to restore
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    The Schema as of the new (rolled-back) revision
    """

def list_schema_revisions(
    self,
    request: Optional[ListSchemaRevisionsRequest] = None,
    name: Optional[str] = None,
    **kwargs
) -> ListSchemaRevisionsPager:
    """
    List all revisions of a schema.

    Parameters:
    - request: ListSchemaRevisionsRequest (or equivalent dict); use it to set
      ``view``, ``page_size`` or ``page_token``
    - name: Full schema name (e.g., "projects/my-project/schemas/my-schema")
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    ListSchemaRevisionsPager. Iterating it yields one Schema per revision
    and fetches further pages automatically. Reading ``.schemas`` directly
    returns only the first page.
    """

def delete_schema_revision(
    self,
    request: Optional[DeleteSchemaRevisionRequest] = None,
    name: Optional[str] = None,
    revision_id: Optional[str] = None,
    **kwargs
) -> Schema:
    """
    Delete a specific schema revision.

    Parameters:
    - request: The request object for deleting a schema revision
    - name: Name of the revision to delete, with the revision ID appended
      after "@" (e.g., "projects/my-project/schemas/my-schema@c7cfa2a8")
    - revision_id: Deprecated; put the revision in ``name`` instead
    - **kwargs: Per-call options such as retry, timeout and metadata

    Returns:
    Schema object returned by the service for the deleted revision
    """
```

## Schema Types

```python { .api }
class Schema:
    """
    A schema resource.

    Attributes:
    - name: Schema resource name ("projects/{project}/schemas/{schema}")
    - type_: Schema type (Schema.Type.AVRO or Schema.Type.PROTOCOL_BUFFER).
      The field is spelled ``type_`` because ``type`` is a Python builtin.
      Construct with ``Schema(type_=...)`` and read ``schema.type_``.
    - definition: Schema definition string (Avro JSON or .proto source)
    - revision_id: ID of this revision, assigned by the service
    - revision_create_time: When this revision was created
    """

    class Type(Enum):
        """Supported schema definition languages."""

        TYPE_UNSPECIFIED = 0
        PROTOCOL_BUFFER = 1
        AVRO = 2

    name: str
    type_: Type
    definition: str
    revision_id: str
    revision_create_time: "Timestamp"  # google.protobuf.Timestamp

class Encoding(Enum):
    """
    Message encodings accepted for schema validation.

    The same enum sets ``SchemaSettings.encoding`` when a schema is bound to
    a topic.
    """

    ENCODING_UNSPECIFIED = 0
    JSON = 1  # JSON-encoded message data
    BINARY = 2  # Binary-encoded message data (Avro binary or protobuf wire format)

class SchemaView(Enum):
    """
    Which Schema fields the service returns.

    When no view is set, get_schema returns every field, while list_schemas
    returns only each schema's name and type.
    """

    SCHEMA_VIEW_UNSPECIFIED = 0  # Use the method's default (see above)
    BASIC = 1  # Schema name and type only; no definition
    FULL = 2  # All Schema fields, including the definition
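```

## Usage Examples

The examples build on one another. Later snippets reuse `schema_client`, `parent`, `types`, and `created_schema` from earlier ones.

### Creating a Schema

```python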
from google.cloud import pubsub_v1
# Schema message types (Schema, Encoding, SchemaView, SchemaSettings, Topic)
# come from the generated google.pubsub_v1.types module.
from google.pubsub_v1 import types

# Create schema service client
schema_client = pubsub_v1.SchemaServiceClient()

# Define an Avro schema
avro_schema_definition = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
"""

# Create schema
parent = schema_client.common_project_path("my-project")
schema = types.Schema(
    # The proto field is exposed as ``type_`` because ``type`` is a Python builtin.
    type_=types.Schema.Type.AVRO,
    definition=avro_schema_definition,
)

created_schema = schema_client.create_schema(
    parent=parent,
    schema=schema,
    schema_id="user-events-v1",
)

print(f"Created schema: {created_schema.name}")
```

### Protocol Buffer Schema

```python
# Define a Protocol Buffer schema
protobuf_schema_definition = """
syntax = "proto3";

message UserEvent {
  string user_id = 1;
  string action = 2;
  int64 timestamp = 3;
}
"""

protobuf_schema = types.Schema(
    type_=types.Schema.Type.PROTOCOL_BUFFER,
    definition=protobuf_schema_definition,
)

# Use separate names so ``schema`` and ``created_schema`` keep referring to the
# Avro schema that the following examples validate against and evolve.
created_protobuf_schema = schema_client.create_schema(
    parent=parent,
    schema=protobuf_schema,
    schema_id="user-events-protobuf-v1",
)
```

### Validating Messages

```python
import json

# Validate a JSON message against the Avro schema created above
message_data = {
    "user_id": "12345",
    "action": "login",
    "timestamp": 1640995200,
}

message_bytes = json.dumps(message_data).encode("utf-8")

# validate_message has no flattened arguments, so every field goes in
# ``request``. It returns an empty response on success and raises
# google.api_core.exceptions.InvalidArgument if the message does not conform.
schema_client.validate_message(
    request={
        "parent": parent,
        "name": created_schema.name,
        "message": message_bytes,
        "encoding": types.Encoding.JSON,
    }
)

print("Message validation successful!")
```

### Schema Evolution

```python
# Evolve the schema by adding an optional (nullable) field with a null default
evolved_schema_definition = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}
"""

evolved_schema = types.Schema(
    # The revision names the schema it belongs to, matching the request's ``name``.
    name=created_schema.name,
    type_=types.Schema.Type.AVRO,
    definition=evolved_schema_definition,
)

# Commit new revision
updated_schema = schema_client.commit_schema(
    name=created_schema.name,
    schema=evolved_schema,
)

print(f"Updated schema to revision: {updated_schema.revision_id}")
```

### Schema Management

```python
# List all schemas in the project. list_schemas returns a pager: iterating it
# yields Schema objects and fetches further pages as needed. By default only
# each schema's name and type are populated.
for listed_schema in schema_client.list_schemas(parent=parent):
    print(f"Schema: {listed_schema.name}, Type: {listed_schema.type_}")

# Get a specific schema. ``view`` is not a flattened argument of get_schema,
# so it goes in the request.
retrieved_schema = schema_client.get_schema(
    request={"name": created_schema.name, "view": types.SchemaView.FULL}
)

print(f"Schema definition: {retrieved_schema.definition}")

# List schema revisions (also paged). Request the FULL view so every revision
# field is populated.
revisions = schema_client.list_schema_revisions(
    request={"name": created_schema.name, "view": types.SchemaView.FULL}
)

for revision in revisions:
    print(f"Revision: {revision.revision_id}, Created: {revision.revision_create_time}")
```

### Topic with Schema

```python
import json

# Create a topic whose published messages are validated against the schema
publisher_client = pubsub_v1.PublisherClient()

topic_path = publisher_client.topic_path("my-project", "user-events")
schema_settings = types.SchemaSettings(
    schema=created_schema.name,
    encoding=types.Encoding.JSON,
)

topic = types.Topic(
    name=topic_path,
    schema_settings=schema_settings,
)

# create_topic takes the Topic message itself as its request.
created_topic = publisher_client.create_topic(request=topic)

# Publish a message that conforms to the evolved schema. With JSON encoding,
# Pub/Sub follows Avro's JSON encoding, in which a non-null union value is
# wrapped as {"<branch type>": value}.
message_data = {
    "user_id": "67890",
    "action": "purchase",
    "timestamp": 1640998800,
    "session_id": {"string": "sess_123"},
}

future = publisher_client.publish(
    topic_path,
    json.dumps(message_data).encode("utf-8"),
)

message_id = future.result()
print(f"Published validated message: {message_id}")
```

### Error Handling

```python
import json

from google.api_core import exceptions

# A message that does not match the schema's fields
invalid_message = json.dumps({"invalid": "structure"}).encode("utf-8")

try:
    # validate_message accepts only a request object.
    schema_client.validate_message(
        request={
            "parent": parent,
            "name": created_schema.name,
            "message": invalid_message,
            "encoding": types.Encoding.JSON,
        }
    )
except exceptions.InvalidArgument as e:
    print(f"Message validation failed: {e}")

# Build the schema explicitly so this snippet does not depend on a ``schema``
# variable that earlier code may have rebound. A rebound schema without a
# definition would fail with InvalidArgument instead of AlreadyExists.
duplicate_schema = types.Schema(
    type_=types.Schema.Type.AVRO,
    definition=avro_schema_definition,
)

try:
    # Attempt to create a schema whose ID is already taken
    schema_client.create_schema(
        parent=parent,
        schema=duplicate_schema,
        schema_id="user-events-v1",  # Already exists
    )
except exceptions.AlreadyExists as e:
    print(f"Schema already exists: {e}")
```