Streaming APIs

The Streaming APIs in Apache Spark Catalyst support real-time data processing in both micro-batch and continuous processing modes. They are used to build custom streaming data sources and sinks with exactly-once semantics, fault tolerance, and state management.

Core Streaming Read APIs

MicroBatchStream

Interface for streaming data sources in micro-batch mode. It extends SparkDataStream, which contributes the offset-management lifecycle methods (initialOffset, deserializeOffset, commit, stop):

package org.apache.spark.sql.connector.read.streaming;

public interface MicroBatchStream extends SparkDataStream {
    /**
     * Get the latest available offset
     */
    Offset latestOffset();
    
    /**
     * Plan the input partitions for the data between two offsets
     */
    InputPartition[] planInputPartitions(Offset start, Offset end);
    
    /**
     * Create the factory that reads the planned partitions
     */
    PartitionReaderFactory createReaderFactory();
    
    // Inherited from SparkDataStream:
    
    /**
     * Get the initial offset for starting the stream
     */
    Offset initialOffset();
    
    /**
     * Deserialize offset from JSON string
     */
    Offset deserializeOffset(String json);
    
    /**
     * Commit processing up to the given offset
     */
    void commit(Offset end);
    
    /**
     * Stop the stream
     */
    void stop();
}

ContinuousStream

Interface for streaming data sources in continuous mode. Like MicroBatchStream, it extends SparkDataStream:

public interface ContinuousStream extends SparkDataStream {
    /**
     * Plan the input partitions starting from the given offset
     */
    InputPartition[] planInputPartitions(Offset start);
    
    /**
     * Create continuous reader factory
     */
    ContinuousPartitionReaderFactory createContinuousReaderFactory();
    
    /**
     * Merge partition offsets into a single offset
     */
    Offset mergeOffsets(PartitionOffset[] offsets);
    
    // Inherited from SparkDataStream:
    
    /**
     * Get initial offset for the stream
     */
    Offset initialOffset();
    
    /**
     * Deserialize offset from JSON
     */
    Offset deserializeOffset(String json);
    
    /**
     * Commit processing up to the given offset
     */
    void commit(Offset end);
    
    /**
     * Stop the stream
     */
    void stop();
}

Offset

Abstract base class for representing positions in streaming data. Equality is defined by the JSON representation:

public abstract class Offset {
    /**
     * JSON representation of this offset
     */
    public abstract String json();
    
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Offset) {
            return this.json().equals(((Offset) obj).json());
        }
        return false;
    }
    
    @Override
    public int hashCode() {
        return this.json().hashCode();
    }
}

Implementing a Micro-Batch Streaming Source

Custom Offset Implementation

public class MyStreamOffset extends Offset {
    private final long batchId;
    private final long recordCount;
    
    public MyStreamOffset(long batchId, long recordCount) {
        this.batchId = batchId;
        this.recordCount = recordCount;
    }
    
    @Override
    public String json() {
        return String.format("{\"batchId\":%d,\"recordCount\":%d}", batchId, recordCount);
    }
    
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MyStreamOffset) {
            MyStreamOffset other = (MyStreamOffset) obj;
            return this.batchId == other.batchId && this.recordCount == other.recordCount;
        }
        return false;
    }
    
    @Override
    public int hashCode() {
        return Objects.hash(batchId, recordCount);
    }
    
    public long getBatchId() { return batchId; }
    public long getRecordCount() { return recordCount; }
    
    public static MyStreamOffset fromJson(String json) {
        // Parse JSON with Jackson (com.fasterxml.jackson.databind)
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode node = mapper.readTree(json);
            return new MyStreamOffset(
                node.get("batchId").asLong(),
                node.get("recordCount").asLong()
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse offset: " + json, e);
        }
    }
}

Complete Micro-Batch Stream Implementation

public class MyMicroBatchStream implements MicroBatchStream {
    private final String streamSource;
    private final StructType schema;
    private volatile MyStreamOffset currentOffset;
    private volatile boolean stopped = false;
    
    public MyMicroBatchStream(String streamSource, StructType schema) {
        this.streamSource = streamSource;
        this.schema = schema;
        this.currentOffset = new MyStreamOffset(0, 0);
    }
    
    @Override
    public Offset latestOffset() {
        if (stopped) {
            return currentOffset;
        }
        
        // Check for new data and update offset
        long newBatchId = currentOffset.getBatchId() + 1;
        long newRecordCount = checkForNewRecords();
        
        if (newRecordCount > currentOffset.getRecordCount()) {
            currentOffset = new MyStreamOffset(newBatchId, newRecordCount);
        }
        
        return currentOffset;
    }
    
    @Override
    public Offset initialOffset() {
        return new MyStreamOffset(0, 0);
    }
    
    @Override
    public Offset deserializeOffset(String json) {
        return MyStreamOffset.fromJson(json);
    }
    
    @Override
    public InputPartition[] planInputPartitions(Offset start, Offset end) {
        // One partition covering the offset range; real sources would split the range
        // (MyInputPartition and MyPartitionReaderFactory are sketched after this class)
        return new InputPartition[] {
            new MyInputPartition((MyStreamOffset) start, (MyStreamOffset) end)
        };
    }
    
    @Override
    public PartitionReaderFactory createReaderFactory() {
        return new MyPartitionReaderFactory(schema);
    }
    
    @Override
    public void commit(Offset end) {
        MyStreamOffset offset = (MyStreamOffset) end;
        // Persist checkpoint information
        persistCheckpoint(offset);
    }
    
    @Override
    public void stop() {
        stopped = true;
        // Clean up resources
        closeConnections();
    }
    
    private long checkForNewRecords() {
        // Implementation specific - check external source for new data
        return queryExternalSourceForRecordCount();
    }
    
    private void persistCheckpoint(MyStreamOffset offset) {
        // Persist offset for fault tolerance
        writeCheckpointToStorage(offset);
    }
}
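The planInputPartitions and createReaderFactory methods above refer to MyInputPartition and MyPartitionReaderFactory, which are not Spark classes; a minimal sketch of these two helpers might look like the following (the PartitionReader they hand out is an ordinary batch-style reader, see data-source-v2-apis.md):

// Hypothetical partition describing the offset range a task should read
public class MyInputPartition implements InputPartition {
    private final MyStreamOffset start;
    private final MyStreamOffset end;
    
    public MyInputPartition(MyStreamOffset start, MyStreamOffset end) {
        this.start = start;
        this.end = end;
    }
    
    public MyStreamOffset getStart() { return start; }
    public MyStreamOffset getEnd() { return end; }
}

// Hypothetical factory; MyPartitionReader is a plain PartitionReader<InternalRow>
// scoped to the partition's offset range
public class MyPartitionReaderFactory implements PartitionReaderFactory {
    private final StructType schema;
    
    public MyPartitionReaderFactory(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
        MyInputPartition p = (MyInputPartition) partition;
        return new MyPartitionReader(p.getStart(), p.getEnd(), schema);
    }
}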

Streaming Scan Integration

public class MyStreamingTable implements Table, SupportsRead {
    private final StructType schema;
    
    public MyStreamingTable(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public String name() {
        return "my_streaming_table";
    }
    
    @Override
    public StructType schema() {
        return schema;
    }
    
    @Override
    public Set<TableCapability> capabilities() {
        return Collections.singleton(TableCapability.MICRO_BATCH_READ);
    }
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyStreamingScanBuilder(schema, options);
    }
}

public class MyStreamingScanBuilder implements ScanBuilder {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;
    
    public MyStreamingScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }
    
    @Override
    public Scan build() {
        return new MyStreamingScan(schema, options);
    }
}

public class MyStreamingScan implements Scan {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;
    
    public MyStreamingScan(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }
    
    @Override
    public StructType readSchema() {
        return schema;
    }
    
    @Override
    public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
        String streamSource = options.get("source.path");
        return new MyMicroBatchStream(streamSource, schema);
    }
}
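Once the table is exposed through a TableProvider or catalog (not shown here), the stream is consumed through the regular DataStreamReader path. A hypothetical usage, assuming the connector is registered under the short name "my-stream" (the format name and paths below are placeholders):

SparkSession spark = SparkSession.builder().appName("my-stream-demo").getOrCreate();

Dataset<Row> stream = spark.readStream()
    .format("my-stream")
    .option("source.path", "/data/incoming")
    .load();

// start() and awaitTermination() declare checked exceptions the caller must handle
StreamingQuery query = stream.writeStream()
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/my-stream")
    .start();

query.awaitTermination();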

Continuous Stream Implementation

Continuous Partition Reader

public class MyContinuousPartitionReader implements ContinuousPartitionReader<InternalRow> {
    private final int partitionId;
    private final StructType schema;
    private volatile boolean stopped = false;
    private volatile PartitionOffset currentOffset;
    
    public MyContinuousPartitionReader(int partitionId, StructType schema, 
                                      PartitionOffset startOffset) {
        this.partitionId = partitionId;
        this.schema = schema;
        this.currentOffset = startOffset;
    }
    
    @Override
    public boolean next() throws IOException {
        if (stopped) {
            return false;
        }
        
        // Continuously poll for new records
        return pollForNewRecord();
    }
    
    @Override
    public InternalRow get() {
        // Return current record and update offset
        InternalRow row = getCurrentRecord();
        updateOffset();
        return row;
    }
    
    @Override
    public PartitionOffset getOffset() {
        return currentOffset;
    }
    
    @Override
    public void close() throws IOException {
        stopped = true;
        // Clean up partition-specific resources
    }
    
    private boolean pollForNewRecord() {
        // Implementation-specific polling logic
        return hasNewDataAvailable();
    }
}

Continuous Stream Factory

public class MyContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory {
    private final StructType schema;
    
    public MyContinuousPartitionReaderFactory(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public ContinuousPartitionReader<InternalRow> createReader(InputPartition partition) {
        MyContinuousInputPartition myPartition = (MyContinuousInputPartition) partition;
        return new MyContinuousPartitionReader(
            myPartition.getPartitionId(),
            schema,
            myPartition.getStartOffset()
        );
    }
}
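MyContinuousInputPartition is referenced above but is not part of the Spark API; a minimal sketch that carries the per-partition start offset might look like:

// Hypothetical partition descriptor for the continuous reader
public class MyContinuousInputPartition implements InputPartition {
    private final int partitionId;
    private final PartitionOffset startOffset;
    
    public MyContinuousInputPartition(int partitionId, PartitionOffset startOffset) {
        this.partitionId = partitionId;
        this.startOffset = startOffset;
    }
    
    public int getPartitionId() { return partitionId; }
    public PartitionOffset getStartOffset() { return startOffset; }
}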

Streaming Write APIs

StreamingWrite

Interface for streaming write operations:

package org.apache.spark.sql.connector.write.streaming;

public interface StreamingWrite {
    /**
     * Create writer factory for streaming
     */
    StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);
    
    /**
     * Whether to use Spark's commit coordinator
     */
    boolean useCommitCoordinator();
    
    /**
     * Commit an epoch of streaming writes
     */
    void commit(long epochId, WriterCommitMessage[] messages);
    
    /**
     * Abort an epoch of streaming writes
     */
    void abort(long epochId, WriterCommitMessage[] messages);
}

StreamingDataWriterFactory

Factory for creating streaming data writers:

public interface StreamingDataWriterFactory extends Serializable {
    /**
     * Create a writer for a given partition, task, and epoch
     */
    DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);
}

Complete Streaming Write Implementation

public class MyStreamingWrite implements StreamingWrite {
    private final LogicalWriteInfo writeInfo;
    private final Map<Long, Set<String>> epochFiles = new ConcurrentHashMap<>();
    
    public MyStreamingWrite(LogicalWriteInfo writeInfo) {
        this.writeInfo = writeInfo;
    }
    
    @Override
    public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
        // PhysicalWriteInfo only carries the partition count; schema and options
        // come from the LogicalWriteInfo captured at plan time
        return new MyStreamingDataWriterFactory(writeInfo.schema(), writeInfo.options());
    }
    
    @Override
    public boolean useCommitCoordinator() {
        return true; // Enable exactly-once semantics
    }
    
    @Override
    public void commit(long epochId, WriterCommitMessage[] messages) {
        Set<String> files = new HashSet<>();
        
        try {
            // Collect all files written in this epoch
            for (WriterCommitMessage message : messages) {
                MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
                files.addAll(myMessage.getWrittenFiles());
            }
            
            // Atomically commit all files for this epoch
            atomicCommitEpoch(epochId, files);
            epochFiles.put(epochId, files);
            
            // Clean up old epochs
            cleanupOldEpochs(epochId);
            
        } catch (Exception e) {
            // If commit fails, abort the epoch
            abort(epochId, messages);
            throw new RuntimeException("Failed to commit epoch " + epochId, e);
        }
    }
    
    @Override
    public void abort(long epochId, WriterCommitMessage[] messages) {
        // Clean up any partially written files
        for (WriterCommitMessage message : messages) {
            MyStreamingCommitMessage myMessage = (MyStreamingCommitMessage) message;
            for (String file : myMessage.getWrittenFiles()) {
                deleteFileIfExists(file);
            }
        }
        epochFiles.remove(epochId);
    }
    
    private void atomicCommitEpoch(long epochId, Set<String> files) {
        // Implementation-specific atomic commit
        // This might involve:
        // 1. Writing a commit marker
        // 2. Moving temp files to final locations
        // 3. Updating metadata
    }
    
    private void cleanupOldEpochs(long currentEpoch) {
        // Keep only recent epochs for fault tolerance
        long cutoffEpoch = currentEpoch - 100;
        epochFiles.entrySet().removeIf(entry -> entry.getKey() < cutoffEpoch);
    }
}
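The abort paths rely on a deleteFileIfExists helper; it is not a Spark API, but a best-effort version could be as simple as:

// Hypothetical cleanup helper shared by the abort paths above
private static void deleteFileIfExists(String file) {
    try {
        Files.deleteIfExists(Paths.get(file));
    } catch (IOException e) {
        // Best-effort cleanup: log and continue rather than failing the abort itself
    }
}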

public class MyStreamingDataWriterFactory implements StreamingDataWriterFactory {
    private final StructType schema;
    private final CaseInsensitiveStringMap options;
    
    public MyStreamingDataWriterFactory(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }
    
    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
        return new MyStreamingDataWriter(schema, partitionId, taskId, epochId, options);
    }
}

public class MyStreamingDataWriter implements DataWriter<InternalRow> {
    private final StructType schema;
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final CaseInsensitiveStringMap options;
    private final List<InternalRow> buffer = new ArrayList<>();
    private final Set<String> writtenFiles = new HashSet<>();
    
    public MyStreamingDataWriter(StructType schema, int partitionId, long taskId,
                                 long epochId, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;
        this.options = options;
    }
    @Override
    public void write(InternalRow record) throws IOException {
        buffer.add(record.copy());
        
        // Batch writes for efficiency
        if (buffer.size() >= 1000) {
            flushBuffer();
        }
    }
    
    @Override
    public WriterCommitMessage commit() throws IOException {
        if (!buffer.isEmpty()) {
            flushBuffer();
        }
        return new MyStreamingCommitMessage(partitionId, taskId, epochId, writtenFiles);
    }
    
    @Override
    public void abort() throws IOException {
        // Clean up any files written by this writer
        for (String file : writtenFiles) {
            deleteFileIfExists(file);
        }
        buffer.clear();
        writtenFiles.clear();
    }
    
    @Override
    public void close() throws IOException {
        // No long-lived resources in this example; DataWriter extends Closeable
    }
    
    private void flushBuffer() throws IOException {
        String fileName = generateFileName(partitionId, taskId, epochId);
        writeBufferToFile(fileName, buffer);
        writtenFiles.add(fileName);
        buffer.clear();
    }
}
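The writers report their output back to the driver through MyStreamingCommitMessage, which is not defined above; a minimal sketch (WriterCommitMessage is just a serializable marker interface):

// Hypothetical commit message carrying the files a writer produced for an epoch
public class MyStreamingCommitMessage implements WriterCommitMessage {
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final Set<String> writtenFiles;
    
    public MyStreamingCommitMessage(int partitionId, long taskId, long epochId,
                                    Set<String> writtenFiles) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.epochId = epochId;
        this.writtenFiles = new HashSet<>(writtenFiles);
    }
    
    public Set<String> getWrittenFiles() { return writtenFiles; }
}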

Advanced Streaming Patterns

Exactly-Once Processing with Idempotent Writes

public class IdempotentStreamingWrite implements StreamingWrite {
    private final Map<Long, String> epochCommitIds = new ConcurrentHashMap<>();
    
    // createStreamingWriterFactory, useCommitCoordinator, and abort omitted for brevity
    
    @Override
    public void commit(long epochId, WriterCommitMessage[] messages) {
        String commitId = generateCommitId(epochId, messages);
        
        // Check if this epoch was already committed (for retries)
        if (epochCommitIds.containsKey(epochId)) {
            String existingCommitId = epochCommitIds.get(epochId);
            if (existingCommitId.equals(commitId)) {
                // Already committed with same data - this is a retry
                return;
            } else {
                throw new IllegalStateException(
                    String.format("Epoch %d committed with different data", epochId));
            }
        }
        
        // Perform idempotent commit
        performIdempotentCommit(epochId, commitId, messages);
        epochCommitIds.put(epochId, commitId);
    }
    
    private void performIdempotentCommit(long epochId, String commitId, 
                                       WriterCommitMessage[] messages) {
        // Use commitId to ensure idempotency
        // This might involve conditional writes to external systems
    }
}
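generateCommitId is a placeholder; one way to make it deterministic, assuming the MyStreamingCommitMessage sketch above, is to fingerprint the epoch together with a sorted list of its files:

// Hypothetical: derive a stable commit id from the epoch and its commit messages
private String generateCommitId(long epochId, WriterCommitMessage[] messages) {
    List<String> files = new ArrayList<>();
    for (WriterCommitMessage message : messages) {
        files.addAll(((MyStreamingCommitMessage) message).getWrittenFiles());
    }
    Collections.sort(files); // order-independent fingerprint
    return epochId + ":" + Objects.hash(files);
}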

State Management for Streaming Aggregations

public class StatefulStreamingProcessor {
    private final Map<String, Object> state = new ConcurrentHashMap<>();
    
    public void processStreamingBatch(Iterator<InternalRow> batch, long epochId) {
        Map<String, Object> batchState = new HashMap<>();
        
        // Process batch and update state
        while (batch.hasNext()) {
            InternalRow row = batch.next();
            String key = extractKey(row);
            Object value = extractValue(row);
            
            // Update batch state
            batchState.merge(key, value, this::combineValues);
        }
        
        // Atomically update global state
        synchronized (state) {
            for (Map.Entry<String, Object> entry : batchState.entrySet()) {
                state.merge(entry.getKey(), entry.getValue(), this::combineValues);
            }
        }
        
        // Persist state for fault tolerance
        persistState(epochId);
    }
    
    private Object combineValues(Object existing, Object newValue) {
        // Implementation-specific value combination logic
        if (existing instanceof Number && newValue instanceof Number) {
            return ((Number) existing).doubleValue() + ((Number) newValue).doubleValue();
        }
        return newValue;
    }
}
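extractKey and extractValue are placeholders; assuming rows laid out as (key: string at ordinal 0, value: double at ordinal 1), they could read directly from the InternalRow:

// Hypothetical accessors assuming a (string key, double value) row layout
private String extractKey(InternalRow row) {
    return row.getUTF8String(0).toString();
}

private Object extractValue(InternalRow row) {
    return row.getDouble(1);
}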

Watermark-Based Late Data Handling

public class WatermarkStreamingSource implements MicroBatchStream {
    private volatile long currentWatermark = 0;
    private final Duration allowedLateness;
    
    public WatermarkStreamingSource(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
    }
    
    public void updateWatermark(long eventTime) {
        // Update watermark based on event time minus allowed lateness
        long newWatermark = eventTime - allowedLateness.toMillis();
        currentWatermark = Math.max(currentWatermark, newWatermark);
    }
    
    public boolean isLateData(long eventTime) {
        return eventTime < currentWatermark;
    }
    
    @Override
    public Offset latestOffset() {
        // Include watermark information in offset
        return new WatermarkOffset(getCurrentBatchId(), currentWatermark);
    }
    
    // Remaining MicroBatchStream methods (planInputPartitions, commit, etc.) omitted for brevity
}

public class WatermarkOffset extends Offset {
    private final long batchId;
    private final long watermark;
    
    public WatermarkOffset(long batchId, long watermark) {
        this.batchId = batchId;
        this.watermark = watermark;
    }
    
    @Override
    public String json() {
        return String.format("{\"batchId\":%d,\"watermark\":%d}", batchId, watermark);
    }
    
    // equals and hashCode can rely on Offset's json()-based defaults
}
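A short worked example of the watermark arithmetic, assuming ten minutes of allowed lateness (and that the omitted MicroBatchStream methods are filled in):

WatermarkStreamingSource source = new WatermarkStreamingSource(Duration.ofMinutes(10));
long noon = Instant.parse("2024-06-01T12:00:00Z").toEpochMilli();
source.updateWatermark(noon); // watermark is now 11:50:00

boolean late = source.isLateData(noon - Duration.ofMinutes(15).toMillis());  // true:  11:45 < 11:50
boolean onTime = source.isLateData(noon - Duration.ofMinutes(5).toMillis()); // false: 11:55 >= 11:50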

Fault Tolerance and Recovery

Checkpointing Implementation

public class CheckpointableStreamingSource implements MicroBatchStream {
    private final String checkpointLocation;
    private volatile MyStreamOffset lastCheckpointedOffset;
    
    public CheckpointableStreamingSource(String checkpointLocation) {
        this.checkpointLocation = checkpointLocation;
    }
    
    // latestOffset, deserializeOffset, planInputPartitions, etc. omitted for brevity
    
    @Override
    public void commit(Offset end) {
        MyStreamOffset offset = (MyStreamOffset) end;
        
        try {
            // Write checkpoint atomically
            writeCheckpoint(offset);
            lastCheckpointedOffset = offset;
        } catch (IOException e) {
            throw new RuntimeException("Failed to checkpoint offset: " + offset, e);
        }
    }
    
    @Override
    public Offset initialOffset() {
        try {
            // Try to recover from checkpoint
            MyStreamOffset checkpointOffset = readCheckpoint();
            if (checkpointOffset != null) {
                return checkpointOffset;
            }
        } catch (IOException e) {
            // Log warning and start from beginning
            System.err.println("Failed to read checkpoint, starting from beginning: " + e);
        }
        
        return new MyStreamOffset(0, 0);
    }
    
    private void writeCheckpoint(MyStreamOffset offset) throws IOException {
        String tempFile = checkpointLocation + ".tmp";
        String finalFile = checkpointLocation;
        
        // Write to a temp file, then rename over the old checkpoint
        // (atomic on most local file systems; REPLACE_EXISTING handles subsequent commits)
        Files.write(Paths.get(tempFile), offset.json().getBytes());
        Files.move(Paths.get(tempFile), Paths.get(finalFile), StandardCopyOption.REPLACE_EXISTING);
    }
    
    private MyStreamOffset readCheckpoint() throws IOException {
        Path checkpointPath = Paths.get(checkpointLocation);
        if (!Files.exists(checkpointPath)) {
            return null;
        }
        
        String json = new String(Files.readAllBytes(checkpointPath));
        return MyStreamOffset.fromJson(json);
    }
}
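Spark passes the query's checkpoint directory to Scan.toMicroBatchStream, so the source usually derives its own checkpoint path from that argument; a hypothetical wiring inside MyStreamingScan:

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
    // Keep the source's offset checkpoint under the directory Spark manages for this query
    return new CheckpointableStreamingSource(checkpointLocation + "/source-offset.json");
}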

Retry and Error Handling

public class ResilientStreamingProcessor {
    private final int maxRetries;
    private final Duration retryDelay;
    
    public ResilientStreamingProcessor(int maxRetries, Duration retryDelay) {
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }
    
    public void processWithRetry(Runnable operation, String operationName) {
        int attempts = 0;
        Exception lastException = null;
        
        while (attempts < maxRetries) {
            try {
                operation.run();
                return; // Success
            } catch (Exception e) {
                attempts++;
                lastException = e;
                
                if (isRetriableException(e) && attempts < maxRetries) {
                    System.err.printf("Operation %s failed (attempt %d/%d), retrying in %s: %s%n",
                        operationName, attempts, maxRetries, retryDelay, e.getMessage());
                    
                    try {
                        Thread.sleep(retryDelay.toMillis());
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Interrupted during retry", ie);
                    }
                } else {
                    break;
                }
            }
        }
        
        throw new RuntimeException(
            String.format("Operation %s failed after %d attempts", operationName, attempts),
            lastException);
    }
    
    private boolean isRetriableException(Exception e) {
        // Determine if exception is worth retrying
        return e instanceof IOException || 
               e instanceof ConnectException ||
               (e instanceof RuntimeException && e.getCause() instanceof IOException);
    }
}
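A hypothetical usage, retrying a flaky external commit up to three times with a five-second delay (commitEpochToExternalSystem is a placeholder):

ResilientStreamingProcessor retrier = new ResilientStreamingProcessor(3, Duration.ofSeconds(5));
retrier.processWithRetry(
    () -> commitEpochToExternalSystem(epochId), // placeholder for the real external call
    "commit-epoch-" + epochId);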

Performance Optimization

Batching for Efficiency

public class BatchingStreamingWriter implements DataWriter<InternalRow> {
    private final List<InternalRow> buffer = new ArrayList<>();
    private final int batchSize;
    private final Duration flushInterval;
    private long lastFlushTime = System.currentTimeMillis();
    
    public BatchingStreamingWriter(int batchSize, Duration flushInterval) {
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
    }
    
    // commit, abort, and close omitted for brevity; commit should flush any remaining buffer
    
    @Override
    public void write(InternalRow record) throws IOException {
        buffer.add(record.copy());
        
        // Flush based on size or time
        if (shouldFlush()) {
            flushBuffer();
        }
    }
    
    private boolean shouldFlush() {
        return buffer.size() >= batchSize ||
               (System.currentTimeMillis() - lastFlushTime) > flushInterval.toMillis();
    }
    
    private void flushBuffer() throws IOException {
        if (!buffer.isEmpty()) {
            writeBatch(buffer);
            buffer.clear();
            lastFlushTime = System.currentTimeMillis();
        }
    }
}

Parallel Processing

public class ParallelStreamingProcessor {
    private final ExecutorService executor;
    
    public ParallelStreamingProcessor(ExecutorService executor) {
        this.executor = executor;
    }
    
    public void processParallel(List<InputPartition> partitions) {
        List<CompletableFuture<Void>> futures = partitions.stream()
            .map(partition -> CompletableFuture.runAsync(
                () -> processPartition(partition), executor))
            .collect(Collectors.toList());
        
        // Wait for all partitions to complete
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }
    
    private void processPartition(InputPartition partition) {
        // Partition-specific processing logic
    }
}
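A hypothetical usage with a fixed-size thread pool; the partitions would normally come from planInputPartitions:

ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ParallelStreamingProcessor processor = new ParallelStreamingProcessor(pool);
processor.processParallel(partitions); // partitions: List<InputPartition> from planInputPartitions
pool.shutdown();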

The Streaming APIs provide a robust foundation for building real-time data processing systems with strong guarantees around fault tolerance, exactly-once processing, and efficient resource utilization.