CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Pending
Overview
Eval results
Files

docs/vector-processing.md

Vector Processing

The ORC format provides a comprehensive vector processing system for high-performance columnar data operations. The vector system handles the conversion between ORC's native column vectors and Flink's column vector format, enabling efficient vectorized processing.

Abstract Base Vector

/**
 * Entry point for turning ORC (Hive) column vectors into Flink column vectors.
 *
 * <p>Only the static factory methods are listed here. An unqualified {@code ColumnVector} in
 * these signatures is the Flink column vector type; the Hive class is written fully qualified.
 */
public abstract class AbstractOrcColumnVector {
    /**
     * Wraps an ORC vector in the Flink vector implementation that matches {@code type}.
     *
     * @param orcVector the ORC (Hive) column vector to wrap
     * @param type the Flink logical type of the column; it decides which wrapper class is used
     * @return the Flink column vector created for {@code orcVector}
     */
    public static ColumnVector createFlinkVector(
        org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, 
        LogicalType type
    );
    
    /**
     * Creates a Flink vector for a constant value instead of wrapping an existing ORC vector.
     *
     * @param type the Flink logical type of the constant column
     * @param value the constant; the usage examples in this document also pass {@code null}
     * @param batchSize the batch size the vector is created for
     * @return the Flink column vector representing the constant
     */
    public static ColumnVector createFlinkVectorFromConstant(
        LogicalType type, 
        Object value, 
        int batchSize
    );
}

Column Vector Types

Primitive Type Vectors

/**
 * Flink vector over an ORC {@code LongColumnVector}: long values (integers, dates, timestamps
 * as long).
 *
 * <p>Per the type-mapping table in this document it serves BooleanType, TinyIntType,
 * SmallIntType, IntType, BigIntType and DateType columns.
 */
public class OrcLongColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC long vector to wrap */
    public OrcLongColumnVector(LongColumnVector vector);
}

/**
 * Flink vector over an ORC {@code DoubleColumnVector}: double values (floats, doubles).
 *
 * <p>Per the type-mapping table in this document it serves FloatType and DoubleType columns.
 */
public class OrcDoubleColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC double vector to wrap */
    public OrcDoubleColumnVector(DoubleColumnVector vector);
}

/**
 * Flink vector over an ORC {@code BytesColumnVector}: string and binary data.
 *
 * <p>Per the type-mapping table in this document it serves CharType, VarCharType, BinaryType
 * and VarBinaryType columns.
 */
public class OrcBytesColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC bytes vector to wrap */
    public OrcBytesColumnVector(BytesColumnVector vector);
}

/**
 * Flink vector over an ORC {@code DecimalColumnVector}: decimal values with precision/scale.
 *
 * <p>Per the type-mapping table in this document it serves DecimalType columns.
 */
public class OrcDecimalColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC decimal vector to wrap */
    public OrcDecimalColumnVector(DecimalColumnVector vector);
}

Temporal Type Vectors

/**
 * Flink vector over an ORC {@code TimestampColumnVector}: timestamp values.
 *
 * <p>The vector-creation logic in this document selects it for both
 * TIMESTAMP_WITHOUT_TIME_ZONE and TIMESTAMP_WITH_LOCAL_TIME_ZONE columns.
 */
public class OrcTimestampColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC timestamp vector to wrap */
    public OrcTimestampColumnVector(TimestampColumnVector vector);
}

/**
 * Flink timestamp vector for legacy timestamp support.
 *
 * <p>NOTE(review): the vector-creation logic shown in this document never selects this class,
 * and the long-vector description elsewhere in this document mentions "timestamps as long".
 * Upstream, this wrapper is believed to take a {@code LongColumnVector} rather than a
 * {@code TimestampColumnVector} - verify the constructor parameter type against the Flink
 * version being documented.
 */
public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector {
    /** @param vector the ORC vector to wrap (see the review note on the class) */
    public OrcLegacyTimestampColumnVector(TimestampColumnVector vector);
}

Complex Type Vectors

/**
 * Flink vector over an ORC {@code ListColumnVector}: array/list values.
 *
 * <p>The element data is not wrapped by this class itself; the caller converts the list's
 * child vector first and passes the result in.
 */
public class OrcArrayColumnVector extends AbstractOrcColumnVector {
    /**
     * @param vector the ORC list vector to wrap
     * @param child the already-converted Flink vector for the list elements
     */
    public OrcArrayColumnVector(ListColumnVector vector, ColumnVector child);
}

/**
 * Flink vector over an ORC {@code MapColumnVector}: map values.
 *
 * <p>Key and value data are converted by the caller and passed in as Flink vectors.
 */
public class OrcMapColumnVector extends AbstractOrcColumnVector {
    /**
     * @param vector the ORC map vector to wrap
     * @param keyVector the already-converted Flink vector for the map keys
     * @param valueVector the already-converted Flink vector for the map values
     */
    public OrcMapColumnVector(MapColumnVector vector, ColumnVector keyVector, ColumnVector valueVector);
}

/**
 * Flink vector over an ORC {@code StructColumnVector}: struct/row values.
 *
 * <p>The vector-creation logic in this document builds one child vector per struct field, in
 * field order, before constructing this wrapper.
 */
public class OrcRowColumnVector extends AbstractOrcColumnVector {
    /**
     * @param vector the ORC struct vector to wrap
     * @param children the already-converted Flink vectors for the struct fields
     */
    public OrcRowColumnVector(StructColumnVector vector, ColumnVector[] children);
}

Vectorized Batch Processing

Batch Wrapper

/**
 * Wrapper around an ORC vectorized batch.
 *
 * @param <BatchT> the concrete batch type being wrapped
 */
public class OrcVectorizedBatchWrapper<BatchT> {
    /** Returns the wrapped batch. */
    public BatchT getBatch();
    /** Returns the size of the batch - presumably its current row count; TODO confirm. */
    public int size();
    /** Resets the wrapper. NOTE(review): the exact effect is not shown in this document - confirm. */
    public void reset();
}

/** Batch wrapper specialised for Hive's {@code VectorizedRowBatch}. */
public class HiveOrcBatchWrapper extends OrcVectorizedBatchWrapper<VectorizedRowBatch> {
    /** @param batch the Hive row batch to wrap */
    public HiveOrcBatchWrapper(VectorizedRowBatch batch);
    /** Returns the wrapped Hive row batch. */
    public VectorizedRowBatch getBatch();
}

Column Batch Factory

/**
 * Factory that builds a Flink {@code VectorizedColumnBatch} from an ORC batch for a given split.
 *
 * @param <BatchT> type of the ORC batch handed to the factory
 * @param <SplitT> type of the file source split being read
 */
@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
    /**
     * Creates the Flink column batch.
     *
     * @param split the split being read - presumably available for split-specific columns;
     *     TODO confirm
     * @param batch the ORC batch whose column vectors are to be exposed
     * @return the Flink column batch
     */
    VectorizedColumnBatch create(SplitT split, BatchT batch);
}

Usage Examples

Creating Flink Vectors from ORC Vectors

import org.apache.flink.orc.vector.AbstractOrcColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.flink.table.types.logical.*;

// NOTE(review): the unqualified ColumnVector below is meant to be Flink's column vector type
// (the return type of createFlinkVector). The wildcard import of
// org.apache.hadoop.hive.ql.exec.vector also provides a class named ColumnVector, so an
// explicit import of the Flink type is presumably needed for this snippet to compile -
// confirm the package for the Flink version in use.

// Convert ORC long vector to Flink vector
LongColumnVector orcLongVector = // ... from ORC batch
// IntType is paired with a LongColumnVector: integer columns arrive in ORC long vectors
LogicalType intType = new IntType();
ColumnVector flinkVector = AbstractOrcColumnVector.createFlinkVector(orcLongVector, intType);

// Convert ORC string vector
BytesColumnVector orcStringVector = // ... from ORC batch
// Character types are carried by ORC bytes vectors
LogicalType varcharType = new VarCharType(255);
ColumnVector stringVector = AbstractOrcColumnVector.createFlinkVector(orcStringVector, varcharType);

// Convert ORC decimal vector
DecimalColumnVector orcDecimalVector = // ... from ORC batch
LogicalType decimalType = new DecimalType(10, 2);
ColumnVector decimalVector = AbstractOrcColumnVector.createFlinkVector(orcDecimalVector, decimalType);

Creating Constant Vectors

// Constant vectors are built from a logical type, a value and a batch size;
// no ORC vector is passed in.

// Create constant integer vector
LogicalType intType = new IntType();
ColumnVector constantIntVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
    intType, 
    42,      // constant value
    1024     // batch size
);

// Create constant string vector
LogicalType stringType = new VarCharType(100);
ColumnVector constantStringVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
    stringType, 
    "default_value", 
    1024
);

// Create constant null vector
// (reuses intType from the first example; a logical type is still passed for a null value)
ColumnVector constantNullVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
    intType, 
    null,    // null value
    1024
);

Complex Type Vector Processing

// Complex wrappers take their already-converted child vectors as constructor arguments,
// so the children are created first and the wrapper last.

// Process array column
ListColumnVector orcListVector = // ... from ORC batch
ArrayType arrayType = new ArrayType(new VarCharType(100));

// Create child vector for array elements
// (the element type of the ArrayType selects the wrapper for the list's child vector)
ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
    orcListVector.child, 
    arrayType.getElementType()
);

// Create array vector
OrcArrayColumnVector arrayVector = new OrcArrayColumnVector(orcListVector, childVector);

// Process map column
MapColumnVector orcMapVector = // ... from ORC batch  
MapType mapType = new MapType(new VarCharType(50), new IntType());

// Keys and values are separate ORC vectors and are converted independently
ColumnVector keyVector = AbstractOrcColumnVector.createFlinkVector(
    orcMapVector.keys, 
    mapType.getKeyType()
);
ColumnVector valueVector = AbstractOrcColumnVector.createFlinkVector(
    orcMapVector.values, 
    mapType.getValueType()
);

OrcMapColumnVector mapVector = new OrcMapColumnVector(orcMapVector, keyVector, valueVector);

Custom Batch Factory

// Custom batch factory for specialized processing.
// Wraps every column of the ORC batch in a Flink vector. Vector kinds without a dedicated
// branch are delegated to the generic factory so that no slot of the result array stays null.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory = 
    (split, orcBatch) -> {
        int numFields = orcBatch.numCols;
        ColumnVector[] flinkVectors = new ColumnVector[numFields];
        
        for (int i = 0; i < numFields; i++) {
            org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = orcBatch.cols[i];
            LogicalType fieldType = getFieldType(i); // your logic to get field type
            
            // Handle different ORC vector types
            if (orcVector instanceof LongColumnVector) {
                flinkVectors[i] = new OrcLongColumnVector((LongColumnVector) orcVector);
            } else if (orcVector instanceof DoubleColumnVector) {
                flinkVectors[i] = new OrcDoubleColumnVector((DoubleColumnVector) orcVector);
            } else if (orcVector instanceof BytesColumnVector) {
                flinkVectors[i] = new OrcBytesColumnVector((BytesColumnVector) orcVector);
            } else if (orcVector instanceof DecimalColumnVector) {
                flinkVectors[i] = new OrcDecimalColumnVector((DecimalColumnVector) orcVector);
            } else if (orcVector instanceof TimestampColumnVector) {
                flinkVectors[i] = new OrcTimestampColumnVector((TimestampColumnVector) orcVector);
            } else if (orcVector instanceof ListColumnVector) {
                // Handle complex array type
                ListColumnVector listVector = (ListColumnVector) orcVector;
                ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
                    listVector.child, 
                    ((ArrayType) fieldType).getElementType()
                );
                flinkVectors[i] = new OrcArrayColumnVector(listVector, childVector);
            } else {
                // Maps, structs and any other vector kind: fall back to the generic factory
                // instead of silently leaving a null entry that would only fail later,
                // when the batch is read.
                flinkVectors[i] = AbstractOrcColumnVector.createFlinkVector(orcVector, fieldType);
            }
        }
        
        return new VectorizedColumnBatch(flinkVectors);
    };

Type System Integration

Supported Type Mappings

| Flink LogicalType | ORC Vector Type | Flink Vector Class |
| --- | --- | --- |
| BooleanType | LongColumnVector | OrcLongColumnVector |
| TinyIntType | LongColumnVector | OrcLongColumnVector |
| SmallIntType | LongColumnVector | OrcLongColumnVector |
| IntType | LongColumnVector | OrcLongColumnVector |
| BigIntType | LongColumnVector | OrcLongColumnVector |
| FloatType | DoubleColumnVector | OrcDoubleColumnVector |
| DoubleType | DoubleColumnVector | OrcDoubleColumnVector |
| VarCharType | BytesColumnVector | OrcBytesColumnVector |
| CharType | BytesColumnVector | OrcBytesColumnVector |
| BinaryType | BytesColumnVector | OrcBytesColumnVector |
| VarBinaryType | BytesColumnVector | OrcBytesColumnVector |
| DecimalType | DecimalColumnVector | OrcDecimalColumnVector |
| DateType | LongColumnVector | OrcLongColumnVector |
| TimestampType | TimestampColumnVector | OrcTimestampColumnVector |
| ArrayType | ListColumnVector | OrcArrayColumnVector |
| MapType | MapColumnVector | OrcMapColumnVector |
| RowType | StructColumnVector | OrcRowColumnVector |

Vector Creation Logic

/**
 * Wraps an ORC column vector in the Flink vector class that corresponds to the logical type.
 *
 * <p>The type root of {@code type} alone selects the wrapper; {@code orcVector} is cast to the
 * ORC vector class that wrapper expects. Array, map and row types convert their nested
 * vectors recursively before the enclosing wrapper is constructed.
 *
 * @param orcVector the ORC (Hive) column vector to wrap
 * @param type the Flink logical type describing the column
 * @return the Flink column vector for {@code orcVector}
 * @throws UnsupportedOperationException if the type root has no mapping
 */
public static ColumnVector createFlinkVector(
    org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,
    LogicalType type) {

    switch (type.getTypeRoot()) {
        // Boolean, integral and date columns all use the long vector.
        case BOOLEAN:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
        case DATE:
            return new OrcLongColumnVector((LongColumnVector) orcVector);

        // Both floating-point widths use the double vector.
        case FLOAT:
        case DOUBLE:
            return new OrcDoubleColumnVector((DoubleColumnVector) orcVector);

        // Character and binary columns use the bytes vector.
        case CHAR:
        case VARCHAR:
        case BINARY:
        case VARBINARY:
            return new OrcBytesColumnVector((BytesColumnVector) orcVector);

        case DECIMAL:
            return new OrcDecimalColumnVector((DecimalColumnVector) orcVector);

        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return new OrcTimestampColumnVector((TimestampColumnVector) orcVector);

        case ARRAY: {
            ArrayType listType = (ArrayType) type;
            ListColumnVector orcList = (ListColumnVector) orcVector;
            // Convert the element vector first; the array wrapper takes it as an argument.
            return new OrcArrayColumnVector(
                orcList,
                createFlinkVector(orcList.child, listType.getElementType()));
        }

        case MAP: {
            MapType entryType = (MapType) type;
            MapColumnVector orcMap = (MapColumnVector) orcVector;
            ColumnVector keys = createFlinkVector(orcMap.keys, entryType.getKeyType());
            ColumnVector values = createFlinkVector(orcMap.values, entryType.getValueType());
            return new OrcMapColumnVector(orcMap, keys, values);
        }

        case ROW: {
            RowType structType = (RowType) type;
            StructColumnVector orcStruct = (StructColumnVector) orcVector;
            ColumnVector[] fieldVectors = new ColumnVector[orcStruct.fields.length];
            // One Flink vector per struct field, typed by the row field at the same position.
            int position = 0;
            for (org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcField : orcStruct.fields) {
                fieldVectors[position] = createFlinkVector(orcField, structType.getTypeAt(position));
                position++;
            }
            return new OrcRowColumnVector(orcStruct, fieldVectors);
        }

        default:
            throw new UnsupportedOperationException("Unsupported type: " + type);
    }
}

Performance Considerations

Memory Management

// Vectors share underlying data arrays with ORC vectors - no copying
// NOTE(review): because nothing is copied, the Flink vector presumably reflects whatever the
// ORC vector holds at read time, including after the ORC batch is refilled - confirm.
OrcLongColumnVector flinkVector = new OrcLongColumnVector(orcLongVector);
// flinkVector.vector points to same array as orcLongVector.vector

Batch Size Optimization

// Default ORC batch size
int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024

// Custom batch size for memory optimization
// NOTE(review): `schema` is not defined in this snippet; presumably the ORC schema
// (TypeDescription) of the file being read - confirm.
VectorizedRowBatch customBatch = schema.createRowBatch(2048);

Null Handling

// Check for null values in vector
if (orcVector.noNulls) {
    // No null values in this vector - optimized processing
    processNonNullVector(flinkVector);
} else {
    // A repeating ORC vector keeps its single value, and that value's null flag, at index 0;
    // entries of isNull beyond index 0 are not maintained in that case. Every row therefore
    // consults slot 0 when isRepeating is set, and its own slot otherwise.
    for (int i = 0; i < batchSize; i++) {
        int nullIndex = orcVector.isRepeating ? 0 : i;
        if (!orcVector.isNull[nullIndex]) {
            processValue(flinkVector, i);
        }
    }
}

Timestamp Utilities

TimestampUtil

Specialized utility class for handling timestamp operations and vector creation.

/** Utility methods for timestamp vectors: type detection and constant-vector creation. */
public class TimestampUtil {
    /**
     * Tells whether the given ORC vector is a Hive timestamp column vector.
     *
     * @param orcVector the ORC (Hive) column vector to inspect
     * @return {@code true} if {@code orcVector} is a timestamp vector
     */
    public static boolean isHiveTimestampColumnVector(
        org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector
    );
    
    /**
     * Creates a Flink timestamp vector for a constant value.
     *
     * <p>NOTE(review): upstream Flink is believed to declare this method with the batch size
     * first and without a logical-type parameter - verify the signature against the Flink
     * version being documented.
     *
     * @param type the logical timestamp type of the constant column
     * @param value the constant; the usage examples in this document also pass {@code null}
     * @param batchSize the batch size the vector is created for
     * @return the Flink column vector representing the constant
     */
    public static ColumnVector createVectorFromConstant(
        LogicalType type,
        Object value,
        int batchSize
    );
}

Usage Examples:

import java.sql.Timestamp;

import org.apache.flink.orc.TimestampUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;

// NOTE(review): the unqualified ColumnVector below is Flink's column vector type and still
// needs its own import; its package depends on the Flink version - confirm before copying.

// Check if ORC vector is timestamp type
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = // ... from ORC batch
boolean isTimestamp = TimestampUtil.isHiveTimestampColumnVector(orcVector);

// Create constant timestamp vector
// (Timestamp is java.sql.Timestamp; valueOf expects the "yyyy-mm-dd hh:mm:ss" form used here)
LogicalType timestampType = new TimestampType(3);
ColumnVector constantTimestampVector = TimestampUtil.createVectorFromConstant(
    timestampType,
    Timestamp.valueOf("2023-01-01 12:00:00"),
    1024
);

// Create timestamp vector with null value
ColumnVector nullTimestampVector = TimestampUtil.createVectorFromConstant(
    timestampType,
    null,
    1024
);

Integration with Reading Pipeline

The vector processing system integrates seamlessly with the ORC reading pipeline:

  1. ORC File Reading: Native ORC vectors loaded from file
  2. Vector Conversion: ORC vectors wrapped in Flink vector implementations
  3. Batch Creation: ColumnBatchFactory creates VectorizedColumnBatch
  4. Row Iteration: ColumnarRowIterator provides row-by-row access
  5. Type Safety: Full type system integration ensures correctness

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-orc

docs

bulk-writing.md

columnar-reading.md

index.md

predicate-pushdown.md

table-api.md

vector-processing.md

tile.json