tessl/maven-org-apache-flink--flink-connector-kafka-base-2-11

Base classes and utilities for Apache Flink Kafka connectors providing common functionality for stream processing with exactly-once guarantees


docs/serialization.md

Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety. These schemas provide the bridge between Flink's data types and Kafka's byte array format.

Capabilities

KeyedDeserializationSchema

Interface for deserializing Kafka messages with access to key, value, topic, partition, and offset information.

public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

Methods:

  • deserialize() - Convert Kafka message bytes to typed objects

    • messageKey - Message key as byte array (null if no key)
    • message - Message value as byte array (null if tombstone/deleted)
    • topic - Topic name where message originated
    • partition - Partition number within the topic
    • offset - Message offset within the partition
    • Returns: Deserialized object (null to skip this message)
    • Throws: IOException if deserialization fails
  • isEndOfStream() - Check if element signals end of stream

    • nextElement - The deserialized element to test
    • Returns: true if this element should terminate the stream
  • getProducedType() - Provide type information for Flink's type system

Usage Example:

public class MyEventDeserializationSchema implements KeyedDeserializationSchema<MyEvent> {
    @Override
    public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 
            throws IOException {
        if (message == null) {
            return null; // Skip tombstone messages
        }
        
        String keyStr = messageKey != null ? new String(messageKey, StandardCharsets.UTF_8) : null;
        String valueStr = new String(message, StandardCharsets.UTF_8);
        
        return new MyEvent(keyStr, valueStr, topic, partition, offset, System.currentTimeMillis());
    }
    
    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement != null && "END_MARKER".equals(nextElement.getType());
    }
    
    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}

KeyedSerializationSchema

Interface for serializing elements to Kafka messages with separate key and value handling, plus optional per-record target topic selection.

public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

Methods:

  • serializeKey() - Extract and serialize the message key

    • element - The element to serialize
    • Returns: Key as byte array (null if no key)
  • serializeValue() - Serialize the message value

    • element - The element to serialize
    • Returns: Value as byte array
  • getTargetTopic() - Determine target topic for this element

    • element - The element being sent
    • Returns: Topic name (null to use producer's default topic)

Usage Example:

public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {
    // Reuse one ObjectMapper for the whole schema: ObjectMapper is thread-safe,
    // and constructing a new one per record is expensive
    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    @Override
    public byte[] serializeKey(MyEvent element) {
        return element.getUserId() != null ? 
            element.getUserId().getBytes(StandardCharsets.UTF_8) : null;
    }
    
    @Override
    public byte[] serializeValue(MyEvent element) {
        // Use JSON serialization for the value
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
    
    @Override
    public String getTargetTopic(MyEvent element) {
        // Route to different topics based on event type
        switch (element.getType()) {
            case "USER_ACTION":
                return "user-actions";
            case "SYSTEM_EVENT":
                return "system-events";
            default:
                return null; // Use default topic
        }
    }
}

Schema Wrappers

Utility classes for adapting between simple and keyed serialization interfaces.

KeyedDeserializationSchemaWrapper

Wraps a simple DeserializationSchema to work with the keyed interface.

public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
    public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
}

Usage Example:

// Wrap a simple string deserializer
DeserializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedDeserializationSchema<String> keyedSchema = new KeyedDeserializationSchemaWrapper<>(simpleSchema);

KeyedSerializationSchemaWrapper

Wraps a simple SerializationSchema to work with the keyed interface.

public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema);
}

Usage Example:

// Wrap a simple string serializer
SerializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedSerializationSchema<String> keyedSchema = new KeyedSerializationSchemaWrapper<>(simpleSchema);
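
Under the hood the wrappers simply delegate: the wrapped schema handles the value, while the key and target topic are reported as absent. A minimal sketch of that delegation pattern, using simplified stand-ins for the Flink interfaces (the stand-in names below are illustrative, not the real API):

```java
import java.nio.charset.StandardCharsets;

public class WrapperSketch {
    // Simplified stand-in for Flink's SerializationSchema
    interface SimpleSerializationSchema<T> {
        byte[] serialize(T element);
    }

    // Simplified stand-in for KeyedSerializationSchema
    interface KeyedSchema<T> {
        byte[] serializeKey(T element);
        byte[] serializeValue(T element);
        String getTargetTopic(T element);
    }

    // The wrapper delegates the value path and reports "no key" / "default topic"
    static class Wrapper<T> implements KeyedSchema<T> {
        private final SimpleSerializationSchema<T> inner;

        Wrapper(SimpleSerializationSchema<T> inner) {
            this.inner = inner;
        }

        @Override
        public byte[] serializeKey(T element) {
            return null; // no key: messages are written without a Kafka key
        }

        @Override
        public byte[] serializeValue(T element) {
            return inner.serialize(element); // delegate to the wrapped schema
        }

        @Override
        public String getTargetTopic(T element) {
            return null; // null means "use the producer's default topic"
        }
    }

    public static void main(String[] args) {
        SimpleSerializationSchema<String> simple =
            s -> s.getBytes(StandardCharsets.UTF_8);
        Wrapper<String> keyed = new Wrapper<>(simple);

        System.out.println(keyed.serializeKey("hello"));        // null
        System.out.println(new String(keyed.serializeValue("hello"),
                                      StandardCharsets.UTF_8)); // hello
        System.out.println(keyed.getTargetTopic("hello"));      // null
    }
}
```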

Built-in Schema Implementations

JSONKeyValueDeserializationSchema

Deserializes JSON key-value messages into Jackson ObjectNode with optional metadata inclusion.

public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
    public JSONKeyValueDeserializationSchema(boolean includeMetadata);
}

Parameters:

  • includeMetadata - Whether to include topic, partition, offset in the output

Usage Example:

// Include metadata in the deserialized JSON
KeyedDeserializationSchema<ObjectNode> schema = new JSONKeyValueDeserializationSchema(true);

// The resulting ObjectNode will have structure:
// {
//   "key": { ... },           // Original message key as JSON
//   "value": { ... },         // Original message value as JSON  
//   "metadata": {
//     "topic": "my-topic",
//     "partition": 0,
//     "offset": 12345
//   }
// }

JSONDeserializationSchema

A thin subclass of JsonNodeDeserializationSchema that deserializes message bytes directly into Jackson ObjectNode instances, without key or metadata handling.

public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
    public JSONDeserializationSchema();
}

TypeInformationKeyValueSerializationSchema

Type-safe serialization and deserialization of key-value pairs using Flink's type information system; implements both the keyed serialization and deserialization interfaces.

public class TypeInformationKeyValueSerializationSchema<K, V> 
    implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
    
    public TypeInformationKeyValueSerializationSchema(
        Class<K> keyClass,
        TypeInformation<K> keyTypeInfo,
        Class<V> valueClass,
        TypeInformation<V> valueTypeInfo,
        ExecutionConfig config
    );
}

Usage Example:

// Create schema for String keys and Long values
TypeInformationKeyValueSerializationSchema<String, Long> schema = 
    new TypeInformationKeyValueSerializationSchema<>(
        String.class,
        BasicTypeInfo.STRING_TYPE_INFO,
        Long.class,
        BasicTypeInfo.LONG_TYPE_INFO,
        env.getConfig()
    );
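
Internally, Flink serializes the key and the value through its TypeSerializers, and the exact wire format is an implementation detail. As a rough illustration of the idea behind key-value byte handling — length-prefixed key and value bytes, with a sentinel for null keys — here is a stdlib-only sketch (the framing below is illustrative, not Flink's actual format):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class KeyValueFramingSketch {
    // [keyLen][keyBytes][valueLen][valueBytes]; keyLen == -1 marks a null key
    static byte[] frame(byte[] key, byte[] value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            if (key == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(key.length);
                out.write(key);
            }
            out.writeInt(value.length);
            out.write(value);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen on in-memory streams
        }
    }

    // Returns {key, value}; key is null when the frame carried no key
    static byte[][] unframe(byte[] framed) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            int keyLen = in.readInt();
            byte[] key = null;
            if (keyLen >= 0) {
                key = new byte[keyLen];
                in.readFully(key);
            }
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            return new byte[][] { key, value };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame("user-42".getBytes(StandardCharsets.UTF_8),
                              "click".getBytes(StandardCharsets.UTF_8));
        byte[][] kv = unframe(framed);
        System.out.println(new String(kv[0], StandardCharsets.UTF_8)); // user-42
        System.out.println(new String(kv[1], StandardCharsets.UTF_8)); // click
    }
}
```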

Table-Related Schemas

JSON schemas for table API integration:

public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
    // Extends format-specific JSON row deserialization
}

public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
    // Extends format-specific JSON row serialization
}

Best Practices

Error Handling

Always handle deserialization errors gracefully:

@Override
public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 
        throws IOException {
    try {
        // Deserialization logic
        return parseMessage(message);
    } catch (Exception e) {
        // Option 1: Skip malformed messages
        logger.warn("Failed to deserialize message at {}:{}:{}", topic, partition, offset, e);
        return null;
        
        // Option 2: Create error record
        // return new MyEvent.error(topic, partition, offset, e.getMessage());
        
        // Option 3: Rethrow to fail the job (strictest)
        // throw new IOException("Deserialization failed", e);
    }
}

Type Safety

Always provide accurate type information:

@Override
public TypeInformation<MyEvent> getProducedType() {
    // For POJOs
    return TypeInformation.of(MyEvent.class);

    // For generic types (use instead of the return above,
    // adjusting the schema's type parameter to match):
    // return new TypeHint<List<MyEvent>>(){}.getTypeInfo();

    // For tuples:
    // return Types.TUPLE(Types.STRING, Types.LONG);
}

Performance Considerations

  • Reuse serializers and deserializers when possible
  • Avoid heavy computation in serialization methods (they're called for every record)
  • Use efficient serialization formats (Avro, Protobuf) for high-throughput scenarios
  • Consider message size impact on network and storage
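
The first point matters because the serialize/deserialize methods run once per record, so any per-call allocation of a heavyweight object (JSON mapper, buffer) is paid at full throughput. A stdlib-only sketch of the reuse pattern — hold the expensive object in a field and reset it per record (each parallel subtask gets its own copy of the schema, so an instance field normally needs no extra synchronization):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ReusableBufferSerializer {
    // Allocated once per serializer instance, reset per record
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);

    public byte[] serialize(String id, long timestamp) {
        buffer.reset(); // reuse the backing array instead of reallocating
        byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
        buffer.write(idBytes, 0, idBytes.length);
        buffer.write('|');
        byte[] tsBytes = Long.toString(timestamp).getBytes(StandardCharsets.UTF_8);
        buffer.write(tsBytes, 0, tsBytes.length);
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        ReusableBufferSerializer s = new ReusableBufferSerializer();
        System.out.println(new String(s.serialize("evt-1", 1700000000000L),
                                      StandardCharsets.UTF_8)); // evt-1|1700000000000
    }
}
```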

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-base-2-11
