Apache Flink Kafka Connector for streaming data integration with Kafka 0.9.x
A streaming data sink for producing to Kafka 0.9.x topics, with configurable serialization, partitioning strategies, and reliability settings for high-throughput data pipelines.
Main Kafka producer class for writing data from Flink data streams to Kafka 0.9.x topics.
/**
* Kafka producer for writing data to Apache Kafka 0.9.x topics.
* Compatible with Kafka 0.9.x. Provides no exactly-once guarantees;
* at-least-once delivery requires flushing pending records on checkpoints
* (see setFlushOnCheckpoint).
*
* @param <IN> The type of records to write to Kafka
*/
@PublicEvolving
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
/**
* Creates a producer with a broker list and key-less serialization, using the fixed partitioner.
*
* @param brokerList Comma separated addresses of Kafka brokers
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined key-less serialization schema
*/
public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* Creates a producer with configuration properties and key-less serialization, using the fixed partitioner.
*
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined key-less serialization schema
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with key-less serialization and custom partitioner.
*
* @param topicId The topic to write data to
* @param serializationSchema A key-less serializable serialization schema
* @param producerConfig Configuration properties for the KafkaProducer
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for round-robin
*/
public FlinkKafkaProducer09(
    String topicId,
    SerializationSchema<IN> serializationSchema,
    Properties producerConfig,
    FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Creates a producer with a broker list and key/value serialization, using the fixed partitioner.
*
* @param brokerList Comma separated addresses of Kafka brokers
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* Creates a producer with configuration properties and key/value serialization, using the fixed partitioner.
*
* @param topicId ID of the Kafka topic to write to
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration
*/
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with key/value serialization and custom partitioner.
*
* @param topicId The topic to write data to
* @param serializationSchema A serializable serialization schema for key/value messages
* @param producerConfig Configuration properties for the KafkaProducer
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions, or null for key-based partitioning
*/
public FlinkKafkaProducer09(
    String topicId,
    KeyedSerializationSchema<IN> serializationSchema,
    Properties producerConfig,
    FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Sets whether the producer should only log failures instead of throwing exceptions.
*
* @param logFailuresOnly True to only log failures instead of throwing exceptions
*/
public void setLogFailuresOnly(boolean logFailuresOnly);
/**
* Sets whether the producer should flush pending records on checkpoint.
*
* @param flush True to flush on checkpoint, false otherwise
*/
public void setFlushOnCheckpoint(boolean flush);
}

Deprecated constructors, retained for backwards compatibility:

/**
* @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
* Use FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner) instead.
*/
@Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
/**
* @deprecated This constructor does not correctly handle partitioning when producing to multiple topics.
* Use FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner) instead.
*/
@Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);

Basic usage: produce a stream of strings to a Kafka topic.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "output-topic",
        new SimpleStringSchema(),
        properties);
env.fromElements("Hello", "World", "Kafka")
.addSink(producer);
env.execute("Basic Kafka Producer");import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
// Custom POJO
public class User {
    public String name;
    public int age;
    public String email;

    public User() {} // no-arg constructor, required for Flink POJO types

    public User(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }
}
// JSON serialization schema
SerializationSchema<User> jsonSchema = new SerializationSchema<User>() {
    // ObjectMapper is not Serializable, so create it lazily on the task manager
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
        "user-events",
        jsonSchema,
        properties);
env.fromElements(
        new User("Alice", 25, "alice@example.com"),
        new User("Bob", 30, "bob@example.com"))
        .addSink(producer);

Key/value serialization: attach a Kafka message key by implementing KeyedSerializationSchema.

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.nio.charset.StandardCharsets;
// Custom keyed serialization for key/value messages
KeyedSerializationSchema<User> keyedSchema = new KeyedSerializationSchema<User>() {
    // ObjectMapper is not Serializable, so create it lazily on the task manager
    private transient ObjectMapper mapper;

    @Override
    public byte[] serializeKey(User user) {
        return user.email.getBytes(StandardCharsets.UTF_8); // use email as the message key
    }

    @Override
    public byte[] serializeValue(User user) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }

    @Override
    public String getTargetTopic(User user) {
        return null; // use the default topic passed to the producer
    }
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
        "keyed-users",
        keyedSchema,
        properties);

Custom partitioning: control partition assignment with a FlinkKafkaPartitioner.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Custom partitioner based on user age
FlinkKafkaPartitioner<User> agePartitioner = new FlinkKafkaPartitioner<User>() {
    @Override
    public int partition(User record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Partition by age group: 0-29, 30-49, 50+
        int ageGroup = record.age < 30 ? 0 : (record.age < 50 ? 1 : 2);
        return partitions[ageGroup % partitions.length];
    }
};
FlinkKafkaProducer09<User> producer = new FlinkKafkaProducer09<>(
        "age-partitioned-users",
        jsonSchema,
        properties,
        agePartitioner);

Producer configuration: common Kafka 0.9 producer settings.

Properties properties = new Properties();
// Required settings
properties.setProperty("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
// Performance tuning
properties.setProperty("batch.size", "16384");
properties.setProperty("linger.ms", "5");
properties.setProperty("compression.type", "snappy");
// Reliability settings (limited in Kafka 0.9)
properties.setProperty("acks", "1"); // 0=no ack, 1=leader ack, all=all replicas
properties.setProperty("retries", "3");
properties.setProperty("max.in.flight.requests.per.connection", "5");
// Buffer management
properties.setProperty("buffer.memory", "33554432");
properties.setProperty("max.block.ms", "60000");
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "configured-topic",
        new SimpleStringSchema(),
        properties);

Dynamic topic routing: a KeyedSerializationSchema can pick the target topic per record.

import java.nio.charset.StandardCharsets;

// Schema that routes to different topics based on record content
KeyedSerializationSchema<String> dynamicSchema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String record) {
        return null; // no key
    }

    @Override
    public byte[] serializeValue(String record) {
        return record.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String record) {
        // Route based on record content
        if (record.startsWith("ERROR")) {
            return "error-logs";
        } else if (record.startsWith("WARN")) {
            return "warning-logs";
        } else {
            return "info-logs";
        }
    }
};
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "default-topic", // fallback topic when getTargetTopic returns null
        dynamicSchema,
        properties);

End-to-end pipeline: transform a socket stream and write the results to Kafka.

import org.apache.flink.streaming.api.datastream.DataStream;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create input stream
DataStream<String> inputStream = env.socketTextStream("localhost", 9999);
// Transform and produce to Kafka
inputStream
        .filter(line -> !line.isEmpty())
        .map(line -> line.toUpperCase())
        .map(line -> "Processed at " + System.currentTimeMillis() + ": " + line)
        .addSink(new FlinkKafkaProducer09<>(
                "processed-data",
                new SimpleStringSchema(),
                properties));
env.execute("Stream Processing to Kafka");Kafka 0.9 Limitations:
Common Issues:
- SerializationException: bugs in custom serialization schemas
- TimeoutException: network connectivity or broker availability problems
- RecordTooLargeException: message exceeds the broker's size limits (see the configuration sketch below)
- InvalidTopicException: topic does not exist or has an invalid name

Best Practices:
- Enable setFlushOnCheckpoint(true) whenever at-least-once delivery matters.
- Leave setLogFailuresOnly at false unless silently dropped records are acceptable.
- Pre-create target topics to avoid InvalidTopicException at runtime.
- Set max.in.flight.requests.per.connection to 1 when retries are enabled and ordering matters.
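For the size and timeout issues above, two standard producer settings help. A minimal sketch (values are illustrative and should be tuned to your cluster):

// Keep the client-side message size cap aligned with the broker's limit
properties.setProperty("max.request.size", "1048576");
// Fail sends faster when brokers are slow or unreachable
properties.setProperty("request.timeout.ms", "30000");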
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-9-2-12