0
# Vectorized Processing APIs
1
2
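Spark SQL's vectorized processing APIs (package `org.apache.spark.sql.vectorized`, part of the Catalyst module) enable high-performance columnar data processing: instead of handling one row at a time, data sources and operators exchange batches of column values. This enables efficient batch operations, reduces per-row memory overhead, and improves CPU utilization, and the same abstractions can wrap external columnar formats such as Apache Arrow.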
3
4
## Core Vectorized Interfaces
5
6
### ColumnVector
7
8
Abstract base class that holds the values of a single column for a batch of rows:
9
10
```java { .api }
11
package org.apache.spark.sql.vectorized;

/**
 * In-memory columnar data for one column of a batch. Rows are addressed by a zero-based rowId.
 *
 * <p>Getters do not check for nulls: callers must call {@link #isNullAt(int)} first. The value
 * returned for a null slot is undefined.
 */
public abstract class ColumnVector implements AutoCloseable {
    /**
     * Sets up the data type of this column vector. Subclasses must call this constructor.
     */
    protected ColumnVector(DataType type) { ... }

    /**
     * Data type of this column. Final: subclasses pass the type to the constructor instead of
     * overriding this method.
     */
    public final DataType dataType() { ... }

    /**
     * Cleans up memory held by this column vector. The vector must not be used afterwards.
     */
    @Override
    public abstract void close();

    /**
     * Whether this column has any null values
     */
    public abstract boolean hasNull();

    /**
     * Number of null values in this column
     */
    public abstract int numNulls();

    /**
     * Check if value at given row is null
     */
    public abstract boolean isNullAt(int rowId);

    // Type-specific getters (result undefined when isNullAt(rowId) is true)
    public abstract boolean getBoolean(int rowId);
    public abstract byte getByte(int rowId);
    public abstract short getShort(int rowId);
    public abstract int getInt(int rowId);
    public abstract long getLong(int rowId);
    public abstract float getFloat(int rowId);
    public abstract double getDouble(int rowId);
    public abstract UTF8String getUTF8String(int rowId);
    public abstract byte[] getBinary(int rowId);
    public abstract Decimal getDecimal(int rowId, int precision, int scale);

    // Complex type getters. Array lengths/offsets are not exposed here (they live on the
    // internal WritableColumnVector); array elements are read through getArray's ColumnarArray.
    public final ColumnarRow getStruct(int rowId) { ... }
    public abstract ColumnarArray getArray(int rowId);
    public abstract ColumnarMap getMap(int ordinal);
    public abstract ColumnVector getChild(int ordinal);

    // Other convenience getters are omitted from this listing.
}
57
```
58
59
### ColumnarBatch
60
61
A set of `ColumnVector`s that together hold a batch of rows, with row-oriented access through `getRow` and `rowIterator`:
62
63
```java { .api }
64
public final class ColumnarBatch implements AutoCloseable {
65
/**
66
* Number of columns in this batch
67
*/
68
public int numCols();
69
70
/**
71
* Number of rows in this batch
72
*/
73
public int numRows();
74
75
/**
76
* Set number of rows (for dynamically sized batches)
77
*/
78
public void setNumRows(int numRows);
79
80
/**
81
* Get column vector at given ordinal
82
*/
83
public ColumnVector column(int ordinal);
84
85
/**
86
* Get row at given index
87
*/
88
public ColumnarBatchRow getRow(int rowId);
89
90
/**
91
* Iterator over rows in this batch
92
*/
93
public Iterator<InternalRow> rowIterator();
94
95
@Override
96
public void close();
97
}
98
```
99
100
### ArrowColumnVector
101
102
A `ColumnVector` implementation that wraps an Apache Arrow `ValueVector`:
103
104
```java { .api }
105
public final class ArrowColumnVector extends ColumnVector {
106
/**
107
* Create ArrowColumnVector from Arrow ValueVector
108
*/
109
public ArrowColumnVector(ValueVector vector);
110
111
// Implements all ColumnVector methods with Arrow-optimized access
112
}
113
```
114
115
## Implementing Vectorized Data Sources
116
117
### Vectorized Partition Reader
118
119
```java
120
public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
121
private final StructType schema;
122
private final String dataPath;
123
private final int batchSize;
124
private ColumnVector[] columns;
125
private int currentBatchRows;
126
private boolean hasNextBatch = true;
127
128
public MyVectorizedPartitionReader(StructType schema, String dataPath, int batchSize) {
129
this.schema = schema;
130
this.dataPath = dataPath;
131
this.batchSize = batchSize;
132
this.columns = createColumnVectors(schema, batchSize);
133
}
134
135
@Override
136
public boolean next() throws IOException {
137
if (!hasNextBatch) {
138
return false;
139
}
140
141
// Load next batch of data into column vectors
142
currentBatchRows = loadNextBatch();
143
hasNextBatch = currentBatchRows > 0;
144
145
return hasNextBatch;
146
}
147
148
@Override
149
public ColumnarBatch get() {
150
ColumnarBatch batch = new ColumnarBatch(columns, currentBatchRows);
151
return batch;
152
}
153
154
@Override
155
public void close() throws IOException {
156
if (columns != null) {
157
for (ColumnVector column : columns) {
158
column.close();
159
}
160
}
161
}
162
163
private ColumnVector[] createColumnVectors(StructType schema, int capacity) {
164
ColumnVector[] vectors = new ColumnVector[schema.length()];
165
for (int i = 0; i < schema.length(); i++) {
166
StructField field = schema.fields()[i];
167
vectors[i] = createColumnVector(field.dataType(), capacity);
168
}
169
return vectors;
170
}
171
172
private ColumnVector createColumnVector(DataType dataType, int capacity) {
173
if (dataType instanceof IntegerType) {
174
return new OnHeapColumnVector(capacity, IntegerType);
175
} else if (dataType instanceof LongType) {
176
return new OnHeapColumnVector(capacity, LongType);
177
} else if (dataType instanceof StringType) {
178
return new OnHeapColumnVector(capacity, StringType);
179
}
180
// Handle other data types...
181
throw new UnsupportedOperationException("Unsupported type: " + dataType);
182
}
183
184
private int loadNextBatch() throws IOException {
185
// Implementation-specific batch loading
186
// This would typically:
187
// 1. Read data from external source
188
// 2. Populate column vectors efficiently
189
// 3. Return number of rows loaded
190
return loadDataIntoVectors(columns);
191
}
192
}
193
```
194
195
### Custom ColumnVector Implementation
196
197
```java
198
public class MyOnHeapColumnVector extends ColumnVector {
199
private final DataType type;
200
private final int capacity;
201
private int numRows;
202
203
// Storage arrays for different types
204
private boolean[] booleanData;
205
private int[] intData;
206
private long[] longData;
207
private double[] doubleData;
208
private byte[][] binaryData;
209
210
// Null tracking
211
private boolean[] nulls;
212
private int numNulls;
213
214
public MyOnHeapColumnVector(int capacity, DataType type) {
215
this.capacity = capacity;
216
this.type = type;
217
this.nulls = new boolean[capacity];
218
219
// Allocate type-specific storage
220
if (type instanceof IntegerType) {
221
intData = new int[capacity];
222
} else if (type instanceof LongType) {
223
longData = new long[capacity];
224
} else if (type instanceof DoubleType) {
225
doubleData = new double[capacity];
226
} else if (type instanceof BooleanType) {
227
booleanData = new boolean[capacity];
228
} else if (type instanceof StringType || type instanceof BinaryType) {
229
binaryData = new byte[capacity][];
230
}
231
}
232
233
@Override
234
public DataType dataType() {
235
return type;
236
}
237
238
@Override
239
public int numNulls() {
240
return numNulls;
241
}
242
243
@Override
244
public boolean hasNull() {
245
return numNulls > 0;
246
}
247
248
@Override
249
public boolean isNullAt(int rowId) {
250
return nulls[rowId];
251
}
252
253
@Override
254
public int getInt(int rowId) {
255
if (nulls[rowId]) return 0;
256
return intData[rowId];
257
}
258
259
@Override
260
public long getLong(int rowId) {
261
if (nulls[rowId]) return 0L;
262
return longData[rowId];
263
}
264
265
@Override
266
public double getDouble(int rowId) {
267
if (nulls[rowId]) return 0.0;
268
return doubleData[rowId];
269
}
270
271
@Override
272
public boolean getBoolean(int rowId) {
273
if (nulls[rowId]) return false;
274
return booleanData[rowId];
275
}
276
277
@Override
278
public byte[] getBinary(int rowId) {
279
if (nulls[rowId]) return null;
280
return binaryData[rowId];
281
}
282
283
// Write methods for populating the vector
284
public void putInt(int rowId, int value) {
285
intData[rowId] = value;
286
nulls[rowId] = false;
287
}
288
289
public void putLong(int rowId, long value) {
290
longData[rowId] = value;
291
nulls[rowId] = false;
292
}
293
294
public void putDouble(int rowId, double value) {
295
doubleData[rowId] = value;
296
nulls[rowId] = false;
297
}
298
299
public void putNull(int rowId) {
300
nulls[rowId] = true;
301
numNulls++;
302
}
303
304
public void setNumRows(int numRows) {
305
this.numRows = numRows;
306
}
307
308
@Override
309
public void close() {
310
// Clean up arrays
311
booleanData = null;
312
intData = null;
313
longData = null;
314
doubleData = null;
315
binaryData = null;
316
nulls = null;
317
}
318
}
319
```
320
321
## Arrow Integration
322
323
### Arrow-Based Vectorized Reader
324
325
```java
326
public class ArrowVectorizedReader implements PartitionReader<ColumnarBatch> {
327
private final VectorSchemaRoot root;
328
private final ArrowFileReader arrowReader;
329
private final ColumnVector[] columns;
330
private boolean hasNext = true;
331
332
public ArrowVectorizedReader(String arrowFilePath, StructType schema)
333
throws IOException {
334
FileInputStream fis = new FileInputStream(arrowFilePath);
335
this.arrowReader = new ArrowFileReader(
336
new SeekableReadChannel(fis.getChannel()),
337
new RootAllocator()
338
);
339
340
this.root = arrowReader.getVectorSchemaRoot();
341
this.columns = createArrowColumnVectors();
342
}
343
344
@Override
345
public boolean next() throws IOException {
346
if (!hasNext) {
347
return false;
348
}
349
350
hasNext = arrowReader.loadNextBatch();
351
return hasNext;
352
}
353
354
@Override
355
public ColumnarBatch get() {
356
int numRows = root.getRowCount();
357
return new ColumnarBatch(columns, numRows);
358
}
359
360
@Override
361
public void close() throws IOException {
362
root.close();
363
arrowReader.close();
364
}
365
366
private ColumnVector[] createArrowColumnVectors() {
367
List<FieldVector> vectors = root.getFieldVectors();
368
ColumnVector[] columnVectors = new ColumnVector[vectors.size()];
369
370
for (int i = 0; i < vectors.size(); i++) {
371
columnVectors[i] = new ArrowColumnVector(vectors.get(i));
372
}
373
374
return columnVectors;
375
}
376
}
377
```
378
379
### Arrow Data Conversion
380
381
```java
382
public class ArrowDataConverter {
383
private final BufferAllocator allocator;
384
385
public ArrowDataConverter() {
386
this.allocator = new RootAllocator();
387
}
388
389
public VectorSchemaRoot convertToArrow(ColumnarBatch batch, StructType schema) {
390
List<Field> fields = new ArrayList<>();
391
List<FieldVector> vectors = new ArrayList<>();
392
393
for (int i = 0; i < schema.length(); i++) {
394
StructField field = schema.fields()[i];
395
ColumnVector sparkVector = batch.column(i);
396
397
Field arrowField = convertField(field);
398
FieldVector arrowVector = convertVector(sparkVector, arrowField);
399
400
fields.add(arrowField);
401
vectors.add(arrowVector);
402
}
403
404
Schema arrowSchema = new Schema(fields);
405
VectorSchemaRoot root = new VectorSchemaRoot(arrowSchema, vectors);
406
root.setRowCount(batch.numRows());
407
408
return root;
409
}
410
411
private Field convertField(StructField sparkField) {
412
ArrowType arrowType = convertDataType(sparkField.dataType());
413
return new Field(sparkField.name(),
414
new FieldType(sparkField.nullable(), arrowType, null),
415
null);
416
}
417
418
private ArrowType convertDataType(DataType sparkType) {
419
if (sparkType instanceof IntegerType) {
420
return new ArrowType.Int(32, true);
421
} else if (sparkType instanceof LongType) {
422
return new ArrowType.Int(64, true);
423
} else if (sparkType instanceof DoubleType) {
424
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
425
} else if (sparkType instanceof StringType) {
426
return new ArrowType.Utf8();
427
}
428
throw new UnsupportedOperationException("Unsupported type: " + sparkType);
429
}
430
431
private FieldVector convertVector(ColumnVector sparkVector, Field field) {
432
FieldVector arrowVector = field.createVector(allocator);
433
arrowVector.allocateNew();
434
435
int numRows = sparkVector.numRows();
436
437
if (sparkVector.dataType() instanceof IntegerType) {
438
IntVector intVector = (IntVector) arrowVector;
439
for (int i = 0; i < numRows; i++) {
440
if (sparkVector.isNullAt(i)) {
441
intVector.setNull(i);
442
} else {
443
intVector.set(i, sparkVector.getInt(i));
444
}
445
}
446
}
447
// Handle other types similarly...
448
449
arrowVector.setValueCount(numRows);
450
return arrowVector;
451
}
452
}
453
```
454
455
## Vectorized Aggregations
456
457
### Vectorized Aggregate Functions
458
459
```java
460
public class VectorizedAggregateFunction {
461
462
public static long sum(ColumnVector vector) {
463
long result = 0;
464
int numRows = vector.numRows();
465
466
if (vector.dataType() instanceof IntegerType) {
467
for (int i = 0; i < numRows; i++) {
468
if (!vector.isNullAt(i)) {
469
result += vector.getInt(i);
470
}
471
}
472
} else if (vector.dataType() instanceof LongType) {
473
for (int i = 0; i < numRows; i++) {
474
if (!vector.isNullAt(i)) {
475
result += vector.getLong(i);
476
}
477
}
478
}
479
480
return result;
481
}
482
483
public static double average(ColumnVector vector) {
484
long sum = 0;
485
int count = 0;
486
int numRows = vector.numRows();
487
488
for (int i = 0; i < numRows; i++) {
489
if (!vector.isNullAt(i)) {
490
if (vector.dataType() instanceof IntegerType) {
491
sum += vector.getInt(i);
492
} else if (vector.dataType() instanceof LongType) {
493
sum += vector.getLong(i);
494
} else if (vector.dataType() instanceof DoubleType) {
495
sum += vector.getDouble(i);
496
}
497
count++;
498
}
499
}
500
501
return count > 0 ? (double) sum / count : 0.0;
502
}
503
504
public static Object min(ColumnVector vector) {
505
Object min = null;
506
int numRows = vector.numRows();
507
508
for (int i = 0; i < numRows; i++) {
509
if (!vector.isNullAt(i)) {
510
Object value = getValue(vector, i);
511
if (min == null || compareValues(value, min, vector.dataType()) < 0) {
512
min = value;
513
}
514
}
515
}
516
517
return min;
518
}
519
520
public static Object max(ColumnVector vector) {
521
Object max = null;
522
int numRows = vector.numRows();
523
524
for (int i = 0; i < numRows; i++) {
525
if (!vector.isNullAt(i)) {
526
Object value = getValue(vector, i);
527
if (max == null || compareValues(value, max, vector.dataType()) > 0) {
528
max = value;
529
}
530
}
531
}
532
533
return max;
534
}
535
536
private static Object getValue(ColumnVector vector, int rowId) {
537
DataType type = vector.dataType();
538
if (type instanceof IntegerType) {
539
return vector.getInt(rowId);
540
} else if (type instanceof LongType) {
541
return vector.getLong(rowId);
542
} else if (type instanceof DoubleType) {
543
return vector.getDouble(rowId);
544
} else if (type instanceof StringType) {
545
return vector.getUTF8String(rowId);
546
}
547
return null;
548
}
549
550
@SuppressWarnings("unchecked")
551
private static int compareValues(Object v1, Object v2, DataType dataType) {
552
if (v1 instanceof Comparable && v2 instanceof Comparable) {
553
return ((Comparable<Object>) v1).compareTo(v2);
554
}
555
return 0;
556
}
557
}
558
```
559
560
### Vectorized Filter Operations
561
562
```java
563
public class VectorizedFilters {
564
565
public static boolean[] equalTo(ColumnVector vector, Object value) {
566
int numRows = vector.numRows();
567
boolean[] result = new boolean[numRows];
568
569
for (int i = 0; i < numRows; i++) {
570
if (vector.isNullAt(i)) {
571
result[i] = false;
572
} else {
573
Object vectorValue = getValue(vector, i);
574
result[i] = Objects.equals(vectorValue, value);
575
}
576
}
577
578
return result;
579
}
580
581
public static boolean[] greaterThan(ColumnVector vector, Object value) {
582
int numRows = vector.numRows();
583
boolean[] result = new boolean[numRows];
584
585
for (int i = 0; i < numRows; i++) {
586
if (vector.isNullAt(i)) {
587
result[i] = false;
588
} else {
589
Object vectorValue = getValue(vector, i);
590
result[i] = compareValues(vectorValue, value, vector.dataType()) > 0;
591
}
592
}
593
594
return result;
595
}
596
597
public static boolean[] and(boolean[] left, boolean[] right) {
598
boolean[] result = new boolean[left.length];
599
for (int i = 0; i < left.length; i++) {
600
result[i] = left[i] && right[i];
601
}
602
return result;
603
}
604
605
public static boolean[] or(boolean[] left, boolean[] right) {
606
boolean[] result = new boolean[left.length];
607
for (int i = 0; i < left.length; i++) {
608
result[i] = left[i] || right[i];
609
}
610
return result;
611
}
612
613
public static ColumnarBatch filter(ColumnarBatch batch, boolean[] mask) {
614
int selectedRows = 0;
615
for (boolean selected : mask) {
616
if (selected) selectedRows++;
617
}
618
619
if (selectedRows == 0) {
620
return new ColumnarBatch(new ColumnVector[0], 0);
621
}
622
623
ColumnVector[] filteredColumns = new ColumnVector[batch.numCols()];
624
for (int colIdx = 0; colIdx < batch.numCols(); colIdx++) {
625
filteredColumns[colIdx] = filterColumn(batch.column(colIdx), mask, selectedRows);
626
}
627
628
return new ColumnarBatch(filteredColumns, selectedRows);
629
}
630
631
private static ColumnVector filterColumn(ColumnVector source, boolean[] mask, int resultRows) {
632
DataType dataType = source.dataType();
633
ColumnVector result = createColumnVector(dataType, resultRows);
634
635
int destIdx = 0;
636
for (int srcIdx = 0; srcIdx < mask.length; srcIdx++) {
637
if (mask[srcIdx]) {
638
copyValue(source, srcIdx, result, destIdx);
639
destIdx++;
640
}
641
}
642
643
return result;
644
}
645
646
private static void copyValue(ColumnVector source, int srcIdx,
647
ColumnVector dest, int destIdx) {
648
if (source.isNullAt(srcIdx)) {
649
((MyOnHeapColumnVector) dest).putNull(destIdx);
650
} else {
651
DataType type = source.dataType();
652
if (type instanceof IntegerType) {
653
((MyOnHeapColumnVector) dest).putInt(destIdx, source.getInt(srcIdx));
654
} else if (type instanceof LongType) {
655
((MyOnHeapColumnVector) dest).putLong(destIdx, source.getLong(srcIdx));
656
} else if (type instanceof DoubleType) {
657
((MyOnHeapColumnVector) dest).putDouble(destIdx, source.getDouble(srcIdx));
658
}
659
// Handle other types...
660
}
661
}
662
}
663
```
664
665
## Memory Management and Performance
666
667
### Off-Heap Column Vector
668
669
```java
670
public class OffHeapColumnVector extends ColumnVector {
671
private final DataType dataType;
672
private final long capacity;
673
private final long dataAddress;
674
private final long nullsAddress;
675
private int numRows;
676
677
public OffHeapColumnVector(DataType dataType, int capacity) {
678
this.dataType = dataType;
679
this.capacity = capacity;
680
681
// Allocate off-heap memory
682
int typeSize = getTypeSize(dataType);
683
this.dataAddress = PlatformDependent.allocateMemory(capacity * typeSize);
684
this.nullsAddress = PlatformDependent.allocateMemory(capacity); // 1 byte per null flag
685
686
// Initialize memory
687
PlatformDependent.setMemory(dataAddress, capacity * typeSize, (byte) 0);
688
PlatformDependent.setMemory(nullsAddress, capacity, (byte) 0);
689
}
690
691
@Override
692
public DataType dataType() {
693
return dataType;
694
}
695
696
@Override
697
public boolean isNullAt(int rowId) {
698
return PlatformDependent.getByte(nullsAddress + rowId) == 1;
699
}
700
701
@Override
702
public int getInt(int rowId) {
703
if (isNullAt(rowId)) return 0;
704
return PlatformDependent.getInt(dataAddress + rowId * 4);
705
}
706
707
@Override
708
public long getLong(int rowId) {
709
if (isNullAt(rowId)) return 0L;
710
return PlatformDependent.getLong(dataAddress + rowId * 8);
711
}
712
713
public void putInt(int rowId, int value) {
714
PlatformDependent.putInt(dataAddress + rowId * 4, value);
715
PlatformDependent.putByte(nullsAddress + rowId, (byte) 0);
716
}
717
718
public void putLong(int rowId, long value) {
719
PlatformDependent.putLong(dataAddress + rowId * 8, value);
720
PlatformDependent.putByte(nullsAddress + rowId, (byte) 0);
721
}
722
723
public void putNull(int rowId) {
724
PlatformDependent.putByte(nullsAddress + rowId, (byte) 1);
725
}
726
727
@Override
728
public void close() {
729
if (dataAddress != 0) {
730
PlatformDependent.freeMemory(dataAddress);
731
}
732
if (nullsAddress != 0) {
733
PlatformDependent.freeMemory(nullsAddress);
734
}
735
}
736
737
private int getTypeSize(DataType dataType) {
738
if (dataType instanceof IntegerType) return 4;
739
if (dataType instanceof LongType) return 8;
740
if (dataType instanceof DoubleType) return 8;
741
if (dataType instanceof FloatType) return 4;
742
return 8; // Default size
743
}
744
}
745
```
746
747
### Batch Size Optimization
748
749
```java
750
public class BatchSizeOptimizer {
751
private static final int MIN_BATCH_SIZE = 1024;
752
private static final int MAX_BATCH_SIZE = 8192;
753
private static final long TARGET_BATCH_MEMORY = 64 * 1024 * 1024; // 64MB
754
755
public static int calculateOptimalBatchSize(StructType schema) {
756
long rowSize = calculateRowSize(schema);
757
int calculatedBatchSize = (int) (TARGET_BATCH_MEMORY / rowSize);
758
759
// Clamp to reasonable bounds
760
return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, calculatedBatchSize));
761
}
762
763
private static long calculateRowSize(StructType schema) {
764
long totalSize = 0;
765
for (StructField field : schema.fields()) {
766
totalSize += getDataTypeSize(field.dataType());
767
}
768
return totalSize;
769
}
770
771
private static long getDataTypeSize(DataType dataType) {
772
if (dataType instanceof BooleanType) return 1;
773
if (dataType instanceof ByteType) return 1;
774
if (dataType instanceof ShortType) return 2;
775
if (dataType instanceof IntegerType) return 4;
776
if (dataType instanceof LongType) return 8;
777
if (dataType instanceof FloatType) return 4;
778
if (dataType instanceof DoubleType) return 8;
779
if (dataType instanceof StringType) return 20; // Average string size estimate
780
if (dataType instanceof BinaryType) return 16; // Average binary size estimate
781
return 8; // Default estimate
782
}
783
}
784
```
785
786
### Vectorized Expression Evaluation
787
788
```java
789
public class VectorizedExpressionEvaluator {
790
791
public static ColumnVector evaluateExpression(Expression expr, ColumnarBatch input) {
792
if (expr instanceof Literal) {
793
return evaluateLiteral((Literal) expr, input.numRows());
794
} else if (expr instanceof NamedReference) {
795
return evaluateColumnReference((NamedReference) expr, input);
796
} else if (expr instanceof BinaryExpression) {
797
return evaluateBinaryExpression((BinaryExpression) expr, input);
798
}
799
800
throw new UnsupportedOperationException("Unsupported expression: " + expr);
801
}
802
803
private static ColumnVector evaluateLiteral(Literal literal, int numRows) {
804
Object value = literal.value();
805
DataType dataType = inferDataType(value);
806
ColumnVector result = createColumnVector(dataType, numRows);
807
808
// Fill all rows with the literal value
809
for (int i = 0; i < numRows; i++) {
810
setColumnValue(result, i, value);
811
}
812
813
return result;
814
}
815
816
private static ColumnVector evaluateColumnReference(NamedReference ref, ColumnarBatch input) {
817
String[] fieldNames = ref.fieldNames();
818
String columnName = fieldNames[0]; // Simplified - assume single-level reference
819
820
// Find column index by name
821
// This would require schema information in a real implementation
822
int columnIndex = findColumnIndex(columnName, input);
823
return input.column(columnIndex);
824
}
825
826
private static ColumnVector evaluateBinaryExpression(BinaryExpression expr, ColumnarBatch input) {
827
ColumnVector left = evaluateExpression(expr.left(), input);
828
ColumnVector right = evaluateExpression(expr.right(), input);
829
830
if (expr instanceof Add) {
831
return vectorizedAdd(left, right);
832
} else if (expr instanceof Subtract) {
833
return vectorizedSubtract(left, right);
834
} else if (expr instanceof Multiply) {
835
return vectorizedMultiply(left, right);
836
}
837
838
throw new UnsupportedOperationException("Unsupported binary expression: " + expr);
839
}
840
841
private static ColumnVector vectorizedAdd(ColumnVector left, ColumnVector right) {
842
int numRows = Math.min(left.numRows(), right.numRows());
843
DataType resultType = promoteTypes(left.dataType(), right.dataType());
844
ColumnVector result = createColumnVector(resultType, numRows);
845
846
for (int i = 0; i < numRows; i++) {
847
if (left.isNullAt(i) || right.isNullAt(i)) {
848
((MyOnHeapColumnVector) result).putNull(i);
849
} else {
850
if (resultType instanceof IntegerType) {
851
int sum = left.getInt(i) + right.getInt(i);
852
((MyOnHeapColumnVector) result).putInt(i, sum);
853
} else if (resultType instanceof LongType) {
854
long sum = left.getLong(i) + right.getLong(i);
855
((MyOnHeapColumnVector) result).putLong(i, sum);
856
} else if (resultType instanceof DoubleType) {
857
double sum = left.getDouble(i) + right.getDouble(i);
858
((MyOnHeapColumnVector) result).putDouble(i, sum);
859
}
860
}
861
}
862
863
return result;
864
}
865
}
866
```
867
868
## Integration with Spark SQL
869
870
### Vectorized Data Source
871
872
```java
873
public class VectorizedDataSource implements Table, SupportsRead {
874
875
@Override
876
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
877
return new VectorizedScanBuilder(schema, options);
878
}
879
880
@Override
881
public Set<TableCapability> capabilities() {
882
return Set.of(
883
TableCapability.BATCH_READ,
884
TableCapability.ACCEPT_ANY_SCHEMA
885
);
886
}
887
}
888
889
public class VectorizedScan implements Scan, SupportsReportStatistics {
890
891
@Override
892
public Batch toBatch() {
893
return new VectorizedBatch(schema, paths);
894
}
895
896
@Override
897
public Statistics estimateStatistics() {
898
// Provide statistics for query optimization
899
return new Statistics() {
900
@Override
901
public OptionalLong sizeInBytes() {
902
return OptionalLong.of(calculateDataSize());
903
}
904
905
@Override
906
public OptionalLong numRows() {
907
return OptionalLong.of(estimateRowCount());
908
}
909
};
910
}
911
}
912
```
913
914
The Vectorized Processing APIs provide the foundation for high-performance analytical workloads in Spark, enabling efficient processing of large datasets through columnar operations and optimized memory layouts.