Data Source V2 APIs

The Data Source V2 APIs provide a modern, comprehensive framework for implementing custom data sources in Apache Spark. These APIs support advanced optimizations like predicate pushdown, column pruning, and vectorized processing while maintaining clean separation of concerns.
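
From the user's side, a V2 data source is consumed through the ordinary DataFrame reader and writer. A minimal usage sketch, assuming the source is registered under the hypothetical short name "my-source" and accepts a "path" option:

SparkSession spark = SparkSession.builder()
    .appName("data-source-v2-example")
    .getOrCreate();

// Read through the custom source; "my-source" and "path" are placeholders
// for whatever the implementation registers and expects
Dataset<Row> df = spark.read()
    .format("my-source")
    .option("path", "/data/events")
    .load();

// Write back through the same source
df.write()
    .format("my-source")
    .option("path", "/data/events_out")
    .mode("append")
    .save();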

Core Read APIs

ScanBuilder

The entry point for building scans with various optimizations:

package org.apache.spark.sql.connector.read;

public interface ScanBuilder {
    /**
     * Build the final Scan object
     */
    Scan build();
}

Scan

Logical representation of a data scan:

public interface Scan {
    /**
     * Returns the actual schema of this data source scan
     */
    StructType readSchema();
    
    /**
     * Returns a human-readable description of this scan
     */
    default String description();
    
    /**
     * Returns a Batch for batch queries (must implement if table supports BATCH_READ)
     */
    default Batch toBatch();
    
    /**
     * Returns a MicroBatchStream for streaming queries (must implement if table supports MICRO_BATCH_READ)
     */
    default MicroBatchStream toMicroBatchStream(String checkpointLocation);
    
    /**
     * Returns a ContinuousStream for continuous streaming queries (must implement if table supports CONTINUOUS_READ)
     */
    default ContinuousStream toContinuousStream(String checkpointLocation);
    
    /**
     * Returns custom metrics that this scan supports
     */
    default CustomMetric[] supportedCustomMetrics();
    
    /**
     * Returns custom task metrics reported from driver side
     */
    default CustomTaskMetric[] reportDriverMetrics();
    
    /**
     * Returns the columnar support mode for vectorized processing
     */
    default ColumnarSupportMode columnarSupportMode();
}

Batch

Physical representation for batch execution:

public interface Batch {
    /**
     * Plan input partitions for parallel processing
     */
    InputPartition[] planInputPartitions();
    
    /**
     * Create reader factory for processing partitions
     */
    PartitionReaderFactory createReaderFactory();
}

Basic Data Source Implementation

public class MyDataSource implements Table, SupportsRead {
    private final String name;
    private final StructType schema;
    private final String[] paths;
    
    public MyDataSource(String name, StructType schema, String[] paths) {
        this.name = name;
        this.schema = schema;
        this.paths = paths;
    }
    
    @Override
    public String name() {
        return name;
    }
    
    @Override
    public StructType schema() {
        return schema;
    }
    
    @Override
    public Set<TableCapability> capabilities() {
        // Advertise batch reads so Spark will call Scan.toBatch()
        return EnumSet.of(TableCapability.BATCH_READ);
    }
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyScanBuilder(schema, paths, options);
    }
}
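
For Spark to locate the table behind spark.read().format(...), a source usually also ships a TableProvider (plus DataSourceRegister for a short name). A minimal sketch, assuming a "path" option and a hypothetical inferSchemaFromPath helper:

public class MyDataSourceProvider implements TableProvider, DataSourceRegister {
    @Override
    public String shortName() {
        return "my-source"; // Name used in spark.read().format("my-source")
    }
    
    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        // Derive the schema from the configured path (hypothetical helper)
        return inferSchemaFromPath(options.get("path"));
    }
    
    @Override
    public Table getTable(StructType schema, Transform[] partitioning,
                          Map<String, String> properties) {
        String[] paths = new String[] { properties.get("path") };
        return new MyDataSource("my-source", schema, paths);
    }
    
    private StructType inferSchemaFromPath(String path) {
        // Placeholder: inspect the files at `path` to build a schema
        return new StructType();
    }
}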

public class MyScanBuilder implements ScanBuilder {
    private final StructType schema;
    private final String[] paths;
    private final CaseInsensitiveStringMap options;
    
    public MyScanBuilder(StructType schema, String[] paths, 
                        CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.paths = paths;
        this.options = options;
    }
    
    @Override
    public Scan build() {
        return new MyScan(schema, paths);
    }
}

public class MyScan implements Scan {
    private final StructType schema;
    private final String[] paths;
    
    public MyScan(StructType schema, String[] paths) {
        this.schema = schema;
        this.paths = paths;
    }
    
    @Override
    public StructType readSchema() {
        return schema;
    }
    
    @Override
    public String description() {
        return String.format("MyScan[paths=%s]", Arrays.toString(paths));
    }
    
    @Override
    public Batch toBatch() {
        return new MyBatch(schema, paths);
    }
}

Partition Processing

InputPartition

Represents a partition of input data:

public interface InputPartition extends Serializable {
    // Marker interface - implementations can add partition-specific data
}
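
Beyond acting as a serializable carrier for partition-specific data, an InputPartition can expose preferred executor locations through its default preferredLocations() method, which Spark uses as a locality hint when scheduling reader tasks. A small sketch, assuming the hosts holding each file are known at planning time:

public class MyLocalityAwarePartition implements InputPartition {
    private final String path;
    private final String[] hosts;
    
    public MyLocalityAwarePartition(String path, String[] hosts) {
        this.path = path;
        this.hosts = hosts;
    }
    
    @Override
    public String[] preferredLocations() {
        // Ask Spark to schedule the reader for this partition on these hosts if possible
        return hosts;
    }
    
    public String getPath() {
        return path;
    }
}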

PartitionReader

Reads data from a single partition:

public interface PartitionReader<T> extends Closeable {
    /**
     * Advance to next record
     */
    boolean next() throws IOException;
    
    /**
     * Get current record
     */
    T get();
}

PartitionReaderFactory

Factory for creating partition readers:

public interface PartitionReaderFactory extends Serializable {
    /**
     * Create reader for given partition
     */
    PartitionReader<InternalRow> createReader(InputPartition partition);
}

Complete Partition Processing Implementation:

public class MyBatch implements Batch {
    private final StructType schema;
    private final String[] paths;
    
    public MyBatch(StructType schema, String[] paths) {
        this.schema = schema;
        this.paths = paths;
    }
    
    @Override
    public InputPartition[] planInputPartitions() {
        // Create one partition per file/path
        return Arrays.stream(paths)
            .map(MyInputPartition::new)
            .toArray(InputPartition[]::new);
    }
    
    @Override
    public PartitionReaderFactory createReaderFactory() {
        return new MyPartitionReaderFactory(schema);
    }
}

public class MyInputPartition implements InputPartition {
    private final String path;
    
    public MyInputPartition(String path) {
        this.path = path;
    }
    
    public String getPath() {
        return path;
    }
}

public class MyPartitionReaderFactory implements PartitionReaderFactory {
    private final StructType schema;
    
    public MyPartitionReaderFactory(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
        MyInputPartition myPartition = (MyInputPartition) partition;
        return new MyPartitionReader(schema, myPartition.getPath());
    }
}

public class MyPartitionReader implements PartitionReader<InternalRow> {
    private final StructType schema;
    private final String path;
    private Iterator<InternalRow> iterator;
    private InternalRow currentRow;
    
    public MyPartitionReader(StructType schema, String path) {
        this.schema = schema;
        this.path = path;
        this.iterator = loadDataFromPath(path);
    }
    
    @Override
    public boolean next() {
        if (iterator.hasNext()) {
            currentRow = iterator.next();
            return true;
        }
        return false;
    }
    
    @Override
    public InternalRow get() {
        return currentRow;
    }
    
    @Override
    public void close() throws IOException {
        // Clean up resources
    }
}

Pushdown Optimizations

Filter Pushdown

Legacy Filter Pushdown (V1)

public interface SupportsPushDownFilters {
    /**
     * Push filters down to data source
     * @return filters that could not be pushed down
     */
    Filter[] pushFilters(Filter[] filters);
    
    /**
     * Get filters that were successfully pushed down
     */
    Filter[] pushedFilters();
}

Modern Filter Pushdown (V2)

public interface SupportsPushDownV2Filters {
    /**
     * Push V2 predicates down to data source
     * @return predicates that could not be pushed down  
     */
    Predicate[] pushPredicates(Predicate[] predicates);
    
    /**
     * Get predicates that were successfully pushed down
     */
    Predicate[] pushedPredicates();
}
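
Unlike the V1 Filter classes, a V2 Predicate is a general expression identified by an operator name plus child expressions, so connectors typically dispatch on predicate.name() and walk predicate.children(). A rough illustration (construction shown only for clarity; Spark builds these for you):

// Approximately what pushPredicates() receives for: WHERE age > 21
Predicate example = new Predicate(">", new Expression[] {
    Expressions.column("age"),                     // column reference
    new LiteralValue<>(21, DataTypes.IntegerType)  // constant value
});

String op = example.name();                  // ">"
Expression[] children = example.children();  // [age, 21]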

Filter Pushdown Implementation:

public class MyScanBuilder implements ScanBuilder, SupportsPushDownV2Filters {
    private final StructType schema;
    private final String[] paths;
    private Predicate[] pushedPredicates = new Predicate[0];
    
    @Override
    public Predicate[] pushPredicates(Predicate[] predicates) {
        List<Predicate> supported = new ArrayList<>();
        List<Predicate> unsupported = new ArrayList<>();
        
        for (Predicate predicate : predicates) {
            if (canPushDown(predicate)) {
                supported.add(predicate);
            } else {
                unsupported.add(predicate);
            }
        }
        
        this.pushedPredicates = supported.toArray(new Predicate[0]);
        return unsupported.toArray(new Predicate[0]);
    }
    
    @Override
    public Predicate[] pushedPredicates() {
        return pushedPredicates.clone();
    }
    
    private boolean canPushDown(Predicate predicate) {
        // V2 predicates are identified by an operator name; this source
        // understands simple comparisons and AND/OR combinations of them
        switch (predicate.name()) {
            case "=":
            case ">":
            case "<":
                return true;
            case "AND":
            case "OR":
                // Only push a combination if both children can be pushed
                return Arrays.stream(predicate.children())
                    .allMatch(child -> child instanceof Predicate
                        && canPushDown((Predicate) child));
            default:
                return false;
        }
    }
    
    @Override
    public Scan build() {
        return new MyScan(schema, paths, pushedPredicates);
    }
}

Column Pruning

public interface SupportsPushDownRequiredColumns {
    /**
     * Prune columns to only those required
     */
    void pruneColumns(StructType requiredSchema);
}

Implementation:

public class MyScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
    private StructType schema;
    private StructType prunedSchema;
    
    @Override
    public void pruneColumns(StructType requiredSchema) {
        // Only read required columns
        this.prunedSchema = requiredSchema;
    }
    
    @Override
    public Scan build() {
        StructType finalSchema = prunedSchema != null ? prunedSchema : schema;
        return new MyScan(finalSchema, paths, pushedPredicates);
    }
}

Aggregate Pushdown

public interface SupportsPushDownAggregates {
    /**
     * Push aggregation down to data source
     * @return true if aggregation can be completely pushed down
     */
    boolean pushAggregation(Aggregation aggregation);
    
    /**
     * Whether data source can completely handle the aggregation
     */
    boolean supportCompletePushDown(Aggregation aggregation);
}

Implementation:

public class MyScanBuilder implements ScanBuilder, SupportsPushDownAggregates {
    private Aggregation pushedAggregation;
    private boolean completeAggregation;
    
    @Override
    public boolean pushAggregation(Aggregation aggregation) {
        // Check if we can handle this aggregation
        AggregateFunc[] aggregates = aggregation.aggregateExpressions();
        Expression[] groupBy = aggregation.groupByExpressions();
        
        // Simple aggregations we can handle
        for (AggregateFunc func : aggregates) {
            if (!(func instanceof Count || func instanceof Sum)) {
                return false; // Can't handle complex aggregations
            }
        }
        
        this.pushedAggregation = aggregation;
        this.completeAggregation = true;
        return true;
    }
    
    @Override
    public boolean supportCompletePushDown(Aggregation aggregation) {
        // COUNT and SUM can be computed entirely by this source,
        // so Spark does not need to add a partial aggregation on top
        return Arrays.stream(aggregation.aggregateExpressions())
            .allMatch(func -> func instanceof Count || func instanceof Sum);
    }
}

Limit and Offset Pushdown

public interface SupportsPushDownLimit extends ScanBuilder {
    /**
     * Push a LIMIT down to the data source
     * @return true if the limit was pushed down
     */
    boolean pushLimit(int limit);
    
    /**
     * Whether the pushed limit is only applied partially, so Spark must still
     * apply it again on its side
     */
    default boolean isPartiallyPushed();
}

public interface SupportsPushDownOffset extends ScanBuilder {
    /**
     * Push an OFFSET down to the data source
     * @return true if the offset was pushed down
     */
    boolean pushOffset(int offset);
}
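
A sketch of limit and offset pushdown on the scan builder, continuing the hypothetical MyScanBuilder (the MyScan constructor that accepts the pushed limit and offset is assumed here):

public class MyScanBuilder implements ScanBuilder,
        SupportsPushDownLimit, SupportsPushDownOffset {
    private final StructType schema;
    private final String[] paths;
    private int pushedLimit = -1;   // -1 means no limit was pushed
    private int pushedOffset = 0;
    
    public MyScanBuilder(StructType schema, String[] paths) {
        this.schema = schema;
        this.paths = paths;
    }
    
    @Override
    public boolean pushLimit(int limit) {
        this.pushedLimit = limit;
        return true; // This source can stop reading after `limit` rows
    }
    
    @Override
    public boolean pushOffset(int offset) {
        this.pushedOffset = offset;
        return true; // This source can skip the first `offset` rows
    }
    
    @Override
    public Scan build() {
        return new MyScan(schema, paths, pushedLimit, pushedOffset);
    }
}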

TopN Pushdown

public interface SupportsPushDownTopN extends ScanBuilder {
    /**
     * Push a top-N (ORDER BY ... LIMIT n) down to the data source
     * @return true if the top-N was pushed down
     */
    boolean pushTopN(SortOrder[] orders, int limit);
    
    /**
     * Whether the pushed top-N is only applied partially, so Spark must still
     * sort and limit again on its side
     */
    default boolean isPartiallyPushed();
}

TopN Implementation:

public class MyScanBuilder implements ScanBuilder, SupportsPushDownTopN {
    private SortOrder[] pushedOrders;
    private int pushedLimit = -1;
    
    @Override
    public boolean pushTopN(SortOrder[] orders, int limit) {
        // Check if we can handle the sort orders
        for (SortOrder order : orders) {
            if (!canSortBy(order)) {
                return false;
            }
        }
        
        this.pushedOrders = orders;
        this.pushedLimit = limit;
        return true;
    }
    
    private boolean canSortBy(SortOrder order) {
        // Check if column is sortable in our data source
        return order.expression() instanceof NamedReference;
    }
}

Write APIs

WriteBuilder

Entry point for building write operations. Unlike the V1 API, there is no save mode here; overwrite and truncate semantics are expressed through the optional mixin interfaces described below:

package org.apache.spark.sql.connector.write;

public interface WriteBuilder {
    /**
     * Build the final Write object
     */
    Write build();
}

Write

Logical representation of a write operation:

public interface Write {
    /**
     * Returns the description associated with this write
     */
    default String description();
    
    /**
     * Returns a BatchWrite to write data to batch source (must implement if table supports BATCH_WRITE)
     */
    default BatchWrite toBatch();
    
    /**
     * Returns a StreamingWrite for streaming writes (must implement if table supports STREAMING_WRITE)
     */
    default StreamingWrite toStreaming();
    
    /**
     * Returns custom metrics that this write supports
     */
    default CustomMetric[] supportedCustomMetrics();
}

BatchWrite

Physical batch write implementation:

public interface BatchWrite {
    /**
     * Create writer factory for partitions
     */
    DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);
    
    /**
     * Whether to use Spark's commit coordinator
     */
    boolean useCommitCoordinator();
    
    /**
     * Called when a data writer commits
     */
    void onDataWriterCommit(WriterCommitMessage message);
    
    /**
     * Commit the entire write operation
     */
    void commit(WriterCommitMessage[] messages);
    
    /**
     * Abort the write operation
     */
    void abort(WriterCommitMessage[] messages);
}

DataWriter

Writes data for a single partition:

public interface DataWriter<T> extends Closeable {
    /**
     * Write a single record
     */
    void write(T record) throws IOException;
    
    /**
     * Commit this writer's work
     */
    WriterCommitMessage commit() throws IOException;
    
    /**
     * Abort this writer's work
     */
    void abort() throws IOException;
}

Complete Write Implementation:

public class MyDataSource implements Table, SupportsWrite {
    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        return new MyWriteBuilder(info);
    }
}

public class MyWriteBuilder implements WriteBuilder {
    private final LogicalWriteInfo info;
    
    public MyWriteBuilder(LogicalWriteInfo info) {
        this.info = info;
    }
    
    @Override
    public Write build() {
        return new MyWrite(info);
    }
}

public class MyWrite implements Write {
    private final LogicalWriteInfo info;
    
    public MyWrite(LogicalWriteInfo info) {
        this.info = info;
    }
    
    @Override
    public BatchWrite toBatch() {
        return new MyBatchWrite(info);
    }
}

public class MyBatchWrite implements BatchWrite {
    private final LogicalWriteInfo info;
    
    public MyBatchWrite(LogicalWriteInfo info) {
        this.info = info;
    }
    
    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalInfo) {
        // The write schema comes from the logical write info captured at planning time
        return new MyDataWriterFactory(info.schema());
    }
    
    @Override
    public boolean useCommitCoordinator() {
        return true; // Use Spark's coordinator for ACID guarantees
    }
    
    @Override
    public void commit(WriterCommitMessage[] messages) {
        // Commit all partition writes atomically
        for (WriterCommitMessage message : messages) {
            MyCommitMessage myMessage = (MyCommitMessage) message;
            finalizePartition(myMessage);
        }
    }
    
    @Override
    public void abort(WriterCommitMessage[] messages) {
        // Clean up any partially written data
        for (WriterCommitMessage message : messages) {
            MyCommitMessage myMessage = (MyCommitMessage) message;
            cleanupPartition(myMessage);
        }
    }
}

public class MyDataWriterFactory implements DataWriterFactory {
    private final StructType schema;
    
    public MyDataWriterFactory(StructType schema) {
        this.schema = schema;
    }
    
    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
        return new MyDataWriter(schema, partitionId, taskId);
    }
}

public class MyDataWriter implements DataWriter<InternalRow> {
    private final StructType schema;
    private final int partitionId;
    private final long taskId;
    private final List<InternalRow> buffer = new ArrayList<>();
    
    public MyDataWriter(StructType schema, int partitionId, long taskId) {
        this.schema = schema;
        this.partitionId = partitionId;
        this.taskId = taskId;
    }
    
    @Override
    public void write(InternalRow record) throws IOException {
        buffer.add(record.copy()); // Make defensive copy
    }
    
    @Override
    public WriterCommitMessage commit() throws IOException {
        // Write buffered data to storage
        String outputPath = writeDataToStorage(buffer);
        return new MyCommitMessage(partitionId, taskId, outputPath, buffer.size());
    }
    
    @Override
    public void abort() throws IOException {
        buffer.clear();
        // Clean up any temporary files
    }
}

Write Support Interfaces

SupportsOverwrite

public interface SupportsOverwrite {
    WriteBuilder overwrite(Filter[] filters);
}

SupportsOverwriteV2

public interface SupportsOverwriteV2 {
    WriteBuilder overwrite(Predicate[] predicates);
}

SupportsDynamicOverwrite

public interface SupportsDynamicOverwrite {
    WriteBuilder overwriteDynamicPartitions();
}

SupportsTruncate

public interface SupportsTruncate {
    WriteBuilder truncate();
}

Complete Write Builder with All Support:

public class MyWriteBuilder implements WriteBuilder, SupportsOverwriteV2, 
                                     SupportsDynamicOverwrite, SupportsTruncate {
    private final LogicalWriteInfo info;
    private Predicate[] overwritePredicates;
    private boolean dynamicOverwrite = false;
    private boolean truncate = false;
    
    public MyWriteBuilder(LogicalWriteInfo info) {
        this.info = info;
    }
    
    @Override
    public WriteBuilder overwrite(Predicate[] predicates) {
        this.overwritePredicates = predicates;
        return this;
    }
    
    @Override
    public WriteBuilder overwriteDynamicPartitions() {
        this.dynamicOverwrite = true;
        return this;
    }
    
    @Override
    public WriteBuilder truncate() {
        this.truncate = true;
        return this;
    }
    
    @Override
    public Write build() {
        // A fuller MyWrite variant that carries the overwrite/truncate state
        return new MyWrite(info, overwritePredicates, dynamicOverwrite, truncate);
    }
}

Distribution APIs

Distribution

Represents how data should be distributed across partitions:

package org.apache.spark.sql.connector.distributions;

public interface Distribution {
    // Marker interface for different distribution strategies
}

Distributions Factory

public class Distributions {
    /**
     * No specific distribution requirement
     */
    public static Distribution unspecified() { ... }
    
    /**
     * Data clustered by expressions (hash partitioning)
     */
    public static Distribution clustered(Expression[] expressions) { ... }
    
    /**
     * Data ordered by sort expressions
     */  
    public static Distribution ordered(SortOrder[] ordering) { ... }
}

Usage Example (a write declares its requirements via RequiresDistributionAndOrdering):

public class MyWrite implements Write, RequiresDistributionAndOrdering {
    @Override
    public Distribution requiredDistribution() {
        // Require incoming data to be hash partitioned by user_id
        return Distributions.clustered(new Expression[] {
            Expressions.column("user_id")
        });
    }
    
    @Override
    public SortOrder[] requiredOrdering() {
        return new SortOrder[0]; // No ordering requirement within partitions
    }
    
    @Override
    public int requiredNumPartitions() {
        return 10; // Write with 10 partitions
    }
}

Advanced Patterns

Vectorized Reading

For high-performance reading, implement columnar batch processing:

public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
    private final ColumnVector[] columns;
    private int batchSize = 1000;
    
    public MyVectorizedPartitionReader(MyInputPartition partition) {
        // Allocate one column vector per field of the read schema (helper elided)
        this.columns = allocateColumnVectors(partition);
    }
    
    @Override
    public boolean next() throws IOException {
        // Load next batch of data into column vectors
        return loadNextBatch();
    }
    
    @Override
    public ColumnarBatch get() {
        return new ColumnarBatch(columns, batchSize);
    }
    
    private boolean loadNextBatch() {
        // Efficient columnar data loading
        for (int i = 0; i < columns.length; i++) {
            loadColumnData(columns[i], i);
        }
        return true;
    }
    
    @Override
    public void close() {
        // Release the memory held by the column vectors
        for (ColumnVector column : columns) {
            column.close();
        }
    }
}
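
Spark only asks for ColumnarBatch readers when the factory advertises columnar support. A minimal factory sketch wired to the reader above (error handling omitted):

public class MyColumnarReaderFactory implements PartitionReaderFactory {
    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
        // Not used when columnar reads are supported for every partition
        throw new UnsupportedOperationException("Use createColumnarReader instead");
    }
    
    @Override
    public boolean supportColumnarReads(InputPartition partition) {
        return true; // Tell Spark to request ColumnarBatch readers
    }
    
    @Override
    public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
        return new MyVectorizedPartitionReader((MyInputPartition) partition);
    }
}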

Transactional Writes

Implement ACID transactions using commit coordination:

public class TransactionalBatchWrite implements BatchWrite {
    private final String transactionId;
    
    @Override
    public boolean useCommitCoordinator() {
        return true; // Essential for transactions
    }
    
    @Override
    public void commit(WriterCommitMessage[] messages) {
        try {
            // Start transaction
            beginTransaction(transactionId);
            
            // Commit all partitions
            for (WriterCommitMessage message : messages) {
                commitPartition(message);
            }
            
            // Commit transaction
            commitTransaction(transactionId);
        } catch (Exception e) {
            abortTransaction(transactionId);
            throw new RuntimeException("Transaction failed", e);
        }
    }
}

Partition-Aware Writing

Optimize writes for partitioned tables:

public class PartitionAwareDataWriter implements DataWriter<InternalRow> {
    private final Map<String, List<InternalRow>> partitionBuffers = new HashMap<>();
    private final String[] partitionColumns;
    
    @Override
    public void write(InternalRow record) throws IOException {
        String partitionKey = extractPartitionKey(record);
        partitionBuffers.computeIfAbsent(partitionKey, k -> new ArrayList<>())
                       .add(record.copy());
    }
    
    @Override
    public WriterCommitMessage commit() throws IOException {
        Map<String, String> partitionPaths = new HashMap<>();
        
        // Write each partition separately
        for (Map.Entry<String, List<InternalRow>> entry : partitionBuffers.entrySet()) {
            String partitionKey = entry.getKey();
            List<InternalRow> rows = entry.getValue();
            String path = writePartition(partitionKey, rows);
            partitionPaths.put(partitionKey, path);
        }
        
        return new PartitionedCommitMessage(partitionPaths);
    }
}

Together, these interfaces let a connector expose scans, writes, pushdown optimizations, distribution and ordering requirements, and vectorized execution to Spark through small, composable mixins, keeping logical planning concerns separate from the physical read and write paths.