`tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0`

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The Producer API publishes streams of records to Kafka topics. Producers are thread-safe and can be shared across multiple threads.
Main producer interface for sending records to Kafka.
package org.apache.kafka.clients.producer;

/**
 * Publishes records to Kafka topics.
 *
 * <p>Instances may be shared by many threads. The API falls into four groups:
 * record delivery, lifecycle, transactional control, and introspection.
 */
public interface Producer<K, V> extends Closeable {
    // ---- Record delivery (asynchronous; the Future completes on acknowledgement) ----
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

    // ---- Lifecycle ----
    void flush();
    void close();
    void close(Duration timeout);  // bounded-wait variant of close()

    // ---- Transactions (used with a configured transactional.id) ----
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  ConsumerGroupMetadata groupMetadata);

    // ---- Introspection ----
    List<PartitionInfo> partitionsFor(String topic);
    Map<MetricName, ? extends Metric> metrics();
    Uuid clientInstanceId(Duration timeout);

    // ---- Client-metric subscription (for monitoring) ----
    void registerMetricForSubscription(KafkaMetric metric);
    void unregisterMetricFromSubscription(KafkaMetric metric);
}
Thread-safe implementation of the Producer interface.
package org.apache.kafka.clients.producer;

/**
 * The standard, thread-safe {@link Producer} implementation.
 *
 * <p>Each configuration source (Properties or Map) has two constructors. One reads the
 * serializer classes from the configuration (KEY_SERIALIZER_CLASS_CONFIG /
 * VALUE_SERIALIZER_CLASS_CONFIG). The other takes serializer instances directly.
 */
public class KafkaProducer<K, V> implements Producer<K, V> {
    // Serializers named in the configuration
    public KafkaProducer(Properties properties);
    public KafkaProducer(Map<String, Object> configs);

    // Serializers supplied as instances
    public KafkaProducer(Properties properties,
                         Serializer<K> keySerializer,
                         Serializer<V> valueSerializer);
    public KafkaProducer(Map<String, Object> configs,
                         Serializer<K> keySerializer,
                         Serializer<V> valueSerializer);

    // ...plus an implementation of every Producer method
}
A key/value pair to be sent to Kafka.
package org.apache.kafka.clients.producer;

/**
 * A record addressed to a topic.
 *
 * <p>It carries an optional partition, an optional timestamp, a key, a value, and headers.
 * Passing {@code null} for partition or timestamp leaves that field unspecified.
 */
public class ProducerRecord<K, V> {
    // Constructors, from the fewest arguments to the most
    public ProducerRecord(String topic, V value);
    public ProducerRecord(String topic, K key, V value);
    public ProducerRecord(String topic, Integer partition, K key, V value);
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
    public ProducerRecord(String topic,
                          Integer partition,
                          Long timestamp,
                          K key,
                          V value,
                          Iterable<Header> headers);

    // Read-only views of the record's fields
    public String topic();
    public Integer partition();
    public Long timestamp();
    public K key();
    public V value();
    public Headers headers();
}
Usage Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Simple record with key and value
ProducerRecord<String, String> record1 =
    new ProducerRecord<>("my-topic", "key", "value");
// Record with specific partition
ProducerRecord<String, String> record2 =
    new ProducerRecord<>("my-topic", 0, "key", "value");
// Record with timestamp (null partition = let the partitioner decide)
ProducerRecord<String, String> record3 =
    new ProducerRecord<>("my-topic", null, System.currentTimeMillis(), "key", "value");
// Record with headers. Header values are raw bytes, so encode them with an explicit
// charset; plain getBytes() would depend on the JVM's default charset.
List<Header> headers = Arrays.asList(
    new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8))
);
ProducerRecord<String, String> record4 =
    new ProducerRecord<>("my-topic", null, null, "key", "value", headers);
Metadata for a record that has been acknowledged by the server.
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.TopicPartition;

/** Acknowledged details of a sent record: where it landed, when, and its serialized sizes. */
public final class RecordMetadata {
    // Where the record landed
    public String topic();
    public int partition();
    public TopicPartition topicPartition();

    // Position in the partition; hasOffset() tells whether an offset is available
    public long offset();
    public boolean hasOffset();

    // Record timestamp; hasTimestamp() tells whether a timestamp is available
    public long timestamp();
    public boolean hasTimestamp();

    // Serialized payload sizes
    public int serializedKeySize();
    public int serializedValueSize();
}
Usage Example:
// get() blocks the calling thread until the send has completed
RecordMetadata metadata = producer.send(record).get();
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();
long timestamp = metadata.timestamp();
System.out.println("Sent to topic: " + topic);
System.out.println("Partition: " + partition);
System.out.println("Offset: " + offset);
System.out.println("Timestamp: " + timestamp);
Transaction state for two-phase commit transactions.
package org.apache.kafka.clients.producer;

/** State of a producer's prepared transaction, used for two-phase commit. */
public class PreparedTxnState {
    // Identity of the producer session owning the transaction
    public long producerId();
    public short epoch();

    // Whether this state refers to an actual transaction
    public boolean hasTransaction();

    @Override
    public String toString();
}
Functional interface for asynchronous record acknowledgment.
package org.apache.kafka.clients.producer;

/**
 * Completion hook for an asynchronous send.
 *
 * <p>Per the KafkaProducer javadoc, callbacks typically run on the producer's I/O thread.
 * Keep them fast, and hand expensive work off to another executor.
 */
@FunctionalInterface
public interface Callback {
    /**
     * Called once the record has been acknowledged by the server, or once the send has failed.
     *
     * @param metadata  metadata of the sent record (topic, partition, offset, ...). On failure
     *                  this is NOT guaranteed to be null: the producer passes a placeholder
     *                  whose numeric fields are -1. Always check {@code exception} first.
     * @param exception {@code null} on success; the failure cause otherwise
     */
    void onCompletion(RecordMetadata metadata, Exception exception);
}
Usage Example:
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Send failed: " + exception.getMessage());
} else {
System.out.println("Sent successfully to " +
metadata.topic() + "-" + metadata.partition() +
" at offset " + metadata.offset());
}
}
});
// Lambda syntax
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Send failed: " + exception.getMessage());
} else {
System.out.println("Sent to offset " + metadata.offset());
}
});Custom partition selection strategy.
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
/** Strategy that chooses the partition for each record that has no explicit partition. */
public interface Partitioner extends Configurable, Closeable {
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes Serialized key bytes (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes Serialized value bytes or null
     * @param cluster The current cluster metadata
     * @return The partition number; should lie in [0, number of partitions of {@code topic})
     */
    int partition(String topic,
                  Object key,
                  byte[] keyBytes,
                  Object value,
                  byte[] valueBytes,
                  Cluster cluster);
    /** Release any resources held by the partitioner. */
    void close();
    /**
     * Legacy hook: notifies the partitioner that a new batch is about to be created after
     * {@code prevPartition} was used. The old sticky partitioners relied on it.
     * It is NOT a "partition count changed" notification.
     *
     * @deprecated since Kafka 3.3 (KIP-794); partitioning logic should not rely on it.
     */
    @Deprecated
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
Usage Example:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example partitioner: hashes records that have a key, and round-robins records without one.
 *
 * <p>A shared producer may call partition() from several sending threads at once, so the
 * round-robin position is kept in an AtomicInteger.
 */
public class CustomPartitioner implements Partitioner {
    // One counter shared by all topics; use a per-topic map for strict per-topic rotation.
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> configs) {
        // Initialize partitioner with configuration
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null) {
            // Round-robin for null keys (Cluster has no "next partition" helper). Prefer
            // partitions that currently have a leader. floorMod keeps the index
            // non-negative after the counter wraps around.
            int next = counter.getAndIncrement();
            List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
            if (!available.isEmpty()) {
                return available.get(Math.floorMod(next, available.size())).partition();
            }
            return Math.floorMod(next, numPartitions);
        }
        // Hash-based partitioning for non-null keys. Math.abs(Integer.MIN_VALUE) is still
        // negative, so use floorMod to guarantee a result in [0, numPartitions).
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    @Override
    public void close() {
        // Cleanup resources
    }
}
// Configure producer to use custom partitioner
Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
Kafka provides several built-in partitioner implementations:
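Built-in default partitioning (used when `partitioner.class` is not set; Kafka 3.3+, KIP-794):

- **Records with a key** are assigned by a murmur2 hash of the serialized key, so the same key always maps to the same partition (unless `partitioner.ignore.keys=true`).
- **Records without a key** are assigned in a sticky manner. The producer keeps filling one partition until about `batch.size` bytes have been sent to it, then switches partitions. This improves batching.
- **Adaptive partitioning:** with `partitioner.adaptive.partitioning.enable=true` (the default), the producer favors partitions on faster brokers.

The `UniformStickyPartitioner` class shown below was never the default. Unlike the built-in logic, it ignores keys entirely. It was deprecated in Kafka 3.3 and removed in 4.0.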
package org.apache.kafka.clients.producer;
// Deprecated in Kafka 3.3 (KIP-794) and removed in Kafka 4.0; shown for reference only.
@Deprecated
public class UniformStickyPartitioner implements Partitioner {
    // Behavior:
    // - A partition set explicitly on the record is used as-is
    // - Otherwise the record KEY IS IGNORED: records stick to one partition until the
    //   batch is full, then switch to another partition
    // - Not the default: leaving partitioner.class unset selects the built-in partitioner,
    //   which hashes keyed records and uses sticky assignment only for records without a key
}
Configuration (built-in default):
// UniformStickyPartitioner is the default - no configuration needed
Properties props = new Properties();
// ... other producer config
// Producer will automatically use UniformStickyPartitionerLegacy default partitioner (deprecated in favor of UniformStickyPartitioner). Uses round-robin for records without keys.
package org.apache.kafka.clients.producer.internals;
// Deprecated in Kafka 3.3 (KIP-794) and removed in Kafka 4.0; shown for reference only.
@Deprecated
public class DefaultPartitioner implements Partitioner {
    // Behavior:
    // - Records with keys: murmur2 hash of the serialized key
    // - Records without keys: sticky partition per batch since Kafka 2.4 (KIP-480);
    //   round-robin only in clients older than 2.4
}
Configuration:
// Compiles only against Kafka clients older than 4.0, where this class still exists.
// With 4.x clients, omit partitioner.class to get the equivalent built-in behavior.
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());
Distributes records evenly across all partitions in a round-robin fashion, regardless of key.
package org.apache.kafka.clients.producer;

/** Built-in partitioner that cycles through partitions and disregards keys. */
public class RoundRobinPartitioner implements Partitioner {
    // Keys play no part in choosing the partition. Records are spread evenly over all
    // partitions, which suits workloads that want uniform load rather than key affinity.
}
Configuration:
import org.apache.kafka.clients.producer.RoundRobinPartitioner;

Properties props = new Properties();
String partitionerClass = RoundRobinPartitioner.class.getName();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerClass);
Partitioning Behavior Summary:
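| Partitioner | Records with Keys | Records without Keys | Use Case |
|---|---|---|---|
| Built-in (default; `partitioner.class` unset) | murmur2 hash of key | Sticky per batch, adaptive | Best throughput, recommended |
| UniformStickyPartitioner (deprecated 3.3, removed 4.0) | Sticky (ignores key) | Sticky per batch | Legacy compatibility |
| DefaultPartitioner (deprecated 3.3, removed 4.0) | Hash-based | Sticky per batch (round-robin before 2.4) | Legacy compatibility |
| RoundRobinPartitioner | Round-robin (ignores key) | Round-robin | Uniform distribution |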
Intercept and potentially mutate records before sending.
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
/**
 * Hook for observing or rewriting records before they are sent, and for observing the
 * outcome of each send.
 */
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
    /** Invoked before the record is sent; the returned record is the one that gets sent. */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /** Invoked with the outcome of a send: metadata on success, an exception on failure. */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * Header-aware variant of the acknowledgement hook. By default it ignores the headers and
     * forwards to the two-argument version.
     */
    default void onAcknowledgement(RecordMetadata metadata,
                                   Exception exception,
                                   Headers headers) {
        onAcknowledgement(metadata, exception);
    }

    /** Release any resources held by the interceptor. */
    void close();
}
Usage Example:
import org.apache.kafka.clients.producer.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.LongAdder;

/**
 * Counts successful and failed sends, and stamps each outgoing record with a
 * "sent-timestamp" header.
 *
 * <p>Interceptor callbacks can be invoked from different threads: onSend runs on the
 * caller's thread, and acknowledgements typically run on the producer's I/O thread.
 * close() may run on yet another thread. The counters therefore use LongAdder instead of
 * plain long fields, whose unsynchronized increments could be lost.
 */
public class MetricsInterceptor implements ProducerInterceptor<String, String> {
    private final LongAdder successCount = new LongAdder();
    private final LongAdder failureCount = new LongAdder();

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Add timestamp header (explicit UTF-8 so the bytes do not depend on the JVM charset)
        record.headers().add("sent-timestamp",
            String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCount.increment();
        } else {
            failureCount.increment();
        }
    }

    @Override
    public void close() {
        System.out.println("Success: " + successCount.sum() + ", Failures: " + failureCount.sum());
    }
}
// Configure interceptor
Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    MetricsInterceptor.class.getName());
Round-robin partitioning strategy across all available partitions.
package org.apache.kafka.clients.producer;

/** Assigns partitions in turn over all of a topic's partitions. */
public class RoundRobinPartitioner implements Partitioner {
    // Each successive record goes to the next partition, giving an even spread
}
Usage:
// Naming the class as a string avoids a compile-time dependency on it
String roundRobin = "org.apache.kafka.clients.producer.RoundRobinPartitioner";
Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, roundRobin);
Mock producer for testing purposes.
package org.apache.kafka.clients.producer;

/**
 * Test double for {@link Producer}. It keeps what was sent (see history()) and lets tests
 * decide how pending sends complete.
 */
public class MockProducer<K, V> implements Producer<K, V> {
    public MockProducer();
    // autoComplete: complete each send immediately rather than waiting for the test to
    // call completeNext() / errorNext()
    public MockProducer(boolean autoComplete,
                        Serializer<K> keySerializer,
                        Serializer<V> valueSerializer);

    // ---- Inspecting and driving sends ----
    public List<ProducerRecord<K, V>> history();
    public List<ProducerRecord<K, V>> uncommittedRecords();
    public boolean completeNext();
    public boolean errorNext(RuntimeException e);
    public void clear();

    // ---- Transaction lifecycle flags ----
    public boolean transactionInitialized();
    public boolean transactionInFlight();
    public boolean transactionCommitted();
    public boolean transactionAborted();

    // ---- Producer lifecycle flags ----
    public boolean flushed();
    public boolean closed();
}
Usage Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
// Mock producer whose sends complete immediately (autoComplete = true)
MockProducer<String, String> mockProducer =
    new MockProducer<>(true, new StringSerializer(), new StringSerializer());
// Send a single record
mockProducer.send(new ProducerRecord<>("test-topic", "key", "value"));
// The record shows up in history(); check its topic and key
List<ProducerRecord<String, String>> history = mockProducer.history();
assertEquals(1, history.size());
ProducerRecord<String, String> sent = history.get(0);
assertEquals("test-topic", sent.topic());
assertEquals("key", sent.key());
Configuration constants for the producer.
package org.apache.kafka.clients.producer;

/** Names of the producer configuration keys. */
public class ProducerConfig extends AbstractConfig {
    // ---- Connection and serialization (required) ----
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    public static final String CLIENT_ID_CONFIG = "client.id";

    // ---- Delivery guarantees ----
    public static final String ACKS_CONFIG = "acks"; // "all", "1", "0"
    public static final String RETRIES_CONFIG = "retries";
    public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
    public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION =
        "max.in.flight.requests.per.connection";

    // ---- Batching, buffering and request size ----
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    public static final String LINGER_MS_CONFIG = "linger.ms";
    public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
    public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";

    // ---- Compression: codec and per-codec level ----
    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
    public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
    public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
    public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";

    // ---- Timeouts ----
    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";

    // ---- Transactions (including two-phase commit) ----
    public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
    public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
    public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG =
        "transaction.two.phase.commit.enable";

    // ---- Partitioning ----
    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
    public static final String PARTITIONER_IGNORE_KEYS_CONFIG = "partitioner.ignore.keys";
    // "ADPATIVE" is how the constant is spelled in the real API; do not "fix" it in client code.
    public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG =
        "partitioner.adaptive.partitioning.enable";

    // ---- Interceptors ----
    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

    // ---- Metrics ----
    public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
}
Basic Configuration:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
// Minimal setup: broker address plus String serializers for keys and values
String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
Producer<String, String> producer = new KafkaProducer<>(props);
High Throughput Configuration:
Properties props = new Properties();
// Connection and serialization
String serializer = StringSerializer.class.getName();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
// Throughput tuning: bigger batches, a short linger to fill them, compression,
// and more memory for records waiting to be sent
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);        // 32KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // Wait up to 10ms
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB
Producer<String, String> producer = new KafkaProducer<>(props);
Exactly-Once Configuration:
Properties props = new Properties();
// Connection and serialization
String serializer = StringSerializer.class.getName();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
// Exactly-once: idempotence (already on by default in 3.0+), full acks,
// at most 5 in-flight requests, and a transactional id
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props);
// Must be called once before the first beginTransaction()
producer.initTransactions();
Configuration Defaults:
- `batch.size`: 16384 (16 KB)
- `linger.ms`: 5
- `buffer.memory`: 33554432 (32 MB)
- `max.request.size`: 1048576 (1 MB)
- `acks`: "all"
- `retries`: 2147483647 (Integer.MAX_VALUE)
- `enable.idempotence`: true
- `max.in.flight.requests.per.connection`: 5
- `compression.type`: "none"
- `delivery.timeout.ms`: 120000 (2 minutes)
- `request.timeout.ms`: 30000 (30 seconds)
- `max.block.ms`: 60000 (1 minute)
- `transaction.timeout.ms`: 60000 (1 minute)
- `transaction.two.phase.commit.enable`: false
- `partitioner.adaptive.partitioning.enable`: true
- `partitioner.ignore.keys`: false
- `enable.metrics.push`: true

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// try-with-resources closes the producer even if send() or get() throws
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key", "value");
    RecordMetadata metadata = producer.send(record).get(); // Blocking
    System.out.println("Sent to partition " + metadata.partition() +
        " at offset " + metadata.offset());
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still see it
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for the send to complete");
} catch (ExecutionException e) {
    // The actual send failure is the cause, not the ExecutionException wrapper
    System.err.println("Send failed: " + e.getCause());
}
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// Asynchronous sends: each outcome is reported through the callback
for (int n = 0; n < 100; n++) {
    String key = "key-" + n;
    String value = "value-" + n;
    producer.send(new ProducerRecord<>("my-topic", key, value), (metadata, exception) -> {
        if (exception == null) {
            System.out.println("Sent record to " + metadata.topic() +
                "-" + metadata.partition() + " at offset " + metadata.offset());
        } else {
            System.err.println("Error sending record: " + exception.getMessage());
        }
    });
}
producer.close();
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// Fire-and-forget: the returned Future is discarded and no callback is registered,
// so individual send failures go unnoticed
int recordCount = 100;
for (int n = 0; n < recordCount; n++) {
    producer.send(new ProducerRecord<>("my-topic", "key-" + n, "value-" + n));
}
// Push out everything buffered so far, then release the producer
producer.flush();
producer.close();
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    // Send records
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
        producer.send(record);
    }
    // Commit transaction
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: the producer cannot recover, and abortTransaction() would fail as well.
    // Do not abort; the finally block closes the producer.
    throw e;
} catch (RuntimeException e) {
    // Abortable error: roll back everything sent in this transaction, then propagate
    producer.abortTransaction();
    throw e;
} finally {
    producer.close();
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.time.Duration;
import java.util.*;
// Configure transactional producer
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
// Configure consumer with read_committed isolation
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Offsets are committed through the producer's transaction, never automatically
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("input-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty()) {
        try {
            producer.beginTransaction();
            // Process and send records
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                // Tombstones (null values) are forwarded unchanged instead of throwing an NPE.
                // Locale.ROOT keeps the transformation independent of the JVM's locale.
                String transformedValue = value == null ? null : value.toUpperCase(Locale.ROOT);
                ProducerRecord<String, String> outputRecord =
                    new ProducerRecord<>("output-topic", record.key(), transformedValue);
                producer.send(outputRecord);
            }
            // Commit consumer offsets within transaction: next offset to read = last + 1
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords =
                    records.records(partition);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: abortTransaction() would fail too. Close the producer and propagate.
            producer.close();
            throw e;
        } catch (RuntimeException e) {
            // Abortable: discard this transaction's output and offsets, then propagate so the
            // application restarts from the last committed offsets
            producer.abortTransaction();
            throw e;
        }
    }
}
Cannot allocate memory for sending.
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.errors.TimeoutException;
// Thrown when the producer cannot allocate memory from its buffer pool within max.block.ms
public class BufferExhaustedException extends TimeoutException {
    // Usually caused by a send rate the brokers cannot absorb, or by too small a buffer.memory.
    // As a TimeoutException subclass it is reported through the send Future / Callback
    // (wrapped in ExecutionException on get()), not thrown directly from send().
}
Handling:
// BufferExhaustedException lives in org.apache.kafka.clients.producer.
// The enclosing method must declare ExecutionException and InterruptedException.
try {
    producer.send(record).get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof BufferExhaustedException) {
        // Increase buffer.memory or reduce send rate
        System.err.println("Buffer exhausted, slowing down sends");
        Thread.sleep(100);
    } else {
        // Any other send failure is not handled here; propagate it rather than drop it
        throw e;
    }
}
import org.apache.kafka.common.errors.*;
// Simplified view of the common producer exceptions and their parent types.
// Record too large for configured max.request.size (not retriable)
public class RecordTooLargeException extends ApiException {}
// Operation timeout; retriable (BufferExhaustedException is a subclass)
public class TimeoutException extends RetriableException {}
// Authentication failure
public class AuthenticationException extends ApiException {}
// Authorization failure
public class AuthorizationException extends ApiException {}
// Topic or partition not (yet) known; a retriable metadata error
// (InvalidMetadataException extends RetriableException)
public class UnknownTopicOrPartitionException extends InvalidMetadataException {}
// Invalid topic name
public class InvalidTopicException extends ApiException {}
// Serialization error
public class SerializationException extends KafkaException {}
The KafkaProducer is thread-safe and can be shared across multiple threads:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.*;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// One producer, used concurrently by ten worker threads
Producer<String, String> producer = new KafkaProducer<>(props);
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int worker = 0; worker < 10; worker++) {
    String keyPrefix = "thread-" + worker + "-key-";
    executor.submit(() -> {
        for (int n = 0; n < 100; n++) {
            producer.send(new ProducerRecord<>("my-topic", keyPrefix + n, "value-" + n));
        }
    });
}
// Stop accepting work, give the workers up to a minute, then close the shared producer
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
producer.close();
Always close the producer to release resources:
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// Use producer
} finally {
producer.close(); // or producer.close(Duration.ofSeconds(30))
}Implement proper error handling for production use:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;
// By the time the callback sees an exception, the producer has already stopped retrying
// (retriable errors are retried internally until delivery.timeout.ms expires).
// "Retriable" below therefore means "re-sending might succeed", not "will be retried".
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // e.g. TimeoutException: log, and re-send only if duplicates are acceptable
            System.err.println("Retriable error: " + exception.getMessage());
        } else if (exception instanceof RecordTooLargeException) {
            // Record is too large, cannot retry
            System.err.println("Record too large: " + exception.getMessage());
        } else if (exception instanceof AuthorizationException) {
            // Not authorized to write to topic
            System.err.println("Authorization error: " + exception.getMessage());
        } else {
            // Other non-retriable errors
            System.err.println("Fatal error: " + exception.getMessage());
        }
    }
});
Access producer metrics for monitoring:
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    MetricName name = entry.getKey();
    // "record-send-rate" is registered both producer-wide ("producer-metrics") and per topic
    // ("producer-topic-metrics"). Filter on the group so the overall rate prints only once.
    if (name.name().equals("record-send-rate") && name.group().equals("producer-metrics")) {
        System.out.println("Record send rate: " + entry.getValue().metricValue());
    }
}
Key metrics to monitor:
- `record-send-rate`: records sent per second
- `record-error-rate`: records that resulted in errors
- `record-retry-rate`: records that were retried
- `buffer-available-bytes`: available buffer memory
- `batch-size-avg`: average batch size
- `compression-rate-avg`: average compression ratio
- `request-latency-avg`: average request latency

Symptoms:
- `buffer-exhausted-rate` metric

Causes:
Solutions:
// Increase buffer memory
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB
// Increase max block time
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000); // 2 minutes
// Implement backpressure: wait (bounded) for each send before issuing more.
// Two different timeouts can occur here, so both are named explicitly:
// - java.util.concurrent.TimeoutException: get() gave up waiting; the record is still in flight
// - org.apache.kafka.common.errors.TimeoutException: the send itself failed (e.g. the buffer
//   stayed full for max.block.ms); it arrives wrapped in an ExecutionException
try {
    producer.send(record).get(5, TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
    // Slow down or drop records
    System.err.println("Send timeout, implementing backpressure");
    Thread.sleep(100);
} catch (ExecutionException e) {
    if (e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
        System.err.println("Send timeout, implementing backpressure");
        Thread.sleep(100);
    } else {
        throw e;
    }
}
Prevention:
- Monitor the `buffer-available-bytes` metric

Symptoms:
Causes:
Solutions:
// Increase max request size
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152); // 2MB
// Enable compression to reduce size
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
// Or split large records
/**
 * Splits a large binary record into Base64-encoded String chunks that fit within the
 * default max.request.size (1 MiB).
 *
 * <p>Base64 expands data by 4/3, so raw chunks are capped at 700,000 bytes (933,336 bytes
 * once encoded). That leaves headroom for the key, headers and batch overhead. A
 * 900,000-byte raw chunk would encode to 1,200,000 bytes and still be rejected.
 *
 * <p>Every chunk keeps the original topic, partition and key, so chunks land on the same
 * partition in order. Each chunk carries a 0-based "chunk-id" header and a "total-chunks"
 * header. An empty value yields one empty chunk. A null value (tombstone) is forwarded as a
 * single null-valued record without chunk headers.
 *
 * @param largeRecord record whose byte[] value should be split
 * @return the chunk records, in order
 */
public List<ProducerRecord<String, String>> splitLargeRecord(
        ProducerRecord<String, byte[]> largeRecord) {
    final int chunkSize = 700_000; // raw bytes per chunk, before Base64
    byte[] data = largeRecord.value();
    List<ProducerRecord<String, String>> chunks = new ArrayList<>();
    if (data == null) {
        chunks.add(new ProducerRecord<>(
            largeRecord.topic(), largeRecord.partition(), largeRecord.key(), null));
        return chunks;
    }
    // long arithmetic avoids int overflow for arrays close to Integer.MAX_VALUE bytes
    int totalChunks = Math.max(1, (int) ((data.length + (long) chunkSize - 1) / chunkSize));
    for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
        int start = chunkId * chunkSize;
        int end = Math.min(start + chunkSize, data.length);
        byte[] chunk = Arrays.copyOfRange(data, start, end);
        ProducerRecord<String, String> chunkRecord = new ProducerRecord<>(
            largeRecord.topic(),
            largeRecord.partition(),
            largeRecord.key(),
            Base64.getEncoder().encodeToString(chunk)
        );
        // Add metadata headers (explicit UTF-8, independent of the JVM default charset)
        chunkRecord.headers().add("chunk-id",
            String.valueOf(chunkId).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        chunkRecord.headers().add("total-chunks",
            String.valueOf(totalChunks).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        chunks.add(chunkRecord);
    }
    return chunks;
}
Prevention:
Symptoms:
- `request-latency-avg` metric

Causes:
Solutions:
// Increase timeout configurations
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180000); // 3 minutes
// Implement exponential backoff retry
/**
 * Sends a record synchronously. If the send fails with Kafka's TimeoutException, the record
 * is sent again with exponential backoff (100 ms, doubling, capped at 10 s).
 *
 * <p>Caution: the producer already retries internally until delivery.timeout.ms expires.
 * Idempotence does not deduplicate sends repeated by the application, so a record that timed
 * out may already be written and re-sending it can create a duplicate.
 *
 * @param record     record to send
 * @param maxRetries maximum number of send attempts; values below 1 are treated as 1, so the
 *                   record is always sent at least once
 * @return metadata of the acknowledged record
 * @throws ExecutionException   the first non-timeout failure, or the last timeout once all
 *                              attempts are used
 * @throws InterruptedException if interrupted while waiting or backing off
 */
private RecordMetadata sendWithExponentialBackoff(
        ProducerRecord<String, String> record,
        int maxRetries) throws Exception {
    int maxAttempts = Math.max(1, maxRetries);
    long backoffMs = 100;
    for (int attempt = 1; ; attempt++) {
        try {
            return producer.send(record).get();
        } catch (ExecutionException e) {
            // Fully qualified: the failure cause is Kafka's TimeoutException, not
            // java.util.concurrent.TimeoutException
            boolean timedOut =
                e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException;
            if (!timedOut || attempt >= maxAttempts) {
                throw e;
            }
            System.err.println("Timeout on attempt " + attempt +
                ", retrying after " + backoffMs + "ms");
            Thread.sleep(backoffMs);
            backoffMs = Math.min(backoffMs * 2, 10000); // Max 10s
        }
    }
}
Prevention:
Symptoms:
Causes:
Solutions:
// Ensure unique transactional.id per instance. Keep it stable across restarts of that
// instance, so a restarted instance fences its own zombie predecessor.
String transactionalId = "my-app-" + hostname + "-" + instanceId;
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
// Increase transaction timeout. The constant is TRANSACTION_TIMEOUT_CONFIG
// ("transaction.timeout.ms"); there is no TRANSACTION_TIMEOUT_MS_CONFIG.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000); // 2 minutes
// Handle fencing gracefully
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with same ID is active.
    // Do not call abortTransaction(); it would fail too.
    System.err.println("Producer fenced: " + e.getMessage());
    producer.close();
    // Shut this instance down. Restarting it under a different transactional.id would
    // defeat zombie fencing and can break exactly-once guarantees.
}
Prevention:
// Null keys are valid and will use default partitioner
ProducerRecord<String, String> nullKeyRecord =
    new ProducerRecord<>("my-topic", null, "value");
// With the built-in default partitioner (partitioner.class unset):
// - Null-key records stick to one partition until about batch.size bytes, then switch
// - Different null-key records may go to different partitions
// - No ordering guarantee for null-key records
// To spread records evenly over ALL partitions instead (this does NOT keep null-key
// records on one partition):
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
    "org.apache.kafka.clients.producer.RoundRobinPartitioner");
// To send all null-key records to the same partition, specify the partition explicitly:
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", 0, null, "value");
// Null values are valid and represent tombstones (deletion markers)
// Send the tombstone to the compacted topic defined below
ProducerRecord<String, String> tombstone =
    new ProducerRecord<>("my-compact-topic", "key-to-delete", null);
producer.send(tombstone);
// In compacted topics:
// - Null value marks key for deletion
// - Compaction eventually removes earlier values for that key; the tombstone itself
//   is retained for delete.retention.ms so consumers can still observe the delete
// - Requires compaction enabled on topic (cleanup.policy=compact); on other topics
//   a null value is stored like any other record
// Create compacted topic (pass it to Admin#createTopics to actually create it):
NewTopic compactedTopic = new NewTopic("my-compact-topic", 3, (short) 2);
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("min.compaction.lag.ms", "0");      // records are eligible for compaction immediately
configs.put("delete.retention.ms", "86400000"); // keep tombstones for 24 hours
compactedTopic.configs(configs);// Sending many records rapidly
List<ProducerRecord<String, String>> records = generateLargeRecordSet();
// Potential issues:
// 1. Buffer exhaustion
// 2. Memory pressure
// 3. Long blocking on close()
// Solution: Batch with flow control
int batchSize = 1000;
for (int i = 0; i < records.size(); i += batchSize) {
    int end = Math.min(i + batchSize, records.size());
    List<Future<RecordMetadata>> futures = new ArrayList<>(end - i);
    for (int j = i; j < end; j++) {
        futures.add(producer.send(records.get(j)));
    }
    // Wait for batch to complete before next batch
    for (Future<RecordMetadata> future : futures) {
        try {
            future.get();
        } catch (ExecutionException e) {
            // The send itself failed; the cause carries the actual Kafka error
            System.err.println("Batch send error: " + e.getCause());
        } catch (InterruptedException e) {
            // Preserve the interrupt and stop producing further batches
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for batch", e);
        }
    }
}// Producer automatically retries on broker failures
// Configure retries and timeout appropriately (retries already defaults to
// Integer.MAX_VALUE; delivery.timeout.ms is what actually bounds retrying)
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180000); // 3 minutes
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 1 minute
// Handle broker unavailability.
// The callback runs once per record, after the producer's internal retries are over.
// A non-null exception therefore means the record was NOT written, even when the
// error type is retriable: the producer has already given up (typically because
// delivery.timeout.ms expired).
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        return;
    }
    boolean transientCause =
        exception instanceof org.apache.kafka.common.errors.BrokerNotAvailableException
            || exception instanceof org.apache.kafka.common.errors.NetworkException
            || exception instanceof org.apache.kafka.common.errors.RetriableException;
    if (transientCause) {
        // The cause was transient, but the record is lost unless the application
        // re-sends it or parks it somewhere (e.g. a dead-letter store)
        System.err.println("Send failed after retries (transient cause): " + exception.getMessage());
    } else {
        System.err.println("Fatal error: " + exception.getMessage());
        // Handle non-retriable errors
    }
});// Transaction exceeds transaction.timeout.ms
// Problem: one transaction wrapping slow per-record work can run past
// transaction.timeout.ms, after which the transaction coordinator aborts it.
try {
    producer.beginTransaction();
    // Long-running operations
    for (int i = 0; i < 10000; i++) {
        producer.send(new ProducerRecord<>("topic", "key-" + i, "value-" + i));
        // Expensive processing between sends
        doExpensiveWork(i);
    }
    producer.commitTransaction();
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    // commitTransaction() did not finish within max.block.ms. The commit may still be
    // completing, so abortTransaction() must NOT be called here; the only valid options
    // are retrying commitTransaction() or closing the producer.
    producer.commitTransaction();
}
// Solution (use instead of the single large transaction above): size transactions
// up front so each one commits well within transaction.timeout.ms.
int batchSize = 1000;
int totalRecords = 10000;
for (int start = 0; start < totalRecords; start += batchSize) {
    int end = Math.min(start + batchSize, totalRecords);
    producer.beginTransaction();
    for (int i = start; i < end; i++) {
        producer.send(new ProducerRecord<>("topic", "key-" + i, "value-" + i));
        doExpensiveWork(i);
    }
    producer.commitTransaction();
}// Producer automatically handles partition leader changes
// Metadata refresh happens transparently: on a leader change the producer refreshes
// metadata and retries the affected batches internally.
// The callback only sees each record's final outcome, after those retries.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof org.apache.kafka.common.errors.NotLeaderOrFollowerException ||
            exception instanceof org.apache.kafka.common.errors.LeaderNotAvailableException) {
            // Leadership kept moving until retries ran out (delivery.timeout.ms);
            // the record was NOT written and must be re-sent by the application
            System.out.println("Leader change outlasted producer retries; record not sent");
        } else {
            System.err.println("Send failed: " + exception.getMessage());
        }
    } else {
        // Success - check if partition leader is as expected
        System.out.println("Sent to partition " + metadata.partition());
    }
});
// Note: partitionsFor() does not force a refresh. It returns the cached partition
// metadata when the topic is already known and only blocks (up to max.block.ms)
// to fetch metadata for a topic that is not cached yet.
List<PartitionInfo> partitions = producer.partitionsFor("my-topic");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// Maximize batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait 20ms for batching
// Enable compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression
// Increase buffer
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB
// In-flight requests: 5 is the maximum compatible with idempotence (on by default).
// A larger value turns idempotence off (or throws ConfigException when
// enable.idempotence=true is set explicitly), losing duplicate/ordering protection.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Async sends
Producer<String, String> producer = new KafkaProducer<>(props);Throughput vs Latency Tradeoffs:
- batch.size (larger) → Higher throughput, potentially higher latency
- linger.ms (larger) → Higher throughput, higher latency
- max.in.flight.requests (larger) → Higher throughput; ordering is only preserved with idempotence enabled (which requires ≤ 5)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// Minimize batching delay: linger.ms=0 sends as soon as the sender is free
props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // No waiting
// Keep batch.size at its default (16KB). A tiny value such as 1 does not make sends
// faster; it forces every record into its own batch allocated outside the buffer pool.
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// No compression (adds latency)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
// One in-flight request per connection: strict ordering, at the cost of pipelining
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
Producer<String, String> producer = new KafkaProducer<>(props);
// Blocking on the future confirms each record before continuing; it does not make
// the send itself faster
producer.send(record).get();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
// Maximum durability
// Wait for all in-sync replicas (how many that is depends on the topic's
// min.insync.replicas setting)
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // Highest value idempotence allows
// Transactions for exactly-once. The transactional.id must be unique per producer
// instance: a newer producer with the same id fences (disables) the older one.
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
// Conservative timeouts
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();Durability Levels:
- acks=0: No durability, fire and forget
- acks=1: Leader acknowledgment only
- acks=all: All in-sync replicas must acknowledge (highest durability)

delivery.timeout.ms Relationship:
// delivery.timeout.ms >= linger.ms + request.timeout.ms
// If delivery.timeout.ms is set explicitly and violates this, creating the producer
// throws ConfigException. If it is left at its default, the producer instead raises
// it to linger.ms + request.timeout.ms and logs a warning.
// Example configuration:
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // Must be >= 30010
// Violation example (will throw ConfigException):
// props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 20000); // ERROR: < linger.ms + request.timeout.msIdempotence Requirements:
// When enable.idempotence=true (default), the following must be set:
// - acks = all (or -1)
// - max.in.flight.requests.per.connection <= 5
// - retries > 0
// If enable.idempotence is only at its default, a conflicting setting disables
// idempotence (logged, no exception). ConfigException is thrown only when
// enable.idempotence=true is set explicitly, as in the example below.
// Valid idempotent configuration:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // Must be <= 5
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // Required > 0
// Invalid configuration (will throw ConfigException, since idempotence is set explicitly above):
// props.put(ProducerConfig.ACKS_CONFIG, "1"); // ERROR: must be "all"
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 10); // ERROR: > 5Transaction Requirements:
// Transactions require idempotence
// Setting transactional.id enables idempotence unless it is explicitly disabled
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-id");
// Requirements once transactional.id is set:
// - enable.idempotence must be true (explicitly setting false throws ConfigException)
// - acks = all and retries > 0 (already the defaults); conflicting explicit values
//   are rejected with ConfigException, not silently overridden
// Must call initTransactions() before use
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Required before first transaction// DON'T DO THIS - Very inefficient
public void sendMessage(String topic, String key, String value) {
    Properties props = new Properties();
    // ... configure producer
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>(topic, key, value));
    producer.close(); // Expensive operation
}
// DO THIS - Reuse producer instance
// The producer is thread-safe, so one shared instance can serve all callers
private final Producer<String, String> producer = createProducer();
public void sendMessage(String topic, String key, String value) {
    // Attach a callback so failed sends are not silently dropped
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    });
}
// Close producer on application shutdown (waits for outstanding sends)
public void shutdown() {
    producer.close();
}// DON'T DO THIS - Silent failures
producer.send(record); // No callback, no checking
// DO THIS - Handle failures appropriately
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
        // Log to monitoring system
        // Implement retry logic
        // Alert on persistent failures
    } else {
        // Optional: Log success metrics
    }
});
// Or for critical data:
try {
    RecordMetadata metadata = producer.send(record).get();
    // Verify success
} catch (ExecutionException e) {
    // The send failed; e.getCause() holds the Kafka error - retry, alert, etc.
} catch (InterruptedException e) {
    // Restore the interrupt flag; the record may still be delivered in the background
    Thread.currentThread().interrupt();
}// DON'T DO THIS - Can hang indefinitely
Properties props = new Properties();
// ... no timeout configuration
// (The producer's defaults already bound its own waits: max.block.ms=60000,
//  request.timeout.ms=30000, delivery.timeout.ms=120000. The main risk is
//  application code that waits without a limit of its own.)
// DO THIS - Set timeouts explicitly so the bounds are visible and tunable
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // 1 minute
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes
// And use timeout on blocking calls
try {
    RecordMetadata metadata = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
    // Only this wait timed out (not Kafka's TimeoutException); the record may
    // still be delivered later
} catch (ExecutionException e) {
    // The send itself failed; see e.getCause()
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}// DON'T DO THIS - Resource leak
// The producer created here is never closed; the send is also neither awaited nor checked
public void processData() {
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
// Missing close() - threads and connections not cleaned up
}
// DO THIS - Always close
// try-with-resources calls producer.close() when the block exits, whether normally
// or by exception; .get() waits for this record's result before that happens
public void processData() {
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(record).get();
} // Automatically closed
}
// Or with explicit try-finally
// Same guarantee, but close(...) is given an explicit 30-second bound
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(record).get();
} finally {
producer.close(Duration.ofSeconds(30)); // With timeout
}// Calculate required buffer size:
// buffer.memory >= peak_send_rate * avg_record_size * latency_seconds
// Example calculation:
// - Peak send rate: 10,000 records/second
// - Average record size: 1KB
// - Expected max latency: 3 seconds
// Required buffer: 10,000 * 1,024 * 3 = 30,720,000 bytes (~30MB)
long totalBufferBytes = 33554432L; // 32MB (with safety margin)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, totalBufferBytes);
// Monitor actual usage
Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    if (entry.getKey().name().equals("buffer-available-bytes")) {
        // metricValue() returns an Object (a Double for this metric), so a (long)
        // cast would throw ClassCastException; go through Number instead
        Object value = entry.getValue().metricValue();
        if (!(value instanceof Number)) {
            continue;
        }
        double available = ((Number) value).doubleValue();
        double used = totalBufferBytes - available;
        double usagePercent = (used * 100.0) / totalBufferBytes;
        if (usagePercent > 80) {
            System.err.println("WARNING: Buffer usage at " + usagePercent + "%");
        }
    }
}// Compression algorithm selection guide:
// (compression.type defaults to "none")
// - snappy: Fast, moderate compression
// - lz4: Fastest, good compression (best for real-time)
// - gzip: Slow, best compression (best for bandwidth-limited)
// - zstd: Good balance, configurable levels
// Benchmark different compression types.
// Records are sent asynchronously and flushed at the end, so they are batched (and
// therefore compressed) the way a real producer would batch them. Waiting on every
// send() would put one record in each batch and mostly measure round-trip latency.
String[] compressionTypes = {"none", "snappy", "lz4", "gzip", "zstd"};
for (String compression : compressionTypes) {
    Properties testProps = new Properties();
    testProps.putAll(props);
    testProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression);
    // try-with-resources: the test producer is closed even if a send fails
    try (Producer<String, String> testProducer = new KafkaProducer<>(testProps)) {
        long startNanos = System.nanoTime();
        List<Future<RecordMetadata>> futures = new ArrayList<>(10000);
        for (int i = 0; i < 10000; i++) {
            futures.add(testProducer.send(new ProducerRecord<>("test-topic", "key-" + i,
                generateTestData())));
        }
        testProducer.flush();
        for (Future<RecordMetadata> future : futures) {
            future.get(); // surface any send failure
        }
        long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(compression + " compression: " + duration + "ms");
    }
}
// Typical results (illustrative only; depends heavily on the data):
// - none: Fastest, largest network usage
// - lz4: ~10-20% slower than none, 40-60% size reduction
// - snappy: Similar to lz4
// - zstd: ~30-50% slower, 50-70% size reduction
// - gzip: ~2-3x slower, 60-80% size reductionImplement custom batching logic for optimal throughput:
import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Collects records on a background thread and hands them to the producer in groups.
 *
 * <p>A group is sent when it reaches {@code batchSize} records, or when
 * {@code batchTimeoutMs} has elapsed since its first record was taken from the queue.
 * Each group is fully acknowledged (or failed) before the next one is formed.
 * Call {@link #close()} to flush pending records and release the producer.
 */
public class BatchingProducer {
    private final Producer<String, String> producer;
    private final BlockingQueue<ProducerRecord<String, String>> recordQueue;
    private final int batchSize;
    private final long batchTimeoutMs;
    // Background thread running processBatches(); close() waits for it to finish
    private final Thread worker;
    private volatile boolean running = true;

    public BatchingProducer(Properties props, int batchSize, long batchTimeoutMs) {
        this.producer = new KafkaProducer<>(props);
        this.recordQueue = new LinkedBlockingQueue<>();
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
        // Start batch processing thread (last, once every field is assigned)
        this.worker = new Thread(this::processBatches, "batching-producer");
        this.worker.start();
    }

    /**
     * Queues a record for sending by the background thread.
     *
     * @throws IllegalStateException if {@link #close()} has already been called
     * @throws InterruptedException  if interrupted while enqueueing
     */
    public void sendAsync(String topic, String key, String value) throws InterruptedException {
        if (!running) {
            throw new IllegalStateException("BatchingProducer is closed");
        }
        recordQueue.put(new ProducerRecord<>(topic, key, value));
    }

    // Worker loop: accumulate records, send on size/time thresholds, flush on shutdown
    private void processBatches() {
        List<ProducerRecord<String, String>> batch = new ArrayList<>();
        long batchStartMs = 0L; // when the first record of the current batch arrived
        while (running) {
            try {
                // Never wait longer than what is left of the current batch's time window
                long waitMs = batch.isEmpty()
                        ? batchTimeoutMs
                        : Math.max(0L, batchStartMs + batchTimeoutMs - System.currentTimeMillis());
                ProducerRecord<String, String> record =
                        recordQueue.poll(waitMs, TimeUnit.MILLISECONDS);
                if (record != null) {
                    if (batch.isEmpty()) {
                        batchStartMs = System.currentTimeMillis();
                    }
                    batch.add(record);
                }
                if (!batch.isEmpty()) {
                    boolean sizeThreshold = batch.size() >= batchSize;
                    boolean timeThreshold =
                            System.currentTimeMillis() - batchStartMs >= batchTimeoutMs;
                    if (sizeThreshold || timeThreshold) {
                        sendBatch(batch);
                        batch.clear();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Send records held locally plus anything still waiting in the queue
        recordQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            sendBatch(batch);
        }
    }

    // Sends every record of the batch, then waits (up to 30s each) for the results
    private void sendBatch(List<ProducerRecord<String, String>> batch) {
        List<Future<RecordMetadata>> futures = new ArrayList<>(batch.size());
        int successCount = 0;
        int failureCount = 0;
        for (ProducerRecord<String, String> record : batch) {
            try {
                futures.add(producer.send(record));
            } catch (RuntimeException e) {
                // e.g. a serialization error: count it instead of killing the worker thread
                failureCount++;
                System.err.println("Batch send error: " + e.getMessage());
            }
        }
        // Wait for all sends to complete
        for (Future<RecordMetadata> future : futures) {
            try {
                future.get(30, TimeUnit.SECONDS);
                successCount++;
            } catch (InterruptedException e) {
                // Keep the interrupt; remaining records are still flushed by producer.close()
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while waiting for batch acknowledgements");
                break;
            } catch (ExecutionException | TimeoutException e) {
                failureCount++;
                System.err.println("Batch send error: " + e.getMessage());
            }
        }
        System.out.println("Batch sent: " + successCount + " succeeded, " +
                failureCount + " failed");
    }

    /**
     * Stops accepting records, waits for the worker to send everything already queued,
     * then closes the producer. May block while the final batch is acknowledged.
     * Records passed to sendAsync concurrently with close() may not be sent.
     */
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.close();
    }
}Send records to specific partitions based on custom logic:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.*;
public class PartitionAwareProducer {
    /**
     * Custom partitioner that routes records by key hash: {@code |key.hashCode() % n|}
     * for a topic with {@code n} partitions. Despite the name this is hash-modulo
     * routing, not key ranges and not consistent hashing, so equal keys stay together
     * only while the partition count is unchanged. Null keys go to partition 0.
     */
    public static class RangePartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (key == null) {
                return 0; // Default partition for null keys
            }
            // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still
            // negative and produced an invalid partition. For every other hash this
            // yields exactly the same partition as Math.abs(hash) % numPartitions.
            return Math.abs(key.hashCode() % numPartitions);
        }

        @Override
        public void close() {}
    }

    /**
     * Priority-based partitioner: values whose text starts with {@code "HIGH:"} go to
     * partition 0; all other records are spread round-robin over partitions
     * 1..n-1 (or partition 0 when the topic has a single partition).
     */
    public static class PriorityPartitioner implements Partitioner {
        private static final int HIGH_PRIORITY_PARTITION = 0;
        private static final int NORMAL_PRIORITY_PARTITION_OFFSET = 1;
        // Round-robin position for normal-priority records
        private final java.util.concurrent.atomic.AtomicInteger counter =
                new java.util.concurrent.atomic.AtomicInteger();

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // Priority is encoded as a "HIGH:" prefix on the value (not a record header)
            if (value != null && value.toString().startsWith("HIGH:")) {
                return HIGH_PRIORITY_PARTITION;
            }
            // Round-robin for normal priority across remaining partitions. A counter
            // replaces System.nanoTime(), which may be negative and then yielded an
            // invalid partition; floorMod stays non-negative after counter overflow.
            if (numPartitions > 1) {
                return NORMAL_PRIORITY_PARTITION_OFFSET +
                        Math.floorMod(counter.getAndIncrement(), numPartitions - 1);
            }
            return 0;
        }

        @Override
        public void close() {}
    }

    /**
     * Send records with explicit partition assignment; the outcome is logged from the
     * send callback.
     */
    public static void sendToSpecificPartition(Producer<String, String> producer,
                                               String topic, int partition,
                                               String key, String value) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, partition, key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Sent to partition " + metadata.partition() +
                        " (requested: " + partition + ")");
            } else {
                System.err.println("Failed to send to partition " + partition +
                        ": " + exception.getMessage());
            }
        });
    }
}Implement circuit breaker pattern for producer resilience:
import org.apache.kafka.clients.producer.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Wraps a producer with a circuit breaker driven by send outcomes.
 *
 * <p>CLOSED: sends pass through. After {@code failureThreshold} consecutive failed sends
 * the breaker becomes OPEN and rejects sends with {@link IllegalStateException} for
 * {@code timeoutMs}. It then moves to HALF_OPEN and lets sends through: the next success
 * closes it, and the next failure re-opens it.
 *
 * <p>State changes use compare-and-set, because send callbacks run on the producer's
 * I/O thread while {@link #send} runs on caller threads.
 */
public class CircuitBreakerProducer {
    /** Breaker states; public so callers of {@link #getState()} can use the type. */
    public enum CircuitState { CLOSED, OPEN, HALF_OPEN }

    private final Producer<String, String> producer;
    private final int failureThreshold;
    private final long timeoutMs;
    private final AtomicInteger consecutiveFailures;
    private final AtomicLong circuitOpenedTime;
    private final java.util.concurrent.atomic.AtomicReference<CircuitState> state;

    public CircuitBreakerProducer(java.util.Properties props, int failureThreshold, long timeoutMs) {
        this.producer = new KafkaProducer<>(props);
        this.failureThreshold = failureThreshold;
        this.timeoutMs = timeoutMs;
        this.consecutiveFailures = new AtomicInteger(0);
        this.circuitOpenedTime = new AtomicLong(0);
        this.state = new java.util.concurrent.atomic.AtomicReference<>(CircuitState.CLOSED);
    }

    /**
     * Sends the record unless the breaker is open.
     *
     * @throws IllegalStateException if the breaker is OPEN
     */
    public void send(ProducerRecord<String, String> record) {
        if (!isCallAllowed()) {
            throw new IllegalStateException(
                "Circuit breaker is OPEN, rejecting send request");
        }
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    onFailure();
                } else {
                    onSuccess();
                }
            });
        } catch (RuntimeException e) {
            // send() can also fail synchronously; count that as a failure too
            onFailure();
            throw e;
        }
    }

    // CLOSED/HALF_OPEN allow calls; OPEN allows them only once timeoutMs has elapsed
    private boolean isCallAllowed() {
        if (state.get() != CircuitState.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - circuitOpenedTime.get() < timeoutMs) {
            return false;
        }
        // Transition to HALF_OPEN; only the thread that wins the CAS logs it
        if (state.compareAndSet(CircuitState.OPEN, CircuitState.HALF_OPEN)) {
            System.out.println("Circuit breaker transitioning to HALF_OPEN");
        }
        return true;
    }

    private void onSuccess() {
        consecutiveFailures.set(0);
        if (state.compareAndSet(CircuitState.HALF_OPEN, CircuitState.CLOSED)) {
            System.out.println("Circuit breaker CLOSED (recovered)");
        }
    }

    private void onFailure() {
        int failures = consecutiveFailures.incrementAndGet();
        if (failures < failureThreshold) {
            return;
        }
        CircuitState current = state.get();
        if (current == CircuitState.OPEN) {
            return;
        }
        // Record the opening time BEFORE publishing OPEN. Otherwise a thread that sees
        // OPEN could pair it with a stale timestamp and skip the wait entirely.
        circuitOpenedTime.set(System.currentTimeMillis());
        if (state.compareAndSet(current, CircuitState.OPEN)) {
            System.err.println("Circuit breaker OPENED after " + failures +
                " consecutive failures");
        }
    }

    public CircuitState getState() {
        return state.get();
    }

    public void close() {
        producer.close();
    }
}Adjust producer behavior based on real-time metrics:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Producer wrapper that samples client metrics every 10 seconds and reports on them.
 *
 * <p>A KafkaProducer's configuration cannot change after construction, so the checks in
 * {@link #adjustSettings()} only log; the batch/linger fields record the configured values.
 */
public class AdaptiveProducer {
    // Client-level metric group; some metric names (e.g. record-send-rate) also exist
    // in per-topic / per-node groups
    private static final String PRODUCER_METRIC_GROUP = "producer-metrics";
    private final Producer<String, String> producer;
    private final ScheduledExecutorService scheduler;
    private volatile int currentBatchSize;
    private volatile int currentLingerMs;

    public AdaptiveProducer(java.util.Properties props) {
        // Parse first so a bad value does not leave a half-built producer behind
        this.currentBatchSize = intConfig(props, ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        this.currentLingerMs = intConfig(props, ProducerConfig.LINGER_MS_CONFIG, 0);
        this.producer = new KafkaProducer<>(props);
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        // Start metric monitoring
        scheduler.scheduleAtFixedRate(this::adjustSettings, 10, 10, TimeUnit.SECONDS);
    }

    // Reads an int config. Properties.getProperty() ignores non-String values, so a
    // value set via props.put(KEY, 65536) would otherwise silently fall back to the default.
    private static int intConfig(java.util.Properties props, String key, int defaultValue) {
        Object raw = props.get(key);
        String text = raw != null ? raw.toString() : props.getProperty(key);
        return text == null ? defaultValue : Integer.parseInt(text.trim());
    }

    private void adjustSettings() {
        try {
            Map<MetricName, ? extends Metric> metrics = producer.metrics();
            // Get key metrics
            double recordSendRate = getMetricValue(metrics, "record-send-rate");
            double recordErrorRate = getMetricValue(metrics, "record-error-rate");
            double bufferAvailableBytes = getMetricValue(metrics, "buffer-available-bytes");
            double batchSizeAvg = getMetricValue(metrics, "batch-size-avg");
            // record-error-rate is errors per second, not a fraction: relate it to the
            // send rate to get the share of sends that failed
            double errorRatio = recordSendRate > 0
                    ? recordErrorRate / recordSendRate
                    : (recordErrorRate > 0 ? 1.0 : 0.0);
            // Adjust based on metrics
            if (errorRatio > 0.01) { // > 1% error rate
                System.out.println("High error rate detected, reducing send rate");
                // Could implement backpressure here
            }
            if (bufferAvailableBytes < 1000000) { // < 1MB available
                System.err.println("WARNING: Low buffer memory available: " +
                        bufferAvailableBytes + " bytes");
            }
            // Log adaptive metrics
            System.out.println(String.format(
                    "Metrics - Send rate: %.2f/s, Error rate: %.4f%%, Avg batch: %.0f bytes",
                    recordSendRate, errorRatio * 100, batchSizeAvg));
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all later runs
            System.err.println("Metric adjustment failed: " + e.getMessage());
        }
    }

    // Returns the named metric, preferring the client-level group; 0.0 if absent
    private double getMetricValue(Map<MetricName, ? extends Metric> metrics, String name) {
        Double fallback = null;
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName metricName = entry.getKey();
            if (!metricName.name().equals(name)) {
                continue;
            }
            Object value = entry.getValue().metricValue();
            if (!(value instanceof Number)) {
                continue;
            }
            double numeric = ((Number) value).doubleValue();
            if (PRODUCER_METRIC_GROUP.equals(metricName.group())) {
                return numeric;
            }
            if (fallback == null) {
                fallback = numeric;
            }
        }
        return fallback != null ? fallback : 0.0;
    }

    public void send(ProducerRecord<String, String> record) {
        producer.send(record);
    }

    public Map<String, Object> getMetricsSummary() {
        Map<String, Object> summary = new java.util.HashMap<>();
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        String[] importantMetrics = {
            "record-send-rate", "record-error-rate", "request-latency-avg",
            "buffer-available-bytes", "batch-size-avg", "compression-rate-avg"
        };
        for (String metricName : importantMetrics) {
            double value = getMetricValue(metrics, metricName);
            summary.put(metricName, value);
        }
        return summary;
    }

    public void close() {
        scheduler.shutdown();
        producer.close();
    }
}Producer that can failover between multiple data centers:
import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Sends through one of several producers (one per data center). On a failed send it
 * fails over to the next data center and retries with exponential backoff.
 */
public class MultiDCProducer {
    private final List<Producer<String, String>> producers;
    private volatile int activeProducerIndex;
    private final Object lock = new Object();

    /**
     * @param dcConfigs one producer configuration per data center, in failover order
     * @throws IllegalArgumentException if {@code dcConfigs} is null or empty
     */
    public MultiDCProducer(List<Properties> dcConfigs) {
        if (dcConfigs == null || dcConfigs.isEmpty()) {
            throw new IllegalArgumentException("At least one data-center config is required");
        }
        List<Producer<String, String>> created = new ArrayList<>(dcConfigs.size());
        try {
            for (Properties config : dcConfigs) {
                created.add(new KafkaProducer<>(config));
            }
        } catch (RuntimeException e) {
            // Don't leak the producers already created for earlier data centers
            for (Producer<String, String> producer : created) {
                try {
                    producer.close();
                } catch (RuntimeException closeError) {
                    e.addSuppressed(closeError);
                }
            }
            throw e;
        }
        this.producers = Collections.unmodifiableList(created);
        this.activeProducerIndex = 0;
    }

    /**
     * Sends synchronously, failing over to the next data center after each failure.
     *
     * @param maxRetries maximum number of send attempts across all data centers
     * @return metadata of the successfully written record
     * @throws Exception wrapping the last send failure once all attempts are used up
     */
    public RecordMetadata send(ProducerRecord<String, String> record, int maxRetries)
            throws Exception {
        int retries = 0;
        Exception lastException = null;
        while (retries < maxRetries) {
            Producer<String, String> producer;
            int currentIndex;
            synchronized (lock) {
                currentIndex = activeProducerIndex;
                producer = producers.get(currentIndex);
            }
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Sent to DC " + currentIndex +
                        ", partition " + metadata.partition());
                return metadata;
            } catch (ExecutionException e) {
                lastException = e;
                System.err.println("Failed to send to DC " + currentIndex +
                        ": " + e.getMessage());
                // Try next DC, but only if no other thread has already moved away from
                // the DC that just failed; otherwise concurrent failures would skip DCs
                synchronized (lock) {
                    if (activeProducerIndex == currentIndex) {
                        activeProducerIndex = (currentIndex + 1) % producers.size();
                    }
                }
                retries++;
                if (retries < maxRetries) {
                    // Exponential backoff: 1s, 2s, 4s, ... capped at 30s
                    Thread.sleep(Math.min(1000L << Math.min(retries - 1, 5), 30000L));
                }
            }
        }
        throw new Exception("Failed to send after " + maxRetries + " retries", lastException);
    }

    /** Closes every producer, even if closing one of them fails. */
    public void closeAll() {
        RuntimeException firstError = null;
        for (Producer<String, String> producer : producers) {
            try {
                producer.close();
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e;
                } else {
                    firstError.addSuppressed(e);
                }
            }
        }
        if (firstError != null) {
            throw firstError;
        }
    }
}Comprehensive metrics collection for producer monitoring:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.*;
import java.util.concurrent.*;
/**
 * Samples a fixed set of producer metrics every 5 seconds, keeps a short history per
 * metric, and prints a summary with simple alerts.
 */
public class ProducerMetricsDashboard {
    // Client-level metric group. Several names (e.g. record-send-rate,
    // request-latency-avg) also exist in per-topic / per-node groups.
    private static final String PRODUCER_METRIC_GROUP = "producer-metrics";
    // Keep last 100 samples (8.3 minutes at 5s interval)
    private static final int MAX_SAMPLES = 100;
    // Collect key metrics
    private static final String[] KEY_METRICS = {
        "record-send-rate",
        "record-send-total",
        "record-error-rate",
        "record-error-total",
        "record-retry-rate",
        "record-retry-total",
        "request-latency-avg",
        "request-latency-max",
        "outgoing-byte-rate",
        "buffer-available-bytes",
        "buffer-total-bytes",
        "batch-size-avg",
        "batch-size-max",
        "compression-rate-avg",
        "requests-in-flight"
    };

    private final Producer<String, String> producer;
    private final ScheduledExecutorService scheduler;
    // Metric name -> recent samples, oldest first. The lists are CopyOnWriteArrayLists,
    // so readers iterate a stable snapshot while the collector appends and trims.
    private final Map<String, List<Double>> metricHistory;

    public ProducerMetricsDashboard(Producer<String, String> producer) {
        this.producer = producer;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.metricHistory = new ConcurrentHashMap<>();
        // Collect metrics every 5 seconds
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 5, TimeUnit.SECONDS);
    }

    // Appends one sample per key metric to the history, trimming to MAX_SAMPLES
    private void collectMetrics() {
        try {
            for (Map.Entry<String, Double> sample : readKeyMetrics(producer.metrics()).entrySet()) {
                List<Double> history = metricHistory.computeIfAbsent(
                        sample.getKey(), k -> new CopyOnWriteArrayList<>());
                history.add(sample.getValue());
                while (history.size() > MAX_SAMPLES) {
                    history.remove(0);
                }
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task would cancel all later runs
            System.err.println("Metric collection failed: " + e.getMessage());
        }
    }

    // Picks exactly one numeric value per key metric, preferring the client-level group
    // over same-named per-topic / per-node metrics
    private static Map<String, Double> readKeyMetrics(Map<MetricName, ? extends Metric> metrics) {
        Set<String> wanted = new HashSet<>(Arrays.asList(KEY_METRICS));
        Map<String, Double> values = new HashMap<>();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            if (!wanted.contains(name.name())) {
                continue;
            }
            Object value = entry.getValue().metricValue();
            if (!(value instanceof Number)) {
                continue;
            }
            double numeric = ((Number) value).doubleValue();
            if (PRODUCER_METRIC_GROUP.equals(name.group())) {
                values.put(name.name(), numeric);
            } else {
                values.putIfAbsent(name.name(), numeric);
            }
        }
        return values;
    }

    public void printDashboard() {
        System.out.println("\n=== Producer Metrics Dashboard ===");
        System.out.println("Timestamp: " + new Date());
        System.out.println();
        // Throughput
        System.out.println("THROUGHPUT:");
        printMetric("  Record Send Rate", "record-send-rate", "records/sec");
        printMetric("  Outgoing Byte Rate", "outgoing-byte-rate", "bytes/sec");
        System.out.println();
        // Errors
        System.out.println("ERRORS:");
        printMetric("  Error Rate", "record-error-rate", "errors/sec");
        printMetric("  Retry Rate", "record-retry-rate", "retries/sec");
        double errorTotal = getLatestMetric("record-error-total");
        double sendTotal = getLatestMetric("record-send-total");
        double errorPercent = sendTotal > 0 ? (errorTotal / sendTotal * 100) : 0;
        System.out.println(String.format("  Error Percentage: %.4f%%", errorPercent));
        System.out.println();
        // Latency
        System.out.println("LATENCY:");
        printMetric("  Avg Request Latency", "request-latency-avg", "ms");
        printMetric("  Max Request Latency", "request-latency-max", "ms");
        System.out.println();
        // Batching & Compression
        System.out.println("BATCHING & COMPRESSION:");
        printMetric("  Avg Batch Size", "batch-size-avg", "bytes");
        printMetric("  Max Batch Size", "batch-size-max", "bytes");
        printMetric("  Compression Rate", "compression-rate-avg", "ratio");
        System.out.println();
        // Resource Usage
        System.out.println("RESOURCES:");
        double bufferAvailable = getLatestMetric("buffer-available-bytes");
        double bufferTotal = getLatestMetric("buffer-total-bytes");
        double bufferUsed = bufferTotal - bufferAvailable;
        double bufferUsagePercent = bufferTotal > 0 ? (bufferUsed / bufferTotal * 100) : 0;
        System.out.println(String.format("  Buffer Usage: %.2f MB / %.2f MB (%.1f%%)",
                bufferUsed / 1024 / 1024, bufferTotal / 1024 / 1024, bufferUsagePercent));
        printMetric("  Requests In Flight", "requests-in-flight", "count");
        System.out.println();
        // Alerts
        System.out.println("ALERTS:");
        checkAlerts();
        System.out.println("===================================\n");
    }

    private void printMetric(String label, String metricName, String unit) {
        double value = getLatestMetric(metricName);
        double avg = getAverageMetric(metricName);
        double max = getMaxMetric(metricName);
        System.out.println(String.format("%s: %.2f %s (avg: %.2f, max: %.2f)",
                label, value, unit, avg, max));
    }

    // Most recent sample, or 0.0 when none has been collected yet
    private double getLatestMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null) {
            return 0.0;
        }
        // Iterate a snapshot: size()-then-get() could race with the collector's trim
        // (remove(0)) and throw IndexOutOfBoundsException
        double latest = 0.0;
        for (double sample : history) {
            latest = sample;
        }
        return latest;
    }

    private double getAverageMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null || history.isEmpty()) {
            return 0.0;
        }
        return history.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }

    private double getMaxMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null || history.isEmpty()) {
            return 0.0;
        }
        return history.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
    }

    private void checkAlerts() {
        boolean hasAlerts = false;
        // Check error rate
        double errorRate = getLatestMetric("record-error-rate");
        if (errorRate > 0.1) { // More than 0.1 errors/sec
            System.out.println("  ⚠️  HIGH ERROR RATE: " + errorRate + " errors/sec");
            hasAlerts = true;
        }
        // Check buffer usage
        double bufferAvailable = getLatestMetric("buffer-available-bytes");
        double bufferTotal = getLatestMetric("buffer-total-bytes");
        double bufferUsagePercent = bufferTotal > 0 ?
                ((bufferTotal - bufferAvailable) / bufferTotal * 100) : 0;
        if (bufferUsagePercent > 90) {
            System.out.println("  ⚠️  HIGH BUFFER USAGE: " +
                    String.format("%.1f%%", bufferUsagePercent));
            hasAlerts = true;
        }
        // Check latency
        double avgLatency = getLatestMetric("request-latency-avg");
        if (avgLatency > 1000) { // More than 1 second
            System.out.println("  ⚠️  HIGH LATENCY: " + avgLatency + " ms");
            hasAlerts = true;
        }
        if (!hasAlerts) {
            System.out.println("  ✅ No alerts");
        }
    }

    public void close() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerMetricsDashboard dashboard = new ProducerMetricsDashboard(producer);
        ScheduledExecutorService printScheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Print dashboard every 10 seconds; catch so one failure doesn't stop printing
            printScheduler.scheduleAtFixedRate(() -> {
                try {
                    dashboard.printDashboard();
                } catch (RuntimeException e) {
                    System.err.println("Dashboard print failed: " + e.getMessage());
                }
            }, 10, 10, TimeUnit.SECONDS);
            // Keep running (until interrupted)
            Thread.sleep(Long.MAX_VALUE);
        } finally {
            printScheduler.shutdown();
            dashboard.close();
            producer.close();
        }
    }
}