tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

docs/common/serialization.md

Serialization

Kafka's serialization API provides pluggable serializers and deserializers for converting between typed objects and byte arrays. The Serdes factory class provides built-in implementations for common types.
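
The API is built around three small interfaces. The sketch below is a condensed view (headers-aware overloads and other default methods omitted); see the Kafka javadoc for the full definitions.

import java.io.Closeable;
import java.util.Map;

// Converts a typed object to bytes for a given topic
public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {}
    byte[] serialize(String topic, T data);
    default void close() {}
}

// Converts bytes back to a typed object
public interface Deserializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {}
    T deserialize(String topic, byte[] data);
    default void close() {}
}

// A Serde pairs a serializer with its matching deserializer
public interface Serde<T> extends Closeable {
    Serializer<T> serializer();
    Deserializer<T> deserializer();
}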

Serdes Factory

The Serdes class provides factory methods for creating Serde<T> instances that combine serializers and deserializers.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;

// Factory methods for built-in types
public class Serdes {
    // Primitive types
    public static Serde<Long> Long();
    public static Serde<Integer> Integer();
    public static Serde<Short> Short();
    public static Serde<Float> Float();
    public static Serde<Double> Double();
    public static Serde<Boolean> Boolean();

    // String and byte types
    public static Serde<String> String();
    public static Serde<byte[]> ByteArray();
    public static Serde<ByteBuffer> ByteBuffer();
    public static Serde<Bytes> Bytes();

    // Special types
    public static Serde<UUID> UUID();
    public static Serde<Void> Void();

    // Complex types
    public static <L extends List<Inner>, Inner> Serde<List<Inner>> ListSerde(
        Class<L> listClass,
        Serde<Inner> innerSerde
    );

    // Custom serde creation
    public static <T> Serde<T> serdeFrom(Class<T> type);
    public static <T> Serde<T> serdeFrom(
        Serializer<T> serializer,
        Deserializer<T> deserializer
    );
}

Usage Examples

Using Built-in Serdes:

import org.apache.kafka.common.serialization.Serdes;

// For Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(
    "input-topic",
    Consumed.with(Serdes.String(), Serdes.Long())
);

Creating Custom Serde from Serializer/Deserializer:

import org.apache.kafka.common.serialization.Serdes;

Serde<MyCustomType> customSerde = Serdes.serdeFrom(
    new MyCustomSerializer(),
    new MyCustomDeserializer()
);

Creating Serde by Type:

// Automatically selects appropriate serde for the type
Serde<String> stringSerde = Serdes.serdeFrom(String.class);
Serde<Long> longSerde = Serdes.serdeFrom(Long.class);
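
Note that serdeFrom(Class) only recognizes the built-in types listed above; for any other class it fails (with an IllegalArgumentException in current versions), so custom types must use serdeFrom(serializer, deserializer) instead:

// serdeFrom(Class) cannot build serdes for arbitrary classes
try {
    Serde<MyCustomType> serde = Serdes.serdeFrom(MyCustomType.class);
} catch (IllegalArgumentException e) {
    // Unknown class -- fall back to serdeFrom(serializer, deserializer)
}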

Built-in Serde Implementations

All built-in serdes are static nested classes of Serdes that can be instantiated directly or created via the factory methods above.
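
Both forms are interchangeable; the nested class is also handy wherever configuration expects a Class literal. A short sketch, assuming props is an existing Kafka Streams Properties object:

// Equivalent ways to obtain a String serde
Serde<String> viaFactory = Serdes.String();
Serde<String> viaClass = new Serdes.StringSerde();

// The nested class can be passed where a Class is required
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);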

Primitive Type Serdes

// Serde for Long values (8 bytes, big-endian)
public static final class LongSerde implements Serde<Long> {
    public LongSerde();
}

// Serde for Integer values (4 bytes, big-endian)
public static final class IntegerSerde implements Serde<Integer> {
    public IntegerSerde();
}

// Serde for Short values (2 bytes, big-endian)
public static final class ShortSerde implements Serde<Short> {
    public ShortSerde();
}

// Serde for Float values (4 bytes, IEEE 754 floating-point)
public static final class FloatSerde implements Serde<Float> {
    public FloatSerde();
}

// Serde for Double values (8 bytes, IEEE 754 floating-point)
public static final class DoubleSerde implements Serde<Double> {
    public DoubleSerde();
}

// Serde for Boolean values (1 byte: 0x00 = false, 0x01 = true)
public static final class BooleanSerde implements Serde<Boolean> {
    public BooleanSerde();
}

String and Byte Serdes

// Serde for String values (UTF-8 encoding)
public static final class StringSerde implements Serde<String> {
    public StringSerde();
}

// Serde for byte arrays (pass-through, no transformation)
public static final class ByteArraySerde implements Serde<byte[]> {
    public ByteArraySerde();
}

// Serde for ByteBuffer (converts to/from byte array)
public static final class ByteBufferSerde implements Serde<ByteBuffer> {
    public ByteBufferSerde();
}

// Serde for Kafka's Bytes wrapper class
public static final class BytesSerde implements Serde<Bytes> {
    public BytesSerde();
}

Special Type Serdes

// Serde for UUID values (converts to/from 36-character string)
public static final class UUIDSerde implements Serde<UUID> {
    public UUIDSerde();
}

// Serde for Void/null values (always serializes to null)
public static final class VoidSerde implements Serde<Void> {
    public VoidSerde();
}

List Serde

The ListSerde serializes and deserializes lists of elements using an inner serde.

public static final class ListSerde<Inner> implements Serde<List<Inner>> {
    // Default constructor (requires configuration)
    public ListSerde();

    // Constructor with list class and inner serde
    public <L extends List<Inner>> ListSerde(
        Class<L> listClass,
        Serde<Inner> innerSerde
    );
}

Usage Example:

import org.apache.kafka.common.serialization.Serdes;
import java.util.ArrayList;

// Create a serde for List<String>
Serde<List<String>> listSerde = Serdes.ListSerde(
    ArrayList.class,
    Serdes.String()
);

// Use in Kafka Streams
KStream<String, List<String>> stream = builder.stream(
    "input-topic",
    Consumed.with(Serdes.String(), listSerde)
);

Individual Serializer/Deserializer Classes

All Serde implementations are backed by individual serializer and deserializer classes.
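
These classes can also be used directly, without going through a Serde; a minimal round trip with the Long pair looks like this:

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;

// Round trip a Long value through its 8-byte big-endian representation
LongSerializer serializer = new LongSerializer();
byte[] bytes = serializer.serialize("some-topic", 42L);

LongDeserializer deserializer = new LongDeserializer();
Long value = deserializer.deserialize("some-topic", bytes); // 42L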

Primitive Type Serializers

import org.apache.kafka.common.serialization.*;

// Serialize/Deserialize Long (8 bytes, big-endian)
public class LongSerializer implements Serializer<Long> {}
public class LongDeserializer implements Deserializer<Long> {}

// Serialize/Deserialize Integer (4 bytes, big-endian)
public class IntegerSerializer implements Serializer<Integer> {}
public class IntegerDeserializer implements Deserializer<Integer> {}

// Serialize/Deserialize Short (2 bytes, big-endian)
public class ShortSerializer implements Serializer<Short> {}
public class ShortDeserializer implements Deserializer<Short> {}

// Serialize/Deserialize Float (4 bytes)
public class FloatSerializer implements Serializer<Float> {}
public class FloatDeserializer implements Deserializer<Float> {}

// Serialize/Deserialize Double (8 bytes)
public class DoubleSerializer implements Serializer<Double> {}
public class DoubleDeserializer implements Deserializer<Double> {}

// Serialize/Deserialize Boolean (1 byte)
public class BooleanSerializer implements Serializer<Boolean> {}
public class BooleanDeserializer implements Deserializer<Boolean> {}

String and Byte Serializers

// Serialize/Deserialize String (UTF-8)
public class StringSerializer implements Serializer<String> {}
public class StringDeserializer implements Deserializer<String> {}

// Serialize/Deserialize byte[] (pass-through)
public class ByteArraySerializer implements Serializer<byte[]> {}
public class ByteArrayDeserializer implements Deserializer<byte[]> {}

// Serialize/Deserialize ByteBuffer
public class ByteBufferSerializer implements Serializer<ByteBuffer> {}
public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {}

// Serialize/Deserialize Bytes wrapper
public class BytesSerializer implements Serializer<Bytes> {}
public class BytesDeserializer implements Deserializer<Bytes> {}

Special Type Serializers

// Serialize/Deserialize UUID
public class UUIDSerializer implements Serializer<UUID> {}
public class UUIDDeserializer implements Deserializer<UUID> {}

// Serialize/Deserialize Void (always null)
public class VoidSerializer implements Serializer<Void> {
    @Override
    public byte[] serialize(String topic, Void data) {
        return null;
    }
}

public class VoidDeserializer implements Deserializer<Void> {
    @Override
    public Void deserialize(String topic, byte[] data) {
        return null;
    }
}

List Serializers

// Serialize List<T> using inner serializer
public class ListSerializer<Inner> implements Serializer<List<Inner>> {
    public ListSerializer();
    public ListSerializer(Serializer<Inner> innerSerializer);
}

// Deserialize List<T> using inner deserializer
public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
    public ListDeserializer();
    public <L extends List<Inner>> ListDeserializer(
        Class<L> listClass,
        Deserializer<Inner> innerDeserializer
    );
}

Bytes Wrapper Class

The Bytes class is a wrapper around byte arrays that provides proper equals, hashCode, and compareTo implementations, so byte arrays can be used as keys in hash-based and ordered collections.

import org.apache.kafka.common.utils.Bytes;

public class Bytes implements Comparable<Bytes> {
    // Constructor
    public Bytes(byte[] bytes);

    // Factory methods
    public static Bytes wrap(byte[] bytes);

    // Access methods
    public byte[] get();

    // Comparison
    public int compareTo(Bytes that);

    // Standard methods
    public boolean equals(Object o);
    public int hashCode();
}

Usage:

import org.apache.kafka.common.utils.Bytes;

// Wrap byte array
byte[] data = "hello".getBytes();
Bytes bytes = Bytes.wrap(data);

// Use with BytesSerde
Serde<Bytes> bytesSerde = Serdes.Bytes();

Configuration

Serializers and deserializers can be configured via the configure() method:

import org.apache.kafka.common.serialization.*;
import java.util.Map;
import java.util.HashMap;

Serializer<String> serializer = new StringSerializer();
Map<String, Object> configs = new HashMap<>();
configs.put("key.serializer.encoding", "UTF-8");
serializer.configure(configs, true); // true = key serializer
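
Deserializers are configured the same way; StringDeserializer, for example, reads its encoding from the corresponding deserializer config keys:

Deserializer<String> deserializer = new StringDeserializer();
Map<String, Object> deserConfigs = new HashMap<>();
deserConfigs.put("value.deserializer.encoding", "UTF-8");
deserializer.configure(deserConfigs, false); // false = value deserializer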

Common Usage Patterns

Producer Configuration

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Specify serializers by class name
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    LongSerializer.class.getName());

Producer<String, Long> producer = new KafkaProducer<>(props);
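
With the serializers configured, record keys and values must match those types; a minimal send:

// Key type String and value type Long match the configured serializers
producer.send(new ProducerRecord<>("my-topic", "user-42", 1234L));
producer.flush();
producer.close();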

Consumer Configuration

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// Specify deserializers by class name
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    LongDeserializer.class.getName());

Consumer<String, Long> consumer = new KafkaConsumer<>(props);
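
The configured deserializers determine the types returned by poll(); a minimal consumption loop:

import java.time.Duration;
import java.util.Collections;

consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, Long> record : records) {
    System.out.println(record.key() + " -> " + record.value());
}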

Kafka Streams Configuration

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Set default serdes
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
    Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
    Serdes.Long().getClass());

// Or specify per-operation
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = builder.stream(
    "input-topic",
    Consumed.with(Serdes.String(), Serdes.Long())
);

stream.to("output-topic",
    Produced.with(Serdes.String(), Serdes.Long()));

Null Handling

All built-in serializers and deserializers handle null values:

  • Serializers return null when given null input
  • Deserializers return null when given null input

StringSerializer serializer = new StringSerializer();
byte[] result = serializer.serialize("topic", null);
// result is null

StringDeserializer deserializer = new StringDeserializer();
String value = deserializer.deserialize("topic", null);
// value is null
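
Null values are more than an edge case: producing a record with a null value is the standard way to write a tombstone that marks a key for deletion on a log-compacted topic. Reusing the producer from the configuration example above:

// A null value acts as a delete marker (tombstone) on compacted topics
producer.send(new ProducerRecord<String, Long>("compacted-topic", "user-42", null));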

Custom Serialization

To create custom serializers, implement the Serializer<T> and Deserializer<T> interfaces:

import org.apache.kafka.common.serialization.*;
import java.util.Map;

public class MyObjectSerializer implements Serializer<MyObject> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configure serializer
    }

    @Override
    public byte[] serialize(String topic, MyObject data) {
        if (data == null) {
            return null;
        }
        // Serialize MyObject to byte array
        return toBytes(data);
    }

    @Override
    public void close() {
        // Cleanup resources
    }
}

public class MyObjectDeserializer implements Deserializer<MyObject> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configure deserializer
    }

    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Deserialize byte array to MyObject
        return fromBytes(data);
    }

    @Override
    public void close() {
        // Cleanup resources
    }
}

// Create Serde from custom serializer/deserializer
Serde<MyObject> myObjectSerde = Serdes.serdeFrom(
    new MyObjectSerializer(),
    new MyObjectDeserializer()
);
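
The resulting serde plugs in wherever a built-in serde does, for example in a Streams topology (reusing the StreamsBuilder from the earlier examples):

KStream<String, MyObject> objects = builder.stream(
    "objects-topic",
    Consumed.with(Serdes.String(), myObjectSerde)
);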

Serialization Best Practices

Error Handling in Custom Serializers

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.*;
import java.util.Map;

public class RobustSerializer implements Serializer<MyObject> {
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Validate configuration
        Object encoding = configs.get("serializer.encoding");
        if (encoding != null && !encoding.equals("UTF-8")) {
            throw new SerializationException(
                "Only UTF-8 encoding supported, got: " + encoding);
        }
    }
    
    @Override
    public byte[] serialize(String topic, MyObject data) {
        if (data == null) {
            return null; // Always handle null
        }
        
        try {
            // Validate data before serialization
            if (!data.isValid()) {
                throw new SerializationException(
                    "Invalid object state: " + data);
            }
            
            // Perform serialization
            byte[] bytes = convertToBytes(data);
            
            // Validate output
            if (bytes.length > 1_000_000) { // 1MB limit
                throw new SerializationException(
                    "Serialized data too large: " + bytes.length + " bytes");
            }
            
            return bytes;
            
        } catch (Exception e) {
            throw new SerializationException(
                "Failed to serialize object: " + e.getMessage(), e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup resources
    }
    
    private byte[] convertToBytes(MyObject data) {
        // Implementation
        return new byte[0];
    }
}

public class RobustDeserializer implements Deserializer<MyObject> {
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration
    }
    
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        
        try {
            // Validate input
            if (data.length == 0) {
                throw new SerializationException("Empty data");
            }
            
            // Deserialize
            MyObject obj = convertFromBytes(data);
            
            // Validate result
            if (obj == null) {
                throw new SerializationException("Deserialization produced null");
            }
            
            if (!obj.isValid()) {
                throw new SerializationException("Deserialized object is invalid");
            }
            
            return obj;
            
        } catch (Exception e) {
            throw new SerializationException(
                "Failed to deserialize: " + e.getMessage(), e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup
    }
    
    private MyObject convertFromBytes(byte[] data) {
        // Implementation
        return null;
    }
}

Schema Evolution Patterns

// Versioned serialization for schema evolution
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.*;
import java.io.*;

public class VersionedSerializer implements Serializer<MyObject> {
    private static final byte VERSION_1 = 1;
    private static final byte CURRENT_VERSION = VERSION_1;
    
    @Override
    public byte[] serialize(String topic, MyObject data) {
        if (data == null) {
            return null;
        }
        
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
            
            // Write version byte first
            dos.writeByte(CURRENT_VERSION);
            
            // Write data according to version
            switch (CURRENT_VERSION) {
                case VERSION_1:
                    serializeV1(dos, data);
                    break;
                default:
                    throw new SerializationException("Unknown version: " + CURRENT_VERSION);
            }
            
            return baos.toByteArray();
            
        } catch (IOException e) {
            throw new SerializationException("Serialization failed", e);
        }
    }
    
    private void serializeV1(DataOutputStream dos, MyObject data) throws IOException {
        dos.writeUTF(data.getName());
        dos.writeLong(data.getId());
    }
}

public class VersionedDeserializer implements Deserializer<MyObject> {
    
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        
        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
             DataInputStream dis = new DataInputStream(bais)) {
            
            // Read version byte
            byte version = dis.readByte();
            
            // Deserialize according to version
            switch (version) {
                case 1:
                    return deserializeV1(dis);
                default:
                    throw new SerializationException("Unknown version: " + version);
            }
            
        } catch (IOException e) {
            throw new SerializationException("Deserialization failed", e);
        }
    }
    
    private MyObject deserializeV1(DataInputStream dis) throws IOException {
        String name = dis.readUTF();
        long id = dis.readLong();
        return new MyObject(name, id);
    }
}
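
Adding a field later only needs a new version branch on both sides. As a hypothetical sketch, a V2 that appends an optional description field (assuming MyObject gains a matching getter and constructor) could look like:

// Hypothetical V2 layout: V1 fields plus a trailing description string
private void serializeV2(DataOutputStream dos, MyObject data) throws IOException {
    dos.writeUTF(data.getName());
    dos.writeLong(data.getId());
    dos.writeUTF(data.getDescription() == null ? "" : data.getDescription());
}

private MyObject deserializeV2(DataInputStream dis) throws IOException {
    String name = dis.readUTF();
    long id = dis.readLong();
    String description = dis.readUTF();
    return new MyObject(name, id, description); // assumes a three-argument constructor
}

// The deserializer keeps its "case 1" branch so old records remain readable;
// the serializer bumps CURRENT_VERSION to 2 once all consumers understand V2.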

Edge Cases

Handling Corrupt Data

// Deserializer with corruption detection
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.*;

public class ChecksumDeserializer implements Deserializer<MyObject> {
    
    @Override
    public MyObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        
        if (data.length < 4) {
            throw new SerializationException("Data too short for checksum");
        }
        
        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
             DataInputStream dis = new DataInputStream(bais)) {
            
            // Read checksum (first 4 bytes)
            int expectedChecksum = dis.readInt();
            
            // Read remaining data
            byte[] payload = new byte[data.length - 4];
            dis.readFully(payload);
            
            // Verify checksum
            int actualChecksum = calculateChecksum(payload);
            if (actualChecksum != expectedChecksum) {
                throw new SerializationException(
                    "Checksum mismatch - data corrupted. Expected: " + 
                    expectedChecksum + ", got: " + actualChecksum);
            }
            
            // Deserialize payload
            return deserializePayload(payload);
            
        } catch (IOException e) {
            throw new SerializationException("Deserialization failed", e);
        }
    }
    
    private int calculateChecksum(byte[] data) {
        // Simple checksum (use CRC32 in production)
        int checksum = 0;
        for (byte b : data) {
            checksum += b;
        }
        return checksum;
    }
    
    private MyObject deserializePayload(byte[] payload) {
        // Implementation
        return null;
    }
}
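
As the comment above notes, the additive checksum is only illustrative; a drop-in replacement of calculateChecksum using java.util.zip.CRC32 would look like this:

import java.util.zip.CRC32;

// CRC32-based checksum, truncated to an int to fit the 4-byte header
private int calculateChecksum(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data);
    return (int) crc.getValue();
}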

Large Object Serialization

// Guarding against objects that may exceed Kafka's message size limits.
// Note: despite its name, this serializer rejects oversized payloads rather than
// splitting them; true chunking would require producing multiple records.
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class ChunkedSerializer implements Serializer<LargeObject> {
    private static final int MAX_CHUNK_SIZE = 900_000; // 900KB, below the default ~1MB message limit
    
    @Override
    public byte[] serialize(String topic, LargeObject data) {
        if (data == null) {
            return null;
        }
        
        byte[] fullData = convertToBytes(data);
        
        if (fullData.length > MAX_CHUNK_SIZE) {
            // Object too large for single message
            throw new SerializationException(
                "Object too large: " + fullData.length + " bytes. " +
                "Consider chunking or using external storage with reference.");
        }
        
        return fullData;
    }
    
    private byte[] convertToBytes(LargeObject data) {
        // Implementation
        return new byte[0];
    }
}

// Alternative: store large objects externally and serialize only a reference (claim-check pattern)
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

public class ReferenceSerializer implements Serializer<LargeObject> {
    private ExternalStorage storage;
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String storageUrl = (String) configs.get("external.storage.url");
        this.storage = new ExternalStorage(storageUrl);
    }
    
    @Override
    public byte[] serialize(String topic, LargeObject data) {
        if (data == null) {
            return null;
        }
        
        // Store object externally
        String objectId = storage.store(data);
        
        // Serialize reference
        ObjectReference ref = new ObjectReference(objectId, data.getSize());
        return serializeReference(ref);
    }
    
    private byte[] serializeReference(ObjectReference ref) {
        // Serialize small reference object
        return new byte[0];
    }
}
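
The read side of this claim-check pattern resolves the reference and fetches the object back from the external store. A matching deserializer sketch, again assuming the hypothetical ExternalStorage and ObjectReference helpers:

public class ReferenceDeserializer implements Deserializer<LargeObject> {
    private ExternalStorage storage;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.storage = new ExternalStorage((String) configs.get("external.storage.url"));
    }

    @Override
    public LargeObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Recover the small reference, then fetch the full object externally
        ObjectReference ref = deserializeReference(data);
        return storage.fetch(ref); // fetch() is assumed on the hypothetical ExternalStorage
    }

    private ObjectReference deserializeReference(byte[] data) {
        // Mirror of serializeReference in the serializer above
        return null;
    }
}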

Performance Optimization

Serialization Performance Comparison

// Benchmark different serialization approaches
// (JavaSerializer, JsonSerializer, AvroSerializer, and ProtobufSerializer are placeholders
// for whatever implementations are being compared)
import org.apache.kafka.common.serialization.Serializer;
import java.util.*;

public class SerializationBenchmark {
    
    public void compareSerialization() {
        MyObject testObject = createTestObject();
        int iterations = 10000;
        
        // Test Java serialization
        long javaTime = benchmarkSerializer(
            new JavaSerializer(), testObject, iterations);
        
        // Test JSON serialization
        long jsonTime = benchmarkSerializer(
            new JsonSerializer(), testObject, iterations);
        
        // Test Avro serialization
        long avroTime = benchmarkSerializer(
            new AvroSerializer(), testObject, iterations);
        
        // Test Protocol Buffers
        long protobufTime = benchmarkSerializer(
            new ProtobufSerializer(), testObject, iterations);
        
        System.out.println("Serialization performance (10k iterations):");
        System.out.println("  Java: " + javaTime + "ms");
        System.out.println("  JSON: " + jsonTime + "ms");
        System.out.println("  Avro: " + avroTime + "ms");
        System.out.println("  Protobuf: " + protobufTime + "ms");
        
        // Typical results:
        // - Java: Slowest, largest size
        // - JSON: Slow, human-readable
        // - Avro: Fast, compact, schema evolution support
        // - Protobuf: Fastest, most compact
    }
    
    private long benchmarkSerializer(Serializer<MyObject> serializer,
                                     MyObject object,
                                     int iterations) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < iterations; i++) {
            byte[] bytes = serializer.serialize("test-topic", object);
        }
        return System.currentTimeMillis() - start;
    }
    
    private MyObject createTestObject() {
        return new MyObject("test", 123L);
    }
}

Choosing Serialization Format

// Decision guide for serialization format

public class SerializationFormatGuide {
    
    public Serializer<?> chooseSerializer(Requirements requirements) {
        // For simple types (String, Long, etc.)
        if (requirements.isSimpleType()) {
            return Serdes.String().serializer();
        }
        
        // For human-readable data (debugging, logging)
        if (requirements.needsHumanReadable()) {
            return new JsonSerializer();
        }
        
        // For schema evolution support
        if (requirements.needsSchemaEvolution()) {
            return new AvroSerializer();
            // Or: return new ProtobufSerializer();
        }
        
        // For maximum performance
        if (requirements.needsMaxPerformance()) {
            return new ProtobufSerializer();
        }
        
        // For compatibility with external systems
        if (requirements.needsExternalCompatibility()) {
            return new JsonSerializer(); // Most compatible
        }
        
        // Default: Avro (good balance)
        return new AvroSerializer();
    }
}

// Format comparison:
// 
// String/Primitive Serdes:
// - Use when: Simple data types
// - Pros: Fast, built-in, no dependencies
// - Cons: No structure, no evolution
//
// JSON:
// - Use when: Human-readable, debugging, external systems
// - Pros: Human-readable, widely supported
// - Cons: Slow, large size, no schema enforcement
//
// Avro:
// - Use when: Schema evolution, Kafka ecosystem
// - Pros: Compact, schema evolution, Kafka integration
// - Cons: Requires schema registry, learning curve
//
// Protocol Buffers:
// - Use when: Maximum performance, cross-language
// - Pros: Fastest, most compact, strong typing
// - Cons: Requires .proto files, code generation