Apache Flink Kafka Connector 0.8

A streaming connector that integrates Apache Flink with Apache Kafka 0.8.x, providing source and sink capabilities, exactly-once processing guarantees on the consumer side, and Table API integration.

Package Information

Maven Coordinates:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.10.3</version>
</dependency>

Note: The artifact ID includes the Scala binary version suffix (e.g., _2.11 for Scala 2.11).

Java Version: Java 8+

Kafka Compatibility: Apache Kafka 0.8.x

Main Package: org.apache.flink.streaming.connectors.kafka

Core Imports

// Core Consumer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

// Core Producer Classes
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;

// Serialization
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Table API (Internal)
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSource;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory;

// Standard Library (used throughout the examples)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

Key Features

Consumer Features

  • Exactly-once processing guarantees through Flink's checkpointing mechanism
  • Dynamic partition discovery with pattern-based topic subscription (see the sketch after this list)
  • Multiple startup modes: earliest, latest, group offsets, or specific offsets
  • Watermark support for event time processing
  • Fault tolerance with automatic offset recovery
  • ZooKeeper integration for Kafka 0.8 metadata management
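
A minimal sketch tying the first two bullets together, assuming the Pattern-accepting FlinkKafkaConsumer08 constructor and the flink.partition-discovery.interval-millis property documented under Configuration below; the topic pattern and addresses are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpointing is what drives the exactly-once guarantee

Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "my-consumer-group");
// Re-scan topic metadata every 30s so newly created matching topics/partitions are picked up
props.setProperty("flink.partition-discovery.interval-millis", "30000");

// Subscribe to every topic matching the pattern instead of a fixed topic list
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    props);

env.addSource(consumer).print();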

Producer Features

  • Custom partitioning strategies with FlinkKafkaPartitioner
  • Key/Value serialization support via KeyedSerializationSchema (sketched after this list)
  • Checkpointing integration with flush capabilities
  • Broker list or Properties configuration
  • Note: the Kafka 0.8 producer API provides no delivery guarantees; messages may be lost on failures
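
A hedged sketch of the key/value serialization bullet, using the KeyedSerializationSchema interface from org.apache.flink.streaming.util.serialization; MyEvent, getCustomerId(), and toBytes() are hypothetical placeholders:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
    @Override
    public byte[] serializeKey(MyEvent element) {
        // The key determines the target partition under Kafka's default partitioner
        return element.getCustomerId().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(MyEvent element) {
        return element.toBytes(); // hypothetical payload encoding
    }

    @Override
    public String getTargetTopic(MyEvent element) {
        return null; // null = use the topic passed to the producer constructor
    }
};

Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<MyEvent> keyedProducer =
    new FlinkKafkaProducer08<>("my-topic", keyedSchema, producerProps);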

Table API Integration

  • Schema evolution support with field mapping
  • Processing time and rowtime attributes
  • Configurable startup modes and offset management
  • Factory-based configuration for SQL/Table API (see the descriptor sketch below)
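
Rather than instantiating the @Internal classes directly, the connector is normally reached through Flink's Table API descriptors. A hedged sketch for a Flink 1.10-era setup, assuming the flink-json format module is on the classpath; topic, fields, and addresses are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.connect(
        new Kafka()
            .version("0.8")                                  // selects this connector
            .topic("my-topic")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "my-consumer-group")
            .startFromEarliest())
    .withFormat(new Json())
    .withSchema(new Schema()
        .field("user", DataTypes.STRING())
        .field("ts", DataTypes.TIMESTAMP(3)))
    .createTemporaryTable("KafkaEvents");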

Quick Start

Basic Consumer Example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

consumer.setStartFromEarliest();
env.addSource(consumer).print();
env.execute("Kafka Consumer Job");

Basic Producer Example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic",
    new SimpleStringSchema(),
    properties
);

// env is the same StreamExecutionEnvironment as in the consumer example
DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);

Architecture

Consumer Architecture

The Flink Kafka Consumer 0.8 is built on a multi-layered architecture:

  1. FlinkKafkaConsumer08 - Main consumer class extending FlinkKafkaConsumerBase
  2. AbstractFetcher - Handles partition fetching and watermark generation
  3. AbstractPartitionDiscoverer - Manages partition discovery and metadata
  4. KafkaDeserializationSchema - Converts Kafka ConsumerRecords to Flink data types

For example, a custom KafkaDeserializationSchema gives access to the full Kafka record:

// Core consumer setup with custom deserialization
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Custom deserialization logic
        return MyEvent.fromBytes(record.value());
    }
    
    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // Never end stream
    }
    
    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

Producer Architecture

The producer architecture focuses on message delivery within the constraints of the Kafka 0.8 producer API:

  1. FlinkKafkaProducer08 - Main producer extending FlinkKafkaProducerBase
  2. FlinkKafkaPartitioner - Custom partitioning logic (optional)
  3. SerializationSchema/KeyedSerializationSchema - Message serialization

For example, a custom partitioner that routes records by a business key:

// Custom partitioner example
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
    @Override
    public int partition(MyEvent record, byte[] key, byte[] value, 
                        String targetTopic, int[] partitions) {
        // Custom partitioning logic based on record content
        return Math.abs(record.getCustomerId().hashCode() % partitions.length);
    }
};
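
The partitioner only takes effect once it is handed to a producer. A minimal wiring sketch, assuming the FlinkKafkaProducer08 constructor variant that takes a FlinkKafkaPartitioner; MyEventSchema (a SerializationSchema<MyEvent>) and eventStream (a DataStream<MyEvent>) are hypothetical:

Properties producerProps = new Properties();
producerProps.setProperty("metadata.broker.list", "localhost:9092");

FlinkKafkaProducer08<MyEvent> partitionedProducer = new FlinkKafkaProducer08<>(
    "my-topic",
    new MyEventSchema(),   // hypothetical SerializationSchema<MyEvent>
    producerProps,
    partitioner);          // the FlinkKafkaPartitioner defined above

eventStream.addSink(partitionedProducer);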

Configuration

Required Kafka 0.8 Properties

Properties kafkaProps = new Properties();

// Required for Consumer
kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
kafkaProps.setProperty("group.id", "my-group");

// Required for Producer  
kafkaProps.setProperty("metadata.broker.list", "localhost:9092");

// Optional Consumer Properties
kafkaProps.setProperty("auto.offset.reset", "earliest"); // or "latest"
kafkaProps.setProperty("fetch.message.max.bytes", "1048576");
kafkaProps.setProperty("socket.timeout.ms", "30000");
kafkaProps.setProperty("auto.commit.enable", "false"); // Recommended for exactly-once

// Flink-specific Properties
kafkaProps.setProperty("flink.partition-discovery.interval-millis", "30000");
kafkaProps.setProperty("flink.disable-metrics", "false");

Startup Mode Configuration

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);

// Start from earliest available messages
consumer.setStartFromEarliest();

// Start from latest messages (skip existing)
consumer.setStartFromLatest();

// Start from consumer group's committed offsets (default)
consumer.setStartFromGroupOffsets();

// Start from specific offsets per partition
// (KafkaTopicPartition is org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition)
Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
consumer.setStartFromSpecificOffsets(specificOffsets);

Watermark and Timestamp Assignment

import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", myDeserializer, properties);

// Assign periodic watermarks
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
    private long currentMaxTimestamp = Long.MIN_VALUE + 5000; // offset avoids long underflow in getCurrentWatermark before the first element
    
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
    
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 5000); // 5 second tolerance
    }
});

Error Handling and Reliability

// Consumer reliability configuration. Exactly-once comes from Flink's checkpointing;
// committing offsets back to Kafka/ZooKeeper only keeps external monitoring in sync.
consumer.setCommitOffsetsOnCheckpoints(true);

// Producer error handling
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
    "my-topic", new SimpleStringSchema(), properties);

producer.setLogFailuresOnly(false); // Fail on errors (default)
producer.setFlushOnCheckpoint(true); // Flush data on checkpoint

API Documentation

For detailed API documentation of specific components:

  • Kafka Consumer API - Complete FlinkKafkaConsumer08 API reference
  • Kafka Producer API - Complete FlinkKafkaProducer08 API reference
  • Table API Integration - Kafka08TableSource, Kafka08TableSink, and factory classes (Note: These are @Internal APIs)

Version Notes

Kafka 0.8 Limitations

  • No transactional support: Producers cannot provide exactly-once guarantees
  • ZooKeeper dependency: Consumers require ZooKeeper for metadata operations
  • No timestamp support: Cannot fetch offsets by timestamp
  • Limited reliability: Producers may lose messages on failures

Deprecated Classes

  • FlinkKafkaConsumer081 - Use FlinkKafkaConsumer08 instead
  • FlinkKafkaConsumer082 - Use FlinkKafkaConsumer08 instead
  • FlinkKafkaProducer - Use FlinkKafkaProducer08 instead

Internal APIs

  • Table API classes (Kafka08TableSource, Kafka08TableSink, Kafka08TableSourceSinkFactory) are marked @Internal
  • These may change without notice and are not part of the official public API
  • Use through Flink's Table API framework rather than directly

Migration Path

When upgrading from Kafka 0.8 to newer versions, consider:

  1. Replacing this artifact with flink-connector-kafka-0.9 or the universal flink-connector-kafka for stronger reliability guarantees
  2. Updating ZooKeeper-based configuration to bootstrap servers (see the sketch below)
  3. Migrating to transactional producers (Kafka 0.11+) for exactly-once semantics
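
A before/after sketch of the configuration change in step 2; the addresses are placeholders:

// Kafka 0.8 (this connector): ZooKeeper-based metadata
Properties oldProps = new Properties();
oldProps.setProperty("zookeeper.connect", "localhost:2181");    // consumer
oldProps.setProperty("metadata.broker.list", "localhost:9092"); // producer

// Kafka 0.9+ connectors: brokers only, no ZooKeeper dependency
Properties newProps = new Properties();
newProps.setProperty("bootstrap.servers", "localhost:9092");    // consumer and producer
newProps.setProperty("group.id", "my-consumer-group");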