FlinkKinesisConsumer is a streaming source for Amazon Kinesis Data Streams. It provides exactly-once ingestion through Flink's checkpointing mechanism, discovers new shards automatically, and exposes extensive configuration options for high-throughput stream processing.
Main consumer class for reading from one or more Kinesis streams with exactly-once processing guarantees through Flink's checkpointing mechanism.
@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T>, CheckpointedFunction {
/**
* Create consumer for single stream with standard deserialization schema.
*
* @param stream Stream name to consume from
* @param deserializer Standard Flink deserialization schema
* @param configProps AWS and consumer configuration properties
*/
public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
/**
* Create consumer for single stream with Kinesis-specific deserialization schema.
*
* @param stream Stream name to consume from
* @param deserializer Kinesis deserialization schema with metadata access
* @param configProps AWS and consumer configuration properties
*/
public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
/**
* Create consumer for multiple streams with Kinesis-specific deserialization schema.
*
* @param streams List of stream names to consume from
* @param deserializer Kinesis deserialization schema with metadata access
* @param configProps AWS and consumer configuration properties
*/
public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);
/**
* Get current shard assigner for mapping shards to subtasks.
*
* @return Current shard assigner instance
*/
public KinesisShardAssigner getShardAssigner();
/**
* Set custom shard assigner for mapping shards to subtasks.
*
* @param shardAssigner Custom shard assigner implementation
*/
public void setShardAssigner(KinesisShardAssigner shardAssigner);
/**
* Get current periodic watermark assigner for event-time processing.
*
* @return Current watermark assigner instance
*/
public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner();
/**
* Set periodic watermark assigner for event-time processing.
*
* @param periodicWatermarkAssigner Watermark assigner implementation
*/
public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
/**
* Get current watermark tracker for distributed watermark aggregation.
*
* @return Current watermark tracker instance
*/
public WatermarkTracker getWatermarkTracker();
/**
* Set watermark tracker for distributed watermark aggregation.
*
* @param watermarkTracker Watermark tracker implementation
*/
public void setWatermarkTracker(WatermarkTracker watermarkTracker);
/**
* Main source function execution method.
*
* @param sourceContext Flink source context for emitting records
* @throws Exception On processing errors
*/
public void run(SourceContext<T> sourceContext) throws Exception;
/**
* Cancel the consumer and stop reading from streams.
*/
public void cancel();
/**
* Close resources and cleanup.
*
* @throws Exception On cleanup errors
*/
public void close() throws Exception;
/**
* Get the type information for produced records.
*
* @return Type information for output type T
*/
public TypeInformation<T> getProducedType();
/**
* Initialize state for checkpointing.
*
* @param context Function initialization context
* @throws Exception On initialization errors
*/
public void initializeState(FunctionInitializationContext context) throws Exception;
/**
* Create snapshot of current state for checkpointing.
*
* @param context Function snapshot context
* @throws Exception On snapshot errors
*/
public void snapshotState(FunctionSnapshotContext context) throws Exception;
}

Basic usage: consume a single stream with a standard string schema.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
"my-kinesis-stream",
new SimpleStringSchema(),
props
);
DataStream<String> stream = env.addSource(consumer);
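The exactly-once guarantee holds only when checkpointing is enabled on the environment: the consumer snapshots each shard's sequence number into checkpoints (via snapshotState above) and resumes from those positions on recovery. The interval below is illustrative:
// Checkpoint every 60 seconds so shard sequence numbers are persisted
env.enableCheckpointing(60000);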
To attach Kinesis metadata (stream name, shard ID, sequence number, approximate arrival timestamp) to each record, implement KinesisDeserializationSchema. MyEvent, parseJson, and setMetadata below are application-specific placeholders:
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.util.Arrays;
// Custom deserialization schema with access to Kinesis metadata
KinesisDeserializationSchema<MyEvent> deserializer = new KinesisDeserializationSchema<MyEvent>() {
@Override
public MyEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException {
// Parse the payload, then attach Kinesis metadata to the event
MyEvent event = parseJson(recordValue);
event.setMetadata(stream, shardId, seqNum, approxArrivalTimestamp);
return event;
}
@Override
public TypeInformation<MyEvent> getProducedType() {
return TypeInformation.of(MyEvent.class);
}
};
FlinkKinesisConsumer<MyEvent> consumer = new FlinkKinesisConsumer<>(
Arrays.asList("stream-1", "stream-2", "stream-3"),
deserializer,
props
);
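Because the schema stores the originating stream on each event, downstream logic can branch on it. A brief sketch, assuming a hypothetical getStream() accessor on MyEvent:
DataStream<MyEvent> events = env.addSource(consumer);
// Keep only records read from "stream-1" (getStream() is a hypothetical accessor)
DataStream<MyEvent> stream1Events = events.filter(e -> "stream-1".equals(e.getStream()));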
For event-time processing, a periodic watermark assigner can be attached directly to the consumer so watermarks are generated per shard:
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
// Custom watermark assigner for event-time processing
AssignerWithPeriodicWatermarks<MyEvent> watermarkAssigner = new AssignerWithPeriodicWatermarks<MyEvent>() {
private long maxTimestamp = Long.MIN_VALUE;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getEventTime();
maxTimestamp = Math.max(timestamp, maxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// Allow 10 seconds of lateness; guard against underflow before the first element
return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 10000);
}
};
consumer.setPeriodicWatermarkAssigner(watermarkAssigner);
// Configure watermark emission interval
env.getConfig().setAutoWatermarkInterval(5000);
// Configure shard idle timeout to prevent watermark stalling
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
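For the distributed watermark aggregation hook (setWatermarkTracker), the connector ships a JobManagerWatermarkTracker that aggregates per-subtask watermarks through the JobManager to bound event-time skew between shard consumers. A minimal sketch; the aggregate name is arbitrary:
import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;
// Aggregate watermarks across all subtasks via the JobManager
consumer.setWatermarkTracker(new JobManagerWatermarkTracker("myKinesisSource"));
// How often subtasks synchronize with the global watermark
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");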
A custom shard assigner controls how discovered shards are distributed across parallel subtasks:
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
// Custom shard assigner to control load balancing
KinesisShardAssigner customAssigner = new KinesisShardAssigner() {
@Override
public int assign(StreamShardHandle shard, int numParallelSubtasks) {
// Custom logic for shard assignment
String shardId = shard.getShard().getShardId();
// floorMod avoids the negative result Math.abs(hashCode) can produce for Integer.MIN_VALUE
return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
}
};
consumer.setShardAssigner(customAssigner);

Enhanced Fan-Out (EFO) gives the consumer dedicated read throughput per shard instead of sharing the stream's polling quota:
// Configure Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-app");
// EFO registration strategy (LAZY, EAGER, NONE)
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");
// efoConsumerArn(stream) returns the property key under which the ARN of an
// already-registered EFO consumer is supplied
String consumerArnKey = ConsumerConfigConstants.efoConsumerArn("my-kinesis-stream");
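With EFO_REGISTRATION_TYPE set to NONE, the consumer performs no registration or deregistration itself, and an externally registered consumer ARN must be provided under that key. A sketch; the ARN value is illustrative:
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE");
// Supply the ARN of a consumer registered outside the job (illustrative value)
props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-kinesis-stream"),
"arn:aws:kinesis:us-west-2:123456789012:stream/my-kinesis-stream/consumer/my-flink-app:1651234567");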
Key configuration properties for the FlinkKinesisConsumer (a combined sketch appears at the end of this section):
- STREAM_INITIAL_POSITION: Starting position (TRIM_HORIZON, LATEST, AT_TIMESTAMP)
- STREAM_INITIAL_TIMESTAMP: Timestamp for AT_TIMESTAMP positioning
- RECORD_PUBLISHER_TYPE: EFO or POLLING record publisher
- EFO_CONSUMER_NAME: Name for the Enhanced Fan-Out consumer
- SHARD_GETRECORDS_MAX: Maximum records per GetRecords call (default: 10000)
- SHARD_GETRECORDS_INTERVAL_MILLIS: Interval between GetRecords calls (default: 200 ms)
- SHARD_IDLE_INTERVAL_MILLIS: Timeout for idle shard detection
- SHARD_USE_ADAPTIVE_READS: Enable adaptive read intervals
- WATERMARK_SYNC_MILLIS: Interval for watermark synchronization (default: 30000)
- WATERMARK_LOOKAHEAD_MILLIS: Lookahead time for watermark calculation (default: 180000)

The consumer provides comprehensive error handling and recovery mechanisms.
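A sketch combining several of these options, with illustrative values. It starts reading from a fixed point in time (STREAM_TIMESTAMP_DATE_FORMAT controls how the timestamp string is parsed) and tunes polling behavior:
Properties config = new Properties();
config.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
// Start reading at a point in time rather than TRIM_HORIZON or LATEST
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-04-04");
config.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
// Smaller batches at a longer interval, with adaptive reads enabled
config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
config.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");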