Comprehensive configuration system for AWS credentials, regions, consumer behavior, producer settings, and advanced features like Enhanced Fan-Out and watermark management.
Base configuration constants for AWS service access, credential management, and regional settings.
/**
 * Configuration keys shared by the Kinesis consumer and producer for AWS region, endpoint and
 * credentials.
 *
 * <p>Provider-specific keys are nested under {@link #AWS_CREDENTIALS_PROVIDER}. The static
 * helpers build such keys for any prefix, so nested settings (for example the credentials used
 * to assume a role, under {@link #AWS_ROLE_CREDENTIALS_PROVIDER}) follow the same layout.
 */
@PublicEvolving
public class AWSConfigConstants {

    // ------------------------------------------------------------------------
    //  Core AWS configuration
    // ------------------------------------------------------------------------

    /** Property key for the AWS region the clients connect to, e.g. {@code "us-west-2"}. */
    public static final String AWS_REGION = "aws.region";

    /**
     * Property key selecting the credential provider. The value is the name of a {@link
     * CredentialProvider} constant, e.g. {@code "PROFILE"} or {@code "AUTO"}. This key is also
     * the prefix of every provider-specific key declared below.
     */
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    /** Property key overriding the service endpoint URL, e.g. a VPC endpoint or local emulator. */
    public static final String AWS_ENDPOINT = "aws.endpoint";

    // ------------------------------------------------------------------------
    //  Provider-specific keys (nested under AWS_CREDENTIALS_PROVIDER)
    // ------------------------------------------------------------------------

    /** Access key ID for the {@link CredentialProvider#BASIC} provider. */
    public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);

    /** Secret access key for the {@link CredentialProvider#BASIC} provider. */
    public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);

    /** Path of the credentials file for the {@link CredentialProvider#PROFILE} provider. */
    public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);

    /** Profile name for the {@link CredentialProvider#PROFILE} provider. */
    public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);

    /** ARN of the role to assume ({@code ASSUME_ROLE} and {@code WEB_IDENTITY_TOKEN}). */
    public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);

    /** Session name used when assuming {@link #AWS_ROLE_ARN}. */
    public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);

    /** External ID passed when assuming {@link #AWS_ROLE_ARN}. */
    public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);

    /**
     * Key of the provider that supplies the base credentials used to assume {@link
     * #AWS_ROLE_ARN}. Pass it as the prefix to the helpers below to build that provider's keys.
     */
    public static final String AWS_ROLE_CREDENTIALS_PROVIDER =
            roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);

    /** Path of the token file for the {@link CredentialProvider#WEB_IDENTITY_TOKEN} provider. */
    public static final String AWS_WEB_IDENTITY_TOKEN_FILE =
            webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER);

    /** Values accepted for {@link #AWS_CREDENTIALS_PROVIDER}. */
    public enum CredentialProvider {
        /** Environment variables ({@code AWS_ACCESS_KEY_ID}, {@code AWS_SECRET_ACCESS_KEY}). */
        ENV_VAR,
        /** JVM system properties ({@code aws.accessKeyId}, {@code aws.secretKey}). */
        SYS_PROP,
        /** AWS profile file ({@code ~/.aws/credentials}, or the path set via AWS_PROFILE_PATH). */
        PROFILE,
        /** Static access key / secret key from AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. */
        BASIC,
        /** IAM role assumption (AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME, AWS_ROLE_EXTERNAL_ID). */
        ASSUME_ROLE,
        /** Web identity (OIDC) token file, e.g. on EKS/Fargate. */
        WEB_IDENTITY_TOKEN,
        /** Automatic credential provider chain. */
        AUTO
    }

    // ------------------------------------------------------------------------
    //  Helpers building provider-specific keys under an arbitrary prefix
    // ------------------------------------------------------------------------

    /**
     * Returns the access key ID property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".basic.accesskeyid"}
     */
    public static String accessKeyId(String prefix) {
        return prefix + ".basic.accesskeyid";
    }

    /**
     * Returns the secret access key property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".basic.secretkey"}
     */
    public static String secretKey(String prefix) {
        return prefix + ".basic.secretkey";
    }

    /**
     * Returns the profile file path property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".profile.path"}
     */
    public static String profilePath(String prefix) {
        return prefix + ".profile.path";
    }

    /**
     * Returns the profile name property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".profile.name"}
     */
    public static String profileName(String prefix) {
        return prefix + ".profile.name";
    }

    /**
     * Returns the role ARN property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".role.arn"}
     */
    public static String roleArn(String prefix) {
        return prefix + ".role.arn";
    }

    /**
     * Returns the role session name property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".role.sessionName"}
     */
    public static String roleSessionName(String prefix) {
        return prefix + ".role.sessionName";
    }

    /**
     * Returns the role external ID property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".role.externalId"}
     */
    public static String externalId(String prefix) {
        return prefix + ".role.externalId";
    }

    /**
     * Returns the key of the nested provider used to assume a role under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".role.provider"}
     */
    public static String roleCredentialsProvider(String prefix) {
        return prefix + ".role.provider";
    }

    /**
     * Returns the web identity token file property key under {@code prefix}.
     *
     * @param prefix key prefix, e.g. {@link #AWS_CREDENTIALS_PROVIDER}
     * @return {@code prefix + ".webIdentityToken.file"}
     */
    public static String webIdentityTokenFile(String prefix) {
        return prefix + ".webIdentityToken.file";
    }
}
Consumer-specific configuration constants for stream positioning, shard management, Enhanced Fan-Out, and watermark handling.
/**
 * Configuration keys for the Kinesis consumer. The AWS region, endpoint and credential keys are
 * inherited from {@link AWSConfigConstants}.
 */
@PublicEvolving
public class ConsumerConfigConstants extends AWSConfigConstants {

    // ------------------------------------------------------------------------
    //  Stream configuration
    // ------------------------------------------------------------------------

    /** How records are read from shards; the value is a {@link RecordPublisherType} name. */
    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";

    /** Name of the Enhanced Fan-Out stream consumer, used with {@link RecordPublisherType#EFO}. */
    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    /** Where reading starts; the value is an {@link InitialPosition} name. */
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

    /** Start timestamp, used together with {@link InitialPosition#AT_TIMESTAMP}. */
    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    // ------------------------------------------------------------------------
    //  Shard configuration
    // ------------------------------------------------------------------------

    /** Maximum number of records requested per GetRecords call (polling publisher). */
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

    /** Delay in milliseconds between consecutive GetRecords calls on a shard. */
    public static final String SHARD_GETRECORDS_INTERVAL_MILLIS =
            "flink.shard.getrecords.intervalmillis";

    /** Milliseconds without new records after which a shard is treated as idle. */
    public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";

    /** Whether to adapt the records requested per GetRecords call ({@code "true"/"false"}). */
    public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";

    // ------------------------------------------------------------------------
    //  Watermark configuration
    // ------------------------------------------------------------------------

    /** Interval in milliseconds at which the shared watermark is synchronized. */
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

    /** How far, in milliseconds, a reader may run ahead of the shared watermark. */
    public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";

    // ------------------------------------------------------------------------
    //  Enhanced Fan-Out configuration
    // ------------------------------------------------------------------------

    /** EFO consumer registration strategy; the value is an {@link EFORegistrationType} name. */
    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";

    /** Prefix of the per-stream consumer-ARN keys built by {@link #efoConsumerArn(String)}. */
    public static final String EFO_CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn";

    /** Values accepted for {@link #STREAM_INITIAL_POSITION}. */
    public enum InitialPosition {
        /** Start from the oldest available record. */
        TRIM_HORIZON,
        /** Start from the latest record. */
        LATEST,
        /** Start from the timestamp given in {@link #STREAM_INITIAL_TIMESTAMP}. */
        AT_TIMESTAMP
    }

    /** Values accepted for {@link #RECORD_PUBLISHER_TYPE}. */
    public enum RecordPublisherType {
        /** Enhanced Fan-Out: push-based reads with dedicated per-consumer throughput. */
        EFO,
        /** Standard GetRecords polling with throughput shared among all consumers. */
        POLLING
    }

    /** Values accepted for {@link #EFO_REGISTRATION_TYPE}. */
    public enum EFORegistrationType {
        /** The stream consumer is registered when the Flink job starts running. */
        LAZY,
        /** The stream consumer is registered when the source is constructed, before the job runs. */
        EAGER,
        /**
         * No registration is performed; the consumer must already exist and its ARN must be
         * supplied per stream under {@link #efoConsumerArn(String)}.
         */
        NONE
    }

    /**
     * Builds the property key under which the EFO consumer ARN for {@code streamName} is set.
     *
     * <p>This returns a configuration <em>key</em>, not an ARN. The ARN itself is the property
     * value, e.g. {@code props.setProperty(efoConsumerArn("my-stream"), consumerArn)}.
     *
     * @param streamName name of the Kinesis stream
     * @return {@code EFO_CONSUMER_ARN_PREFIX + "." + streamName}
     */
    public static String efoConsumerArn(String streamName) {
        return EFO_CONSUMER_ARN_PREFIX + "." + streamName;
    }
}
Legacy producer configuration constants (deprecated in favor of direct KPL properties).
/**
 * Legacy producer configuration keys, retained only for backward compatibility.
 *
 * @deprecated Set the corresponding Kinesis Producer Library (KPL) properties, such as {@code
 *     "AggregationMaxCount"} and {@code "CollectionMaxCount"}, directly instead.
 */
@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {

    /**
     * Legacy key corresponding to the KPL property {@code "AggregationMaxCount"}.
     *
     * @deprecated Use the KPL property {@code "AggregationMaxCount"} instead.
     */
    @Deprecated
    public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";

    /**
     * Legacy key corresponding to the KPL property {@code "CollectionMaxCount"}.
     *
     * @deprecated Use the KPL property {@code "CollectionMaxCount"} instead.
     */
    @Deprecated
    public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
}
import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
// Shared Properties object that the configuration snippets below populate.
Properties props = new Properties();
// Basic configuration with access keys.
// The key values are placeholders (note the EXAMPLE suffix); never hard-code real credentials
// in source code.
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "AKIAIOSFODNN7EXAMPLE");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
// The sections below are alternative credential setups; pick one. They all write the same
// AWS_CREDENTIALS_PROVIDER key, so when run in sequence the last value ("AUTO") wins. The keys
// written for the earlier providers still remain in props.
// Using AWS profiles
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");
props.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/path/to/credentials");
// Using IAM role assumption
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");
props.setProperty(AWSConfigConstants.AWS_ROLE_EXTERNAL_ID, "external-id");
// Using web identity tokens (for EKS/Fargate)
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
props.setProperty(AWSConfigConstants.AWS_WEB_IDENTITY_TOKEN_FILE, "/var/run/secrets/token");
// Same AWS_ROLE_ARN key as the ASSUME_ROLE section: this overwrites the MyRole ARN set above.
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/EKSRole");
// Using automatic credential chain
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
// Stream positioning. The two settings below are alternatives: both write the same key, so the
// later setProperty call replaces the earlier one.
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// or for specific timestamp
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
// Accepted forms are the default pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX (the milliseconds are
// mandatory) or epoch seconds, e.g. "1672531200.000" for the same instant.
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-01-01T00:00:00.000Z");
// Shard configuration for performance tuning (polling publisher)
// 10000 is the largest record count a single GetRecords call may return.
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
// 200 ms keeps each shard at no more than 5 GetRecords calls per second, the Kinesis per-shard limit.
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
// Watermark configuration for event-time processing
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");
// Enable Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
// Name of the EFO stream consumer.
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-application");
// EFO registration strategies: choose exactly one (a later setProperty would overwrite it).
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY"); // Register on demand
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "EAGER"); // Register immediately
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE"); // Use existing consumer
// For using existing EFO consumer. This goes with the NONE strategy above, which does not
// register a consumer itself. efoConsumerArn(streamName) returns the per-stream property KEY,
// and the ARN is its value; set one such entry for each consumed stream.
String consumerArn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream/consumer/my-consumer:1234567890";
props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-stream"), consumerArn);
// Configure for cross-region access
// Explicit region plus the matching regional endpoint (both us-west-2 here; keep them consistent).
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://kinesis.us-west-2.amazonaws.com");
// For custom endpoints (testing, VPC endpoints). Both calls write the same AWS_ENDPOINT key, so
// this replaces the regional endpoint set above.
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://vpce-123456-xyz.kinesis.us-west-2.vpce.amazonaws.com");
// KPL-specific configuration (passed directly to KPL)
// KPL property names are case-sensitive and must match the KPL's setter names exactly. The KPL
// ignores any key it does not recognize, logging only a warning.
props.setProperty("RecordMaxBufferedTime", "100"); // Batching delay (ms)
props.setProperty("RecordTtl", "30000"); // Record TTL (ms)
props.setProperty("RequestTimeout", "6000"); // Request timeout (ms)
props.setProperty("MaxConnections", "24"); // HTTP connections
// Aggregation settings
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200"); // bytes per aggregated record
// Retry configuration
// The KPL has no "RetryDuration" property. Failed puts are retried until the record's RecordTtl
// (set above) expires, so tune RecordTtl to bound the total retry time.
// Metrics configuration
props.setProperty("MetricsLevel", "DETAILED"); // NONE, SUMMARY, DETAILED
props.setProperty("MetricsGranularity", "SHARD"); // GLOBAL, STREAM, SHARD
props.setProperty("MetricsNamespace", "MyApp/KinesisProducer"); // CloudWatch namespace
// Relaxed settings for development
// Development: wait 1 s between GetRecords calls and treat a shard as idle after 60 s.
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
// MetricsLevel is a KPL (producer) property, unlike the consumer keys above.
props.setProperty("MetricsLevel", "SUMMARY");
// Optimized settings for production
// Consumer side: poll each shard as often as Kinesis allows (5 GetRecords calls/s per shard).
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
// Producer (KPL) side: detailed, per-shard metrics.
props.setProperty("MetricsLevel", "DETAILED");
props.setProperty("MetricsGranularity", "SHARD");
// Enhanced monitoring.
// The KPL has no "CloudWatchMetricsEnabled" or "CloudWatchMetricsNamespace" properties.
// Uploading to CloudWatch is controlled by MetricsLevel (NONE disables it), and the namespace
// is set with "MetricsNamespace".
props.setProperty("MetricsNamespace", "MyApp/Kinesis");
// Configuration for integration testing
// Point the connector's AWS endpoint at a local LocalStack instance over plain HTTP.
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566"); // LocalStack
// Read from the oldest available record, so test data written before the job started is consumed.
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
props.setProperty("MetricsLevel", "NONE"); // Disable metrics for testing
// NOTE(review): a region and (dummy) credentials presumably still need to be configured for
// LocalStack. Confirm against the test setup.