or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md consumer.md dynamodb-streams.md index.md partitioning.md producer.md serialization.md table-api.md

configuration.mddocs/

0

# Configuration Management

1

2

Comprehensive configuration system for AWS credentials, regions, consumer behavior, producer settings, and advanced features like Enhanced Fan-Out and watermark management.

3

4

## Capabilities

5

6

### AWS Configuration

7

8

Base configuration constants for AWS service access, credential management, and regional settings.

9

10

```java { .api }

11

@PublicEvolving

12

public class AWSConfigConstants {

13

14

// Core AWS Configuration

15

public static final String AWS_REGION = "aws.region";

16

public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

17

public static final String AWS_ENDPOINT = "aws.endpoint";

18

19

// Credential Provider Types

20

public enum CredentialProvider {

21

ENV_VAR, // Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

22

SYS_PROP, // System properties (aws.accessKeyId, aws.secretKey)

23

PROFILE, // AWS profile file (~/.aws/credentials)

24

BASIC, // Basic access key/secret key

25

ASSUME_ROLE, // IAM role assumption

26

WEB_IDENTITY_TOKEN, // Web identity token for OIDC

27

AUTO // Automatic credential chain

28

}

29

30

// Helper methods for prefixed configuration keys

31

public static String accessKeyId(String prefix);

32

public static String secretKey(String prefix);

33

public static String profilePath(String prefix);

34

public static String profileName(String prefix);

35

public static String roleArn(String prefix);

36

public static String roleSessionName(String prefix);

37

public static String externalId(String prefix);

38

public static String roleCredentialsProvider(String prefix);

39

public static String webIdentityTokenFile(String prefix);

40

}

41

```

42

43

### Consumer Configuration

44

45

Consumer-specific configuration constants for stream positioning, shard management, Enhanced Fan-Out, and watermark handling.

46

47

```java { .api }

48

@PublicEvolving

49

public class ConsumerConfigConstants extends AWSConfigConstants {

50

51

// Stream Configuration

52

public static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";

53

public static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

54

public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

55

public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

56

57

// Shard Configuration

58

public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

59

public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

60

public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";

61

public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.adaptivereads";

62

63

// Watermark Configuration

64

public static final String WATERMARK_SYNC_MILLIS = "flink.watermark.sync.interval";

65

public static final String WATERMARK_LOOKAHEAD_MILLIS = "flink.watermark.lookahead.millis";

66

67

// Enhanced Fan-Out Configuration

68

public static final String EFO_REGISTRATION_TYPE = "flink.stream.efo.registration";

69

70

// Stream Initial Position Options

71

public enum InitialPosition {

72

TRIM_HORIZON, // Start from oldest available record

73

LATEST, // Start from latest record

74

AT_TIMESTAMP // Start from specific timestamp

75

}

76

77

// Record Publisher Types

78

public enum RecordPublisherType {

79

EFO, // Enhanced Fan-Out with dedicated throughput

80

POLLING // Standard polling with shared throughput

81

}

82

83

// EFO Registration Strategies

84

public enum EFORegistrationType {

85

LAZY, // Register consumer on first access

86

EAGER, // Register consumer immediately on job start

87

NONE // Use existing consumer, don't register

88

}

89

90

/**

91

* Build the property key under which the EFO consumer ARN for a specific stream is configured.

92

*

93

* @param streamName Name of the Kinesis stream

94

* @return Consumer ARN property key

95

*/

96

public static String efoConsumerArn(String streamName);

97

}

98

```

99

100

### Producer Configuration (Deprecated)

101

102

Legacy producer configuration constants (deprecated in favor of direct KPL properties).

103

104

```java { .api }

105

@Deprecated

106

public class ProducerConfigConstants extends AWSConfigConstants {

107

108

/**

109

* @deprecated Use KPL property "CollectionMaxCount" instead

110

*/

111

@Deprecated

112

public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";

113

114

/**

115

* @deprecated Use KPL property "AggregationMaxCount" instead

116

*/

117

@Deprecated

118

public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";

119

}

120

```

121

122

## Configuration Examples

123

124

### Basic AWS Configuration

125

126

```java

127

import java.util.Properties;

128

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

129

130

Properties props = new Properties();

131

132

// Basic configuration with access keys

133

props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");

134

props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "AKIAIOSFODNN7EXAMPLE");

135

props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");

136

```

137

138

### Advanced AWS Credential Configuration

139

140

```java

141

// Using AWS profiles

142

props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");

143

props.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "my-profile");

144

props.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/path/to/credentials");

145

146

// Using IAM role assumption

147

props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");

148

props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyRole");

149

props.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");

150

props.setProperty(AWSConfigConstants.AWS_ROLE_EXTERNAL_ID, "external-id");

151

152

// Using web identity tokens (for EKS/Fargate)

153

props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");

154

props.setProperty(AWSConfigConstants.AWS_WEB_IDENTITY_TOKEN_FILE, "/var/run/secrets/token");

155

props.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/EKSRole");

156

157

// Using automatic credential chain

158

props.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

159

```

160

161

### Consumer Configuration

162

163

```java

164

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

165

166

// Stream positioning

167

props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

168

// or for specific timestamp

169

props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");

170

props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2023-01-01T00:00:00.000Z"); // must match yyyy-MM-dd'T'HH:mm:ss.SSSXXX (or be epoch seconds)

171

172

// Shard configuration for performance tuning

173

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");

174

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");

175

props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");

176

props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

177

178

// Watermark configuration for event-time processing

179

props.setProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");

180

props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "180000");

181

```

182

183

### Enhanced Fan-Out Configuration

184

185

```java

186

// Enable Enhanced Fan-Out for dedicated throughput

187

props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO");

188

props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-application");

189

190

// EFO registration strategies

191

props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "LAZY"); // Register on demand

192

// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "EAGER"); // Register immediately

193

// props.setProperty(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, "NONE"); // Use existing consumer

194

195

// For using existing EFO consumer

196

String consumerArn = "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream/consumer/my-consumer:1234567890";

197

props.setProperty(ConsumerConfigConstants.efoConsumerArn("my-stream"), consumerArn);

198

```

199

200

### Multi-Region Configuration

201

202

```java

203

// Configure for cross-region access

204

props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");

205

props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://kinesis.us-west-2.amazonaws.com");

206

207

// For custom endpoints (testing, VPC endpoints)

208

props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "https://vpce-123456-xyz.kinesis.us-west-2.vpce.amazonaws.com");

209

```

210

211

### Producer KPL Configuration

212

213

```java

214

// KPL-specific configuration (passed directly to KPL)

215

props.setProperty("RecordMaxBufferedTime", "100"); // Batching delay (ms)

216

props.setProperty("RecordTtl", "30000"); // Record TTL (ms)

217

props.setProperty("RequestTimeout", "6000"); // Request timeout (ms)

218

props.setProperty("MaxConnections", "24"); // HTTP connections

219

220

// Aggregation settings

221

props.setProperty("AggregationEnabled", "true");

222

props.setProperty("AggregationMaxCount", "4294967295");

223

props.setProperty("AggregationMaxSize", "51200");

224

225

// Retry configuration

226

props.setProperty("RetryDuration", "10000"); // Max retry time (ms)

227

228

// Metrics configuration

229

props.setProperty("MetricsLevel", "detailed"); // none, summary, detailed (lower-case)

230

props.setProperty("MetricsGranularity", "shard"); // global, stream, shard (lower-case)

231

props.setProperty("MetricsNamespace", "MyApp/KinesisProducer");

232

```

233

234

## Configuration Best Practices

235

236

### Security

237

- Use IAM roles instead of hardcoded access keys when possible

238

- Implement credential rotation policies

239

- Use least-privilege access policies

240

- Enable CloudTrail logging for audit trails

241

242

### Performance

243

- Configure appropriate shard limits based on expected throughput

244

- Use Enhanced Fan-Out for high-throughput consumers

245

- Tune GetRecords intervals based on latency requirements

246

- Enable adaptive reads for variable workloads

247

248

### Reliability

249

- Configure appropriate timeouts and retry policies

250

- Set up monitoring and alerting for consumer lag

251

- Use checkpointing for exactly-once processing guarantees

252

- Configure shard idle timeouts to prevent watermark stalling

253

254

### Cost Optimization

255

- Use standard polling instead of EFO for low-throughput streams

256

- Configure appropriate retention periods

257

- Use shard-level metrics only when needed

258

- Optimize batch sizes and intervals to reduce API calls

259

260

## Environment-Specific Configuration

261

262

### Development Environment

263

264

```java

265

// Relaxed settings for development

266

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

267

props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");

268

props.setProperty("MetricsLevel", "summary");

269

```

270

271

### Production Environment

272

273

```java

274

// Optimized settings for production

275

props.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");

276

props.setProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");

277

props.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

278

props.setProperty("MetricsLevel", "detailed");

279

props.setProperty("MetricsGranularity", "shard");

280

281

// Enhanced monitoring

282

props.setProperty("CloudWatchMetricsEnabled", "true");

283

props.setProperty("CloudWatchMetricsNamespace", "MyApp/Kinesis");

284

```

285

286

### Testing Environment

287

288

```java

289

// Configuration for integration testing

290

props.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566"); // LocalStack

291

props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

292

props.setProperty("MetricsLevel", "none"); // Disable metrics for testing

293

```