or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.mdconfiguration.mdconsumer-strategies.mddata-writing.mdindex.mdoffset-management.mdstreaming-sources.md

batch-processing.mddocs/

0

# Batch Processing

1

2

Batch data access capabilities for reading historical data from Kafka topics with configurable offset ranges, supporting large-scale data processing and analytics workloads.

3

4

## Capabilities

5

6

### Kafka Relation

7

8

Batch relation for reading historical data from Kafka topics with precise offset control.

9

10

```scala { .api }

11

/**
 * Batch relation for reading from Kafka topics.
 *
 * API summary only: constructor parameters and method bodies are elided. Every member below
 * implements an abstract member of `BaseRelation` / `TableScan` or overrides `AnyRef`, so each
 * carries the `override` modifier (required for `toString`, which overrides a concrete method).
 */
class KafkaRelation extends BaseRelation with TableScan with Logging {

  /**
   * Returns the SQL context.
   * @return SQLContext for this relation
   */
  override def sqlContext: SQLContext

  /**
   * Returns the schema of Kafka records.
   * @return StructType describing the record columns
   */
  override def schema: StructType

  /**
   * Builds the scan RDD for batch processing.
   * @return RDD[Row] containing Kafka records
   */
  override def buildScan(): RDD[Row]

  /**
   * String representation of the relation.
   * @return String describing this relation
   */
  override def toString: String
}

39

```

40

41

### Kafka Source RDD

42

43

RDD implementation for reading Kafka data based on offset ranges with partition-aware processing.

44

45

```scala { .api }

46

/**
 * RDD for reading Kafka data based on offset ranges.
 *
 * API summary only: constructor parameters and method bodies are elided. The members below
 * override `RDD` methods, so each carries the `override` modifier (required for `persist` and
 * `getPreferredLocations`, which override concrete methods).
 */
class KafkaSourceRDD extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]] {

  /**
   * Persisting this RDD is discouraged because Kafka `ConsumerRecord` is not serializable.
   * Logs an error advising to `.map` records to serializable fields first, then still
   * delegates to `RDD.persist`.
   * @param newLevel Storage level passed through to `RDD.persist`
   * @return this RDD
   */
  override def persist(newLevel: StorageLevel): this.type

  /**
   * Gets the RDD partitions, one per offset range.
   * @return Array of Partition objects
   */
  override def getPartitions: Array[Partition]

  /**
   * Gets preferred executor locations for data locality.
   * @param split Partition to get locations for
   * @return Sequence of preferred locations (empty when the range has none)
   */
  override def getPreferredLocations(split: Partition): Seq[String]

  /**
   * Computes partition data by reading its offset range from Kafka.
   * @param thePart Partition to compute
   * @param context Task context for the computation
   * @return Iterator of Kafka ConsumerRecord objects
   */
  override def compute(
      thePart: Partition,
      context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}

81

```

82

83

### RDD Partition Types

84

85

Partition implementations for Kafka RDD processing.

86

87

```scala { .api }

88

/**
 * Half-open offset range `[fromOffset, untilOffset)` of one Kafka topic partition, read by a
 * single RDD partition.
 *
 * @param topicPartition Kafka topic partition the offsets belong to
 * @param fromOffset     first offset to read (inclusive)
 * @param untilOffset    offset at which reading stops (exclusive)
 * @param preferredLoc   optional preferred executor location
 */
case class KafkaSourceRDDOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {

  /** Topic name, read from `topicPartition`. */
  def topic: String = topicPartition.topic()

  /** Partition number, read from `topicPartition`. */
  def partition: Int = topicPartition.partition()

  /** Count of offsets in the range: `untilOffset - fromOffset`. */
  def size: Long = untilOffset - fromOffset
}

/**
 * RDD partition that reads exactly one [[KafkaSourceRDDOffsetRange]].
 *
 * @param index       position of this partition within the RDD
 * @param offsetRange offsets this partition reads
 */
case class KafkaSourceRDDPartition(index: Int, offsetRange: KafkaSourceRDDOffsetRange)
  extends Partition

120

```

121

122

## Usage Examples

123

124

### Basic Batch Reading

125

126

```scala

127

// Read every record currently in "transactions", from the earliest to the latest offset
val batchDF = spark.read
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "transactions",
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest"))
  .load()

batchDF.show()

137

```

138

139

### Reading Specific Offset Range

140

141

```scala

142

import spark.implicits._ // enables the $"column" syntax used below

// Per-partition offsets: startingOffsets is inclusive and endingOffsets is exclusive, so this
// reads offsets 1000-4999 of partition 0 and 2000-5999 of partition 1.
val historicalDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":6000}}""")
  .load()

// Process historical data: decode key/value and keep records later than 2023-01-01 00:00
val processedDF = historicalDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
  .filter($"timestamp" > "2023-01-01")

155

```

156

157

### Multiple Topics Batch Processing

158

159

```scala

160

import org.apache.spark.sql.functions.{count, max, min}

// Read three topics in one batch; each row's `topic` column says where it came from
val multiTopicDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Aggregate by topic: record count and timestamp bounds
val topicStats = multiTopicDF
  .groupBy("topic")
  .agg(
    count("*").as("record_count"),
    min("timestamp").as("earliest_timestamp"),
    max("timestamp").as("latest_timestamp")
  )

topicStats.show()

179

```

180

181

### Pattern-based Topic Reading

182

183

```scala

184

// Subscribe to every topic whose name matches the regular expression "logs_.*"
val patternDF = spark.read
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribePattern" -> "logs_.*",
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest"))
  .load()

192

```

193

194

### Partition Assignment Reading

195

196

```scala

197

// Read only the listed partitions: 0, 1 and 2 of important_topic and 0 of critical_topic
val assignment = """{"important_topic":[0,1,2],"critical_topic":[0]}"""

val assignedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", assignment)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

205

```

206

207

## Performance Optimization

208

209

### Partition Parallelism

210

211

```scala

212

// Request at least 20 input partitions to increase read parallelism
val optimizedDF = spark.read
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "large-topic",
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest",
    "minPartitions" -> "20"))
  .load()

221

```

222

223

### Consumer Configuration

224

225

```scala

226

// Options prefixed with "kafka." are passed to the Kafka consumer
val consumerTuning = Map(
  "kafka.fetch.max.bytes" -> "52428800", // 50MB
  "kafka.max.poll.records" -> "1000",    // Records per poll
  "kafka.fetch.min.bytes" -> "1024",     // Min fetch size
  "kafka.fetch.max.wait.ms" -> "500"     // Max wait
)

val tunedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-throughput-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .options(consumerTuning)
  .load()

238

```

239

240

### Network Buffer Sizes

241

242

```scala

243

// Socket buffer sizes handed to the Kafka consumer
val receiveBufferBytes = 256 * 1024 // 256KB
val sendBufferBytes = 128 * 1024    // 128KB

val memoryOptimizedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "memory-intensive-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("kafka.receive.buffer.bytes", receiveBufferBytes.toString)
  .option("kafka.send.buffer.bytes", sendBufferBytes.toString)
  .load()

253

```

254

255

## Data Processing Patterns

256

257

### Time-based Processing

258

259

```scala

260

import org.apache.spark.sql.functions._
import spark.implicits._ // enables the $"column" syntax

// The Kafka `timestamp` column is already a TimestampType, so it converts to a date directly.
// Treating it as epoch milliseconds (e.g. from_unixtime($"timestamp" / 1000)) yields wrong dates.
val timeBasedDF = batchDF
  .withColumn("event_time", $"timestamp")
  .withColumn("date", to_date($"event_time"))
  .filter($"date" >= "2023-01-01" && $"date" <= "2023-12-31")

// Group by date and count records
val dailyStats = timeBasedDF
  .groupBy("date", "topic")
  .agg(count("*").as("daily_count"))
  .orderBy("date", "topic")

272

```

273

274

### Message Content Processing

275

276

```scala

277

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // enables the $"column" syntax

// Schema of the JSON payload. The fields here are placeholders; replace them with the
// fields your messages actually contain.
val messageSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("amount", DoubleType)
))

val contentDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as message_key",
    "CAST(value AS STRING) as message_value",
    // keep the metadata columns: the final select needs them
    "topic", "partition", "offset")
  .withColumn("json_data", from_json($"message_value", messageSchema))
  .select("message_key", "json_data.*", "topic", "partition", "offset")

283

```

284

285

### Deduplication

286

287

```scala

288

// Select columns explicitly: appending "*" after the key/value aliases would produce two
// `key` and two `value` columns, making dropDuplicates("key") ambiguous.
val deduplicatedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as key",
    "CAST(value AS STRING) as value",
    "topic", "partition", "offset", "timestamp")
  .dropDuplicates("key") // keeps one row per key; which duplicate survives is not guaranteed
  .orderBy("timestamp")  // sorts the result only; it does not affect which duplicate is kept

292

```

293

294

## Schema and Data Types

295

296

### Kafka Record Schema

297

298

```scala

299

import org.apache.spark.sql.types._

// Columns produced by the Kafka source. Spark declares every field with the default
// nullable = true. A `headers` column is appended when the `includeHeaders` option is enabled.
val kafkaSchema = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

310

```

311

312

### Type Conversions

313

314

```scala

315

// timestampType follows Kafka's TimestampType ids: 0 = CreateTime, 1 = LogAppendTime,
// -1 = no timestamp. Map each id explicitly so -1 is not mislabelled as LogAppendTime.
val convertedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as key_str",
    "CAST(value AS STRING) as value_str",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "CASE timestampType WHEN 0 THEN 'CreateTime' WHEN 1 THEN 'LogAppendTime' ELSE 'NoTimestampType' END as timestamp_type"
  )

325

```

326

327

## Error Handling

328

329

### Offset Validation

330

331

```scala

332

import scala.util.control.NonFatal

try {
  val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .option("startingOffsets", """{"topic":{"0":1000}}""")
    .option("endingOffsets", """{"topic":{"0":5000}}""")
    .load()

  df.count()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid offset configuration: ${e.getMessage}")
  case NonFatal(e) =>
    // NonFatal lets fatal JVM errors and thread interruptions propagate instead of printing them
    println(s"Error reading from Kafka: ${e.getMessage}")
}

349

```

350

351

### Topic Existence Check

352

353

```scala

354

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

/**
 * Returns true if `topic` appears in the topic list of the cluster at `brokers`.
 * Creates a short-lived AdminClient and always closes it.
 */
def checkTopicExists(brokers: String, topic: String): Boolean = {
  // AdminClient.create takes java.util.Map[String, Object]; a Map[String, String] does not compile
  val props = Map[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers).asJava
  val adminClient = AdminClient.create(props)

  try {
    adminClient.listTopics().names().get().contains(topic)
  } finally {
    adminClient.close()
  }
}

// Build the DataFrame only when the topic exists, and keep it so it can be used afterwards
val maybeDF = if (checkTopicExists("localhost:9092", "my-topic")) {
  Some(spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my-topic")
    .load())
} else {
  None
}

375

```

376

377

## Integration with Spark SQL

378

379

### Creating Temporary Views

380

381

```scala

382

// Register the batch DataFrame under a name that SQL queries can reference
batchDF.createOrReplaceTempView("kafka_messages")

// Per topic-partition: record count, offset bounds and timestamp bounds
val perPartitionQuery = """
  SELECT
    topic,
    partition,
    COUNT(*) as message_count,
    MIN(offset) as min_offset,
    MAX(offset) as max_offset,
    MIN(timestamp) as earliest_time,
    MAX(timestamp) as latest_time
  FROM kafka_messages
  GROUP BY topic, partition
  ORDER BY topic, partition
"""

val sqlResult = spark.sql(perPartitionQuery)

sqlResult.show()

399

```

400

401

### Complex Analytics

402

403

```scala

404

// Hourly message counts and average payload size per topic, from the kafka_messages temp view.
// LENGTH on the BINARY `value` column returns its size in bytes. Casting to STRING first would
// count decoded characters instead, and would mangle payloads that are not UTF-8.
spark.sql("""
  SELECT
    topic,
    DATE(timestamp) as date,
    HOUR(timestamp) as hour,
    COUNT(*) as hourly_count,
    AVG(LENGTH(value)) as avg_message_size
  FROM kafka_messages
  WHERE timestamp >= '2023-01-01'
  GROUP BY topic, DATE(timestamp), HOUR(timestamp)
  ORDER BY date, hour
""").show()

416

```

417

418

## Best Practices

419

420

### Offset Range Planning

421

422

1. **Use earliest/latest for full scans**:

423

```scala

424

.options(Map("startingOffsets" -> "earliest", "endingOffsets" -> "latest")) // whole retained log

426

```

427

428

2. **Use specific offsets for incremental processing**:

429

```scala

430

// startingOffsets is inclusive: resume one past the last offset already processed, or that record is read twice
.option("startingOffsets", s"""{"$topic":{"0":${lastProcessedOffset + 1}}}""")

431

```

432

433

3. **Inspect the offset range read from each partition** (this shows what the batch read, not consumer lag):

434

```scala

435

import org.apache.spark.sql.functions.{max, min}

// Lowest and highest offset actually read from each topic partition in this batch
val offsetInfo = batchDF
  .groupBy("topic", "partition")
  .agg(min("offset").as("min_offset"), max("offset").as("max_offset"))

438

```

439

440

### Performance Guidelines

441

442

1. **Set appropriate minPartitions for large datasets**

443

2. **Use partition assignment for known partition layouts**

444

3. **Configure Kafka consumer buffers based on message size**

445

4. **Cache DataFrames that several actions reuse, so Kafka is not re-read for each action**

446

5. **Use columnar formats for intermediate results**

447

448

### Resource Management

449

450

```scala

451

import org.apache.spark.sql.SparkSession

// Driver/executor memory and executor cores are read when the JVMs launch, so they must be set
// before the application starts. Calling spark.conf.set on them at runtime is rejected
// ("Cannot modify the value of a Spark config") or has no effect.
//   spark-submit --driver-memory 4g --executor-memory 8g --executor-cores 4 ...
// or, when no SparkContext exists yet, through the session builder:
val spark = SparkSession.builder()
  .appName("kafka-batch")
  .config("spark.executor.memory", "8g")
  .config("spark.executor.cores", "4")
  .getOrCreate()
// spark.driver.memory cannot be set here in client mode (the driver JVM is already running);
// use --driver-memory 4g on spark-submit instead.

// Shuffle partitions is a runtime SQL setting and can be changed on a live session
spark.conf.set("spark.sql.shuffle.partitions", "200")

458

```