Abstract base implementations for Kafka producers that provide exactly-once delivery semantics, serialization handling, and partitioning logic. These classes handle the complexities of reliable message production while delegating version-specific operations to concrete implementations.
The core abstract base class that all Flink Kafka producers extend. Provides comprehensive functionality for producing to Kafka topics with exactly-once processing guarantees and transaction support.
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
implements CheckpointedFunction {
public FlinkKafkaProducerBase(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<IN> customPartitioner
);
}

Parameters:
defaultTopicId - Default target topic for messages (can be overridden by the serialization schema)
serializationSchema - Schema for serializing elements to Kafka key-value messages
producerConfig - Kafka producer configuration properties
customPartitioner - Custom partitioner for determining target partitions (optional, can be null)

Usage Example:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");
FlinkKafkaProducerBase<MyEvent> producer = new MyKafkaProducer(
"events-topic",
new MyEventSerializationSchema(),
props,
new FlinkFixedPartitioner<>()
);

Configure how the producer handles failures during message production.
public void setLogFailuresOnly(boolean logFailuresOnly);

Parameters:
logFailuresOnly - If true, failures are only logged (not thrown). If false, failures cause the job to fail.

Usage Example:
// Log failures but continue processing (not recommended for production)
producer.setLogFailuresOnly(true);
// Fail job on any production failure (recommended for exactly-once)
producer.setLogFailuresOnly(false);

Control whether messages are flushed synchronously during checkpoints for exactly-once guarantees.
public void setFlushOnCheckpoint(boolean flush);

Parameters:
flush - If true, all pending messages are flushed during checkpoints (required for exactly-once)

Usage Example:
// Enable checkpoint flushing for exactly-once guarantees
producer.setFlushOnCheckpoint(true);

Static utility methods for common configuration tasks.
public static Properties getPropertiesFromBrokerList(String brokerList);

Parameters:
brokerList - Comma-separated list of Kafka brokers (host:port format)

Returns: Properties object with bootstrap.servers configured
Usage Example:
Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// Additional properties can be set on the returned Properties object
props.setProperty("transaction.timeout.ms", "900000");

Core method for sending messages to Kafka (called by the Flink runtime).
public void invoke(IN next, Context context) throws Exception;

Parameters:
next - The record to be sent to Kafka
context - Sink context providing additional information

This method is called by the Flink runtime for each record and should not be called directly by user code.
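Inside invoke(), each record is turned into Kafka key/value bytes (and optionally a per-record target topic) by the configured serialization schema. The sketch below is a self-contained stand-in that mirrors the three core methods of Flink's KeyedSerializationSchema; the KeyValueSchema interface and MyEvent type are illustrative, not the real Flink API:

```java
import java.nio.charset.StandardCharsets;

public class SerializationSketch {
    // Hypothetical event type, for illustration only.
    record MyEvent(String id, String payload) {}

    // Stand-in mirroring the core methods of Flink's KeyedSerializationSchema<T>.
    interface KeyValueSchema<T> {
        byte[] serializeKey(T element);
        byte[] serializeValue(T element);
        String getTargetTopic(T element); // null means "use defaultTopicId"
    }

    static final KeyValueSchema<MyEvent> SCHEMA = new KeyValueSchema<MyEvent>() {
        @Override public byte[] serializeKey(MyEvent e) {
            return e.id().getBytes(StandardCharsets.UTF_8);
        }
        @Override public byte[] serializeValue(MyEvent e) {
            return e.payload().getBytes(StandardCharsets.UTF_8);
        }
        @Override public String getTargetTopic(MyEvent e) {
            return null; // fall back to the producer's default topic
        }
    };

    public static void main(String[] args) {
        MyEvent e = new MyEvent("user-1", "clicked");
        System.out.println(new String(SCHEMA.serializeKey(e), StandardCharsets.UTF_8));
        System.out.println(new String(SCHEMA.serializeValue(e), StandardCharsets.UTF_8));
    }
}
```

Returning a non-null topic from getTargetTopic() is what allows the serialization schema to override the constructor's defaultTopicId on a per-record basis.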
Handle checkpointing and transaction coordination (implemented by the framework).
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;

These methods handle the exactly-once semantics by coordinating Kafka transactions with Flink checkpoints.
public void open(Configuration configuration);
public void close() throws Exception;

These methods handle producer lifecycle management, including Kafka client initialization and cleanup.
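The lifecycle contract can be sketched with a stand-in client (all names here are illustrative, not Flink internals): open() creates the Kafka client once per parallel subtask, and close() must flush outstanding records before releasing it so that a graceful shutdown does not drop buffered messages.

```java
public class LifecycleSketch {
    // Stand-in for the Kafka client; illustrative only.
    interface Client extends AutoCloseable {
        void flush();
        @Override void close();
    }

    private Client client;

    // open(): create the client once per parallel subtask.
    public void open() {
        client = new Client() {
            @Override public void flush() { System.out.println("flushed"); }
            @Override public void close() { System.out.println("closed"); }
        };
    }

    // close(): flush buffered records first, then release the client,
    // even if the flush itself throws.
    public void close() {
        if (client != null) {
            try { client.flush(); } finally { client.close(); }
        }
    }

    public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.open();
        s.close();
    }
}
```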
Concrete implementations must implement this version-specific method:
protected abstract void flush();

This method must flush all pending records to ensure they are sent to Kafka. It is called during checkpoints to guarantee exactly-once processing when setFlushOnCheckpoint(true) is configured.
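A common way to satisfy this contract is to track unacknowledged sends with a counter and block until the producer's async callbacks have confirmed every record. The following self-contained sketch (no Flink or Kafka dependencies; recordSent/recordAcknowledged are hypothetical hooks) illustrates the pattern:

```java
import java.util.concurrent.atomic.AtomicLong;

public class FlushSketch {
    private final AtomicLong pendingRecords = new AtomicLong();

    // Hypothetical hook: called when a record is handed to the Kafka client.
    void recordSent() { pendingRecords.incrementAndGet(); }

    // Hypothetical hook: called from the producer's async ack callback.
    void recordAcknowledged() {
        synchronized (pendingRecords) {
            if (pendingRecords.decrementAndGet() == 0) {
                pendingRecords.notifyAll(); // wake a blocked flush()
            }
        }
    }

    // flush() returns only once every pending record has been acknowledged,
    // which is what the checkpoint barrier relies on for its guarantee.
    protected void flush() throws InterruptedException {
        synchronized (pendingRecords) {
            while (pendingRecords.get() > 0) {
                pendingRecords.wait();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FlushSketch s = new FlushSketch();
        s.recordSent();
        s.recordSent();
        // Simulate async acks arriving from the Kafka client's I/O thread.
        new Thread(() -> { s.recordAcknowledged(); s.recordAcknowledged(); }).start();
        s.flush(); // blocks until both records are acknowledged
        System.out.println("pending=" + s.pendingRecords.get());
    }
}
```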
Methods and fields available to concrete implementations:
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props);
protected void checkErroneous() throws Exception;
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer);
protected long numPendingRecords();

Protected Fields:
protected final Properties producerConfig;
protected final String defaultTopicId;
protected final KeyedSerializationSchema<IN> schema;
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
protected boolean logFailuresOnly;
protected boolean flushOnCheckpoint;

Methods:
getKafkaProducer() - Factory method for creating Kafka producer instances
checkErroneous() - Check for and throw any pending async exceptions
getPartitionsByTopic() - Utility to discover the partitions for a topic
numPendingRecords() - Get the count of unacknowledged records (useful for monitoring)

Constants:
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

For exactly-once processing guarantees, configure the producer as follows:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("max.in.flight.requests.per.connection", "1");
props.setProperty("retries", "2147483647");
props.setProperty("enable.idempotence", "true");
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

For at-least-once processing with higher throughput:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "2147483647");
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

For maximum throughput when occasional data loss is acceptable:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "1");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "lz4");
producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(true);

The producer handles various types of failures:
Retriable send failures - retried automatically according to the retries configuration
Non-retriable failures - handled according to the logFailuresOnly setting

When logFailuresOnly is false (recommended), any production failure causes the Flink job to fail and restart, ensuring no data loss with proper checkpoint configuration.
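The failure path described above can be sketched in a simplified, self-contained form (not Flink's actual code): the producer's send callback records the first async exception, and checkErroneous() rethrows it on the next record or checkpoint, failing the job when logFailuresOnly is false.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ErrorPropagationSketch {
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();
    private boolean logFailuresOnly = false;

    // Hypothetical hook: invoked from the producer's send callback on failure.
    void onSendFailure(Exception e) {
        if (logFailuresOnly) {
            System.out.println("Send failed (logged only): " + e.getMessage());
        } else {
            asyncException.compareAndSet(null, e); // remember the first failure
        }
    }

    // Called at the start of invoke()/snapshotState(): rethrow any pending
    // async failure so the Flink job fails and restarts from a checkpoint.
    void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failed to send data to Kafka", e);
        }
    }

    public static void main(String[] args) {
        ErrorPropagationSketch s = new ErrorPropagationSketch();
        s.onSendFailure(new RuntimeException("broker unavailable"));
        try {
            s.checkErroneous();
        } catch (Exception e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

Because the callback runs on the Kafka client's I/O thread while checkErroneous() runs on the task thread, the exception is handed over through an AtomicReference rather than thrown directly from the callback.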