

tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-10

Apache Kafka 0.8.x connector for Apache Flink streaming data processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-kafka-0.8_2.10@1.3.x (maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-10@1.3.0


Apache Flink Kafka 0.8 Connector

The Apache Flink Kafka 0.8 connector enables high-performance streaming integration between Apache Flink and Apache Kafka 0.8.x clusters. It provides exactly-once processing guarantees through checkpointing, supports parallel consumption and production, and offers comprehensive offset management with ZooKeeper integration.

Package Information

  • Package Name: flink-connector-kafka-0.8_2.10
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-connector-kafka-0.8_2.10
  • Installation: Include as Maven dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

Basic Usage

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Set up Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");

// Create consumer
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "input-topic",
    new SimpleStringSchema(),
    consumerProps
);

// Add consumer as source
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// Create producer and add as sink
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "output-topic",
    new SimpleStringSchema(),
    producerProps
);

stream.addSink(producer);

// Execute
env.execute("Kafka Streaming Job");

Architecture

The Flink Kafka 0.8 connector is built around several key components:

  • Consumer Architecture: FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase and uses Kafka08Fetcher for message retrieval with ZooKeeper-based offset management
  • Producer Architecture: FlinkKafkaProducer08 extends FlinkKafkaProducerBase for message publishing with configurable partitioning
  • Checkpointing Integration: Seamless integration with Flink's distributed snapshots for exactly-once processing guarantees (see the checkpointing sketch after this list)
  • Table API Integration: Table sources and sinks for SQL API usage with JSON and Avro format support
  • Offset Management: ZooKeeper-based offset storage with periodic commits and recovery support
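
Exactly-once guarantees only apply when checkpointing is enabled on the execution environment. A minimal sketch (the 5-second interval is an arbitrary example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 5 seconds; the consumer commits its offsets to
// ZooKeeper as part of each completed checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);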

Capabilities

Kafka Consumer

Primary consumer for reading from Kafka 0.8.x topics with exactly-once processing guarantees and checkpointing support.

public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
}
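
A single consumer instance can also subscribe to several topics by passing a list; a short sketch with placeholder topic names:

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "flink-consumer");

// One consumer reading from two topics
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Arrays.asList("topic-a", "topic-b"),
    new SimpleStringSchema(),
    props
);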


Kafka Producer

Producer for writing to Kafka 0.8.x topics with configurable partitioning and serialization support.

public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
    public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
    public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
}
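
The broker-list constructor is a convenience that only needs the broker addresses; a sketch with placeholder values:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Comma-separated broker list, target topic, and value serialization schema
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "localhost:9092",
    "output-topic",
    new SimpleStringSchema()
);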


Table API Integration

Table sources and sinks for integrating Kafka with Flink's SQL API, supporting JSON and Avro formats.

public class Kafka08TableSource extends KafkaTableSource {
    public Kafka08TableSource(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSource extends KafkaJsonTableSource {
    public Kafka08JsonTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo);
}

public class Kafka08JsonTableSink extends KafkaJsonTableSink {
    public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner);
}
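
A sketch of registering a JSON table source with the Table API. It assumes the flink-table dependency is on the classpath and uses placeholder topic and field names:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "flink-table-consumer");

// Expected JSON fields and their types (placeholder schema)
TypeInformation<Row> typeInfo = new RowTypeInfo(
    new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO},
    new String[]{"user", "clicks"}
);

Kafka08JsonTableSource source = new Kafka08JsonTableSource("events", props, typeInfo);
tableEnv.registerTableSource("Events", source);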


Offset Management

ZooKeeper-based offset management utilities for handling consumer offset storage and retrieval.

public class ZookeeperOffsetHandler {
    public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset);
    public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition);
}
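
The consumer drives this handler internally, but it can also be called directly, for example to inspect a committed offset. A sketch, assuming an Apache Curator client and that the class lives in the connector's internals package; checked exceptions are left unhandled for brevity:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

CuratorFramework client = CuratorFrameworkFactory.newClient(
    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
try {
    // Offset committed by group "flink-consumer" for partition 0 of "input-topic"
    Long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        client, "flink-consumer", "input-topic", 0);
    System.out.println("Committed offset: " + offset);
} finally {
    client.close();
}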


Error Handling

The connector throws various exceptions that should be handled:

  • IllegalArgumentException: Invalid configuration parameters
  • RuntimeException: Connection or serialization errors
  • Exception: General Kafka or ZooKeeper connectivity issues

Proper error handling should include retry logic for transient failures and graceful degradation for persistent issues.
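
One way to handle transient failures is Flink's restart strategy, configured on the execution environment; a sketch with arbitrary example values:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Restart the job up to 3 times, waiting 10 seconds between attempts, so a
// brief Kafka or ZooKeeper outage does not fail the job permanently
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));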

Configuration Properties

Key Kafka properties for consumer configuration:

  • bootstrap.servers: Kafka broker addresses
  • zookeeper.connect: ZooKeeper connection string
  • group.id: Consumer group identifier
  • auto.offset.reset: Offset reset strategy

Key Kafka properties for producer configuration:

  • bootstrap.servers: Kafka broker addresses
  • key.serializer: Key serialization class
  • value.serializer: Value serialization class
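
A sketch combining these properties with example values; note that for Kafka 0.8, auto.offset.reset takes "smallest" or "largest":

import java.util.Properties;

// Consumer configuration
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092");
consumerProps.setProperty("zookeeper.connect", "localhost:2181");
consumerProps.setProperty("group.id", "flink-consumer");
consumerProps.setProperty("auto.offset.reset", "smallest"); // start from the earliest offset when none is committed

// Producer configuration; records are already serialized by the
// SerializationSchema, so byte-array serializers are typically used here
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");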

Types

/**
 * Interface for deserializing Kafka message values only
 */
public interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}

/**
 * Interface for deserializing Kafka messages with key, value, topic, partition, and offset
 */
public interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
    boolean isEndOfStream(T nextElement);
    TypeInformation<T> getProducedType();
}
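
As an illustration, a hypothetical schema that prefixes each value with its topic, partition, and offset (the class name is a placeholder):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Hypothetical example: emits "topic/partition/offset: value" strings
public class AnnotatedStringSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
        String value = message == null ? "" : new String(message, StandardCharsets.UTF_8);
        return topic + "/" + partition + "/" + offset + ": " + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}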

/**
 * Interface for serializing objects to Kafka message values only
 */
public interface SerializationSchema<T> {
    byte[] serialize(T element);
}

/**
 * Interface for serializing objects to Kafka messages with keys and values
 */
public interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}
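
Similarly, a hypothetical schema that uses the record itself as both key and value and defers the topic choice to the producer (the class name is a placeholder):

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Hypothetical example: the string is used as both message key and value
public class StringKeyedSchema implements KeyedSerializationSchema<String> {

    @Override
    public byte[] serializeKey(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null falls back to the topic configured on the producer
    }
}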

/**
 * Base class for custom partitioning logic
 */
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}