Apache Flink Kafka 0.8 Connector

The Apache Flink Kafka 0.8 connector enables high-performance streaming integration between Apache Flink and Apache Kafka 0.8.x clusters. It provides exactly-once processing guarantees on the consuming side through Flink's checkpointing, supports parallel consumption and production, and manages consumer offsets through ZooKeeper.

Package Information

  • Package Name: flink-connector-kafka-0.8_2.10
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-kafka-0.8_2.10
  • Installation: Add the following Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

Basic Usage

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Set up Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");

// Create consumer
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
);

// Add consumer as source
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create producer and add as sink
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(producer);

// Execute
env.execute("Kafka Streaming Job");

Architecture

The Flink Kafka 0.8 connector is built around several key components:

  • Consumer Architecture: FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase and uses Kafka08Fetcher for message retrieval with ZooKeeper-based offset management
  • Producer Architecture: FlinkKafkaProducer08 extends FlinkKafkaProducerBase for message publishing with configurable partitioning
  • Checkpointing Integration: Seamless integration with Flink's distributed snapshots for exactly-once processing guarantees; see the sketch after this list
  • Table API Integration: Table sources and sinks for SQL API usage with JSON and Avro format support
  • Offset Management: ZooKeeper-based offset storage with periodic commits and recovery support
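
Flink's checkpointing must be enabled for these guarantees to take effect. A minimal sketch, assuming the environment from Basic Usage (the 5-second interval is illustrative):

// With checkpointing enabled, the consumer commits offsets to ZooKeeper as part of
// completed checkpoints instead of relying solely on periodic auto-commits
env.enableCheckpointing(5000);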

Capabilities

Kafka Consumer

Primary consumer for reading from Kafka 0.8.x topics with exactly-once processing guarantees and checkpointing support.

public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
}
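
For example, one consumer instance can subscribe to several topics via the List-based constructors shown above. A sketch reusing consumerProps from Basic Usage (the topic names are hypothetical):

import java.util.Arrays;

// Subscribe to two topics with a single consumer instance
FlinkKafkaConsumer08<String> multiTopicConsumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("events", "metrics"),   // hypothetical topic names
    new SimpleStringSchema(),
    consumerProps
);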

Kafka Producer

Producer for writing to Kafka 0.8.x topics with configurable partitioning and serialization support.

public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
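
The three-argument constructor accepts a comma-separated broker list directly, which avoids building a Properties object for simple cases. A sketch (broker address and topic are illustrative):

// Shorthand: broker list, target topic, and value serialization schema
FlinkKafkaProducer08<String> simpleProducer = new FlinkKafkaProducer08<>(
    "localhost:9092",          // comma-separated broker list
    "output-topic",
    new SimpleStringSchema()
);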

Table API Integration

Table sources and sinks for integrating Kafka with Flink's SQL API, supporting JSON and Avro formats.

public class Kafka08TableSource extends KafkaTableSource {
    public Kafka08TableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
}
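
A sketch of registering a JSON table source with the Table API, assuming the flink-table module is on the classpath and reusing env and consumerProps from Basic Usage; the row schema and table name are hypothetical:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Hypothetical schema: each JSON record becomes a row of (user: String, score: Long)
RowTypeInfo rowType = new RowTypeInfo(
    new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO},
    new String[]{"user", "score"}
);

Kafka08JsonTableSource source = new Kafka08JsonTableSource("input-topic", consumerProps, rowType);

// Register the source so it can be queried with Flink SQL
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTableSource("KafkaEvents", source);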

Offset Management

ZooKeeper-based offset management utilities for handling consumer offset storage and retrieval.

public class ZookeeperOffsetHandler {
    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset);
    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition);
}
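
A sketch of inspecting a committed offset out-of-band with Curator; the connection settings are illustrative, and in normal operation the consumer reads and writes these offsets itself. Note that ZooKeeper access in the real API can throw checked exceptions:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Connect to the same ZooKeeper ensemble the consumer group uses
CuratorFramework curator = CuratorFrameworkFactory.newClient(
    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
curator.start();

// Read the committed offset for partition 0 of "input-topic"; null means none committed yet
Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
    curator, "flink-consumer", "input-topic", 0);

curator.close();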

Error Handling

The connector throws various exceptions that should be handled:

  • IllegalArgumentException: Invalid configuration parameters
  • RuntimeException: Connection or serialization errors
  • Exception: General Kafka or ZooKeeper connectivity issues

Proper error handling should include retry logic for transient failures and graceful degradation for persistent issues.
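
For transient failures, Flink's restart strategies provide retry behavior without custom code. A minimal sketch, assuming the env from Basic Usage (attempt count and delay are illustrative):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;

// Retry the job up to 3 times with a 10-second pause between attempts, so a brief
// Kafka or ZooKeeper outage does not fail the job permanently
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));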

Configuration Properties

Key Kafka properties for consumer configuration:

  • bootstrap.servers: Kafka broker addresses
  • zookeeper.connect: ZooKeeper connection string
  • group.id: Consumer group identifier
  • auto.offset.reset: Offset reset strategy ("smallest" or "largest" in Kafka 0.8)
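
A combined consumer configuration using these properties; note that Kafka 0.8 expects the reset values "smallest" and "largest" rather than the later "earliest"/"latest":

Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092"); // broker addresses
consumerProps.setProperty("zookeeper.connect", "localhost:2181"); // required for offset storage in 0.8
consumerProps.setProperty("group.id", "flink-consumer");          // consumer group identifier
consumerProps.setProperty("auto.offset.reset", "smallest");       // start from earliest offset when none is committed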

Key Kafka properties for producer configuration:

  • bootstrap.servers: Kafka broker addresses
  • key.serializer: Key serialization class
  • value.serializer: Value serialization class

Types

/**
 * Interface for deserializing Kafka message values only
 */
public interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}
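
As an illustration of this contract, a minimal schema that turns every message into a UTF-8 string (Flink's bundled SimpleStringSchema already does this; the class here is hypothetical):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class Utf8StringSchema implements DeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // Kafka topics are unbounded: never signal end of stream
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}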

/**
 * Interface for deserializing Kafka messages with key, value, topic, partition, and offset
 */
public interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for serializing objects to Kafka message values only
 */
public interface SerializationSchema<T> {
    byte[] serialize(T element);
}

/**
 * Interface for serializing objects to Kafka messages with keys and values
 */
public interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

/**
 * Abstract base class for custom partitioning logic
 */
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public void open(int parallelInstanceId, int parallelInstances);
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
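
A sketch of a custom partitioner that routes records by key hash, falling back to the first partition for null keys (the class name and strategy are illustrative):

import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first available partition
        }
        // Spread keyed records across the topic's partitions by key hash
        return partitions[Math.abs(Arrays.hashCode(key) % partitions.length)];
    }
}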