Files

docs/
  index.md
  kafka-consumer.md
  kafka-producer.md
  table-api.md
tile.json

tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-11

Apache Flink connector for integrating with Apache Kafka 0.8.x message broker systems

Workspace
tessl
Visibility
Public
Describes
maven
pkg:maven/org.apache.flink/flink-connector-kafka-0.8_2.11@1.10.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kafka-0-8_2-11@1.10.0

docs/index.md

Apache Flink Kafka Connector 0.8

A streaming connector for integrating Apache Flink with Apache Kafka 0.8.x, providing source and sink capabilities, exactly-once processing guarantees for consumers, and Table API integration.

Package Information

Maven Coordinates:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.10.3</version>
</dependency>

Note: The artifact ID includes the Scala binary version suffix (e.g., _2.11 for Scala 2.11, _2.12 for Scala 2.12).

Java Version: Java 8+

Kafka Compatibility: Apache Kafka 0.8.x

Main Package: org.apache.flink.streaming.connectors.kafka

Core Imports

// Core Consumer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

// Core Producer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

// Serialization
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Table API (Internal)
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

// Required Dependencies
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

Key Features

Consumer Features

  • Exactly-once processing guarantees through Flink's checkpointing mechanism
  • Dynamic partition discovery with pattern-based topic subscription (see the example after this list)
  • Multiple startup modes: earliest, latest, group offsets, or specific offsets
  • Watermark support for event time processing
  • Fault tolerance with automatic offset recovery
  • ZooKeeper integration for Kafka 0.8 metadata management
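
The example below sketches pattern-based subscription combined with periodic partition discovery. The topic pattern, group id, and discovery interval are hypothetical placeholders, and the Pattern-accepting constructor is assumed from the java.util.regex.Pattern import listed under Core Imports.

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "pattern-consumer-group");
// Re-check for new topics/partitions matching the pattern every 30 seconds
props.setProperty("flink.partition-discovery.interval-millis", "30000");

// Subscribe to every topic whose name matches "events-.*"
FlinkKafkaConsumer08<String> patternConsumer = new FlinkKafkaConsumer08<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props
);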

Producer Features

  • Custom partitioning strategies with FlinkKafkaPartitioner
  • Key/Value serialization support (see the sketch after this list)
  • Checkpointing integration with flush capabilities
  • Broker list or Properties configuration
  • Note: Kafka 0.8 producer provides no reliability guarantees
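
As a sketch of key/value serialization, the snippet below implements a KeyedSerializationSchema that emits a separate key and value per record. MyEvent, getKey(), and toBytes() are hypothetical placeholders; the interface itself is part of the connector's serialization API.

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent element) {
        // Hypothetical key extraction
        return element.getKey().getBytes();
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        // Hypothetical value encoding
        return element.toBytes();
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        return null; // null means "use the topic given to the producer"
    }
};

Such a schema can be passed to a FlinkKafkaProducer08 constructor in place of a plain SerializationSchema.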

Table API Integration

  • Schema evolution support with field mapping
  • Processing time and rowtime attributes
  • Configurable startup modes and offset management
  • Factory-based configuration for SQL/Table API

Quick Start

Basic Consumer Example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

consumer.setStartFromEarliest();
env.addSource(consumer).print();
env.execute("Kafka Consumer Job");

Basic Producer Example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

Architecture

Consumer Architecture

The Flink Kafka Consumer 0.8 is built on a multi-layered architecture:

  1. FlinkKafkaConsumer08 - Main consumer class extending FlinkKafkaConsumerBase
  2. AbstractFetcher - Handles partition fetching and watermark generation
  3. AbstractPartitionDiscoverer - Manages partition discovery and metadata
  4. KafkaDeserializationSchema - Converts Kafka ConsumerRecords to Flink data types

// Core consumer setup with custom deserialization
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Custom deserialization logic
        return MyEvent.fromBytes(record.value());
    }
    
    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Never end stream
    }
    
    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

Producer Architecture

The producer architecture focuses on message delivery within the constraints of Kafka 0.8, which offers no strong reliability guarantees:

  1. FlinkKafkaProducer08 - Main producer extending FlinkKafkaProducerBase
  2. FlinkKafkaPartitioner - Custom partitioning logic (optional)
  3. SerializationSchema/KeyedSerializationSchema - Message serialization

// Custom partitioner example
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, 
                        String targetTopic, int[] partitions) {
        // Custom partitioning logic based on record content
        return Math.abs(record.getCustomerId().hashCode() % partitions.length);
    }
};
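
The partitioner defined above still needs to be attached to a producer. Assuming the same properties as in the Quick Start, and with myEventSchema as a hypothetical SerializationSchema<MyEvent>, one of the constructor overloads that accepts a FlinkKafkaPartitioner wires it in:

// Pass the custom partitioner when constructing the producer
FlinkKafkaProducer08<MyEvent> partitionedProducer = new FlinkKafkaProducer08<>(
    "my-topic",
    myEventSchema,
    properties,
    partitioner
);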

Configuration

Required Kafka 0.8 Properties

Properties kafkaProps = new Properties();

// Required for Consumer
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("group.id", "my-group");

// Required for Producer  
kafkaProps.setProperty("metadata.broker.list", "localhost:9092");

// Optional Consumer Properties
kafkaProps.setProperty("auto.offset.reset", "earliest"); // or "latest"
kafkaProps.setProperty("fetch.message.max.bytes", "1048576");
kafkaProps.setProperty("socket.timeout.ms", "30000");
kafkaProps.setProperty("auto.commit.enable", "false"); // Recommended for exactly-once

// Flink-specific Properties
kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
kafkaProps.setProperty("flink.disable-metrics", "false");

Startup Mode Configuration

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from consumer group's committed offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets per partition
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(specificOffsets);

Watermark and Timestamp Assignment

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", myDeserializer, properties);

// Assign periodic watermarks
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
    private long currentMaxTimestamp = Long.MIN_VALUE;
    
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
    
    @Override
    public Watermark getCurrentWatermark() {
        // Guard against wrap-around before the first element has been seen
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentMaxTimestamp - 5000); // 5 second tolerance
    }
});

Error Handling and Reliability

// Consumer reliability configuration
consumer.setCommitOffsetsOnCheckpoints(true); // Commit offsets back to ZooKeeper when checkpoints complete (exactly-once comes from Flink's checkpointed state, not from these commits)

// Producer error handling
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

producer.setLogFailuresOnly(false); // Fail on errors (default)
producer.setFlushOnCheckpoint(true); // Flush data on checkpoint
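
Both the offset commits and the producer flush only take effect when checkpointing is enabled on the environment. A minimal sketch (the 5-second interval is an arbitrary choice):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds; consumer offsets are committed and the producer
// is flushed each time a checkpoint completes
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);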

API Documentation

For detailed API documentation of specific components:

  • Kafka Consumer API (kafka-consumer.md) - Complete FlinkKafkaConsumer08 API reference
  • Kafka Producer API (kafka-producer.md) - Complete FlinkKafkaProducer08 API reference
  • Table API Integration (table-api.md) - Kafka08TableSource, Kafka08TableSink, and factory classes (Note: these are @Internal APIs)

Version Notes

Kafka 0.8 Limitations

  • No transactional support: Producers cannot provide exactly-once guarantees
  • ZooKeeper dependency: Consumers require ZooKeeper for metadata operations
  • No timestamp support: Cannot fetch offsets by timestamp
  • Limited reliability: Producers may lose messages on failures

Deprecated Classes

  • FlinkKafkaConsumer081 - Use FlinkKafkaConsumer08 instead
  • FlinkKafkaConsumer082 - Use FlinkKafkaConsumer08 instead
  • FlinkKafkaProducer - Use FlinkKafkaProducer08 instead

Internal APIs

  • Table API classes (Kafka08TableSource, Kafka08TableSink, Kafka08TableSourceSinkFactory) are marked @Internal
  • These may change without notice and are not part of the official public API
  • Use them through Flink's Table API framework rather than directly (see the sketch below)
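
A minimal sketch of going through the framework instead of the @Internal classes, using Flink 1.10's descriptor API; the topic, properties, schema fields, and JSON format are assumptions, and the Json descriptor requires flink-json on the classpath:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.connect(
        new Kafka()
            .version("0.8")                  // resolves to the Kafka 0.8 table factory
            .topic("my-topic")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "table-group")
            .startFromEarliest())
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user", DataTypes.STRING())
        .field("amount", DataTypes.DOUBLE()))
    .createTemporaryTable("KafkaEvents");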

Migration Path

When upgrading from Kafka 0.8 to newer versions, consider:

  1. Replacing with flink-connector-kafka-0.9+ for better reliability guarantees
  2. Updating ZooKeeper-based configuration to use broker bootstrap servers (see the sketch after this list)
  3. Migrating to transactional producers for exactly-once semantics
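
As a sketch of item 2, the main configuration change is replacing the ZooKeeper connection with a broker bootstrap list; the 0.9+ class and property names below are shown for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

// Kafka 0.8: consumer needs ZooKeeper, producer needs a broker list
Properties kafka08Props = new Properties();
kafka08Props.setProperty("zookeeper.connect", "localhost:2181");
kafka08Props.setProperty("group.id", "my-group");
kafka08Props.setProperty("metadata.broker.list", "localhost:9092");

// Kafka 0.9+: both consumer and producer talk to the brokers directly
Properties kafka09Props = new Properties();
kafka09Props.setProperty("bootstrap.servers", "localhost:9092");
kafka09Props.setProperty("group.id", "my-group");

FlinkKafkaConsumer09<String> newConsumer = new FlinkKafkaConsumer09<>(
    "my-topic", new SimpleStringSchema(), kafka09Props);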