# Serialization

Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes. Provides the foundation for data transformation in Kafka producers and consumers.

## Capabilities

### Abstract Base Classes

Foundation classes for implementing custom serialization logic.

```python { .api }
class Serializer:
    """
    Abstract base class for key and value serializers.

    Serializers convert Python objects to bytes for transmission to Kafka.
    """

    def serialize(self, topic: str, value):
        """
        Serialize a value to bytes.

        Args:
            topic (str): Topic name (can be used for topic-specific serialization)
            value: Python object to serialize

        Returns:
            bytes: Serialized bytes, or None if value is None

        Raises:
            SerializationError: If serialization fails
        """

    def close(self):
        """
        Close the serializer and clean up resources.

        Called when the producer/consumer is closed.
        """

class Deserializer:
    """
    Abstract base class for key and value deserializers.

    Deserializers convert bytes received from Kafka back to Python objects.
    """

    def deserialize(self, topic: str, bytes_: bytes):
        """
        Deserialize bytes to a Python object.

        Args:
            topic (str): Topic name (can be used for topic-specific deserialization)
            bytes_ (bytes): Bytes to deserialize

        Returns:
            object: Deserialized Python object, or None if bytes_ is None

        Raises:
            SerializationError: If deserialization fails
        """

    def close(self):
        """
        Close the deserializer and clean up resources.

        Called when the consumer is closed.
        """
```

### Serialization Errors

Exception class for serialization-related errors.

```python { .api }
class SerializationError(KafkaError):
    """Error occurred during serialization or deserialization."""
```

## Usage Examples

### String Serialization

```python
from kafka.serializer import Serializer, Deserializer

class StringSerializer(Serializer):
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, topic, value):
        if value is None:
            return None
        if isinstance(value, str):
            return value.encode(self.encoding)
        elif isinstance(value, bytes):
            return value
        else:
            return str(value).encode(self.encoding)

    def close(self):
        pass

class StringDeserializer(Deserializer):
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None
        return bytes_.decode(self.encoding)

    def close(self):
        pass

# Usage with producer/consumer
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=StringSerializer(),
    value_serializer=StringSerializer()
)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=StringDeserializer(),
    value_deserializer=StringDeserializer()
)
```

### JSON Serialization

```python
import json
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError

class JSONSerializer(Serializer):
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            return json.dumps(value).encode(self.encoding)
        except (TypeError, ValueError) as e:
            raise SerializationError(f"JSON serialization failed: {e}")

    def close(self):
        pass

class JSONDeserializer(Deserializer):
    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None
        try:
            return json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            raise SerializationError(f"JSON deserialization failed: {e}")

    def close(self):
        pass

# Usage
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=JSONSerializer()
)

# Send Python objects as JSON
producer.send('events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=JSONDeserializer()
)

for message in consumer:
    event = message.value  # Already deserialized to Python dict
    print(f"User {event['user_id']} performed {event['action']}")
```

### Avro Serialization

```python
import avro.schema
import avro.io
import io
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError

class AvroSerializer(Serializer):
    def __init__(self, schema_str):
        self.schema = avro.schema.parse(schema_str)

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            writer = avro.io.DatumWriter(self.schema)
            bytes_writer = io.BytesIO()
            encoder = avro.io.BinaryEncoder(bytes_writer)
            writer.write(value, encoder)
            return bytes_writer.getvalue()
        except Exception as e:
            raise SerializationError(f"Avro serialization failed: {e}")

    def close(self):
        pass

class AvroDeserializer(Deserializer):
    def __init__(self, schema_str):
        self.schema = avro.schema.parse(schema_str)

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None
        try:
            reader = avro.io.DatumReader(self.schema)
            bytes_reader = io.BytesIO(bytes_)
            decoder = avro.io.BinaryDecoder(bytes_reader)
            return reader.read(decoder)
        except Exception as e:
            raise SerializationError(f"Avro deserialization failed: {e}")

    def close(self):
        pass

# Schema definition
schema_str = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"}
    ]
}
"""

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=AvroSerializer(schema_str)
)

# Send Avro record
producer.send('users', {
    'id': 123,
    'name': 'Alice',
    'email': 'alice@example.com'
})
```

### Protobuf Serialization

```python
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
# Assuming you have generated protobuf classes

class ProtobufSerializer(Serializer):
    def __init__(self, protobuf_class):
        self.protobuf_class = protobuf_class

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            if isinstance(value, self.protobuf_class):
                return value.SerializeToString()
            else:
                # Convert dict to protobuf object
                pb_obj = self.protobuf_class()
                for field, val in value.items():
                    setattr(pb_obj, field, val)
                return pb_obj.SerializeToString()
        except Exception as e:
            raise SerializationError(f"Protobuf serialization failed: {e}")

    def close(self):
        pass

class ProtobufDeserializer(Deserializer):
    def __init__(self, protobuf_class):
        self.protobuf_class = protobuf_class

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None
        try:
            pb_obj = self.protobuf_class()
            pb_obj.ParseFromString(bytes_)
            return pb_obj
        except Exception as e:
            raise SerializationError(f"Protobuf deserialization failed: {e}")

    def close(self):
        pass
```

### Topic-Specific Serialization

```python
from kafka.serializer import Serializer, Deserializer
import json

class TopicAwareJSONSerializer(Serializer):
    def __init__(self):
        self.topic_schemas = {
            'user-events': ['user_id', 'action', 'timestamp'],
            'order-events': ['order_id', 'status', 'amount'],
        }

    def serialize(self, topic, value):
        if value is None:
            return None

        # Validate required fields for topic
        if topic in self.topic_schemas:
            required_fields = self.topic_schemas[topic]
            for field in required_fields:
                if field not in value:
                    raise SerializationError(f"Missing required field '{field}' for topic '{topic}'")

        return json.dumps(value).encode('utf-8')

    def close(self):
        pass

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=TopicAwareJSONSerializer()
)

# This will validate that required fields are present
producer.send('user-events', {
    'user_id': 123,
    'action': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})
```

### Compression-Aware Serialization

```python
import gzip
import json
from kafka.serializer import Serializer, Deserializer

class CompressedJSONSerializer(Serializer):
    def __init__(self, compression_threshold=1024):
        self.compression_threshold = compression_threshold

    def serialize(self, topic, value):
        if value is None:
            return None

        json_bytes = json.dumps(value).encode('utf-8')

        # Compress if data is large enough
        if len(json_bytes) > self.compression_threshold:
            # Add compression marker
            return b'\x01' + gzip.compress(json_bytes)
        else:
            # No compression marker
            return b'\x00' + json_bytes

    def close(self):
        pass

class CompressedJSONDeserializer(Deserializer):
    def deserialize(self, topic, bytes_):
        if bytes_ is None or len(bytes_) == 0:
            return None

        # Check compression marker
        if bytes_[0] == 1:  # Compressed
            json_bytes = gzip.decompress(bytes_[1:])
        else:  # Uncompressed
            json_bytes = bytes_[1:]

        return json.loads(json_bytes.decode('utf-8'))

    def close(self):
        pass
```

### Error Handling in Serializers

```python
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
import json
import logging

logger = logging.getLogger(__name__)

class RobustJSONSerializer(Serializer):
    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def serialize(self, topic, value):
        if value is None:
            return None

        try:
            return json.dumps(value, ensure_ascii=False).encode(self.encoding)
        except (TypeError, ValueError) as e:
            if self.strict:
                raise SerializationError(f"JSON serialization failed for topic '{topic}': {e}")
            else:
                # Fallback: serialize as string
                logger.warning(f"JSON serialization failed for topic '{topic}', falling back to string: {e}")
                return str(value).encode(self.encoding)

    def close(self):
        pass

class RobustJSONDeserializer(Deserializer):
    def __init__(self, encoding='utf-8', strict=True):
        self.encoding = encoding
        self.strict = strict

    def deserialize(self, topic, bytes_):
        if bytes_ is None:
            return None

        try:
            return json.loads(bytes_.decode(self.encoding))
        except (ValueError, UnicodeDecodeError) as e:
            if self.strict:
                raise SerializationError(f"JSON deserialization failed for topic '{topic}': {e}")
            else:
                # Fallback: return raw string
                logger.warning(f"JSON deserialization failed for topic '{topic}', returning raw string: {e}")
                return bytes_.decode(self.encoding, errors='replace')

    def close(self):
        pass
```

### Lambda Function Serializers

```python
from kafka import KafkaProducer, KafkaConsumer
import json
import pickle

# Simple lambda serializers
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,
    value_serializer=lambda v: json.dumps(v).encode('utf-8') if v is not None else None
)

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v is not None else None
)

# Pickle serialization for Python objects (use with caution)
pickle_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: pickle.dumps(v) if v is not None else None
)

pickle_consumer = KafkaConsumer(
    'pickle-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: pickle.loads(v) if v is not None else None
)
```