Configuration options for handling data loss and connection failures, tuning retry policies, and ensuring reliable message processing.
Configure how the connector handles situations where data is no longer available in Kafka.
/**
 * Strict data loss policy - fail query when data is unavailable
 * @param policy - "true" to fail query, "false" to continue with warning
 * Default: "true" (fail on data loss)
 */
.option("failOnDataLoss", "true")

Usage Example:
// Critical data pipeline - fail if any data is lost
val criticalStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "financial-transactions")
  .option("failOnDataLoss", "true") // Explicit - this is the default
  .load()

// This query will fail if data is aged out or partitions are deleted
val query = criticalStream
  .selectExpr("CAST(value AS STRING) as transaction")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

/**
 * Lenient data loss policy - continue processing with warnings
 * Logs warning messages when data is unavailable but continues query execution
 */
.option("failOnDataLoss", "false")

Usage Example:
// Log analysis pipeline - continue even if some logs are lost
val logStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "application-logs")
  .option("failOnDataLoss", "false") // Continue on data loss
  .load()

// Query continues with warnings logged to the driver
val query = logStream
  .selectExpr("CAST(value AS STRING) as log_message")
  .filter("log_message LIKE '%ERROR%'")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

Configure retry behavior for offset fetching operations to handle transient network issues.
/**
 * Number of retries for offset fetching operations
 * @param retries - Number of retry attempts (default: 3)
 */
.option("fetchOffset.numRetries", "5")

/**
 * Interval between offset fetch retry attempts
 * @param interval - Retry interval in milliseconds (default: 1000)
 */
.option("fetchOffset.retryIntervalMs", "2000")

Usage Example:
// Reliable offset fetching for unstable network conditions
val reliableStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "remote-cluster:9092")
  .option("subscribe", "remote-events")
  .option("fetchOffset.numRetries", "10") // Retry up to 10 times
  .option("fetchOffset.retryIntervalMs", "5000") // Wait 5 seconds between retries
  .load()

Configure consumer polling timeouts to handle slow or unresponsive Kafka brokers.
/**
 * Consumer poll timeout in milliseconds
 * How long to wait for data in each consumer.poll() call
 * @param timeout - Timeout in milliseconds (default: 60000)
 */
.option("kafkaConsumer.pollTimeoutMs", "30000")

Usage Example:
// Shorter timeout for low-latency applications
val lowLatencyStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .option("kafkaConsumer.pollTimeoutMs", "5000") // 5 second timeout
  .load()

// Longer timeout for high-latency networks
val highLatencyStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "distant-cluster:9092")
  .option("subscribe", "batch-events")
  .option("kafkaConsumer.pollTimeoutMs", "120000") // 2 minute timeout
  .load()

Configure Kafka client settings for reliable connections and error recovery.
/**
 * Consumer session timeout - how long broker waits before removing consumer from group
 * @param timeout - Session timeout in milliseconds
 */
.option("kafka.session.timeout.ms", "30000")

/**
 * Request timeout - how long to wait for broker responses
 * @param timeout - Request timeout in milliseconds
 */
.option("kafka.request.timeout.ms", "40000")

/**
 * Network receive buffer size for consumer
 * @param size - Buffer size in bytes (default: 65536)
 */
.option("kafka.receive.buffer.bytes", "131072") // 128KB

/**
 * Network send buffer size for producer
 * @param size - Buffer size in bytes (default: 131072)
 */
.option("kafka.send.buffer.bytes", "262144") // 256KB

/**
 * Connection maximum idle time
 * @param timeout - Idle timeout in milliseconds
 */
.option("kafka.connections.max.idle.ms", "540000") // 9 minutes

Usage Example:
// Robust connection configuration for production
val productionStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "prod-cluster:9092")
  .option("subscribe", "production-events")
  .option("kafka.session.timeout.ms", "45000") // Longer session timeout
  .option("kafka.request.timeout.ms", "60000") // Longer request timeout
  .option("kafka.receive.buffer.bytes", "262144") // Larger receive buffer
  .option("kafka.connections.max.idle.ms", "300000") // 5 minute idle timeout
  .load()

Configure producer reliability settings for write operations.
/**
 * Producer acknowledgment configuration
 * @param acks - "0" (no wait), "1" (leader only), "all" (all replicas)
 */
.option("kafka.acks", "all") // Wait for all replicas

/**
 * Producer retry configuration
 * @param retries - Number of retry attempts for failed sends
 */
.option("kafka.retries", "10") // Retry failed sends

/**
 * Maximum time to wait for acknowledgments
 * @param timeout - Request timeout in milliseconds
 */
.option("kafka.request.timeout.ms", "30000")

/**
 * Enable idempotent producer for exactly-once semantics
 * Prevents duplicate messages during retries
 */
.option("kafka.enable.idempotence", "true")

Usage Example:
// Reliable write configuration with an idempotent producer
val reliableWrite = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "critical-output")
  .option("checkpointLocation", "/tmp/checkpoints/critical-output") // Required for the Kafka sink (example path)
  .option("kafka.acks", "all") // Wait for all replicas
  .option("kafka.retries", "2147483647") // Retry indefinitely
  .option("kafka.enable.idempotence", "true") // Prevent duplicates from producer retries
  .option("kafka.max.in.flight.requests.per.connection", "5")
  .outputMode("append")
  .start()

Understanding common error messages and their meanings.
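When "failOnDataLoss" is left at its default of "true" and records have been aged out or deleted, the query stops with a StreamingQueryException whose message chain includes the instruction text quoted below. A minimal sketch of recognizing that case after the query stops; query here is assumed to be a StreamingQuery started by one of the earlier examples:

import org.apache.spark.sql.streaming.StreamingQueryException

def failedOnDataLoss(e: StreamingQueryException): Boolean =
  // Walk the cause chain looking for the data-loss instruction text quoted below
  Iterator.iterate[Throwable](e)(_.getCause)
    .takeWhile(_ != null)
    .exists(t => Option(t.getMessage).exists(_.contains("Some data may have been lost")))

try {
  query.awaitTermination()
} catch {
  case e: StreamingQueryException if failedOnDataLoss(e) =>
    println("Query stopped due to Kafka data loss (aged-out or deleted data); " +
      "consider failOnDataLoss=false or longer topic retention")
}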
/**
 * Error message constants for data loss scenarios
 */
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = """
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true".
"""
val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE = """
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
"""

/**
 * Warning message for custom group ID usage
 */
val CUSTOM_GROUP_ID_ERROR_MESSAGE = """
Kafka option 'kafka.group.id' has been set on this query, it is not recommended to set this
option. This option is unsafe to use since multiple concurrent queries or sources using the
same group id will interfere with each other as they are part of the same consumer group.
Restarted queries may also suffer interference from the previous run having the same group id.
The user should have only one query per group id, and/or set the option
'kafka.session.timeout.ms' to be very small so that the Kafka consumers from the previous
query are marked dead by the Kafka group coordinator before the restarted query starts running.
"""
Common patterns for handling exceptions in Kafka streaming applications.

Usage Example:
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Robust streaming application with error handling
def createKafkaStream(): StreamingQuery = {
  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("failOnDataLoss", "false") // Continue on data loss
    .option("fetchOffset.numRetries", "5") // Retry offset fetches
    .option("kafkaConsumer.pollTimeoutMs", "30000") // 30s poll timeout
    .load()

  df.selectExpr("CAST(value AS STRING) as message")
    .writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
}
// Application with restart logic
def runWithRetry(): Unit = {
  var query: StreamingQuery = null
  var attempts = 0
  val maxAttempts = 3
  while (attempts < maxAttempts) {
    try {
      query = createKafkaStream()
      query.awaitTermination()
      return // Successful completion
    } catch {
      case e: StreamingQueryException =>
        attempts += 1
        println(s"Query failed (attempt $attempts): ${e.getMessage}")
        if (query != null) query.stop()
        if (attempts < maxAttempts) {
          Thread.sleep(10000) // Wait before retry
        }
    }
  }
  println(s"Failed after $maxAttempts attempts")
}

Best practices for monitoring Kafka streaming applications.
/**
 * Query progress monitoring for health checks
 */
val query = df.writeStream./* ... */.start()

// Example thresholds and hooks - adjust for your pipeline
val expectedRate = 1000.0 // expected input rows per second
def sendAlert(e: Throwable): Unit = println(s"ALERT: $e") // wire up to your alerting system

// Monitor query health
def monitorQuery(query: StreamingQuery): Unit = {
  while (query.isActive) {
    val progress = query.lastProgress
    if (progress != null) { // lastProgress is null until the first batch completes
      // Check for processing delays
      if (progress.durationMs.get("triggerExecution") > 60000) {
        println("WARNING: Processing is taking too long")
      }
      // Check input rate
      if (progress.inputRowsPerSecond < expectedRate * 0.8) {
        println("WARNING: Input rate is below expected threshold")
      }
    }
    // Check for exceptions
    if (query.exception.isDefined) {
      println(s"ERROR: Query failed with exception: ${query.exception.get}")
      // Trigger alert system
      sendAlert(query.exception.get)
    }
    Thread.sleep(30000) // Check every 30 seconds
  }
}
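Instead of polling lastProgress in a loop, the same health information can be received as callbacks through Spark's StreamingQueryListener; a minimal sketch that logs progress and failures for every query on the session:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Called after each micro-batch; event.progress is a StreamingQueryProgress
    val p = event.progress
    println(s"batch=${p.batchId} rows/s=${p.inputRowsPerSecond}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    // exception is Some(message) when the query failed, None on a clean stop
    event.exception.foreach(msg => println(s"Query ${event.id} failed: $msg"))
})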
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Simplified type signatures for reference; the real definitions live in
// org.apache.spark.sql.streaming

// Exception types
class StreamingQueryException(
  message: String,
  cause: Throwable,
  val query: StreamingQuery
) extends Exception

// Query monitoring types
trait StreamingQuery {
  def exception: Option[StreamingQueryException] // Current exception if any
  def isActive: Boolean // Whether query is running
  def lastProgress: StreamingQueryProgress // Latest progress info
  def status: StreamingQueryStatus // Current status
}

// Progress monitoring
case class StreamingQueryProgress(
  id: UUID,
  batchId: Long,
  inputRowsPerSecond: Double,
  durationMs: Map[String, Long],
  /* ... other fields ... */
)
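These progress objects can also be exported to external dashboards or log aggregation; a minimal sketch that prints recent batch statistics and the latest progress as JSON (recentProgress, lastProgress, and prettyJson are part of the monitoring API above; df stands for any streaming DataFrame from the earlier examples):

val running = df.writeStream./* ... */.start()

// Summarize the batches Spark keeps in memory for this query
running.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} rows/s=${p.inputRowsPerSecond} durations=${p.durationMs}")
}

// Ship the latest progress as a JSON document (lastProgress is null until the first batch completes)
Option(running.lastProgress).foreach(p => println(p.prettyJson))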