or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-reading.md, consumer-strategies.md, data-source.md, index.md, offset-management.md, schema-conversion.md, streaming-sources.md, writing.md

batch-reading.mddocs/

0

# Batch Reading

1

2

The Kafka connector supports batch (bounded) reads of historical Kafka data. Each read covers a fixed offset range per topic partition, and large partition ranges can be split across multiple Spark tasks for parallelism.

3

4

## Capabilities

5

6

### KafkaBatch

7

8

V2 DataSource batch implementation for reading Kafka data efficiently.

9

10

```scala { .api }

11

/**

12

* V2 DataSource batch implementation for reading Kafka data

13

* Provides efficient batch processing with partition optimization

14

*/

15

class KafkaBatch extends Batch {

16

17

/** Plans input partitions for parallel processing */

18

def planInputPartitions(): Array[InputPartition]

19

20

/** Creates reader factory for partition processing */

21

def createReaderFactory(): PartitionReaderFactory

22

}

23

```

24

25

**Usage Examples:**

26

27

```scala

28

// Basic batch reading

29

val kafkaBatch = spark

30

.read

31

.format("kafka")

32

.option("kafka.bootstrap.servers", "localhost:9092")

33

.option("subscribe", "historical-data")

34

.option("startingOffsets", "earliest")

35

.option("endingOffsets", "latest")

36

.load()

37

38

// Process batch data

39

val processedBatch = kafkaBatch

40

.select(

41

col("topic"),

42

col("partition"),

43

col("offset"),

44

col("timestamp"),

45

expr("CAST(value AS STRING)").as("message")

46

)

47

.groupBy("topic", "partition")

48

.agg(

49

count("*").as("message_count"),

50

min("offset").as("min_offset"),

51

max("offset").as("max_offset")

52

)

53

```

54

55

### KafkaRelation

56

57

V1 DataSource relation for batch reading with TableScan support.

58

59

```scala { .api }

60

/**

61

* V1 DataSource relation for batch reading from Kafka

62

* Provides backward compatibility with DataSource V1 API

63

*/

64

class KafkaRelation extends BaseRelation with TableScan {

65

66

/** Returns the schema for Kafka records */

67

def schema: StructType

68

69

/** Builds RDD for scanning all data */

70

def buildScan(): RDD[Row]

71

72

/** Returns SQL context */

73

def sqlContext: SQLContext

74

}

75

```

76

77

### Partition Input Planning

78

79

### KafkaBatchInputPartition

80

81

Represents a single input partition for batch processing.

82

83

```scala { .api }

84

/**

85

* Input partition for batch Kafka reading

86

* Represents a range of offsets to be processed by a single task

87

*/

88

case class KafkaBatchInputPartition(

89

offsetRange: KafkaOffsetRange,

90

executorKafkaParams: ju.Map[String, Object],

91

pollTimeoutMs: Long,

92

failOnDataLoss: Boolean,

93

includeHeaders: Boolean

94

) extends InputPartition {

95

96

/** Returns preferred locations for this partition (empty for Kafka) */

97

def preferredLocations(): Array[String] = Array.empty

98

}

99

```

100

101

### KafkaOffsetRange

102

103

Defines the range of offsets to be processed by a partition reader.

104

105

```scala { .api }

106

/**

107

* Represents a range of offsets for a specific topic partition

108

* Used for planning batch processing tasks

109

*/

110

case class KafkaOffsetRange(

111

topicPartition: TopicPartition,

112

fromOffset: Long,

113

untilOffset: Long,

114

preferredLoc: Option[String]

115

) {

116

/** Topic name */

117

def topic: String = topicPartition.topic()

118

119

/** Partition number */

120

def partition: Int = topicPartition.partition()

121

122

/** Estimated number of messages in this range */

123

def size: Long = untilOffset - fromOffset

124

}

125

```

126

127

### Partition Readers

128

129

### KafkaBatchReaderFactory

130

131

Factory for creating partition readers.

132

133

```scala { .api }

134

/**

135

* Factory for creating Kafka batch partition readers

136

* Creates readers that can process KafkaBatchInputPartition instances

137

*/

138

object KafkaBatchReaderFactory extends PartitionReaderFactory {

139

140

/** Creates a partition reader for the given input partition */

141

def createReader(partition: InputPartition): PartitionReader[InternalRow]

142

}

143

```

144

145

### KafkaBatchPartitionReader

146

147

Reads data from a specific Kafka partition range.

148

149

```scala { .api }

150

/**

151

* Partition reader for batch Kafka data processing

152

* Reads a specific range of offsets from a Kafka partition

153

*/

154

case class KafkaBatchPartitionReader(

155

offsetRange: KafkaOffsetRange,

156

executorKafkaParams: ju.Map[String, Object],

157

pollTimeoutMs: Long,

158

failOnDataLoss: Boolean,

159

includeHeaders: Boolean

160

) extends PartitionReader[InternalRow] {

161

162

/** Advances to the next record; returns false when no more records remain in the offset range */

163

def next(): Boolean

164

165

/** Gets current record as UnsafeRow */

166

def get(): UnsafeRow

167

168

/** Closes reader and releases resources */

169

def close(): Unit

170

171

/** Returns current metrics values */

172

def currentMetricsValues(): Array[CustomTaskMetric]

173

}

174

```

175

176

## Offset Range Configuration

177

178

### Time-based Range Selection

179

180

```scala

181

// Read data from specific time range

182

val timeRangeBatch = spark

183

.read

184

.format("kafka")

185

.option("kafka.bootstrap.servers", "localhost:9092")

186

.option("subscribe", "events")

187

.option("startingTimestamp", "1609459200000") // Jan 1, 2021 00:00:00 UTC

188

.option("endingTimestamp", "1609545600000") // Jan 2, 2021 00:00:00 UTC

189

.load()

190

```

191

192

### Specific Offset Ranges

193

194

```scala

195

// Read specific offset ranges per partition

196

val specificOffsetsBatch = spark

197

.read

198

.format("kafka")

199

.option("kafka.bootstrap.servers", "localhost:9092")

200

.option("subscribe", "events")

201

.option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")

202

.option("endingOffsets", """{"events":{"0":5000,"1":6000,"2":4500}}""")

203

.load()

204

```

205

206

### Mixed Offset Strategies

207

208

```scala

209

// Mix earliest/latest with specific offsets (in the JSON, -2 = earliest and -1 = latest; -2 is not allowed in endingOffsets)

210

val mixedOffsetsBatch = spark

211

.read

212

.format("kafka")

213

.option("kafka.bootstrap.servers", "localhost:9092")

214

.option("subscribe", "events")

215

.option("startingOffsets", """{"events":{"0":-2,"1":1000,"2":-2}}""") // -2 = earliest

216

.option("endingOffsets", """{"events":{"0":-1,"1":2000,"2":-1}}""") // -1 = latest

217

.load()

218

```

219

220

## Partition Optimization

221

222

### Minimum Partitions

223

224

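Request a minimum number of Spark partitions (input tasks) for the read. `minPartitions` is a hint: the actual number of tasks is approximately this value and can be lower or higher because of rounding or Kafka partitions that hold no data in the requested range.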

225

226

```scala

227

// Ensure sufficient parallelism for batch processing

228

val parallelBatch = spark

229

.read

230

.format("kafka")

231

.option("kafka.bootstrap.servers", "localhost:9092")

232

.option("subscribe", "large-topic")

233

.option("minPartitions", "50") // Request ~50 Spark partitions (a hint, not a guarantee)

234

.load()

235

```

236

237

### KafkaOffsetRangeCalculator

238

239

Internal helper that splits Kafka offset ranges into Spark input partitions according to the `minPartitions` option.

240

241

```scala { .api }

242

/**

243

* Calculates optimal offset ranges for processing

244

* Splits large partition ranges to improve parallelism

245

*/

246

class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {

247

248

/**

249

* Calculates offset ranges ensuring minimum partition count

250

* Splits large ranges across multiple Spark partitions if needed

251

*/

252

def getRanges(

253

fromOffsets: Seq[KafkaOffsetRange],

254

executorLocations: Seq[String]

255

): Seq[KafkaOffsetRange]

256

}

257

258

object KafkaOffsetRangeCalculator {

259

/** Creates calculator from options */

260

def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator

261

}

262

```

263

264

**Range Splitting Logic:**

265

266

```scala

267

// If the topic has a single Kafka partition holding 100,000 messages and minPartitions=10:

268

// Original: [topic-0: 0 -> 100000]

269

// Split into:

270

// [topic-0: 0 -> 10000]

271

// [topic-0: 10000 -> 20000]

272

// [topic-0: 20000 -> 30000]

273

// ... (10 total ranges)

274

```

275

276

## Performance Optimization

277

278

### Consumer Configuration

279

280

```scala

281

// Optimize Kafka consumer for batch processing

282

val optimizedBatch = spark

283

.read

284

.format("kafka")

285

.option("kafka.bootstrap.servers", "localhost:9092")

286

.option("subscribe", "large-dataset")

287

.option("kafka.fetch.min.bytes", "1048576") // 1MB minimum fetch

288

.option("kafka.fetch.max.wait.ms", "500") // 500ms max wait

289

.option("kafka.max.poll.records", "50000") // Large batch sizes

290

.option("kafka.receive.buffer.bytes", "1048576") // 1MB receive buffer

291

.load()

292

```

293

294

### Parallel Processing

295

296

```scala

297

// Maximize parallelism for large datasets

298

val highParallelismBatch = spark

299

.read

300

.format("kafka")

301

.option("kafka.bootstrap.servers", "localhost:9092")

302

.option("subscribe", "big-data-topic")

303

.option("minPartitions", "200") // Request ~200 input partitions (a hint, not a guarantee)

304

.load()

305

.repartition(400) // Further increase parallelism for downstream processing (triggers a full shuffle)

306

```

307

308

### Caching Strategy

309

310

```scala

311

// Cache frequently accessed batch data

312

val cachedBatch = spark

313

.read

314

.format("kafka")

315

.option("kafka.bootstrap.servers", "localhost:9092")

316

.option("subscribe", "reference-data")

317

.load()

318

.select(

319

col("topic"),

320

expr("CAST(key AS STRING)").as("key"),

321

expr("CAST(value AS STRING)").as("value"),

322

col("timestamp")

323

)

324

.cache() // Persist for reuse across multiple actions (Dataset default storage level: MEMORY_AND_DISK)

325

326

// Use cached data multiple times

327

val summary = cachedBatch.groupBy("topic").count()

328

val sample = cachedBatch.sample(0.1)

329

```

330

331

## Data Processing Patterns

332

333

### Time Window Analysis

334

335

```scala

336

// Analyze data by time windows

337

val timeWindowAnalysis = kafkaBatch

338

.select(

339

col("topic"),

340

col("timestamp"),

341

expr("CAST(value AS STRING)").as("message")

342

)

343

.withColumn("hour", hour(col("timestamp")))

344

.withColumn("day", to_date(col("timestamp")))

345

.groupBy("topic", "day", "hour")

346

.agg(

347

count("*").as("message_count"),

348

countDistinct("message").as("unique_messages")

349

)

350

.orderBy("topic", "day", "hour")

351

```

352

353

### Cross-Topic Analysis

354

355

```scala

356

// Analyze data across multiple topics

357

val crossTopicBatch = spark

358

.read

359

.format("kafka")

360

.option("kafka.bootstrap.servers", "localhost:9092")

361

.option("subscribe", "events,logs,metrics")

362

.load()

363

364

val topicComparison = crossTopicBatch

365

.groupBy(col("topic"), to_date(col("timestamp")).as("date"))

366

.agg(

367

count("*").as("message_count"),

368

avg(length(col("value"))).as("avg_message_size"),

369

min("timestamp").as("first_message"),

370

max("timestamp").as("last_message")

371

)

372

```

373

374

### Data Quality Analysis

375

376

```scala

377

// Analyze data quality across partitions

378

val qualityAnalysis = kafkaBatch

379

.select(

380

col("topic"),

381

col("partition"),

382

col("offset"),

383

col("timestamp"),

384

when(col("key").isNull, 1).otherwise(0).as("null_key"),

385

when(col("value").isNull, 1).otherwise(0).as("null_value"),

386

length(col("value")).as("value_size")

387

)

388

.groupBy("topic", "partition")

389

.agg(

390

count("*").as("total_messages"),

391

sum("null_key").as("null_keys"),

392

sum("null_value").as("null_values"),

393

avg("value_size").as("avg_value_size"),

394

min("offset").as("min_offset"),

395

max("offset").as("max_offset")

396

)

397

```

398

399

## Error Handling

400

401

### Data Loss Detection

402

403

```scala

404

// Enable data loss detection for critical batch processing

405

val strictBatch = spark

406

.read

407

.format("kafka")

408

.option("kafka.bootstrap.servers", "localhost:9092")

409

.option("subscribe", "critical-data")

410

.option("failOnDataLoss", "true") // Fail if data may have been lost (true is already the default)

411

.load()

412

```

413

414

### Timeout Configuration

415

416

```scala

417

// Configure timeouts for reliable batch processing

418

val timeoutConfiguredBatch = spark

419

.read

420

.format("kafka")

421

.option("kafka.bootstrap.servers", "localhost:9092")

422

.option("subscribe", "events")

423

.option("kafkaConsumer.pollTimeoutMs", "30000") // 30 second poll timeout

424

.option("fetchOffset.numRetries", "10") // 10 retry attempts

425

.option("fetchOffset.retryIntervalMs", "5000") // 5 second retry interval

426

.load()

427

```

428

429

### Partial Processing

430

431

```scala

432

// Keep processing when data may have been lost (e.g., offsets removed by retention or topics deleted)

433

val resilientBatch = spark

434

.read

435

.format("kafka")

436

.option("kafka.bootstrap.servers", "localhost:9092")

437

.option("subscribe", "events")

438

.option("failOnDataLoss", "false") // Continue despite missing data

439

.load()

440

.filter(col("value").isNotNull) // Drop records whose value is null (e.g., tombstones)

441

```

442

443

## Monitoring and Metrics

444

445

### Processing Metrics

446

447

```scala

448

// Monitor batch processing metrics

449

val recordCount = kafkaBatch.count() // Trigger computation; returns the number of records read

450

451

// Access metrics through Spark UI or programmatically

452

val executorMetrics = spark.sparkContext.statusTracker.getExecutorInfos

453

executorMetrics.foreach { executor =>

454

println(s"Executor ${executor.host}:${executor.port}: ${executor.numRunningTasks} running tasks")

455

}

456

```

457

458

### Custom Metrics Collection

459

460

```scala

461

// Compute custom per-partition volume metrics from the batch data

462

val metricsCollector = kafkaBatch

463

.select(

464

col("topic"),

465

col("partition"),

466

col("timestamp"),

467

length(col("value")).as("message_size")

468

)

469

.groupBy("topic", "partition")

470

.agg(

471

count("*").as("message_count"),

472

sum("message_size").as("total_bytes"),

473

avg("message_size").as("avg_message_size"),

474

min("timestamp").as("earliest_timestamp"),

475

max("timestamp").as("latest_timestamp")

476

)

477

478

// Write metrics for monitoring (the "delta" format requires the Delta Lake package on the classpath)

479

metricsCollector

480

.write

481

.format("delta")

482

.mode("overwrite")

483

.save("/metrics/kafka_batch_processing")

484

```