or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md configuration.md consumer-strategies.md data-writing.md index.md offset-management.md streaming-sources.md

streaming-sources.mddocs/

0

# Streaming Sources

1

2

Streaming readers for consuming data from Kafka in both micro-batch and continuous processing modes. They provide checkpoint-based fault tolerance, exactly-once semantics in micro-batch mode (at-least-once in continuous mode), and efficient partition-based data consumption.

3

4

## Capabilities

5

6

### Micro-batch Reader

7

8

Micro-batch reader for structured streaming that processes data in discrete batches with configurable trigger intervals.

9

10

```scala { .api }

11

/**

12

* Micro-batch reader for Kafka data in structured streaming

13

*/

14

class KafkaMicroBatchReader extends MicroBatchReader with Logging {

15

/**

16

* Sets offset range for the current micro-batch

17

* @param start Starting offset (None for initial batch)

18

* @param end Ending offset for this batch

19

*/

20

def setOffsetRange(start: Option[Offset], end: Offset): Unit

21

22

/**

23

* Plans input partitions for the current batch

24

* @return List of InputPartition objects for parallel processing

25

*/

26

def planInputPartitions(): ju.List[InputPartition[InternalRow]]

27

28

/**

29

* Gets the start offset for the current batch

30

* @return Starting offset or null if none

31

*/

32

def getStartOffset: Offset

33

34

/**

35

* Gets the end offset for the current batch

36

* @return Ending offset for this batch

37

*/

38

def getEndOffset: Offset

39

40

/**

41

* Deserializes offset from JSON string

42

* @param json JSON representation of offset

43

* @return Deserialized Offset object

44

*/

45

def deserializeOffset(json: String): Offset

46

47

/**

48

* Returns schema for Kafka records

49

* @return StructType defining record schema

50

*/

51

def readSchema(): StructType

52

53

/**

54

* Commits the processed offset

55

* @param end Offset to commit

56

*/

57

def commit(end: Offset): Unit

58

59

/**

60

* Stops the reader and releases resources

61

*/

62

def stop(): Unit

63

}

64

```

65

66

### Micro-batch Input Partition

67

68

Input partition for micro-batch processing that handles a specific offset range.

69

70

```scala { .api }

71

/**

72

* Input partition for micro-batch processing

73

* @param offsetRange Range of offsets to process

74

* @param executorKafkaParams Kafka parameters for executors

75

* @param pollTimeoutMs Timeout for Kafka consumer polls

76

* @param failOnDataLoss Whether to fail on data loss

77

* @param reuseKafkaConsumer Whether to reuse consumer instances

78

*/

79

case class KafkaMicroBatchInputPartition(

80

offsetRange: KafkaOffsetRange,

81

executorKafkaParams: ju.Map[String, Object],

82

pollTimeoutMs: Long,

83

failOnDataLoss: Boolean,

84

reuseKafkaConsumer: Boolean

85

) extends InputPartition[InternalRow] {

86

/**

87

* Gets preferred executor locations for data locality

88

* @return Array of preferred executor hostnames

89

*/

90

def preferredLocations(): Array[String]

91

92

/**

93

* Creates partition reader for this input partition

94

* @return InputPartitionReader for processing records

95

*/

96

def createPartitionReader(): InputPartitionReader[InternalRow]

97

}

98

```

99

100

### Micro-batch Partition Reader

101

102

Partition reader for micro-batch processing that reads records from a specific offset range.

103

104

```scala { .api }

105

/**

106

* Partition reader for micro-batch processing

107

* @param offsetRange Range of offsets to read

108

* @param executorKafkaParams Kafka parameters for this executor

109

* @param pollTimeoutMs Consumer poll timeout

110

* @param failOnDataLoss Whether to fail on data loss

111

* @param reuseKafkaConsumer Whether to reuse consumer

112

*/

113

case class KafkaMicroBatchInputPartitionReader(

114

offsetRange: KafkaOffsetRange,

115

executorKafkaParams: ju.Map[String, Object],

116

pollTimeoutMs: Long,

117

failOnDataLoss: Boolean,

118

reuseKafkaConsumer: Boolean

119

) extends InputPartitionReader[InternalRow] with Logging {

120

/**

121

* Advances to the next record

122

* @return true if next record is available

123

*/

124

def next(): Boolean

125

126

/**

127

* Gets the current record as UnsafeRow

128

* @return Current record as UnsafeRow

129

*/

130

def get(): UnsafeRow

131

132

/**

133

* Closes the reader and releases resources

134

*/

135

def close(): Unit

136

}

137

```

138

139

### Continuous Reader

140

141

Continuous reader for low-latency streaming with sub-second processing capabilities.

142

143

```scala { .api }

144

/**

145

* Continuous reader for Kafka data in structured streaming

146

*/

147

class KafkaContinuousReader extends ContinuousReader with Logging {

148

/**

149

* Returns schema for Kafka records

150

* @return StructType defining record schema

151

*/

152

def readSchema: StructType

153

154

/**

155

* Sets starting offset for continuous processing

156

* @param start Starting offset (None to resolve it from the configured `startingOffsets` option)

157

*/

158

def setStartOffset(start: Option[Offset]): Unit

159

160

/**

161

* Gets the starting offset

162

* @return Starting offset or null if none

163

*/

164

def getStartOffset(): Offset

165

166

/**

167

* Deserializes offset from JSON string

168

* @param json JSON representation of offset

169

* @return Deserialized Offset object

170

*/

171

def deserializeOffset(json: String): Offset

172

173

/**

174

* Plans input partitions for continuous processing

175

* @return List of InputPartition objects

176

*/

177

def planInputPartitions(): ju.List[InputPartition[InternalRow]]

178

179

/**

180

* Stops the reader and releases resources

181

*/

182

def stop(): Unit

183

184

/**

185

* Commits processed offsets

186

* @param end Offset to commit

187

*/

188

def commit(end: Offset): Unit

189

190

/**

191

* Merges partition offsets into a single offset

192

* @param offsets Array of partition offsets

193

* @return Merged offset

194

*/

195

def mergeOffsets(offsets: Array[PartitionOffset]): Offset

196

197

/**

198

* Checks if reader reconfiguration is needed

199

* @return true if reconfiguration is required

200

*/

201

def needsReconfiguration(): Boolean

202

}

203

```

204

205

### Continuous Input Partition

206

207

Input partition for continuous processing that handles a specific topic partition.

208

209

```scala { .api }

210

/**

211

* Input partition for continuous processing

212

* @param topicPartition Kafka topic partition

213

* @param startOffset Starting offset for processing

214

* @param kafkaParams Kafka consumer parameters

215

* @param pollTimeoutMs Consumer poll timeout

216

* @param failOnDataLoss Whether to fail on data loss

217

*/

218

case class KafkaContinuousInputPartition(

219

topicPartition: TopicPartition,

220

startOffset: Long,

221

kafkaParams: ju.Map[String, Object],

222

pollTimeoutMs: Long,

223

failOnDataLoss: Boolean

224

) extends ContinuousInputPartition[InternalRow] {

225

/**

226

* Creates continuous reader for this partition

227

* @param offset Starting partition offset

228

* @return InputPartitionReader for continuous processing

229

*/

230

def createContinuousReader(offset: PartitionOffset): InputPartitionReader[InternalRow]

231

232

/**

233

* Creates partition reader for this partition

234

* @return KafkaContinuousInputPartitionReader instance

235

*/

236

def createPartitionReader(): KafkaContinuousInputPartitionReader

237

}

238

```

239

240

### Continuous Partition Reader

241

242

Partition reader for continuous processing that provides low-latency record consumption.

243

244

```scala { .api }

245

/**

246

* Partition reader for continuous processing

247

* @param topicPartition Kafka topic partition to read from

248

* @param startOffset Starting offset for reading

249

* @param kafkaParams Kafka consumer parameters

250

* @param pollTimeoutMs Consumer poll timeout

251

* @param failOnDataLoss Whether to fail on data loss

252

*/

253

class KafkaContinuousInputPartitionReader(

254

topicPartition: TopicPartition,

255

startOffset: Long,

256

kafkaParams: ju.Map[String, Object],

257

pollTimeoutMs: Long,

258

failOnDataLoss: Boolean

259

) extends ContinuousInputPartitionReader[InternalRow] {

260

/**

261

* Advances to the next record

262

* @return true if next record is available

263

*/

264

def next(): Boolean

265

266

/**

267

* Gets the current record as UnsafeRow

268

* @return Current record as UnsafeRow

269

*/

270

def get(): UnsafeRow

271

272

/**

273

* Gets the current partition offset

274

* @return Current offset for this partition

275

*/

276

def getOffset(): KafkaSourcePartitionOffset

277

278

/**

279

* Closes the reader and releases resources

280

*/

281

def close(): Unit

282

}

283

```

284

285

### Legacy Streaming Source

286

287

Legacy streaming source for backward compatibility with DataSource V1 API.

288

289

```scala { .api }

290

/**

291

* Legacy streaming source for reading from Kafka (DataSource V1)

292

*/

293

class KafkaSource extends Source with Logging {

294

/**

295

* Returns the schema of Kafka records

296

* @return StructType defining record schema

297

*/

298

def schema: StructType

299

300

/**

301

* Gets the maximum available offset

302

* @return Optional offset representing latest available data

303

*/

304

def getOffset: Option[Offset]

305

306

/**

307

* Gets batch data between specified offsets

308

* @param start Starting offset (None for initial batch)

309

* @param end Ending offset for this batch

310

* @return DataFrame containing batch data

311

*/

312

def getBatch(start: Option[Offset], end: Offset): DataFrame

313

314

/**

315

* Stops the source and releases resources

316

*/

317

def stop(): Unit

318

}

319

320

/**

321

* Companion object for KafkaSource

322

*/

323

object KafkaSource {

324

/**

325

* Gets sorted list of executor IDs for locality optimization

326

* @param sc SparkContext for accessing executor information

327

* @return Array of executor IDs sorted for consistent assignment

328

*/

329

def getSortedExecutorList(sc: SparkContext): Array[String]

330

}

331

```

332

333

## Usage Examples

334

335

### Micro-batch Streaming

336

337

```scala

338

import org.apache.spark.sql.streaming.Trigger

339

import java.util.concurrent.TimeUnit

340

341

val microBatchDF = spark

342

.readStream

343

.format("kafka")

344

.option("kafka.bootstrap.servers", "localhost:9092")

345

.option("subscribe", "events")

346

.option("startingOffsets", "latest")

347

.option("maxOffsetsPerTrigger", "10000")

348

.load()

349

350

val query = microBatchDF

351

.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

352

.writeStream

353

.outputMode("append")

354

.format("console")

355

.trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))

356

.option("checkpointLocation", "/tmp/checkpoint")

357

.start()

358

359

query.awaitTermination()

360

```

361

362

### Continuous Streaming

363

364

```scala

365

val continuousDF = spark

366

.readStream

367

.format("kafka")

368

.option("kafka.bootstrap.servers", "localhost:9092")

369

.option("subscribe", "low-latency-events")

370

.option("startingOffsets", "latest")

371

.load()

372

373

val continuousQuery = continuousDF

374

.selectExpr("CAST(value AS STRING) as json")

375

.writeStream

376

.outputMode("append")

377

.format("console")

378

.trigger(Trigger.Continuous("1 second"))

379

.option("checkpointLocation", "/tmp/continuous-checkpoint")

380

.start()

381

382

continuousQuery.awaitTermination()

383

```

384

385

### Legacy Source (DataSource V1)

386

387

```scala

388

val legacyDF = spark

389

.readStream

390

.format("kafka")

391

.option("kafka.bootstrap.servers", "localhost:9092")

392

.options(Map(

393

"subscribe" -> "legacy-topic",

394

"startingOffsets" -> "earliest"

395

))

396

.load()

397

```

398

399

## Performance Configuration

400

401

### Consumer Tuning

402

403

```scala

404

.option("kafka.fetch.min.bytes", "1024") // Minimum fetch size

405

.option("kafka.fetch.max.wait.ms", "500") // Max wait for min bytes

406

.option("kafka.max.poll.records", "500") // Records per poll

407

.option("kafka.session.timeout.ms", "30000") // Session timeout

408

.option("kafka.heartbeat.interval.ms", "3000") // Heartbeat interval

409

```

410

411

### Partition Management

412

413

```scala

414

.option("minPartitions", "10") // Minimum Spark partitions

415

.option("maxOffsetsPerTrigger", "1000000") // Rate limiting

416

```

417

418

### Memory and Buffering

419

420

```scala

421

.option("kafka.receive.buffer.bytes", "65536") // Receive buffer

422

.option("kafka.send.buffer.bytes", "131072") // Send buffer

423

.option("kafka.fetch.max.bytes", "52428800") // Max fetch size

424

```

425

426

## Error Handling and Recovery

427

428

### Data Loss Scenarios

429

430

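The `failOnDataLoss` option controls whether a query fails when data may have been lost (for example, when a topic is deleted or requested offsets are out of range because they have aged out of Kafka):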

431

432

```scala

433

// Strict error handling

434

.option("failOnDataLoss", "true")

435

436

// Lenient error handling with warnings

437

.option("failOnDataLoss", "false")

438

```

439

440

### Timeout Configuration

441

442

```scala

443

// Consumer poll timeout

444

.option("kafkaConsumer.pollTimeoutMs", "10000")

445

446

// Request timeout (maximum time the client waits for a broker response)

447

.option("kafka.request.timeout.ms", "30000")

448

```

449

450

### Fault Tolerance

451

452

```scala

453

// Checkpoint location for exactly-once processing

454

.option("checkpointLocation", "/path/to/checkpoint")

455

456

// Automatic retry configuration

457

.option("kafka.retry.backoff.ms", "100")

458

.option("kafka.reconnect.backoff.ms", "50")

459

```

460

461

## Monitoring and Metrics

462

463

### Progress Reporting

464

465

```scala

466

val query = df.writeStream

467

.format("console")

468

.start()

469

470

// Monitor progress

471

val progress = query.lastProgress

472

println(s"Input rows per second: ${progress.inputRowsPerSecond}")

473

println(s"Processing time: ${progress.durationMs}")

474

```

475

476

### Offset Tracking

477

478

```scala

479

// Access current offsets

480

val currentOffsets = query.lastProgress.sources(0).endOffset

481

println(s"Current offsets: $currentOffsets")

482

```

483

484

## Processing Modes Comparison

485

486

| Feature | Micro-batch | Continuous | Legacy |

487

|---------|-------------|------------|--------|

488

| Latency | ~100ms+ | ~1ms | ~100ms+ |

489

| Throughput | High | Medium | High |

490

| Fault Tolerance | Full | Full | Full |

491

| Exactly-once | Yes | No (at-least-once) | Yes |

492

| State Management | Full | Limited | Full |

493

| Aggregations | All | None (map-like operations only) | All |

494

| API Version | DataSource V2 | DataSource V2 | DataSource V1 |