Vector Processing

High-performance vector implementations that adapt ORC column vectors to Flink's vector API for efficient columnar data processing. They provide type-safe access to vectorized data for the standard data types listed in the Type Mapping table below.

Capabilities

Abstract Vector Base

Base class for all ORC to Flink vector adapters, providing common null handling and factory methods.

/**
 * Base class for adapting ORC column vectors to Flink column vectors
 * Provides common functionality for null handling and vector creation
 */
public abstract class AbstractOrcNoHiveVector implements ColumnVector {
    
    /**
     * Check if value at given index is null
     * @param i Row index to check
     * @return true if value is null, false otherwise
     */
    public boolean isNullAt(int i);
    
    /**
     * Create appropriate Flink vector from ORC column vector
     * Automatically detects vector type and creates corresponding adapter
     * @param vector ORC column vector to adapt
     * @return Flink column vector implementation
     * @throws UnsupportedOperationException for unsupported vector types
     */
    public static ColumnVector createFlinkVector(ColumnVector vector);
    
    /**
     * Create Flink vector from constant value for partition columns
     * @param type Logical type of the constant value
     * @param value Constant value to fill vector with
     * @param batchSize Number of rows in the vector
     * @return Flink column vector filled with constant value
     * @throws UnsupportedOperationException for unsupported types
     */
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}

Usage Examples:

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

int batchSize = 1024;

// Create Flink vector from ORC vector
LongColumnVector orcVector = new LongColumnVector(batchSize);
ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

// Create constant vector for partition column
LogicalType stringType = new VarCharType(50);
ColumnVector constantVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    stringType,
    "US",      // partition value
    batchSize  // batch size
);

// Check for nulls; the cast targets Flink's LongColumnVector interface, which
// shares its simple name with the imported ORC class, hence the qualified name
for (int i = 0; i < batchSize; i++) {
    if (!flinkVector.isNullAt(i)) {
        // Process non-null value
        long value = ((org.apache.flink.table.data.vector.LongColumnVector) flinkVector).getLong(i);
    }
}

Long Vector Adapter

Adapter for ORC LongColumnVector supporting multiple Flink integer and boolean types.

/**
 * Adapter for ORC LongColumnVector to Flink's numeric column vectors
 * Supports boolean, byte, short, int, and long data types
 */
public class OrcNoHiveLongVector extends AbstractOrcNoHiveVector 
    implements LongColumnVector, BooleanColumnVector, ByteColumnVector, 
               ShortColumnVector, IntColumnVector {
    
    /**
     * Create long vector adapter
     * @param vector ORC LongColumnVector to adapt
     */
    public OrcNoHiveLongVector(LongColumnVector vector);
    
    /**
     * Get long value at specified index
     * @param i Row index
     * @return Long value at index
     */
    public long getLong(int i);
    
    /**
     * Get boolean value at specified index (1 = true, 0 = false)
     * @param i Row index
     * @return Boolean value at index
     */
    public boolean getBoolean(int i);
    
    /**
     * Get byte value at specified index
     * @param i Row index
     * @return Byte value at index
     */
    public byte getByte(int i);
    
    /**
     * Get short value at specified index
     * @param i Row index
     * @return Short value at index
     */
    public short getShort(int i);
    
    /**
     * Get int value at specified index
     * @param i Row index
     * @return Int value at index
     */
    public int getInt(int i);
}
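
As a brief sketch (the ORC vector is filled by hand here, and fully qualified names are used because the ORC and Flink LongColumnVector types share a simple name), one adapter instance can serve all of the narrower getters:

import org.apache.flink.orc.nohive.vector.OrcNoHiveLongVector;

// ORC stores booleans and every integer width in a LongColumnVector;
// the adapter narrows the stored long on access
org.apache.orc.storage.ql.exec.vector.LongColumnVector orcLongs =
    new org.apache.orc.storage.ql.exec.vector.LongColumnVector(1024);
orcLongs.vector[0] = 1L;

OrcNoHiveLongVector adapter = new OrcNoHiveLongVector(orcLongs);
boolean flag = adapter.getBoolean(0); // 1 -> true
byte b = adapter.getByte(0);
short s = adapter.getShort(0);
int n = adapter.getInt(0);
long l = adapter.getLong(0);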

Double Vector Adapter

Adapter for ORC DoubleColumnVector supporting float and double types.

/**
 * Adapter for ORC DoubleColumnVector to Flink's floating-point column vectors
 * Supports both float and double data types
 */
public class OrcNoHiveDoubleVector extends AbstractOrcNoHiveVector
    implements DoubleColumnVector, FloatColumnVector {
    
    /**
     * Create double vector adapter
     * @param vector ORC DoubleColumnVector to adapt
     */
    public OrcNoHiveDoubleVector(DoubleColumnVector vector);
    
    /**
     * Get double value at specified index
     * @param i Row index
     * @return Double value at index
     */
    public double getDouble(int i);
    
    /**
     * Get float value at specified index (cast from double)
     * @param i Row index
     * @return Float value at index
     */
    public float getFloat(int i);
}
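
A minimal sketch along the same lines, assuming a hand-filled ORC vector; FLOAT columns are stored as doubles and narrowed only when read:

import org.apache.flink.orc.nohive.vector.OrcNoHiveDoubleVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;

DoubleColumnVector orcDoubles = new DoubleColumnVector(1024);
orcDoubles.vector[0] = 3.14159;

OrcNoHiveDoubleVector adapter = new OrcNoHiveDoubleVector(orcDoubles);
double d = adapter.getDouble(0); // stored double value
float f = adapter.getFloat(0);   // cast to float on access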

Bytes Vector Adapter

Adapter for ORC BytesColumnVector supporting string and binary types.

/**
 * Adapter for ORC BytesColumnVector to Flink's bytes column vector
 * Supports string, char, varchar, binary, and varbinary types
 */
public class OrcNoHiveBytesVector extends AbstractOrcNoHiveVector
    implements BytesColumnVector {
    
    /**
     * Create bytes vector adapter
     * @param vector ORC BytesColumnVector to adapt
     */
    public OrcNoHiveBytesVector(BytesColumnVector vector);
    
    /**
     * Get Bytes value at specified index
     * @param i Row index
     * @return Bytes object containing byte data, start offset, and length
     */
    public Bytes getBytes(int i);
}
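
The returned Bytes object exposes the backing byte array together with an offset and length, and the array may be shared across rows, so decode with all three. A small sketch under that assumption (the sample value is written by hand via the ORC vector's setVal):

import org.apache.flink.orc.nohive.vector.OrcNoHiveBytesVector;
import org.apache.flink.table.data.vector.BytesColumnVector.Bytes;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;

import java.nio.charset.StandardCharsets;

BytesColumnVector orcBytes = new BytesColumnVector(1024);
orcBytes.initBuffer();
orcBytes.setVal(0, "hello".getBytes(StandardCharsets.UTF_8));

OrcNoHiveBytesVector adapter = new OrcNoHiveBytesVector(orcBytes);
Bytes bytes = adapter.getBytes(0);
// Respect offset and length; the byte array can be shared between rows
String value = new String(bytes.data, bytes.offset, bytes.len, StandardCharsets.UTF_8);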

Decimal Vector Adapter

Adapter for ORC DecimalColumnVector supporting high-precision decimal types.

/**
 * Adapter for ORC DecimalColumnVector to Flink's decimal column vector
 * Supports decimal types with configurable precision and scale
 */
public class OrcNoHiveDecimalVector extends AbstractOrcNoHiveVector
    implements DecimalColumnVector {
    
    /**
     * Create decimal vector adapter
     * @param vector ORC DecimalColumnVector to adapt
     */
    public OrcNoHiveDecimalVector(DecimalColumnVector vector);
    
    /**
     * Get decimal value at specified index
     * @param i Row index
     * @param precision Decimal precision (total digits)
     * @param scale Decimal scale (digits after decimal point)
     * @return DecimalData value at index
     */
    public DecimalData getDecimal(int i, int precision, int scale);
}
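
A hedged sketch, assuming the nohive shading that relocates the Hive storage-api classes (including HiveDecimal) under org.apache.orc.storage; the precision and scale passed to getDecimal should match the column's declared DECIMAL(10, 2) type:

import org.apache.flink.orc.nohive.vector.OrcNoHiveDecimalVector;
import org.apache.flink.table.data.DecimalData;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;

// 1024 rows of DECIMAL(10, 2)
DecimalColumnVector orcDecimals = new DecimalColumnVector(1024, 10, 2);
orcDecimals.set(0, HiveDecimal.create("12345.67"));

OrcNoHiveDecimalVector adapter = new OrcNoHiveDecimalVector(orcDecimals);
DecimalData salary = adapter.getDecimal(0, 10, 2);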

Timestamp Vector Adapter

Adapter for ORC TimestampColumnVector supporting timestamp types.

/**
 * Adapter for ORC TimestampColumnVector to Flink's timestamp column vector
 * Supports timestamp with and without timezone
 */
public class OrcNoHiveTimestampVector extends AbstractOrcNoHiveVector
    implements TimestampColumnVector {
    
    /**
     * Create timestamp vector adapter
     * @param vector ORC TimestampColumnVector to adapt
     */
    public OrcNoHiveTimestampVector(TimestampColumnVector vector);
    
    /**
     * Get timestamp value at specified index
     * @param i Row index
     * @param precision Timestamp precision (digits in fractional seconds)
     * @return TimestampData value at index
     */
    public TimestampData getTimestamp(int i, int precision);
}
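
A minimal sketch, writing one value by hand through the ORC vector's set(int, Timestamp); the precision argument is the number of fractional-second digits declared on the column:

import org.apache.flink.orc.nohive.vector.OrcNoHiveTimestampVector;
import org.apache.flink.table.data.TimestampData;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;

import java.sql.Timestamp;

TimestampColumnVector orcTimestamps = new TimestampColumnVector(1024);
orcTimestamps.set(0, Timestamp.valueOf("2023-06-01 12:30:00.123456"));

OrcNoHiveTimestampVector adapter = new OrcNoHiveTimestampVector(orcTimestamps);
TimestampData ts = adapter.getTimestamp(0, 6); // TIMESTAMP(6) column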

Batch Wrapper

Wrapper for ORC VectorizedRowBatch providing size information and batch access.

/**
 * Wrapper for ORC VectorizedRowBatch
 * Provides access to the underlying batch and size information
 */
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {
    
    /**
     * Create batch wrapper
     * @param batch ORC VectorizedRowBatch to wrap
     */
    public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);
    
    /**
     * Get the wrapped ORC batch
     * @return Underlying VectorizedRowBatch
     */
    public VectorizedRowBatch getBatch();
    
    /**
     * Get number of rows in the batch
     * @return Number of rows currently in batch
     */
    public int size();
}
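
A short sketch of the wrapper by itself; in a real reader the batch comes from the ORC TypeDescription/RecordReader, whereas here a bare two-column batch is constructed directly:

import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

VectorizedRowBatch batch = new VectorizedRowBatch(2); // two columns, default capacity
OrcNoHiveBatchWrapper wrapper = new OrcNoHiveBatchWrapper(batch);

// The reader fills the wrapped batch in place; size() reflects batch.size,
// the number of rows currently populated
VectorizedRowBatch underlying = wrapper.getBatch();
int rows = wrapper.size();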

Vector Creation Examples

Automatic Vector Creation

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.orc.storage.ql.exec.vector.*;

// Create ORC vectors
LongColumnVector longVector = new LongColumnVector(1024);
DoubleColumnVector doubleVector = new DoubleColumnVector(1024);
BytesColumnVector bytesVector = new BytesColumnVector(1024);
DecimalColumnVector decimalVector = new DecimalColumnVector(1024, 10, 2);
TimestampColumnVector timestampVector = new TimestampColumnVector(1024);

// Automatically create the appropriate Flink vectors (fully qualified, because
// the ORC wildcard import already binds the simple name ColumnVector)
org.apache.flink.table.data.vector.ColumnVector[] flinkVectors =
    new org.apache.flink.table.data.vector.ColumnVector[] {
        AbstractOrcNoHiveVector.createFlinkVector(longVector),      // OrcNoHiveLongVector
        AbstractOrcNoHiveVector.createFlinkVector(doubleVector),    // OrcNoHiveDoubleVector
        AbstractOrcNoHiveVector.createFlinkVector(bytesVector),     // OrcNoHiveBytesVector
        AbstractOrcNoHiveVector.createFlinkVector(decimalVector),   // OrcNoHiveDecimalVector
        AbstractOrcNoHiveVector.createFlinkVector(timestampVector)  // OrcNoHiveTimestampVector
    };

Constant Vector Creation for Partitions

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.*;
import org.apache.flink.table.types.logical.*;

import java.nio.charset.StandardCharsets;

int batchSize = 1024;

// Create constant vectors for partition values
ColumnVector countryVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new VarCharType(50), "US", batchSize
);

ColumnVector yearVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new IntType(), 2023, batchSize
);

ColumnVector isActiveVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new BooleanType(), true, batchSize
);

// All rows in the batch hold the same partition values; cast to the typed
// Flink vector interfaces before reading
for (int i = 0; i < batchSize; i++) {
    BytesColumnVector.Bytes country = ((BytesColumnVector) countryVector).getBytes(i);
    assert new String(country.data, country.offset, country.len, StandardCharsets.UTF_8).equals("US");
    assert ((IntColumnVector) yearVector).getInt(i) == 2023;
    assert ((BooleanColumnVector) isActiveVector).getBoolean(i);
}

Type Mapping

Flink Logical Type | ORC Vector Type       | Flink Vector Interface | Notes
BOOLEAN            | LongColumnVector      | BooleanColumnVector    | 1 = true, 0 = false
TINYINT            | LongColumnVector      | ByteColumnVector       | Cast from long
SMALLINT           | LongColumnVector      | ShortColumnVector      | Cast from long
INTEGER            | LongColumnVector      | IntColumnVector        | Cast from long
BIGINT             | LongColumnVector      | LongColumnVector       | Direct mapping
FLOAT              | DoubleColumnVector    | FloatColumnVector      | Cast from double
DOUBLE             | DoubleColumnVector    | DoubleColumnVector     | Direct mapping
CHAR, VARCHAR      | BytesColumnVector     | BytesColumnVector      | UTF-8 bytes
BINARY, VARBINARY  | BytesColumnVector     | BytesColumnVector      | Raw bytes
DECIMAL            | DecimalColumnVector   | DecimalColumnVector    | HiveDecimal format
DATE               | LongColumnVector      | IntColumnVector        | Days since epoch
TIMESTAMP_*        | TimestampColumnVector | TimestampColumnVector  | Microsecond precision
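
DATE is the one row above where the representation changes rather than just the width: values surface as days since the Unix epoch through the int accessor. A hedged sketch of converting such a value (the vector is filled by hand here):

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

import java.time.LocalDate;

// A DATE column arrives in an ORC LongColumnVector holding epoch days
LongColumnVector orcDateVector = new LongColumnVector(1024);
orcDateVector.vector[0] = 19500; // days since 1970-01-01

IntColumnVector dateColumn =
    (IntColumnVector) AbstractOrcNoHiveVector.createFlinkVector(orcDateVector);
LocalDate date = LocalDate.ofEpochDay(dateColumn.getInt(0)); // 2023-05-23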

Vectorized Processing Patterns

Batch Processing with Type Safety

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.DecimalColumnVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.flink.table.data.vector.LongColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;

import java.nio.charset.StandardCharsets;

// Process vectorized batch with mixed types
public void processBatch(VectorizedColumnBatch batch) {
    int numRows = batch.getNumRows();

    // Get typed column vectors from the batch's public columns array
    LongColumnVector idVector = (LongColumnVector) batch.columns[0];
    BytesColumnVector nameVector = (BytesColumnVector) batch.columns[1];
    IntColumnVector ageVector = (IntColumnVector) batch.columns[2];
    DecimalColumnVector salaryVector = (DecimalColumnVector) batch.columns[3];

    // Process rows in batch
    for (int i = 0; i < numRows; i++) {
        if (!idVector.isNullAt(i)) {
            long id = idVector.getLong(i);
            String name = null;
            if (!nameVector.isNullAt(i)) {
                BytesColumnVector.Bytes bytes = nameVector.getBytes(i);
                // Decode using offset and length; the byte array may be shared
                name = new String(bytes.data, bytes.offset, bytes.len, StandardCharsets.UTF_8);
            }
            int age = ageVector.isNullAt(i) ? 0 : ageVector.getInt(i);
            DecimalData salary = salaryVector.isNullAt(i) ? null :
                salaryVector.getDecimal(i, 10, 2);

            // Process row data
            processRow(id, name, age, salary);
        }
    }
}

Null-Safe Vector Access

import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.LongColumnVector;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Safe access pattern for nullable columns
public <T> T safeGet(ColumnVector vector, int index, Function<Integer, T> getter, T defaultValue) {
    return vector.isNullAt(index) ? defaultValue : getter.apply(index);
}

// Usage examples
LongColumnVector longVector = (LongColumnVector) batch.columns[0];
BytesColumnVector stringVector = (BytesColumnVector) batch.columns[1];

for (int i = 0; i < batch.getNumRows(); i++) {
    Long id = safeGet(longVector, i, longVector::getLong, null);
    String name = safeGet(stringVector, i,
        idx -> {
            BytesColumnVector.Bytes bytes = stringVector.getBytes(idx);
            return new String(bytes.data, bytes.offset, bytes.len, StandardCharsets.UTF_8);
        },
        "UNKNOWN");

    if (id != null) {
        processRecord(id, name);
    }
}

Performance Considerations

Memory Management

  • Vector Reuse: Vectors are reused across batches to minimize allocations (see the sketch after this list)
  • Lazy Conversion: Values are converted from ORC format only when accessed
  • Batch Size: Larger batches improve throughput but use more memory
  • Null Handling: Optimized null checking avoids unnecessary object creation
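
A sketch of the reuse pattern referenced above, assuming the orc-core nohive artifact (where the reader refills the same VectorizedRowBatch on every nextBatch call) and a placeholder file path; the Flink adapters are created once and keep reading through to the shared ORC vectors:

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

Reader reader = OrcFile.createReader(
    new Path("/data/example.orc"), OrcFile.readerOptions(new Configuration()));
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch(1024);

// Adapters are created once; nextBatch refills the same ORC vectors in place,
// so no per-batch allocation is needed
ColumnVector[] flinkColumns = new ColumnVector[batch.cols.length];
for (int c = 0; c < batch.cols.length; c++) {
    flinkColumns[c] = AbstractOrcNoHiveVector.createFlinkVector(batch.cols[c]);
}

while (rows.nextBatch(batch)) {
    for (int i = 0; i < batch.size; i++) {
        // access values through flinkColumns[...] here
    }
}
rows.close();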

Access Patterns

// Efficient: Sequential access within batch
for (int i = 0; i < batch.getNumRows(); i++) {
    processValue(vector.getLong(i));
}

// Less efficient: Random access pattern
for (int i : randomIndices) {
    processValue(vector.getLong(i));
}

// Efficient: bulk null check via the underlying ORC vector's noNulls flag
// (Flink's ColumnVector interface only exposes per-row isNullAt)
if (orcVector.noNulls) {
    // No nulls, skip null checks
    for (int i = 0; i < batch.getNumRows(); i++) {
        processValue(vector.getLong(i));
    }
} else {
    // Handle nulls explicitly
    for (int i = 0; i < batch.getNumRows(); i++) {
        if (!vector.isNullAt(i)) {
            processValue(vector.getLong(i));
        }
    }
}

Error Handling

try {
    ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);
    
    for (int i = 0; i < batchSize; i++) {
        if (!flinkVector.isNullAt(i)) {
            // Type-safe access via Flink's LongColumnVector interface
            if (flinkVector instanceof org.apache.flink.table.data.vector.LongColumnVector) {
                long value = ((org.apache.flink.table.data.vector.LongColumnVector) flinkVector).getLong(i);
            }
        }
    }
} catch (UnsupportedOperationException e) {
    // Handle unsupported vector types
    logger.error("Unsupported ORC vector type: " + orcVector.getClass(), e);
} catch (ClassCastException e) {
    // Handle type mismatches
    logger.error("Vector type mismatch during access", e);
}