Apache Flink Kafka Connector 0.9

Apache Flink Kafka Connector 0.9 provides streaming data integration between Apache Flink and Kafka 0.9.x message brokers. The connector enables high-throughput, fault-tolerant real-time pipelines for both consuming from and producing to Kafka topics: consumer offsets are tracked in Flink checkpoints for exactly-once consumer semantics, while the producer offers at-least-once delivery when configured to flush on checkpoints (Kafka 0.9 predates transactional producers).

Package Information

  • Package Name: flink-connector-kafka-0.9_2.12
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-kafka-0.9_2.12
  • Version: 1.10.3
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
      <version>1.10.3</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka properties
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "my-consumer-group");

// Create Kafka consumer
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    "my-input-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Create Kafka producer (only bootstrap.servers is used from these properties)
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic",
    new SimpleStringSchema(),
    kafkaProps
);

// Build streaming pipeline
env.addSource(consumer)
   .map(value -> value.toUpperCase())
   .addSink(producer);

// Execute the job
env.execute("Kafka Streaming Job");

Architecture

Apache Flink Kafka Connector 0.9 is built around several key components:

  • Consumer API: FlinkKafkaConsumer09 for reading data from Kafka topics with configurable parallelism and offset management
  • Producer API: FlinkKafkaProducer09 for writing data to Kafka topics with partitioning and serialization control
  • Table Integration: Table API factories for declarative SQL-based processing
  • Internal Engine: Kafka 0.9-specific implementation handling partition discovery, consumer threading, and offset coordination
  • Fault Tolerance: Integration with Flink's checkpointing mechanism, which snapshots consumer offsets together with operator state for exactly-once consumer semantics (see the sketch after this list)
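
The checkpointing integration is configured on the execution environment rather than on the connector itself. A minimal sketch, with an illustrative interval, of enabling checkpoints so that consumer offsets are snapshotted alongside operator state:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Snapshot operator state (including Kafka offsets) every 5 seconds
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);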

Capabilities

Data Consumption

Streaming data source functionality for consuming from Kafka 0.9.x topics with configurable deserialization, offset management, and fault tolerance.

public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props);
    public void setRateLimiter(FlinkConnectorRateLimiter kafkaRateLimiter);
    public FlinkConnectorRateLimiter getRateLimiter();
}
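
A usage sketch combining the pattern-based constructor with the start-position setters inherited from FlinkKafkaConsumerBase and the setRateLimiter hook listed above; the topic pattern and rate are illustrative, and the Guava-based limiter is the implementation shipped with Flink:

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.util.Properties;
import java.util.regex.Pattern;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");

// Subscribe to every topic matching a pattern
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props
);

// Start-position setters are inherited from FlinkKafkaConsumerBase
consumer.setStartFromGroupOffsets(); // the default: resume from committed group offsets
// consumer.setStartFromEarliest();  // or re-read each partition from the beginning

// Optional: throttle consumption with a rate limiter (illustrative rate)
GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(1_048_576L); // target bytes per second, shared across subtasks
consumer.setRateLimiter(rateLimiter);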


Data Production

Streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees.

public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
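
A usage sketch of the custom-partitioner constructor; the routing logic is illustrative, and the reliability setters (setFlushOnCheckpoint, setLogFailuresOnly) are inherited from FlinkKafkaProducerBase:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import java.util.Properties;

// Hypothetical partitioner: route each record by the hash of its value
FlinkKafkaPartitioner<String> partitioner = new FlinkKafkaPartitioner<String>() {
    @Override
    public int partition(String record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        return partitions[Math.abs(record.hashCode() % partitions.length)];
    }
};

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
    "my-output-topic", new SimpleStringSchema(), producerProps, partitioner);

// Flush in-flight records on checkpoints for at-least-once delivery
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false); // fail the task on send errors instead of only logging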


Table API Integration

Table API and SQL integration for declarative stream processing with Kafka sources and sinks through factory-based configuration.

public class Kafka09TableSourceSinkFactory extends KafkaTableSourceSinkFactoryBase {
    protected String kafkaVersion();
    protected boolean supportsKafkaTimestamps();
    protected KafkaTableSourceBase createKafkaTableSource(...);
    protected KafkaTableSinkBase createKafkaTableSink(...);
}
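
A sketch using Flink 1.10's connector descriptor API, which resolves this factory when the Kafka version is set to "0.9"; the topic, format, and schema are illustrative and assume flink-json is on the classpath:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.connect(
        new Kafka()
            .version("0.9")                      // selects the Kafka 0.9 factory
            .topic("my-input-topic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "my-consumer-group")
            .startFromGroupOffsets())
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user", DataTypes.STRING())
        .field("amount", DataTypes.DOUBLE()))
    .createTemporaryTable("Orders");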


Common Types and Interfaces

// Kafka consumer configuration key constants
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
public static final long DEFAULT_POLL_TIMEOUT = 100L;

// Deserialization interfaces for data conversion
interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

interface KafkaDeserializationSchema<T> {
    T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

// Serialization interfaces for data conversion
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Partitioning base class (abstract, not an interface) for custom distribution logic
abstract class FlinkKafkaPartitioner<T> implements Serializable {
    void open(int parallelInstanceId, int parallelInstances);
    abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

// Rate limiting interface for consumption throttling
interface FlinkConnectorRateLimiter extends Serializable {
    void open(RuntimeContext runtimeContext);
    void setRate(long rate);
    void acquire(int permits);
    long getRate();
    void close();
}

// Kafka topic partition representation
class KafkaTopicPartition implements Comparable<KafkaTopicPartition>, Serializable {
    public KafkaTopicPartition(String topic, int partition);
    public String getTopic();
    public int getPartition();
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
    public int compareTo(KafkaTopicPartition other);
}

// Startup mode enumeration for consumers
enum StartupMode {
    EARLIEST,
    LATEST, 
    GROUP_OFFSETS,
    SPECIFIC_OFFSETS,
    TIMESTAMP
}

// Watermark assignment interfaces for time-based processing
interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}

interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
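
As one illustration of these hooks, a hypothetical KafkaDeserializationSchema implementation that exposes record metadata alongside the value; the consumer constructor overloads accepting KafkaDeserializationSchema take it in place of a plain DeserializationSchema:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

// Hypothetical schema: prefix each value with its source topic and offset
public class TopicOffsetSchema implements KafkaDeserializationSchema<String> {
    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = record.value() == null
            ? "" : new String(record.value(), StandardCharsets.UTF_8);
        return record.topic() + "@" + record.offset() + ": " + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream, never ends
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}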

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-9-2-12