# Streaming APIs

The Streaming APIs in Apache Spark Catalyst support real-time data processing in both micro-batch and continuous processing modes. They enable building custom streaming data sources and sinks with exactly-once semantics, fault tolerance, and state management.

## Core Streaming Read APIs

### MicroBatchStream

Interface for streaming data sources in micro-batch mode. `MicroBatchStream` extends `SparkDataStream`, which contributes the offset and lifecycle methods shown below:

```java { .api }
package org.apache.spark.sql.connector.read.streaming;

public interface MicroBatchStream extends SparkDataStream {
    /**
     * Get the latest available offset
     */
    Offset latestOffset();

    /**
     * Plan the input partitions to scan for the offset range (start, end]
     */
    InputPartition[] planInputPartitions(Offset start, Offset end);

    /**
     * Create a factory that produces a reader for each input partition
     */
    PartitionReaderFactory createReaderFactory();

    // Inherited from SparkDataStream:

    /**
     * Get the initial offset for starting the stream
     */
    Offset initialOffset();

    /**
     * Deserialize offset from JSON string
     */
    Offset deserializeOffset(String json);

    /**
     * Commit processing up to the given offset
     */
    void commit(Offset end);

    /**
     * Stop the stream
     */
    void stop();
}
```

### ContinuousStream

Interface for streaming data sources in continuous mode. Like `MicroBatchStream`, it extends `SparkDataStream`:

```java { .api }
public interface ContinuousStream extends SparkDataStream {
    /**
     * Plan the input partitions to scan, starting from the given offset
     */
    InputPartition[] planInputPartitions(Offset start);

    /**
     * Create continuous reader factory
     */
    ContinuousPartitionReaderFactory createContinuousReaderFactory();

    /**
     * Merge partition offsets into a single offset
     */
    Offset mergeOffsets(PartitionOffset[] offsets);

    // Inherited from SparkDataStream:

    /**
     * Get initial offset for the stream
     */
    Offset initialOffset();

    /**
     * Deserialize offset from JSON
     */
    Offset deserializeOffset(String json);

    /**
     * Commit processing up to the given offset
     */
    void commit(Offset end);

    /**
     * Stop the stream
     */
    void stop();
}
```

### Offset

Abstract base class for representing positions in streaming data. Equality is defined by the JSON form, so subclasses only need to implement `json()`:

```java { .api }
public abstract class Offset {
    /**
     * JSON representation of this offset
     */
    public abstract String json();

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Offset) {
            return this.json().equals(((Offset) obj).json());
        }
        return false;
    }

    @Override
    public int hashCode() {
        return this.json().hashCode();
    }
}
```

## Implementing a Micro-Batch Streaming Source

### Custom Offset Implementation

```java
import java.util.Objects;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MyStreamOffset extends Offset {
    private final long batchId;
    private final long recordCount;

    public MyStreamOffset(long batchId, long recordCount) {
        this.batchId = batchId;
        this.recordCount = recordCount;
    }

    @Override
    public String json() {
        return String.format("{\"batchId\":%d,\"recordCount\":%d}", batchId, recordCount);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MyStreamOffset) {
            MyStreamOffset other = (MyStreamOffset) obj;
            return this.batchId == other.batchId && this.recordCount == other.recordCount;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(batchId, recordCount);
    }

    public long getBatchId() { return batchId; }
    public long getRecordCount() { return recordCount; }

    public static MyStreamOffset fromJson(String json) {
        // Parse the JSON emitted by json() back into an offset
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode node = mapper.readTree(json);
            return new MyStreamOffset(
                node.get("batchId").asLong(),
                node.get("recordCount").asLong()
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse offset: " + json, e);
        }
    }
}
```
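
Spark persists offsets as JSON in the checkpoint log and later restores them via `deserializeOffset()`, so `json()` and `fromJson()` must be exact inverses. A quick stand-alone round-trip check (a testing sketch, not part of the connector API):

```java
public class OffsetRoundTripCheck {
    public static void main(String[] args) {
        MyStreamOffset original = new MyStreamOffset(42, 1000);
        MyStreamOffset restored = MyStreamOffset.fromJson(original.json());

        // If offsets don't survive the round trip, recovery from a
        // checkpoint would resume from the wrong position.
        if (!original.equals(restored)) {
            throw new AssertionError("Offset JSON round-trip failed");
        }
        System.out.println("Round-trip OK: " + restored.json());
    }
}
```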

### Complete Micro-Batch Stream Implementation

```java
public class MyMicroBatchStream implements MicroBatchStream {
    private final String streamSource;
    private final StructType schema;
    private volatile MyStreamOffset currentOffset;
    private volatile boolean stopped = false;

    public MyMicroBatchStream(String streamSource, StructType schema) {
        this.streamSource = streamSource;
        this.schema = schema;
        this.currentOffset = new MyStreamOffset(0, 0);
    }

    @Override
    public Offset latestOffset() {
        if (stopped) {
            return currentOffset;
        }

        // Check for new data and update offset
        long newBatchId = currentOffset.getBatchId() + 1;
        long newRecordCount = checkForNewRecords();

        if (newRecordCount > currentOffset.getRecordCount()) {
            currentOffset = new MyStreamOffset(newBatchId, newRecordCount);
        }

        return currentOffset;
    }

    @Override
    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        long startCount = ((MyStreamOffset) start).getRecordCount();
        long endCount = ((MyStreamOffset) end).getRecordCount();
        // A single partition covering the new records; a real source would
        // split this range. MyInputPartition is an illustrative helper class.
        return new InputPartition[] { new MyInputPartition(streamSource, startCount, endCount) };
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
        // MyPartitionReaderFactory is an illustrative helper (not shown)
        return new MyPartitionReaderFactory(schema);
    }

    @Override
    public Offset initialOffset() {
        return new MyStreamOffset(0, 0);
    }

    @Override
    public Offset deserializeOffset(String json) {
        return MyStreamOffset.fromJson(json);
    }

    @Override
    public void commit(Offset end) {
        MyStreamOffset offset = (MyStreamOffset) end;
        // Persist checkpoint information
        persistCheckpoint(offset);
    }

    @Override
    public void stop() {
        stopped = true;
        // Clean up resources
        closeConnections();
    }

    private long checkForNewRecords() {
        // Implementation specific - check external source for new data
        return queryExternalSourceForRecordCount();
    }

    private void persistCheckpoint(MyStreamOffset offset) {
        // Persist offset for fault tolerance
        writeCheckpointToStorage(offset);
    }
}
```

### Streaming Scan Integration

```java
public class MyStreamingTable implements Table, SupportsRead {
    private final StructType schema;

    public MyStreamingTable(StructType schema) {
        this.schema = schema;
    }

    @Override
    public String name() {
        return "my_streaming_table";
    }

    @Override
    public StructType schema() {
        return schema;
    }

    @Override
    public Set<TableCapability> capabilities() {
        // Advertise micro-batch reads so Spark routes streaming queries here
        return EnumSet.of(TableCapability.MICRO_BATCH_READ);
    }

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyStreamingScanBuilder(schema, options);
    }
}

public class MyStreamingScanBuilder implements ScanBuilder {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;

    public MyStreamingScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }

    @Override
    public Scan build() {
        return new MyStreamingScan(schema, options);
    }
}

public class MyStreamingScan implements Scan {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;

    public MyStreamingScan(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }

    @Override
    public StructType readSchema() {
        return schema;
    }

    @Override
    public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
        String streamSource = options.get("source.path");
        return new MyMicroBatchStream(streamSource, schema);
    }
}
```
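
From the user side, the source is consumed like any other Structured Streaming read. A usage sketch, assuming `MyStreamingTable` is exposed through a `TableProvider` registered under a short format name (the name `my-stream` and the paths here are hypothetical):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class MyStreamUsage {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("my-stream-demo")
            .master("local[2]")
            .getOrCreate();

        // "my-stream" and "source.path" depend on how the TableProvider
        // is registered and which options it actually reads.
        Dataset<Row> stream = spark.readStream()
            .format("my-stream")
            .option("source.path", "/data/incoming")
            .load();

        StreamingQuery query = stream.writeStream()
            .format("console")
            .option("checkpointLocation", "/tmp/checkpoints/my-stream-demo")
            .start();

        query.awaitTermination();
    }
}
```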

## Continuous Stream Implementation

### Continuous Partition Reader

```java
public class MyContinuousPartitionReader implements ContinuousPartitionReader<InternalRow> {
    private final int partitionId;
    private final StructType schema;
    private volatile boolean stopped = false;
    private volatile PartitionOffset currentOffset;

    public MyContinuousPartitionReader(int partitionId, StructType schema,
                                       PartitionOffset startOffset) {
        this.partitionId = partitionId;
        this.schema = schema;
        this.currentOffset = startOffset;
    }

    @Override
    public boolean next() throws IOException {
        if (stopped) {
            return false;
        }

        // Continuously poll for new records
        return pollForNewRecord();
    }

    @Override
    public InternalRow get() {
        // Return current record and update offset
        InternalRow row = getCurrentRecord();
        updateOffset();
        return row;
    }

    @Override
    public PartitionOffset getOffset() {
        return currentOffset;
    }

    @Override
    public void close() throws IOException {
        stopped = true;
        // Clean up partition-specific resources
    }

    private boolean pollForNewRecord() {
        // Implementation-specific polling logic
        return hasNewDataAvailable();
    }
}
```

### Continuous Stream Factory

```java
public class MyContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory {
    private final StructType schema;

    public MyContinuousPartitionReaderFactory(StructType schema) {
        this.schema = schema;
    }

    @Override
    public ContinuousPartitionReader<InternalRow> createReader(InputPartition partition) {
        MyContinuousInputPartition myPartition = (MyContinuousInputPartition) partition;
        return new MyContinuousPartitionReader(
            myPartition.getPartitionId(),
            schema,
            myPartition.getStartOffset()
        );
    }
}
```
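
For context, a minimal sketch of the `ContinuousStream` that hands out this factory (all `My*` names are hypothetical; `MyVectorOffset` and `MyPartitionOffset` are assumed helper classes): the global offset is modeled as a vector of per-partition positions, and `mergeOffsets()` rebuilds that vector from the offsets individual readers report:

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.types.StructType;

public class MyContinuousStream implements ContinuousStream {
    private final int numPartitions;
    private final StructType schema;

    public MyContinuousStream(int numPartitions, StructType schema) {
        this.numPartitions = numPartitions;
        this.schema = schema;
    }

    @Override
    public InputPartition[] planInputPartitions(Offset start) {
        // MyVectorOffset (hypothetical) holds one position per partition
        long[] positions = ((MyVectorOffset) start).positions();
        InputPartition[] partitions = new InputPartition[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            partitions[i] = new MyContinuousInputPartition(i, new MyPartitionOffset(i, positions[i]));
        }
        return partitions;
    }

    @Override
    public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
        return new MyContinuousPartitionReaderFactory(schema);
    }

    @Override
    public Offset mergeOffsets(PartitionOffset[] offsets) {
        // Each reader reports how far it has read in its own partition;
        // combine those into a single global offset for checkpointing.
        long[] positions = new long[numPartitions];
        for (PartitionOffset po : offsets) {
            MyPartitionOffset mpo = (MyPartitionOffset) po;
            positions[mpo.getPartitionId()] = mpo.getPosition();
        }
        return new MyVectorOffset(positions);
    }

    @Override
    public Offset initialOffset() {
        return new MyVectorOffset(new long[numPartitions]); // all partitions at 0
    }

    @Override
    public Offset deserializeOffset(String json) {
        return MyVectorOffset.fromJson(json);
    }

    @Override
    public void commit(Offset end) {
        // Optionally acknowledge consumption to the external system
    }

    @Override
    public void stop() {
        // Release connections
    }
}
```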

## Streaming Write APIs

### StreamingWrite

Interface for streaming write operations:

```java { .api }
package org.apache.spark.sql.connector.write.streaming;

public interface StreamingWrite {
    /**
     * Create writer factory for streaming
     */
    StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);

    /**
     * Whether to use Spark's commit coordinator
     */
    boolean useCommitCoordinator();

    /**
     * Commit an epoch of streaming writes
     */
    void commit(long epochId, WriterCommitMessage[] messages);

    /**
     * Abort an epoch of streaming writes
     */
    void abort(long epochId, WriterCommitMessage[] messages);
}
```

### StreamingDataWriterFactory

Factory for creating streaming data writers. It is separate from the batch `DataWriterFactory` because its `createWriter()` also receives the id of the epoch being written:

```java { .api }
public interface StreamingDataWriterFactory extends Serializable {
    /**
     * Create writer with epoch information
     */
    DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);
}
```

### Complete Streaming Write Implementation

```java
public class MyStreamingWrite implements StreamingWrite {
    private final LogicalWriteInfo writeInfo;
    private final Map<Long, Set<String>> epochFiles = new ConcurrentHashMap<>();

    public MyStreamingWrite(LogicalWriteInfo writeInfo) {
        this.writeInfo = writeInfo;
    }

    @Override
    public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
        // The schema comes from the logical write info; PhysicalWriteInfo
        // only carries physical details such as the number of partitions.
        return new MyStreamingDataWriterFactory(writeInfo.schema(), writeInfo.options());
    }

    @Override
    public boolean useCommitCoordinator() {
        return true; // Let Spark ensure only one task attempt commits per partition
    }

    @Override
    public void commit(long epochId, WriterCommitMessage[] messages) {
        Set<String> files = new HashSet<>();

        try {
            // Collect all files written in this epoch
            for (WriterCommitMessage message : messages) {
                MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
                files.addAll(myMessage.getWrittenFiles());
            }

            // Atomically commit all files for this epoch
            atomicCommitEpoch(epochId, files);
            epochFiles.put(epochId, files);

            // Clean up old epochs
            cleanupOldEpochs(epochId);

        } catch (Exception e) {
            // If commit fails, abort the epoch
            abort(epochId, messages);
            throw new RuntimeException("Failed to commit epoch " + epochId, e);
        }
    }

    @Override
    public void abort(long epochId, WriterCommitMessage[] messages) {
        // Clean up any partially written files
        for (WriterCommitMessage message : messages) {
            MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
            for (String file : myMessage.getWrittenFiles()) {
                deleteFileIfExists(file);
            }
        }
        epochFiles.remove(epochId);
    }

    private void atomicCommitEpoch(long epochId, Set<String> files) {
        // Implementation-specific atomic commit
        // This might involve:
        // 1. Writing a commit marker
        // 2. Moving temp files to final locations
        // 3. Updating metadata
    }

    private void cleanupOldEpochs(long currentEpoch) {
        // Keep only recent epochs for fault tolerance
        long cutoffEpoch = currentEpoch - 100;
        epochFiles.entrySet().removeIf(entry -> entry.getKey() < cutoffEpoch);
    }
}

public class MyStreamingDataWriterFactory implements StreamingDataWriterFactory {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;

    public MyStreamingDataWriterFactory(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
        return new MyStreamingDataWriter(schema, partitionId, taskId, epochId, options);
    }
}

public class MyStreamingDataWriter implements DataWriter<InternalRow> {
    private final StructType schema;
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final CaseInsensitiveStringMap options;
    private final List<InternalRow> buffer = new ArrayList<>();
    private final Set<String> writtenFiles = new HashSet<>();

    public MyStreamingDataWriter(StructType schema, int partitionId, long taskId,
                                 long epochId, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;
        this.options = options;
    }

    @Override
    public void write(InternalRow record) throws IOException {
        buffer.add(record.copy());

        // Batch writes for efficiency
        if (buffer.size() >= 1000) {
            flushBuffer();
        }
    }

    @Override
    public WriterCommitMessage commit() throws IOException {
        if (!buffer.isEmpty()) {
            flushBuffer();
        }
        return new MyStreamingCommitMessage(partitionId, taskId, epochId, writtenFiles);
    }

    @Override
    public void abort() throws IOException {
        // Clean up any files written by this writer
        for (String file : writtenFiles) {
            deleteFileIfExists(file);
        }
        buffer.clear();
        writtenFiles.clear();
    }

    @Override
    public void close() throws IOException {
        // Release file handles or connections held by this writer
        buffer.clear();
    }

    private void flushBuffer() throws IOException {
        String fileName = generateFileName(partitionId, taskId, epochId);
        writeBufferToFile(fileName, buffer);
        writtenFiles.add(fileName);
        buffer.clear();
    }
}

public class MyStreamingCommitMessage implements WriterCommitMessage {
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final Set<String> writtenFiles;

    public MyStreamingCommitMessage(int partitionId, long taskId, long epochId,
                                    Set<String> writtenFiles) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;
        this.writtenFiles = new HashSet<>(writtenFiles);
    }

    public Set<String> getWrittenFiles() {
        return writtenFiles;
    }
}
```
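
To plug this sink into a table, the write path goes through `SupportsWrite.newWriteBuilder()`. In Spark 3.2+ the returned `WriteBuilder` builds a `Write`, and `Write.toStreaming()` is the hook that surfaces the `StreamingWrite`. A minimal wiring sketch reusing `MyStreamingWrite` from above (`MyWriteBuilder` is a hypothetical name); the table would also need to report `TableCapability.STREAMING_WRITE` from `capabilities()`:

```java
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

public class MyWriteBuilder implements WriteBuilder {
    private final LogicalWriteInfo info;

    public MyWriteBuilder(LogicalWriteInfo info) {
        this.info = info;
    }

    @Override
    public Write build() {
        return new Write() {
            @Override
            public StreamingWrite toStreaming() {
                // Hand the logical write info (schema, options) to the sink
                return new MyStreamingWrite(info);
            }
        };
    }
}
```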

## Advanced Streaming Patterns

### Exactly-Once Processing with Idempotent Writes

```java
public class IdempotentStreamingWrite implements StreamingWrite {
    // In-process cache of committed epochs. After a driver restart this map
    // is empty, so the authoritative idempotency check must also live in
    // the external system itself.
    private final Map<Long, String> epochCommitIds = new ConcurrentHashMap<>();

    @Override
    public void commit(long epochId, WriterCommitMessage[] messages) {
        String commitId = generateCommitId(epochId, messages);

        // Check if this epoch was already committed (for retries)
        if (epochCommitIds.containsKey(epochId)) {
            String existingCommitId = epochCommitIds.get(epochId);
            if (existingCommitId.equals(commitId)) {
                // Already committed with same data - this is a retry
                return;
            } else {
                throw new IllegalStateException(
                    String.format("Epoch %d committed with different data", epochId));
            }
        }

        // Perform idempotent commit
        performIdempotentCommit(epochId, commitId, messages);
        epochCommitIds.put(epochId, commitId);
    }

    private void performIdempotentCommit(long epochId, String commitId,
                                         WriterCommitMessage[] messages) {
        // Use commitId to ensure idempotency
        // This might involve conditional writes to external systems
    }

    // createStreamingWriterFactory(), useCommitCoordinator() and abort()
    // omitted for brevity; see the complete implementation above
}
```

### State Management for Streaming Aggregations

```java
public class StatefulStreamingProcessor {
    private final Map<String, Object> state = new ConcurrentHashMap<>();

    public void processStreamingBatch(Iterator<InternalRow> batch, long epochId) {
        Map<String, Object> batchState = new HashMap<>();

        // Process batch and update state
        while (batch.hasNext()) {
            InternalRow row = batch.next();
            String key = extractKey(row);
            Object value = extractValue(row);

            // Update batch state
            batchState.merge(key, value, this::combineValues);
        }

        // Atomically update global state
        synchronized (state) {
            for (Map.Entry<String, Object> entry : batchState.entrySet()) {
                state.merge(entry.getKey(), entry.getValue(), this::combineValues);
            }
        }

        // Persist state for fault tolerance
        persistState(epochId);
    }

    private Object combineValues(Object existing, Object newValue) {
        // Implementation-specific value combination logic
        if (existing instanceof Number && newValue instanceof Number) {
            return ((Number) existing).doubleValue() + ((Number) newValue).doubleValue();
        }
        return newValue;
    }
}
```

### Watermark-Based Late Data Handling

```java
public class WatermarkStreamingSource implements MicroBatchStream {
    private volatile long currentWatermark = 0;
    private final Duration allowedLateness;

    public WatermarkStreamingSource(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    public void updateWatermark(long eventTime) {
        // Update watermark based on event time minus allowed lateness
        long newWatermark = eventTime - allowedLateness.toMillis();
        currentWatermark = Math.max(currentWatermark, newWatermark);
    }

    public boolean isLateData(long eventTime) {
        return eventTime < currentWatermark;
    }

    @Override
    public Offset latestOffset() {
        // Include watermark information in offset
        return new WatermarkOffset(getCurrentBatchId(), currentWatermark);
    }

    // Remaining MicroBatchStream methods omitted for brevity
}

public class WatermarkOffset extends Offset {
    private final long batchId;
    private final long watermark;

    public WatermarkOffset(long batchId, long watermark) {
        this.batchId = batchId;
        this.watermark = watermark;
    }

    @Override
    public String json() {
        return String.format("{\"batchId\":%d,\"watermark\":%d}", batchId, watermark);
    }

    // equals() and hashCode() are inherited from Offset and compare the json() form
}
```

## Fault Tolerance and Recovery

### Checkpointing Implementation

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class CheckpointableStreamingSource implements MicroBatchStream {
    private final String checkpointLocation;
    private volatile MyStreamOffset lastCheckpointedOffset;

    public CheckpointableStreamingSource(String checkpointLocation) {
        this.checkpointLocation = checkpointLocation;
    }

    @Override
    public void commit(Offset end) {
        MyStreamOffset offset = (MyStreamOffset) end;

        try {
            // Write checkpoint atomically
            writeCheckpoint(offset);
            lastCheckpointedOffset = offset;
        } catch (IOException e) {
            throw new RuntimeException("Failed to checkpoint offset: " + offset, e);
        }
    }

    @Override
    public Offset initialOffset() {
        try {
            // Try to recover from checkpoint
            MyStreamOffset checkpointOffset = readCheckpoint();
            if (checkpointOffset != null) {
                return checkpointOffset;
            }
        } catch (IOException e) {
            // Log warning and start from beginning
            System.err.println("Failed to read checkpoint, starting from beginning: " + e);
        }

        return new MyStreamOffset(0, 0);
    }

    private void writeCheckpoint(MyStreamOffset offset) throws IOException {
        String tempFile = checkpointLocation + ".tmp";
        String finalFile = checkpointLocation;

        // Atomic write: write to temp file, then atomically rename over the old file
        Files.write(Paths.get(tempFile), offset.json().getBytes());
        Files.move(Paths.get(tempFile), Paths.get(finalFile),
            StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    private MyStreamOffset readCheckpoint() throws IOException {
        Path checkpointPath = Paths.get(checkpointLocation);
        if (!Files.exists(checkpointPath)) {
            return null;
        }

        String json = new String(Files.readAllBytes(checkpointPath));
        return MyStreamOffset.fromJson(json);
    }

    // Remaining MicroBatchStream methods omitted for brevity
}
```
678
679
### Retry and Error Handling
680
681

```java
import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;

public class ResilientStreamingProcessor {
    private final int maxRetries;
    private final Duration retryDelay;

    public ResilientStreamingProcessor(int maxRetries, Duration retryDelay) {
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    public void processWithRetry(Runnable operation, String operationName) {
        int attempts = 0;
        Exception lastException = null;

        while (attempts < maxRetries) {
            try {
                operation.run();
                return; // Success
            } catch (Exception e) {
                attempts++;
                lastException = e;

                if (isRetriableException(e) && attempts < maxRetries) {
                    System.err.printf("Operation %s failed (attempt %d/%d), retrying in %s: %s%n",
                        operationName, attempts, maxRetries, retryDelay, e.getMessage());

                    try {
                        Thread.sleep(retryDelay.toMillis());
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Interrupted during retry", ie);
                    }
                } else {
                    break;
                }
            }
        }

        throw new RuntimeException(
            String.format("Operation %s failed after %d attempts", operationName, attempts),
            lastException);
    }

    private boolean isRetriableException(Exception e) {
        // Determine if exception is worth retrying
        return e instanceof IOException ||
               e instanceof ConnectException ||
               (e instanceof RuntimeException && e.getCause() instanceof IOException);
    }
}
```
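
A brief usage sketch (the wrapped operation is hypothetical): transient failures are retried with a fixed delay, and only the final failure propagates to the caller:

```java
import java.time.Duration;

public class RetryUsage {
    public static void main(String[] args) {
        ResilientStreamingProcessor processor =
            new ResilientStreamingProcessor(3, Duration.ofSeconds(5));

        // flushToExternalSink is a hypothetical flaky operation; retriable
        // failures are retried up to 3 times before giving up.
        processor.processWithRetry(RetryUsage::flushToExternalSink, "flush-to-sink");
    }

    private static void flushToExternalSink() {
        // Would push buffered records to an external system here
    }
}
```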

## Performance Optimization

### Batching for Efficiency

```java
public class BatchingStreamingWriter implements DataWriter<InternalRow> {
    private final List<InternalRow> buffer = new ArrayList<>();
    private final int batchSize;
    private final Duration flushInterval;
    private long lastFlushTime = System.currentTimeMillis();

    public BatchingStreamingWriter(int batchSize, Duration flushInterval) {
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
    }

    @Override
    public void write(InternalRow record) throws IOException {
        buffer.add(record.copy());

        // Flush based on size or time
        if (shouldFlush()) {
            flushBuffer();
        }
    }

    private boolean shouldFlush() {
        return buffer.size() >= batchSize ||
               (System.currentTimeMillis() - lastFlushTime) > flushInterval.toMillis();
    }

    private void flushBuffer() throws IOException {
        if (!buffer.isEmpty()) {
            writeBatch(buffer);
            buffer.clear();
            lastFlushTime = System.currentTimeMillis();
        }
    }

    // commit(), abort() and close() follow the same pattern as MyStreamingDataWriter above
}
```

### Parallel Processing

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class ParallelStreamingProcessor {
    private final ExecutorService executor;

    public ParallelStreamingProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    public void processParallel(List<InputPartition> partitions) {
        List<CompletableFuture<Void>> futures = partitions.stream()
            .map(partition -> CompletableFuture.runAsync(
                () -> processPartition(partition), executor))
            .collect(Collectors.toList());

        // Wait for all partitions to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }

    private void processPartition(InputPartition partition) {
        // Partition-specific processing logic
    }
}
```

The Streaming APIs provide a robust foundation for building real-time data processing systems with strong guarantees around fault tolerance, exactly-once processing, and efficient resource utilization.