# Batch Processing

The batch processing capability provides a batch-oriented interface for consuming from Kafka with precise offset control. Because you specify the exact range of messages to process in advance, it supports exactly-once semantics and is well suited to reprocessing scenarios and other precise data processing workflows.

## Core Functions

### createRDD (Scala)

Creates an RDD for batch consumption from Kafka with specified offset ranges.

```scala { .api }
def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]
```

**Parameters:**
- `sc`: SparkContext - The Spark context
- `kafkaParams`: java.util.Map[String, Object] - Kafka configuration parameters (must include `bootstrap.servers`)
- `offsetRanges`: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
- `locationStrategy`: LocationStrategy - Consumer placement strategy (use `LocationStrategies.PreferConsistent` in most cases)

**Returns:** RDD[ConsumerRecord[K, V]] - Kafka RDD implementing HasOffsetRanges

### createRDD (Java)

Java version of the batch RDD creation method.

```java { .api }
public static <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(
  JavaSparkContext jsc,
  java.util.Map<String, Object> kafkaParams,
  OffsetRange[] offsetRanges,
  LocationStrategy locationStrategy
)
```

**Parameters:**
- `jsc`: JavaSparkContext - The Java Spark context
- `kafkaParams`: java.util.Map<String, Object> - Kafka configuration parameters
- `offsetRanges`: OffsetRange[] - Array of offset ranges
- `locationStrategy`: LocationStrategy - Consumer placement strategy

**Returns:** JavaRDD<ConsumerRecord<K, V>> - Java RDD wrapper for Kafka data

## Usage Examples

### Basic Batch Processing

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "batch-processing-group"
)

// Define specific offset ranges to process.
// OffsetRange(topic, partition, fromOffset, untilOffset); untilOffset is exclusive.
val offsetRanges = Array(
  OffsetRange("orders", 0, 100, 200),   // Process messages 100-199 from orders partition 0
  OffsetRange("orders", 1, 50, 150),    // Process messages 50-149 from orders partition 1
  OffsetRange("payments", 0, 0, 100)    // Process messages 0-99 from payments partition 0
)

// createRDD takes a java.util.Map, so convert the Scala map with asJava
val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
val processedData = rdd.map { record =>
  (record.topic, record.partition, record.offset, record.key, record.value)
}.collect()

processedData.foreach { case (topic, partition, offset, key, value) =>
  println(s"Topic: $topic, Partition: $partition, Offset: $offset, Key: $key, Value: $value")
}
```

### Reprocessing with Offset Ranges

```scala
import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

// Get offset ranges from a previous streaming batch
val previousOffsetRanges: Array[OffsetRange] = ??? // ... obtained from HasOffsetRanges

// Create RDD to reprocess the same data
val reprocessRDD = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  previousOffsetRanges,
  LocationStrategies.PreferConsistent
)

// Apply different processing logic
val reprocessedResults = reprocessRDD
  .filter(record => record.value.contains("error"))
  .map(record => s"Reprocessed: ${record.value}")
  .collect()
```
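
A common way to drive this kind of reprocessing is to persist the `(topic, partition, fromOffset, untilOffset)` values of each batch and rebuild the ranges later. A minimal plain-Scala sketch of that bookkeeping, with tuples standing in for `OffsetRange` and a `Map` standing in for whatever external store you use (both are illustrative assumptions, not part of the API):

```scala
// Hypothetical external store: (topic, partition) -> (fromOffset, untilOffset)
val storedOffsets: Map[(String, Int), (Long, Long)] = Map(
  ("orders", 0) -> (100L, 200L),
  ("orders", 1) -> (50L, 150L)
)

// Rebuild the per-partition ranges; in real code each tuple would become
// OffsetRange(topic, partition, fromOffset, untilOffset)
val rangesToReplay: Seq[(String, Int, Long, Long)] = storedOffsets.toSeq
  .map { case ((topic, partition), (from, until)) => (topic, partition, from, until) }
  .sortBy { case (topic, partition, _, _) => (topic, partition) }
```

Storing both ends of each range (rather than just a committed position) is what makes the replay deterministic: the rebuilt ranges describe exactly the same messages as the original batch.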

### Java Batch Processing

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "batch-processing-group");

OffsetRange[] offsetRanges = {
  OffsetRange.create("topic1", 0, 100L, 200L),
  OffsetRange.create("topic1", 1, 150L, 250L)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  javaSparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

// Process the RDD
rdd.foreach(record -> {
  System.out.println("Topic: " + record.topic() +
    ", Partition: " + record.partition() +
    ", Offset: " + record.offset() +
    ", Value: " + record.value());
});
```

### Working with HasOffsetRanges

```scala
import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// The RDD implements HasOffsetRanges, so you can inspect its offset information
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

ranges.foreach { range =>
  println(s"Topic: ${range.topic}, Partition: ${range.partition}, " +
    s"From: ${range.fromOffset}, Until: ${range.untilOffset}, Count: ${range.count}")
}

// Process data and track progress
val processedCount = rdd.count()
val totalMessages = ranges.map(_.count).sum
println(s"Processed $processedCount messages out of $totalMessages total")
```
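
For exactly-once output, the usual pattern is to store the batch results and the offset ranges that produced them in the same transaction, so a restart sees either both or neither. A plain-Scala sketch of the idea, with an in-memory variable standing in for a transactional store (all names here are illustrative, not a real API):

```scala
// In-memory stand-in for a transactional store holding results plus the
// offsets that produced them; in production this would be one DB transaction.
case class Committed(results: Seq[String], untilOffsets: Map[(String, Int), Long])

var store: Option[Committed] = None

// "Atomically" record the batch output together with its ending offsets,
// so a restart knows exactly where the next batch should begin.
def commitBatch(results: Seq[String], ranges: Seq[(String, Int, Long, Long)]): Unit = {
  val untilOffsets = ranges.map { case (t, p, _, until) => (t, p) -> until }.toMap
  store = Some(Committed(results, untilOffsets))
}

commitBatch(Seq("order-1 ok"), Seq(("orders", 0, 100L, 200L)))
// The stored untilOffset for ("orders", 0) now tells the next batch to start at 200
```

Because `untilOffset` is exclusive, the stored value doubles as the `fromOffset` of the next range with no off-by-one adjustment.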

## Configuration Notes

### Required Kafka Parameters

- `bootstrap.servers`: Kafka broker addresses (required)
- `key.deserializer`: Key deserializer class (required)
- `value.deserializer`: Value deserializer class (required)

### Optional Kafka Parameters

- `group.id`: Consumer group ID (recommended for monitoring)
- `security.protocol`: Security protocol if using authenticated Kafka
- `sasl.mechanism`: SASL mechanism for authentication

### Automatic Parameter Handling

The `createRDD` method automatically overrides several parameters for executor safety:
- `enable.auto.commit` is set to `false`
- `auto.offset.reset` is set to `none`
- `group.id` is modified to be executor-specific
- `receive.buffer.bytes` is set to 65536 (KAFKA-3135 workaround)

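The bullets above can be sketched as a pure function over a parameter map. This is an assumed illustration of the behavior, not Spark's actual implementation (and the executor-specific `group.id` rewrite is omitted):

```scala
import java.{util => ju}

// Sketch of the fix-up applied to a caller-supplied parameter map before it
// reaches the executors (names and structure here are illustrative).
def fixKafkaParamsSketch(params: ju.Map[String, Object]): ju.Map[String, Object] = {
  val fixed = new ju.HashMap[String, Object](params)
  fixed.put("enable.auto.commit", java.lang.Boolean.FALSE)  // Spark manages offsets itself
  fixed.put("auto.offset.reset", "none")                    // fail fast if a requested offset is gone
  fixed.put("receive.buffer.bytes", Integer.valueOf(65536)) // KAFKA-3135 workaround
  fixed
}
```

The point of `auto.offset.reset = none` is that a batch with explicit ranges must never silently skip to a different offset; if the requested data has been deleted, the job should fail rather than produce incomplete results.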
## Important Notes

- All batch processing methods are marked as `@Experimental` in Spark 2.4.8
- Starting and ending offsets are specified in advance, enabling exactly-once semantics
- The RDD implements the `HasOffsetRanges` interface for offset introspection
- Each offset range corresponds to a single RDD partition
- Consumer instances are managed automatically and cached for performance
- Use `LocationStrategies.PreferConsistent` unless you have specific host preferences
- `LocationStrategies.PreferBrokers` cannot be used with RDD creation (no driver consumer is available)
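
Because each offset range becomes exactly one RDD partition, a single very large range limits parallelism. One option is to split a range into fixed-size chunks before calling `createRDD`. A minimal plain-Scala sketch (the `splitRange` helper is hypothetical, with `(fromOffset, untilOffset)` pairs standing in for `OffsetRange`):

```scala
// Split [fromOffset, untilOffset) into chunks of at most chunkSize offsets each.
// Each resulting pair would become one OffsetRange, i.e. one RDD partition.
def splitRange(fromOffset: Long, untilOffset: Long, chunkSize: Long): Seq[(Long, Long)] =
  (fromOffset until untilOffset by chunkSize).map { start =>
    (start, math.min(start + chunkSize, untilOffset))
  }

// A 1,000,000-message range becomes 4 partitions instead of 1:
// splitRange(0L, 1000000L, 250000L)
```

The last chunk is clipped to `untilOffset`, so the split covers exactly the original range with no overlap and no gap.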