Container implementation for Confluent Platform Kafka images (confluentinc/cp-kafka) providing automated single-node broker management with KRaft consensus mode.
Create a Confluent Kafka container with Confluent Platform images version 7.4.0 or later. The container runs in KRaft mode by default, eliminating the need for ZooKeeper.
/**
* Create a Confluent Kafka container with the specified Docker image name.
* Supported images: confluentinc/cp-kafka (version 7.4.0 and later)
*
* @param imageName Docker image name as string (e.g., "confluentinc/cp-kafka:7.4.0")
* @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
*/
public ConfluentKafkaContainer(String imageName);
/**
* Create a Confluent Kafka container with the specified DockerImageName.
* Supported images: confluentinc/cp-kafka (version 7.4.0 and later)
*
* @param dockerImageName DockerImageName object
* @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
*/
public ConfluentKafkaContainer(DockerImageName dockerImageName);
Usage:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;
// Using string
ConfluentKafkaContainer kafka1 = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0");
// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka:7.7.0");
ConfluentKafkaContainer kafka2 = new ConfluentKafkaContainer(imageName);
Get the connection string for Kafka clients to connect to the broker.
/**
* Get the bootstrap servers connection string for Kafka clients.
* Returns the mapped host and port that external clients should use.
*
* @return Bootstrap servers in format "host:port" (e.g., "localhost:54321")
*/
public String getBootstrapServers();
Usage:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Use with Kafka consumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer in try-with-resources so it is closed before the container stops
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// subscribe and poll as usual
}
}
Add a custom listener for connections within the same Docker network. The host portion becomes a network alias that other containers can use to connect.
/**
* Add a listener in format host:port for connections within the same container network.
* The host will be included as a network alias, allowing other containers in the same
* network to connect using this hostname.
*
* The listener is added to the default listeners:
* - 0.0.0.0:9092 (PLAINTEXT)
* - 0.0.0.0:9093 (BROKER)
* - 0.0.0.0:9094 (CONTROLLER)
*
* The advertised listener will use the same host:port as the listener.
*
* @param listener Listener in format "host:port" (e.g., "kafka:19092")
* @return This ConfluentKafkaContainer instance for method chaining
*/
public ConfluentKafkaContainer withListener(String listener);
Usage:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;
try (
Network network = Network.newNetwork();
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
.withListener("kafka:19092")
.withNetwork(network);
GenericContainer<?> app = new GenericContainer<>("my-producer-app:latest")
.withNetwork(network)
.withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
) {
kafka.start();
app.start();
// The app container can now connect to Kafka using "kafka:19092"
}
Add a custom listener with a separate advertised listener for external access patterns such as proxy configurations or port forwarding scenarios.
/**
* Add a listener with custom advertised listener for external access.
* The host from the listener parameter will be included as a network alias.
*
* The listener is added to the default listeners:
* - 0.0.0.0:9092 (PLAINTEXT)
* - 0.0.0.0:9093 (BROKER)
* - 0.0.0.0:9094 (CONTROLLER)
*
* The advertised listener supplier is evaluated when the container starts,
* allowing dynamic resolution of external addresses.
*
* @param listener Listener in format "host:port" for internal network access
* @param advertisedListener Supplier providing the advertised listener address in format "host:port"
* @return This ConfluentKafkaContainer instance for method chaining
*/
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);
Usage:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.SocatContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
// Using with proxy/socat for external access
try (
Network network = Network.newNetwork();
SocatContainer socat = new SocatContainer()
.withNetwork(network)
.withTarget(2000, "kafka", 19092);
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
.withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
.withNetwork(network)
) {
socat.start();
kafka.start();
// External clients connect via socat proxy
String externalBootstrap = socat.getHost() + ":" + socat.getMappedPort(2000);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, externalBootstrap);
// ... rest of configuration
}
The container automatically configures the following environment variables for KRaft mode:
CLUSTER_ID=4L6g3nShT-eMCtK--X86sw
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_NODE_ID=1
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
The container waits for the following log message before considering the container ready:
.*Transitioning from RECOVERY to RUNNING.*
This ensures the broker has completed recovery and is ready to accept connections.
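If the default readiness check is too strict or too slow for your environment, you can replace it with your own wait strategy via the inherited GenericContainer API. A minimal sketch, assuming the standard Wait.forLogMessage helper; the 120-second timeout is an illustrative value, not a recommendation:
import org.testcontainers.containers.wait.strategy.Wait;
import java.time.Duration;
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
// Wait for the same readiness log line, but allow a longer startup window
.waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1))
.withStartupTimeout(Duration.ofSeconds(120));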
You can override environment variables or add custom configuration using inherited GenericContainer methods:
import org.testcontainers.kafka.ConfluentKafkaContainer;
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.withEnv("KAFKA_LOG_RETENTION_HOURS", "168")
.withEnv("KAFKA_NUM_PARTITIONS", "3");Configure SASL/PLAIN authentication for secure client connections:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import java.util.Properties;
String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"admin\" " +
"password=\"admin-secret\" " +
"user_admin=\"admin-secret\" " +
"user_test=\"test-secret\";";
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
.withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", jaasConfig)
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", jaasConfig);
kafka.start();
// Client configuration
Properties clientProps = new Properties();
clientProps.put("bootstrap.servers", kafka.getBootstrapServers());
clientProps.put("security.protocol", "SASL_PLAINTEXT");
clientProps.put("sasl.mechanism", "PLAIN");
clientProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"test\" password=\"test-secret\";");Configure SASL/SCRAM-SHA-256 authentication by extending the container and customizing initialization:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.utility.MountableFile;
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0") {
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
try {
String command =
"echo 'kafka-storage format --ignore-formatted -t \"" +
"$CLUSTER_ID" +
"\" --add-scram SCRAM-SHA-256=[name=admin,password=admin] " +
"-c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure";
execInContainer("bash", "-c", command);
} catch (Exception e) {
throw new RuntimeException(e);
}
super.containerIsStarting(containerInfo);
}
}
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "SCRAM-SHA-256")
.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf")
.withCopyFileToContainer(
MountableFile.forClasspathResource("kafka_server_jaas.conf"),
"/etc/kafka/secrets/kafka_server_jaas.conf"
);
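On the client side, authenticate with the SCRAM credentials created during storage formatting. A minimal sketch using the ScramLoginModule from kafka-clients, with the admin/admin credentials from the example above:
import java.util.Properties;
kafka.start();
Properties clientProps = new Properties();
clientProps.put("bootstrap.servers", kafka.getBootstrapServers());
clientProps.put("security.protocol", "SASL_PLAINTEXT");
clientProps.put("sasl.mechanism", "SCRAM-SHA-256");
clientProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"admin\" password=\"admin\";");
The ConfluentKafkaContainer class extends GenericContainer<ConfluentKafkaContainer> and inherits all lifecycle and configuration methods: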
/**
* Start the container. Blocks until the container is ready based on wait strategy.
*/
public void start();
/**
* Stop the container.
*/
public void stop();
/**
* Close and remove the container. Implements AutoCloseable for use in try-with-resources.
*/
public void close();
/**
* Attach the container to a specific Docker network.
*
* @param network Network to attach to
* @return This container instance for chaining
*/
public ConfluentKafkaContainer withNetwork(Network network);
/**
* Add network aliases for this container.
*
* @param aliases Network alias names
* @return This container instance for chaining
*/
public ConfluentKafkaContainer withNetworkAliases(String... aliases);
/**
* Set an environment variable.
*
* @param key Environment variable name
* @param value Environment variable value
* @return This container instance for chaining
*/
public ConfluentKafkaContainer withEnv(String key, String value);
/**
* Set multiple environment variables.
*
* @param env Map of environment variables
* @return This container instance for chaining
*/
public ConfluentKafkaContainer withEnv(Map<String, String> env);
/**
* Copy a file to the container before it starts.
*
* @param mountableFile File to copy
* @param containerPath Destination path in container
* @return This container instance for chaining
*/
public ConfluentKafkaContainer withCopyFileToContainer(
MountableFile mountableFile,
String containerPath
);
/**
* Execute a command in a running container.
*
* @param command Command and arguments to execute
* @return Execution result with stdout, stderr, and exit code
* @throws IOException if execution fails
* @throws InterruptedException if execution is interrupted
*/
public ExecResult execInContainer(String... command)
throws IOException, InterruptedException;
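Usage: execInContainer is convenient for running the Kafka CLI tools shipped in cp-kafka images. A minimal sketch on a started container, targeting the in-container BROKER listener on port 9093 (whose advertised address resolves inside the container); the topic name "events" is an illustrative assumption:
ExecResult result = kafka.execInContainer(
"kafka-topics", "--bootstrap-server", "localhost:9093",
"--create", "--topic", "events", "--partitions", "3", "--replication-factor", "1");
System.out.println(result.getStdout());
/**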
* Get the host that this container is running on.
*
* @return Host address
*/
public String getHost();
/**
* Get the mapped port on the host for a container port.
*
* @param originalPort Container port number
* @return Mapped host port number
*/
public Integer getMappedPort(int originalPort);
/**
* Get the Docker container ID.
*
* @return Container ID
*/
public String getContainerId();
/**
* Check if the container is currently running.
*
* @return true if running, false otherwise
*/
public Boolean isRunning();
A complete example combining the admin client, a producer, and a consumer:
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConfluentKafkaIntegrationTest {
public void testKafkaWithAdminClient() throws Exception {
try (ConfluentKafkaContainer kafka =
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
// Create topic using admin client
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient adminClient = AdminClient.create(adminProps)) {
NewTopic topic = new NewTopic("orders", 3, (short) 1);
adminClient.createTopics(Collections.singletonList(topic)).all().get();
}
// Produce messages
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "order-" + i, "Order details " + i);
producer.send(record).get();
}
}
// Consume messages
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList("orders"));
int messagesRead = 0;
while (messagesRead < 10) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
records.forEach(record -> {
System.out.printf("Consumed: key=%s, value=%s%n",
record.key(), record.value());
});
messagesRead += records.count();
}
}
}
}
}
Ensure you're using Confluent Platform version 7.4.0 or later:
// Correct
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
// Incorrect - version too old
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.3.0") // Not supportedFor older Confluent Platform versions, use the deprecated org.testcontainers.containers.KafkaContainer instead.
Always use getBootstrapServers() to get the correct mapped port:
// Correct
String servers = kafka.getBootstrapServers(); // e.g., "localhost:54321"
// Incorrect - port 9092 is not directly accessible
String servers = "localhost:9092"; // This will failWhen using SASL, ensure the security protocol matches on both server and client:
// Server configuration
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
// Client configuration must match
clientProps.put("security.protocol", "SASL_PLAINTEXT");Only Confluent Platform images are supported. Ensure you're using:
confluentinc/cp-kafka:7.4.0 or later
For Apache Kafka images, use KafkaContainer instead.