
tessl/maven-org-apache-flink--flink-connector-kinesis_2-11

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-kinesis_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-kinesis_2-11@1.14.0


Apache Flink Kinesis Connector

Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services. The connector offers exactly-once processing guarantees, automatic shard discovery, and checkpointing, and supports both Kinesis Data Streams and DynamoDB Streams.

Package Information

  • Package Name: flink-connector-kinesis_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Add to your Maven pom.xml:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;

Configuration and serialization:

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

Basic Usage

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure AWS properties
Properties consumerProps = new Properties();
consumerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
consumerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

// Create Kinesis consumer
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "my-stream",
    new SimpleStringSchema(),
    consumerProps
);

// Create data stream from Kinesis
DataStream<String> stream = env.addSource(consumer);

// Configure producer properties
Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
producerProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
producerProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create Kinesis producer
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
    new SimpleStringSchema(),
    producerProps
);
producer.setDefaultStream("output-stream");

// Send data to Kinesis
stream.addSink(producer);

// Execute the job
env.execute("Kinesis Streaming Job");

Architecture

The Flink Kinesis connector is built around several key components:

  • Consumer (FlinkKinesisConsumer): Reads from Kinesis streams with exactly-once semantics using Flink's checkpointing mechanism
  • Producer (FlinkKinesisProducer): Writes to Kinesis streams using the Kinesis Producer Library (KPL) for high throughput
  • Shard Management: Automatic shard discovery and assignment across Flink parallelism
  • Watermark Support: Event-time processing with configurable watermark strategies
  • AWS Integration: Support for multiple AWS credential providers and regions

The connector supports both AWS SDK v1.x and v2.x, provides comprehensive metrics, and integrates with Flink's Table API for SQL-based stream processing.

Capabilities

Kinesis Consumer

Core consumer functionality for reading from Kinesis Data Streams with exactly-once processing guarantees, automatic shard discovery, and comprehensive configuration options.

public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> 
    implements ResultTypeQueryable<T>, CheckpointedFunction {
    
    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    
    public void setShardAssigner(KinesisShardAssigner shardAssigner);
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);
}
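
For illustration, a minimal consumer sketch showing the shard-assigner hook (the stream name and region are placeholders; KinesisShardAssigner has a single method, so a lambda works):

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "orders", new SimpleStringSchema(), props);

// Assign each shard to a subtask by hashing its handle; shown only to
// illustrate the hook, not as an improvement over the default assigner.
consumer.setShardAssigner((shard, numParallelSubtasks) ->
    Math.floorMod(shard.hashCode(), numParallelSubtasks));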

See docs/consumer.md for the full consumer documentation.

Kinesis Producer

Producer functionality for writing data to Kinesis Data Streams with configurable partitioning, error handling, and backpressure management.

public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> 
    implements CheckpointedFunction {
    
    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);
    
    public void setFailOnError(boolean failOnError);
    public void setQueueLimit(int queueLimit);
    public void setDefaultStream(String defaultStream);
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
}
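
A short sketch of the producer tuning hooks (the stream name and limit values are illustrative, not recommendations):

Properties producerProps = new Properties();
producerProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
    new SimpleStringSchema(), producerProps);
producer.setDefaultStream("output-stream");
producer.setFailOnError(true);  // fail the job on write errors instead of only logging them
producer.setQueueLimit(1000);   // cap buffered KPL records to apply backpressure upstream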

See docs/producer.md for the full producer documentation.

DynamoDB Streams Integration

Specialized consumer for reading from DynamoDB Streams, extending the base Kinesis consumer with DynamoDB-specific functionality.

public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
    public FlinkDynamoDBStreamsConsumer(String stream, DeserializationSchema<T> deserializer, Properties config);
    public FlinkDynamoDBStreamsConsumer(List<String> streams, KinesisDeserializationSchema deserializer, Properties config);
}
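
A minimal sketch, assuming a placeholder stream ARN (DynamoDB Streams are addressed by the table's stream ARN rather than a short stream name):

Properties config = new Properties();
config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

// The ARN below is a placeholder for a real table's stream ARN.
FlinkDynamoDBStreamsConsumer<String> ddbConsumer = new FlinkDynamoDBStreamsConsumer<>(
    "arn:aws:dynamodb:us-east-1:123456789012:table/my-table/stream/2024-01-01T00:00:00.000",
    new SimpleStringSchema(),
    config);
DataStream<String> changes = env.addSource(ddbConsumer);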

See docs/dynamodb-streams.md for details.

Configuration Management

Comprehensive configuration constants and utilities for AWS credentials, regions, consumer behavior, and producer settings.

public class AWSConfigConstants {
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ACCESS_KEY_ID;
    public static final String AWS_SECRET_ACCESS_KEY;
    
    public enum CredentialProvider { ENV_VAR, SYS_PROP, PROFILE, BASIC, ASSUME_ROLE, WEB_IDENTITY_TOKEN, AUTO }
}

public class ConsumerConfigConstants extends AWSConfigConstants {
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
    
    public enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }
    public enum RecordPublisherType { EFO, POLLING }
}
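
Using only the constants listed above, a typical consumer configuration might look like this (region and values are illustrative):

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Resolve credentials via the default provider chain (env vars, system properties, profile, IAM role)
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
// Start reading from the oldest record still retained in the stream
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// Fetch up to 5000 records per GetRecords call
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");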

See docs/configuration.md for the complete list of options.

Serialization and Deserialization

Kinesis-specific serialization interfaces that provide access to stream metadata and allow custom target stream specification.

public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    T deserialize(byte[] recordValue, String partitionKey, String seqNum, 
                  long approxArrivalTimestamp, String stream, String shardId) throws IOException;
}

public interface KinesisSerializationSchema<T> extends Serializable {
    default void open(InitializationContext context) throws Exception;
    ByteBuffer serialize(T element);
    String getTargetStream(T element);
}
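
As a sketch of getTargetStream-based routing (the payload shape and stream names are hypothetical):

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RoutingSerializationSchema implements KinesisSerializationSchema<String> {
    @Override
    public ByteBuffer serialize(String element) {
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    // Route error events to a dedicated stream; everything else goes to the default.
    @Override
    public String getTargetStream(String element) {
        return element.contains("\"level\":\"ERROR\"") ? "error-events" : "main-events";
    }
}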

See docs/serialization.md for details.

Partitioning Strategies

Flexible partitioning strategies for distributing data across Kinesis shards and mapping shards to Flink subtasks.

public abstract class KinesisPartitioner<T> implements Serializable {
    public abstract String getPartitionId(T element);
    public String getExplicitHashKey(T element);
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks);
}

public interface KinesisShardAssigner extends Serializable {
    int assign(StreamShardHandle shard, int numParallelSubtasks);
}
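
A hypothetical KinesisPartitioner that keys records by a prefix (assuming records shaped like "userId:payload"), so all records for one user hash to the same shard:

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

public class UserIdPartitioner extends KinesisPartitioner<String> {
    @Override
    public String getPartitionId(String element) {
        int sep = element.indexOf(':');
        // Fall back to a fixed key when the record has no prefix
        return sep > 0 ? element.substring(0, sep) : "default";
    }
}

Attach it with producer.setCustomPartitioner(new UserIdPartitioner()).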

See docs/partitioning.md for details.

Table API Integration

SQL and Table API support through dynamic table factories for declarative stream processing with Kinesis sources and sinks.

public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kinesis";
    
    public DynamicTableSource createDynamicTableSource(Context context);
    public DynamicTableSink createDynamicTableSink(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
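
A sketch of declaring a Kinesis-backed table from Java. The option names ('stream', 'aws.region', 'scan.stream.initpos', 'format') follow the 1.14 Kinesis SQL connector documentation; verify them against your exact version:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

// Declares a table backed by a (hypothetical) "orders" stream
tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DOUBLE" +
    ") WITH (" +
    "  'connector' = 'kinesis'," +
    "  'stream' = 'orders'," +
    "  'aws.region' = 'us-east-1'," +
    "  'scan.stream.initpos' = 'LATEST'," +
    "  'format' = 'json'" +
    ")");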

See docs/table-api.md for details.

Types

Core Model Classes

public class SequenceNumber implements Comparable<SequenceNumber>, Serializable {
    public String get();
    public int compareTo(SequenceNumber other);
    public boolean equals(Object obj);
    public String toString();
}

public enum SentinelSequenceNumber {
    SENTINEL_EARLIEST_SEQUENCE_NUM,
    SENTINEL_LATEST_SEQUENCE_NUM,
    SENTINEL_SHARD_ENDING_SEQUENCE_NUM
}

public class StartingPosition implements Serializable {
    public static StartingPosition fromStart();
    public static StartingPosition fromEnd();
    public static StartingPosition fromTimestamp(Date timestamp);
    public static StartingPosition continueFromSequenceNumber(SequenceNumber sequenceNumber);
}

public abstract class StreamShardHandle implements Serializable {
    public abstract String getStreamName();
    public abstract Shard getShard();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}

public class KinesisStreamShardState implements Serializable {
    public StreamShardMetadata getStreamShardMetadata();
    public SequenceNumber getLastProcessedSequenceNum();
    public boolean equals(Object obj);
    public String toString();
}

Exception Classes

public abstract class FlinkKinesisException extends RuntimeException {
    public FlinkKinesisException(String message);
    public FlinkKinesisException(String message, Throwable cause);
    
    public static class FlinkKinesisTimeoutException extends FlinkKinesisException {
        // Semantic exception for timeout errors
    }
}

Watermark Management

public abstract class WatermarkTracker implements Closeable, Serializable {
    public static final long DEFAULT_UPDATE_TIMEOUT_MILLIS = 60_000;
    
    public abstract long getUpdateTimeoutCount();
    public void setUpdateTimeoutMillis(long updateTimeoutMillis);
    public abstract long updateWatermark(long localWatermark);
    public void open(RuntimeContext context);
    public void close();
}
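
To share watermark progress across parallel subtasks, the connector ships a JobManager-based tracker (org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker in 1.14; the aggregate-name constructor argument is assumed from that version's source). A hedged sketch:

import org.apache.flink.streaming.connectors.kinesis.util.JobManagerWatermarkTracker;

// "kinesis-watermarks" is an arbitrary aggregate name shared by all subtasks.
JobManagerWatermarkTracker tracker = new JobManagerWatermarkTracker("kinesis-watermarks");
consumer.setWatermarkTracker(tracker);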