The Flink Kafka consumer provides reliable message consumption from Kafka 0.8.x topics with exactly-once processing guarantees through Flink's checkpointing mechanism.
The primary consumer class for Kafka 0.8.x integration with comprehensive configuration options.
/**
* Kafka consumer for Apache Kafka 0.8.x with exactly-once processing guarantees
*/
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/** Configuration key for partition retrieval retries */
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
/** Default number of partition retrieval retries */
public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
/**
* Creates consumer for single topic with value-only deserialization
* @param topic Kafka topic name
* @param valueDeserializer Deserialization schema for message values
* @param props Kafka consumer properties
*/
public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
/**
* Creates consumer for single topic with key-value deserialization
* @param topic Kafka topic name
* @param deserializer Keyed deserialization schema for both keys and values
* @param props Kafka consumer properties
*/
public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props);
/**
* Creates consumer for multiple topics with value-only deserialization
* @param topics List of Kafka topic names
* @param deserializer Deserialization schema for message values
* @param props Kafka consumer properties
*/
public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props);
/**
* Creates consumer for multiple topics with key-value deserialization
* @param topics List of Kafka topic names
* @param deserializer Keyed deserialization schema for both keys and values
* @param props Kafka consumer properties
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props);
/**
* Gets partition information for specified topics
* @param topics List of topic names to analyze
* @param properties Kafka connection properties
* @return List of partition leaders for the topics
*/
public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties);
}

Usage Examples:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.util.Properties;
import java.util.Arrays;
// Basic single topic consumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "my-consumer-group");
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
props
);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(consumer);
// Multi-topic consumer with JSON deserialization
FlinkKafkaConsumer08<ObjectNode> jsonConsumer = new FlinkKafkaConsumer08<>(
Arrays.asList("topic1", "topic2", "topic3"),
new JSONKeyValueDeserializationSchema(false), // false = do NOT include metadata (offset/topic/partition); pass true to include it
props
);
DataStream<ObjectNode> jsonStream = env.addSource(jsonConsumer);
// Consumer with custom offset reset
Properties customProps = new Properties();
customProps.setProperty("bootstrap.servers", "localhost:9092");
customProps.setProperty("zookeeper.connect", "localhost:2181");
customProps.setProperty("group.id", "custom-group");
customProps.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer08<String> earliestConsumer = new FlinkKafkaConsumer08<>(
"events-topic",
new SimpleStringSchema(),
customProps
);

Deprecated alias that redirects to FlinkKafkaConsumer08.
/**
* @deprecated Use FlinkKafkaConsumer08 instead
*/
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {
/**
* @deprecated Use FlinkKafkaConsumer08 constructor instead
*/
@Deprecated
public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}

Deprecated alias that redirects to FlinkKafkaConsumer08.
/**
* @deprecated Use FlinkKafkaConsumer08 instead
*/
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {
/**
* @deprecated Use FlinkKafkaConsumer08 constructor instead
*/
@Deprecated
public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props);
}

(The accepted auto.offset.reset values include smallest/earliest and largest/latest.)

The consumer integrates with Flink's checkpointing mechanism:
// Enable checkpointing for exactly-once guarantees
env.enableCheckpointing(5000); // checkpoint every 5 seconds
// Consumer automatically participates in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
props
);
// Starting position can be configured
consumer.setStartFromEarliest(); // start from earliest available
consumer.setStartFromLatest(); // start from latest records
consumer.setStartFromGroupOffsets(); // start from committed group offsets (the default behavior)
consumer.setStartFromTimestamp(timestamp); // start from specific timestamp — NOTE: not supported by the Kafka 0.8 connector (throws UnsupportedOperationException)

Common exceptions and handling strategies:
// Proper error handling setup
try {
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
props
);
env.addSource(consumer);
env.execute("Kafka Consumer Job");
} catch (Exception e) {
// Handle consumer setup or execution errors
logger.error("Kafka consumer failed", e);
}