# Data Source V2 APIs

The Data Source V2 APIs provide a modern, comprehensive framework for implementing custom data sources in Apache Spark. These APIs support advanced optimizations like predicate pushdown, column pruning, and vectorized processing while maintaining clean separation of concerns.

## Core Read APIs

### ScanBuilder

The entry point for building scans with various optimizations:

```java { .api }
package org.apache.spark.sql.connector.read;

public interface ScanBuilder {
  /**
   * Build the final Scan object
   */
  Scan build();
}
```
### Scan

Logical representation of a data scan:

```java { .api }
public interface Scan {
  /**
   * Returns the actual schema of this data source scan
   */
  StructType readSchema();

  /**
   * Returns a human-readable description of this scan
   */
  default String description();

  /**
   * Returns a Batch for batch queries (must implement if table supports BATCH_READ)
   */
  default Batch toBatch();

  /**
   * Returns a MicroBatchStream for streaming queries (must implement if table supports MICRO_BATCH_READ)
   */
  default MicroBatchStream toMicroBatchStream(String checkpointLocation);

  /**
   * Returns a ContinuousStream for continuous streaming queries (must implement if table supports CONTINUOUS_READ)
   */
  default ContinuousStream toContinuousStream(String checkpointLocation);

  /**
   * Returns custom metrics that this scan supports
   */
  default CustomMetric[] supportedCustomMetrics();

  /**
   * Returns custom task metrics reported from driver side
   */
  default CustomTaskMetric[] reportDriverMetrics();

  /**
   * Returns the columnar support mode for vectorized processing
   */
  default ColumnarSupportMode columnarSupportMode();
}
```
### Batch

Physical representation for batch execution:

```java { .api }
public interface Batch {
  /**
   * Plan input partitions for parallel processing
   */
  InputPartition[] planInputPartitions();

  /**
   * Create reader factory for processing partitions
   */
  PartitionReaderFactory createReaderFactory();
}
```
### Basic Data Source Implementation

```java
public class MyDataSource implements Table, SupportsRead {
  private final String name;
  private final StructType schema;
  private final String[] paths;

  public MyDataSource(String name, StructType schema, String[] paths) {
    this.name = name;
    this.schema = schema;
    this.paths = paths;
  }

  // Table also requires name(), schema(), and capabilities()
  @Override
  public String name() {
    return name;
  }

  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return EnumSet.of(TableCapability.BATCH_READ);
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new MyScanBuilder(schema, paths, options);
  }
}

public class MyScanBuilder implements ScanBuilder {
  private final StructType schema;
  private final String[] paths;
  private final CaseInsensitiveStringMap options;

  public MyScanBuilder(StructType schema, String[] paths,
                       CaseInsensitiveStringMap options) {
    this.schema = schema;
    this.paths = paths;
    this.options = options;
  }

  @Override
  public Scan build() {
    return new MyScan(schema, paths);
  }
}

public class MyScan implements Scan {
  private final StructType schema;
  private final String[] paths;

  public MyScan(StructType schema, String[] paths) {
    this.schema = schema;
    this.paths = paths;
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public String description() {
    return String.format("MyScan[paths=%s]", Arrays.toString(paths));
  }

  @Override
  public Batch toBatch() {
    return new MyBatch(schema, paths);
  }
}
```
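Spark discovers a V2 source through a provider class, either referenced by its fully qualified class name or registered under a short name. A minimal sketch of such a provider, reusing the hypothetical `MyDataSource` above (the class name, the `"mysource"` short name, and the `"paths"` option are illustrative assumptions):

```java
public class MyTableProvider implements TableProvider, DataSourceRegister {
  @Override
  public String shortName() {
    return "mysource"; // usable as spark.read.format("mysource")
  }

  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // Derive the schema from the configured source (source-specific); a fixed schema is shown here
    return new StructType().add("id", DataTypes.LongType).add("value", DataTypes.StringType);
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    String[] paths = properties.getOrDefault("paths", "").split(",");
    return new MyDataSource("mysource", schema, paths);
  }
}
```

With the provider on the classpath, `spark.read.format("mysource").load()` resolves the table and drives the scan-building path shown above.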
## Partition Processing

### InputPartition

Represents a partition of input data:

```java { .api }
public interface InputPartition extends Serializable {
  // Marker interface - implementations can add partition-specific data
}
```

### PartitionReader

Reads data from a single partition:

```java { .api }
public interface PartitionReader<T> extends Closeable {
  /**
   * Advance to next record
   */
  boolean next() throws IOException;

  /**
   * Get current record
   */
  T get();
}
```

### PartitionReaderFactory

Factory for creating partition readers:

```java { .api }
public interface PartitionReaderFactory extends Serializable {
  /**
   * Create reader for given partition
   */
  PartitionReader<InternalRow> createReader(InputPartition partition);
}
```
**Complete Partition Processing Implementation:**

```java
public class MyBatch implements Batch {
  private final StructType schema;
  private final String[] paths;

  public MyBatch(StructType schema, String[] paths) {
    this.schema = schema;
    this.paths = paths;
  }

  @Override
  public InputPartition[] planInputPartitions() {
    // Create one partition per file/path
    return Arrays.stream(paths)
        .map(MyInputPartition::new)
        .toArray(InputPartition[]::new);
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    return new MyPartitionReaderFactory(schema);
  }
}

public class MyInputPartition implements InputPartition {
  private final String path;

  public MyInputPartition(String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }
}

public class MyPartitionReaderFactory implements PartitionReaderFactory {
  private final StructType schema;

  public MyPartitionReaderFactory(StructType schema) {
    this.schema = schema;
  }

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    MyInputPartition myPartition = (MyInputPartition) partition;
    return new MyPartitionReader(schema, myPartition.getPath());
  }
}

public class MyPartitionReader implements PartitionReader<InternalRow> {
  private final StructType schema;
  private final String path;
  private Iterator<InternalRow> iterator;
  private InternalRow currentRow;

  public MyPartitionReader(StructType schema, String path) {
    this.schema = schema;
    this.path = path;
    this.iterator = loadDataFromPath(path); // source-specific: open the file and parse rows
  }

  @Override
  public boolean next() {
    if (iterator.hasNext()) {
      currentRow = iterator.next();
      return true;
    }
    return false;
  }

  @Override
  public InternalRow get() {
    return currentRow;
  }

  @Override
  public void close() throws IOException {
    // Clean up resources
  }
}
```
## Pushdown Optimizations

### Filter Pushdown

#### Legacy Filter Pushdown (V1)

```java { .api }
public interface SupportsPushDownFilters {
  /**
   * Push filters down to data source
   * @return filters that could not be pushed down
   */
  Filter[] pushFilters(Filter[] filters);

  /**
   * Get filters that were successfully pushed down
   */
  Filter[] pushedFilters();
}
```
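For comparison with the V2 implementation shown below, a minimal V1-style sketch (the class name and the set of claimed filters are illustrative; unsupported filters are returned so Spark re-evaluates them):

```java
public class MyLegacyScanBuilder implements ScanBuilder, SupportsPushDownFilters {
  private final StructType schema;
  private final String[] paths;
  private Filter[] pushedFilters = new Filter[0];

  public MyLegacyScanBuilder(StructType schema, String[] paths) {
    this.schema = schema;
    this.paths = paths;
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    List<Filter> pushed = new ArrayList<>();
    List<Filter> remaining = new ArrayList<>();
    for (Filter filter : filters) {
      // Claim only filters this source can evaluate; Spark re-applies the rest
      if (filter instanceof EqualTo || filter instanceof GreaterThan) {
        pushed.add(filter);
      } else {
        remaining.add(filter);
      }
    }
    this.pushedFilters = pushed.toArray(new Filter[0]);
    return remaining.toArray(new Filter[0]);
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public Scan build() {
    // A real implementation would also hand pushedFilters to the Scan/readers
    return new MyScan(schema, paths);
  }
}
```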
#### Modern Filter Pushdown (V2)

```java { .api }
public interface SupportsPushDownV2Filters {
  /**
   * Push V2 predicates down to data source
   * @return predicates that could not be pushed down
   */
  Predicate[] pushPredicates(Predicate[] predicates);

  /**
   * Get predicates that were successfully pushed down
   */
  Predicate[] pushedPredicates();
}
```
**Filter Pushdown Implementation:**

```java
public class MyScanBuilder implements ScanBuilder, SupportsPushDownV2Filters {
  private final StructType schema;
  private final String[] paths;
  private Predicate[] pushedPredicates = new Predicate[0];

  public MyScanBuilder(StructType schema, String[] paths) {
    this.schema = schema;
    this.paths = paths;
  }

  @Override
  public Predicate[] pushPredicates(Predicate[] predicates) {
    List<Predicate> supported = new ArrayList<>();
    List<Predicate> unsupported = new ArrayList<>();

    for (Predicate predicate : predicates) {
      if (canPushDown(predicate)) {
        supported.add(predicate);
      } else {
        unsupported.add(predicate);
      }
    }

    this.pushedPredicates = supported.toArray(new Predicate[0]);
    return unsupported.toArray(new Predicate[0]);
  }

  @Override
  public Predicate[] pushedPredicates() {
    return pushedPredicates.clone();
  }

  private boolean canPushDown(Predicate predicate) {
    // Check whether the predicate can be evaluated by the data source.
    // V2 predicates are identified by name, e.g. "=", ">", "<", "AND", "OR".
    switch (predicate.name()) {
      case "=":
      case ">":
      case "<":
      case "AND":
      case "OR":
        return true;
      default:
        return false;
    }
  }

  @Override
  public Scan build() {
    return new MyScan(schema, paths, pushedPredicates);
  }
}
```
### Column Pruning

```java { .api }
public interface SupportsPushDownRequiredColumns {
  /**
   * Prune columns to only those required
   */
  void pruneColumns(StructType requiredSchema);
}
```

**Implementation:**

```java
public class MyScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
  private StructType schema;
  private StructType prunedSchema;
  // paths and pushedPredicates as in the previous examples

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Only read required columns
    this.prunedSchema = requiredSchema;
  }

  @Override
  public Scan build() {
    StructType finalSchema = prunedSchema != null ? prunedSchema : schema;
    return new MyScan(finalSchema, paths, pushedPredicates);
  }
}
```
### Aggregate Pushdown

```java { .api }
public interface SupportsPushDownAggregates {
  /**
   * Push aggregation down to data source
   * @return true if aggregation can be completely pushed down
   */
  boolean pushAggregation(Aggregation aggregation);

  /**
   * Whether data source can completely handle the aggregation
   */
  boolean supportCompletePushDown(Aggregation aggregation);
}
```

**Implementation:**

```java
public class MyScanBuilder implements ScanBuilder, SupportsPushDownAggregates {
  private Aggregation pushedAggregation;

  @Override
  public boolean pushAggregation(Aggregation aggregation) {
    if (!canHandle(aggregation)) {
      return false; // Can't handle complex aggregations
    }
    this.pushedAggregation = aggregation;
    return true;
  }

  @Override
  public boolean supportCompletePushDown(Aggregation aggregation) {
    // The source can produce the final aggregated result on its own
    return canHandle(aggregation);
  }

  private boolean canHandle(Aggregation aggregation) {
    // aggregation.groupByExpressions() describes any GROUP BY columns;
    // only simple COUNT and SUM aggregates are supported here
    for (AggregateFunc func : aggregation.aggregateExpressions()) {
      if (!(func instanceof Count || func instanceof Sum)) {
        return false;
      }
    }
    return true;
  }
}
```
### Limit and Offset Pushdown

```java { .api }
public interface SupportsPushDownLimit {
  boolean pushLimit(int limit);
  int pushedLimit();
}

public interface SupportsPushDownOffset {
  boolean pushOffset(long offset);
  long pushedOffset();
}
```
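A minimal sketch implementing the interfaces as listed above, for a source that can skip and cap rows itself (for example a paginated backend; the class name and the way limit/offset reach the `Scan` are illustrative assumptions):

```java
public class MyScanBuilder implements ScanBuilder, SupportsPushDownLimit, SupportsPushDownOffset {
  private final StructType schema;
  private final String[] paths;
  private int limit = -1;   // -1 means "no limit pushed"
  private long offset = 0L;

  public MyScanBuilder(StructType schema, String[] paths) {
    this.schema = schema;
    this.paths = paths;
  }

  @Override
  public boolean pushLimit(int limit) {
    this.limit = limit;
    return true; // the source can stop producing rows after `limit` records
  }

  @Override
  public int pushedLimit() {
    return limit;
  }

  @Override
  public boolean pushOffset(long offset) {
    this.offset = offset;
    return true; // the source can skip the first `offset` records
  }

  @Override
  public long pushedOffset() {
    return offset;
  }

  @Override
  public Scan build() {
    // Hand limit/offset to the Scan so the readers can apply them (source-specific)
    return new MyScan(schema, paths, limit, offset);
  }
}
```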
### TopN Pushdown

```java { .api }
public interface SupportsPushDownTopN {
  boolean pushTopN(SortOrder[] orders, int limit);
  SortOrder[] pushedTopNOrders();
  int pushedTopNLimit();
}
```

**TopN Implementation:**

```java
public class MyScanBuilder implements ScanBuilder, SupportsPushDownTopN {
  private SortOrder[] pushedOrders;
  private int pushedLimit = -1;

  @Override
  public boolean pushTopN(SortOrder[] orders, int limit) {
    // Check if we can handle the sort orders
    for (SortOrder order : orders) {
      if (!canSortBy(order)) {
        return false;
      }
    }

    this.pushedOrders = orders;
    this.pushedLimit = limit;
    return true;
  }

  @Override
  public SortOrder[] pushedTopNOrders() {
    return pushedOrders;
  }

  @Override
  public int pushedTopNLimit() {
    return pushedLimit;
  }

  private boolean canSortBy(SortOrder order) {
    // Check if column is sortable in our data source
    return order.expression() instanceof NamedReference;
  }
}
```
## Write APIs

### WriteBuilder

Entry point for building write operations:

```java { .api }
package org.apache.spark.sql.connector.write;

public interface WriteBuilder {
  /**
   * Build the final Write object
   */
  Write build();

  /**
   * Set the save mode for this write
   */
  WriteBuilder mode(SaveMode mode);
}
```
### Write

Logical representation of a write operation:

```java { .api }
public interface Write {
  /**
   * Returns the description associated with this write
   */
  default String description();

  /**
   * Returns a BatchWrite to write data to batch source (must implement if table supports BATCH_WRITE)
   */
  default BatchWrite toBatch();

  /**
   * Returns a StreamingWrite for streaming writes (must implement if table supports STREAMING_WRITE)
   */
  default StreamingWrite toStreaming();

  /**
   * Returns custom metrics that this write supports
   */
  default CustomMetric[] supportedCustomMetrics();
}
```
### BatchWrite

Physical batch write implementation:

```java { .api }
public interface BatchWrite {
  /**
   * Create writer factory for partitions
   */
  DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);

  /**
   * Whether to use Spark's commit coordinator
   */
  boolean useCommitCoordinator();

  /**
   * Called when a data writer commits
   */
  void onDataWriterCommit(WriterCommitMessage message);

  /**
   * Commit the entire write operation
   */
  void commit(WriterCommitMessage[] messages);

  /**
   * Abort the write operation
   */
  void abort(WriterCommitMessage[] messages);
}
```
### DataWriter

Writes data for a single partition:

```java { .api }
public interface DataWriter<T> extends Closeable {
  /**
   * Write a single record
   */
  void write(T record) throws IOException;

  /**
   * Commit this writer's work
   */
  WriterCommitMessage commit() throws IOException;

  /**
   * Abort this writer's work
   */
  void abort() throws IOException;
}
```
**Complete Write Implementation:**

```java
public class MyDataSource implements Table, SupportsWrite {
  // name(), schema(), and capabilities() as shown in the read example above

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new MyWriteBuilder(info);
  }
}

public class MyWriteBuilder implements WriteBuilder {
  private final LogicalWriteInfo info;
  private SaveMode mode = SaveMode.ErrorIfExists;

  public MyWriteBuilder(LogicalWriteInfo info) {
    this.info = info;
  }

  @Override
  public WriteBuilder mode(SaveMode mode) {
    this.mode = mode;
    return this;
  }

  @Override
  public Write build() {
    return new MyWrite(info, mode);
  }
}

public class MyWrite implements Write {
  private final LogicalWriteInfo info;
  private final SaveMode mode;

  public MyWrite(LogicalWriteInfo info, SaveMode mode) {
    this.info = info;
    this.mode = mode;
  }

  @Override
  public BatchWrite toBatch() {
    return new MyBatchWrite(info, mode);
  }
}

public class MyBatchWrite implements BatchWrite {
  private final LogicalWriteInfo info;
  private final SaveMode mode;

  public MyBatchWrite(LogicalWriteInfo info, SaveMode mode) {
    this.info = info;
    this.mode = mode;
  }

  @Override
  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalInfo) {
    // The write schema comes from the LogicalWriteInfo captured at build time
    return new MyDataWriterFactory(info.schema());
  }

  @Override
  public boolean useCommitCoordinator() {
    return true; // Use Spark's coordinator so only one attempt per task commits
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Commit all partition writes atomically
    for (WriterCommitMessage message : messages) {
      MyCommitMessage myMessage = (MyCommitMessage) message;
      finalizePartition(myMessage);
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Clean up any partially written data
    for (WriterCommitMessage message : messages) {
      MyCommitMessage myMessage = (MyCommitMessage) message;
      cleanupPartition(myMessage);
    }
  }
}

public class MyDataWriterFactory implements DataWriterFactory {
  private final StructType schema;

  public MyDataWriterFactory(StructType schema) {
    this.schema = schema;
  }

  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    return new MyDataWriter(schema, partitionId, taskId);
  }
}

public class MyDataWriter implements DataWriter<InternalRow> {
  private final StructType schema;
  private final int partitionId;
  private final long taskId;
  private final List<InternalRow> buffer = new ArrayList<>();

  public MyDataWriter(StructType schema, int partitionId, long taskId) {
    this.schema = schema;
    this.partitionId = partitionId;
    this.taskId = taskId;
  }

  @Override
  public void write(InternalRow record) throws IOException {
    buffer.add(record.copy()); // Make defensive copy
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // Write buffered data to storage
    String outputPath = writeDataToStorage(buffer);
    return new MyCommitMessage(partitionId, taskId, outputPath, buffer.size());
  }

  @Override
  public void abort() throws IOException {
    buffer.clear();
    // Clean up any temporary files
  }

  @Override
  public void close() throws IOException {
    // Release any per-writer resources
  }
}
```
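The `MyCommitMessage` used above is not a Spark class; `WriterCommitMessage` is only a serializable marker, so each connector defines its own message type to carry whatever the driver needs at commit time. A possible sketch:

```java
public class MyCommitMessage implements WriterCommitMessage {
  private final int partitionId;
  private final long taskId;
  private final String outputPath;
  private final long rowCount;

  public MyCommitMessage(int partitionId, long taskId, String outputPath, long rowCount) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.outputPath = outputPath;
    this.rowCount = rowCount;
  }

  public String outputPath() {
    return outputPath; // used by MyBatchWrite.commit() to finalize this partition's output
  }

  public long rowCount() {
    return rowCount;
  }
}
```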
## Write Support Interfaces

### SupportsOverwrite

```java { .api }
public interface SupportsOverwrite {
  WriteBuilder overwrite(Filter[] filters);
}
```

### SupportsOverwriteV2

```java { .api }
public interface SupportsOverwriteV2 {
  WriteBuilder overwrite(Predicate[] predicates);
}
```

### SupportsDynamicOverwrite

```java { .api }
public interface SupportsDynamicOverwrite {
  WriteBuilder overwriteDynamicPartitions();
}
```

### SupportsTruncate

```java { .api }
public interface SupportsTruncate {
  WriteBuilder truncate();
}
```
**Complete Write Builder with All Support:**

```java
public class MyWriteBuilder implements WriteBuilder, SupportsOverwriteV2,
    SupportsDynamicOverwrite, SupportsTruncate {
  private final LogicalWriteInfo info;
  private SaveMode mode = SaveMode.ErrorIfExists;
  private Predicate[] overwritePredicates;
  private boolean dynamicOverwrite = false;
  private boolean truncate = false;

  public MyWriteBuilder(LogicalWriteInfo info) {
    this.info = info;
  }

  @Override
  public WriteBuilder overwrite(Predicate[] predicates) {
    this.overwritePredicates = predicates;
    this.mode = SaveMode.Overwrite;
    return this;
  }

  @Override
  public WriteBuilder overwriteDynamicPartitions() {
    this.dynamicOverwrite = true;
    this.mode = SaveMode.Overwrite;
    return this;
  }

  @Override
  public WriteBuilder truncate() {
    this.truncate = true;
    return this;
  }

  @Override
  public Write build() {
    return new MyWrite(info, mode, overwritePredicates, dynamicOverwrite, truncate);
  }
}
```
## Distribution APIs

### Distribution

Represents how data should be distributed across partitions:

```java { .api }
package org.apache.spark.sql.connector.distributions;

public interface Distribution {
  // Marker interface for different distribution strategies
}
```

### Distributions Factory

```java { .api }
public class Distributions {
  /**
   * No specific distribution requirement
   */
  public static Distribution unspecified() { ... }

  /**
   * Data clustered by expressions (hash partitioning)
   */
  public static Distribution clustered(Expression[] expressions) { ... }

  /**
   * Data ordered by sort expressions
   */
  public static Distribution ordered(SortOrder[] ordering) { ... }
}
```
**Usage Example:**

A `Write` declares its distribution and ordering requirements through the `RequiresDistributionAndOrdering` mixin:

```java
public class MyWrite implements Write, RequiresDistributionAndOrdering {
  @Override
  public Distribution requiredDistribution() {
    // Require data to be hash partitioned by user_id
    return Distributions.clustered(new Expression[] {
        Expressions.column("user_id")
    });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    return new SortOrder[0]; // No ordering requirement within partitions
  }

  @Override
  public int requiredNumPartitions() {
    return 10; // Write to 10 partitions
  }
}
```
## Advanced Patterns

### Vectorized Reading

For high-performance reading, implement columnar batch processing:

```java
public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
  private final ColumnVector[] columns;
  private final int batchSize = 1000;
  private int rowsInBatch = 0;

  public MyVectorizedPartitionReader(ColumnVector[] columns) {
    this.columns = columns;
  }

  @Override
  public boolean next() throws IOException {
    // Load the next batch of data into the column vectors;
    // return false once the source is exhausted
    return loadNextBatch();
  }

  @Override
  public ColumnarBatch get() {
    return new ColumnarBatch(columns, rowsInBatch);
  }

  @Override
  public void close() throws IOException {
    for (ColumnVector column : columns) {
      column.close();
    }
  }

  private boolean loadNextBatch() throws IOException {
    if (!hasMoreData()) { // source-specific end-of-data check
      return false;
    }
    // Efficient columnar loading: fill each vector with up to batchSize values
    for (int i = 0; i < columns.length; i++) {
      rowsInBatch = loadColumnData(columns[i], i);
    }
    return true;
  }
}
```
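A columnar reader is only used if the `PartitionReaderFactory` opts in to columnar reads. A minimal sketch reusing the hypothetical classes above (`allocateVectorsFor` is an assumed source-specific helper):

```java
public class MyColumnarReaderFactory implements PartitionReaderFactory {
  @Override
  public boolean supportColumnarReads(InputPartition partition) {
    return true; // tell Spark this partition can be read as ColumnarBatch
  }

  @Override
  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    ColumnVector[] columns = allocateVectorsFor(partition); // source-specific allocation
    return new MyVectorizedPartitionReader(columns);
  }

  @Override
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    // A row-based reader is still required by the interface
    throw new UnsupportedOperationException("Only columnar reads are supported");
  }
}
```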
### Transactional Writes

Implement ACID transactions using commit coordination:

```java
public class TransactionalBatchWrite implements BatchWrite {
  private final String transactionId;

  public TransactionalBatchWrite(String transactionId) {
    this.transactionId = transactionId;
  }

  @Override
  public boolean useCommitCoordinator() {
    return true; // Essential for transactions
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    try {
      // Start transaction
      beginTransaction(transactionId);

      // Commit all partitions
      for (WriterCommitMessage message : messages) {
        commitPartition(message);
      }

      // Commit transaction
      commitTransaction(transactionId);
    } catch (Exception e) {
      abortTransaction(transactionId);
      throw new RuntimeException("Transaction failed", e);
    }
  }

  // createBatchWriterFactory() and abort() omitted for brevity
}
```
### Partition-Aware Writing

Optimize writes for partitioned tables:

```java
public class PartitionAwareDataWriter implements DataWriter<InternalRow> {
  private final Map<String, List<InternalRow>> partitionBuffers = new HashMap<>();
  private final String[] partitionColumns;

  public PartitionAwareDataWriter(String[] partitionColumns) {
    this.partitionColumns = partitionColumns;
  }

  @Override
  public void write(InternalRow record) throws IOException {
    String partitionKey = extractPartitionKey(record);
    partitionBuffers.computeIfAbsent(partitionKey, k -> new ArrayList<>())
        .add(record.copy());
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    Map<String, String> partitionPaths = new HashMap<>();

    // Write each partition separately
    for (Map.Entry<String, List<InternalRow>> entry : partitionBuffers.entrySet()) {
      String partitionKey = entry.getKey();
      List<InternalRow> rows = entry.getValue();
      String path = writePartition(partitionKey, rows);
      partitionPaths.put(partitionKey, path);
    }

    return new PartitionedCommitMessage(partitionPaths);
  }

  // abort() and close() omitted for brevity
}
```
The Data Source V2 APIs give connector authors a flexible framework for building high-performance data sources, with first-class support for pushdown optimizations, vectorized reads, and transactional, partition-aware writes.