tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12

Apache Flink Kafka 0.10 connector for streaming data processing with exactly-once processing guarantees

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-kafka-0.10_2.12@1.11.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12@1.11.0


Apache Flink Kafka 0.10 Connector

The Apache Flink Kafka 0.10 connector provides streaming data integration between Apache Flink and Apache Kafka 0.10.x message brokers. It supports both consuming from and producing to Kafka topics, with exactly-once consumption guarantees when Flink checkpointing is enabled, at-least-once delivery on the producer side, dynamic partition discovery, and failure recovery through Flink's checkpointing mechanism.

Package Information

  • Package Name: flink-connector-kafka-0.10_2.12
  • Package Type: maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-kafka-0.10_2.12
  • Language: Java
  • Installation: add the following dependency to your pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
        <version>1.11.6</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

Basic Usage

Consumer Example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

// Create Kafka consumer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");

// Create consumer
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// Add to Flink streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer)
   .print();

// Submit the job; the pipeline does not run until execute() is called
env.execute("Kafka 0.10 Consumer Example");

Producer Example

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

// Create Kafka producer properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// Create producer
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
);

// Attach the producer as a sink to an existing DataStream<String> (dataStream)
dataStream.addSink(producer);

Architecture

The Flink Kafka 0.10 connector is built around several key components:

  • Consumer Classes: FlinkKafkaConsumer010 for reading data from Kafka topics
  • Producer Classes: FlinkKafkaProducer010 for writing data to Kafka topics
  • Table API Integration: Legacy and dynamic table factory support for SQL/Table API
  • Internal Components: Fetcher, partition discoverer, and consumer thread management
  • Serialization Support: Both simple value serialization and key-value serialization schemas

Capabilities

Data Stream Consumer

Core consumer functionality for reading data from Kafka 0.10.x topics with exactly-once processing guarantees and flexible topic subscription patterns.

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    // Single topic constructors
    public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    // Multiple topics constructors
    public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    // Pattern-based subscription constructors
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props);
    
    // Rate limiting methods
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
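
The list- and pattern-based constructors above let one consumer read from several topics at once. A short sketch with illustrative topic names; the flink.partition-discovery.interval-millis property used to enable dynamic partition discovery comes from FlinkKafkaConsumerBase:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "multi-topic-group");

// Subscribe to a fixed list of topics
FlinkKafkaConsumer010<String> listConsumer = new FlinkKafkaConsumer010<>(
    Arrays.asList("orders", "payments"),
    new SimpleStringSchema(),
    props
);

// Subscribe to every topic matching a pattern; newly created matching topics
// are picked up when dynamic partition discovery is enabled
props.setProperty("flink.partition-discovery.interval-millis", "10000");
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props
);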

Data Stream Producer

Core producer functionality for writing data to Kafka 0.10.x topics with at-least-once delivery guarantees (when checkpointing is enabled) and custom partitioning support.

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
    // Value-only serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
    
    // Key-value serialization constructors
    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);
    
    // Configuration methods
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}
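
As a sketch of the custom-partitioner constructor above, the example below plugs in the FlinkFixedPartitioner shipped with the connector and enables writing Flink record timestamps into Kafka 0.10 records; the broker address and topic name are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

import java.util.Properties;

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");

// FlinkFixedPartitioner maps each sink subtask to a single Kafka partition
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
    "output-topic",
    new SimpleStringSchema(),
    producerConfig,
    new FlinkFixedPartitioner<>()
);

// Attach the Flink record timestamp to each produced Kafka 0.10 record
producer.setWriteTimestampToKafka(true);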

Table API Integration

SQL and Table API integration for declarative stream processing with Kafka sources and sinks, supporting both legacy and dynamic table factories.

// Legacy table factory
public class Kafka010TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
}

// Dynamic table factory
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
    public static final String IDENTIFIER = "kafka-0.10";
}
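
A minimal sketch of declaring a Kafka 0.10 source through the dynamic table factory's "kafka-0.10" identifier; the table schema, topic, and option values are illustrative, and the JSON format additionally requires the flink-json dependency on the classpath:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DDL resolved by Kafka010DynamicTableFactory via the "kafka-0.10" identifier
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DOUBLE" +
    ") WITH (" +
    "  'connector' = 'kafka-0.10'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'orders-sql-group'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

tableEnv.executeSql("SELECT order_id, amount FROM orders").print();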

Configuration Properties

Consumer Configuration Constants

public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;

Common Configuration Properties

  • bootstrap.servers: Kafka broker addresses (required)
  • group.id: Consumer group identifier
  • flink.poll-timeout: Consumer polling timeout in milliseconds (default: 100)
  • enable.auto.commit: Automatic offset commit to Kafka (only used when Flink checkpointing is disabled; with checkpointing enabled, Flink commits offsets on completed checkpoints)
  • auto.offset.reset: Initial offset behavior when no committed offset exists
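
These properties are passed through the same java.util.Properties object handed to the consumer constructor; a sketch with placeholder values:

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // required
properties.setProperty("group.id", "my-consumer-group");
properties.setProperty("flink.poll-timeout", "200");           // poll timeout in ms (default 100)
properties.setProperty("auto.offset.reset", "earliest");       // used when no committed offset exists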

Types

Core Consumer Type

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
    // Generic type T represents the output data type after deserialization
}

Core Producer Type

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducerBase<T> {
    // Generic type T represents the input data type before serialization
}

Serialization Interfaces

// Simple value deserialization
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
}

// Key-value deserialization  
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    boolean isEndOfStream(T nextElement);
}

// Simple value serialization
public interface SerializationSchema<T> extends Serializable {
    byte[] serialize(T element);
}

// Key-value serialization
public interface KeyedSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
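
As a sketch of implementing the key-value variant, the class below (an illustrative name) exposes the record key alongside the value as a single String; note that ResultTypeQueryable additionally requires getProducedType(), even though it is not listed above:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class KeyedStringDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String key = record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8);
        String value = record.value() == null ? "" : new String(record.value(), StandardCharsets.UTF_8);
        return key + ":" + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // consume indefinitely
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING; // required via ResultTypeQueryable
    }
}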

Partitioning Interface

public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
    public void open(int parallelInstanceId, int parallelInstances);
}
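
A minimal sketch of a key-hash partitioner implementing the contract above; the class name is illustrative:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first partition
        }
        // Stable hash of the serialized key, mapped onto the available partitions
        return partitions[Math.abs(Arrays.hashCode(key) % partitions.length)];
    }
}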

Rate Limiting Interface

public interface FlinkConnectorRateLimiter extends Serializable {
    void open(RuntimeContext runtimeContext) throws Exception;
    void acquire(long bytes);
    void close() throws Exception;
}
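
For example, a rate limiter can be attached to the consumer via setRateLimiter; the snippet below assumes the GuavaFlinkConnectorRateLimiter bundled with Flink and treats the rate as bytes per second per subtask, so verify both against your Flink version:

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "rate-limited-group");

FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Cap consumption at roughly 1 MB/s per consumer subtask (assumed rate semantics)
GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1024 * 1024);
consumer.setRateLimiter(rateLimiter);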