or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-rdd.md · direct-streaming.md · index.md · java-api.md · offset-management.md · receiver-streaming.md

docs/offset-management.md

0

# Offset Management

1

2

Comprehensive offset management utilities for controlling exactly which Kafka messages to process, including offset range representation, cluster interaction helpers, and consumer group coordination.

3

4

## Capabilities

5

6

### OffsetRange Class

7

8

Represents a range of offsets from a single Kafka topic and partition.

9

10

```scala { .api }

11

/**

12

* Represents a range of offsets from a single Kafka TopicAndPartition.

13

*

14

* @param topic Kafka topic name

15

* @param partition Kafka partition id

16

* @param fromOffset Inclusive starting offset

17

* @param untilOffset Exclusive ending offset

18

*/

19

final class OffsetRange(

20

val topic: String,

21

val partition: Int,

22

val fromOffset: Long,

23

val untilOffset: Long

24

) extends Serializable {

25

26

/** Kafka TopicAndPartition object, for convenience */

27

def topicAndPartition(): TopicAndPartition

28

29

/** Number of messages this OffsetRange refers to */

30

def count(): Long

31

32

override def equals(obj: Any): Boolean

33

override def hashCode(): Int

34

override def toString(): String

35

}

36

```

37

38

**Usage Examples:**

39

40

```scala

41

import org.apache.spark.streaming.kafka.OffsetRange

42

import kafka.common.TopicAndPartition

43

44

// Create offset ranges

45

val range1 = OffsetRange("events", 0, 1000, 2000)

46

val range2 = OffsetRange.create("logs", 1, 500, 1500)

47

48

// From TopicAndPartition

49

val tp = TopicAndPartition("metrics", 2)

50

val range3 = OffsetRange(tp, 1000, 1500)

51

52

// Access properties

53

println(s"Topic: ${range1.topic}")

54

println(s"Partition: ${range1.partition}")

55

println(s"Message count: ${range1.count()}")

56

println(s"TopicAndPartition: ${range1.topicAndPartition()}")

57

```

58

59

### OffsetRange Companion Object

60

61

Factory methods for creating OffsetRange instances.

62

63

```scala { .api }

64

object OffsetRange {

65

/** Create OffsetRange from topic, partition, and offset values */

66

def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange

67

68

/** Create OffsetRange from TopicAndPartition and offset values */

69

def create(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange

70

71

/** Apply method for creating OffsetRange instances */

72

def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange

73

74

/** Apply method with TopicAndPartition */

75

def apply(topicAndPartition: TopicAndPartition, fromOffset: Long, untilOffset: Long): OffsetRange

76

}

77

```

78

79

**Java Usage:**

80

81

```java

82

import org.apache.spark.streaming.kafka.OffsetRange;

83

import kafka.common.TopicAndPartition;

84

85

// Create offset ranges in Java

86

OffsetRange range1 = OffsetRange.create("events", 0, 1000, 2000);

87

OffsetRange range2 = OffsetRange.create(new TopicAndPartition("logs", 1), 500, 1500);

88

89

// Access properties

90

System.out.println("Topic: " + range1.topic());

91

System.out.println("Partition: " + range1.partition());

92

System.out.println("Count: " + range1.count());

93

```

94

95

### HasOffsetRanges Trait

96

97

Interface for objects that contain offset ranges, typically implemented by Kafka RDDs.

98

99

```scala { .api }

100

/**

101

* Represents any object that has a collection of OffsetRanges.

102

* This can be used to access the offset ranges in RDDs generated by direct Kafka DStream.

103

*/

104

trait HasOffsetRanges {

105

def offsetRanges: Array[OffsetRange]

106

}

107

```

108

109

**Usage Examples:**

110

111

```scala

112

// Access offset ranges from direct stream RDDs

113

directStream.foreachRDD { rdd =>

114

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

115

116

offsetRanges.foreach { offsetRange =>

117

println(s"${offsetRange.topic} ${offsetRange.partition} " +

118

s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")

119

}

120

121

// Process the data

122

rdd.foreach(println)

123

}

124

```

125

126

**Java Usage:**

127

128

```java

129

import org.apache.spark.streaming.kafka.HasOffsetRanges;

130

131

stream.foreachRDD(rdd -> {

132

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

133

134

for (OffsetRange range : offsetRanges) {

135

System.out.printf("%s %d %d %d%n",

136

range.topic(), range.partition(),

137

range.fromOffset(), range.untilOffset());

138

}

139

});

140

```

141

142

### Broker Class

143

144

Represents Kafka broker host and port information.

145

146

```scala { .api }

147

/**

148

* :: Experimental ::

149

* Represents the host and port info for a Kafka broker.

150

* Differs from Kafka project's internal kafka.cluster.Broker, which contains a server ID.

151

*/

152

@Experimental

153

final class Broker(

154

/** Broker's hostname */

155

val host: String,

156

/** Broker's port */

157

val port: Int

158

) extends Serializable {

159

160

override def equals(obj: Any): Boolean

161

override def hashCode: Int

162

override def toString(): String

163

}

164

```

165

166

**Usage Examples:**

167

168

```scala

169

import org.apache.spark.streaming.kafka.Broker

170

171

// Create broker instances

172

val broker1 = Broker("kafka1.example.com", 9092)

173

val broker2 = Broker.create("kafka2.example.com", 9093)

174

175

// Use with RDD creation for leader optimization

176

val leaders = Map(

177

TopicAndPartition("events", 0) -> broker1,

178

TopicAndPartition("events", 1) -> broker2

179

)

180

181

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, (String, String)](

182

sc, kafkaParams, offsetRanges, leaders, messageHandler

183

)

184

```

185

186

187

## Advanced Offset Management Patterns

188

189

### Manual Offset Tracking

190

191

Implement custom offset storage for exactly-once processing:

192

193

```scala

194

import org.apache.spark.streaming.kafka._

195

196

class OffsetManager {

197

// Store offsets in external system (database, file, etc.)

198

def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {

199

offsetRanges.foreach { range =>

200

// Save to external store

201

saveOffsetToDatabase(range.topic, range.partition, range.untilOffset)

202

}

203

}

204

205

def getStoredOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {

206

// Load from external store

207

loadOffsetsFromDatabase(topics)

208

}

209

}

210

211

val offsetManager = new OffsetManager()

212

213

// Get stored offsets for restart

214

val storedOffsets = offsetManager.getStoredOffsets(Set("events"))

215

216

// Create stream with stored offsets

217

val stream = if (storedOffsets.nonEmpty) {

218

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](

219

ssc, kafkaParams, storedOffsets, messageHandler

220

)

221

} else {

222

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

223

ssc, kafkaParams, Set("events")

224

)

225

}

226

227

// Process and save offsets

228

stream.foreachRDD { rdd =>

229

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

230

231

// Process data

232

rdd.foreach(processMessage)

233

234

// Save offsets after successful processing

235

offsetManager.saveOffsets(offsetRanges)

236

}

237

```

238

239

### Consumer Group Offset Management

240

241

For consumer group coordination, use Kafka's built-in consumer group management with the receiver-based streaming approach, which automatically handles offset management through Zookeeper:

242

243

```scala

244

// Receiver-based streaming with automatic consumer group offset management

245

val kafkaParams = Map[String, String](

246

"zookeeper.connect" -> "localhost:2181",

247

"group.id" -> "my-consumer-group",

248

"auto.commit.interval.ms" -> "1000"

249

)

250

251

val topics = Map("events" -> 1)

252

253

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

254

255

// Offsets are automatically committed to Zookeeper by the consumer group

256

stream.foreachRDD { rdd =>

257

rdd.foreach { case (key, value) =>

258

processMessage(key, value)

259

}

260

}

261

```

262

263

### Offset Range Validation

264

265

When working with specific offset ranges, implement validation to ensure offsets are within available bounds:

266

267

```scala

268

def validateOffsetRanges(offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {

269

// Basic validation - ensure from < until

270

val validRanges = offsetRanges.filter { range =>

271

range.fromOffset >= 0 && range.fromOffset < range.untilOffset

272

}

273

274

if (validRanges.length != offsetRanges.length) {

275

val invalidRanges = offsetRanges.diff(validRanges)

276

throw new IllegalArgumentException(s"Invalid offset ranges: ${invalidRanges.mkString(", ")}")

277

}

278

279

validRanges

280

}

281

282

// Use validation

283

val requestedRanges = Array(

284

OffsetRange("events", 0, 1000, 2000),

285

OffsetRange("events", 1, 500, 1500)

286

)

287

288

try {

289

val validRanges = validateOffsetRanges(requestedRanges)

290

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](

291

sc, kafkaParams, validRanges

292

)

293

rdd.foreach(println)

294

} catch {

295

case e: IllegalArgumentException =>

296

println(s"Validation failed: ${e.getMessage}")

297

}

298

```

299

300

## Error Handling

301

302

### Offset-related Exceptions

303

304

- **SparkException**: Thrown when requested offsets are not available on brokers

305

- **Connectivity Issues**: Handle broker unavailability gracefully

306

- **Metadata Errors**: Retry logic for temporary metadata failures

307

308

```scala

309

def safeCreateRDD(

310

sc: SparkContext,

311

kafkaParams: Map[String, String],

312

offsetRanges: Array[OffsetRange]

313

): Option[RDD[(String, String)]] = {

314

val maxRetries = 3

315

var attempt = 0

316

317

while (attempt < maxRetries) {

318

try {

319

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](

320

sc, kafkaParams, offsetRanges

321

)

322

return Some(rdd)

323

} catch {

324

case e: SparkException =>

325

attempt += 1

326

println(s"Attempt $attempt failed: ${e.getMessage}")

327

if (attempt < maxRetries) {

328

Thread.sleep(1000 * attempt) // Linear backoff: wait grows with each attempt

329

}

330

}

331

}

332

333

None

334

}

335

```