Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kinesis_2-11@1.14.0

The connector provides exactly-once processing guarantees, automatic shard discovery, and checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams. Add it to your project with the Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
<version>1.14.6</version>
</dependency>

Core connector classes:

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;

Configuration and serialization:
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

A complete example that consumes from one stream and produces to another:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Properties;
// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure AWS properties
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// Create Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
"my-stream",
new SimpleStringSchema(),
consumerProps
);
// Create data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);
// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
// Create Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
new SimpleStringSchema(),
producerProps
);
producer.setDefaultStream("output-stream");
// Send data to Kinesis
stream.addSink(producer);
// Execute the job
env.execute("Kinesis Streaming Job");

The Flink Kinesis connector is built around several key components, described in the sections below.
The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.
Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.
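Before the full listing below, a sketch of typical customization: starting from a point in time and overriding the shard-to-subtask mapping. The stream name, timestamp, and shard-ID parsing are illustrative assumptions.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Properties;

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
// Begin reading at a fixed point in time instead of LATEST or TRIM_HORIZON
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2021-11-01T00:00:00.000Z");

FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props);

// Hypothetical assigner: route each shard by the numeric suffix of its shard ID
// (e.g. "shardId-000000000007" goes to subtask 7 % parallelism)
consumer.setShardAssigner((shard, numParallelSubtasks) -> {
    String id = shard.getShard().getShardId();
    return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1)) % numParallelSubtasks;
});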
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
implements ResultTypeQueryable<T>, CheckpointedFunction {
public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);
public void setShardAssigner(KinesisShardAssigner shardAssigner);
public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}

Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.
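Before the listing below, a sketch of common producer tuning via the setters it exposes; the stream name and limits are illustrative.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.util.Properties;

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), props);
producer.setDefaultStream("output-stream");
producer.setFailOnError(true);   // fail the job on write errors instead of only logging them
producer.setQueueLimit(1000);    // bound the number of buffered records to apply backpressure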
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
implements CheckpointedFunction {
public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);
public void setFailOnError(boolean failOnError);
public void setQueueLimit(int queueLimit);
public void setDefaultStream(String defaultStream);
public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.
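Usage mirrors the Kinesis consumer; the main difference is that DynamoDB Streams are addressed by stream ARN. A sketch with a placeholder ARN:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

// DynamoDB Streams are identified by their stream ARN rather than a stream name
DataStream<String> changes = env.addSource(new FlinkDynamoDBStreamsConsumer<>(
        "arn:aws:dynamodb:us-east-1:123456789012:table/my-table/stream/2021-01-01T00:00:00.000",
        new SimpleStringSchema(),
        props));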
public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties config);
}

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.
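For example, credentials can come from a named profile rather than static keys, and the per-call record batch size can be capped. AWS_PROFILE_NAME is assumed to exist alongside the constants listed below; the values are illustrative.

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Properties;

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Resolve credentials from a profile in ~/.aws/credentials
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");
// Cap the number of records fetched per GetRecords call
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");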
public class AWSConfigConstants {
public static final String AWS_REGION = "aws.region";
public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}
public class ConsumerConfigConstants extends AWSConfigConstants {
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
public enum RecordPublisherType { EFO, POLLING }
}

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.
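A sketch of a custom deserialization schema that uses the metadata arguments, tagging each record with its shard of origin; the output format is an arbitrary choice.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ShardTaggingSchema implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
                              long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        // Prefix the payload with the shard it came from
        return shardId + "|" + new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

An instance can be passed to the FlinkKinesisConsumer constructor in place of a plain DeserializationSchema.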
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
default void open(DeserializationSchema.InitializationContext context) throws Exception;
T deserialize(byte[] recordValue, String partitionKey, String seqNum,
long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}
public interface KinesisSerializationSchema<T> extends Serializable {
default void open(SerializationSchema.InitializationContext context) throws Exception;
ByteBuffer serialize(T element);
String getTargetStream(T element);
}

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.
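A sketch of a partitioner that derives the Kinesis partition key from the payload; the comma-separated record layout is an assumption about the data.

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

public class FirstFieldPartitioner extends KinesisPartitioner<String> {

    @Override
    public String getPartitionId(String element) {
        // Use the text before the first comma as the partition key,
        // so records sharing that field land on the same shard
        int comma = element.indexOf(',');
        return comma >= 0 ? element.substring(0, comma) : element;
    }
}

Attach it with producer.setCustomPartitioner(new FirstFieldPartitioner());.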
public abstract class KinesisPartitioner<T> implements Serializable {
public abstract String getPartitionId(T element);
public String getExplicitHashKey(T element);
public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}
public interface KinesisShardAssigner extends Serializable {
int assign(StreamShardHandle shard, int numParallelSubtasks);
}

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.
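A sketch of declaring a Kinesis-backed table in SQL; the table schema and json format are illustrative, while the 'kinesis' connector option matches the factory's IDENTIFIER below.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 'connector' = 'kinesis' resolves to KinesisDynamicTableFactory
tableEnv.executeSql(
        "CREATE TABLE orders (" +
        "  order_id STRING," +
        "  amount DOUBLE" +
        ") WITH (" +
        "  'connector' = 'kinesis'," +
        "  'stream' = 'orders-stream'," +
        "  'aws.region' = 'us-east-1'," +
        "  'scan.stream.initpos' = 'LATEST'," +
        "  'format' = 'json'" +
        ")");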
public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final String IDENTIFIER = "kinesis";
public DynamicTableSource createDynamicTableSource(Context context);
public DynamicTableSink createDynamicTableSink(Context context);
public Set<ConfigOption<?>> requiredOptions();
public Set<ConfigOption<?>> optionalOptions();
}

Internal model classes used to track read positions and shard state across checkpoints and shard discovery.

public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
public String get();
public int compareTo(SequenceNumber other);
public boolean equals(Object obj);
public String toString();
}
public enum SentinelSequenceNumber {
SENTINEL_EARLIEST_SEQUENCE_NUM,
SENTINEL_LATEST_SEQUENCE_NUM,
SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}
public class StartingPosition implements Serializable {
public static StartingPosition fromStart();
public static StartingPosition fromEnd();
public static StartingPosition fromTimestamp(Date timestamp);
public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}
public class StreamShardHandle implements Serializable {
public String getStreamName();
public Shard getShard();
public boolean equals(Object obj);
public int hashCode();
}
public class KinesisStreamShardState implements Serializable {
public StreamShardMetadata getStreamShardMetadata();
public SequenceNumber getLastProcessedSequenceNum();
public boolean equals(Object obj);
public String toString();
}

Exception types used to signal connector failures.

public abstract class FlinkKinesisException extends RuntimeException {
public FlinkKinesisException(String message);
public FlinkKinesisException(String message, Throwable cause);
public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
// Semantic exception for timeout errors
}
}

Utility for synchronizing event-time watermarks across parallel source subtasks, attached via FlinkKinesisConsumer.setWatermarkTracker.

public abstract class WatermarkTracker implements Closeable, Serializable {
public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
public abstract long getUpdateTimeoutCount();
public void setUpdateTimeoutMillis(long updateTimeoutMillis);
public abstract long updateWatermark(long localWatermark);
public void open(RuntimeContext context);
public void close();
}
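For example, the JobManagerWatermarkTracker shipped in org.apache.flink.streaming.connectors.kinesis.util aggregates per-subtask watermarks through the JobManager. A sketch, assuming consumer is a configured FlinkKinesisConsumer and the aggregate name is arbitrary:

import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

// Share a global watermark across subtasks so readers of fast shards
// can be held back relative to slow ones
JobManagerWatermarkTracker tracker = new JobManagerWatermarkTracker("kinesis-watermark");
tracker.setUpdateTimeoutMillis(30_000);
consumer.setWatermarkTracker(tracker);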