High-performance vector implementations that adapt ORC column vectors to Flink's vector API for efficient columnar data processing. They provide type-safe access to vectorized data for the Flink logical types listed in the type-mapping table below.
Base class for all ORC-to-Flink vector adapters, providing common null handling and factory methods.
/**
* Base class for adapting ORC column vectors to Flink column vectors.
* Provides common functionality for null handling and vector creation.
*
* <p>Naming note: this summary uses the simple name {@code ColumnVector} for two different
* types. The implemented interface and the factory return types are Flink's
* {@code ColumnVector}; the parameter of {@code createFlinkVector} is ORC's
* {@code ColumnVector}. Use a fully qualified name (or a single-type import) when both
* are in scope.
*/
public abstract class AbstractOrcNoHiveVector implements ColumnVector {
/**
* Check whether the value at the given row index is null.
* @param i Row index to check
* @return true if the value is null, false otherwise
*/
public boolean isNullAt(int i);
/**
* Create the appropriate Flink vector adapter for an ORC column vector.
* The adapter class is chosen from the runtime type of the ORC vector
* (for example, an ORC LongColumnVector yields an OrcNoHiveLongVector).
* @param vector ORC column vector to adapt
* @return Flink column vector adapting {@code vector}
* @throws UnsupportedOperationException for unsupported vector types
*/
public static ColumnVector createFlinkVector(ColumnVector vector);
/**
* Create a Flink vector in which every row holds the same constant value,
* typically used for partition columns.
* @param type Logical type of the constant value
* @param value Constant value to fill the vector with (the usage examples pass a String
*        for VARCHAR, an Integer for INT and a Boolean for BOOLEAN)
* @param batchSize Number of rows in the vector
* @return Flink column vector filled with the constant value
* @throws UnsupportedOperationException for unsupported types
*/
public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}Usage Examples:
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
// ORC's LongColumnVector shares its simple name with Flink's LongColumnVector interface.
// The ORC class is imported here; the Flink interface is written fully qualified below.
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

int batchSize = 1024;

// Create Flink vector from ORC vector
LongColumnVector orcVector = new LongColumnVector(batchSize);
ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

// Create constant vector for partition column
LogicalType stringType = new VarCharType(50);
ColumnVector constantVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
        stringType,
        "US",      // partition value
        batchSize  // batch size
);

// Check for nulls. Values are read through Flink's typed LongColumnVector interface
// (implemented by the adapter), not the ORC class, which has no getLong(int).
org.apache.flink.table.data.vector.LongColumnVector longValues =
        (org.apache.flink.table.data.vector.LongColumnVector) flinkVector;
for (int i = 0; i < batchSize; i++) {
    if (!flinkVector.isNullAt(i)) {
        // Process non-null value
        long value = longValues.getLong(i);
    }
}
Adapter for ORC LongColumnVector supporting multiple Flink integer and boolean types.
/**
* Adapter for ORC LongColumnVector to Flink's numeric column vectors.
* Supports boolean, byte, short, int, and long data types. DATE columns (days since
* epoch) are also read through the int accessor.
*
* <p>The constructor parameter is ORC's {@code LongColumnVector}; the implemented
* {@code LongColumnVector} interface is Flink's. The two share a simple name.
*/
public class OrcNoHiveLongVector extends AbstractOrcNoHiveVector
implements LongColumnVector, BooleanColumnVector, ByteColumnVector,
ShortColumnVector, IntColumnVector {
/**
* Create a long vector adapter.
* @param vector ORC LongColumnVector to adapt
*/
public OrcNoHiveLongVector(LongColumnVector vector);
/**
* Get the long value at the specified index (BIGINT; direct mapping).
* @param i Row index
* @return Long value at index
*/
public long getLong(int i);
/**
* Get the boolean value at the specified index (1 = true, 0 = false).
* NOTE(review): handling of stored values other than 0 and 1 is not shown here; confirm.
* @param i Row index
* @return Boolean value at index
*/
public boolean getBoolean(int i);
/**
* Get the byte value at the specified index (TINYINT), obtained by casting the stored long.
* @param i Row index
* @return Byte value at index
*/
public byte getByte(int i);
/**
* Get the short value at the specified index (SMALLINT), obtained by casting the stored long.
* @param i Row index
* @return Short value at index
*/
public short getShort(int i);
/**
* Get the int value at the specified index (INTEGER, or DATE as days since epoch),
* obtained by casting the stored long.
* @param i Row index
* @return Int value at index
*/
public int getInt(int i);
}Adapter for ORC DoubleColumnVector supporting float and double types.
/**
* Adapter for ORC DoubleColumnVector to Flink's floating-point column vectors.
* Supports both float and double data types. On the ORC side, FLOAT columns are
* also stored in a DoubleColumnVector.
*
* <p>The constructor parameter is ORC's {@code DoubleColumnVector}; the implemented
* {@code DoubleColumnVector} interface is Flink's.
*/
public class OrcNoHiveDoubleVector extends AbstractOrcNoHiveVector
implements DoubleColumnVector, FloatColumnVector {
/**
* Create a double vector adapter.
* @param vector ORC DoubleColumnVector to adapt
*/
public OrcNoHiveDoubleVector(DoubleColumnVector vector);
/**
* Get the double value at the specified index (DOUBLE; direct mapping).
* @param i Row index
* @return Double value at index
*/
public double getDouble(int i);
/**
* Get the float value at the specified index (FLOAT), cast from the stored double;
* the narrowing cast can lose precision.
* @param i Row index
* @return Float value at index
*/
public float getFloat(int i);
}Adapter for ORC BytesColumnVector supporting string and binary types.
/**
* Adapter for ORC BytesColumnVector to Flink's bytes column vector.
* Supports string, char, varchar, binary, and varbinary types. Character types are
* exposed as UTF-8 bytes; binary types as raw bytes.
*
* <p>The constructor parameter is ORC's {@code BytesColumnVector}; the implemented
* {@code BytesColumnVector} interface is Flink's.
*/
public class OrcNoHiveBytesVector extends AbstractOrcNoHiveVector
implements BytesColumnVector {
/**
* Create a bytes vector adapter.
* @param vector ORC BytesColumnVector to adapt
*/
public OrcNoHiveBytesVector(BytesColumnVector vector);
/**
* Get the Bytes value at the specified index.
* The result holds a backing byte array plus a start offset and a length. The row's
* value is the {@code length} bytes starting at {@code offset}, so decode or copy using
* the offset and length rather than the whole array.
* @param i Row index
* @return Bytes object containing byte data, start offset, and length
*/
public Bytes getBytes(int i);
}Adapter for ORC DecimalColumnVector supporting high-precision decimal types.
/**
* Adapter for ORC DecimalColumnVector to Flink's decimal column vector.
* Supports decimal types with configurable precision and scale.
*
* <p>The constructor parameter is ORC's {@code DecimalColumnVector}; the implemented
* {@code DecimalColumnVector} interface is Flink's.
*/
public class OrcNoHiveDecimalVector extends AbstractOrcNoHiveVector
implements DecimalColumnVector {
/**
* Create a decimal vector adapter.
* @param vector ORC DecimalColumnVector to adapt
*/
public OrcNoHiveDecimalVector(DecimalColumnVector vector);
/**
* Get the decimal value at the specified index.
* NOTE(review): presumably precision and scale should match the column's declared
* DECIMAL type; confirm against the implementation.
* @param i Row index
* @param precision Decimal precision (total digits)
* @param scale Decimal scale (digits after decimal point)
* @return DecimalData value at index
*/
public DecimalData getDecimal(int i, int precision, int scale);
}Adapter for ORC TimestampColumnVector supporting timestamp types.
/**
* Adapter for ORC TimestampColumnVector to Flink's timestamp column vector.
* Supports timestamp with and without timezone.
*
* <p>The constructor parameter is ORC's {@code TimestampColumnVector}; the implemented
* {@code TimestampColumnVector} interface is Flink's.
*/
public class OrcNoHiveTimestampVector extends AbstractOrcNoHiveVector
implements TimestampColumnVector {
/**
* Create a timestamp vector adapter.
* @param vector ORC TimestampColumnVector to adapt
*/
public OrcNoHiveTimestampVector(TimestampColumnVector vector);
/**
* Get the timestamp value at the specified index.
* @param i Row index
* @param precision Timestamp precision (number of fractional-second digits)
* @return TimestampData value at index
*/
public TimestampData getTimestamp(int i, int precision);
}Wrapper for ORC VectorizedRowBatch providing size information and batch access.
/**
 * Wrapper for an ORC VectorizedRowBatch.
 * Provides access to the underlying batch and its size information.
 */
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {
    /**
     * Create a batch wrapper.
     * @param batch ORC VectorizedRowBatch to wrap
     */
    public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);
    /**
     * Get the wrapped ORC batch.
     * @return Underlying VectorizedRowBatch
     */
    public VectorizedRowBatch getBatch();
    /**
     * Get the number of rows in the batch.
     * @return Number of rows currently in the batch
     */
    public int size();
}

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
// Single-type import: it shadows the ORC ColumnVector pulled in by the wildcard import
// below, so ColumnVector in this snippet means Flink's interface, which is the type
// createFlinkVector returns.
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.*;

// Create ORC vectors
LongColumnVector longVector = new LongColumnVector(1024);
DoubleColumnVector doubleVector = new DoubleColumnVector(1024);
BytesColumnVector bytesVector = new BytesColumnVector(1024);
DecimalColumnVector decimalVector = new DecimalColumnVector(1024, 10, 2);
TimestampColumnVector timestampVector = new TimestampColumnVector(1024);
// Automatically create appropriate Flink vectors
ColumnVector[] flinkVectors = new ColumnVector[] {
    AbstractOrcNoHiveVector.createFlinkVector(longVector),      // OrcNoHiveLongVector
    AbstractOrcNoHiveVector.createFlinkVector(doubleVector),    // OrcNoHiveDoubleVector
    AbstractOrcNoHiveVector.createFlinkVector(bytesVector),     // OrcNoHiveBytesVector
    AbstractOrcNoHiveVector.createFlinkVector(decimalVector),   // OrcNoHiveDecimalVector
    AbstractOrcNoHiveVector.createFlinkVector(timestampVector)  // OrcNoHiveTimestampVector
};

import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.BooleanColumnVector;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.flink.table.types.logical.*;

int batchSize = 1024;
// Create constant vectors for partition values
ColumnVector countryVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
        new VarCharType(50), "US", batchSize
);
ColumnVector yearVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
        new IntType(), 2023, batchSize
);
ColumnVector isActiveVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
        new BooleanType(), true, batchSize
);
// Flink's ColumnVector only exposes isNullAt(); values are read through the typed
// interfaces that the returned adapters implement.
BytesColumnVector countryBytes = (BytesColumnVector) countryVector;
IntColumnVector yearInts = (IntColumnVector) yearVector;
BooleanColumnVector activeFlags = (BooleanColumnVector) isActiveVector;
// All rows in batch will have the same partition values
for (int i = 0; i < batchSize; i++) {
    BytesColumnVector.Bytes country = countryBytes.getBytes(i);
    // Decode only this row's slice (offset/len) of the backing array.
    assert new String(country.data, country.offset, country.len, StandardCharsets.UTF_8)
            .equals("US");
    assert yearInts.getInt(i) == 2023;
    assert activeFlags.getBoolean(i);
}
| Flink Logical Type | ORC Vector Type | Flink Vector Interface | Notes |
|---|---|---|---|
| BOOLEAN | LongColumnVector | BooleanColumnVector | 1=true, 0=false |
| TINYINT | LongColumnVector | ByteColumnVector | Cast from long |
| SMALLINT | LongColumnVector | ShortColumnVector | Cast from long |
| INTEGER | LongColumnVector | IntColumnVector | Cast from long |
| BIGINT | LongColumnVector | LongColumnVector | Direct mapping |
| FLOAT | DoubleColumnVector | FloatColumnVector | Cast from double |
| DOUBLE | DoubleColumnVector | DoubleColumnVector | Direct mapping |
| CHAR, VARCHAR | BytesColumnVector | BytesColumnVector | UTF-8 bytes |
| BINARY, VARBINARY | BytesColumnVector | BytesColumnVector | Raw bytes |
| DECIMAL | DecimalColumnVector | DecimalColumnVector | HiveDecimal format |
| DATE | LongColumnVector | IntColumnVector | Days since epoch |
| TIMESTAMP_* | TimestampColumnVector | TimestampColumnVector | Epoch millis plus a nanos field; up to nanosecond precision |
import java.nio.charset.StandardCharsets;
import java.util.function.IntFunction;
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.DecimalColumnVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.flink.table.data.vector.LongColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;

/**
 * Decodes the UTF-8 string stored at row {@code i} of a bytes vector.
 *
 * <p>{@link BytesColumnVector.Bytes} holds a backing array plus a start offset and a length.
 * The row's value is the {@code len} bytes starting at {@code offset}, so all three are used
 * rather than decoding the whole backing array.
 */
static String readUtf8(BytesColumnVector vector, int i) {
    BytesColumnVector.Bytes bytes = vector.getBytes(i);
    return new String(bytes.data, bytes.offset, bytes.len, StandardCharsets.UTF_8);
}

// Process vectorized batch with mixed types
public void processBatch(VectorizedColumnBatch batch) {
    int numRows = batch.getNumRows();
    // Get typed column vectors
    LongColumnVector idVector = (LongColumnVector) batch.getColumn(0);
    BytesColumnVector nameVector = (BytesColumnVector) batch.getColumn(1);
    IntColumnVector ageVector = (IntColumnVector) batch.getColumn(2);
    DecimalColumnVector salaryVector = (DecimalColumnVector) batch.getColumn(3);
    // Process rows in batch
    for (int i = 0; i < numRows; i++) {
        if (!idVector.isNullAt(i)) {
            long id = idVector.getLong(i);
            String name = nameVector.isNullAt(i) ? null : readUtf8(nameVector, i);
            int age = ageVector.isNullAt(i) ? 0 : ageVector.getInt(i);
            DecimalData salary = salaryVector.isNullAt(i) ? null :
                    salaryVector.getDecimal(i, 10, 2);
            // Process row data
            processRow(id, name, age, salary);
        }
    }
}

// Safe access pattern for nullable columns.
// IntFunction receives the row index as a primitive int (no Integer boxing per call).
public <T> T safeGet(ColumnVector vector, int index, IntFunction<T> getter, T defaultValue) {
    return vector.isNullAt(index) ? defaultValue : getter.apply(index);
}
// Usage examples
LongColumnVector longVector = (LongColumnVector) batch.getColumn(0);
BytesColumnVector stringVector = (BytesColumnVector) batch.getColumn(1);
for (int i = 0; i < batch.getNumRows(); i++) {
    Long id = safeGet(longVector, i, longVector::getLong, null);
    String name = safeGet(stringVector, i, idx -> readUtf8(stringVector, idx), "UNKNOWN");
    if (id != null) {
        processRecord(id, name);
    }
}

// Efficient: Sequential access within batch
int rowCount = batch.getNumRows();
for (int i = 0; i < rowCount; i++) {
    processValue(vector.getLong(i));
}
// Less efficient: Random access pattern
for (int i : randomIndices) {
    processValue(vector.getLong(i));
}
// Efficient: Bulk null checking.
// Flink's ColumnVector offers only per-row isNullAt(); it has no batch-wide null flag.
// The ORC vector the Flink vector was created from does have one
// (noNulls == true means the batch contains no nulls), so consult that instead.
if (!orcVector.noNulls) {
    // Handle nulls explicitly
    for (int i = 0; i < rowCount; i++) {
        if (!vector.isNullAt(i)) {
            processValue(vector.getLong(i));
        }
    }
} else {
    // No nulls, skip null checks
    for (int i = 0; i < rowCount; i++) {
        processValue(vector.getLong(i));
    }
}

try {
    ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);
    // Type-safe access: check the interface once per batch instead of once per row.
    if (flinkVector instanceof LongColumnVector) {
        LongColumnVector longValues = (LongColumnVector) flinkVector;
        for (int i = 0; i < batchSize; i++) {
            if (!longValues.isNullAt(i)) {
                long value = longValues.getLong(i);
            }
        }
    }
    // No ClassCastException handler is needed: the cast above is guarded by instanceof.
} catch (UnsupportedOperationException e) {
    // Handle unsupported vector types
    logger.error("Unsupported ORC vector type: " + orcVector.getClass(), e);
}