# Writing to Kafka

The Kafka connector provides comprehensive writing capabilities for both batch and streaming scenarios, with support for topic routing, producer pooling, and flexible data formatting.

## Capabilities

### KafkaWriter

Core writer object providing the main logic for batch and streaming writes to Kafka.

```scala { .api }
/**
 * Core writer functionality for batch and streaming writes to Kafka.
 * Handles data validation, topic routing, and producer management.
 */
object KafkaWriter {

  /** Column names for Kafka message attributes */
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"
  val HEADERS_ATTRIBUTE_NAME: String = "headers"
  val PARTITION_ATTRIBUTE_NAME: String = "partition"

  /**
   * Validates the query plan and writes data to Kafka.
   * Main entry point for all Kafka write operations.
   */
  def write(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      kafkaParams: ju.Map[String, Object],
      topic: Option[String]): Unit

  /** Validates DataFrame schema and configuration */
  def validateQuery(
      schema: Seq[Attribute],
      kafkaParameters: ju.Map[String, Object],
      topic: Option[String]): Unit

  /** Creates the expression for topic routing */
  def topicExpression(schema: Seq[Attribute], topic: Option[String]): Expression

  /** Creates the expression for message key extraction */
  def keyExpression(schema: Seq[Attribute]): Expression

  /** Creates the expression for message value extraction */
  def valueExpression(schema: Seq[Attribute]): Expression

  /** Creates the expression for headers extraction */
  def headersExpression(schema: Seq[Attribute]): Expression

  /** Creates the expression for partition assignment */
  def partitionExpression(schema: Seq[Attribute]): Expression
}
```

**Usage Examples:**

```scala
// Basic write with the topic specified in options
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()

// Write with a topic column for dynamic routing
dataFrame
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```
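
The `key` column is optional as well: when it is absent, messages are produced with a null key and the producer's default partitioner decides placement. A minimal sketch:

```scala
// Value-only write: messages carry a null key, so partition assignment
// is left to the Kafka producer's default partitioner
dataFrame
  .selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "unkeyed-topic")
  .save()
```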

### KafkaWrite

V2 DataSource write implementation supporting both batch and streaming modes.

```scala { .api }
/**
 * V2 DataSource write implementation for Kafka.
 * Supports both batch and streaming write operations.
 */
case class KafkaWrite(
    topic: Option[String],
    producerParams: ju.Map[String, Object],
    schema: StructType) extends Write {

  /** Returns a description of the write operation */
  def description(): String = "KafkaWrite"

  /** Creates the batch write implementation */
  def toBatch: BatchWrite = new KafkaBatchWrite(topic, producerParams, schema)

  /** Creates the streaming write implementation */
  def toStreaming: StreamingWrite = new KafkaStreamingWrite(topic, producerParams, schema)
}
```
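
`KafkaWrite` is constructed by the Kafka source provider, not by user code; whether a given write is routed through this V2 path or the legacy V1 path depends on the Spark version and configuration. The user-facing calls are the same either way:

```scala
// Batch entry point (routed to a batch write such as KafkaWrite.toBatch)
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .save()

// Streaming entry point (routed to a streaming write such as KafkaWrite.toStreaming)
streamingDataFrame
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events")
  .option("checkpointLocation", "/checkpoints/events")
  .start()
```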

### KafkaSink

Legacy V1 DataSource streaming sink implementation.

```scala { .api }
/**
 * V1 DataSource streaming sink for writing to Kafka.
 * Provides backward compatibility with the Structured Streaming V1 API.
 */
class KafkaSink(
    sqlContext: SQLContext,
    kafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink {

  /** Adds a batch of data to Kafka */
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```

## DataFrame Schema Requirements

### Required Columns

The DataFrame must contain specific columns for Kafka message construction; only `value` is strictly required:

```scala
// Required: value column (message payload)
// Note: a single-element tuple is written Tuple1(...) in Scala
val validDataFrame = spark.createDataFrame(Seq(
  Tuple1("Hello World")
)).toDF("value")

// Write with just a value column
validDataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "messages")
  .save()
```

### Optional Columns

Additional columns provide more control over message attributes. The `headers` column must be an array of structs with `key` (string) and `value` (binary) fields, so use a case class rather than plain tuples (whose fields would be named `_1`/`_2`):

```scala
// Case class producing the field names Kafka headers require
case class KafkaHeader(key: String, value: Array[Byte])

// Full control with all optional columns
val fullControlDataFrame = spark.createDataFrame(Seq(
  ("user-events", "user123", """{"event":"login","timestamp":1234567890}""", 0,
    Seq(KafkaHeader("correlation-id", "abc123".getBytes), KafkaHeader("source", "web".getBytes))),
  ("user-events", "user456", """{"event":"logout","timestamp":1234567891}""", 1,
    Seq(KafkaHeader("correlation-id", "def456".getBytes), KafkaHeader("source", "mobile".getBytes)))
)).toDF("topic", "key", "value", "partition", "headers")

fullControlDataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save() // No topic option needed - the topic column is used
```

### Column Types and Conversion

```scala { .api }
// Expected column types:
// topic: StringType (optional - can use the topic option instead)
// key: StringType or BinaryType (optional - null if not provided)
// value: StringType or BinaryType (required)
// partition: IntegerType (optional - Kafka assigns one if not provided)
// headers: ArrayType of StructType with "key" (StringType) and "value" (BinaryType) (optional)
```

**Type Conversion Examples:**

```scala
// Convert different types to the appropriate Kafka format
val typedDataFrame = originalDataFrame
  .select(
    col("topic"),
    col("user_id").cast(StringType).as("key"),                              // convert to string key
    to_json(struct(col("*"))).as("value"),                                  // convert struct to JSON value
    (col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition"), // partition by user_id mod 10
    array(
      struct(lit("content-type").as("key"), lit("application/json").cast(BinaryType).as("value")),
      struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value"))
    ).as("headers")
  )
```

## Topic Routing

### Static Topic Assignment

Specify the topic in the write options:

```scala
// All records go to the same topic
dataFrame
  .select(col("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "static-topic")
  .save()
```

### Dynamic Topic Routing

Use a topic column for per-record topic routing:

```scala
// Route records to different topics based on data
val routedDataFrame = sourceDataFrame
  .withColumn("topic",
    when(col("event_type") === "error", "error-topic")
      .when(col("event_type") === "warning", "warning-topic")
      .otherwise("info-topic"))
  .select("topic", "value")

routedDataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```

### Topic Validation

The topic must be specified either in the options or as a column; otherwise the write fails during validation:

```scala
// No "topic" option and no "topic" column
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
// Throws: "topic option required when no 'topic' attribute is present.
//          Use the topic option for setting a topic."
```
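
To surface this earlier in your own code, a hypothetical pre-flight check (not part of the connector; `requireTopic` is an illustrative helper) could look like:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative helper: fail fast before submitting the write job
def requireTopic(df: DataFrame, topicOption: Option[String]): Unit = {
  require(
    topicOption.isDefined || df.columns.contains("topic"),
    "Specify a topic via the 'topic' option or include a 'topic' column")
}
```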

## Producer Configuration

### Basic Producer Settings

Kafka producer properties are passed through to the underlying producer by prefixing them with `kafka.`:

```scala
// Essential producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "all")               // Wait for all in-sync replicas
  .option("kafka.retries", "3")              // Retry failed sends
  .option("kafka.batch.size", "16384")       // Batch size in bytes
  .option("kafka.linger.ms", "5")            // Wait up to 5 ms to fill batches
  .option("kafka.buffer.memory", "33554432") // 32 MB producer buffer
  .option("topic", "reliable-topic")
  .save()
```

### Performance Optimization

```scala
// High-throughput producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "1")                  // Leader acknowledgment only
  .option("kafka.compression.type", "snappy") // Enable compression
  .option("kafka.batch.size", "65536")        // Larger batch size
  .option("kafka.linger.ms", "10")            // Higher batching delay
  .option("kafka.buffer.memory", "67108864")  // 64 MB producer buffer
  .option("topic", "high-throughput-topic")
  .save()
```

### Reliability Configuration

```scala
// Maximum-reliability producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.acks", "all")                                // Wait for all in-sync replicas
  .option("kafka.retries", "10")                              // More retry attempts
  .option("kafka.retry.backoff.ms", "1000")                   // 1 second retry backoff
  .option("kafka.enable.idempotence", "true")                 // No duplicates on producer retries
  .option("kafka.max.in.flight.requests.per.connection", "1") // Preserve ordering
  .option("topic", "critical-topic")
  .save()
```

## Streaming Writes

### Basic Streaming Write

Streaming writes to Kafka require a checkpoint location:

```scala
// Simple streaming write to Kafka
val streamingQuery = kafkaInputStream
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-stream")
  .option("checkpointLocation", "/checkpoints/output-stream")
  .outputMode("append")
  .start()
```

### Advanced Streaming Write

```scala
// Advanced streaming write with processing;
// inputSchema is a user-supplied StructType describing the JSON payload
val advancedStreamingQuery = kafkaInputStream
  .select(
    expr("CAST(key AS STRING)").as("key"),
    from_json(expr("CAST(value AS STRING)"), inputSchema).as("data")
  )
  .select(
    col("key"),
    col("data.user_id"),
    col("data.event_type"),
    to_json(col("data")).as("value")
  )
  .withColumn("topic",
    when(col("event_type") === "purchase", "purchase-events")
      .otherwise("general-events"))
  .select("topic", "key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/checkpoints/kafka-output")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

### Streaming Write with Headers

```scala
// Include headers in a streaming write
val headerStreamingQuery = kafkaInputStream
  .select(
    expr("CAST(key AS STRING)").as("key"),
    expr("CAST(value AS STRING)").as("value"),
    array(
      struct(lit("source").as("key"), lit("spark-streaming").cast(BinaryType).as("value")),
      struct(lit("version").as("key"), lit("1.0").cast(BinaryType).as("value"))
    ).as("headers")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "enriched-stream")
  .option("checkpointLocation", "/checkpoints/enriched-stream")
  .outputMode("append")
  .start()
```

## Batch Writes

### Simple Batch Write

```scala
// Write a DataFrame to Kafka in batch mode
batchDataFrame
  .select(
    col("id").cast(StringType).as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "batch-data")
  .save()
```

### Partitioned Batch Write

```scala
// Control Kafka partitioning for batch writes
// (the target topic must have at least 10 partitions for this scheme)
batchDataFrame
  .select(
    col("user_id").cast(StringType).as("key"),
    to_json(struct(col("*"))).as("value"),
    (col("user_id").cast(LongType) % 10).cast(IntegerType).as("partition")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-data")
  .save()
```

### Multi-Topic Batch Write

```scala
// Write to multiple topics in a single operation
multiTopicDataFrame
  .withColumn("topic",
    when(col("category") === "orders", "order-events")
      .when(col("category") === "payments", "payment-events")
      .when(col("category") === "inventory", "inventory-events")
      .otherwise("misc-events"))
  .select("topic", "key", "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```

## Error Handling and Validation

### Schema Validation

The writer validates the DataFrame schema before any records are produced:

```scala
// Invalid schema - missing value column
invalidDataFrame
  .select("key", "topic") // Missing the required "value" column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
// Throws: "Required attribute 'value' not found"
```
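
To catch this before submitting a job, a hypothetical guard (not part of the connector; `requireValueColumn` is an illustrative helper) can mirror the writer's checks:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{BinaryType, StringType}

// Illustrative helper mirroring the writer's required-column validation
def requireValueColumn(df: DataFrame): Unit = {
  val field = df.schema.fields
    .find(_.name == "value")
    .getOrElse(throw new IllegalArgumentException("Required attribute 'value' not found"))
  require(
    field.dataType == StringType || field.dataType == BinaryType,
    s"'value' must be STRING or BINARY, got ${field.dataType.simpleString}")
}
```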

### Producer Parameter Validation

```scala
// Invalid producer configuration
dataFrame
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.key.serializer", "custom.serializer") // Not allowed
  .option("topic", "test")
  .save()
// Throws: "Kafka option 'key.serializer' is not supported as keys are serialized with ByteArraySerializer"
```

### Save Mode Validation

```scala
// Invalid save mode for Kafka
dataFrame
  .write
  .mode(SaveMode.Overwrite) // Not supported
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .save()
// Throws: "Save mode Overwrite not allowed for Kafka"

// Supported save modes:
dataFrame.write.mode(SaveMode.Append).format("kafka")...        // Allowed
dataFrame.write.mode(SaveMode.ErrorIfExists).format("kafka")... // Allowed (the default)
```

## Producer Pooling

The connector pools producer instances so that write tasks on each executor reuse connections instead of creating a new producer per write:

### Producer Cache Configuration

```scala
// Producer cache behavior is managed automatically but can be tuned
// via Spark configuration:
//
// spark.kafka.producer.cache.timeout = "10m"                 // Producer cache timeout
// spark.kafka.producer.cache.evictorThreadRunInterval = "1m" // Cache cleanup interval
```
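
A minimal sketch of setting these at session build time (the values shown are illustrative, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

// Tune the producer cache when building the session
val spark = SparkSession.builder()
  .appName("kafka-writer")
  .config("spark.kafka.producer.cache.timeout", "10m")
  .config("spark.kafka.producer.cache.evictorThreadRunInterval", "1m")
  .getOrCreate()
```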

### Producer Pool Management

```scala { .api }
/**
 * Internal producer pool management.
 * Automatically handles producer lifecycle and connection pooling.
 */
object InternalKafkaProducerPool {

  /** Acquires a cached producer for the given configuration */
  def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer

  /** Releases a producer back to the pool */
  def release(producer: CachedKafkaProducer): Unit
}
```
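
These methods are internal; user code never calls them. As an illustration of the lifecycle the pool implies (a sketch based on the API above, not connector documentation), a write task acquires a producer, sends records, and releases it in a `finally` block:

```scala
// Conceptual sketch of the internal acquire/use/release lifecycle;
// not something user code should (or can) do directly
val producer = InternalKafkaProducerPool.acquire(kafkaParams)
try {
  // ... the write task sends records with the pooled producer ...
} finally {
  InternalKafkaProducerPool.release(producer)
}
```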

## Performance Tuning

### Batch Size Tuning

```scala
// Optimize for high throughput
highVolumeDataFrame
  .coalesce(10) // Fewer Spark partitions -> larger producer batches
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.batch.size", "131072")    // 128 KB batches
  .option("kafka.linger.ms", "20")         // Wait longer to fill batches
  .option("kafka.compression.type", "lz4") // Fast compression
  .option("topic", "high-volume-topic")
  .save()
```

### Memory Management

```scala
// Optimize memory usage for large writes
largeDataFrame
  .repartition(50) // More partitions -> less data per task, less memory pressure
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.buffer.memory", "134217728")   // 128 MB producer buffer
  .option("kafka.max.request.size", "10485760") // 10 MB max request size
  .option("topic", "large-data-topic")
  .save()
```

### Concurrent Writes

```scala
// Maximize write parallelism
dataFrame
  .repartition(100) // More partitions -> more concurrent write tasks
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.max.in.flight.requests.per.connection", "5") // More concurrent requests per producer
  .option("topic", "concurrent-topic")
  .save()
```