Complete configuration options for fine-tuning Kafka integration behavior: connection parameters, performance settings, and security.
Configuration options for reading data from Kafka in both streaming and batch modes.
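The same source options apply whether a topic is consumed as a stream or read as a bounded batch. A minimal sketch of how they are passed (broker address and topic name are illustrative assumptions):

// Streaming read
val streamEvents = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Batch read over a fixed offset range
val batchEvents = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
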
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092" // Required: Kafka broker addresses
"subscribe" -> "topic1,topic2,topic3" // Topic subscription (comma-separated)
"subscribePattern" -> "events_.*" // Topic pattern subscription (regex)
"assign" -> """{"topic1":[0,1],"topic2":[0]}""" // Manual partition assignment (JSON)
// Offset Management
"startingOffsets" -> "earliest" // Starting position: "earliest", "latest", or JSON
"endingOffsets" -> "latest" // Ending position: "latest" or JSON (batch only)
"failOnDataLoss" -> "true" // Fail query on data loss (default: true)
// Performance Tuning
"minPartitions" -> "10" // Minimum Spark partitions
"maxOffsetsPerTrigger" -> "1000000" // Rate limiting for streaming
"kafkaConsumer.pollTimeoutMs" -> "10000" // Consumer poll timeoutConfiguration options for writing data to Kafka topics.
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092" // Required: Kafka broker addresses
"topic" -> "output-topic" // Default output topic
// Producer Performance
"kafka.acks" -> "all" // Acknowledgment level: "0", "1", "all"
"kafka.retries" -> "3" // Number of retries
"kafka.batch.size" -> "16384" // Batch size in bytes
"kafka.linger.ms" -> "5" // Batching delay in milliseconds
"kafka.buffer.memory" -> "33554432" // Total memory for buffering
"kafka.compression.type" -> "snappy" // Compression: "none", "gzip", "snappy", "lz4", "zstd"
// Reliability
"kafka.enable.idempotence" -> "true" // Enable idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5" // Max unacknowledged requests
"kafka.request.timeout.ms" -> "30000" // Request timeoutComplete set of Kafka consumer configuration parameters supported through the kafka. prefix.
// Core Consumer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-consumer"
"kafka.session.timeout.ms" -> "30000" // Consumer session timeout
"kafka.heartbeat.interval.ms" -> "3000" // Heartbeat interval
"kafka.max.poll.records" -> "500" // Records per poll
"kafka.max.poll.interval.ms" -> "300000" // Max time between polls
// Fetch Configuration
"kafka.fetch.min.bytes" -> "1" // Minimum bytes to fetch
"kafka.fetch.max.wait.ms" -> "500" // Max wait for min bytes
"kafka.fetch.max.bytes" -> "52428800" // Maximum bytes per fetch (50MB)
"kafka.max.partition.fetch.bytes" -> "1048576" // Maximum bytes per partition (1MB)
// Network Configuration
"kafka.receive.buffer.bytes" -> "65536" // Receive buffer size (64KB)
"kafka.send.buffer.bytes" -> "131072" // Send buffer size (128KB)
"kafka.request.timeout.ms" -> "30000" // Request timeout
"kafka.reconnect.backoff.ms" -> "50" // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000" // Max reconnect backoff
"kafka.retry.backoff.ms" -> "100" // Retry backoff
// Connection Management
"kafka.connections.max.idle.ms" -> "540000" // Connection idle timeout (9 minutes)
"kafka.metadata.max.age.ms" -> "300000" // Metadata refresh interval (5 minutes)Complete set of Kafka producer configuration parameters for writing data.
// Core Producer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-producer"
"kafka.acks" -> "all" // "0", "1", or "all"
"kafka.retries" -> "2147483647" // Max retries (Integer.MAX_VALUE)
"kafka.retry.backoff.ms" -> "100" // Retry backoff
// Batching and Performance
"kafka.batch.size" -> "16384" // Batch size (16KB)
"kafka.linger.ms" -> "0" // Batching delay
"kafka.buffer.memory" -> "33554432" // Buffer memory (32MB)
"kafka.compression.type" -> "none" // Compression type
"kafka.max.request.size" -> "1048576" // Max request size (1MB)
// Idempotence and Ordering
"kafka.enable.idempotence" -> "false" // Idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5" // Max unacknowledged requests
// Timing Configuration
"kafka.request.timeout.ms" -> "30000" // Request timeout
"kafka.delivery.timeout.ms" -> "120000" // Delivery timeout (2 minutes)
"kafka.send.buffer.bytes" -> "131072" // Send buffer (128KB)
"kafka.receive.buffer.bytes" -> "32768" // Receive buffer (32KB)
// Metadata and Connections
"kafka.metadata.max.age.ms" -> "300000" // Metadata refresh (5 minutes)
"kafka.connections.max.idle.ms" -> "540000" // Connection idle timeout (9 minutes)
"kafka.reconnect.backoff.ms" -> "50" // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000" // Max reconnect backoffSecurity configuration for SSL and SASL authentication.
// SSL Configuration
"kafka.security.protocol" -> "SSL" // Security protocol
"kafka.ssl.protocol" -> "TLSv1.2" // SSL protocol version
"kafka.ssl.truststore.location" -> "/path/to/truststore.jks"
"kafka.ssl.truststore.password" -> "truststore-password"
"kafka.ssl.truststore.type" -> "JKS" // Truststore type
"kafka.ssl.keystore.location" -> "/path/to/keystore.jks"
"kafka.ssl.keystore.password" -> "keystore-password"
"kafka.ssl.keystore.type" -> "JKS" // Keystore type
"kafka.ssl.key.password" -> "key-password" // Key password
// SSL Verification
"kafka.ssl.endpoint.identification.algorithm" -> "https" // Hostname verification
"kafka.ssl.check.hostname" -> "true" // Check hostname
// SASL Configuration
"kafka.security.protocol" -> "SASL_SSL" // SASL with SSL
"kafka.sasl.mechanism" -> "PLAIN" // SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"
// SASL SCRAM Configuration
"kafka.sasl.mechanism" -> "SCRAM-SHA-256"
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";"
// Kerberos (GSSAPI) Configuration
"kafka.sasl.mechanism" -> "GSSAPI"
"kafka.sasl.kerberos.service.name" -> "kafka"
"kafka.sasl.jaas.config" -> "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/path/to/kafka.keytab\" storeKey=true useTicketCache=false principal=\"kafka/hostname@REALM\";"val streamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
val query = streamingDF
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()

A reader tuned for high-volume topics:
val highPerfDF = spark
.readStream
.format("kafka")
// Connection settings
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
.option("subscribe", "high-volume-topic")
// Performance tuning
.option("minPartitions", "50") // Increase parallelism
.option("maxOffsetsPerTrigger", "5000000") // 5M records per batch
.option("kafkaConsumer.pollTimeoutMs", "5000") // 5 second timeout
// Consumer optimization
.option("kafka.fetch.min.bytes", "1024") // 1KB minimum fetch
.option("kafka.fetch.max.wait.ms", "100") // Fast fetching
.option("kafka.max.poll.records", "1000") // More records per poll
.option("kafka.receive.buffer.bytes", "262144") // 256KB buffer
.option("kafka.fetch.max.bytes", "104857600") // 100MB max fetch
.load()
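processedDF below stands for whatever transformation is applied to the high-throughput stream; as an illustrative assumption:

// Illustrative assumption: processedDF is simply the decoded form of highPerfDF.
val processedDF = highPerfDF.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")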
val writeQuery = processedDF
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
.option("topic", "processed-topic")
// Producer optimization
.option("kafka.batch.size", "65536") // 64KB batches
.option("kafka.linger.ms", "10") // 10ms batching delay
.option("kafka.compression.type", "snappy") // Compression
.option("kafka.buffer.memory", "134217728") // 128MB buffer
.option("kafka.acks", "1") // Balanced reliability
.option("checkpointLocation", "/tmp/processed-checkpoint") // Streaming Kafka writes require a checkpoint location (path is illustrative)
.start()

Reading from a cluster secured with SSL:
val secureDF = spark
.readStream
.format("kafka")
// Basic connection
.option("kafka.bootstrap.servers", "secure-broker1:9093,secure-broker2:9093")
.option("subscribe", "secure-topic")
// SSL Configuration
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/etc/kafka/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("kafka.ssl.keystore.location", "/etc/kafka/keystore.jks")
.option("kafka.ssl.keystore.password", "keystore-password")
.option("kafka.ssl.key.password", "key-password")
.option("kafka.ssl.endpoint.identification.algorithm", "https")
.load()
// SASL/SCRAM configuration
val saslDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "sasl-broker:9094")
.option("subscribe", "authenticated-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"myuser\" password=\"mypassword\";")
.load()

A bounded batch read over an explicit offset range:
val batchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "historical-data")
// Batch-specific offset configuration
.option("startingOffsets", "earliest")
.option("endingOffsets", """{"historical-data":{"0":1000000,"1":1500000}}""")
// Optimize for large batch reads
.option("minPartitions", "20")
.option("kafka.fetch.max.bytes", "104857600") // 100MB
.option("kafka.max.partition.fetch.bytes", "10485760") // 10MB per partition
.option("kafka.receive.buffer.bytes", "1048576") // 1MB buffer
.load()

Selecting connection and reliability settings per environment:
import scala.util.Properties
val environment = Properties.envOrElse("ENVIRONMENT", "dev")
val envConfig = environment match {
case "prod" => Map(
"kafka.bootstrap.servers" -> "prod-broker1:9092,prod-broker2:9092,prod-broker3:9092",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.acks" -> "all",
"kafka.retries" -> "10",
"failOnDataLoss" -> "true"
)
case "staging" => Map(
"kafka.bootstrap.servers" -> "staging-broker:9092",
"kafka.acks" -> "1",
"failOnDataLoss" -> "true"
)
case _ => Map(
"kafka.bootstrap.servers" -> "localhost:9092",
"kafka.acks" -> "1",
"failOnDataLoss" -> "false"
)
}
val configuredDF = spark
.readStream
.format("kafka")
.options(envConfig)
.option("subscribe", s"${environment}-events")
.load()

Certain Kafka parameters are managed internally and cannot be overridden:
// Automatically managed by Spark (will throw IllegalArgumentException if specified)
"kafka.group.id" // Unique group IDs generated per query
"kafka.auto.offset.reset" // Controlled via startingOffsets option
"kafka.key.deserializer" // Fixed to ByteArrayDeserializer
"kafka.value.deserializer" // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit" // Disabled for offset management
"kafka.interceptor.classes" // Not supported for safety
// Producer-specific unsupported options
"kafka.key.serializer" // Fixed to ByteArraySerializer
"kafka.value.serializer" // Fixed to ByteArraySerializer// Required for all operations
"kafka.bootstrap.servers" // Must be specified
// Required for sources (exactly one must be specified)
"subscribe" // Topic list
"subscribePattern" // Topic pattern
"assign" // Partition assignment
// Schema validation occurs at runtime
// DataFrame must have required columns for sinks: "value" (and optionally "key", "topic")

// Valid consumer strategy
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2") // Valid
.load()
// Invalid: multiple strategies
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1") // Both subscribe
.option("subscribePattern", "topic.*") // and pattern specified
.load() // Throws IllegalArgumentException
// Invalid: unsupported parameter
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.option("kafka.group.id", "my-group") // Unsupported
.load() // Throws IllegalArgumentException

// Optimize fetch behavior
.option("kafka.fetch.min.bytes", "1024") // Wait for 1KB
.option("kafka.fetch.max.wait.ms", "500") // Max wait 500ms
.option("kafka.max.poll.records", "500") // Records per poll
// Buffer optimization
.option("kafka.receive.buffer.bytes", "65536") // 64KB receive buffer
.option("kafka.fetch.max.bytes", "52428800") // 50MB max fetch
// Partition parallelism
.option("minPartitions", "20") // Increase Spark partitions// Batching optimization
.option("kafka.batch.size", "32768") // 32KB batches
.option("kafka.linger.ms", "10") // 10ms linger time
.option("kafka.compression.type", "snappy") // Enable compression
// Memory and throughput
.option("kafka.buffer.memory", "67108864") // 64MB buffer
.option("kafka.max.in.flight.requests.per.connection", "5")// Spark configuration for Kafka workloads
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.cores", "4")
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
// JVM tuning for Kafka clients
spark.conf.set("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+UnlockExperimentalVMOptions")// Enable JMX metrics
.option("kafka.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter")
// Custom metrics reporting interval
.option("kafka.metrics.sample.window.ms", "30000") // 30 second window
.option("kafka.metrics.num.samples", "2") // 2 samples per window// Enable detailed logging for troubleshooting
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.sql.kafka010").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.kafka").setLevel(Level.INFO)// Increase timeouts for unreliable networks
.option("kafka.request.timeout.ms", "60000") // 60 second timeout
.option("kafka.reconnect.backoff.ms", "1000") // 1 second backoff
.option("kafka.retry.backoff.ms", "1000") // 1 second retry backoff// Handle offset out of range errors
.option("failOnDataLoss", "false") // Continue on data loss
.option("startingOffsets", "latest") // Start from latest on errors// Diagnose slow consumption
.option("kafkaConsumer.pollTimeoutMs", "30000") // Longer poll timeout
.option("kafka.session.timeout.ms", "60000") // Longer session timeout
.option("kafka.max.poll.interval.ms", "600000") // 10 minute max poll interval// Reduce memory usage
.option("kafka.fetch.max.bytes", "10485760") // Reduce to 10MB
.option("kafka.max.partition.fetch.bytes", "1048576") // 1MB per partition
.option("maxOffsetsPerTrigger", "100000") // Limit batch size