Apache Flink Kafka 0.10 connector for streaming data processing with at-least-once processing guarantees
—
The FlinkKafkaProducer010 provides comprehensive functionality for producing data to Apache Kafka 0.10.x topics with at-least-once processing guarantees (when Flink checkpointing is enabled), custom partitioning strategies, and timestamp support. Note: exactly-once writes to Kafka require Kafka 0.11+ and the FlinkKafkaProducer011 connector.
Creates producers that serialize only the record values, without keys.
/**
* Creates a FlinkKafkaProducer for a given topic using broker list
* @param brokerList Comma separated addresses of the brokers
* @param topicId ID of the Kafka topic
* @param serializationSchema User defined key-less serialization schema
*/
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema);
/**
* Creates a FlinkKafkaProducer for a given topic using properties
* @param topicId ID of the Kafka topic
* @param serializationSchema User defined key-less serialization schema
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig);
/**
* Creates a FlinkKafkaProducer with custom partitioning
* @param topicId The topic to write data to
* @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be distributed to Kafka partitions in a round-robin fashion
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

Usage Examples:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import java.util.Properties;
// Simple producer with broker list
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
"localhost:9092",
"output-topic",
new SimpleStringSchema()
);
// Producer with properties configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");
FlinkKafkaProducer010<String> configuredProducer = new FlinkKafkaProducer010<>(
"output-topic",
new SimpleStringSchema(),
props
);
// Producer with custom partitioner
FlinkKafkaProducer010<MyEvent> eventProducer = new FlinkKafkaProducer010<>(
"events-topic",
new MyEventSerializationSchema(),
props,
new MyCustomPartitioner<>()
);

Creates producers that serialize both keys and values, enabling key-based partitioning.
/**
* Creates a FlinkKafkaProducer with key-value serialization using broker list
* @param brokerList Comma separated addresses of the brokers
* @param topicId ID of the Kafka topic
* @param serializationSchema User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema);
/**
* Creates a FlinkKafkaProducer with key-value serialization using properties
* @param topicId ID of the Kafka topic
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig);
/**
* Creates a FlinkKafkaProducer with key-value serialization and custom partitioning
* @param topicId The topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. If set to null, records will be partitioned by the key of each record
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner);

Usage Examples:
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
// Custom keyed serialization schema
KeyedSerializationSchema<MyEvent> keyedSchema = new KeyedSerializationSchema<MyEvent>() {
@Override
public byte[] serializeKey(MyEvent element) {
return element.getUserId().getBytes();
}
@Override
public byte[] serializeValue(MyEvent element) {
return element.toJson().getBytes();
}
@Override
public String getTargetTopic(MyEvent element) {
return null; // Use default topic
}
};
// Producer with key-value serialization
FlinkKafkaProducer010<MyEvent> keyedProducer = new FlinkKafkaProducer010<>(
"keyed-events-topic",
keyedSchema,
props
);
// Producer with custom partitioner for key-value data
FlinkKafkaProducer010<MyEvent> customKeyedProducer = new FlinkKafkaProducer010<>(
"partitioned-events-topic",
keyedSchema,
props,
new UserIdPartitioner<>()
);

Configure the producer to write Flink's event time timestamps to Kafka records.
/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);

Usage Examples:
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
"timestamped-events",
new MyEventSerializationSchema(),
props
);
// Enable timestamp writing to Kafka
producer.setWriteTimestampToKafka(true);
// Use in streaming pipeline
DataStream<MyEvent> events = env.addSource(new MyEventSource());
events.addSink(producer);

Legacy factory methods for creating producers with timestamp support (deprecated in favor of constructor + setWriteTimestampToKafka).
/**
* @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig);
/**
* @deprecated Use FlinkKafkaProducer010(String, SerializationSchema, Properties) and call setWriteTimestampToKafka(boolean)
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
DataStream<T> inStream,
String topicId,
SerializationSchema<T> serializationSchema,
Properties producerConfig);
/**
* @deprecated Use FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) and call setWriteTimestampToKafka(boolean)
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(
DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<T> customPartitioner);

/**
* Configuration wrapper for deprecated timestamp-enabled producer factory methods
* @deprecated Use constructor approach instead
*/
@Deprecated
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
/**
* Configure failure logging behavior
* @param logFailuresOnly Flag indicating if failures should only be logged instead of causing job failure
*/
public void setLogFailuresOnly(boolean logFailuresOnly);
/**
* Configure checkpoint flushing behavior
* @param flush Flag indicating if producer should flush on checkpoint
*/
public void setFlushOnCheckpoint(boolean flush);
/**
* Configure timestamp writing to Kafka
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka);
}

Required properties:
Recommended properties for reliable (at-least-once) delivery:
Performance tuning properties:
Implement custom partitioning logic by extending FlinkKafkaPartitioner:
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
public class UserIdPartitioner<T> extends FlinkKafkaPartitioner<MyEvent> {
@Override
public int partition(MyEvent record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition based on user ID hash; floorMod guards against Integer.MIN_VALUE,
// for which Math.abs(hashCode()) would still return a negative value
return Math.floorMod(record.getUserId().hashCode(), partitions.length);
}
@Override
public void open(int parallelInstanceId, int parallelInstances) {
// Optional: Initialize partitioner
}
}
// Use custom partitioner
FlinkKafkaProducer010<MyEvent> producer = new FlinkKafkaProducer010<>(
"user-events",
new MyEventSerializationSchema(),
props,
new UserIdPartitioner<>()
);

Configure the producer for at-least-once processing guarantees (true exactly-once writes require Kafka 0.11+ and FlinkKafkaProducer011):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "3");
// Note: "enable.idempotence" requires Kafka 0.11+ and has no effect with the 0.10 producer
props.setProperty("max.in.flight.requests.per.connection", "1");
FlinkKafkaProducer010<String> exactlyOnceProducer = new FlinkKafkaProducer010<>(
"exactly-once-topic",
new SimpleStringSchema(),
props
);
// Enable checkpointing in streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Checkpoint every 5 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

The producer handles various error scenarios:
The producer integrates with Flink's fault tolerance mechanisms:
// Configure fault tolerance
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-10-2-12