Base classes and utilities for Apache Flink Kafka connectors, providing common functionality for stream processing with exactly-once guarantees.
Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety. These schemas provide the bridge between Flink's data types and Kafka's byte array format.
Interface for deserializing Kafka messages with access to key, value, topic, partition, and offset information.
```java
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}
```

Methods:
- `deserialize()` - Convert Kafka message bytes to typed objects
  - `messageKey` - Message key as a byte array (null if the message has no key)
  - `message` - Message value as a byte array (null for tombstone/deleted messages)
  - `topic` - Topic name where the message originated
  - `partition` - Partition number within the topic
  - `offset` - Message offset within the partition
- `isEndOfStream()` - Check whether an element signals the end of the stream
  - `nextElement` - The deserialized element to test
- `getProducedType()` - Provide type information for Flink's type system
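A note on the null conventions above: Kafka hands the schema a null key for keyless records and a null value for tombstones, so implementations should decode defensively. A minimal plain-JDK sketch of that null-safe decoding (the class and helper names here are illustrative, not part of the connector API):

```java
import java.nio.charset.StandardCharsets;

public class TombstoneDemo {
    // Null-safe UTF-8 decoding: Kafka passes null for a missing key
    // and for the value of a tombstone (deleted) record.
    static String decodeOrNull(byte[] bytes) {
        return bytes != null ? new String(bytes, StandardCharsets.UTF_8) : null;
    }

    public static void main(String[] args) {
        System.out.println(decodeOrNull("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(decodeOrNull(null));
    }
}
```

The same pattern applies to the `messageKey` and `message` parameters of `deserialize()`, as the example below shows.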
Usage Example:
```java
public class MyEventDeserializationSchema implements KeyedDeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        if (message == null) {
            return null; // Skip tombstone messages
        }
        String keyStr = messageKey != null ? new String(messageKey, StandardCharsets.UTF_8) : null;
        String valueStr = new String(message, StandardCharsets.UTF_8);
        return new MyEvent(keyStr, valueStr, topic, partition, offset, System.currentTimeMillis());
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return nextElement != null && "END_MARKER".equals(nextElement.getType());
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```

Interface for serializing elements to Kafka messages with separate key and value handling and optional target topic selection.
```java
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
```

Methods:
- `serializeKey()` - Extract and serialize the message key
  - `element` - The element to serialize
- `serializeValue()` - Serialize the message value
  - `element` - The element to serialize
- `getTargetTopic()` - Determine the target topic for this element
  - `element` - The element being sent

Usage Example:
```java
public class MyEventSerializationSchema implements KeyedSerializationSchema<MyEvent> {

    @Override
    public byte[] serializeKey(MyEvent element) {
        return element.getUserId() != null
                ? element.getUserId().getBytes(StandardCharsets.UTF_8)
                : null;
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        // Use JSON serialization for the value
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        // Route to different topics based on event type
        switch (element.getType()) {
            case "USER_ACTION":
                return "user-actions";
            case "SYSTEM_EVENT":
                return "system-events";
            default:
                return null; // Use the default topic
        }
    }
}
```

Utility classes for adapting between simple and keyed serialization interfaces.
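These adapters follow a straightforward delegation pattern. As a rough plain-JDK sketch of that pattern (the interface and class names here are illustrative stand-ins, not the actual Flink source), the keyed side delegates the value to the wrapped schema and reports "no key" and "no topic override":

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class WrapperSketch {
    // Stand-in for Flink's simple SerializationSchema
    interface SimpleSchema<T> extends Serializable {
        byte[] serialize(T element);
    }

    // Sketch of the keyed adapter: key and target topic are absent,
    // the value is delegated to the wrapped schema.
    static class KeyedAdapter<T> {
        private final SimpleSchema<T> wrapped;

        KeyedAdapter(SimpleSchema<T> wrapped) {
            this.wrapped = wrapped;
        }

        byte[] serializeKey(T element) {
            return null; // wrapped schemas produce keyless messages
        }

        byte[] serializeValue(T element) {
            return wrapped.serialize(element);
        }

        String getTargetTopic(T element) {
            return null; // defer to the producer's default topic
        }
    }

    public static void main(String[] args) {
        KeyedAdapter<String> adapter =
                new KeyedAdapter<>((String s) -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(adapter.serializeKey("hi") == null);
        System.out.println(new String(adapter.serializeValue("hi"), StandardCharsets.UTF_8));
    }
}
```

The wrapper classes below apply this pattern to Flink's actual schema interfaces.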
Wraps a simple DeserializationSchema to work with the keyed interface.
```java
public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
    public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema);
}
```

Usage Example:

```java
// Wrap a simple string deserializer
DeserializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedDeserializationSchema<String> keyedSchema = new KeyedDeserializationSchemaWrapper<>(simpleSchema);
```

Wraps a simple SerializationSchema to work with the keyed interface.
```java
public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
    public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema);
}
```

Usage Example:

```java
// Wrap a simple string serializer
SerializationSchema<String> simpleSchema = new SimpleStringSchema();
KeyedSerializationSchema<String> keyedSchema = new KeyedSerializationSchemaWrapper<>(simpleSchema);
```

Deserializes JSON key-value messages into a Jackson ObjectNode with optional metadata inclusion.
```java
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
    public JSONKeyValueDeserializationSchema(boolean includeMetadata);
}
```

Parameters:
- `includeMetadata` - Whether to include topic, partition, and offset in the output

Usage Example:

```java
// Include metadata in the deserialized JSON
KeyedDeserializationSchema<ObjectNode> schema = new JSONKeyValueDeserializationSchema(true);

// The resulting ObjectNode will have the structure:
// {
//   "key":   { ... },  // Original message key as JSON
//   "value": { ... },  // Original message value as JSON
//   "metadata": {
//     "topic": "my-topic",
//     "partition": 0,
//     "offset": 12345
//   }
// }
```

Simple JSON deserialization extending JsonNodeDeserializationSchema.
```java
public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
    public JSONDeserializationSchema();
}
```

Type-safe serialization for key-value pairs using Flink's type information system.
```java
public class TypeInformationKeyValueSerializationSchema<K, V>
        implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
    public TypeInformationKeyValueSerializationSchema(
        Class<K> keyClass,
        TypeInformation<K> keyTypeInfo,
        Class<V> valueClass,
        TypeInformation<V> valueTypeInfo,
        ExecutionConfig config
    );
}
```

Usage Example:

```java
// Create a schema for String keys and Long values
TypeInformationKeyValueSerializationSchema<String, Long> schema =
    new TypeInformationKeyValueSerializationSchema<>(
        String.class,
        BasicTypeInfo.STRING_TYPE_INFO,
        Long.class,
        BasicTypeInfo.LONG_TYPE_INFO,
        env.getConfig()
    );
```

JSON schemas for table API integration:
```java
public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
    // Extends format-specific JSON row deserialization
}

public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
    // Extends format-specific JSON row serialization
}
```

Always handle deserialization errors gracefully:
```java
@Override
public MyEvent deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
        throws IOException {
    try {
        // Deserialization logic
        return parseMessage(message);
    } catch (Exception e) {
        // Option 1: Skip malformed messages
        logger.warn("Failed to deserialize message at {}:{}:{}", topic, partition, offset, e);
        return null;

        // Option 2: Create an error record instead
        // return MyEvent.error(topic, partition, offset, e.getMessage());

        // Option 3: Rethrow to fail the job (strictest)
        // throw new IOException("Deserialization failed", e);
    }
}
```

Always provide accurate type information:
```java
@Override
public TypeInformation<MyEvent> getProducedType() {
    // For POJOs
    return TypeInformation.of(MyEvent.class);

    // For generic types:
    // return new TypeHint<List<MyEvent>>(){}.getTypeInfo();

    // For tuples:
    // return Types.TUPLE(Types.STRING, Types.LONG);
}
```

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-base-2-11