# Batch Processing

The batch processing capability provides a batch-oriented interface for consuming from Kafka with precise offset control. Because the exact range of messages to process is specified up front, it supports exactly-once semantics and is well suited to reprocessing scenarios and other workflows that require precise control over which data is read.

## Core Functions

### createRDD (Scala)

Creates an RDD for batch consumption from Kafka with specified offset ranges.

```scala { .api }
def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]
```

**Parameters:**

- `sc`: SparkContext - The Spark context
- `kafkaParams`: java.util.Map[String, Object] - Kafka configuration parameters (requires "bootstrap.servers")
- `offsetRanges`: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
- `locationStrategy`: LocationStrategy - Consumer placement strategy (use LocationStrategies.PreferConsistent)

**Returns:** RDD[ConsumerRecord[K, V]] - Kafka RDD implementing HasOffsetRanges
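
In an `OffsetRange`, `fromOffset` is inclusive and `untilOffset` is exclusive, so a range covers exactly `untilOffset - fromOffset` messages. A minimal sketch of those half-open semantics, using a hypothetical stand-in class rather than Spark's own `OffsetRange`:

```java
// Hypothetical stand-in for Spark's OffsetRange, used only to illustrate
// the half-open [fromOffset, untilOffset) semantics.
final class SimpleOffsetRange {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    SimpleOffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    // Number of messages covered by the range, mirroring OffsetRange.count().
    long count() { return untilOffset - fromOffset; }
}

public class OffsetRangeDemo {
    public static void main(String[] args) {
        SimpleOffsetRange r = new SimpleOffsetRange("orders", 0, 100L, 200L);
        // Covers offsets 100 through 199.
        System.out.println(r.count()); // prints 100
    }
}
```

This is why a range like `OffsetRange("orders", 0, 100, 200)` reads "messages 100-199": offset 200 itself is not consumed.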

### createRDD (Java)

Java version of the batch RDD creation method.

```java { .api }
public static <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(
  JavaSparkContext jsc,
  java.util.Map<String, Object> kafkaParams,
  OffsetRange[] offsetRanges,
  LocationStrategy locationStrategy
)
```

**Parameters:**

- `jsc`: JavaSparkContext - The Java Spark context
- `kafkaParams`: java.util.Map<String, Object> - Kafka configuration parameters
- `offsetRanges`: OffsetRange[] - Array of offset ranges
- `locationStrategy`: LocationStrategy - Consumer placement strategy

**Returns:** JavaRDD<ConsumerRecord<K, V>> - Java RDD wrapper for Kafka data

## Usage Examples

### Basic Batch Processing

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "batch-processing-group"
)

// Define the exact offset ranges to process (fromOffset inclusive, untilOffset exclusive)
val offsetRanges = Array(
  OffsetRange("orders", 0, 100, 200),  // messages 100-199 from "orders" partition 0
  OffsetRange("orders", 1, 50, 150),   // messages 50-149 from "orders" partition 1
  OffsetRange("payments", 0, 0, 100)   // messages 0-99 from "payments" partition 0
)

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams.asJava, // createRDD expects a java.util.Map
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
val processedData = rdd.map { record =>
  (record.topic, record.partition, record.offset, record.key, record.value)
}.collect()

processedData.foreach { case (topic, partition, offset, key, value) =>
  println(s"Topic: $topic, Partition: $partition, Offset: $offset, Key: $key, Value: $value")
}
```

### Reprocessing with Offset Ranges

```scala
import org.apache.spark.streaming.kafka010._

// Offset ranges captured from a previous streaming batch
val previousOffsetRanges: Array[OffsetRange] = // ... obtained from HasOffsetRanges

// Create an RDD that reads exactly the same data again
val reprocessRDD = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams,
  previousOffsetRanges,
  LocationStrategies.PreferConsistent
)

// Apply different processing logic
val reprocessedResults = reprocessRDD
  .filter(record => record.value.contains("error"))
  .map(record => s"Reprocessed: ${record.value}")
  .collect()
```
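
Reprocessing only works if the offset ranges from the earlier run survive between jobs. One lightweight option is to serialize each range to a plain string and store it (in a file, a database, or ZooKeeper). The `OffsetRangeCodec` helper below is hypothetical, not part of the Spark API; the four fields mirror `OffsetRange`'s `topic`/`partition`/`fromOffset`/`untilOffset` accessors.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Spark) for persisting offset ranges
// between runs as "topic,partition,fromOffset,untilOffset" strings.
public class OffsetRangeCodec {

    static String encode(String topic, int partition, long fromOffset, long untilOffset) {
        return topic + "," + partition + "," + fromOffset + "," + untilOffset;
    }

    // Parses a stored line back into its four components.
    static Object[] decode(String line) {
        String[] parts = line.split(",");
        return new Object[] {
            parts[0],                    // topic
            Integer.parseInt(parts[1]),  // partition
            Long.parseLong(parts[2]),    // fromOffset (inclusive)
            Long.parseLong(parts[3])     // untilOffset (exclusive)
        };
    }

    public static void main(String[] args) {
        String saved = encode("orders", 0, 100L, 200L);
        System.out.println(saved); // prints orders,0,100,200
        System.out.println(Arrays.toString(decode(saved)));
    }
}
```

On restart, the decoded fields can be fed to `OffsetRange.create(topic, partition, fromOffset, untilOffset)` to rebuild the array passed to `createRDD`.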

### Java Batch Processing

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "batch-processing-group");

OffsetRange[] offsetRanges = {
  OffsetRange.create("topic1", 0, 100L, 200L),
  OffsetRange.create("topic1", 1, 150L, 250L)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  javaSparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

// Process the RDD
rdd.foreach(record -> {
  System.out.println("Topic: " + record.topic() +
      ", Partition: " + record.partition() +
      ", Offset: " + record.offset() +
      ", Value: " + record.value());
});
```

### Working with HasOffsetRanges

```scala
import org.apache.spark.streaming.kafka010._

val rdd = KafkaUtils.createRDD[String, String](
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// The returned RDD implements HasOffsetRanges, so offset information is available
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

ranges.foreach { range =>
  println(s"Topic: ${range.topic}, Partition: ${range.partition}, " +
    s"From: ${range.fromOffset}, Until: ${range.untilOffset}, Count: ${range.count}")
}

// Process data and track progress
val processedCount = rdd.count()
val totalMessages = ranges.map(_.count).sum
println(s"Processed $processedCount messages out of $totalMessages total")
```

## Configuration Notes

### Required Kafka Parameters

- `bootstrap.servers`: Kafka broker addresses (required)
- `key.deserializer`: Key deserializer class (required)
- `value.deserializer`: Value deserializer class (required)

### Optional Kafka Parameters

- `group.id`: Consumer group ID (recommended for monitoring)
- `security.protocol`: Security protocol if using authenticated Kafka
- `sasl.mechanism`: SASL mechanism for authentication

### Automatic Parameter Handling

The `createRDD` method automatically overrides several parameters for executor safety:

- `enable.auto.commit` is set to `false`
- `auto.offset.reset` is set to `none`
- `group.id` is modified to be executor-specific
- `receive.buffer.bytes` is set to 65536 (KAFKA-3135 workaround)
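
The override logic can be pictured roughly as follows. This is an illustrative sketch of what Spark does internally when preparing executor-side consumers, not code you need to write, and the `spark-executor-` group-id prefix shown is an assumption about the integration's naming convention:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the parameter fix-up createRDD performs for
// executor-side consumers; Spark applies this internally.
public class ExecutorParamsSketch {

    static Map<String, Object> fixUp(Map<String, Object> userParams) {
        Map<String, Object> fixed = new HashMap<>(userParams);
        fixed.put("enable.auto.commit", false);   // offsets come from the offset ranges, not commits
        fixed.put("auto.offset.reset", "none");   // fail fast if a requested offset no longer exists
        fixed.put("receive.buffer.bytes", 65536); // KAFKA-3135 workaround
        // Executor-specific consumer group, derived from the user's group.id.
        fixed.put("group.id", "spark-executor-" + userParams.get("group.id"));
        return fixed;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("bootstrap.servers", "localhost:9092");
        user.put("group.id", "batch-processing-group");
        System.out.println(fixUp(user).get("group.id"));
    }
}
```

The practical takeaway: values you set for these keys are ignored for batch reads, so offset tracking must happen in your own code, not via Kafka's auto-commit.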

## Important Notes

- All batch processing methods are marked `@Experimental` in Spark 2.4.8
- Starting and ending offsets are specified in advance, which is what enables exactly-once semantics
- The returned RDD implements the `HasOffsetRanges` interface for offset introspection
- Each offset range corresponds to a single RDD partition
- Consumer instances are managed automatically and cached for performance
- Use `LocationStrategies.PreferConsistent` unless you have specific host preferences
- `LocationStrategies.PreferBrokers` cannot be used with RDD creation (no driver consumer is available)