Apache Kafka 0.8.x connector for Apache Flink streaming data processing
The Flink Kafka producer enables reliable message production to Kafka 0.8.x topics with configurable partitioning and serialization strategies.
The primary producer class for Kafka 0.8.x integration with extensive configuration options.
/**
* Kafka producer for Apache Kafka 0.8.x (provides best-effort delivery guarantees)
*/
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
/**
* Creates a producer from a broker list and a target topic, using value-only serialization.
*
* @param brokerList Comma-separated list of Kafka brokers, e.g. {@code "localhost:9092"}
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing record values (no separate key schema)
*/
public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* Creates a producer configured through Kafka producer properties, using value-only serialization.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing record values (no separate key schema)
* @param producerConfig Kafka producer properties, e.g. {@code bootstrap.servers}
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer configured through Kafka producer properties, using value-only
* serialization and a custom partitioner.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing record values (no separate key schema)
* @param producerConfig Kafka producer properties, e.g. {@code bootstrap.servers}
* @param customPartitioner Custom partitioner that chooses the target partition of each record
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Creates a producer from a broker list and a target topic, using key-value serialization.
*
* @param brokerList Comma-separated list of Kafka brokers, e.g. {@code "localhost:9092"}
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing both the key and the value of each record
*/
public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* Creates a producer configured through Kafka producer properties, using key-value serialization.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing both the key and the value of each record
* @param producerConfig Kafka producer properties, e.g. {@code bootstrap.servers}
*/
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer configured through Kafka producer properties, using key-value
* serialization and a custom partitioner.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing both the key and the value of each record
* @param producerConfig Kafka producer properties, e.g. {@code bootstrap.servers}
* @param customPartitioner Custom partitioner that chooses the target partition of each record
*/
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner);
/**
* Creates a producer with properties, value-only serialization, and a legacy
* {@code KafkaPartitioner}.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @param producerConfig Kafka producer properties
* @param customPartitioner Legacy custom partitioner for message distribution
* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner, i.e. the overload
* {@code (String, SerializationSchema<IN>, Properties, FlinkKafkaPartitioner<IN>)}
*/
@Deprecated
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
/**
* Creates a producer with properties, key-value serialization, and a legacy
* {@code KafkaPartitioner}.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @param producerConfig Kafka producer properties
* @param customPartitioner Legacy custom partitioner for message distribution
* @deprecated Use FlinkKafkaPartitioner instead of KafkaPartitioner, i.e. the overload
* {@code (String, KeyedSerializationSchema<IN>, Properties, FlinkKafkaPartitioner<IN>)}
*/
@Deprecated
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner);
}

Usage Examples:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import java.util.Properties;
// Basic producer with broker list: arguments are (brokerList, topicId, serializationSchema)
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"localhost:9092", // brokerList
"output-topic", // topicId
new SimpleStringSchema() // value-only serialization schema
);
// NOTE(review): `env` is not defined in this snippet — presumably a
// StreamExecutionEnvironment created beforehand; confirm before copying.
DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
// Attach the producer as the sink of the stream
stream.addSink(producer);
// Producer with properties configuration: the broker address is passed through
// the properties ("bootstrap.servers") instead of a broker-list argument
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
// Batching and compression settings, handed to the Kafka producer as-is
producerProps.setProperty("batch.size", "16384");
producerProps.setProperty("linger.ms", "10");
producerProps.setProperty("compression.type", "snappy");
// Arguments are (topicId, serializationSchema, producerConfig)
FlinkKafkaProducer08<String> configuredProducer = new FlinkKafkaProducer08<>(
"events-topic",
new SimpleStringSchema(),
producerProps
);
// Reuses the `stream` created in the basic example
stream.addSink(configuredProducer);
// Producer with custom partitioner: arguments are
// (topicId, serializationSchema, producerConfig, customPartitioner)
// NOTE: this snippet only constructs the producer; it is not attached to a stream here.
FlinkKafkaProducer08<String> partitionedProducer = new FlinkKafkaProducer08<>(
"partitioned-topic",
new SimpleStringSchema(),
producerProps,
new CustomPartitioner<>() // placeholder: CustomPartitioner is your own FlinkKafkaPartitioner implementation, not defined in this snippet
);
// Key-value producer
// KeyedSerializationSchemaWrapper wraps a single value-only SerializationSchema and has
// no (key, value) constructor, so a Tuple2 stream needs its own KeyedSerializationSchema
// that serializes the key (f0) and the value (f1) separately.
KeyedSerializationSchema<Tuple2<String, String>> keyValueSchema =
    new KeyedSerializationSchema<Tuple2<String, String>>() {
        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            // First tuple field is the message key
            return element.f0.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            // Second tuple field is the message value
            return element.f1.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(Tuple2<String, String> element) {
            // null: no per-record topic override, use the topic given to the producer
            return null;
        }
    };
// Arguments are (topicId, keyed serializationSchema, producerConfig)
FlinkKafkaProducer08<Tuple2<String, String>> kvProducer = new FlinkKafkaProducer08<>(
    "key-value-topic",
    keyValueSchema,
    producerProps
);
// Sample stream of (key, value) pairs
DataStream<Tuple2<String, String>> kvStream = env.fromElements(
    Tuple2.of("key1", "value1"),
    Tuple2.of("key2", "value2")
);
kvStream.addSink(kvProducer);

FlinkKafkaProducer is a deprecated alias that redirects to FlinkKafkaProducer08.
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
/**
* Creates a producer from a broker list and a target topic, using value-only serialization.
* Parameters have the same meaning as in the FlinkKafkaProducer08 constructor with the
* same parameter list.
*
* @param brokerList Comma-separated list of Kafka brokers
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema);
/**
* Creates a producer with properties and value-only serialization.
* Parameters have the same meaning as in the FlinkKafkaProducer08 constructor with the
* same parameter list.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @param producerConfig Kafka producer properties
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with properties, value-only serialization, and a legacy
* {@code KafkaPartitioner}.
* NOTE(review): the partitioner parameter is declared as a raw {@code KafkaPartitioner}
* (no type argument) in this signature.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing values
* @param producerConfig Kafka producer properties
* @param customPartitioner Legacy custom partitioner for message distribution
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
/**
* Creates a producer from a broker list and a target topic, using key-value serialization.
* Parameters have the same meaning as in the FlinkKafkaProducer08 constructor with the
* same parameter list.
*
* @param brokerList Comma-separated list of Kafka brokers
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema);
/**
* Creates a producer with properties and key-value serialization.
* Parameters have the same meaning as in the FlinkKafkaProducer08 constructor with the
* same parameter list.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @param producerConfig Kafka producer properties
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig);
/**
* Creates a producer with properties, key-value serialization, and a legacy
* {@code KafkaPartitioner}.
* NOTE(review): the partitioner parameter is declared as a raw {@code KafkaPartitioner}
* (no type argument) in this signature.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema for serializing keys and values
* @param producerConfig Kafka producer properties
* @param customPartitioner Legacy custom partitioner for message distribution
* @deprecated Use FlinkKafkaProducer08 constructor instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner);
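}

Commonly used producer properties:

- `compression.type`: compression codec (`none`, `gzip`, `snappy`, `lz4`)
- `acks`: acknowledgment level (`0`, `1`, `all`)

The producer supports various serialization schemas: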
// Simple string serialization
SerializationSchema<String> stringSchema = new SimpleStringSchema();
// JSON serialization
// NOTE(review): MyObject is a placeholder type; confirm that a generic
// JSONSerializationSchema class is available in the connector version in use.
SerializationSchema<MyObject> jsonSchema = new JSONSerializationSchema<>();
// Avro serialization (with schema registry)
// NOTE(review): MyAvroRecord is a placeholder type; confirm that AvroSerializationSchema
// (and any schema-registry support) is available in the connector version in use.
SerializationSchema<MyAvroRecord> avroSchema = new AvroSerializationSchema<>(MyAvroRecord.class);
SerializationSchema<MyCustomType> customSchema = new SerializationSchema<MyCustomType>() {
@Override
public byte[] serialize(MyCustomType element) {
// Custom serialization logic
return element.toBytes();
}
};

Implement custom partitioning logic:
/**
 * Example partitioner that sends records starting with "urgent" to the first partition
 * of the target topic and spreads all other records by their hash code.
 *
 * FlinkKafkaPartitioner is an abstract class, so it is extended rather than implemented.
 */
public class MyCustomPartitioner extends FlinkKafkaPartitioner<String> {
    /**
     * Chooses the Kafka partition for a record.
     *
     * @param record the record being written
     * @param key serialized key of the record (unused here)
     * @param value serialized value of the record (unused here)
     * @param targetTopic topic the record is written to (unused here)
     * @param partitions ids of the partitions found for the target topic
     * @return id of the target partition, always taken from {@code partitions}
     */
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Custom partitioning logic
        if (record.startsWith("urgent")) {
            return partitions[0]; // Route urgent messages to the first partition
        }
        // hashCode() may be negative; clear the sign bit so the index is always in range,
        // then map the index to an actual partition id.
        int index = (record.hashCode() & Integer.MAX_VALUE) % partitions.length;
        return partitions[index];
    }
}
// Use custom partitioner
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic",
new SimpleStringSchema(),
props,
new MyCustomPartitioner()
);

The producer handles errors according to Kafka 0.8.x limitations:
// Configure retry behavior
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// Retry count and back-off between retries in milliseconds (per the property names;
// see the Kafka producer configuration reference for exact semantics)
props.setProperty("retries", "3");
props.setProperty("retry.backoff.ms", "1000");
props.setProperty("acks", "1"); // Wait for leader acknowledgment
// Error handling in application
try {
stream.addSink(producer);
env.execute("Kafka Producer Job");
} catch (Exception e) {
logger.error("Kafka producer failed", e);
// Implement fallback or recovery logic
}

- Tune `batch.size` and `linger.ms` for throughput
- Size `buffer.memory` based on throughput requirements

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-kafka-0-8-2-10