Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services. This connector enables exactly-once processing guarantees, automatic shard discovery, checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
    <version>1.14.6</version>
</dependency>

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;

Configuration and serialization:
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure AWS region, credentials, and the consumer's starting position
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Create Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
        "my-stream",
        new SimpleStringSchema(),
        consumerProps
);

// Create data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        producerProps
);
producer.setDefaultStream("output-stream");

// Send data to Kinesis
stream.addSink(producer);

// Execute the job
env.execute("Kinesis Streaming Job");

The Flink Kinesis connector is built around several key components:
The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.
Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T>, CheckpointedFunction {
public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);
public void setShardAssigner(KinesisShardAssigner shardAssigner);
public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.
/**
 * Sink that writes elements to Kinesis Data Streams, with checkpoint
 * integration (CheckpointedFunction) and configurable partitioning.
 */
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {
    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    // Schema variant that can also name a target stream per element.
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);
    // When enabled, record write failures fail the job.
    public void setFailOnError(boolean failOnError);
    // Limit on buffered in-flight records (backpressure threshold).
    public void setQueueLimit(int queueLimit);
    // Default target stream for outgoing records.
    public void setDefaultStream(String defaultStream);
    // Custom element-to-partition mapping (see KinesisPartitioner).
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.
/**
 * Consumer for DynamoDB Streams, reusing the Kinesis consumer machinery.
 */
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    // NOTE(review): parameterized the previously raw KinesisDeserializationSchema
    // to <T> for type safety — confirm against the actual Flink signature.
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.
// Configuration keys for AWS connectivity and credentials, shared by both the
// consumer and the producer.
public class AWSConfigConstants {
public static final String AWS_REGION = "aws.region";
public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
// NOTE(review): initializer values are not shown in this listing — consult the
// Flink AWSConfigConstants source for the concrete key strings.
public static final String AWS_ACCESS_KEY_ID;
public static final String AWS_SECRET_ACCESS_KEY;
// Supported strategies for the aws.credentials.provider setting.
public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}
/**
 * Consumer-side configuration keys; inherits the AWS connectivity keys from
 * AWSConfigConstants.
 */
public class ConsumerConfigConstants extends AWSConfigConstants {
    // Where to start reading a stream (values of InitialPosition below).
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    // Maximum record count per GetRecords request.
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    // Interval for watermark synchronization between subtasks.
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
    public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
    // EFO = enhanced fan-out record publisher; POLLING = classic GetRecords polling.
    public enum RecordPublisherType { EFO, POLLING }
}

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.
// Deserializer with access to Kinesis record metadata: partition key, sequence
// number, approximate arrival timestamp, stream name, and shard id.
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
// Optional initialization hook invoked before deserialization begins.
default void open(DeserializationSchema.InitializationContext context) throws Exception;
T deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}
/**
 * Serializer that can also select a target stream for each element.
 */
public interface KinesisSerializationSchema<T> extends Serializable {
    // Optional initialization hook invoked before serialization begins.
    default void open(InitializationContext context) throws Exception;
    ByteBuffer serialize(T element);
    // Name of the stream this element should be written to.
    String getTargetStream(T element);
}

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.
// Maps elements to Kinesis partition keys (and optionally explicit hash keys).
public abstract class KinesisPartitioner<T> implements Serializable {
public abstract String getPartitionId(T element);
// Optional explicit hash key for an element; overridable by subclasses.
public String getExplicitHashKey(T element);
// Hook receiving this subtask's index and the total parallelism.
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}
/**
 * Assigns a discovered shard to one of the parallel consumer subtasks;
 * returns the target subtask index.
 */
public interface KinesisShardAssigner extends Serializable {
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.
/**
 * Factory behind the {@code kinesis} Table/SQL connector identifier, producing
 * dynamic table sources and sinks.
 */
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kinesis";
    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}

/**
 * Wrapper for a Kinesis sequence number; comparable and serializable so it can
 * be carried in checkpointed state.
 */
public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
    public String get();
    public int compareTo(SequenceNumber other);
    public boolean equals(Object obj);
    public String toString();
}
// Sentinel (non-real) sequence numbers marking special shard read positions.
public enum SentinelSequenceNumber {
SENTINEL_EARLIEST_SEQUENCE_NUM,
SENTINEL_LATEST_SEQUENCE_NUM,
SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}
// Factory methods describing where in a shard to begin reading.
public class StartingPosition implements Serializable {
public static StartingPosition fromStart();
public static StartingPosition fromEnd();
public static StartingPosition fromTimestamp(Date timestamp);
// Resume reading from the given sequence number.
public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}
// Handle identifying a single shard of a named stream; implementations must
// provide value-based equals/hashCode.
public abstract class StreamShardHandle implements Serializable {
public abstract String getStreamName();
public abstract Shard getShard();
public abstract boolean equals(Object obj);
public abstract int hashCode();
}
/**
 * Pairs a shard's metadata with the last sequence number processed from it.
 */
public class KinesisStreamShardState implements Serializable {
    public StreamShardMetadata getStreamShardMetadata();
    public SequenceNumber getLastProcessedSequenceNum();
    public boolean equals(Object obj);
    public String toString();
}

/**
 * Base unchecked exception type for connector failures.
 */
public abstract class FlinkKinesisException extends RuntimeException {
    public FlinkKinesisException(String message);
    public FlinkKinesisException(String message, Throwable cause);
    // Semantic exception for timeout errors.
    public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
    }
}

/**
 * Tracks and synchronizes watermarks across parallel subtasks.
 */
public abstract class WatermarkTracker implements Closeable, Serializable {
    public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
    // Number of updates that exceeded the configured timeout.
    public abstract long getUpdateTimeoutCount();
    public void setUpdateTimeoutMillis(long updateTimeoutMillis);
    // Reports this subtask's local watermark and returns a watermark value.
    // NOTE(review): presumably the aggregated/global watermark — confirm.
    public abstract long updateWatermark(long localWatermark);
    public void open(RuntimeContext context);
    public void close();
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kinesis-2-11