# Configuration

Complete configuration options for fine-tuning Kafka integration behavior, connection parameters, performance settings, and security configurations.

## Capabilities

### Source Configuration Options

Configuration options for reading data from Kafka in both streaming and batch modes.

```scala { .api }
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092" // Required: Kafka broker addresses
"subscribe" -> "topic1,topic2,topic3" // Topic subscription (comma-separated)
"subscribePattern" -> "events_.*" // Topic pattern subscription (regex)
"assign" -> """{"topic1":[0,1],"topic2":[0]}""" // Manual partition assignment (JSON)

// Offset Management
"startingOffsets" -> "earliest" // Starting position: "earliest", "latest", or JSON
"endingOffsets" -> "latest" // Ending position: "latest" or JSON (batch only)
"failOnDataLoss" -> "true" // Fail the query on data loss (default: true)

// Performance Tuning
"minPartitions" -> "10" // Minimum number of Spark partitions
"maxOffsetsPerTrigger" -> "1000000" // Rate limit: max offsets per trigger interval
"kafkaConsumer.pollTimeoutMs" -> "10000" // Consumer poll timeout (ms)
```
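
The offset options also accept per-partition JSON. A sketch of a bounded batch read using JSON offsets (the topic name and offset values are illustrative; `-2` means earliest and `-1` means latest):

```scala
// Batch-read a bounded slice of a topic using per-partition JSON offsets.
val slice = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":100,"1":-2}}""") // -2 = earliest
  .option("endingOffsets", """{"events":{"0":2000,"1":-1}}""")  // -1 = latest
  .load()
```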

### Sink Configuration Options

Configuration options for writing data to Kafka topics.

```scala { .api }
// Connection Configuration
"kafka.bootstrap.servers" -> "localhost:9092" // Required: Kafka broker addresses
"topic" -> "output-topic" // Default output topic

// Producer Performance
"kafka.acks" -> "all" // Acknowledgment level: "0", "1", "all"
"kafka.retries" -> "3" // Number of retries
"kafka.batch.size" -> "16384" // Batch size in bytes
"kafka.linger.ms" -> "5" // Batching delay in milliseconds
"kafka.buffer.memory" -> "33554432" // Total memory for buffering
"kafka.compression.type" -> "snappy" // Compression: "none", "gzip", "snappy", "lz4", "zstd"

// Reliability
"kafka.enable.idempotence" -> "true" // Enable the idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5" // Max unacknowledged requests
"kafka.request.timeout.ms" -> "30000" // Request timeout (ms)
```
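
A minimal batch write using these options might look like the following (the DataFrame `df` and its `id` column are assumed). The sink requires a `value` column; `key` is optional:

```scala
// Batch-write df to Kafka: shape the columns into the sink's expected schema.
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("kafka.acks", "all")
  .option("kafka.compression.type", "snappy")
  .save()
```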

### Kafka Consumer Parameters

Kafka consumer configuration parameters are passed through the `kafka.` prefix; Spark strips the prefix and hands the remainder directly to the underlying consumer.

```scala { .api }
// Core Consumer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-consumer"
"kafka.session.timeout.ms" -> "30000" // Consumer session timeout
"kafka.heartbeat.interval.ms" -> "3000" // Heartbeat interval
"kafka.max.poll.records" -> "500" // Records per poll
"kafka.max.poll.interval.ms" -> "300000" // Max time between polls

// Fetch Configuration
"kafka.fetch.min.bytes" -> "1" // Minimum bytes to fetch
"kafka.fetch.max.wait.ms" -> "500" // Max wait for min bytes
"kafka.fetch.max.bytes" -> "52428800" // Maximum bytes per fetch (50MB)
"kafka.max.partition.fetch.bytes" -> "1048576" // Maximum bytes per partition (1MB)

// Network Configuration
"kafka.receive.buffer.bytes" -> "65536" // Receive buffer size (64KB)
"kafka.send.buffer.bytes" -> "131072" // Send buffer size (128KB)
"kafka.request.timeout.ms" -> "30000" // Request timeout
"kafka.reconnect.backoff.ms" -> "50" // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000" // Max reconnect backoff
"kafka.retry.backoff.ms" -> "100" // Retry backoff

// Connection Management
"kafka.connections.max.idle.ms" -> "540000" // Connection idle timeout (9 minutes)
"kafka.metadata.max.age.ms" -> "300000" // Metadata refresh interval (5 minutes)
```
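
Related `kafka.`-prefixed settings can be grouped in a `Map` and applied in one call with `options` (topic name and values below are illustrative):

```scala
// Group consumer tuning parameters and apply them at once.
val consumerTuning = Map(
  "kafka.session.timeout.ms" -> "30000",
  "kafka.max.poll.records"   -> "500",
  "kafka.fetch.max.bytes"    -> "52428800"
)

val tunedDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "events")
  .options(consumerTuning)
  .load()
```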

### Kafka Producer Parameters

Complete set of Kafka producer configuration parameters for writing data.

```scala { .api }
// Core Producer Settings
"kafka.bootstrap.servers" -> "broker1:9092,broker2:9092"
"kafka.client.id" -> "spark-kafka-producer"
"kafka.acks" -> "all" // "0", "1", or "all"
"kafka.retries" -> "2147483647" // Max retries (Integer.MAX_VALUE)
"kafka.retry.backoff.ms" -> "100" // Retry backoff

// Batching and Performance
"kafka.batch.size" -> "16384" // Batch size (16KB)
"kafka.linger.ms" -> "0" // Batching delay
"kafka.buffer.memory" -> "33554432" // Buffer memory (32MB)
"kafka.compression.type" -> "none" // Compression type
"kafka.max.request.size" -> "1048576" // Max request size (1MB)

// Idempotence and Ordering
"kafka.enable.idempotence" -> "false" // Idempotent producer
"kafka.max.in.flight.requests.per.connection" -> "5" // Max unacknowledged requests

// Timing Configuration
"kafka.request.timeout.ms" -> "30000" // Request timeout
"kafka.delivery.timeout.ms" -> "120000" // Delivery timeout (2 minutes)
"kafka.send.buffer.bytes" -> "131072" // Send buffer (128KB)
"kafka.receive.buffer.bytes" -> "32768" // Receive buffer (32KB)

// Metadata and Connections
"kafka.metadata.max.age.ms" -> "300000" // Metadata refresh (5 minutes)
"kafka.connections.max.idle.ms" -> "540000" // Connection idle timeout (9 minutes)
"kafka.reconnect.backoff.ms" -> "50" // Reconnect backoff
"kafka.reconnect.backoff.max.ms" -> "1000" // Max reconnect backoff
```
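
These settings interact: the idempotent producer requires `acks=all` and at most 5 in-flight requests per connection. A sketch of a safe combination (topic and broker addresses are illustrative):

```scala
// Safer-delivery producer settings: enable.idempotence=true requires
// acks=all and max.in.flight.requests.per.connection <= 5.
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "audit-events")
  .option("kafka.enable.idempotence", "true")
  .option("kafka.acks", "all")
  .option("kafka.max.in.flight.requests.per.connection", "5")
  .save()
```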

### Security Configuration

Security configuration for SSL and SASL authentication.

```scala { .api }
// SSL Configuration
"kafka.security.protocol" -> "SSL" // Security protocol
"kafka.ssl.protocol" -> "TLSv1.2" // SSL protocol version
"kafka.ssl.truststore.location" -> "/path/to/truststore.jks"
"kafka.ssl.truststore.password" -> "truststore-password"
"kafka.ssl.truststore.type" -> "JKS" // Truststore type
"kafka.ssl.keystore.location" -> "/path/to/keystore.jks"
"kafka.ssl.keystore.password" -> "keystore-password"
"kafka.ssl.keystore.type" -> "JKS" // Keystore type
"kafka.ssl.key.password" -> "key-password" // Key password

// Hostname Verification
"kafka.ssl.endpoint.identification.algorithm" -> "https" // Verify broker hostname against its certificate (set "" to disable)

// SASL Configuration
"kafka.security.protocol" -> "SASL_SSL" // SASL with SSL
"kafka.sasl.mechanism" -> "PLAIN" // SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"

// SASL SCRAM Configuration
"kafka.sasl.mechanism" -> "SCRAM-SHA-256"
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";"

// Kerberos (GSSAPI) Configuration
"kafka.sasl.mechanism" -> "GSSAPI"
"kafka.sasl.kerberos.service.name" -> "kafka"
"kafka.sasl.jaas.config" -> "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab=\"/path/to/kafka.keytab\" storeKey=true useTicketCache=false principal=\"kafka/hostname@REALM\";"
```

## Configuration Examples

### Basic Streaming Configuration

```scala
val streamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

val query = streamingDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```

### High Performance Configuration

```scala
val highPerfDF = spark
  .readStream
  .format("kafka")
  // Connection settings
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("subscribe", "high-volume-topic")
  // Performance tuning
  .option("minPartitions", "50") // Increase parallelism
  .option("maxOffsetsPerTrigger", "5000000") // 5M records per batch
  .option("kafkaConsumer.pollTimeoutMs", "5000") // 5 second timeout
  // Consumer optimization
  .option("kafka.fetch.min.bytes", "1024") // 1KB minimum fetch
  .option("kafka.fetch.max.wait.ms", "100") // Fast fetching
  .option("kafka.max.poll.records", "1000") // More records per poll
  .option("kafka.receive.buffer.bytes", "262144") // 256KB buffer
  .option("kafka.fetch.max.bytes", "104857600") // 100MB max fetch
  .load()

// Stand-in transformation; replace with your own processing logic
val processedDF = highPerfDF
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

val writeQuery = processedDF
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
  .option("topic", "processed-topic")
  .option("checkpointLocation", "/tmp/checkpoints/high-perf") // Required for the Kafka sink
  // Producer optimization
  .option("kafka.batch.size", "65536") // 64KB batches
  .option("kafka.linger.ms", "10") // 10ms batching delay
  .option("kafka.compression.type", "snappy") // Compression
  .option("kafka.buffer.memory", "134217728") // 128MB buffer
  .option("kafka.acks", "1") // Balanced reliability
  .start()
```

### Secure Configuration

```scala
val secureDF = spark
  .readStream
  .format("kafka")
  // Basic connection
  .option("kafka.bootstrap.servers", "secure-broker1:9093,secure-broker2:9093")
  .option("subscribe", "secure-topic")
  // SSL configuration
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/etc/kafka/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/etc/kafka/keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .load()

// SASL/SCRAM configuration
val saslDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "sasl-broker:9094")
  .option("subscribe", "authenticated-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"myuser\" password=\"mypassword\";")
  .load()
```

### Batch Processing Configuration

```scala
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  // Batch-specific offset configuration
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"historical-data":{"0":1000000,"1":1500000}}""")
  // Optimize for large batch reads
  .option("minPartitions", "20")
  .option("kafka.fetch.max.bytes", "104857600") // 100MB
  .option("kafka.max.partition.fetch.bytes", "10485760") // 10MB per partition
  .option("kafka.receive.buffer.bytes", "1048576") // 1MB buffer
  .load()
```

### Multi-Environment Configuration

```scala
import scala.util.Properties

val environment = Properties.envOrElse("ENVIRONMENT", "dev")

val envConfig = environment match {
  case "prod" => Map(
    "kafka.bootstrap.servers" -> "prod-broker1:9092,prod-broker2:9092,prod-broker3:9092",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.acks" -> "all",
    "kafka.retries" -> "10",
    "failOnDataLoss" -> "true"
  )
  case "staging" => Map(
    "kafka.bootstrap.servers" -> "staging-broker:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "true"
  )
  case _ => Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "kafka.acks" -> "1",
    "failOnDataLoss" -> "false"
  )
}

val configuredDF = spark
  .readStream
  .format("kafka")
  .options(envConfig)
  .option("subscribe", s"${environment}-events")
  .load()
```

## Configuration Validation

### Unsupported Options

Certain Kafka parameters are managed internally by Spark and cannot be overridden:

```scala { .api }
// Automatically managed by Spark (specifying these throws IllegalArgumentException)
"kafka.group.id" // Unique group IDs are generated per query
"kafka.auto.offset.reset" // Controlled via the startingOffsets option
"kafka.key.deserializer" // Fixed to ByteArrayDeserializer
"kafka.value.deserializer" // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit" // Disabled; Spark manages offsets itself
"kafka.interceptor.classes" // Not supported for safety

// Producer-specific unsupported options
"kafka.key.serializer" // Fixed to ByteArraySerializer
"kafka.value.serializer" // Fixed to ByteArraySerializer
```

### Required Options

```scala { .api }
// Required for all operations
"kafka.bootstrap.servers" // Must be specified

// Required for sources (exactly one must be specified)
"subscribe" // Topic list
"subscribePattern" // Topic pattern
"assign" // Partition assignment

// Schema validation occurs at runtime:
// a DataFrame written to the sink must have a "value" column
// (and may optionally have "key" and "topic" columns)
```
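
The sink's column contract can be sketched as follows (the input column names `userId`, `payload`, and `destTopic` are illustrative). A `topic` column routes each row to its own topic when the `topic` option is not set; if the option is set, it takes precedence:

```scala
// Shape a DataFrame into the schema the Kafka sink expects.
val out = df.selectExpr(
  "CAST(userId AS STRING) AS key",     // optional
  "CAST(payload AS STRING) AS value",  // required
  "CAST(destTopic AS STRING) AS topic" // optional: per-row topic routing
)

out.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()
```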

### Option Validation Examples

```scala
// Valid consumer strategy
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2") // Valid
  .load()

// Invalid: multiple strategies
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1") // Both subscribe
  .option("subscribePattern", "topic.*") // and pattern specified
  .load() // Throws IllegalArgumentException

// Invalid: unsupported parameter
spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "my-group") // Unsupported
  .load() // Throws IllegalArgumentException
```

## Performance Tuning Guidelines

### Consumer Performance

```scala
// Optimize fetch behavior
.option("kafka.fetch.min.bytes", "1024") // Wait for at least 1KB
.option("kafka.fetch.max.wait.ms", "500") // Max wait 500ms
.option("kafka.max.poll.records", "500") // Records per poll

// Buffer optimization
.option("kafka.receive.buffer.bytes", "65536") // 64KB receive buffer
.option("kafka.fetch.max.bytes", "52428800") // 50MB max fetch

// Partition parallelism
.option("minPartitions", "20") // Increase Spark partitions
```

### Producer Performance

```scala
// Batching optimization
.option("kafka.batch.size", "32768") // 32KB batches
.option("kafka.linger.ms", "10") // 10ms linger time
.option("kafka.compression.type", "snappy") // Enable compression

// Memory and throughput
.option("kafka.buffer.memory", "67108864") // 64MB buffer
.option("kafka.max.in.flight.requests.per.connection", "5")
```

### Memory Configuration

```scala
import org.apache.spark.sql.SparkSession

// Spark resource settings for Kafka workloads. These must be set before the
// SparkSession starts (via spark-submit --conf or the builder below); they
// cannot be changed at runtime with spark.conf.set.
val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .config("spark.driver.memory", "4g")
  .config("spark.sql.shuffle.partitions", "200")
  // JVM tuning for Kafka clients
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:+UnlockExperimentalVMOptions")
  .getOrCreate()
```

## Monitoring Configuration

### Metrics Collection

```scala
// Register an additional metrics reporter (Kafka clients expose JMX metrics by default)
.option("kafka.metric.reporters", "org.apache.kafka.common.metrics.JmxReporter")

// Metrics sampling configuration
.option("kafka.metrics.sample.window.ms", "30000") // 30 second window
.option("kafka.metrics.num.samples", "2") // 2 samples per window
```

### Logging Configuration

```scala
// Enable detailed logging for troubleshooting
// (Log4j 1.x API; on Spark 3.3+ configure log4j2.properties instead)
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark.sql.kafka010").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.kafka").setLevel(Level.INFO)
```

## Troubleshooting Common Issues

### Connection Issues

```scala
// Increase timeouts for unreliable networks
.option("kafka.request.timeout.ms", "60000") // 60 second timeout
.option("kafka.reconnect.backoff.ms", "1000") // 1 second backoff
.option("kafka.retry.backoff.ms", "1000") // 1 second retry backoff
```

### Offset Management Issues

```scala
// Handle offset-out-of-range errors
.option("failOnDataLoss", "false") // Continue on data loss (lost records are silently skipped)
.option("startingOffsets", "latest") // Applies on the first start of a query
```

### Performance Issues

```scala
// Diagnose slow consumption
.option("kafkaConsumer.pollTimeoutMs", "30000") // Longer poll timeout
.option("kafka.session.timeout.ms", "60000") // Longer session timeout
.option("kafka.max.poll.interval.ms", "600000") // 10 minute max poll interval
```

### Memory Issues

```scala
// Reduce memory usage
.option("kafka.fetch.max.bytes", "10485760") // Reduce to 10MB
.option("kafka.max.partition.fetch.bytes", "1048576") // 1MB per partition
.option("maxOffsetsPerTrigger", "100000") // Limit batch size
```