Testcontainers Kafka Module

The Testcontainers Kafka module manages Apache Kafka containers for tests in Java applications. It removes the need for an external Kafka installation and handles the full container lifecycle, including broker setup, network configuration, and cleanup.

Package Information

  • Package Name: kafka
  • Package Coordinates: org.testcontainers:kafka:1.21.4
  • Language: Java
  • Minimum Java Version: 8
  • Installation:

Maven:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.21.4</version>
    <scope>test</scope>
</dependency>

Gradle:

testImplementation 'org.testcontainers:kafka:1.21.4'

Core Imports

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.kafka.ConfluentKafkaContainer;

For the deprecated legacy container:

import org.testcontainers.containers.KafkaContainer;

Basic Usage

Apache Kafka Container

import java.util.Properties;

import org.testcontainers.kafka.KafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
    kafka.start();

    // Get bootstrap servers for connecting clients
    String bootstrapServers = kafka.getBootstrapServers();

    // Configure Kafka producer
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");

    // Configure Kafka consumer
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

    // Both clients are closeable; release them before the broker container stops
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
        // Send and poll records here
    }
}
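The producer and consumer setup above tends to be repeated in every test class. A minimal sketch of a helper that builds both property sets from a bootstrap address follows. It uses the literal Kafka configuration keys rather than the `ProducerConfig`/`ConsumerConfig` constants so that it compiles without the Kafka client on the classpath. The class name `KafkaTestProps`, its method names, and the `auto.offset.reset` setting are assumptions made for illustration; they are not part of the module.

```java
import java.util.Properties;

// Hypothetical helper (not part of Testcontainers): builds client properties
// from the value returned by kafka.getBootstrapServers().
class KafkaTestProps {
    private static final String STRING_SERIALIZER =
        "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties producer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    static Properties consumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        props.put("group.id", groupId);
        // Assumption: tests usually want records produced before the consumer
        // subscribed, so start reading from the earliest offset.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" stands in for kafka.getBootstrapServers().
        System.out.println(producer("localhost:9092"));
        System.out.println(consumer("localhost:9092", "test-group"));
    }
}
```

The returned `Properties` objects can be passed directly to the `KafkaProducer` and `KafkaConsumer` constructors.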

Confluent Kafka Container

import org.testcontainers.kafka.ConfluentKafkaContainer;

try (ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")) {
    kafka.start();

    String bootstrapServers = kafka.getBootstrapServers();
    // Use bootstrapServers with Kafka clients
}

Architecture

The Kafka module provides three container classes supporting different Kafka distributions:

  • org.testcontainers.kafka.KafkaContainer: Modern container for Apache Kafka images (apache/kafka, apache/kafka-native). Uses KRaft mode by default, eliminating Zookeeper dependency.

  • org.testcontainers.kafka.ConfluentKafkaContainer: Modern container for Confluent Platform images (confluentinc/cp-kafka version 7.4.0+). Uses KRaft mode by default.

  • org.testcontainers.containers.KafkaContainer: Deprecated legacy container for Confluent Platform images. Supports embedded Zookeeper, external Zookeeper, or KRaft mode (7.0.0+).

All containers extend GenericContainer from the Testcontainers core library, inheriting lifecycle management, network configuration, and container customization capabilities.

Default Configuration (modern containers):

  • Single-node broker with KRaft consensus (no Zookeeper)
  • Exposed port: 9092
  • Internal listeners: PLAINTEXT, BROKER, CONTROLLER
  • Replication factor: 1 (suitable for testing)
  • Auto-cleanup on container close

Capabilities

Apache Kafka Container

Container implementation for official Apache Kafka images providing single-node broker setup with KRaft mode and customizable network listeners.

public class KafkaContainer extends GenericContainer<KafkaContainer> {
    /**
     * Create a Kafka container with the specified Docker image name.
     *
     * @param imageName Docker image name (e.g., "apache/kafka-native:3.8.0")
     */
    public KafkaContainer(String imageName);

    /**
     * Create a Kafka container with the specified DockerImageName.
     *
     * @param dockerImageName DockerImageName object
     */
    public KafkaContainer(DockerImageName dockerImageName);

    /**
     * Add a listener in format host:port for connections within the same container network.
     * The host will be included as a network alias.
     *
     * @param listener Listener in format "host:port"
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withListener(String listener);

    /**
     * Add a listener with custom advertised listener for external access.
     *
     * @param listener Listener in format "host:port"
     * @param advertisedListener Supplier providing the advertised listener address
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withListener(String listener, Supplier<String> advertisedListener);

    /**
     * Get the bootstrap servers connection string for Kafka clients.
     *
     * @return Bootstrap servers in format "host:port"
     */
    public String getBootstrapServers();
}

Supported Images:

  • apache/kafka
  • apache/kafka-native

Default Port: 9092
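The `withListener` methods take the listener as `host:port`, and the host part becomes a network alias. The sketch below shows how such a string splits into its parts, which can be handy for validating listener values in test fixtures before they reach the container. `ListenerAddress` is a hypothetical helper written for this illustration; it is not how the module parses listeners internally.

```java
// Hypothetical helper illustrating the "host:port" listener format.
final class ListenerAddress {
    final String host; // the part that would serve as the network alias
    final int port;

    private ListenerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static ListenerAddress parse(String listener) {
        int colon = listener.lastIndexOf(':');
        // Reject strings with no colon, an empty host, or an empty port.
        if (colon <= 0 || colon == listener.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + listener);
        }
        String host = listener.substring(0, colon);
        // NumberFormatException is a subclass of IllegalArgumentException.
        int port = Integer.parseInt(listener.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new ListenerAddress(host, port);
    }

    public static void main(String[] args) {
        ListenerAddress address = ListenerAddress.parse("kafka:19092");
        System.out.println(address.host + " -> " + address.port); // prints "kafka -> 19092"
    }
}
```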

Apache Kafka Container Details

Confluent Kafka Container

Container implementation for Confluent Platform Kafka images providing single-node broker setup with KRaft mode and customizable network listeners.

public class ConfluentKafkaContainer extends GenericContainer<ConfluentKafkaContainer> {
    /**
     * Create a Confluent Kafka container with the specified Docker image name.
     *
     * @param imageName Docker image name (e.g., "confluentinc/cp-kafka:7.4.0")
     */
    public ConfluentKafkaContainer(String imageName);

    /**
     * Create a Confluent Kafka container with the specified DockerImageName.
     *
     * @param dockerImageName DockerImageName object
     */
    public ConfluentKafkaContainer(DockerImageName dockerImageName);

    /**
     * Add a listener in format host:port for connections within the same container network.
     * The host will be included as a network alias.
     *
     * @param listener Listener in format "host:port"
     * @return This ConfluentKafkaContainer instance for chaining
     */
    public ConfluentKafkaContainer withListener(String listener);

    /**
     * Add a listener with custom advertised listener for external access.
     *
     * @param listener Listener in format "host:port"
     * @param advertisedListener Supplier providing the advertised listener address
     * @return This ConfluentKafkaContainer instance for chaining
     */
    public ConfluentKafkaContainer withListener(String listener, Supplier<String> advertisedListener);

    /**
     * Get the bootstrap servers connection string for Kafka clients.
     *
     * @return Bootstrap servers in format "host:port"
     */
    public String getBootstrapServers();
}

Supported Images:

  • confluentinc/cp-kafka (version 7.4.0 and later)

Default Port: 9092

Confluent Kafka Container Details

Legacy Kafka Container (Deprecated)

Legacy container implementation for Confluent Platform images with support for Zookeeper and KRaft modes. This class is deprecated and should not be used in new projects.

/**
 * @deprecated Use org.testcontainers.kafka.ConfluentKafkaContainer
 * or org.testcontainers.kafka.KafkaContainer instead
 */
@Deprecated
public class KafkaContainer extends GenericContainer<KafkaContainer> {
    /**
     * Create a Kafka container with the specified DockerImageName.
     *
     * @param dockerImageName DockerImageName object
     */
    public KafkaContainer(DockerImageName dockerImageName);

    /**
     * Configure to use embedded Zookeeper (default mode).
     * Cannot be used with KRaft mode.
     *
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withEmbeddedZookeeper();

    /**
     * Configure to use external Zookeeper.
     * Cannot be used with KRaft mode.
     *
     * @param connectString Zookeeper connection string
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withExternalZookeeper(String connectString);

    /**
     * Enable KRaft mode (requires Confluent Platform 7.0.0 or above).
     * Cannot be used with Zookeeper configuration.
     *
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withKraft();

    /**
     * Set custom cluster ID for KRaft mode.
     *
     * @param clusterId Cluster ID string
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withClusterId(String clusterId);

    /**
     * Add a listener supplier for custom network access.
     *
     * @param listenerSupplier Supplier providing listener in format "host:port"
     * @return This KafkaContainer instance for chaining
     */
    public KafkaContainer withListener(Supplier<String> listenerSupplier);

    /**
     * Get the bootstrap servers connection string for Kafka clients.
     *
     * @return Bootstrap servers in format "PLAINTEXT://host:port"
     */
    public String getBootstrapServers();
}

Public Constants:

public static final int KAFKA_PORT = 9093;
public static final int ZOOKEEPER_PORT = 2181;
public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

Supported Images:

  • confluentinc/cp-kafka

Default Ports:

  • Kafka: 9093
  • Zookeeper: 2181 (when embedded)

Legacy Kafka Container Details

Common Patterns

Using Custom Network Listeners

When testing with multiple containers or proxy access patterns:

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.KafkaContainer;

try (
    Network network = Network.newNetwork();
    KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
        .withListener("kafka:19092")
        .withNetwork(network);
    // "my-app:latest" is a placeholder for the application image under test
    GenericContainer<?> consumerApp = new GenericContainer<>("my-app:latest")
        .withNetwork(network)
) {
    kafka.start();
    consumerApp.start();

    // Consumer app can connect to kafka:19092 within the network
}

SASL Authentication Configuration

Both modern and legacy containers support SASL authentication through environment variables:

import org.testcontainers.kafka.ConfluentKafkaContainer;

// Example credentials only: the broker logs in as "admin" and accepts users "admin" and "test"
String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
    + "username=\"admin\" password=\"admin\" "
    + "user_admin=\"admin\" user_test=\"secret\";";

ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
        "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")
    .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", jaasConfig)
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", jaasConfig);
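A client that connects to a broker configured this way needs matching SASL settings. The sketch below builds a client-side JAAS entry for the PLAIN mechanism and the corresponding client properties, using the literal Kafka client configuration keys. The helper names and the `test`/`secret` credentials are placeholder assumptions; the broker-side JAAS configuration must accept whichever user the client presents.

```java
import java.util.Properties;

// Hypothetical helper for tests that talk to a SASL/PLAIN-enabled broker.
class SaslPlainClientConfig {
    // Builds the client-side JAAS entry for Kafka's PlainLoginModule.
    static String jaasConfig(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password);
    }

    static Properties clientProps(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // SASL over an unencrypted connection, matching the listener map above.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" stands in for kafka.getBootstrapServers().
        Properties props = clientProps("localhost:9092", "test", "secret");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

These properties are meant to be merged with the serializer or deserializer settings before constructing a producer or consumer.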

Types

DockerImageName

/**
 * Docker image name with optional registry, repository, and tag.
 * Part of Testcontainers core library.
 */
public class DockerImageName {
    /**
     * Parse a Docker image name from a string.
     *
     * @param imageName Image name string (e.g., "apache/kafka:3.8.0")
     * @return DockerImageName instance
     */
    public static DockerImageName parse(String imageName);
}
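To make the parts of an image name concrete, here is a rough plain-Java sketch that separates the repository (including any registry prefix) from the tag. `ImageNameParts` is a hypothetical class for illustration only. It ignores digests (`@sha256:...`) and assumes Docker's usual `latest` default when no tag is given; real tests should rely on `DockerImageName.parse`.

```java
// Hypothetical illustration of image name structure, not the library's parser.
final class ImageNameParts {
    final String repository; // may include a registry, e.g. "localhost:5000/kafka"
    final String tag;

    private ImageNameParts(String repository, String tag) {
        this.repository = repository;
        this.tag = tag;
    }

    static ImageNameParts of(String imageName) {
        // A colon before the last slash belongs to a registry port, not a tag,
        // so only search for the tag separator after the last slash.
        int lastSlash = imageName.lastIndexOf('/');
        int tagColon = imageName.indexOf(':', lastSlash + 1);
        if (tagColon < 0) {
            return new ImageNameParts(imageName, "latest");
        }
        return new ImageNameParts(
            imageName.substring(0, tagColon),
            imageName.substring(tagColon + 1));
    }

    public static void main(String[] args) {
        ImageNameParts parts = ImageNameParts.of("apache/kafka:3.8.0");
        System.out.println(parts.repository + " @ " + parts.tag); // prints "apache/kafka @ 3.8.0"
    }
}
```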

GenericContainer (Inherited Methods)

All Kafka containers extend GenericContainer<SELF> and inherit lifecycle and configuration methods:

public class GenericContainer<SELF extends GenericContainer<SELF>> {
    // Lifecycle methods
    public void start();
    public void stop();
    public void close(); // AutoCloseable

    // Configuration methods
    public SELF withEnv(String key, String value);
    public SELF withEnv(Map<String, String> env);
    public SELF withNetwork(Network network);
    public SELF withNetworkAliases(String... aliases);
    public SELF withExposedPorts(Integer... ports);
    public SELF withCommand(String... command);
    public SELF withCopyFileToContainer(MountableFile mountableFile, String containerPath);
    public SELF waitingFor(WaitStrategy waitStrategy);

    // Query methods
    public String getHost();
    public Integer getMappedPort(int originalPort);
    public String getContainerId();
    public String getDockerImageName();
    public Boolean isRunning();

    // Execution methods
    public ExecResult execInContainer(String... command) throws IOException, InterruptedException;
    public void copyFileToContainer(Transferable transferable, String containerPath);
}

Network

/**
 * Docker network for connecting multiple containers.
 * Part of Testcontainers core library.
 */
public interface Network extends AutoCloseable {
    /**
     * Create a new Docker network.
     *
     * @return Network instance
     */
    public static Network newNetwork();

    public void close();
}

Supplier

/**
 * Java functional interface for lazy value provision.
 * Part of java.util.function package.
 */
@FunctionalInterface
public interface Supplier<T> {
    T get();
}
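The two-argument `withListener` takes a `Supplier<String>` instead of a plain string, presumably because the advertised address may not be known when the container is configured, for example when it depends on a proxy port assigned later. The sketch below demonstrates that deferred evaluation in plain Java. The `proxyPort` holder, the `localhost` host, and the port value are stand-ins, and no container is involved.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyAdvertisedListener {
    // Returns a supplier that reads the port only when get() is called.
    static Supplier<String> advertisedListener(String host, AtomicInteger port) {
        return () -> host + ":" + port.get();
    }

    public static void main(String[] args) {
        // Stand-in for a port that is only assigned after another component starts.
        AtomicInteger proxyPort = new AtomicInteger(-1);

        // The supplier captures the holder, not its current value.
        Supplier<String> advertised = advertisedListener("localhost", proxyPort);

        proxyPort.set(32768); // the port becomes known later

        // Evaluated only now, so it sees the assigned port.
        System.out.println(advertised.get()); // prints "localhost:32768"
    }
}
```

Passing a string built eagerly at configuration time would have captured the placeholder value instead.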

Migration Guide

To migrate from the deprecated org.testcontainers.containers.KafkaContainer to the modern containers:

  1. Update import statements:
// Old (deprecated)
import org.testcontainers.containers.KafkaContainer;

// New - for Confluent Platform
import org.testcontainers.kafka.ConfluentKafkaContainer;

// New - for Apache Kafka
import org.testcontainers.kafka.KafkaContainer;
  2. Update bootstrap servers handling:
// Old (deprecated) - includes protocol prefix
String servers = kafka.getBootstrapServers(); // "PLAINTEXT://host:9093"

// New - no protocol prefix
String servers = kafka.getBootstrapServers(); // "host:9092"
  3. Update port references:
// Old (deprecated)
int port = KafkaContainer.KAFKA_PORT; // 9093

// New
int port = 9092; // Default port for new containers
  4. Update listener configuration:
// Old (deprecated) - uses Supplier
kafka.withListener(() -> "kafka:19092");

// New - direct string for simple cases
kafka.withListener("kafka:19092");

// New - with advertised listener for external access
kafka.withListener("kafka:19092", () -> proxyHost + ":" + proxyPort);
  5. Remove Zookeeper configuration (new containers use KRaft by default):
// Old (deprecated) - Zookeeper options
kafka.withEmbeddedZookeeper();
kafka.withExternalZookeeper("zookeeper:2181");

// New - no Zookeeper configuration needed, KRaft is default
// Just create and start the container
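During a gradual migration, some test code may still receive the legacy `PLAINTEXT://host:port` form while other code parses or compares plain `host:port` values. A minimal sketch of a normalizer that accepts either form follows. `BootstrapServers.stripProtocol` is a hypothetical helper, not part of the module, and it handles comma-separated lists on the assumption that several addresses may be passed together.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper: converts "PLAINTEXT://host:port" entries to "host:port".
class BootstrapServers {
    static String stripProtocol(String bootstrapServers) {
        return Arrays.stream(bootstrapServers.split(","))
            .map(String::trim)
            .map(entry -> {
                int idx = entry.indexOf("://");
                // Entries without a protocol prefix pass through unchanged.
                return idx >= 0 ? entry.substring(idx + 3) : entry;
            })
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(stripProtocol("PLAINTEXT://localhost:9093")); // prints "localhost:9093"
        System.out.println(stripProtocol("localhost:9092"));             // prints "localhost:9092"
    }
}
```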