# Batch Processing

Batch data access capabilities for reading historical data from Kafka topics with configurable offset ranges, supporting large-scale data processing and analytics workloads.

## Capabilities

### Kafka Relation

Batch relation for reading historical data from Kafka topics with precise offset control.

```scala { .api }
/**
 * Batch relation for reading from Kafka topics
 */
class KafkaRelation extends BaseRelation with TableScan with Logging {
  /**
   * Returns the SQL context
   * @return SQLContext for this relation
   */
  def sqlContext: SQLContext

  /**
   * Returns schema for Kafka records
   * @return StructType defining record schema
   */
  def schema: StructType

  /**
   * Builds scan RDD for batch processing
   * @return RDD[Row] containing Kafka records
   */
  def buildScan(): RDD[Row]

  /**
   * String representation of the relation
   * @return String describing this relation
   */
  def toString: String
}
```

### Kafka Source RDD

RDD implementation for reading Kafka data based on offset ranges with partition-aware processing.

```scala { .api }
/**
 * RDD for reading Kafka data based on offset ranges
 */
class KafkaSourceRDD extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]] {
  /**
   * Persistence not supported for KafkaSourceRDD
   * @param newLevel Storage level (ignored)
   * @return this RDD (logs error)
   */
  def persist(newLevel: StorageLevel): this.type

  /**
   * Gets RDD partitions based on offset ranges
   * @return Array of Partition objects
   */
  def getPartitions: Array[Partition]

  /**
   * Gets preferred executor locations for data locality
   * @param split Partition to get locations for
   * @return Sequence of preferred executor hostnames
   */
  def getPreferredLocations(split: Partition): Seq[String]

  /**
   * Computes partition data by reading from Kafka
   * @param thePart Partition to compute
   * @param context Task context for the computation
   * @return Iterator of Kafka ConsumerRecord objects
   */
  def compute(
      thePart: Partition,
      context: TaskContext
  ): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}
```

### RDD Partition Types

Partition implementations for Kafka RDD processing.

```scala { .api }
/**
 * Offset range for one RDD partition
 * @param topicPartition Kafka topic partition
 * @param fromOffset Starting offset (inclusive)
 * @param untilOffset Ending offset (exclusive)
 * @param preferredLoc Preferred executor location
 */
case class KafkaSourceRDDOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  /** Gets topic name */
  def topic: String = topicPartition.topic()

  /** Gets partition number */
  def partition: Int = topicPartition.partition()

  /** Gets size of offset range */
  def size: Long = untilOffset - fromOffset
}

/**
 * RDD partition containing offset range
 * @param index Partition index
 * @param offsetRange Offset range for this partition
 */
case class KafkaSourceRDDPartition(
  index: Int,
  offsetRange: KafkaSourceRDDOffsetRange
) extends Partition
```

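A minimal sketch of how these types fit together, assuming the case classes above are visible to the caller (in the actual connector they are internal): each scan partition covers the half-open range `[fromOffset, untilOffset)` of one Kafka partition.

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: one offset range and its corresponding RDD partition
val range = KafkaSourceRDDOffsetRange(
  topicPartition = new TopicPartition("transactions", 0),
  fromOffset = 1000L,
  untilOffset = 5000L,
  preferredLoc = None
)
val rddPartition = KafkaSourceRDDPartition(index = 0, offsetRange = range)

// Prints: transactions-0: 4000 records
println(s"${range.topic}-${range.partition}: ${range.size} records")
```
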
## Usage Examples

### Basic Batch Reading

```scala
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

batchDF.show()
```

### Reading Specific Offset Range

```scala
// import spark.implicits._ enables the $"..." column syntax used below
import spark.implicits._

val historicalDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":6000}}""")
  .load()

// Process historical data
val processedDF = historicalDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
  .filter($"timestamp" > "2023-01-01")
```

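The JSON form also accepts special per-partition values: `-2` means earliest (allowed in `startingOffsets`) and `-1` means latest (allowed in `endingOffsets`). A sketch mixing absolute and special offsets:

```scala
// Partition 0 uses absolute offsets; partition 1 reads from earliest to latest
val mixedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":-2}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":-1}}""")
  .load()
```
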
### Multiple Topics Batch Processing

```scala
import org.apache.spark.sql.functions.{count, min, max}

val multiTopicDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Aggregate by topic
val topicStats = multiTopicDF
  .groupBy("topic")
  .agg(
    count("*").as("record_count"),
    min("timestamp").as("earliest_timestamp"),
    max("timestamp").as("latest_timestamp")
  )

topicStats.show()
```

### Pattern-based Topic Reading

```scala
val patternDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs_.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

### Partition Assignment Reading

```scala
val assignedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"important_topic":[0,1,2],"critical_topic":[0]}""")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

## Performance Optimization

### Partition Parallelism

```scala
val optimizedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "20") // Increase parallelism
  .load()
```

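To verify the effect of `minPartitions`, the resulting number of scan partitions can be inspected (an illustrative check):

```scala
// With minPartitions set, large offset ranges may be split so that the scan
// has at least the requested number of Spark partitions
println(s"Scan partitions: ${optimizedDF.rdd.getNumPartitions}")
```
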
### Consumer Configuration

```scala
val tunedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-throughput-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("kafka.fetch.max.bytes", "52428800") // 50MB
  .option("kafka.max.poll.records", "1000")    // Records per poll
  .option("kafka.fetch.min.bytes", "1024")     // Min fetch size
  .option("kafka.fetch.max.wait.ms", "500")    // Max wait
  .load()
```

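Note that the Spark Kafka source manages several consumer properties itself (for example `auto.offset.reset`, `key.deserializer`, `value.deserializer`, and `enable.auto.commit`); attempts to override these through `kafka.*` options are rejected, so tuning should generally be limited to fetch and buffer settings such as those shown above.
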
### Memory Management

```scala
val memoryOptimizedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "memory-intensive-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("kafka.receive.buffer.bytes", "262144") // 256KB
  .option("kafka.send.buffer.bytes", "131072")    // 128KB
  .load()
```

## Data Processing Patterns

### Time-based Processing

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// The Kafka timestamp column is already a TimestampType, so it can be used directly
val timeBasedDF = batchDF
  .withColumn("date", to_date($"timestamp"))
  .filter($"date" >= "2023-01-01" && $"date" <= "2023-12-31")

// Group by date and count records
val dailyStats = timeBasedDF
  .groupBy("date", "topic")
  .agg(count("*").as("daily_count"))
  .orderBy("date", "topic")
```

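Batch aggregation results are often persisted for reuse; a sketch writing the daily counts to Parquet (the output path is illustrative):

```scala
// Persist aggregated results in a columnar format for downstream queries
dailyStats
  .write
  .mode("overwrite")
  .partitionBy("date")
  .parquet("/data/kafka/daily_stats")
```
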
### Message Content Processing

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Keep the metadata columns alongside the decoded value so they survive the final select
val contentDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) AS message_key",
    "CAST(value AS STRING) AS message_value",
    "topic", "partition", "offset"
  )
  .withColumn("json_data", from_json($"message_value", messageSchema))
  .select("message_key", "json_data.*", "topic", "partition", "offset")
```

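The `messageSchema` referenced above must be supplied by the application; a hypothetical definition for illustration:

```scala
import org.apache.spark.sql.types._

// Hypothetical payload schema; adjust field names and types to the actual messages
val messageSchema = StructType(Seq(
  StructField("event_id", StringType, nullable = false),
  StructField("user_id", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true),
  StructField("created_at", TimestampType, nullable = true)
))
```
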
### Deduplication

```scala
// Alias the decoded columns so they do not collide with the original key/value columns
val deduplicatedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) AS key_str",
    "CAST(value AS STRING) AS value_str",
    "topic", "partition", "offset", "timestamp"
  )
  .dropDuplicates("key_str") // Deduplicate by key
  .orderBy("timestamp")      // Maintain order
```

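`dropDuplicates` keeps an arbitrary record per key; when the most recent record per key is wanted instead, a window function can rank records by timestamp (a sketch):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Keep only the latest record for each key
val latestPerKey = batchDF
  .selectExpr("CAST(key AS STRING) AS key_str", "CAST(value AS STRING) AS value_str", "timestamp")
  .withColumn("rn", row_number().over(Window.partitionBy("key_str").orderBy(col("timestamp").desc)))
  .filter(col("rn") === 1)
  .drop("rn")
```
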
## Schema and Data Types

### Kafka Record Schema

```scala
import org.apache.spark.sql.types._

val kafkaSchema = StructType(Seq(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = false),
  StructField("partition", IntegerType, nullable = false),
  StructField("offset", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("timestampType", IntegerType, nullable = false)
))
```

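On Spark 3.0 and later the source can additionally expose Kafka record headers; a sketch (the extra `headers` column is only present when `includeHeaders` is set):

```scala
val withHeadersDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("includeHeaders", "true")
  .load()

// Adds a headers column: ARRAY<STRUCT<key: STRING, value: BINARY>>
withHeadersDF.printSchema()
```
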
### Type Conversions

```scala
val convertedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as key_str",
    "CAST(value AS STRING) as value_str",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "CASE WHEN timestampType = 0 THEN 'CreateTime' WHEN timestampType = 1 THEN 'LogAppendTime' ELSE 'NoTimestampType' END as timestamp_type"
  )
```

## Error Handling

### Offset Validation

```scala
try {
  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .option("startingOffsets", """{"topic":{"0":1000}}""")
    .option("endingOffsets", """{"topic":{"0":5000}}""")
    .load()

  df.count()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid offset configuration: ${e.getMessage}")
  case e: Exception =>
    println(s"Error reading from Kafka: ${e.getMessage}")
}
```

### Topic Existence Check

```scala
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

def checkTopicExists(brokers: String, topic: String): Boolean = {
  // AdminClient.create accepts a java.util.Properties instance directly
  val props = new java.util.Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  val adminClient = AdminClient.create(props)

  try {
    val topics = adminClient.listTopics().names().get()
    topics.contains(topic)
  } finally {
    adminClient.close()
  }
}

if (checkTopicExists("localhost:9092", "my-topic")) {
  val df = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-topic")
    .load()
}
```

## Integration with Spark SQL

### Creating Temporary Views

```scala
batchDF.createOrReplaceTempView("kafka_messages")

val sqlResult = spark.sql("""
  SELECT
    topic,
    partition,
    COUNT(*) as message_count,
    MIN(offset) as min_offset,
    MAX(offset) as max_offset,
    MIN(timestamp) as earliest_time,
    MAX(timestamp) as latest_time
  FROM kafka_messages
  GROUP BY topic, partition
  ORDER BY topic, partition
""")

sqlResult.show()
```

### Complex Analytics

```scala
spark.sql("""
  SELECT
    topic,
    DATE(timestamp) as date,
    HOUR(timestamp) as hour,
    COUNT(*) as hourly_count,
    AVG(LENGTH(CAST(value AS STRING))) as avg_message_size
  FROM kafka_messages
  WHERE timestamp >= '2023-01-01'
  GROUP BY topic, DATE(timestamp), HOUR(timestamp)
  ORDER BY date, hour
""").show()
```

## Best Practices

### Offset Range Planning

1. **Use earliest/latest for full scans**:
```scala
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
```

2. **Use specific offsets for incremental processing** (see the checkpointing sketch after this list):
```scala
.option("startingOffsets", s"""{"$topic":{"0":$lastProcessedOffset}}""")
```

3. **Monitor partition lag**:
```scala
val offsetInfo = batchDF
  .groupBy("topic", "partition")
  .agg(min("offset").as("min_offset"), max("offset").as("max_offset"))
```

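A sketch of such an incremental run, where `loadLastOffsets` and `saveOffsets` are hypothetical application helpers that persist the offset JSON between runs:

```scala
import org.apache.spark.sql.functions.max

// Start from the offsets saved by the previous run, falling back to earliest
val startJson = loadLastOffsets().getOrElse("earliest") // hypothetical helper

val incrementalDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", startJson)
  .option("endingOffsets", "latest")
  .load()

// Record the highest offset read per partition and persist it for the next run
val maxOffsets = incrementalDF
  .groupBy("topic", "partition")
  .agg(max("offset").as("max_offset"))

saveOffsets(maxOffsets) // hypothetical helper
```
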
### Performance Guidelines

1. **Set appropriate minPartitions for large datasets**
2. **Use partition assignment for known partition layouts**
3. **Configure Kafka consumer buffers based on message size**
4. **Cache DataFrames for multiple operations**
5. **Use columnar formats for intermediate results** (see the sketch after this list)

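A minimal sketch combining guidelines 4 and 5, assuming a `batchDF` from one of the reads above and an illustrative output path:

```scala
// Cache the Kafka scan when it feeds several downstream computations
val cached = batchDF.cache()

val byTopic = cached.groupBy("topic").count()
val byPartition = cached.groupBy("topic", "partition").count()

// Keep a decoded snapshot in a columnar format for later analysis
cached
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value",
    "topic", "partition", "offset", "timestamp")
  .write
  .mode("overwrite")
  .parquet("/data/kafka/snapshot")

cached.unpersist()
```
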
### Resource Management

```scala
import org.apache.spark.sql.SparkSession

// Driver memory must be set at launch time (e.g. spark-submit --driver-memory 4g);
// executor resources can be configured when the session is built, but not changed
// with spark.conf.set after the session has started.
val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .getOrCreate()

// Shuffle partitions for aggregations on Kafka data can be tuned at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")
```