FlinkKafkaProducer011 writes Flink streams to Kafka 0.11.x topics. It supports transactional writes with exactly-once semantics via Flink's two-phase commit protocol, at-least-once and fire-and-forget delivery modes, and pluggable partitioning.
Main producer class for writing to Kafka 0.11.x topics with transaction support and exactly-once guarantees.
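Note that exactly-once delivery only takes effect when checkpointing is enabled, because the producer commits one Kafka transaction per completed checkpoint. A minimal sketch (the interval is illustrative):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // one Kafka transaction is committed per completed checkpoint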
/**
* Kafka producer for Kafka 0.11.x with support for transactional writes and exactly-once semantics
* Extends TwoPhaseCommitSinkFunction for transaction coordination
*/
@PublicEvolving
class FlinkKafkaProducer011<IN> extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {
// Simple constructors for basic use cases
FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
// Keyed serialization constructors
FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic);
FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner);
// Full constructor with all options
FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize);
}

Usage Examples:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Optional;
import java.util.Properties;
// Basic producer setup
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "output-topic",
    new SimpleStringSchema(),
    props
);
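// Constructors that take no Semantic argument default to AT_LEAST_ONCE.
// Attaching the sink is one line (stream is a hypothetical DataStream<String>):
stream.addSink(producer).name("Kafka 0.11 Sink");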
// Producer with exactly-once semantics
props.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
"critical-topic",
new SimpleStringSchema(),
props,
Semantic.EXACTLY_ONCE
);
// Producer with custom partitioner
FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "partitioned-topic",
    new SimpleStringSchema(),
    props,
    Optional.of(partitioner)
);

Configure delivery guarantees and transaction behavior.
/**
* Delivery semantics for the Kafka producer
*/
enum Semantic {
/** Transactional writes with exactly-once guarantees */
EXACTLY_ONCE,
/** At-least-once delivery semantics */
AT_LEAST_ONCE,
/** No delivery guarantees */
NONE
}

Usage Examples:
// Exactly-once semantics (requires Kafka transactions)
Properties exactlyOnceProps = new Properties();
exactlyOnceProps.setProperty("bootstrap.servers", "localhost:9092");
exactlyOnceProps.setProperty("transaction.timeout.ms", "900000");
exactlyOnceProps.setProperty("enable.idempotence", "true");
FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
    "reliable-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    exactlyOnceProps,
    Semantic.EXACTLY_ONCE
);
// At-least-once semantics (faster, but may have duplicates)
FlinkKafkaProducer011<String> atLeastOnceProducer = new FlinkKafkaProducer011<>(
    "fast-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);
// No guarantees (fastest, fire-and-forget)
FlinkKafkaProducer011<String> fireForgetProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.NONE
);

Essential configuration options and constants for optimal producer behavior.
/**
* Configuration constants for producer tuning
*/
static final int SAFE_SCALE_DOWN_FACTOR = 5; // Safe scale down factor for transactional IDs
static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5; // Default producer pool size
static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1); // Default transaction timeout
static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; // Configuration key for disabling metrics

Usage Examples:
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Transaction configuration for exactly-once
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("retries", "2147483647"); // Max retries
producerProps.setProperty("max.in.flight.requests.per.connection", "5");
// Performance tuning
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "5");
producerProps.setProperty("buffer.memory", "33554432");
producerProps.setProperty("compression.type", "snappy");
// Flink-specific configuration
producerProps.setProperty("flink.disable-metrics", "false");
// Create producer with custom pool size
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "my-topic",
    new KeyedSerializationSchema<String>() {
        @Override
        public byte[] serializeKey(String element) {
            return null; // no message key
        }
        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes();
        }
        @Override
        public String getTargetTopic(String element) {
            return null; // fall back to the default topic ("my-topic")
        }
    },
    producerProps,
    Optional.empty(),
    Semantic.EXACTLY_ONCE,
    10 // Custom producer pool size
);

Additional configuration methods for specialized use cases.
/**
* Configure timestamp writing to Kafka records
* @param writeTimestampToKafka whether to write Flink timestamps to Kafka
*/
void setWriteTimestampToKafka(boolean writeTimestampToKafka);
/**
* Configure error handling mode
* @param logFailuresOnly if true, only log failures instead of failing the job
*/
void setLogFailuresOnly(boolean logFailuresOnly);
/**
* Override transaction timeout handling for specific error scenarios
* @return this producer instance for method chaining
*/
FlinkKafkaProducer011<IN> ignoreFailuresAfterTransactionTimeout();

Usage Examples:
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
    "timestamped-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.EXACTLY_ONCE
);
// Configure timestamp writing
producer.setWriteTimestampToKafka(true);
// Configure error handling for non-critical data
FlinkKafkaProducer011<String> loggingProducer = new FlinkKafkaProducer011<>(
    "logs-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Semantic.AT_LEAST_ONCE
);
loggingProducer.setLogFailuresOnly(true);
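// With logFailuresOnly enabled, failed records are only logged and then
// dropped, so the at-least-once guarantee no longer holds for this sink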
// Handle transaction timeout gracefully
producer.ignoreFailuresAfterTransactionTimeout();
// Add to DataStream
dataStream.addSink(producer).name("Kafka Sink");

Implement custom partitioning strategies for message distribution.
/**
* Base class for custom Kafka partitioners
*/
@PublicEvolving
abstract class FlinkKafkaPartitioner<T> implements Serializable {
/**
* Initialize the partitioner
* @param parallelInstanceId the parallel instance ID of this subtask
* @param parallelInstances total number of parallel instances
*/
void open(int parallelInstanceId, int parallelInstances);
/**
* Determine the partition for a record
* @param record the record to partition
* @param key the record key
* @param value the record value
* @param targetTopic the target topic name
* @param partitions available partition IDs
* @return the partition ID to use
*/
abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
/**
* Fixed partitioner ensuring each Flink partition goes to one Kafka partition
*/
@PublicEvolving
class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
// Implementation ensures deterministic partition assignment
}

Usage Examples:
// Custom partitioner based on record content
public class CustomPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Hash-based partitioning on record content; mask the sign bit so the
        // index is never negative, and return an entry from the partitions
        // array, since partition IDs need not be contiguous
        return partitions[(record.hashCode() & Integer.MAX_VALUE) % partitions.length];
    }
}
// Use custom partitioner
CustomPartitioner customPartitioner = new CustomPartitioner();
FlinkKafkaProducer011<String> partitionedProducer = new FlinkKafkaProducer011<>(
    "custom-partitioned-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    Optional.of(customPartitioner),
    Semantic.EXACTLY_ONCE,
    FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
);

The producer integrates with the Flink Kafka exception hierarchy for comprehensive error management.
// Producer operations may throw FlinkKafka011Exception for transactional or configuration errors
// Common error scenarios:
// - PRODUCERS_POOL_EMPTY: No available producers in the pool
// - EXTERNAL_ERROR: Kafka broker or network issues

Configuration for resilient operation:
Properties resilientProps = new Properties();
resilientProps.setProperty("bootstrap.servers", "localhost:9092");
resilientProps.setProperty("retries", "2147483647");
resilientProps.setProperty("max.in.flight.requests.per.connection", "5");
resilientProps.setProperty("enable.idempotence", "true");
resilientProps.setProperty("acks", "all");
resilientProps.setProperty("transaction.timeout.ms", "600000"); // 10 minutes
resilientProps.setProperty("delivery.timeout.ms", "600000");
resilientProps.setProperty("request.timeout.ms", "300000");