Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files
—
The ORC format provides a comprehensive vector processing system for high-performance columnar data operations. The vector system handles the conversion between ORC's native column vectors and Flink's column vector format, enabling efficient vectorized processing.
public abstract class AbstractOrcColumnVector {
public static ColumnVector createFlinkVector(
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,
LogicalType type
);
public static ColumnVector createFlinkVectorFromConstant(
LogicalType type,
Object value,
int batchSize
);
}
// Long values (integers, dates, timestamps as long)
public class OrcLongColumnVector extends AbstractOrcColumnVector {
public OrcLongColumnVector(LongColumnVector vector);
}
// Double values (floats, doubles)
public class OrcDoubleColumnVector extends AbstractOrcColumnVector {
public OrcDoubleColumnVector(DoubleColumnVector vector);
}
// String and binary data
public class OrcBytesColumnVector extends AbstractOrcColumnVector {
public OrcBytesColumnVector(BytesColumnVector vector);
}
// Decimal values with precision/scale
public class OrcDecimalColumnVector extends AbstractOrcColumnVector {
public OrcDecimalColumnVector(DecimalColumnVector vector);
}
// Timestamp values
public class OrcTimestampColumnVector extends AbstractOrcColumnVector {
public OrcTimestampColumnVector(TimestampColumnVector vector);
}
// Legacy timestamp support
public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector {
public OrcLegacyTimestampColumnVector(TimestampColumnVector vector);
}
// Array/List values
public class OrcArrayColumnVector extends AbstractOrcColumnVector {
public OrcArrayColumnVector(ListColumnVector vector, ColumnVector child);
}
// Map values
public class OrcMapColumnVector extends AbstractOrcColumnVector {
public OrcMapColumnVector(MapColumnVector vector, ColumnVector keyVector, ColumnVector valueVector);
}
// Struct/Row values
public class OrcRowColumnVector extends AbstractOrcColumnVector {
public OrcRowColumnVector(StructColumnVector vector, ColumnVector[] children);
}

public class OrcVectorizedBatchWrapper<BatchT> {
public BatchT getBatch();
public int size();
public void reset();
}
public class HiveOrcBatchWrapper extends OrcVectorizedBatchWrapper<VectorizedRowBatch> {
public HiveOrcBatchWrapper(VectorizedRowBatch batch);
public VectorizedRowBatch getBatch();
}

@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
VectorizedColumnBatch create(SplitT split, BatchT batch);
}

import org.apache.flink.orc.vector.AbstractOrcColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.flink.table.types.logical.*;
// Convert ORC long vector to Flink vector
LongColumnVector orcLongVector = // ... from ORC batch
LogicalType intType = new IntType();
ColumnVector flinkVector = AbstractOrcColumnVector.createFlinkVector(orcLongVector, intType);
// Convert ORC string vector
BytesColumnVector orcStringVector = // ... from ORC batch
LogicalType varcharType = new VarCharType(255);
ColumnVector stringVector = AbstractOrcColumnVector.createFlinkVector(orcStringVector, varcharType);
// Convert ORC decimal vector
DecimalColumnVector orcDecimalVector = // ... from ORC batch
LogicalType decimalType = new DecimalType(10, 2);
ColumnVector decimalVector = AbstractOrcColumnVector.createFlinkVector(orcDecimalVector, decimalType);

// Create constant integer vector
LogicalType intType = new IntType();
ColumnVector constantIntVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
intType,
42, // constant value
1024 // batch size
);
// Create constant string vector
LogicalType stringType = new VarCharType(100);
ColumnVector constantStringVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
stringType,
"default_value",
1024
);
// Create constant null vector
ColumnVector constantNullVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
intType,
null, // null value
1024
);

// Process array column
ListColumnVector orcListVector = // ... from ORC batch
ArrayType arrayType = new ArrayType(new VarCharType(100));
// Create child vector for array elements
ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
orcListVector.child,
arrayType.getElementType()
);
// Create array vector
OrcArrayColumnVector arrayVector = new OrcArrayColumnVector(orcListVector, childVector);
// Process map column
MapColumnVector orcMapVector = // ... from ORC batch
MapType mapType = new MapType(new VarCharType(50), new IntType());
ColumnVector keyVector = AbstractOrcColumnVector.createFlinkVector(
orcMapVector.keys,
mapType.getKeyType()
);
ColumnVector valueVector = AbstractOrcColumnVector.createFlinkVector(
orcMapVector.values,
mapType.getValueType()
);
OrcMapColumnVector mapVector = new OrcMapColumnVector(orcMapVector, keyVector, valueVector);

// Custom batch factory for specialized processing
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =
(split, orcBatch) -> {
int numFields = orcBatch.numCols;
ColumnVector[] flinkVectors = new ColumnVector[numFields];
for (int i = 0; i < numFields; i++) {
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = orcBatch.cols[i];
LogicalType fieldType = getFieldType(i); // your logic to get field type
// Handle different ORC vector types
if (orcVector instanceof LongColumnVector) {
flinkVectors[i] = new OrcLongColumnVector((LongColumnVector) orcVector);
} else if (orcVector instanceof DoubleColumnVector) {
flinkVectors[i] = new OrcDoubleColumnVector((DoubleColumnVector) orcVector);
} else if (orcVector instanceof BytesColumnVector) {
flinkVectors[i] = new OrcBytesColumnVector((BytesColumnVector) orcVector);
} else if (orcVector instanceof DecimalColumnVector) {
flinkVectors[i] = new OrcDecimalColumnVector((DecimalColumnVector) orcVector);
} else if (orcVector instanceof TimestampColumnVector) {
flinkVectors[i] = new OrcTimestampColumnVector((TimestampColumnVector) orcVector);
} else if (orcVector instanceof ListColumnVector) {
// Handle complex array type
ListColumnVector listVector = (ListColumnVector) orcVector;
ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
listVector.child,
((ArrayType) fieldType).getElementType()
);
flinkVectors[i] = new OrcArrayColumnVector(listVector, childVector);
}
// ... handle other complex types
}
return new VectorizedColumnBatch(flinkVectors);
};

| Flink LogicalType | ORC Vector Type | Flink Vector Class |
|---|---|---|
| BooleanType | LongColumnVector | OrcLongColumnVector |
| TinyIntType | LongColumnVector | OrcLongColumnVector |
| SmallIntType | LongColumnVector | OrcLongColumnVector |
| IntType | LongColumnVector | OrcLongColumnVector |
| BigIntType | LongColumnVector | OrcLongColumnVector |
| FloatType | DoubleColumnVector | OrcDoubleColumnVector |
| DoubleType | DoubleColumnVector | OrcDoubleColumnVector |
| VarCharType | BytesColumnVector | OrcBytesColumnVector |
| CharType | BytesColumnVector | OrcBytesColumnVector |
| BinaryType | BytesColumnVector | OrcBytesColumnVector |
| VarBinaryType | BytesColumnVector | OrcBytesColumnVector |
| DecimalType | DecimalColumnVector | OrcDecimalColumnVector |
| DateType | LongColumnVector | OrcLongColumnVector |
| TimestampType | TimestampColumnVector | OrcTimestampColumnVector |
| ArrayType | ListColumnVector | OrcArrayColumnVector |
| MapType | MapColumnVector | OrcMapColumnVector |
| RowType | StructColumnVector | OrcRowColumnVector |
/**
 * Converts an ORC (Hive) column vector into the corresponding Flink column vector.
 *
 * <p>The returned Flink vector wraps the ORC vector without copying its data; for
 * nested types (ARRAY, MAP, ROW) the child vectors are wrapped recursively with
 * their corresponding child logical types.
 *
 * @param orcVector the ORC column vector, typically taken from a VectorizedRowBatch
 * @param type the Flink logical type describing the column's data
 * @return a Flink ColumnVector view over {@code orcVector}
 * @throws UnsupportedOperationException if {@code type} has no ORC vector mapping
 */
public static ColumnVector createFlinkVector(
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,
LogicalType type) {
switch (type.getTypeRoot()) {
// All integral-like types — and DATE — are backed by ORC's LongColumnVector.
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case DATE:
return new OrcLongColumnVector((LongColumnVector) orcVector);
// FLOAT and DOUBLE both use ORC's DoubleColumnVector.
case FLOAT:
case DOUBLE:
return new OrcDoubleColumnVector((DoubleColumnVector) orcVector);
// Character and binary data all share the byte-oriented vector.
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
return new OrcBytesColumnVector((BytesColumnVector) orcVector);
case DECIMAL:
return new OrcDecimalColumnVector((DecimalColumnVector) orcVector);
// Both timestamp variants map onto the same ORC timestamp vector.
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return new OrcTimestampColumnVector((TimestampColumnVector) orcVector);
case ARRAY:
// Recursively wrap the list's single child vector with the element type.
ArrayType arrayType = (ArrayType) type;
ListColumnVector listVector = (ListColumnVector) orcVector;
ColumnVector childVector = createFlinkVector(listVector.child, arrayType.getElementType());
return new OrcArrayColumnVector(listVector, childVector);
case MAP:
// Maps carry two parallel child vectors: keys and values.
MapType mapType = (MapType) type;
MapColumnVector mapVector = (MapColumnVector) orcVector;
ColumnVector keyVector = createFlinkVector(mapVector.keys, mapType.getKeyType());
ColumnVector valueVector = createFlinkVector(mapVector.values, mapType.getValueType());
return new OrcMapColumnVector(mapVector, keyVector, valueVector);
case ROW:
// Wrap each struct field with the row type's field at the same position.
RowType rowType = (RowType) type;
StructColumnVector structVector = (StructColumnVector) orcVector;
ColumnVector[] childVectors = new ColumnVector[structVector.fields.length];
for (int i = 0; i < childVectors.length; i++) {
childVectors[i] = createFlinkVector(structVector.fields[i], rowType.getTypeAt(i));
}
return new OrcRowColumnVector(structVector, childVectors);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
// Vectors share underlying data arrays with ORC vectors - no copying
OrcLongColumnVector flinkVector = new OrcLongColumnVector(orcLongVector);
// flinkVector.vector points to same array as orcLongVector.vector

// Default ORC batch size
int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024
// Custom batch size for memory optimization
VectorizedRowBatch customBatch = schema.createRowBatch(2048);

// Check for null values in vector
if (orcVector.noNulls) {
// No null values in this vector - optimized processing
processNonNullVector(flinkVector);
} else {
// Check isNull array for each row
for (int i = 0; i < batchSize; i++) {
if (!orcVector.isNull[i]) {
processValue(flinkVector, i);
}
}
}

Specialized utility class for handling timestamp operations and vector creation.
public class TimestampUtil {
public static boolean isHiveTimestampColumnVector(
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector
);
public static ColumnVector createVectorFromConstant(
LogicalType type,
Object value,
int batchSize
);
}

Usage Examples:
import org.apache.flink.orc.TimestampUtil;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
// Check if ORC vector is timestamp type
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = // ... from ORC batch
boolean isTimestamp = TimestampUtil.isHiveTimestampColumnVector(orcVector);
// Create constant timestamp vector
LogicalType timestampType = new TimestampType(3);
ColumnVector constantTimestampVector = TimestampUtil.createVectorFromConstant(
timestampType,
Timestamp.valueOf("2023-01-01 12:00:00"),
1024
);
// Create timestamp vector with null value
ColumnVector nullTimestampVector = TimestampUtil.createVectorFromConstant(
timestampType,
null,
1024
);

The vector processing system integrates seamlessly with the ORC reading pipeline:
- ColumnBatchFactory creates VectorizedColumnBatch
- ColumnarRowIterator provides row-by-row access

Install with the Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-orc