The Kafka connector provides comprehensive writing capabilities for both batch and streaming scenarios, with support for topic routing, producer pooling, and flexible data formatting.
Core writer object providing the main logic for batch and streaming writes to Kafka.
/**
* Core writer functionality for batch and streaming writes to Kafka
* Handles data validation, topic routing, and producer management
*/
object KafkaWriter {
/** Column names for Kafka message attributes */
val TOPIC_ATTRIBUTE_NAME: String = "topic"
val KEY_ATTRIBUTE_NAME: String = "key"
val VALUE_ATTRIBUTE_NAME: String = "value"
val HEADERS_ATTRIBUTE_NAME: String = "headers"
val PARTITION_ATTRIBUTE_NAME: String = "partition"
/**
* Validates query plan and writes data to Kafka
* Main entry point for all Kafka write operations
*/
def write(
sparkSession: SparkSession,
queryExecution: QueryExecution,
kafkaParams: ju.Map[String, Object],
topic: Option[String]
): Unit
/** Validates DataFrame schema and configuration */
def validateQuery(
schema: Seq[Attribute],
kafkaParameters: ju.Map[String, Object],
topic: Option[String]
): Unit
/** Creates expression for topic routing */
def topicExpression(schema: Seq[Attribute], topic: Option[String]): Expression
/** Creates expression for message key extraction */
def keyExpression(schema: Seq[Attribute]): Expression
/** Creates expression for message value extraction */
def valueExpression(schema: Seq[Attribute]): Expression
/** Creates expression for headers extraction */
def headersExpression(schema: Seq[Attribute]): Expression
/** Creates expression for partition assignment */
def partitionExpression(schema: Seq[Attribute]): Expression
}

Usage Examples:
// Basic write with topic specified in options
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()
// Write with topic column for dynamic routing
dataFrame
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save()

V2 DataSource write implementation supporting both batch and streaming modes.
/**
* V2 DataSource write implementation for Kafka
* Supports both batch and streaming write operations
*/
case class KafkaWrite(
topic: Option[String],
producerParams: ju.Map[String, Object],
schema: StructType
) extends Write {
/** Returns description of the write operation */
def description(): String = "KafkaWrite"
/** Creates batch write implementation */
def toBatch: BatchWrite = new KafkaBatchWrite(topic, producerParams, schema)
/** Creates streaming write implementation */
def toStreaming: StreamingWrite = new KafkaStreamingWrite(topic, producerParams, schema)
}

Legacy V1 DataSource streaming sink implementation.
/**
* V1 DataSource streaming sink for writing to Kafka
* Provides backward compatibility with Structured Streaming V1 API
*/
class KafkaSink(
sqlContext: SQLContext,
kafkaParams: ju.Map[String, Object],
topic: Option[String]
) extends Sink {
/** Adds a batch of data to Kafka */
def addBatch(batchId: Long, data: DataFrame): Unit
}
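For intuition, the per-batch behavior of addBatch can be approximated at the user level with foreachBatch, which hands each micro-batch DataFrame to the batch Kafka writer. This is only a sketch: streamingDataFrame and the checkpoint path are placeholders, and the built-in sink additionally reuses pooled producers.
// Sketch: write each micro-batch with the batch Kafka writer
def writeMicroBatchToKafka(batchDf: DataFrame, batchId: Long): Unit = {
  batchDf
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .save()
}
streamingDataFrame
  .writeStream
  .option("checkpointLocation", "/checkpoints/kafka-sink-sketch")
  .foreachBatch(writeMicroBatchToKafka _)
  .start()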
The DataFrame must contain specific columns for Kafka message construction:

// Required: value column (message payload)
val validDataFrame = spark.createDataFrame(Seq(
("Hello World",)
)).toDF("value")
// Write with just value
validDataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "messages")
.save()

Additional columns provide more control over message attributes:
// Full control with all optional columns
// Header structs must expose "key" and "value" fields, so a small case class is used here
case class KafkaHeader(key: String, value: Array[Byte])
val fullControlDataFrame = spark.createDataFrame(Seq(
("user-events", "user123", """{"event":"login","timestamp":1234567890}""", 0, Seq(KafkaHeader("correlation-id", "abc123".getBytes), KafkaHeader("source", "web".getBytes))),
("user-events", "user456", """{"event":"logout","timestamp":1234567891}""", 1, Seq(KafkaHeader("correlation-id", "def456".getBytes), KafkaHeader("source", "mobile".getBytes)))
)).toDF("topic", "key", "value", "partition", "headers")
fullControlDataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save() // No topic option needed - using topic column

// Expected column types:
// topic: StringType (optional - can use option instead)
// key: StringType or BinaryType (optional - null if not provided)
// value: StringType or BinaryType (required)
// partition: IntegerType (optional - Kafka will assign if not provided)
// headers: ArrayType of StructType with "key" (StringType) and "value" (BinaryType) (optional)
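Put together, the expected shape can be written down as a Spark StructType for reference (a sketch; the name kafkaWriteSchema is illustrative and nullability flags are left at their defaults):
import org.apache.spark.sql.types._
val kafkaWriteSchema = StructType(Seq(
  StructField("topic", StringType),      // optional when the topic option is set
  StructField("key", BinaryType),        // optional; StringType also accepted
  StructField("value", BinaryType),      // required; StringType also accepted
  StructField("partition", IntegerType), // optional
  StructField("headers", ArrayType(StructType(Seq(
    StructField("key", StringType),
    StructField("value", BinaryType)
  ))))                                   // optional
))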
Type Conversion Examples:

// Convert different types to appropriate Kafka format
val typedDataFrame = originalDataFrame
.select(
col("topic"),
col("user_id").cast(StringType).as("key"), // Convert to string key
to_json(struct(col("*"))).as("value"), // Convert struct to JSON value
(col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition"), // Partition by user_id mod 10
array(
struct(lit("content-type").as("key"), lit("application/json").cast(BinaryType).as("value")),
struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value"))
).as("headers")
)

Specify topic in write options:
// All records go to the same topic
dataFrame
.select(col("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "static-topic")
.save()

Use topic column for per-record topic routing:
// Route records to different topics based on data
val routedDataFrame = sourceDataFrame
.withColumn("topic",
when(col("event_type") === "error", "error-topic")
.when(col("event_type") === "warning", "warning-topic")
.otherwise("info-topic")
)
.select("topic", "value")
routedDataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save()

// Topic must be specified either in options or as a column
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
// Missing topic specification causes an error similar to:
// "topic option required when no 'topic' attribute is present"
.save()
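Relatedly, when a topic column exists but is null for some rows, the write fails at runtime. One way to guard against that (a sketch, with "fallback-topic" as a placeholder name) is to coalesce in a default topic:
import org.apache.spark.sql.functions.{coalesce, col, lit}
dataFrame
  .withColumn("topic", coalesce(col("topic"), lit("fallback-topic")))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()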
// Essential producer configuration
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.acks", "all") // Wait for all replicas
.option("kafka.retries", "3") // Retry failed sends
.option("kafka.batch.size", "16384") // Batch size in bytes
.option("kafka.linger.ms", "5") // Wait up to 5ms to batch
.option("kafka.buffer.memory", "33554432") // 32MB producer buffer
.option("topic", "reliable-topic")
.save()

// High-throughput producer configuration
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.acks", "1") // Leader acknowledgment only
.option("kafka.compression.type", "snappy") // Enable compression
.option("kafka.batch.size", "65536") // Larger batch size
.option("kafka.linger.ms", "10") // Higher batching delay
.option("kafka.buffer.memory", "67108864") // 64MB producer buffer
.option("topic", "high-throughput-topic")
.save()

// Maximum reliability producer configuration
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.acks", "all") // Wait for all in-sync replicas
.option("kafka.retries", "10") // More retry attempts
.option("kafka.retry.backoff.ms", "1000") // 1 second retry backoff
.option("kafka.enable.idempotence", "true") // Exactly-once semantics
.option("kafka.max.in.flight.requests.per.connection", "1") // Maintain order
.option("topic", "critical-topic")
.save()

// Simple streaming write to Kafka
val streamingQuery = kafkaInputStream
.selectExpr("CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-stream")
.outputMode("append")
.start()

// Advanced streaming write with processing
val advancedStreamingQuery = kafkaInputStream
.select(
expr("CAST(key AS STRING)").as("key"),
from_json(expr("CAST(value AS STRING)"), inputSchema).as("data")
)
.select(
col("key"),
col("data.user_id"),
col("data.event_type"),
to_json(col("data")).as("value")
)
.withColumn("topic",
when(col("event_type") === "purchase", "purchase-events")
.otherwise("general-events")
)
.select("topic", "key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/checkpoints/kafka-output")
.outputMode("append")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()

// Include headers in streaming write
val headerStreamingQuery = kafkaInputStream
.select(
expr("CAST(key AS STRING)").as("key"),
expr("CAST(value AS STRING)").as("value"),
array(
struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value")),
struct(lit("version").as("key"), lit("1.0").cast(BinaryType).as("value"))
).as("headers")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "enriched-stream")
.outputMode("append")
.start()

// Write DataFrame to Kafka in batch mode
batchDataFrame
.select(
col("id").cast(StringType).as("key"),
to_json(struct(col("*"))).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "batch-data")
.save()

// Control Kafka partitioning for batch writes
batchDataFrame
.select(
col("user_id").cast(StringType).as("key"),
to_json(struct(col("*"))).as("value"),
(col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "partitioned-data")
.save()

// Write to multiple topics in a single operation
multiTopicDataFrame
.withColumn("topic",
when(col("category") === "orders", "order-events")
.when(col("category") === "payments", "payment-events")
.when(col("category") === "inventory", "inventory-events")
.otherwise("misc-events")
)
.select("topic", "key", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save()

The writer performs comprehensive schema validation:
// Invalid schema - missing value column
invalidDataFrame
.select("key", "topic") // Missing required "value" column
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
// Throws: "Required attribute 'value' not found"// Invalid producer configuration
dataFrame
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.key.serializer", "custom.serializer") // Not allowed
.option("topic", "test")
.save()
// Throws: "Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer"// Invalid save modes for Kafka
dataFrame
.write
.mode(SaveMode.Overwrite) // Not supported
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()
// Throws: "Save mode Overwrite not allowed for Kafka"
// Supported save modes:
dataFrame.write.mode(SaveMode.Append).format("kafka")... // Allowed
dataFrame.write.mode(SaveMode.ErrorIfExists).format("kafka")... // Allowed (default)

The connector automatically manages producer instances for efficiency:
// Configure producer cache behavior (internal settings)
// These are managed automatically but can be tuned via Spark configuration:
// spark.kafka.producer.cache.timeout = "10m" // Producer cache timeout
// spark.kafka.producer.cache.evictorThreadRunInterval = "1m" // Cache cleanup interval
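For example, these settings can be supplied when the SparkSession is built (a sketch; verify the configuration keys against your Spark version):
// Sketch: tune producer cache behavior at session construction
val spark = SparkSession.builder()
  .appName("kafka-writer-app")
  .config("spark.kafka.producer.cache.timeout", "10m")
  .config("spark.kafka.producer.cache.evictorThreadRunInterval", "1m")
  .getOrCreate()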
/**
* Internal producer pool management
* Automatically handles producer lifecycle and connection pooling
*/
object InternalKafkaProducerPool {
/** Acquires a cached producer for the given configuration */
def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer
/** Releases a producer back to the pool */
def release(producer: CachedKafkaProducer): Unit
}

// Optimize for high throughput
highVolumeDataFrame
.coalesce(10) // Reduce number of partitions for larger batches
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.batch.size", "131072") // 128KB batches
.option("kafka.linger.ms", "20") // Wait longer to fill batches
.option("kafka.compression.type", "lz4") // Use fast compression
.option("topic", "high-volume-topic")
.save()

// Optimize memory usage for large writes
largeDataFrame
.repartition(50) // Increase parallelism to reduce memory per partition
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.buffer.memory", "134217728") // 128MB producer buffer
.option("kafka.max.request.size", "10485760") // 10MB max request size
.option("topic", "large-data-topic")
.save()

// Maximize concurrent producer connections
dataFrame
.repartition(100) // More partitions = more concurrent producers
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.max.in.flight.requests.per.connection", "5") // More concurrent requests
.option("topic", "concurrent-topic")
.save()