Apache Flink connector for Amazon Kinesis Data Streams that provides both consumer and producer functionality for streaming data integration with AWS Kinesis services.
Comprehensive configuration system for AWS credentials, regions, consumer behavior, producer settings, and advanced features like Enhanced Fan-Out and watermark management.
Base configuration constants for AWS service access, credential management, and regional settings.
@PublicEvolving
public class AWSConfigConstants {
// Core AWS Configuration
public static final String AWS_REGION = "aws.region";
public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
public static final String AWS_ENDPOINT = "aws.endpoint";
// Credential Provider Types
public enum CredentialProvider {
ENV_VAR, // Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
SYS_PROP, // System properties (aws.accessKeyId, aws.secretKey)
PROFILE, // AWS profile file (~/.aws/credentials)
BASIC, // Basic access key/secret key
ASSUME_ROLE, // IAM role assumption
WEB_IDENTITY_TOKEN, // Web identity token for OIDC
AUTO // Automatic credential chain
}
// Helper methods for prefixed configuration keys
public static String accessKeyId(String prefix);
public static String secretKey(String prefix);
public static String profilePath(String prefix);
public static String profileName(String prefix);
public static String roleArn(String prefix);
public static String roleSessionName(String prefix);
public static String externalId(String prefix);
public static String roleCredentialsProvider(String prefix);
public static String webIdentityTokenFile(String prefix);
}

Consumer-specific configuration constants for stream positioning, shard management, Enhanced Fan-Out, and watermark handling.
@PublicEvolving
public class ConsumerConfigConstants extends AWSConfigConstants {
// Stream Configuration
public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
// Shard Configuration
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";
// Watermark Configuration
public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";
// Enhanced Fan-Out Configuration
public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";
// Stream Initial Position Options
public enum InitialPosition {
TRIM_HORIZON, // Start from oldest available record
LATEST, // Start from latest record
AT_TIMESTAMP // Start from specific timestamp
}
// Record Publisher Types
public enum RecordPublisherType {
EFO, // Enhanced Fan-Out with dedicated throughput
POLLING // Standard polling with shared throughput
}
// EFO Registration Strategies
public enum EFORegistrationType {
LAZY, // Register consumer on first access
EAGER, // Register consumer immediately on job start
NONE // Use existing consumer, don't register
}
/**
* Generate EFO consumer ARN for a specific stream.
*
* @param streamName Name of the Kinesis stream
* @return Consumer ARN property key
*/
public static String efoConsumerArn(String streamName);
}

Legacy producer configuration constants (deprecated in favor of direct KPL properties).
@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {
/**
* @deprecated Use KPL property "CollectionMaxCount" instead
*/
@Deprecated
public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
/**
* @deprecated Use KPL property "AggregationMaxCount" instead
*/
@Deprecated
public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
}

import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
Properties props = new Properties();
// Basic configuration with access keys
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "AKIAIOSFODNN7EXAMPLE");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");

// Using AWS profiles
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");
props.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/path/to/credentials");
// Using IAM role assumption
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");
props.setProperty(AWSConfigConstants.AWS_ROLE_EXTERNAL_ID, "external-id");
// Using web identity tokens (for EKS/Fargate)
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
props.setProperty(AWSConfigConstants.AWS_WEB_IDENTITY_TOKEN_FILE, "/var/run/secrets/token");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/EKSRole");
// Using automatic credential chain
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
// Stream positioning
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// or for specific timestamp
// The timestamp must match the connector's default date pattern
// yyyy-MM-dd'T'HH:mm:ss.SSSXXX (milliseconds are required), or be a
// non-negative epoch-seconds number such as "1672531200.000".
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-01-01T00:00:00.000Z");
// Shard configuration for performance tuning
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
// Watermark configuration for event-time processing
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");

// Enable Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-application");
// EFO registration strategies
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY"); // Register on demand
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "EAGER"); // Register immediately
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE"); // Use existing consumer
// For using existing EFO consumer
String consumerArn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream/consumer/my-consumer:1234567890";
props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-stream"), consumerArn);

// Configure for cross-region access
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://kinesis.us-west-2.amazonaws.com");
// For custom endpoints (testing, VPC endpoints)
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://vpce-123456-xyz.kinesis.us-west-2.vpce.amazonaws.com");

// KPL-specific configuration (passed directly to KPL)
props.setProperty("RecordMaxBufferedTime", "100"); // Batching delay (ms)
props.setProperty("RecordTtl", "30000"); // Record TTL (ms)
props.setProperty("RequestTimeout", "6000"); // Request timeout (ms)
props.setProperty("MaxConnections", "24"); // HTTP connections
// Aggregation settings
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");
// Retry configuration
props.setProperty("RetryDuration", "10000"); // Max retry time (ms)
// Metrics configuration
props.setProperty("MetricsLevel", "DETAILED"); // NONE, SUMMARY, DETAILED
props.setProperty("MetricsGranularity", "SHARD"); // GLOBAL, STREAM, SHARD
props.setProperty("MetricsNameSpace", "MyApp/KinesisProducer");

// Relaxed settings for development
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
props.setProperty("MetricsLevel", "SUMMARY");

// Optimized settings for production
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
props.setProperty("MetricsLevel", "DETAILED");
props.setProperty("MetricsGranularity", "SHARD");
// Enhanced monitoring
props.setProperty("CloudWatchMetricsEnabled", "true");
props.setProperty("CloudWatchMetricsNamespace", "MyApp/Kinesis");

// Configuration for integration testing
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566"); // LocalStack
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
props.setProperty("MetricsLevel", "NONE"); // Disable metrics for testing

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kinesis-2-11