Comprehensive API reference for Kafka 0.8 consumer classes in the Apache Flink Kafka Connector.
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: FlinkKafkaConsumerBase<T>
Description: The main Kafka consumer for Apache Kafka 0.8.x, providing streaming data source capabilities with exactly-once processing guarantees through Flink's checkpointing mechanism.
@PublicEvolving
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T>
// Serialization identifier (from FlinkKafkaConsumer08)
private static final long serialVersionUID = -6272159445203409112L;
// Configuration keys (from FlinkKafkaConsumer08)
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
// Inherited constants from FlinkKafkaConsumerBase
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
/**
* Creates a consumer for a single topic with DeserializationSchema.
*
* @param topic The Kafka topic to read from
* @param valueDeserializer The deserializer used to convert raw bytes to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
public FlinkKafkaConsumer08(String topic,
DeserializationSchema<T> valueDeserializer,
Properties props)
/**
* Creates a consumer for a single topic with KafkaDeserializationSchema.
*
* @param topic The Kafka topic to read from
* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
public FlinkKafkaConsumer08(String topic,
KafkaDeserializationSchema<T> deserializer,
Properties props)
/**
* Creates a consumer for multiple topics with DeserializationSchema.
*
* @param topics List of Kafka topics to read from
* @param deserializer The deserializer used to convert raw bytes to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
public FlinkKafkaConsumer08(List<String> topics,
DeserializationSchema<T> deserializer,
Properties props)
/**
* Creates a consumer for multiple topics with KafkaDeserializationSchema.
*
* @param topics List of Kafka topics to read from
* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
public FlinkKafkaConsumer08(List<String> topics,
KafkaDeserializationSchema<T> deserializer,
Properties props)
/**
* Creates a consumer with pattern-based topic subscription and DeserializationSchema.
* Enables dynamic discovery of topics matching the pattern.
*
* @param subscriptionPattern Regex pattern for topic names to subscribe to
* @param valueDeserializer The deserializer used to convert raw bytes to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern,
DeserializationSchema<T> valueDeserializer,
Properties props)
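A minimal sketch of pattern-based subscription (broker addresses, group id, and the topic pattern are illustrative). Note that topics created after the job has started are only picked up when partition discovery is enabled via the flink.partition-discovery.interval-millis property described below.
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
Properties discoveryProps = new Properties();
discoveryProps.setProperty("bootstrap.servers", "localhost:9092");
discoveryProps.setProperty("zookeeper.connect", "localhost:2181");
discoveryProps.setProperty("group.id", "pattern-consumer-group");
// Check for new topics/partitions matching the pattern every 30 seconds
discoveryProps.setProperty("flink.partition-discovery.interval-millis", "30000");
FlinkKafkaConsumer08<String> patternConsumer = new FlinkKafkaConsumer08<>(
    Pattern.compile("events-.*"),
    new SimpleStringSchema(),
    discoveryProps);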
/**
* Creates a consumer with pattern-based topic subscription and KafkaDeserializationSchema.
* Enables dynamic discovery of topics matching the pattern.
*
* @param subscriptionPattern Regex pattern for topic names to subscribe to
* @param deserializer The deserializer used to convert Kafka ConsumerRecord to data objects
* @param props Configuration properties for Kafka consumer and ZooKeeper client
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props)
/**
* Configures the consumer to start reading from the earliest available offset.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> setStartFromEarliest()
/**
* Configures the consumer to start reading from the latest available offset.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest()
/**
* Configures the consumer to start reading from the consumer group's committed offsets.
* This is the default behavior.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets()
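The startup-position methods are called on the consumer before it is passed to addSource and can be chained; a brief sketch (the topic name is illustrative, and a configured Properties object named properties is assumed). Note that when a job is restored from a checkpoint or savepoint, the offsets in the restored state take precedence over these settings.
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();        // read every partition from the earliest available offset
// consumer.setStartFromLatest();       // alternatively, start from the latest offset
// consumer.setStartFromGroupOffsets(); // or keep the default: resume from committed group offsets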
/**
* Configures the consumer to start reading from specific offsets per partition.
*
* @param specificStartupOffsets Map of partition to offset mappings
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
/**
* Assigns a timestamp assigner and watermark generator for punctuated watermarks.
*
* @param assigner The assigner that generates punctuated watermarks
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner)
/**
* Assigns a timestamp assigner and watermark generator for periodic watermarks.
*
* @param assigner The assigner that generates periodic watermarks
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner)
/**
* Configures whether the consumer should commit offsets back to Kafka on checkpoints.
 * The committed offsets are only a means of exposing the consumer's progress for monitoring;
 * Flink's exactly-once guarantees do not rely on them.
*
* @param commitOnCheckpoints Whether to commit offsets on checkpoints (default: true)
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints)
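Offset commits on checkpoints only take effect when checkpointing is enabled on the execution environment; a minimal sketch (the checkpoint interval is illustrative, and a configured Properties object named properties is assumed):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // trigger a checkpoint every 5 seconds
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic", new SimpleStringSchema(), properties);
// Commit the checkpointed offsets back to Kafka/ZooKeeper so external tools can track progress
consumer.setCommitOffsetsOnCheckpoints(true);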
/**
* Disables the filtering of restored partitions based on subscribed topics.
 * By default, restored partitions whose topics are no longer part of the subscription are ignored.
*
* @return This consumer instance for method chaining
*/
public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics()
/**
* Opens the consumer and initializes resources.
*
* @param configuration The task configuration
* @throws Exception If initialization fails
*/
public void open(Configuration configuration) throws Exception
/**
* Main execution method that reads from Kafka and emits records.
*
* @param sourceContext The source context for emitting records
* @throws Exception If execution fails
*/
public void run(SourceContext<T> sourceContext) throws Exception
/**
* Cancels the consumer operation and stops reading.
*/
public void cancel()
/**
* Closes the consumer and cleans up resources.
*
* @throws Exception If cleanup fails
*/
public void close() throws Exception
/**
* Returns the type information for the produced data type.
*
* @return Type information for T
*/
public TypeInformation<T> getProducedType()
/**
* Initializes the state of the function from a checkpoint.
*
* @param context The initialization context
* @throws Exception If state initialization fails
*/
public final void initializeState(FunctionInitializationContext context) throws Exception
/**
* Snapshots the function's state during checkpointing.
*
* @param context The snapshot context
* @throws Exception If state snapshotting fails
*/
public final void snapshotState(FunctionSnapshotContext context) throws Exception
/**
* Notifies the function that a checkpoint has been completed.
*
* @param checkpointId The ID of the completed checkpoint
* @throws Exception If notification processing fails
*/
public final void notifyCheckpointComplete(long checkpointId) throws Exception
/**
* Validate the ZooKeeper configuration, checking for required parameters.
*
* @param props Properties to check
* @throws IllegalArgumentException if required properties are missing or invalid
*/
protected static void validateZooKeeperConfig(Properties props)
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "my-consumer-group");
properties.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic",
new SimpleStringSchema(),
properties
);
env.addSource(consumer).print();
env.execute();
import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
KafkaDeserializationSchema<MyEvent> schema = new KafkaDeserializationSchema<MyEvent>() {
@Override
public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return MyEvent.fromJson(new String(record.value()));
}
@Override
public boolean isEndOfStream(MyEvent nextElement) {
return nextElement.isEndMarker();
}
@Override
public TypeInformation<MyEvent> getProducedType() {
return TypeInformation.of(MyEvent.class);
}
};
FlinkKafkaConsumer08<MyEvent> consumer = new FlinkKafkaConsumer08<>(
topics, schema, properties);
import org.apache.flink.streaming.api.functions.timestamps.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.regex.Pattern;
Pattern topicPattern = Pattern.compile("metrics-.*");
FlinkKafkaConsumer08<MyMetric> consumer = new FlinkKafkaConsumer08<>(
topicPattern,
new MyMetricDeserializer(),
properties
);
consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyMetric>() {
private long currentMaxTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(MyMetric element, long previousElementTimestamp) {
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - 5000); // 5 second delay
}
});
consumer.setStartFromLatest()
.setCommitOffsetsOnCheckpoints(true);
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 0), 12345L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 1), 67890L);
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 2), 54321L);
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
"my-topic", new SimpleStringSchema(), properties);
consumer.setStartFromSpecificOffsets(specificStartupOffsets)
.setCommitOffsetsOnCheckpoints(true);
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T>
/**
* @deprecated Use FlinkKafkaConsumer08 instead
*/
@Deprecated
public FlinkKafkaConsumer081(String topic,
DeserializationSchema<T> valueDeserializer,
Properties props)
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T>
/**
* @deprecated Use FlinkKafkaConsumer08 instead
*/
@Deprecated
public FlinkKafkaConsumer082(String topic,
DeserializationSchema<T> valueDeserializer,
Properties props)
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: Serializable, ResultTypeQueryable<T>
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Method to decide whether the element signals the end of the stream.
* If this returns true, the element won't be emitted.
*
* @param nextElement The element to test for end-of-stream signal
* @return True if the element signals end of stream, false otherwise
*/
boolean isEndOfStream(T nextElement);
/**
* Deserializes the Kafka record.
*
* @param record The ConsumerRecord from Kafka to deserialize
* @return The deserialized message as an object of type T
* @throws Exception If deserialization fails
*/
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}
public class CustomEventDeserializer implements KafkaDeserializationSchema<CustomEvent> {
@Override
public CustomEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
// Access all ConsumerRecord information
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
long timestamp = record.timestamp();
byte[] key = record.key();
byte[] value = record.value();
// Custom deserialization logic
CustomEvent event = parseEvent(value);
event.setMetadata(topic, partition, offset, timestamp);
return event;
}
@Override
public boolean isEndOfStream(CustomEvent nextElement) {
// Check for end-of-stream marker
return nextElement != null && nextElement.getEventType().equals("END_STREAM");
}
@Override
public TypeInformation<CustomEvent> getProducedType() {
return TypeInformation.of(CustomEvent.class);
}
private CustomEvent parseEvent(byte[] data) throws Exception {
// Implementation specific parsing logic
return CustomEvent.fromBytes(data);
}
}
Properties props = new Properties();
// Required: Kafka broker list (comma-separated host:port pairs)
props.setProperty("bootstrap.servers", "localhost:9092");
// Required: ZooKeeper connection for metadata
props.setProperty("zookeeper.connect", "localhost:2181");
// Required: Consumer group ID
props.setProperty("group.id", "my-consumer-group");
// Offset reset behavior when no committed offset exists
props.setProperty("auto.offset.reset", "earliest"); // or "latest"
// Socket and network settings
props.setProperty("socket.timeout.ms", "30000");
props.setProperty("socket.receive.buffer.bytes", "65536");
// Fetch settings
props.setProperty("fetch.message.max.bytes", "1048576");
props.setProperty("fetch.wait.max.ms", "100");
// ZooKeeper settings
props.setProperty("zookeeper.session.timeout.ms", "6000");
props.setProperty("zookeeper.connection.timeout.ms", "6000");
// Auto-commit settings (when Flink checkpointing is enabled, offsets are committed on completed checkpoints instead)
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.commit.interval.ms", "60000");
// Partition discovery interval (milliseconds)
props.setProperty("flink.partition-discovery.interval-millis", "30000");
// Disable metrics collection
props.setProperty("flink.disable-metrics", "false");
// Retry configuration for partition discovery
props.setProperty("flink.get-partitions.retry", "3");
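The Flink-specific keys above correspond to constants declared on the consumer classes, so they can also be set through those constants instead of string literals; a short sketch (the interval and retry values are illustrative):
props.setProperty(FlinkKafkaConsumer08.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
props.setProperty(FlinkKafkaConsumer08.KEY_DISABLE_METRICS, "false");
props.setProperty(FlinkKafkaConsumer08.GET_PARTITIONS_RETRIES_KEY, "5");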