Base classes and utilities for Apache Flink Kafka connectors providing common functionality for stream processing with exactly-once guarantees
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-base_2-11@1.5.0

A foundational library providing base classes and common functionality for Apache Flink's Kafka connectors. This library enables building version-specific Kafka connectors (0.8, 0.9, 0.10, etc.) while sharing core streaming connector functionality including state management, offset tracking, checkpointing, and fault tolerance for exactly-once processing guarantees.
pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-base_2.11</artifactId>
    <version>1.5.1</version>
</dependency>

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Arrays;
import java.util.Properties;
// Environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// Create consumer (abstract - extend for specific Kafka version)
// This shows the pattern for extending the base consumer
public class MyKafkaConsumer extends FlinkKafkaConsumerBase<String> {
    public MyKafkaConsumer(String topic, DeserializationSchema<String> schema, Properties props) {
        super(Arrays.asList(topic), null, new KeyedDeserializationSchemaWrapper<>(schema),
              PARTITION_DISCOVERY_DISABLED, false);
    }

    // Implement abstract methods for specific Kafka version
    // ...
}
// Use the consumer
DataStream<String> stream = env.addSource(
    new MyKafkaConsumer("my-topic", new SimpleStringSchema(), properties)
        .setStartFromEarliest()
        .setCommitOffsetsOnCheckpoints(true)
);

The Flink Kafka Connector Base library is organized around several key architectural patterns:
- FlinkKafkaConsumerBase and FlinkKafkaProducerBase provide version-agnostic functionality that concrete implementations extend for specific Kafka versions.
- KeyedDeserializationSchema and KeyedSerializationSchema interfaces handle message serialization with key-value semantics and metadata access.

Abstract base implementations for Kafka consumers providing common functionality across all Kafka versions, including offset management, checkpointing, and watermark assignment.
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

    public FlinkKafkaConsumerBase<T> setStartFromEarliest();
    public FlinkKafkaConsumerBase<T> setStartFromLatest();
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
}
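For event-time pipelines, a watermark assigner can be set directly on the consumer. The following is a minimal sketch, reusing the hypothetical MyKafkaConsumer subclass from the example above and assuming records whose first comma-separated field is an epoch-millisecond timestamp; BoundedOutOfOrdernessTimestampExtractor comes from flink-streaming-java.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "event-time-group");

// Hypothetical version-specific subclass sketched in the example above
MyKafkaConsumer consumer = new MyKafkaConsumer("my-topic", new SimpleStringSchema(), props);

// Start from the consumer group's committed offsets and emit watermarks
// that tolerate up to 5 seconds of out-of-order records.
consumer
    .setStartFromGroupOffsets()
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(String element) {
                // Assumption: first comma-separated field is an epoch-millis timestamp
                return Long.parseLong(element.split(",")[0]);
            }
        });

// The configured consumer is then registered as usual via env.addSource(consumer).

Assigning the extractor on the consumer itself (rather than on the resulting DataStream) lets the connector generate watermarks per Kafka partition, which behaves better when partitions are consumed at different speeds.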
Abstract base implementations for Kafka producers providing common functionality including serialization, custom partitioning, and checkpoint-aware flushing for at-least-once delivery guarantees.

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    public void setLogFailuresOnly(boolean logFailuresOnly);
    public void setFlushOnCheckpoint(boolean flush);
    public static Properties getPropertiesFromBrokerList(String brokerList);
}
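Concrete producers from the version-specific connectors extend this class. The sketch below assumes a hypothetical MyKafkaProducer subclass (analogous to MyKafkaConsumer above) and only exercises the hooks the base class itself provides; stream refers to the DataStream<String> built in the consumer example.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import java.util.Properties;

// Build producer properties from a comma-separated broker list.
Properties producerProps =
    FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");

// MyKafkaProducer is a hypothetical version-specific subclass of
// FlinkKafkaProducerBase<String>; real implementations live in the
// version-specific connector modules.
MyKafkaProducer producer = new MyKafkaProducer("output-topic", producerProps);

// Fail the job on write errors instead of only logging them, and flush
// in-flight records on each checkpoint so none are lost on failure.
producer.setLogFailuresOnly(false);
producer.setFlushOnCheckpoint(true);

stream.addSink(producer);

With setFlushOnCheckpoint(true) the producer waits for all pending records to be acknowledged before a checkpoint completes, giving at-least-once delivery into Kafka; setLogFailuresOnly(false) surfaces send failures as exceptions rather than log entries.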
Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety.

public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
}

public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
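A minimal sketch of a KeyedSerializationSchema implementation: it splits "key:value" strings into separate Kafka key and value bytes. The record format and the null-topic fallback are illustrative assumptions, not part of the library.

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.nio.charset.StandardCharsets;

public class KeyValueStringSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        // Everything before the first ':' becomes the Kafka message key;
        // records without a ':' are written with a null key.
        int idx = element.indexOf(':');
        return idx < 0 ? null : element.substring(0, idx).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        int idx = element.indexOf(':');
        return element.substring(idx + 1).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        // Returning null lets the producer fall back to its configured default topic.
        return null;
    }
}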
Custom partitioning logic for determining target Kafka partitions when producing messages, including fixed partitioning and delegation to Kafka's default partitioner.

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
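A minimal sketch of a custom partitioner, assuming key-hash routing: records with the same serialized key always land in the same partition, while keyless records are spread based on the producing subtask's index.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Remember which parallel sink subtask this partitioner instance belongs to.
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // No key: pin keyless records of this subtask to one partition.
            return partitions[parallelInstanceId % partitions.length];
        }
        // Mask the sign bit instead of Math.abs to avoid Integer.MIN_VALUE overflow.
        return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}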
Table sources and sinks for SQL layer integration supporting various data formats (JSON, Avro) with automatic schema inference and connector descriptors.

public abstract class KafkaTableSource implements StreamTableSource<Row>,
        DefinedProctimeAttribute, DefinedRowtimeAttributes, FilterableTableSource<Row> {
    // Abstract methods implemented by concrete table sources
}

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
    // Abstract methods implemented by concrete table sinks
}

An immutable, serializable identifier for a Kafka topic partition, used as a key when tracking offsets and checkpointed state.

public final class KafkaTopicPartition implements Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

Constants defined on FlinkKafkaConsumerBase:

public class FlinkKafkaConsumerBase<T> {
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
}
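KafkaTopicPartition is what ties specific startup offsets to the consumer. The sketch below reuses the hypothetical MyKafkaConsumer subclass from the earlier example; the topic name and offset values are illustrative.

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "replay-group");

// Map each partition of "my-topic" to the offset the consumer should start from.
Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
startOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);
startOffsets.put(new KafkaTopicPartition("my-topic", 2), 43L);

// Partitions not listed in the map fall back to the group-offsets startup behavior.
MyKafkaConsumer consumer = new MyKafkaConsumer("my-topic", new SimpleStringSchema(), props);
consumer.setStartFromSpecificOffsets(startOffsets);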