Advanced security configuration for Kafka containers with SASL authentication and SSL/TLS encryption. This enables production-like security testing scenarios with authenticated and encrypted connections.
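Required for SASL/SSL:
- PKCS12 certificates (.pfx files) for keystore and optional truststore
Default Behaviors:
- The plaintext listener remains available alongside the listener added by withSaslSslListener()
- Certificates must be in PKCS12 format (.pfx); PEM certificates must be converted
Error Conditions:
- KRaft mode combined with a version below 7.5.0 throws an error during start()
Edge Cases:
- Only a single user is configured by withSaslSslListener(); additional users require manual configuration via exec()

Configure a secure listener with SASL authentication and SSL/TLS encryption for Kafka connections.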
/**
* Configure SASL/SSL authentication listener for secure Kafka connections
* @param options - SASL/SSL configuration options
* @returns Container instance for method chaining
* @throws Error if KRaft mode with version < 7.5.0
* @throws Error if certificate passphrases are invalid or files are malformed
*/
withSaslSslListener(options: SaslSslListenerOptions): this;
interface SaslSslListenerOptions {
/** SASL authentication configuration */
sasl: SaslOptions;
/** Port number for the secure listener */
port: number;
/** Server keystore configuration (PKCS12 format) */
keystore: PKCS12CertificateStore;
/** Optional server truststore configuration (PKCS12 format) */
truststore?: PKCS12CertificateStore;
}
interface SaslOptions {
/** SASL authentication mechanism */
mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
/** User credentials for authentication */
user: User;
}
interface User {
/** Username for SASL authentication */
name: string;
/** Password for SASL authentication */
password: string;
}
interface PKCS12CertificateStore {
/** Certificate content as Buffer, string, or Readable stream */
content: Buffer | string | Readable;
/** Passphrase to decrypt the certificate */
passphrase: string;
}

When withSaslSslListener() is combined with withKraft(), the container throws an error if the Confluent Platform image version is below 7.5.0. The error is raised when start() is called, not when the listener is configured.
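The version constraint can be checked before a container is ever created. The following is a minimal sketch, not part of @testcontainers/kafka: parseImageVersion and supportsKraftWithSasl are hypothetical helpers, and the sketch assumes the image tag starts with a numeric x.y.z version, as in the examples in this section.

```typescript
// Hypothetical pre-flight check; the library performs its own check in start().
const MIN_KRAFT_SASL_VERSION: number[] = [7, 5, 0];

function parseImageVersion(image: string): number[] | undefined {
  // Take the text after the last ":" and keep the leading numeric x.y.z part.
  const tag = image.includes(":") ? image.slice(image.lastIndexOf(":") + 1) : "";
  const match = /^(\d+)\.(\d+)\.(\d+)/.exec(tag);
  return match ? match.slice(1).map(Number) : undefined;
}

function supportsKraftWithSasl(image: string): boolean {
  const version = parseImageVersion(image);
  // Tags without a numeric version (for example "latest") are treated conservatively.
  if (!version) return false;
  for (let i = 0; i < 3; i++) {
    if (version[i] !== MIN_KRAFT_SASL_VERSION[i]) {
      return version[i] > MIN_KRAFT_SASL_VERSION[i];
    }
  }
  return true; // exactly 7.5.0
}
```

A test suite could call supportsKraftWithSasl(image) and skip KRaft-specific cases early instead of waiting for start() to fail.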
Version Detection:
The SASL/SSL listener provides:
import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync("kafka.server.truststore.pfx"),
passphrase: "serverTruststorePassword",
},
})
.start();
// Get connection details for secure listener
const host = container.getHost();
const securePort = container.getMappedPort(9096);
const secureBrokers = [`${host}:${securePort}`];
// Plaintext listener is still available on port 9093
const plaintextPort = container.getMappedPort(9093);
const plaintextBrokers = [`${host}:${plaintextPort}`];

Important Notes:
import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
// KRaft mode with SASL requires Confluent Platform >= 7.5.0
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withKraft()
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync("kafka.server.truststore.pfx"),
passphrase: "serverTruststorePassword",
},
})
.start();

Important Notes:
When using Docker networks, other containers can connect to Kafka using the secure listener via network aliases.
import fs from "fs";
import { Network } from "testcontainers";
import { KafkaContainer } from "@testcontainers/kafka";
await using network = await new Network().start();
await using kafka = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withNetwork(network)
.withNetworkAliases("kafka")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync("kafka.server.truststore.pfx"),
passphrase: "serverTruststorePassword",
},
})
.start();
// Other containers on the network can connect via:
// - kafka:9096 (secure listener within network)
// - kafka:9093 (plaintext listener within network)
// - kafka:9092 (internal broker port, not accessible from host)

Network Communication:
- From the host, connect using getHost() and getMappedPort() (e.g., "localhost:32768")

import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-256", // Using SHA-256 instead of SHA-512
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
})
.start();

Mechanism Selection:
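One detail to watch when choosing a mechanism: the container options in this section use uppercase names ("SCRAM-SHA-512"), while the kafkajs client example uses lowercase names ("scram-sha-512"). A minimal sketch of a mapping helper follows; toKafkaJsMechanism and the two type aliases are hypothetical names introduced for illustration, not library exports.

```typescript
// Mechanism names as accepted by withSaslSslListener() in this section.
type ContainerMechanism = "SCRAM-SHA-256" | "SCRAM-SHA-512";
// Mechanism names as written in the kafkajs client example.
type KafkaJsMechanism = "scram-sha-256" | "scram-sha-512";

// Hypothetical helper: keeps a single source of truth for the mechanism.
function toKafkaJsMechanism(mechanism: ContainerMechanism): KafkaJsMechanism {
  return mechanism.toLowerCase() as KafkaJsMechanism;
}
```

Declaring the mechanism once and deriving the client value from it avoids a mismatch between broker and client settings.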
import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
const keystoreBuffer = fs.readFileSync("kafka.server.keystore.pfx");
const truststoreBuffer = fs.readFileSync("kafka.server.truststore.pfx");
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: keystoreBuffer, // Buffer directly
passphrase: "serverKeystorePassword",
},
truststore: {
content: truststoreBuffer, // Buffer directly
passphrase: "serverTruststorePassword",
},
})
.start();

import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
const keystoreBase64 = fs.readFileSync("kafka.server.keystore.pfx").toString("base64");
const truststoreBase64 = fs.readFileSync("kafka.server.truststore.pfx").toString("base64");
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: keystoreBase64, // Base64 string
passphrase: "serverKeystorePassword",
},
truststore: {
content: truststoreBase64, // Base64 string
passphrase: "serverTruststorePassword",
},
})
.start();

When connecting to a Kafka container with SASL/SSL enabled, your Kafka client must be configured appropriately.
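To keep client credentials in sync with what was passed to withSaslSslListener(), the client properties can be derived from the same SASL options. This is a minimal sketch: toClientProperties and SaslCredentials are hypothetical names, and the property keys simply mirror those used in the @confluentinc/kafka-javascript example in this section.

```typescript
// Same shape as the sasl option passed to withSaslSslListener().
interface SaslCredentials {
  mechanism: "SCRAM-SHA-256" | "SCRAM-SHA-512";
  user: { name: string; password: string };
}

// Hypothetical helper: builds librdkafka-style properties for a SASL_SSL client.
// caLocation is the path to a PEM file containing the CA that signed the broker certificate.
function toClientProperties(sasl: SaslCredentials, caLocation: string): Record<string, string> {
  return {
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": sasl.mechanism,
    "sasl.username": sasl.user.name,
    "sasl.password": sasl.user.password,
    "ssl.ca.location": caLocation,
  };
}
```

The returned object can be spread into the client configuration next to the kafkaJS block, so the username, password, and mechanism are written only once in a test.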
import fs from "fs";
import { Kafka } from "@confluentinc/kafka-javascript";
import { KafkaContainer } from "@testcontainers/kafka";
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
})
.start();
const kafka = new Kafka({
kafkaJS: {
brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
ssl: true,
},
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "app-user",
"sasl.password": "userPassword",
"security.protocol": "sasl_ssl",
"ssl.ca.location": "/path/to/kafka.client.truststore.pem",
});
const producer = kafka.producer();
await producer.connect();

Client Requirements:
- Enable SSL/TLS with ssl: true or "security.protocol": "sasl_ssl"

import { Kafka } from "kafkajs";
import fs from "fs";
import { KafkaContainer } from "@testcontainers/kafka";
await using container = await new KafkaContainer("confluentinc/cp-kafka:7.9.1")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync("kafka.server.keystore.pfx"),
passphrase: "serverKeystorePassword",
},
})
.start();
const kafka = new Kafka({
brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
ssl: {
ca: [fs.readFileSync("/path/to/ca-cert.pem", "utf-8")],
},
sasl: {
mechanism: "scram-sha-512",
username: "app-user",
password: "userPassword",
},
});
const producer = kafka.producer();
await producer.connect();

kafkajs Configuration:
- ssl object with CA certificates
- sasl object with mechanism, username, password

The module requires certificates in PKCS12 (.pfx) format. If you have PEM certificates, you can convert them using OpenSSL:
# Convert PEM to PKCS12 for keystore
openssl pkcs12 -export \
-in server.crt \
-inkey server.key \
-out kafka.server.keystore.pfx \
-name kafka-server \
-password pass:serverKeystorePassword
# Convert PEM to PKCS12 for truststore
openssl pkcs12 -export \
-in ca-cert.pem \
-nokeys \
-out kafka.server.truststore.pfx \
-password pass:serverTruststorePassword

Certificate Requirements:
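Because invalid passphrases and malformed files only surface as errors from the container, a quick local check of a converted keystore can shorten the feedback loop. The sketch below is an assumption-laden illustration rather than part of the module: canOpenPkcs12 is a hypothetical helper that relies on Node's createSecureContext to parse the file, so it reflects what Node's bundled OpenSSL can read, which may differ from what the broker's Java runtime accepts.

```typescript
import { createSecureContext } from "node:tls";

// Hypothetical pre-flight check for a PKCS12 keystore.
// Returns true if Node can parse and decrypt the store with the given passphrase.
function canOpenPkcs12(content: Buffer, passphrase: string): boolean {
  try {
    createSecureContext({ pfx: content, passphrase });
    return true;
  } catch {
    // Wrong passphrase, malformed data, or an encryption scheme
    // that this Node/OpenSSL build does not support.
    return false;
  }
}
```

A false result is a hint to re-run the conversion or re-check the passphrase; a true result does not guarantee that Kafka will accept the store.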
Generating Self-Signed Certificates:
# Generate CA certificate
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem -days 365
# Generate server certificate
openssl req -new -keyout server-key.pem -out server.csr
openssl x509 -req -in server.csr -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out server.crt -days 365
# Convert to PKCS12
openssl pkcs12 -export \
-in server.crt \
-inkey server-key.pem \
-out kafka.server.keystore.pfx \
-name kafka-server \
-password pass:serverKeystorePassword

Dual Listeners: The secure listener coexists with the plaintext listener. Both are available:
Port Configuration: Choose a secure listener port that doesn't conflict with default Kafka ports:
Truststore: The truststore parameter is optional. Include it when:
Certificate Content: The content field accepts any of:
- Buffer: From fs.readFileSync() without encoding
- string: Base64-encoded certificate data
- Readable: Stream of certificate data
Authentication Timing:
- kafka-configs command
Error Handling: The container will throw errors for:
Client Certificate Validation: When using truststore:
Multiple Users: The withSaslSslListener() method configures a single user. To add additional users:
- Use container.exec() to run kafka-configs commands
Certificate Expiration: Expired certificates cause connection failures:
Network Security: When using Docker networks:
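Returning to the multiple-users note above, the kafka-configs invocation for an extra SCRAM user can be assembled as an argument array and passed to container.exec(). This is a minimal sketch: addScramUserCommand is a hypothetical helper, and the bootstrap address is an assumption that must point at a listener reachable from inside the container without the credentials being created.

```typescript
type ScramMechanism = "SCRAM-SHA-256" | "SCRAM-SHA-512";

// Hypothetical helper: builds the kafka-configs arguments that add SCRAM
// credentials for one user. bootstrapServer is an assumed in-container address.
function addScramUserCommand(
  bootstrapServer: string,
  mechanism: ScramMechanism,
  name: string,
  password: string
): string[] {
  return [
    "kafka-configs",
    "--bootstrap-server", bootstrapServer,
    "--alter",
    "--add-config", `${mechanism}=[password=${password}]`,
    "--entity-type", "users",
    "--entity-name", name,
  ];
}
```

With a started container, the result could be used as await container.exec(addScramUserCommand("localhost:9092", "SCRAM-SHA-512", "second-user", "secondPassword")), where the address and user values are placeholders. Passing an array rather than a single string avoids shell-quoting problems with the square brackets.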