Testcontainers Kafka Module

The Testcontainers Kafka module provides a specialized API for spinning up Apache Kafka containers for integration testing in Node.js applications. It supports multiple deployment modes including embedded ZooKeeper, external ZooKeeper, and KRaft (ZooKeeper-less) mode, with comprehensive security features including SSL/TLS and SASL authentication.

Key Information for Agents

Required Dependencies:

  • @testcontainers/kafka (this package)
  • testcontainers (the core package; installed transitively as a dependency of @testcontainers/kafka)
  • Docker must be installed and running
  • Kafka client library (e.g., kafkajs, @confluentinc/kafka-javascript) for connecting to the container (not required by testcontainers itself)

Default Behaviors:

  • Default image: None. The image must be passed to the constructor (typically confluentinc/cp-kafka:7.9.1 or similar)
  • Default client port: 9093 (plaintext listener, automatically exposed and mapped to random host port)
  • Default broker port: 9092 (internal, not mapped to host)
  • Default KRaft controller port: 9094 (when using KRaft mode, not mapped to host)
  • Default mode: Embedded ZooKeeper for images < 8.0.0, KRaft mode for images >= 8.0.0
  • Health check: Automatic readiness check (waits for Kafka to be ready)
  • Startup timeout: Default from GenericContainer (typically 60 seconds, configurable)
  • Port mapping: Random available ports on host (use getMappedPort(9093) to retrieve)
  • Container reuse: Not enabled by default (new container per test)
  • Network isolation: Each container runs in isolated network by default
  • Advertised listeners: Automatically configured for container accessibility
  • Auto topic creation: Not enabled by default (configure via KAFKA_AUTO_CREATE_TOPICS_ENABLE)

Threading Model:

  • Container operations are asynchronous (return Promises)
  • start() method is async and must be awaited
  • stop() method is async and must be awaited
  • Container instances are not thread-safe for concurrent modification
  • Multiple containers can run concurrently in separate instances (each gets unique ports)
  • Client connections from Kafka clients are independent of container lifecycle
  • Container configuration methods (e.g., withKraft(), withSaslSslListener()) must be called before start()

Lifecycle:

  • Container must be started with await container.start() before use
  • Container automatically waits for Kafka to be ready before start() resolves
  • Container must be stopped with await container.stop() for cleanup
  • Always stop containers in finally blocks or use test framework cleanup hooks
  • Container stops automatically when Node.js process exits (if not explicitly stopped)
  • Failed containers may leave Docker resources; always call stop() even on errors
  • Use await using syntax (TypeScript 5.2+) for automatic cleanup: await using container = await new KafkaContainer(image).start()
  • Container configuration is immutable after start() is called
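
The cleanup rules above can be wrapped in a small helper for environments where await using is not available. The following is a minimal sketch, not part of @testcontainers/kafka: withStarted and Stoppable are hypothetical names, and Stoppable assumes only the async stop() method listed for started containers.

```typescript
// Hypothetical minimal shape: anything with an async stop(), such as a
// started Kafka container, satisfies it structurally.
interface Stoppable {
  stop(): Promise<unknown>;
}

// Starts a resource, runs the test body, and always stops the resource,
// even when the body throws. If start() itself rejects, there is nothing
// to stop yet, so the error simply propagates.
async function withStarted<C extends Stoppable, R>(
  start: () => Promise<C>,
  body: (container: C) => Promise<R>,
): Promise<R> {
  const container = await start();
  try {
    return await body(container);
  } finally {
    await container.stop();
  }
}
```

With the real module this might be called as withStarted(() => new KafkaContainer("confluentinc/cp-kafka:7.9.1").start(), async (kafka) => { /* test body */ }).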

Exceptions:

  • ContainerStartException - Container failed to start (timeout, image pull failure, etc.)
  • PortBindingException - Port binding conflicts
  • DockerNotAvailableException - Docker daemon not accessible
  • ImagePullException - Failed to pull Docker image
  • Error - KRaft mode with SASL/SSL on Confluent Platform < 7.5.0
  • Error - KRaft mode on Confluent Platform < 7.0.0
  • Error - Invalid certificate passphrases or malformed PKCS12 files
  • Network errors from Kafka clients - Connection failures, authentication errors, broker unavailable
  • TimeoutError - Health check timeout (container started but Kafka not ready)

Edge Cases:

  • Port conflicts: Testcontainers automatically finds available ports, but conflicts can occur in CI/CD environments
  • Image pull failures: Network issues or invalid image tags cause startup failures
  • Health check timeouts: Slow container startup may exceed timeout (adjust with withStartupTimeout())
  • Multiple containers: Each container gets unique ports; coordinate if testing multi-container scenarios
  • Container reuse: Enable with withReuse() to reuse containers across test runs (requires testcontainers configuration)
  • Network isolation: Containers cannot communicate by default; use withNetwork() for multi-container tests
  • Resource cleanup: Always stop containers even if tests fail; use try/finally or test framework hooks
  • Concurrent tests: Each test should use separate container instances to avoid port conflicts
  • CI/CD environments: May require Docker-in-Docker or remote Docker configuration
  • Version compatibility: Images < 8.0.0 use embedded ZooKeeper by default; images >= 8.0.0 use KRaft automatically
  • KRaft + SASL: Requires Confluent Platform >= 7.5.0; throws error if version is insufficient
  • External ZooKeeper: Must be started before Kafka container; ensure ZooKeeper is ready before starting Kafka
  • SASL/SSL certificates: Must be in PKCS12 format; PEM certificates must be converted
  • Dual listeners: When using SASL/SSL, both plaintext (9093) and secure (custom port) listeners are available
  • Advertised listeners: Automatically configured based on container host and mapped ports
  • Container restart: Use restart() method for testing failover scenarios; container state is preserved
  • Log access: Use logs() method to retrieve container logs for debugging; logs are stream-based
  • Exec commands: Use exec() to run Kafka CLI tools inside container (e.g., kafka-topics, kafka-configs)

Package Information

  • Package Name: @testcontainers/kafka
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @testcontainers/kafka --save-dev

Core Imports

import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";

For CommonJS:

const { KafkaContainer, StartedKafkaContainer } = require("@testcontainers/kafka");

For TypeScript type annotations, you can also import configuration types:

import type { SaslSslListenerOptions } from "@testcontainers/kafka/build/kafka-container";

Basic Usage

import { KafkaContainer } from "@testcontainers/kafka";

// Start a Kafka container with default settings
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

// Get connection details
const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];

// Use brokers with your Kafka client
// Example with kafkajs:
// const kafka = new Kafka({ brokers });

Architecture

The Kafka module is built around several key components:

  • KafkaContainer: Fluent builder class for configuring Kafka containers before startup
  • StartedKafkaContainer: Represents a running Kafka container with lifecycle management methods
  • Operation Modes: Supports embedded ZooKeeper (default <8.0), external ZooKeeper, and KRaft mode
  • Security Configuration: SASL/SSL authentication with PKCS12 certificate stores
  • Network Integration: Full support for Docker networks and custom hostnames
  • Version Detection: Automatic feature detection based on Confluent Platform version

Capabilities

Container Configuration

Core functionality for creating and configuring Kafka containers with various deployment modes and settings.

class KafkaContainer extends GenericContainer {
  constructor(image: string);

  withZooKeeper(host: string, port: number): this;
  withKraft(): this;
  withSaslSslListener(options: SaslSslListenerOptions): this;
  start(): Promise<StartedKafkaContainer>;
}

Constructor Parameters:

  • image: Docker image name (e.g., "confluentinc/cp-kafka:7.9.1")
    • Images >= 8.0.0 automatically enable KRaft mode
    • Images < 8.0.0 use embedded ZooKeeper by default

Configuration Methods:

withZooKeeper(host: string, port: number): this

Configure Kafka to use an external ZooKeeper instance. Useful when running ZooKeeper in a separate container or when you need control over ZooKeeper configuration.

  • host: ZooKeeper hostname (typically a container alias when using Docker networks)
  • port: ZooKeeper port (typically 2181)
  • Important: ZooKeeper must be started and ready before Kafka container starts
  • Error: If ZooKeeper is not accessible, Kafka container will fail to start

import { Network, GenericContainer } from "testcontainers";
import { KafkaContainer } from "@testcontainers/kafka";

await using network = await new Network().start();

await using zookeeper = await new GenericContainer("confluentinc/cp-zookeeper:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("zookeeper")
  .withEnvironment({ ZOOKEEPER_CLIENT_PORT: "2181" })
  .start();

// Wait for ZooKeeper to be ready before starting Kafka
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withZooKeeper("zookeeper", 2181)
  .start();

withKraft(): this

Enable KRaft mode for ZooKeeper-less operation. KRaft is Kafka's native consensus protocol that eliminates the need for ZooKeeper.

  • Requires Confluent Platform >= 7.0.0
  • Images >= 8.0.0 automatically enable KRaft, so this method is optional for newer versions
  • Throws error if image version is below 7.0.0
  • Important: Cannot be used with withZooKeeper() (mutually exclusive)
  • Performance: KRaft mode typically starts faster than ZooKeeper mode

await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withKraft()
  .start();

withSaslSslListener(options: SaslSslListenerOptions): this

Configure a secure listener with SASL authentication and SSL/TLS encryption. Enables production-like security testing scenarios.

interface SaslSslListenerOptions {
  sasl: {
    mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
    user: { name: string; password: string };
  };
  port: number;
  keystore: { content: Buffer | string | Readable; passphrase: string };
  truststore?: { content: Buffer | string | Readable; passphrase: string };
}

  • Requires PKCS12 certificate files for keystore and optional truststore
  • Supports SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms
  • When used with KRaft mode, requires Confluent Platform >= 7.5.0
  • Important: Both plaintext (9093) and secure (custom port) listeners are available
  • Error: Throws error if KRaft + SASL with version < 7.5.0
  • Error: Throws error if certificate passphrases are incorrect or files are malformed
  • Port selection: Choose a port that doesn't conflict (recommended: 9096, 9097, etc.)

See SASL/SSL Configuration for detailed documentation and examples.
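
A client connecting to the secure listener needs credentials that match the listener options. The sketch below assumes kafkajs as the client library, which names SCRAM mechanisms in lower case ("scram-sha-256", "scram-sha-512"). The helper toKafkaJsSasl and the local types are hypothetical; they mirror only the sasl part of SaslSslListenerOptions shown above, and certificate handling for the client's ssl setting is left out.

```typescript
// Local copies of the documented option shapes, so the sketch is self-contained.
type ScramMechanism = "SCRAM-SHA-256" | "SCRAM-SHA-512";

interface SaslSettings {
  mechanism: ScramMechanism;
  user: { name: string; password: string };
}

// Shape of the `sasl` entry a kafkajs client takes for SCRAM
// (assumption: kafkajs is the client library in use).
interface KafkaJsScramSasl {
  mechanism: "scram-sha-256" | "scram-sha-512";
  username: string;
  password: string;
}

// Hypothetical helper: derive client-side SASL settings from the same
// object that was passed to withSaslSslListener().
function toKafkaJsSasl(sasl: SaslSettings): KafkaJsScramSasl {
  const mechanism = sasl.mechanism === "SCRAM-SHA-256" ? "scram-sha-256" : "scram-sha-512";
  return { mechanism, username: sasl.user.name, password: sasl.user.password };
}
```

The result could then be passed as the sasl field of the kafkajs client configuration, together with whatever ssl settings fit your certificates and the mapped host port of the secure listener.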

start(): Promise<StartedKafkaContainer>

Start the Kafka container and wait for it to be ready.

  • Returns a StartedKafkaContainer instance for interacting with the running container
  • Throws error if using KRaft with SASL and version < 7.5.0
  • Automatically configures advertised listeners for container accessibility
  • Important: Must be awaited; container is not ready until Promise resolves
  • Timeout: Uses default or configured startup timeout; throws error if Kafka doesn't become ready

Started Container Operations

Methods for interacting with a running Kafka container, including retrieving connection details and managing the container lifecycle.

class StartedKafkaContainer extends AbstractStartedContainer {
  getHost(): string;
  getMappedPort(port: number): number;
  getFirstMappedPort(): number;
  stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>;
  restart(options?: Partial<RestartOptions>): Promise<void>;
  getId(): string;
  getName(): string;
  getNetworkNames(): string[];
  getNetworkId(networkName: string): string;
  getIpAddress(networkName: string): string;
  exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>;
  logs(opts?: { since?: number; tail?: number }): Promise<Readable>;
  [Symbol.asyncDispose](): Promise<void>;
}

Connection Methods:

getHost(): string

Get the host where the container is accessible. Typically returns "localhost" for local Docker or the Docker host IP in other environments.

  • Local Docker: Returns "localhost"
  • Remote Docker: Returns Docker host IP address
  • Docker networks: Use network aliases for container-to-container communication

getMappedPort(port: number): number

Get the host port mapped to a container port. For Kafka, the default client port is 9093.

  • port: Container port number (e.g., 9093 for plaintext, 9096 for secure listener)
  • Returns: Host port number (randomly assigned)
  • Important: Always use this method to get the actual host port; container ports are randomly mapped
  • Error: Throws error if port is not exposed or container is not started

const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];
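
Since the host and port lookup tends to be repeated in every test, it can be factored into a small function. This is a sketch under stated assumptions: bootstrapServers and HasMappedPorts are hypothetical names, and the interface covers only the two connection methods documented above, so a started Kafka container satisfies it structurally.

```typescript
// Hypothetical minimal view of a started container: only the two
// connection methods described above are needed.
interface HasMappedPorts {
  getHost(): string;
  getMappedPort(port: number): number;
}

// 9093 is the plaintext client port described in this document.
const KAFKA_CLIENT_PORT = 9093;

// Builds the broker list for a Kafka client running on the host machine.
// Pass a different container port (e.g. a secure listener) to target it.
function bootstrapServers(
  container: HasMappedPorts,
  containerPort: number = KAFKA_CLIENT_PORT,
): string[] {
  return [`${container.getHost()}:${container.getMappedPort(containerPort)}`];
}
```

This form is meant for clients on the host; containers on the same Docker network would use a network alias instead of the mapped port.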

getFirstMappedPort(): number

Get the first mapped port. Convenient for single-port containers.

  • Returns: First exposed port's host mapping
  • Use case: When only one port is exposed and you don't need to specify which one

Lifecycle Methods:

stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>

Stop the running Kafka container.

  • options.timeout: Timeout in milliseconds for stop operation
  • options.remove: Whether to remove container after stopping (default: true)
  • options.removeVolumes: Whether to remove volumes after stopping (default: true)
  • Important: Always call this method for cleanup, even if tests fail
  • Automatic cleanup: Container stops automatically with await using syntax

restart(options?: Partial<RestartOptions>): Promise<void>

Restart the Kafka container. Useful for testing failover scenarios.

  • options.timeout: Timeout in milliseconds for restart operation
  • State: Container state is preserved (topics, messages, etc.)
  • Use case: Testing application behavior during broker restarts

[Symbol.asyncDispose](): Promise<void>

Enables automatic cleanup using await using syntax (TypeScript 5.2+):

await using container = await new KafkaContainer(image).start();
// Container automatically stopped when scope exits

  • Automatic: No need to manually call stop()
  • Scope: Container stops when leaving the scope (function, block, etc.)
  • Error handling: Container still stops even if errors occur

Container Information:

getId(): string

Get the Docker container ID.

  • Returns: Full Docker container ID
  • Use case: Debugging, logging, container identification

getName(): string

Get the container name.

  • Returns: Container name (auto-generated or set via withName())
  • Use case: Container identification, network communication

Network Methods:

getNetworkNames(): string[]

Get all network names the container is connected to.

  • Returns: Array of network names
  • Use case: Inspecting container network configuration

getNetworkId(networkName: string): string

Get the Docker network ID for a given network name.

  • networkName: Name of the network
  • Returns: Docker network ID
  • Use case: Advanced network debugging

getIpAddress(networkName: string): string

Get the container's IP address on a specific Docker network. Useful for container-to-container communication.

  • networkName: Name of the network
  • Returns: IP address on that network
  • Use case: Direct container-to-container connections without port mapping

Diagnostics:

exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>

Execute a command inside the running container.

  • command: Command string or array of command parts
  • opts.workingDir: Working directory for command execution
  • opts.user: User to run command as
  • opts.env: Environment variables for command
  • Returns: ExecResult with output, stdout, stderr, exitCode
  • Use case: Running Kafka CLI tools (e.g., kafka-topics, kafka-configs, kafka-console-producer)
  • Important: Commands run inside container; use container-internal addresses (e.g., localhost:9092)

const result = await container.exec(["kafka-topics", "--list", "--bootstrap-server", "localhost:9092"]);
console.log(result.output);
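
Because ExecResult carries an exitCode, it seems prudent to check it rather than assume the command succeeded. The sketch below shows one way to do that; stdoutOrThrow is a hypothetical helper, and the ExecResult interface is a local copy of the fields listed in this document.

```typescript
// Mirrors the ExecResult fields listed in this document.
interface ExecResult {
  output: string;
  stdout: string;
  stderr: string;
  exitCode: number;
}

// Hypothetical helper: returns stdout on success, and throws an error
// that includes the exit code and stderr (or combined output) otherwise.
function stdoutOrThrow(result: ExecResult, description: string): string {
  if (result.exitCode !== 0) {
    const details = result.stderr || result.output;
    throw new Error(`${description} failed with exit code ${result.exitCode}: ${details}`);
  }
  return result.stdout;
}
```

It could be used as stdoutOrThrow(await container.exec([...]), "kafka-topics --list") so that a failing CLI call surfaces as a test failure.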

logs(opts?: { since?: number; tail?: number }): Promise<Readable>

Get container logs as a readable stream.

  • since: Unix timestamp to get logs from
  • tail: Number of lines from the end to retrieve
  • Returns: Readable stream of log lines
  • Use case: Debugging container startup issues, monitoring Kafka logs
  • Important: Logs are stream-based; read incrementally for large logs

const logs = await container.logs({ tail: 100 });
logs.on('data', line => console.log(line.toString()));

Inherited Container Configuration

KafkaContainer inherits extensive configuration methods from the base GenericContainer class.

// Network configuration
withNetwork(network: StartedNetwork): this;
withNetworkMode(networkMode: string): this;
withNetworkAliases(...networkAliases: string[]): this;
withExtraHosts(extraHosts: ExtraHost[]): this;

// Port configuration
withExposedPorts(...ports: PortWithOptionalBinding[]): this;

// Environment and startup
withEnvironment(environment: Environment): this;
withCommand(command: string[]): this;
withEntrypoint(entrypoint: string[]): this;
withWorkingDir(workingDir: string): this;

// Container identity
withName(name: string): this;
withLabels(labels: Labels): this;
withHostname(hostname: string): this;

// Storage
withBindMounts(bindMounts: BindMount[]): this;
withTmpFs(tmpFs: TmpFs): this;
withCopyFilesToContainer(filesToCopy: FileToCopy[]): this;
withCopyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): this;
withCopyContentToContainer(contentsToCopy: ContentToCopy[]): this;

// Lifecycle
withHealthCheck(healthCheck: HealthCheck): this;
withStartupTimeout(startupTimeoutMs: number): this;
withWaitStrategy(waitStrategy: WaitStrategy): this;
withReuse(): this;
withAutoRemove(autoRemove: boolean): this;

// Security and permissions
withUser(user: string): this;
withPrivilegedMode(): this;
withAddedCapabilities(...capabilities: string[]): this;
withDroppedCapabilities(...capabilities: string[]): this;

// Resources
withResourcesQuota(quota: ResourcesQuota): this;
withSharedMemorySize(bytes: number): this;
withUlimits(ulimits: Ulimits): this;

// Advanced
withPlatform(platform: string): this;
withPullPolicy(pullPolicy: ImagePullPolicy): this;
withIpcMode(ipcMode: string): this;
withDefaultLogDriver(): this;
withLogConsumer(logConsumer: (stream: Readable) => unknown): this;

Common Inherited Methods:

withNetwork(network: StartedNetwork): this

Connect the container to a Docker network for container-to-container communication.

  • network: Started network instance
  • Use case: Multi-container test scenarios
  • Important: Containers on same network can communicate using network aliases

await using network = await new Network().start();
await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .start();

withNetworkAliases(...networkAliases: string[]): this

Set network aliases for the container. Other containers on the same network can connect using these aliases.

  • networkAliases: One or more alias names
  • Use case: Other containers connecting to Kafka using alias and the internal broker port (e.g., "kafka:9092")
  • Important: Aliases only work within the same Docker network

await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .withNetworkAliases("kafka", "broker")
  .start();

withEnvironment(environment: Environment): this

Set environment variables in the container. Type: Record<string, string>.

  • environment: Object with environment variable key-value pairs
  • Use case: Configuring Kafka settings (e.g., KAFKA_AUTO_CREATE_TOPICS_ENABLE, KAFKA_NUM_PARTITIONS)
  • Important: All values must be strings (convert numbers/booleans to strings)

await using kafka = await new KafkaContainer(image)
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1"
  })
  .start();
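
When settings come from typed test configuration, the string conversion noted above can be done in one place. A minimal sketch, assuming a hypothetical helper named toEnvironment; it is not part of the library.

```typescript
type EnvValue = string | number | boolean;

// Hypothetical helper: converts mixed values into the string-only record
// that withEnvironment() expects.
function toEnvironment(values: Record<string, EnvValue>): Record<string, string> {
  const env: Record<string, string> = {};
  for (const [key, value] of Object.entries(values)) {
    env[key] = String(value);
  }
  return env;
}
```

For example, toEnvironment({ KAFKA_AUTO_CREATE_TOPICS_ENABLE: true, KAFKA_NUM_PARTITIONS: 3 }) yields the values "true" and "3", which can be passed straight to withEnvironment().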

withReuse(): this

Enable container reuse across test runs. The container won't be stopped after tests complete, and subsequent runs will reuse the same container if the configuration matches.

  • Use case: Faster test execution in development
  • Important: Requires testcontainers configuration file (.testcontainers.properties or environment variables)
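  • State: Container state persists across test runs (topics, messages, etc.)

// Declared with const rather than await using: disposal at scope exit
// would stop the container and defeat reuse
const kafka = await new KafkaContainer(image)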
  .withReuse()
  .start();

withStartupTimeout(startupTimeoutMs: number): this

Set the maximum time to wait for the container to start (in milliseconds).

  • startupTimeoutMs: Timeout in milliseconds
  • Default: Typically 60 seconds (from GenericContainer)
  • Use case: Slow environments (CI/CD, resource-constrained systems)
  • Error: Throws timeout error if container doesn't become ready within timeout

await using kafka = await new KafkaContainer(image)
  .withStartupTimeout(120000) // 2 minutes
  .start();

withLogConsumer(logConsumer: (stream: Readable) => unknown): this

Attach a log consumer to capture container logs.

  • logConsumer: Function that receives a Readable stream
  • Use case: Real-time log monitoring, test debugging
  • Important: Logs are stream-based; handle incrementally

await using kafka = await new KafkaContainer(image)
  .withLogConsumer(stream => {
    stream.on('data', line => console.log(line.toString()));
  })
  .start();
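
Printing every line can be noisy, so a consumer may instead keep only the lines of interest for later assertions. The following is a sketch, assuming the stream emits one "data" event per chunk of log text as in the example above; collectMatchingLines and DataStream are hypothetical names, and DataStream is a minimal structural type that a Node.js Readable is expected to satisfy.

```typescript
// Minimal structural type for the stream handed to a log consumer.
interface DataStream {
  on(event: "data", listener: (chunk: unknown) => void): unknown;
}

// Hypothetical helper: returns a consumer suitable for withLogConsumer()
// plus the array it fills with log lines that contain `needle`.
function collectMatchingLines(needle: string): {
  consumer: (stream: DataStream) => void;
  matches: string[];
} {
  const matches: string[] = [];
  const consumer = (stream: DataStream): void => {
    stream.on("data", (chunk) => {
      // Chunks may be strings or Buffers; String() handles both.
      const line = String(chunk).trimEnd();
      if (line.includes(needle)) matches.push(line);
    });
  };
  return { consumer, matches };
}
```

A test could pass consumer to withLogConsumer() and inspect matches after the scenario has run, for instance to assert that no line containing "ERROR" was logged.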

Types

// SASL/SSL Configuration Types
interface SaslSslListenerOptions {
  sasl: SaslOptions;
  port: number;
  keystore: PKCS12CertificateStore;
  truststore?: PKCS12CertificateStore;
}

interface SaslOptions {
  mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
  user: User;
}

interface User {
  name: string;
  password: string;
}

interface PKCS12CertificateStore {
  content: Buffer | string | Readable;
  passphrase: string;
}

// Container Lifecycle Types
interface StopOptions {
  timeout: number;
  remove: boolean;
  removeVolumes: boolean;
}

interface RestartOptions {
  timeout: number;
}

interface StoppedTestContainer {
  getId(): string;
  copyArchiveFromContainer(path: string): Promise<NodeJS.ReadableStream>;
}

// Execution Types
interface ExecOptions {
  workingDir: string;
  user: string;
  env: Environment;
}

interface ExecResult {
  output: string;
  stdout: string;
  stderr: string;
  exitCode: number;
}

// Configuration Types
type Environment = { [key: string]: string };

type Labels = { [key: string]: string };

type PortWithOptionalBinding = number | `${number}/${"tcp" | "udp"}` | PortWithBinding;

interface PortWithBinding {
  container: number;
  host?: number;
  protocol?: "tcp" | "udp";
}

interface ExtraHost {
  host: string;
  ipAddress: string;
}

// Storage Types
type BindMode = "rw" | "ro" | "z" | "Z";

interface BindMount {
  source: string;
  target: string;
  mode?: BindMode;
}

interface FileToCopy {
  source: string;
  target: string;
  mode?: number;
}

type DirectoryToCopy = FileToCopy;

interface ContentToCopy {
  content: Buffer | string | Readable;
  target: string;
  mode?: number;
}

type TmpFs = { [dir: string]: string };

// Health and Wait Types
interface HealthCheck {
  test: ["CMD-SHELL", string] | ["CMD", ...string[]];
  interval?: number;
  timeout?: number;
  retries?: number;
  startPeriod?: number;
}

interface WaitStrategy {
  waitUntilReady(container: unknown, boundPorts: unknown, startTime?: Date): Promise<void>;
  withStartupTimeout(startupTimeoutMs: number): WaitStrategy;
  isStartupTimeoutSet(): boolean;
}

// Resource Types
interface ResourcesQuota {
  memory?: number; // Memory limit in Gigabytes
  cpu?: number; // CPU quota in units of CPUs
}

type Ulimits = { [name: string]: { hard: number | undefined; soft: number | undefined } };

// Pull Policy Type
interface ImagePullPolicy {
  shouldPull(): boolean;
}

// Network Types
interface StartedNetwork {
  getName(): string;
  getId(): string;
  stop(): Promise<StoppedNetwork>;
}

interface StoppedNetwork {
  getName(): string;
  getId(): string;
}

// Stream Type (from Node.js)
// Readable is imported from "stream" module
type Readable = NodeJS.ReadableStream;

Version Compatibility

  • KRaft Mode: Requires Confluent Platform >= 7.0.0
  • KRaft with SASL: Requires Confluent Platform >= 7.5.0
  • Automatic KRaft: Images >= 8.0.0 automatically use KRaft mode
  • ZooKeeper Mode: Supported on all Confluent Platform versions
  • SASL/SSL with ZooKeeper: Supported on all versions that support SASL/SSL
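
The version rules above can be expressed as a small pre-flight check. This is an illustrative sketch of the rules as listed here, not the library's own version detection; parseImageVersion, atLeast, compatibilityProblems, and ModeRequest are hypothetical names, and the parser assumes the image tag is a plain major.minor.patch version.

```typescript
type Version = [number, number, number];

interface ModeRequest {
  kraft: boolean; // KRaft explicitly requested
  saslSsl: boolean; // a SASL/SSL listener is requested
}

// Parses "confluentinc/cp-kafka:7.9.1" into [7, 9, 1]. Returns undefined
// when the tag is not a plain major.minor.patch version (e.g. "latest").
function parseImageVersion(image: string): Version | undefined {
  const match = /:(\d+)\.(\d+)\.(\d+)$/.exec(image);
  if (!match) return undefined;
  return [Number(match[1]), Number(match[2]), Number(match[3])];
}

// True when `version` is greater than or equal to `required`.
function atLeast([maj, min, pat]: Version, [rMaj, rMin, rPat]: Version): boolean {
  if (maj !== rMaj) return maj > rMaj;
  if (min !== rMin) return min > rMin;
  return pat >= rPat;
}

// Returns the list of violated rules; an empty list means the combination
// is compatible according to the rules listed above.
function compatibilityProblems(image: string, request: ModeRequest): string[] {
  const version = parseImageVersion(image);
  if (!version) return [`cannot determine version from image "${image}"`];
  // Images >= 8.0.0 use KRaft even when it is not requested explicitly.
  const kraft = request.kraft || atLeast(version, [8, 0, 0]);
  const problems: string[] = [];
  if (kraft && !atLeast(version, [7, 0, 0])) {
    problems.push("KRaft mode requires Confluent Platform >= 7.0.0");
  }
  if (kraft && request.saslSsl && !atLeast(version, [7, 5, 0])) {
    problems.push("KRaft with SASL requires Confluent Platform >= 7.5.0");
  }
  return problems;
}
```

Running such a check before start() gives a readable failure message in test setup instead of an error raised during container startup.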

Common Patterns

Container Reuse for Faster Tests

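// Declared with const rather than await using: disposal at scope exit
// would stop the container and defeat reuse
const kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withReuse()
  .start();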

Custom Network with Multiple Containers

await using network = await new Network().start();

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("kafka")
  .start();

// Start the application after Kafka so the broker is reachable at startup;
// inside the network it connects to the internal broker port (9092)
await using app = await new GenericContainer("my-app")
  .withNetwork(network)
  .withNetworkAliases("app")
  .withEnvironment({ KAFKA_BROKERS: "kafka:9092" })
  .start();

Adding Custom Kafka Configuration

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1",
    KAFKA_LOG_RETENTION_HOURS: "1"
  })
  .start();

Using with kafkajs Client

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});

const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

Using with @confluentinc/kafka-javascript Client

import { KafkaJS } from "@confluentinc/kafka-javascript";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

// The KafkaJS-compatible API is exposed under the KafkaJS namespace
const client = new KafkaJS.Kafka({
  kafkaJS: {
    brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
  },
});

const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

Testing with Multiple Topics

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
  })
  .start();

const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});

const admin = client.admin();
await admin.connect();

// Create topics
await admin.createTopics({
  topics: [
    { topic: "topic-1", numPartitions: 3 },
    { topic: "topic-2", numPartitions: 1 },
  ],
});

await admin.disconnect();

Error Handling Pattern

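import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";

// Declared outside try so the finally block can see it
let container: StartedKafkaContainer | undefined;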

try {
  container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
  // Use container for tests
} catch (error) {
  console.error("Failed to start Kafka container:", error);
  throw error;
} finally {
  if (container) {
    await container.stop();
  }
}

Using Container Exec for Kafka CLI Operations

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

// List topics
const listResult = await kafka.exec([
  "kafka-topics",
  "--list",
  "--bootstrap-server",
  "localhost:9092",
]);
console.log(listResult.output);

// Create topic
await kafka.exec([
  "kafka-topics",
  "--create",
  "--bootstrap-server",
  "localhost:9092",
  "--topic",
  "my-topic",
  "--partitions",
  "3",
  "--replication-factor",
  "1",
]);
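
To assert on the result of the list command, its text output can be turned into an array of topic names. The sketch below assumes that kafka-topics --list prints one topic name per line; parseTopicList is a hypothetical helper, and any extra lines the tool writes to the same output (such as warnings) would also end up in the array.

```typescript
// Hypothetical helper: splits CLI output into non-empty, trimmed lines,
// which under the stated assumption are the topic names.
function parseTopicList(output: string): string[] {
  return output
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
}
```

A test might then check parseTopicList(listResult.output).includes("my-topic") after the create command has finished.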