Testcontainers Kafka module provides automated management of Apache Kafka containers for testing in Java applications. It eliminates the need for external Kafka installations and manages the complete container lifecycle including broker setup, network configuration, and cleanup.
org.testcontainers:kafka:1.21.4Maven:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.21.4</version>
<scope>test</scope>
</dependency>Gradle:
testImplementation 'org.testcontainers:kafka:1.21.4'import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.kafka.ConfluentKafkaContainer;For deprecated legacy container:
import org.testcontainers.containers.KafkaContainer;import org.testcontainers.kafka.KafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
kafka.start();
// Get bootstrap servers for connecting clients
String bootstrapServers = kafka.getBootstrapServers();
// Configure Kafka producer
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Configure Kafka consumer
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
}import org.testcontainers.kafka.ConfluentKafkaContainer;
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Use bootstrapServers with Kafka clients
}The Kafka module provides three container classes supporting different Kafka distributions:
org.testcontainers.kafka.KafkaContainer: Modern container for Apache Kafka images (apache/kafka, apache/kafka-native). Uses KRaft mode by default, eliminating Zookeeper dependency.
org.testcontainers.kafka.ConfluentKafkaContainer: Modern container for Confluent Platform images (confluentinc/cp-kafka version 7.4.0+). Uses KRaft mode by default.
org.testcontainers.containers.KafkaContainer: Deprecated legacy container for Confluent Platform images. Supports embedded Zookeeper, external Zookeeper, or KRaft mode (7.0.0+).
All containers extend GenericContainer from the Testcontainers core library, inheriting lifecycle management, network configuration, and container customization capabilities.
Default Configuration:
Container implementation for official Apache Kafka images providing single-node broker setup with KRaft mode and customizable network listeners.
public class KafkaContainer extends GenericContainer<KafkaContainer> {
/**
* Create a Kafka container with the specified Docker image name.
*
* @param imageName Docker image name (e.g., "apache/kafka-native:3.8.0")
*/
public KafkaContainer(String imageName);
/**
* Create a Kafka container with the specified DockerImageName.
*
* @param dockerImageName DockerImageName object
*/
public KafkaContainer(DockerImageName dockerImageName);
/**
* Add a listener in format host:port for connections within the same container network.
* The host will be included as a network alias.
*
* @param listener Listener in format "host:port"
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withListener(String listener);
/**
* Add a listener with custom advertised listener for external access.
*
* @param listener Listener in format "host:port"
* @param advertisedListener Supplier providing the advertised listener address
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withListener(String listener, Supplier<String> advertisedListener);
/**
* Get the bootstrap servers connection string for Kafka clients.
*
* @return Bootstrap servers in format "host:port"
*/
public String getBootstrapServers();
}Supported Images:
apache/kafkaapache/kafka-nativeDefault Port: 9092
Apache Kafka Container Details
Container implementation for Confluent Platform Kafka images providing single-node broker setup with KRaft mode and customizable network listeners.
public class ConfluentKafkaContainer extends GenericContainer<ConfluentKafkaContainer> {
/**
* Create a Confluent Kafka container with the specified Docker image name.
*
* @param imageName Docker image name (e.g., "confluentinc/cp-kafka:7.4.0")
*/
public ConfluentKafkaContainer(String imageName);
/**
* Create a Confluent Kafka container with the specified DockerImageName.
*
* @param dockerImageName DockerImageName object
*/
public ConfluentKafkaContainer(DockerImageName dockerImageName);
/**
* Add a listener in format host:port for connections within the same container network.
* The host will be included as a network alias.
*
* @param listener Listener in format "host:port"
* @return This ConfluentKafkaContainer instance for chaining
*/
public ConfluentKafkaContainer withListener(String listener);
/**
* Add a listener with custom advertised listener for external access.
*
* @param listener Listener in format "host:port"
* @param advertisedListener Supplier providing the advertised listener address
* @return This ConfluentKafkaContainer instance for chaining
*/
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);
/**
* Get the bootstrap servers connection string for Kafka clients.
*
* @return Bootstrap servers in format "host:port"
*/
public String getBootstrapServers();
}Supported Images:
confluentinc/cp-kafka (version 7.4.0 and later)Default Port: 9092
Confluent Kafka Container Details
Legacy container implementation for Confluent Platform images with support for Zookeeper and KRaft modes. This class is deprecated and should not be used in new projects.
/**
* @deprecated Use org.testcontainers.kafka.ConfluentKafkaContainer
* or org.testcontainers.kafka.KafkaContainer instead
*/
@Deprecated
public class KafkaContainer extends GenericContainer<KafkaContainer> {
/**
* Create a Kafka container with the specified DockerImageName.
*
* @param dockerImageName DockerImageName object
*/
public KafkaContainer(DockerImageName dockerImageName);
/**
* Configure to use embedded Zookeeper (default mode).
* Cannot be used with KRaft mode.
*
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withEmbeddedZookeeper();
/**
* Configure to use external Zookeeper.
* Cannot be used with KRaft mode.
*
* @param connectString Zookeeper connection string
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withExternalZookeeper(String connectString);
/**
* Enable KRaft mode (requires Confluent Platform 7.0.0 or above).
* Cannot be used with Zookeeper configuration.
*
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withKraft();
/**
* Set custom cluster ID for KRaft mode.
*
* @param clusterId Cluster ID string
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withClusterId(String clusterId);
/**
* Add a listener supplier for custom network access.
*
* @param listenerSupplier Supplier providing listener in format "host:port"
* @return This KafkaContainer instance for chaining
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier);
/**
* Get the bootstrap servers connection string for Kafka clients.
*
* @return Bootstrap servers in format "PLAINTEXT://host:port"
*/
public String getBootstrapServers();
}Public Constants:
public static final int KAFKA_PORT = 9093;
public static final int ZOOKEEPER_PORT = 2181;
public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";Supported Images:
confluentinc/cp-kafkaDefault Ports:
Legacy Kafka Container Details
When testing with multiple containers or proxy access patterns:
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.KafkaContainer;
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092")
.withNetwork(network);
GenericContainer<?> consumerApp = new GenericContainer<>("my-app:latest")
.withNetwork(network)
) {
kafka.start();
consumerApp.start();
// Consumer app can connect to kafka:19092 within the network
}Both modern and legacy containers support SASL authentication through environment variables:
import org.testcontainers.kafka.ConfluentKafkaContainer;
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", jaasConfig)
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", jaasConfig);/**
* Docker image name with optional registry, repository, and tag.
* Part of Testcontainers core library.
*/
public class DockerImageName {
/**
* Parse a Docker image name from a string.
*
* @param imageName Image name string (e.g., "apache/kafka:3.8.0")
* @return DockerImageName instance
*/
public static DockerImageName parse(String imageName);
}All Kafka containers extend GenericContainer<SELF> and inherit lifecycle and configuration methods:
public class GenericContainer<SELF extends GenericContainer<SELF>> {
// Lifecycle methods
public void start();
public void stop();
public void close(); // AutoCloseable
// Configuration methods
public SELF withEnv(String key, String value);
public SELF withEnv(Map<String, String> env);
public SELF withNetwork(Network network);
public SELF withNetworkAliases(String... aliases);
public SELF withExposedPorts(Integer... ports);
public SELF withCommand(String... command);
public SELF withCopyFileToContainer(MountableFile mountableFile, String containerPath);
public SELF waitingFor(WaitStrategy waitStrategy);
// Query methods
public String getHost();
public Integer getMappedPort(int originalPort);
public String getContainerId();
public String getDockerImageName();
public Boolean isRunning();
// Execution methods
public ExecResult execInContainer(String... command) throws IOException, InterruptedException;
public void copyFileToContainer(Transferable transferable, String containerPath);
}/**
* Docker network for connecting multiple containers.
* Part of Testcontainers core library.
*/
public interface Network extends AutoCloseable {
/**
* Create a new Docker network.
*
* @return Network instance
*/
public static Network newNetwork();
public void close();
}/**
* Java functional interface for lazy value provision.
* Part of java.util.function package.
*/
@FunctionalInterface
public interface Supplier<T> {
T get();
}From deprecated org.testcontainers.containers.KafkaContainer to modern containers:
// Old (deprecated)
import org.testcontainers.containers.KafkaContainer;
// New - for Confluent Platform
import org.testcontainers.kafka.ConfluentKafkaContainer;
// New - for Apache Kafka
import org.testcontainers.kafka.KafkaContainer;// Old (deprecated) - includes protocol prefix
String servers = kafka.getBootstrapServers(); // "PLAINTEXT://host:9093"
// New - no protocol prefix
String servers = kafka.getBootstrapServers(); // "host:9092"// Old (deprecated)
int port = KafkaContainer.KAFKA_PORT; // 9093
// New
int port = 9092; // Default port for new containers// Old (deprecated) - uses Supplier
kafka.withListener(() -> "kafka:19092");
// New - direct string for simple cases
kafka.withListener("kafka:19092");
// New - with advertised listener for external access
kafka.withListener("kafka:19092", () -> proxyHost + ":" + proxyPort);// Old (deprecated) - Zookeeper options
kafka.withEmbeddedZookeeper();
kafka.withExternalZookeeper("zookeeper:2181");
// New - no Zookeeper configuration needed, KRaft is default
// Just create and start the container