0
# Vector Processing
1
2
The ORC format provides a vector processing layer for high-performance columnar data access. It adapts ORC's native (Hive) column vectors to Flink's `ColumnVector` interface by wrapping them rather than copying their data, so batches read from ORC files can be processed directly in vectorized form.
3
4
## Abstract Base Vector
5
6
```java { .api }
7
public abstract class AbstractOrcColumnVector {
8
public static ColumnVector createFlinkVector(
9
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,
10
LogicalType type
11
);
12
13
public static ColumnVector createFlinkVectorFromConstant(
14
LogicalType type,
15
Object value,
16
int batchSize
17
);
18
}
19
```
20
21
## Column Vector Types
22
23
### Primitive Type Vectors
24
25
```java { .api }
26
// Long values (integers, dates, timestamps as long)
27
public class OrcLongColumnVector extends AbstractOrcColumnVector {
28
public OrcLongColumnVector(LongColumnVector vector);
29
}
30
31
// Double values (floats, doubles)
32
public class OrcDoubleColumnVector extends AbstractOrcColumnVector {
33
public OrcDoubleColumnVector(DoubleColumnVector vector);
34
}
35
36
// String and binary data
37
public class OrcBytesColumnVector extends AbstractOrcColumnVector {
38
public OrcBytesColumnVector(BytesColumnVector vector);
39
}
40
41
// Decimal values with precision/scale
42
public class OrcDecimalColumnVector extends AbstractOrcColumnVector {
43
public OrcDecimalColumnVector(DecimalColumnVector vector);
44
}
45
```
46
47
### Temporal Type Vectors
48
49
```java { .api }
50
// Timestamp values
51
public class OrcTimestampColumnVector extends AbstractOrcColumnVector {
52
public OrcTimestampColumnVector(TimestampColumnVector vector);
53
}
54
55
// Legacy timestamp support
56
public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector {
57
public OrcLegacyTimestampColumnVector(TimestampColumnVector vector);
58
}
59
```
60
61
### Complex Type Vectors
62
63
```java { .api }
64
// Array/List values
65
public class OrcArrayColumnVector extends AbstractOrcColumnVector {
66
public OrcArrayColumnVector(ListColumnVector vector, ColumnVector child);
67
}
68
69
// Map values
70
public class OrcMapColumnVector extends AbstractOrcColumnVector {
71
public OrcMapColumnVector(MapColumnVector vector, ColumnVector keyVector, ColumnVector valueVector);
72
}
73
74
// Struct/Row values
75
public class OrcRowColumnVector extends AbstractOrcColumnVector {
76
public OrcRowColumnVector(StructColumnVector vector, ColumnVector[] children);
77
}
78
```
79
80
## Vectorized Batch Processing
81
82
### Batch Wrapper
83
84
```java { .api }
85
public class OrcVectorizedBatchWrapper<BatchT> {
86
public BatchT getBatch();
87
public int size();
88
public void reset();
89
}
90
91
public class HiveOrcBatchWrapper extends OrcVectorizedBatchWrapper<VectorizedRowBatch> {
92
public HiveOrcBatchWrapper(VectorizedRowBatch batch);
93
public VectorizedRowBatch getBatch();
94
}
95
```
96
97
### Column Batch Factory
98
99
```java { .api }
100
@FunctionalInterface
101
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
102
VectorizedColumnBatch create(SplitT split, BatchT batch);
103
}
104
```
105
106
## Usage Examples
107
108
### Creating Flink Vectors from ORC Vectors
109
110
```java
111
import org.apache.flink.orc.vector.AbstractOrcColumnVector;
112
import org.apache.hadoop.hive.ql.exec.vector.*;
113
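import org.apache.flink.table.types.logical.*;
// Import Flink's ColumnVector explicitly: a single-type import takes precedence over the
// Hive ColumnVector brought in by the wildcard import above (Flink 1.15+ package name).
import org.apache.flink.table.data.columnar.vector.ColumnVector;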
114
115
// Convert ORC long vector to Flink vector
116
LongColumnVector orcLongVector = // ... from ORC batch
117
LogicalType intType = new IntType();
118
ColumnVector flinkVector = AbstractOrcColumnVector.createFlinkVector(orcLongVector, intType);
119
120
// Convert ORC string vector
121
BytesColumnVector orcStringVector = // ... from ORC batch
122
LogicalType varcharType = new VarCharType(255);
123
ColumnVector stringVector = AbstractOrcColumnVector.createFlinkVector(orcStringVector, varcharType);
124
125
// Convert ORC decimal vector
126
DecimalColumnVector orcDecimalVector = // ... from ORC batch
127
LogicalType decimalType = new DecimalType(10, 2);
128
ColumnVector decimalVector = AbstractOrcColumnVector.createFlinkVector(orcDecimalVector, decimalType);
129
```
130
131
### Creating Constant Vectors
132
133
```java
134
// Create constant integer vector
135
LogicalType intType = new IntType();
136
ColumnVector constantIntVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
137
intType,
138
42, // constant value
139
1024 // batch size
140
);
141
142
// Create constant string vector
143
LogicalType stringType = new VarCharType(100);
144
ColumnVector constantStringVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
145
stringType,
146
"default_value",
147
1024
148
);
149
150
// Create constant null vector
151
ColumnVector constantNullVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(
152
intType,
153
null, // null value
154
1024
155
);
156
```
157
158
### Complex Type Vector Processing
159
160
```java
161
// Process array column
162
ListColumnVector orcListVector = // ... from ORC batch
163
ArrayType arrayType = new ArrayType(new VarCharType(100));
164
165
// Create child vector for array elements
166
ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
167
orcListVector.child,
168
arrayType.getElementType()
169
);
170
171
// Create array vector
172
OrcArrayColumnVector arrayVector = new OrcArrayColumnVector(orcListVector, childVector);
173
174
// Process map column
175
MapColumnVector orcMapVector = // ... from ORC batch
176
MapType mapType = new MapType(new VarCharType(50), new IntType());
177
178
ColumnVector keyVector = AbstractOrcColumnVector.createFlinkVector(
179
orcMapVector.keys,
180
mapType.getKeyType()
181
);
182
ColumnVector valueVector = AbstractOrcColumnVector.createFlinkVector(
183
orcMapVector.values,
184
mapType.getValueType()
185
);
186
187
OrcMapColumnVector mapVector = new OrcMapColumnVector(orcMapVector, keyVector, valueVector);
188
```
189
190
### Custom Batch Factory
191
192
```java
193
// Custom batch factory for specialized processing
194
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =
195
(split, orcBatch) -> {
196
int numFields = orcBatch.numCols;
197
ColumnVector[] flinkVectors = new ColumnVector[numFields];
198
199
for (int i = 0; i < numFields; i++) {
200
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = orcBatch.cols[i];
201
LogicalType fieldType = getFieldType(i); // your logic to get field type
202
203
// Handle different ORC vector types
204
if (orcVector instanceof LongColumnVector) {
205
flinkVectors[i] = new OrcLongColumnVector((LongColumnVector) orcVector);
206
} else if (orcVector instanceof DoubleColumnVector) {
207
flinkVectors[i] = new OrcDoubleColumnVector((DoubleColumnVector) orcVector);
208
} else if (orcVector instanceof BytesColumnVector) {
209
flinkVectors[i] = new OrcBytesColumnVector((BytesColumnVector) orcVector);
210
} else if (orcVector instanceof DecimalColumnVector) {
211
flinkVectors[i] = new OrcDecimalColumnVector((DecimalColumnVector) orcVector);
212
} else if (orcVector instanceof TimestampColumnVector) {
213
flinkVectors[i] = new OrcTimestampColumnVector((TimestampColumnVector) orcVector);
214
} else if (orcVector instanceof ListColumnVector) {
215
// Handle complex array type
216
ListColumnVector listVector = (ListColumnVector) orcVector;
217
ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(
218
listVector.child,
219
((ArrayType) fieldType).getElementType()
220
);
221
flinkVectors[i] = new OrcArrayColumnVector(listVector, childVector);
222
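} else {
// Remaining vector types (e.g. MapColumnVector, StructColumnVector): delegate to the
// type-driven factory so that no slot in flinkVectors is left null.
flinkVectors[i] = AbstractOrcColumnVector.createFlinkVector(orcVector, fieldType);
}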
224
}
225
226
return new VectorizedColumnBatch(flinkVectors);
227
};
228
```
229
230
## Type System Integration
231
232
### Supported Type Mappings
233
234
| Flink LogicalType | ORC Vector Type | Flink Vector Class |
235
|------------------|-----------------|-------------------|
236
| `BooleanType` | `LongColumnVector` | `OrcLongColumnVector` |
237
| `TinyIntType` | `LongColumnVector` | `OrcLongColumnVector` |
238
| `SmallIntType` | `LongColumnVector` | `OrcLongColumnVector` |
239
| `IntType` | `LongColumnVector` | `OrcLongColumnVector` |
240
| `BigIntType` | `LongColumnVector` | `OrcLongColumnVector` |
241
| `FloatType` | `DoubleColumnVector` | `OrcDoubleColumnVector` |
242
| `DoubleType` | `DoubleColumnVector` | `OrcDoubleColumnVector` |
243
| `VarCharType` | `BytesColumnVector` | `OrcBytesColumnVector` |
244
| `CharType` | `BytesColumnVector` | `OrcBytesColumnVector` |
245
| `BinaryType` | `BytesColumnVector` | `OrcBytesColumnVector` |
246
| `VarBinaryType` | `BytesColumnVector` | `OrcBytesColumnVector` |
247
| `DecimalType` | `DecimalColumnVector` | `OrcDecimalColumnVector` |
248
| `DateType` | `LongColumnVector` | `OrcLongColumnVector` |
249
| `TimestampType` | `TimestampColumnVector` | `OrcTimestampColumnVector` |
| `LocalZonedTimestampType` | `TimestampColumnVector` | `OrcTimestampColumnVector` |
250
| `ArrayType` | `ListColumnVector` | `OrcArrayColumnVector` |
251
| `MapType` | `MapColumnVector` | `OrcMapColumnVector` |
252
| `RowType` | `StructColumnVector` | `OrcRowColumnVector` |
253
254
### Vector Creation Logic
255
256
```java
257
public static ColumnVector createFlinkVector(
258
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,
259
LogicalType type) {
260
261
switch (type.getTypeRoot()) {
262
case BOOLEAN:
263
case TINYINT:
264
case SMALLINT:
265
case INTEGER:
266
case BIGINT:
267
case DATE:
268
return new OrcLongColumnVector((LongColumnVector) orcVector);
269
270
case FLOAT:
271
case DOUBLE:
272
return new OrcDoubleColumnVector((DoubleColumnVector) orcVector);
273
274
case CHAR:
275
case VARCHAR:
276
case BINARY:
277
case VARBINARY:
278
return new OrcBytesColumnVector((BytesColumnVector) orcVector);
279
280
case DECIMAL:
281
return new OrcDecimalColumnVector((DecimalColumnVector) orcVector);
282
283
case TIMESTAMP_WITHOUT_TIME_ZONE:
284
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
285
return new OrcTimestampColumnVector((TimestampColumnVector) orcVector);
286
287
case ARRAY:
288
ArrayType arrayType = (ArrayType) type;
289
ListColumnVector listVector = (ListColumnVector) orcVector;
290
ColumnVector childVector = createFlinkVector(listVector.child, arrayType.getElementType());
291
return new OrcArrayColumnVector(listVector, childVector);
292
293
case MAP:
294
MapType mapType = (MapType) type;
295
MapColumnVector mapVector = (MapColumnVector) orcVector;
296
ColumnVector keyVector = createFlinkVector(mapVector.keys, mapType.getKeyType());
297
ColumnVector valueVector = createFlinkVector(mapVector.values, mapType.getValueType());
298
return new OrcMapColumnVector(mapVector, keyVector, valueVector);
299
300
case ROW:
301
RowType rowType = (RowType) type;
302
StructColumnVector structVector = (StructColumnVector) orcVector;
303
ColumnVector[] childVectors = new ColumnVector[structVector.fields.length];
304
for (int i = 0; i < childVectors.length; i++) {
305
childVectors[i] = createFlinkVector(structVector.fields[i], rowType.getTypeAt(i));
306
}
307
return new OrcRowColumnVector(structVector, childVectors);
308
309
default:
310
throw new UnsupportedOperationException("Unsupported type: " + type);
311
}
312
}
313
```
314
315
## Performance Considerations
316
317
### Memory Management
318
319
```java
320
// Vectors share underlying data arrays with ORC vectors - no copying
321
OrcLongColumnVector flinkVector = new OrcLongColumnVector(orcLongVector);
322
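// flinkVector reads directly from orcLongVector's underlying long[] array, so values the
// reader writes into the ORC batch are visible through flinkVector without any copy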
323
```
324
325
### Batch Size Optimization
326
327
```java
328
// Default ORC batch size
329
int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024
330
331
// Custom batch size for memory optimization
332
VectorizedRowBatch customBatch = schema.createRowBatch(2048);
333
```
334
335
### Null Handling
336
337
```java
338
// Check for null values in vector
if (orcVector.noNulls) {
    // No null values in this vector - optimized processing
    processNonNullVector(flinkVector);
} else {
    // batchSize is the number of rows in the batch (VectorizedRowBatch.size), not its capacity.
    // When isRepeating is set, only entry 0 is meaningful, so isNull[0] applies to every row.
    for (int i = 0; i < batchSize; i++) {
        int idx = orcVector.isRepeating ? 0 : i;
        if (!orcVector.isNull[idx]) {
            processValue(flinkVector, i);
        }
    }
}
350
```
351
352
## Timestamp Utilities
353
354
### TimestampUtil
355
356
Utility class for timestamp handling: it detects whether an ORC vector is a Hive timestamp vector and creates constant timestamp vectors.
357
358
```java { .api }
359
public class TimestampUtil {
360
public static boolean isHiveTimestampColumnVector(
361
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector
362
);
363
364
public static ColumnVector createVectorFromConstant(
365
LogicalType type,
366
Object value,
367
int batchSize
368
);
369
}
370
```
371
372
**Usage Examples:**
373
374
```java
375
import org.apache.flink.orc.TimestampUtil;
376
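import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.flink.table.data.columnar.vector.ColumnVector; // Flink 1.15+ package name
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import java.sql.Timestamp;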
377
378
// Check if ORC vector is timestamp type
379
org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = // ... from ORC batch
380
boolean isTimestamp = TimestampUtil.isHiveTimestampColumnVector(orcVector);
381
382
// Create constant timestamp vector
383
LogicalType timestampType = new TimestampType(3);
384
ColumnVector constantTimestampVector = TimestampUtil.createVectorFromConstant(
385
timestampType,
386
Timestamp.valueOf("2023-01-01 12:00:00"),
387
1024
388
);
389
390
// Create timestamp vector with null value
391
ColumnVector nullTimestampVector = TimestampUtil.createVectorFromConstant(
392
timestampType,
393
null,
394
1024
395
);
396
```
397
398
## Integration with Reading Pipeline
399
400
The vector processing system fits into the ORC reading pipeline as follows:
401
402
1. **ORC File Reading**: Native ORC vectors loaded from file
403
2. **Vector Conversion**: ORC vectors wrapped in Flink vector implementations
404
3. **Batch Creation**: `ColumnBatchFactory` creates `VectorizedColumnBatch`
405
4. **Row Iteration**: `ColumnarRowIterator` provides row-by-row access
406
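5. **Type Mapping**: Each field's Flink `LogicalType` selects its vector wrapper (see the type mapping table above). Types without a mapping fail fast with `UnsupportedOperationException`.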