Status: DEPRECATED. Use org.testcontainers.kafka.ConfluentKafkaContainer or org.testcontainers.kafka.KafkaContainer instead.
Legacy container implementation for Confluent Platform images supporting embedded Zookeeper, external Zookeeper, or KRaft mode. Maintained for backward compatibility with existing code.
Create a legacy Kafka container with Confluent Platform images. The default mode uses embedded Zookeeper.
/**
* Create a Kafka container with the specified DockerImageName.
* Supported images: confluentinc/cp-kafka
*
* @param dockerImageName DockerImageName object
* @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
*/
public KafkaContainer(DockerImageName dockerImageName);
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
KafkaContainer kafka = new KafkaContainer(imageName);
Note: The deprecated constructors without parameters or with version strings are also available but should not be used:
/**
* @deprecated Use KafkaContainer(DockerImageName) instead
*/
@Deprecated
public KafkaContainer();
/**
* @deprecated Use KafkaContainer(DockerImageName) instead
*/
@Deprecated
public KafkaContainer(String confluentPlatformVersion);
Get the connection string that Kafka clients use to connect to the broker. Unlike the modern containers, the returned string includes a "PLAINTEXT://" protocol prefix.
/**
* Get the bootstrap servers connection string for Kafka clients.
* Returns the mapped host and port with PLAINTEXT:// protocol prefix.
*
* @return Bootstrap servers in format "PLAINTEXT://host:port" (e.g., "PLAINTEXT://localhost:54321")
*/
public String getBootstrapServers();
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
kafka.start();
// Note: includes "PLAINTEXT://" prefix
String bootstrapServers = kafka.getBootstrapServers();
// Example: "PLAINTEXT://localhost:54321"
}
Configure the container to use embedded Zookeeper (default mode). Cannot be used with KRaft mode.
/**
* Configure to use embedded Zookeeper (default mode).
* The container will start Zookeeper internally on port 2181.
* Cannot be combined with withKraft() or withExternalZookeeper().
*
* @return This KafkaContainer instance for method chaining
* @throws IllegalStateException if KRaft mode is already enabled
*/
public KafkaContainer withEmbeddedZookeeper();
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
// Embedded Zookeeper is the default, explicit call is optional
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withEmbeddedZookeeper()) {
kafka.start();
// Kafka uses internal Zookeeper on port 2181
}
Configure the container to connect to an external Zookeeper instance. Cannot be used with KRaft mode.
/**
* Configure to use external Zookeeper.
* The container will connect to the specified Zookeeper instance.
* Cannot be combined with withKraft() or withEmbeddedZookeeper().
*
* @param connectString Zookeeper connection string (e.g., "zookeeper:2181")
* @return This KafkaContainer instance for method chaining
* @throws IllegalStateException if KRaft mode is already enabled
*/
public KafkaContainer withExternalZookeeper(String connectString);
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
try (
Network network = Network.newNetwork();
GenericContainer<?> zookeeper = new GenericContainer<>(
DockerImageName.parse("confluentinc/cp-zookeeper:6.2.1"))
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181")
) {
zookeeper.start();
kafka.start();
// Kafka connects to external Zookeeper
}
Enable KRaft mode for Zookeeper-less operation. Requires Confluent Platform 7.0.0 or later. Cannot be used with Zookeeper configuration.
/**
* Enable KRaft mode (requires Confluent Platform 7.0.0 or above).
* KRaft provides Zookeeper-less Kafka operation using Kafka's native consensus protocol.
* Cannot be combined with withEmbeddedZookeeper() or withExternalZookeeper().
*
* @return This KafkaContainer instance for method chaining
* @throws IllegalArgumentException if version is less than 7.0.0
* @throws IllegalStateException if Zookeeper is already configured
*/
public KafkaContainer withKraft();
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft()) {
kafka.start();
// Kafka runs in KRaft mode without Zookeeper
}
Set a custom cluster ID for KRaft mode instead of using the default cluster ID.
/**
* Set custom cluster ID for KRaft mode.
* Only used when KRaft mode is enabled.
*
* @param clusterId Cluster ID string
* @return This KafkaContainer instance for method chaining
* @throws NullPointerException if clusterId is null
*/
public KafkaContainer withClusterId(String clusterId);
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft()
.withClusterId("my-custom-cluster-id")) {
kafka.start();
// Kafka uses custom cluster ID
}
Add a custom listener for connections within Docker networks or external access patterns.
/**
* Add a listener supplier for custom network access.
* The supplier is evaluated when the container starts, allowing dynamic address resolution.
* The host from the listener will be added as a network alias.
*
* The listener is added to the default listeners:
* - 0.0.0.0:9093 (PLAINTEXT)
* - 0.0.0.0:9092 (BROKER)
*
* @param listenerSupplier Supplier providing listener in format "host:port"
* @return This KafkaContainer instance for method chaining
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier);
Usage:
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.util.function.Supplier;
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft()
.withListener(() -> "kafka:19092")
.withNetwork(network)
) {
kafka.start();
// Other containers in the network can connect to kafka:19092
}
The legacy container exposes several public constants:
/**
* Kafka broker port for the PLAINTEXT listener.
*/
public static final int KAFKA_PORT = 9093;
/**
* Zookeeper port when embedded Zookeeper is used.
*/
public static final int ZOOKEEPER_PORT = 2181;
/**
* Default cluster ID used for KRaft mode.
*/
public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
When using embedded or external Zookeeper, the container sets these environment variables:
KAFKA_BROKER_ID=1
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_ZOOKEEPER_CONNECT=localhost:2181 (or external address)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
When using KRaft mode, the container sets these environment variables:
CLUSTER_ID=4L6g3nShT-eMCtK--X86sw (or custom cluster ID)
KAFKA_NODE_ID=1
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
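KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
Startup log patterns (the first appears to correspond to a Zookeeper-based broker, the second to a KRaft broker):
".*\\[KafkaServer id=\\d+\\] started.*"
".*Transitioning from RECOVERY to RUNNING.*"
The container enforces mutual exclusivity between Zookeeper and KRaft modes: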
// Valid configurations
kafka.withEmbeddedZookeeper(); // Default
kafka.withExternalZookeeper("zk:2181");
kafka.withKraft(); // Requires 7.0.0+
// Invalid - throws IllegalStateException
kafka.withKraft().withEmbeddedZookeeper(); // Cannot use both
kafka.withKraft().withExternalZookeeper("zk:2181"); // Cannot use both
kafka.withExternalZookeeper("zk:2181").withKraft(); // Cannot use both
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public void testWithEmbeddedZookeeper() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Example: "PLAINTEXT://localhost:54321"
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("test-topic", "key", "value");
producer.send(record).get();
}
}
}
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
public void testWithKraft() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft()) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Use with Kafka clients (no Zookeeper required)
}
}
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
public void testWithExternalZookeeper() throws Exception {
try (
Network network = Network.newNetwork();
GenericContainer<?> zookeeper = new GenericContainer<>(
DockerImageName.parse("confluentinc/cp-zookeeper:6.2.1"))
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
.withEnv("ZOOKEEPER_TICK_TIME", "2000");
KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(network)
.withExternalZookeeper("zookeeper:2181")
) {
zookeeper.start();
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Kafka is connected to external Zookeeper
}
}
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
public void testWithCustomListener() throws Exception {
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft()
.withListener(() -> "kafka:19092")
.withNetwork(network);
GenericContainer<?> consumer = new GenericContainer<>("my-consumer:latest")
.withNetwork(network)
) {
kafka.start();
consumer.start();
// Consumer can connect to kafka:19092 within the network
}
}
Recommended: Migrate to org.testcontainers.kafka.ConfluentKafkaContainer or org.testcontainers.kafka.KafkaContainer.
Key differences to consider:
// Old (deprecated)
import org.testcontainers.containers.KafkaContainer;
// New
import org.testcontainers.kafka.ConfluentKafkaContainer;
// Old (deprecated) - includes protocol prefix
String servers = kafka.getBootstrapServers(); // e.g. "PLAINTEXT://localhost:54321" (host and mapped port)
// New - no protocol prefix
String servers = kafka.getBootstrapServers(); // e.g. "localhost:54321" (host and mapped port)
// Old (deprecated)
int port = KafkaContainer.KAFKA_PORT; // 9093
// New
int port = 9092; // Default for modern containers
// Old (deprecated) - uses Supplier
kafka.withListener(() -> "kafka:19092");
// New - direct string
kafka.withListener("kafka:19092");
// Old (deprecated) - embedded Zookeeper by default
KafkaContainer kafka = new KafkaContainer(...); // Uses Zookeeper
// New - KRaft by default
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(...); // Uses KRaft
KRaft mode requires Confluent Platform 7.0.0 or later:
// Correct
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft();
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft();
// Incorrect - throws IllegalArgumentException
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")).withKraft();
Zookeeper and KRaft configurations cannot be mixed:
// These throw IllegalStateException
kafka.withKraft().withEmbeddedZookeeper();
kafka.withKraft().withExternalZookeeper("zk:2181");
kafka.withExternalZookeeper("zk:2181").withKraft();
Remember that the deprecated container includes "PLAINTEXT://" in the bootstrap servers:
String servers = kafka.getBootstrapServers();
// Returns: "PLAINTEXT://localhost:54321"
// If needed, remove prefix for compatibility
String serversWithoutProtocol = servers.replace("PLAINTEXT://", "");
The deprecated container uses port 9093 by default, while modern containers use 9092:
// Deprecated container
KafkaContainer.KAFKA_PORT // 9093
// Modern containers use 9092
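During migration, code that consumes bootstrap servers may receive either format: with the "PLAINTEXT://" prefix from the deprecated container, or without it from the modern ones. The following is a minimal sketch of a normalizing helper that accepts both. The class name BootstrapServers and the method stripProtocol are hypothetical and not part of Testcontainers; the sketch assumes entries are comma-separated and that any prefix ends in "://".

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Testcontainers.
final class BootstrapServers {

    private BootstrapServers() {}

    /**
     * Removes a leading "PROTOCOL://" prefix from each comma-separated entry.
     * Entries without a prefix are returned unchanged, so the result is the
     * same for the deprecated and the modern container formats.
     */
    static String stripProtocol(String bootstrapServers) {
        return Arrays.stream(bootstrapServers.split(","))
            .map(String::trim)
            .map(entry -> {
                int idx = entry.indexOf("://");
                // Keep only the host:port part when a prefix is present
                return idx >= 0 ? entry.substring(idx + 3) : entry;
            })
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Deprecated container format
        System.out.println(stripProtocol("PLAINTEXT://localhost:54321"));
        // Modern container format passes through unchanged
        System.out.println(stripProtocol("localhost:54321"));
    }
}
```

Compared with servers.replace("PLAINTEXT://", ""), this version does not depend on the listener name, which may help if the string ever carries a different protocol prefix.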