# Offset Management

The Kafka connector provides offset management for controlling where reading starts and stops, with support for specific offsets, timestamp-based positions, and automatic offset tracking.

## Capabilities

### KafkaOffsetRangeLimit

Base interface for defining offset boundaries and reading ranges.

```scala { .api }
/**
 * Represents desired offset range limits for starting, ending, and specific offsets
 * Used to control the boundaries of Kafka data consumption
 */
sealed trait KafkaOffsetRangeLimit
```

### EarliestOffsetRangeLimit

Binds to the earliest available offsets in Kafka topics.

```scala { .api }
/**
 * Binds to earliest available offsets in Kafka
 * Starts reading from the beginning of each partition
 */
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// For streaming queries
.option("startingOffsets", "earliest")

// For batch queries (default for startingOffsets)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest") // Cannot use "earliest" for ending
```

**Usage Examples:**

```scala
// Stream from beginning of topics
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest") // Read from beginning
  .load()

// Batch read from earliest to latest
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
```

### LatestOffsetRangeLimit

Binds to the latest available offsets in Kafka topics.

```scala { .api }
/**
 * Binds to latest available offsets in Kafka
 * Starts reading from the current end of each partition
 */
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// For streaming queries (default for startingOffsets)
.option("startingOffsets", "latest")

// For batch queries (default for endingOffsets)
.option("endingOffsets", "latest") // Cannot use "latest" for starting in batch
```

**Usage Examples:**

```scala
// Stream from current position (default behavior)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest") // Start from current end
  .load()

// Batch read up to current position
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest") // Read up to current end
  .load()
```

### SpecificOffsetRangeLimit

Binds to specific offsets for precise control over reading positions.

```scala { .api }
/**
 * Binds to specific offsets per partition
 * Provides precise control over starting/ending positions
 *
 * @param partitionOffsets Map of TopicPartition to offset values
 *                         -1 = latest offset, -2 = earliest offset
 */
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// JSON specification of partition offsets
.option("startingOffsets", """{"topic1":{"0":23,"1":345},"topic2":{"0":0}}""")
.option("endingOffsets", """{"topic1":{"0":100,"1":500},"topic2":{"0":50}}""")
```

**Usage Examples:**

```scala
// Start from specific offsets
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments")
  .option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
  .load()

// Batch read with specific ranges
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":100,"1":200}}""")
  .option("endingOffsets", """{"my-topic":{"0":1000,"1":2000}}""")
  .load()

// Mix specific offsets with latest/earliest using special values
val mixed = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":-2,"1":500}}""") // -2 = earliest, 500 = specific
  .option("endingOffsets", """{"my-topic":{"0":-1,"1":1000}}""")  // -1 = latest, 1000 = specific
  .load()
```

**Special Offset Values:**

```scala { .api }
object KafkaOffsetRangeLimit {
  val LATEST = -1L   // Use latest available offset
  val EARLIEST = -2L // Use earliest available offset
}
```
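When assembling the `startingOffsets`/`endingOffsets` JSON programmatically, the sentinels can be plugged in like any other offset. The helper below is a hedged sketch, not part of the connector: `offsetsJson` is a hypothetical name, and plain `(topic, partition)` tuples stand in for Kafka's `TopicPartition` to keep it dependency-free.

```scala
// Sentinels mirroring KafkaOffsetRangeLimit.LATEST / EARLIEST
val LATEST = -1L
val EARLIEST = -2L

// Hypothetical helper: render a partition->offset map as the JSON string
// expected by startingOffsets / endingOffsets
def offsetsJson(offsets: Map[(String, Int), Long]): String =
  offsets
    .groupBy { case ((topic, _), _) => topic }
    .map { case (topic, parts) =>
      val inner = parts
        .map { case ((_, partition), offset) => s""""$partition":$offset""" }
        .mkString(",")
      s""""$topic":{$inner}"""
    }
    .mkString("{", ",", "}")

// Partition 0 starts from the earliest offset, partition 1 from offset 500
val starting = offsetsJson(Map(("my-topic", 0) -> EARLIEST, ("my-topic", 1) -> 500L))
```

The rendered string can then be passed directly to `.option("startingOffsets", starting)`.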

### SpecificTimestampRangeLimit

Binds each partition to the earliest offset whose timestamp is greater than or equal to the specified timestamp.

```scala { .api }
/**
 * Binds to earliest offset with timestamp >= specified timestamp per partition
 * Enables time-based offset resolution
 *
 * @param topicTimestamps Map of TopicPartition to timestamp values (Unix milliseconds)
 * @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
 */
case class SpecificTimestampRangeLimit(
  topicTimestamps: Map[TopicPartition, Long],
  strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
) extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// JSON specification of partition timestamps
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1609459200000,"1":1609459200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1609545600000,"1":1609545600000}}""")

// Strategy for when no matching timestamp is found
.option("startingOffsetsByTimestampStrategy", "error")  // Default: throw error
.option("startingOffsetsByTimestampStrategy", "latest") // Use latest offset
```

**Usage Examples:**

```scala
// Start from specific timestamp (January 1, 2021 00:00:00 UTC)
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1609459200000,"1":1609459200000}}""")
  .load()

// Batch read between timestamps
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsetsByTimestamp", """{"logs":{"0":1609459200000}}""") // Jan 1, 2021
  .option("endingOffsetsByTimestamp", """{"logs":{"0":1609545600000}}""")   // Jan 2, 2021
  .load()
```
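Rather than hard-coding millisecond values, the timestamps can be derived with `java.time`. A small sketch (the `"logs"` topic and JSON shape follow the example above):

```scala
import java.time.Instant

// Unix-millisecond timestamps derived from ISO-8601 instants
val startMs = Instant.parse("2021-01-01T00:00:00Z").toEpochMilli // Jan 1, 2021 UTC
val endMs   = Instant.parse("2021-01-02T00:00:00Z").toEpochMilli // Jan 2, 2021 UTC

// Plug them into the per-partition JSON expected by the connector
val startingByTimestamp = s"""{"logs":{"0":$startMs}}"""
val endingByTimestamp   = s"""{"logs":{"0":$endMs}}"""
```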

### GlobalTimestampRangeLimit

Applies timestamp-based offset resolution to all partitions using a single timestamp.

```scala { .api }
/**
 * Applies timestamp-based offset resolution to all partitions
 * Uses single timestamp for all discovered partitions
 *
 * @param timestamp Unix timestamp in milliseconds
 * @param strategyOnNoMatchingStartingOffset Strategy when no matching timestamp found
 */
case class GlobalTimestampRangeLimit(
  timestamp: Long,
  strategyOnNoMatchingStartingOffset: StrategyOnNoMatchStartingOffset.Value
) extends KafkaOffsetRangeLimit
```

**Configuration:**

```scala
// Single timestamp applied to all partitions
.option("startingTimestamp", "1609459200000") // January 1, 2021 00:00:00 UTC
.option("endingTimestamp", "1609545600000")   // January 2, 2021 00:00:00 UTC
```

**Usage Examples:**

```scala
// Start all partitions from the same timestamp
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingTimestamp", "1609459200000") // Applies to all partitions
  .load()

// Batch read a time range across all partitions
val batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "logs-.*")
  .option("startingTimestamp", "1609459200000")
  .option("endingTimestamp", "1609545600000")
  .load()
```

## KafkaSourceOffset

Custom offset implementation for tracking partition positions in streaming queries.

```scala { .api }
/**
 * Custom Offset implementation tracking all partitions and their offsets
 * Used internally by streaming sources for checkpoint management
 *
 * @param partitionToOffsets Map of TopicPartition to current offset
 */
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends streaming.Offset
```

**Companion Object Methods:**

```scala { .api }
object KafkaSourceOffset {
  /** Extract partition offsets from generic Offset */
  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long]

  /** Create offset from offset tuples */
  def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset

  /** Create from serialized offset */
  def apply(offset: SerializedOffset): KafkaSourceOffset

  /** Create from streaming offset */
  def apply(offset: streaming.Offset): KafkaSourceOffset
}
```
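To illustrate how the tuple-based factory shapes the partition map, here is a simplified stand-in (not the real class: `SimpleSourceOffset` is hypothetical and swaps `TopicPartition` for a plain `(topic, partition)` tuple):

```scala
// Simplified illustration of KafkaSourceOffset's varargs factory
case class SimpleSourceOffset(partitionToOffsets: Map[(String, Int), Long])

object SimpleSourceOffset {
  // Mirrors apply(offsetTuples: (String, Int, Long)*): each tuple becomes a map entry
  def apply(offsetTuples: (String, Int, Long)*): SimpleSourceOffset =
    SimpleSourceOffset(offsetTuples.map { case (t, p, o) => (t, p) -> o }.toMap)
}

// Two partitions of "orders", each tracked at its own offset
val off = SimpleSourceOffset(("orders", 0, 1000L), ("orders", 1, 2000L))
```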

### KafkaSourcePartitionOffset

Represents the offset for a specific partition in the V2 DataSource API.

```scala { .api }
/**
 * Represents offset for a specific partition in V2 DataSource API
 * Used for continuous streaming offset management
 *
 * @param topicPartition The topic partition
 * @param partitionOffset Current offset in the partition
 */
case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) extends PartitionOffset
```

## Offset Reading and Management

### KafkaOffsetReader

Interface for fetching and managing offsets from Kafka.

```scala { .api }
/**
 * Base interface for fetching offsets from Kafka
 * Handles interaction with Kafka brokers for offset management
 */
trait KafkaOffsetReader {
  /** Close resources and connections */
  def close(): Unit

  /** Fetch partition offsets based on range limit */
  def fetchPartitionOffsets(offsetRangeLimit: KafkaOffsetRangeLimit, isStartingOffsets: Boolean): Map[TopicPartition, Long]

  /** Fetch specific offsets with validation */
  def fetchSpecificOffsets(partitionOffsets: Map[TopicPartition, Long], reportDataLoss: String => Unit): KafkaSourceOffset

  /** Fetch timestamp-based offsets for specific partitions */
  def fetchSpecificTimestampBasedOffsets(
    topicTimestamps: Map[TopicPartition, Long],
    isStartingOffsets: Boolean,
    strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
  ): KafkaSourceOffset

  /** Fetch timestamp-based offsets globally */
  def fetchGlobalTimestampBasedOffsets(
    timestamp: Long,
    isStartingOffsets: Boolean,
    strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value
  ): KafkaSourceOffset

  /** Fetch earliest available offsets */
  def fetchEarliestOffsets(): Map[TopicPartition, Long]

  /** Fetch latest available offsets */
  def fetchLatestOffsets(knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap
}
```
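How a reader resolves each range-limit variant against broker state can be sketched with a dependency-free model (hypothetical: `RangeLimit` and friends stand in for the real sealed hierarchy, stubbed maps stand in for broker queries, and `(topic, partition)` tuples replace `TopicPartition`):

```scala
// Dependency-free model of range-limit resolution against broker offsets
sealed trait RangeLimit
case object EarliestLimit extends RangeLimit
case object LatestLimit extends RangeLimit
case class SpecificLimit(offsets: Map[(String, Int), Long]) extends RangeLimit

def resolve(limit: RangeLimit,
            earliest: Map[(String, Int), Long],
            latest: Map[(String, Int), Long]): Map[(String, Int), Long] =
  limit match {
    case EarliestLimit => earliest // start of every partition
    case LatestLimit   => latest   // current end of every partition
    case SpecificLimit(offsets) =>
      offsets.map {
        case (tp, -1L) => tp -> latest(tp)   // -1 sentinel: latest
        case (tp, -2L) => tp -> earliest(tp) // -2 sentinel: earliest
        case (tp, off) => tp -> off          // explicit offset
      }
  }

// Stubbed broker state for one partition of topic "t"
val earliestStub = Map(("t", 0) -> 0L)
val latestStub   = Map(("t", 0) -> 100L)
```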

## Timestamp Strategy Configuration

When using timestamp-based offsets, configure the strategy for handling missing timestamps:

```scala { .api }
object StrategyOnNoMatchStartingOffset extends Enumeration {
  val ERROR = Value  // Throw exception when no matching timestamp found (default)
  val LATEST = Value // Use latest offset when no matching timestamp found
}
```

**Configuration:**

```scala
// Default behavior - throw error if timestamp not found
.option("startingOffsetsByTimestampStrategy", "error")

// Fallback to latest offset if timestamp not found
.option("startingOffsetsByTimestampStrategy", "latest")
```
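The fallback behavior amounts to a small decision per partition, sketched below (hypothetical helper; the strategy names follow the option values above):

```scala
// Resolve an offset for a timestamp lookup that may find no matching record
def resolveForTimestamp(offsetForTime: Option[Long],
                        latestOffset: Long,
                        strategy: String): Long =
  offsetForTime match {
    case Some(off) => off // a record with timestamp >= requested exists
    case None =>
      strategy match {
        case "latest" => latestOffset // fall back to the partition's end
        case "error"  => sys.error("no offset matched the requested timestamp")
      }
  }
```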

## Option Priority

When multiple offset options are specified, they are processed in priority order:

1. **Global timestamp**: `startingTimestamp` / `endingTimestamp`
2. **Partition timestamps**: `startingOffsetsByTimestamp` / `endingOffsetsByTimestamp`
3. **Specific offsets**: `startingOffsets` / `endingOffsets`
4. **Default values**: `LatestOffsetRangeLimit` for streaming, `EarliestOffsetRangeLimit` for batch
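The precedence above can be expressed as a short resolution function (a sketch, not connector code; the returned labels are illustrative):

```scala
// Pick the starting-offset mechanism according to the documented priority
def startingOffsetSpec(options: Map[String, String], isStreaming: Boolean): String =
  if (options.contains("startingTimestamp")) "global-timestamp"                   // priority 1
  else if (options.contains("startingOffsetsByTimestamp")) "partition-timestamps" // priority 2
  else if (options.contains("startingOffsets")) "specific-offsets"                // priority 3
  else if (isStreaming) "latest" // streaming default
  else "earliest"                // batch default
```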

## Validation Rules

### Streaming Query Restrictions

```scala
// These options are invalid for streaming queries
.option("endingOffsets", "...")            // Not supported
.option("endingOffsetsByTimestamp", "...") // Not supported
.option("endingTimestamp", "...")          // Not supported
```

### Batch Query Restrictions

```scala
// Invalid starting offset for batch
.option("startingOffsets", "latest") // Must use "earliest" or specific offsets

// Invalid ending offset for batch
.option("endingOffsets", "earliest") // Must use "latest" or specific offsets

// Specific offset restrictions
.option("startingOffsets", """{"topic":{"0":-1}}""") // -1 (latest) not allowed for starting
.option("endingOffsets", """{"topic":{"0":-2}}""")   // -2 (earliest) not allowed for ending
```
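These rules can be captured in a small validation sketch (a hypothetical helper, not the connector's internal check; it covers only the "earliest"/"latest" keywords, not JSON offset specs):

```scala
// Validate batch starting/ending offset options per the rules above
def validateBatchOffsets(starting: String, ending: String): Either[String, Unit] =
  if (starting == "latest")
    Left("startingOffsets cannot be \"latest\" for batch queries")
  else if (ending == "earliest")
    Left("endingOffsets cannot be \"earliest\" for batch queries")
  else
    Right(())
```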