The Data Source V2 APIs provide a modern, comprehensive framework for implementing custom data sources in Apache Spark. These APIs support advanced optimizations like predicate pushdown, column pruning, and vectorized processing while maintaining clean separation of concerns.
The entry point for building scans with various optimizations:
package org.apache.spark.sql.connector.read;
public interface ScanBuilder {
/**
* Build the final Scan object
*/
Scan build();
}
Logical representation of a data scan:
public interface Scan {
/**
* Returns the actual schema of this data source scan
*/
StructType readSchema();
/**
* Returns a human-readable description of this scan
*/
default String description();
/**
* Returns a Batch for batch queries (must implement if table supports BATCH_READ)
*/
default Batch toBatch();
/**
* Returns a MicroBatchStream for streaming queries (must implement if table supports MICRO_BATCH_READ)
*/
default MicroBatchStream toMicroBatchStream(String checkpointLocation);
/**
* Returns a ContinuousStream for continuous streaming queries (must implement if table supports CONTINUOUS_READ)
*/
default ContinuousStream toContinuousStream(String checkpointLocation);
/**
* Returns custom metrics that this scan supports
*/
default CustomMetric[] supportedCustomMetrics();
/**
* Returns custom task metrics reported from driver side
*/
default CustomTaskMetric[] reportDriverMetrics();
/**
* Returns the columnar support mode for vectorized processing
*/
default ColumnarSupportMode columnarSupportMode();
}
Physical representation for batch execution:
public interface Batch {
/**
* Plan input partitions for parallel processing
*/
InputPartition[] planInputPartitions();
/**
* Create reader factory for processing partitions
*/
PartitionReaderFactory createReaderFactory();
}
Complete Scan Implementation:
public class MyDataSource implements Table, SupportsRead {
private final String name;
private final StructType schema;
private final String[] paths;
public MyDataSource(String name, StructType schema, String[] paths) {
this.name = name;
this.schema = schema;
this.paths = paths;
}
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder(schema, paths, options);
}
}
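MyDataSource also has to fulfil the rest of the Table contract (name, schema, and capabilities), which is omitted above. A minimal sketch of those methods, with an assumed capability set for a batch-readable source:
// Remaining Table methods for MyDataSource (omitted above); the capability set is an assumption.
// Requires: java.util.EnumSet, java.util.Set, org.apache.spark.sql.connector.catalog.TableCapability
@Override
public String name() {
  return name;
}
@Override
public StructType schema() {
  return schema;
}
@Override
public Set<TableCapability> capabilities() {
  // Advertise batch reads; BATCH_WRITE is added later when the write path is implemented
  return EnumSet.of(TableCapability.BATCH_READ);
}
The scan builder and scan classes referenced by newScanBuilder are defined next.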
public class MyScanBuilder implements ScanBuilder {
private final StructType schema;
private final String[] paths;
private final CaseInsensitiveStringMap options;
public MyScanBuilder(StructType schema, String[] paths,
CaseInsensitiveStringMap options) {
this.schema = schema;
this.paths = paths;
this.options = options;
}
@Override
public Scan build() {
return new MyScan(schema, paths);
}
}
public class MyScan implements Scan {
private final StructType schema;
private final String[] paths;
public MyScan(StructType schema, String[] paths) {
this.schema = schema;
this.paths = paths;
}
@Override
public StructType readSchema() {
return schema;
}
@Override
public String description() {
return String.format("MyScan[paths=%s]", Arrays.toString(paths));
}
@Override
public Batch toBatch() {
return new MyBatch(schema, paths);
}
}
Represents a partition of input data:
public interface InputPartition extends Serializable {
// Marker interface - implementations can add partition-specific data
}
Reads data from a single partition:
public interface PartitionReader<T> extends Closeable {
/**
* Advance to next record
*/
boolean next() throws IOException;
/**
* Get current record
*/
T get();
}
Factory for creating partition readers:
public interface PartitionReaderFactory extends Serializable {
/**
* Create reader for given partition
*/
PartitionReader<InternalRow> createReader(InputPartition partition);
}
Complete Partition Processing Implementation:
public class MyBatch implements Batch {
private final StructType schema;
private final String[] paths;
public MyBatch(StructType schema, String[] paths) {
this.schema = schema;
this.paths = paths;
}
@Override
public InputPartition[] planInputPartitions() {
// Create one partition per file/path
return Arrays.stream(paths)
.map(MyInputPartition::new)
.toArray(InputPartition[]::new);
}
@Override
public PartitionReaderFactory createReaderFactory() {
return new MyPartitionReaderFactory(schema);
}
}
public class MyInputPartition implements InputPartition {
private final String path;
public MyInputPartition(String path) {
this.path = path;
}
public String getPath() {
return path;
}
}
public class MyPartitionReaderFactory implements PartitionReaderFactory {
private final StructType schema;
public MyPartitionReaderFactory(StructType schema) {
this.schema = schema;
}
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
MyInputPartition myPartition = (MyInputPartition) partition;
return new MyPartitionReader(schema, myPartition.getPath());
}
}
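The reader below relies on a loadDataFromPath helper, which is not a Spark API. One possible sketch, assuming a simple CSV-like file whose lines match a (string, int) schema, builds rows with GenericInternalRow:
// Hypothetical helper used by MyPartitionReader below; not part of any Spark API.
// Requires: java.io.IOException, java.io.UncheckedIOException, java.util.Iterator,
// org.apache.spark.sql.catalyst.expressions.GenericInternalRow, org.apache.spark.unsafe.types.UTF8String
private Iterator<InternalRow> loadDataFromPath(String path) {
  try {
    return java.nio.file.Files.lines(java.nio.file.Paths.get(path))
        .map(line -> {
          String[] parts = line.split(",");
          Object[] values = new Object[] {
              UTF8String.fromString(parts[0]), // string columns are stored as UTF8String
              Integer.parseInt(parts[1])
          };
          return (InternalRow) new GenericInternalRow(values);
        })
        .iterator();
  } catch (IOException e) {
    throw new java.io.UncheckedIOException(e);
  }
}
A production reader would keep a handle to the underlying stream and release it in close() rather than relying on the iterator alone.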
public class MyPartitionReader implements PartitionReader<InternalRow> {
private final StructType schema;
private final String path;
private Iterator<InternalRow> iterator;
private InternalRow currentRow;
public MyPartitionReader(StructType schema, String path) {
this.schema = schema;
this.path = path;
this.iterator = loadDataFromPath(path);
}
@Override
public boolean next() {
if (iterator.hasNext()) {
currentRow = iterator.next();
return true;
}
return false;
}
@Override
public InternalRow get() {
return currentRow;
}
@Override
public void close() throws IOException {
// Clean up resources
}
}
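With the table, builder, scan, batch, and reader classes in place, Spark still needs a way to discover the source. That is usually done through a TableProvider; the package, class name, and option handling below are this example's own assumptions, not prescribed by Spark:
// Minimal TableProvider sketch that exposes MyDataSource to spark.read.
// Package, class name, and the "paths" option are assumptions for this example.
package com.example; // hypothetical package for this sketch
import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
public class MyTableProvider implements TableProvider {
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // A real implementation would inspect the files referenced by the options
    return new StructType().add("name", "string").add("age", "int");
  }
  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    String[] paths = properties.getOrDefault("paths", "").split(",");
    return new MyDataSource("my_source", schema, paths);
  }
}
A query can then load the source by the provider's fully qualified class name (or a short name registered through DataSourceRegister), for example spark.read.format("com.example.MyTableProvider").option("paths", "/data/users.csv").load().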
Filter pushdown mixin (V1 Filter API):
public interface SupportsPushDownFilters {
/**
* Push filters down to data source
* @return filters that could not be pushed down
*/
Filter[] pushFilters(Filter[] filters);
/**
* Get filters that were successfully pushed down
*/
Filter[] pushedFilters();
}
Predicate pushdown mixin (V2 Predicate API):
public interface SupportsPushDownV2Filters {
/**
* Push V2 predicates down to data source
* @return predicates that could not be pushed down
*/
Predicate[] pushPredicates(Predicate[] predicates);
/**
* Get predicates that were successfully pushed down
*/
Predicate[] pushedPredicates();
}
Filter Pushdown Implementation:
public class MyScanBuilder implements ScanBuilder, SupportsPushDownV2Filters {
private final StructType schema;
private final String[] paths;
private Predicate[] pushedPredicates = new Predicate[0];
@Override
public Predicate[] pushPredicates(Predicate[] predicates) {
List<Predicate> supported = new ArrayList<>();
List<Predicate> unsupported = new ArrayList<>();
for (Predicate predicate : predicates) {
if (canPushDown(predicate)) {
supported.add(predicate);
} else {
unsupported.add(predicate);
}
}
this.pushedPredicates = supported.toArray(new Predicate[0]);
return unsupported.toArray(new Predicate[0]);
}
@Override
public Predicate[] pushedPredicates() {
return pushedPredicates.clone();
}
private boolean canPushDown(Predicate predicate) {
// Check whether the predicate can be evaluated by the data source.
// V2 predicates expose their operator through name(); the V1 EqualTo/GreaterThan/LessThan
// classes are Filter types and do not apply here.
switch (predicate.name()) {
case "=":
case ">":
case "<":
case "AND":
case "OR":
return true;
default:
return false;
}
}
@Override
public Scan build() {
return new MyScan(schema, paths, pushedPredicates);
}
}
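Since build() now passes the pushed predicates to the scan, the earlier two-argument MyScan needs a matching constructor. A sketch of that extension (how the predicates are ultimately evaluated is left to the source):
// Additional state and constructor for MyScan once predicates are pushed down.
private final Predicate[] pushedPredicates;
public MyScan(StructType schema, String[] paths, Predicate[] pushedPredicates) {
  this.schema = schema;
  this.paths = paths;
  this.pushedPredicates = pushedPredicates;
  // toBatch() can forward pushedPredicates so partition readers skip non-matching data
}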
Column pruning mixin:
public interface SupportsPushDownRequiredColumns {
/**
* Prune columns to only those required
*/
void pruneColumns(StructType requiredSchema);
}
Implementation:
public class MyScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
private StructType schema;
private StructType prunedSchema;
// paths, pushedPredicates, and the constructor are the same as in the earlier builder
@Override
public void pruneColumns(StructType requiredSchema) {
// Only read required columns
this.prunedSchema = requiredSchema;
}
@Override
public Scan build() {
StructType finalSchema = prunedSchema != null ? prunedSchema : schema;
return new MyScan(finalSchema, paths, pushedPredicates);
}
}
Aggregate pushdown mixin:
public interface SupportsPushDownAggregates {
/**
* Push aggregation down to data source
* @return true if aggregation can be completely pushed down
*/
boolean pushAggregation(Aggregation aggregation);
/**
* Whether data source can completely handle the aggregation
*/
boolean supportCompletePushDown(Aggregation aggregation);
}
Implementation:
public class MyScanBuilder implements ScanBuilder, SupportsPushDownAggregates {
private Aggregation pushedAggregation;
private boolean completeAggregation;
@Override
public boolean pushAggregation(Aggregation aggregation) {
// Check if we can handle this aggregation
AggregateFunc[] aggregates = aggregation.aggregateExpressions();
Expression[] groupBy = aggregation.groupByExpressions();
// A real source would also check that it can group by these expressions
// Simple aggregations we can handle (COUNT and SUM only)
for (AggregateFunc func : aggregates) {
if (!(func instanceof Count || func instanceof Sum)) {
return false; // Can't handle complex aggregations
}
}
this.pushedAggregation = aggregation;
this.completeAggregation = true;
return true;
}
@Override
public boolean supportCompletePushDown(Aggregation aggregation) {
return completeAggregation;
}
}
Limit, offset, and top-N pushdown mixins:
public interface SupportsPushDownLimit {
boolean pushLimit(int limit);
}
public interface SupportsPushDownOffset {
boolean pushOffset(int offset);
}
public interface SupportsPushDownTopN {
boolean pushTopN(SortOrder[] orders, int limit);
}
TopN Implementation:
public class MyScanBuilder implements ScanBuilder, SupportsPushDownTopN {
private SortOrder[] pushedOrders;
private int pushedLimit = -1;
@Override
public boolean pushTopN(SortOrder[] orders, int limit) {
// Check if we can handle the sort orders
for (SortOrder order : orders) {
if (!canSortBy(order)) {
return false;
}
}
this.pushedOrders = orders;
this.pushedLimit = limit;
return true;
}
private boolean canSortBy(SortOrder order) {
// Check if column is sortable in our data source
return order.expression() instanceof NamedReference;
}
}
Entry point for building write operations:
package org.apache.spark.sql.connector.write;
public interface WriteBuilder {
/**
* Build the final Write object
*/
Write build();
}
Note that WriteBuilder does not take a SaveMode; overwrite and truncate behavior is requested through the optional mixin interfaces (SupportsOverwrite, SupportsDynamicOverwrite, SupportsTruncate) shown later in this section.
Logical representation of a write operation:
public interface Write {
/**
* Returns the description associated with this write
*/
default String description();
/**
* Returns a BatchWrite to write data to batch source (must implement if table supports BATCH_WRITE)
*/
default BatchWrite toBatch();
/**
* Returns a StreamingWrite for streaming writes (must implement if table supports STREAMING_WRITE)
*/
default StreamingWrite toStreaming();
/**
* Returns custom metrics that this write supports
*/
default CustomMetric[] supportedCustomMetrics();
}
Physical batch write implementation:
public interface BatchWrite {
/**
* Create writer factory for partitions
*/
DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);
/**
* Whether to use Spark's commit coordinator
*/
boolean useCommitCoordinator();
/**
* Called when a data writer commits
*/
void onDataWriterCommit(WriterCommitMessage message);
/**
* Commit the entire write operation
*/
void commit(WriterCommitMessage[] messages);
/**
* Abort the write operation
*/
void abort(WriterCommitMessage[] messages);
}
Writes data for a single partition:
public interface DataWriter<T> extends Closeable {
/**
* Write a single record
*/
void write(T record) throws IOException;
/**
* Commit this writer's work
*/
WriterCommitMessage commit() throws IOException;
/**
* Abort this writer's work
*/
void abort() throws IOException;
}
Complete Write Implementation:
// The same MyDataSource table, now also supporting writes
public class MyDataSource implements Table, SupportsRead, SupportsWrite {
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new MyWriteBuilder(info);
}
}
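For Spark to route writes to this table, capabilities() (sketched earlier for reads) also has to advertise write support. The exact set below is an assumption that matches the mixins used later in this section:
// Assumed capability set once writes, truncate, and both overwrite variants are supported.
@Override
public Set<TableCapability> capabilities() {
  return EnumSet.of(
      TableCapability.BATCH_READ,
      TableCapability.BATCH_WRITE,
      TableCapability.TRUNCATE,
      TableCapability.OVERWRITE_BY_FILTER,
      TableCapability.OVERWRITE_DYNAMIC);
}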
public class MyWriteBuilder implements WriteBuilder {
private final LogicalWriteInfo info;
public MyWriteBuilder(LogicalWriteInfo info) {
this.info = info;
}
@Override
public Write build() {
return new MyWrite(info);
}
}
public class MyWrite implements Write {
private final LogicalWriteInfo info;
public MyWrite(LogicalWriteInfo info) {
this.info = info;
}
@Override
public BatchWrite toBatch() {
return new MyBatchWrite(info);
}
}
public class MyBatchWrite implements BatchWrite {
private final LogicalWriteInfo info;
public MyBatchWrite(LogicalWriteInfo info) {
this.info = info;
}
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalInfo) {
// PhysicalWriteInfo carries the partition count; the schema comes from the LogicalWriteInfo
return new MyDataWriterFactory(info.schema());
}
@Override
public boolean useCommitCoordinator() {
return true; // Use Spark's coordinator for ACID guarantees
}
@Override
public void commit(WriterCommitMessage[] messages) {
// Commit all partition writes atomically
for (WriterCommitMessage message : messages) {
MyCommitMessage myMessage = (MyCommitMessage) message;
finalizePartition(myMessage);
}
}
@Override
public void abort(WriterCommitMessage[] messages) {
// Clean up any partially written data
for (WriterCommitMessage message : messages) {
MyCommitMessage myMessage = (MyCommitMessage) message;
cleanupPartition(myMessage);
}
}
}
public class MyDataWriterFactory implements DataWriterFactory {
private final StructType schema;
public MyDataWriterFactory(StructType schema) {
this.schema = schema;
}
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
return new MyDataWriter(schema, partitionId, taskId);
}
}
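The writer below and MyBatchWrite above exchange a custom commit message. WriterCommitMessage is only a serializable marker interface, so MyCommitMessage (used but not defined elsewhere in this section) can be a small value class; a minimal sketch:
// Sketch of the message returned by MyDataWriter.commit() and consumed by MyBatchWrite.commit().
// The fields are this example's own choice; Spark only requires the message to be serializable.
public class MyCommitMessage implements WriterCommitMessage {
  private final int partitionId;
  private final long taskId;
  private final String outputPath;
  private final long rowCount;
  public MyCommitMessage(int partitionId, long taskId, String outputPath, long rowCount) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.outputPath = outputPath;
    this.rowCount = rowCount;
  }
  public String getOutputPath() {
    return outputPath;
  }
  public long getRowCount() {
    return rowCount;
  }
}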
public class MyDataWriter implements DataWriter<InternalRow> {
private final StructType schema;
private final int partitionId;
private final long taskId;
private final List<InternalRow> buffer = new ArrayList<>();
public MyDataWriter(StructType schema, int partitionId, long taskId) {
this.schema = schema;
this.partitionId = partitionId;
this.taskId = taskId;
}
@Override
public void write(InternalRow record) throws IOException {
buffer.add(record.copy()); // Make defensive copy
}
@Override
public WriterCommitMessage commit() throws IOException {
// Write buffered data to storage
String outputPath = writeDataToStorage(buffer);
return new MyCommitMessage(partitionId, taskId, outputPath, buffer.size());
}
@Override
public void abort() throws IOException {
buffer.clear();
// Clean up any temporary files
}
@Override
public void close() throws IOException {
buffer.clear();
}
}
Overwrite and truncate mixins for WriteBuilder:
public interface SupportsOverwrite {
WriteBuilder overwrite(Filter[] filters);
}
public interface SupportsOverwriteV2 {
WriteBuilder overwrite(Predicate[] predicates);
}
public interface SupportsDynamicOverwrite {
WriteBuilder overwriteDynamicPartitions();
}
public interface SupportsTruncate {
WriteBuilder truncate();
}
Complete Write Builder with All Support:
public class MyWriteBuilder implements WriteBuilder, SupportsOverwriteV2,
SupportsDynamicOverwrite, SupportsTruncate {
private final LogicalWriteInfo info;
private Predicate[] overwritePredicates;
private boolean dynamicOverwrite = false;
private boolean truncate = false;
public MyWriteBuilder(LogicalWriteInfo info) {
this.info = info;
}
@Override
public WriteBuilder overwrite(Predicate[] predicates) {
this.overwritePredicates = predicates;
return this;
}
@Override
public WriteBuilder overwriteDynamicPartitions() {
this.dynamicOverwrite = true;
return this;
}
@Override
public WriteBuilder truncate() {
this.truncate = true;
return this;
}
@Override
public Write build() {
// Assumes a MyWrite constructor that accepts the overwrite/truncate settings
return new MyWrite(info, overwritePredicates, dynamicOverwrite, truncate);
}
}
Represents how data should be distributed across partitions:
package org.apache.spark.sql.connector.distributions;
public interface Distribution {
// Marker interface for different distribution strategies
}
Factory methods for the built-in distributions:
public class Distributions {
/**
* No specific distribution requirement
*/
public static Distribution unspecified() { ... }
/**
* Data clustered by expressions (hash partitioning)
*/
public static Distribution clustered(Expression[] expressions) { ... }
/**
* Data ordered by sort expressions
*/
public static Distribution ordered(SortOrder[] ordering) { ... }
}
Usage Example:
public class MyWrite implements Write, RequiresDistributionAndOrdering {
@Override
public Distribution requiredDistribution() {
// Require data to be hash partitioned by user_id
return Distributions.clustered(new Expression[] {
Expressions.column("user_id")
});
}
@Override
public SortOrder[] requiredOrdering() {
return new SortOrder[0]; // No ordering requirement within partitions
}
@Override
public int requiredNumPartitions() {
return 10; // Write into 10 partitions
}
// toBatch() as in the earlier MyWrite
}
For high-performance reading, implement columnar batch processing:
public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
private final ColumnVector[] columns;
private int batchSize = 1000;
@Override
public boolean next() throws IOException {
// Load next batch of data into column vectors
return loadNextBatch();
}
@Override
public ColumnarBatch get() {
return new ColumnarBatch(columns, batchSize);
}
private boolean loadNextBatch() {
// Efficient columnar data loading; return false once the input is exhausted
for (int i = 0; i < columns.length; i++) {
loadColumnData(columns[i], i);
}
return true;
}
@Override
public void close() throws IOException {
// Release column vectors and other resources
}
}
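For Spark to actually use a ColumnarBatch reader like the one above, the PartitionReaderFactory must report columnar support. A sketch of the relevant overrides (the factory name and reader wiring are assumptions):
// Factory overrides that switch Spark to the columnar read path; names here are illustrative.
public class MyVectorizedReaderFactory implements PartitionReaderFactory {
  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // Row-based fallback; not used when columnar reads are enabled
    throw new UnsupportedOperationException("Row-based reads are not supported");
  }
  @Override
  public boolean supportColumnarReads(InputPartition partition) {
    return true;
  }
  @Override
  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    return new MyVectorizedPartitionReader(); // partition-specific setup omitted
  }
}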
Implement ACID transactions using commit coordination:
public class TransactionalBatchWrite implements BatchWrite {
private final String transactionId;
// Constructor, createBatchWriterFactory(), and abort() omitted for brevity
@Override
public boolean useCommitCoordinator() {
return true; // Essential for transactions
}
@Override
public void commit(WriterCommitMessage[] messages) {
try {
// Start transaction
beginTransaction(transactionId);
// Commit all partitions
for (WriterCommitMessage message : messages) {
commitPartition(message);
}
// Commit transaction
commitTransaction(transactionId);
} catch (Exception e) {
abortTransaction(transactionId);
throw new RuntimeException("Transaction failed", e);
}
}
}
Optimize writes for partitioned tables:
public class PartitionAwareDataWriter implements DataWriter<InternalRow> {
private final Map<String, List<InternalRow>> partitionBuffers = new HashMap<>();
private final String[] partitionColumns;
public PartitionAwareDataWriter(String[] partitionColumns) {
this.partitionColumns = partitionColumns;
}
@Override
public void write(InternalRow record) throws IOException {
String partitionKey = extractPartitionKey(record);
partitionBuffers.computeIfAbsent(partitionKey, k -> new ArrayList<>())
.add(record.copy());
}
@Override
public WriterCommitMessage commit() throws IOException {
Map<String, String> partitionPaths = new HashMap<>();
// Write each partition separately
for (Map.Entry<String, List<InternalRow>> entry : partitionBuffers.entrySet()) {
String partitionKey = entry.getKey();
List<InternalRow> rows = entry.getValue();
String path = writePartition(partitionKey, rows);
partitionPaths.put(partitionKey, path);
}
return new PartitionedCommitMessage(partitionPaths);
}
// abort() and close() omitted for brevity
}
The Data Source V2 APIs provide a powerful, flexible framework for implementing high-performance, feature-rich data sources with comprehensive optimization support and clean architectural patterns.