or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-client.md · core-producer-consumer.md · error-handling.md · index.md · schema-registry.md · serialization.md

docs/serialization.md

0

# Serialization Framework

1

2

Comprehensive serialization framework providing pluggable serialization/deserialization with built-in support for common data types and high-level producer/consumer APIs with automatic serialization.

3

4

## Capabilities

5

6

### Base Serializer/Deserializer Classes

7

8

Abstract base classes for implementing custom serialization logic.

9

10

```python { .api }

11

class Serializer:

12

def __call__(self, obj, ctx=None):

13

"""

14

Serialize an object.

15

16

Args:

17

obj: Object to serialize

18

ctx (SerializationContext, optional): Serialization context

19

20

Returns:

21

bytes: Serialized data

22

23

Raises:

24

SerializationError: If serialization fails

25

"""

26

raise NotImplementedError

27

28

class Deserializer:

29

def __call__(self, value, ctx=None):

30

"""

31

Deserialize data.

32

33

Args:

34

value (bytes): Data to deserialize

35

ctx (SerializationContext, optional): Serialization context

36

37

Returns:

38

object: Deserialized object

39

40

Raises:

41

SerializationError: If deserialization fails

42

"""

43

raise NotImplementedError

44

```

45

46

### Built-in Serializers

47

48

Ready-to-use serializers for common data types.

49

50

#### StringSerializer

51

52

```python { .api }

53

class StringSerializer(Serializer):

54

def __init__(self, codec='utf_8'):

55

"""

56

Create StringSerializer.

57

58

Args:

59

codec (str): Character encoding (default: 'utf_8')

60

"""

61

62

def __call__(self, obj, ctx=None):

63

"""

64

Serialize string to bytes.

65

66

Args:

67

obj (str): String to serialize

68

ctx (SerializationContext, optional): Serialization context

69

70

Returns:

71

bytes: String encoded with the configured codec, or None if obj is None

72

73

Raises:

74

SerializationError: If obj is not a string

75

"""

76

```

77

78

#### IntegerSerializer

79

80

```python { .api }

81

class IntegerSerializer(Serializer):

82

def __call__(self, obj, ctx=None):

83

"""

84

Serialize integer to int32 bytes.

85

86

Args:

87

obj (int): Integer to serialize

88

ctx (SerializationContext, optional): Serialization context

89

90

Returns:

91

bytes: 4-byte big-endian int32 or None if obj is None

92

93

Raises:

94

SerializationError: If obj is not an integer or out of int32 range

95

"""

96

```

97

98

#### DoubleSerializer

99

100

```python { .api }

101

class DoubleSerializer(Serializer):

102

def __call__(self, obj, ctx=None):

103

"""

104

Serialize float to IEEE 754 binary64 bytes.

105

106

Args:

107

obj (float): Float to serialize

108

ctx (SerializationContext, optional): Serialization context

109

110

Returns:

111

bytes: 8-byte IEEE 754 binary64 or None if obj is None

112

113

Raises:

114

SerializationError: If obj is not a float

115

"""

116

```

117

118

### Built-in Deserializers

119

120

Ready-to-use deserializers for common data types.

121

122

#### StringDeserializer

123

124

```python { .api }

125

class StringDeserializer(Deserializer):

126

def __init__(self, codec='utf_8'):

127

"""

128

Create StringDeserializer.

129

130

Args:

131

codec (str): Character encoding (default: 'utf_8')

132

"""

133

134

def __call__(self, value, ctx=None):

135

"""

136

Deserialize bytes to string.

137

138

Args:

139

value (bytes): Bytes to deserialize

140

ctx (SerializationContext, optional): Serialization context

141

142

Returns:

143

str: Decoded string or None if value is None

144

145

Raises:

146

SerializationError: If decoding fails

147

"""

148

```

149

150

#### IntegerDeserializer

151

152

```python { .api }

153

class IntegerDeserializer(Deserializer):

154

def __call__(self, value, ctx=None):

155

"""

156

Deserialize int32 bytes to integer.

157

158

Args:

159

value (bytes): 4-byte big-endian int32

160

ctx (SerializationContext, optional): Serialization context

161

162

Returns:

163

int: Deserialized integer or None if value is None

164

165

Raises:

166

SerializationError: If value is not 4 bytes

167

"""

168

```

169

170

#### DoubleDeserializer

171

172

```python { .api }

173

class DoubleDeserializer(Deserializer):

174

def __call__(self, value, ctx=None):

175

"""

176

Deserialize IEEE 754 binary64 bytes to float.

177

178

Args:

179

value (bytes): 8-byte IEEE 754 binary64

180

ctx (SerializationContext, optional): Serialization context

181

182

Returns:

183

float: Deserialized float or None if value is None

184

185

Raises:

186

SerializationError: If value is not 8 bytes

187

"""

188

```

189

190

### Serialization Context

191

192

Provides contextual information for serialization operations.

193

194

```python { .api }

195

class SerializationContext:

196

def __init__(self, topic, field, headers=None):

197

"""

198

Create SerializationContext.

199

200

Args:

201

topic (str): Topic name

202

field (MessageField): Message field being serialized

203

headers (dict, optional): Message headers

204

"""

205

206

@property

207

def topic(self):

208

"""

209

Topic name.

210

211

Returns:

212

str: Topic name

213

"""

214

215

@property

216

def field(self):

217

"""

218

Message field being serialized.

219

220

Returns:

221

MessageField: NONE, KEY, or VALUE

222

"""

223

224

@property

225

def headers(self):

226

"""

227

Message headers.

228

229

Returns:

230

dict: Message headers or None

231

"""

232

```

233

234

#### MessageField

235

236

```python { .api }

237

class MessageField:

238

NONE = 'none'

239

KEY = 'key'

240

VALUE = 'value'

241

```

242

243

### High-Level Producer/Consumer APIs

244

245

#### SerializingProducer

246

247

High-level producer with pluggable serialization.

248

249

```python { .api }

250

class SerializingProducer:

251

def __init__(self, conf):

252

"""

253

Create SerializingProducer.

254

255

Args:

256

conf (dict): Configuration including 'key.serializer' and 'value.serializer'

257

"""

258

259

def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None):

260

"""

261

Produce message with automatic serialization.

262

263

Args:

264

topic (str): Topic to produce to

265

key: Key object (will be serialized using key.serializer)

266

value: Value object (will be serialized using value.serializer)

267

partition (int, optional): Specific partition (-1 for automatic)

268

on_delivery (callable, optional): Delivery report callback

269

timestamp (int, optional): Message timestamp (0 for current time)

270

headers (dict, optional): Message headers

271

272

Raises:

273

SerializationError: If serialization fails

274

BufferError: If local producer queue is full

275

KafkaException: For other produce errors

276

"""

277

278

def poll(self, timeout=-1):

279

"""

280

Poll for events and call registered callbacks.

281

282

Args:

283

timeout (float): Maximum time to wait in seconds (-1 for infinite)

284

285

Returns:

286

int: Number of events processed

287

"""

288

289

def flush(self, timeout=-1):

290

"""

291

Wait for all messages to be delivered.

292

293

Args:

294

timeout (float): Maximum time to wait in seconds (-1 for infinite)

295

296

Returns:

297

int: Number of messages still in queue (0 on success)

298

"""

299

300

def purge(self, in_queue=True, in_flight=True, blocking=True):

301

"""

302

Purge messages from internal queues.

303

304

Args:

305

in_queue (bool): Purge messages in local queue

306

in_flight (bool): Purge messages in flight to broker

307

blocking (bool): Block until purge is complete

308

309

Returns:

310

int: Number of messages purged

311

"""

312

313

def abort_transaction(self, timeout=-1):

314

"""Abort ongoing transaction."""

315

316

def begin_transaction(self):

317

"""Begin a new transaction."""

318

319

def commit_transaction(self, timeout=-1):

320

"""Commit current transaction."""

321

322

def init_transactions(self, timeout=-1):

323

"""Initialize transactions for this producer."""

324

```

325

326

#### DeserializingConsumer

327

328

High-level consumer with pluggable deserialization.

329

330

```python { .api }

331

class DeserializingConsumer:

332

def __init__(self, conf):

333

"""

334

Create DeserializingConsumer.

335

336

Args:

337

conf (dict): Configuration including 'key.deserializer' and 'value.deserializer'

338

"""

339

340

def subscribe(self, topics, listener=None):

341

"""

342

Subscribe to list of topics for automatic partition assignment.

343

344

Args:

345

topics (list): List of topic names to subscribe to

346

listener (RebalanceCallback, optional): Rebalance callback

347

"""

348

349

def poll(self, timeout=-1):

350

"""

351

Poll for messages with automatic deserialization.

352

353

Args:

354

timeout (float): Maximum time to wait in seconds (-1 for infinite)

355

356

Returns:

357

Message: Message with deserialized key/value or None if timeout

358

359

Note:

360

If deserialization fails, poll raises KeyDeserializationError or

361

ValueDeserializationError (both are subclasses of ConsumeError).

362

"""

363

364

def consume(self, num_messages=1, timeout=-1):

365

"""

366

Consume multiple messages (not implemented).

367

368

Raises:

369

NotImplementedError: This method is not supported

370

"""

371

372

def assign(self, partitions):

373

"""

374

Manually assign partitions to consume from.

375

376

Args:

377

partitions (list): List of TopicPartition objects

378

"""

379

380

def assignment(self):

381

"""

382

Get current partition assignment.

383

384

Returns:

385

list: List of assigned TopicPartition objects

386

"""

387

388

def unassign(self):

389

"""Remove current partition assignment."""

390

391

def commit(self, message=None, offsets=None, asynchronous=True):

392

"""

393

Commit message offset or specified offsets.

394

395

Args:

396

message (Message, optional): Commit offset for this message

397

offsets (list, optional): List of TopicPartition objects with offsets

398

asynchronous (bool): Commit asynchronously if True

399

400

Returns:

401

list: Committed offsets if synchronous, None if asynchronous

402

"""

403

404

def committed(self, partitions, timeout=-1):

405

"""

406

Get committed offsets for partitions.

407

408

Args:

409

partitions (list): List of TopicPartition objects

410

timeout (float): Maximum time to wait in seconds

411

412

Returns:

413

list: List of TopicPartition objects with committed offsets

414

"""

415

416

def position(self, partitions):

417

"""

418

Get current position (next fetch offset) for partitions.

419

420

Args:

421

partitions (list): List of TopicPartition objects

422

423

Returns:

424

list: List of TopicPartition objects with current positions

425

"""

426

427

def seek(self, partition):

428

"""

429

Seek to offset for partition.

430

431

Args:

432

partition (TopicPartition): Partition with offset to seek to

433

"""

434

435

def pause(self, partitions):

436

"""

437

Pause consumption for partitions.

438

439

Args:

440

partitions (list): List of TopicPartition objects to pause

441

"""

442

443

def resume(self, partitions):

444

"""

445

Resume consumption for partitions.

446

447

Args:

448

partitions (list): List of TopicPartition objects to resume

449

"""

450

451

def close(self):

452

"""Close the consumer and leave consumer group."""

453

454

def store_offsets(self, message=None, offsets=None):

455

"""

456

Store offset for message or specified offsets.

457

458

Args:

459

message (Message, optional): Store offset for this message

460

offsets (list, optional): List of TopicPartition objects with offsets

461

"""

462

```

463

464

### Error Classes

465

466

```python { .api }

467

class SerializationError(Exception):

468

"""Base class for serialization errors."""

469

470

def __init__(self, message, inner_exception=None):

471

"""

472

Create SerializationError.

473

474

Args:

475

message (str): Error message

476

inner_exception (Exception, optional): Underlying exception

477

"""

478

479

@property

480

def inner_exception(self):

481

"""Underlying exception that caused the serialization error."""

482

```

483

484

### Usage Examples

485

486

#### Custom Serializer/Deserializer

487

488

```python

489

from confluent_kafka.serialization import (
Serializer, Deserializer, SerializationError,
StringSerializer, StringDeserializer
)

490

import json

491

492

class JSONSerializer(Serializer):

493

"""Custom JSON serializer."""

494

495

def __call__(self, obj, ctx=None):

496

if obj is None:

497

return None

498

try:

499

return json.dumps(obj).encode('utf-8')

500

except Exception as e:

501

raise SerializationError(f"Failed to serialize to JSON: {e}")

502

503

class JSONDeserializer(Deserializer):

504

"""Custom JSON deserializer."""

505

506

def __call__(self, value, ctx=None):

507

if value is None:

508

return None

509

try:

510

return json.loads(value.decode('utf-8'))

511

except Exception as e:

512

raise SerializationError(f"Failed to deserialize JSON: {e}")

513

514

# Use custom serializers

515

from confluent_kafka import SerializingProducer, DeserializingConsumer

516

517

producer_conf = {

518

'bootstrap.servers': 'localhost:9092',

519

'key.serializer': StringSerializer('utf_8'),

520

'value.serializer': JSONSerializer()

521

}

522

523

consumer_conf = {

524

'bootstrap.servers': 'localhost:9092',

525

'key.deserializer': StringDeserializer('utf_8'),

526

'value.deserializer': JSONDeserializer(),

527

'group.id': 'json-group',

528

'auto.offset.reset': 'earliest'

529

}

530

531

producer = SerializingProducer(producer_conf)

532

consumer = DeserializingConsumer(consumer_conf)

533

```

534

535

#### Using Built-in Serializers

536

537

```python

538

from confluent_kafka import SerializingProducer, DeserializingConsumer

539

from confluent_kafka.serialization import (

540

StringSerializer, StringDeserializer,

541

IntegerSerializer, IntegerDeserializer,

542

DoubleSerializer, DoubleDeserializer

543

)

544

545

# Producer with different serializers for key and value

546

producer_conf = {

547

'bootstrap.servers': 'localhost:9092',

548

'key.serializer': StringSerializer('utf_8'),

549

'value.serializer': IntegerSerializer()

550

}

551

552

producer = SerializingProducer(producer_conf)

553

554

# Produce messages with automatic serialization

555

for i in range(10):

556

producer.produce(

557

'numbers-topic',

558

key=f'key-{i}', # String key

559

value=i * 100 # Integer value

560

)

561

562

producer.flush()

563

564

# Consumer with corresponding deserializers

565

consumer_conf = {

566

'bootstrap.servers': 'localhost:9092',

567

'key.deserializer': StringDeserializer('utf_8'),

568

'value.deserializer': IntegerDeserializer(),

569

'group.id': 'numbers-group',

570

'auto.offset.reset': 'earliest'

571

}

572

573

consumer = DeserializingConsumer(consumer_conf)

574

consumer.subscribe(['numbers-topic'])

575

576

try:

577

while True:

578

msg = consumer.poll(1.0)

579

if msg is None:

580

continue

581

582

if msg.error():

583

print(f"Consumer error: {msg.error()}")

584

continue

585

586

# Key and value are automatically deserialized

587

print(f"Key: {msg.key()} (type: {type(msg.key())})")

588

print(f"Value: {msg.value()} (type: {type(msg.value())})")

589

590

finally:

591

consumer.close()

592

```

593

594

#### Serialization Context Usage

595

596

```python

597

import json

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import Serializer, SerializationContext, MessageField

598

599

class ContextAwareSerializer(Serializer):

600

"""Serializer that uses serialization context."""

601

602

def __call__(self, obj, ctx=None):

603

if obj is None:

604

return None

605

606

# Use context information

607

if ctx is not None:

608

print(f"Serializing for topic: {ctx.topic}")

609

print(f"Field: {ctx.field}")

610

if ctx.headers:

611

print(f"Headers: {ctx.headers}")

612

613

# Different serialization based on field

614

if ctx and ctx.field == MessageField.KEY:

615

# Keys serialized as uppercase strings

616

return str(obj).upper().encode('utf-8')

617

else:

618

# Values serialized as JSON

619

return json.dumps(obj).encode('utf-8')

620

621

# The SerializingProducer automatically creates SerializationContext

622

# and passes it to serializers

623

producer_conf = {

624

'bootstrap.servers': 'localhost:9092',

625

'key.serializer': ContextAwareSerializer(),

626

'value.serializer': ContextAwareSerializer()

627

}

628

629

producer = SerializingProducer(producer_conf)

630

producer.produce('my-topic', key='mykey', value={'data': 'value'})

631

```

632

633

#### Error Handling in Serialization

634

635

```python

636

from confluent_kafka import SerializingProducer

637

from confluent_kafka.serialization import Serializer, StringSerializer, SerializationError

638

from confluent_kafka.error import ProduceError, KeySerializationError, ValueSerializationError

639

640

class StrictIntegerSerializer(Serializer):

641

"""Integer serializer that raises errors for non-integers."""

642

643

def __call__(self, obj, ctx=None):

644

if obj is None:

645

return None

646

if not isinstance(obj, int):

647

raise SerializationError(f"Expected int, got {type(obj)}")

648

return obj.to_bytes(4, 'big', signed=True)

649

650

producer_conf = {

651

'bootstrap.servers': 'localhost:9092',

652

'key.serializer': StringSerializer('utf_8'),

653

'value.serializer': StrictIntegerSerializer()

654

}

655

656

producer = SerializingProducer(producer_conf)

657

658

def delivery_callback(err, msg):

659

if err is not None:

660

if isinstance(err, ValueSerializationError):

661

print(f"Value serialization failed: {err}")

662

elif isinstance(err, KeySerializationError):

663

print(f"Key serialization failed: {err}")

664

else:

665

print(f"Other error: {err}")

666

else:

667

print(f"Message delivered: {msg.topic()} [{msg.partition()}]")

668

669

try:

670

# This will succeed

671

producer.produce('numbers', key='valid', value=42, on_delivery=delivery_callback)

672

673

# This will fail due to serialization error

674

producer.produce('numbers', key='invalid', value='not-a-number', on_delivery=delivery_callback)

675

676

except Exception as e:

677

print(f"Immediate error: {e}")

678

679

producer.poll(0) # Process delivery callbacks

680

producer.flush()

681

```

682

683

#### Advanced Configuration

684

685

```python

686

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

687

688

# Producer with serializer-specific configuration

689

producer_conf = {

690

'bootstrap.servers': 'localhost:9092',

691

'key.serializer': StringSerializer('utf_8'),

692

'value.serializer': StringSerializer('ascii'), # Different encoding

693

694

# Standard producer settings

695

'acks': 'all',

696

'retries': 3,

697

'batch.size': 16384,

698

'linger.ms': 5

699

}

700

701

# Consumer with error handling configuration

702

consumer_conf = {

703

'bootstrap.servers': 'localhost:9092',

704

'key.deserializer': StringDeserializer('utf_8'),

705

'value.deserializer': StringDeserializer('utf_8'),

706

707

# Standard consumer settings

708

'group.id': 'my-group',

709

'auto.offset.reset': 'earliest',

710

'enable.auto.commit': False,

711

'max.poll.interval.ms': 300000

712

}

713

714

producer = SerializingProducer(producer_conf)

715

consumer = DeserializingConsumer(consumer_conf)

716

717

# Error handling in consumer

718

try:

719

while True:

720

msg = consumer.poll(1.0)

721

if msg is None:

722

continue

723

724

if msg.error():

725

print(f"Message error: {msg.error()}")

726

continue

727

728

# Process message

729

try:

730

key = msg.key()

731

value = msg.value()

732

733

# Process deserialized data

734

print(f"Processed: key={key}, value={value}")

735

736

# Manual commit after successful processing

737

consumer.commit(message=msg)

738

739

except Exception as e:

740

print(f"Processing error: {e}")

741

# Could skip this message or handle error differently

742

743

finally:

744

consumer.close()

745

```