# Data Writing

Comprehensive data writing capabilities for both streaming and batch workloads, with producer connection pooling, automatic serialization, and support for multiple output modes.

## Capabilities

### Streaming Sink (Legacy)

Legacy streaming sink for writing DataFrames to Kafka topics in streaming queries.

```scala { .api }
/**
 * Legacy streaming sink for writing to Kafka
 */
class KafkaSink extends Sink with Logging {
  /**
   * Adds a batch of data to the sink
   * @param batchId Unique identifier for this batch
   * @param data DataFrame containing data to write
   */
  def addBatch(batchId: Long, data: DataFrame): Unit

  /**
   * String representation of the sink
   * @return String describing this sink
   */
  override def toString(): String
}
```
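
The engine, not user code, drives this interface: `addBatch` is invoked once per micro-batch, and a sink is expected to treat a replayed `batchId` as a no-op so recovery does not duplicate data. A minimal sketch of that contract, with hypothetical driver-side bookkeeping (`lastCommittedBatchId` is an assumption for the example, not part of the API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Illustrative sketch of the Sink contract, not Spark's actual driver code.
def runMicroBatch(sink: Sink, batchId: Long, batch: DataFrame,
                  lastCommittedBatchId: Long): Unit = {
  if (batchId > lastCommittedBatchId) {
    sink.addBatch(batchId, batch) // write this micro-batch
  }
  // An already-committed batchId is skipped, which is what makes
  // replay after a failure safe.
}
```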

### Stream Writer (DataSource V2)

Modern stream writer for DataSource V2 with improved performance and reliability.

```scala { .api }
/**
 * Stream writer for DataSource V2 streaming writes
 * @param topic Optional default topic for writes
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
class KafkaStreamWriter(
    topic: Option[String],
    producerParams: Map[String, String],
    schema: StructType
) extends StreamWriter {
  /**
   * Creates writer factory for this stream
   * @return KafkaStreamWriterFactory instance
   */
  def createWriterFactory(): KafkaStreamWriterFactory

  /**
   * Commits an epoch of writes
   * @param epochId Epoch identifier
   * @param messages Array of commit messages from writers
   */
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit

  /**
   * Aborts an epoch of writes
   * @param epochId Epoch identifier
   * @param messages Array of commit messages from writers
   */
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}
```

### Stream Writer Factory

Factory for creating data writers in streaming contexts.

```scala { .api }
/**
 * Factory for creating stream data writers
 * @param topic Optional default topic
 * @param producerParams Kafka producer configuration
 * @param schema Schema of input data
 */
case class KafkaStreamWriterFactory(
    topic: Option[String],
    producerParams: Map[String, String],
    schema: StructType
) extends DataWriterFactory[InternalRow] {
  /**
   * Creates data writer for a specific partition and task
   * @param partitionId Partition identifier
   * @param taskId Task identifier
   * @param epochId Epoch identifier
   * @return DataWriter for writing records
   */
  def createDataWriter(
      partitionId: Int,
      taskId: Long,
      epochId: Long
  ): DataWriter[InternalRow]
}
```

### Stream Data Writer

Data writer for streaming Kafka writes with row-level processing.

```scala { .api }
/**
 * Data writer for streaming Kafka writes
 * @param targetTopic Optional target topic
 * @param producerParams Kafka producer configuration
 * @param inputSchema Schema of input data
 */
class KafkaStreamDataWriter(
    targetTopic: Option[String],
    producerParams: Map[String, String],
    inputSchema: StructType
) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
  /**
   * Writes a single row to Kafka
   * @param row InternalRow to write
   */
  def write(row: InternalRow): Unit

  /**
   * Commits all pending writes
   * @return WriterCommitMessage confirming completion
   */
  def commit(): WriterCommitMessage

  /**
   * Aborts all pending writes
   */
  def abort(): Unit

  /**
   * Closes the writer and releases resources
   */
  def close(): Unit
}
```
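
These three classes implement the DataSource V2 write protocol: the driver builds one `KafkaStreamWriterFactory`, each task creates its own writer, streams its partition's rows through it, and returns a commit message that the driver hands to `KafkaStreamWriter.commit` (or `abort` on failure). A sketch of the per-task half of that protocol, with simplified error handling and Spark 2.4-era import paths assumed:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// One task's side of the V2 write protocol (illustrative).
def writePartition(
    factory: KafkaStreamWriterFactory,
    partitionId: Int,
    taskId: Long,
    epochId: Long,
    rows: Iterator[InternalRow]): WriterCommitMessage = {
  val writer = factory.createDataWriter(partitionId, taskId, epochId)
  try {
    rows.foreach(writer.write)  // rows are buffered by the Kafka producer
    writer.commit()             // flush and report success to the driver
  } catch {
    case t: Throwable =>
      writer.abort()            // discard this task's pending writes
      throw t
  }
}
```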

### Batch Writer

Utilities for writing data to Kafka from batch and streaming queries.

```scala { .api }
/**
 * Utilities for writing data to Kafka from batch/streaming queries
 */
object KafkaWriter extends Logging {
  /** Topic column name in DataFrame */
  val TOPIC_ATTRIBUTE_NAME: String = "topic"

  /** Key column name in DataFrame */
  val KEY_ATTRIBUTE_NAME: String = "key"

  /** Value column name in DataFrame */
  val VALUE_ATTRIBUTE_NAME: String = "value"

  /**
   * Validates query schema for Kafka write compatibility
   * @param schema Attribute schema to validate
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def validateQuery(
      schema: Seq[Attribute],
      kafkaParameters: ju.Map[String, Object],
      topic: Option[String]
  ): Unit

  /**
   * Writes DataFrame data to Kafka
   * @param sparkSession Current Spark session
   * @param queryExecution Query execution plan
   * @param kafkaParameters Kafka producer parameters
   * @param topic Optional default topic
   */
  def write(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      kafkaParameters: ju.Map[String, Object],
      topic: Option[String]
  ): Unit

  /**
   * String representation of the writer
   * @return String describing writer configuration
   */
  override def toString: String
}
```
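
`validateQuery` is what rejects a write up front when the query's output cannot be mapped onto Kafka's `key`/`value`/`topic` columns, e.g. when no default topic is configured and no `topic` column exists. A sketch of invoking it directly, assuming access to the object (it is package-private inside the connector, so normally the write path calls it for you):

```scala
import java.{util => ju}

// Hypothetical direct use of the internal validation hook.
val kafkaParams: ju.Map[String, Object] = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")

KafkaWriter.validateQuery(
  df.queryExecution.analyzed.output, // Seq[Attribute] describing the query output
  kafkaParams,
  Some("output-topic")               // default topic used when no "topic" column exists
)
```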

### Write Task

Task for writing data to Kafka in batch mode with proper resource management.

```scala { .api }
/**
 * Task for writing data to Kafka in batch mode
 * @param producerConfiguration Kafka producer configuration
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
class KafkaWriteTask(
    producerConfiguration: ju.Map[String, Object],
    inputSchema: StructType,
    topic: Option[String]
) extends KafkaRowWriter(inputSchema, topic) {
  /**
   * Executes write task for iterator of rows
   * @param iterator Iterator of InternalRow objects to write
   */
  def execute(iterator: Iterator[InternalRow]): Unit

  /**
   * Closes task and releases resources
   */
  def close(): Unit
}
```
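
In the batch path, one `KafkaWriteTask` is typically created per partition and fed that partition's rows; the task must be closed even when the write fails so its producer reference is released. A sketch of that driving loop under those assumptions (the class is internal to the connector, and `writeRows` is invented for illustration):

```scala
import java.{util => ju}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Hypothetical per-partition driver mirroring what KafkaWriter.write does.
def writeRows(
    producerConfig: ju.Map[String, Object],
    schema: StructType,
    topic: Option[String],
    rows: Iterator[InternalRow]): Unit = {
  val task = new KafkaWriteTask(producerConfig, schema, topic)
  try {
    task.execute(rows) // send every row, surfacing async producer errors
  } finally {
    task.close()       // always release the producer, even on failure
  }
}
```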

### Row Writer Base

Base class for writing rows to Kafka with common functionality.

```scala { .api }
/**
 * Base class for writing rows to Kafka
 * @param inputSchema Schema of input data
 * @param topic Optional target topic
 */
abstract class KafkaRowWriter(inputSchema: StructType, topic: Option[String]) {
  /**
   * Sends a row to Kafka producer
   * @param row InternalRow to send
   * @param producer Kafka producer instance
   */
  protected def sendRow(row: InternalRow, producer: Producer[Array[Byte], Array[Byte]]): Unit

  /**
   * Checks for write errors and throws exceptions if found
   */
  protected def checkForErrors(): Unit
}
```
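
Both concrete writers follow the same send-then-check shape on top of this base class: `sendRow` hands the record to the producer asynchronously, and `checkForErrors` rethrows any failure captured by the send callback. A hedged sketch of a subclass using that pattern (`LoggingRowWriter` and `writeAll` are invented for illustration):

```scala
import org.apache.kafka.clients.producer.Producer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Invented subclass demonstrating the intended use of the protected API.
class LoggingRowWriter(schema: StructType, topic: Option[String])
    extends KafkaRowWriter(schema, topic) {

  def writeAll(rows: Iterator[InternalRow],
               producer: Producer[Array[Byte], Array[Byte]]): Unit = {
    rows.foreach { row =>
      checkForErrors()       // fail fast if an earlier async send failed
      sendRow(row, producer)
    }
    producer.flush()         // wait for buffered sends to finish
    checkForErrors()         // surface any failure from the flushed sends
  }
}
```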

### Writer Commit Message

Commit message for Kafka stream writes indicating successful completion.

```scala { .api }
/**
 * Commit message for Kafka stream writes
 */
case object KafkaWriterCommitMessage extends WriterCommitMessage
```

### Cached Producer

Producer cache for improved performance and connection reuse.

```scala { .api }
/**
 * Cache for Kafka producers to improve performance
 */
object CachedKafkaProducer extends Logging {
  /**
   * Gets existing producer from cache or creates new one
   * @param kafkaParams Kafka producer parameters
   * @return Cached or new Producer instance
   */
  def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer[Array[Byte], Array[Byte]]

  /**
   * Explicitly closes and removes producer from cache
   * @param kafkaParams Producer parameters for identification
   */
  def close(kafkaParams: ju.Map[String, Object]): Unit

  /**
   * Clears entire producer cache
   */
  def clear(): Unit
}
```
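
Producers are keyed by their full parameter map, so every task that passes identical parameters shares one underlying connection. A usage sketch (the serializer settings are assumptions for the example; the cache itself does not add them):

```scala
import java.{util => ju}

val params: ju.Map[String, Object] = new ju.HashMap[String, Object]()
params.put("bootstrap.servers", "localhost:9092")
params.put("key.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")
params.put("value.serializer",
  "org.apache.kafka.common.serialization.ByteArraySerializer")

val producer = CachedKafkaProducer.getOrCreate(params) // shared across tasks
try {
  // ... send records ...
} catch {
  case e: org.apache.kafka.common.KafkaException =>
    CachedKafkaProducer.close(params) // evict the broken producer from the cache
    throw e
}
```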

## Usage Examples

### Streaming Write to Single Topic

```scala
val query = df
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()
```

### Streaming Write to Multiple Topics

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val multiTopicDF = inputDF
  .withColumn("topic", when($"event_type" === "user", "user-events")
    .when($"event_type" === "order", "order-events")
    .otherwise("other-events"))
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value", "topic")

val query = multiTopicDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/multi-topic-checkpoint")
  .start()
```

### Batch Write

```scala
import org.apache.spark.sql.functions._

df.select(
    col("user_id").cast("string").as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "user-data")
  .save()
```

### Partitioned Write

Kafka's default partitioner hashes the record key, so deriving the key from a stable column controls partition placement:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val partitionedDF = df
  // note: hash() can be negative; pmod(hash($"user_id"), lit(10)) keeps keys non-negative
  .withColumn("partition_key", hash($"user_id") % 10)
  .selectExpr(
    "CAST(partition_key AS STRING) AS key",
    "to_json(struct(*)) AS value"
  )

partitionedDF
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-topic")
  .save()
```

### Custom Serialization

```scala
import org.apache.spark.sql.functions._

// Assumes serialize_avro has been registered as a UDF returning binary;
// it is not a Spark built-in.
val customSerializedDF = df
  .selectExpr(
    "CAST(id AS STRING) AS key",
    "CAST(serialize_avro(struct(*)) AS BINARY) AS value"
  )

customSerializedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "avro-topic")
  .option("checkpointLocation", "/tmp/avro-checkpoint")
  .start()
```

## Configuration Options

### Producer Configuration

```scala
// Basic configuration
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("topic", "my-topic")

// Performance tuning
.option("kafka.acks", "all")                // Acknowledgment level
.option("kafka.retries", "3")               // Retry count
.option("kafka.batch.size", "16384")        // Batch size in bytes
.option("kafka.linger.ms", "5")             // Batching delay
.option("kafka.buffer.memory", "33554432")  // Buffer memory (32 MB)
.option("kafka.compression.type", "snappy") // Compression

// Reliability
.option("kafka.enable.idempotence", "true") // Idempotent producer
.option("kafka.max.in.flight.requests.per.connection", "5")
.option("kafka.request.timeout.ms", "30000") // Request timeout
```

### Security Configuration

```scala
// SSL configuration
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
.option("kafka.ssl.truststore.password", "password")
.option("kafka.ssl.keystore.location", "/path/to/keystore.jks")
.option("kafka.ssl.keystore.password", "password")

// SASL configuration
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";")
```

## Data Format Requirements

### Required Columns

The DataFrame must contain specific columns for Kafka writes:

```scala
// Minimum required: value column
df.select(
  lit("default-key").as("key"),           // Optional: key column (String or Binary)
  to_json(struct(col("*"))).as("value"),  // Required: value column (String or Binary)
  lit("my-topic").as("topic")             // Optional: topic column (String)
)
```

### Column Types

```scala
val schemaValidation = StructType(Seq(
  StructField("key", StringType, nullable = true),    // or BinaryType
  StructField("value", StringType, nullable = false), // or BinaryType
  StructField("topic", StringType, nullable = true)   // Optional
))
```

### Type Conversions

```scala
// String to binary conversion
df.selectExpr(
  "CAST(key AS BINARY) AS key",
  "CAST(value AS BINARY) AS value"
)

// JSON serialization
df.select(
  col("id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)

// Custom serialization function
import org.apache.spark.sql.functions.udf

val customSerializer = udf((data: String) => {
  // Custom serialization logic
  data.getBytes("UTF-8")
})

df.select(
  col("key"),
  customSerializer(col("data")).as("value")
)
```

## Error Handling

### Write Failures

```scala
import org.apache.spark.sql.streaming.StreamingQueryException

try {
  val query = df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .option("checkpointLocation", "/tmp/error-handling-checkpoint")
    .start()

  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    println(s"Streaming query failed: ${e.getMessage}")
    e.getCause match {
      case kafkaException: org.apache.kafka.common.KafkaException =>
        println(s"Kafka error: ${kafkaException.getMessage}")
      case _ =>
        println("Non-Kafka error occurred")
    }
}
```

### Producer Error Handling

```scala
// Configure retry behavior
.option("kafka.retries", "3")
.option("kafka.retry.backoff.ms", "100")
.option("kafka.request.timeout.ms", "30000")

// Error tolerance
.option("kafka.acks", "1") // or "all" for stronger guarantees
```

### Schema Validation

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{BinaryType, StringType}

def validateKafkaSchema(df: DataFrame): Unit = {
  val schema = df.schema
  val hasValue = schema.fieldNames.contains("value")
  val hasValidValueType = hasValue &&
    (schema("value").dataType == StringType || schema("value").dataType == BinaryType)

  require(hasValidValueType,
    "DataFrame must contain 'value' column of String or Binary type")

  if (schema.fieldNames.contains("key")) {
    val keyType = schema("key").dataType
    require(keyType == StringType || keyType == BinaryType,
      "'key' column must be String or Binary type")
  }

  if (schema.fieldNames.contains("topic")) {
    require(schema("topic").dataType == StringType,
      "'topic' column must be String type")
  }
}

validateKafkaSchema(df)
```

## Performance Optimization

### Batching Configuration

```scala
// Optimize batching for throughput
.option("kafka.batch.size", "32768")        // 32 KB batches
.option("kafka.linger.ms", "10")            // Wait up to 10 ms
.option("kafka.buffer.memory", "67108864")  // 64 MB buffer

// Optimize for latency
.option("kafka.batch.size", "0")            // No batching
.option("kafka.linger.ms", "0")             // Send immediately
```

### Connection Management

```scala
// Connection pooling
.option("kafka.connections.max.idle.ms", "540000") // 9 minutes
.option("kafka.max.in.flight.requests.per.connection", "5")

// Reduce connection overhead by reusing producers
CachedKafkaProducer.getOrCreate(kafkaParams)
```

### Compression

```scala
// Enable compression for large messages
.option("kafka.compression.type", "snappy") // or "gzip", "lz4", "zstd"
```

## Monitoring and Metrics

### Query Progress

```scala
val query = df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "metrics-topic")
  .option("checkpointLocation", "/tmp/metrics-checkpoint")
  .start()

// Monitor progress; use recentProgress for retained updates
// (or lastProgress for only the most recent one)
query.recentProgress.foreach { progress =>
  println(s"Batch ${progress.batchId}: ${progress.inputRowsPerSecond} rows/sec")
  println(s"Processing time: ${progress.durationMs.get("triggerExecution")}ms")
}
```
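
For continuous monitoring without polling, a `StreamingQueryListener` receives a callback per progress update; the listener API is standard Spark, and only the log format below is arbitrary:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Batch ${event.progress.batchId}: ${event.progress.numInputRows} rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})
```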

### Producer Metrics

```scala
// Access producer metrics through JMX or custom metrics collectors
import org.apache.kafka.clients.producer.ProducerConfig

val metricsReporters = "org.apache.kafka.common.metrics.JmxReporter"

// ...then, in the writer configuration:
.option(s"kafka.${ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG}", metricsReporters)
```
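
With the JMX reporter registered, the producer's MBeans can be read in-process from the platform MBean server; `record-send-rate` below is a standard Kafka producer metric, and the printout format is arbitrary:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

val mbeanServer = ManagementFactory.getPlatformMBeanServer
val producerBeans = mbeanServer
  .queryNames(new ObjectName("kafka.producer:type=producer-metrics,*"), null)
  .asScala

producerBeans.foreach { name =>
  val sendRate = mbeanServer.getAttribute(name, "record-send-rate")
  println(s"$name record-send-rate=$sendRate")
}
```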

## Best Practices

### Message Design

1. **Keep keys consistent for partitioning**:
   ```scala
   df.withColumn("key", col("user_id").cast("string"))
   ```

2. **Use efficient serialization**:
   ```scala
   // Prefer binary formats for large messages
   df.select(col("key"), col("avro_bytes").as("value"))
   ```

3. **Include metadata in messages**:
   ```scala
   df.select(
     col("key"),
     to_json(struct(
       col("*"),
       current_timestamp().as("processed_at"),
       lit("spark").as("source")
     )).as("value")
   )
   ```

### Reliability Patterns

1. **Enable idempotence to avoid duplicates from producer retries** (the Kafka sink itself provides at-least-once, not end-to-end exactly-once, delivery):
   ```scala
   .option("kafka.enable.idempotence", "true")
   .option("kafka.acks", "all")
   ```

2. **Use checkpointing for fault tolerance**:
   ```scala
   .option("checkpointLocation", "/reliable/checkpoint/path")
   ```

3. **Monitor for failures**:
   ```scala
   query.exception.foreach(throw _) // Propagate exceptions
   ```

### Resource Management

1. **Close resources properly**:
   ```scala
   try {
     // Write operations
   } finally {
     CachedKafkaProducer.clear() // Clean up on shutdown
   }
   ```

2. **Configure memory appropriately** (these are launch-time settings, so set them when the session is created or via `spark-submit --conf`, not with `spark.conf.set` at runtime):
   ```scala
   SparkSession.builder()
     .config("spark.executor.memory", "4g")
     .config("spark.driver.memory", "2g")
     .getOrCreate()
   ```