or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

confluent-kafka-container.mdindex.mdkafka-container.mdlegacy-kafka-container.md
tile.json

kafka-container.mddocs/

Apache Kafka Container

Container implementation for official Apache Kafka images (apache/kafka, apache/kafka-native) providing automated single-node broker management with KRaft consensus mode.

Capabilities

Container Creation

Create a Kafka container with Apache Kafka images. The container uses KRaft mode by default, eliminating the need for Zookeeper.

/**
 * Create a Kafka container with the specified Docker image name.
 * Supported images: apache/kafka, apache/kafka-native
 *
 * @param imageName Docker image name as string (e.g., "apache/kafka-native:3.8.0")
 * @throws IllegalArgumentException if image is not compatible with apache/kafka or apache/kafka-native
 */
public KafkaContainer(String imageName);

/**
 * Create a Kafka container with the specified DockerImageName.
 * Supported images: apache/kafka, apache/kafka-native
 *
 * @param dockerImageName DockerImageName object
 * @throws IllegalArgumentException if image is not compatible with apache/kafka or apache/kafka-native
 */
public KafkaContainer(DockerImageName dockerImageName);

Usage:

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Using string
KafkaContainer kafka1 = new KafkaContainer("apache/kafka-native:3.8.0");

// Using DockerImageName
DockerImageName imageName = DockerImageName.parse("apache/kafka:latest");
KafkaContainer kafka2 = new KafkaContainer(imageName);

Bootstrap Servers

Get the connection string for Kafka clients to connect to the broker.

/**
 * Get the bootstrap servers connection string for Kafka clients.
 * Returns the mapped host and port that external clients should use.
 *
 * @return Bootstrap servers in format "host:port" (e.g., "localhost:54321")
 */
public String getBootstrapServers();

Usage:

import org.testcontainers.kafka.KafkaContainer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
    kafka.start();

    String bootstrapServers = kafka.getBootstrapServers();

    // Use with Kafka producer
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");

    // Create producer with bootstrap servers
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
}

Simple Network Listener

Add a custom listener for connections within the same Docker network. The host portion becomes a network alias that other containers can use to connect.

/**
 * Add a listener in format host:port for connections within the same container network.
 * The host will be included as a network alias, allowing other containers in the same
 * network to connect using this hostname.
 *
 * The listener is added to the default listeners:
 * - 0.0.0.0:9092 (PLAINTEXT)
 * - 0.0.0.0:9093 (BROKER)
 * - 0.0.0.0:9094 (CONTROLLER)
 *
 * The advertised listener will use the same host:port as the listener.
 *
 * @param listener Listener in format "host:port" (e.g., "kafka:19092")
 * @return This KafkaContainer instance for method chaining
 */
public KafkaContainer withListener(String listener);

Usage:

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;

try (
    Network network = Network.newNetwork();
    KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
        .withListener("kafka:19092")
        .withNetwork(network);
    GenericContainer<?> app = new GenericContainer<>("my-consumer-app:latest")
        .withNetwork(network)
        .withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
) {
    kafka.start();
    app.start();

    // The app container can now connect to Kafka using "kafka:19092"
}

Advanced Network Listener

Add a custom listener with separate advertised listener for external access patterns such as proxy configurations or port forwarding scenarios.

/**
 * Add a listener with custom advertised listener for external access.
 * The host from the listener parameter will be included as a network alias.
 *
 * The listener is added to the default listeners:
 * - 0.0.0.0:9092 (PLAINTEXT)
 * - 0.0.0.0:9093 (BROKER)
 * - 0.0.0.0:9094 (CONTROLLER)
 *
 * The advertised listener supplier is evaluated when the container starts,
 * allowing dynamic resolution of external addresses.
 *
 * @param listener Listener in format "host:port" for internal network access
 * @param advertisedListener Supplier providing the advertised listener address in format "host:port"
 * @return This KafkaContainer instance for method chaining
 */
public KafkaContainer withListener(String listener, Supplier<String> advertisedListener);

Usage:

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.GenericContainer;
import java.util.function.Supplier;

// Using with proxy/socat for external access
try (
    Network network = Network.newNetwork();
    SocatContainer socat = new SocatContainer()
        .withNetwork(network)
        .withTarget(2000, "kafka", 19092);
    KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
        .withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
        .withNetwork(network)
) {
    socat.start();
    kafka.start();

    // External clients connect via socat proxy
    String externalBootstrap = socat.getHost() + ":" + socat.getMappedPort(2000);
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, externalBootstrap);
    // ... rest of configuration
}

Configuration Details

Default Environment Variables

The container automatically configures the following environment variables for KRaft mode:

CLUSTER_ID=4L6g3nShT-eMCtK--X86sw
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
KAFKA_PROCESS_ROLES=broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_NODE_ID=1
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9094
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0

Wait Strategy

The container waits for the following log message before considering the container ready:

.*Transitioning from RECOVERY to RUNNING.*

This ensures the broker has completed recovery and is ready to accept connections.

Exposed Ports

  • 9092: Kafka broker port (PLAINTEXT listener)

Custom Configuration

You can override environment variables or add custom configuration using inherited GenericContainer methods:

import org.testcontainers.kafka.KafkaContainer;

KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
    .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
    .withEnv("KAFKA_LOG_RETENTION_HOURS", "168")
    .withEnv("KAFKA_NUM_PARTITIONS", "3");

Inherited Capabilities

The KafkaContainer class extends GenericContainer<KafkaContainer> and inherits all lifecycle and configuration methods:

Lifecycle Management

/**
 * Start the container. Blocks until the container is ready based on wait strategy.
 */
public void start();

/**
 * Stop the container.
 */
public void stop();

/**
 * Close and remove the container. Implements AutoCloseable for use in try-with-resources.
 */
public void close();

Network Configuration

/**
 * Attach the container to a specific Docker network.
 *
 * @param network Network to attach to
 * @return This container instance for chaining
 */
public KafkaContainer withNetwork(Network network);

/**
 * Add network aliases for this container.
 *
 * @param aliases Network alias names
 * @return This container instance for chaining
 */
public KafkaContainer withNetworkAliases(String... aliases);

Environment Configuration

/**
 * Set an environment variable.
 *
 * @param key Environment variable name
 * @param value Environment variable value
 * @return This container instance for chaining
 */
public KafkaContainer withEnv(String key, String value);

/**
 * Set multiple environment variables.
 *
 * @param env Map of environment variables
 * @return This container instance for chaining
 */
public KafkaContainer withEnv(Map<String, String> env);

Container Inspection

/**
 * Get the host that this container is running on.
 *
 * @return Host address
 */
public String getHost();

/**
 * Get the mapped port on the host for a container port.
 *
 * @param originalPort Container port number
 * @return Mapped host port number
 */
public Integer getMappedPort(int originalPort);

/**
 * Get the Docker container ID.
 *
 * @return Container ID
 */
public String getContainerId();

/**
 * Check if the container is currently running.
 *
 * @return true if running, false otherwise
 */
public Boolean isRunning();

Complete Example

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.containers.Network;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaIntegrationTest {

    public void testKafkaProducerConsumer() throws Exception {
        // Create and start Kafka container
        try (KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")) {
            kafka.start();

            String bootstrapServers = kafka.getBootstrapServers();

            // Configure producer
            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

            // Create producer and send message
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>("test-topic", "key", "Hello Kafka");
                producer.send(record).get();
            }

            // Configure consumer
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

            // Create consumer and read message
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList("test-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

                // Process records
                records.forEach(record -> {
                    System.out.println("Received: " + record.value());
                });
            }
        }
    }

    public void testKafkaWithCustomNetwork() throws Exception {
        // Test with custom network and listener
        try (
            Network network = Network.newNetwork();
            KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
                .withListener("kafka:19092")
                .withNetwork(network);
            GenericContainer<?> consumerApp = new GenericContainer<>("my-consumer:latest")
                .withNetwork(network)
                .withEnv("KAFKA_BOOTSTRAP_SERVERS", "kafka:19092")
        ) {
            kafka.start();
            consumerApp.start();

            // Consumer app can now communicate with Kafka
            // using the "kafka:19092" address
        }
    }
}

Troubleshooting

Connection Refused Errors

Ensure you use getBootstrapServers() to get the correct mapped port:

// Correct
String servers = kafka.getBootstrapServers(); // e.g., "localhost:54321"

// Incorrect - port 9092 is not directly accessible
String servers = "localhost:9092"; // This will fail

Network Communication Between Containers

When connecting from another container, use withListener() and ensure both containers are on the same network:

Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
    .withListener("kafka:19092")  // Creates network alias "kafka"
    .withNetwork(network);

Image Compatibility

Only Apache Kafka images are supported. Ensure you're using:

  • apache/kafka:X.Y.Z
  • apache/kafka-native:X.Y.Z

For Confluent Platform images, use ConfluentKafkaContainer instead.