Comprehensive API reference for Kafka 0.8 producer classes in the Apache Flink Kafka Connector.
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @PublicEvolving
Extends: FlinkKafkaProducerBase<IN>
Description: Flink sink to produce data into a Kafka topic. Compatible with Kafka 0.8.x. Important Note: This producer does not have reliability guarantees and may lose messages on failures.
@PublicEvolving
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>
private static final long serialVersionUID = 1L;
/**
* Creates a FlinkKafkaProducer08 using a broker list and SerializationSchema.
*
* @param brokerList Comma separated list of Kafka brokers (e.g., "localhost:9092,broker2:9092")
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to byte arrays
*/
public FlinkKafkaProducer08(String brokerList,
String topicId,
SerializationSchema<IN> serializationSchema)
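A minimal usage sketch of this constructor; the broker addresses, topic name, and stream are placeholders, and a StreamExecutionEnvironment env is assumed as in the basic usage example further below:
// Sketch: write a DataStream<String> using the broker-list constructor
DataStream<String> stream = env.fromElements("a", "b", "c");
stream.addSink(new FlinkKafkaProducer08<>(
        "localhost:9092,broker2:9092",   // brokerList
        "my-topic",                      // topicId
        new SimpleStringSchema()));      // serializationSchema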
/**
* Creates a FlinkKafkaProducer08 using Properties and SerializationSchema.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to byte arrays
* @param producerConfig Properties containing Kafka producer configuration
*/
public FlinkKafkaProducer08(String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig)
/**
* Creates a FlinkKafkaProducer08 with custom partitioner and SerializationSchema.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to byte arrays
* @param producerConfig Properties containing Kafka producer configuration
* @param customPartitioner Custom partitioner for determining target partition (can be null)
*/
public FlinkKafkaProducer08(String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner)
/**
* Creates a FlinkKafkaProducer08 using broker list and KeyedSerializationSchema.
*
* @param brokerList Comma separated list of Kafka brokers
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to key/value byte arrays
*/
public FlinkKafkaProducer08(String brokerList,
String topicId,
KeyedSerializationSchema<IN> serializationSchema)
/**
* Creates a FlinkKafkaProducer08 using Properties and KeyedSerializationSchema.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to key/value byte arrays
* @param producerConfig Properties containing Kafka producer configuration
*/
public FlinkKafkaProducer08(String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig)
/**
* Creates a FlinkKafkaProducer08 with custom partitioner and KeyedSerializationSchema.
*
* @param topicId Target Kafka topic name
* @param serializationSchema Schema to serialize objects to key/value byte arrays
* @param producerConfig Properties containing Kafka producer configuration
* @param customPartitioner Custom partitioner for determining target partition (can be null)
*/
public FlinkKafkaProducer08(String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner)
/**
* @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
*/
@Deprecated
public FlinkKafkaProducer08(String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
KafkaPartitioner<IN> customPartitioner)
/**
* @deprecated Use constructor with FlinkKafkaPartitioner instead of KafkaPartitioner
*/
@Deprecated
public FlinkKafkaProducer08(String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
KafkaPartitioner<IN> customPartitioner)
/**
* Configures whether the producer should only log failures instead of throwing exceptions.
* Default is false (failures cause exceptions).
*
* @param logFailuresOnly If true, only log failures; if false, throw exceptions on failures
*/
public void setLogFailuresOnly(boolean logFailuresOnly)
/**
* Configures whether the producer should flush data on checkpoint operations.
* This can improve reliability at the cost of performance.
*
* @param flush If true, flush on checkpoints; if false, don't flush
*/
public void setFlushOnCheckpoint(boolean flush)
/**
* Opens the producer and initializes resources.
*
* @param configuration The task configuration
*/
public void open(Configuration configuration)
/**
* Sends a record to Kafka.
*
* @param next The record to send
* @param context The sink context (can be used for side outputs)
* @throws Exception If sending fails
*/
public void invoke(IN next, Context context) throws Exception
/**
* Closes the producer and cleans up resources.
*
* @throws Exception If cleanup fails
*/
public void close() throws Exception
/**
* Initializes the state of the function from a checkpoint.
*
* @param context The initialization context
* @throws Exception If state initialization fails
*/
public void initializeState(FunctionInitializationContext context) throws Exception
/**
* Snapshots the function's state during checkpointing.
*
* @param ctx The snapshot context
* @throws Exception If state snapshotting fails
*/
public void snapshotState(FunctionSnapshotContext ctx) throws Exception
/**
* Converts a comma-separated broker list to Properties.
*
* @param brokerList Comma-separated list of brokers (e.g., "broker1:9092,broker2:9092")
* @return Properties object with "metadata.broker.list" set
*/
public static Properties getPropertiesFromBrokerList(String brokerList)
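A minimal sketch of this helper; the broker addresses are placeholders:
// Build a producer configuration from a comma-separated broker list
Properties props = FlinkKafkaProducer08.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// props now contains "metadata.broker.list" = "broker1:9092,broker2:9092"
// and can be passed to any of the Properties-based constructors above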
/**
 * Kafka 0.8 specific flush implementation (no-op since Kafka 0.8 doesn't support flushing).
*/
protected void flush()
Package: org.apache.flink.streaming.connectors.kafka.partitioner
Description: Interface for custom partitioning strategies in Flink Kafka producers.
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
/**
* Returns the partition ID for a given record.
*
* @param record The record to partition
* @param key The serialized key (can be null)
* @param value The serialized value
* @param targetTopic The target topic name
* @param partitions Array of available partition IDs
 * @return The target partition ID, normally chosen from the partitions array
*/
public abstract int partition(T record, byte[] key, byte[] value,
String targetTopic, int[] partitions);
/**
* Called when the partitioner is opened. Override for initialization logic.
*
* @param parallelInstanceId The parallel instance ID of this subtask
* @param parallelInstances The total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// Default implementation is empty
}
}
// Hash-based partitioning by customer ID
public class CustomerHashPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {
@Override
public int partition(CustomerEvent record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
int customerId = record.getCustomerId();
return partitions[Math.abs(customerId % partitions.length)];
}
}
// Round-robin partitioning
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
private int counter = 0;
@Override
public int partition(T record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// floorMod keeps the index non-negative even if the counter overflows
return partitions[Math.floorMod(counter++, partitions.length)];
}
}
// Time-based partitioning
public class TimeBasedPartitioner extends FlinkKafkaPartitioner<TimestampedEvent> {
@Override
public int partition(TimestampedEvent record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// Partition based on hour of day
long timestamp = record.getTimestamp();
int hour = (int) ((timestamp / 3600000) % 24);
return partitions[hour % partitions.length];
}
}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("metadata.broker.list", "localhost:9092");
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic",
new SimpleStringSchema(),
properties
);
DataStream<String> stream = env.fromElements("Hello", "World", "Kafka");
stream.addSink(producer);
env.execute("Kafka Producer Job");import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
// Custom partitioner implementation
FlinkKafkaPartitioner<MyEvent> partitioner = new FlinkKafkaPartitioner<MyEvent>() {
@Override
public int partition(MyEvent record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// Partition based on event type
return partitions[Math.abs(record.getEventType().hashCode() % partitions.length)];
}
};
FlinkKafkaProducer08<MyEvent> producer = new FlinkKafkaProducer08<>(
"events-topic",
new MyEventSerializer(),
properties,
partitioner
);
// Configure for reliability
producer.setLogFailuresOnly(false); // Fail on errors
producer.setFlushOnCheckpoint(true); // Flush on checkpoints
DataStream<MyEvent> events = // ... your event stream
events.addSink(producer);
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
KeyedSerializationSchema<CustomerOrder> schema = new KeyedSerializationSchema<CustomerOrder>() {
@Override
public byte[] serializeKey(CustomerOrder element) {
// Use customer ID as key for partitioning
return String.valueOf(element.getCustomerId()).getBytes();
}
@Override
public byte[] serializeValue(CustomerOrder element) {
// Serialize order details as JSON
return element.toJson().getBytes();
}
@Override
public String getTargetTopic(CustomerOrder element) {
// Dynamic topic selection based on order type
return "orders-" + element.getOrderType().toLowerCase();
}
};
FlinkKafkaProducer08<CustomerOrder> producer = new FlinkKafkaProducer08<>(
"orders-default", // default topic; overridden per record by schema.getTargetTopic()
schema,
properties
);
DataStream<CustomerOrder> orders = // ... your order stream
orders.addSink(producer);
Properties producerProps = new Properties();
// Required: Broker connection
producerProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092");
// Performance tuning
producerProps.setProperty("batch.num.messages", "200");
producerProps.setProperty("queue.buffering.max.ms", "1000");
producerProps.setProperty("queue.buffering.max.messages", "10000");
// Compression (optional)
producerProps.setProperty("compression.codec", "snappy"); // or "gzip", "lz4"
// Network settings
producerProps.setProperty("send.buffer.bytes", "102400");
producerProps.setProperty("client.id", "flink-kafka-producer");
// Reliability settings (Kafka 0.8 limitations apply)
producerProps.setProperty("request.required.acks", "1"); // 0, 1, or -1
producerProps.setProperty("request.timeout.ms", "10000");
producerProps.setProperty("message.send.max.retries", "3");
producerProps.setProperty("retry.backoff.ms", "100");
FlinkKafkaProducer08<MyData> producer = new FlinkKafkaProducer08<>(
"my-topic", new MyDataSerializer(), producerProps);// Configure producer for different reliability levels
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
"my-topic", new SimpleStringSchema(), properties);
// Option 1: Fail fast on errors (default)
producer.setLogFailuresOnly(false);
// Option 2: Log errors but continue processing
producer.setLogFailuresOnly(true);
// Enable flushing on checkpoints for better reliability
producer.setFlushOnCheckpoint(true);
// Note: Kafka 0.8 producer limitations
// - No transactional guarantees
// - No exactly-once semantics
// - Messages may be lost on producer failures
// - No retry mechanism in the Flink sink itself for failed sends
//   (client-level retries are controlled by "message.send.max.retries")
// Simple SerializationSchema example
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(T element) {
try {
return objectMapper.writeValueAsBytes(element);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize to JSON", e);
}
}
}
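The schema can then be passed to the producer like any other SerializationSchema; a sketch assuming a MyEvent POJO, a placeholder topic name, and the properties object from the configuration example above:
FlinkKafkaProducer08<MyEvent> jsonProducer = new FlinkKafkaProducer08<>(
        "json-events",
        new JsonSerializationSchema<MyEvent>(),
        properties);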
// KeyedSerializationSchema with dynamic routing
public class RoutingKeyedSchema<T> implements KeyedSerializationSchema<T> {
@Override
public byte[] serializeKey(T element) {
if (element instanceof Keyed) {
return ((Keyed) element).getKey().getBytes();
}
return null; // No key
}
@Override
public byte[] serializeValue(T element) {
return element.toString().getBytes();
}
@Override
public String getTargetTopic(T element) {
if (element instanceof Routable) {
return ((Routable) element).getTargetTopic();
}
return null; // Use default topic
}
}
Package: org.apache.flink.streaming.connectors.kafka
Annotations: @Deprecated
Extends: FlinkKafkaProducer08<IN>
@Deprecated
public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
SerializationSchema<IN> serializationSchema)
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
Properties producerConfig)
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema,
Properties producerConfig, KafkaPartitioner customPartitioner)
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId,
KeyedSerializationSchema<IN> serializationSchema)
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig)
/**
* @deprecated Use FlinkKafkaProducer08 instead
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig, KafkaPartitioner customPartitioner)
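Since FlinkKafkaProducer08 exposes the same constructor signatures, migrating off the deprecated class is a drop-in change; a sketch with placeholder broker and topic names:
// Before (deprecated)
FlinkKafkaProducer<String> legacy = new FlinkKafkaProducer<>(
        "localhost:9092", "my-topic", new SimpleStringSchema());
// After
FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
        "localhost:9092", "my-topic", new SimpleStringSchema());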
Package: org.apache.flink.streaming.connectors.kafka.partitioner
Annotations: @PublicEvolving
Description: Abstract base class for custom partitioning logic determining which Kafka partition records should be written to.
@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
/**
* Initializer for the partitioner. Called once on each parallel sink instance.
*
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
* @param parallelInstances the total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances);
/**
* Determine the id of the partition that the record should be written to.
*
* @param record the record value
* @param key serialized key of the record
* @param value serialized value of the record
* @param targetTopic target topic for the record
* @param partitions found partitions for the target topic
* @return the id of the target partition
*/
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
public class CustomerIdPartitioner extends FlinkKafkaPartitioner<CustomerEvent> {
@Override
public int partition(CustomerEvent record, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
// Partition based on customer ID to ensure events for same customer go to same partition
return partitions[Math.abs(record.getCustomerId().hashCode() % partitions.length)];
}
}
// Usage with producer
FlinkKafkaProducer08<CustomerEvent> producer = new FlinkKafkaProducer08<>(
"customer-events",
new CustomerEventSchema(),
properties,
new CustomerIdPartitioner()
);
Note: the flush() method is a no-op in Kafka 0.8.
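None of the examples above override open(); a minimal sketch of a partitioner that does, using a hypothetical SubtaskPartitioner that pins each parallel sink subtask to a single partition (similar in spirit to a fixed-partition strategy):
public class SubtaskPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;
    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Remember which parallel sink subtask this partitioner instance belongs to
        this.parallelInstanceId = parallelInstanceId;
    }
    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // All records from this subtask go to the same Kafka partition
        return partitions[parallelInstanceId % partitions.length];
    }
}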
// Enable checkpointing for better reliability
env.enableCheckpointing(60000); // Checkpoint every minute
// Use flush on checkpoint
producer.setFlushOnCheckpoint(true);
// Configure appropriate retry settings in Kafka properties
Properties props = new Properties();
props.setProperty("message.send.max.retries", "3");
props.setProperty("retry.backoff.ms", "100");
props.setProperty("request.required.acks", "1"); // Wait for leader acknowledgment
// Consider using exactly-once sink if available in newer Flink versions
// For Kafka 0.8, implement custom retry logic if needed
For production systems requiring strong delivery guarantees, use flink-connector-kafka-0.9 or newer for at-least-once delivery, and flink-connector-kafka-0.11 or newer for exactly-once semantics via the transactional producer.
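A sketch of the upgrade path, assuming the flink-connector-kafka-0.9 dependency and its FlinkKafkaProducer09 class (which takes the same topic/schema/properties arguments but uses the new producer's "bootstrap.servers" setting):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
FlinkKafkaProducer09<String> producer09 = new FlinkKafkaProducer09<>(
        "my-topic", new SimpleStringSchema(), props);
producer09.setFlushOnCheckpoint(true); // at-least-once once checkpointing is enabled
stream.addSink(producer09);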