The Streaming APIs in Apache Spark Catalyst support real-time data processing in two modes: micro-batch and continuous processing. They enable building custom streaming data sources and sinks with exactly-once semantics, fault tolerance, and state management.
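As a quick orientation, the sketch below shows how a streaming query would consume a custom source and select the processing mode via its trigger. It is a minimal example under assumptions not shown yet: the connector is registered under the hypothetical short name "my-stream" (a TableProvider sketch appears later), and "source.path" is an option specific to this example.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class MyStreamQuickStart {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("my-stream-example")
        .master("local[*]")
        .getOrCreate();

    // Read from the custom streaming source
    Dataset<Row> stream = spark.readStream()
        .format("my-stream")               // hypothetical short name for the connector
        .option("source.path", "/data/in") // hypothetical option consumed by the source
        .load();

    // Write to the console sink; the trigger selects the processing mode
    StreamingQuery query = stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/my-stream-checkpoint")
        .trigger(Trigger.ProcessingTime("10 seconds")) // micro-batch mode
        // .trigger(Trigger.Continuous("1 second"))    // continuous mode
        .start();

    query.awaitTermination();
  }
}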
Interface for streaming data sources in micro-batch mode:
package org.apache.spark.sql.connector.read.streaming;
public interface MicroBatchStream {
/**
* Get the latest available offset
*/
Offset latestOffset();
/**
* Plan input partitions covering the data between the start and end offsets
*/
InputPartition[] planInputPartitions(Offset start, Offset end);
/**
* Create a reader factory for the planned partitions
*/
PartitionReaderFactory createReaderFactory();
/**
* Get the initial offset for starting the stream
*/
Offset initialOffset();
/**
* Deserialize offset from JSON string
*/
Offset deserializeOffset(String json);
/**
* Commit processing up to the given offset
*/
void commit(Offset end);
/**
* Stop the stream
*/
void stop();
}
Interface for streaming data sources in continuous mode:
public interface ContinuousStream {
/**
* Plan input partitions starting from the given offset
*/
InputPartition[] planInputPartitions(Offset start);
/**
* Create continuous reader factory
*/
ContinuousPartitionReaderFactory createContinuousReaderFactory();
/**
* Get initial offset for the stream
*/
Offset initialOffset();
/**
* Merge partition offsets into a single offset
*/
Offset mergeOffsets(PartitionOffset[] offsets);
/**
* Deserialize offset from JSON
*/
Offset deserializeOffset(String json);
/**
* Commit processing up to the given offset
*/
void commit(Offset end);
/**
* Stop the stream
*/
void stop();
}
Abstract base class for representing positions in streaming data:
public abstract class Offset {
/**
* JSON representation of this offset
*/
public abstract String json();
@Override
public boolean equals(Object obj) {
return (obj instanceof Offset) && this.json().equals(((Offset) obj).json());
}
@Override
public int hashCode() {
return this.json().hashCode();
}
}
A custom offset implementation that tracks a batch id and a running record count:
public class MyStreamOffset extends Offset {
private final long batchId;
private final long recordCount;
public MyStreamOffset(long batchId, long recordCount) {
this.batchId = batchId;
this.recordCount = recordCount;
}
@Override
public String json() {
return String.format("{\"batchId\":%d,\"recordCount\":%d}", batchId, recordCount);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof MyStreamOffset) {
MyStreamOffset other = (MyStreamOffset) obj;
return this.batchId == other.batchId && this.recordCount == other.recordCount;
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(batchId, recordCount);
}
public long getBatchId() { return batchId; }
public long getRecordCount() { return recordCount; }
public static MyStreamOffset fromJson(String json) {
// Parse JSON and return offset
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode node = mapper.readTree(json);
return new MyStreamOffset(
node.get("batchId").asLong(),
node.get("recordCount").asLong()
);
} catch (Exception e) {
throw new RuntimeException("Failed to parse offset: " + json, e);
}
}
}
A micro-batch source implementation built on this offset:
public class MyMicroBatchStream implements MicroBatchStream {
private final String streamSource;
private final StructType schema;
private volatile MyStreamOffset currentOffset;
private volatile boolean stopped = false;
public MyMicroBatchStream(String streamSource, StructType schema) {
this.streamSource = streamSource;
this.schema = schema;
this.currentOffset = new MyStreamOffset(0, 0);
}
@Override
public Offset latestOffset() {
if (stopped) {
return currentOffset;
}
// Check for new data and update offset
long newBatchId = currentOffset.getBatchId() + 1;
long newRecordCount = checkForNewRecords();
if (newRecordCount > currentOffset.getRecordCount()) {
currentOffset = new MyStreamOffset(newBatchId, newRecordCount);
}
return currentOffset;
}
@Override
public Offset initialOffset() {
return new MyStreamOffset(0, 0);
}
@Override
public Offset deserializeOffset(String json) {
return MyStreamOffset.fromJson(json);
}
@Override
public void commit(Offset end) {
MyStreamOffset offset = (MyStreamOffset) end;
// Persist checkpoint information
persistCheckpoint(offset);
}
@Override
public void stop() {
stopped = true;
// Clean up resources
closeConnections();
}
private long checkForNewRecords() {
// Implementation specific - check external source for new data
return queryExternalSourceForRecordCount();
}
private void persistCheckpoint(MyStreamOffset offset) {
// Persist offset for fault tolerance
writeCheckpointToStorage(offset);
}
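// NOTE: MicroBatchStream also requires planInputPartitions and createReaderFactory.
// The bodies below are an illustrative sketch only; MyInputPartition and
// MyPartitionReaderFactory are hypothetical classes, not shown elsewhere.
@Override
public InputPartition[] planInputPartitions(Offset start, Offset end) {
MyStreamOffset startOffset = (MyStreamOffset) start;
MyStreamOffset endOffset = (MyStreamOffset) end;
// A single partition covering the records between the two offsets
return new InputPartition[] {
new MyInputPartition(startOffset.getRecordCount(), endOffset.getRecordCount())
};
}
@Override
public PartitionReaderFactory createReaderFactory() {
return new MyPartitionReaderFactory(schema);
}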
}
Streaming sources plug into the Catalog APIs through a table that exposes a scan builder (the name(), schema(), and capabilities() methods from Table are omitted here; capabilities() must report TableCapability.MICRO_BATCH_READ, plus CONTINUOUS_READ for continuous mode):
public class MyStreamingTable implements Table, SupportsRead {
private final StructType schema;
public MyStreamingTable(StructType schema) {
this.schema = schema;
}
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyStreamingScanBuilder(schema, options);
}
}
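To make this table reachable from spark.readStream().format("my-stream"), the connector can expose a TableProvider that also implements DataSourceRegister. The sketch below is illustrative: the short name, the fixed schema, and the class name MyStreamingTableProvider are assumptions, and the class must additionally be listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
public class MyStreamingTableProvider implements TableProvider, DataSourceRegister {
  @Override
  public String shortName() {
    return "my-stream"; // hypothetical format name used by readStream().format(...)
  }

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // Fixed schema for illustration; a real connector would derive it from the options
    return new StructType().add("value", DataTypes.StringType);
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning,
      Map<String, String> properties) {
    return new MyStreamingTable(schema);
  }
}
The scan builder and scan then wire the table into the streaming read path: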
public class MyStreamingScanBuilder implements ScanBuilder {
private final StructType schema;
private final CaseInsensitiveStringMap options;
public MyStreamingScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
this.schema = schema;
this.options = options;
}
@Override
public Scan build() {
return new MyStreamingScan(schema, options);
}
}
public class MyStreamingScan implements Scan {
private final StructType schema;
private final CaseInsensitiveStringMap options;
public MyStreamingScan(StructType schema, CaseInsensitiveStringMap options) {
this.schema = schema;
this.options = options;
}
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
String streamSource = options.get("source.path");
return new MyMicroBatchStream(streamSource, schema);
}
@Override
public StructType readSchema() {
return schema;
}
}
In continuous mode, each partition is served by a long-running reader:
public class MyContinuousPartitionReader implements ContinuousPartitionReader<InternalRow> {
private final int partitionId;
private final StructType schema;
private volatile boolean stopped = false;
private volatile PartitionOffset currentOffset;
public MyContinuousPartitionReader(int partitionId, StructType schema,
PartitionOffset startOffset) {
this.partitionId = partitionId;
this.schema = schema;
this.currentOffset = startOffset;
}
@Override
public boolean next() throws IOException {
if (stopped) {
return false;
}
// Continuously poll for new records
return pollForNewRecord();
}
@Override
public InternalRow get() {
// Return current record and update offset
InternalRow row = getCurrentRecord();
updateOffset();
return row;
}
@Override
public PartitionOffset getOffset() {
return currentOffset;
}
@Override
public void close() throws IOException {
stopped = true;
// Clean up partition-specific resources
}
private boolean pollForNewRecord() {
// Implementation-specific polling logic
return hasNewDataAvailable();
}
}
A factory that creates a continuous reader for each input partition:
public class MyContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory {
private final StructType schema;
public MyContinuousPartitionReaderFactory(StructType schema) {
this.schema = schema;
}
@Override
public ContinuousPartitionReader<InternalRow> createReader(InputPartition partition) {
MyContinuousInputPartition myPartition = (MyContinuousInputPartition) partition;
return new MyContinuousPartitionReader(
myPartition.getPartitionId(),
schema,
myPartition.getStartOffset()
);
}
}
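A connector that advertises continuous reads also needs a ContinuousStream implementation that plans partitions, builds the reader factory, and merges per-partition offsets. The sketch below is illustrative only: the partition count, the toPartitionOffset and sumRecordCounts helpers, and the way offsets are combined are assumptions of this example.
public class MyContinuousStream implements ContinuousStream {
  private final StructType schema;
  private final int numPartitions;

  public MyContinuousStream(StructType schema, int numPartitions) {
    this.schema = schema;
    this.numPartitions = numPartitions;
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start) {
    // One continuous partition per parallel reader, each with its own start position
    InputPartition[] partitions = new InputPartition[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      partitions[i] = new MyContinuousInputPartition(i, toPartitionOffset(start, i));
    }
    return partitions;
  }

  @Override
  public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
    return new MyContinuousPartitionReaderFactory(schema);
  }

  @Override
  public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Combine the per-partition positions into one global offset (example logic)
    return new MyStreamOffset(0, sumRecordCounts(offsets));
  }

  @Override
  public Offset initialOffset() {
    return new MyStreamOffset(0, 0);
  }

  @Override
  public Offset deserializeOffset(String json) {
    return MyStreamOffset.fromJson(json);
  }

  @Override
  public void commit(Offset end) {
    // Persist checkpoint information, as in the micro-batch source
  }

  @Override
  public void stop() {
    // Release connections and other resources
  }

  private PartitionOffset toPartitionOffset(Offset start, int partitionId) {
    // Hypothetical helper: derive a per-partition start position from the global offset
    return null;
  }

  private long sumRecordCounts(PartitionOffset[] offsets) {
    // Hypothetical helper: extract and sum record counts from the partition offsets
    return 0L;
  }
}
Interface for streaming write operations: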
package org.apache.spark.sql.connector.write.streaming;
public interface StreamingWrite {
/**
* Create writer factory for streaming
*/
StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);
/**
* Whether to use Spark's commit coordinator
*/
boolean useCommitCoordinator();
/**
* Commit an epoch of streaming writes
*/
void commit(long epochId, WriterCommitMessage[] messages);
/**
* Abort an epoch of streaming writes
*/
void abort(long epochId, WriterCommitMessage[] messages);
}
Factory for creating streaming data writers:
public interface StreamingDataWriterFactory extends Serializable {
/**
* Create writer with epoch information
*/
DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);
}
A streaming sink implementation that commits writes one epoch at a time:
public class MyStreamingWrite implements StreamingWrite {
private final LogicalWriteInfo writeInfo;
private final Map<Long, Set<String>> epochFiles = new ConcurrentHashMap<>();
public MyStreamingWrite(LogicalWriteInfo writeInfo) {
this.writeInfo = writeInfo;
}
@Override
public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
return new MyStreamingDataWriterFactory(writeInfo.schema(), writeInfo.options());
}
@Override
public boolean useCommitCoordinator() {
return true; // Enable exactly-once semantics
}
@Override
public void commit(long epochId, WriterCommitMessage[] messages) {
Set<String> files = new HashSet<>();
try {
// Collect all files written in this epoch
for (WriterCommitMessage message : messages) {
MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
files.addAll(myMessage.getWrittenFiles());
}
// Atomically commit all files for this epoch
atomicCommitEpoch(epochId, files);
epochFiles.put(epochId, files);
// Clean up old epochs
cleanupOldEpochs(epochId);
} catch (Exception e) {
// If commit fails, abort the epoch
abort(epochId, messages);
throw new RuntimeException("Failed to commit epoch " + epochId, e);
}
}
@Override
public void abort(long epochId, WriterCommitMessage[] messages) {
// Clean up any partially written files
for (WriterCommitMessage message : messages) {
MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
for (String file : myMessage.getWrittenFiles()) {
deleteFileIfExists(file);
}
}
epochFiles.remove(epochId);
}
private void atomicCommitEpoch(long epochId, Set<String> files) {
// Implementation-specific atomic commit
// This might involve:
// 1. Writing a commit marker
// 2. Moving temp files to final locations
// 3. Updating metadata
}
private void cleanupOldEpochs(long currentEpoch) {
// Keep only recent epochs for fault tolerance
long cutoffEpoch = currentEpoch - 100;
epochFiles.entrySet().removeIf(entry -> entry.getKey() < cutoffEpoch);
}
}
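MyStreamingCommitMessage, referenced above, is connector-defined rather than part of the Spark API; one possible sketch is a simple serializable holder of the files a writer produced:
public class MyStreamingCommitMessage implements WriterCommitMessage {
  private final int partitionId;
  private final long taskId;
  private final long epochId;
  private final Set<String> writtenFiles;

  public MyStreamingCommitMessage(int partitionId, long taskId, long epochId,
      Set<String> writtenFiles) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.epochId = epochId;
    this.writtenFiles = new HashSet<>(writtenFiles);
  }

  public Set<String> getWrittenFiles() {
    return writtenFiles;
  }
}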
The writer factory creates one data writer per partition, task, and epoch:
public class MyStreamingDataWriterFactory implements StreamingDataWriterFactory {
private final StructType schema;
private final CaseInsensitiveStringMap options;
public MyStreamingDataWriterFactory(StructType schema, CaseInsensitiveStringMap options) {
this.schema = schema;
this.options = options;
}
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
return new MyStreamingDataWriter(schema, partitionId, taskId, epochId, options);
}
}
Each writer buffers rows and tracks the files it has written:
public class MyStreamingDataWriter implements DataWriter<InternalRow> {
private final StructType schema;
private final int partitionId;
private final long taskId;
private final long epochId;
private final CaseInsensitiveStringMap options;
private final List<InternalRow> buffer = new ArrayList<>();
private final Set<String> writtenFiles = new HashSet<>();
public MyStreamingDataWriter(StructType schema, int partitionId, long taskId,
long epochId, CaseInsensitiveStringMap options) {
this.schema = schema;
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
this.options = options;
}
@Override
public void write(InternalRow record) throws IOException {
buffer.add(record.copy());
// Batch writes for efficiency
if (buffer.size() >= 1000) {
flushBuffer();
}
}
@Override
public WriterCommitMessage commit() throws IOException {
if (!buffer.isEmpty()) {
flushBuffer();
}
return new MyStreamingCommitMessage(partitionId, taskId, epochId, writtenFiles);
}
@Override
public void abort() throws IOException {
// Clean up any files written by this writer
for (String file : writtenFiles) {
deleteFileIfExists(file);
}
buffer.clear();
writtenFiles.clear();
}
private void flushBuffer() throws IOException {
String fileName = generateFileName(partitionId, taskId, epochId);
writeBufferToFile(fileName, buffer);
writtenFiles.add(fileName);
buffer.clear();
}
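// DataWriter extends Closeable, so close() must release any open resources.
// closeOutputStreams() is a hypothetical helper for this sketch.
@Override
public void close() throws IOException {
buffer.clear();
closeOutputStreams();
}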
}
For exactly-once delivery across retries, commits can be made idempotent (only commit is shown; the remaining StreamingWrite methods follow the pattern above):
public class IdempotentStreamingWrite implements StreamingWrite {
private final Map<Long, String> epochCommitIds = new ConcurrentHashMap<>();
@Override
public void commit(long epochId, WriterCommitMessage[] messages) {
String commitId = generateCommitId(epochId, messages);
// Check if this epoch was already committed (for retries)
if (epochCommitIds.containsKey(epochId)) {
String existingCommitId = epochCommitIds.get(epochId);
if (existingCommitId.equals(commitId)) {
// Already committed with same data - this is a retry
return;
} else {
throw new IllegalStateException(
String.format("Epoch %d committed with different data", epochId));
}
}
// Perform idempotent commit
performIdempotentCommit(epochId, commitId, messages);
epochCommitIds.put(epochId, commitId);
}
private void performIdempotentCommit(long epochId, String commitId,
WriterCommitMessage[] messages) {
// Use commitId to ensure idempotency
// This might involve conditional writes to external systems
}
}
State can be maintained across micro-batches and checkpointed per epoch:
public class StatefulStreamingProcessor {
private final Map<String, Object> state = new ConcurrentHashMap<>();
public void processStreamingBatch(Iterator<InternalRow> batch, long epochId) {
Map<String, Object> batchState = new HashMap<>();
// Process batch and update state
while (batch.hasNext()) {
InternalRow row = batch.next();
String key = extractKey(row);
Object value = extractValue(row);
// Update batch state
batchState.merge(key, value, this::combineValues);
}
// Atomically update global state
synchronized (state) {
for (Map.Entry<String, Object> entry : batchState.entrySet()) {
state.merge(entry.getKey(), entry.getValue(), this::combineValues);
}
}
// Persist state for fault tolerance
persistState(epochId);
}
private Object combineValues(Object existing, Object newValue) {
// Implementation-specific value combination logic
if (existing instanceof Number && newValue instanceof Number) {
return ((Number) existing).doubleValue() + ((Number) newValue).doubleValue();
}
return newValue;
}
}
Event-time watermarks bound how long a source waits for late data (only the watermark-related methods are shown):
public class WatermarkStreamingSource implements MicroBatchStream {
private volatile long currentWatermark = 0;
private final Duration allowedLateness;
public WatermarkStreamingSource(Duration allowedLateness) {
this.allowedLateness = allowedLateness;
}
public void updateWatermark(long eventTime) {
// Update watermark based on event time minus allowed lateness
long newWatermark = eventTime - allowedLateness.toMillis();
currentWatermark = Math.max(currentWatermark, newWatermark);
}
public boolean isLateData(long eventTime) {
return eventTime < currentWatermark;
}
@Override
public Offset latestOffset() {
// Include watermark information in offset
return new WatermarkOffset(getCurrentBatchId(), currentWatermark);
}
}
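Downstream logic can consult the watermark to decide how to treat late events. A minimal sketch, where emit and sendToLateSink are hypothetical handlers:
public class LateDataRouter {
  public void handleRecord(WatermarkStreamingSource source, long eventTimeMillis, InternalRow row) {
    source.updateWatermark(eventTimeMillis);
    if (source.isLateData(eventTimeMillis)) {
      // Event time is older than the watermark (more than allowedLateness behind): divert it
      sendToLateSink(row);
    } else {
      emit(row);
    }
  }

  private void emit(InternalRow row) {
    // Hand off to normal processing (hypothetical)
  }

  private void sendToLateSink(InternalRow row) {
    // Hand off to a dedicated late-data sink (hypothetical)
  }
}
The offset type used by this source carries the watermark alongside the batch id: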
public class WatermarkOffset extends Offset {
private final long batchId;
private final long watermark;
public WatermarkOffset(long batchId, long watermark) {
this.batchId = batchId;
this.watermark = watermark;
}
@Override
public String json() {
return String.format("{\"batchId\":%d,\"watermark\":%d}", batchId, watermark);
}
// equals and hashCode implementations...
}
Checkpointing offsets lets a source resume where it left off after a failure (only the checkpoint-related methods are shown):
public class CheckpointableStreamingSource implements MicroBatchStream {
private final String checkpointLocation;
private volatile MyStreamOffset lastCheckpointedOffset;
@Override
public void commit(Offset end) {
MyStreamOffset offset = (MyStreamOffset) end;
try {
// Write checkpoint atomically
writeCheckpoint(offset);
lastCheckpointedOffset = offset;
} catch (IOException e) {
throw new RuntimeException("Failed to checkpoint offset: " + offset, e);
}
}
@Override
public Offset initialOffset() {
try {
// Try to recover from checkpoint
MyStreamOffset checkpointOffset = readCheckpoint();
if (checkpointOffset != null) {
return checkpointOffset;
}
} catch (IOException e) {
// Log warning and start from beginning
System.err.println("Failed to read checkpoint, starting from beginning: " + e);
}
return new MyStreamOffset(0, 0);
}
private void writeCheckpoint(MyStreamOffset offset) throws IOException {
String tempFile = checkpointLocation + ".tmp";
String finalFile = checkpointLocation;
// Atomic write: write to a temp file, then atomically rename over any existing checkpoint
Files.write(Paths.get(tempFile), offset.json().getBytes(StandardCharsets.UTF_8));
Files.move(Paths.get(tempFile), Paths.get(finalFile),
StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
}
private MyStreamOffset readCheckpoint() throws IOException {
Path checkpointPath = Paths.get(checkpointLocation);
if (!Files.exists(checkpointPath)) {
return null;
}
String json = new String(Files.readAllBytes(checkpointPath), StandardCharsets.UTF_8);
return MyStreamOffset.fromJson(json);
}
}
Transient failures in external systems can be handled with bounded retries:
public class ResilientStreamingProcessor {
private final int maxRetries;
private final Duration retryDelay;
public void processWithRetry(Runnable operation, String operationName) {
int attempts = 0;
Exception lastException = null;
while (attempts < maxRetries) {
try {
operation.run();
return; // Success
} catch (Exception e) {
attempts++;
lastException = e;
if (isRetriableException(e) && attempts < maxRetries) {
System.err.printf("Operation %s failed (attempt %d/%d), retrying in %s: %s%n",
operationName, attempts, maxRetries, retryDelay, e.getMessage());
try {
Thread.sleep(retryDelay.toMillis());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
} else {
break;
}
}
}
throw new RuntimeException(
String.format("Operation %s failed after %d attempts", operationName, attempts),
lastException);
}
private boolean isRetriableException(Exception e) {
// Determine if exception is worth retrying
return e instanceof IOException ||
e instanceof ConnectException ||
(e instanceof RuntimeException && e.getCause() instanceof IOException);
}
}
Buffering writes and flushing by size or time reduces per-record overhead (only the buffering logic is shown):
public class BatchingStreamingWriter implements DataWriter<InternalRow> {
private final List<InternalRow> buffer = new ArrayList<>();
private final int batchSize;
private final Duration flushInterval;
private long lastFlushTime = System.currentTimeMillis();
@Override
public void write(InternalRow record) throws IOException {
buffer.add(record.copy());
// Flush based on size or time
if (shouldFlush()) {
flushBuffer();
}
}
private boolean shouldFlush() {
return buffer.size() >= batchSize ||
(System.currentTimeMillis() - lastFlushTime) > flushInterval.toMillis();
}
private void flushBuffer() throws IOException {
if (!buffer.isEmpty()) {
writeBatch(buffer);
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
}
}
Partitions can be processed in parallel with a bounded executor:
public class ParallelStreamingProcessor {
private final ExecutorService executor;
public void processParallel(List<InputPartition> partitions) {
List<CompletableFuture<Void>> futures = partitions.stream()
.map(partition -> CompletableFuture.runAsync(
() -> processPartition(partition), executor))
.collect(Collectors.toList());
// Wait for all partitions to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
}
private void processPartition(InputPartition partition) {
// Partition-specific processing logic
}
}
The Streaming APIs provide a robust foundation for building real-time data processing systems with strong guarantees around fault tolerance, exactly-once processing, and efficient resource utilization.