or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

confluent-kafka-container.md · index.md · kafka-container.md · legacy-kafka-container.md
tile.json

docs/confluent-kafka-container.md

Confluent Kafka Container

Container implementation for Confluent Platform Kafka images (confluentinc/cp-kafka) providing automated single-node broker management with KRaft consensus mode.

Capabilities

Container Creation

Create a Confluent Kafka container from a Confluent Platform image (confluentinc/cp-kafka), version 7.4.0 or later. The container runs in KRaft mode by default, eliminating the need for ZooKeeper.

/**
 * Create a Confluent Kafka container with the specified Docker image name.
 * Supported images: confluentinc/cp-kafka (version 7.4.0 and later)
 *
 * @param imageName Docker image name as string (e.g., "confluentinc/cp-kafka:7.4.0")
 * @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
 */
public ConfluentKafkaContainer(String imageName);

/**
 * Create a Confluent Kafka container with the specified DockerImageName.
 * Supported images: confluentinc/cp-kafka (version 7.4.0 and later)
 *
 * @param dockerImageName DockerImageName object
 * @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
 */
public ConfluentKafkaContainer(DockerImageName dockerImageName);

Usage:

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Using string
ConfluentKafkaContainer kafka1 = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0");

// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka:7.7.0");
ConfluentKafkaContainer kafka2 = new ConfluentKafkaContainer(imageName);

Bootstrap Servers

Get the connection string for Kafka clients to connect to the broker.

/**
 * Get the bootstrap servers connection string for Kafka clients.
 * Returns the mapped host and port that external clients should use.
 *
 * @return Bootstrap servers in format "host:port" (e.g., "localhost:54321")
 */
public String getBootstrapServers();

Usage:

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

// The container is AutoCloseable: try-with-resources stops it when the block exits.
try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
    kafka.start();

    // Address that clients outside the container must use (mapped host and port)
    String bootstrapServers = kafka.getBootstrapServers();

    // Use with Kafka consumer
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Create consumer with bootstrap servers. The consumer holds network resources,
    // so it is closed here, before the surrounding block stops the broker.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // ... subscribe and poll
    }
}

Simple Network Listener

Add a custom listener for connections within the same Docker network. The host portion becomes a network alias that other containers can use to connect.

/**
 * Add a listener in format host:port for connections within the same container network.
 * The host will be included as a network alias, allowing other containers in the same
 * network to connect using this hostname.
 *
 * The listener is added to the default listeners:
 * - 0.0.0.0:9092 (PLAINTEXT)
 * - 0.0.0.0:9093 (BROKER)
 * - 0.0.0.0:9094 (CONTROLLER)
 *
 * The advertised listener will use the same host:port as the listener.
 *
 * @param listener Listener in format "host:port" (e.g., "kafka:19092")
 * @return This ConfluentKafkaContainer instance for method chaining
 */
public ConfluentKafkaContainer withListener(String listener);

Usage:

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;

try (
    Network network = Network.newNetwork();
    ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
        .withListener("kafka:19092")
        .withNetwork(network);
    GenericContainer<?> app = new GenericContainer<>("my-producer-app:latest")
        .withNetwork(network)
        .withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
) {
    kafka.start();
    app.start();

    // The app container can now connect to Kafka using "kafka:19092"
}

Advanced Network Listener

Add a custom listener with a separate advertised listener, for external access patterns such as proxy configurations or port-forwarding scenarios.

/**
 * Add a listener with custom advertised listener for external access.
 * The host from the listener parameter will be included as a network alias.
 *
 * The listener is added to the default listeners:
 * - 0.0.0.0:9092 (PLAINTEXT)
 * - 0.0.0.0:9093 (BROKER)
 * - 0.0.0.0:9094 (CONTROLLER)
 *
 * The advertised listener supplier is evaluated when the container starts,
 * allowing dynamic resolution of external addresses.
 *
 * @param listener Listener in format "host:port" for internal network access
 * @param advertisedListener Supplier providing the advertised listener address in format "host:port"
 * @return This ConfluentKafkaContainer instance for method chaining
 */
public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);

Usage:

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.SocatContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
import java.util.function.Supplier;

// Using with proxy/socat for external access
try (
    Network network = Network.newNetwork();
    // socat forwards its port 2000 to the "kafka:19092" listener inside the network
    SocatContainer socat = new SocatContainer()
        .withNetwork(network)
        .withTarget(2000, "kafka", 19092);
    ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
        // The supplier is evaluated when kafka starts, so it can read socat's mapped port
        .withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
        .withNetwork(network)
) {
    // Start socat first: its mapped port must exist before kafka resolves the advertised listener
    socat.start();
    kafka.start();

    // External clients connect via socat proxy
    String externalBootstrap = socat.getHost() + ":" + socat.getMappedPort(2000);
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, externalBootstrap);
    // ... rest of configuration
}

Configuration Details

Default Environment Variables

The container automatically configures the following environment variables for KRaft mode:

CLUSTER_ID=4L6g3nShT-eMCtK--X86sw
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_NODE_ID=1
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0

Wait Strategy

The container is considered ready once a line matching the following regular expression appears in its log output:

.*Transitioning from RECOVERY to RUNNING.*

This ensures the broker has completed recovery and is ready to accept connections.

Exposed Ports

  • 9092: Kafka broker port (PLAINTEXT listener)

Custom Configuration

You can override environment variables or add custom configuration using inherited GenericContainer methods:

import org.testcontainers.kafka.ConfluentKafkaContainer;

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
    .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
    .withEnv("KAFKA_LOG_RETENTION_HOURS", "168")
    .withEnv("KAFKA_NUM_PARTITIONS", "3");

Security Configuration

SASL/PLAIN Authentication

Configure SASL/PLAIN authentication for client connections. Note that the SASL_PLAINTEXT protocol used below authenticates clients but does not encrypt traffic:

import org.testcontainers.kafka.ConfluentKafkaContainer;

String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"admin\" " +
    "password=\"admin-secret\" " +
    "user_admin=\"admin-secret\" " +
    "user_test=\"test-secret\";";

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
        "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
    .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", jaasConfig)
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", jaasConfig);

kafka.start();

// Client configuration
Properties clientProps = new Properties();
clientProps.put("bootstrap.servers", kafka.getBootstrapServers());
clientProps.put("security.protocol", "SASL_PLAINTEXT");
clientProps.put("sasl.mechanism", "PLAIN");
clientProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"test\" password=\"test-secret\";");

SASL/SCRAM Authentication

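Configure SASL/SCRAM-SHA-256 authentication by extending the container and customizing initialization. The example appends a kafka-storage format command (which registers the SCRAM credential) to the image's configure script, and expects a kafka_server_jaas.conf file on the test classpath: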

import org.testcontainers.kafka.ConfluentKafkaContainer;
import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.utility.MountableFile;

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0") {
    @Override
    protected void containerIsStarting(InspectContainerResponse containerInfo) {
        try {
            String command =
                "echo 'kafka-storage format --ignore-formatted -t \"" +
                "$CLUSTER_ID" +
                "\" --add-scram SCRAM-SHA-256=[name=admin,password=admin] " +
                "-c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure";
            execInContainer("bash", "-c", command);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        super.containerIsStarting(containerInfo);
    }
}
    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
        "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
    .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "SCRAM-SHA-256")
    .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
    .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf")
    .withCopyFileToContainer(
        MountableFile.forClasspathResource("kafka_server_jaas.conf"),
        "/etc/kafka/secrets/kafka_server_jaas.conf"
    );

Inherited Capabilities

The ConfluentKafkaContainer class extends GenericContainer<ConfluentKafkaContainer> and inherits all lifecycle and configuration methods:

Lifecycle Management

/**
 * Start the container. Blocks until the container is ready based on wait strategy.
 */
public void start();

/**
 * Stop the container.
 */
public void stop();

/**
 * Close and remove the container. Implements AutoCloseable for use in try-with-resources.
 */
public void close();

Network Configuration

/**
 * Attach the container to a specific Docker network.
 *
 * @param network Network to attach to
 * @return This container instance for chaining
 */
public ConfluentKafkaContainer withNetwork(Network network);

/**
 * Add network aliases for this container.
 *
 * @param aliases Network alias names
 * @return This container instance for chaining
 */
public ConfluentKafkaContainer withNetworkAliases(String... aliases);

Environment Configuration

/**
 * Set an environment variable.
 *
 * @param key Environment variable name
 * @param value Environment variable value
 * @return This container instance for chaining
 */
public ConfluentKafkaContainer withEnv(String key, String value);

/**
 * Set multiple environment variables.
 *
 * @param env Map of environment variables
 * @return This container instance for chaining
 */
public ConfluentKafkaContainer withEnv(Map<String, String> env);

File Operations

/**
 * Copy a file to the container before it starts.
 *
 * @param mountableFile File to copy
 * @param containerPath Destination path in container
 * @return This container instance for chaining
 */
public ConfluentKafkaContainer withCopyFileToContainer(
    MountableFile mountableFile,
    String containerPath
);

/**
 * Execute a command in a running container.
 *
 * @param command Command and arguments to execute
 * @return Execution result with stdout, stderr, and exit code
 * @throws IOException if execution fails
 * @throws InterruptedException if execution is interrupted
 */
public ExecResult execInContainer(String... command)
    throws IOException, InterruptedException;

Container Inspection

/**
 * Get the host that this container is running on.
 *
 * @return Host address
 */
public String getHost();

/**
 * Get the mapped port on the host for a container port.
 *
 * @param originalPort Container port number
 * @return Mapped host port number
 */
public Integer getMappedPort(int originalPort);

/**
 * Get the Docker container ID.
 *
 * @return Container ID
 */
public String getContainerId();

/**
 * Check if the container is currently running.
 *
 * @return true if running, false otherwise (never null; the return type is a primitive)
 */
public boolean isRunning();

Complete Example

import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * End-to-end example: starts a single-node Confluent Kafka broker, creates a topic with the
 * admin client, produces a batch of records and reads them back with a consumer.
 */
public class ConfluentKafkaIntegrationTest {

    /** Topic created, written to and read from by the example. */
    private static final String TOPIC = "orders";

    /** Number of records produced, and the number the consumer must read back. */
    private static final int MESSAGE_COUNT = 10;

    /** Upper bound for the consume loop, so missing records fail the test instead of hanging it. */
    private static final Duration CONSUME_TIMEOUT = Duration.ofSeconds(60);

    /**
     * Runs the full create-topic / produce / consume round trip against a throwaway broker.
     *
     * @throws IllegalStateException if fewer than {@code MESSAGE_COUNT} records are consumed
     *         within {@code CONSUME_TIMEOUT}
     * @throws Exception if the container, admin client, producer or consumer fails
     */
    public void testKafkaWithAdminClient() throws Exception {
        try (ConfluentKafkaContainer kafka =
                new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
            kafka.start();

            String bootstrapServers = kafka.getBootstrapServers();

            // Create topic using admin client
            Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

            try (AdminClient adminClient = AdminClient.create(adminProps)) {
                // 3 partitions, replication factor 1 (the container runs a single broker)
                NewTopic topic = new NewTopic(TOPIC, 3, (short) 1);
                adminClient.createTopics(Collections.singletonList(topic)).all().get();
            }

            // Produce messages
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, "order-" + i, "Order details " + i);
                    // Block on the returned future so every send is acknowledged before consuming
                    producer.send(record).get();
                }
            }

            // Consume messages
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
            // A new group has no committed offsets; start from the beginning of the topic
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(TOPIC));

                long deadlineNanos = System.nanoTime() + CONSUME_TIMEOUT.toNanos();
                int messagesRead = 0;
                while (messagesRead < MESSAGE_COUNT) {
                    // Overflow-safe comparison of nanoTime values
                    if (System.nanoTime() - deadlineNanos > 0) {
                        throw new IllegalStateException(
                            "Timed out after " + CONSUME_TIMEOUT + ": read " + messagesRead
                                + " of " + MESSAGE_COUNT + " messages from topic " + TOPIC);
                    }

                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    records.forEach(record -> {
                        System.out.printf("Consumed: key=%s, value=%s%n",
                            record.key(), record.value());
                    });
                    messagesRead += records.count();
                }
            }
        }
    }
}

Troubleshooting

Version Compatibility

Ensure you're using Confluent Platform version 7.4.0 or later:

// Correct
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")

// Incorrect - version too old
new ConfluentKafkaContainer("confluentinc/cp-kafka:7.3.0") // Not supported

For older Confluent Platform versions, use the deprecated org.testcontainers.containers.KafkaContainer instead.

Connection Refused Errors

Always use getBootstrapServers() to get the correct mapped port:

// Correct
String servers = kafka.getBootstrapServers(); // e.g., "localhost:54321"

// Incorrect - port 9092 is not directly accessible
String servers = "localhost:9092"; // This will fail

Security Protocol Configuration

When using SASL, ensure the security protocol matches on both server and client:

// Server configuration
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
    "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")

// Client configuration must match
clientProps.put("security.protocol", "SASL_PLAINTEXT");

Image Compatibility

Only Confluent Platform images are supported. Ensure you're using:

  • confluentinc/cp-kafka:7.4.0 or later

For Apache Kafka images, use org.testcontainers.kafka.KafkaContainer instead (not the deprecated org.testcontainers.containers.KafkaContainer).