Vectorized Processing APIs

The Vectorized Processing APIs in Apache Spark Catalyst enable high-performance columnar data processing. They support efficient batch operations, lower memory overhead, and better CPU utilization through vectorized execution and in-memory columnar formats such as Apache Arrow.

Core Vectorized Interfaces

ColumnVector

Base class for columnar data representation:

package org.apache.spark.sql.vectorized;

public abstract class ColumnVector implements AutoCloseable {
    /**
     * Data type of this column (stored by the protected constructor;
     * this accessor is final in Spark)
     */
    public final DataType dataType() { return type; }
    
    /**
     * Number of null values in this column
     */
    public abstract int numNulls();
    
    /**
     * Whether this column has any null values
     */
    public abstract boolean hasNull();
    
    /**
     * Check if value at given row is null
     */
    public abstract boolean isNullAt(int rowId);
    
    /**
     * Clean up resources
     */
    @Override
    public abstract void close();
    
    // Type-specific getters (abstract; implementations supply the storage)
    public abstract boolean getBoolean(int rowId);
    public abstract byte getByte(int rowId);
    public abstract short getShort(int rowId);
    public abstract int getInt(int rowId);
    public abstract long getLong(int rowId);
    public abstract float getFloat(int rowId);
    public abstract double getDouble(int rowId);
    public abstract UTF8String getUTF8String(int rowId);
    public abstract byte[] getBinary(int rowId);
    public abstract Decimal getDecimal(int rowId, int precision, int scale);
    
    // Complex type getters
    public abstract ColumnarArray getArray(int rowId);
    public abstract ColumnarMap getMap(int ordinal);
    public abstract ColumnVector getChild(int ordinal);
    
    // Subclasses pass the data type up through this constructor
    protected ColumnVector(DataType type) { this.type = type; }
    protected DataType type;
}

ColumnarBatch

Collection of ColumnVector objects representing a batch of data:

public final class ColumnarBatch implements AutoCloseable {
    /**
     * Number of columns in this batch
     */
    public int numCols();
    
    /**
     * Number of rows in this batch
     */
    public int numRows();
    
    /**
     * Set number of rows (for dynamically sized batches)
     */
    public void setNumRows(int numRows);
    
    /**
     * Get column vector at given ordinal
     */
    public ColumnVector column(int ordinal);
    
    /**
     * Get the row at the given index, as an InternalRow view
     * (backed by a ColumnarBatchRow)
     */
    public InternalRow getRow(int rowId);
    
    /**
     * Iterator over rows in this batch
     */
    public Iterator<InternalRow> rowIterator();
    
    @Override
    public void close();
}
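
A minimal sketch of assembling and consuming a batch, using Spark's internal OnHeapColumnVector (from org.apache.spark.sql.execution.vectorized, an internal package whose API may change between Spark versions):

OnHeapColumnVector id = new OnHeapColumnVector(3, DataTypes.IntegerType);
OnHeapColumnVector score = new OnHeapColumnVector(3, DataTypes.DoubleType);
for (int i = 0; i < 3; i++) {
    id.putInt(i, i + 1);
    score.putDouble(i, i * 0.5);
}

// Closing the batch closes its column vectors
try (ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { id, score }, 3)) {
    // Columnar access: operate on one column at a time
    System.out.println("first id = " + batch.column(0).getInt(0));
    
    // Row-wise access: each InternalRow is a view over the columns
    Iterator<InternalRow> rows = batch.rowIterator();
    while (rows.hasNext()) {
        InternalRow row = rows.next();
        System.out.println(row.getInt(0) + " -> " + row.getDouble(1));
    }
}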

ArrowColumnVector

ColumnVector implementation backed by Apache Arrow:

public final class ArrowColumnVector extends ColumnVector {
    /**
     * Create ArrowColumnVector from Arrow ValueVector
     */
    public ArrowColumnVector(ValueVector vector);
    
    // Implements all ColumnVector methods with Arrow-optimized access
}
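
Wrapping an Arrow vector is direct; a small sketch, assuming the Arrow Java libraries (arrow-vector and an arrow-memory implementation) are on the classpath:

try (RootAllocator allocator = new RootAllocator()) {
    IntVector vector = new IntVector("values", allocator);
    vector.allocateNew(3);
    vector.set(0, 10);
    vector.setNull(1);
    vector.set(2, 30);
    vector.setValueCount(3);
    
    // close() on the wrapper releases the underlying Arrow buffers
    try (ArrowColumnVector column = new ArrowColumnVector(vector)) {
        for (int i = 0; i < 3; i++) {
            System.out.println(column.isNullAt(i) ? "null" : column.getInt(i));
        }
    }
}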

Implementing Vectorized Data Sources

Vectorized Partition Reader

public class MyVectorizedPartitionReader implements PartitionReader<ColumnarBatch> {
    private final StructType schema;
    private final String dataPath;
    private final int batchSize;
    private ColumnVector[] columns;
    private int currentBatchRows;
    private boolean hasNextBatch = true;
    
    public MyVectorizedPartitionReader(StructType schema, String dataPath, int batchSize) {
        this.schema = schema;
        this.dataPath = dataPath;
        this.batchSize = batchSize;
        this.columns = createColumnVectors(schema, batchSize);
    }
    
    @Override
    public boolean next() throws IOException {
        if (!hasNextBatch) {
            return false;
        }
        
        // Load next batch of data into column vectors
        currentBatchRows = loadNextBatch();
        hasNextBatch = currentBatchRows > 0;
        
        return hasNextBatch;
    }
    
    @Override
    public ColumnarBatch get() {
        ColumnarBatch batch = new ColumnarBatch(columns, currentBatchRows);
        return batch;
    }
    
    @Override
    public void close() throws IOException {
        if (columns != null) {
            for (ColumnVector column : columns) {
                column.close();
            }
        }
    }
    
    private ColumnVector[] createColumnVectors(StructType schema, int capacity) {
        ColumnVector[] vectors = new ColumnVector[schema.length()];
        for (int i = 0; i < schema.length(); i++) {
            StructField field = schema.fields()[i];
            vectors[i] = createColumnVector(field.dataType(), capacity);
        }
        return vectors;
    }
    
    private ColumnVector createColumnVector(DataType dataType, int capacity) {
        // OnHeapColumnVector is Spark's writable on-heap implementation
        // (org.apache.spark.sql.execution.vectorized, an internal package)
        if (dataType instanceof IntegerType) {
            return new OnHeapColumnVector(capacity, DataTypes.IntegerType);
        } else if (dataType instanceof LongType) {
            return new OnHeapColumnVector(capacity, DataTypes.LongType);
        } else if (dataType instanceof StringType) {
            return new OnHeapColumnVector(capacity, DataTypes.StringType);
        }
        // Handle other data types...
        throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
    
    private int loadNextBatch() throws IOException {
        // Implementation-specific batch loading
        // This would typically:
        // 1. Read data from external source
        // 2. Populate column vectors efficiently
        // 3. Return number of rows loaded
        return loadDataIntoVectors(columns);
    }
}
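
One possible shape for loadNextBatch, sketched against a hypothetical row-oriented source (recordReader and its int[] records are assumptions for illustration, and an all-integer schema is assumed). The vectors are cast to Spark's WritableColumnVector, the writable base class of OnHeapColumnVector, to use its put methods:

private int loadNextBatch() throws IOException {
    // Clear data left over from the previous batch
    for (ColumnVector c : columns) {
        ((WritableColumnVector) c).reset();
    }
    
    int rowId = 0;
    while (rowId < batchSize && recordReader.hasNext()) {
        int[] record = recordReader.next(); // hypothetical: one int per column
        for (int col = 0; col < columns.length; col++) {
            WritableColumnVector v = (WritableColumnVector) columns[col];
            if (record[col] == Integer.MIN_VALUE) { // hypothetical null sentinel
                v.putNull(rowId);
            } else {
                v.putInt(rowId, record[col]);
            }
        }
        rowId++;
    }
    return rowId; // 0 signals end of input
}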

Custom ColumnVector Implementation

public class MyOnHeapColumnVector extends ColumnVector {
    private final int capacity;
    private int numRows;
    
    // Storage arrays for different types
    private boolean[] booleanData;
    private int[] intData;
    private long[] longData;
    private double[] doubleData;
    private byte[][] binaryData;
    
    // Null tracking
    private boolean[] nulls;
    private int numNulls;
    
    public MyOnHeapColumnVector(int capacity, DataType type) {
        super(type); // the base class stores the type; dataType() is final there
        this.capacity = capacity;
        this.nulls = new boolean[capacity];
        
        // Allocate type-specific storage
        if (type instanceof IntegerType) {
            intData = new int[capacity];
        } else if (type instanceof LongType) {
            longData = new long[capacity];
        } else if (type instanceof DoubleType) {
            doubleData = new double[capacity];
        } else if (type instanceof BooleanType) {
            booleanData = new boolean[capacity];
        } else if (type instanceof StringType || type instanceof BinaryType) {
            binaryData = new byte[capacity][];
        }
    }
    
    @Override
    public int numNulls() {
        return numNulls;
    }
    
    @Override
    public boolean hasNull() {
        return numNulls > 0;
    }
    
    @Override
    public boolean isNullAt(int rowId) {
        return nulls[rowId];
    }
    
    @Override
    public int getInt(int rowId) {
        if (nulls[rowId]) return 0;
        return intData[rowId];
    }
    
    @Override
    public long getLong(int rowId) {
        if (nulls[rowId]) return 0L;
        return longData[rowId];
    }
    
    @Override
    public double getDouble(int rowId) {
        if (nulls[rowId]) return 0.0;
        return doubleData[rowId];
    }
    
    @Override
    public boolean getBoolean(int rowId) {
        if (nulls[rowId]) return false;
        return booleanData[rowId];
    }
    
    @Override
    public byte[] getBinary(int rowId) {
        if (nulls[rowId]) return null;
        return binaryData[rowId];
    }
    
    // Write methods for populating the vector
    public void putInt(int rowId, int value) {
        intData[rowId] = value;
        nulls[rowId] = false;
    }
    
    public void putLong(int rowId, long value) {
        longData[rowId] = value;
        nulls[rowId] = false;
    }
    
    public void putDouble(int rowId, double value) {
        doubleData[rowId] = value;
        nulls[rowId] = false;
    }
    
    public void putNull(int rowId) {
        nulls[rowId] = true;
        numNulls++;
    }
    
    public void setNumRows(int numRows) {
        this.numRows = numRows;
    }
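
    // NOTE: a concrete subclass must also implement the remaining abstract
    // getters (getByte, getShort, getFloat, getUTF8String, getDecimal,
    // getArray, getMap, getChild, ...); stubs that throw
    // UnsupportedOperationException are omitted here for brevity.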
    
    @Override
    public void close() {
        // Clean up arrays
        booleanData = null;
        intData = null;
        longData = null;
        doubleData = null;
        binaryData = null;
        nulls = null;
    }
}
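
Populating and reading the vector (a minimal sketch; DataTypes comes from org.apache.spark.sql.types):

MyOnHeapColumnVector vector = new MyOnHeapColumnVector(4, DataTypes.IntegerType);
vector.putInt(0, 42);
vector.putNull(1);
vector.putInt(2, 7);
vector.putInt(3, -1);
vector.setNumRows(4);

for (int i = 0; i < 4; i++) {
    System.out.println(vector.isNullAt(i) ? "null" : vector.getInt(i));
}
System.out.println("nulls: " + vector.numNulls()); // 1
vector.close();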

Arrow Integration

Arrow-Based Vectorized Reader

public class ArrowVectorizedReader implements PartitionReader<ColumnarBatch> {
    private final BufferAllocator allocator;
    private final VectorSchemaRoot root;
    private final ArrowFileReader arrowReader;
    private final ColumnVector[] columns;
    private boolean hasNext = true;
    
    public ArrowVectorizedReader(String arrowFilePath, StructType schema) 
            throws IOException {
        FileInputStream fis = new FileInputStream(arrowFilePath);
        // Keep a handle on the allocator so it can be closed with the reader
        this.allocator = new RootAllocator();
        this.arrowReader = new ArrowFileReader(
            new SeekableReadChannel(fis.getChannel()), 
            allocator
        );
        
        this.root = arrowReader.getVectorSchemaRoot();
        this.columns = createArrowColumnVectors();
    }
    
    @Override
    public boolean next() throws IOException {
        if (!hasNext) {
            return false;
        }
        
        hasNext = arrowReader.loadNextBatch();
        return hasNext;
    }
    
    @Override
    public ColumnarBatch get() {
        int numRows = root.getRowCount();
        return new ColumnarBatch(columns, numRows);
    }
    
    @Override
    public void close() throws IOException {
        // Closing the reader also closes the schema root and its vectors;
        // close the allocator last to release any remaining Arrow memory
        arrowReader.close();
        allocator.close();
    }
    
    private ColumnVector[] createArrowColumnVectors() {
        List<FieldVector> vectors = root.getFieldVectors();
        ColumnVector[] columnVectors = new ColumnVector[vectors.size()];
        
        for (int i = 0; i < vectors.size(); i++) {
            columnVectors[i] = new ArrowColumnVector(vectors.get(i));
        }
        
        return columnVectors;
    }
}
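
Driving the reader follows the usual PartitionReader contract; a sketch, run inside a method that declares IOException (the file path and schema variable are placeholders):

try (ArrowVectorizedReader reader =
         new ArrowVectorizedReader("/tmp/data.arrow", schema)) {
    while (reader.next()) {
        ColumnarBatch batch = reader.get(); // valid until the next call to next()
        System.out.println("rows in batch: " + batch.numRows());
    }
}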

Arrow Data Conversion

public class ArrowDataConverter {
    private final BufferAllocator allocator;
    
    public ArrowDataConverter() {
        this.allocator = new RootAllocator();
    }
    
    public VectorSchemaRoot convertToArrow(ColumnarBatch batch, StructType schema) {
        List<Field> fields = new ArrayList<>();
        List<FieldVector> vectors = new ArrayList<>();
        
        for (int i = 0; i < schema.length(); i++) {
            StructField field = schema.fields()[i];
            ColumnVector sparkVector = batch.column(i);
            
            Field arrowField = convertField(field);
            FieldVector arrowVector = convertVector(sparkVector, arrowField, batch.numRows());
            
            fields.add(arrowField);
            vectors.add(arrowVector);
        }
        
        VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, batch.numRows());
        
        return root;
    }
    
    private Field convertField(StructField sparkField) {
        ArrowType arrowType = convertDataType(sparkField.dataType());
        return new Field(sparkField.name(), 
                        new FieldType(sparkField.nullable(), arrowType, null), 
                        null);
    }
    
    private ArrowType convertDataType(DataType sparkType) {
        if (sparkType instanceof IntegerType) {
            return new ArrowType.Int(32, true);
        } else if (sparkType instanceof LongType) {
            return new ArrowType.Int(64, true);
        } else if (sparkType instanceof DoubleType) {
            return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
        } else if (sparkType instanceof StringType) {
            return new ArrowType.Utf8();
        }
        throw new UnsupportedOperationException("Unsupported type: " + sparkType);
    }
    
    // ColumnVector does not carry a row count, so the caller passes batch.numRows()
    private FieldVector convertVector(ColumnVector sparkVector, Field field, int numRows) {
        FieldVector arrowVector = field.createVector(allocator);
        arrowVector.allocateNew();
        
        if (sparkVector.dataType() instanceof IntegerType) {
            IntVector intVector = (IntVector) arrowVector;
            for (int i = 0; i < numRows; i++) {
                if (sparkVector.isNullAt(i)) {
                    intVector.setNull(i);
                } else {
                    intVector.set(i, sparkVector.getInt(i));
                }
            }
        }
        // Handle other types similarly...
        
        arrowVector.setValueCount(numRows);
        return arrowVector;
    }
}
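
A sketch of persisting a converted batch to an Arrow IPC file with ArrowFileWriter, run inside a method that declares IOException (the output path is a placeholder, and batch/schema are assumed to be in scope from the earlier examples):

ArrowDataConverter converter = new ArrowDataConverter();
try (VectorSchemaRoot root = converter.convertToArrow(batch, schema);
     FileOutputStream out = new FileOutputStream("/tmp/batch.arrow");
     ArrowFileWriter writer = new ArrowFileWriter(root, null, out.getChannel())) {
    writer.start();
    writer.writeBatch(); // one Arrow record batch per ColumnarBatch
    writer.end();
}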

Vectorized Aggregations

Vectorized Aggregate Functions

public class VectorizedAggregateFunction {
    
    // ColumnVector does not expose a row count; callers pass batch.numRows()
    public static long sum(ColumnVector vector, int numRows) {
        long result = 0;
        
        if (vector.dataType() instanceof IntegerType) {
            for (int i = 0; i < numRows; i++) {
                if (!vector.isNullAt(i)) {
                    result += vector.getInt(i);
                }
            }
        } else if (vector.dataType() instanceof LongType) {
            for (int i = 0; i < numRows; i++) {
                if (!vector.isNullAt(i)) {
                    result += vector.getLong(i);
                }
            }
        }
        
        return result;
    }
    
    public static double average(ColumnVector vector, int numRows) {
        double sum = 0; // double accumulator: a long would truncate getDouble values
        int count = 0;
        
        for (int i = 0; i < numRows; i++) {
            if (!vector.isNullAt(i)) {
                if (vector.dataType() instanceof IntegerType) {
                    sum += vector.getInt(i);
                } else if (vector.dataType() instanceof LongType) {
                    sum += vector.getLong(i);
                } else if (vector.dataType() instanceof DoubleType) {
                    sum += vector.getDouble(i);
                }
                count++;
            }
        }
        
        return count > 0 ? sum / count : 0.0;
    }
    
    public static Object min(ColumnVector vector, int numRows) {
        Object min = null;
        
        for (int i = 0; i < numRows; i++) {
            if (!vector.isNullAt(i)) {
                Object value = getValue(vector, i);
                if (min == null || compareValues(value, min, vector.dataType()) < 0) {
                    min = value;
                }
            }
        }
        
        return min;
    }
    
    public static Object max(ColumnVector vector, int numRows) {
        Object max = null;
        
        for (int i = 0; i < numRows; i++) {
            if (!vector.isNullAt(i)) {
                Object value = getValue(vector, i);
                if (max == null || compareValues(value, max, vector.dataType()) > 0) {
                    max = value;
                }
            }
        }
        
        return max;
    }
    
    private static Object getValue(ColumnVector vector, int rowId) {
        DataType type = vector.dataType();
        if (type instanceof IntegerType) {
            return vector.getInt(rowId);
        } else if (type instanceof LongType) {
            return vector.getLong(rowId);
        } else if (type instanceof DoubleType) {
            return vector.getDouble(rowId);
        } else if (type instanceof StringType) {
            return vector.getUTF8String(rowId);
        }
        return null;
    }
    
    @SuppressWarnings("unchecked")
    private static int compareValues(Object v1, Object v2, DataType dataType) {
        if (v1 instanceof Comparable && v2 instanceof Comparable) {
            return ((Comparable<Object>) v1).compareTo(v2);
        }
        return 0;
    }
}
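
Applying the aggregates to a column, with the row count taken from the enclosing batch (batch is assumed to be a ColumnarBatch from the earlier examples):

ColumnVector values = batch.column(0);
long total = VectorizedAggregateFunction.sum(values, batch.numRows());
double mean = VectorizedAggregateFunction.average(values, batch.numRows());
System.out.println("sum=" + total + ", avg=" + mean);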

Vectorized Filter Operations

public class VectorizedFilters {
    
    // getValue and compareValues are the same helpers shown in
    // VectorizedAggregateFunction; they would be shared or duplicated here.
    // As above, the row count is passed in from the enclosing batch.
    public static boolean[] equalTo(ColumnVector vector, int numRows, Object value) {
        boolean[] result = new boolean[numRows];
        
        for (int i = 0; i < numRows; i++) {
            if (vector.isNullAt(i)) {
                result[i] = false;
            } else {
                Object vectorValue = getValue(vector, i);
                result[i] = Objects.equals(vectorValue, value);
            }
        }
        
        return result;
    }
    
    public static boolean[] greaterThan(ColumnVector vector, int numRows, Object value) {
        boolean[] result = new boolean[numRows];
        
        for (int i = 0; i < numRows; i++) {
            if (vector.isNullAt(i)) {
                result[i] = false;
            } else {
                Object vectorValue = getValue(vector, i);
                result[i] = compareValues(vectorValue, value, vector.dataType()) > 0;
            }
        }
        
        return result;
    }
    
    public static boolean[] and(boolean[] left, boolean[] right) {
        boolean[] result = new boolean[left.length];
        for (int i = 0; i < left.length; i++) {
            result[i] = left[i] && right[i];
        }
        return result;
    }
    
    public static boolean[] or(boolean[] left, boolean[] right) {
        boolean[] result = new boolean[left.length];
        for (int i = 0; i < left.length; i++) {
            result[i] = left[i] || right[i];
        }
        return result;
    }
    
    public static ColumnarBatch filter(ColumnarBatch batch, boolean[] mask) {
        int selectedRows = 0;
        for (boolean selected : mask) {
            if (selected) selectedRows++;
        }
        
        if (selectedRows == 0) {
            return new ColumnarBatch(new ColumnVector[0], 0);
        }
        
        ColumnVector[] filteredColumns = new ColumnVector[batch.numCols()];
        for (int colIdx = 0; colIdx < batch.numCols(); colIdx++) {
            filteredColumns[colIdx] = filterColumn(batch.column(colIdx), mask, selectedRows);
        }
        
        return new ColumnarBatch(filteredColumns, selectedRows);
    }
    
    private static ColumnVector filterColumn(ColumnVector source, boolean[] mask, int resultRows) {
        DataType dataType = source.dataType();
        // Reuse the writable vector defined earlier as the destination
        ColumnVector result = new MyOnHeapColumnVector(resultRows, dataType);
        
        int destIdx = 0;
        for (int srcIdx = 0; srcIdx < mask.length; srcIdx++) {
            if (mask[srcIdx]) {
                copyValue(source, srcIdx, result, destIdx);
                destIdx++;
            }
        }
        
        return result;
    }
    
    private static void copyValue(ColumnVector source, int srcIdx, 
                                ColumnVector dest, int destIdx) {
        if (source.isNullAt(srcIdx)) {
            ((MyOnHeapColumnVector) dest).putNull(destIdx);
        } else {
            DataType type = source.dataType();
            if (type instanceof IntegerType) {
                ((MyOnHeapColumnVector) dest).putInt(destIdx, source.getInt(srcIdx));
            } else if (type instanceof LongType) {
                ((MyOnHeapColumnVector) dest).putLong(destIdx, source.getLong(srcIdx));
            } else if (type instanceof DoubleType) {
                ((MyOnHeapColumnVector) dest).putDouble(destIdx, source.getDouble(srcIdx));
            }
            // Handle other types...
        }
    }
}
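
Combining predicates into a mask and applying it (batch is assumed to have an integer column 0 and a double column 1):

int n = batch.numRows();
boolean[] mask = VectorizedFilters.and(
    VectorizedFilters.equalTo(batch.column(0), n, 42),
    VectorizedFilters.greaterThan(batch.column(1), n, 0.5));
ColumnarBatch filtered = VectorizedFilters.filter(batch, mask);
System.out.println("selected " + filtered.numRows() + " of " + n + " rows");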

Memory Management and Performance

Off-Heap Column Vector

// Named MyOffHeapColumnVector to avoid clashing with Spark's internal
// OffHeapColumnVector in org.apache.spark.sql.execution.vectorized
public class MyOffHeapColumnVector extends ColumnVector {
    private final long capacity;
    private long dataAddress;
    private long nullsAddress;
    
    public MyOffHeapColumnVector(DataType dataType, int capacity) {
        super(dataType); // the base class stores the type; dataType() is final there
        this.capacity = capacity;
        
        // Allocate off-heap memory through Spark's Platform utility
        // (org.apache.spark.unsafe.Platform, a thin wrapper over sun.misc.Unsafe)
        int typeSize = getTypeSize(dataType);
        this.dataAddress = Platform.allocateMemory(capacity * typeSize);
        this.nullsAddress = Platform.allocateMemory(capacity); // 1 byte per null flag
        
        // Zero-initialize both regions
        Platform.setMemory(dataAddress, (byte) 0, capacity * typeSize);
        Platform.setMemory(nullsAddress, (byte) 0, capacity);
    }
    
    @Override
    public boolean isNullAt(int rowId) {
        return Platform.getByte(null, nullsAddress + rowId) == 1;
    }
    
    @Override
    public int getInt(int rowId) {
        if (isNullAt(rowId)) return 0;
        return Platform.getInt(null, dataAddress + rowId * 4L);
    }
    
    @Override
    public long getLong(int rowId) {
        if (isNullAt(rowId)) return 0L;
        return Platform.getLong(null, dataAddress + rowId * 8L);
    }
    
    public void putInt(int rowId, int value) {
        Platform.putInt(null, dataAddress + rowId * 4L, value);
        Platform.putByte(null, nullsAddress + rowId, (byte) 0);
    }
    
    public void putLong(int rowId, long value) {
        Platform.putLong(null, dataAddress + rowId * 8L, value);
        Platform.putByte(null, nullsAddress + rowId, (byte) 0);
    }
    
    public void putNull(int rowId) {
        Platform.putByte(null, nullsAddress + rowId, (byte) 1);
    }
    
    // The remaining abstract methods (numNulls, hasNull, the other getters, ...)
    // would be implemented the same way; omitted here for brevity.
    
    @Override
    public void close() {
        // Free off-heap memory exactly once
        if (dataAddress != 0) {
            Platform.freeMemory(dataAddress);
            dataAddress = 0;
        }
        if (nullsAddress != 0) {
            Platform.freeMemory(nullsAddress);
            nullsAddress = 0;
        }
    }
    
    private int getTypeSize(DataType dataType) {
        if (dataType instanceof IntegerType) return 4;
        if (dataType instanceof LongType) return 8;
        if (dataType instanceof DoubleType) return 8;
        if (dataType instanceof FloatType) return 4;
        return 8; // Conservative default
    }
}
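
Off-heap memory is not garbage collected, so the vector must be closed explicitly; a minimal usage sketch:

MyOffHeapColumnVector vector = new MyOffHeapColumnVector(DataTypes.LongType, 1024);
try {
    vector.putLong(0, 123456789L);
    vector.putNull(1);
    System.out.println(vector.getLong(0));  // 123456789
    System.out.println(vector.isNullAt(1)); // true
} finally {
    vector.close();
}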

Batch Size Optimization

public class BatchSizeOptimizer {
    private static final int MIN_BATCH_SIZE = 1024;
    private static final int MAX_BATCH_SIZE = 8192;
    private static final long TARGET_BATCH_MEMORY = 64 * 1024 * 1024; // 64MB
    
    public static int calculateOptimalBatchSize(StructType schema) {
        long rowSize = calculateRowSize(schema);
        int calculatedBatchSize = (int) (TARGET_BATCH_MEMORY / rowSize);
        
        // Clamp to reasonable bounds
        return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, calculatedBatchSize));
    }
    
    private static long calculateRowSize(StructType schema) {
        long totalSize = 0;
        for (StructField field : schema.fields()) {
            totalSize += getDataTypeSize(field.dataType());
        }
        return totalSize;
    }
    
    private static long getDataTypeSize(DataType dataType) {
        if (dataType instanceof BooleanType) return 1;
        if (dataType instanceof ByteType) return 1;
        if (dataType instanceof ShortType) return 2;
        if (dataType instanceof IntegerType) return 4;
        if (dataType instanceof LongType) return 8;
        if (dataType instanceof FloatType) return 4;
        if (dataType instanceof DoubleType) return 8;
        if (dataType instanceof StringType) return 20; // Average string size estimate
        if (dataType instanceof BinaryType) return 16; // Average binary size estimate
        return 8; // Default estimate
    }
}
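
For example, a (long, double, string) schema is estimated at 8 + 8 + 20 = 36 bytes per row, so 64 MB / 36 B is roughly 1.86 million rows, which the optimizer clamps to MAX_BATCH_SIZE:

StructType schema = new StructType()
    .add("id", DataTypes.LongType)
    .add("score", DataTypes.DoubleType)
    .add("name", DataTypes.StringType);

int batchSize = BatchSizeOptimizer.calculateOptimalBatchSize(schema);
System.out.println("batch size: " + batchSize); // 8192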

Vectorized Expression Evaluation

public class VectorizedExpressionEvaluator {
    
    // The expression types below stand in for Catalyst's expression classes.
    // Helpers referenced but not shown (createColumnVector, setColumnValue,
    // inferDataType, findColumnIndex, promoteTypes, vectorizedSubtract,
    // vectorizedMultiply) follow the same patterns as earlier sections and
    // are omitted for brevity.
    
    public static ColumnVector evaluateExpression(Expression expr, ColumnarBatch input) {
        if (expr instanceof Literal) {
            return evaluateLiteral((Literal) expr, input.numRows());
        } else if (expr instanceof NamedReference) {
            return evaluateColumnReference((NamedReference) expr, input);
        } else if (expr instanceof BinaryExpression) {
            return evaluateBinaryExpression((BinaryExpression) expr, input);
        }
        
        throw new UnsupportedOperationException("Unsupported expression: " + expr);
    }
    
    private static ColumnVector evaluateLiteral(Literal literal, int numRows) {
        Object value = literal.value();
        DataType dataType = inferDataType(value);
        ColumnVector result = createColumnVector(dataType, numRows);
        
        // Fill all rows with the literal value
        for (int i = 0; i < numRows; i++) {
            setColumnValue(result, i, value);
        }
        
        return result;
    }
    
    private static ColumnVector evaluateColumnReference(NamedReference ref, ColumnarBatch input) {
        String[] fieldNames = ref.fieldNames();
        String columnName = fieldNames[0]; // Simplified - assume single-level reference
        
        // Find column index by name
        // This would require schema information in a real implementation
        int columnIndex = findColumnIndex(columnName, input);
        return input.column(columnIndex);
    }
    
    private static ColumnVector evaluateBinaryExpression(BinaryExpression expr, ColumnarBatch input) {
        ColumnVector left = evaluateExpression(expr.left(), input);
        ColumnVector right = evaluateExpression(expr.right(), input);
        int numRows = input.numRows();
        
        if (expr instanceof Add) {
            return vectorizedAdd(left, right, numRows);
        } else if (expr instanceof Subtract) {
            return vectorizedSubtract(left, right, numRows);
        } else if (expr instanceof Multiply) {
            return vectorizedMultiply(left, right, numRows);
        }
        
        throw new UnsupportedOperationException("Unsupported binary expression: " + expr);
    }
    
    private static ColumnVector vectorizedAdd(ColumnVector left, ColumnVector right, int numRows) {
        // Both inputs come from the same batch, so they share a row count
        DataType resultType = promoteTypes(left.dataType(), right.dataType());
        ColumnVector result = createColumnVector(resultType, numRows);
        
        for (int i = 0; i < numRows; i++) {
            if (left.isNullAt(i) || right.isNullAt(i)) {
                ((MyOnHeapColumnVector) result).putNull(i);
            } else {
                if (resultType instanceof IntegerType) {
                    int sum = left.getInt(i) + right.getInt(i);
                    ((MyOnHeapColumnVector) result).putInt(i, sum);
                } else if (resultType instanceof LongType) {
                    long sum = left.getLong(i) + right.getLong(i);
                    ((MyOnHeapColumnVector) result).putLong(i, sum);
                } else if (resultType instanceof DoubleType) {
                    double sum = left.getDouble(i) + right.getDouble(i);
                    ((MyOnHeapColumnVector) result).putDouble(i, sum);
                }
            }
        }
        
        return result;
    }
}

Integration with Spark SQL

Vectorized Data Source

public class VectorizedDataSource implements Table, SupportsRead {
    // name(), schema(), and the custom VectorizedScanBuilder are omitted for brevity
    private StructType schema;
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new VectorizedScanBuilder(schema, options);
    }
    
    @Override
    public Set<TableCapability> capabilities() {
        return Set.of(
            TableCapability.BATCH_READ,
            TableCapability.ACCEPT_ANY_SCHEMA
        );
    }
}

public class VectorizedScan implements Scan, SupportsReportStatistics {
    private StructType schema; // populated by the ScanBuilder
    private String[] paths;    // readSchema() and other Scan methods omitted here
    
    @Override
    public Batch toBatch() {
        return new VectorizedBatch(schema, paths);
    }
    
    @Override
    public Statistics estimateStatistics() {
        // Provide statistics for query optimization;
        // calculateDataSize() and estimateRowCount() are source-specific helpers
        return new Statistics() {
            @Override
            public OptionalLong sizeInBytes() {
                return OptionalLong.of(calculateDataSize());
            }
            
            @Override
            public OptionalLong numRows() {
                return OptionalLong.of(estimateRowCount());
            }
        };
    }
}

The Vectorized Processing APIs provide the foundation for high-performance analytical workloads in Spark, enabling efficient processing of large datasets through columnar operations and optimized memory layouts.