The Vectorized Processing APIs in Apache Spark Catalyst enable high-performance columnar data processing. These APIs enable efficient batch operations, reduce memory overhead, and improve CPU utilization through vectorized execution and columnar storage formats such as Apache Arrow.
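To ground the interfaces that follow, here is a minimal sketch of the consumer side: summing an integer column from a ColumnarBatch. The batch variable and the column position are assumptions for illustration.

long total = 0;
ColumnVector col = batch.column(0); // assumes column 0 holds INT values
for (int r = 0; r < batch.numRows(); r++) {
if (!col.isNullAt(r)) {
total += col.getInt(r); // tight inner loop over a single column
}
}

Processing column-at-a-time like this keeps values of one type contiguous in memory, which is what makes the cache- and SIMD-friendly behavior described above possible.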
Base class for columnar data representation:
package org.apache.spark.sql.vectorized;
public abstract class ColumnVector implements AutoCloseable {
/**
* Data type of this column
*/
public final DataType dataType() { return type; }
/**
* Number of null values in this column
*/
public abstract int numNulls();
/**
* Whether this column has any null values
*/
public abstract boolean hasNull();
/**
* Check if value at given row is null
*/
public abstract boolean isNullAt(int rowId);
/**
* Clean up resources
*/
@Override
public abstract void close();
// Type-specific getters (abstract; each implementation provides optimized access)
public abstract boolean getBoolean(int rowId);
public abstract byte getByte(int rowId);
public abstract short getShort(int rowId);
public abstract int getInt(int rowId);
public abstract long getLong(int rowId);
public abstract float getFloat(int rowId);
public abstract double getDouble(int rowId);
public abstract UTF8String getUTF8String(int rowId);
public abstract byte[] getBinary(int rowId);
public abstract Decimal getDecimal(int rowId, int precision, int scale);
// Complex type getters
public abstract ColumnarArray getArray(int rowId);
public abstract ColumnarMap getMap(int ordinal);
public abstract ColumnVector getChild(int ordinal);
// Subclasses supply the data type at construction time
protected ColumnVector(DataType type) { this.type = type; }
protected DataType type;
}

Collection of ColumnVector objects representing a batch of data:
public final class ColumnarBatch implements AutoCloseable {
/**
* Number of columns in this batch
*/
public int numCols();
/**
* Number of rows in this batch
*/
public int numRows();
/**
* Set number of rows (for dynamically sized batches)
*/
public void setNumRows(int numRows);
/**
* Get column vector at given ordinal
*/
public ColumnVector column(int ordinal);
/**
* Get row at given index
*/
public InternalRow getRow(int rowId);
/**
* Iterator over rows in this batch
*/
public Iterator<InternalRow> rowIterator();
@Override
public void close();
}

ColumnVector implementation backed by Apache Arrow:
public final class ArrowColumnVector extends ColumnVector {
/**
* Create ArrowColumnVector from Arrow ValueVector
*/
public ArrowColumnVector(ValueVector vector);
// Implements all ColumnVector methods with Arrow-optimized access
}

Data sources can emit columnar batches directly by implementing PartitionReader<ColumnarBatch>:

public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
private final StructType schema;
private final String dataPath;
private final int batchSize;
private ColumnVector[] columns;
private int currentBatchRows;
private boolean hasNextBatch = true;
public MyVectorizedPartitionReader(StructType schema, String dataPath, int batchSize) {
this.schema = schema;
this.dataPath = dataPath;
this.batchSize = batchSize;
this.columns = createColumnVectors(schema, batchSize);
}
@Override
public boolean next() throws IOException {
if (!hasNextBatch) {
return false;
}
// Load next batch of data into column vectors
currentBatchRows = loadNextBatch();
hasNextBatch = currentBatchRows > 0;
return hasNextBatch;
}
@Override
public ColumnarBatch get() {
ColumnarBatch batch = new ColumnarBatch(columns, currentBatchRows);
return batch;
}
@Override
public void close() throws IOException {
if (columns != null) {
for (ColumnVector column : columns) {
column.close();
}
}
}
private ColumnVector[] createColumnVectors(StructType schema, int capacity) {
ColumnVector[] vectors = new ColumnVector[schema.length()];
for (int i = 0; i < schema.length(); i++) {
StructField field = schema.fields()[i];
vectors[i] = createColumnVector(field.dataType(), capacity);
}
return vectors;
}
private ColumnVector createColumnVector(DataType dataType, int capacity) {
// Uses the writable vector defined later in this section; Spark's internal
// OnHeapColumnVector could be substituted here.
if (dataType instanceof IntegerType) {
return new MyOnHeapColumnVector(capacity, DataTypes.IntegerType);
} else if (dataType instanceof LongType) {
return new MyOnHeapColumnVector(capacity, DataTypes.LongType);
} else if (dataType instanceof StringType) {
return new MyOnHeapColumnVector(capacity, DataTypes.StringType);
}
}
// Handle other data types...
throw new UnsupportedOperationException("Unsupported type: " + dataType);
}
private int loadNextBatch() throws IOException {
// Implementation-specific batch loading
// This would typically:
// 1. Read data from external source
// 2. Populate column vectors efficiently
// 3. Return number of rows loaded
return loadDataIntoVectors(columns);
}
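// Illustrative sketch of the loading step, assuming a hypothetical in-memory
// row source (an Iterator<Object[]> with one value per column, nulls allowed).
private Iterator<Object[]> rowSource; // hypothetical; would be initialized from dataPath
private int loadDataIntoVectors(ColumnVector[] vectors) throws IOException {
int row = 0;
while (row < batchSize && rowSource.hasNext()) {
Object[] values = rowSource.next();
for (int col = 0; col < vectors.length; col++) {
MyOnHeapColumnVector v = (MyOnHeapColumnVector) vectors[col];
if (values[col] == null) {
v.putNull(row);
} else if (values[col] instanceof Integer) {
v.putInt(row, (Integer) values[col]);
} else if (values[col] instanceof Long) {
v.putLong(row, (Long) values[col]);
} else if (values[col] instanceof Double) {
v.putDouble(row, (Double) values[col]);
}
}
row++;
}
return row; // number of rows loaded; 0 signals end of input
}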
}

A simplified writable on-heap ColumnVector implementation (the base class stores the data type, so no extra field is needed):

public class MyOnHeapColumnVector extends ColumnVector {
private final int capacity;
private int numRows;
// Storage arrays for different types
private boolean[] booleanData;
private int[] intData;
private long[] longData;
private double[] doubleData;
private byte[][] binaryData;
// Null tracking
private boolean[] nulls;
private int numNulls;
public MyOnHeapColumnVector(int capacity, DataType type) {
super(type); // ColumnVector keeps the data type
this.capacity = capacity;
this.nulls = new boolean[capacity];
// Allocate type-specific storage
if (type instanceof IntegerType) {
intData = new int[capacity];
} else if (type instanceof LongType) {
longData = new long[capacity];
} else if (type instanceof DoubleType) {
doubleData = new double[capacity];
} else if (type instanceof BooleanType) {
booleanData = new boolean[capacity];
} else if (type instanceof StringType || type instanceof BinaryType) {
binaryData = new byte[capacity][];
}
}
// dataType() is inherited from ColumnVector. The remaining abstract getters
// (getByte, getShort, getFloat, getUTF8String, getDecimal, getArray, getMap,
// getChild) would need implementations or stubs in a complete class.
@Override
public int numNulls() {
return numNulls;
}
@Override
public boolean hasNull() {
return numNulls > 0;
}
@Override
public boolean isNullAt(int rowId) {
return nulls[rowId];
}
@Override
public int getInt(int rowId) {
if (nulls[rowId]) return 0;
return intData[rowId];
}
@Override
public long getLong(int rowId) {
if (nulls[rowId]) return 0L;
return longData[rowId];
}
@Override
public double getDouble(int rowId) {
if (nulls[rowId]) return 0.0;
return doubleData[rowId];
}
@Override
public boolean getBoolean(int rowId) {
if (nulls[rowId]) return false;
return booleanData[rowId];
}
@Override
public byte[] getBinary(int rowId) {
if (nulls[rowId]) return null;
return binaryData[rowId];
}
// Write methods for populating the vector
public void putInt(int rowId, int value) {
intData[rowId] = value;
nulls[rowId] = false;
}
public void putLong(int rowId, long value) {
longData[rowId] = value;
nulls[rowId] = false;
}
public void putDouble(int rowId, double value) {
doubleData[rowId] = value;
nulls[rowId] = false;
}
public void putNull(int rowId) {
if (!nulls[rowId]) numNulls++; // guard against double-counting
nulls[rowId] = true;
}
public void setNumRows(int numRows) {
this.numRows = numRows;
}
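// Writers for the remaining storage arrays follow the same pattern:
public void putBoolean(int rowId, boolean value) {
booleanData[rowId] = value;
nulls[rowId] = false;
}
public void putBinary(int rowId, byte[] value) {
binaryData[rowId] = value;
nulls[rowId] = false;
}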
@Override
public void close() {
// Clean up arrays
booleanData = null;
intData = null;
longData = null;
doubleData = null;
binaryData = null;
nulls = null;
}
}

A partition reader that streams record batches from an Arrow file, wrapping each Arrow vector in an ArrowColumnVector:

public class ArrowVectorizedReader implements PartitionReader<ColumnarBatch> {
private final BufferAllocator allocator;
private final VectorSchemaRoot root;
private final ArrowFileReader arrowReader;
private final ColumnVector[] columns;
private boolean hasNext = true;
public ArrowVectorizedReader(String arrowFilePath, StructType schema)
throws IOException {
FileInputStream fis = new FileInputStream(arrowFilePath);
this.allocator = new RootAllocator(); // keep a handle so it can be closed
this.arrowReader = new ArrowFileReader(
new SeekableReadChannel(fis.getChannel()),
allocator
);
this.root = arrowReader.getVectorSchemaRoot();
this.columns = createArrowColumnVectors();
}
@Override
public boolean next() throws IOException {
if (!hasNext) {
return false;
}
hasNext = arrowReader.loadNextBatch();
return hasNext;
}
@Override
public ColumnarBatch get() {
int numRows = root.getRowCount();
return new ColumnarBatch(columns, numRows);
}
@Override
public void close() throws IOException {
arrowReader.close(); // also releases the VectorSchemaRoot
allocator.close();
}
private ColumnVector[] createArrowColumnVectors() {
List<FieldVector> vectors = root.getFieldVectors();
ColumnVector[] columnVectors = new ColumnVector[vectors.size()];
for (int i = 0; i < vectors.size(); i++) {
columnVectors[i] = new ArrowColumnVector(vectors.get(i));
}
return columnVectors;
}
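// Hypothetical usage: stream every batch in a file and count the rows.
// PartitionReader extends Closeable, so try-with-resources applies.
public static long countRows(String path, StructType schema) throws IOException {
long total = 0;
try (ArrowVectorizedReader reader = new ArrowVectorizedReader(path, schema)) {
while (reader.next()) {
total += reader.get().numRows();
}
}
return total;
}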
}

Converting Spark columnar batches into Arrow vectors for interchange:

public class ArrowDataConverter {
private final BufferAllocator allocator;
public ArrowDataConverter() {
this.allocator = new RootAllocator();
}
public VectorSchemaRoot convertToArrow(ColumnarBatch batch, StructType schema) {
List<Field> fields = new ArrayList<>();
List<FieldVector> vectors = new ArrayList<>();
for (int i = 0; i < schema.length(); i++) {
StructField field = schema.fields()[i];
ColumnVector sparkVector = batch.column(i);
Field arrowField = convertField(field);
FieldVector arrowVector = convertVector(sparkVector, arrowField, batch.numRows());
fields.add(arrowField);
vectors.add(arrowVector);
}
Schema arrowSchema = new Schema(fields);
// The three-argument constructor sets the row count directly
VectorSchemaRoot root = new VectorSchemaRoot(arrowSchema, vectors, batch.numRows());
return root;
}
private Field convertField(StructField sparkField) {
ArrowType arrowType = convertDataType(sparkField.dataType());
return new Field(sparkField.name(),
new FieldType(sparkField.nullable(), arrowType, null),
null);
}
private ArrowType convertDataType(DataType sparkType) {
if (sparkType instanceof IntegerType) {
return new ArrowType.Int(32, true);
} else if (sparkType instanceof LongType) {
return new ArrowType.Int(64, true);
} else if (sparkType instanceof DoubleType) {
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
} else if (sparkType instanceof StringType) {
return new ArrowType.Utf8();
}
throw new UnsupportedOperationException("Unsupported type: " + sparkType);
}
private FieldVector convertVector(ColumnVector sparkVector, Field field, int numRows) {
// ColumnVector has no row count of its own, so the batch's count is passed in
FieldVector arrowVector = field.createVector(allocator);
arrowVector.allocateNew();
if (sparkVector.dataType() instanceof IntegerType) {
IntVector intVector = (IntVector) arrowVector;
for (int i = 0; i < numRows; i++) {
if (sparkVector.isNullAt(i)) {
intVector.setNull(i);
} else {
intVector.set(i, sparkVector.getInt(i));
}
}
}
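// Illustrative extension: 64-bit integer columns map to Arrow's BigIntVector.
else if (sparkVector.dataType() instanceof LongType) {
BigIntVector longVector = (BigIntVector) arrowVector;
for (int i = 0; i < numRows; i++) {
if (sparkVector.isNullAt(i)) {
longVector.setNull(i);
} else {
longVector.set(i, sparkVector.getLong(i));
}
}
}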
// Handle other types similarly...
arrowVector.setValueCount(numRows);
return arrowVector;
}
}

Aggregate functions can run directly over column vectors. ColumnVector does not expose a row count, so these helpers take the batch's row count as an explicit parameter:

public class VectorizedAggregateFunction {
public static long sum(ColumnVector vector, int numRows) {
long result = 0;
if (vector.dataType() instanceof IntegerType) {
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) {
result += vector.getInt(i);
}
}
} else if (vector.dataType() instanceof LongType) {
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) {
result += vector.getLong(i);
}
}
}
return result;
}
public static double average(ColumnVector vector, int numRows) {
double sum = 0; // a double accumulator avoids truncating fractional values
int count = 0;
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) {
if (vector.dataType() instanceof IntegerType) {
sum += vector.getInt(i);
} else if (vector.dataType() instanceof LongType) {
sum += vector.getLong(i);
} else if (vector.dataType() instanceof DoubleType) {
sum += vector.getDouble(i);
}
count++;
}
}
return count > 0 ? sum / count : 0.0;
}
public static Object min(ColumnVector vector, int numRows) {
Object min = null;
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) {
Object value = getValue(vector, i);
if (min == null || compareValues(value, min, vector.dataType()) < 0) {
min = value;
}
}
}
return min;
}
public static Object max(ColumnVector vector, int numRows) {
Object max = null;
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) {
Object value = getValue(vector, i);
if (max == null || compareValues(value, max, vector.dataType()) > 0) {
max = value;
}
}
}
return max;
}
private static Object getValue(ColumnVector vector, int rowId) {
DataType type = vector.dataType();
if (type instanceof IntegerType) {
return vector.getInt(rowId);
} else if (type instanceof LongType) {
return vector.getLong(rowId);
} else if (type instanceof DoubleType) {
return vector.getDouble(rowId);
} else if (type instanceof StringType) {
return vector.getUTF8String(rowId);
}
return null;
}
@SuppressWarnings("unchecked")
private static int compareValues(Object v1, Object v2, DataType dataType) {
if (v1 instanceof Comparable && v2 instanceof Comparable) {
return ((Comparable<Object>) v1).compareTo(v2);
}
return 0;
}
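// A non-null count uses the same masked loop shape as sum:
public static long count(ColumnVector vector, int numRows) {
long count = 0;
for (int i = 0; i < numRows; i++) {
if (!vector.isNullAt(i)) count++;
}
return count;
}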
}

Filter predicates can also be evaluated in vectorized fashion, producing a boolean selection mask per batch. These helpers reuse the getValue and compareValues methods shown above (assumed accessible here) and, like the aggregates, take the row count explicitly:

public class VectorizedFilters {
public static boolean[] equalTo(ColumnVector vector, Object value, int numRows) {
boolean[] result = new boolean[numRows];
for (int i = 0; i < numRows; i++) {
if (vector.isNullAt(i)) {
result[i] = false;
} else {
Object vectorValue = getValue(vector, i);
result[i] = Objects.equals(vectorValue, value);
}
}
return result;
}
public static boolean[] greaterThan(ColumnVector vector, Object value, int numRows) {
boolean[] result = new boolean[numRows];
for (int i = 0; i < numRows; i++) {
if (vector.isNullAt(i)) {
result[i] = false;
} else {
Object vectorValue = getValue(vector, i);
result[i] = compareValues(vectorValue, value, vector.dataType()) > 0;
}
}
return result;
}
public static boolean[] and(boolean[] left, boolean[] right) {
boolean[] result = new boolean[left.length];
for (int i = 0; i < left.length; i++) {
result[i] = left[i] && right[i];
}
return result;
}
public static boolean[] or(boolean[] left, boolean[] right) {
boolean[] result = new boolean[left.length];
for (int i = 0; i < left.length; i++) {
result[i] = left[i] || right[i];
}
return result;
}
public static ColumnarBatch filter(ColumnarBatch batch, boolean[] mask) {
int selectedRows = 0;
for (boolean selected : mask) {
if (selected) selectedRows++;
}
if (selectedRows == 0) {
return new ColumnarBatch(new ColumnVector[0], 0);
}
ColumnVector[] filteredColumns = new ColumnVector[batch.numCols()];
for (int colIdx = 0; colIdx < batch.numCols(); colIdx++) {
filteredColumns[colIdx] = filterColumn(batch.column(colIdx), mask, selectedRows);
}
return new ColumnarBatch(filteredColumns, selectedRows);
}
private static ColumnVector filterColumn(ColumnVector source, boolean[] mask, int resultRows) {
DataType dataType = source.dataType();
// createColumnVector as in MyVectorizedPartitionReader: allocates a writable vector
ColumnVector result = createColumnVector(dataType, resultRows);
int destIdx = 0;
for (int srcIdx = 0; srcIdx < mask.length; srcIdx++) {
if (mask[srcIdx]) {
copyValue(source, srcIdx, result, destIdx);
destIdx++;
}
}
return result;
}
private static void copyValue(ColumnVector source, int srcIdx,
ColumnVector dest, int destIdx) {
if (source.isNullAt(srcIdx)) {
((MyOnHeapColumnVector) dest).putNull(destIdx);
} else {
DataType type = source.dataType();
if (type instanceof IntegerType) {
((MyOnHeapColumnVector) dest).putInt(destIdx, source.getInt(srcIdx));
} else if (type instanceof LongType) {
((MyOnHeapColumnVector) dest).putLong(destIdx, source.getLong(srcIdx));
} else if (type instanceof DoubleType) {
((MyOnHeapColumnVector) dest).putDouble(destIdx, source.getDouble(srcIdx));
}
// Handle other types...
}
}
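// Mask negation rounds out the boolean algebra. Note that equalTo/greaterThan
// record null rows as false, so a plain negation re-selects them; full SQL
// three-valued logic would track nulls in a separate mask.
public static boolean[] not(boolean[] mask) {
boolean[] result = new boolean[mask.length];
for (int i = 0; i < mask.length; i++) {
result[i] = !mask[i];
}
return result;
}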
}

Off-heap storage avoids GC pressure for large batches. Below is a sketch built on Spark's org.apache.spark.unsafe.Platform memory helpers; the class is named MyOffHeapColumnVector to avoid clashing with Spark's internal OffHeapColumnVector:

public class MyOffHeapColumnVector extends ColumnVector {
private final int capacity;
private final long dataAddress;
private final long nullsAddress;
private boolean closed;
public MyOffHeapColumnVector(DataType dataType, int capacity) {
super(dataType); // the base class stores the data type
this.capacity = capacity;
// Allocate off-heap memory
long typeSize = getTypeSize(dataType);
this.dataAddress = Platform.allocateMemory(capacity * typeSize);
this.nullsAddress = Platform.allocateMemory(capacity); // 1 byte per null flag
// Zero-initialize both regions
Platform.setMemory(dataAddress, (byte) 0, capacity * typeSize);
Platform.setMemory(nullsAddress, (byte) 0, capacity);
}
// dataType() is inherited from ColumnVector; hasNull(), numNulls(), and the
// remaining typed getters are omitted from this sketch for brevity.
@Override
public boolean isNullAt(int rowId) {
return Platform.getByte(null, nullsAddress + rowId) == 1;
}
@Override
public int getInt(int rowId) {
if (isNullAt(rowId)) return 0;
return Platform.getInt(null, dataAddress + rowId * 4L);
}
@Override
public long getLong(int rowId) {
if (isNullAt(rowId)) return 0L;
return Platform.getLong(null, dataAddress + rowId * 8L);
}
public void putInt(int rowId, int value) {
Platform.putInt(null, dataAddress + rowId * 4L, value);
Platform.putByte(null, nullsAddress + rowId, (byte) 0);
}
public void putLong(int rowId, long value) {
Platform.putLong(null, dataAddress + rowId * 8L, value);
Platform.putByte(null, nullsAddress + rowId, (byte) 0);
}
public void putNull(int rowId) {
Platform.putByte(null, nullsAddress + rowId, (byte) 1);
}
@Override
public void close() {
if (!closed) { // guard against double-free
Platform.freeMemory(dataAddress);
Platform.freeMemory(nullsAddress);
closed = true;
}
}
private int getTypeSize(DataType dataType) {
if (dataType instanceof IntegerType) return 4;
if (dataType instanceof LongType) return 8;
if (dataType instanceof DoubleType) return 8;
if (dataType instanceof FloatType) return 4;
throw new UnsupportedOperationException("Unsupported type: " + dataType);
}
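// Double accessors follow the same pattern with an 8-byte stride:
@Override
public double getDouble(int rowId) {
if (isNullAt(rowId)) return 0.0;
return Platform.getDouble(null, dataAddress + rowId * 8L);
}
public void putDouble(int rowId, double value) {
Platform.putDouble(null, dataAddress + rowId * 8L, value);
Platform.putByte(null, nullsAddress + rowId, (byte) 0);
}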
}

Batch size trades memory footprint against per-batch overhead; a simple heuristic targets a fixed memory budget per batch:

public class BatchSizeOptimizer {
private static final int MIN_BATCH_SIZE = 1024;
private static final int MAX_BATCH_SIZE = 8192;
private static final long TARGET_BATCH_MEMORY = 64 * 1024 * 1024; // 64MB
public static int calculateOptimalBatchSize(StructType schema) {
long rowSize = Math.max(1L, calculateRowSize(schema)); // guard against an empty schema
int calculatedBatchSize = (int) (TARGET_BATCH_MEMORY / rowSize);
// Clamp to reasonable bounds
return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, calculatedBatchSize));
}
private static long calculateRowSize(StructType schema) {
long totalSize = 0;
for (StructField field : schema.fields()) {
totalSize += getDataTypeSize(field.dataType());
}
return totalSize;
}
private static long getDataTypeSize(DataType dataType) {
if (dataType instanceof BooleanType) return 1;
if (dataType instanceof ByteType) return 1;
if (dataType instanceof ShortType) return 2;
if (dataType instanceof IntegerType) return 4;
if (dataType instanceof LongType) return 8;
if (dataType instanceof FloatType) return 4;
if (dataType instanceof DoubleType) return 8;
if (dataType instanceof StringType) return 20; // Average string size estimate
if (dataType instanceof BinaryType) return 16; // Average binary size estimate
return 8; // Default estimate
}
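// Example with a hypothetical three-column schema: the row estimate is
// 8 + 20 + 8 = 36 bytes, so 64 MB / 36 B is far above MAX_BATCH_SIZE
// and the result clamps to 8192.
public static void main(String[] args) {
StructType schema = new StructType()
.add("id", DataTypes.LongType)
.add("name", DataTypes.StringType)
.add("score", DataTypes.DoubleType);
System.out.println(calculateOptimalBatchSize(schema)); // 8192
}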
}

Expression evaluation can likewise proceed one column at a time instead of one row at a time:

public class VectorizedExpressionEvaluator {
public static ColumnVector evaluateExpression(Expression expr, ColumnarBatch input) {
if (expr instanceof Literal) {
return evaluateLiteral((Literal) expr, input.numRows());
} else if (expr instanceof NamedReference) {
return evaluateColumnReference((NamedReference) expr, input);
} else if (expr instanceof BinaryExpression) {
return evaluateBinaryExpression((BinaryExpression) expr, input);
}
throw new UnsupportedOperationException("Unsupported expression: " + expr);
}
private static ColumnVector evaluateLiteral(Literal literal, int numRows) {
Object value = literal.value();
DataType dataType = inferDataType(value);
ColumnVector result = createColumnVector(dataType, numRows);
// Fill all rows with the literal value
for (int i = 0; i < numRows; i++) {
setColumnValue(result, i, value);
}
return result;
}
private static ColumnVector evaluateColumnReference(NamedReference ref, ColumnarBatch input) {
String[] fieldNames = ref.fieldNames();
String columnName = fieldNames[0]; // Simplified - assume single-level reference
// Find column index by name
// This would require schema information in a real implementation
int columnIndex = findColumnIndex(columnName, input);
return input.column(columnIndex);
}
private static ColumnVector evaluateBinaryExpression(BinaryExpression expr, ColumnarBatch input) {
ColumnVector left = evaluateExpression(expr.left(), input);
ColumnVector right = evaluateExpression(expr.right(), input);
if (expr instanceof Add) {
return vectorizedAdd(left, right, input.numRows());
} else if (expr instanceof Subtract) {
return vectorizedSubtract(left, right, input.numRows());
} else if (expr instanceof Multiply) {
return vectorizedMultiply(left, right, input.numRows());
}
throw new UnsupportedOperationException("Unsupported binary expression: " + expr);
}
// Both inputs come from the same batch, so they share one row count
private static ColumnVector vectorizedAdd(ColumnVector left, ColumnVector right, int numRows) {
DataType resultType = promoteTypes(left.dataType(), right.dataType());
ColumnVector result = createColumnVector(resultType, numRows);
for (int i = 0; i < numRows; i++) {
if (left.isNullAt(i) || right.isNullAt(i)) {
((MyOnHeapColumnVector) result).putNull(i);
} else {
if (resultType instanceof IntegerType) {
int sum = left.getInt(i) + right.getInt(i);
((MyOnHeapColumnVector) result).putInt(i, sum);
} else if (resultType instanceof LongType) {
long sum = left.getLong(i) + right.getLong(i);
((MyOnHeapColumnVector) result).putLong(i, sum);
} else if (resultType instanceof DoubleType) {
double sum = left.getDouble(i) + right.getDouble(i);
((MyOnHeapColumnVector) result).putDouble(i, sum);
}
}
}
return result;
}
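// Sketch of the numeric type promotion used above: the wider type wins.
// The other helpers referenced in this class (inferDataType, setColumnValue,
// findColumnIndex, createColumnVector, vectorizedSubtract, vectorizedMultiply)
// follow the same shapes as the code already shown and are omitted.
private static DataType promoteTypes(DataType a, DataType b) {
if (a instanceof DoubleType || b instanceof DoubleType) return DataTypes.DoubleType;
if (a instanceof LongType || b instanceof LongType) return DataTypes.LongType;
return DataTypes.IntegerType;
}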
}

Finally, vectorized reading plugs into the DataSource V2 API. These snippets are abbreviated: constructors, name(), and schema() are elided:

public class VectorizedDataSource implements Table, SupportsRead {
private StructType schema; // populated by the data source (elided)
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new VectorizedScanBuilder(schema, options);
}
@Override
public Set<TableCapability> capabilities() {
// ACCEPT_ANY_SCHEMA is a write-side capability, so a read-only source needs only BATCH_READ
return Set.of(TableCapability.BATCH_READ);
}
}
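The scan builder referenced by VectorizedDataSource is not part of the original listing; a minimal sketch, assuming no pushdown support and a matching VectorizedScan constructor, might look like this:

public class VectorizedScanBuilder implements ScanBuilder {
private final StructType schema;
private final CaseInsensitiveStringMap options;
public VectorizedScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
this.schema = schema;
this.options = options;
}
@Override
public Scan build() {
// The data location would typically come from the options (hypothetical "path" key)
return new VectorizedScan(schema, options.get("path"));
}
}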
public class VectorizedScan implements Scan, SupportsReportStatistics {
private StructType schema; // set in the constructor (elided)
private String paths; // data location, set in the constructor (elided)
@Override
public Batch toBatch() {
return new VectorizedBatch(schema, paths);
}
@Override
public Statistics estimateStatistics() {
// Provide statistics for query optimization
return new Statistics() {
@Override
public OptionalLong sizeInBytes() {
return OptionalLong.of(calculateDataSize());
}
@Override
public OptionalLong numRows() {
return OptionalLong.of(estimateRowCount());
}
};
}
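// Scan implementations must also report their read schema; a pass-through sketch:
@Override
public StructType readSchema() {
return schema;
}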
}

The Vectorized Processing APIs provide the foundation for high-performance analytical workloads in Spark, enabling efficient processing of large datasets through columnar operations and optimized memory layouts.