# Configuration Options

The Spark Kafka connector provides comprehensive configuration options for connection management, performance tuning, security, and reliability. Kafka client configurations are passed through with the `kafka.` prefix; the few client settings that Spark manages itself (such as serializers, deserializers, and `auto.offset.reset`) cannot be overridden this way.

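Because the prefix is applied mechanically, an existing map of plain Kafka client settings can be adapted in one line. The sketch below is illustrative; the helper map and topic name are our own, not part of the connector API.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-config-demo").getOrCreate()

// Plain Kafka client settings, as they would appear in a consumer.properties file
val kafkaClientConf = Map(
  "bootstrap.servers" -> "localhost:9092",
  "request.timeout.ms" -> "60000"
)

// Add the "kafka." prefix before handing the settings to Spark
val reader = spark.readStream
  .format("kafka")
  .options(kafkaClientConf.map { case (k, v) => s"kafka.$k" -> v })
  .option("subscribe", "events")
  .load()
```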
## Capabilities

### Connection Configuration

Essential connection settings for accessing Kafka clusters.
```scala { .api }
/**
 * Required connection configuration
 */
.option("kafka.bootstrap.servers", servers: String)     // Required: Comma-separated list of Kafka brokers

/**
 * Optional connection settings
 */
.option("kafka.client.id", clientId: String)            // Client identifier for broker logs
.option("kafka.request.timeout.ms", timeout: String)    // Request timeout in milliseconds
.option("kafka.connections.max.idle.ms", idle: String)  // Max idle time for connections
```
**Usage Examples:**

```scala
// Basic connection
val basicConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// High-availability setup
val haConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("kafka.client.id", "spark-consumer-app")
  .option("kafka.request.timeout.ms", "60000")        // 60 second timeout
  .option("kafka.connections.max.idle.ms", "300000")  // 5 minute idle timeout
  .option("subscribe", "critical-events")
  .load()
```
### Topic Selection Configuration

Configure how topics are selected and subscribed to.
```scala { .api }
/**
 * Topic selection options (exactly one required)
 */
.option("subscribe", topics: String)          // Comma-separated topic names
.option("subscribePattern", pattern: String)  // Regex pattern for topic names
.option("assign", partitions: String)         // JSON specification of TopicPartitions

/**
 * Topic-related settings
 */
.option("topic", topicName: String)           // Default topic for writes (optional)
```
**Usage Examples:**

```scala
// Subscribe to specific topics
val topicSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .load()

// Pattern-based subscription
val patternSubscription = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prod-.*-events")
  .load()

// Specific partition assignment
val partitionAssignment = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0,1]}""")
  .load()
```
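Whichever selection mode is used, the source yields the same fixed schema, so downstream code does not depend on the subscription style:

```scala
topicSubscription.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```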
### Offset Management Configuration

Control how offsets are managed and tracked for reading operations.
```scala { .api }
/**
 * Offset specification for reads
 */
.option("startingOffsets", offsets: String)  // Starting position: "earliest", "latest", or JSON
.option("endingOffsets", offsets: String)    // Ending position: "earliest", "latest", or JSON (batch only)

/**
 * Timestamp-based offset resolution
 */
.option("startingTimestamp", timestamp: String)            // Global timestamp (ms since epoch)
.option("endingTimestamp", timestamp: String)              // Global timestamp (ms since epoch, batch only)
.option("startingOffsetsByTimestamp", timestamps: String)  // Per-partition timestamps (JSON)
.option("endingOffsetsByTimestamp", timestamps: String)    // Per-partition timestamps (JSON, batch only)

/**
 * Offset resolution strategy
 */
.option("startingOffsetsByTimestampStrategy", strategy: String)  // "ERROR" (fail) or "LATEST" (fall back to latest) when a timestamp matches no record
```
**Usage Examples:**

```scala
// Start from earliest available
val earliestConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Specific offsets per partition
val specificOffsets = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500}}""")
  .option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500}}""")
  .load()

// Timestamp-based reading
val timestampConfig = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000")  // Jan 1, 2022 UTC
  .option("endingTimestamp", "1641081600000")    // Jan 2, 2022 UTC
  .load()
```
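When partitions progress at different rates, the per-partition variants give finer control. A sketch (topic and partition numbers are illustrative); each timestamp resolves to the first record whose timestamp is greater than or equal to it:

```scala
val byTimestamp = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp",
    """{"events":{"0":1640995200000,"1":1640998800000}}""")
  .option("endingOffsetsByTimestamp",
    """{"events":{"0":1641081600000,"1":1641081600000}}""")
  // Fall back to the latest offset when a starting timestamp matches no record
  .option("startingOffsetsByTimestampStrategy", "LATEST")
  .load()
```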
### Performance Configuration

Tune performance characteristics for streaming and batch operations.
```scala { .api }
/**
 * Streaming performance options
 */
.option("maxOffsetsPerTrigger", maxRecords: String)  // Maximum records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String)  // Minimum records before triggering
.option("maxTriggerDelay", delay: String)            // Maximum delay before triggering (e.g., "30s")

/**
 * Partitioning options
 */
.option("minPartitions", partitions: String)  // Desired minimum number of Spark partitions (batch and streaming reads)

/**
 * Offset fetching configuration
 */
.option("fetchOffset.numRetries", retries: String)        // Number of retries for offset fetching
.option("fetchOffset.retryIntervalMs", interval: String)  // Retry interval in milliseconds
```
**Usage Examples:**

```scala
// High-throughput streaming configuration
val highThroughput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "100000")  // Max 100K records per batch
  .option("minOffsetsPerTrigger", "10000")   // Min 10K records before processing
  .option("maxTriggerDelay", "60s")          // Process every 60s regardless
  .load()

// Batch optimization
val batchOptimized = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "200")                // Force higher parallelism
  .option("fetchOffset.numRetries", "10")        // More retries for reliability
  .option("fetchOffset.retryIntervalMs", "500")  // Wait 500ms between retries
  .load()
```
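These rate limits interact with the query trigger, so it helps to see them in a complete pipeline. A minimal end-to-end sketch, assuming a Parquet sink; the paths are placeholders:

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "100000")  // caps records per micro-batch
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/tmp/kafka-sink")                // placeholder output path
  .option("checkpointLocation", "/tmp/kafka-ckpt")  // placeholder checkpoint path
  .trigger(Trigger.ProcessingTime("30 seconds"))    // micro-batch cadence
  .start()
```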
### Reliability Configuration

Configure fault tolerance, error handling, and data consistency.
```scala { .api }
/**
 * Reliability and fault tolerance options
 */
.option("failOnDataLoss", failBehavior: String)    // "true" (default) or "false"
.option("groupIdPrefix", prefix: String)           // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String)  // "true" or "false"

/**
 * Consumer polling configuration
 */
.option("kafkaConsumer.pollTimeoutMs", timeout: String)  // Consumer poll timeout in milliseconds
```
**Usage Examples:**

```scala
// Strict reliability mode
val strictMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")         // Fail query on any data loss
  .option("groupIdPrefix", "critical-app")  // Custom consumer group prefix
  .option("includeHeaders", "true")         // Include message headers
  .load()

// Permissive mode for non-critical data
val permissiveMode = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false")               // Continue despite data loss
  .option("kafkaConsumer.pollTimeoutMs", "30000")  // 30 second poll timeout
  .load()
```
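With `includeHeaders` enabled, the source adds a `headers` column of type `ARRAY<STRUCT<key: STRING, value: BINARY>>`. A quick sketch of making it human-readable (the cast assumes UTF-8 header values):

```scala
import org.apache.spark.sql.functions.{col, expr}

val readableHeaders = strictMode.select(
  col("key"),
  col("value"),
  // Cast each header value from binary to string for inspection
  expr("transform(headers, h -> named_struct('key', h.key, 'value', cast(h.value AS string)))")
    .as("headers")
)
```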
### Security Configuration

Configure authentication, encryption, and access control.
```scala { .api }
/**
 * Security protocol configuration
 */
.option("kafka.security.protocol", protocol: String)  // "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"

/**
 * SASL Authentication
 */
.option("kafka.sasl.mechanism", mechanism: String)     // "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI"
.option("kafka.sasl.jaas.config", jaasConfig: String)  // JAAS configuration string

/**
 * SSL Configuration
 */
.option("kafka.ssl.truststore.location", path: String)      // Truststore file path
.option("kafka.ssl.truststore.password", password: String)  // Truststore password
.option("kafka.ssl.keystore.location", path: String)        // Keystore file path (for client auth)
.option("kafka.ssl.keystore.password", password: String)    // Keystore password
.option("kafka.ssl.key.password", password: String)         // Key password
```
**Usage Examples:**

```scala
// SASL/PLAIN authentication
val saslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9093")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username='consumer' password='consumer-secret';")
  .option("subscribe", "secure-topic")
  .load()

// SSL with mutual authentication
val sslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ssl-kafka:9094")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("kafka.ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
  .option("kafka.ssl.keystore.password", "keystore-password")
  .option("kafka.ssl.key.password", "key-password")
  .option("subscribe", "ssl-topic")
  .load()

// SASL/SSL combination
val saslSslConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9095")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username='app-user' password='app-password';")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "truststore-password")
  .option("subscribe", "encrypted-topic")
  .load()
```
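Hard-coded credentials, as above, are fine for illustration but should not ship. A sketch of the same SCRAM setup reading secrets from the environment (the variable name is our own convention):

```scala
// Fail fast if the secret is missing rather than sending a bad JAAS config
val kafkaPassword = sys.env.getOrElse("KAFKA_PASSWORD", sys.error("KAFKA_PASSWORD not set"))

val envSecured = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "secure-kafka:9095")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    s"username='app-user' password='$kafkaPassword';")
  .option("subscribe", "encrypted-topic")
  .load()
```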
### Producer Configuration (Write Operations)

Configure Kafka producer settings for write operations.
```scala { .api }
/**
 * Producer reliability configuration
 */
.option("kafka.acks", acks: String)                      // "0", "1", or "all"
.option("kafka.retries", retries: String)                // Number of retry attempts
.option("kafka.enable.idempotence", idempotent: String)  // "true" or "false"

/**
 * Producer performance configuration
 */
.option("kafka.batch.size", batchSize: String)          // Batch size in bytes
.option("kafka.linger.ms", lingerMs: String)            // Batching delay in milliseconds
.option("kafka.buffer.memory", bufferMemory: String)    // Total memory for buffering
.option("kafka.compression.type", compression: String)  // "none", "gzip", "snappy", "lz4", "zstd"

/**
 * Producer connection configuration
 */
.option("kafka.max.in.flight.requests.per.connection", maxInflight: String)  // Max unacked requests
.option("kafka.request.timeout.ms", timeout: String)                         // Request timeout
.option("kafka.delivery.timeout.ms", timeout: String)                        // Total delivery timeout
```
**Usage Examples:**

```scala
// High-reliability producer configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "critical-events")
  .option("kafka.acks", "all")                                 // Wait for all replicas
  .option("kafka.retries", "10")                               // Retry up to 10 times
  .option("kafka.enable.idempotence", "true")                  // Prevent duplicates
  .option("kafka.max.in.flight.requests.per.connection", "1")  // Maintain ordering
  .save()

// High-throughput producer configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume-events")
  .option("kafka.acks", "1")                   // Faster acknowledgment
  .option("kafka.batch.size", "131072")        // 128KB batches
  .option("kafka.linger.ms", "100")            // 100ms batching delay
  .option("kafka.compression.type", "lz4")     // Fast compression
  .option("kafka.buffer.memory", "134217728")  // 128MB buffer
  .save()

// Balanced configuration
dataFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "standard-events")
  .option("kafka.acks", "1")                   // Leader acknowledgment
  .option("kafka.retries", "3")                // Moderate retry count
  .option("kafka.batch.size", "16384")         // 16KB batches
  .option("kafka.linger.ms", "10")             // 10ms batching delay
  .option("kafka.compression.type", "snappy")  // Good compression ratio
  .save()
```
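The sink expects specific columns on the DataFrame being written: a required `value` (string or binary) plus optional `key`, `topic`, `partition`, and `headers`. A sketch of shaping rows before the writes above (the `id` column is an assumption about the input):

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}

val kafkaReady = dataFrame.select(
  col("id").cast("string").as("key"),    // assumed input column
  to_json(struct(col("*"))).as("value")  // serialize the whole row as JSON
)

kafkaReady.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "standard-events")
  .save()
```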
### Consumer Configuration (Read Operations)

Configure Kafka consumer settings for read operations (advanced use cases). Note that Spark assigns topic partitions to its consumers directly rather than relying on Kafka's group rebalancing, so group-coordination settings mostly affect broker-side bookkeeping rather than partition assignment.
```scala { .api }
/**
 * Consumer session and heartbeat configuration
 */
.option("kafka.session.timeout.ms", timeout: String)      // Session timeout
.option("kafka.heartbeat.interval.ms", interval: String)  // Heartbeat interval
.option("kafka.max.poll.interval.ms", interval: String)   // Max time between polls

/**
 * Consumer fetch configuration
 */
.option("kafka.fetch.min.bytes", minBytes: String)            // Minimum fetch size
.option("kafka.fetch.max.wait.ms", waitMs: String)            // Maximum fetch wait time
.option("kafka.max.partition.fetch.bytes", maxBytes: String)  // Max bytes per partition
```
**Usage Examples:**

```scala
// Low-latency consumer configuration
val lowLatencyConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .option("kafka.fetch.min.bytes", "1")           // Fetch immediately
  .option("kafka.fetch.max.wait.ms", "100")       // Max 100ms wait
  .option("kafka.session.timeout.ms", "10000")    // 10s session timeout
  .option("kafka.heartbeat.interval.ms", "3000")  // 3s heartbeat
  .load()

// High-throughput consumer configuration
val highThroughputConsumer = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "bulk-data")
  .option("kafka.fetch.min.bytes", "1048576")             // 1MB minimum fetch
  .option("kafka.max.partition.fetch.bytes", "10485760")  // 10MB max per partition
  .option("kafka.session.timeout.ms", "30000")            // 30s session timeout
  .load()
```
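However the consumer is tuned, keys and values arrive as binary; the canonical first step downstream is a cast (assuming UTF-8 payloads):

```scala
val parsed = lowLatencyConsumer.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value"
)
```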
## Configuration Best Practices

### Development Environment
```scala
// Development configuration - prioritize ease of debugging
val devConfig = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "failOnDataLoss" -> "false",      // Continue despite data issues
  "startingOffsets" -> "earliest",  // Read all available data
  "includeHeaders" -> "true",       // Include headers for debugging
  "maxOffsetsPerTrigger" -> "1000"  // Small batches for testing
)
```
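A map like this plugs straight into a reader via `options` (the topic name below is a placeholder):

```scala
val devStream = spark.readStream
  .format("kafka")
  .options(devConfig)
  .option("subscribe", "dev-topic")  // placeholder topic
  .load()
```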
### Production Environment

```scala
// Production configuration - prioritize reliability and performance
val prodConfig = Map(
  "kafka.bootstrap.servers" -> "kafka1:9092,kafka2:9092,kafka3:9092",
  "failOnDataLoss" -> "true",              // Strict data consistency
  "groupIdPrefix" -> "prod-spark-app",     // Identifiable consumer groups
  "maxOffsetsPerTrigger" -> "100000",      // Larger batches for efficiency
  "kafka.session.timeout.ms" -> "30000",   // Longer session timeout
  "kafka.request.timeout.ms" -> "60000",   // Longer request timeout
  "fetchOffset.numRetries" -> "5",         // More retries
  "fetchOffset.retryIntervalMs" -> "1000"  // Longer retry intervals
)
```
### Security-First Configuration

```scala
// Security-focused configuration. The ${...} values are deployment-time
// placeholders (plain Scala strings do not interpolate them); substitute
// them via your secret-management tooling or sys.env lookups.
val secureConfig = Map(
  "kafka.bootstrap.servers" -> "secure-kafka:9093",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-256",
  "kafka.sasl.jaas.config" ->
    // Parentheses keep the whole concatenation on the right of ->
    ("org.apache.kafka.common.security.scram.ScramLoginModule required " +
     "username='spark-user' password='${KAFKA_PASSWORD}';"),
  "kafka.ssl.truststore.location" -> "/etc/kafka/ssl/truststore.jks",
  "kafka.ssl.truststore.password" -> "${TRUSTSTORE_PASSWORD}",
  "kafka.ssl.endpoint.identification.algorithm" -> "https"
)
```
## Common Configuration Patterns

### Auto Scaling Configuration
```scala
// Configuration that adapts to load
val autoScalingConfig = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("minOffsetsPerTrigger", "1000")   // Process at least 1K records
  .option("maxOffsetsPerTrigger", "50000")  // But no more than 50K
  .option("maxTriggerDelay", "30s")         // Force processing every 30s
  .load()
```
### Multi-Region Configuration

```scala
// Configuration for multi-region deployment
// (`region` and `applicationId` are assumed to be defined by the application)
val multiRegionConfig = Map(
  "kafka.bootstrap.servers" ->
    "us-kafka1:9092,us-kafka2:9092,eu-kafka1:9092,eu-kafka2:9092",
  "kafka.client.id" -> s"spark-${region}-${applicationId}",
  "kafka.request.timeout.ms" -> "120000",  // Longer timeout for cross-region
  "kafka.session.timeout.ms" -> "60000",   // Longer session timeout
  "fetchOffset.numRetries" -> "10",        // More retries for network issues
  "fetchOffset.retryIntervalMs" -> "2000"  // Longer retry intervals
)
```
### Schema Registry Integration

```scala
// The connector always reads and writes raw binary keys and values: Spark
// fixes the (de)serializers to byte arrays and rejects options such as
// "kafka.key.serializer" and "kafka.value.serializer". Schema Registry
// properties are likewise not Kafka client settings, so they cannot be
// passed with the "kafka." prefix. Keep the connector config minimal...
val kafkaConfig = Map(
  "kafka.bootstrap.servers" -> "kafka:9092",
  "subscribe" -> "avro-events"
)
// ...and do registry-aware Avro (de)serialization in Spark code, either with
// a registry-integrated library (e.g., ABRiS) or with spark-avro directly.
```
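As a minimal sketch of the spark-avro route, assuming the writer schema is known ahead of time (fetched out of band from the registry; a registry-aware library would also handle schema evolution for you): Confluent-framed messages carry a 5-byte header (1 magic byte plus a 4-byte schema ID) before the Avro payload, which must be stripped before decoding.

```scala
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.{col, expr}

// Writer schema, assumed to be retrieved from the registry out of band
val eventSchema =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

val decoded = spark.readStream
  .format("kafka")
  .options(kafkaConfig)
  .load()
  // Drop the 5-byte Confluent wire-format header before Avro decoding
  .withColumn("payload", expr("substring(value, 6, length(value) - 5)"))
  .select(from_avro(col("payload"), eventSchema).as("event"))
```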