# Kinesis Producer

The `FlinkKinesisProducer` enables high-throughput data ingestion into Amazon Kinesis Data Streams using the Kinesis Producer Library (KPL), with configurable partitioning, error handling, and backpressure management.

## Capabilities

### FlinkKinesisProducer

Main producer class for writing data to Kinesis streams with configurable partitioning strategies, error handling, and performance tuning options.

```java { .api }
@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
        implements CheckpointedFunction {

    // Metric constants
    public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
    public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
    public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
    public static final String KINESIS_PRODUCER_RELEASE_HOOK_NAME = "kinesisProducer";

    /**
     * Create producer with standard serialization schema.
     *
     * @param schema Standard Flink serialization schema
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);

    /**
     * Create producer with Kinesis-specific serialization schema.
     *
     * @param schema Kinesis serialization schema with target stream specification
     * @param configProps AWS and producer configuration properties
     */
    public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);

    /**
     * Configure error handling behavior.
     *
     * @param failOnError If true, fail the job on any production error; if false, log and continue
     */
    public void setFailOnError(boolean failOnError);

    /**
     * Set the maximum number of outstanding records before backpressuring.
     *
     * @param queueLimit Maximum outstanding records (default: Integer.MAX_VALUE)
     */
    public void setQueueLimit(int queueLimit);

    /**
     * Set the default target stream for records.
     *
     * @param defaultStream Default stream name (can be overridden by serialization schema)
     */
    public void setDefaultStream(String defaultStream);

    /**
     * Set the default partition for records.
     *
     * @param defaultPartition Default partition ID (can be overridden by serialization schema)
     */
    public void setDefaultPartition(String defaultPartition);

    /**
     * Set custom partitioner for distributing records across shards.
     *
     * @param partitioner Custom partitioner implementation
     */
    public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);

    /**
     * Initialize the producer with runtime configuration.
     *
     * @param parameters Runtime configuration parameters
     * @throws Exception On initialization errors
     */
    public void open(Configuration parameters) throws Exception;

    /**
     * Send a record to Kinesis.
     *
     * @param value Record to send
     * @param context Sink context with additional metadata
     * @throws Exception On send errors
     */
    public void invoke(OUT value, Context context) throws Exception;

    /**
     * Close the producer and clean up resources.
     *
     * @throws Exception On cleanup errors
     */
    public void close() throws Exception;

    /**
     * Initialize state for checkpointing.
     *
     * @param context Function initialization context
     * @throws Exception On initialization errors
     */
    public void initializeState(FunctionInitializationContext context) throws Exception;

    /**
     * Create snapshot of current state for checkpointing.
     *
     * @param context Function snapshot context
     * @throws Exception On snapshot errors
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
}
```

### Usage Examples

#### Basic Producer Setup

```java
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;

// Configure AWS properties
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");

// Create producer with simple string serialization
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new SimpleStringSchema(),
        props
);

// Configure producer settings
producer.setDefaultStream("my-output-stream");
producer.setFailOnError(true);
producer.setQueueLimit(1000);

// Add to data stream
dataStream.addSink(producer);
```

#### Producer with Custom Serialization

```java
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Custom serialization schema with target stream selection
KinesisSerializationSchema<MyEvent> customSerializer = new KinesisSerializationSchema<MyEvent>() {
    @Override
    public ByteBuffer serialize(MyEvent element) {
        // Convert to JSON or another format (toJson is a placeholder for your own logic)
        String json = toJson(element);
        return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(MyEvent element) {
        // Route to different streams based on event type
        return "events-" + element.getEventType().toLowerCase();
    }
};

FlinkKinesisProducer<MyEvent> producer = new FlinkKinesisProducer<>(
        customSerializer,
        props
);
```

#### Producer with Custom Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

// Custom partitioner for load balancing
KinesisPartitioner<MyEvent> customPartitioner = new KinesisPartitioner<MyEvent>() {
    @Override
    public String getPartitionId(MyEvent element) {
        // Partition by user ID for user-based ordering
        return String.valueOf(element.getUserId() % 100);
    }

    @Override
    public String getExplicitHashKey(MyEvent element) {
        // Optional: provide explicit hash key for finer control
        return String.valueOf(element.getUserId());
    }
};

producer.setCustomPartitioner(customPartitioner);
```

#### Producer with Fixed Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;

// Use fixed partitioner so each Flink subtask maps to the same Kinesis partition key
FixedKinesisPartitioner<MyEvent> fixedPartitioner = new FixedKinesisPartitioner<>();
producer.setCustomPartitioner(fixedPartitioner);
```

#### Producer with Random Partitioning

```java
import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;

// Use random partitioner for even distribution across shards
RandomKinesisPartitioner<MyEvent> randomPartitioner = new RandomKinesisPartitioner<>();
producer.setCustomPartitioner(randomPartitioner);
```

#### High-Throughput Configuration

```java
// Configure KPL settings for high throughput
props.setProperty("RecordMaxBufferedTime", "100"); // 100ms batching window
props.setProperty("MaxConnections", "24");         // more concurrent connections
props.setProperty("RequestTimeout", "6000");       // 6-second request timeout
props.setProperty("RecordTtl", "30000");           // 30-second record TTL

// Configure aggregation
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");

// Set higher queue limit for buffering
producer.setQueueLimit(10000);

// Configure error handling for high throughput
producer.setFailOnError(false); // Log errors but continue processing
```

#### Delivery Guarantees with Checkpointing

```java
// Enable checkpointing; on each checkpoint the producer flushes all
// outstanding records, providing at-least-once delivery to Kinesis
env.enableCheckpointing(60000); // checkpoint every minute
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// Configure checkpoint cleanup
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

// The producer automatically participates in checkpointing:
// snapshotState() blocks until all buffered records have been flushed
```

### Configuration Options

Key configuration properties for the FlinkKinesisProducer:

#### KPL Configuration

- `RecordMaxBufferedTime`: Maximum time to buffer records (default: 100ms)
- `RecordTtl`: Time-to-live for records in the buffer (default: 30000ms)
- `RequestTimeout`: Timeout for HTTP requests (default: 6000ms)
- `MaxConnections`: Maximum concurrent connections (default: 24)

#### Aggregation Settings

- `AggregationEnabled`: Enable record aggregation (default: true)
- `AggregationMaxCount`: Maximum records per aggregate (default: 4294967295)
- `AggregationMaxSize`: Maximum aggregate size in bytes (default: 51200)

#### Retry and Metrics Configuration

- `RetryDuration`: Maximum retry duration (default: 10000ms)
- `MetricsLevel`: CloudWatch metrics level (NONE, SUMMARY, DETAILED)
- `MetricsGranularity`: Metrics granularity (GLOBAL, STREAM, SHARD)
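
These options are all plain string entries merged into the same `Properties` object as the AWS settings. As a minimal sketch (illustrative values, not tuning recommendations; `KplConfigSketch` is a hypothetical helper, not part of the connector):

```java
import java.util.Properties;

// Sketch: assembling KPL tuning properties alongside metrics settings
public class KplConfigSketch {
    static Properties kplProps() {
        Properties props = new Properties();
        // KPL configuration
        props.setProperty("RecordMaxBufferedTime", "100");
        props.setProperty("RecordTtl", "30000");
        props.setProperty("RequestTimeout", "6000");
        props.setProperty("MaxConnections", "24");
        // Aggregation settings
        props.setProperty("AggregationEnabled", "true");
        props.setProperty("AggregationMaxSize", "51200");
        // Metrics
        props.setProperty("MetricsLevel", "SUMMARY");
        props.setProperty("MetricsGranularity", "STREAM");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(kplProps().getProperty("MetricsLevel")); // prints SUMMARY
    }
}
```

The same `Properties` instance is then passed to the `FlinkKinesisProducer` constructor together with the AWS credentials and region settings.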

### Error Handling and Reliability

The producer provides several mechanisms for handling errors and ensuring reliability:

#### Automatic Retry

- Built-in exponential backoff for transient failures
- Configurable retry duration and maximum attempts
- Automatic handling of throttling and service limits

#### Error Handling Modes

- **Fail-on-Error**: Fail the entire job on any production error
- **Log-and-Continue**: Log errors but continue processing other records
- **Custom Handling**: Implement custom error handling in the serialization schema

#### Metrics and Monitoring

- **Backpressure Cycles**: Number of times the producer was backpressured
- **Outstanding Records**: Current number of unacknowledged records
- **KPL Metrics**: Detailed metrics from the Kinesis Producer Library
- **CloudWatch Integration**: Automatic metric publishing to CloudWatch

#### Memory Management

- Configurable queue limits to prevent out-of-memory errors
- Automatic backpressure when queue limits are reached
- Resource cleanup on job cancellation or failure
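
For dashboards and alerts, the backpressure and outstanding-record metrics are registered under the producer's metric group. A small sketch assembling the group-qualified names from the constants in the API above (the full metric identifier also carries job/task scope prefixes that depend on your Flink metrics reporter configuration; `ProducerMetricNames` is a hypothetical helper):

```java
// Sketch: group-qualified metric names, using the constant values
// declared on FlinkKinesisProducer
public class ProducerMetricNames {
    static final String GROUP = "kinesisProducer";               // KINESIS_PRODUCER_METRIC_GROUP
    static final String BACKPRESSURE = "backpressureCycles";     // METRIC_BACKPRESSURE_CYCLES
    static final String OUTSTANDING = "outstandingRecordsCount"; // METRIC_OUTSTANDING_RECORDS_COUNT

    // Join group and metric with the default '.' scope delimiter
    static String qualified(String metric) {
        return GROUP + "." + metric;
    }

    public static void main(String[] args) {
        System.out.println(qualified(BACKPRESSURE)); // prints kinesisProducer.backpressureCycles
        System.out.println(qualified(OUTSTANDING));  // prints kinesisProducer.outstandingRecordsCount
    }
}
```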

### Performance Tuning

#### Throughput Optimization

- Increase `MaxConnections` for higher parallelism
- Increase `RecordMaxBufferedTime` to batch more records per request
- Enable aggregation for better throughput
- Tune queue limits based on memory availability

#### Latency Optimization

- Reduce `RecordMaxBufferedTime` to minimize buffering delay
- Disable aggregation for the lowest latency
- Use direct partition assignment instead of random partitioning
- Configure smaller batch sizes

#### Resource Management

- Monitor the outstanding records count to prevent memory issues
- Use appropriate queue limits based on record size and memory
- Configure KPL thread pool sizes based on CPU cores
- Set reasonable timeouts to prevent resource leaks
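
Since the queue limit bounds worst-case buffer memory at roughly `queueLimit × average record size`, a budget-driven limit can be derived directly. A back-of-the-envelope sketch with assumed numbers (a hypothetical 64 MiB per-subtask budget and 4 KiB average records):

```java
// Sketch: derive a queue limit from a per-subtask memory budget
public class QueueLimitSizing {
    static int queueLimitFor(long memoryBudgetBytes, long avgRecordBytes) {
        // Worst-case buffer memory is queueLimit * avgRecordBytes,
        // so divide the budget by the record size and clamp to int range
        long limit = memoryBudgetBytes / avgRecordBytes;
        return (int) Math.min(limit, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        // 64 MiB budget, 4 KiB average records -> 16384 outstanding records
        int limit = queueLimitFor(64L * 1024 * 1024, 4L * 1024);
        System.out.println(limit); // prints 16384
        // producer.setQueueLimit(limit); // applied to the Flink producer
    }
}
```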