tessl/npm-testcontainers--kafka

Kafka module for Testcontainers that enables developers to easily spin up Kafka containers for integration testing

Testcontainers Kafka Module

The Testcontainers Kafka module provides an API for spinning up Apache Kafka containers for integration testing in Node.js applications. It supports three deployment modes (embedded ZooKeeper, external ZooKeeper, and KRaft, which runs without ZooKeeper) and can expose a secure listener that combines SASL authentication with SSL/TLS encryption.

Key Information for Agents

Required Dependencies:

  • @testcontainers/kafka (this package)
  • @testcontainers/core is required (provided transitively)
  • testcontainers package is required (provided transitively)
  • Docker must be installed and running
  • Kafka client library (e.g., kafkajs, @confluentinc/kafka-javascript) for connecting to the container (not required by testcontainers itself)

Default Behaviors:

  • Image: No default; the image must be passed to the constructor (for example confluentinc/cp-kafka:7.9.1)
  • Default client port: 9093 (plaintext listener, automatically exposed and mapped to random host port)
  • Default broker port: 9092 (internal, not mapped to host)
  • Default KRaft controller port: 9094 (when using KRaft mode, not mapped to host)
  • Default mode: Embedded ZooKeeper for images < 8.0.0, KRaft mode for images >= 8.0.0
  • Health check: Automatic readiness check (waits for Kafka to be ready)
  • Startup timeout: Default from GenericContainer (typically 60 seconds, configurable)
  • Port mapping: Random available ports on host (use getMappedPort(9093) to retrieve)
  • Container reuse: Not enabled by default (new container per test)
  • Network isolation: Each container runs in isolated network by default
  • Advertised listeners: Automatically configured for container accessibility
  • Auto topic creation: Not enabled by default (configure via KAFKA_AUTO_CREATE_TOPICS_ENABLE)
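The port defaults above boil down to one rule: clients connect to the host returned by getHost() on whatever host port is mapped to container port 9093. A minimal sketch of that rule follows. The helper name bootstrapBrokers and the HasHostAndPorts interface are hypothetical and not part of the package; the interface only mirrors the two StartedKafkaContainer methods the sketch needs, so it can be exercised without Docker.

```typescript
// Structural type (an assumption for this sketch): just the two methods we
// need. A StartedKafkaContainer offers both.
interface HasHostAndPorts {
  getHost(): string;
  getMappedPort(containerPort: number): number;
}

// 9093 is the client (plaintext) listener port listed above.
const KAFKA_CLIENT_PORT = 9093;

// Hypothetical helper: builds the broker list that Kafka clients expect.
function bootstrapBrokers(
  container: HasHostAndPorts,
  containerPort: number = KAFKA_CLIENT_PORT,
): string[] {
  // The host port is assigned at startup, so always ask the container for it.
  return [`${container.getHost()}:${container.getMappedPort(containerPort)}`];
}
```

Passing a second argument (for example the port chosen for a SASL/SSL listener) resolves the mapping for that listener instead.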

Threading Model:

  • Container operations are asynchronous (return Promises)
  • start() method is async and must be awaited
  • stop() method is async and must be awaited
  • Container instances are not thread-safe for concurrent modification
  • Multiple containers can run concurrently in separate instances (each gets unique ports)
  • Client connections from Kafka clients are independent of container lifecycle
  • Container configuration methods (e.g., withKraft(), withSaslSslListener()) must be called before start()

Lifecycle:

  • Container must be started with await container.start() before use
  • Container automatically waits for Kafka to be ready before start() resolves
  • Container must be stopped with await container.stop() for cleanup
  • Always stop containers in finally blocks or use test framework cleanup hooks
  • Container stops automatically when Node.js process exits (if not explicitly stopped)
  • Failed containers may leave Docker resources; always call stop() even on errors
  • Use await using syntax (TypeScript 5.2+) for automatic cleanup: await using container = await new KafkaContainer(image).start()
  • Container configuration is immutable after start() is called
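Where await using is not available, the "always stop, even on errors" rule above can be captured in a small wrapper. This is a sketch under stated assumptions: withStarted, Startable, and Stoppable are hypothetical names, and the interfaces only mirror the start() and stop() signatures documented for KafkaContainer and StartedKafkaContainer.

```typescript
// Minimal structural types (assumptions for this sketch).
interface Stoppable {
  stop(): Promise<unknown>;
}

interface Startable<T extends Stoppable> {
  start(): Promise<T>;
}

// Hypothetical helper: starts the container, runs the body, and stops the
// container whether the body resolves or throws.
async function withStarted<T extends Stoppable, R>(
  container: Startable<T>,
  body: (started: T) => Promise<R>,
): Promise<R> {
  // If start() itself rejects there is nothing to stop, so it sits outside the try.
  const started = await container.start();
  try {
    return await body(started);
  } finally {
    await started.stop();
  }
}
```

A call would look like withStarted(new KafkaContainer("confluentinc/cp-kafka:7.9.1"), async (kafka) => { ... }), with the test logic inside the callback.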

Exceptions:

  • ContainerStartException - Container failed to start (timeout, image pull failure, etc.)
  • PortBindingException - Port binding conflicts
  • DockerNotAvailableException - Docker daemon not accessible
  • ImagePullException - Failed to pull Docker image
  • Error - KRaft mode with SASL/SSL on Confluent Platform < 7.5.0
  • Error - KRaft mode on Confluent Platform < 7.0.0
  • Error - Invalid certificate passphrases or malformed PKCS12 files
  • Network errors from Kafka clients - Connection failures, authentication errors, broker unavailable
  • TimeoutError - Health check timeout (container started but Kafka not ready)

Edge Cases:

  • Port conflicts: Testcontainers automatically finds available ports, but conflicts can occur in CI/CD environments
  • Image pull failures: Network issues or invalid image tags cause startup failures
  • Health check timeouts: Slow container startup may exceed timeout (adjust with withStartupTimeout())
  • Multiple containers: Each container gets unique ports; coordinate if testing multi-container scenarios
  • Container reuse: Enable with withReuse() to reuse containers across test runs (requires testcontainers configuration)
  • Network isolation: Containers cannot communicate by default; use withNetwork() for multi-container tests
  • Resource cleanup: Always stop containers even if tests fail; use try/finally or test framework hooks
  • Concurrent tests: Each test should use separate container instances to avoid port conflicts
  • CI/CD environments: May require Docker-in-Docker or remote Docker configuration
  • Version compatibility: Images < 8.0.0 use embedded ZooKeeper by default; images >= 8.0.0 use KRaft automatically
  • KRaft + SASL: Requires Confluent Platform >= 7.5.0; throws error if version is insufficient
  • External ZooKeeper: Must be started before Kafka container; ensure ZooKeeper is ready before starting Kafka
  • SASL/SSL certificates: Must be in PKCS12 format; PEM certificates must be converted
  • Dual listeners: When using SASL/SSL, both plaintext (9093) and secure (custom port) listeners are available
  • Advertised listeners: Automatically configured based on container host and mapped ports
  • Container restart: Use restart() method for testing failover scenarios; container state is preserved
  • Log access: Use logs() method to retrieve container logs for debugging; logs are stream-based
  • Exec commands: Use exec() to run Kafka CLI tools inside container (e.g., kafka-topics, kafka-configs)

Package Information

  • Package Name: @testcontainers/kafka
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @testcontainers/kafka --save-dev

Core Imports

import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";

For CommonJS:

const { KafkaContainer, StartedKafkaContainer } = require("@testcontainers/kafka");

For TypeScript type annotations, you can also import configuration types:

import type { SaslSslListenerOptions } from "@testcontainers/kafka/build/kafka-container";

Basic Usage

import { KafkaContainer } from "@testcontainers/kafka";

// Start a Kafka container with default settings
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

// Get connection details
const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];

// Use brokers with your Kafka client
// Example with kafkajs:
// const kafka = new Kafka({ brokers });

Architecture

The Kafka module is built around several key components:

  • KafkaContainer: Fluent builder class for configuring Kafka containers before startup
  • StartedKafkaContainer: Represents a running Kafka container with lifecycle management methods
  • Operation Modes: Supports embedded ZooKeeper (default <8.0), external ZooKeeper, and KRaft mode
  • Security Configuration: SASL/SSL authentication with PKCS12 certificate stores
  • Network Integration: Full support for Docker networks and custom hostnames
  • Version Detection: Automatic feature detection based on Confluent Platform version

Capabilities

Container Configuration

Core functionality for creating and configuring Kafka containers with various deployment modes and settings.

class KafkaContainer extends GenericContainer {
  constructor(image: string);

  withZooKeeper(host: string, port: number): this;
  withKraft(): this;
  withSaslSslListener(options: SaslSslListenerOptions): this;
  start(): Promise<StartedKafkaContainer>;
}

Constructor Parameters:

  • image: Docker image name (e.g., "confluentinc/cp-kafka:7.9.1")
    • Images >= 8.0.0 automatically enable KRaft mode
    • Images < 8.0.0 use embedded ZooKeeper by default

Configuration Methods:

withZooKeeper(host: string, port: number): this

Configure Kafka to use an external ZooKeeper instance. Useful when running ZooKeeper in a separate container or when you need control over ZooKeeper configuration.

  • host: ZooKeeper hostname (typically a container alias when using Docker networks)
  • port: ZooKeeper port (typically 2181)
  • Important: ZooKeeper must be started and ready before Kafka container starts
  • Error: If ZooKeeper is not accessible, Kafka container will fail to start

import { Network, GenericContainer } from "testcontainers";
import { KafkaContainer } from "@testcontainers/kafka";

await using network = await new Network().start();

await using zookeeper = await new GenericContainer("confluentinc/cp-zookeeper:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("zookeeper")
  .withEnvironment({ ZOOKEEPER_CLIENT_PORT: "2181" })
  .start();

// Wait for ZooKeeper to be ready before starting Kafka
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withZooKeeper("zookeeper", 2181)
  .start();

withKraft(): this

Enable KRaft mode for ZooKeeper-less operation. KRaft is Kafka's native consensus protocol that eliminates the need for ZooKeeper.

  • Requires Confluent Platform >= 7.0.0
  • Images >= 8.0.0 automatically enable KRaft, so this method is optional for newer versions
  • Throws error if image version is below 7.0.0
  • Important: Cannot be used with withZooKeeper() (mutually exclusive)
  • Performance: KRaft mode typically starts faster than ZooKeeper mode

await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withKraft()
  .start();

withSaslSslListener(options: SaslSslListenerOptions): this

Configure a secure listener with SASL authentication and SSL/TLS encryption. Enables production-like security testing scenarios.

interface SaslSslListenerOptions {
  sasl: {
    mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
    user: { name: string; password: string };
  };
  port: number;
  keystore: { content: Buffer | string | Readable; passphrase: string };
  truststore?: { content: Buffer | string | Readable; passphrase: string };
}

  • Requires PKCS12 certificate files for keystore and optional truststore
  • Supports SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms
  • When used with KRaft mode, requires Confluent Platform >= 7.5.0
  • Important: Both plaintext (9093) and secure (custom port) listeners are available
  • Error: Throws error if KRaft + SASL with version < 7.5.0
  • Error: Throws error if certificate passphrases are incorrect or files are malformed
  • Port selection: Choose a port that doesn't conflict (recommended: 9096, 9097, etc.)

See SASL/SSL Configuration for detailed documentation and examples.
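On the client side, the secure listener needs matching settings. The sketch below builds a plain configuration object in the shape kafkajs accepts (brokers, ssl, sasl), assuming kafkajs is the client in use. The function name saslSslClientConfig and its parameter object are hypothetical, and passing the CA certificate as a PEM string is an assumption about how the test certificates are stored. The one non-obvious detail is that kafkajs spells the mechanism names in lowercase.

```typescript
// Mechanism names as accepted by withSaslSslListener (see SaslSslListenerOptions).
type BrokerMechanism = "SCRAM-SHA-256" | "SCRAM-SHA-512";
// kafkajs uses lowercase names for the same mechanisms.
type ClientMechanism = "scram-sha-256" | "scram-sha-512";

interface SaslSslClientConfig {
  brokers: string[];
  ssl: { ca: string[] };
  sasl: { mechanism: ClientMechanism; username: string; password: string };
}

const CLIENT_MECHANISM: Record<BrokerMechanism, ClientMechanism> = {
  "SCRAM-SHA-256": "scram-sha-256",
  "SCRAM-SHA-512": "scram-sha-512",
};

// Hypothetical helper: derives client settings from the listener settings.
function saslSslClientConfig(params: {
  host: string;
  mappedSecurePort: number; // result of getMappedPort(<secure listener port>)
  mechanism: BrokerMechanism;
  user: { name: string; password: string };
  caPem: string; // assumption: CA certificate available as PEM text
}): SaslSslClientConfig {
  return {
    brokers: [`${params.host}:${params.mappedSecurePort}`],
    ssl: { ca: [params.caPem] },
    sasl: {
      mechanism: CLIENT_MECHANISM[params.mechanism],
      username: params.user.name,
      password: params.user.password,
    },
  };
}
```

The returned object is intended to be passed to the kafkajs Kafka constructor; other clients use different option names.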

start(): Promise<StartedKafkaContainer>

Start the Kafka container and wait for it to be ready.

  • Returns a StartedKafkaContainer instance for interacting with the running container
  • Throws error if using KRaft with SASL and version < 7.5.0
  • Automatically configures advertised listeners for container accessibility
  • Important: Must be awaited; container is not ready until Promise resolves
  • Timeout: Uses default or configured startup timeout; throws error if Kafka doesn't become ready

Started Container Operations

Methods for interacting with a running Kafka container, including retrieving connection details and managing the container lifecycle.

class StartedKafkaContainer extends AbstractStartedContainer {
  getHost(): string;
  getMappedPort(port: number): number;
  getFirstMappedPort(): number;
  stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>;
  restart(options?: Partial<RestartOptions>): Promise<void>;
  getId(): string;
  getName(): string;
  getNetworkNames(): string[];
  getNetworkId(networkName: string): string;
  getIpAddress(networkName: string): string;
  exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>;
  logs(opts?: { since?: number; tail?: number }): Promise<Readable>;
  [Symbol.asyncDispose](): Promise<void>;
}

Connection Methods:

getHost(): string

Get the host where the container is accessible. Typically returns "localhost" for local Docker or the Docker host IP in other environments.

  • Local Docker: Returns "localhost"
  • Remote Docker: Returns Docker host IP address
  • Docker networks: Use network aliases for container-to-container communication

getMappedPort(port: number): number

Get the host port mapped to a container port. For Kafka, the default client port is 9093.

  • port: Container port number (e.g., 9093 for plaintext, 9096 for secure listener)
  • Returns: Host port number (randomly assigned)
  • Important: Always use this method to get the actual host port; container ports are randomly mapped
  • Error: Throws error if port is not exposed or container is not started

const host = container.getHost();
const port = container.getMappedPort(9093);
const brokers = [`${host}:${port}`];

getFirstMappedPort(): number

Get the first mapped port. Convenient for single-port containers.

  • Returns: First exposed port's host mapping
  • Use case: When only one port is exposed and you don't need to specify which one

Lifecycle Methods:

stop(options?: Partial<StopOptions>): Promise<StoppedTestContainer>

Stop the running Kafka container.

  • options.timeout: Timeout in milliseconds for stop operation
  • options.remove: Whether to remove container after stopping (default: true)
  • options.removeVolumes: Whether to remove volumes after stopping (default: true)
  • Important: Always call this method for cleanup, even if tests fail
  • Automatic cleanup: Container stops automatically with await using syntax

restart(options?: Partial<RestartOptions>): Promise<void>

Restart the Kafka container. Useful for testing failover scenarios.

  • options.timeout: Timeout in milliseconds for restart operation
  • State: Container state is preserved (topics, messages, etc.)
  • Use case: Testing application behavior during broker restarts

[Symbol.asyncDispose](): Promise<void>

Enables automatic cleanup using await using syntax (TypeScript 5.2+):

await using container = await new KafkaContainer(image).start();
// Container automatically stopped when scope exits

  • Automatic: No need to manually call stop()
  • Scope: Container stops when leaving the scope (function, block, etc.)
  • Error handling: Container still stops even if errors occur

Container Information:

getId(): string

Get the Docker container ID.

  • Returns: Full Docker container ID
  • Use case: Debugging, logging, container identification

getName(): string

Get the container name.

  • Returns: Container name (auto-generated or set via withName())
  • Use case: Container identification, network communication

Network Methods:

getNetworkNames(): string[]

Get all network names the container is connected to.

  • Returns: Array of network names
  • Use case: Inspecting container network configuration

getNetworkId(networkName: string): string

Get the Docker network ID for a given network name.

  • networkName: Name of the network
  • Returns: Docker network ID
  • Use case: Advanced network debugging

getIpAddress(networkName: string): string

Get the container's IP address on a specific Docker network. Useful for container-to-container communication.

  • networkName: Name of the network
  • Returns: IP address on that network
  • Use case: Direct container-to-container connections without port mapping

Diagnostics:

exec(command: string | string[], opts?: Partial<ExecOptions>): Promise<ExecResult>

Execute a command inside the running container.

  • command: Command string or array of command parts
  • opts.workingDir: Working directory for command execution
  • opts.user: User to run command as
  • opts.env: Environment variables for command
  • Returns: ExecResult with output, stdout, stderr, exitCode
  • Use case: Running Kafka CLI tools (e.g., kafka-topics, kafka-configs, kafka-console-producer)
  • Important: Commands run inside container; use container-internal addresses (e.g., localhost:9092)

const result = await container.exec(["kafka-topics", "--list", "--bootstrap-server", "localhost:9092"]);
console.log(result.output);
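Because exec() reports failures through the exitCode field of its result, a test that ignores the field can silently pass after a failed CLI call. A minimal sketch of a guard follows. ensureSuccess and parseTopicList are hypothetical helpers, and ExecResultLike is a structural subset of the ExecResult type described above.

```typescript
// Structural subset of ExecResult (an assumption for this sketch).
interface ExecResultLike {
  output: string;
  exitCode: number;
}

// Hypothetical helper: turns a non-zero exit code into a thrown Error.
function ensureSuccess(result: ExecResultLike, description: string): string {
  if (result.exitCode !== 0) {
    throw new Error(`${description} failed with exit code ${result.exitCode}: ${result.output}`);
  }
  return result.output;
}

// Hypothetical helper: kafka-topics --list prints one topic per line.
function parseTopicList(output: string): string[] {
  return output
    .split("\n")
    .map((line) => line.trim())
    .filter((line) => line.length > 0);
}
```

With these in place, the listing above could be written as parseTopicList(ensureSuccess(result, "kafka-topics --list")).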

logs(opts?: { since?: number; tail?: number }): Promise<Readable>

Get container logs as a readable stream.

  • since: Unix timestamp to get logs from
  • tail: Number of lines from the end to retrieve
  • Returns: Readable stream of log lines
  • Use case: Debugging container startup issues, monitoring Kafka logs
  • Important: Logs are stream-based; read incrementally for large logs

const logs = await container.logs({ tail: 100 });
logs.on('data', line => console.log(line.toString()));
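For assertions on log content, reading the stream with for await is often easier than wiring up event handlers, since Node.js readable streams are async iterable. The sketch below is a hypothetical helper (logContains is not part of the package) that stops reading as soon as the text is found. It accepts any async iterable of strings or byte chunks, so it can be tried without a container.

```typescript
// Hypothetical helper: resolves true once `needle` appears in the stream,
// or false if the stream ends first. Chunks are accumulated so that a match
// split across two chunks is still detected.
async function logContains(
  stream: AsyncIterable<Uint8Array | string>,
  needle: string,
): Promise<boolean> {
  const decoder = new TextDecoder();
  let seen = "";
  for await (const chunk of stream) {
    seen += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
    if (seen.includes(needle)) {
      // Returning here exits the loop, which also closes the iterator.
      return true;
    }
  }
  return false;
}
```

Note that the false branch is only reached when the stream ends, so for a log stream that stays open the caller may want to combine this with a timeout.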

Inherited Container Configuration

KafkaContainer inherits extensive configuration methods from the base GenericContainer class.

// Network configuration
withNetwork(network: StartedNetwork): this;
withNetworkMode(networkMode: string): this;
withNetworkAliases(...networkAliases: string[]): this;
withExtraHosts(extraHosts: ExtraHost[]): this;

// Port configuration
withExposedPorts(...ports: PortWithOptionalBinding[]): this;

// Environment and startup
withEnvironment(environment: Environment): this;
withCommand(command: string[]): this;
withEntrypoint(entrypoint: string[]): this;
withWorkingDir(workingDir: string): this;

// Container identity
withName(name: string): this;
withLabels(labels: Labels): this;
withHostname(hostname: string): this;

// Storage
withBindMounts(bindMounts: BindMount[]): this;
withTmpFs(tmpFs: TmpFs): this;
withCopyFilesToContainer(filesToCopy: FileToCopy[]): this;
withCopyDirectoriesToContainer(directoriesToCopy: DirectoryToCopy[]): this;
withCopyContentToContainer(contentsToCopy: ContentToCopy[]): this;

// Lifecycle
withHealthCheck(healthCheck: HealthCheck): this;
withStartupTimeout(startupTimeoutMs: number): this;
withWaitStrategy(waitStrategy: WaitStrategy): this;
withReuse(): this;
withAutoRemove(autoRemove: boolean): this;

// Security and permissions
withUser(user: string): this;
withPrivilegedMode(): this;
withAddedCapabilities(...capabilities: string[]): this;
withDroppedCapabilities(...capabilities: string[]): this;

// Resources
withResourcesQuota(quota: ResourcesQuota): this;
withSharedMemorySize(bytes: number): this;
withUlimits(ulimits: Ulimits): this;

// Advanced
withPlatform(platform: string): this;
withPullPolicy(pullPolicy: ImagePullPolicy): this;
withIpcMode(ipcMode: string): this;
withDefaultLogDriver(): this;
withLogConsumer(logConsumer: (stream: Readable) => unknown): this;

Common Inherited Methods:

withNetwork(network: StartedNetwork): this

Connect the container to a Docker network for container-to-container communication.

  • network: Started network instance
  • Use case: Multi-container test scenarios
  • Important: Containers on same network can communicate using network aliases

await using network = await new Network().start();
await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .start();

withNetworkAliases(...networkAliases: string[]): this

Set network aliases for the container. Other containers on the same network can connect using these aliases.

  • networkAliases: One or more alias names
  • Use case: Other containers connecting to Kafka using alias (e.g., "kafka:9092", the internal broker port)
  • Important: Aliases only work within the same Docker network

await using kafka = await new KafkaContainer(image)
  .withNetwork(network)
  .withNetworkAliases("kafka", "broker")
  .start();

withEnvironment(environment: Environment): this

Set environment variables in the container. Type: Record<string, string>.

  • environment: Object with environment variable key-value pairs
  • Use case: Configuring Kafka settings (e.g., KAFKA_AUTO_CREATE_TOPICS_ENABLE, KAFKA_NUM_PARTITIONS)
  • Important: All values must be strings (convert numbers/booleans to strings)

await using kafka = await new KafkaContainer(image)
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1"
  })
  .start();
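The variable names above follow the convention used by the Confluent images: a broker property is prefixed with KAFKA_, uppercased, and has "." replaced by "_" (with "_" becoming "__" and "-" becoming "___"). Combined with the strings-only rule, that suggests a small conversion step. The sketch below is illustrative only; toEnvName and toKafkaEnv are hypothetical helpers, not part of the package.

```typescript
type KafkaSetting = string | number | boolean;

// Hypothetical helper: maps a broker property name to the environment
// variable name the Confluent image convention expects.
function toEnvName(property: string): string {
  const mapped = property.replace(/[._-]/g, (ch) =>
    ch === "." ? "_" : ch === "_" ? "__" : "___",
  );
  return `KAFKA_${mapped.toUpperCase()}`;
}

// Hypothetical helper: converts typed settings into the string-only
// record that withEnvironment() accepts.
function toKafkaEnv(settings: Record<string, KafkaSetting>): Record<string, string> {
  const env: Record<string, string> = {};
  for (const [property, value] of Object.entries(settings)) {
    env[toEnvName(property)] = String(value);
  }
  return env;
}
```

For example, toKafkaEnv({ "auto.create.topics.enable": true, "num.partitions": 3 }) yields the first two entries of the object passed to withEnvironment() above.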

withReuse(): this

Enable container reuse across test runs. The container won't be stopped after tests complete, and subsequent runs will reuse the same container if the configuration matches.

  • Use case: Faster test execution in development
  • Important: Requires testcontainers configuration file (.testcontainers.properties or environment variables)
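  • State: Container state persists across test runs (topics, messages, etc.)

// Plain const rather than `await using`: disposing at scope exit would stop the container and defeat reuse
const kafka = await new KafkaContainer(image)
  .withReuse()
  .start();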

withStartupTimeout(startupTimeoutMs: number): this

Set the maximum time to wait for the container to start (in milliseconds).

  • startupTimeoutMs: Timeout in milliseconds
  • Default: Typically 60 seconds (from GenericContainer)
  • Use case: Slow environments (CI/CD, resource-constrained systems)
  • Error: Throws timeout error if container doesn't become ready within timeout

await using kafka = await new KafkaContainer(image)
  .withStartupTimeout(120000) // 2 minutes
  .start();

withLogConsumer(logConsumer: (stream: Readable) => unknown): this

Attach a log consumer to capture container logs.

  • logConsumer: Function that receives a Readable stream
  • Use case: Real-time log monitoring, test debugging
  • Important: Logs are stream-based; handle incrementally

await using kafka = await new KafkaContainer(image)
  .withLogConsumer(stream => {
    stream.on('data', line => console.log(line.toString()));
  })
  .start();

Types

// SASL/SSL Configuration Types
interface SaslSslListenerOptions {
  sasl: SaslOptions;
  port: number;
  keystore: PKCS12CertificateStore;
  truststore?: PKCS12CertificateStore;
}

interface SaslOptions {
  mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
  user: User;
}

interface User {
  name: string;
  password: string;
}

interface PKCS12CertificateStore {
  content: Buffer | string | Readable;
  passphrase: string;
}

// Container Lifecycle Types
interface StopOptions {
  timeout: number;
  remove: boolean;
  removeVolumes: boolean;
}

interface RestartOptions {
  timeout: number;
}

interface StoppedTestContainer {
  getId(): string;
  copyArchiveFromContainer(path: string): Promise<NodeJS.ReadableStream>;
}

// Execution Types
interface ExecOptions {
  workingDir: string;
  user: string;
  env: Environment;
}

interface ExecResult {
  output: string;
  stdout: string;
  stderr: string;
  exitCode: number;
}

// Configuration Types
type Environment = { [key: string]: string };

type Labels = { [key: string]: string };

type PortWithOptionalBinding = number | `${number}/${"tcp" | "udp"}` | PortWithBinding;

interface PortWithBinding {
  container: number;
  host?: number;
  protocol?: "tcp" | "udp";
}

interface ExtraHost {
  host: string;
  ipAddress: string;
}

// Storage Types
type BindMode = "rw" | "ro" | "z" | "Z";

interface BindMount {
  source: string;
  target: string;
  mode?: BindMode;
}

interface FileToCopy {
  source: string;
  target: string;
  mode?: number;
}

type DirectoryToCopy = FileToCopy;

interface ContentToCopy {
  content: Buffer | string | Readable;
  target: string;
  mode?: number;
}

type TmpFs = { [dir: string]: string };

// Health and Wait Types
interface HealthCheck {
  test: ["CMD-SHELL", string] | ["CMD", ...string[]];
  interval?: number;
  timeout?: number;
  retries?: number;
  startPeriod?: number;
}

interface WaitStrategy {
  waitUntilReady(container: unknown, boundPorts: unknown, startTime?: Date): Promise<void>;
  withStartupTimeout(startupTimeoutMs: number): WaitStrategy;
  isStartupTimeoutSet(): boolean;
}

// Resource Types
interface ResourcesQuota {
  memory?: number; // Memory limit in Gigabytes
  cpu?: number; // CPU quota in units of CPUs
}

type Ulimits = { [name: string]: { hard: number | undefined; soft: number | undefined } };

// Pull Policy Type
interface ImagePullPolicy {
  shouldPull(): boolean;
}

// Network Types
interface StartedNetwork {
  getName(): string;
  getId(): string;
  stop(): Promise<StoppedNetwork>;
}

interface StoppedNetwork {
  getName(): string;
  getId(): string;
}

// Stream Type (from Node.js)
// Readable is imported from "stream" module
type Readable = NodeJS.ReadableStream;

Version Compatibility

  • KRaft Mode: Requires Confluent Platform >= 7.0.0
  • KRaft with SASL: Requires Confluent Platform >= 7.5.0
  • Automatic KRaft: Images >= 8.0.0 automatically use KRaft mode
  • ZooKeeper Mode: Supported on all Confluent Platform versions
  • SASL/SSL with ZooKeeper: Supported on all versions that support SASL/SSL

Common Patterns

Container Reuse for Faster Tests

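// Plain const rather than `await using`, so the container is not stopped at scope exit and can be reused
const kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withReuse()
  .start();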

Custom Network with Multiple Containers

await using network = await new Network().start();

// Start Kafka first so the broker is ready before the application connects
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withNetwork(network)
  .withNetworkAliases("kafka")
  .start();

await using app = await new GenericContainer("my-app")
  .withNetwork(network)
  .withNetworkAliases("app")
  .withEnvironment({ KAFKA_BROKERS: "kafka:9092" })
  .start();

Adding Custom Kafka Configuration

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
    KAFKA_NUM_PARTITIONS: "3",
    KAFKA_DEFAULT_REPLICATION_FACTOR: "1",
    KAFKA_LOG_RETENTION_HOURS: "1"
  })
  .start();

Using with kafkajs Client

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});

const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

Using with @confluentinc/kafka-javascript Client

import { Kafka } from "@confluentinc/kafka-javascript";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

const client = new Kafka({
  kafkaJS: {
    brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
  },
});

const producer = client.producer();
await producer.connect();
await producer.send({
  topic: "test-topic",
  messages: [{ value: "Hello Kafka" }],
});
await producer.disconnect();

Testing with Multiple Topics

import { Kafka } from "kafkajs";
import { KafkaContainer } from "@testcontainers/kafka";

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
  .withEnvironment({
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true",
  })
  .start();

const client = new Kafka({
  brokers: [`${kafka.getHost()}:${kafka.getMappedPort(9093)}`],
});

const admin = client.admin();
await admin.connect();

// Create topics
await admin.createTopics({
  topics: [
    { topic: "topic-1", numPartitions: 3 },
    { topic: "topic-2", numPartitions: 1 },
  ],
});

await admin.disconnect();

Error Handling Pattern

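import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";

let container: StartedKafkaContainer | undefined;

try {
  container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();
  // Use container for tests
} catch (error) {
  console.error("Failed to start Kafka container:", error);
  throw error;
} finally {
  // Runs on success and failure; container is undefined only if start() itself failed
  if (container) {
    await container.stop();
  }
}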

Using Container Exec for Kafka CLI Operations

await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1").start();

// List topics
const listResult = await kafka.exec([
  "kafka-topics",
  "--list",
  "--bootstrap-server",
  "localhost:9092",
]);
console.log(listResult.output);

// Create topic
await kafka.exec([
  "kafka-topics",
  "--create",
  "--bootstrap-server",
  "localhost:9092",
  "--topic",
  "my-topic",
  "--partitions",
  "3",
  "--replication-factor",
  "1",
]);
