# Streaming Sources

The Kafka connector provides comprehensive streaming capabilities with both micro-batch and continuous processing modes, supporting various trigger types and providing detailed metrics.

## Capabilities

### KafkaMicroBatchStream

V2 DataSource micro-batch streaming implementation for Kafka with comprehensive trigger support.

```scala { .api }
/**
 * V2 DataSource micro-batch streaming implementation for Kafka.
 * Provides reliable streaming with exactly-once semantics.
 */
class KafkaMicroBatchStream extends MicroBatchStream
  with SupportsTriggerAvailableNow
  with ReportsSourceMetrics {

  /** Returns the initial offset for starting the stream */
  def initialOffset(): Offset

  /** Returns the latest available offset */
  def latestOffset(): Offset

  /** Returns the latest offset with a read limit applied */
  def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset

  /** Plans input partitions for the given offset range */
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]

  /** Creates a reader factory for reading partitions */
  def createReaderFactory(): PartitionReaderFactory

  /** Deserializes an offset from checkpoint JSON */
  def deserializeOffset(json: String): Offset

  /** Commits the processed offset */
  def commit(end: Offset): Unit

  /** Stops the stream and releases resources */
  def stop(): Unit

  /** Returns stream metrics */
  def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]

  /** Prepares the stream for Trigger.AvailableNow mode */
  def prepareForTriggerAvailableNow(): Unit
}
```

**Usage Examples:**

```scala
// Basic micro-batch streaming
val microBatchStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val query = microBatchStream
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds")) // Micro-batch every 30 seconds
  .start()

// Trigger.AvailableNow for processing all available data
val availableNowQuery = microBatchStream
  .writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/path")
  .trigger(Trigger.AvailableNow()) // Process all available data, then stop
  .start()
```
78
79
### KafkaContinuousStream
80
81
V2 DataSource continuous streaming implementation for low-latency processing.
82
83
```scala { .api }
84
/**
85
* V2 DataSource continuous streaming implementation
86
* Provides low-latency processing with at-least-once semantics
87
*/
88
class KafkaContinuousStream extends ContinuousStream {
89
90
/** Merges partition offsets into a single offset */
91
def mergeOffsets(offsets: Array[PartitionOffset]): Offset
92
93
/** Returns initial offset for continuous processing */
94
def initialOffset(): Offset
95
96
/** Deserializes offset from checkpoint */
97
def deserializeOffset(json: String): Offset
98
99
/** Plans continuous reader tasks */
100
def planInputPartitions(start: Offset): Array[InputPartition]
101
102
/** Creates continuous reader factory */
103
def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
104
105
/** Commits processed offset */
106
def commit(end: Offset): Unit
107
108
/** Stops continuous processing */
109
def stop(): Unit
110
}
111
```

**Usage Examples:**

```scala
// Continuous streaming for low-latency processing
val continuousStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .load()

val continuousQuery = continuousStream
  .writeStream
  .outputMode("append")
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .trigger(Trigger.Continuous("1 second")) // Continuous mode with a 1-second checkpoint interval
  .start()
```

### KafkaSource (Legacy)

Legacy V1 DataSource streaming implementation, maintained for backward compatibility.

```scala { .api }
/**
 * Legacy streaming source for reading from Kafka (DataSource V1).
 * Maintained for backward compatibility.
 */
class KafkaSource extends Source with SupportsTriggerAvailableNow {

  /** Returns the schema of the source */
  def schema: StructType

  /** Gets the current offset */
  def getOffset: Option[Offset]

  /** Gets a batch DataFrame for the given offset range */
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  /** Stops the source */
  def stop(): Unit

  /** Returns the default read limit */
  def getDefaultReadLimit: ReadLimit

  /** Gets the latest offset with a read limit applied */
  def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset

  /** Reports the latest offset for monitoring */
  def reportLatestOffset(): streaming.Offset

  /** Prepares for Trigger.AvailableNow */
  def prepareForTriggerAvailableNow(): Unit
}
```
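
If a job needs to be pinned to this legacy V1 path, Spark exposes a configuration listing source providers for which the V2 micro-batch reader is disabled. The snippet below is a sketch under that assumption; verify the configuration key and provider class name against your Spark version.

```scala
// Assumption: disabling the V2 micro-batch reader for the Kafka provider
// makes Structured Streaming fall back to the legacy V1 KafkaSource.
// Verify this config and class name against your Spark version.
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")
```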

## Trigger Support

### Processing Time Triggers

Regular micro-batch processing at fixed intervals:

```scala
// Process every 30 seconds
.trigger(Trigger.ProcessingTime("30 seconds"))

// Process every 5 minutes
.trigger(Trigger.ProcessingTime("5 minutes"))

// Process as fast as possible
.trigger(Trigger.ProcessingTime("0 seconds"))
```

### Available Now Trigger

Process all currently available data, then stop:

```scala
// Process all available data once
val query = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/delta/table/path")
  .trigger(Trigger.AvailableNow())
  .start()

// Wait for completion
query.awaitTermination()
```

### Continuous Triggers

Low-latency continuous processing:

```scala
// Continuous processing with 1-second checkpoints
.trigger(Trigger.Continuous("1 second"))

// Continuous processing with 10-second checkpoints
.trigger(Trigger.Continuous("10 seconds"))
```

### Once Trigger

Process one micro-batch then stop:

```scala
// Process exactly one batch
.trigger(Trigger.Once())
```

Note that `Trigger.Once` is deprecated in recent Spark releases in favor of `Trigger.AvailableNow`, which also drains all available data before stopping but respects rate-limit options across multiple batches.

## Rate Limiting

Control the amount of data processed per trigger:

### Max Offsets Per Trigger

```scala
// Limit to 1000 offsets per trigger across all partitions
val rateLimitedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "1000")
  .load()
```
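
The cap is spread across topic partitions roughly in proportion to each partition's backlog. The helper below is a simplified, hypothetical illustration of that proportional split, not the connector's actual implementation:

```scala
// Sketch: spreading a per-trigger offset budget across partitions in
// proportion to each partition's backlog (hypothetical helper).
def distributeBudget(lags: Map[String, Long], budget: Long): Map[String, Long] = {
  val total = lags.values.sum
  if (total <= budget) lags // everything fits in this batch
  else lags.map { case (tp, lag) =>
    // each partition gets a share proportional to its lag
    tp -> (budget * lag.toDouble / total).toLong
  }
}

val lags = Map("events-0" -> 6000L, "events-1" -> 3000L, "events-2" -> 1000L)
val perPartition = distributeBudget(lags, budget = 1000L)
// events-0 -> 600, events-1 -> 300, events-2 -> 100
```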

### Min Offsets Per Trigger

```scala
// Require at least 100 offsets before a trigger fires
val minRateStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "low-volume-topic")
  .option("minOffsetsPerTrigger", "100")
  .load()
```

### Max Trigger Delay

```scala
// Maximum delay between triggers when minOffsetsPerTrigger is not met
val delayedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "variable-volume-topic")
  .option("minOffsetsPerTrigger", "500")
  .option("maxTriggerDelay", "60s") // Wait at most 60 seconds
  .load()
```
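
The interaction of the two options reduces to a simple rule: a batch runs once enough offsets have accumulated, or once the maximum delay has elapsed, whichever comes first. A minimal sketch of that decision (hypothetical helper, not connector internals):

```scala
// A batch fires when the backlog reaches minOffsets, OR when the time
// since the last batch exceeds maxTriggerDelay.
def shouldTrigger(availableOffsets: Long, minOffsets: Long,
                  millisSinceLastBatch: Long, maxDelayMillis: Long): Boolean =
  availableOffsets >= minOffsets || millisSinceLastBatch >= maxDelayMillis

val enoughData    = shouldTrigger(600, 500, 10000, 60000) // true: backlog met
val delayExceeded = shouldTrigger(100, 500, 70000, 60000) // true: max delay hit
val keepWaiting   = shouldTrigger(100, 500, 10000, 60000) // false: wait longer
```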

## Metrics and Monitoring

### Built-in Metrics

The streaming sources provide comprehensive metrics:

```scala { .api }
// Available metrics from ReportsSourceMetrics
def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
  // Returns metrics including:
  // - offsetOutOfRange: number of out-of-range offsets
  // - dataLoss: number of data loss events
  // - numRecords: number of records processed
  // - estimatedOffsetsToProcess: estimated remaining offsets
}
```

**Usage Examples:**

```scala
// Access metrics through the streaming query
val query = kafkaStream
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

// Get metrics from the most recent progress report
val progress = query.lastProgress
val inputMetrics = progress.sources(0) // First source's metrics
println(s"Records processed: ${inputMetrics.numInputRows}")
println(s"Processing rate: ${inputMetrics.inputRowsPerSecond}")
```

### Custom Metrics

The connector exposes custom metrics for monitoring:

```scala { .api }
// OffsetOutOfRangeMetric
class OffsetOutOfRangeMetric extends CustomSumMetric {
  def name(): String = "offsetOutOfRange"
  def description(): String = "estimated number of fetched offsets out of range"
}

// DataLossMetric
class DataLossMetric extends CustomSumMetric {
  def name(): String = "dataLoss"
  def description(): String = "number of data loss error"
}
```

**Accessing Custom Metrics:**

```scala
// Custom metrics are included in the source metrics map (a java.util.Map)
val customMetrics = query.lastProgress.sources(0).metrics
val offsetOutOfRange = customMetrics.getOrDefault("offsetOutOfRange", "0")
val dataLoss = customMetrics.getOrDefault("dataLoss", "0")
```
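
Because the metrics map is string-valued, a monitoring hook typically parses the counters before comparing them. A minimal sketch, using a sample map shaped like the keys above (the helper name is hypothetical):

```scala
// Parse the string-valued "dataLoss" counter and flag anything above zero.
// Missing or non-numeric values are treated as "no data loss observed".
def dataLossDetected(metrics: Map[String, String]): Boolean =
  metrics.get("dataLoss").flatMap(_.toLongOption).exists(_ > 0)

val sample = Map(
  "offsetOutOfRange"          -> "0",
  "dataLoss"                  -> "2",
  "estimatedOffsetsToProcess" -> "1500")

val alert   = dataLossDetected(sample)              // true: 2 events recorded
val noAlert = dataLossDetected(sample - "dataLoss") // false: key absent
```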

## Fault Tolerance

### Checkpointing

Streaming queries automatically maintain checkpoints for fault tolerance:

```scala
val faultTolerantQuery = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/output/path")
  .option("checkpointLocation", "/checkpoint/path") // Essential for production
  .start()
```

### Data Loss Handling

Configure how to handle data loss scenarios:

```scala
// Fail the query on data loss (default, recommended for critical data)
val strictStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true") // Default
  .load()

// Continue processing despite data loss (for non-critical data)
val lenientStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false") // Continue on data loss
  .load()
```

## Partition Management

### Min Partitions

Control the minimum number of Spark partitions:

```scala
// Ensure at least 10 Spark partitions for parallelism
val partitionedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("minPartitions", "10")
  .load()
```
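
When `minPartitions` exceeds the topic's partition count, the source splits individual Kafka offset ranges into smaller Spark input partitions. The helper below sketches that splitting for one partition's range (hypothetical helper, for intuition only):

```scala
// Split a [from, until) offset range into n roughly equal sub-ranges,
// covering the whole range with no gaps or overlaps.
def splitRange(from: Long, until: Long, n: Int): Seq[(Long, Long)] = {
  val size = until - from
  (0 until n).map { i =>
    (from + size * i / n, from + size * (i + 1) / n)
  }
}

val subRanges = splitRange(0L, 100L, 4)
// (0,25), (25,50), (50,75), (75,100)
```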

### Consumer Pool Configuration

Configure consumer pool behavior:

```scala
// Configure consumer poll and fetch-retry settings
val configuredStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "5000") // Poll timeout
  .option("fetchOffset.numRetries", "5") // Retry attempts
  .option("fetchOffset.retryIntervalMs", "1000") // Retry interval
  .load()
```

## Performance Optimization

### Batch Size Optimization

```scala
// Balance latency vs. throughput
val optimizedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "50000") // Larger batches for higher throughput
  .load()
  .repartition(20) // Increase parallelism for downstream processing
```
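
As a back-of-envelope check, `maxOffsetsPerTrigger` together with a fixed processing-time trigger bounds the sustained intake rate at records-per-batch divided by the trigger interval. A tiny sketch of that arithmetic (helper name is illustrative):

```scala
// Upper bound on sustained intake: one capped batch per trigger interval.
def maxRecordsPerSecond(maxOffsetsPerTrigger: Long, triggerIntervalSec: Long): Double =
  maxOffsetsPerTrigger.toDouble / triggerIntervalSec

val ceiling = maxRecordsPerSecond(50000L, 30L)
// With 50k offsets per batch every 30s, intake tops out near 1667 records/sec;
// if the topic produces faster than this, the stream falls behind.
```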

### Consumer Configuration

```scala
// Optimize Kafka consumer settings
val tunedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafka.fetch.min.bytes", "1048576") // 1 MB minimum fetch
  .option("kafka.fetch.max.wait.ms", "500") // 500 ms max wait
  .option("kafka.max.poll.records", "10000") // More records per poll
  .load()
```

## Error Handling

### Query Restart Behavior

```scala
// Restart from the checkpoint after a failure
val resilientQuery = kafkaStream
  .writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/checkpoint/path")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Monitor for exceptions
resilientQuery.exception.foreach { ex =>
  println(s"Query failed with exception: ${ex.getMessage}")
  // Implement custom restart logic
}
```

### Graceful Shutdown

```scala
// Graceful shutdown handling for a running StreamingQuery
sys.addShutdownHook {
  println("Shutting down streaming query...")
  query.stop()
  query.awaitTermination(30000) // Wait up to 30 seconds
}
```