# Batch RDD Processing

Batch RDD processing allows you to create RDDs from Kafka using specific offset ranges, enabling batch-oriented consumption of Kafka data with exact control over which messages to process.

## Capabilities

### Basic RDD Creation

Creates an RDD from Kafka using offset ranges for each topic and partition.

```scala { .api }
/**
 * Create an RDD from Kafka using offset ranges for each topic and partition.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
 *   or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *   host1:port1,host2:port2 form.
 * @param offsetRanges Each OffsetRange in the batch corresponds to a range of offsets
 *   for a given Kafka topic/partition
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
def createRDD[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]
): RDD[(K, V)]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092"
)

val offsetRanges = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 500, 1500),
  OffsetRange("logs", 0, 100, 200)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sparkContext, kafkaParams, offsetRanges
)

// Note: this closure runs on the executors, so the println output
// appears in executor logs rather than on the driver.
rdd.foreach { case (key, value) =>
  println(s"Key: $key, Value: $value")
}

println(s"Total messages: ${rdd.count()}")
```

### Advanced RDD Creation with Custom Message Handler

Creates an RDD with a custom message handler and broker leadership information for optimized fetching.

```scala { .api }
/**
 * Create an RDD from Kafka with custom message handler and broker leadership info.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
 *   in which case leaders will be looked up on the driver.
 * @param messageHandler Function for translating each message and metadata into the desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return RDD of R
 */
def createRDD[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]
```

**Usage Example:**

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils}

case class MessageInfo(topic: String, partition: Int, offset: Long, timestamp: Long, data: String)

val leaders = Map(
  TopicAndPartition("events", 0) -> Broker("broker1.example.com", 9092),
  TopicAndPartition("events", 1) -> Broker("broker2.example.com", 9092)
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  MessageInfo(
    topic = mmd.topic,
    partition = mmd.partition,
    offset = mmd.offset,
    timestamp = System.currentTimeMillis(), // processing time, not a broker timestamp
    data = s"${mmd.key()}-${mmd.message()}"
  )
}

val enrichedRDD = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, MessageInfo](
  sparkContext, kafkaParams, offsetRanges, leaders, messageHandler
)

enrichedRDD.foreach(println)
```

### Java RDD Creation API

Java-friendly API for creating RDDs from Kafka.

```java { .api }
/**
 * Create an RDD from Kafka using offset ranges (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder
 * @param valueDecoderClass type of Kafka message value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairRDD<K, V> createRDD(
    JavaSparkContext jsc,
    Class<K> keyClass,
    Class<V> valueClass,
    Class<KD> keyDecoderClass,
    Class<VD> valueDecoderClass,
    Map<String, String> kafkaParams,
    OffsetRange[] offsetRanges
)
```

**Java Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.serializer.StringDecoder;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

OffsetRange[] offsetRanges = {
    OffsetRange.create("events", 0, 1000, 2000),
    OffsetRange.create("events", 1, 500, 1500)
};

JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
    jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    offsetRanges
);

// Runs on the executors; the output goes to executor logs.
rdd.foreach(record -> {
    System.out.println("Key: " + record._1 + ", Value: " + record._2);
});

System.out.println("Total messages: " + rdd.count());
```

### Advanced Java RDD with Custom Message Handler

Java API with a custom message handler for complex transformations.

```java { .api }
/**
 * Create an RDD from Kafka with custom message handler (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder
 * @param valueDecoderClass type of Kafka message value decoder
 * @param recordClass type returned by messageHandler
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets
 * @param leaders Kafka brokers for each TopicAndPartition
 * @param messageHandler Function for translating each message and metadata
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @param <R> type returned by messageHandler
 * @return RDD of R
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>
JavaRDD<R> createRDD(
    JavaSparkContext jsc,
    Class<K> keyClass,
    Class<V> valueClass,
    Class<KD> keyDecoderClass,
    Class<VD> valueDecoderClass,
    Class<R> recordClass,
    Map<String, String> kafkaParams,
    OffsetRange[] offsetRanges,
    Map<TopicAndPartition, Broker> leaders,
    Function<MessageAndMetadata<K, V>, R> messageHandler
)
```

## Key Features

### Exact Offset Control
- Specify precise offset ranges for each topic/partition
- Process historical data or specific time windows
- Replay data for debugging or reprocessing

### Broker Leadership Optimization
- Provide known broker leaders to avoid metadata lookup
- Optimize network topology for better performance
- Handle broker failures gracefully
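
The `leaders` parameter is a hint, not a requirement: partitions missing from the map are resolved by a metadata lookup on the driver. A minimal, dependency-free sketch of that fallback behavior (the `TopicPartition` and `Broker` types below are simplified stand-ins for illustration, not the real Kafka/Spark classes):

```java
import java.util.HashMap;
import java.util.Map;

public class LeaderHintSketch {
    // Simplified stand-ins for kafka.common.TopicAndPartition and
    // org.apache.spark.streaming.kafka.Broker (illustrative only).
    public record TopicPartition(String topic, int partition) {}
    public record Broker(String host, int port) {}

    /**
     * Resolve the broker to fetch from: use the caller-supplied hint when
     * present, otherwise fall back to a (stubbed) driver-side metadata lookup.
     */
    public static Broker resolveLeader(Map<TopicPartition, Broker> hints, TopicPartition tp) {
        Broker known = hints.get(tp);
        if (known != null) {
            return known; // hint hit: no metadata round-trip needed
        }
        return lookupFromMetadata(tp); // hint miss: look it up on the driver
    }

    // Stand-in for the metadata request issued when no hint exists.
    private static Broker lookupFromMetadata(TopicPartition tp) {
        return new Broker("looked-up-broker", 9092);
    }

    public static void main(String[] args) {
        Map<TopicPartition, Broker> hints = new HashMap<>();
        hints.put(new TopicPartition("events", 0), new Broker("broker1.example.com", 9092));

        System.out.println(resolveLeader(hints, new TopicPartition("events", 0)).host());
        System.out.println(resolveLeader(hints, new TopicPartition("events", 1)).host());
    }
}
```

Supplying complete hints skips one metadata round-trip per batch; an incomplete map still works, just with extra lookups.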

### Message Metadata Access
Access complete message metadata including:
- Topic name
- Partition number
- Offset within partition
- Message timestamp (if available)
- Message key and value

### Partition-Level Parallelism
- Each OffsetRange becomes an RDD partition
- Parallel processing across topic/partitions
- Fine-grained control over parallelism
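
Because each OffsetRange maps one-to-one onto an RDD partition, both the batch's parallelism and the per-partition workload are known before the job runs. A dependency-free sketch of that arithmetic (the `OffsetRange` record here is a simplified stand-in for the Spark class):

```java
public class PartitionPlanSketch {
    // Simplified stand-in for org.apache.spark.streaming.kafka.OffsetRange.
    public record OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
        public long count() { return untilOffset - fromOffset; } // messages in this range
    }

    /** One RDD partition per OffsetRange, so parallelism == ranges.length. */
    public static int numPartitions(OffsetRange[] ranges) {
        return ranges.length;
    }

    /** Total messages the batch will read, summed across partitions. */
    public static long totalMessages(OffsetRange[] ranges) {
        long total = 0;
        for (OffsetRange r : ranges) {
            total += r.count();
        }
        return total;
    }

    public static void main(String[] args) {
        OffsetRange[] ranges = {
            new OffsetRange("events", 0, 1000, 2000),
            new OffsetRange("events", 1, 500, 1500),
            new OffsetRange("logs", 0, 100, 200)
        };
        System.out.println("RDD partitions: " + numPartitions(ranges)); // 3
        System.out.println("Total messages: " + totalMessages(ranges)); // 2100
    }
}
```

Skewed ranges mean skewed partitions, so balancing `untilOffset - fromOffset` across ranges is the main lever for even executor load.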

## Offset Range Management

### Creating Offset Ranges

```scala
// Manual creation
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)

// From TopicAndPartition
import kafka.common.TopicAndPartition
val tp = TopicAndPartition("events", 0)
val range3 = OffsetRange(tp, 1000, 2000)
```

### Offset Range Properties

```scala
val range = OffsetRange("events", 0, 1000, 2000)

println(s"Topic: ${range.topic}")
println(s"Partition: ${range.partition}")
println(s"From offset: ${range.fromOffset}")
println(s"Until offset: ${range.untilOffset}")
println(s"Message count: ${range.count}")
println(s"TopicAndPartition: ${range.topicAndPartition}")
```

## Use Cases

### Historical Data Processing
```scala
// Process a specific window of historical data by offset
val historicalRanges = Array(
  OffsetRange("sales", 0, 10000, 20000),
  OffsetRange("sales", 1, 15000, 25000)
)

val historicalData = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, historicalRanges
)
```

### Data Quality Validation
```scala
case class ValidationResult(offset: Long, isValid: Boolean, data: String)

// Validate specific message ranges
val validationHandler = (mmd: MessageAndMetadata[String, String]) => {
  ValidationResult(
    offset = mmd.offset,
    // key() and message() may be null (e.g. keyless messages), so guard before nonEmpty
    isValid = mmd.message() != null && mmd.message().nonEmpty &&
      mmd.key() != null && mmd.key().nonEmpty,
    data = mmd.message()
  )
}
```

### Incremental Processing
```scala
// Process data incrementally, from the last processed offsets up to the latest
def processIncremental(lastProcessedOffsets: Map[TopicAndPartition, Long]): Unit = {
  val kc = new KafkaCluster(kafkaParams)
  val latestOffsets = kc.getLatestLeaderOffsets(lastProcessedOffsets.keySet)

  val offsetRanges = lastProcessedOffsets.map { case (tp, fromOffset) =>
    // .right.get throws if the lookup failed; handle the Left case in production code
    val untilOffset = latestOffsets.right.get(tp).offset
    OffsetRange(tp.topic, tp.partition, fromOffset, untilOffset)
  }.toArray

  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges
  )

  // Process the RDD
  rdd.foreach(println)
}
```

## Error Handling

- **SparkException**: Thrown for invalid offset ranges or connectivity issues
- **Offset validation**: Automatic validation that offsets are available on brokers
- **Leader discovery**: Automatic leader lookup when not provided
- **Partition failures**: Individual partition failures don't affect other partitions
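
Since malformed ranges otherwise surface as a SparkException at job time, a cheap structural pre-flight check before calling createRDD can fail faster. A dependency-free sketch (simplified `OffsetRange` stand-in; whether the offsets still exist on the brokers can only be verified against the cluster itself):

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangePreflight {
    // Simplified stand-in for org.apache.spark.streaming.kafka.OffsetRange.
    public record OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {}

    /** Structural checks a caller can run before submitting the batch job. */
    public static List<String> validate(OffsetRange[] ranges) {
        List<String> problems = new ArrayList<>();
        for (OffsetRange r : ranges) {
            String id = r.topic() + "-" + r.partition();
            if (r.fromOffset() < 0) {
                problems.add(id + ": negative fromOffset " + r.fromOffset());
            }
            if (r.fromOffset() > r.untilOffset()) {
                problems.add(id + ": fromOffset " + r.fromOffset()
                    + " is past untilOffset " + r.untilOffset());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        OffsetRange[] ranges = {
            new OffsetRange("events", 0, 1000, 2000), // valid
            new OffsetRange("events", 1, 1500, 500)   // inverted range
        };
        validate(ranges).forEach(System.out::println);
    }
}
```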