Container implementation for official Apache Kafka images (apache/kafka, apache/kafka-native) providing automated single-node broker management with KRaft consensus mode.
Create a Kafka container with Apache Kafka images. The container uses KRaft mode by default, eliminating the need for ZooKeeper.
/**
* Create a Kafka container with the specified Docker image name.
* Supported images: apache/kafka, apache/kafka-native
*
* @param imageName Docker image name as string (e.g., "apache/kafka-native:3.8.0")
* @throws IllegalArgumentException if image is not compatible with apache/kafka or apache/kafka-native
*/
public KafkaContainer(String imageName);
/**
* Create a Kafka container with the specified DockerImageName.
* Supported images: apache/kafka, apache/kafka-native
*
* @param dockerImageName DockerImageName object
* @throws IllegalArgumentException if image is not compatible with apache/kafka or apache/kafka-native
*/
public KafkaContainer(DockerImageName dockerImageName);
Usage:
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
// Using string
KafkaContainer kafka1 = new KafkaContainer("apache/kafka-native:3.8.0");
// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("apache/kafka:latest");
KafkaContainer kafka2 = new KafkaContainer(imageName);
Get the connection string for Kafka clients to connect to the broker.
/**
* Get the bootstrap servers connection string for Kafka clients.
* Returns the mapped host and port that external clients should use.
*
* @return Bootstrap servers in format "host:port" (e.g., "localhost:54321")
*/
public String getBootstrapServers();
Usage:
import org.testcontainers.kafka.KafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Use with Kafka producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Create producer with bootstrap servers
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
}
Add a custom listener for connections within the same Docker network. The host portion becomes a network alias that other containers can use to connect.
/**
* Add a listener in format host:port for connections within the same container network.
* The host will be included as a network alias, allowing other containers in the same
* network to connect using this hostname.
*
* The listener is added to the default listeners:
* - 0.0.0.0:9092 (PLAINTEXT)
* - 0.0.0.0:9093 (BROKER)
* - 0.0.0.0:9094 (CONTROLLER)
*
* The advertised listener will use the same host:port as the listener.
*
* @param listener Listener in format "host:port" (e.g., "kafka:19092")
* @return This KafkaContainer instance for method chaining
*/
public KafkaContainer withListener(String listener);
Usage:
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092")
.withNetwork(network);
GenericContainer<?> app = new GenericContainer<>("my-consumer-app:latest")
.withNetwork(network)
.withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
) {
kafka.start();
app.start();
// The app container can now connect to Kafka using "kafka:19092"
}
Add a custom listener with a separate advertised listener for external access patterns such as proxy configurations or port-forwarding scenarios.
/**
* Add a listener with custom advertised listener for external access.
* The host from the listener parameter will be included as a network alias.
*
* The listener is added to the default listeners:
* - 0.0.0.0:9092 (PLAINTEXT)
* - 0.0.0.0:9093 (BROKER)
* - 0.0.0.0:9094 (CONTROLLER)
*
* The advertised listener supplier is evaluated when the container starts,
* allowing dynamic resolution of external addresses.
*
* @param listener Listener in format "host:port" for internal network access
* @param advertisedListener Supplier providing the advertised listener address in format "host:port"
* @return This KafkaContainer instance for method chaining
*/
public KafkaContainer withListener(String listener, Supplier<String> advertisedListener);
Usage:
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;
import java.util.function.Supplier;
// Using with proxy/socat for external access
try (
Network network = Network.newNetwork();
SocatContainer socat = new SocatContainer()
.withNetwork(network)
.withTarget(2000, "kafka", 19092);
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
.withNetwork(network)
) {
socat.start();
kafka.start();
// External clients connect via socat proxy
String externalBootstrap = socat.getHost() + ":" + socat.getMappedPort(2000);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, externalBootstrap);
// ... rest of configuration
}
The container automatically configures the following environment variables for KRaft mode:
CLUSTER_ID=4L6g3nShT-eMCtK--X86sw
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_NODE_ID=1
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
The container waits for the following log message before it is considered ready:
.*Transitioning from RECOVERY to RUNNING.*
This ensures the broker has completed recovery and is ready to accept connections.
You can override environment variables or add custom configuration using inherited GenericContainer methods:
import org.testcontainers.kafka.KafkaContainer;
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.withEnv("KAFKA_LOG_RETENTION_HOURS", "168")
.withEnv("KAFKA_NUM_PARTITIONS", "3");
The KafkaContainer class extends GenericContainer<KafkaContainer> and inherits all lifecycle and configuration methods:
/**
* Start the container. Blocks until the container is ready based on wait strategy.
*/
public void start();
/**
* Stop the container.
*/
public void stop();
/**
* Close and remove the container. Implements AutoCloseable for use in try-with-resources.
*/
public void close();
/**
* Attach the container to a specific Docker network.
*
* @param network Network to attach to
* @return This container instance for chaining
*/
public KafkaContainer withNetwork(Network network);
/**
* Add network aliases for this container.
*
* @param aliases Network alias names
* @return This container instance for chaining
*/
public KafkaContainer withNetworkAliases(String... aliases);
/**
* Set an environment variable.
*
* @param key Environment variable name
* @param value Environment variable value
* @return This container instance for chaining
*/
public KafkaContainer withEnv(String key, String value);
/**
* Set multiple environment variables.
*
* @param env Map of environment variables
* @return This container instance for chaining
*/
public KafkaContainer withEnv(Map<String, String> env);
/**
* Get the host that this container is running on.
*
* @return Host address
*/
public String getHost();
/**
* Get the mapped port on the host for a container port.
*
* @param originalPort Container port number
* @return Mapped host port number
*/
public Integer getMappedPort(int originalPort);
/**
* Get the Docker container ID.
*
* @return Container ID
*/
public String getContainerId();
/**
* Check if the container is currently running.
*
* @return true if running, false otherwise
*/
public Boolean isRunning();
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaIntegrationTest {
public void testKafkaProducerConsumer() throws Exception {
// Create and start Kafka container
try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Configure producer
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Create producer and send message
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("test-topic", "key", "Hello Kafka");
producer.send(record).get();
}
// Configure consumer
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Create consumer and read message
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList("test-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
// Process records
records.forEach(record -> {
System.out.println("Received: " + record.value());
});
}
}
}
public void testKafkaWithCustomNetwork() throws Exception {
// Test with custom network and listener
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092")
.withNetwork(network);
GenericContainer<?> consumerApp = new GenericContainer<>("my-consumer:latest")
.withNetwork(network)
.withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
) {
kafka.start();
consumerApp.start();
// Consumer app can now communicate with Kafka
// using the "kafka:19092" address
}
}
}
Ensure you use getBootstrapServers() to get the correct mapped port:
// Correct
String servers = kafka.getBootstrapServers(); // e.g., "localhost:54321"
// Incorrect - port 9092 is not directly accessible
String servers = "localhost:9092"; // This will fail
When connecting from another container, use withListener() and ensure both containers are on the same network:
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092") // Creates network alias "kafka"
.withNetwork(network);
Only Apache Kafka images are supported. Ensure you're using:
apache/kafka:X.Y.Z
apache/kafka-native:X.Y.Z
For Confluent Platform images, use ConfluentKafkaContainer instead.