tessl/maven-org-apache-flink--flink-connector-kafka-base_2-11

Base classes and utilities for Apache Flink Kafka connectors providing common functionality for stream processing with exactly-once guarantees

Describes: pkg:maven/org.apache.flink/flink-connector-kafka-base_2.11@1.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-base_2-11@1.5.0


Flink Kafka Connector Base

A foundational library providing base classes and common functionality for Apache Flink's Kafka connectors. This library enables building version-specific Kafka connectors (0.8, 0.9, 0.10, etc.) while sharing core streaming connector functionality including state management, offset tracking, checkpointing, and fault tolerance for exactly-once processing guarantees.

Package Information

  • Package Name: flink-connector-kafka-base_2.11
  • Package Type: maven
  • Language: Java/Scala
  • Installation: Add to your Maven pom.xml:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-base_2.11</artifactId>
      <version>1.5.1</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

Basic Usage

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Arrays;
import java.util.Properties;

// Environment setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// Create consumer (abstract - extend for specific Kafka version)
// This shows the pattern for extending the base consumer
public class MyKafkaConsumer extends FlinkKafkaConsumerBase<String> {
    public MyKafkaConsumer(String topic, DeserializationSchema<String> schema, Properties props) {
        super(Arrays.asList(topic), null, new KeyedDeserializationSchemaWrapper<>(schema), 
              PARTITION_DISCOVERY_DISABLED, false);
    }
    
    // Implement abstract methods for specific Kafka version
    // ...
}

// Use the consumer
DataStream<String> stream = env.addSource(
    new MyKafkaConsumer("my-topic", new SimpleStringSchema(), properties)
        .setStartFromEarliest()
        .setCommitOffsetsOnCheckpoints(true)
);

Architecture

The Flink Kafka Connector Base library is organized around several key architectural patterns:

  • Abstract Base Classes: FlinkKafkaConsumerBase and FlinkKafkaProducerBase provide version-agnostic functionality that concrete implementations extend for specific Kafka versions
  • Serialization Abstraction: KeyedDeserializationSchema and KeyedSerializationSchema interfaces handle message serialization with key-value semantics and metadata access
  • Partition Management: Internal classes manage partition discovery, offset tracking, and state management for fault tolerance
  • Table API Integration: Table sources and sinks provide SQL layer integration with automatic schema inference and connector factory support
  • Exactly-Once Semantics: Built-in support for checkpointing, offset commits, and transaction coordination for guaranteed delivery
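
The exactly-once guarantees described above rely on Flink's checkpointing being enabled on the execution environment; offset commits and producer flushes in the connectors hook into completed checkpoints. A minimal sketch (the interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds in exactly-once mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);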

Capabilities

Consumer Base Classes

Abstract base implementations for Kafka consumers providing common functionality across all Kafka versions including offset management, checkpointing, and watermark assignment.

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> 
    implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
    
    public FlinkKafkaConsumerBase<T> setStartFromEarliest();
    public FlinkKafkaConsumerBase<T> setStartFromLatest();
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets();
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets);
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
}
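
A configuration sketch, assuming MyKafkaConsumer is the hypothetical subclass from Basic Usage (any concrete, version-specific consumer is configured the same way); the timestamp logic is illustrative:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

FlinkKafkaConsumerBase<String> consumer =
    new MyKafkaConsumer("my-topic", new SimpleStringSchema(), properties);

consumer.setStartFromGroupOffsets();           // default: resume from offsets committed in Kafka
consumer.setCommitOffsetsOnCheckpoints(true);  // keep committed offsets in sync with checkpoints

// Event-time timestamps and watermarks assigned inside the source, per Kafka partition
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            return System.currentTimeMillis(); // derive the timestamp from the element in real code
        }
    });

DataStream<String> stream = env.addSource(consumer);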

Producer Base Classes

Abstract base implementations for Kafka producers providing common functionality including serialization, partitioning, and exactly-once delivery semantics.

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> 
    implements CheckpointedFunction {
    
    public void setLogFailuresOnly(boolean logFailuresOnly);
    public void setFlushOnCheckpoint(boolean flush);
    public static Properties getPropertiesFromBrokerList(String brokerList);
}
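
A sketch of the corresponding producer extension and configuration pattern. MyKafkaProducer, the topic, and the broker list are illustrative, and the super(...) call assumes the base constructor taking a default topic, a keyed serialization schema, producer properties, and a partitioner; stream is the DataStream from Basic Usage:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import java.util.Properties;

public class MyKafkaProducer<IN> extends FlinkKafkaProducerBase<IN> {
    public MyKafkaProducer(String topic, KeyedSerializationSchema<IN> schema, Properties props) {
        super(topic, schema, props, new FlinkFixedPartitioner<IN>());
    }

    @Override
    protected void flush() {
        // Version-specific call that blocks until all pending records are acknowledged
    }
}

// Configure the common base functionality
Properties producerProps = FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");
MyKafkaProducer<String> producer = new MyKafkaProducer<>(
    "my-topic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), producerProps);
producer.setLogFailuresOnly(false);   // fail the job on send errors instead of only logging them
producer.setFlushOnCheckpoint(true);  // block checkpoints until pending records are flushed

stream.addSink(producer);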

Serialization Schemas

Interfaces and implementations for serializing and deserializing Kafka messages with key-value semantics, metadata access, and type safety.

public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
}

public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
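
A minimal sketch of a KeyedSerializationSchema implementation for plain strings (the class name is illustrative). Returning null from getTargetTopic falls back to the producer's default topic:

import java.nio.charset.StandardCharsets;

public class StringKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return null; // no message key; partitioning is left to the configured partitioner
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // use the producer's default topic
    }
}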

Partitioners

Custom partitioning logic for determining target Kafka partitions when producing messages, including fixed partitioning and delegation to Kafka's default partitioner.

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
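
A sketch of a custom partitioner that hashes the serialized key onto the available partitions and pins keyless records to the first partition (the class name is illustrative):

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: pin to the first partition of the target topic
        }
        // Mask the sign bit so the index is always non-negative
        return partitions[(java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}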

Table API Integration

Table sources and sinks for SQL layer integration supporting various data formats (JSON, Avro) with automatic schema inference and connector descriptors.

public abstract class KafkaTableSource implements StreamTableSource<Row>, 
    DefinedProctimeAttribute, DefinedRowtimeAttributes, FilterableTableSource<Row> {
    // Abstract methods implemented by concrete table sources
}

public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
    // Abstract methods implemented by concrete table sinks
}

Types

Core Types

public final class KafkaTopicPartition implements Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}
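
A sketch of using KafkaTopicPartition as the key of the specific-offsets map accepted by setStartFromSpecificOffsets; consumer is a configured instance as in the consumer sketch above, and the topic names and offsets are illustrative:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import java.util.HashMap;
import java.util.Map;

Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L); // partition 0 starts at offset 23
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L); // partition 1 starts at offset 31
consumer.setStartFromSpecificOffsets(specificOffsets);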

Configuration Constants

public abstract class FlinkKafkaConsumerBase<T> {
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
}