or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

confluent-kafka-container.mdindex.mdkafka-container.mdlegacy-kafka-container.md
tile.json

legacy-kafka-container.mddocs/

Legacy Kafka Container (Deprecated)

Status: DEPRECATED - This container is deprecated. Use org.testcontainers.kafka.ConfluentKafkaContainer or org.testcontainers.kafka.KafkaContainer instead.

Legacy container implementation for Confluent Platform images supporting embedded Zookeeper, external Zookeeper, or KRaft mode. Maintained for backward compatibility with existing code.

Capabilities

Container Creation

Create a legacy Kafka container with Confluent Platform images. The default mode uses embedded Zookeeper.

/**
 * Create a Kafka container with the specified DockerImageName.
 * Supported images: confluentinc/cp-kafka
 *
 * @param dockerImageName DockerImageName object
 * @throws IllegalArgumentException if image is not compatible with confluentinc/cp-kafka
 */
public KafkaContainer(DockerImageName dockerImageName);

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
KafkaContainer kafka = new KafkaContainer(imageName);

Note: The deprecated constructors without parameters or with version strings are also available but should not be used:

/**
 * @deprecated Use KafkaContainer(DockerImageName) instead
 */
@Deprecated
public KafkaContainer();

/**
 * @deprecated Use KafkaContainer(DockerImageName) instead
 */
@Deprecated
public KafkaContainer(String confluentPlatformVersion);

Bootstrap Servers

Get the connection string for Kafka clients to connect to the broker. Note the format difference from modern containers.

/**
 * Get the bootstrap servers connection string for Kafka clients.
 * Returns the mapped host and port with PLAINTEXT:// protocol prefix.
 *
 * @return Bootstrap servers in format "PLAINTEXT://host:port" (e.g., "PLAINTEXT://localhost:54321")
 */
public String getBootstrapServers();

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
    kafka.start();

    // Note: includes "PLAINTEXT://" prefix
    String bootstrapServers = kafka.getBootstrapServers();
    // Example: "PLAINTEXT://localhost:54321"
}

Embedded Zookeeper Mode

Configure the container to use embedded Zookeeper (default mode). Cannot be used with KRaft mode.

/**
 * Configure to use embedded Zookeeper (default mode).
 * The container will start Zookeeper internally on port 2181.
 * Cannot be combined with withKraft() or withExternalZookeeper().
 *
 * @return This KafkaContainer instance for method chaining
 * @throws IllegalStateException if KRaft mode is already enabled
 */
public KafkaContainer withEmbeddedZookeeper();

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Embedded Zookeeper is the default, explicit call is optional
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withEmbeddedZookeeper()) {
    kafka.start();

    // Kafka uses internal Zookeeper on port 2181
}

External Zookeeper Mode

Configure the container to connect to an external Zookeeper instance. Cannot be used with KRaft mode.

/**
 * Configure to use external Zookeeper.
 * The container will connect to the specified Zookeeper instance.
 * Cannot be combined with withKraft() or withEmbeddedZookeeper().
 *
 * @param connectString Zookeeper connection string (e.g., "zookeeper:2181")
 * @return This KafkaContainer instance for method chaining
 * @throws IllegalStateException if KRaft mode is already enabled
 */
public KafkaContainer withExternalZookeeper(String connectString);

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

try (
    Network network = Network.newNetwork();
    GenericContainer<?> zookeeper = new GenericContainer<>(
            DockerImageName.parse("confluentinc/cp-zookeeper:4.0.0"))
        .withNetwork(network)
        .withNetworkAliases("zookeeper")
        .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withNetwork(network)
        .withExternalZookeeper("zookeeper:2181")
) {
    zookeeper.start();
    kafka.start();

    // Kafka connects to external Zookeeper
}

KRaft Mode

Enable KRaft mode for Zookeeper-less operation. Requires Confluent Platform 7.0.0 or later. Cannot be used with Zookeeper configuration.

/**
 * Enable KRaft mode (requires Confluent Platform 7.0.0 or above).
 * KRaft provides Zookeeper-less Kafka operation using Kafka's native consensus protocol.
 * Cannot be combined with withEmbeddedZookeeper() or withExternalZookeeper().
 *
 * @return This KafkaContainer instance for method chaining
 * @throws IllegalArgumentException if version is less than 7.0.0
 * @throws IllegalStateException if Zookeeper is already configured
 */
public KafkaContainer withKraft();

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withKraft()) {
    kafka.start();

    // Kafka runs in KRaft mode without Zookeeper
}

Custom Cluster ID

Set a custom cluster ID for KRaft mode instead of using the default cluster ID.

/**
 * Set custom cluster ID for KRaft mode.
 * Only used when KRaft mode is enabled.
 *
 * @param clusterId Cluster ID string
 * @return This KafkaContainer instance for method chaining
 * @throws NullPointerException if clusterId is null
 */
public KafkaContainer withClusterId(String clusterId);

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withKraft()
        .withClusterId("my-custom-cluster-id")) {
    kafka.start();

    // Kafka uses custom cluster ID
}

Custom Network Listener

Add a custom listener for connections within Docker networks or external access patterns.

/**
 * Add a listener supplier for custom network access.
 * The supplier is evaluated when the container starts, allowing dynamic address resolution.
 * The host from the listener will be added as a network alias.
 *
 * The listener is added to the default listeners:
 * - 0.0.0.0:9093 (PLAINTEXT)
 * - 0.0.0.0:9092 (BROKER)
 *
 * @param listenerSupplier Supplier providing listener in format "host:port"
 * @return This KafkaContainer instance for method chaining
 */
public KafkaContainer withListener(Supplier<String> listenerSupplier);

Usage:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import java.util.function.Supplier;

try (
    Network network = Network.newNetwork();
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withKraft()
        .withListener(() -> "kafka:19092")
        .withNetwork(network)
) {
    kafka.start();

    // Other containers in the network can connect to kafka:19092
}

Constants

The legacy container exposes several public constants:

/**
 * Kafka broker port for the PLAINTEXT listener.
 */
public static final int KAFKA_PORT = 9093;

/**
 * Zookeeper port when embedded Zookeeper is used.
 */
public static final int ZOOKEEPER_PORT = 2181;

/**
 * Default cluster ID used for KRaft mode.
 */
public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

Configuration Details

Default Environment Variables (Zookeeper Mode)

When using embedded or external Zookeeper:

KAFKA_BROKER_ID=1
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_ZOOKEEPER_CONNECT=localhost:2181  (or external address)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0

Default Environment Variables (KRaft Mode)

When using KRaft mode:

CLUSTER_ID=4L6g3nShT-eMCtK--X86sw  (or custom cluster ID)
KAFKA_NODE_ID=1
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0

Wait Strategy

  • Zookeeper mode: Waits for log message ".*\\[KafkaServer id=\\d+\\] started.*"
  • KRaft mode: Waits for log message ".*Transitioning from RECOVERY to RUNNING.*"

Exposed Ports

  • 9093: Kafka broker port (PLAINTEXT listener)
  • 2181: Zookeeper port (only when embedded Zookeeper is used)

Mode Constraints

The container enforces mutual exclusivity between Zookeeper and KRaft modes:

// Valid configurations
kafka.withEmbeddedZookeeper();  // Default
kafka.withExternalZookeeper("zk:2181");
kafka.withKraft();  // Requires 7.0.0+

// Invalid - throws IllegalStateException
kafka.withKraft().withEmbeddedZookeeper();  // Cannot use both
kafka.withKraft().withExternalZookeeper("zk:2181");  // Cannot use both
kafka.withExternalZookeeper("zk:2181").withKraft();  // Cannot use both

Complete Examples

Basic Usage with Embedded Zookeeper

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public void testWithEmbeddedZookeeper() throws Exception {
    try (KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
        kafka.start();

        String bootstrapServers = kafka.getBootstrapServers();
        // Example: "PLAINTEXT://localhost:54321"

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("test-topic", "key", "value");
            producer.send(record).get();
        }
    }
}

Usage with KRaft Mode

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public void testWithKraft() throws Exception {
    try (KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
            .withKraft()) {
        kafka.start();

        String bootstrapServers = kafka.getBootstrapServers();
        // Use with Kafka clients (no Zookeeper required)
    }
}

Usage with External Zookeeper

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public void testWithExternalZookeeper() throws Exception {
    try (
        Network network = Network.newNetwork();
        GenericContainer<?> zookeeper = new GenericContainer<>(
                DockerImageName.parse("confluentinc/cp-zookeeper:6.2.1"))
            .withNetwork(network)
            .withNetworkAliases("zookeeper")
            .withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
            .withEnv("ZOOKEEPER_TICK_TIME", "2000");
        KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
            .withNetwork(network)
            .withExternalZookeeper("zookeeper:2181")
    ) {
        zookeeper.start();
        kafka.start();

        String bootstrapServers = kafka.getBootstrapServers();
        // Kafka is connected to external Zookeeper
    }
}

Usage with Custom Listener

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public void testWithCustomListener() throws Exception {
    try (
        Network network = Network.newNetwork();
        KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
            .withKraft()
            .withListener(() -> "kafka:19092")
            .withNetwork(network);
        GenericContainer<?> consumer = new GenericContainer<>("my-consumer:latest")
            .withNetwork(network)
    ) {
        kafka.start();
        consumer.start();

        // Consumer can connect to kafka:19092 within the network
    }
}

Migration to Modern Containers

Recommended: Migrate to org.testcontainers.kafka.ConfluentKafkaContainer or org.testcontainers.kafka.KafkaContainer.

Key differences to consider:

  1. Import statement:
// Old (deprecated)
import org.testcontainers.containers.KafkaContainer;

// New
import org.testcontainers.kafka.ConfluentKafkaContainer;
  1. Bootstrap servers format:
// Old (deprecated) - includes protocol prefix
String servers = kafka.getBootstrapServers(); // "PLAINTEXT://localhost:9093"

// New - no protocol prefix
String servers = kafka.getBootstrapServers(); // "localhost:9092"
  1. Default port:
// Old (deprecated)
int port = KafkaContainer.KAFKA_PORT; // 9093

// New
int port = 9092; // Default for modern containers
  1. Listener API:
// Old (deprecated) - uses Supplier
kafka.withListener(() -> "kafka:19092");

// New - direct string
kafka.withListener("kafka:19092");
  1. Default mode:
// Old (deprecated) - embedded Zookeeper by default
KafkaContainer kafka = new KafkaContainer(...);  // Uses Zookeeper

// New - KRaft by default
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(...);  // Uses KRaft

Troubleshooting

Version Requirements for KRaft

KRaft mode requires Confluent Platform 7.0.0 or later:

// Correct
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft()
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()

// Incorrect - throws IllegalArgumentException
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")).withKraft()

Mode Conflicts

Cannot mix Zookeeper and KRaft configurations:

// These throw IllegalStateException
kafka.withKraft().withEmbeddedZookeeper()
kafka.withKraft().withExternalZookeeper("zk:2181")
kafka.withExternalZookeeper("zk:2181").withKraft()

Bootstrap Servers Protocol Prefix

Remember that the deprecated container includes "PLAINTEXT://" in the bootstrap servers:

String servers = kafka.getBootstrapServers();
// Returns: "PLAINTEXT://localhost:54321"

// If needed, remove prefix for compatibility
String serversWithoutProtocol = servers.replace("PLAINTEXT://", "");

Port Difference

The deprecated container uses port 9093 by default, while modern containers use 9092:

// Deprecated container
KafkaContainer.KAFKA_PORT  // 9093

// Modern containers use 9092