or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-client.md · core-producer-consumer.md · error-handling.md · index.md · schema-registry.md · serialization.md

docs/schema-registry.md

0

# Schema Registry Integration

1

2

This module provides complete integration with Confluent Schema Registry, supporting schema evolution, compatibility checking, and automatic serialization/deserialization for the Avro, JSON Schema, and Protobuf formats.

3

4

## Capabilities

5

6

### SchemaRegistryClient

7

8

Synchronous client for interacting with Confluent Schema Registry.

9

10

```python { .api }

11

class SchemaRegistryClient:

12

def __init__(self, conf):

13

"""

14

Create SchemaRegistryClient instance.

15

16

Args:

17

conf (dict): Configuration properties including 'url' and optional auth settings

18

"""

19

20

def register_schema(self, subject_name, schema, normalize_schemas=False):

21

"""

22

Register a schema for the specified subject.

23

24

Args:

25

subject_name (str): Subject name for the schema

26

schema (Schema): Schema object to register

27

normalize_schemas (bool): Whether to normalize the schema

28

29

Returns:

30

int: Schema ID assigned by registry

31

32

Raises:

33

SchemaRegistryError: If registration fails

34

"""

35

36

def get_latest_version(self, subject_name):

37

"""

38

Get the latest schema version for a subject.

39

40

Args:

41

subject_name (str): Subject name

42

43

Returns:

44

RegisteredSchema: Latest registered schema

45

46

Raises:

47

SchemaRegistryError: If subject not found

48

"""

49

50

def get_version(self, subject_name, version):

51

"""

52

Get a specific schema version for a subject.

53

54

Args:

55

subject_name (str): Subject name

56

version (int): Schema version number

57

58

Returns:

59

RegisteredSchema: Registered schema at specified version

60

61

Raises:

62

SchemaRegistryError: If schema version not found

63

"""

64

65

def get_schema(self, schema_id, fetch_max_id=True):

66

"""

67

Get schema by ID.

68

69

Args:

70

schema_id (int): Schema ID

71

fetch_max_id (bool): Whether to fetch maximum schema ID

72

73

Returns:

74

Schema: Schema object

75

76

Raises:

77

SchemaRegistryError: If schema not found

78

"""

79

80

def get_subjects(self):

81

"""

82

Get list of all subjects.

83

84

Returns:

85

list: List of subject names

86

87

Raises:

88

SchemaRegistryError: If request fails

89

"""

90

91

def delete_subject(self, subject_name, permanent=False):

92

"""

93

Delete a subject.

94

95

Args:

96

subject_name (str): Subject name to delete

97

permanent (bool): Whether to permanently delete

98

99

Returns:

100

list: List of deleted version numbers

101

102

Raises:

103

SchemaRegistryError: If deletion fails

104

"""

105

106

def delete_version(self, subject_name, version, permanent=False):

107

"""

108

Delete a specific version of a subject.

109

110

Args:

111

subject_name (str): Subject name

112

version (int): Version number to delete

113

permanent (bool): Whether to permanently delete

114

115

Returns:

116

int: Deleted version number

117

118

Raises:

119

SchemaRegistryError: If deletion fails

120

"""

121

122

def get_compatibility(self, subject_name=None):

123

"""

124

Get compatibility level for subject or global default.

125

126

Args:

127

subject_name (str, optional): Subject name (None for global)

128

129

Returns:

130

str: Compatibility level

131

132

Raises:

133

SchemaRegistryError: If request fails

134

"""

135

136

def set_compatibility(self, subject_name=None, level=None):

137

"""

138

Set compatibility level for subject or global default.

139

140

Args:

141

subject_name (str, optional): Subject name (None for global)

142

level (str): Compatibility level to set

143

144

Returns:

145

str: Updated compatibility level

146

147

Raises:

148

SchemaRegistryError: If update fails

149

"""

150

151

def test_compatibility(self, subject_name, schema, version='latest'):

152

"""

153

Test schema compatibility with subject.

154

155

Args:

156

subject_name (str): Subject name

157

schema (Schema): Schema to test

158

version (str|int): Version to test against

159

160

Returns:

161

bool: True if compatible, False otherwise

162

163

Raises:

164

SchemaRegistryError: If test fails

165

"""

166

```

167

168

### AsyncSchemaRegistryClient

169

170

Asynchronous client for Schema Registry operations.

171

172

```python { .api }

173

class AsyncSchemaRegistryClient:

174

def __init__(self, conf):

175

"""

176

Create AsyncSchemaRegistryClient instance.

177

178

Args:

179

conf (dict): Configuration properties

180

"""

181

182

async def register_schema(self, subject_name, schema, normalize_schemas=False):

183

"""

184

Async version of register_schema.

185

186

Returns:

187

int: Schema ID

188

"""

189

190

async def get_latest_version(self, subject_name):

191

"""

192

Async version of get_latest_version.

193

194

Returns:

195

RegisteredSchema: Latest registered schema

196

"""

197

198

async def get_schema(self, schema_id, fetch_max_id=True):

199

"""

200

Async version of get_schema.

201

202

Returns:

203

Schema: Schema object

204

"""

205

206

async def close(self):

207

"""Close the async client and cleanup resources."""

208

```

209

210

### Schema Classes

211

212

#### Schema

213

214

Represents a schema with its type and definition.

215

216

```python { .api }

217

class Schema:

218

def __init__(self, schema_str, schema_type, references=None):

219

"""

220

Create Schema object.

221

222

Args:

223

schema_str (str): Schema definition string

224

schema_type (str): Schema type ('AVRO', 'JSON', 'PROTOBUF')

225

references (list, optional): List of schema references

226

"""

227

228

@property

229

def schema_str(self):

230

"""Schema definition string."""

231

232

@property

233

def schema_type(self):

234

"""Schema type."""

235

236

@property

237

def references(self):

238

"""Schema references."""

239

240

def __eq__(self, other):

241

"""Equality comparison."""

242

243

def __hash__(self):

244

"""Hash for use in sets and dicts."""

245

```

246

247

#### RegisteredSchema

248

249

Schema with registry metadata.

250

251

```python { .api }

252

class RegisteredSchema:

253

def __init__(self, schema_id, schema, version, subject):

254

"""

255

Create RegisteredSchema object.

256

257

Args:

258

schema_id (int): Schema ID in registry

259

schema (Schema): Schema object

260

version (int): Schema version

261

subject (str): Subject name

262

"""

263

264

@property

265

def schema_id(self):

266

"""Schema ID."""

267

268

@property

269

def schema(self):

270

"""Schema object."""

271

272

@property

273

def version(self):

274

"""Schema version."""

275

276

@property

277

def subject(self):

278

"""Subject name."""

279

```

280

281

### Avro Serialization

282

283

#### AvroSerializer

284

285

Serializes Python objects to Avro binary format with Schema Registry integration.

286

287

```python { .api }

288

class AvroSerializer:

289

def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):

290

"""

291

Create AvroSerializer.

292

293

Args:

294

schema_registry_client (SchemaRegistryClient): Registry client

295

schema_str (str): Avro schema definition

296

to_dict (callable, optional): Function to convert object to dict

297

conf (dict, optional): Serializer configuration

298

"""

299

300

def __call__(self, obj, ctx):

301

"""

302

Serialize object to Avro bytes.

303

304

Args:

305

obj: Object to serialize

306

ctx (SerializationContext): Serialization context

307

308

Returns:

309

bytes: Serialized data with schema ID prefix

310

311

Raises:

312

SerializationError: If serialization fails

313

"""

314

```

315

316

#### AvroDeserializer

317

318

Deserializes Avro binary data to Python objects using Schema Registry.

319

320

```python { .api }

321

class AvroDeserializer:

322

def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):

323

"""

324

Create AvroDeserializer.

325

326

Args:

327

schema_registry_client (SchemaRegistryClient): Registry client

328

schema_str (str, optional): Reader schema definition

329

from_dict (callable, optional): Function to convert dict to object

330

return_record_name (bool): If True, when reading a union of records, return a (record_name, record) tuple instead of just the record. Defaults to False.

331

"""

332

333

def __call__(self, value, ctx):

334

"""

335

Deserialize Avro bytes to object.

336

337

Args:

338

value (bytes): Serialized data with schema ID prefix

339

ctx (SerializationContext): Serialization context

340

341

Returns:

342

object: Deserialized object

343

344

Raises:

345

SerializationError: If deserialization fails

346

"""

347

```

348

349

### JSON Schema Serialization

350

351

#### JSONSerializer

352

353

Serializes Python objects to JSON with Schema Registry integration.

354

355

```python { .api }

356

class JSONSerializer:

357

def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):

358

"""

359

Create JSONSerializer.

360

361

Args:

362

schema_registry_client (SchemaRegistryClient): Registry client

363

schema_str (str): JSON schema definition

364

to_dict (callable, optional): Function to convert object to dict

365

conf (dict, optional): Serializer configuration

366

"""

367

368

def __call__(self, obj, ctx):

369

"""

370

Serialize object to JSON bytes.

371

372

Args:

373

obj: Object to serialize

374

ctx (SerializationContext): Serialization context

375

376

Returns:

377

bytes: Serialized JSON data with schema ID prefix

378

379

Raises:

380

SerializationError: If serialization fails

381

"""

382

```

383

384

#### JSONDeserializer

385

386

Deserializes JSON data to Python objects using Schema Registry.

387

388

```python { .api }

389

class JSONDeserializer:

390

def __init__(self, schema_registry_client, schema_str=None, from_dict=None):

391

"""

392

Create JSONDeserializer.

393

394

Args:

395

schema_registry_client (SchemaRegistryClient): Registry client

396

schema_str (str, optional): JSON schema definition

397

from_dict (callable, optional): Function to convert dict to object

398

"""

399

400

def __call__(self, value, ctx):

401

"""

402

Deserialize JSON bytes to object.

403

404

Args:

405

value (bytes): Serialized JSON data with schema ID prefix

406

ctx (SerializationContext): Serialization context

407

408

Returns:

409

object: Deserialized object

410

411

Raises:

412

SerializationError: If deserialization fails

413

"""

414

```

415

416

### Protobuf Serialization

417

418

#### ProtobufSerializer

419

420

Serializes Protobuf messages with Schema Registry integration.

421

422

```python { .api }

423

class ProtobufSerializer:

424

def __init__(self, msg_type, schema_registry_client, conf=None):

425

"""

426

Create ProtobufSerializer.

427

428

Args:

429

msg_type: Protobuf message class

430

schema_registry_client (SchemaRegistryClient): Registry client

431

conf (dict, optional): Serializer configuration

432

"""

433

434

def __call__(self, obj, ctx):

435

"""

436

Serialize Protobuf message to bytes.

437

438

Args:

439

obj: Protobuf message instance

440

ctx (SerializationContext): Serialization context

441

442

Returns:

443

bytes: Serialized data with schema ID prefix

444

445

Raises:

446

SerializationError: If serialization fails

447

"""

448

```

449

450

#### ProtobufDeserializer

451

452

Deserializes Protobuf binary data using Schema Registry.

453

454

```python { .api }

455

class ProtobufDeserializer:

456

def __init__(self, msg_type, schema_registry_client, conf=None):

457

"""

458

Create ProtobufDeserializer.

459

460

Args:

461

msg_type: Protobuf message class

462

schema_registry_client (SchemaRegistryClient): Registry client

463

conf (dict, optional): Deserializer configuration

464

"""

465

466

def __call__(self, value, ctx):

467

"""

468

Deserialize Protobuf bytes to message.

469

470

Args:

471

value (bytes): Serialized data with schema ID prefix

472

ctx (SerializationContext): Serialization context

473

474

Returns:

475

object: Deserialized Protobuf message

476

477

Raises:

478

SerializationError: If deserialization fails

479

"""

480

```

481

482

### Subject Naming Strategies

483

484

Functions for generating subject names for schemas.

485

486

```python { .api }

487

def topic_subject_name_strategy(ctx, record_name):

488

"""

489

Create subject name as {topic}-{key|value}.

490

491

Args:

492

ctx (SerializationContext): Context with topic and field info

493

record_name (str): Record name (unused)

494

495

Returns:

496

str: Subject name in format 'topic-key' or 'topic-value'

497

"""

498

499

def topic_record_subject_name_strategy(ctx, record_name):

500

"""

501

Create subject name as {topic}-{record_name}.

502

503

Args:

504

ctx (SerializationContext): Context with topic info

505

record_name (str): Record name from schema

506

507

Returns:

508

str: Subject name in format 'topic-recordname'

509

"""

510

511

def record_subject_name_strategy(ctx, record_name):

512

"""

513

Create subject name as {record_name}.

514

515

Args:

516

ctx (SerializationContext): Context (unused)

517

record_name (str): Record name from schema

518

519

Returns:

520

str: Subject name as record name

521

"""

522

```

523

524

### Schema ID Serialization

525

526

Functions for handling schema ID serialization in messages.

527

528

```python { .api }

529

def prefix_schema_id_serializer(schema_id, data):

530

"""

531

Serialize schema ID into payload prefix.

532

533

Args:

534

schema_id (int): Schema ID to serialize

535

data (bytes): Message payload

536

537

Returns:

538

bytes: Data prefixed with the 5-byte Confluent wire-format header (a magic byte followed by the 4-byte big-endian schema ID)

539

"""

540

541

def header_schema_id_serializer(schema_id, data):

542

"""

543

Serialize schema ID into message headers.

544

545

Args:

546

schema_id (int): Schema ID to serialize

547

data (bytes): Message payload

548

549

Returns:

550

tuple: (data, headers_dict)

551

"""

552

553

def prefix_schema_id_deserializer(data):

554

"""

555

Deserialize schema ID from payload prefix.

556

557

Args:

558

data (bytes): Data with schema ID prefix

559

560

Returns:

561

tuple: (schema_id, payload)

562

"""

563

564

def dual_schema_id_deserializer(data, headers=None):

565

"""

566

Deserialize schema ID from headers or payload prefix.

567

568

Args:

569

data (bytes): Message payload

570

headers (dict, optional): Message headers

571

572

Returns:

573

tuple: (schema_id, payload)

574

"""

575

```

576

577

### Configuration Classes

578

579

#### ServerConfig

580

581

Schema Registry server configuration.

582

583

```python { .api }

584

class ServerConfig:

585

@property

586

def compatibility_level(self):

587

"""Default compatibility level."""

588

589

@property

590

def mode(self):

591

"""Schema Registry mode."""

592

```

593

594

### Enumeration Classes

595

596

```python { .api }

597

class ConfigCompatibilityLevel:

598

BACKWARD = "BACKWARD"

599

BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"

600

FORWARD = "FORWARD"

601

FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"

602

FULL = "FULL"

603

FULL_TRANSITIVE = "FULL_TRANSITIVE"

604

NONE = "NONE"

605

606

class RuleKind:

607

CONDITION = "CONDITION"

608

TRANSFORM = "TRANSFORM"

609

610

class RuleMode:

611

UPGRADE = "UPGRADE"

612

DOWNGRADE = "DOWNGRADE"

613

UPDOWN = "UPDOWN"

614

WRITE = "WRITE"

615

READ = "READ"

616

WRITEREAD = "WRITEREAD"

617

```

618

619

### Usage Examples

620

621

#### Basic Avro Serialization

622

623

```python

624

from confluent_kafka import SerializingProducer, DeserializingConsumer

625

from confluent_kafka.schema_registry import SchemaRegistryClient

626

from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

627

from confluent_kafka.serialization import StringSerializer, StringDeserializer, SerializationContext, MessageField

628

629

# Schema Registry client

630

schema_registry_conf = {'url': 'http://localhost:8081'}

631

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

632

633

# Avro schema

634

value_schema_str = """

635

{

636

"type": "record",

637

"name": "User",

638

"fields": [

639

{"name": "name", "type": "string"},

640

{"name": "age", "type": "int"}

641

]

642

}

643

"""

644

645

# Create serializer

646

avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)

647

648

# Producer configuration

649

producer_conf = {

650

'bootstrap.servers': 'localhost:9092',

651

'key.serializer': StringSerializer('utf_8'),

652

'value.serializer': avro_serializer

653

}

654

655

producer = SerializingProducer(producer_conf)

656

657

# Produce message

658

user_record = {'name': 'Alice', 'age': 30}

659

producer.produce(topic='users', key='user1', value=user_record)

660

producer.flush()

661

662

# Consumer configuration

663

avro_deserializer = AvroDeserializer(schema_registry_client, value_schema_str)

664

consumer_conf = {

665

'bootstrap.servers': 'localhost:9092',

666

'key.deserializer': StringDeserializer('utf_8'),

667

'value.deserializer': avro_deserializer,

668

'group.id': 'user-group',

669

'auto.offset.reset': 'earliest'

670

}

671

672

consumer = DeserializingConsumer(consumer_conf)

673

consumer.subscribe(['users'])

674

675

msg = consumer.poll(1.0)

676

if msg is not None:

677

user_object = msg.value()

678

print(f"User: {user_object['name']}, Age: {user_object['age']}")

679

680

consumer.close()

681

```

682

683

#### Schema Evolution Example

684

685

```python

686

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

687

688

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

689

690

# Register initial schema

691

initial_schema = Schema("""

692

{

693

"type": "record",

694

"name": "User",

695

"fields": [

696

{"name": "name", "type": "string"},

697

{"name": "age", "type": "int"}

698

]

699

}

700

""", schema_type='AVRO')

701

702

schema_id = schema_registry_client.register_schema('users-value', initial_schema)

703

print(f"Registered schema with ID: {schema_id}")

704

705

# Evolve schema (add optional field)

706

evolved_schema = Schema("""

707

{

708

"type": "record",

709

"name": "User",

710

"fields": [

711

{"name": "name", "type": "string"},

712

{"name": "age", "type": "int"},

713

{"name": "email", "type": ["null", "string"], "default": null}

714

]

715

}

716

""", schema_type='AVRO')

717

718

# Test compatibility

719

compatible = schema_registry_client.test_compatibility('users-value', evolved_schema)

720

print(f"Schema is compatible: {compatible}")

721

722

if compatible:

723

new_schema_id = schema_registry_client.register_schema('users-value', evolved_schema)

724

print(f"Registered evolved schema with ID: {new_schema_id}")

725

```

726

727

#### JSON Schema Example

728

729

```python

730

from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer

731

732

# JSON schema

733

json_schema_str = """

734

{

735

"$schema": "http://json-schema.org/draft-07/schema#",

736

"type": "object",

737

"properties": {

738

"name": {"type": "string"},

739

"age": {"type": "integer", "minimum": 0}

740

},

741

"required": ["name", "age"]

742

}

743

"""

744

745

json_serializer = JSONSerializer(schema_registry_client, json_schema_str)

746

json_deserializer = JSONDeserializer(schema_registry_client, json_schema_str)

747

748

# Use with SerializingProducer/DeserializingConsumer

749

producer_conf = {

750

'bootstrap.servers': 'localhost:9092',

751

'value.serializer': json_serializer

752

}

753

754

consumer_conf = {

755

'bootstrap.servers': 'localhost:9092',

756

'value.deserializer': json_deserializer,

757

'group.id': 'json-group',

758

'auto.offset.reset': 'earliest'

759

}

760

```

761

762

#### Protobuf Example

763

764

```python

765

from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer

766

import user_pb2 # Generated from .proto file

767

768

# Protobuf serializer/deserializer

769

protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client)

770

protobuf_deserializer = ProtobufDeserializer(user_pb2.User, schema_registry_client)

771

772

# Create protobuf message

773

user = user_pb2.User()

774

user.name = "Bob"

775

user.age = 25

776

777

# Use with producer/consumer

778

producer_conf = {

779

'bootstrap.servers': 'localhost:9092',

780

'value.serializer': protobuf_serializer

781

}

782

783

producer = SerializingProducer(producer_conf)

784

producer.produce('users-protobuf', value=user)

785

producer.flush()

786

```