tessl/maven-org-apache-flink--flink-connector-kafka-0-9_2-12

Apache Flink Kafka Connector for streaming data integration with Kafka 0.9.x versions

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-kafka-0.9_2.12@1.10.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-9_2-12@1.10.0


Apache Flink Kafka Connector 0.9

Apache Flink Kafka Connector 0.9 provides streaming data integration between Apache Flink and Kafka 0.9.x message brokers. The connector enables real-time data processing pipelines for both consuming from and producing to Kafka topics, with high throughput, fault tolerance through Flink's checkpointing mechanism, and exactly-once processing guarantees on the consuming side (the 0.9 producer provides at-least-once delivery).

Package Information

  • Package Name: flink-connector-kafka-0.9_2.12
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-kafka-0.9_2.12
  • Version: 1.10.3
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
      <version>1.10.3</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");

// Create Kafka consumer
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Create Kafka producer
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Build streaming pipeline
env.addSource(consumer)
   .map(value -> value.toUpperCase())
   .addSink(producer);

// Execute the job
env.execute("Kafka Streaming Job");

Architecture

Apache Flink Kafka Connector 0.9 is built around several key components:

  • Consumer API: FlinkKafkaConsumer09 for reading data from Kafka topics with configurable parallelism and offset management
  • Producer API: FlinkKafkaProducer09 for writing data to Kafka topics with partitioning and serialization control
  • Table Integration: Table API factories for declarative SQL-based processing
  • Internal Engine: Kafka 0.9-specific implementation handling partition discovery, consumer threading, and offset coordination
  • Fault Tolerance: Integration with Flink's checkpointing mechanism for exactly-once processing guarantees (see the sketch below)
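
The following is a minimal sketch of how fault tolerance is typically wired up: checkpointing is enabled on the execution environment, and the consumer's offsets become part of the checkpointed state. The checkpoint interval, topic name, and group id are illustrative assumptions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 5 seconds; Kafka offsets are stored in the checkpoint,
// which is what yields exactly-once semantics on the consuming side
env.enableCheckpointing(5000);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "checkpointed-group");

FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("my-input-topic", new SimpleStringSchema(), props);

// Optionally also commit the checkpointed offsets back to Kafka,
// so external tools can observe consumer progress
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer).print();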

Capabilities

Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with configurable deserialization, offset management, and fault tolerance.

public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
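
A typical consumer subscribes to one or more topics, picks a start position, and can tune the internal poll timeout through the flink.poll-timeout property listed under Common Types below. The snippet is a minimal sketch; the topic names and group id are illustrative assumptions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.util.Arrays;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "orders-consumer");
// Shorten the internal poll timeout (defaults to 100 ms)
props.setProperty("flink.poll-timeout", "50");

// Subscribe to several topics with the same deserialization schema
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Arrays.asList("orders", "payments"),
    new SimpleStringSchema(),
    props
);

// Read from the earliest available offsets instead of the committed group offsets
consumer.setStartFromEarliest();

DataStream<String> stream = env.addSource(consumer);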

See docs/data-consumption.md for the full data consumption documentation.

Data Production

Streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees.

public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
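
As a sketch of the producer side, the snippet below writes records with a custom partitioner and at-least-once settings. The topic name and the hash-based partitioning logic are illustrative assumptions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Route each record to a partition derived from its hash (illustrative logic)
FlinkKafkaPartitioner<String> partitioner = new FlinkKafkaPartitioner<String>() {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[Math.abs(record.hashCode() % partitions.length)];
    }
};

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    producerProps,
    partitioner
);

// For at-least-once delivery, flush pending records on checkpoints and fail the job on write errors
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);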

See docs/data-production.md for the full data production documentation.

Table API Integration

Table API and SQL integration for declarative stream processing with Kafka sources and sinks through factory-based configuration.

public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
    protected KafkaTableSourceBase createKafkaTableSource(...);
    protected KafkaTableSinkBase createKafkaTableSink(...);
}
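
In practice this factory is usually reached through table DDL rather than direct instantiation. The sketch below assumes Flink 1.10's legacy connector property keys and a JSON format dependency on the classpath; the table name, schema, and topic are illustrative.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Declare a Kafka 0.9 backed table; the 0.9 factory is selected via 'connector.version' = '0.9'
tableEnv.sqlUpdate(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DOUBLE" +
    ") WITH (" +
    "  'connector.type' = 'kafka'," +
    "  'connector.version' = '0.9'," +
    "  'connector.topic' = 'orders'," +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
    "  'connector.properties.group.id' = 'orders-table'," +
    "  'connector.startup-mode' = 'earliest-offset'," +
    "  'format.type' = 'json'," +
    "  'format.derive-schema' = 'true'" +
    ")");

// The declared table can then be queried with standard SQL
tableEnv.sqlQuery("SELECT order_id, amount FROM orders WHERE amount > 100");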

See docs/table-api-integration.md for the full Table API integration documentation.

Common Types and Interfaces

// Kafka consumer configuration key constants
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;

// Deserialization interfaces for data conversion
interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

interface KafkaDeserializationSchema<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

// Serialization interfaces for data conversion
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Partitioning interface for custom distribution logic
interface FlinkKafkaPartitioner<T> extends Serializable {
    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Rate limiting interface for consumption throttling
interface FlinkConnectorRateLimiter {
    void open(RuntimeContext runtimeContext) throws Exception;
    void acquire(long permits);
    void close() throws Exception;
}

// Kafka topic partition representation
class KafkaTopicPartition implements Comparable<KafkaTopicPartition>, Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
    public int compareTo(KafkaTopicPartition other);
}

// Startup mode enumeration for consumers
enum StartupMode {
    EARLIEST,
    LATEST, 
    GROUP_OFFSETS,
    SPECIFIC_OFFSETS,
    TIMESTAMP
}

// Watermark assignment interfaces for time-based processing
interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}

interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}