tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The Kafka Security API provides authentication and encryption capabilities for secure client-broker communication. Kafka supports multiple security protocols and authentication mechanisms to protect data in transit and verify client identities.
Enumeration defining the available security protocols for Kafka communication.
package org.apache.kafka.common.security.auth;
public enum SecurityProtocol {
/** Un-authenticated, non-encrypted channel */
PLAINTEXT(0, "PLAINTEXT"),
/** SSL channel */
SSL(1, "SSL"),
/** SASL authenticated, non-encrypted channel */
SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
/** SASL authenticated, SSL channel */
SASL_SSL(3, "SASL_SSL");
public final short id;
public final String name;
public static List<String> names();
public static SecurityProtocol forId(short id);
public static SecurityProtocol forName(String name);
}

Usage Example:
import org.apache.kafka.common.security.auth.SecurityProtocol;
// Get protocol by name
SecurityProtocol protocol = SecurityProtocol.forName("SASL_SSL");
// Get all protocol names
List<String> protocols = SecurityProtocol.names();
// Returns: ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]

Protocol Selection:
Represents an authenticated principal (user or service) in Kafka.
package org.apache.kafka.common.security.auth;
import java.security.Principal;
public class KafkaPrincipal implements Principal {
public static final String USER_TYPE = "User";
public static final KafkaPrincipal ANONYMOUS = new KafkaPrincipal(USER_TYPE, "ANONYMOUS");
public KafkaPrincipal(String principalType, String name);
public KafkaPrincipal(String principalType, String name, boolean tokenAuthenticated);
@Override
public String getName();
public String getPrincipalType();
public boolean tokenAuthenticated();
public void tokenAuthenticated(boolean tokenAuthenticated);
@Override
public String toString(); // Returns "principalType:name"
@Override
public boolean equals(Object o);
@Override
public int hashCode();
}

Usage Example:
import org.apache.kafka.common.security.auth.KafkaPrincipal;
// Create a user principal
KafkaPrincipal user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
// Create a service principal
KafkaPrincipal service = new KafkaPrincipal("Service", "data-pipeline");
// Token-authenticated principal
KafkaPrincipal tokenUser = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob", true);
System.out.println(user.toString()); // "User:alice"
System.out.println(user.getName()); // "alice"
System.out.println(user.getPrincipalType()); // "User"

Principal Types:
- "User" as the principal type
- KafkaPrincipal to represent relationships between principals

Interface for custom principal building from authentication context.
package org.apache.kafka.common.security.auth;
public interface KafkaPrincipalBuilder {
/**
* Build a kafka principal from the authentication context.
*
* @param context The authentication context (either SslAuthenticationContext
* or SaslAuthenticationContext)
* @return The built principal which may provide additional enrichment
*/
KafkaPrincipal build(AuthenticationContext context);
}

Usage Example:
import org.apache.kafka.common.security.auth.*;
import java.util.Map;
public class CustomPrincipalBuilder implements KafkaPrincipalBuilder {
@Override
public KafkaPrincipal build(AuthenticationContext context) {
if (context instanceof SslAuthenticationContext) {
SslAuthenticationContext sslContext = (SslAuthenticationContext) context;
// Extract principal from SSL certificate
String commonName = extractCommonName(sslContext.session());
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, commonName);
} else if (context instanceof SaslAuthenticationContext) {
SaslAuthenticationContext saslContext = (SaslAuthenticationContext) context;
// Extract principal from SASL authentication
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
saslContext.server().getAuthorizationID());
}
return KafkaPrincipal.ANONYMOUS;
}
private String extractCommonName(javax.net.ssl.SSLSession session) {
// Extract CN from certificate
// Implementation details omitted
return "extracted-name";
}
}

Configuration constants for SSL/TLS connections.
package org.apache.kafka.common.config;
public class SslConfigs {
// Protocol and provider
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.3";
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
// Enabled protocols and cipher suites
public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3";
public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
// Keystore configuration (client authentication)
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
// PEM format configuration
public static final String SSL_KEYSTORE_KEY_CONFIG = "ssl.keystore.key";
public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain";
public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates";
// Truststore configuration (server validation)
public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
// Key and trust manager algorithms
public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
// Endpoint identification
public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =
"ssl.endpoint.identification.algorithm";
public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "https";
// Advanced configuration
public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG =
"ssl.secure.random.implementation";
public static final String SSL_ENGINE_FACTORY_CLASS_CONFIG = "ssl.engine.factory.class";
}

Basic SSL Configuration (Server Authentication Only):
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// SSL configuration
props.put("security.protocol", "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");
Producer<String, String> producer = new KafkaProducer<>(props);

Mutual TLS (mTLS) with Client Authentication:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "secure-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// SSL configuration with client authentication
props.put("security.protocol", "SSL");
// Truststore (for server certificate validation)
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");
// Keystore (for client certificate authentication)
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore-password");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key-password");
Consumer<String, String> consumer = new KafkaConsumer<>(props);

SSL with PEM Format:
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
// PEM format configuration
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
props.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
"-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----");
props.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
"-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");
props.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
"-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----");

Custom SSL Protocol and Cipher Suites:
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
// Protocol configuration
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2,TLSv1.3");
// Cipher suites
props.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
// Truststore configuration
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

Configuration constants for SASL authentication.
package org.apache.kafka.common.config;
public class SaslConfigs {
// Mechanism selection
public static final String SASL_MECHANISM = "sasl.mechanism";
public static final String GSSAPI_MECHANISM = "GSSAPI";
public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM;
// JAAS configuration
public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
// Callback handlers
public static final String SASL_CLIENT_CALLBACK_HANDLER_CLASS =
"sasl.client.callback.handler.class";
public static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS =
"sasl.login.callback.handler.class";
public static final String SASL_LOGIN_CLASS = "sasl.login.class";
// Kerberos configuration
public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR =
"sasl.kerberos.ticket.renew.window.factor";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;
public static final String SASL_KERBEROS_TICKET_RENEW_JITTER =
"sasl.kerberos.ticket.renew.jitter";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;
public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN =
"sasl.kerberos.min.time.before.relogin";
public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 60000L;
// Login refresh configuration
public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR =
"sasl.login.refresh.window.factor";
public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;
public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER =
"sasl.login.refresh.window.jitter";
public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;
public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS =
"sasl.login.refresh.min.period.seconds";
public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;
public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS =
"sasl.login.refresh.buffer.seconds";
public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;
// Connection timeouts
public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";
// Retry configuration
public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS =
"sasl.login.retry.backoff.max.ms";
public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 10000;
public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100;
// OAuth configuration
public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL =
"sasl.oauthbearer.token.endpoint.url";
public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL =
"sasl.oauthbearer.jwks.endpoint.url";
public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";
public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID =
"sasl.oauthbearer.client.credentials.client.id";
public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET =
"sasl.oauthbearer.client.credentials.client.secret";
// OAuth JWT configuration
public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS =
"sasl.oauthbearer.jwt.retriever.class";
public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS =
"sasl.oauthbearer.jwt.validator.class";
// OAuth scope and claim names
public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME =
"sasl.oauthbearer.scope.claim.name";
public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME =
"sasl.oauthbearer.sub.claim.name";
public static final String DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sub";
// OAuth validation
public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE =
"sasl.oauthbearer.expected.audience";
public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER =
"sasl.oauthbearer.expected.issuer";
public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS =
"sasl.oauthbearer.clock.skew.seconds";
public static final int DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = 30;
}

Simple username/password authentication mechanism.
LoginModule:
package org.apache.kafka.common.security.plain;
import javax.security.auth.spi.LoginModule;
public class PlainLoginModule implements LoginModule {
// JAAS options: username, password
}

Configuration Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// SASL/PLAIN configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"alice\" " +
"password=\"alice-secret\";");
// SSL truststore for encryption
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
Producer<String, String> producer = new KafkaProducer<>(props);

Salted Challenge Response Authentication Mechanism with SHA-256 or SHA-512.
LoginModule:
package org.apache.kafka.common.security.scram;
import javax.security.auth.spi.LoginModule;
public class ScramLoginModule implements LoginModule {
// JAAS options: username, password, tokenauth (optional)
public static final String TOKEN_AUTH_CONFIG = "tokenauth";
}

SCRAM-SHA-256 Configuration:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "scram-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// SASL/SCRAM-SHA-256 configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"bob\" " +
"password=\"bob-secret\";");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
Consumer<String, String> consumer = new KafkaConsumer<>(props);

SCRAM-SHA-512 Configuration:
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"admin\" " +
"password=\"admin-secret\";");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

SCRAM with Delegation Tokens:
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"DELEGATION_TOKEN\" " +
"password=\"<token-hmac>\" " +
"tokenauth=\"true\";");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

OAuth 2.0 bearer token authentication mechanism.
LoginModule:
package org.apache.kafka.common.security.oauthbearer;
import javax.security.auth.spi.LoginModule;
public class OAuthBearerLoginModule implements LoginModule {
public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
}

OAuth with Client Credentials Flow:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// SASL/OAUTHBEARER configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
// OAuth token endpoint
props.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
"https://oauth-provider.example.com/oauth2/token");
// Client credentials
props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, "kafka-client");
props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, "client-secret");
// Optional scope
props.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "kafka.read kafka.write");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
Producer<String, String> producer = new KafkaProducer<>(props);

OAuth with Custom Callback Handler:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
// Custom callback handler implementation
/**
 * Example login callback handler that supplies an OAuth bearer token obtained
 * from a custom identity provider.
 *
 * <p>Register it through {@code sasl.login.callback.handler.class}.
 */
public class CustomOAuthCallbackHandler implements AuthenticateCallbackHandler {

    /**
     * Receives the client configuration before the handler is used.
     *
     * @param configs           Kafka configuration options
     * @param mechanism         SASL mechanism this handler is configured for
     * @param jaasConfigEntries JAAS configuration entries
     */
    @Override
    public void configure(Map<String, ?> configs, String mechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // Initialize with configuration
    }

    /**
     * Supplies a bearer token to every {@link OAuthBearerTokenCallback}.
     *
     * @param callbacks callbacks to process
     * @throws IOException                  if no token could be retrieved
     * @throws UnsupportedCallbackException if a callback is not an
     *                                      {@code OAuthBearerTokenCallback}
     */
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            // Report callbacks this handler cannot serve instead of silently skipping
            // them, so the caller knows the requested data was not provided.
            if (!(callback instanceof OAuthBearerTokenCallback)) {
                throw new UnsupportedCallbackException(callback);
            }
            OAuthBearerTokenCallback tokenCallback = (OAuthBearerTokenCallback) callback;
            // Retrieve token from your identity provider
            OAuthBearerToken token = retrieveToken();
            if (token == null) {
                // Fail fast: handing a null token to the callback would hide the
                // retrieval failure until much later in the login sequence.
                throw new IOException("Unable to retrieve OAuth bearer token");
            }
            tokenCallback.token(token);
        }
    }

    /** Releases any resources held by this handler. */
    @Override
    public void close() {
        // Cleanup resources
    }

    /**
     * Fetches a token from the OAuth provider.
     *
     * @return the retrieved token; this placeholder returns {@code null} until a
     *         real retrieval is implemented
     */
    private OAuthBearerToken retrieveToken() {
        // Implementation to retrieve token from OAuth provider
        return null;
    }
}
// Configuration using custom handler
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "oauth-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
"com.example.CustomOAuthCallbackHandler");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
Consumer<String, String> consumer = new KafkaConsumer<>(props);

OAuth Token Refresh Configuration:
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
// Token refresh settings
props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, 0.8);
props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, 0.05);
// The two settings below are SHORT-typed (their defaults are declared as short).
// Kafka's ConfigDef accepts only a Short or a String for SHORT settings, so an
// Integer literal such as 60 is rejected with a ConfigException; pass strings.
props.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, "60");
props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, "300");
// Connection timeouts
props.put(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, 10000);
props.put(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, 10000);
// Retry configuration
props.put(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, 100);
props.put(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, 10000);

Kerberos authentication using GSSAPI mechanism.
Configuration Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// SASL/GSSAPI (Kerberos) configuration
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"com.sun.security.auth.module.Krb5LoginModule required " +
"useKeyTab=true " +
"storeKey=true " +
"keyTab=\"/path/to/kafka-client.keytab\" " +
"principal=\"kafka-client@EXAMPLE.COM\";");
// Kerberos ticket renewal
props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, 0.8);
props.put(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, 0.05);
props.put(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, 60000);
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");
Producer<String, String> producer = new KafkaProducer<>(props);

Kerberos with Ticket Cache:
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"com.sun.security.auth.module.Krb5LoginModule required " +
"useTicketCache=true " +
"renewTicket=true " +
"serviceName=\"kafka\";");
// SSL configuration
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Interface for SASL login implementations.
package org.apache.kafka.common.security.auth;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import java.util.Map;
public interface Login {
/**
* Configures this login instance.
*
* @param configs Kafka configuration options
* @param contextName JAAS context name
* @param jaasConfiguration JAAS configuration
* @param loginCallbackHandler Login callback handler
*/
void configure(Map<String, ?> configs, String contextName,
Configuration jaasConfiguration,
AuthenticateCallbackHandler loginCallbackHandler);
/**
* Performs login for each login module.
*/
LoginContext login() throws LoginException;
/**
* Returns the authenticated subject.
*/
Subject subject();
/**
* Returns the service name for SASL.
*/
String serviceName();
/**
* Closes this instance.
*/
void close();
}

Callback handler interface for SASL authentication.
package org.apache.kafka.common.security.auth;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import java.util.List;
import java.util.Map;
public interface AuthenticateCallbackHandler extends CallbackHandler {
/**
* Configures this callback handler for the specified SASL mechanism.
*
* @param configs Kafka configuration options
* @param saslMechanism Negotiated SASL mechanism
* @param jaasConfigEntries JAAS configuration entries
*/
void configure(Map<String, ?> configs, String saslMechanism,
List<AppConfigurationEntry> jaasConfigEntries);
/**
* Closes this instance.
*/
void close();
}

Custom Callback Handler Example:
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import javax.security.auth.callback.*;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class CustomCallbackHandler implements AuthenticateCallbackHandler {
private String mechanism;
@Override
public void configure(Map<String, ?> configs, String saslMechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
this.mechanism = saslMechanism;
// Initialize handler with configuration
}
@Override
public void handle(Callback[] callbacks)
throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
NameCallback nameCallback = (NameCallback) callback;
nameCallback.setName(getUsername());
} else if (callback instanceof PasswordCallback) {
PasswordCallback passwordCallback = (PasswordCallback) callback;
passwordCallback.setPassword(getPassword());
} else {
throw new UnsupportedCallbackException(callback);
}
}
}
@Override
public void close() {
// Cleanup resources
}
private String getUsername() {
// Retrieve username from secure source
return "username";
}
private char[] getPassword() {
// Retrieve password from secure source
return "password".toCharArray();
}
}

SSL/TLS Configuration:
- Use TLS 1.3 (ssl.protocol=TLSv1.3)
- Keep hostname verification enabled (ssl.endpoint.identification.algorithm=https)

SASL Configuration:
General Security:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
// DO NOT hardcode credentials
// BAD: props.put("password", "hardcoded-password");
// GOOD: Read from environment variables
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
String username = System.getenv("KAFKA_USERNAME");
String password = System.getenv("KAFKA_PASSWORD");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
props.put("ssl.truststore.location", System.getenv("KAFKA_TRUSTSTORE_LOCATION"));
props.put("ssl.truststore.password", System.getenv("KAFKA_TRUSTSTORE_PASSWORD"));
Producer<String, String> producer = new KafkaProducer<>(props);

import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "kafka.example.com:9093");
props.put("security.protocol", "SSL");
// Enable hostname verification (default: https)
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
// Configure truststore with CA certificates
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/ca-cert.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
System.getenv("TRUSTSTORE_PASSWORD"));
// Optional: Specify trusted protocols
props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.3");
// The client will:
// 1. Verify server certificate is signed by trusted CA
// 2. Verify server certificate is not expired
// 3. Verify hostname matches certificate CN/SAN

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SecureProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
// Basic configuration
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "secure-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Security protocol
props.put("security.protocol", "SASL_SSL");
// SASL configuration
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"" + System.getenv("KAFKA_USERNAME") + "\" " +
"password=\"" + System.getenv("KAFKA_PASSWORD") + "\";");
// SSL configuration
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
System.getenv("TRUSTSTORE_PASSWORD"));
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
// Create producer
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("secure-topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error sending: " + exception.getMessage());
} else {
System.out.println("Sent to partition " + metadata.partition() +
" at offset " + metadata.offset());
}
});
producer.flush();
}
}
}

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SecureConsumerExample {
/**
 * Consumes {@code secure-topic} over SASL_SSL, authenticating with
 * SASL/OAUTHBEARER (client-credentials flow against an OAuth token endpoint).
 *
 * <p>Secrets are read from the {@code OAUTH_CLIENT_ID}, {@code OAUTH_CLIENT_SECRET}
 * and {@code TRUSTSTORE_PASSWORD} environment variables. The poll loop runs until
 * the process is terminated.
 *
 * @param args unused
 * @throws IllegalStateException if a required environment variable is not set
 */
public static void main(String[] args) {
    // Resolve secrets first: Properties.put rejects null values with a bare
    // NullPointerException, which hides which variable is missing.
    String oauthClientId = requireEnv("OAUTH_CLIENT_ID");
    String oauthClientSecret = requireEnv("OAUTH_CLIENT_SECRET");
    String truststorePassword = requireEnv("TRUSTSTORE_PASSWORD");

    Properties props = new Properties();
    // Basic configuration
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "secure-consumer-group");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "secure-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Security protocol
    props.put("security.protocol", "SASL_SSL");
    // SASL/OAUTHBEARER configuration
    props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    // Select the handler that talks to the token endpoint; without it the
    // OAUTHBEARER login falls back to the unsecured (development-only) handler
    // and the endpoint/client-credential settings below are not used.
    props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
    // NOTE(review): recent Kafka releases also require this URL to be allow-listed
    // through the org.apache.kafka.sasl.oauthbearer.allowed.urls JVM system
    // property -- confirm for the deployed client version.
    props.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL,
        "https://oauth.example.com/oauth2/token");
    props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, oauthClientId);
    props.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
        oauthClientSecret);
    props.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "kafka.read");
    // Token refresh configuration
    props.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, 0.8);
    // This setting is a SHORT config: an Integer value is rejected with a
    // ConfigException when the consumer is constructed, so pass a short.
    props.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, (short) 300);
    // SSL configuration
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        "/path/to/client.truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    // "https" enables hostname verification against the certificate
    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
    // Create consumer
    try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("secure-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received: key=%s, value=%s, partition=%d, offset=%d%n",
                    record.key(), record.value(), record.partition(), record.offset());
            }
            // Commit the offsets returned by the last poll without blocking the loop
            consumer.commitAsync();
        }
    }
}

/**
 * Returns the value of the named environment variable.
 *
 * @param name environment variable name
 * @return the variable's value, never {@code null}
 * @throws IllegalStateException if the variable is not set
 */
private static String requireEnv(String name) {
    String value = System.getenv(name);
    if (value == null) {
        throw new IllegalStateException(
            "Required environment variable " + name + " is not set");
    }
    return value;
}
}

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SecureAdminExample {
/**
 * Connects an admin client over SSL with client authentication (mTLS), then
 * prints the topic names, the cluster ID and the controller node.
 *
 * <p>Store passwords are read from the {@code TRUSTSTORE_PASSWORD},
 * {@code KEYSTORE_PASSWORD} and {@code KEY_PASSWORD} environment variables.
 *
 * @param args unused
 * @throws ExecutionException if an admin request fails
 * @throws InterruptedException if interrupted while waiting for a result
 * @throws IllegalStateException if a required environment variable is not set
 */
public static void main(String[] args)
        throws ExecutionException, InterruptedException {
    // Resolve secrets first: Properties.put rejects null values with a bare
    // NullPointerException, which hides which variable is missing.
    String truststorePassword = requireEnv("TRUSTSTORE_PASSWORD");
    String keystorePassword = requireEnv("KEYSTORE_PASSWORD");
    String keyPassword = requireEnv("KEY_PASSWORD");

    Properties props = new Properties();
    // Basic configuration
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
    props.put(AdminClientConfig.CLIENT_ID_CONFIG, "secure-admin");
    // Security protocol
    props.put("security.protocol", "SSL");
    // SSL configuration with client authentication (mTLS):
    // the truststore verifies the broker, the keystore identifies this client
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        "/path/to/client.truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
        "/path/to/client.keystore.jks");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
    // "https" enables hostname verification against the certificate
    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
    // Create admin client
    try (AdminClient admin = AdminClient.create(props)) {
        // List topics
        ListTopicsResult topics = admin.listTopics();
        System.out.println("Topics: " + topics.names().get());
        // Describe cluster
        DescribeClusterResult cluster = admin.describeCluster();
        System.out.println("Cluster ID: " + cluster.clusterId().get());
        System.out.println("Controller: " + cluster.controller().get());
    }
}

/**
 * Returns the value of the named environment variable.
 *
 * @param name environment variable name
 * @return the variable's value, never {@code null}
 * @throws IllegalStateException if the variable is not set
 */
private static String requireEnv(String name) {
    String value = System.getenv(name);
    if (value == null) {
        throw new IllegalStateException(
            "Required environment variable " + name + " is not set");
    }
    return value;
}
}

Symptoms:
Causes:
Solutions:
import org.apache.kafka.common.config.SslConfigs;
import javax.net.ssl.*;
import java.security.cert.X509Certificate;
import java.util.Properties;
public class SSLTroubleshooting {
/**
 * Builds an SSL-configured producer with JSSE handshake debugging enabled and
 * reports whether the client could be created.
 *
 * <p>The truststore password is read from the {@code TRUSTSTORE_PASSWORD}
 * environment variable. Failures are printed to standard error rather than
 * thrown.
 */
public void diagnoseSSLIssue() {
    // Properties.put rejects null values, so report a missing password up front
    // instead of failing with an uncaught NullPointerException.
    String truststorePassword = System.getenv("TRUSTSTORE_PASSWORD");
    if (truststorePassword == null) {
        System.err.println(
            "SSL connection failed: TRUSTSTORE_PASSWORD environment variable is not set");
        return;
    }

    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka.example.com:9093");
    props.put("security.protocol", "SSL");
    // Serializers are mandatory producer settings; without them the constructor
    // fails with a ConfigException that has nothing to do with SSL.
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Enable SSL debugging
    System.setProperty("javax.net.debug", "ssl,handshake");
    // Verify truststore
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        "/path/to/client.truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    // Verify hostname
    props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
    // NOTE(review): constructing the producer does not wait for a TLS handshake
    // with a broker; issue a request (for example partitionsFor on a known topic)
    // to exercise the connection itself -- confirm against the intended check.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        System.out.println("SSL connection successful");
    } catch (Exception e) {
        System.err.println("SSL connection failed: " + e.getMessage());
        // Full stack trace is intentional here: this is a diagnostic helper
        e.printStackTrace();
    }
}
}

Prevention:
Symptoms:
Causes:
Solutions:
import org.apache.kafka.common.config.SaslConfigs;
/**
 * Diagnostic helper for SASL authentication problems.
 */
public class SASLTroubleshooting {
    /**
     * Attempts a SASL_SSL connection with SCRAM-SHA-256 and reports whether
     * authentication succeeded.
     *
     * <p>Credentials are read from the {@code KAFKA_USERNAME} and
     * {@code KAFKA_PASSWORD} environment variables. Authentication failures are
     * printed to standard error rather than thrown.
     */
    public void diagnoseSASLIssue() {
        // A missing variable would otherwise be sent to the broker as the literal
        // text "null" and surface as a misleading authentication failure.
        String username = System.getenv("KAFKA_USERNAME");
        String password = System.getenv("KAFKA_PASSWORD");
        if (username == null || password == null) {
            System.err.println(
                "SASL authentication failed: KAFKA_USERNAME and KAFKA_PASSWORD must be set");
            return;
        }

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        // Deserializers are mandatory consumer settings; without them the
        // constructor fails with a ConfigException before any authentication.
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        String jaasConfig =
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"" + username + "\" " +
            "password=\"" + password + "\";";
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        // try-with-resources closes the consumer even when listTopics() throws
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // listTopics() needs a broker round trip, so it forces authentication
            consumer.listTopics();
            System.out.println("SASL authentication successful");
        } catch (SaslAuthenticationException e) {
            System.err.println("SASL authentication failed: " + e.getMessage());
            System.err.println("Verify username, password, and mechanism");
        }
    }
}