or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdexceptions.mdindex.mdmessage-handling.mdpublisher.mdschedulers.mdschema-service.mdsubscriber.mdtypes.md

schema-service.mddocs/

0

# Schema Service

1

2

The SchemaServiceClient provides functionality for managing Pub/Sub schemas, including schema creation, validation, evolution, and message validation against schemas. Schemas ensure message structure consistency and enable safe schema evolution.

3

4

## Capabilities

5

6

### Client Initialization

7

8

Create and configure a SchemaServiceClient.

9

10

```python { .api }

11

class SchemaServiceClient:

12

def __init__(self, **kwargs):

13

"""

14

Initialize the schema service client.

15

16

Parameters:

17

- **kwargs: Additional arguments passed to underlying GAPIC client

18

"""

19

20

@classmethod

21

def from_service_account_file(

22

cls,

23

filename: str,

24

**kwargs

25

) -> "SchemaServiceClient":

26

"""

27

Create client from service account file.

28

29

Parameters:

30

- filename: Path to service account JSON file

31

- **kwargs: Additional arguments for client initialization

32

33

Returns:

34

SchemaServiceClient instance

35

"""

36

```

37

38

### Schema Management

39

40

Create, retrieve, update, and delete schemas.

41

42

```python { .api }

43

def create_schema(

44

self,

45

request: Optional[CreateSchemaRequest] = None,

46

parent: Optional[str] = None,

47

schema: Optional[Schema] = None,

48

schema_id: Optional[str] = None,

49

**kwargs

50

) -> Schema:

51

"""

52

Create a new schema.

53

54

Parameters:

55

- request: The request object for creating a schema

56

- parent: Parent project path (e.g., "projects/my-project")

57

- schema: Schema definition

58

- schema_id: ID for the new schema

59

60

Returns:

61

Created Schema object

62

"""

63

64

def get_schema(

65

self,

66

request: Optional[GetSchemaRequest] = None,

67

name: Optional[str] = None,

68

view: Optional[SchemaView] = None,

69

**kwargs

70

) -> Schema:

71

"""

72

Get a schema.

73

74

Parameters:

75

- request: The request object for getting a schema

76

- name: Full schema name (e.g., "projects/my-project/schemas/my-schema")

77

- view: Schema view (BASIC or FULL)

78

79

Returns:

80

Schema object

81

"""

82

83

def list_schemas(

84

self,

85

request: Optional[ListSchemasRequest] = None,

86

parent: Optional[str] = None,

87

**kwargs

88

) -> ListSchemasResponse:

89

"""

90

List schemas in a project.

91

92

Parameters:

93

- request: The request object for listing schemas

94

- parent: Parent project path

95

96

Returns:

97

ListSchemasResponse with schemas

98

"""

99

100

def delete_schema(

101

self,

102

request: Optional[DeleteSchemaRequest] = None,

103

name: Optional[str] = None,

104

**kwargs

105

) -> None:

106

"""

107

Delete a schema.

108

109

Parameters:

110

- request: The request object for deleting a schema

111

- name: Full schema name to delete

112

"""

113

```

114

115

### Schema Validation

116

117

Validate schemas and messages against schemas.

118

119

```python { .api }

120

def validate_schema(

121

self,

122

request: Optional[ValidateSchemaRequest] = None,

123

parent: Optional[str] = None,

124

schema: Optional[Schema] = None,

125

**kwargs

126

) -> ValidateSchemaResponse:

127

"""

128

Validate a schema definition.

129

130

Parameters:

131

- request: The request object for validating a schema

132

- parent: Parent project path

133

- schema: Schema to validate

134

135

Returns:

136

ValidateSchemaResponse indicating validation result

137

"""

138

139

def validate_message(

140

self,

141

request: Optional[ValidateMessageRequest] = None,

142

parent: Optional[str] = None,

143

name: Optional[str] = None,

144

schema: Optional[Schema] = None,

145

message: Optional[bytes] = None,

146

encoding: Optional[Encoding] = None,

147

**kwargs

148

) -> ValidateMessageResponse:

149

"""

150

Validate a message against a schema.

151

152

Parameters:

153

- request: The request object for validating a message

154

- parent: Parent project path

155

- name: Schema name to validate against

156

- schema: Schema definition (alternative to name)

157

- message: Message data to validate

158

- encoding: Message encoding (JSON or BINARY)

159

160

Returns:

161

ValidateMessageResponse indicating validation result

162

"""

163

```

164

165

### Schema Evolution

166

167

Manage schema revisions and evolution.

168

169

```python { .api }

170

def commit_schema(

171

self,

172

request: Optional[CommitSchemaRequest] = None,

173

name: Optional[str] = None,

174

schema: Optional[Schema] = None,

175

**kwargs

176

) -> Schema:

177

"""

178

Commit a new schema revision.

179

180

Parameters:

181

- request: The request object for committing a schema

182

- name: Schema name

183

- schema: New schema definition

184

185

Returns:

186

Updated Schema object

187

"""

188

189

def rollback_schema(

190

self,

191

request: Optional[RollbackSchemaRequest] = None,

192

name: Optional[str] = None,

193

revision_id: Optional[str] = None,

194

**kwargs

195

) -> Schema:

196

"""

197

Rollback a schema to a previous revision.

198

199

Parameters:

200

- request: The request object for rolling back a schema

201

- name: Schema name

202

- revision_id: Revision to rollback to

203

204

Returns:

205

Rolled back Schema object

206

"""

207

208

def list_schema_revisions(

209

self,

210

request: Optional[ListSchemaRevisionsRequest] = None,

211

name: Optional[str] = None,

212

**kwargs

213

) -> ListSchemaRevisionsResponse:

214

"""

215

List all revisions of a schema.

216

217

Parameters:

218

- request: The request object for listing schema revisions

219

- name: Schema name

220

221

Returns:

222

ListSchemaRevisionsResponse with schema revisions

223

"""

224

225

def delete_schema_revision(

226

self,

227

request: Optional[DeleteSchemaRevisionRequest] = None,

228

name: Optional[str] = None,

229

revision_id: Optional[str] = None,

230

**kwargs

231

) -> Schema:

232

"""

233

Delete a specific schema revision.

234

235

Parameters:

236

- request: The request object for deleting a schema revision

237

- name: Schema name

238

- revision_id: Revision to delete

239

240

Returns:

241

Updated Schema object

242

"""

243

```

244

245

## Schema Types

246

247

```python { .api }

248

class Schema:

249

"""

250

A schema resource.

251

252

Attributes:

253

- name: Schema resource name

254

- type: Schema type (AVRO, PROTOCOL_BUFFER)

255

- definition: Schema definition string

256

- revision_id: Current revision ID

257

- revision_create_time: When revision was created

258

"""

259

260

name: str

261

type: Schema.Type

262

definition: str

263

revision_id: str

264

revision_create_time: Timestamp

265

266

class Encoding(Enum):

267

"""

268

Message encoding types for schema validation.

269

"""

270

271

JSON = "JSON"

272

BINARY = "BINARY"

273

274

class SchemaView(Enum):

275

"""

276

Schema view options for retrieval.

277

"""

278

279

BASIC = "BASIC" # Schema name and type only

280

FULL = "FULL" # Full schema definition

281

```

282

283

## Usage Examples

284

285

### Creating a Schema

286

287

```python

288

from google.cloud import pubsub_v1

289

from google.cloud.pubsub_v1 import types

290

291

# Create schema service client

292

schema_client = pubsub_v1.SchemaServiceClient()

293

294

# Define an Avro schema

295

avro_schema_definition = """

296

{

297

"type": "record",

298

"name": "UserEvent",

299

"fields": [

300

{"name": "user_id", "type": "string"},

301

{"name": "action", "type": "string"},

302

{"name": "timestamp", "type": "long"}

303

]

304

}

305

"""

306

307

# Create schema

308

parent = schema_client.common_project_path("my-project")

309

schema = types.Schema(

310

type=types.Schema.Type.AVRO,

311

definition=avro_schema_definition

312

)

313

314

created_schema = schema_client.create_schema(

315

parent=parent,

316

schema=schema,

317

schema_id="user-events-v1"

318

)

319

320

print(f"Created schema: {created_schema.name}")

321

```

322

323

### Protocol Buffer Schema

324

325

```python

326

# Define a Protocol Buffer schema

327

protobuf_schema_definition = """

328

syntax = "proto3";

329

330

message UserEvent {

331

string user_id = 1;

332

string action = 2;

333

int64 timestamp = 3;

334

}

335

"""

336

337

schema = types.Schema(

338

type=types.Schema.Type.PROTOCOL_BUFFER,

339

definition=protobuf_schema_definition

340

)

341

342

created_schema = schema_client.create_schema(

343

parent=parent,

344

schema=schema,

345

schema_id="user-events-protobuf-v1"

346

)

347

```

348

349

### Validating Messages

350

351

```python

352

import json

353

354

# Validate a JSON message against Avro schema

355

message_data = {

356

"user_id": "12345",

357

"action": "login",

358

"timestamp": 1640995200

359

}

360

361

message_bytes = json.dumps(message_data).encode('utf-8')

362

363

validation_response = schema_client.validate_message(

364

parent=parent,

365

name=created_schema.name,

366

message=message_bytes,

367

encoding=types.Encoding.JSON

368

)

369

370

print("Message validation successful!")

371

```

372

373

### Schema Evolution

374

375

```python

376

# Evolve schema by adding a new field

377

evolved_schema_definition = """

378

{

379

"type": "record",

380

"name": "UserEvent",

381

"fields": [

382

{"name": "user_id", "type": "string"},

383

{"name": "action", "type": "string"},

384

{"name": "timestamp", "type": "long"},

385

{"name": "session_id", "type": ["null", "string"], "default": null}

386

]

387

}

388

"""

389

390

evolved_schema = types.Schema(

391

type=types.Schema.Type.AVRO,

392

definition=evolved_schema_definition

393

)

394

395

# Commit new revision

396

updated_schema = schema_client.commit_schema(

397

name=created_schema.name,

398

schema=evolved_schema

399

)

400

401

print(f"Updated schema to revision: {updated_schema.revision_id}")

402

```

403

404

### Schema Management

405

406

```python

407

# List all schemas in project

408

schemas_response = schema_client.list_schemas(parent=parent)

409

410

for schema in schemas_response.schemas:

411

print(f"Schema: {schema.name}, Type: {schema.type}")

412

413

# Get specific schema

414

retrieved_schema = schema_client.get_schema(

415

name=created_schema.name,

416

view=types.SchemaView.FULL

417

)

418

419

print(f"Schema definition: {retrieved_schema.definition}")

420

421

# List schema revisions

422

revisions_response = schema_client.list_schema_revisions(

423

name=created_schema.name

424

)

425

426

for revision in revisions_response.schemas:

427

print(f"Revision: {revision.revision_id}, Created: {revision.revision_create_time}")

428

```

429

430

### Topic with Schema

431

432

```python

433

# Create topic with schema

434

publisher_client = pubsub_v1.PublisherClient()

435

436

topic_path = publisher_client.topic_path("my-project", "user-events")

437

schema_settings = types.SchemaSettings(

438

schema=created_schema.name,

439

encoding=types.Encoding.JSON

440

)

441

442

topic = types.Topic(

443

name=topic_path,

444

schema_settings=schema_settings

445

)

446

447

created_topic = publisher_client.create_topic(request={"name": topic_path, "topic": topic})

448

449

# Publish schema-validated message

450

message_data = {

451

"user_id": "67890",

452

"action": "purchase",

453

"timestamp": 1640998800,

454

"session_id": "sess_123"

455

}

456

457

future = publisher_client.publish(

458

topic_path,

459

json.dumps(message_data).encode('utf-8')

460

)

461

462

message_id = future.result()

463

print(f"Published validated message: {message_id}")

464

```

465

466

### Error Handling

467

468

```python

469

from google.api_core import exceptions

470

471

try:

472

# Attempt to validate invalid message

473

invalid_message = json.dumps({"invalid": "structure"}).encode('utf-8')

474

475

schema_client.validate_message(

476

parent=parent,

477

name=created_schema.name,

478

message=invalid_message,

479

encoding=types.Encoding.JSON

480

)

481

482

except exceptions.InvalidArgument as e:

483

print(f"Message validation failed: {e}")

484

485

try:

486

# Attempt to create duplicate schema

487

schema_client.create_schema(

488

parent=parent,

489

schema=schema,

490

schema_id="user-events-v1" # Already exists

491

)

492

493

except exceptions.AlreadyExists as e:

494

print(f"Schema already exists: {e}")

495

```