or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-reading.md, consumer-strategies.md, data-source.md, index.md, offset-management.md, schema-conversion.md, streaming-sources.md, writing.md

docs/streaming-sources.md

0

# Streaming Sources

1

2

The Kafka connector provides comprehensive streaming capabilities with both micro-batch and continuous processing modes, supporting various trigger types and providing detailed metrics.

3

4

## Capabilities

5

6

### KafkaMicroBatchStream

7

8

V2 DataSource micro-batch streaming implementation for Kafka with comprehensive trigger support.

9

10

```scala { .api }

11

/**

12

* V2 DataSource micro-batch streaming implementation for Kafka

13

* Provides reliable streaming with exactly-once semantics

14

*/

15

class KafkaMicroBatchStream extends MicroBatchStream

16

with SupportsTriggerAvailableNow

17

with ReportsSourceMetrics {

18

19

/** Returns the initial offset for starting the stream */

20

def initialOffset(): Offset

21

22

/** Returns the latest available offset */

23

def latestOffset(): Offset

24

25

/** Returns latest offset with read limit applied */

26

def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset

27

28

/** Plans input partitions for the given offset range */

29

def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]

30

31

/** Creates reader factory for reading partitions */

32

def createReaderFactory(): PartitionReaderFactory

33

34

/** Deserializes offset from checkpoint */

35

def deserializeOffset(json: String): Offset

36

37

/** Commits processed offset */

38

def commit(end: Offset): Unit

39

40

/** Stops the stream and releases resources */

41

def stop(): Unit

42

43

/** Returns stream metrics */

44

def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]

45

46

/** Prepares stream for Trigger.AvailableNow mode */

47

def prepareForTriggerAvailableNow(): Unit

48

}

49

```

50

51

**Usage Examples:**

52

53

```scala

54

// Basic micro-batch streaming

55

val microBatchStream = spark

56

.readStream

57

.format("kafka")

58

.option("kafka.bootstrap.servers", "localhost:9092")

59

.option("subscribe", "events")

60

.load()

61

62

val query = microBatchStream

63

.writeStream

64

.outputMode("append")

65

.format("console")

66

.trigger(Trigger.ProcessingTime("30 seconds")) // Micro-batch every 30 seconds

67

.start()

68

69

// Trigger.AvailableNow for processing all available data

70

val availableNowQuery = microBatchStream

71

.writeStream

72

.outputMode("append")

73

.format("parquet")

74

.option("path", "/output/path")

75

.trigger(Trigger.AvailableNow()) // Process all available data then stop

76

.start()

77

```

78

79

### KafkaContinuousStream

80

81

V2 DataSource continuous streaming implementation for low-latency processing.

82

83

```scala { .api }

84

/**

85

* V2 DataSource continuous streaming implementation

86

* Provides low-latency processing with at-least-once semantics

87

*/

88

class KafkaContinuousStream extends ContinuousStream {

89

90

/** Merges partition offsets into a single offset */

91

def mergeOffsets(offsets: Array[PartitionOffset]): Offset

92

93

/** Returns initial offset for continuous processing */

94

def initialOffset(): Offset

95

96

/** Deserializes offset from checkpoint */

97

def deserializeOffset(json: String): Offset

98

99

/** Plans continuous reader tasks */

100

def planInputPartitions(start: Offset): Array[InputPartition]

101

102

/** Creates continuous reader factory */

103

def createContinuousReaderFactory(): ContinuousPartitionReaderFactory

104

105

/** Commits processed offset */

106

def commit(end: Offset): Unit

107

108

/** Stops continuous processing */

109

def stop(): Unit

110

}

111

```

112

113

**Usage Examples:**

114

115

```scala

116

// Continuous streaming for low-latency processing

117

val continuousStream = spark

118

.readStream

119

.format("kafka")

120

.option("kafka.bootstrap.servers", "localhost:9092")

121

.option("subscribe", "real-time-events")

122

.load()

123

124

val continuousQuery = continuousStream

125

.writeStream

126

.outputMode("append")

127

.format("kafka")

128

.option("kafka.bootstrap.servers", "localhost:9092")

129

.option("topic", "processed-events")

130

.trigger(Trigger.Continuous("1 second")) // Continuous with 1-second checkpoint

131

.start()

132

```

133

134

### KafkaSource (Legacy)

135

136

Legacy V1 DataSource streaming implementation, maintained for backward compatibility.

137

138

```scala { .api }

139

/**

140

* Legacy streaming source for reading from Kafka (DataSource V1)

141

* Maintained for backward compatibility

142

*/

143

class KafkaSource extends Source with SupportsTriggerAvailableNow {

144

145

/** Returns the schema of the source */

146

def schema: StructType

147

148

/** Gets the current offset */

149

def getOffset: Option[Offset]

150

151

/** Gets batch DataFrame for the given offset range */

152

def getBatch(start: Option[Offset], end: Offset): DataFrame

153

154

/** Stops the source */

155

def stop(): Unit

156

157

/** Returns default read limit */

158

def getDefaultReadLimit: ReadLimit

159

160

/** Gets latest offset with read limit */

161

def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset

162

163

/** Reports latest offset for monitoring */

164

def reportLatestOffset(): streaming.Offset

165

166

/** Prepares for Trigger.AvailableNow */

167

def prepareForTriggerAvailableNow(): Unit

168

}

169

```

170

171

## Trigger Support

172

173

### Processing Time Triggers

174

175

Regular micro-batch processing at fixed intervals:

176

177

```scala

178

// Process every 30 seconds

179

.trigger(Trigger.ProcessingTime("30 seconds"))

180

181

// Process every 5 minutes

182

.trigger(Trigger.ProcessingTime("5 minutes"))

183

184

// Process as fast as possible

185

.trigger(Trigger.ProcessingTime("0 seconds"))

186

```

187

188

### Available Now Trigger

189

190

Process all currently available data then stop:

191

192

```scala

193

// Process all available data once

194

val query = kafkaStream

195

.writeStream

196

.outputMode("append")

197

.format("delta")

198

.option("path", "/delta/table/path")

199

.trigger(Trigger.AvailableNow())

200

.start()

201

202

// Wait for completion

203

query.awaitTermination()

204

```

205

206

### Continuous Triggers

207

208

Low-latency continuous processing:

209

210

```scala

211

// Continuous processing with 1-second checkpoints

212

.trigger(Trigger.Continuous("1 second"))

213

214

// Continuous processing with 10-second checkpoints

215

.trigger(Trigger.Continuous("10 seconds"))

216

```

217

218

### Once Trigger

219

220

Process one micro-batch then stop (note: `Trigger.Once()` is deprecated in recent Spark releases in favor of `Trigger.AvailableNow()`, which respects rate limits while still processing all available data):

221

222

```scala

223

// Process exactly one batch

224

.trigger(Trigger.Once())

225

```

226

227

## Rate Limiting

228

229

Control the amount of data processed per trigger:

230

231

### Max Offsets Per Trigger

232

233

```scala

234

// Limit to 1000 offsets per trigger across all partitions

235

val rateLimitedStream = spark

236

.readStream

237

.format("kafka")

238

.option("kafka.bootstrap.servers", "localhost:9092")

239

.option("subscribe", "high-volume-topic")

240

.option("maxOffsetsPerTrigger", "1000")

241

.load()

242

```

243

244

### Min Offsets Per Trigger

245

246

```scala

247

// Ensure at least 100 offsets are processed per trigger

248

val minRateStream = spark

249

.readStream

250

.format("kafka")

251

.option("kafka.bootstrap.servers", "localhost:9092")

252

.option("subscribe", "low-volume-topic")

253

.option("minOffsetsPerTrigger", "100")

254

.load()

255

```

256

257

### Max Trigger Delay

258

259

```scala

260

// Maximum delay between triggers when minOffsetsPerTrigger is not met

261

val delayedStream = spark

262

.readStream

263

.format("kafka")

264

.option("kafka.bootstrap.servers", "localhost:9092")

265

.option("subscribe", "variable-volume-topic")

266

.option("minOffsetsPerTrigger", "500")

267

.option("maxTriggerDelay", "60s") // Wait max 60 seconds

268

.load()

269

```

270

271

## Metrics and Monitoring

272

273

### Built-in Metrics

274

275

The streaming sources provide comprehensive metrics:

276

277

```scala { .api }

278

// Available metrics from ReportsSourceMetrics

279

def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {

280

// Returns metrics including:

281

// - offsetOutOfRange: Number of out-of-range offsets

282

// - dataLoss: Number of data loss events

283

// - numRecords: Number of records processed

284

// - estimatedOffsetsToProcess: Estimated remaining offsets

285

}

286

```

287

288

**Usage Examples:**

289

290

```scala

291

// Access metrics through streaming query

292

val query = kafkaStream

293

.writeStream

294

.outputMode("append")

295

.format("console")

296

.start()

297

298

// Get metrics

299

val progress = query.lastProgress

300

val inputMetrics = progress.sources(0) // First source metrics

301

println(s"Records processed: ${inputMetrics.numInputRows}")

302

println(s"Processing rate: ${inputMetrics.inputRowsPerSecond}")

303

```

304

305

### Custom Metrics

306

307

The connector exposes custom metrics for monitoring:

308

309

```scala { .api }

310

// OffsetOutOfRangeMetric

311

class OffsetOutOfRangeMetric extends CustomSumMetric {

312

def name(): String = "offsetOutOfRange"

313

def description(): String = "estimated number of fetched offsets out of range"

314

}

315

316

// DataLossMetric

317

class DataLossMetric extends CustomSumMetric {

318

def name(): String = "dataLoss"

319

def description(): String = "number of data loss error"

320

}

321

```

322

323

**Accessing Custom Metrics:**

324

325

```scala

326

// Custom metrics are included in source metrics

327

val customMetrics = query.lastProgress.sources(0).metrics

328

val offsetOutOfRange = customMetrics.getOrElse("offsetOutOfRange", "0")

329

val dataLoss = customMetrics.getOrElse("dataLoss", "0")

330

```

331

332

## Fault Tolerance

333

334

### Checkpointing

335

336

Streaming queries automatically maintain checkpoints for fault tolerance:

337

338

```scala

339

val faultTolerantQuery = kafkaStream

340

.writeStream

341

.outputMode("append")

342

.format("delta")

343

.option("path", "/output/path")

344

.option("checkpointLocation", "/checkpoint/path") // Essential for production

345

.start()

346

```

347

348

### Data Loss Handling

349

350

Configure how to handle data loss scenarios:

351

352

```scala

353

// Fail query on data loss (default, recommended for critical data)

354

val strictStream = spark

355

.readStream

356

.format("kafka")

357

.option("kafka.bootstrap.servers", "localhost:9092")

358

.option("subscribe", "critical-events")

359

.option("failOnDataLoss", "true") // Default

360

.load()

361

362

// Continue processing despite data loss (for non-critical data)

363

val lenientStream = spark

364

.readStream

365

.format("kafka")

366

.option("kafka.bootstrap.servers", "localhost:9092")

367

.option("subscribe", "logs")

368

.option("failOnDataLoss", "false") // Continue on data loss

369

.load()

370

```

371

372

## Partition Management

373

374

### Min Partitions

375

376

Control the minimum number of Spark partitions:

377

378

```scala

379

// Ensure at least 10 Spark partitions for parallelism

380

val partitionedStream = spark

381

.readStream

382

.format("kafka")

383

.option("kafka.bootstrap.servers", "localhost:9092")

384

.option("subscribe", "large-topic")

385

.option("minPartitions", "10")

386

.load()

387

```

388

389

### Consumer Pool Configuration

390

391

Configure consumer pool behavior:

392

393

```scala

394

// Configure consumer cache settings

395

val configuredStream = spark

396

.readStream

397

.format("kafka")

398

.option("kafka.bootstrap.servers", "localhost:9092")

399

.option("subscribe", "events")

400

.option("kafkaConsumer.pollTimeoutMs", "5000") // Poll timeout

401

.option("fetchOffset.numRetries", "5") // Retry attempts

402

.option("fetchOffset.retryIntervalMs", "1000") // Retry interval

403

.load()

404

```

405

406

## Performance Optimization

407

408

### Batch Size Optimization

409

410

```scala

411

// Balance latency vs throughput

412

val optimizedStream = spark

413

.readStream

414

.format("kafka")

415

.option("kafka.bootstrap.servers", "localhost:9092")

416

.option("subscribe", "events")

417

.option("maxOffsetsPerTrigger", "50000") // Larger batches for higher throughput

418

.load()

419

.repartition(20) // Increase parallelism for processing

420

```

421

422

### Consumer Configuration

423

424

```scala

425

// Optimize Kafka consumer settings

426

val tunedStream = spark

427

.readStream

428

.format("kafka")

429

.option("kafka.bootstrap.servers", "localhost:9092")

430

.option("subscribe", "events")

431

.option("kafka.fetch.min.bytes", "1048576") // 1MB min fetch

432

.option("kafka.fetch.max.wait.ms", "500") // 500ms max wait

433

.option("kafka.max.poll.records", "10000") // More records per poll

434

.load()

435

```

436

437

## Error Handling

438

439

### Query Restart Behavior

440

441

```scala

442

// Automatic restart on failure

443

val resilientQuery = kafkaStream

444

.writeStream

445

.outputMode("append")

446

.format("console")

447

.option("checkpointLocation", "/checkpoint/path")

448

.trigger(Trigger.ProcessingTime("30 seconds"))

449

.start()

450

451

// Monitor for exceptions

452

resilientQuery.exception.foreach { ex =>

453

println(s"Query failed with exception: ${ex.getMessage}")

454

// Implement custom restart logic

455

}

456

```

457

458

### Graceful Shutdown

459

460

```scala

461

// Graceful shutdown handling

462

sys.addShutdownHook {

463

println("Shutting down streaming query...")

464

query.stop()

465

query.awaitTermination(30000) // Wait up to 30 seconds

466

}

467

```