Streaming data sink functionality for producing to Kafka 0.9.x topics with configurable serialization, partitioning strategies, and reliability guarantees for high-throughput data pipelines.
Main Kafka producer class for writing data from Flink data streams to Kafka 0.9.x topics.
/**
* Kafka producer for writing data to Apache Kafka 0.9.x topics.
* Compatible with Kafka 0.9 without reliability guarantees.
*
* @param <IN> The type of records to write to Kafka
*/
@PublicEvolving
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
/**
* Creates a producer with broker list and key-less serialization using fixed partitioner.
*
* @param brokerList Comma separated addresses of Kafka brokers
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined key-less serialization schema
*/
public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* Creates a producer with properties and key-less serialization using fixed partitioner.
*
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined key-less serialization schema
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with key-less serialization and custom partitioner.
*
* @param topicId The topic to write data to
* @param serializationSchema A key-less serializable serialization schema
* @param producerConfig Configuration properties for the KafkaProducer
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for round-robin
*/
public FlinkKafkaProducer09(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Creates a producer with broker list and key/value serialization using fixed partitioner.
*
* @param brokerList Comma separated addresses of Kafka brokers
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* Creates a producer with properties and key/value serialization using fixed partitioner.
*
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with key/value serialization and custom partitioner.
*
* @param topicId The topic to write data to
* @param serializationSchema A serializable serialization schema for key/value messages
* @param producerConfig Configuration properties for the KafkaProducer
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for key-based partitioning
*/
public FlinkKafkaProducer09(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Sets whether the producer should only log failures instead of throwing exceptions.
*
* @param logFailuresOnly True to only log failures instead of throwing exceptions
*/
public void setLogFailuresOnly(boolean logFailuresOnly);
/**
* Sets whether the producer should flush pending records on checkpoint.
*
* @param flush True to flush on checkpoint, false otherwise
*/
public void setFlushOnCheckpoint(boolean flush);
}/**
* @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
* Use FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner) instead.
*/
@Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
/**
* @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
* Use FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) instead.
*/
@Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
"output-topic",
new SimpleStringSchema(),
properties
);
env.fromElements("Hello", "World", "Kafka")
.addSink(producer);
env.execute("Basic Kafka Producer");import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
// Custom POJO
public class User {
public String name;
public int age;
public String email;
// constructors, getters, setters...
}
// JSON serialization schema
SerializationSchema<User> jsonSchema = new SerializationSchema<User>() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(User user) {
try {
return mapper.writeValueAsBytes(user);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize user", e);
}
}
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
"user-events",
jsonSchema,
properties
);
env.fromElements(
new User("Alice", 25, "alice@example.com"),
new User("Bob", 30, "bob@example.com")
).addSink(producer);import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
// Custom keyed serialization for key/value messages
KeyedSerializationSchema<User> keyedSchema = new KeyedSerializationSchema<User>() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serializeKey(User user) {
return user.email.getBytes(); // Use email as key
}
@Override
public byte[] serializeValue(User user) {
try {
return mapper.writeValueAsBytes(user);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize user", e);
}
}
@Override
public String getTargetTopic(User user) {
return null; // Use default topic
}
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
"keyed-users",
keyedSchema,
properties
);import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Custom partitioner based on user age
FlinkKafkaPartitioner<User> agePartitioner = new FlinkKafkaPartitioner<User>() {
@Override
public int partition(User record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
// Partition by age groups: 0-29, 30-49, 50+
int ageGroup = record.age < 30 ? 0 : (record.age < 50 ? 1 : 2);
return partitions[ageGroup % partitions.length];
}
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
"age-partitioned-users",
jsonSchema,
properties,
agePartitioner
);Properties properties = new Properties();
// Required settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
// Performance tuning
properties.setProperty("batch.size", "16384");
properties.setProperty("linger.ms", "5");
properties.setProperty("compression.type", "snappy");
// Reliability settings (limited in Kafka 0.9)
properties.setProperty("acks", "1"); // 0=no ack, 1=leader ack, all=all replicas
properties.setProperty("retries", "3");
properties.setProperty("max.in.flight.requests.per.connection", "5");
// Buffer management
properties.setProperty("buffer.memory", "33554432");
properties.setProperty("max.block.ms", "60000");
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
"configured-topic",
new SimpleStringSchema(),
properties
);// Schema that can route to different topics based on record content
KeyedSerializationSchema<String> dynamicSchema = new KeyedSerializationSchema<String>() {
@Override
public byte[] serializeKey(String record) {
return null; // No key
}
@Override
public byte[] serializeValue(String record) {
return record.getBytes();
}
@Override
public String getTargetTopic(String record) {
// Route based on record content
if (record.startsWith("ERROR")) {
return "error-logs";
} else if (record.startsWith("WARN")) {
return "warning-logs";
} else {
return "info-logs";
}
}
};
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
"default-topic", // Fallback topic
dynamicSchema,
properties
);import org.apache.flink.streaming.api.datastream.DataStream;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create input stream
DataStream<String> inputStream = env.socketTextStream("localhost", 9999);
// Transform and produce to Kafka
inputStream
.filter(line -> !line.isEmpty())
.map(line -> line.toUpperCase())
.map(line -> "Processed at " + System.currentTimeMillis() + ": " + line)
.addSink(new FlinkKafkaProducer09<>(
"processed-data",
new SimpleStringSchema(),
properties
));
env.execute("Stream Processing to Kafka");Kafka 0.9 Limitations:
Common Issues:
SerializationException: Issues with custom serialization schemasTimeoutException: Network connectivity or broker availabilityRecordTooLargeException: Message exceeds broker limitsInvalidTopicException: Topic doesn't exist or invalid nameBest Practices: