or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

docs

index.md
tile.json

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

security.mddocs/common/

Security API

The Kafka Security API provides authentication and encryption capabilities for secure client-broker communication. Kafka supports multiple security protocols and authentication mechanisms to protect data in transit and verify client identities.

Security Protocols

SecurityProtocol

Enumeration defining the available security protocols for Kafka communication.

package org.apache.kafka.common.security.auth;

public enum SecurityProtocol {
    /** Un-authenticated, non-encrypted channel */
    PLAINTEXT(0, "PLAINTEXT"),

    /** SSL channel */
    SSL(1, "SSL"),

    /** SASL authenticated, non-encrypted channel */
    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),

    /** SASL authenticated, SSL channel */
    SASL_SSL(3, "SASL_SSL");

    public final short id;
    public final String name;

    public static List<String> names();
    public static SecurityProtocol forId(short id);
    public static SecurityProtocol forName(String name);
}

Usage Example:

import org.apache.kafka.common.security.auth.SecurityProtocol;

// Get protocol by name
SecurityProtocol protocol = SecurityProtocol.forName("SASL_SSL");

// Get all protocol names
List<String> protocols = SecurityProtocol.names();
// Returns: ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]

Protocol Selection:

  • PLAINTEXT: No authentication or encryption. Use only for development or trusted networks.
  • SSL: Encrypted communication with optional client authentication via certificates.
  • SASL_PLAINTEXT: SASL authentication without encryption. Not recommended for production.
  • SASL_SSL: SASL authentication with SSL encryption. Recommended for production.

Authentication

KafkaPrincipal

Represents an authenticated principal (user or service) in Kafka.

package org.apache.kafka.common.security.auth;

import java.security.Principal;

public class KafkaPrincipal implements Principal {
    public static final String USER_TYPE = "User";
    public static final KafkaPrincipal ANONYMOUS = new KafkaPrincipal(USER_TYPE, "ANONYMOUS");

    public KafkaPrincipal(String principalType, String name);
    public KafkaPrincipal(String principalType, String name, boolean tokenAuthenticated);

    @Override
    public String getName();
    public String getPrincipalType();
    public boolean tokenAuthenticated();
    public void tokenAuthenticated(boolean tokenAuthenticated);

    @Override
    public String toString(); // Returns "principalType:name"
    @Override
    public boolean equals(Object o);
    @Override
    public int hashCode();
}

Usage Example:

import org.apache.kafka.common.security.auth.KafkaPrincipal;

// Create a user principal
KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");

// Create a service principal
KafkaPrincipal service = new KafkaPrincipal("Service", "data-pipeline");

// Token-authenticated principal
KafkaPrincipal tokenUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob", true);

System.out.println(user.toString()); // "User:alice"
System.out.println(user.getName()); // "alice"
System.out.println(user.getPrincipalType()); // "User"

Principal Types:

  • The default authorizer uses "User" as the principal type
  • Custom authorizers can use different principal types for group or role-based ACLs
  • Extend KafkaPrincipal to represent relationships between principals

KafkaPrincipalBuilder

Interface for custom principal building from authentication context.

package org.apache.kafka.common.security.auth;

public interface KafkaPrincipalBuilder {
    /**
     * Build a kafka principal from the authentication context.
     *
     * @param context The authentication context (either SslAuthenticationContext
     *                or SaslAuthenticationContext)
     * @return The built principal which may provide additional enrichment
     */
    KafkaPrincipal build(AuthenticationContext context);
}

Usage Example:

import org.apache.kafka.common.security.auth.*;
import java.util.Map;

public class CustomPrincipalBuilder implements KafkaPrincipalBuilder {
    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            SslAuthenticationContext sslContext = (SslAuthenticationContext) context;
            // Extract principal from SSL certificate
            String commonName = extractCommonName(sslContext.session());
            return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, commonName);
        } else if (context instanceof SaslAuthenticationContext) {
            SaslAuthenticationContext saslContext = (SaslAuthenticationContext) context;
            // Extract principal from SASL authentication
            return new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
                saslContext.server().getAuthorizationID());
        }
        return KafkaPrincipal.ANONYMOUS;
    }

    private String extractCommonName(javax.net.ssl.SSLSession session) {
        // Extract CN from certificate
        // Implementation details omitted
        return "extracted-name";
    }
}

SSL Configuration

SslConfigs

Configuration constants for SSL/TLS connections.

package org.apache.kafka.common.config;

public class SslConfigs {
    // Protocol and provider
    public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
    public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.3";
    public static final String SSL_PROVIDER_CONFIG = "ssl.provider";

    // Enabled protocols and cipher suites
    public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
    public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3";
    public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";

    // Keystore configuration (client authentication)
    public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
    public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
    public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
    public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
    public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";

    // PEM format configuration
    public static final String SSL_KEYSTORE_KEY_CONFIG = "ssl.keystore.key";
    public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain";
    public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates";

    // Truststore configuration (server validation)
    public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
    public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
    public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";

    // Key and trust manager algorithms
    public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
    public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";

    // Endpoint identification
    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =
        "ssl.endpoint.identification.algorithm";
    public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "https";

    // Advanced configuration
    public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG =
        "ssl.secure.random.implementation";
    public static final String SSL_ENGINE_FACTORY_CLASS_CONFIG = "ssl.engine.factory.class";
}

SSL Configuration Examples

Basic SSL Configuration (Server Authentication Only):

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// SSL configuration
props.put("security.protocol", "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");

Producer<String, String> producer = new KafkaProducer<>(props);

Mutual TLS (mTLS) with Client Authentication:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "secure-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

// SSL configuration with client authentication
props.put("security.protocol", "SSL");

// Truststore (for server certificate validation)
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");

// Keystore (for client certificate authentication)
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore-password");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key-password");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

SSL with PEM Format:

import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");

// PEM format configuration
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
    "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
    "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
    "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");

Custom SSL Protocol and Cipher Suites:

import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");

// Protocol configuration
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2,TLSv1.3");

// Cipher suites
props.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
    "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");

// Truststore configuration
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

SASL Configuration

SaslConfigs

Configuration constants for SASL authentication.

package org.apache.kafka.common.config;

public class SaslConfigs {
    // Mechanism selection
    public static final String SASL_MECHANISM = "sasl.mechanism";
    public static final String GSSAPI_MECHANISM = "GSSAPI";
    public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM;

    // JAAS configuration
    public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";

    // Callback handlers
    public static final String SASL_CLIENT_CALLBACK_HANDLER_CLASS =
        "sasl.client.callback.handler.class";
    public static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS =
        "sasl.login.callback.handler.class";
    public static final String SASL_LOGIN_CLASS = "sasl.login.class";

    // Kerberos configuration
    public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
    public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
    public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
    public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR =
        "sasl.kerberos.ticket.renew.window.factor";
    public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;
    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER =
        "sasl.kerberos.ticket.renew.jitter";
    public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;
    public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN =
        "sasl.kerberos.min.time.before.relogin";
    public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 60000L;

    // Login refresh configuration
    public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR =
        "sasl.login.refresh.window.factor";
    public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
    public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER =
        "sasl.login.refresh.window.jitter";
    public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
    public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS =
        "sasl.login.refresh.min.period.seconds";
    public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
    public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS =
        "sasl.login.refresh.buffer.seconds";
    public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;

    // Connection timeouts
    public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
    public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";

    // Retry configuration
    public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS =
        "sasl.login.retry.backoff.max.ms";
    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 10000;
    public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
    public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100;

    // OAuth configuration
    public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL =
        "sasl.oauthbearer.token.endpoint.url";
    public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL =
        "sasl.oauthbearer.jwks.endpoint.url";
    public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";
    public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID =
        "sasl.oauthbearer.client.credentials.client.id";
    public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET =
        "sasl.oauthbearer.client.credentials.client.secret";

    // OAuth JWT configuration
    public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS =
        "sasl.oauthbearer.jwt.retriever.class";
    public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS =
        "sasl.oauthbearer.jwt.validator.class";

    // OAuth scope and claim names
    public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME =
        "sasl.oauthbearer.scope.claim.name";
    public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
    public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME =
        "sasl.oauthbearer.sub.claim.name";
    public static final String DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sub";

    // OAuth validation
    public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE =
        "sasl.oauthbearer.expected.audience";
    public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER =
        "sasl.oauthbearer.expected.issuer";
    public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS =
        "sasl.oauthbearer.clock.skew.seconds";
    public static final int DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = 30;
}

SASL Mechanisms

SASL/PLAIN

Simple username/password authentication mechanism.

LoginModule:

package org.apache.kafka.common.security.plain;

import javax.security.auth.spi.LoginModule;

public class PlainLoginModule implements LoginModule {
    // JAAS options: username, password
}

Configuration Example:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// SASL/PLAIN configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"alice\" " +
    "password=\"alice-secret\";");

// SSL truststore for encryption
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Producer<String, String> producer = new KafkaProducer<>(props);

SASL/SCRAM

Salted Challenge Response Authentication Mechanism with SHA-256 or SHA-512.

LoginModule:

package org.apache.kafka.common.security.scram;

import javax.security.auth.spi.LoginModule;

public class ScramLoginModule implements LoginModule {
    // JAAS options: username, password, tokenauth (optional)
    public static final String TOKEN_AUTH_CONFIG = "tokenauth";
}

SCRAM-SHA-256 Configuration:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "scram-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

// SASL/SCRAM-SHA-256 configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"bob\" " +
    "password=\"bob-secret\";");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

SCRAM-SHA-512 Configuration:

import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"admin\" " +
    "password=\"admin-secret\";");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

SCRAM with Delegation Tokens:

import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"DELEGATION_TOKEN\" " +
    "password=\"<token-hmac>\" " +
    "tokenauth=\"true\";");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

SASL/OAUTHBEARER

OAuth 2.0 bearer token authentication mechanism.

LoginModule:

package org.apache.kafka.common.security.oauthbearer;

import javax.security.auth.spi.LoginModule;

public class OAuthBearerLoginModule implements LoginModule {
    public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
}

OAuth with Client Credentials Flow:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// SASL/OAUTHBEARER configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");

// OAuth token endpoint
props.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
    "https://oauth-provider.example.com/oauth2/token");

// Client credentials
props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, "kafka-client");
props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, "client-secret");

// Optional scope
props.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "kafka.read kafka.write");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Producer<String, String> producer = new KafkaProducer<>(props);

OAuth with Custom Callback Handler:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import java.util.Properties;

// Custom callback handler implementation
public class CustomOAuthCallbackHandler implements AuthenticateCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism,
                         List<AppConfigurationEntry> jaasConfigEntries) {
        // Initialize with configuration
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                OAuthBearerTokenCallback tokenCallback = (OAuthBearerTokenCallback) callback;
                // Retrieve token from your identity provider
                OAuthBearerToken token = retrieveToken();
                tokenCallback.token(token);
            }
        }
    }

    @Override
    public void close() {
        // Cleanup resources
    }

    private OAuthBearerToken retrieveToken() {
        // Implementation to retrieve token from OAuth provider
        return null;
    }
}

// Configuration using custom handler
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "oauth-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
    "com.example.CustomOAuthCallbackHandler");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

OAuth Token Refresh Configuration:

import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");

// Token refresh settings
props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, 0.8);
props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, 0.05);
props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, 60);
props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, 300);

// Connection timeouts
props.put(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, 10000);
props.put(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, 10000);

// Retry configuration
props.put(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, 100);
props.put(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, 10000);

SASL/GSSAPI (Kerberos)

Kerberos authentication using GSSAPI mechanism.

Configuration Example:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

// SASL/GSSAPI (Kerberos) configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "useKeyTab=true " +
    "storeKey=true " +
    "keyTab=\"/path/to/kafka-client.keytab\" " +
    "principal=\"kafka-client@EXAMPLE.COM\";");

// Kerberos ticket renewal
props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 0.8);
props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, 0.05);
props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 60000);

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Producer<String, String> producer = new KafkaProducer<>(props);

Kerberos with Ticket Cache:

import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "com.sun.security.auth.module.Krb5LoginModule required " +
    "useTicketCache=true " +
    "renewTicket=true " +
    "serviceName=\"kafka\";");

// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Authentication Interfaces

Login

Interface for SASL login implementations.

package org.apache.kafka.common.security.auth;

import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import java.util.Map;

public interface Login {
    /**
     * Configures this login instance.
     *
     * @param configs Kafka configuration options
     * @param contextName JAAS context name
     * @param jaasConfiguration JAAS configuration
     * @param loginCallbackHandler Login callback handler
     */
    void configure(Map<String, ?> configs, String contextName,
                  Configuration jaasConfiguration,
                  AuthenticateCallbackHandler loginCallbackHandler);

    /**
     * Performs login for each login module.
     */
    LoginContext login() throws LoginException;

    /**
     * Returns the authenticated subject.
     */
    Subject subject();

    /**
     * Returns the service name for SASL.
     */
    String serviceName();

    /**
     * Closes this instance.
     */
    void close();
}

AuthenticateCallbackHandler

Callback handler interface for SASL authentication.

package org.apache.kafka.common.security.auth;

import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import java.util.List;
import java.util.Map;

public interface AuthenticateCallbackHandler extends CallbackHandler {
    /**
     * Configures this callback handler for the specified SASL mechanism.
     *
     * @param configs Kafka configuration options
     * @param saslMechanism Negotiated SASL mechanism
     * @param jaasConfigEntries JAAS configuration entries
     */
    void configure(Map<String, ?> configs, String saslMechanism,
                  List<AppConfigurationEntry> jaasConfigEntries);

    /**
     * Closes this instance.
     */
    void close();
}

Custom Callback Handler Example:

import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import javax.security.auth.callback.*;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class CustomCallbackHandler implements AuthenticateCallbackHandler {
    private String mechanism;

    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                         List<AppConfigurationEntry> jaasConfigEntries) {
        this.mechanism = saslMechanism;
        // Initialize handler with configuration
    }

    @Override
    public void handle(Callback[] callbacks)
            throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                NameCallback nameCallback = (NameCallback) callback;
                nameCallback.setName(getUsername());
            } else if (callback instanceof PasswordCallback) {
                PasswordCallback passwordCallback = (PasswordCallback) callback;
                passwordCallback.setPassword(getPassword());
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    @Override
    public void close() {
        // Cleanup resources
    }

    private String getUsername() {
        // Retrieve username from secure source
        return "username";
    }

    private char[] getPassword() {
        // Retrieve password from secure source
        return "password".toCharArray();
    }
}

Security Best Practices

Production Configuration Checklist

SSL/TLS Configuration:

  • Use TLS 1.2 or higher (ssl.protocol=TLSv1.3)
  • Enable hostname verification (ssl.endpoint.identification.algorithm=https)
  • Use strong cipher suites
  • Rotate certificates before expiration
  • Store keystore and truststore files securely with proper file permissions
  • Never commit passwords or certificates to version control

SASL Configuration:

  • Use SASL_SSL (not SASL_PLAINTEXT) for production
  • Choose appropriate mechanism based on infrastructure:
    • SCRAM-SHA-512 for username/password with strong security
    • GSSAPI for Kerberos environments
    • OAUTHBEARER for OAuth/OIDC integration
  • Store credentials securely (environment variables, secrets management)
  • Configure token refresh settings appropriately
  • Enable credential rotation where supported

General Security:

  • Use separate credentials for each application
  • Implement proper ACLs to restrict access
  • Monitor authentication failures
  • Enable audit logging
  • Regularly update Kafka clients to latest versions
  • Use network segmentation to isolate Kafka brokers

Password Storage Example

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

// DO NOT hardcode credentials
// BAD: props.put("password", "hardcoded-password");

// GOOD: Read from environment variables
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

String username = System.getenv("KAFKA_USERNAME");
String password = System.getenv("KAFKA_PASSWORD");

props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"" + username + "\" " +
    "password=\"" + password + "\";");

props.put("ssl.truststore.location", System.getenv("KAFKA_TRUSTSTORE_LOCATION"));
props.put("ssl.truststore.password", System.getenv("KAFKA_TRUSTSTORE_PASSWORD"));

Producer<String, String> producer = new KafkaProducer<>(props);

Certificate Validation Example

import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka.example.com:9093");
props.put("security.protocol", "SSL");

// Enable hostname verification (default: https)
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");

// Configure truststore with CA certificates
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/ca-cert.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
    System.getenv("TRUSTSTORE_PASSWORD"));

// Optional: Specify trusted protocols
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.3");

// The client will:
// 1. Verify server certificate is signed by trusted CA
// 2. Verify server certificate is not expired
// 3. Verify hostname matches certificate CN/SAN

Complete Configuration Examples

Producer with SASL_SSL and SCRAM

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SecureProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Basic configuration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "secure-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Security protocol
        props.put("security.protocol", "SASL_SSL");

        // SASL configuration
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"" + System.getenv("KAFKA_USERNAME") + "\" " +
            "password=\"" + System.getenv("KAFKA_PASSWORD") + "\";");

        // SSL configuration
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
            System.getenv("TRUSTSTORE_PASSWORD"));
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");

        // Create producer
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("secure-topic", "key", "value");

            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error sending: " + exception.getMessage());
                } else {
                    System.out.println("Sent to partition " + metadata.partition() +
                        " at offset " + metadata.offset());
                }
            });

            producer.flush();
        }
    }
}

Consumer with SASL_SSL and OAuth

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SecureConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Basic configuration
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "secure-consumer-group");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "secure-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Security protocol
        props.put("security.protocol", "SASL_SSL");

        // SASL/OAUTHBEARER configuration
        props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        props.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
            "https://oauth.example.com/oauth2/token");
        props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
            System.getenv("OAUTH_CLIENT_ID"));
        props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
            System.getenv("OAUTH_CLIENT_SECRET"));
        props.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "kafka.read");

        // Token refresh configuration
        props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, 0.8);
        props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, 300);

        // SSL configuration
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
            System.getenv("TRUSTSTORE_PASSWORD"));
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");

        // Create consumer
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("secure-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
                }

                consumer.commitAsync();
            }
        }
    }
}

Admin Client with mTLS

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SecureAdminExample {
    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        // Basic configuration
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "secure-admin");

        // Security protocol
        props.put("security.protocol", "SSL");

        // SSL configuration with client authentication (mTLS)
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
            System.getenv("TRUSTSTORE_PASSWORD"));

        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
            "/path/to/client.keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
            System.getenv("KEYSTORE_PASSWORD"));
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
            System.getenv("KEY_PASSWORD"));

        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");

        // Create admin client
        try (AdminClient admin = AdminClient.create(props)) {
            // List topics
            ListTopicsResult topics = admin.listTopics();
            System.out.println("Topics: " + topics.names().get());

            // Describe cluster
            DescribeClusterResult cluster = admin.describeCluster();
            System.out.println("Cluster ID: " + cluster.clusterId().get());
            System.out.println("Controller: " + cluster.controller().get());
        }
    }
}

Security Troubleshooting

Common Security Issues

Issue: SSL Handshake Failure

Symptoms:

  • SSLHandshakeException
  • Connection failures
  • "unable to find valid certification path" errors

Causes:

  • Server certificate not trusted
  • Certificate expired
  • Hostname mismatch
  • Wrong truststore configuration

Solutions:

import org.apache.kafka.common.config.SslConfigs;
import javax.net.ssl.*;
import java.security.cert.X509Certificate;
import java.util.Properties;

public class SSLTroubleshooting {
    
    public void diagnoseSSLIssue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9093");
        props.put("security.protocol", "SSL");
        
        // Enable SSL debugging
        System.setProperty("javax.net.debug", "ssl,handshake");
        
        // Verify truststore
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
            "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
            System.getenv("TRUSTSTORE_PASSWORD"));
        
        // Verify hostname
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
        
        try {
            Producer<String, String> producer = new KafkaProducer<>(props);
            System.out.println("SSL connection successful");
            producer.close();
        } catch (Exception e) {
            System.err.println("SSL connection failed: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Prevention:

  • Use valid certificates from trusted CA
  • Monitor certificate expiration
  • Ensure hostname matches certificate
  • Keep truststore updated

Issue: SASL Authentication Failure

Symptoms:

  • SaslAuthenticationException
  • "Authentication failed" errors

Causes:

  • Wrong username/password
  • Incorrect SASL mechanism
  • Token expired (OAuth)

Solutions:

import org.apache.kafka.common.config.SaslConfigs;

public class SASLTroubleshooting {
    
    public void diagnoseSASLIssue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        
        String jaasConfig = 
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"" + System.getenv("KAFKA_USERNAME") + "\" " +
            "password=\"" + System.getenv("KAFKA_PASSWORD") + "\";";
        
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        
        try {
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.listTopics();
            System.out.println("SASL authentication successful");
            consumer.close();
        } catch (SaslAuthenticationException e) {
            System.err.println("SASL authentication failed: " + e.getMessage());
            System.err.println("Verify username, password, and mechanism");
        }
    }
}