Comprehensive data writing capabilities for both streaming and batch workloads, with producer connection pooling, automatic serialization, and support for multiple output modes.
KafkaSink is the legacy sink for writing DataFrames to Kafka topics from micro-batch streaming queries.
/**
* Legacy streaming sink for writing to Kafka
*/
class KafkaSink extends Sink with Logging {
/**
* Adds a batch of data to the sink
* @param batchId Unique identifier for this batch
* @param data DataFrame containing data to write
*/
def addBatch(batchId: Long, data: DataFrame): Unit
/**
* String representation of the sink
* @return String describing this sink
*/
def toString(): String
}
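The micro-batch engine calls addBatch once per batch. The sketch below shows what such a sink typically looks like: it skips batches that have already been committed (batches can be replayed after a failure) and delegates the actual write to KafkaWriter, described further down. The class name and constructor are illustrative, not the shipped implementation.
import java.{util => ju}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

class IllustrativeKafkaSink(
    sqlContext: SQLContext,
    executorKafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink {
  @volatile private var latestBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Skip batches that were already written; the engine may replay them after recovery
    if (batchId > latestBatchId) {
      KafkaWriter.write(sqlContext.sparkSession, data.queryExecution,
        executorKafkaParams, topic)
      latestBatchId = batchId
    }
  }
}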
Modern stream writer for DataSource V2 with improved performance and reliability.
/**
* Stream writer for DataSource V2 streaming writes
* @param topic Optional default topic for writes
* @param producerParams Kafka producer configuration
* @param schema Schema of input data
*/
class KafkaStreamWriter(
topic: Option[String],
producerParams: Map[String, String],
schema: StructType
) extends StreamWriter {
/**
* Creates writer factory for this stream
* @return KafkaStreamWriterFactory instance
*/
def createWriterFactory(): KafkaStreamWriterFactory
/**
* Commits an epoch of writes
* @param epochId Epoch identifier
* @param messages Array of commit messages from writers
*/
def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
/**
* Aborts an epoch of writes
* @param epochId Epoch identifier
* @param messages Array of commit messages from writers
*/
def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}
Factory for creating data writers in streaming contexts.
/**
* Factory for creating stream data writers
* @param topic Optional default topic
* @param producerParams Kafka producer configuration
* @param schema Schema of input data
*/
case class KafkaStreamWriterFactory(
topic: Option[String],
producerParams: Map[String, String],
schema: StructType
) extends DataWriterFactory[InternalRow] {
/**
* Creates data writer for a specific partition and task
* @param partitionId Partition identifier
* @param taskId Task identifier
* @param epochId Epoch identifier
* @return DataWriter for writing records
*/
def createDataWriter(
partitionId: Int,
taskId: Long,
epochId: Long
): DataWriter[InternalRow]
}
Data writer for streaming Kafka writes with row-level processing.
/**
* Data writer for streaming Kafka writes
* @param targetTopic Optional target topic
* @param producerParams Kafka producer configuration
* @param inputSchema Schema of input data
*/
class KafkaStreamDataWriter(
targetTopic: Option[String],
producerParams: Map[String, String],
inputSchema: StructType
) extends KafkaRowWriter with DataWriter[InternalRow] {
/**
* Writes a single row to Kafka
* @param row InternalRow to write
*/
def write(row: InternalRow): Unit
/**
* Commits all pending writes
* @return WriterCommitMessage confirming completion
*/
def commit(): WriterCommitMessage
/**
* Aborts all pending writes
*/
def abort(): Unit
/**
* Closes the writer and releases resources
*/
def close(): Unit
}
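Taken together, KafkaStreamWriter, KafkaStreamWriterFactory, and KafkaStreamDataWriter implement the DataSource V2 write contract. In practice the streaming engine issues these calls itself; the sketch below only illustrates the sequence, and producerParams, schema, and rows are assumed to exist.
val streamWriter = new KafkaStreamWriter(Some("events"), producerParams, schema)
val factory = streamWriter.createWriterFactory()
// On an executor, for one partition of one epoch:
val dataWriter = factory.createDataWriter(partitionId = 0, taskId = 0L, epochId = 1L)
rows.foreach(dataWriter.write)    // send each InternalRow
val message = dataWriter.commit() // KafkaWriterCommitMessage on success
// Back on the driver, once every task has reported success:
streamWriter.commit(1L, Array(message))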
Utilities for writing data to Kafka from batch and streaming queries.
/**
* Utilities for writing data to Kafka from batch/streaming queries
*/
object KafkaWriter extends Logging {
/** Topic column name in DataFrame */
val TOPIC_ATTRIBUTE_NAME: String = "topic"
/** Key column name in DataFrame */
val KEY_ATTRIBUTE_NAME: String = "key"
/** Value column name in DataFrame */
val VALUE_ATTRIBUTE_NAME: String = "value"
/**
* Validates query schema for Kafka write compatibility
* @param schema Attribute schema to validate
* @param kafkaParameters Kafka producer parameters
* @param topic Optional default topic
*/
def validateQuery(
schema: Seq[Attribute],
kafkaParameters: ju.Map[String, Object],
topic: Option[String]
): Unit
/**
* Writes DataFrame data to Kafka
* @param sparkSession Current Spark session
* @param queryExecution Query execution plan
* @param kafkaParameters Kafka producer parameters
* @param topic Optional default topic
*/
def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
kafkaParameters: ju.Map[String, Object],
topic: Option[String]
): Unit
/**
* String representation of the writer
* @return String describing writer configuration
*/
def toString: String
}
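Both the legacy sink and the batch save path funnel into this object. The sketch below calls it directly; this is internal API, and spark, df, and the parameter values are assumptions for illustration.
import scala.collection.JavaConverters._
val kafkaParams: java.util.Map[String, Object] =
  Map[String, AnyRef]("bootstrap.servers" -> "localhost:9092").asJava
// Fail fast if the query output lacks a usable value column or topic
KafkaWriter.validateQuery(df.queryExecution.analyzed.output, kafkaParams, Some("my-topic"))
// Execute the plan and write every partition to Kafka
KafkaWriter.write(spark, df.queryExecution, kafkaParams, Some("my-topic"))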
Task for writing data to Kafka in batch mode with proper resource management.
/**
* Task for writing data to Kafka in batch mode
* @param producerConfiguration Kafka producer configuration
* @param inputSchema Schema of input data
* @param topic Optional target topic
*/
class KafkaWriteTask(
producerConfiguration: ju.Map[String, Object],
inputSchema: StructType,
topic: Option[String]
) extends KafkaRowWriter {
/**
* Executes write task for iterator of rows
* @param iterator Iterator of InternalRow objects to write
*/
def execute(iterator: Iterator[InternalRow]): Unit
/**
* Closes task and releases resources
*/
def close(): Unit
}
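In the batch path, one KafkaWriteTask is typically driven per partition of the executed plan, roughly as sketched below (producerConfig stands in for a real java.util.Map of producer settings, and df for the DataFrame being written):
val schema = df.schema
df.queryExecution.toRdd.foreachPartition { iter =>
  val task = new KafkaWriteTask(producerConfig, schema, Some("my-topic"))
  try {
    task.execute(iter)   // send every InternalRow in this partition
  } finally {
    task.close()         // flush and release the producer even if execute failed
  }
}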
Base class for writing rows to Kafka with common functionality.
/**
* Base class for writing rows to Kafka
* @param inputSchema Schema of input data
* @param topic Optional target topic
*/
abstract class KafkaRowWriter(inputSchema: StructType, topic: Option[String]) {
/**
* Sends a row to Kafka producer
* @param row InternalRow to send
* @param producer Kafka producer instance
*/
protected def sendRow(row: InternalRow, producer: Producer[Array[Byte], Array[Byte]]): Unit
/**
* Checks for write errors and throws exceptions if found
*/
protected def checkForErrors(): Unit
}
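Concrete writers such as KafkaWriteTask and KafkaStreamDataWriter build on this contract by interleaving sendRow with checkForErrors. The minimal subclass below is illustrative only, not the shipped implementation:
import org.apache.kafka.clients.producer.Producer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

class SingleTopicRowWriter(schema: StructType)
  extends KafkaRowWriter(schema, Some("my-topic")) {
  def writeAll(rows: Iterator[InternalRow],
               producer: Producer[Array[Byte], Array[Byte]]): Unit = {
    while (rows.hasNext) {
      checkForErrors()               // fail fast if an earlier async send failed
      sendRow(rows.next(), producer)
    }
    producer.flush()                 // push out anything still buffered
    checkForErrors()                 // surface errors raised during the flush
  }
}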
Commit message for Kafka stream writes indicating successful completion.
/**
* Commit message for Kafka stream writes
*/
case object KafkaWriterCommitMessage extends WriterCommitMessage
Producer cache for improved performance and connection reuse.
/**
* Cache for Kafka producers to improve performance
*/
object CachedKafkaProducer extends Logging {
/**
* Gets existing producer from cache or creates new one
* @param kafkaParams Kafka producer parameters
* @return Cached or new Producer instance
*/
def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer[Array[Byte], Array[Byte]]
/**
* Explicitly closes and removes producer from cache
* @param kafkaParams Producer parameters for identification
*/
def close(kafkaParams: ju.Map[String, Object]): Unit
/**
* Clears entire producer cache
*/
def clear(): Unit
}
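The writers above obtain their producers through this cache, so using it directly is rarely necessary. A sketch of doing so anyway (kafkaParams, keyBytes, and valueBytes are assumed):
import org.apache.kafka.clients.producer.ProducerRecord
val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("my-topic", keyBytes, valueBytes))
producer.flush()
// The instance is shared, so never call producer.close() directly;
// evict it from the cache once it is no longer needed:
CachedKafkaProducer.close(kafkaParams)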
val query = df
.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
val multiTopicDF = inputDF
.withColumn("topic", when($"event_type" === "user", "user-events")
.when($"event_type" === "order", "order-events")
.otherwise("other-events"))
.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value", "topic")
val query = multiTopicDF
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/multi-topic-checkpoint")
.start()
df.select(
col("user_id").cast("string").as("key"),
to_json(struct(col("*"))).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "user-data")
.save()
val partitionedDF = df
.withColumn("partition_key", hash($"user_id") % 10)
.selectExpr(
"CAST(partition_key AS STRING) AS key",
"to_json(struct(*)) AS value"
)
partitionedDF
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "partitioned-topic")
.save()
import org.apache.spark.sql.functions._
val customSerializedDF = df
.selectExpr(
"CAST(id AS STRING) AS key",
"CAST(serialize_avro(struct(*)) AS BINARY) AS value" // Custom serialization
)
customSerializedDF
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "avro-topic")
.start()
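Note that serialize_avro is not a built-in function; it stands for a serializer registered by the application (for real Avro output, the spark-avro module's to_avro is the usual choice). A minimal placeholder registration for the name used above:
import org.apache.spark.sql.Row
spark.udf.register("serialize_avro", (record: Row) =>
  record.mkString("|").getBytes("UTF-8")) // placeholder encoding, not real Avro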
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("topic", "my-topic")
// Performance tuning
.option("kafka.acks", "all") // Acknowledgment level
.option("kafka.retries", "3") // Retry count
.option("kafka.batch.size", "16384") // Batch size
.option("kafka.linger.ms", "5") // Batching delay
.option("kafka.buffer.memory", "33554432") // Buffer memory
.option("kafka.compression.type", "snappy") // Compression
// Reliability
.option("kafka.enable.idempotence", "true") // Idempotent producer
.option("kafka.max.in.flight.requests.per.connection", "5")
.option("kafka.request.timeout.ms", "30000") // Request timeout// SSL configuration
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "password")
// SASL configuration
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")The DataFrame must contain specific columns for Kafka writes:
// Minimum required: value column
df.select(
lit("default-key").as("key"), // Optional: key column (String or Binary)
to_json(struct(col("*"))).as("value"), // Required: value column (String or Binary)
lit("my-topic").as("topic") // Optional: topic column (String)
)
val schemaValidation = StructType(Seq(
StructField("key", StringType, nullable = true), // or BinaryType
StructField("value", StringType, nullable = false), // or BinaryType
StructField("topic", StringType, nullable = true) // Optional
))
// String to binary conversion
df.selectExpr(
"CAST(key AS BINARY) AS key",
"CAST(value AS BINARY) AS value"
)
// JSON serialization
df.select(
col("id").cast("string").as("key"),
to_json(struct(col("*"))).as("value")
)
// Custom serialization function
import org.apache.spark.sql.functions.udf
val customSerializer = udf((data: String) => {
// Custom serialization logic
data.getBytes("UTF-8")
})
df.select(
col("key"),
customSerializer(col("data")).as("value")
)
import org.apache.spark.sql.streaming.StreamingQueryException
try {
val query = df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test-topic")
.start()
query.awaitTermination()
} catch {
case e: StreamingQueryException =>
println(s"Streaming query failed: ${e.getMessage}")
e.getCause match {
case kafkaException: org.apache.kafka.common.KafkaException =>
println(s"Kafka error: ${kafkaException.getMessage}")
case _ =>
println("Non-Kafka error occurred")
}
}
// Configure retry behavior
.option("kafka.retries", "3")
.option("kafka.retry.backoff.ms", "100")
.option("kafka.request.timeout.ms", "30000")
// Error tolerance
.option("kafka.acks", "1") // or "all" for stronger guaranteesdef validateKafkaSchema(df: DataFrame): Unit = {
val schema = df.schema
val hasValue = schema.fieldNames.contains("value")
val hasValidValueType = hasValue &&
(schema("value").dataType == StringType || schema("value").dataType == BinaryType)
require(hasValue && hasValidValueType,
"DataFrame must contain 'value' column of String or Binary type")
if (schema.fieldNames.contains("key")) {
val keyType = schema("key").dataType
require(keyType == StringType || keyType == BinaryType,
"'key' column must be String or Binary type")
}
if (schema.fieldNames.contains("topic")) {
require(schema("topic").dataType == StringType,
"'topic' column must be String type")
}
}
validateKafkaSchema(df)
// Optimize batching for throughput
.option("kafka.batch.size", "32768") // 32KB batches
.option("kafka.linger.ms", "10") // Wait up to 10ms
.option("kafka.buffer.memory", "67108864") // 64MB buffer
// Optimize for latency
.option("kafka.batch.size", "0") // No batching
.option("kafka.linger.ms", "0") // Send immediately// Connection pooling
.option("kafka.connections.max.idle.ms", "540000") // 9 minutes
.option("kafka.max.in.flight.requests.per.connection", "5")
// Reduce connection overhead by reusing producers
CachedKafkaProducer.getOrCreate(kafkaParams)
// Enable compression for large messages
.option("kafka.compression.type", "snappy") // or "gzip", "lz4", "zstd"val query = df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "metrics-topic")
.start()
// Monitor progress
query.recentProgress.foreach { progress =>
println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
println(s"Processing time: ${progress.durationMs.get("triggerExecution")}ms")
}
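For push-based monitoring, a StreamingQueryListener can also be registered on the session; a minimal sketch:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"${p.name}: ${p.numInputRows} rows, ${p.durationMs.get("triggerExecution")} ms")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})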
// Access producer metrics through JMX or custom metrics collectors
import org.apache.kafka.clients.producer.ProducerConfig
val metricsReporters = "org.apache.kafka.common.metrics.JmxReporter"
.option(s"kafka.${ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG}", metricsReporters)Keep keys consistent for partitioning:
df.withColumn("key", col("user_id").cast("string"))
Use efficient serialization:
// Prefer binary formats for large messages
df.select(col("key"), col("avro_bytes").as("value"))Include metadata in messages:
df.select(
col("key"),
to_json(struct(
col("*"),
current_timestamp().as("processed_at"),
lit("spark").as("source")
)).as("value")
)Enable idempotence for exactly-once:
.option("kafka.enable.idempotence", "true")
.option("kafka.acks", "all")Use checkpointing for fault tolerance:
.option("checkpointLocation", "/reliable/checkpoint/path")Monitor for failures:
query.exception.foreach(throw _) // Propagate exceptions
Close resources properly:
try {
// Write operations
} finally {
CachedKafkaProducer.clear() // Clean up on shutdown
}
Configure memory appropriately:
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.driver.memory", "2g")