Kinesis Consumer

The FlinkKinesisConsumer provides exactly-once streaming data ingestion from Amazon Kinesis Data Streams with automatic shard discovery, checkpointing, and comprehensive configuration options for high-throughput stream processing.

Capabilities

FlinkKinesisConsumer

Main consumer class for reading from one or more Kinesis streams with exactly-once processing guarantees through Flink's checkpointing mechanism.

@PublicEvolving
public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> 
    implements ResultTypeQueryable<T>, CheckpointedFunction {
    
    /**
     * Create consumer for single stream with standard deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Standard Flink deserialization schema
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps);
    
    /**
     * Create consumer for single stream with Kinesis-specific deserialization schema.
     *
     * @param stream Stream name to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    
    /**
     * Create consumer for multiple streams with Kinesis-specific deserialization schema.
     *
     * @param streams List of stream names to consume from
     * @param deserializer Kinesis deserialization schema with metadata access
     * @param configProps AWS and consumer configuration properties
     */
    public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps);
    
    /**
     * Get current shard assigner for mapping shards to subtasks.
     *
     * @return Current shard assigner instance
     */
    public KinesisShardAssigner getShardAssigner();
    
    /**
     * Set custom shard assigner for mapping shards to subtasks.
     *
     * @param shardAssigner Custom shard assigner implementation
     */
    public void setShardAssigner(KinesisShardAssigner shardAssigner);
    
    /**
     * Get current periodic watermark assigner for event-time processing.
     *
     * @return Current watermark assigner instance
     */
    public AssignerWithPeriodicWatermarks<T> getPeriodicWatermarkAssigner();
    
    /**
     * Set periodic watermark assigner for event-time processing.
     *
     * @param periodicWatermarkAssigner Watermark assigner implementation
     */
    public void setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner);
    
    /**
     * Get current watermark tracker for distributed watermark aggregation.
     *
     * @return Current watermark tracker instance
     */
    public WatermarkTracker getWatermarkTracker();
    
    /**
     * Set watermark tracker for distributed watermark aggregation.
     *
     * @param watermarkTracker Watermark tracker implementation
     */
    public void setWatermarkTracker(WatermarkTracker watermarkTracker);
    
    /**
     * Main source function execution method.
     *
     * @param sourceContext Flink source context for emitting records
     * @throws Exception On processing errors
     */
    public void run(SourceContext<T> sourceContext) throws Exception;
    
    /**
     * Cancel the consumer and stop reading from streams.
     */
    public void cancel();
    
    /**
     * Close resources and cleanup.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;
    
    /**
     * Get the type information for produced records.
     *
     * @return Type information for output type T
     */
    public TypeInformation<T> getProducedType();
    
    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;
    
    /**
     * Create snapshot of current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}

Usage Examples

Basic Single Stream Consumer

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "my-kinesis-stream",
    new SimpleStringSchema(),
    props
);

DataStream<String> stream = env.addSource(consumer);

Multi-Stream Consumer with Custom Deserialization

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.util.Arrays;

// Custom deserialization schema with access to Kinesis metadata
KinesisDeserializationSchema<MyEvent> deserializer = new KinesisDeserializationSchema<MyEvent>() {
    @Override
    public MyEvent deserialize(byte[] recordValue, String partitionKey, String seqNum,
                              long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // Parse JSON with metadata
        MyEvent event = parseJson(recordValue);
        event.setMetadata(stream, shardId, seqNum, approxArrivalTimestamp);
        return event;
    }
    
    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
};

FlinkKinesisConsumer<MyEvent> consumer = new FlinkKinesisConsumer<>(
    Arrays.asList("stream-1", "stream-2", "stream-3"),
    deserializer,
    props
);

Consumer with Event-Time Processing and Watermarks

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Custom watermark assigner for event-time processing
AssignerWithPeriodicWatermarks<MyEvent> watermarkAssigner = new AssignerWithPeriodicWatermarks<MyEvent>() {
    // Offset the initial value so getCurrentWatermark() does not underflow
    // before the first element arrives
    private long maxTimestamp = Long.MIN_VALUE + 10000;
    
    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getEventTime();
        maxTimestamp = Math.max(timestamp, maxTimestamp);
        return timestamp;
    }
    
    @Override
    public Watermark getCurrentWatermark() {
        // Allow 10 seconds of lateness
        return new Watermark(maxTimestamp - 10000);
    }
};

consumer.setPeriodicWatermarkAssigner(watermarkAssigner);

// Configure watermark emission interval
env.getConfig().setAutoWatermarkInterval(5000);

// Configure shard idle timeout to prevent watermark stalling
// (must be set on props before the consumer is constructed)
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");

Consumer with Custom Shard Assignment

import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

// Custom shard assigner to control load balancing
KinesisShardAssigner customAssigner = new KinesisShardAssigner() {
    @Override
    public int assign(StreamShardHandle shard, int numParallelSubtasks) {
        // Custom logic for shard assignment
        String shardId = shard.getShard().getShardId();
        // floorMod keeps the result non-negative even for Integer.MIN_VALUE hash codes,
        // where Math.abs would return a negative value
        return Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
};

consumer.setShardAssigner(customAssigner);

Enhanced Fan-Out (EFO) Consumer Configuration

// Configure Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-app");

// EFO registration strategy (LAZY, EAGER, NONE)
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");

// efoConsumerArn(stream) returns the property *key* under which the ARN of an
// already-registered EFO consumer is supplied (typically with registration type NONE)
props.setProperty(
    ConsumerConfigConstants.efoConsumerArn("my-kinesis-stream"),
    "arn:aws:kinesis:<region>:<account-id>:stream/my-kinesis-stream/consumer/my-flink-app:<creation-timestamp>");

Configuration Options

Key configuration properties for the FlinkKinesisConsumer:

Stream Configuration

  • STREAM_INITIAL_POSITION: Starting position (TRIM_HORIZON, LATEST, AT_TIMESTAMP)
  • STREAM_INITIAL_TIMESTAMP: Timestamp for AT_TIMESTAMP positioning
  • RECORD_PUBLISHER_TYPE: EFO or POLLING record publisher
  • EFO_CONSUMER_NAME: Name for Enhanced Fan-Out consumer
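The AT_TIMESTAMP position additionally requires STREAM_INITIAL_TIMESTAMP. The sketch below uses the literal property keys behind those constants so it compiles without the connector on the classpath; in application code, prefer the ConsumerConfigConstants fields, and verify the keys against your connector version.

```java
import java.util.Properties;

public class InitialPositionConfig {
    public static Properties build() {
        Properties props = new Properties();
        // ConsumerConfigConstants.STREAM_INITIAL_POSITION:
        // start from a fixed point in time rather than LATEST or TRIM_HORIZON
        props.setProperty("flink.stream.initpos", "AT_TIMESTAMP");
        // ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP:
        // a date string in the connector's default format (or a unix epoch value)
        props.setProperty("flink.stream.initpos.timestamp", "2024-01-15T00:00:00.000-00:00");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("flink.stream.initpos"));
    }
}
```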

Shard Configuration

  • SHARD_GETRECORDS_MAX: Maximum records per GetRecords call (default: 10000)
  • SHARD_GETRECORDS_INTERVAL_MILLIS: Interval between GetRecords calls (default: 200ms)
  • SHARD_IDLE_INTERVAL_MILLIS: Timeout for idle shard detection
  • SHARD_USE_ADAPTIVE_READS: Enable adaptive read intervals

Watermark Configuration

  • WATERMARK_SYNC_MILLIS: Interval for watermark synchronization (default: 30000)
  • WATERMARK_LOOKAHEAD_MILLIS: Lookahead time for watermark calculation (default: 180000)
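The shard and watermark options above all go on the same Properties object passed to the consumer constructor. A sketch combining them; the values are illustrative, not tuning recommendations.

```java
Properties props = new Properties();

// Shard polling: smaller batches, polled less aggressively than the defaults
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

// Watermarks: mark shards idle after 60s so they stop holding back the watermark
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");
```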

Error Handling

The consumer provides comprehensive error handling and recovery mechanisms:

  • Checkpointing: Automatic recovery from exactly the last checkpointed position
  • Shard Resharding: Automatic detection and handling of shard splits and merges
  • Network Failures: Automatic retry with exponential backoff
  • Throttling: Built-in handling of Kinesis throttling with adaptive backoff
  • Idle Shards: Configurable timeout to prevent stalled watermarks from closed shards
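Exactly-once recovery depends on checkpointing actually being enabled on the execution environment: the consumer only rewinds to the last checkpointed sequence numbers if snapshots are taken. A minimal environment configuration sketch; the interval and restart values are illustrative.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Snapshot every 60s; Kinesis shard sequence numbers are stored in each checkpoint
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// On failure, restart up to 3 times with a 10s delay before failing the job
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
```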