# Streaming Sources

Streaming readers for Kafka that support both micro-batch and continuous processing modes, with fault tolerance, exactly-once semantics in micro-batch mode, and efficient partition-based data consumption from Kafka.

## Capabilities

### Micro-batch Reader

Micro-batch reader for structured streaming that processes data in discrete batches with configurable trigger intervals.

```scala { .api }
/**
 * Micro-batch reader for Kafka data in structured streaming
 */
class KafkaMicroBatchReader extends MicroBatchReader with Logging {

  /**
   * Sets offset range for the current micro-batch
   * @param start Starting offset (None for initial batch)
   * @param end Ending offset for this batch
   */
  def setOffsetRange(start: Option[Offset], end: Offset): Unit

  /**
   * Plans input partitions for the current batch
   * @return List of InputPartition objects for parallel processing
   */
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]

  /**
   * Gets the start offset for the current batch
   * @return Starting offset or null if none
   */
  def getStartOffset: Offset

  /**
   * Gets the end offset for the current batch
   * @return Ending offset for this batch
   */
  def getEndOffset: Offset

  /**
   * Deserializes offset from JSON string
   * @param json JSON representation of offset
   * @return Deserialized Offset object
   */
  def deserializeOffset(json: String): Offset

  /**
   * Returns schema for Kafka records
   * @return StructType defining record schema
   */
  def readSchema(): StructType

  /**
   * Commits the processed offset
   * @param end Offset to commit
   */
  def commit(end: Offset): Unit

  /**
   * Stops the reader and releases resources
   */
  def stop(): Unit
}
```
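
This class is internal to the Kafka connector and is driven by the micro-batch execution engine rather than by user code, but the call sequence is easy to picture. The sketch below is conceptual only and uses just the signatures above; `runOneBatch` is a hypothetical driver, not a Spark API, and in a real run the planned partitions are processed in parallel on executors rather than in a local loop.

```scala
import scala.collection.JavaConverters._

// Conceptual sketch: one micro-batch, driven with the methods shown above.
def runOneBatch(reader: KafkaMicroBatchReader, start: Option[Offset], end: Offset): Unit = {
  reader.setOffsetRange(start, end)                     // fix the batch boundaries
  val partitions = reader.planInputPartitions().asScala // one per Kafka offset range

  partitions.foreach { partition =>
    val rows = partition.createPartitionReader()
    try {
      while (rows.next()) {
        val row = rows.get()                            // InternalRow handed to the query plan
      }
    } finally {
      rows.close()
    }
  }

  reader.commit(end)                                    // mark the batch as processed
}
```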

### Micro-batch Input Partition

Input partition for micro-batch processing that handles a specific offset range.

```scala { .api }
/**
 * Input partition for micro-batch processing
 * @param offsetRange Range of offsets to process
 * @param executorKafkaParams Kafka parameters for executors
 * @param pollTimeoutMs Timeout for Kafka consumer polls
 * @param failOnDataLoss Whether to fail on data loss
 * @param reuseKafkaConsumer Whether to reuse consumer instances
 */
case class KafkaMicroBatchInputPartition(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean,
    reuseKafkaConsumer: Boolean
) extends InputPartition[InternalRow] {

  /**
   * Gets preferred executor locations for data locality
   * @return Array of preferred executor hostnames
   */
  def preferredLocations(): Array[String]

  /**
   * Creates partition reader for this input partition
   * @return InputPartitionReader for processing records
   */
  def createPartitionReader(): InputPartitionReader[InternalRow]
}
```

### Micro-batch Partition Reader

Partition reader for micro-batch processing that reads records from a specific offset range.

```scala { .api }
/**
 * Partition reader for micro-batch processing
 * @param offsetRange Range of offsets to read
 * @param executorKafkaParams Kafka parameters for this executor
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 * @param reuseKafkaConsumer Whether to reuse consumer
 */
case class KafkaMicroBatchInputPartitionReader(
    offsetRange: KafkaOffsetRange,
    executorKafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean,
    reuseKafkaConsumer: Boolean
) extends InputPartitionReader[InternalRow] with Logging {

  /**
   * Advances to the next record
   * @return true if next record is available
   */
  def next(): Boolean

  /**
   * Gets the current record as UnsafeRow
   * @return Current record as UnsafeRow
   */
  def get(): UnsafeRow

  /**
   * Closes the reader and releases resources
   */
  def close(): Unit
}
```

### Continuous Reader

Continuous reader for low-latency streaming with sub-second processing capabilities.

```scala { .api }
/**
 * Continuous reader for Kafka data in structured streaming
 */
class KafkaContinuousReader extends ContinuousReader with Logging {

  /**
   * Returns schema for Kafka records
   * @return StructType defining record schema
   */
  def readSchema: StructType

  /**
   * Sets starting offset for continuous processing
   * @param start Starting offset (None to start from beginning)
   */
  def setStartOffset(start: Option[Offset]): Unit

  /**
   * Gets the starting offset
   * @return Starting offset or null if none
   */
  def getStartOffset(): Offset

  /**
   * Deserializes offset from JSON string
   * @param json JSON representation of offset
   * @return Deserialized Offset object
   */
  def deserializeOffset(json: String): Offset

  /**
   * Plans input partitions for continuous processing
   * @return List of InputPartition objects
   */
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]

  /**
   * Stops the reader and releases resources
   */
  def stop(): Unit

  /**
   * Commits processed offsets
   * @param end Offset to commit
   */
  def commit(end: Offset): Unit

  /**
   * Merges partition offsets into a single offset
   * @param offsets Array of partition offsets
   * @return Merged offset
   */
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset

  /**
   * Checks if reader reconfiguration is needed
   * @return true if reconfiguration is required
   */
  def needsReconfiguration(): Boolean
}
```
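
Like the micro-batch reader, this class is internal and driven by the continuous execution engine. The sketch below is illustrative only and uses just the signatures above; `reconfigure` is a hypothetical helper showing how `needsReconfiguration` and `mergeOffsets` fit together, for example when new Kafka partitions appear.

```scala
// Conceptual sketch: reacting to a change in the Kafka partition set.
def reconfigure(reader: KafkaContinuousReader,
                lastPartitionOffsets: Array[PartitionOffset]): Unit = {
  if (reader.needsReconfiguration()) {
    // Fold per-partition progress into one global offset, then re-plan the
    // long-running input partitions from that point.
    val globalOffset = reader.mergeOffsets(lastPartitionOffsets)
    reader.setStartOffset(Some(globalOffset))
    val partitions = reader.planInputPartitions()
    // The engine would now launch one long-running task per input partition.
  }
}
```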

### Continuous Input Partition

Input partition for continuous processing that handles a specific topic partition.

```scala { .api }
/**
 * Input partition for continuous processing
 * @param topicPartition Kafka topic partition
 * @param startOffset Starting offset for processing
 * @param kafkaParams Kafka consumer parameters
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 */
case class KafkaContinuousInputPartition(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean
) extends ContinuousInputPartition[InternalRow] {

  /**
   * Creates continuous reader for this partition
   * @param offset Starting partition offset
   * @return InputPartitionReader for continuous processing
   */
  def createContinuousReader(offset: PartitionOffset): InputPartitionReader[InternalRow]

  /**
   * Creates partition reader for this partition
   * @return KafkaContinuousInputPartitionReader instance
   */
  def createPartitionReader(): KafkaContinuousInputPartitionReader
}
```

### Continuous Partition Reader

Partition reader for continuous processing that provides low-latency record consumption.

```scala { .api }
/**
 * Partition reader for continuous processing
 * @param topicPartition Kafka topic partition to read from
 * @param startOffset Starting offset for reading
 * @param kafkaParams Kafka consumer parameters
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 */
class KafkaContinuousInputPartitionReader(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean
) extends ContinuousInputPartitionReader[InternalRow] {

  /**
   * Advances to the next record
   * @return true if next record is available
   */
  def next(): Boolean

  /**
   * Gets the current record as UnsafeRow
   * @return Current record as UnsafeRow
   */
  def get(): UnsafeRow

  /**
   * Gets the current partition offset
   * @return Current offset for this partition
   */
  def getOffset(): KafkaSourcePartitionOffset

  /**
   * Closes the reader and releases resources
   */
  def close(): Unit
}
```

### Legacy Streaming Source

Legacy streaming source for backward compatibility with the DataSource V1 API.

```scala { .api }
/**
 * Legacy streaming source for reading from Kafka (DataSource V1)
 */
class KafkaSource extends Source with Logging {

  /**
   * Returns the schema of Kafka records
   * @return StructType defining record schema
   */
  def schema: StructType

  /**
   * Gets the maximum available offset
   * @return Optional offset representing latest available data
   */
  def getOffset: Option[Offset]

  /**
   * Gets batch data between specified offsets
   * @param start Starting offset (None for initial batch)
   * @param end Ending offset for this batch
   * @return DataFrame containing batch data
   */
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  /**
   * Stops the source and releases resources
   */
  def stop(): Unit
}

/**
 * Companion object for KafkaSource
 */
object KafkaSource {

  /**
   * Gets sorted list of executor IDs for locality optimization
   * @param sc SparkContext for accessing executor information
   * @return Array of executor IDs sorted for consistent assignment
   */
  def getSortedExecutorList(sc: SparkContext): Array[String]
}
```
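
The V1 engine polls this source once per trigger: `getOffset` reports the latest available data and `getBatch` materializes the range since the previous trigger. The sketch below is illustrative only; `pollOnce` is a hypothetical helper, not part of Spark's API.

```scala
// Conceptual sketch: one trigger of the DataSource V1 polling loop.
def pollOnce(source: KafkaSource, lastEnd: Option[Offset]): Option[Offset] = {
  source.getOffset match {
    case Some(latest) if lastEnd.forall(_ != latest) =>
      // New data is available: materialize it as a DataFrame for this batch.
      val batch = source.getBatch(lastEnd, latest)
      // ... execute the rest of the query plan on `batch` ...
      Some(latest)
    case _ =>
      lastEnd // nothing new since the previous trigger
  }
}
```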

## Usage Examples

### Micro-batch Streaming

```scala
import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

val microBatchDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()

val query = microBatchDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()
```

### Continuous Streaming

```scala
import org.apache.spark.sql.streaming.Trigger

val continuousDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "low-latency-events")
  .option("startingOffsets", "latest")
  .load()

val continuousQuery = continuousDF
  .selectExpr("CAST(value AS STRING) as json")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "/tmp/continuous-checkpoint")
  .start()

continuousQuery.awaitTermination()
```

### Legacy Source (DataSource V1)

```scala
val legacyDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .options(Map(
    "subscribe" -> "legacy-topic",
    "startingOffsets" -> "earliest"
  ))
  .load()
```

## Performance Configuration

### Consumer Tuning

```scala
.option("kafka.fetch.min.bytes", "1024")        // Minimum fetch size
.option("kafka.fetch.max.wait.ms", "500")       // Max wait for min bytes
.option("kafka.max.poll.records", "500")        // Records per poll
.option("kafka.session.timeout.ms", "30000")    // Session timeout
.option("kafka.heartbeat.interval.ms", "3000")  // Heartbeat interval
```
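
Options prefixed with `kafka.` are passed through to the underlying Kafka consumer. A minimal sketch of how these fragments attach to the `readStream` builder (broker address and topic name are placeholders):

```scala
// Tuned reader: kafka.* options are forwarded to the Kafka consumer config.
val tunedDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafka.fetch.min.bytes", "1024")
  .option("kafka.fetch.max.wait.ms", "500")
  .load()
```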

### Partition Management

```scala
.option("minPartitions", "10")              // Minimum Spark partitions
.option("maxOffsetsPerTrigger", "1000000")  // Rate limiting
```

### Memory and Buffering

```scala
.option("kafka.receive.buffer.bytes", "65536")  // Receive buffer
.option("kafka.send.buffer.bytes", "131072")    // Send buffer
.option("kafka.fetch.max.bytes", "52428800")    // Max fetch size
```

## Error Handling and Recovery

### Data Loss Scenarios

Data loss is detected when requested offsets are no longer available, for example because a topic was deleted or records aged out of Kafka's retention window. The `failOnDataLoss` option controls how the streaming sources respond:

```scala
// Strict: fail the query when data loss is detected
.option("failOnDataLoss", "true")

// Lenient: log a warning and continue
.option("failOnDataLoss", "false")
```

### Timeout Configuration

```scala
// Consumer poll timeout
.option("kafkaConsumer.pollTimeoutMs", "10000")

// Connection timeout
.option("kafka.request.timeout.ms", "30000")
```

### Fault Tolerance

```scala
// Checkpoint location for exactly-once processing
.option("checkpointLocation", "/path/to/checkpoint")

// Automatic retry configuration
.option("kafka.retry.backoff.ms", "100")
.option("kafka.reconnect.backoff.ms", "50")
```
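
Recovery works by restarting the query with the same checkpoint directory, from which the committed Kafka offsets are resumed. A minimal sketch, assuming the placeholder topic and checkpoint path used above:

```scala
// On restart, the query resumes from the offsets recorded under the
// checkpoint directory instead of re-reading from startingOffsets.
val restarted = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint") // same path as before the restart
  .start()
```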

## Monitoring and Metrics

### Progress Reporting

```scala
val query = df.writeStream
  .format("console")
  .start()

// Monitor progress
val progress = query.lastProgress
println(s"Input rows per second: ${progress.inputRowsPerSecond}")
println(s"Processing time: ${progress.durationMs}")
```

### Offset Tracking

```scala
// Access current offsets
val currentOffsets = query.lastProgress.sources(0).endOffset
println(s"Current offsets: $currentOffsets")
```
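
For continuous monitoring, rather than polling `lastProgress`, a `StreamingQueryListener` can be registered on the session; a minimal sketch:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Called on every progress update; sources(0) carries the Kafka offsets.
    val p = event.progress
    println(s"batch=${p.batchId} rows/s=${p.inputRowsPerSecond} endOffset=${p.sources(0).endOffset}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```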

## Processing Modes Comparison

| Feature | Micro-batch | Continuous | Legacy |
|---------|-------------|------------|--------|
| Latency | ~100ms+ | ~1ms | ~100ms+ |
| Throughput | High | Medium | High |
| Fault Tolerance | Full | Full | Full |
| Delivery Guarantee | Exactly-once | At-least-once | Exactly-once |
| State Management | Full | Limited | Full |
| Aggregations | All | Simple | All |
| API Version | DataSource V2 | DataSource V2 | DataSource V1 |