# Batch Reading

The Kafka connector supports batch (bounded) reads of historical Kafka data through `spark.read`. The requested offset range of each Kafka topic partition is planned into one or more Spark input partitions, which are then read in parallel.

## Capabilities

### KafkaBatch

DataSource V2 `Batch` implementation that plans Kafka offset ranges into input partitions and creates their readers.

```scala { .api }
/**
 * DataSource V2 batch implementation for reading Kafka data.
 * Plans offset ranges into input partitions and supplies the reader factory.
 */
class KafkaBatch extends Batch {

  /** Plans one input partition per offset range (large ranges may be split when minPartitions is set) */
  def planInputPartitions(): Array[InputPartition]

  /** Creates the factory that builds a reader for each input partition */
  def createReaderFactory(): PartitionReaderFactory
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.functions._

// Basic batch reading (assumes an existing SparkSession named `spark`).
// For batch queries, "earliest" and "latest" are also the default bounds.
val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Count records and report the offset span per topic partition
val processedBatch = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    expr("CAST(value AS STRING)").as("message")
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("message_count"),
    min("offset").as("min_offset"),
    max("offset").as("max_offset")
  )
```

### KafkaRelation

DataSource V1 relation for batch reading, implemented as a `TableScan`. In Spark 3.x, `kafka` is included in the default value of the internal `spark.sql.sources.useV1SourceList` setting, so `spark.read.format("kafka")` batch queries typically go through this relation rather than `KafkaBatch`.

```scala { .api }
/**
 * DataSource V1 relation for batch reading from Kafka.
 * Kept for compatibility with the DataSource V1 API.
 */
class KafkaRelation extends BaseRelation with TableScan {

  /** Returns the fixed Kafka record schema (key, value, topic, partition, offset, timestamp, timestampType, optional headers) */
  def schema: StructType

  /** Builds an RDD that scans all planned offset ranges */
  def buildScan(): RDD[Row]

  /** Returns the SQL context */
  def sqlContext: SQLContext
}
```

## Partition Input Planning

### KafkaBatchInputPartition

Represents a single input partition: one offset range that a single Spark task reads.

```scala { .api }
/**
 * Input partition for batch Kafka reading.
 * Represents a range of offsets to be processed by a single task.
 */
case class KafkaBatchInputPartition(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object], // ju = java.util
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean,
    includeHeaders: Boolean
) extends InputPartition {

  /** Preferred executor for this range, if the range calculator assigned one */
  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
}
```
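
Preferred locations let repeated reads of the same topic partition land on the same executor so it can reuse a cached consumer. The sketch below shows one way to make that assignment deterministic, assuming a hash-modulo scheme over the executor list. `preferredExecutor` is a hypothetical helper written as a script with top-level definitions, not a connector API.

```scala
// Hypothetical helper: pick a preferred executor for a topic partition by
// hashing, so the same partition keeps mapping to the same executor.
def preferredExecutor(topic: String, partition: Int, executors: Seq[String]): Option[String] =
  if (executors.isEmpty) None // no known executors: express no preference
  else {
    // floorMod keeps the index non-negative even when hashCode is negative
    val index = Math.floorMod((topic, partition).hashCode, executors.length)
    Some(executors(index))
  }

val executors = Seq("exec-1", "exec-2", "exec-3")
// Repeated calls for events-0 return the same executor
println(preferredExecutor("events", 0, executors))
```

With modulo hashing the mapping stays stable only while the executor list is unchanged; adding or removing executors reshuffles it.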

### KafkaOffsetRange

Defines the range of offsets processed by one partition reader. `fromOffset` is inclusive and `untilOffset` is exclusive.

```scala { .api }
/**
 * Represents a range of offsets for a specific topic partition.
 * Used when planning batch processing tasks.
 */
case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,  // inclusive
    untilOffset: Long, // exclusive
    preferredLoc: Option[String]
) {
  /** Topic name */
  def topic: String = topicPartition.topic()

  /** Partition number */
  def partition: Int = topicPartition.partition()

  /**
   * Estimated number of messages in this range. The actual count can be lower
   * because of log compaction or transaction markers.
   */
  def size: Long = untilOffset - fromOffset
}
```

## Partition Readers

### KafkaBatchReaderFactory

Factory that creates a partition reader for each planned input partition.

```scala { .api }
/**
 * Factory for creating Kafka batch partition readers.
 * Creates readers for KafkaBatchInputPartition instances.
 */
object KafkaBatchReaderFactory extends PartitionReaderFactory {

  /** Creates a partition reader for the given input partition */
  def createReader(partition: InputPartition): PartitionReader[InternalRow]
}
```

### KafkaBatchPartitionReader

Reads the records of one offset range from a single Kafka topic partition.

```scala { .api }
/**
 * Partition reader for batch Kafka data processing.
 * Reads a specific range of offsets from a Kafka partition.
 */
case class KafkaBatchPartitionReader(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean,
    includeHeaders: Boolean
) extends PartitionReader[InternalRow] {

  /** Advances to the next record; returns false once the range is exhausted */
  def next(): Boolean

  /** Returns the current record as an UnsafeRow */
  def get(): UnsafeRow

  /** Closes the reader and releases its Kafka consumer */
  def close(): Unit

  /** Returns the current values of the reader's custom task metrics */
  def currentMetricsValues(): Array[CustomTaskMetric]
}
```
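
The reader follows Spark's `PartitionReader` contract: call `next()` until it returns `false`, and call `get()` after each successful `next()`. The toy reader below illustrates that contract over a half-open offset range, using an in-memory map of offsets to values in place of a Kafka consumer. `ToyRangeReader` is hypothetical, and offsets missing from the map stand in for gaps left by log compaction.

```scala
// Hypothetical toy reader mirroring the next()/get()/close() contract over the
// half-open offset range [fromOffset, untilOffset). An in-memory map stands in
// for the Kafka partition.
final class ToyRangeReader(log: Map[Long, String], fromOffset: Long, untilOffset: Long) {
  private var nextOffset = fromOffset
  private var current: Option[(Long, String)] = None

  /** Advances to the next existing record; returns false once untilOffset is reached */
  def next(): Boolean = {
    // Skip offsets that have no record (e.g. removed by log compaction)
    while (nextOffset < untilOffset && !log.contains(nextOffset)) nextOffset += 1
    if (nextOffset >= untilOffset) false
    else {
      current = Some(nextOffset -> log(nextOffset))
      nextOffset += 1
      true
    }
  }

  /** Returns the record found by the last successful next() */
  def get(): (Long, String) =
    current.getOrElse(throw new NoSuchElementException("next() has not returned true yet"))

  /** A real reader would release its Kafka consumer here */
  def close(): Unit = current = None
}

val log = Map(0L -> "a", 1L -> "b", 3L -> "d", 4L -> "e") // offset 2 was compacted away
val reader = new ToyRangeReader(log, fromOffset = 1L, untilOffset = 4L)
val seen = scala.collection.mutable.ListBuffer.empty[(Long, String)]
while (reader.next()) seen += reader.get()
reader.close()
println(seen.toList) // prints: List((1,b), (3,d))
```

Here the range `[1, 4)` has an estimated size of 3, but only two records come back, which is why `KafkaOffsetRange.size` is only an estimate.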

## Offset Range Configuration

### Time-based Range Selection

For each partition, a timestamp is resolved to the earliest offset whose record timestamp is greater than or equal to it.

```scala
// Read records from a time range (epoch milliseconds)
val timeRangeBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1609459200000") // Jan 1, 2021 00:00:00 UTC
  .option("endingTimestamp", "1609545600000")   // Jan 2, 2021 00:00:00 UTC
  .load()
```
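
Rather than hard-coding epoch milliseconds, the bounds can be derived from readable UTC instants with `java.time`. This sketch only computes the option values; they are then passed to `.option(...)` as above.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Convert readable UTC instants into the epoch-millisecond values
// expected by startingTimestamp / endingTimestamp.
val start = Instant.parse("2021-01-01T00:00:00Z").toEpochMilli
// Start of the next UTC day, computed from a calendar date
val end = LocalDate.of(2021, 1, 2).atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli

println(s"startingTimestamp=$start endingTimestamp=$end")
// prints: startingTimestamp=1609459200000 endingTimestamp=1609545600000
```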

### Specific Offset Ranges

```scala
// Read explicit offset ranges per partition:
// starting offsets are inclusive, ending offsets are exclusive
val specificOffsetsBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":6000,"2":4500}}""")
  .load()
```
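
The per-partition JSON can be generated from a map instead of written by hand. `offsetsJson` below is a hypothetical helper (not part of the connector) that emits the `{"topic":{"partition":offset}}` shape used above, with topics and partitions in ascending order. It assumes topic names need no JSON escaping.

```scala
// Hypothetical helper: build the JSON accepted by startingOffsets/endingOffsets
// from topic -> (partition -> offset). Topic names are assumed not to need escaping.
def offsetsJson(offsets: Map[String, Map[Int, Long]]): String =
  offsets.toSeq.sortBy(_._1).map { case (topic, parts) =>
    val inner = parts.toSeq.sortBy(_._1)
      .map { case (p, off) => s""""$p":$off""" }
      .mkString(",")
    s""""$topic":{$inner}"""
  }.mkString("{", ",", "}")

val starting = offsetsJson(Map("events" -> Map(0 -> 1000L, 1 -> 2000L, 2 -> 1500L)))
println(starting) // prints: {"events":{"0":1000,"1":2000,"2":1500}}
```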

### Mixed Offset Strategies

```scala
// Mix earliest/latest with specific offsets. In the JSON, -2 means earliest and
// -1 means latest. For batch queries, -1 is not allowed in startingOffsets and
// -2 is not allowed in endingOffsets.
val mixedOffsetsBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":-2,"1":1000,"2":-2}}""") // -2 = earliest
  .option("endingOffsets", """{"events":{"0":-1,"1":2000,"2":-1}}""")   // -1 = latest
  .load()
```
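
Because those sentinel rules are only enforced when the query is planned, a job may want to check its offset maps up front. The sketch below is a hypothetical pre-flight check (not a connector API), written as a script, that encodes only the batch sentinel rules stated in the comments above.

```scala
// Sentinel offsets used in the startingOffsets/endingOffsets JSON
val Earliest = -2L
val Latest = -1L

// Hypothetical pre-flight check for batch queries; returns a list of problems.
def validateBatchOffsets(starting: Map[Int, Long], ending: Map[Int, Long]): List[String] = {
  val startErrors = starting.toList.sorted.collect {
    case (p, Latest) => s"partition $p: latest (-1) is not allowed as a starting offset"
  }
  val endErrors = ending.toList.sorted.collect {
    case (p, Earliest) => s"partition $p: earliest (-2) is not allowed as an ending offset"
  }
  startErrors ++ endErrors
}

val problems = validateBatchOffsets(
  starting = Map(0 -> Earliest, 1 -> 1000L, 2 -> Earliest),
  ending = Map(0 -> Latest, 1 -> 2000L, 2 -> Latest)
)
println(problems) // prints: List()
```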
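
## Partition Optimization

### Minimum Partitions

Set a desired minimum number of Spark partitions. By default, each Kafka topic partition maps to one Spark partition; when `minPartitions` is larger than the number of Kafka partitions with data to read, large Kafka partitions are split into smaller ranges. The value is a hint: the resulting number of tasks is approximately `minPartitions` and can be somewhat lower or higher.

```scala
// Increase parallelism for batch processing
val parallelBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("minPartitions", "50") // Roughly 50 Spark partitions, not an exact count
  .load()
```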

### KafkaOffsetRangeCalculator

Internal calculator that turns the per-partition offset ranges into the ranges that are actually read, honoring `minPartitions`.

```scala { .api }
/**
 * Calculates the offset ranges to read.
 * Splits large ranges to reach the requested parallelism.
 */
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {

  /**
   * Returns offset ranges covering the input, dropping empty ranges and
   * splitting large ones when there are fewer ranges than minPartitions.
   */
  def getRanges(
      ranges: Seq[KafkaOffsetRange],
      executorLocations: Seq[String]
  ): Seq[KafkaOffsetRange]
}

object KafkaOffsetRangeCalculator {
  /** Creates a calculator from the minPartitions option */
  def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator
}
```
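
**Range Splitting Logic:**

When splitting is needed, each range is divided into a number of pieces proportional to its share of all offsets being read (at least one piece per range), and the pieces of a range are roughly equal in size.

```scala
// If a topic has a single Kafka partition holding 100,000 messages and minPartitions=10:
// Original: [topic-0: 0 -> 100000]
// Split into:
// [topic-0: 0 -> 10000]
// [topic-0: 10000 -> 20000]
// [topic-0: 20000 -> 30000]
// ... (10 total ranges)
```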
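
The splitting can be sketched as follows. This is a simplified model assuming the proportional rule described above; `OffsetSpan` and `splitRanges` are hypothetical stand-ins for `KafkaOffsetRange` and `getRanges`, preferred locations are ignored, and the code is written as a script.

```scala
// Simplified stand-in for KafkaOffsetRange: [from, until) of one topic partition
final case class OffsetSpan(topic: String, partition: Int, from: Long, until: Long) {
  def size: Long = until - from
}

// Hypothetical sketch of proportional splitting toward minPartitions
def splitRanges(ranges: Seq[OffsetSpan], minPartitions: Option[Int]): Seq[OffsetSpan] = {
  val toRead = ranges.filter(_.size > 0) // empty ranges produce no tasks
  minPartitions match {
    case Some(min) if toRead.size < min =>
      val total = toRead.map(_.size).sum.toDouble
      toRead.flatMap { r =>
        // Pieces proportional to this range's share of all offsets, at least one
        val parts = math.max(math.round(r.size / total * min), 1L).toInt
        var start = r.from
        var remaining = r.size
        (0 until parts).map { i =>
          // Integer division spreads any remainder over the later pieces
          val len = remaining / (parts - i)
          remaining -= len
          val piece = r.copy(from = start, until = start + len)
          start += len
          piece
        }
      }
    case _ => toRead // enough ranges already: one Spark partition per range
  }
}

val pieces = splitRanges(Seq(OffsetSpan("topic", 0, 0L, 100000L)), Some(10))
println(pieces.size)                                // prints: 10
println(pieces.take(2).map(p => (p.from, p.until))) // prints: List((0,10000), (10000,20000))
```

Under this sketch, two partitions holding 75,000 and 25,000 messages with `minPartitions=4` become three pieces and one piece respectively.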

## Performance Optimization

### Consumer Configuration

Kafka consumer properties are passed through by prefixing them with `kafka.`. A few, such as `key.deserializer`, `value.deserializer`, `auto.offset.reset`, and `enable.auto.commit`, are managed by the connector and cannot be set.

```scala
// Tune the Kafka consumer for large batch reads
val optimizedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-dataset")
  .option("kafka.fetch.min.bytes", "1048576")      // 1 MB minimum fetch
  .option("kafka.fetch.max.wait.ms", "500")        // Wait up to 500 ms to reach fetch.min.bytes
  .option("kafka.max.poll.records", "50000")       // Up to 50,000 records per poll
  .option("kafka.receive.buffer.bytes", "1048576") // 1 MB socket receive buffer
  .load()
```
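
When several jobs share the same consumer settings, they can be kept in one map and prefixed programmatically. `withKafkaPrefix` below is a hypothetical helper; the resulting `Map[String, String]` can be passed to `DataFrameReader.options`.

```scala
// Hypothetical helper: prefix raw Kafka consumer properties with "kafka."
// so they can be passed to the Spark Kafka source as options.
def withKafkaPrefix(consumerProps: Map[String, String]): Map[String, String] =
  consumerProps.map { case (k, v) =>
    val key = if (k.startsWith("kafka.")) k else s"kafka.$k" // avoid double prefixing
    key -> v
  }

val batchConsumerProps = withKafkaPrefix(Map(
  "bootstrap.servers" -> "localhost:9092",
  "fetch.min.bytes" -> "1048576",
  "fetch.max.wait.ms" -> "500",
  "max.poll.records" -> "50000"
))

// With a SparkSession in scope, this could be used as:
//   spark.read.format("kafka").options(batchConsumerProps).option("subscribe", "large-dataset").load()
println(batchConsumerProps("kafka.fetch.min.bytes")) // prints: 1048576
```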

### Parallel Processing

```scala
// Raise parallelism for large datasets
val highParallelismBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "big-data-topic")
  .option("minPartitions", "200") // Split Kafka ranges into roughly 200 read tasks
  .load()
  .repartition(400) // Shuffle into 400 partitions for downstream processing
```

### Caching Strategy

```scala
import org.apache.spark.sql.functions._

// Cache batch data that is reused by several actions
val cachedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "reference-data")
  .load()
  .select(
    col("topic"),
    expr("CAST(key AS STRING)").as("key"),
    expr("CAST(value AS STRING)").as("value"),
    col("timestamp")
  )
  .cache() // MEMORY_AND_DISK by default; materialized on the first action

// Once the cache is materialized, later actions reuse it instead of re-reading Kafka
val summary = cachedBatch.groupBy("topic").count()
val sample = cachedBatch.sample(0.1)
```

## Data Processing Patterns

### Time Window Analysis

```scala
import org.apache.spark.sql.functions._

// Count messages per topic, day, and hour (in the session time zone)
val timeWindowAnalysis = kafkaBatch
  .select(
    col("topic"),
    col("timestamp"),
    expr("CAST(value AS STRING)").as("message")
  )
  .withColumn("hour", hour(col("timestamp")))
  .withColumn("day", to_date(col("timestamp")))
  .groupBy("topic", "day", "hour")
  .agg(
    count("*").as("message_count"),
    countDistinct("message").as("unique_messages")
  )
  .orderBy("topic", "day", "hour")
```

### Cross-Topic Analysis

```scala
import org.apache.spark.sql.functions._

// Analyze data across multiple topics (comma-separated subscription list)
val crossTopicBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .load()

// groupBy takes either column names or Column expressions, not a mix
val topicComparison = crossTopicBatch
  .groupBy(col("topic"), to_date(col("timestamp")).as("date"))
  .agg(
    count("*").as("message_count"),
    avg(length(col("value"))).as("avg_message_size"), // value is binary: size in bytes
    min("timestamp").as("first_message"),
    max("timestamp").as("last_message")
  )
```

### Data Quality Analysis

```scala
import org.apache.spark.sql.functions._

// Analyze data quality across partitions
val qualityAnalysis = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    when(col("key").isNull, 1).otherwise(0).as("null_key"),
    when(col("value").isNull, 1).otherwise(0).as("null_value"),
    length(col("value")).as("value_size") // size in bytes (value is binary)
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("total_messages"),
    sum("null_key").as("null_keys"),
    sum("null_value").as("null_values"),
    avg("value_size").as("avg_value_size"),
    min("offset").as("min_offset"),
    max("offset").as("max_offset")
  )
```

## Error Handling

### Data Loss Detection

```scala
// Fail the query when data may have been lost, e.g. deleted topics or
// out-of-range offsets (failOnDataLoss already defaults to true)
val strictBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-data")
  .option("failOnDataLoss", "true")
  .load()
```

### Timeout Configuration

```scala
// Configure timeouts and retries for reliable batch processing
val timeoutConfiguredBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "30000") // Executor-side poll timeout: 30 seconds
  .option("fetchOffset.numRetries", "10")         // Retries before giving up fetching offsets
  .option("fetchOffset.retryIntervalMs", "5000")  // Wait 5 seconds between offset-fetch retries
  .load()
```
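
`fetchOffset.numRetries` and `fetchOffset.retryIntervalMs` control how persistently offset lookups are retried before the query fails. The loop below is a generic sketch of that retry-with-interval pattern, written as a script. `retryWithInterval` is hypothetical, treats the count as a maximum number of attempts, and is not the connector's own code.

```scala
import scala.util.Try

// Hypothetical helper: run `op` up to `maxAttempts` times, sleeping
// `intervalMs` between failed attempts, and rethrow the last failure.
def retryWithInterval[T](maxAttempts: Int, intervalMs: Long)(op: => T): T = {
  require(maxAttempts > 0, "maxAttempts must be positive")
  var attempt = 1
  var result: Try[T] = Try(op)
  while (result.isFailure && attempt < maxAttempts) {
    Thread.sleep(intervalMs)
    attempt += 1
    result = Try(op)
  }
  result.get // returns the value, or rethrows the last exception
}

// Simulated offset lookup that fails twice before succeeding
var calls = 0
val latestOffset = retryWithInterval(maxAttempts = 10, intervalMs = 10L) {
  calls += 1
  if (calls < 3) throw new RuntimeException("broker not available")
  42L
}
println(s"offset=$latestOffset after $calls calls") // prints: offset=42 after 3 calls
```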

### Partial Processing

```scala
import org.apache.spark.sql.functions.col

// Keep reading when some requested offsets are no longer available
// (for example, removed by retention or belonging to a deleted topic)
val resilientBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("failOnDataLoss", "false") // Skip missing data instead of failing; records may be silently lost
  .load()
  .filter(col("value").isNotNull)    // Separately, drop records with null values (e.g. tombstones)
```

## Monitoring and Metrics

### Processing Metrics

```scala
// Run an action so the job and its tasks appear in the Spark UI
val recordCount = kafkaBatch.count()

// Basic executor status is also available programmatically
val executorInfos = spark.sparkContext.statusTracker.getExecutorInfos
executorInfos.foreach { executor =>
  println(s"Executor ${executor.host}:${executor.port}: ${executor.numRunningTasks} running tasks")
}
```

### Custom Metrics Collection

```scala
import org.apache.spark.sql.functions._

// Compute per-partition volume metrics from the batch
val metricsCollector = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("timestamp"),
    length(col("value")).as("message_size") // bytes
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("message_count"),
    sum("message_size").as("total_bytes"),
    avg("message_size").as("avg_message_size"),
    min("timestamp").as("earliest_timestamp"),
    max("timestamp").as("latest_timestamp")
  )

// Write metrics for monitoring (the "delta" format requires Delta Lake on the classpath)
metricsCollector
  .write
  .format("delta")
  .mode("overwrite")
  .save("/metrics/kafka_batch_processing")
```