# Configuration Management

Comprehensive configuration system for AWS credentials, regions, consumer behavior, producer settings, and advanced features like Enhanced Fan-Out and watermark management.

## Capabilities

### AWS Configuration

Base configuration constants for AWS service access, credential management, and regional settings.

```java { .api }
@PublicEvolving
public class AWSConfigConstants {

    // Core AWS Configuration
    public static final String AWS_REGION = "aws.region";
    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
    public static final String AWS_ENDPOINT = "aws.endpoint";

    // Credential Provider Types
    public enum CredentialProvider {
        ENV_VAR,            // Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
        SYS_PROP,           // System properties (aws.accessKeyId, aws.secretKey)
        PROFILE,            // AWS profile file (~/.aws/credentials)
        BASIC,              // Basic access key/secret key
        ASSUME_ROLE,        // IAM role assumption
        WEB_IDENTITY_TOKEN, // Web identity token for OIDC
        AUTO                // Automatic credential chain
    }

    // Helper methods for prefixed configuration keys
    public static String accessKeyId(String prefix);
    public static String secretKey(String prefix);
    public static String profilePath(String prefix);
    public static String profileName(String prefix);
    public static String roleArn(String prefix);
    public static String roleSessionName(String prefix);
    public static String externalId(String prefix);
    public static String roleCredentialsProvider(String prefix);
    public static String webIdentityTokenFile(String prefix);
}
```

### Consumer Configuration

Consumer-specific configuration constants for stream positioning, shard management, Enhanced Fan-Out, and watermark handling.

```java { .api }
@PublicEvolving
public class ConsumerConfigConstants extends AWSConfigConstants {

    // Stream Configuration
    public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";
    public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

    // Shard Configuration
    public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
    public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
    public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";

    // Watermark Configuration
    public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";
    public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";

    // Enhanced Fan-Out Configuration
    public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";

    // Stream Initial Position Options
    public enum InitialPosition {
        TRIM_HORIZON, // Start from oldest available record
        LATEST,       // Start from latest record
        AT_TIMESTAMP  // Start from specific timestamp
    }

    // Record Publisher Types
    public enum RecordPublisherType {
        EFO,    // Enhanced Fan-Out with dedicated throughput
        POLLING // Standard polling with shared throughput
    }

    // EFO Registration Strategies
    public enum EFORegistrationType {
        LAZY,  // Register consumer on first access
        EAGER, // Register consumer immediately on job start
        NONE   // Use existing consumer, don't register
    }

    /**
     * Generate the EFO consumer ARN property key for a specific stream.
     *
     * @param streamName Name of the Kinesis stream
     * @return Consumer ARN property key
     */
    public static String efoConsumerArn(String streamName);
}
```
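
Several of these settings depend on each other: `AT_TIMESTAMP` is only meaningful together with a timestamp, and the EFO publisher needs a consumer identity. The block below is a minimal sketch of a pre-flight check, not connector code. `ConsumerConfigCheck` and `validate` are hypothetical names, the key strings are copied from the listing above, and the `LATEST` and `POLLING` fallbacks are assumptions about the connector defaults that should be verified for your version.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the connector API.
final class ConsumerConfigCheck {

    // Local mirror of ConsumerConfigConstants.InitialPosition.
    enum InitialPosition { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    // Key strings copied from the constants listed above.
    static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    /** Throws IllegalArgumentException if the properties are inconsistent. */
    static void validate(Properties props) {
        // Assumption: LATEST is used when no initial position is configured.
        String positionName = props.getProperty(STREAM_INITIAL_POSITION, "LATEST");
        // valueOf rejects names that are not enum constants.
        InitialPosition position = InitialPosition.valueOf(positionName);
        if (position == InitialPosition.AT_TIMESTAMP
                && props.getProperty(STREAM_INITIAL_TIMESTAMP) == null) {
            throw new IllegalArgumentException(
                    STREAM_INITIAL_TIMESTAMP + " is required for AT_TIMESTAMP");
        }

        // Assumption: POLLING is used when no record publisher is configured.
        String publisher = props.getProperty(RECORD_PUBLISHER_TYPE, "POLLING");
        if ("EFO".equals(publisher) && props.getProperty(EFO_CONSUMER_NAME) == null) {
            throw new IllegalArgumentException(
                    EFO_CONSUMER_NAME + " is required when the record publisher is EFO");
        }
    }
}
```

Calling `ConsumerConfigCheck.validate(props)` before building the consumer surfaces these mistakes while the job is being assembled. A setup that uses `NONE` registration with pre-registered consumer ARNs may not need a consumer name, in which case the second check should be relaxed.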

### Producer Configuration (Deprecated)

Legacy producer configuration constants (deprecated in favor of direct KPL properties).

```java { .api }
@Deprecated
public class ProducerConfigConstants extends AWSConfigConstants {

    /**
     * @deprecated Use KPL property "CollectionMaxCount" instead
     */
    @Deprecated
    public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";

    /**
     * @deprecated Use KPL property "AggregationMaxCount" instead
     */
    @Deprecated
    public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
}
```

## Configuration Examples

### Basic AWS Configuration

```java
import java.util.Properties;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties props = new Properties();

// Basic configuration with access keys
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "AKIAIOSFODNN7EXAMPLE");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
```

### Advanced AWS Credential Configuration

```java
// The options below are alternatives: each one overwrites AWS_CREDENTIALS_PROVIDER,
// so configure only one of them on a given Properties object.

// Using AWS profiles
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");
props.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/path/to/credentials");

// Using IAM role assumption
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");
props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");
props.setProperty(AWSConfigConstants.AWS_ROLE_EXTERNAL_ID, "external-id");

// Using web identity tokens (for EKS/Fargate)
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
props.setProperty(AWSConfigConstants.AWS_WEB_IDENTITY_TOKEN_FILE, "/var/run/secrets/token");
props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/EKSRole");

// Using automatic credential chain
props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
```

### Consumer Configuration

```java
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Stream positioning
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
// Alternatively, start from a specific timestamp. The value is written with
// milliseconds to match the yyyy-MM-dd'T'HH:mm:ss.SSSXXX pattern (assumed default).
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-01-01T00:00:00.000Z");

// Shard configuration for performance tuning
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

// Watermark configuration for event-time processing
props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");
```
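
The value given for `STREAM_INITIAL_TIMESTAMP` has to match the date pattern the connector parses it with. The sketch below checks a timestamp string against `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`, which is assumed here to be the connector's default pattern; confirm this for your version. `InitialTimestampCheck` and `toEpochMillis` are hypothetical names used only for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper for illustration; not part of the connector API.
final class InitialTimestampCheck {

    // Assumption: the pattern the connector uses by default for initial timestamps.
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Returns epoch milliseconds, or throws IllegalArgumentException on a mismatch. */
    static long toEpochMillis(String timestamp) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setLenient(false); // reject out-of-range fields such as month 13
        try {
            return format.parse(timestamp).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "'" + timestamp + "' does not match " + PATTERN, e);
        }
    }

    public static void main(String[] args) {
        // Milliseconds and a zone designator are both present, so this parses.
        System.out.println(toEpochMillis("2023-01-01T00:00:00.000Z")); // prints 1672531200000
    }
}
```

Under this pattern a string without the millisecond field, such as `2023-01-01T00:00:00Z`, is rejected, which is an easy mistake to make when copying ISO 8601 timestamps from other tools.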

### Enhanced Fan-Out Configuration

```java
// Enable Enhanced Fan-Out for dedicated throughput
props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");
props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-application");

// EFO registration strategies
props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY");     // Register on demand
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "EAGER"); // Register immediately
// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE");  // Use existing consumer

// For using existing EFO consumer
String consumerArn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream/consumer/my-consumer:1234567890";
props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-stream"), consumerArn);
```

### Multi-Region Configuration

```java
// Configure for cross-region access
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://kinesis.us-west-2.amazonaws.com");

// For custom endpoints (testing, VPC endpoints)
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://vpce-123456-xyz.kinesis.us-west-2.vpce.amazonaws.com");
```
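
When the same job is deployed to several regions, the public endpoint can be derived from the region name instead of being hardcoded. The helper below is a small sketch that reproduces the URL form used in the example above. `KinesisEndpoints` and `forRegion` are hypothetical names, and the `amazonaws.com` suffix is an assumption that holds for the standard AWS partition only; other partitions such as the China regions use a different domain suffix, and VPC endpoints have their own hostnames.

```java
// Hypothetical helper for illustration; not part of the connector API.
final class KinesisEndpoints {

    /** Builds the public regional endpoint URL for the standard AWS partition. */
    static String forRegion(String region) {
        if (region == null || region.trim().isEmpty()) {
            throw new IllegalArgumentException("region must not be empty");
        }
        return "https://kinesis." + region.trim() + ".amazonaws.com";
    }

    public static void main(String[] args) {
        System.out.println(forRegion("us-west-2")); // prints https://kinesis.us-west-2.amazonaws.com
    }
}
```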

### Producer KPL Configuration

```java
// KPL-specific configuration (passed directly to KPL)
props.setProperty("RecordMaxBufferedTime", "100"); // Batching delay (ms)
props.setProperty("RecordTtl", "30000");           // Record TTL (ms)
props.setProperty("RequestTimeout", "6000");       // Request timeout (ms)
props.setProperty("MaxConnections", "24");         // HTTP connections

// Aggregation settings
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");

// Retry configuration
props.setProperty("RetryDuration", "10000"); // Max retry time (ms)

// Metrics configuration (KPL expects lowercase values)
props.setProperty("MetricsLevel", "detailed");    // none, summary, detailed
props.setProperty("MetricsGranularity", "shard"); // global, stream, shard
props.setProperty("MetricsNamespace", "MyApp/KinesisProducer");
```
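
Because KPL properties are plain strings, a mistyped or wrongly cased metrics value is only noticed when the producer starts. The sketch below normalizes `MetricsLevel` and `MetricsGranularity` to lowercase and rejects values outside the sets named in the comments above. `KplMetricsSettings` is a hypothetical helper, and the premise that the KPL accepts only the lowercase spellings is an assumption to verify against your KPL version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the KPL or the connector.
final class KplMetricsSettings {

    static final List<String> LEVELS = Arrays.asList("none", "summary", "detailed");
    static final List<String> GRANULARITIES = Arrays.asList("global", "stream", "shard");

    /** Lowercases the value stored under key and checks it against the allowed set. */
    static void normalize(Properties props, String key, List<String> allowed) {
        String value = props.getProperty(key);
        if (value == null) {
            return; // unset keys are left to the KPL defaults
        }
        String lower = value.toLowerCase(Locale.ROOT);
        if (!allowed.contains(lower)) {
            throw new IllegalArgumentException(
                    key + " must be one of " + allowed + ", got " + value);
        }
        props.setProperty(key, lower);
    }

    /** Normalizes both metrics settings in place. */
    static void normalizeMetrics(Properties props) {
        normalize(props, "MetricsLevel", LEVELS);
        normalize(props, "MetricsGranularity", GRANULARITIES);
    }
}
```

Running `KplMetricsSettings.normalizeMetrics(props)` just before the properties are handed to the producer keeps environment-specific overrides from introducing invalid values.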

## Configuration Best Practices

### Security
- Use IAM roles instead of hardcoded access keys when possible
- Implement credential rotation policies
- Use least-privilege access policies
- Enable CloudTrail logging for audit trails

### Performance
- Configure appropriate shard limits based on expected throughput
- Use Enhanced Fan-Out for high-throughput consumers
- Tune GetRecords intervals based on latency requirements
- Enable adaptive reads for variable workloads

### Reliability
- Configure appropriate timeouts and retry policies
- Set up monitoring and alerting for consumer lag
- Use checkpointing for exactly-once processing guarantees
- Configure shard idle timeouts to prevent watermark stalling

### Cost Optimization
- Use standard polling instead of EFO for low-throughput streams
- Configure appropriate retention periods
- Use shard-level metrics only when needed
- Optimize batch sizes and intervals to reduce API calls

## Environment-Specific Configuration

### Development Environment

```java
// Relaxed settings for development
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
props.setProperty("MetricsLevel", "summary");
```

### Production Environment

```java
// Optimized settings for production
props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
props.setProperty("MetricsLevel", "detailed");
props.setProperty("MetricsGranularity", "shard");

// Enhanced monitoring
props.setProperty("CloudWatchMetricsEnabled", "true");
props.setProperty("CloudWatchMetricsNamespace", "MyApp/Kinesis");
```
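
A GetRecords interval translates directly into a polling rate, which matters because Kinesis Data Streams limits each shard to 5 GetRecords calls per second, shared by all polling consumers of that shard. The sketch below does that arithmetic. `GetRecordsRate` and its methods are hypothetical names, and the calculation is an upper bound that ignores request latency and any adaptive-read adjustments.

```java
// Hypothetical helper for illustration; not part of the connector API.
final class GetRecordsRate {

    // Kinesis Data Streams per-shard limit for GetRecords calls per second.
    static final double MAX_CALLS_PER_SECOND_PER_SHARD = 5.0;

    /** Upper bound on the calls per second one consumer issues against one shard. */
    static double callsPerSecond(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return 1000.0 / intervalMillis;
    }

    /** True if the given number of polling consumers stays within the shard limit. */
    static boolean withinShardLimit(long intervalMillis, int consumersPerShard) {
        return callsPerSecond(intervalMillis) * consumersPerShard
                <= MAX_CALLS_PER_SECOND_PER_SHARD;
    }

    public static void main(String[] args) {
        System.out.println(callsPerSecond(200));      // prints 5.0
        System.out.println(withinShardLimit(200, 1)); // prints true
        System.out.println(withinShardLimit(200, 2)); // prints false
    }
}
```

At 200 ms a single polling consumer can already reach the per-shard limit, so a second polling application on the same stream would need a longer interval or Enhanced Fan-Out, which has dedicated throughput per consumer.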

### Testing Environment

```java
// Configuration for integration testing
props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566"); // LocalStack
props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
props.setProperty("MetricsLevel", "none"); // Disable metrics for testing
```
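
The three environment profiles can be kept in one place and selected at startup. The following is one possible sketch covering the consumer-side settings only. `EnvironmentConfig`, `Environment`, and `forEnvironment` are hypothetical names, the key strings are the values of the `AWSConfigConstants` and `ConsumerConfigConstants` fields documented earlier, and the values are the ones used in the environment examples above.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the connector API.
final class EnvironmentConfig {

    enum Environment { DEVELOPMENT, PRODUCTION, TESTING }

    /** Returns the consumer settings for the given environment. */
    static Properties forEnvironment(Environment env) {
        Properties props = new Properties();
        switch (env) {
            case DEVELOPMENT:
                // Relaxed polling and idle detection.
                props.setProperty("flink.shard.getrecords.intervalmillis", "1000");
                props.setProperty("flink.shard.idle.interval", "60000");
                break;
            case PRODUCTION:
                // Faster polling with adaptive reads enabled.
                props.setProperty("flink.shard.getrecords.intervalmillis", "200");
                props.setProperty("flink.shard.idle.interval", "30000");
                props.setProperty("flink.shard.adaptivereads", "true");
                break;
            case TESTING:
                // Local endpoint (LocalStack), reading from the oldest record.
                props.setProperty("aws.endpoint", "http://localhost:4566");
                props.setProperty("flink.stream.initpos", "TRIM_HORIZON");
                break;
            default:
                throw new IllegalArgumentException("unknown environment: " + env);
        }
        return props;
    }
}
```

A job could then call `EnvironmentConfig.forEnvironment(Environment.valueOf(name))` with a name taken from a program argument, and add region and credential settings on top of the returned properties.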