The Flink Kafka producer enables reliable message production to Kafka 0.8.x topics with configurable partitioning and serialization strategies.
The primary producer class for Kafka 0.8.x integration with extensive configuration options.
/**
* Kafka producer for Apache Kafka 0.8.x (provides best-effort delivery guarantees)
*/
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
/**
* Creates producer with broker list, topic, and value-only serialization
* @param brokerList Comma-separated list of Kafka brokers
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
*/
public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* Creates producer with properties and value-only serialization
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @param producerConfig Kafka producer properties
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates producer with properties, value serialization, and custom partitioner
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @param producerConfig Kafka producer properties
* @param customPartitioner Custom partitioner for message distribution
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Creates producer with broker list, topic, and key-value serialization
* @param brokerList Comma-separated list of Kafka brokers
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
*/
public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* Creates producer with properties and key-value serialization
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @param producerConfig Kafka producer properties
*/
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates producer with properties, key-value serialization, and custom partitioner
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @param producerConfig Kafka producer properties
* @param customPartitioner Custom partitioner for message distribution
*/
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
/**
* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
*/
@Deprecated
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
/**
* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner
*/
@Deprecated
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
}Usage Examples:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import java.util.Properties;
// Basic producer with broker list
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"localhost:9092",
"output-topic",
new SimpleStringSchema()
);
DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);
// Producer with properties configuration
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "10");
producerProps.setProperty("compression.type", "snappy");
FlinkKafkaProducer08<String> configuredProducer = new FlinkKafkaProducer08<>(
"events-topic",
new SimpleStringSchema(),
producerProps
);
stream.addSink(configuredProducer);
// Producer with custom partitioner
FlinkKafkaProducer08<String> partitionedProducer = new FlinkKafkaProducer08<>(
"partitioned-topic",
new SimpleStringSchema(),
producerProps,
new CustomPartitioner<>() // implement FlinkKafkaPartitioner
);
// Key-value producer
KeyedSerializationSchema<Tuple2<String, String>> keyValueSchema =
new KeyedSerializationSchemaWrapper<>(
new SimpleStringSchema(), // key serializer
new SimpleStringSchema() // value serializer
);
FlinkKafkaProducer08<Tuple2<String, String>> kvProducer = new FlinkKafkaProducer08<>(
"key-value-topic",
keyValueSchema,
producerProps
);
DataStream<Tuple2<String, String>> kvStream = env.fromElements(
Tuple2.of("key1", "value1"),
Tuple2.of("key2", "value2")
);
kvStream.addSink(kvProducer);Deprecated alias that redirects to FlinkKafkaProducer08.
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
}none, gzip, snappy, lz4)0, 1, all)The producer supports various serialization schemas:
// Simple string serialization
SerializationSchema<String> stringSchema = new SimpleStringSchema();
// JSON serialization
SerializationSchema<MyObject> jsonSchema = new JSONSerializationSchema<>();
// Avro serialization (with schema registry)
SerializationSchema<MyAvroRecord> avroSchema = new AvroSerializationSchema<>(MyAvroRecord.class);
// Custom serialization
SerializationSchema<MyCustomType> customSchema = new SerializationSchema<MyCustomType>() {
@Override
public byte[] serialize(MyCustomType element) {
// Custom serialization logic
return element.toBytes();
}
};Implement custom partitioning logic:
public class MyCustomPartitioner implements FlinkKafkaPartitioner<String> {
@Override
public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Custom partitioning logic
if (record.startsWith("urgent")) {
return 0; // Route urgent messages to partition 0
}
return record.hashCode() % partitions.length;
}
}
// Use custom partitioner
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic",
new SimpleStringSchema(),
props,
new MyCustomPartitioner()
);The producer handles errors according to Kafka 0.8.x limitations:
// Configure retry behavior
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "1000");
props.setProperty("acks", "1"); // Wait for leader acknowledgment
// Error handling in application
try {
stream.addSink(producer);
env.execute("Kafka Producer Job");
} catch (Exception e) {
logger.error("Kafka producer failed", e);
// Implement fallback or recovery logic
}batch.size and linger.ms for throughputbuffer.memory based on throughput requirements