Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

Producer API

The Producer API publishes streams of records to Kafka topics. Producers are thread-safe and can be shared across multiple threads.

Core Interfaces

Producer<K, V>

Main producer interface for sending records to Kafka.

package org.apache.kafka.clients.producer;

public interface Producer<K, V> extends Closeable {
    // Sending records
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

    // Flushing and lifecycle
    void flush();
    void close();
    void close(Duration timeout);

    // Transactions
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                   ConsumerGroupMetadata groupMetadata);

    // Metadata and operations
    List<PartitionInfo> partitionsFor(String topic);
    Map<MetricName, ? extends Metric> metrics();
    Uuid clientInstanceId(Duration timeout);

    // Metric subscription (for monitoring)
    void registerMetricForSubscription(KafkaMetric metric);
    void unregisterMetricFromSubscription(KafkaMetric metric);
}

KafkaProducer<K, V>

Thread-safe implementation of the Producer interface.

package org.apache.kafka.clients.producer;

public class KafkaProducer<K, V> implements Producer<K, V> {
    // Constructor with Properties
    public KafkaProducer(Properties properties);
    public KafkaProducer(Properties properties,
                        Serializer<K> keySerializer,
                        Serializer<V> valueSerializer);

    // Constructor with Map
    public KafkaProducer(Map<String, Object> configs);
    public KafkaProducer(Map<String, Object> configs,
                        Serializer<K> keySerializer,
                        Serializer<V> valueSerializer);

    // All Producer interface methods implemented
}
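
A minimal construction sketch using the Map-based constructor with explicit serializer instances (the broker address is an assumption); when serializers are passed this way, key.serializer and value.serializer need not appear in the configuration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");

// Serializer instances are supplied directly instead of via configuration keys
Producer<String, String> producer = new KafkaProducer<>(
    configs, new StringSerializer(), new StringSerializer());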

Data Classes

ProducerRecord<K, V>

A key/value pair to be sent to Kafka.

package org.apache.kafka.clients.producer;

public class ProducerRecord<K, V> {
    // Full constructor
    public ProducerRecord(String topic,
                         Integer partition,
                         Long timestamp,
                         K key,
                         V value,
                         Iterable<Header> headers);

    // Commonly used constructors
    public ProducerRecord(String topic, K key, V value);
    public ProducerRecord(String topic, V value);
    public ProducerRecord(String topic, Integer partition, K key, V value);
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);

    // Accessors
    public String topic();
    public Integer partition();
    public Long timestamp();
    public K key();
    public V value();
    public Headers headers();
}

Usage Example:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.util.Arrays;
import java.util.List;

// Simple record with key and value
ProducerRecord<String, String> record1 =
    new ProducerRecord<>("my-topic", "key", "value");

// Record with specific partition
ProducerRecord<String, String> record2 =
    new ProducerRecord<>("my-topic", 0, "key", "value");

// Record with timestamp
ProducerRecord<String, String> record3 =
    new ProducerRecord<>("my-topic", null, System.currentTimeMillis(), "key", "value");

// Record with headers
List<Header> headers = Arrays.asList(
    new RecordHeader("header-key", "header-value".getBytes())
);
ProducerRecord<String, String> record4 =
    new ProducerRecord<>("my-topic", null, null, "key", "value", headers);

RecordMetadata

Metadata for a record that has been acknowledged by the server.

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.TopicPartition;

public final class RecordMetadata {
    // Offset information
    public long offset();
    public boolean hasOffset();

    // Timestamp information
    public long timestamp();
    public boolean hasTimestamp();

    // Partition and topic
    public int partition();
    public String topic();
    public TopicPartition topicPartition();

    // Size information
    public int serializedKeySize();
    public int serializedValueSize();
}

Usage Example:

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // Blocking call

System.out.println("Sent to topic: " + metadata.topic());
System.out.println("Partition: " + metadata.partition());
System.out.println("Offset: " + metadata.offset());
System.out.println("Timestamp: " + metadata.timestamp());

PreparedTxnState

Transaction state for two-phase commit transactions.

package org.apache.kafka.clients.producer;

public class PreparedTxnState {
    public long producerId();
    public short epoch();
    public boolean hasTransaction();
    @Override
    public String toString();
}

Callback Interfaces

Callback

Functional interface for asynchronous record acknowledgment.

package org.apache.kafka.clients.producer;

@FunctionalInterface
public interface Callback {
    /**
     * Called when the record has been acknowledged by the server or failed.
     * Exactly one of metadata or exception will be non-null.
     */
    void onCompletion(RecordMetadata metadata, Exception exception);
}

Usage Example:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        } else {
            System.out.println("Sent successfully to " +
                metadata.topic() + "-" + metadata.partition() +
                " at offset " + metadata.offset());
        }
    }
});

// Lambda syntax
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.println("Sent to offset " + metadata.offset());
    }
});

Partitioner

Custom partition selection strategy.

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface Partitioner extends Configurable, Closeable {
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes Serialized key bytes (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes Serialized value bytes or null
     * @param cluster The current cluster metadata
     * @return The partition number
     */
    int partition(String topic,
                 Object key,
                 byte[] keyBytes,
                 Object value,
                 byte[] valueBytes,
                 Cluster cluster);

    void close();

    // Optional hook, invoked when a new batch is created (used by sticky partitioners); deprecated
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Usage Example:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // Initialize partitioner with configuration
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        // Custom partitioning logic
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (key == null) {
            // Pick a random partition for keyless records
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }

        // Hash-based partitioning for non-null keys
        // (mask the sign bit so Integer.MIN_VALUE cannot yield a negative index)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // Cleanup resources
    }
}

// Configure producer to use custom partitioner
Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

Built-in Partitioners

Kafka provides several built-in partitioner implementations:

UniformStickyPartitioner

Sticky partitioner introduced in Kafka 2.4 (and since deprecated) that improves batching for records without keys. Unlike the default partitioning logic, this class ignores record keys entirely: every record sticks to one partition until the batch fills, then the partition switches. Since Kafka 3.3 the producer's built-in default (partitioner.class unset) provides hash-based partitioning for keyed records plus the same sticky behavior for keyless records.

package org.apache.kafka.clients.producer.internals;

public class UniformStickyPartitioner implements Partitioner {
    // Behavior:
    // - Ignores record keys (records with the same key may land on different partitions)
    // - Sticks to one partition until the batch is full, then switches
    // - Provides better throughput due to improved batching
}

Configuration (implicit default):

// The uniform sticky behavior is built into the producer - no configuration needed
Properties props = new Properties();
// ... other producer config
// Leaving partitioner.class unset gives hash-based partitioning for keyed records
// and sticky partitioning for keyless records

DefaultPartitioner

Legacy default partitioner, deprecated since Kafka 3.3 in favor of the partitioning logic built into the producer. Hashes record keys to partitions; for records without keys it used round-robin before Kafka 2.4 and sticky partitioning afterwards.

package org.apache.kafka.clients.producer.internals;

@Deprecated
public class DefaultPartitioner implements Partitioner {
    // Behavior:
    // - Records with keys: Hash-based partitioning
    // - Records without keys: Sticky partitioning since Kafka 2.4 (round-robin in earlier versions)
}

Configuration:

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;

Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());

RoundRobinPartitioner

Distributes records evenly across all partitions in a round-robin fashion, regardless of key.

package org.apache.kafka.clients.producer;

public class RoundRobinPartitioner implements Partitioner {
    // Behavior:
    // - Ignores record keys
    // - Distributes records evenly across all partitions
    // - Useful when you want uniform distribution regardless of keys
}

Configuration:

import org.apache.kafka.clients.producer.RoundRobinPartitioner;

Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

Partitioning Behavior Summary:

  • Built-in default (partitioner.class unset): keyed records hash-based; keyless records sticky until batch full. Best throughput, recommended.
  • UniformStickyPartitioner (deprecated): ignores keys; all records sticky until batch full. Uniform batching regardless of keys.
  • DefaultPartitioner (deprecated): keyed records hash-based; keyless records sticky (round-robin before Kafka 2.4). Legacy compatibility.
  • RoundRobinPartitioner: round-robin for all records (ignores keys). Uniform distribution.

ProducerInterceptor<K, V>

Intercept and potentially mutate records before sending.

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
    /**
     * Called before record is sent. Can modify the record.
     */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * Called when record acknowledgment is received.
     */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * Called when record acknowledgment is received (with headers).
     */
    default void onAcknowledgement(RecordMetadata metadata,
                                   Exception exception,
                                   Headers headers) {
        onAcknowledgement(metadata, exception);
    }

    void close();
}

Usage Example:

import org.apache.kafka.clients.producer.*;
import java.util.Map;

public class MetricsInterceptor implements ProducerInterceptor<String, String> {
    private long successCount = 0;
    private long failureCount = 0;

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Add timestamp header
        record.headers().add("sent-timestamp",
            String.valueOf(System.currentTimeMillis()).getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCount++;
        } else {
            failureCount++;
        }
    }

    @Override
    public void close() {
        System.out.println("Success: " + successCount + ", Failures: " + failureCount);
    }
}

// Configure interceptor
Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    MetricsInterceptor.class.getName());

Built-in Components

MockProducer<K, V>

Mock producer for testing purposes.

package org.apache.kafka.clients.producer;

public class MockProducer<K, V> implements Producer<K, V> {
    public MockProducer();
    public MockProducer(boolean autoComplete,
                       Serializer<K> keySerializer,
                       Serializer<V> valueSerializer);

    // Test helpers
    public List<ProducerRecord<K, V>> history();
    public List<ProducerRecord<K, V>> uncommittedRecords();
    public boolean completeNext();
    public boolean errorNext(RuntimeException e);
    public void clear();

    // Transaction state
    public boolean transactionInitialized();
    public boolean transactionInFlight();
    public boolean transactionCommitted();
    public boolean transactionAborted();
    public boolean flushed();
    public boolean closed();
}

Usage Example:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

// Create mock producer
MockProducer<String, String> mockProducer = new MockProducer<>(
    true, // auto-complete
    new StringSerializer(),
    new StringSerializer()
);

// Send records
ProducerRecord<String, String> record =
    new ProducerRecord<>("test-topic", "key", "value");
mockProducer.send(record);

// Verify records were sent
List<ProducerRecord<String, String>> history = mockProducer.history();
assertEquals(1, history.size());
assertEquals("test-topic", history.get(0).topic());
assertEquals("key", history.get(0).key());

Configuration

ProducerConfig

Configuration constants for the producer.

package org.apache.kafka.clients.producer;

public class ProducerConfig extends AbstractConfig {
    // Required configurations
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";

    // Optional but commonly used
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String ACKS_CONFIG = "acks"; // "all", "1", "0"
    public static final String RETRIES_CONFIG = "retries";
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    public static final String LINGER_MS_CONFIG = "linger.ms";
    public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
    public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";

    // Reliability
    public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
    public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
    public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
    public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION =
        "max.in.flight.requests.per.connection";

    // Timeouts
    public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";

    // Partitioning
    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
    public static final String PARTITIONER_IGNORE_KEYS_CONFIG = "partitioner.ignore.keys";
    public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG =
        "partitioner.adaptive.partitioning.enable";

    // Compression levels
    public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
    public static final String COMPRESSION_LZ4_LEVEL_CONFIG = "compression.lz4.level";
    public static final String COMPRESSION_ZSTD_LEVEL_CONFIG = "compression.zstd.level";

    // Metrics
    public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";

    // Two-phase commit
    public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG =
        "transaction.two.phase.commit.enable";

    // Interceptors
    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
}

Configuration Examples

Basic Configuration:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

High Throughput Configuration:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // Wait up to 10ms

// Compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// Buffer memory
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB

Producer<String, String> producer = new KafkaProducer<>(props);

Exactly-Once Configuration:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Idempotence (enabled by default in 3.0+)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Transactions
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

// Reliability
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

Configuration Defaults:

  • batch.size: 16384 (16KB)
  • linger.ms: 5
  • buffer.memory: 33554432 (32MB)
  • max.request.size: 1048576 (1MB)
  • acks: "all"
  • retries: 2147483647 (Integer.MAX_VALUE)
  • enable.idempotence: true
  • max.in.flight.requests.per.connection: 5
  • compression.type: "none"
  • delivery.timeout.ms: 120000 (2 minutes)
  • request.timeout.ms: 30000 (30 seconds)
  • max.block.ms: 60000 (1 minute)
  • transaction.timeout.ms: 60000 (1 minute)
  • transaction.two.phase.commit.enable: false
  • partitioner.adaptive.partitioning.enable: true
  • partitioner.ignore.keys: false
  • enable.metrics.push: true

Usage Patterns

Simple Synchronous Send

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

try {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key", "value");

    RecordMetadata metadata = producer.send(record).get(); // Blocking
    System.out.println("Sent to partition " + metadata.partition() +
        " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    producer.close();
}

Asynchronous Send with Callback

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            System.err.println("Error sending record: " + exception.getMessage());
        } else {
            System.out.println("Sent record to " + metadata.topic() +
                "-" + metadata.partition() + " at offset " + metadata.offset());
        }
    });
}

producer.close();

Fire-and-Forget

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
    producer.send(record); // No callback, no waiting
}

producer.flush(); // Ensure all records are sent
producer.close();

Transactional Producer

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // Send records
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
        producer.send(record);
    }

    // Commit transaction
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
} finally {
    producer.close();
}

Consume-Transform-Produce (Exactly-Once)

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

// Configure transactional producer
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

Producer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();

// Configure consumer with read_committed isolation
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("input-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        try {
            producer.beginTransaction();

            // Process and send records
            for (ConsumerRecord<String, String> record : records) {
                String transformedValue = record.value().toUpperCase();
                ProducerRecord<String, String> outputRecord =
                    new ProducerRecord<>("output-topic", record.key(), transformedValue);
                producer.send(outputRecord);
            }

            // Commit consumer offsets within transaction
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords =
                    records.records(partition);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }

            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

Exceptions

BufferExhaustedException

Cannot allocate memory for sending.

package org.apache.kafka.common.errors;

public class BufferExhaustedException extends TimeoutException {
    // Thrown when producer cannot allocate memory from buffer pool
    // Usually due to high send rate or insufficient buffer.memory
}

Handling:

try {
    producer.send(record).get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof BufferExhaustedException) {
        // Increase buffer.memory or reduce send rate
        System.err.println("Buffer exhausted, slowing down sends");
        Thread.sleep(100);
    }
}

Other Common Producer Exceptions

import org.apache.kafka.common.errors.*;

// Record too large for configured max.request.size
public class RecordTooLargeException extends ApiException {}

// Operation timeout
public class TimeoutException extends ApiException {}

// Authentication failure
public class AuthenticationException extends ApiException {}

// Authorization failure
public class AuthorizationException extends ApiException {}

// Topic does not exist
public class UnknownTopicOrPartitionException extends RetriableException {}

// Invalid topic name
public class InvalidTopicException extends ApiException {}

// Serialization error
public class SerializationException extends KafkaException {}

Thread Safety

The KafkaProducer is thread-safe and can be shared across multiple threads:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.*;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// Single producer instance shared across threads
Producer<String, String> producer = new KafkaProducer<>(props);

ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < 10; i++) {
    final int threadId = i;
    executor.submit(() -> {
        for (int j = 0; j < 100; j++) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic",
                    "thread-" + threadId + "-key-" + j,
                    "value-" + j);
            producer.send(record);
        }
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
producer.close();

Best Practices

Resource Management

Always close the producer to release resources:

Producer<String, String> producer = new KafkaProducer<>(props);
try {
    // Use producer
} finally {
    producer.close(); // or producer.close(Duration.ofSeconds(30))
}

Error Handling

Implement proper error handling for production use:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.*;

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // Log and potentially retry
            System.err.println("Retriable error: " + exception.getMessage());
        } else if (exception instanceof RecordTooLargeException) {
            // Record is too large, cannot retry
            System.err.println("Record too large: " + exception.getMessage());
        } else if (exception instanceof AuthorizationException) {
            // Not authorized to write to topic
            System.err.println("Authorization error: " + exception.getMessage());
        } else {
            // Other non-retriable errors
            System.err.println("Fatal error: " + exception.getMessage());
        }
    }
});

Monitoring

Access producer metrics for monitoring:

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    if (entry.getKey().name().equals("record-send-rate")) {
        System.out.println("Record send rate: " + entry.getValue().metricValue());
    }
}

Key metrics to monitor:

  • record-send-rate: Records sent per second
  • record-error-rate: Records that resulted in errors
  • record-retry-rate: Records that were retried
  • buffer-available-bytes: Available buffer memory
  • batch-size-avg: Average batch size
  • compression-rate-avg: Average compression ratio
  • request-latency-avg: Average request latency
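
A small sketch that scans the producer's metrics map for the names listed above and prints their current values (it assumes a running producer instance, as in the monitoring example above):

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.Set;

Set<String> watched = Set.of(
    "record-send-rate", "record-error-rate", "record-retry-rate",
    "buffer-available-bytes", "batch-size-avg",
    "compression-rate-avg", "request-latency-avg");

// Print the current value of each watched metric
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    if (watched.contains(entry.getKey().name())) {
        System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
    }
}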

Troubleshooting

Common Producer Issues

Issue: BufferExhaustedException

Symptoms:

  • Frequent BufferExhaustedException in logs
  • High buffer-exhausted-rate metric
  • Send operations timing out

Causes:

  • Producer sending faster than broker can accept
  • Insufficient buffer.memory configuration
  • Network issues causing slow sends
  • Broker performance degradation

Solutions:

// Increase buffer memory
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB

// Increase max block time
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000); // 2 minutes

// Implement backpressure
try {
    producer.send(record).get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // Slow down or drop records
    System.err.println("Send timeout, implementing backpressure");
    Thread.sleep(100);
}

Prevention:

  • Monitor buffer-available-bytes metric
  • Set up alerts for low buffer availability
  • Implement producer backpressure mechanisms (see the sketch after this list)
  • Size buffer.memory appropriately for peak load
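
One way to implement the backpressure suggested above is to bound the number of unacknowledged sends with a semaphore; a sketch assuming a shared producer instance (the permit count is illustrative):

import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.concurrent.Semaphore;

// Allow at most 10,000 unacknowledged sends before callers block
Semaphore inFlight = new Semaphore(10_000);

void sendWithBackpressure(ProducerRecord<String, String> record) throws InterruptedException {
    inFlight.acquire(); // blocks when too many sends are outstanding
    producer.send(record, (metadata, exception) -> {
        inFlight.release(); // free the permit on acknowledgment or failure
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    });
}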

Issue: RecordTooLargeException

Symptoms:

  • RecordTooLargeException on send
  • Large records fail consistently
  • Some records succeed, others fail

Causes:

  • Record size exceeds max.request.size (default 1MB)
  • Record size exceeds broker max.message.bytes
  • Headers too large

Solutions:

// Increase max request size
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152); // 2MB

// Enable compression to reduce size
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

// Or split large records
public List<ProducerRecord<String, String>> splitLargeRecord(
        ProducerRecord<String, byte[]> largeRecord) {
    byte[] data = largeRecord.value();
    int chunkSize = 900000; // 900KB chunks
    List<ProducerRecord<String, String>> chunks = new ArrayList<>();
    
    for (int i = 0; i < data.length; i += chunkSize) {
        int end = Math.min(i + chunkSize, data.length);
        byte[] chunk = Arrays.copyOfRange(data, i, end);
        
        ProducerRecord<String, String> chunkRecord = new ProducerRecord<>(
            largeRecord.topic(),
            largeRecord.key(),
            Base64.getEncoder().encodeToString(chunk)
        );
        
        // Add metadata headers
        chunkRecord.headers().add("chunk-id", String.valueOf(i / chunkSize).getBytes());
        chunkRecord.headers().add("total-chunks", 
            String.valueOf((data.length + chunkSize - 1) / chunkSize).getBytes());
        
        chunks.add(chunkRecord);
    }
    
    return chunks;
}

Prevention:

  • Validate record sizes before sending (see the sketch after this list)
  • Configure appropriate limits
  • Use compression
  • Design data model to avoid large messages
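
A sketch of the size validation suggested above: estimate the serialized size with the same serializer the producer uses and skip records that would exceed max.request.size (the 1 MB default is assumed; the helper name is illustrative):

import org.apache.kafka.common.serialization.StringSerializer;

// Rough pre-send check against max.request.size (default 1,048,576 bytes),
// leaving some headroom for headers and protocol overhead
boolean fitsInRequest(String topic, String key, String value, int maxRequestSize) {
    try (StringSerializer serializer = new StringSerializer()) {
        int keySize = key == null ? 0 : serializer.serialize(topic, key).length;
        int valueSize = value == null ? 0 : serializer.serialize(topic, value).length;
        return keySize + valueSize < maxRequestSize - 10_000;
    }
}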

Issue: TimeoutException

Symptoms:

  • Periodic timeout exceptions
  • Increasing request-latency-avg metric
  • Sends eventually succeed after retries

Causes:

  • Network latency
  • Broker overload
  • Insufficient timeout configurations
  • Client resource contention

Solutions:

// Increase timeout configurations
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180000); // 3 minutes

// Implement exponential backoff retry
private RecordMetadata sendWithExponentialBackoff(
        ProducerRecord<String, String> record,
        int maxRetries) throws Exception {
    int retries = 0;
    long backoffMs = 100;
    
    while (retries < maxRetries) {
        try {
            return producer.send(record).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                retries++;
                if (retries >= maxRetries) {
                    throw e;
                }
                System.err.println("Timeout on attempt " + retries + 
                    ", retrying after " + backoffMs + "ms");
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 10000); // Max 10s
            } else {
                throw e;
            }
        }
    }
    
    throw new RuntimeException("Failed after " + maxRetries + " retries");
}

Prevention:

  • Monitor broker health and performance
  • Configure appropriate timeouts
  • Implement proper retry logic
  • Set up broker alerts

Issue: ProducerFencedException

Symptoms:

  • ProducerFencedException when using transactions
  • Producer suddenly stops working
  • Errors mentioning "fenced" or "epoch"

Causes:

  • Another producer with same transactional.id started
  • Producer exceeded transaction.timeout.ms
  • Network partition caused session timeout

Solutions:

// Ensure unique transactional.id per instance
String transactionalId = "my-app-" + hostname + "-" + instanceId;
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

// Increase transaction timeout
props.put(ProducerConfig.TRANSACTION_TIMEOUT_MS_CONFIG, 120000); // 2 minutes

// Handle fencing gracefully
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with same ID is active
    System.err.println("Producer fenced: " + e.getMessage());
    producer.close();
    // Create new producer with different transactional.id
    // or shutdown this instance
}

Prevention:

  • Ensure unique transactional.id across all producer instances
  • Use instance identifiers in transactional.id
  • Implement proper producer lifecycle management
  • Monitor transaction durations

Edge Cases

Null Keys

// Null keys are valid and will use default partitioner
ProducerRecord<String, String> nullKeyRecord = 
    new ProducerRecord<>("my-topic", null, "value");

// With the default (built-in sticky) partitioning:
// - Null-key records go to a sticky partition until the batch is full
// - Different null-key records may go to different partitions
// - No ordering guarantee for null-key records

// To spread null-key records evenly instead of stickily, use RoundRobinPartitioner:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
    "org.apache.kafka.clients.producer.RoundRobinPartitioner");
// To ensure all null-key records go to the same partition, specify it explicitly:
ProducerRecord<String, String> record = 
    new ProducerRecord<>("my-topic", 0, null, "value");

Null Values

// Null values are valid and represent tombstones (deletion markers)
ProducerRecord<String, String> tombstone = 
    new ProducerRecord<>("compacted-topic", "key-to-delete", null);

producer.send(tombstone);

// In compacted topics:
// - Null value marks key for deletion
// - Useful for compaction cleanup
// - Requires compaction enabled on topic

// Create compacted topic:
NewTopic compactedTopic = new NewTopic("my-compact-topic", 3, (short) 2);
Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact");
configs.put("min.compaction.lag.ms", "0");
configs.put("delete.retention.ms", "86400000");
compactedTopic.configs(configs);
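
To actually create the compacted topic defined above, pass it to the Admin client (NewTopic also comes from the Admin API); a sketch where the bootstrap address is an assumption:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Collections;
import java.util.Properties;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(adminProps)) {
    // Create the compacted topic and wait for the broker to confirm
    admin.createTopics(Collections.singletonList(compactedTopic)).all().get();
}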

Very Large Batches

// Sending many records rapidly
List<ProducerRecord<String, String>> records = generateLargeRecordSet();

// Potential issues:
// 1. Buffer exhaustion
// 2. Memory pressure
// 3. Long blocking on close()

// Solution: Batch with flow control
int batchSize = 1000;
for (int i = 0; i < records.size(); i += batchSize) {
    int end = Math.min(i + batchSize, records.size());
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    
    for (int j = i; j < end; j++) {
        futures.add(producer.send(records.get(j)));
    }
    
    // Wait for batch to complete before next batch
    for (Future<RecordMetadata> future : futures) {
        try {
            future.get();
        } catch (Exception e) {
            System.err.println("Batch send error: " + e.getMessage());
        }
    }
}

Broker Failures

// Producer automatically retries on broker failures
// Configure retries and timeout appropriately

props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180000); // 3 minutes
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 1 minute

// Handle broker unavailability
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof org.apache.kafka.common.errors.BrokerNotAvailableException) {
            System.err.println("Broker not available, will retry automatically");
            // No action needed, producer will retry
        } else if (exception instanceof org.apache.kafka.common.errors.NetworkException) {
            System.err.println("Network error, will retry automatically");
        } else {
            System.err.println("Fatal error: " + exception.getMessage());
            // Handle non-retriable errors
        }
    }
});

Transaction Timeout

// Transaction exceeds transaction.timeout.ms
try {
    producer.beginTransaction();
    
    // Long-running operations
    for (int i = 0; i < 10000; i++) {
        producer.send(new ProducerRecord<>("topic", "key-" + i, "value-" + i));
        
        // Expensive processing between sends
        doExpensiveWork(i);
    }
    
    producer.commitTransaction();
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    // Transaction timed out
    producer.abortTransaction();
    
    // Solution: Break into smaller transactions
    int batchSize = 1000;
    for (int batch = 0; batch < 10; batch++) {
        producer.beginTransaction();
        for (int i = batch * batchSize; i < (batch + 1) * batchSize; i++) {
            producer.send(new ProducerRecord<>("topic", "key-" + i, "value-" + i));
        }
        producer.commitTransaction();
    }
}

Partition Leader Changes

// Producer automatically handles partition leader changes
// Metadata refresh happens transparently

// Monitor leader changes via callbacks
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof org.apache.kafka.common.errors.NotLeaderOrFollowerException ||
            exception instanceof org.apache.kafka.common.errors.LeaderNotAvailableException) {
            System.out.println("Leader changed, producer will retry with updated metadata");
            // No action needed, automatic retry
        }
    } else {
        // Success - check if partition leader is as expected
        System.out.println("Sent to partition " + metadata.partition());
    }
});

// Force metadata refresh if needed
List<PartitionInfo> partitions = producer.partitionsFor("my-topic");
// This call refreshes metadata if stale

Performance Tuning Guide

High Throughput Configuration

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// Maximize batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // Wait 20ms for batching

// Enable compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression

// Increase buffer
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728); // 128MB

// In-flight requests: keep at 5, the maximum compatible with idempotence
// (enabled by default); higher values risk reordering and require
// explicitly disabling enable.idempotence
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// Async sends
Producer<String, String> producer = new KafkaProducer<>(props);

Throughput vs Latency Tradeoffs:

  • Larger batch.size → Higher throughput, higher latency
  • Larger linger.ms → Higher throughput, higher latency
  • More max.in.flight.requests → Higher throughput, potential ordering issues
  • Compression → Higher throughput, higher CPU usage

Low Latency Configuration

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// Minimize batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1); // Send immediately
props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // No waiting

// No compression (adds latency)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

// Reduce in-flight requests
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

// Synchronous sends for lowest latency
producer.send(record).get();

High Durability Configuration

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// Maximum durability
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// Transactions for exactly-once
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

// Conservative timeouts
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); // 5 minutes

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

Durability Levels:

  • acks=0: No durability, fire and forget
  • acks=1: Leader acknowledgment only
  • acks=all: All in-sync replicas must acknowledge (highest durability)

Configuration Interdependencies

delivery.timeout.ms Relationship:

// delivery.timeout.ms >= linger.ms + request.timeout.ms
// If violated, producer fails to start

// Example configuration:
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // Must be >= 30010

// Violation example (will throw ConfigException):
// props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 20000); // ERROR: < request.timeout.ms

Idempotence Requirements:

// When enable.idempotence=true (default), the following must be set:
// - acks = all (or -1)
// - max.in.flight.requests.per.connection <= 5
// - retries > 0

// Valid idempotent configuration:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Required
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // Must be <= 5
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // Required > 0

// Invalid configuration (will throw ConfigException):
// props.put(ProducerConfig.ACKS_CONFIG, "1"); // ERROR: must be "all"
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 10); // ERROR: > 5

Transaction Requirements:

// Transactions require idempotence
// Setting transactional.id automatically enables idempotence

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-id");
// Automatically sets:
// - enable.idempotence = true
// - acks = all
// - retries = Integer.MAX_VALUE

// Must call initTransactions() before use
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Required before first transaction

Common Anti-Patterns

Anti-Pattern: Creating Producer Per Message

// DON'T DO THIS - Very inefficient
public void sendMessage(String topic, String key, String value) {
    Properties props = new Properties();
    // ... configure producer
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>(topic, key, value));
    producer.close(); // Expensive operation
}

// DO THIS - Reuse producer instance
private final Producer<String, String> producer = createProducer();

public void sendMessage(String topic, String key, String value) {
    producer.send(new ProducerRecord<>(topic, key, value));
}

// Close producer on application shutdown
public void shutdown() {
    producer.close();
}

Anti-Pattern: Ignoring Send Failures

// DON'T DO THIS - Silent failures
producer.send(record); // No callback, no checking

// DO THIS - Handle failures appropriately
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Send failed: " + exception.getMessage());
        // Log to monitoring system
        // Implement retry logic
        // Alert on persistent failures
    } else {
        // Optional: Log success metrics
    }
});

// Or for critical data:
try {
    RecordMetadata metadata = producer.send(record).get();
    // Verify success
} catch (Exception e) {
    // Handle failure - retry, alert, etc.
}

Anti-Pattern: Not Setting Timeouts

// DON'T DO THIS - Can hang indefinitely
Properties props = new Properties();
// ... no timeout configuration

// DO THIS - Set appropriate timeouts
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // 1 minute
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes

// And use timeout on blocking calls
try {
    RecordMetadata metadata = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // Handle timeout
}

Anti-Pattern: Not Closing Producer

// DON'T DO THIS - Resource leak
public void processData() {
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(record);
    // Missing close() - threads and connections not cleaned up
}

// DO THIS - Always close
public void processData() {
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(record).get();
    } // Automatically closed
}

// Or with explicit try-finally
Producer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(record).get();
} finally {
    producer.close(Duration.ofSeconds(30)); // With timeout
}

Capacity Planning

Sizing buffer.memory

// Calculate required buffer size:
// buffer.memory >= peak_send_rate * avg_record_size * latency_seconds

// Example calculation:
// - Peak send rate: 10,000 records/second
// - Average record size: 1KB
// - Expected max latency: 3 seconds
// Required buffer: 10,000 * 1,024 * 3 = 30,720,000 bytes (~30MB)

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB (with safety margin)

// Monitor actual usage
Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
    if (entry.getKey().name().equals("buffer-available-bytes")) {
        long available = (long) entry.getValue().metricValue();
        long total = 33554432L;
        long used = total - available;
        double usagePercent = (used * 100.0) / total;
        
        if (usagePercent > 80) {
            System.err.println("WARNING: Buffer usage at " + usagePercent + "%");
        }
    }
}

Compression Selection

// Compression algorithm selection guide:
// - snappy: Fast, moderate compression (common choice for high throughput)
// - lz4: Fastest, good compression (best for real-time)
// - gzip: Slow, best compression (best for bandwidth-limited)
// - zstd: Good balance, configurable levels

// Benchmark different compression types
String[] compressionTypes = {"none", "snappy", "lz4", "gzip", "zstd"};

for (String compression : compressionTypes) {
    Properties testProps = new Properties();
    testProps.putAll(props);
    testProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression);
    
    Producer<String, String> testProducer = new KafkaProducer<>(testProps);
    
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
        testProducer.send(new ProducerRecord<>("test-topic", "key-" + i, 
            generateTestData())).get();
    }
    long duration = System.currentTimeMillis() - startTime;
    
    System.out.println(compression + " compression: " + duration + "ms");
    testProducer.close();
}

// Typical results (depends on data):
// - none: Fastest, largest network usage
// - lz4: ~10-20% slower than none, 40-60% size reduction
// - snappy: Similar to lz4
// - zstd: ~30-50% slower, 50-70% size reduction
// - gzip: ~2-3x slower, 60-80% size reduction

Advanced Producer Patterns

Pattern: Batching with Custom Logic

Implement custom batching logic for optimal throughput:

import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;

public class BatchingProducer {
    private final Producer<String, String> producer;
    private final BlockingQueue<ProducerRecord<String, String>> recordQueue;
    private final int batchSize;
    private final long batchTimeoutMs;
    private volatile boolean running = true;
    
    public BatchingProducer(Properties props, int batchSize, long batchTimeoutMs) {
        this.producer = new KafkaProducer<>(props);
        this.recordQueue = new LinkedBlockingQueue<>();
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
        
        // Start batch processing thread
        new Thread(this::processBatches).start();
    }
    
    public void sendAsync(String topic, String key, String value) throws InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        recordQueue.put(record);
    }
    
    private void processBatches() {
        List<ProducerRecord<String, String>> batch = new ArrayList<>();
        long lastBatchTime = System.currentTimeMillis();
        
        while (running) {
            try {
                // Poll with timeout
                ProducerRecord<String, String> record = recordQueue.poll(
                    batchTimeoutMs, TimeUnit.MILLISECONDS);
                
                if (record != null) {
                    batch.add(record);
                }
                
                // Send batch if size or time threshold reached
                long now = System.currentTimeMillis();
                boolean sizeThreshold = batch.size() >= batchSize;
                boolean timeThreshold = (now - lastBatchTime) >= batchTimeoutMs;
                
                if (sizeThreshold || (timeThreshold && !batch.isEmpty())) {
                    sendBatch(batch);
                    batch.clear();
                    lastBatchTime = now;
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        // Send remaining records
        if (!batch.isEmpty()) {
            sendBatch(batch);
        }
    }
    
    private void sendBatch(List<ProducerRecord<String, String>> batch) {
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        
        for (ProducerRecord<String, String> record : batch) {
            futures.add(producer.send(record));
        }
        
        // Wait for all sends to complete
        int successCount = 0;
        int failureCount = 0;
        
        for (Future<RecordMetadata> future : futures) {
            try {
                future.get(30, TimeUnit.SECONDS);
                successCount++;
            } catch (Exception e) {
                failureCount++;
                System.err.println("Batch send error: " + e.getMessage());
            }
        }
        
        System.out.println("Batch sent: " + successCount + " succeeded, " + 
            failureCount + " failed");
    }
    
    public void close() {
        running = false;
        producer.close();
    }
}

Pattern: Partition-Aware Producer

Send records to specific partitions based on custom logic:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.*;

public class PartitionAwareProducer {
    
    /**
     * Custom partitioner that routes records based on key hash range
     */
    public static class RangePartitioner implements Partitioner {
        
        @Override
        public void configure(Map<String, ?> configs) {}
        
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                            Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            
            if (key == null) {
                return 0; // Default partition for null keys
            }
            
            // Use consistent hashing for key-based routing
            // (mask the sign bit so Integer.MIN_VALUE cannot yield a negative index)
            int hash = key.hashCode() & 0x7fffffff;
            return hash % numPartitions;
        }
        
        @Override
        public void close() {}
    }
    
    /**
     * Priority-based partitioner that routes high-priority messages to specific partitions
     */
    public static class PriorityPartitioner implements Partitioner {
        private static final int HIGH_PRIORITY_PARTITION = 0;
        private static final int NORMAL_PRIORITY_PARTITION_OFFSET = 1;
        
        @Override
        public void configure(Map<String, ?> configs) {}
        
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                            Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            
            // Check for priority header in value
            if (value != null && value.toString().startsWith("HIGH:")) {
                return HIGH_PRIORITY_PARTITION;
            }
            
            // Spread normal-priority records pseudo-randomly across the remaining partitions
            if (numPartitions > 1) {
                return NORMAL_PRIORITY_PARTITION_OFFSET + 
                    (int) (System.nanoTime() % (numPartitions - 1));
            }
            
            return 0;
        }
        
        @Override
        public void close() {}
    }
    
    /**
     * Send records with explicit partition assignment
     */
    public static void sendToSpecificPartition(Producer<String, String> producer,
                                               String topic, int partition,
                                               String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, partition, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Sent to partition " + metadata.partition() +
                    " (requested: " + partition + ")");
            } else {
                System.err.println("Failed to send to partition " + partition +
                    ": " + exception.getMessage());
            }
        });
    }
}
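
A custom partitioner is registered through producer configuration rather than passed to send(). A brief usage sketch for the RangePartitioner above, assuming an illustrative broker address and topic name:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomPartitionerUsage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        // Register the custom partitioner class
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
            PartitionAwareProducer.RangePartitioner.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // No partition is set on the record; the partitioner chooses it
            producer.send(new ProducerRecord<>("orders", "order-42", "order payload"));
        }
    }
}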

Pattern: Producer with Circuit Breaker

Implement the circuit breaker pattern for producer resilience:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CircuitBreakerProducer {
    private enum CircuitState { CLOSED, OPEN, HALF_OPEN }
    
    private final Producer<String, String> producer;
    private final int failureThreshold;
    private final long timeoutMs;
    private final AtomicInteger consecutiveFailures;
    private final AtomicLong circuitOpenedTime;
    private volatile CircuitState state;
    
    public CircuitBreakerProducer(Properties props, int failureThreshold, long timeoutMs) {
        this.producer = new KafkaProducer<>(props);
        this.failureThreshold = failureThreshold;
        this.timeoutMs = timeoutMs;
        this.consecutiveFailures = new AtomicInteger(0);
        this.circuitOpenedTime = new AtomicLong(0);
        this.state = CircuitState.CLOSED;
    }
    
    public void send(ProducerRecord<String, String> record) {
        if (!isCallAllowed()) {
            throw new IllegalStateException(
                "Circuit breaker is OPEN, rejecting send request");
        }
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                onFailure();
            } else {
                onSuccess();
            }
        });
    }
    
    private boolean isCallAllowed() {
        if (state == CircuitState.CLOSED) {
            return true;
        }
        
        if (state == CircuitState.OPEN) {
            long now = System.currentTimeMillis();
            if (now - circuitOpenedTime.get() >= timeoutMs) {
                // Transition to HALF_OPEN
                state = CircuitState.HALF_OPEN;
                System.out.println("Circuit breaker transitioning to HALF_OPEN");
                return true;
            }
            return false;
        }
        
        // HALF_OPEN state
        return true;
    }
    
    private void onSuccess() {
        consecutiveFailures.set(0);
        
        if (state == CircuitState.HALF_OPEN) {
            state = CircuitState.CLOSED;
            System.out.println("Circuit breaker CLOSED (recovered)");
        }
    }
    
    private void onFailure() {
        int failures = consecutiveFailures.incrementAndGet();
        
        if (failures >= failureThreshold && state != CircuitState.OPEN) {
            state = CircuitState.OPEN;
            circuitOpenedTime.set(System.currentTimeMillis());
            System.err.println("Circuit breaker OPENED after " + failures + 
                " consecutive failures");
        }
    }
    
    public CircuitState getState() {
        return state;
    }
    
    public void close() {
        producer.close();
    }
}
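
A short usage sketch for the circuit breaker above; the broker address, thresholds, and topic name are illustrative:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CircuitBreakerUsage {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        // Open the circuit after 5 consecutive failures; probe again after 30 seconds
        CircuitBreakerProducer breaker = new CircuitBreakerProducer(props, 5, 30_000);

        try {
            breaker.send(new ProducerRecord<>("events", "key-1", "value-1"));
        } catch (IllegalStateException e) {
            // Circuit is open: fall back, buffer locally, or drop the record
            System.err.println("Send rejected: " + e.getMessage());
        } finally {
            breaker.close();
        }
    }
}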

Pattern: Metrics-Based Adaptive Producer

Adjust producer behavior based on real-time metrics:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AdaptiveProducer {
    private final Producer<String, String> producer;
    private final ScheduledExecutorService scheduler;
    private volatile int currentBatchSize;
    private volatile int currentLingerMs;
    
    public AdaptiveProducer(Properties props) {
        this.producer = new KafkaProducer<>(props);
        this.currentBatchSize = Integer.parseInt(
            props.getProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"));
        this.currentLingerMs = Integer.parseInt(
            props.getProperty(ProducerConfig.LINGER_MS_CONFIG, "0"));
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        
        // Start metric monitoring
        scheduler.scheduleAtFixedRate(this::adjustSettings, 10, 10, TimeUnit.SECONDS);
    }
    
    private void adjustSettings() {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        
        // Get key metrics
        double recordSendRate = getMetricValue(metrics, "record-send-rate");
        double recordErrorRate = getMetricValue(metrics, "record-error-rate");
        double bufferAvailableBytes = getMetricValue(metrics, "buffer-available-bytes");
        double batchSizeAvg = getMetricValue(metrics, "batch-size-avg");
        
        // Adjust based on metrics
        if (recordErrorRate > 0.01) {  // more than 0.01 failed sends per second
            System.out.println("High error rate detected, reducing send rate");
            // Could implement backpressure here
        }
        
        if (bufferAvailableBytes < 1000000) {  // < 1MB available
            System.err.println("WARNING: Low buffer memory available: " + 
                bufferAvailableBytes + " bytes");
        }
        
        // Log the observed metrics (record-error-rate is errors/sec, not a percentage)
        System.out.println(String.format(
            "Metrics - Send rate: %.2f/s, Error rate: %.4f/s, Avg batch: %.0f bytes",
            recordSendRate, recordErrorRate, batchSizeAvg));
    }
    
    private double getMetricValue(Map<MetricName, ? extends Metric> metrics, String name) {
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            // Match on group as well as name, since some metric names also appear
            // in the per-topic and per-node metric groups
            if (entry.getKey().name().equals(name)
                    && entry.getKey().group().equals("producer-metrics")) {
                Object value = entry.getValue().metricValue();
                if (value instanceof Number) {
                    return ((Number) value).doubleValue();
                }
            }
        }
        return 0.0;
    }
    
    public void send(ProducerRecord<String, String> record) {
        producer.send(record);
    }
    
    public Map<String, Object> getMetricsSummary() {
        Map<String, Object> summary = new java.util.HashMap<>();
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        
        String[] importantMetrics = {
            "record-send-rate", "record-error-rate", "request-latency-avg",
            "buffer-available-bytes", "batch-size-avg", "compression-rate-avg"
        };
        
        for (String metricName : importantMetrics) {
            double value = getMetricValue(metrics, metricName);
            summary.put(metricName, value);
        }
        
        return summary;
    }
    
    public void close() {
        scheduler.shutdown();
        producer.close();
    }
}
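
A brief usage sketch for the adaptive producer: it sends a batch of records, waits for one monitoring cycle, and prints the summary from getMetricsSummary(). The broker address and topic are placeholders:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class AdaptiveProducerUsage {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        AdaptiveProducer producer = new AdaptiveProducer(props);
        try {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("metrics-demo", "key-" + i, "value-" + i));
            }
            Thread.sleep(15_000); // let at least one metrics-adjustment cycle run
            System.out.println(producer.getMetricsSummary());
        } finally {
            producer.close();
        }
    }
}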

Pattern: Multi-DC Producer with Failover

A producer that can fail over between multiple data centers:

import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class MultiDCProducer {
    private final List<Producer<String, String>> producers;
    private volatile int activeProducerIndex;
    private final Object lock = new Object();
    
    public MultiDCProducer(List<Properties> dcConfigs) {
        this.producers = new ArrayList<>();
        this.activeProducerIndex = 0;
        
        for (Properties config : dcConfigs) {
            producers.add(new KafkaProducer<>(config));
        }
    }
    
    public RecordMetadata send(ProducerRecord<String, String> record, int maxRetries) 
            throws Exception {
        int retries = 0;
        Exception lastException = null;
        
        while (retries < maxRetries) {
            Producer<String, String> producer;
            int currentIndex;
            
            synchronized (lock) {
                currentIndex = activeProducerIndex;
                producer = producers.get(currentIndex);
            }
            
            try {
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Sent to DC " + currentIndex + 
                    ", partition " + metadata.partition());
                return metadata;
                
            } catch (ExecutionException e) {
                lastException = e;
                System.err.println("Failed to send to DC " + currentIndex + 
                    ": " + e.getMessage());
                
                // Try next DC
                synchronized (lock) {
                    activeProducerIndex = (activeProducerIndex + 1) % producers.size();
                }
                
                retries++;
                
                if (retries < maxRetries) {
                    Thread.sleep(1000L << (retries - 1)); // Exponential backoff: 1s, 2s, 4s, ...
                }
            }
        }
        
        throw new Exception("Failed to send after " + maxRetries + " retries", lastException);
    }
    
    public void closeAll() {
        for (Producer<String, String> producer : producers) {
            producer.close();
        }
    }
}
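
A usage sketch for the multi-DC producer; the two bootstrap addresses stand in for the primary and secondary clusters, and the shortened timeouts are assumptions chosen so a failed cluster is abandoned quickly rather than retried for the default two minutes:

import org.apache.kafka.clients.producer.*;
import java.util.*;

public class MultiDCUsage {
    public static void main(String[] args) throws Exception {
        List<Properties> dcConfigs = new ArrayList<>();
        for (String bootstrap : List.of("dc1-kafka:9092", "dc2-kafka:9092")) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            // Fail fast so the failover path is exercised instead of blocking;
            // delivery.timeout.ms must be >= request.timeout.ms + linger.ms
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5_000);
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000);
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10_000);
            dcConfigs.add(props);
        }

        MultiDCProducer producer = new MultiDCProducer(dcConfigs);
        try {
            RecordMetadata metadata = producer.send(
                new ProducerRecord<>("critical-events", "key", "value"), 3);
            System.out.println("Delivered at offset " + metadata.offset());
        } finally {
            producer.closeAll();
        }
    }
}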

Monitoring and Observability

Complete Metrics Dashboard

Comprehensive metrics collection for producer monitoring:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.*;
import java.util.concurrent.*;

public class ProducerMetricsDashboard {
    private final Producer<String, String> producer;
    private final ScheduledExecutorService scheduler;
    private final Map<String, List<Double>> metricHistory;
    
    public ProducerMetricsDashboard(Producer<String, String> producer) {
        this.producer = producer;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.metricHistory = new ConcurrentHashMap<>();
        
        // Collect metrics every 5 seconds
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 5, TimeUnit.SECONDS);
    }
    
    private void collectMetrics() {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        long timestamp = System.currentTimeMillis();
        
        // Collect key metrics
        String[] keyMetrics = {
            "record-send-rate",
            "record-send-total",
            "record-error-rate",
            "record-error-total",
            "record-retry-rate",
            "record-retry-total",
            "request-latency-avg",
            "request-latency-max",
            "outgoing-byte-rate",
            "buffer-available-bytes",
            "buffer-total-bytes",
            "batch-size-avg",
            "batch-size-max",
            "compression-rate-avg",
            "requests-in-flight"
        };
        
        for (String metricName : keyMetrics) {
            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
                // Restrict to the producer-level metric group; some of these names
                // also appear in the per-topic and per-node metric groups
                if (entry.getKey().name().equals(metricName)
                        && entry.getKey().group().equals("producer-metrics")) {
                    Object value = entry.getValue().metricValue();
                    if (value instanceof Number) {
                        double numValue = ((Number) value).doubleValue();
                        metricHistory.computeIfAbsent(metricName, 
                            k -> new CopyOnWriteArrayList<>()).add(numValue);
                        
                        // Keep the last 100 samples (~8.3 minutes at a 5-second interval)
                        List<Double> history = metricHistory.get(metricName);
                        if (history.size() > 100) {
                            history.remove(0);
                        }
                    }
                }
            }
        }
    }
    
    public void printDashboard() {
        System.out.println("\n=== Producer Metrics Dashboard ===");
        System.out.println("Timestamp: " + new Date());
        System.out.println();
        
        // Throughput
        System.out.println("THROUGHPUT:");
        printMetric("  Record Send Rate", "record-send-rate", "records/sec");
        printMetric("  Outgoing Byte Rate", "outgoing-byte-rate", "bytes/sec");
        System.out.println();
        
        // Errors
        System.out.println("ERRORS:");
        printMetric("  Error Rate", "record-error-rate", "errors/sec");
        printMetric("  Retry Rate", "record-retry-rate", "retries/sec");
        double errorTotal = getLatestMetric("record-error-total");
        double sendTotal = getLatestMetric("record-send-total");
        double errorPercent = sendTotal > 0 ? (errorTotal / sendTotal * 100) : 0;
        System.out.println(String.format("  Error Percentage: %.4f%%", errorPercent));
        System.out.println();
        
        // Latency
        System.out.println("LATENCY:");
        printMetric("  Avg Request Latency", "request-latency-avg", "ms");
        printMetric("  Max Request Latency", "request-latency-max", "ms");
        System.out.println();
        
        // Batching & Compression
        System.out.println("BATCHING & COMPRESSION:");
        printMetric("  Avg Batch Size", "batch-size-avg", "bytes");
        printMetric("  Max Batch Size", "batch-size-max", "bytes");
        printMetric("  Compression Rate", "compression-rate-avg", "ratio");
        System.out.println();
        
        // Resource Usage
        System.out.println("RESOURCES:");
        double bufferAvailable = getLatestMetric("buffer-available-bytes");
        double bufferTotal = getLatestMetric("buffer-total-bytes");
        double bufferUsed = bufferTotal - bufferAvailable;
        double bufferUsagePercent = bufferTotal > 0 ? (bufferUsed / bufferTotal * 100) : 0;
        System.out.println(String.format("  Buffer Usage: %.2f MB / %.2f MB (%.1f%%)",
            bufferUsed / 1024 / 1024, bufferTotal / 1024 / 1024, bufferUsagePercent));
        printMetric("  Requests In Flight", "requests-in-flight", "count");
        System.out.println();
        
        // Alerts
        System.out.println("ALERTS:");
        checkAlerts();
        
        System.out.println("===================================\n");
    }
    
    private void printMetric(String label, String metricName, String unit) {
        double value = getLatestMetric(metricName);
        double avg = getAverageMetric(metricName);
        double max = getMaxMetric(metricName);
        
        System.out.println(String.format("%s: %.2f %s (avg: %.2f, max: %.2f)",
            label, value, unit, avg, max));
    }
    
    private double getLatestMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null || history.isEmpty()) {
            return 0.0;
        }
        return history.get(history.size() - 1);
    }
    
    private double getAverageMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null || history.isEmpty()) {
            return 0.0;
        }
        return history.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    }
    
    private double getMaxMetric(String metricName) {
        List<Double> history = metricHistory.get(metricName);
        if (history == null || history.isEmpty()) {
            return 0.0;
        }
        return history.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
    }
    
    private void checkAlerts() {
        boolean hasAlerts = false;
        
        // Check error rate
        double errorRate = getLatestMetric("record-error-rate");
        if (errorRate > 0.1) {  // More than 0.1 errors/sec
            System.out.println("  ⚠️  HIGH ERROR RATE: " + errorRate + " errors/sec");
            hasAlerts = true;
        }
        
        // Check buffer usage
        double bufferAvailable = getLatestMetric("buffer-available-bytes");
        double bufferTotal = getLatestMetric("buffer-total-bytes");
        double bufferUsagePercent = bufferTotal > 0 ? 
            ((bufferTotal - bufferAvailable) / bufferTotal * 100) : 0;
        if (bufferUsagePercent > 90) {
            System.out.println("  ⚠️  HIGH BUFFER USAGE: " + 
                String.format("%.1f%%", bufferUsagePercent));
            hasAlerts = true;
        }
        
        // Check latency
        double avgLatency = getLatestMetric("request-latency-avg");
        if (avgLatency > 1000) {  // More than 1 second
            System.out.println("  ⚠️  HIGH LATENCY: " + avgLatency + " ms");
            hasAlerts = true;
        }
        
        if (!hasAlerts) {
            System.out.println("  ✅ No alerts");
        }
    }
    
    public void close() {
        scheduler.shutdown();
    }
    
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerMetricsDashboard dashboard = new ProducerMetricsDashboard(producer);
        
        // Print dashboard every 10 seconds
        ScheduledExecutorService printScheduler = Executors.newSingleThreadScheduledExecutor();
        printScheduler.scheduleAtFixedRate(dashboard::printDashboard, 10, 10, TimeUnit.SECONDS);
        
        // Keep running
        Thread.sleep(Long.MAX_VALUE);
    }
}