Apache Flink SQL connector for Apache Kafka 0.11 with shaded dependencies providing streaming and table API integration
The Flink Kafka connector provides comprehensive serialization and deserialization interfaces that enable flexible data format handling with access to Kafka record metadata including headers, timestamps, and partition information.
Interface for serialization schemas that need access to Kafka context information like partition assignments and parallelism details.
/**
 * Context-aware extension for Kafka serialization schemas.
 *
 * <p>Implementations receive information about the Flink runtime (parallel
 * subtask index and total parallelism) as well as the Kafka partitions
 * available for the target topic. All setters are optional no-ops by default.
 */
@PublicEvolving
interface KafkaContextAware<T> extends Serializable {

    /**
     * Returns the Kafka topic that the given element should be written to.
     *
     * @param element the element about to be serialized
     * @return the name of the destination topic
     */
    String getTargetTopic(T element);

    /**
     * Informs this schema of the index of its parallel subtask.
     *
     * @param parallelInstanceId 0-based parallel subtask index
     */
    default void setParallelInstanceId(int parallelInstanceId) {
        // No-op by default; override to capture the subtask index.
    }

    /**
     * Informs this schema of the total parallelism of the operator.
     *
     * @param numParallelInstances total number of parallel subtasks
     */
    default void setNumParallelInstances(int numParallelInstances) {
        // No-op by default; override to capture the parallelism.
    }

    /**
     * Informs this schema of the partitions available for the target topic.
     *
     * @param partitions ids of the partitions that can be written to
     */
    default void setPartitions(int[] partitions) {
        // No-op by default; override to capture partition information.
    }
}
Advanced deserialization interface providing access to complete Kafka ConsumerRecord metadata.
/**
 * Schema for turning raw Kafka {@code ConsumerRecord}s into typed elements.
 *
 * <p>Unlike a plain {@code DeserializationSchema}, implementations see the
 * full record and can therefore read the key, headers, topic, partition,
 * offset, and timestamp. Both a single-result and a collector-based
 * (one-to-many) deserialization entry point are offered.
 */
@PublicEvolving
interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Called once before any record is deserialized.
     *
     * @param context gives access to metrics and the user-code classloader
     * @throws Exception if initialization fails
     */
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
        // No-op by default; override to register metrics or load resources.
    }

    /**
     * Decides whether the given element marks the end of the stream.
     *
     * @param nextElement the most recently produced element
     * @return {@code true} to stop consuming after this element
     */
    boolean isEndOfStream(T nextElement);

    /**
     * Turns one Kafka record into at most one element.
     *
     * @param record the full Kafka record (key, value, headers, metadata)
     * @return the deserialized element, or {@code null} to drop the record
     * @throws Exception if the payload cannot be deserialized
     */
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

    /**
     * Turns one Kafka record into zero or more elements via the collector.
     * The default delegates to {@link #deserialize(ConsumerRecord)} and emits
     * its result when non-null; override for one-to-many deserialization.
     *
     * @param message the full Kafka record to deserialize
     * @param out collector receiving the produced elements
     * @throws Exception if the payload cannot be deserialized
     */
    default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
        T element = deserialize(message);
        if (element != null) {
            out.collect(element);
        }
    }
}
Usage Examples:
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// Custom deserialization schema with metadata access
// Custom deserialization schema with metadata access
public class UserEventDeserializer implements KafkaDeserializationSchema<UserEvent> {

    // Jackson's ObjectMapper is thread-safe once configured and expensive to build;
    // create it once instead of on every record.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false; // This schema never signals end-of-stream.
    }

    /**
     * Deserializes one Kafka record into a {@link UserEvent}, enriched with the
     * record's topic, partition, offset, and timestamp. The user id is taken
     * from the "user_id" header when present, otherwise from the JSON payload.
     *
     * @param record the full Kafka record (key, value, headers, metadata)
     * @return the deserialized event, or {@code null} to skip tombstone records
     * @throws Exception if the JSON payload cannot be parsed
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Tombstone records carry a null value; skip them instead of throwing an NPE.
        if (record.value() == null) {
            return null;
        }
        // Kafka metadata available on every record.
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        long timestamp = record.timestamp();
        // Optional "user_id" header; decode with an explicit charset rather than
        // the platform default.
        String userId = null;
        if (record.headers().headers("user_id").iterator().hasNext()) {
            userId = new String(
                record.headers().headers("user_id").iterator().next().value(),
                StandardCharsets.UTF_8);
        }
        // Parse the JSON value payload.
        String jsonValue = new String(record.value(), StandardCharsets.UTF_8);
        JsonNode node = MAPPER.readTree(jsonValue);
        return new UserEvent(
            userId != null ? userId : node.get("user_id").asText(),
            node.get("event_type").asText(),
            node.get("event_data").asText(),
            timestamp,
            topic,
            partition,
            offset
        );
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}
// Usage with consumer
FlinkKafkaConsumer011<UserEvent> consumer = new FlinkKafkaConsumer011<>(
"user-events",
new UserEventDeserializer(),
properties
);
Advanced serialization interface for creating complete Kafka ProducerRecords with control over topic, partition, key, value, headers, and timestamp.
/**
 * Schema for turning typed elements into complete Kafka {@code ProducerRecord}s.
 *
 * <p>Because the implementation builds the record itself, it has full control
 * over topic, partition, key, value, headers, and timestamp.
 */
@PublicEvolving
interface KafkaSerializationSchema<T> extends Serializable {

    /**
     * Called once before any element is serialized.
     *
     * @param context gives access to metrics and the user-code classloader
     * @throws Exception if initialization fails
     */
    default void open(SerializationSchema.InitializationContext context) throws Exception {
        // No-op by default; override to register metrics or load resources.
    }

    /**
     * Builds the producer record for the given element.
     *
     * @param element the element to serialize
     * @param timestamp the record timestamp supplied by Flink (may be {@code null})
     * @return a fully populated producer record
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}
Usage Examples:
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
// Custom serialization schema with full record control
// Custom serialization schema with full record control
public class UserEventSerializer implements KafkaSerializationSchema<UserEvent> {

    private final String defaultTopic;
    // NOTE(review): recent Jackson versions make ObjectMapper Serializable; if job
    // submission fails on serializing this schema, make the field transient and
    // lazily initialize it instead.
    private final ObjectMapper objectMapper;

    /**
     * @param defaultTopic topic used for all non-critical events
     */
    public UserEventSerializer(String defaultTopic) {
        this.defaultTopic = defaultTopic;
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Builds a complete producer record: routes "critical" events to a dedicated
     * topic, keys records by user id, JSON-encodes the payload, and attaches
     * descriptive headers.
     *
     * @param event the event to serialize
     * @param timestamp record timestamp supplied by Flink (may be null)
     * @return the fully populated producer record
     * @throws RuntimeException if JSON serialization fails (wraps the cause)
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(UserEvent event, Long timestamp) {
        try {
            // Route critical events to their own topic; everything else goes to the default.
            String topic = event.getEventType().equals("critical") ? "critical-events" : defaultTopic;
            // Key by user id so all of a user's events land in the same partition.
            byte[] keyBytes = event.getUserId().getBytes(StandardCharsets.UTF_8);
            // JSON-encode the value; always specify the charset explicitly.
            String jsonValue = objectMapper.writeValueAsString(event);
            byte[] valueBytes = jsonValue.getBytes(StandardCharsets.UTF_8);
            // Partition is null so Kafka derives the partition from the key hash.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                topic,
                null,
                timestamp,
                keyBytes,
                valueBytes
            );
            // Attach metadata headers with an explicit charset — bare getBytes()
            // uses the platform default encoding, which varies between hosts.
            record.headers().add(new RecordHeader("user_id",
                event.getUserId().getBytes(StandardCharsets.UTF_8)));
            record.headers().add(new RecordHeader("event_type",
                event.getEventType().getBytes(StandardCharsets.UTF_8)));
            record.headers().add(new RecordHeader("source_system",
                "flink-processor".getBytes(StandardCharsets.UTF_8)));
            return record;
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize UserEvent", e);
        }
    }
}
// Usage with producer
FlinkKafkaProducer011<UserEvent> producer = new FlinkKafkaProducer011<>(
"default-topic", // This will be overridden by serialization schema
new UserEventSerializer("user-events"),
properties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);
Using the collector-based deserialization for one-to-many record processing.
// Example of one-to-many deserialization using collector
public class BatchEventDeserializer implements KafkaDeserializationSchema<UserEvent> {

    // Jackson's ObjectMapper is thread-safe once configured; build it once
    // instead of per record.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false; // This schema never signals end-of-stream.
    }

    /**
     * Single-record entry point is intentionally unsupported; this schema only
     * works through the collector-based method below.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        throw new UnsupportedOperationException("Use collector-based deserialize method");
    }

    /**
     * Deserializes a JSON array payload into multiple {@link UserEvent}s,
     * enriching each one with the Kafka record's metadata. Non-array payloads
     * and tombstone (null-value) records emit nothing.
     *
     * @param message the Kafka record whose value holds a JSON array of events
     * @param out collector receiving one element per array entry
     * @throws Exception if the JSON payload cannot be parsed
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<UserEvent> out) throws Exception {
        // Tombstone records carry a null value; emit nothing instead of throwing an NPE.
        if (message.value() == null) {
            return;
        }
        String jsonValue = new String(message.value(), StandardCharsets.UTF_8);
        JsonNode arrayNode = MAPPER.readTree(jsonValue);
        // Emit one event per element of the JSON array; skip non-array payloads.
        if (arrayNode.isArray()) {
            for (JsonNode eventNode : arrayNode) {
                UserEvent event = MAPPER.treeToValue(eventNode, UserEvent.class);
                // Enrich with Kafka metadata so downstream operators can trace the origin.
                event.setKafkaTopic(message.topic());
                event.setKafkaPartition(message.partition());
                event.setKafkaOffset(message.offset());
                event.setKafkaTimestamp(message.timestamp());
                out.collect(event);
            }
        }
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}
The connector works with Flink's built-in serialization schemas for common use cases.
// Using built-in schemas (these work with both simple and Kafka-specific interfaces)
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
// Simple string serialization/deserialization
SimpleStringSchema stringSchema = new SimpleStringSchema();
// JSON serialization using Flink's built-in support
// Note: For Kafka-specific features, wrap in adapter
// Adapter that exposes Flink's SimpleStringSchema through the Kafka-aware interface.
KafkaDeserializationSchema<String> kafkaStringSchema = new KafkaDeserializationSchema<String>() {

    // Delegate that handles the actual byte[] -> String conversion.
    private final SimpleStringSchema delegate = new SimpleStringSchema();

    @Override
    public boolean isEndOfStream(String nextElement) {
        return delegate.isEndOfStream(nextElement);
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Only the record value is used; key, headers, and metadata are ignored.
        return delegate.deserialize(record.value());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return delegate.getProducedType();
    }
};
Usage Examples:
// Consumer with built-in string deserialization
FlinkKafkaConsumer011<String> stringConsumer = new FlinkKafkaConsumer011<>(
"text-topic",
new SimpleStringSchema(), // Flink automatically wraps in adapter
properties
);
// Consumer with custom Kafka-aware deserialization
FlinkKafkaConsumer011<UserEvent> eventConsumer = new FlinkKafkaConsumer011<>(
"user-events",
new UserEventDeserializer(), // Direct KafkaDeserializationSchema usage
properties
);
// Producer with custom Kafka-aware serialization
FlinkKafkaProducer011<UserEvent> eventProducer = new FlinkKafkaProducer011<>(
"processed-events",
new UserEventSerializer("processed-events"),
properties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
);
Best practices for handling serialization errors and malformed records.
// Robust deserialization with error handling
// Robust deserialization with error handling
public class RobustUserEventDeserializer implements KafkaDeserializationSchema<UserEvent> {

    // Must not be final: the field is assigned in open(), not in a constructor
    // (the original "private final" declaration does not compile). Transient
    // because the metrics Counter is runtime state, not serializable schema state.
    private transient Counter deserializationErrors;
    // Reused across records; rebuilt in open() after deserialization on the task manager.
    private transient ObjectMapper mapper;

    /**
     * Registers the error counter and builds the shared JSON mapper.
     *
     * @param context gives access to the metric group and user-code classloader
     * @throws Exception if initialization fails
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        this.deserializationErrors = context.getMetricGroup()
            .counter("deserialization_errors");
        this.mapper = new ObjectMapper();
    }

    /**
     * Parses the record value as a {@link UserEvent}. On any parse failure the
     * record is skipped: the failure is logged with its Kafka coordinates, the
     * error counter is incremented, and {@code null} is returned.
     *
     * @param record the full Kafka record (key, value, headers, metadata)
     * @return the deserialized event, or {@code null} if the record is malformed
     */
    @Override
    public UserEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        try {
            String jsonValue = new String(record.value(), StandardCharsets.UTF_8);
            return mapper.readValue(jsonValue, UserEvent.class);
        } catch (Exception e) {
            // Include the Kafka coordinates so the bad record can be located.
            // NOTE(review): LOG is not declared in this snippet — add
            // `private static final Logger LOG = LoggerFactory.getLogger(RobustUserEventDeserializer.class);`
            LOG.warn("Failed to deserialize record from topic {} partition {} offset {}: {}",
                record.topic(), record.partition(), record.offset(), e.getMessage());
            deserializationErrors.inc();
            // Returning null skips the record. Alternative: emit a poison-pill
            // event so a downstream operator can handle the failure explicitly, e.g.
            // return new UserEvent("PARSE_ERROR", record.topic(), record.partition(), record.offset());
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(UserEvent nextElement) {
        return false; // This schema never signals end-of-stream.
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-kafka-0-11-2-11