tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
Kafka's serialization API provides pluggable serializers and deserializers for converting between typed objects and byte arrays. The Serdes factory class provides built-in implementations for common types.
The Serdes class provides factory methods for creating Serde<T> instances that combine serializers and deserializers.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;
// Factory methods for built-in types
// Signature-only summary of the Serdes factory class (method bodies are not shown here).
public class Serdes {
// Primitive types
// Each factory is named after the boxed type carried by the Serde it returns.
public static Serde<Long> Long();
public static Serde<Integer> Integer();
public static Serde<Short> Short();
public static Serde<Float> Float();
public static Serde<Double> Double();
public static Serde<Boolean> Boolean();
// String and byte types
public static Serde<String> String();
public static Serde<byte[]> ByteArray();
public static Serde<ByteBuffer> ByteBuffer();
public static Serde<Bytes> Bytes();
// Special types
public static Serde<UUID> UUID();
public static Serde<Void> Void();
// Complex types
// Builds a Serde<List<Inner>> from a List implementation class and a serde for the elements.
// L is the concrete List type named by listClass; Inner is the element type.
public static <L extends List<Inner>, Inner> Serde<List<Inner>> ListSerde(
Class<L> listClass,
Serde<Inner> innerSerde
);
// Custom serde creation
// Overload 1: obtains a Serde<T> from the value class alone.
public static <T> Serde<T> serdeFrom(Class<T> type);
// Overload 2: pairs an existing serializer and deserializer into a single Serde<T>.
public static <T> Serde<T> serdeFrom(
Serializer<T> serializer,
Deserializer<T> deserializer
);
}Using Built-in Serdes:
import org.apache.kafka.common.serialization.Serdes;
// For Kafka Streams
// The key serde (String) and value serde (Long) are passed explicitly through
// Consumed.with(...) and line up with the KStream<String, Long> type parameters.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(
"input-topic",
Consumed.with(Serdes.String(), Serdes.Long())
);Creating Custom Serde from Serializer/Deserializer:
import org.apache.kafka.common.serialization.Serdes;
// Combines a hand-written serializer/deserializer pair into one Serde using the
// two-argument Serdes.serdeFrom overload.
Serde<MyCustomType> customSerde = Serdes.serdeFrom(
new MyCustomSerializer(),
new MyCustomDeserializer()
);Creating Serde by Type:
// Automatically selects appropriate serde for the type
// The Class argument fixes T, so the results are typed Serde<String> and Serde<Long>.
Serde<String> stringSerde = Serdes.serdeFrom(String.class);
Serde<Long> longSerde = Serdes.serdeFrom(Long.class);All built-in serdes are inner classes of Serdes that can be instantiated directly or via factory methods.
// Signature-only listing of the primitive-type serdes. Each one declares a public
// no-argument constructor; constructor bodies are not shown.
// Serde for Long values (8 bytes, big-endian)
public static final class LongSerde implements Serde<Long> {
public LongSerde();
}
// Serde for Integer values (4 bytes, big-endian)
public static final class IntegerSerde implements Serde<Integer> {
public IntegerSerde();
}
// Serde for Short values (2 bytes, big-endian)
public static final class ShortSerde implements Serde<Short> {
public ShortSerde();
}
// Serde for Float values (4 bytes, IEEE 754 floating-point)
public static final class FloatSerde implements Serde<Float> {
public FloatSerde();
}
// Serde for Double values (8 bytes, IEEE 754 floating-point)
public static final class DoubleSerde implements Serde<Double> {
public DoubleSerde();
}
// Serde for Boolean values (1 byte: 0x00 = false, 0x01 = true)
public static final class BooleanSerde implements Serde<Boolean> {
public BooleanSerde();
}// Serde for String values (UTF-8 encoding)
// Signature-only listing of the String and byte-oriented serdes; each declares a
// public no-argument constructor.
public static final class StringSerde implements Serde<String> {
public StringSerde();
}
// Serde for byte arrays (pass-through, no transformation)
public static final class ByteArraySerde implements Serde<byte[]> {
public ByteArraySerde();
}
// Serde for ByteBuffer (converts to/from byte array)
public static final class ByteBufferSerde implements Serde<ByteBuffer> {
public ByteBufferSerde();
}
// Serde for Kafka's Bytes wrapper class
public static final class BytesSerde implements Serde<Bytes> {
public BytesSerde();
}// Serde for UUID values (converts to/from 36-character string)
// Signature-only listing of the "special type" serdes (UUID and Void); each declares
// a public no-argument constructor.
public static final class UUIDSerde implements Serde<UUID> {
public UUIDSerde();
}
// Serde for Void/null values (always serializes to null)
public static final class VoidSerde implements Serde<Void> {
public VoidSerde();
}The ListSerde serializes and deserializes lists of elements using an inner serde.
// Serde for List<Inner>; Inner is the element type handled by the inner serde.
// Signature-only listing: constructor bodies are not shown.
public static final class ListSerde<Inner> implements Serde<List<Inner>> {
// Default constructor (requires configuration)
public ListSerde();
// Constructor with list class and inner serde
// L is the concrete List implementation named by listClass.
public <L extends List<Inner>> ListSerde(
Class<L> listClass,
Serde<Inner> innerSerde
);
}Usage Example:
import org.apache.kafka.common.serialization.Serdes;
import java.util.ArrayList;
// Create a serde for List<String>
// ArrayList.class is passed as the list class and Serdes.String() as the element serde.
Serde<List<String>> listSerde = Serdes.ListSerde(
ArrayList.class,
Serdes.String()
);
// Use in Kafka Streams
// Keys use the String serde; values use the list serde built above.
KStream<String, List<String>> stream = builder.stream(
"input-topic",
Consumed.with(Serdes.String(), listSerde)
);All Serde implementations are backed by individual serializer and deserializer classes.
import org.apache.kafka.common.serialization.*;
// Signature-only listing of the primitive-type serializer/deserializer pairs;
// every class body is shown empty.
// Serialize/Deserialize Long (8 bytes, big-endian)
public class LongSerializer implements Serializer<Long> {}
public class LongDeserializer implements Deserializer<Long> {}
// Serialize/Deserialize Integer (4 bytes, big-endian)
public class IntegerSerializer implements Serializer<Integer> {}
public class IntegerDeserializer implements Deserializer<Integer> {}
// Serialize/Deserialize Short (2 bytes, big-endian)
public class ShortSerializer implements Serializer<Short> {}
public class ShortDeserializer implements Deserializer<Short> {}
// Serialize/Deserialize Float (4 bytes)
public class FloatSerializer implements Serializer<Float> {}
public class FloatDeserializer implements Deserializer<Float> {}
// Serialize/Deserialize Double (8 bytes)
public class DoubleSerializer implements Serializer<Double> {}
public class DoubleDeserializer implements Deserializer<Double> {}
// Serialize/Deserialize Boolean (1 byte)
public class BooleanSerializer implements Serializer<Boolean> {}
public class BooleanDeserializer implements Deserializer<Boolean> {}// Serialize/Deserialize String (UTF-8)
// Signature-only listing of the String and byte-oriented serializer/deserializer
// pairs; every class body is shown empty.
public class StringSerializer implements Serializer<String> {}
public class StringDeserializer implements Deserializer<String> {}
// Serialize/Deserialize byte[] (pass-through)
public class ByteArraySerializer implements Serializer<byte[]> {}
public class ByteArrayDeserializer implements Deserializer<byte[]> {}
// Serialize/Deserialize ByteBuffer
public class ByteBufferSerializer implements Serializer<ByteBuffer> {}
public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {}
// Serialize/Deserialize Bytes wrapper
public class BytesSerializer implements Serializer<Bytes> {}
public class BytesDeserializer implements Deserializer<Bytes> {}// Serialize/Deserialize UUID
// UUID serializer/deserializer pair (signature-only; bodies shown empty).
public class UUIDSerializer implements Serializer<UUID> {}
public class UUIDDeserializer implements Deserializer<UUID> {}
// Serialize/Deserialize Void (always null)
/**
 * Serializer for {@code Void}: there is never a payload to encode.
 */
public class VoidSerializer implements Serializer<Void> {

    /**
     * Ignores both arguments.
     *
     * @param topic topic the record is destined for (unused)
     * @param data  the value to encode (unused)
     * @return always {@code null}
     */
    @Override
    public byte[] serialize(final String topic, final Void data) {
        return null;
    }
}
/**
 * Deserializer for {@code Void}: the only acceptable payload is {@code null}.
 */
public class VoidDeserializer implements Deserializer<Void> {

    /**
     * Returns {@code null} for a null payload and rejects anything else.
     *
     * @param topic topic the record came from (unused)
     * @param data  raw record bytes; expected to be {@code null}
     * @return always {@code null}
     * @throws IllegalArgumentException if {@code data} is not {@code null}
     */
    @Override
    public Void deserialize(String topic, byte[] data) {
        // Kafka's VoidDeserializer fails fast on unexpected bytes instead of
        // silently discarding them; mirror that here.
        if (data != null) {
            throw new IllegalArgumentException("Data should be null for a VoidDeserializer.");
        }
        return null;
    }
}
// Serialize List<T> using inner serializer
// Signature-only listing; Inner is the element type of the serialized list.
public class ListSerializer<Inner> implements Serializer<List<Inner>> {
// No-argument constructor.
public ListSerializer();
// Constructor taking the serializer applied to each list element.
public ListSerializer(Serializer<Inner> innerSerializer);
}
// Deserialize List<T> using inner deserializer
// Signature-only listing; Inner is the element type of the deserialized list.
public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
// No-argument constructor.
public ListDeserializer();
// Constructor taking the concrete List class (L) and the per-element deserializer.
public <L extends List<Inner>> ListDeserializer(
Class<L> listClass,
Deserializer<Inner> innerDeserializer
);
}The Bytes class is a wrapper around byte arrays providing hashCode and equals implementations.
import org.apache.kafka.common.utils.Bytes;
// Signature-only summary of the Bytes wrapper; method bodies are not shown.
public class Bytes implements Comparable<Bytes> {
// Constructor
public Bytes(byte[] bytes);
// Factory methods
// Static alternative to the constructor; takes the same byte[] argument.
public static Bytes wrap(byte[] bytes);
// Access methods
// Returns a byte[]; presumably the wrapped array - confirm against the Kafka Javadoc.
public byte[] get();
// Comparison
public int compareTo(Bytes that);
// Standard methods
public boolean equals(Object o);
public int hashCode();
}Usage:
import org.apache.kafka.common.utils.Bytes;
// Wrap byte array
// Encode with an explicit charset: the no-argument getBytes() depends on the
// platform default charset, which was not guaranteed to be UTF-8 before Java 18.
byte[] data = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
Bytes bytes = Bytes.wrap(data);
// Use with BytesSerde
Serde<Bytes> bytesSerde = Serdes.Bytes();

Serializers and deserializers can be configured via the configure() method:
import org.apache.kafka.common.serialization.*;
import java.util.Map;
import java.util.HashMap;
// Build the configuration map for a StringSerializer; the encoding is supplied
// under the "key.serializer.encoding" entry.
Serializer<String> serializer = new StringSerializer();
Map<String, Object> configs = new HashMap<>();
configs.put("key.serializer.encoding", "UTF-8");
serializer.configure(configs, true); // true = key serializer

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;
// Producer configuration: the serializer classes chosen below (String keys,
// Long values) line up with the Producer<String, Long> type parameters.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Specify serializers by class name
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
LongSerializer.class.getName());
Producer<String, Long> producer = new KafkaProducer<>(props);import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;
// Consumer configuration: the deserializer classes chosen below (String keys,
// Long values) line up with the Consumer<String, Long> type parameters.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Specify deserializers by class name
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
Consumer<String, Long> consumer = new KafkaConsumer<>(props);import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;
// Kafka Streams configuration showing two ways to supply serdes: as
// application-wide defaults in the properties, or explicitly per operation.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Set default serdes
// The Class object of a factory-created serde instance is used as the config value.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
// Or specify per-operation
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(
"input-topic",
Consumed.with(Serdes.String(), Serdes.Long())
);
// The same serde pair is passed again on the write side through Produced.with(...).
stream.to("output-topic",
Produced.with(Serdes.String(), Serdes.Long()));All built-in serializers handle null values:
- Serializers return null when given null input
- Deserializers return null when given null input

StringSerializer serializer = new StringSerializer();
// Serializer side: null data is passed as the second argument.
byte[] result = serializer.serialize("topic", null);
// result is null
// Deserializer side: null bytes are passed as the second argument.
StringDeserializer deserializer = new StringDeserializer();
String value = deserializer.deserialize("topic", null);
// value is null

To create custom serializers, implement the Serializer<T> and Deserializer<T> interfaces:
import org.apache.kafka.common.serialization.*;
import java.util.Map;
/**
 * Template for a custom {@code Serializer} of {@code MyObject}.
 */
public class MyObjectSerializer implements Serializer<MyObject> {

    /** Hook for reading serializer settings; this template reads none. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configure serializer
    }

    /**
     * Encodes {@code data} through {@code toBytes}.
     *
     * @param topic topic the record is destined for (unused here)
     * @param data  the object to encode; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, MyObject data) {
        // Serialize MyObject to byte array; null passes straight through
        return data == null ? null : toBytes(data);
    }

    /** Hook for releasing resources; this template holds none. */
    @Override
    public void close() {
        // Cleanup resources
    }
}
/**
 * Template for a custom {@code Deserializer} of {@code MyObject}.
 */
public class MyObjectDeserializer implements Deserializer<MyObject> {

    /** Hook for reading deserializer settings; this template reads none. */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configure deserializer
    }

    /**
     * Decodes {@code data} through {@code fromBytes}.
     *
     * @param topic topic the record came from (unused here)
     * @param data  raw record bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        // Deserialize byte array to MyObject; null passes straight through
        return data == null ? null : fromBytes(data);
    }

    /** Hook for releasing resources; this template holds none. */
    @Override
    public void close() {
        // Cleanup resources
    }
}
// Create Serde from custom serializer/deserializer
// Uses the two-argument Serdes.serdeFrom overload with fresh instances of both classes.
Serde<MyObject> myObjectSerde = Serdes.serdeFrom(
new MyObjectSerializer(),
new MyObjectDeserializer()
);import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.*;
import java.util.Map;
/**
 * Example serializer that validates its configuration, its input and its output,
 * and reports every failure as a {@code SerializationException}.
 */
public class RobustSerializer implements Serializer<MyObject> {

    /** Upper bound on the serialized payload size, in bytes (the "1MB limit"). */
    private static final int MAX_SERIALIZED_BYTES = 1_000_000;

    /**
     * Validates the optional {@code serializer.encoding} setting.
     *
     * @throws SerializationException if an encoding other than UTF-8 is configured
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Validate configuration
        Object encoding = configs.get("serializer.encoding");
        if (encoding != null && !encoding.equals("UTF-8")) {
            throw new SerializationException(
                "Only UTF-8 encoding supported, got: " + encoding);
        }
    }

    /**
     * Serializes {@code data} after checking that it is valid.
     *
     * @param topic topic the record is destined for (unused here)
     * @param data  the object to encode; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the object is invalid, the encoded form
     *         exceeds the size limit, or encoding fails for any other reason
     */
    @Override
    public byte[] serialize(String topic, MyObject data) {
        if (data == null) {
            return null; // Always handle null
        }
        try {
            // Validate data before serialization
            if (!data.isValid()) {
                throw new SerializationException(
                    "Invalid object state: " + data);
            }
            // Perform serialization
            byte[] bytes = convertToBytes(data);
            // Validate output
            if (bytes.length > MAX_SERIALIZED_BYTES) {
                throw new SerializationException(
                    "Serialized data too large: " + bytes.length + " bytes");
            }
            return bytes;
        } catch (SerializationException e) {
            // Raised by the validation above and already carries a precise message;
            // rethrow unchanged instead of burying it under the generic wrapper below.
            throw e;
        } catch (Exception e) {
            throw new SerializationException(
                "Failed to serialize object: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
        // Cleanup resources
    }

    /** Placeholder encoder; currently returns an empty array. */
    private byte[] convertToBytes(MyObject data) {
        // Implementation
        return new byte[0];
    }
}
/**
 * Example deserializer that validates its input and its result, and reports every
 * failure as a {@code SerializationException}.
 */
public class RobustDeserializer implements Deserializer<MyObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration
    }

    /**
     * Deserializes {@code data} and checks the resulting object.
     *
     * @param topic topic the record came from (unused here)
     * @param data  raw record bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the payload is empty, decoding yields
     *         {@code null} or an invalid object, or decoding fails for any other reason
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Validate input
            if (data.length == 0) {
                throw new SerializationException("Empty data");
            }
            // Deserialize
            MyObject obj = convertFromBytes(data);
            // Validate result
            if (obj == null) {
                throw new SerializationException("Deserialization produced null");
            }
            if (!obj.isValid()) {
                throw new SerializationException("Deserialized object is invalid");
            }
            return obj;
        } catch (SerializationException e) {
            // Raised by the validation above and already carries a precise message;
            // rethrow unchanged instead of burying it under the generic wrapper below.
            throw e;
        } catch (Exception e) {
            throw new SerializationException(
                "Failed to deserialize: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
        // Cleanup
    }

    /** Placeholder decoder; currently returns {@code null}. */
    private MyObject convertFromBytes(byte[] data) {
        // Implementation
        return null;
    }
}
// Versioned serialization for schema evolution
import java.io.*;
/**
 * Serializer that prefixes every payload with a one-byte format version so the
 * reader can pick the matching decoding routine.
 */
public class VersionedSerializer implements Serializer<MyObject> {

    private static final byte VERSION_1 = 1;
    private static final byte CURRENT_VERSION = VERSION_1;

    /**
     * Writes the version byte followed by the version-specific body.
     *
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if writing fails or the current version is unknown
     */
    @Override
    public byte[] serialize(String topic, MyObject data) {
        if (data == null) {
            return null;
        }

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            // The version byte always comes first.
            out.writeByte(CURRENT_VERSION);

            // Body layout depends on the version being written.
            if (CURRENT_VERSION == VERSION_1) {
                writeV1(out, data);
            } else {
                throw new SerializationException("Unknown version: " + CURRENT_VERSION);
            }
        } catch (IOException e) {
            throw new SerializationException("Serialization failed", e);
        }
        return buffer.toByteArray();
    }

    /** Version 1 body: the name as modified UTF-8, then the id as a long. */
    private void writeV1(DataOutputStream out, MyObject data) throws IOException {
        out.writeUTF(data.getName());
        out.writeLong(data.getId());
    }
}
/**
 * Deserializer for payloads that start with a one-byte format version.
 */
public class VersionedDeserializer implements Deserializer<MyObject> {

    /**
     * Reads the leading version byte and decodes the rest accordingly.
     *
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the version is not recognised or reading fails
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }

        try (ByteArrayInputStream source = new ByteArrayInputStream(data);
             DataInputStream in = new DataInputStream(source)) {
            final byte version = in.readByte();
            // Only version 1 is understood here.
            if (version != 1) {
                throw new SerializationException("Unknown version: " + version);
            }
            return readV1(in);
        } catch (IOException e) {
            throw new SerializationException("Deserialization failed", e);
        }
    }

    /** Version 1 body: the name as modified UTF-8, then the id as a long. */
    private MyObject readV1(DataInputStream in) throws IOException {
        // Arguments are evaluated left to right, so the name is read before the id.
        return new MyObject(in.readUTF(), in.readLong());
    }
}
// Deserializer with corruption detection
/**
 * Deserializer that expects a 4-byte checksum in front of the payload and rejects
 * records whose payload does not match it.
 */
public class ChecksumDeserializer implements Deserializer<MyObject> {

    /**
     * Verifies the leading checksum, then decodes the remaining bytes.
     *
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the record is shorter than the checksum,
     *         the checksum does not match, or reading fails
     */
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 4) {
            throw new SerializationException("Data too short for checksum");
        }

        try (ByteArrayInputStream source = new ByteArrayInputStream(data);
             DataInputStream in = new DataInputStream(source)) {
            // The first four bytes hold the checksum, read via DataInputStream.readInt().
            final int expected = in.readInt();

            // Everything after the checksum is the payload.
            final byte[] body = new byte[data.length - 4];
            in.readFully(body);

            final int actual = calculateChecksum(body);
            if (actual == expected) {
                return deserializePayload(body);
            }
            throw new SerializationException(
                "Checksum mismatch - data corrupted. Expected: " +
                expected + ", got: " + actual);
        } catch (IOException e) {
            throw new SerializationException("Deserialization failed", e);
        }
    }

    /** Sums the (signed) byte values. Simple checksum (use CRC32 in production). */
    private int calculateChecksum(byte[] data) {
        int sum = 0;
        for (int i = 0; i < data.length; i++) {
            sum += data[i];
        }
        return sum;
    }

    /** Placeholder payload decoder; currently returns {@code null}. */
    private MyObject deserializePayload(byte[] payload) {
        // Implementation
        return null;
    }
}
// Handling large objects that may exceed Kafka limits
/**
 * Serializer that refuses to emit payloads above a fixed size instead of letting
 * an oversized record reach the producer.
 */
public class ChunkedSerializer implements Serializer<LargeObject> {

    private static final int MAX_CHUNK_SIZE = 900_000; // 900KB

    /**
     * Encodes {@code data} and enforces the size ceiling.
     *
     * @return the encoded bytes, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the encoded form exceeds {@code MAX_CHUNK_SIZE}
     */
    @Override
    public byte[] serialize(String topic, LargeObject data) {
        if (data == null) {
            return null;
        }

        final byte[] encoded = convertToBytes(data);
        if (encoded.length <= MAX_CHUNK_SIZE) {
            return encoded;
        }

        // Object too large for single message
        throw new SerializationException(
            "Object too large: " + encoded.length + " bytes. " +
            "Consider chunking or using external storage with reference.");
    }

    /** Placeholder encoder; currently returns an empty array. */
    private byte[] convertToBytes(LargeObject data) {
        // Implementation
        return new byte[0];
    }
}
// Alternative: Store large objects externally, serialize reference
/**
 * Serializer that stores the object in external storage and writes only a small
 * reference (object id plus size) to Kafka.
 */
public class ReferenceSerializer implements Serializer<LargeObject> {

    /** Config key holding the external storage location. */
    private static final String STORAGE_URL_CONFIG = "external.storage.url";

    // NOTE(review): no close() override releases this; confirm whether
    // ExternalStorage holds resources that need closing.
    private ExternalStorage storage;

    /**
     * Creates the storage client from the {@code external.storage.url} setting.
     *
     * @throws SerializationException if the setting is missing, empty, or not a String
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Validate before use: a blind (String) cast would either fail with a
        // ClassCastException or hand a null URL to ExternalStorage.
        Object storageUrl = configs.get(STORAGE_URL_CONFIG);
        if (!(storageUrl instanceof String) || ((String) storageUrl).isEmpty()) {
            throw new SerializationException(
                "Missing or invalid '" + STORAGE_URL_CONFIG + "' setting: " + storageUrl);
        }
        this.storage = new ExternalStorage((String) storageUrl);
    }

    /**
     * Stores {@code data} externally and serializes a reference to it.
     *
     * @return the encoded reference, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if {@code configure} has not been called
     */
    @Override
    public byte[] serialize(String topic, LargeObject data) {
        if (data == null) {
            return null;
        }
        if (storage == null) {
            // Fail with a clear message rather than a NullPointerException.
            throw new SerializationException(
                "ReferenceSerializer used before configure(): '" + STORAGE_URL_CONFIG + "' not set");
        }
        // Store object externally
        String objectId = storage.store(data);
        // Serialize reference
        ObjectReference ref = new ObjectReference(objectId, data.getSize());
        return serializeReference(ref);
    }

    /** Placeholder reference encoder; currently returns an empty array. */
    private byte[] serializeReference(ObjectReference ref) {
        // Serialize small reference object
        return new byte[0];
    }
}
// Benchmark different serialization approaches
import java.util.*;
/**
 * Rough timing comparison of several serializer implementations.
 *
 * <p>This is a quick sanity check, not a rigorous benchmark; use a harness such
 * as JMH for numbers that need to be trusted.
 */
public class SerializationBenchmark {

    /** Untimed calls made first so class loading and JIT compilation are not measured. */
    private static final int WARMUP_ITERATIONS = 1_000;

    /** Sink for serialized sizes so the JIT cannot discard the serialize() calls. */
    private volatile long blackhole;

    /** Times each serializer on the same test object and prints the results. */
    public void compareSerialization() {
        MyObject testObject = createTestObject();
        int iterations = 10000;
        // Test Java serialization
        long javaTime = benchmarkSerializer(
            new JavaSerializer(), testObject, iterations);
        // Test JSON serialization
        long jsonTime = benchmarkSerializer(
            new JsonSerializer(), testObject, iterations);
        // Test Avro serialization
        long avroTime = benchmarkSerializer(
            new AvroSerializer(), testObject, iterations);
        // Test Protocol Buffers
        long protobufTime = benchmarkSerializer(
            new ProtobufSerializer(), testObject, iterations);
        System.out.println("Serialization performance (10k iterations):");
        System.out.println(" Java: " + javaTime + "ms");
        System.out.println(" JSON: " + jsonTime + "ms");
        System.out.println(" Avro: " + avroTime + "ms");
        System.out.println(" Protobuf: " + protobufTime + "ms");
        // Typical results:
        // - Java: Slowest, largest size
        // - JSON: Slow, human-readable
        // - Avro: Fast, compact, schema evolution support
        // - Protobuf: Fastest, most compact
    }

    /**
     * Measures how long {@code iterations} calls to {@code serialize} take.
     *
     * @param serializer serializer under test
     * @param object     object serialized on every call
     * @param iterations number of timed calls
     * @return elapsed time of the timed calls, in milliseconds
     */
    private long benchmarkSerializer(Serializer<MyObject> serializer,
                                     MyObject object,
                                     int iterations) {
        long sink = 0;

        // Warm-up: otherwise whichever serializer runs first pays for class
        // loading and interpretation inside its measurement.
        for (int i = 0; i < WARMUP_ITERATIONS; i++) {
            sink += lengthOf(serializer.serialize("test-topic", object));
        }

        // nanoTime is monotonic; currentTimeMillis is wall-clock time, can jump,
        // and is too coarse for short runs.
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            sink += lengthOf(serializer.serialize("test-topic", object));
        }
        long elapsedNanos = System.nanoTime() - start;

        blackhole = sink;
        return elapsedNanos / 1_000_000L;
    }

    /** Length of a serialized payload, treating {@code null} as empty. */
    private static int lengthOf(byte[] bytes) {
        return bytes == null ? 0 : bytes.length;
    }

    private MyObject createTestObject() {
        return new MyObject("test", 123L);
    }
}
// Decision guide for serialization format
/**
 * Encodes the format decision guide as code: requirements are checked in a fixed
 * priority order and the first match decides the serializer.
 */
public class SerializationFormatGuide {

    /**
     * Picks a serializer for the given requirements.
     *
     * @param requirements the needs of the data being serialized
     * @return the serializer for the first matching requirement; Avro when none match
     */
    public Serializer<?> chooseSerializer(Requirements requirements) {
        final Serializer<?> choice;

        if (requirements.isSimpleType()) {
            // For simple types (String, Long, etc.)
            choice = Serdes.String().serializer();
        } else if (requirements.needsHumanReadable()) {
            // For human-readable data (debugging, logging)
            choice = new JsonSerializer();
        } else if (requirements.needsSchemaEvolution()) {
            // For schema evolution support
            // (ProtobufSerializer would be an alternative here)
            choice = new AvroSerializer();
        } else if (requirements.needsMaxPerformance()) {
            // For maximum performance
            choice = new ProtobufSerializer();
        } else if (requirements.needsExternalCompatibility()) {
            // For compatibility with external systems
            choice = new JsonSerializer(); // Most compatible
        } else {
            // Default: Avro (good balance)
            choice = new AvroSerializer();
        }

        return choice;
    }
}
// Format comparison:
//
// String/Primitive Serdes:
// - Use when: Simple data types
// - Pros: Fast, built-in, no dependencies
// - Cons: No structure, no evolution
//
// JSON:
// - Use when: Human-readable, debugging, external systems
// - Pros: Human-readable, widely supported
// - Cons: Slow, large size, no schema enforcement
//
// Avro:
// - Use when: Schema evolution, Kafka ecosystem
// - Pros: Compact, schema evolution, Kafka integration
// - Cons: Requires schema registry, learning curve
//
// Protocol Buffers:
// - Use when: Maximum performance, cross-language
// - Pros: Fastest, most compact, strong typing
// - Cons: Requires .proto files, code generation