The Spark Kafka connector provides comprehensive configuration options for connection management, performance tuning, security, and reliability. All Kafka client configurations are supported with the kafka. prefix.
Essential connection settings for accessing Kafka clusters.
/**
* Required connection configuration
*/
.option("kafka.bootstrap.servers", servers: String) // Required: Comma-separated list of Kafka brokers
/**
* Optional connection settings
*/
.option("kafka.client.id", clientId: String) // Client identifier for broker logs
.option("kafka.request.timeout.ms", timeout: String) // Request timeout in milliseconds
.option("kafka.connections.max.idle.ms", idle: String) // Max idle time for connectionsUsage Examples:
// Basic connection
val basicConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
// High availability setup
val haConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
.option("kafka.client.id", "spark-consumer-app")
.option("kafka.request.timeout.ms", "60000") // 60 second timeout
.option("kafka.connections.max.idle.ms", "300000") // 5 minute idle timeout
.option("subscribe", "critical-events")Configure how topics are selected and subscribed to.
/**
* Topic selection options (exactly one required)
*/
.option("subscribe", topics: String) // Comma-separated topic names
.option("subscribepattern", pattern: String) // Regex pattern for topic names
.option("assign", partitions: String) // JSON specification of TopicPartitions
/**
* Topic-related settings
*/
.option("topic", topicName: String) // Default topic for writes (optional)Usage Examples:
// Subscribe to specific topics
val topicSubscription = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs,metrics")
// Pattern-based subscription
val patternSubscription = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribepattern", "prod-.*-events")
// Specific partition assignment
val partitionAssignment = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1,2],"logs":[0,1]}""")Control how offsets are managed and tracked for reading operations.
/**
* Offset specification for reads
*/
.option("startingOffsets", offsets: String) // Starting position: "earliest", "latest", or JSON
.option("endingOffsets", offsets: String) // Ending position: "earliest", "latest", or JSON (batch only)
/**
* Timestamp-based offset resolution
*/
.option("startingTimestamp", timestamp: String) // Global timestamp (ms since epoch)
.option("endingTimestamp", timestamp: String) // Global timestamp (ms since epoch, batch only)
.option("startingOffsetsByTimestamp", timestamps: String) // Per-partition timestamps (JSON)
.option("endingOffsetsByTimestamp", timestamps: String) // Per-partition timestamps (JSON, batch only)
/**
* Offset resolution strategy
*/
.option("startingOffsetsByTimestampStrategy", strategy: String) // "ERROR" or "LATEST"Usage Examples:
// Start from earliest available
val earliestConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
// Specific offsets per partition
val specificOffsets = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500}}""")
.option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500}}""")
// Timestamp-based reading
val timestampConfig = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
.option("endingTimestamp", "1641081600000") // Jan 2, 2022 UTCTune performance characteristics for streaming and batch operations.
/**
* Streaming performance options
*/
.option("maxOffsetsPerTrigger", maxRecords: String) // Maximum records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String) // Minimum records before triggering
.option("maxTriggerDelay", delay: String) // Maximum delay before triggering (e.g., "30s")
/**
* Read parallelism options
*/
.option("minPartitions", partitions: String) // Minimum number of Spark partitions to read from Kafka
/**
* Offset fetching configuration
*/
.option("fetchOffset.numRetries", retries: String) // Number of retries for offset fetching
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in millisecondsUsage Examples:
// High-throughput streaming configuration
val highThroughput = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "100000") // Max 100K records per batch
.option("minOffsetsPerTrigger", "10000") // Min 10K records before processing
.option("maxTriggerDelay", "60s") // Process every 60s regardless
// Batch optimization
val batchOptimized = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "200") // Force higher parallelism
.option("fetchOffset.numRetries", "10") // More retries for reliability
.option("fetchOffset.retryIntervalMs", "500") // Wait 500ms between retriesConfigure fault tolerance, error handling, and data consistency.
/**
* Reliability and fault tolerance options
*/
.option("failOnDataLoss", failBehavior: String) // "true" or "false"
.option("groupIdPrefix", prefix: String) // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String) // "true" or "false"
/**
* Consumer polling configuration
*/
.option("kafkaConsumer.pollTimeoutMs", timeout: String) // Consumer poll timeout in millisecondsUsage Examples:
// Strict reliability mode
val strictMode = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-events")
.option("failOnDataLoss", "true") // Fail query on any data loss
.option("groupIdPrefix", "critical-app") // Custom consumer group prefix
.option("includeHeaders", "true") // Include message headers
// Permissive mode for non-critical data
val permissiveMode = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("failOnDataLoss", "false") // Continue despite data loss
.option("kafkaConsumer.pollTimeoutMs", "30000") // 30 second poll timeoutConfigure authentication, encryption, and access control.
/**
* Security protocol configuration
*/
.option("kafka.security.protocol", protocol: String) // "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"
/**
* SASL Authentication
*/
.option("kafka.sasl.mechanism", mechanism: String) // "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI"
.option("kafka.sasl.jaas.config", jaasConfig: String) // JAAS configuration string
/**
* SSL Configuration
*/
.option("kafka.ssl.truststore.location", path: String) // Truststore file path
.option("kafka.ssl.truststore.password", password: String) // Truststore password
.option("kafka.ssl.keystore.location", path: String) // Keystore file path (for client auth)
.option("kafka.ssl.keystore.password", password: String) // Keystore password
.option("kafka.ssl.key.password", password: String) // Key passwordUsage Examples:
// SASL/PLAIN authentication
val saslConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "secure-kafka:9093")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username='consumer' password='consumer-secret';")
.option("subscribe", "secure-topic")
// SSL with mutual authentication
val sslConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "ssl-kafka:9094")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("kafka.ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
.option("kafka.ssl.keystore.password", "keystore-password")
.option("kafka.ssl.key.password", "key-password")
.option("subscribe", "ssl-topic")
// SASL/SSL combination
val saslSslConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "secure-kafka:9095")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username='app-user' password='app-password';")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("subscribe", "encrypted-topic")Configure Kafka producer settings for write operations.
/**
* Producer reliability configuration
*/
.option("kafka.acks", acks: String) // "0", "1", or "all"
.option("kafka.retries", retries: String) // Number of retry attempts
.option("kafka.enable.idempotence", idempotent: String) // "true" or "false"
/**
* Producer performance configuration
*/
.option("kafka.batch.size", batchSize: String) // Batch size in bytes
.option("kafka.linger.ms", lingerMs: String) // Batching delay in milliseconds
.option("kafka.buffer.memory", bufferMemory: String) // Total memory for buffering
.option("kafka.compression.type", compression: String) // "none", "gzip", "snappy", "lz4", "zstd"
/**
* Producer connection configuration
*/
.option("kafka.max.in.flight.requests.per.connection", maxInflight: String) // Max unacked requests
.option("kafka.request.timeout.ms", timeout: String) // Request timeout
.option("kafka.delivery.timeout.ms", timeout: String) // Total delivery timeoutUsage Examples:
// High-reliability producer configuration
val reliableProducer = dataFrame.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "critical-events")
.option("kafka.acks", "all") // Wait for all replicas
.option("kafka.retries", "10") // Retry up to 10 times
.option("kafka.enable.idempotence", "true") // Prevent duplicates
.option("kafka.max.in.flight.requests.per.connection", "1") // Maintain ordering
// High-throughput producer configuration
val highThroughputProducer = dataFrame.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "high-volume-events")
.option("kafka.acks", "1") // Faster acknowledgment
.option("kafka.batch.size", "131072") // 128KB batches
.option("kafka.linger.ms", "100") // 100ms batching delay
.option("kafka.compression.type", "lz4") // Fast compression
.option("kafka.buffer.memory", "134217728") // 128MB buffer
// Balanced configuration
val balancedProducer = dataFrame.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "standard-events")
.option("kafka.acks", "1") // Leader acknowledgment
.option("kafka.retries", "3") // Moderate retry count
.option("kafka.batch.size", "16384") // 16KB batches
.option("kafka.linger.ms", "10") // 10ms batching delay
.option("kafka.compression.type", "snappy") // Good compression ratioConfigure Kafka consumer settings for read operations (advanced use cases).
/**
* Consumer session and heartbeat configuration
*/
.option("kafka.session.timeout.ms", timeout: String) // Session timeout
.option("kafka.heartbeat.interval.ms", interval: String) // Heartbeat interval
.option("kafka.max.poll.interval.ms", interval: String) // Max time between polls
/**
* Consumer fetch configuration
*/
.option("kafka.fetch.min.bytes", minBytes: String) // Minimum fetch size
.option("kafka.fetch.max.wait.ms", waitMs: String) // Maximum fetch wait time
.option("kafka.max.partition.fetch.bytes", maxBytes: String) // Max bytes per partitionUsage Examples:
// Low-latency consumer configuration
val lowLatencyConsumer = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "real-time-events")
.option("kafka.fetch.min.bytes", "1") // Fetch immediately
.option("kafka.fetch.max.wait.ms", "100") // Max 100ms wait
.option("kafka.session.timeout.ms", "10000") // 10s session timeout
.option("kafka.heartbeat.interval.ms", "3000") // 3s heartbeat
// High-throughput consumer configuration
val highThroughputConsumer = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "bulk-data")
.option("kafka.fetch.min.bytes", "1048576") // 1MB minimum fetch
.option("kafka.max.partition.fetch.bytes", "10485760") // 10MB max per partition
.option("kafka.session.timeout.ms", "30000") // 30s session timeout// Development configuration - prioritize ease of debugging
val devConfig = Map(
"kafka.bootstrap.servers" -> "localhost:9092",
"failOnDataLoss" -> "false", // Continue despite data issues
"startingOffsets" -> "earliest", // Read all available data
"includeHeaders" -> "true", // Include headers for debugging
"maxOffsetsPerTrigger" -> "1000" // Small batches for testing
)

// Production configuration - prioritize reliability and performance
val prodConfig = Map(
"kafka.bootstrap.servers" -> "kafka1:9092,kafka2:9092,kafka3:9092",
"failOnDataLoss" -> "true", // Strict data consistency
"groupIdPrefix" -> "prod-spark-app", // Identifiable consumer groups
"maxOffsetsPerTrigger" -> "100000", // Larger batches for efficiency
"kafka.session.timeout.ms" -> "30000", // Longer session timeout
"kafka.request.timeout.ms" -> "60000", // Longer request timeout
"fetchOffset.numRetries" -> "5", // More retries
"fetchOffset.retryIntervalMs" -> "1000" // Longer retry intervals
)

// Security-focused configuration
val secureConfig = Map(
"kafka.bootstrap.servers" -> "secure-kafka:9093",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-256",
"kafka.sasl.jaas.config" ->
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username='spark-user' password='${KAFKA_PASSWORD}';",
"kafka.ssl.truststore.location" -> "/etc/kafka/ssl/truststore.jks",
"kafka.ssl.truststore.password" -> "${TRUSTSTORE_PASSWORD}",
"kafka.ssl.endpoint.identification.algorithm" -> "https"
)

// Configuration that adapts to load
val autoScalingConfig = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.option("minOffsetsPerTrigger", "1000") // Process at least 1K records
.option("maxOffsetsPerTrigger", "50000") // But no more than 50K
.option("maxTriggerDelay", "30s") // Force processing every 30s// Configuration for multi-region deployment
val multiRegionConfig = Map(
"kafka.bootstrap.servers" ->
"us-kafka1:9092,us-kafka2:9092,eu-kafka1:9092,eu-kafka2:9092",
"kafka.client.id" -> s"spark-${region}-${applicationId}",
"kafka.request.timeout.ms" -> "120000", // Longer timeout for cross-region
"kafka.session.timeout.ms" -> "60000", // Longer session timeout
"fetchOffset.numRetries" -> "10", // More retries for network issues
"fetchOffset.retryIntervalMs" -> "2000" // Longer retry intervals
)

// Configuration for Confluent Schema Registry environments
// Note: Spark's Kafka connector always exchanges binary key/value columns and
// rejects kafka.key.serializer / kafka.value.serializer options. Avro
// (de)serialization against Schema Registry is normally done at the DataFrame
// level (for example with from_avro/to_avro), so the registry settings below
// are consumed by that layer rather than by the connector itself.
val schemaRegistryConfig = Map(
"kafka.bootstrap.servers" -> "kafka:9092",
"schema.registry.url" -> "http://schema-registry:8081",
"schema.registry.basic.auth.user.info" -> "user:password"
)
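Any of these maps can be applied in one call through options(...), with the topic subscription still supplied per query; a sketch using the prodConfig map above (the topic name is illustrative):

val prodStream = spark.readStream
  .format("kafka")
  .options(prodConfig)            // apply the whole map at once
  .option("subscribe", "events")  // subscription is still per-query
  .load()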