The Testcontainers Kafka module provides a specialized API for spinning up Apache Kafka containers for integration testing in Node.js applications. It supports multiple deployment modes including embedded ZooKeeper, external ZooKeeper, and KRaft (ZooKeeper-less) mode, with comprehensive security features including SSL/TLS and SASL authentication.
Required Dependencies:
- @testcontainers/kafka (this package)
- @testcontainers/core is required (provided transitively)
- testcontainers package is required (provided transitively)
- A Kafka client library (e.g., kafkajs, @confluentinc/kafka-javascript) for connecting to the container (not required by testcontainers itself)

Default Behaviors:
- Image: passed to the constructor (e.g., confluentinc/cp-kafka:7.9.1 or similar)
- Client port: 9093 (plaintext listener, automatically exposed and mapped to random host port)
- Broker port: 9092 (internal, not mapped to host)
- Controller port: 9094 (when using KRaft mode, not mapped to host)
- Host port: randomly assigned (use getMappedPort(9093) to retrieve)
- Topic auto-creation: controlled by the broker setting KAFKA_AUTO_CREATE_TOPICS_ENABLE

Threading Model:
- start() method is async and must be awaited
- stop() method is async and must be awaited
- Configuration methods (e.g., withKraft(), withSaslSslListener()) must be called before start()

Lifecycle:
- Call await container.start() before use
- The container is ready once start() resolves
- Call await container.stop() for cleanup
- Always call stop(), even on errors
- Or use await using syntax (TypeScript 5.2+) for automatic cleanup: await using container = await new KafkaContainer(image).start()
- Finish all configuration before start() is called

Exceptions:
- ContainerStartException - Container failed to start (timeout, image pull failure, etc.)
- PortBindingException - Port binding conflicts
- DockerNotAvailableException - Docker daemon not accessible
- ImagePullException - Failed to pull Docker image
- Error - KRaft mode with SASL/SSL on Confluent Platform < 7.5.0
- Error - KRaft mode on Confluent Platform < 7.0.0
- Error - Invalid certificate passphrases or malformed PKCS12 files
- TimeoutError - Health check timeout (container started but Kafka not ready)

Edge Cases:
- Slow startups can exceed the default timeout (adjust with withStartupTimeout())
- Use withReuse() to reuse containers across test runs (requires testcontainers configuration)
- Use withNetwork() for multi-container tests
- Use the restart() method for testing failover scenarios; container state is preserved
- Use the logs() method to retrieve container logs for debugging; logs are stream-based
- Use exec() to run Kafka CLI tools inside container (e.g., kafka-topics, kafka-configs)

npm install @testcontainers/kafka --save-dev

import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";

For CommonJS:

const { KafkaContainer, StartedKafkaContainer } = require("@testcontainers/kafka");

For TypeScript type annotations, you can also import configuration types:

import type { SaslSslListenerOptions } from "@testcontainers/kafka/build/kafka-container";

import { KafkaContainer } from "@testcontainers/kafka";
// Start a Kafka container with default settings
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
// Get connection details
const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];
// Use brokers with your Kafka client
// Example with kafkajs:
// const kafka = new Kafka({ brokers });

The Kafka module is built around several key components:
Core functionality for creating and configuring Kafka containers with various deployment modes and settings.
class KafkaContainer extends GenericContainer {
  constructor(image: string);
  withZooKeeper(host: string, port: number): this;
  withKraft(): this;
  withSaslSslListener(options: SaslSslListenerOptions): this;
  start(): Promise<StartedKafkaContainer>;
}

Constructor Parameters:
image: Docker image name (e.g., "confluentinc/cp-kafka:7.9.1")
Configuration Methods:
withZooKeeper(host: string, port: number): this
Configure Kafka to use an external ZooKeeper instance. Useful when running ZooKeeper in a separate container or when you need control over ZooKeeper configuration.
- host: ZooKeeper hostname (typically a container alias when using Docker networks)
- port: ZooKeeper port (typically 2181)

import { Network, GenericContainer } from "testcontainers";
import { KafkaContainer } from "@testcontainers/kafka";

await using network = await new Network().start();
await using zookeeper = await new GenericContainer("confluentinc/cp-zookeeper:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("zookeeper")
  .withEnvironment({ ZOOKEEPER_CLIENT_PORT: "2181" })
  .start();
// ZooKeeper has started by this point, so Kafka can now connect to it
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withZooKeeper("zookeeper", 2181)
  .start();

withKraft(): this
Enable KRaft mode for ZooKeeper-less operation. KRaft is Kafka's native consensus protocol that eliminates the need for ZooKeeper.
- Cannot be combined with withZooKeeper() (mutually exclusive)

await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withKraft()
  .start();

withSaslSslListener(options: SaslSslListenerOptions): this
Configure a secure listener with SASL authentication and SSL/TLS encryption. Enables production-like security testing scenarios.
interface SaslSslListenerOptions {
  sasl: {
    mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
    user: { name: string; password: string };
  };
  port: number;
  keystore: { content: Buffer | string | Readable; passphrase: string };
  truststore?: { content: Buffer | string | Readable; passphrase: string };
}

See SASL/SSL Configuration for detailed documentation and examples.
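To make the option shape above more concrete, here is a minimal sketch that fills in a listener configuration and derives matching client-side SASL settings. It rests on several assumptions: `SaslSslListenerSketch` is a local, simplified mirror of the interface (the real type also accepts `Buffer` and `Readable` content), `toClientSasl` is a hypothetical helper, the keystore content and credentials are placeholders, and the client is assumed to use lowercase mechanism names with `username`/`password` keys in the way kafkajs does.

```typescript
// Local, simplified mirror of the option shape shown above so the sketch
// compiles on its own. The real type also accepts Buffer and Readable content.
type ScramMechanism = "SCRAM-SHA-256" | "SCRAM-SHA-512";

interface SaslSslListenerSketch {
  sasl: { mechanism: ScramMechanism; user: { name: string; password: string } };
  port: number;
  keystore: { content: string; passphrase: string };
  truststore?: { content: string; passphrase: string };
}

// Placeholder values: a real test would load an actual PKCS12 keystore.
const listener: SaslSslListenerSketch = {
  sasl: { mechanism: "SCRAM-SHA-512", user: { name: "app-user", password: "app-secret" } },
  port: 9096,
  keystore: { content: "placeholder-not-a-real-keystore", passphrase: "changeit" },
};

// Hypothetical helper: derive client SASL settings from the listener options.
// Assumption: the client expects lowercase mechanism names (kafkajs style).
function toClientSasl(options: SaslSslListenerSketch) {
  return {
    mechanism: options.sasl.mechanism.toLowerCase() as "scram-sha-256" | "scram-sha-512",
    username: options.sasl.user.name,
    password: options.sasl.user.password,
  };
}

console.log(toClientSasl(listener).mechanism); // scram-sha-512
```

Deriving the client credentials from the same object that configures the listener keeps the two sides from drifting apart in a test suite.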
start(): Promise<StartedKafkaContainer>
Start the Kafka container and wait for it to be ready.
- Returns: StartedKafkaContainer instance for interacting with the running container

Methods for interacting with a running Kafka container, including retrieving connection details and managing the container lifecycle.
class StartedKafkaContainer extends AbstractStartedContainer {
  getHost(): string;
  getMappedPort(port: number): number;
  getFirstMappedPort(): number;
  stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>;
  restart(options?: Partial<RestartOptions>): Promise<void>;
  getId(): string;
  getName(): string;
  getNetworkNames(): string[];
  getNetworkId(networkName: string): string;
  getIpAddress(networkName: string): string;
  exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>;
  logs(opts?: { since?: number; tail?: number }): Promise<Readable>;
  [Symbol.asyncDispose](): Promise<void>;
}

Connection Methods:
getHost(): string
Get the host where the container is accessible. Typically returns "localhost" for local Docker or the Docker host IP in other environments.
getMappedPort(port: number): number
Get the host port mapped to a container port. For Kafka, the default client port is 9093.
- port: Container port number (e.g., 9093 for plaintext, 9096 for secure listener)

const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];

getFirstMappedPort(): number
Get the first mapped port. Convenient for single-port containers.
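The connection methods above are usually combined into a broker list for a Kafka client. The following is a minimal sketch of that step, assuming only the `getHost()` and `getMappedPort()` shape documented in this section. `BrokerEndpoint` and `bootstrapServers` are hypothetical names introduced for illustration and are not exported by `@testcontainers/kafka`; a stub object with a made-up mapped port stands in for a started container so the snippet runs without Docker.

```typescript
// Minimal structural type covering the two connection methods documented above.
// `BrokerEndpoint` is a hypothetical name, not a type exported by the library.
interface BrokerEndpoint {
  getHost(): string;
  getMappedPort(port: number): number;
}

// Build the `host:port` list that Kafka clients expect as their broker list.
// 9093 is the plaintext client port described in this document.
function bootstrapServers(container: BrokerEndpoint, containerPort = 9093): string[] {
  return [`${container.getHost()}:${container.getMappedPort(containerPort)}`];
}

// Stub standing in for a StartedKafkaContainer; the mapped port is made up.
const stub: BrokerEndpoint = {
  getHost: () => "localhost",
  getMappedPort: (port) => (port === 9093 ? 55001 : -1),
};

console.log(bootstrapServers(stub)); // [ 'localhost:55001' ]
```

Because the helper depends on a structural type, a real `StartedKafkaContainer` should be accepted in place of the stub without any adapter.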
Lifecycle Methods:
stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>
Stop the running Kafka container.
- options.timeout: Timeout in milliseconds for stop operation
- options.remove: Whether to remove container after stopping (default: true)
- options.removeVolumes: Whether to remove volumes after stopping (default: true)
- Called automatically when the container is declared with await using syntax

restart(options?: Partial<RestartOptions>): Promise<void>
Restart the Kafka container. Useful for testing failover scenarios.
- options.timeout: Timeout in milliseconds for restart operation

[Symbol.asyncDispose](): Promise<void>
Enables automatic cleanup using await using syntax (TypeScript 5.2+):

await using container = await new KafkaContainer(image).start();
// Container automatically stopped when scope exits

- Equivalent to calling stop()

Container Information:
getId(): string
Get the Docker container ID.
getName(): string
Get the container name.
- Can be set explicitly with withName()

Network Methods:
getNetworkNames(): string[]
Get all network names the container is connected to.
getNetworkId(networkName: string): string
Get the Docker network ID for a given network name.
- networkName: Name of the network

getIpAddress(networkName: string): string
Get the container's IP address on a specific Docker network. Useful for container-to-container communication.
- networkName: Name of the network

Diagnostics:
exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>
Execute a command inside the running container.
- command: Command string or array of command parts
- opts.workingDir: Working directory for command execution
- opts.user: User to run command as
- opts.env: Environment variables for command
- Returns: ExecResult with output, stdout, stderr, exitCode
- Useful for running Kafka CLI tools (e.g., kafka-topics, kafka-configs, kafka-console-producer)
- Inside the container, address the broker through its internal listener (localhost:9092)

const result = await container.exec(["kafka-topics", "--list", "--bootstrap-server", "localhost:9092"]);
console.log(result.output);

logs(opts?: { since?: number; tail?: number }): Promise<Readable>
Get container logs as a readable stream.
- since: Unix timestamp to get logs from
- tail: Number of lines from the end to retrieve

const logs = await container.logs({ tail: 100 });
logs.on('data', line => console.log(line.toString()));

KafkaContainer inherits extensive configuration methods from the base GenericContainer class.
// Network configuration
withNetwork(network: StartedNetwork): this;
withNetworkMode(networkMode: string): this;
withNetworkAliases(...networkAliases: string[]): this;
withExtraHosts(extraHosts: ExtraHost[]): this;
// Port configuration
withExposedPorts(...ports: PortWithOptionalBinding[]): this;
// Environment and startup
withEnvironment(environment: Environment): this;
withCommand(command: string[]): this;
withEntrypoint(entrypoint: string[]): this;
withWorkingDir(workingDir: string): this;
// Container identity
withName(name: string): this;
withLabels(labels: Labels): this;
withHostname(hostname: string): this;
// Storage
withBindMounts(bindMounts: BindMount[]): this;
withTmpFs(tmpFs: TmpFs): this;
withCopyFilesToContainer(filesToCopy: FileToCopy[]): this;
withCopyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): this;
withCopyContentToContainer(contentsToCopy: ContentToCopy[]): this;
// Lifecycle
withHealthCheck(healthCheck: HealthCheck): this;
withStartupTimeout(startupTimeoutMs: number): this;
withWaitStrategy(waitStrategy: WaitStrategy): this;
withReuse(): this;
withAutoRemove(autoRemove: boolean): this;
// Security and permissions
withUser(user: string): this;
withPrivilegedMode(): this;
withAddedCapabilities(...capabilities: string[]): this;
withDroppedCapabilities(...capabilities: string[]): this;
// Resources
withResourcesQuota(quota: ResourcesQuota): this;
withSharedMemorySize(bytes: number): this;
withUlimits(ulimits: Ulimits): this;
// Advanced
withPlatform(platform: string): this;
withPullPolicy(pullPolicy: ImagePullPolicy): this;
withIpcMode(ipcMode: string): this;
withDefaultLogDriver(): this;
withLogConsumer(logConsumer: (stream: Readable) => unknown): this;

Common Inherited Methods:
withNetwork(network: StartedNetwork): this
Connect the container to a Docker network for container-to-container communication.
- network: Started network instance

await using network = await new Network().start();
await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .start();

withNetworkAliases(...networkAliases: string[]): this
Set network aliases for the container. Other containers on the same network can connect using these aliases.
- networkAliases: One or more alias names

await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .withNetworkAliases("kafka", "broker")
  .start();

withEnvironment(environment: Environment): this
Set environment variables in the container. Type: Record<string, string>.
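Because every value in the environment map must be a string, numeric and boolean broker settings have to be converted before they are passed in. The sketch below shows one way to do that. `BrokerSettings` and `toKafkaEnvironment` are hypothetical names introduced for illustration, and the three variables covered are simply the ones that appear in this document's examples.

```typescript
// Hypothetical typed view of a few broker settings used in this document.
interface BrokerSettings {
  autoCreateTopics?: boolean;
  numPartitions?: number;
  defaultReplicationFactor?: number;
}

// Convert typed settings into the string-only map that withEnvironment() accepts.
// Settings left undefined are omitted so the image defaults stay in effect.
function toKafkaEnvironment(settings: BrokerSettings): Record<string, string> {
  const env: Record<string, string> = {};
  if (settings.autoCreateTopics !== undefined) {
    env["KAFKA_AUTO_CREATE_TOPICS_ENABLE"] = String(settings.autoCreateTopics);
  }
  if (settings.numPartitions !== undefined) {
    env["KAFKA_NUM_PARTITIONS"] = String(settings.numPartitions);
  }
  if (settings.defaultReplicationFactor !== undefined) {
    env["KAFKA_DEFAULT_REPLICATION_FACTOR"] = String(settings.defaultReplicationFactor);
  }
  return env;
}

console.log(toKafkaEnvironment({ autoCreateTopics: true, numPartitions: 3 }));
```

The returned object could then be handed to `withEnvironment()` unchanged.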
- environment: Object with environment variable key-value pairs
- Commonly used for Kafka broker settings (e.g., KAFKA_AUTO_CREATE_TOPICS_ENABLE, KAFKA_NUM_PARTITIONS)

await using kafka = await new KafkaContainer(image)
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1"
  })
  .start();

withReuse(): this
Enable container reuse across test runs. The container won't be stopped after tests complete, and subsequent runs will reuse the same container if the configuration matches.
- Requires reuse to be enabled in the testcontainers configuration (.testcontainers.properties or environment variables)

await using kafka = await new KafkaContainer(image)
  .withReuse()
  .start();

withStartupTimeout(startupTimeoutMs: number): this
Set the maximum time to wait for the container to start (in milliseconds).
- startupTimeoutMs: Timeout in milliseconds

await using kafka = await new KafkaContainer(image)
  .withStartupTimeout(120000) // 2 minutes
  .start();

withLogConsumer(logConsumer: (stream: Readable) => unknown): this
Attach a log consumer to capture container logs.
- logConsumer: Function that receives a Readable stream

await using kafka = await new KafkaContainer(image)
  .withLogConsumer(stream => {
    stream.on('data', line => console.log(line.toString()));
  })
  .start();

// SASL/SSL Configuration Types
interface SaslSslListenerOptions {
  sasl: SaslOptions;
  port: number;
  keystore: PKCS12CertificateStore;
  truststore?: PKCS12CertificateStore;
}
interface SaslOptions {
  mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
  user: User;
}
interface User {
  name: string;
  password: string;
}
interface PKCS12CertificateStore {
  content: Buffer | string | Readable;
  passphrase: string;
}

// Container Lifecycle Types
interface StopOptions {
  timeout: number;
  remove: boolean;
  removeVolumes: boolean;
}
interface RestartOptions {
  timeout: number;
}
interface StoppedTestContainer {
  getId(): string;
  copyArchiveFromContainer(path: string): Promise<NodeJS.ReadableStream>;
}

// Execution Types
interface ExecOptions {
  workingDir: string;
  user: string;
  env: Environment;
}
interface ExecResult {
  output: string;
  stdout: string;
  stderr: string;
  exitCode: number;
}

// Configuration Types
type Environment = { [key: string]: string };
type Labels = { [key: string]: string };
type PortWithOptionalBinding = number | `${number}/${"tcp" | "udp"}` | PortWithBinding;
interface PortWithBinding {
  container: number;
  host?: number;
  protocol?: "tcp" | "udp";
}
interface ExtraHost {
  host: string;
  ipAddress: string;
}

// Storage Types
type BindMode = "rw" | "ro" | "z" | "Z";
interface BindMount {
  source: string;
  target: string;
  mode?: BindMode;
}
interface FileToCopy {
  source: string;
  target: string;
  mode?: number;
}
type DirectoryToCopy = FileToCopy;
interface ContentToCopy {
  content: Buffer | string | Readable;
  target: string;
  mode?: number;
}
type TmpFs = { [dir: string]: string };

// Health and Wait Types
interface HealthCheck {
  test: ["CMD-SHELL", string] | ["CMD", ...string[]];
  interval?: number;
  timeout?: number;
  retries?: number;
  startPeriod?: number;
}
interface WaitStrategy {
  waitUntilReady(container: unknown, boundPorts: unknown, startTime?: Date): Promise<void>;
  withStartupTimeout(startupTimeoutMs: number): WaitStrategy;
  isStartupTimeoutSet(): boolean;
}

// Resource Types
interface ResourcesQuota {
  memory?: number; // Memory limit in Gigabytes
  cpu?: number; // CPU quota in units of CPUs
}
type Ulimits = { [name: string]: { hard: number | undefined; soft: number | undefined } };

// Pull Policy Type
interface ImagePullPolicy {
  shouldPull(): boolean;
}

// Network Types
interface StartedNetwork {
  getName(): string;
  getId(): string;
  stop(): Promise<StoppedNetwork>;
}
interface StoppedNetwork {
  getName(): string;
  getId(): string;
}

// Stream Type (from Node.js)
// Readable is imported from "stream" module
type Readable = NodeJS.ReadableStream;

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withReuse()
  .start();

await using network = await new Network().start();
// Start Kafka first so the broker is already running when the application boots
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("kafka")
  .start();
await using app = await new GenericContainer("my-app")
  .withNetwork(network)
  .withNetworkAliases("app")
  .withEnvironment({ KAFKA_BROKERS: "kafka:9092" })
  .start();

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1",
    KAFKA_LOG_RETENTION_HOURS: "1"
  })
  .start();

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});
const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
// The KafkaJS-compatible client is exposed under the KafkaJS namespace
const client = new KafkaJS.Kafka({
  kafkaJS: {
    brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
  },
});
const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
  })
  .start();
const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});
const admin = client.admin();
await admin.connect();
// Create topics
await admin.createTopics({
  topics: [
    { topic: "topic-1", numPartitions: 3 },
    { topic: "topic-2", numPartitions: 1 },
  ],
});
await admin.disconnect();

let container: StartedKafkaContainer | undefined;
try {
  container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
  // Use container for tests
} catch (error) {
  console.error("Failed to start Kafka container:", error);
  throw error;
} finally {
  if (container) {
    await container.stop();
  }
}

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
// List topics
const listResult = await kafka.exec([
  "kafka-topics",
  "--list",
  "--bootstrap-server",
  "localhost:9092",
]);
console.log(listResult.output);
// Create topic
await kafka.exec([
  "kafka-topics",
  "--create",
  "--bootstrap-server",
  "localhost:9092",
  "--topic",
  "my-topic",
  "--partitions",
  "3",
  "--replication-factor",
  "1",
]);
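
The `exec()` calls above return raw text, so tests usually need to turn the `--list` output into something they can assert on. Below is a minimal sketch of that step, assuming the command prints one topic name per line with no warnings interleaved. `ExecResultLike` and `parseTopicList` are hypothetical names, and the sample result is hand-written rather than captured from a real container.

```typescript
// Structural subset of the ExecResult shape documented above.
interface ExecResultLike {
  output: string;
  exitCode: number;
}

// Turn `kafka-topics --list` output into an array of topic names,
// failing loudly when the command did not exit cleanly.
function parseTopicList(result: ExecResultLike): string[] {
  if (result.exitCode !== 0) {
    throw new Error(`kafka-topics exited with code ${result.exitCode}: ${result.output}`);
  }
  return result.output
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
}

// Hand-written sample standing in for a real exec() result.
const sample: ExecResultLike = { output: "my-topic\ntest-topic\n", exitCode: 0 };
console.log(parseTopicList(sample)); // [ 'my-topic', 'test-topic' ]
```

Checking `exitCode` before parsing avoids treating an error message as a list of topic names.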