
# Batch RDD Processing

Batch RDD processing allows you to create RDDs from Kafka using specific offset ranges, enabling batch-oriented consumption of Kafka data with exact control over which messages to process.

## Capabilities

### Basic RDD Creation

Creates an RDD from Kafka using offset ranges for each topic and partition.

```scala { .api }
/**
 * Create an RDD from Kafka using offset ranges for each topic and partition.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
 *                    or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *                    host1:port1,host2:port2 form.
 * @param offsetRanges Each OffsetRange in the batch corresponds to a range of offsets
 *                     for a given Kafka topic/partition
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
def createRDD[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange]
): RDD[(K, V)]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092"
)

val offsetRanges = Array(
  OffsetRange("events", 0, 1000, 2000),
  OffsetRange("events", 1, 500, 1500),
  OffsetRange("logs", 0, 100, 200)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sparkContext, kafkaParams, offsetRanges
)

rdd.foreach { case (key, value) =>
  println(s"Key: $key, Value: $value")
}

println(s"Total messages: ${rdd.count()}")
```

### Advanced RDD Creation with Custom Message Handler

Creates an RDD with a custom message handler and broker leadership information for optimized fetching.

```scala { .api }
/**
 * Create an RDD from Kafka with a custom message handler and broker leadership info.
 *
 * @param sc SparkContext object
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
 *                in which case leaders will be looked up on the driver.
 * @param messageHandler Function for translating each message and metadata into the desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return RDD of R
 */
def createRDD[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  sc: SparkContext,
  kafkaParams: Map[String, String],
  offsetRanges: Array[OffsetRange],
  leaders: Map[TopicAndPartition, Broker],
  messageHandler: MessageAndMetadata[K, V] => R
): RDD[R]
```

**Usage Example:**

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import org.apache.spark.streaming.kafka.Broker

// Define the result type before it is referenced by the handler
case class MessageInfo(topic: String, partition: Int, offset: Long, timestamp: Long, data: String)

val leaders = Map(
  TopicAndPartition("events", 0) -> Broker("broker1.example.com", 9092),
  TopicAndPartition("events", 1) -> Broker("broker2.example.com", 9092)
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  MessageInfo(
    topic = mmd.topic,
    partition = mmd.partition,
    offset = mmd.offset,
    timestamp = System.currentTimeMillis(),
    data = s"${mmd.key()}-${mmd.message()}"
  )
}

val enrichedRDD = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, MessageInfo](
  sparkContext, kafkaParams, offsetRanges, leaders, messageHandler
)

enrichedRDD.foreach(println)
```

### Java RDD Creation API

Java-friendly API for creating RDDs from Kafka.

```java { .api }
/**
 * Create an RDD from Kafka using offset ranges (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder
 * @param valueDecoderClass type of Kafka message value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets for a topic/partition
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return RDD of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairRDD<K, V> createRDD(
  JavaSparkContext jsc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Map<String, String> kafkaParams,
  OffsetRange[] offsetRanges
)
```

**Java Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import kafka.serializer.StringDecoder;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");

OffsetRange[] offsetRanges = {
    OffsetRange.create("events", 0, 1000, 2000),
    OffsetRange.create("events", 1, 500, 1500)
};

JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
    jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    offsetRanges
);

// Tuple2 fields are exposed to Java as _1() and _2() accessor methods
rdd.foreach(record -> {
    System.out.println("Key: " + record._1() + ", Value: " + record._2());
});

System.out.println("Total messages: " + rdd.count());
```

### Advanced Java RDD with Custom Message Handler

Java API with a custom message handler for complex transformations.

```java { .api }
/**
 * Create an RDD from Kafka with a custom message handler (Java API).
 *
 * @param jsc JavaSparkContext object
 * @param keyClass type of Kafka message key
 * @param valueClass type of Kafka message value
 * @param keyDecoderClass type of Kafka message key decoder
 * @param valueDecoderClass type of Kafka message value decoder
 * @param recordClass type returned by messageHandler
 * @param kafkaParams Kafka configuration parameters
 * @param offsetRanges Each OffsetRange corresponds to a range of offsets
 * @param leaders Kafka brokers for each TopicAndPartition
 * @param messageHandler Function for translating each message and metadata
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @param <R> type returned by messageHandler
 * @return RDD of R
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>, R>
JavaRDD<R> createRDD(
  JavaSparkContext jsc,
  Class<K> keyClass,
  Class<V> valueClass,
  Class<KD> keyDecoderClass,
  Class<VD> valueDecoderClass,
  Class<R> recordClass,
  Map<String, String> kafkaParams,
  OffsetRange[] offsetRanges,
  Map<TopicAndPartition, Broker> leaders,
  Function<MessageAndMetadata<K, V>, R> messageHandler
)
```

## Key Features

### Exact Offset Control

- Specify precise offset ranges for each topic/partition
- Process historical data or specific time windows
- Replay data for debugging or reprocessing

### Broker Leadership Optimization

- Provide known broker leaders to avoid metadata lookup
- Exploit network topology for better performance
- Handle broker failures gracefully

### Message Metadata Access

Access complete message metadata, including:

- Topic name
- Partition number
- Offset within the partition
- Message timestamp (if available)
- Message key and value

### Partition-Level Parallelism

- Each OffsetRange becomes an RDD partition
- Parallel processing across topics/partitions
- Fine-grained control over parallelism
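Since each OffsetRange becomes exactly one RDD partition, partition sizes — and therefore parallelism and skew — can be computed up front from the ranges alone. The following is a minimal sketch in plain Java (no Spark dependency; the `RangeInfo` record is a stand-in for Spark's `OffsetRange`, whose `count()` is the same `untilOffset - fromOffset` arithmetic):

```java
import java.util.List;

public class PartitionSizes {
    // Stand-in for Spark's OffsetRange: (topic, partition, fromOffset, untilOffset)
    record RangeInfo(String topic, int partition, long fromOffset, long untilOffset) {
        // Messages in the half-open interval [fromOffset, untilOffset)
        long count() { return untilOffset - fromOffset; }
    }

    public static void main(String[] args) {
        List<RangeInfo> ranges = List.of(
            new RangeInfo("events", 0, 1000, 2000),
            new RangeInfo("events", 1, 500, 1500),
            new RangeInfo("logs", 0, 100, 200)
        );

        long total = 0;
        for (RangeInfo r : ranges) {
            // One RDD partition per range; its size is the offset span
            System.out.println(r.topic() + "-" + r.partition() + ": " + r.count() + " messages");
            total += r.count();
        }
        System.out.println("Total messages across " + ranges.size() + " partitions: " + total);
    }
}
```

A range spanning far more offsets than the others will dominate the batch's runtime, so inspecting these counts before submitting the job is a cheap way to spot skew.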

## Offset Range Management

### Creating Offset Ranges

```scala
import org.apache.spark.streaming.kafka.OffsetRange
import kafka.common.TopicAndPartition

// Manual creation: (topic, partition, fromOffset, untilOffset)
val range1 = OffsetRange("events", 0, 1000, 2000)
val range2 = OffsetRange.create("logs", 1, 500, 1500)

// From a TopicAndPartition
val tp = TopicAndPartition("events", 0)
val range3 = OffsetRange(tp, 1000, 2000)
```
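There is no built-in helper for subdividing a range; when one topic/partition's range is much larger than the rest, it can help to split it into several consecutive ranges so the batch gets more, smaller RDD partitions. A sketch of such a (hypothetical) splitter, shown with plain `long` offsets rather than `OffsetRange` objects:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitter {
    /** Split [from, until) into consecutive chunks of at most chunkSize offsets. */
    static List<long[]> split(long from, long until, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        for (long start = from; start < until; start += chunkSize) {
            chunks.add(new long[] { start, Math.min(start + chunkSize, until) });
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Each chunk would become its own OffsetRange (and thus its own RDD partition)
        for (long[] c : split(1000, 2000, 400)) {
            System.out.println("[" + c[0] + ", " + c[1] + ")");
        }
    }
}
```

Because the chunks are contiguous and half-open, concatenating them covers exactly the original range with no gaps or duplicates.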

### Offset Range Properties

```scala
val range = OffsetRange("events", 0, 1000, 2000)

println(s"Topic: ${range.topic}")
println(s"Partition: ${range.partition}")
println(s"From offset: ${range.fromOffset}")
println(s"Until offset: ${range.untilOffset}")
println(s"Message count: ${range.count}")
println(s"TopicAndPartition: ${range.topicAndPartition}")
```

## Use Cases

### Historical Data Processing

```scala
// Process a specific window of historical offsets
val historicalRanges = Array(
  OffsetRange("sales", 0, 10000, 20000),
  OffsetRange("sales", 1, 15000, 25000)
)

val historicalData = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, historicalRanges
)
```

### Data Quality Validation

```scala
// Validate messages in specific offset ranges
case class ValidationResult(offset: Long, isValid: Boolean, data: String)

val validationHandler = (mmd: MessageAndMetadata[String, String]) => {
  ValidationResult(
    offset = mmd.offset,
    // Guard against null keys/values: keyless messages are common in Kafka
    isValid = mmd.key() != null && mmd.message() != null && mmd.message().nonEmpty,
    data = mmd.message()
  )
}
```

### Incremental Processing

```scala
// Process only the data that arrived since the last run
def processIncremental(lastProcessedOffsets: Map[TopicAndPartition, Long]): Unit = {
  val kc = new KafkaCluster(kafkaParams)
  // getLatestLeaderOffsets returns an Either; fail fast if the lookup errored
  val latestOffsets = kc.getLatestLeaderOffsets(lastProcessedOffsets.keySet)
    .fold(errs => sys.error(errs.toString), identity)

  val offsetRanges = lastProcessedOffsets.map { case (tp, fromOffset) =>
    OffsetRange(tp.topic, tp.partition, fromOffset, latestOffsets(tp).offset)
  }.toArray

  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges
  )

  // Process the RDD
  rdd.foreach(println)
}
```
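Stripped of the Spark and Kafka calls, the bookkeeping above is just pairing each saved offset with the corresponding latest offset. A plain-Java sketch of that step (string keys and plain maps stand in for `TopicAndPartition` and the `KafkaCluster` lookup):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IncrementalRanges {
    /** For each partition, the next batch covers [lastProcessed, latest). */
    static Map<String, long[]> nextRanges(Map<String, Long> lastProcessed,
                                          Map<String, Long> latest) {
        Map<String, long[]> ranges = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : lastProcessed.entrySet()) {
            long from = e.getValue();
            long until = latest.get(e.getKey());
            if (until > from) {            // skip partitions with no new data
                ranges.put(e.getKey(), new long[] { from, until });
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        Map<String, Long> last = Map.of("events-0", 1000L, "events-1", 1500L);
        Map<String, Long> latest = Map.of("events-0", 2000L, "events-1", 1500L);
        // events-1 has no new messages, so only events-0 gets a range
        nextRanges(last, latest).forEach((tp, r) ->
            System.out.println(tp + ": [" + r[0] + ", " + r[1] + ")"));
    }
}
```

After the batch succeeds, persisting each range's `untilOffset` as the new "last processed" value makes the next invocation pick up exactly where this one stopped.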

332

333

## Error Handling

334

335

- **SparkException**: Thrown for invalid offset ranges or connectivity issues

336

- **Offset validation**: Automatic validation that offsets are available on brokers

337

- **Leader discovery**: Automatic leader lookup when not provided

338

- **Partition failures**: Individual partition failures don't affect other partitions
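Invalid ranges only surface as a `SparkException` once the job runs; a cheap pre-flight check on the driver fails faster. The sketch below is an assumption about what is worth checking client-side (non-empty topic, non-negative offsets, `from <= until`), not the exact validation Spark itself performs:

```java
public class RangeValidation {
    /** Pre-flight sanity check for an offset range; returns an error message, or null if sane. */
    static String validate(String topic, int partition, long from, long until) {
        if (topic == null || topic.isEmpty()) return "topic must be non-empty";
        if (partition < 0) return "partition must be >= 0";
        if (from < 0) return "fromOffset must be >= 0";
        if (from > until) return "fromOffset must be <= untilOffset";
        return null; // structurally sane; the brokers still have the final say
    }

    public static void main(String[] args) {
        System.out.println(validate("events", 0, 1000, 2000)); // sane: prints null
        System.out.println(validate("events", 0, 2000, 1000)); // rejected
    }
}
```

Even a range that passes these checks can still fail at runtime if the offsets have been deleted by retention, so the check complements rather than replaces broker-side validation.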