The FlinkKinesisProducer enables high-throughput data ingestion to Amazon Kinesis Data Streams using the Kinesis Producer Library (KPL) with configurable partitioning, error handling, and backpressure management.

/**
 * Main producer class for writing data to Kinesis streams with configurable
 * partitioning strategies, error handling, and performance tuning options.
 */
@PublicEvolving
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT>
implements CheckpointedFunction {
// Metric constants
public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
public static final String KINESIS_PRODUCER_RELEASE_HOOK_NAME = "kinesisProducer";
/**
* Create producer with standard serialization schema.
*
* @param schema Standard Flink serialization schema
* @param configProps AWS and producer configuration properties
*/
public FlinkKinesisProducer(SerializationSchema<OUT> schema, Properties configProps);
/**
* Create producer with Kinesis-specific serialization schema.
*
* @param schema Kinesis serialization schema with target stream specification
* @param configProps AWS and producer configuration properties
*/
public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps);
/**
* Configure error handling behavior.
*
* @param failOnError If true, fail the job on any production error; if false, log and continue
*/
public void setFailOnError(boolean failOnError);
/**
* Set the maximum number of outstanding records before backpressuring.
*
* @param queueLimit Maximum outstanding records (default: Integer.MAX_VALUE)
*/
public void setQueueLimit(int queueLimit);
/**
* Set the default target stream for records.
*
* @param defaultStream Default stream name (can be overridden by serialization schema)
*/
public void setDefaultStream(String defaultStream);
/**
* Set the default partition for records.
*
* @param defaultPartition Default partition ID (can be overridden by serialization schema)
*/
public void setDefaultPartition(String defaultPartition);
/**
* Set custom partitioner for distributing records across shards.
*
* @param partitioner Custom partitioner implementation
*/
public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner);
/**
* Initialize the producer with runtime configuration.
*
* @param parameters Runtime configuration parameters
* @throws Exception On initialization errors
*/
public void open(Configuration parameters) throws Exception;
/**
* Send a record to Kinesis.
*
* @param value Record to send
* @param context Sink context with additional metadata
* @throws Exception On send errors
*/
public void invoke(OUT value, Context context) throws Exception;
/**
* Close the producer and cleanup resources.
*
* @throws Exception On cleanup errors
*/
public void close() throws Exception;
/**
* Initialize state for checkpointing.
*
* @param context Function initialization context
* @throws Exception On initialization errors
*/
public void initializeState(FunctionInitializationContext context) throws Exception;
/**
* Create snapshot of current state for checkpointing.
*
* @param context Function snapshot context
* @throws Exception On snapshot errors
*/
public void snapshotState(FunctionSnapshotContext context) throws Exception;
}

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
// Configure AWS properties
Properties props = new Properties();
props.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2");
props.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your-access-key");
props.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your-secret-key");
// Create producer with simple string serialization
FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
new SimpleStringSchema(),
props
);
// Configure producer settings
producer.setDefaultStream("my-output-stream");
producer.setFailOnError(true);
producer.setQueueLimit(1000);
// Add to data stream
dataStream.addSink(producer);

import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
// Custom serialization schema with target stream selection
KinesisSerializationSchema<MyEvent> customSerializer = new KinesisSerializationSchema<MyEvent>() {
@Override
public ByteBuffer serialize(MyEvent element) {
// Convert to JSON or other format
String json = toJson(element);
return ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
}
@Override
public String getTargetStream(MyEvent element) {
// Route to different streams based on event type
return "events-" + element.getEventType().toLowerCase();
}
};
FlinkKinesisProducer<MyEvent> producer = new FlinkKinesisProducer<>(
customSerializer,
props
);

import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
// Custom partitioner for load balancing
KinesisPartitioner<MyEvent> customPartitioner = new KinesisPartitioner<MyEvent>() {
@Override
public String getPartitionId(MyEvent element) {
// Partition by user ID for user-based ordering
return String.valueOf(element.getUserId() % 100);
}
@Override
public String getExplicitHashKey(MyEvent element) {
// Optional: provide explicit hash key for finer control
return String.valueOf(element.getUserId());
}
};
producer.setCustomPartitioner(customPartitioner);

import org.apache.flink.streaming.connectors.kinesis.FixedKinesisPartitioner;
// Use fixed partitioner so all records from a given Flink subtask share the same Kinesis partition key
FixedKinesisPartitioner<MyEvent> fixedPartitioner = new FixedKinesisPartitioner<>();
producer.setCustomPartitioner(fixedPartitioner);

import org.apache.flink.streaming.connectors.kinesis.RandomKinesisPartitioner;
// Use random partitioner for even distribution
RandomKinesisPartitioner<MyEvent> randomPartitioner = new RandomKinesisPartitioner<>();
producer.setCustomPartitioner(randomPartitioner);

// Configure for high throughput with KPL settings
props.setProperty("RecordMaxBufferedTime", "100"); // 100ms batching
props.setProperty("MaxConnections", "24"); // More connections
props.setProperty("RequestTimeout", "6000"); // 6 second timeout
props.setProperty("RecordTtl", "30000"); // 30 second TTL
// Configure aggregation
props.setProperty("AggregationEnabled", "true");
props.setProperty("AggregationMaxCount", "4294967295");
props.setProperty("AggregationMaxSize", "51200");
// Set higher queue limit for buffering
producer.setQueueLimit(10000);
// Configure error handling for high throughput
producer.setFailOnError(false); // Log errors but continue processing

// Enable checkpointing so the producer flushes outstanding records on each checkpoint
// (the KPL-based producer provides at-least-once, not exactly-once, delivery)
env.enableCheckpointing(60000); // Checkpoint every minute
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Configure checkpoint cleanup
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// The producer automatically participates in checkpointing:
// all outstanding records are flushed to Kinesis before a checkpoint completes

Key configuration properties for the FlinkKinesisProducer:
RecordMaxBufferedTime: Maximum time to buffer records (default: 100ms)
RecordTtl: Time-to-live for records in the buffer (default: 30000ms)
RequestTimeout: Timeout for HTTP requests (default: 6000ms)
MaxConnections: Maximum concurrent connections (default: 24)
AggregationEnabled: Enable record aggregation (default: true)
AggregationMaxCount: Maximum records per aggregate (default: 4294967295)
AggregationMaxSize: Maximum aggregate size in bytes (default: 51200)
RetryDuration: Maximum retry duration (default: 10000ms)
MetricsLevel: CloudWatch metrics level (NONE, SUMMARY, DETAILED)
MetricsGranularity: Metrics granularity (GLOBAL, STREAM, SHARD)

The producer provides several mechanisms for handling errors and ensuring reliability:

setFailOnError: fail the job on any production error, or log the error and continue.
setQueueLimit: bound the number of outstanding records and backpressure the pipeline once the limit is reached.
RecordTtl and RetryDuration: bound how long the KPL retries a failed record before it expires and is counted as an error.
Checkpoint integration: outstanding records are flushed before each checkpoint completes.
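
As a rough sketch of how these mechanisms combine (reusing the props and producer objects from the basic example above; the values are illustrative assumptions, not recommendations):

// Reliability-oriented settings (illustrative values)
props.setProperty("RecordTtl", "60000");            // records expire (and count as errors) after 60s in the buffer
props.setProperty("RetryDuration", "10000");        // maximum retry duration, per the property list above
props.setProperty("MetricsLevel", "SUMMARY");       // publish summary metrics to CloudWatch
props.setProperty("MetricsGranularity", "STREAM");  // aggregate metrics per stream

producer.setFailOnError(true);  // surface production errors as job failures
producer.setQueueLimit(5000);   // backpressure once 5000 records are outstanding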
For performance tuning: increase MaxConnections for higher parallelism, and lower RecordMaxBufferedTime for lower latency. Latency-sensitive pipelines should keep RecordMaxBufferedTime small to minimize buffering delay; see the sketch below.
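
A low-latency leaning configuration might look like the following sketch (the specific values are assumptions chosen for illustration, not prescribed defaults):

// Low-latency oriented KPL settings (illustrative values)
props.setProperty("RecordMaxBufferedTime", "20");  // flush buffered records after at most 20ms
props.setProperty("AggregationEnabled", "false");  // skip aggregation to avoid extra batching delay
props.setProperty("MaxConnections", "24");         // keep a moderately sized connection pool

// Keep the outstanding-record limit small so backpressure engages early instead of
// letting a large buffer add queueing latency
producer.setQueueLimit(1000);

Disabling aggregation trades per-record overhead for lower end-to-end latency; throughput-oriented jobs should keep it enabled, as in the high-throughput example above.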