Write operations send data from Spark DataFrames to Kafka topics for real-time data distribution, event publishing, and data pipeline integration. The connector supports both batch and streaming writes.
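For orientation, the smallest possible write needs only a 'value' column. Below is a minimal batch sketch; the broker address and topic name are placeholder values.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("MinimalKafkaWrite").getOrCreate()
import spark.implicits._

// 'value' is the only required column (string or binary)
Seq("hello", "kafka").toDF("value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("topic", "demo-topic")                       // placeholder topic
  .save()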
Write streaming DataFrames to Kafka topics in real time. The Kafka sink provides at-least-once delivery when checkpointing is enabled.
/**
* Write streaming DataFrame to Kafka topics
* Provides at-least-once delivery when used with checkpointing
*/
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
.option("topic", topicName: String) // Target topic (optional if in data)
.outputMode("append") // Required: only "append" supported
.option("checkpointLocation", path: String) // Required: checkpoint directory
.start(): StreamingQuery

Usage Examples:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
val spark = SparkSession.builder()
.appName("KafkaWriter")
.getOrCreate()
// Basic streaming write
val inputStream = spark.readStream
.format("rate")
.option("rowsPerSecond", "10")
.load()
val kafkaWrite = inputStream
.select(to_json(struct("*")).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.outputMode("append")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
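The start() call returns a StreamingQuery handle, which can be used to monitor or stop the write; a brief sketch using the query started above:
// Inspect the running query
println(kafkaWrite.status)       // e.g. whether it is waiting for data or processing a batch
println(kafkaWrite.lastProgress) // metrics for the most recent micro-batch

// Block the driver until the query terminates, or stop it explicitly
// kafkaWrite.awaitTermination()
// kafkaWrite.stop()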
// Write with custom key and headers
val enrichedWrite = inputStream
.select(
lit("sensor-data").as("key"),
to_json(struct("*")).as("value"),
map(lit("source"), lit("spark-app")).as("headers")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "sensor-events")
.outputMode("append")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()

Write batch DataFrames to Kafka topics for bulk data publishing.
/**
* Write batch DataFrame to Kafka topics
* Delivery is at-least-once; a retried job may write duplicate records
*/
df.write
.format("kafka")
.option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
.option("topic", topicName: String) // Target topic (optional if in data)
.save()

Usage Examples:
// Basic batch write
val batchData = spark.range(1000)
.select(
col("id").cast("string").as("key"),
to_json(struct(col("id"), current_timestamp().as("timestamp"))).as("value")
)
batchData.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "batch-events")
.save()
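To spot-check that the batch landed, the connector can read the topic back in batch mode; a sketch assuming the 'batch-events' topic written above:
val readBack = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "batch-events")
  .option("startingOffsets", "earliest")
  .load()

// Kafka returns key/value as binary; cast to string for inspection
readBack.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(5, truncate = false)
println(s"Records read back: ${readBack.count()}")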
// Write with producer configuration
val configuredWrite = batchData.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "secure-topic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='pass';")
.save()

Write to different topics based on data content using the topic column.
/**
* Dynamic topic selection using data content
* Topic determined per row based on 'topic' column value
*/
df.select(
expr("CASE WHEN category = 'error' THEN 'error-topic' ELSE 'info-topic' END").as("topic"),
col("key"),
col("value")
).write
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.save()

Usage Examples:
import org.apache.spark.sql.functions._
// Route messages by category
val categorizedData = spark.range(100)
.select(
(col("id") % 10).as("category"),
col("id").cast("string").as("key"),
to_json(struct("*")).as("value")
)
.select(
when(col("category") < 3, "priority-topic")
.when(col("category") < 7, "normal-topic")
.otherwise("low-priority-topic").as("topic"),
col("key"),
col("value")
)
categorizedData.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save()
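Dynamic routing works the same way for streaming writes: leave the "topic" option unset and let the per-row topic column pick the destination. A sketch, where streamingEvents, the checkpoint path, and the broker address are assumptions:
// streamingEvents is an assumed streaming DataFrame with 'topic', 'key' and 'value' columns
val routedStream = streamingEvents
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // no "topic" option: each row's 'topic' column selects the destination topic
  .option("checkpointLocation", "/tmp/routing-checkpoint")
  .outputMode("append")
  .start()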
// Route by timestamp (time-based partitioning)
val timePartitioned = inputData
.select(
concat(lit("events-"), date_format(col("timestamp"), "yyyy-MM-dd")).as("topic"),
col("userId").cast("string").as("key"),
to_json(col("eventData")).as("value")
)

Control which Kafka partition messages are sent to for load balancing and ordering.
/**
* Partition control using the 'partition' column
* Specify exact partition numbers for message placement
*/
df.select(
col("key"),
col("value"),
expr("abs(hash(key)) % 10").as("partition") // Custom partitioning logic
).write
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("topic", topicName: String)
.save()

Usage Examples:
// Partition by user ID for ordering guarantees
val userPartitioned = userData
.select(
col("userId").cast("string").as("key"),
to_json(struct("*")).as("value"),
(abs(hash(col("userId"))) % 12).as("partition") // 12 partitions
)
userPartitioned.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "user-events")
.save()
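An explicit partition column is optional. If it is omitted, the Kafka producer's default partitioner assigns partitions, hashing the key when one is present, so records with the same key still stay in order. A sketch relying on that default:
// No 'partition' column: the producer's default partitioner hashes the key,
// so all events for a given userId still land in the same partition
userData
  .select(
    col("userId").cast("string").as("key"),
    to_json(struct("*")).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "user-events")
  .save()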
// Load balancing with round-robin
val roundRobin = inputData
.withColumn("row_number", monotonically_increasing_id())
.select(
col("key"),
col("value"),
(col("row_number") % 8).as("partition") // Distribute across 8 partitions
)

Include custom headers and metadata with Kafka messages.
/**
* Include headers using the 'headers' column
* The headers column must be an array of structs with a string 'key' field and a binary 'value' field
*/
df.select(
col("key"),
col("value"),
array(
struct(lit("source").as("key"), lit("spark-app").cast("binary").as("value")),
struct(lit("version").as("key"), lit("1.0").cast("binary").as("value"))
).as("headers")
).write
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("topic", topicName: String)
.save()

Usage Examples:
// Add tracing headers
val tracedMessages = processedData
.select(
col("messageId").cast("string").as("key"),
to_json(col("payload")).as("value"),
array(
struct(lit("trace-id").as("key"), expr("uuid()").cast("binary").as("value")), // uuid() generates a per-row trace id
struct(lit("source").as("key"), lit("data-processor").cast("binary").as("value")),
struct(lit("version").as("key"), lit("2.1.0").cast("binary").as("value"))
).as("headers")
)
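The tracedMessages DataFrame above is then written like any other; the target topic here is illustrative:
tracedMessages.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "traced-events") // illustrative topic name
  .save()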
// Add schema information headers
val schemaHeaders = jsonData
.select(
col("key"),
col("value"),
array(
struct(lit("schema-id").as("key"), lit("user-event-v1").cast("binary").as("value")),
struct(lit("content-type").as("key"), lit("application/json").cast("binary").as("value"))
).as("headers")
)

Configure Kafka producer settings for performance, security, and reliability.
/**
* Producer configuration options (prefixed with 'kafka.')
* All standard Kafka producer configurations are supported
*/
.option("kafka.acks", "all") // Acknowledgment level
.option("kafka.retries", "3") // Retry attempts
.option("kafka.batch.size", "16384") // Batch size in bytes
.option("kafka.linger.ms", "10") // Batching delay
.option("kafka.buffer.memory", "33554432") // Total memory for buffering
.option("kafka.compression.type", "gzip") // Compression algorithm
.option("kafka.enable.idempotence", "true") // Idempotent producerUsage Examples:
// High-throughput configuration
val highThroughputWrite = largeDataset.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "high-volume-topic")
.option("kafka.acks", "1") // Faster but less durable
.option("kafka.batch.size", "65536") // Larger batches
.option("kafka.linger.ms", "100") // More batching delay
.option("kafka.compression.type", "lz4") // Fast compression
.option("kafka.buffer.memory", "67108864") // More buffer memory
.save()
// High-reliability configuration
val reliableWrite = criticalData.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "critical-events")
.option("kafka.acks", "all") // Wait for all replicas
.option("kafka.retries", "10") // More retry attempts
.option("kafka.enable.idempotence", "true") // Prevent duplicates
.option("kafka.max.in.flight.requests.per.connection", "1") // Ordering guarantee
.save()

Configure authentication and encryption for secure Kafka clusters.
/**
* Security configuration options
* Support for SASL, SSL, and other security mechanisms
*/
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config", jaasConfig: String)
.option("kafka.ssl.truststore.location", truststorePath: String)
.option("kafka.ssl.truststore.password", truststorePassword: String)
.option("kafka.ssl.keystore.location", keystorePath: String)
.option("kafka.ssl.keystore.password", keystorePassword: String)Usage Examples:
// SASL/PLAIN authentication
val saslWrite = secureData.write
.format("kafka")
.option("kafka.bootstrap.servers", "secure-kafka:9093")
.option("topic", "secure-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username='producer' password='secret';")
.save()
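Hard-coded credentials above are for illustration only; one sketch of keeping them out of source is to assemble the JAAS string from environment variables (the variable names are assumptions, adapt to your secret store):
// Assumed environment variables holding the SASL credentials
val kafkaUser = sys.env.getOrElse("KAFKA_SASL_USERNAME", "")
val kafkaPass = sys.env.getOrElse("KAFKA_SASL_PASSWORD", "")
val jaasConfig =
  s"org.apache.kafka.common.security.plain.PlainLoginModule required username='$kafkaUser' password='$kafkaPass';"

secureData.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("topic", "secure-topic")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", jaasConfig)
  .save()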
// SSL with client certificates
val sslWrite = sensitiveData.write
.format("kafka")
.option("kafka.bootstrap.servers", "ssl-kafka:9094")
.option("topic", "sensitive-data")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "keystore-password")
.save()

The Kafka writer recognizes the following DataFrame schemas.

/**
* Minimum required schema for Kafka writes
* Only 'value' field is mandatory
*/
case class MinimalKafkaRecord(
  value: String // Required: message payload (the column may be string or binary)
)

/**
* Complete schema for Kafka writes with all optional fields
* All fields except 'value' are optional
*/
case class KafkaHeader(key: String, value: Array[Byte])

case class KafkaWriteRecord(
  topic: String,            // Target topic (optional if set in options)
  key: String,              // Message key (optional; string or binary column)
  value: String,            // Message value (required; string or binary column)
  partition: Option[Int],   // Target partition (optional; auto-assigned if absent)
  headers: Seq[KafkaHeader] // Message headers (optional; array<struct<key: string, value: binary>>)
)

import org.apache.spark.sql.functions._
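As a quick illustration of the full schema above, a single Dataset can carry every recognized column at once; a sketch with made-up values (the topic, key, payload, and header contents are placeholders):
import spark.implicits._

val fullSchemaRecords = Seq(
  KafkaWriteRecord(
    topic = "user-events",
    key = "user-42",
    value = """{"action":"login"}""",
    partition = Some(0), // the partition must exist in the topic
    headers = Seq(KafkaHeader("source", "docs-example".getBytes("UTF-8")))
  )
).toDS()

fullSchemaRecords.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()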
// Convert structured data to JSON
val jsonMessages = structuredData
.select(
col("id").cast("string").as("key"),
to_json(struct(
col("timestamp"),
col("userId"),
col("eventType"),
col("properties")
)).as("value")
)
// Add metadata to JSON
val enrichedJson = rawData
.select(
col("messageId").as("key"),
to_json(struct(
col("*"),
current_timestamp().as("processedAt"),
lit("spark-processor").as("source")
)).as("value")
)

// Handle binary payloads
val binaryMessages = imageData
.select(
col("imageId").cast("string").as("key"),
col("imageBytes").as("value"), // Already binary
array(
struct(lit("content-type").as("key"), lit("image/jpeg").cast("binary").as("value")),
struct(lit("size").as("key"), col("imageSize").cast("string").cast("binary").as("value"))
).as("headers")
)

// Serialize values to Avro using the spark-avro module (org.apache.spark:spark-avro).
// Note: the built-in to_avro function does not register schemas with Confluent Schema Registry.
import org.apache.spark.sql.avro.functions.to_avro
val avroMessages = userData
.select(
col("userId").cast("string").as("key"),
to_avro(struct(col("*")), "user-schema").as("value")
)

// Monitor write progress and handle failures
import org.apache.spark.sql.DataFrame
// foreachBatch supplies its own sink, so format("kafka") is not set on the outer writer;
// each micro-batch is written with the batch Kafka writer inside the function
val query = processedData
.writeStream
.option("checkpointLocation", "/tmp/checkpoint")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
try {
batchDF.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "events")
.save()
println(s"Successfully wrote batch $batchId")
} catch {
case ex: Exception =>
println(s"Failed to write batch $batchId: ${ex.getMessage}")
// Handle error (retry, log, alert, etc.)
throw ex // Re-throw to trigger stream failure
}
}
.start()

// Implement dead letter queue for failed messages
val flagged = inputData
.select(
col("*"),
when(col("payload").isNotNull && col("userId").isNotNull, true)
.otherwise(false).as("isValid")
)
.cache() // cache so the two filters below reuse the same computed input
val validMessages = flagged.filter(col("isValid"))
val invalidMessages = flagged.filter(!col("isValid"))
// Write valid messages to main topic
validMessages
.select(col("userId").cast("string").as("key"), to_json(col("payload")).as("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "valid-events")
.save()
// Write invalid messages to dead letter queue
invalidMessages
.select(
col("messageId").cast("string").as("key"),
to_json(struct(
col("*"),
lit("VALIDATION_FAILED").as("errorType"),
current_timestamp().as("errorTimestamp")
)).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "dead-letter-queue")
.save()

// Optimize for high throughput
val optimizedWrite = largeDataset.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "high-volume")
.option("kafka.batch.size", "131072") // 128KB batches
.option("kafka.linger.ms", "50") // 50ms batching delay
.option("kafka.compression.type", "snappy") // Fast compression
.save()

// Configure memory usage for large datasets
spark.conf.set("spark.sql.streaming.kafka.producer.cache.timeout", "10m")
spark.conf.set("spark.kafka.producer.cache.evictorThreadRunInterval", "30s")
val memoryOptimizedWrite = bigDataset.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "large-messages")
.option("kafka.buffer.memory", "134217728") // 128MB buffer
.option("kafka.max.request.size", "10485760") // 10MB max message
.save()