# Vector Processing

High-performance vector implementations that adapt ORC column vectors to Flink's vector API for efficient columnar data processing. Provides type-safe access to vectorized data with support for all standard data types.

## Capabilities

### Abstract Vector Base

Base class for all ORC to Flink vector adapters, providing common null handling and factory methods.

```java { .api }
/**
 * Base class for adapting ORC column vectors to Flink column vectors
 * Provides common functionality for null handling and vector creation
 */
public abstract class AbstractOrcNoHiveVector implements ColumnVector {

    /**
     * Check if value at given index is null
     * @param i Row index to check
     * @return true if value is null, false otherwise
     */
    public boolean isNullAt(int i);

    /**
     * Create appropriate Flink vector from ORC column vector
     * Automatically detects vector type and creates corresponding adapter
     * @param vector ORC column vector to adapt
     * @return Flink column vector implementation
     * @throws UnsupportedOperationException for unsupported vector types
     */
    public static ColumnVector createFlinkVector(ColumnVector vector);

    /**
     * Create Flink vector from constant value for partition columns
     * @param type Logical type of the constant value
     * @param value Constant value to fill vector with
     * @param batchSize Number of rows in the vector
     * @return Flink column vector filled with constant value
     * @throws UnsupportedOperationException for unsupported types
     */
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
```

**Usage Examples:**

```java
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

int batchSize = 1024;

// Create Flink vector from ORC vector
LongColumnVector orcVector = new LongColumnVector(batchSize);
ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

// Create constant vector for partition column
LogicalType stringType = new VarCharType(50);
ColumnVector constantVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    stringType,
    "US",      // partition value
    batchSize  // batch size
);

// Check for nulls
for (int i = 0; i < batchSize; i++) {
    if (!flinkVector.isNullAt(i)) {
        // Process non-null value. The cast targets Flink's LongColumnVector, written
        // fully qualified because ORC's class of the same name is imported above.
        long value =
            ((org.apache.flink.table.data.vector.LongColumnVector) flinkVector).getLong(i);
    }
}
```
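
The common null handling mentioned above can be pictured with a small stand-alone sketch. It assumes the ORC vector carries a `noNulls` flag, an `isRepeating` flag, and an `isNull` array, as ORC's column vectors do. `OrcLikeVector` and `NullCheckSketch` are hypothetical stand-ins written for this illustration, not classes from Flink or ORC, and the real adapter may differ in detail.

```java
/** Hypothetical stand-in for the null-related fields of an ORC column vector. */
final class OrcLikeVector {
    boolean noNulls = true;       // true when the batch contains no null entries
    boolean isRepeating = false;  // true when entry 0 stands for every row
    final boolean[] isNull;       // per-row null flags, only meaningful if !noNulls

    OrcLikeVector(int size) {
        this.isNull = new boolean[size];
    }
}

/** Hypothetical helper showing one way an adapter could answer isNullAt. */
final class NullCheckSketch {
    /** Returns true if row i is null, honoring the noNulls and isRepeating flags. */
    static boolean isNullAt(OrcLikeVector v, int i) {
        // Short-circuit on noNulls so the isNull array is never read for null-free batches.
        return !v.noNulls && v.isNull[v.isRepeating ? 0 : i];
    }
}
```

With this shape, a repeating vector (for example a constant partition column) only needs entry 0 to be populated, whatever the batch size.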

### Long Vector Adapter

Adapter for ORC LongColumnVector supporting multiple Flink integer and boolean types.

```java { .api }
/**
 * Adapter for ORC LongColumnVector to Flink's numeric column vectors
 * Supports boolean, byte, short, int, and long data types
 */
public class OrcNoHiveLongVector extends AbstractOrcNoHiveVector
        implements LongColumnVector, BooleanColumnVector, ByteColumnVector,
                   ShortColumnVector, IntColumnVector {

    /**
     * Create long vector adapter
     * @param vector ORC LongColumnVector to adapt
     */
    public OrcNoHiveLongVector(LongColumnVector vector);

    /**
     * Get long value at specified index
     * @param i Row index
     * @return Long value at index
     */
    public long getLong(int i);

    /**
     * Get boolean value at specified index (1 = true, 0 = false)
     * @param i Row index
     * @return Boolean value at index
     */
    public boolean getBoolean(int i);

    /**
     * Get byte value at specified index
     * @param i Row index
     * @return Byte value at index
     */
    public byte getByte(int i);

    /**
     * Get short value at specified index
     * @param i Row index
     * @return Short value at index
     */
    public short getShort(int i);

    /**
     * Get int value at specified index
     * @param i Row index
     * @return Int value at index
     */
    public int getInt(int i);
}
```
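
To make the "one long array, several types" idea concrete, here is a minimal sketch of how a single `long[]` can back all five accessors through narrowing casts. `LongBackedColumn` is a hypothetical class for illustration only; its `isRepeating` flag is an assumption modeled on ORC's repeating vectors, and the actual adapter may be implemented differently.

```java
/** Hypothetical adapter over a long[] column; not the Flink implementation. */
final class LongBackedColumn {
    private final long[] values;
    private final boolean isRepeating; // when true, values[0] applies to every row

    LongBackedColumn(long[] values, boolean isRepeating) {
        this.values = values;
        this.isRepeating = isRepeating;
    }

    /** Reads the stored long, redirecting every row to slot 0 for repeating columns. */
    private long raw(int i) {
        return values[isRepeating ? 0 : i];
    }

    long getLong(int i) { return raw(i); }

    int getInt(int i) { return (int) raw(i); }       // narrowing cast from long

    short getShort(int i) { return (short) raw(i); } // narrowing cast from long

    byte getByte(int i) { return (byte) raw(i); }    // narrowing cast from long

    boolean getBoolean(int i) { return raw(i) == 1; } // 1 = true, anything else = false
}
```

Narrowing casts truncate silently: in this sketch a stored `300` read through `getByte` yields `44`. The accessor should therefore match the column's declared type.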

### Double Vector Adapter

Adapter for ORC DoubleColumnVector supporting float and double types.

```java { .api }
/**
 * Adapter for ORC DoubleColumnVector to Flink's floating-point column vectors
 * Supports both float and double data types
 */
public class OrcNoHiveDoubleVector extends AbstractOrcNoHiveVector
        implements DoubleColumnVector, FloatColumnVector {

    /**
     * Create double vector adapter
     * @param vector ORC DoubleColumnVector to adapt
     */
    public OrcNoHiveDoubleVector(DoubleColumnVector vector);

    /**
     * Get double value at specified index
     * @param i Row index
     * @return Double value at index
     */
    public double getDouble(int i);

    /**
     * Get float value at specified index (cast from double)
     * @param i Row index
     * @return Float value at index
     */
    public float getFloat(int i);
}
```
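
The "cast from double" note on `getFloat` matters in practice, because the cast can lose precision. The following sketch uses a hypothetical `DoubleBackedColumn` class (not part of Flink or ORC) to show the effect on a plain `double[]`.

```java
/** Hypothetical adapter over a double[] column; for illustration only. */
final class DoubleBackedColumn {
    private final double[] values;

    DoubleBackedColumn(double[] values) {
        this.values = values;
    }

    /** Returns the stored value unchanged. */
    double getDouble(int i) {
        return values[i];
    }

    /** Returns the stored value narrowed to float; precision may be lost. */
    float getFloat(int i) {
        return (float) values[i];
    }
}
```

Values that are exactly representable as a float, such as `1.5`, survive the cast unchanged. A value such as `0.1` does not round-trip: widening the float result back to double gives a number different from the stored double.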

### Bytes Vector Adapter

Adapter for ORC BytesColumnVector supporting string and binary types.

```java { .api }
/**
 * Adapter for ORC BytesColumnVector to Flink's bytes column vector
 * Supports string, char, varchar, binary, and varbinary types
 */
public class OrcNoHiveBytesVector extends AbstractOrcNoHiveVector
        implements BytesColumnVector {

    /**
     * Create bytes vector adapter
     * @param vector ORC BytesColumnVector to adapt
     */
    public OrcNoHiveBytesVector(BytesColumnVector vector);

    /**
     * Get Bytes value at specified index
     * @param i Row index
     * @return Bytes object containing byte data, start offset, and length
     */
    public Bytes getBytes(int i);
}
```
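
Because the returned value describes a slice (byte data, start offset, and length) rather than a dedicated array, decoding a string has to honor the offset and length. The sketch below illustrates this with a hypothetical `ByteSlice` class standing in for the `Bytes` object; the field names are assumptions chosen for the example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a (data, offset, len) slice of a shared buffer. */
final class ByteSlice {
    final byte[] data;
    final int offset;
    final int len;

    ByteSlice(byte[] data, int offset, int len) {
        this.data = data;
        this.offset = offset;
        this.len = len;
    }
}

/** Hypothetical helper that decodes only the bytes belonging to the slice. */
final class ByteSliceDecoder {
    static String toUtf8String(ByteSlice slice) {
        // Decode [offset, offset + len) instead of the whole backing array.
        return new String(slice.data, slice.offset, slice.len, StandardCharsets.UTF_8);
    }
}
```

Decoding the whole backing array instead would return neighboring values as well whenever several rows share one buffer.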

### Decimal Vector Adapter

Adapter for ORC DecimalColumnVector supporting high-precision decimal types.

```java { .api }
/**
 * Adapter for ORC DecimalColumnVector to Flink's decimal column vector
 * Supports decimal types with configurable precision and scale
 */
public class OrcNoHiveDecimalVector extends AbstractOrcNoHiveVector
        implements DecimalColumnVector {

    /**
     * Create decimal vector adapter
     * @param vector ORC DecimalColumnVector to adapt
     */
    public OrcNoHiveDecimalVector(DecimalColumnVector vector);

    /**
     * Get decimal value at specified index
     * @param i Row index
     * @param precision Decimal precision (total digits)
     * @param scale Decimal scale (digits after decimal point)
     * @return DecimalData value at index
     */
    public DecimalData getDecimal(int i, int precision, int scale);
}
```
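
The `precision` and `scale` arguments can be illustrated with plain `java.math.BigDecimal`. The sketch below is a hypothetical helper, not the adapter's code: it assumes HALF_UP rounding to the requested scale and signals overflow with `null`, which may not match how your Flink version treats out-of-range values.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper showing what fitting a value into DECIMAL(precision, scale) means. */
final class DecimalFitSketch {
    /**
     * Rounds value to the given scale, then checks the total digit count.
     * Returns null when the rounded value needs more than precision digits.
     */
    static BigDecimal fit(BigDecimal value, int precision, int scale) {
        BigDecimal rounded = value.setScale(scale, RoundingMode.HALF_UP);
        return rounded.precision() > precision ? null : rounded;
    }
}
```

For DECIMAL(10, 2), `12345.678` fits after rounding to `12345.68`, while `123456789.5` becomes `123456789.50`, which needs 11 digits and is rejected by this sketch.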

### Timestamp Vector Adapter

Adapter for ORC TimestampColumnVector supporting timestamp types.

```java { .api }
/**
 * Adapter for ORC TimestampColumnVector to Flink's timestamp column vector
 * Supports timestamp with and without timezone
 */
public class OrcNoHiveTimestampVector extends AbstractOrcNoHiveVector
        implements TimestampColumnVector {

    /**
     * Create timestamp vector adapter
     * @param vector ORC TimestampColumnVector to adapt
     */
    public OrcNoHiveTimestampVector(TimestampColumnVector vector);

    /**
     * Get timestamp value at specified index
     * @param i Row index
     * @param precision Timestamp precision (digits in fractional seconds)
     * @return TimestampData value at index
     */
    public TimestampData getTimestamp(int i, int precision);
}
```
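
As a rough illustration of what a timestamp adapter has to do, the sketch below assumes the ORC vector stores each value as two parts: milliseconds since the epoch and the nanoseconds within the second. It combines them with `java.sql.Timestamp`. `TimestampPartsSketch` is a hypothetical helper; the conversion to Flink's `TimestampData` is deliberately left out.

```java
import java.sql.Timestamp;

/** Hypothetical helper combining a (millis, nanos) pair into one timestamp value. */
final class TimestampPartsSketch {
    /**
     * @param epochMillis   milliseconds since 1970-01-01T00:00:00Z
     * @param nanosOfSecond nanoseconds within the current second (0 to 999,999,999)
     */
    static Timestamp combine(long epochMillis, int nanosOfSecond) {
        Timestamp ts = new Timestamp(epochMillis);
        // setNanos replaces the whole fractional-second part, so sub-millisecond digits are kept.
        ts.setNanos(nanosOfSecond);
        return ts;
    }
}
```

The millisecond part fixes the second, and the nanosecond part supplies the full fraction, so the two sources must describe the same instant.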

### Batch Wrapper

Wrapper for ORC VectorizedRowBatch providing size information and batch access.

```java { .api }
/**
 * Wrapper for ORC VectorizedRowBatch
 * Provides access to the underlying batch and size information
 */
public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

    /**
     * Create batch wrapper
     * @param batch ORC VectorizedRowBatch to wrap
     */
    public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);

    /**
     * Get the wrapped ORC batch
     * @return Underlying VectorizedRowBatch
     */
    public VectorizedRowBatch getBatch();

    /**
     * Get number of rows in the batch
     * @return Number of rows currently in batch
     */
    public int size();
}
```

## Vector Creation Examples

### Automatic Vector Creation

```java
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
// Explicit import so that ColumnVector below means Flink's interface,
// not the ORC class pulled in by the wildcard import.
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.*;

// Create ORC vectors
LongColumnVector longVector = new LongColumnVector(1024);
DoubleColumnVector doubleVector = new DoubleColumnVector(1024);
BytesColumnVector bytesVector = new BytesColumnVector(1024);
DecimalColumnVector decimalVector = new DecimalColumnVector(1024, 10, 2);
TimestampColumnVector timestampVector = new TimestampColumnVector(1024);

// Automatically create appropriate Flink vectors
ColumnVector[] flinkVectors = new ColumnVector[] {
    AbstractOrcNoHiveVector.createFlinkVector(longVector),      // OrcNoHiveLongVector
    AbstractOrcNoHiveVector.createFlinkVector(doubleVector),    // OrcNoHiveDoubleVector
    AbstractOrcNoHiveVector.createFlinkVector(bytesVector),     // OrcNoHiveBytesVector
    AbstractOrcNoHiveVector.createFlinkVector(decimalVector),   // OrcNoHiveDecimalVector
    AbstractOrcNoHiveVector.createFlinkVector(timestampVector)  // OrcNoHiveTimestampVector
};
```

### Constant Vector Creation for Partitions

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;
import org.apache.flink.table.data.vector.*;
import org.apache.flink.table.types.logical.*;

int batchSize = 1024;

// Create constant vectors for partition values
ColumnVector countryVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new VarCharType(50), "US", batchSize
);

ColumnVector yearVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new IntType(), 2023, batchSize
);

ColumnVector isActiveVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(
    new BooleanType(), true, batchSize
);

// ColumnVector itself only exposes isNullAt, so cast to the typed interfaces first
BytesColumnVector countries = (BytesColumnVector) countryVector;
IntColumnVector years = (IntColumnVector) yearVector;
BooleanColumnVector actives = (BooleanColumnVector) isActiveVector;

// All rows in batch will have the same partition values
for (int i = 0; i < batchSize; i++) {
    BytesColumnVector.Bytes b = countries.getBytes(i);
    assert new String(b.data, b.offset, b.len, StandardCharsets.UTF_8).equals("US");
    assert years.getInt(i) == 2023;
    assert actives.getBoolean(i);
}
```

## Type Mapping

| Flink Logical Type | ORC Vector Type | Flink Vector Interface | Notes |
|--------------------|-----------------|------------------------|-------|
| BOOLEAN | LongColumnVector | BooleanColumnVector | 1=true, 0=false |
| TINYINT | LongColumnVector | ByteColumnVector | Cast from long |
| SMALLINT | LongColumnVector | ShortColumnVector | Cast from long |
| INTEGER | LongColumnVector | IntColumnVector | Cast from long |
| BIGINT | LongColumnVector | LongColumnVector | Direct mapping |
| FLOAT | DoubleColumnVector | FloatColumnVector | Cast from double |
| DOUBLE | DoubleColumnVector | DoubleColumnVector | Direct mapping |
| CHAR, VARCHAR | BytesColumnVector | BytesColumnVector | UTF-8 bytes |
| BINARY, VARBINARY | BytesColumnVector | BytesColumnVector | Raw bytes |
| DECIMAL | DecimalColumnVector | DecimalColumnVector | HiveDecimal format |
| DATE | LongColumnVector | IntColumnVector | Days since epoch |
| TIMESTAMP_* | TimestampColumnVector | TimestampColumnVector | Epoch milliseconds plus nanoseconds of second |
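
The DATE row stores a day count rather than a date object. As a small illustration of the "days since epoch" convention, the sketch below converts between that integer and `java.time.LocalDate`. `EpochDaySketch` is a hypothetical helper and not part of the adapter API.

```java
import java.time.LocalDate;

/** Hypothetical helper for the days-since-epoch representation of DATE values. */
final class EpochDaySketch {
    /** Interprets the integer as a number of days after 1970-01-01. */
    static LocalDate toLocalDate(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    /** Converts back to the integer form; throws ArithmeticException if it does not fit an int. */
    static int toEpochDay(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```

Day 0 corresponds to 1970-01-01, and 2023-01-01 corresponds to day 19358.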

## Vectorized Processing Patterns

### Batch Processing with Type Safety

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.vector.*;

// Process vectorized batch with mixed types
public void processBatch(VectorizedColumnBatch batch) {
    int numRows = batch.getNumRows();

    // Get typed column vectors from the batch's public columns array
    LongColumnVector idVector = (LongColumnVector) batch.columns[0];
    BytesColumnVector nameVector = (BytesColumnVector) batch.columns[1];
    IntColumnVector ageVector = (IntColumnVector) batch.columns[2];
    DecimalColumnVector salaryVector = (DecimalColumnVector) batch.columns[3];

    // Process rows in batch
    for (int i = 0; i < numRows; i++) {
        if (!idVector.isNullAt(i)) {
            long id = idVector.getLong(i);
            String name = nameVector.isNullAt(i) ? null : toUtf8(nameVector.getBytes(i));
            int age = ageVector.isNullAt(i) ? 0 : ageVector.getInt(i);
            DecimalData salary = salaryVector.isNullAt(i) ? null :
                salaryVector.getDecimal(i, 10, 2);

            // Process row data (processRow is application code, not shown)
            processRow(id, name, age, salary);
        }
    }
}

// Decode only the slice [offset, offset + len) of the backing array
private static String toUtf8(BytesColumnVector.Bytes bytes) {
    return new String(bytes.data, bytes.offset, bytes.len, StandardCharsets.UTF_8);
}
```

### Null-Safe Vector Access

```java
import java.nio.charset.StandardCharsets;
import java.util.function.IntFunction;

// Safe access pattern for nullable columns; IntFunction avoids boxing the row index
public <T> T safeGet(ColumnVector vector, int index, IntFunction<T> getter, T defaultValue) {
    return vector.isNullAt(index) ? defaultValue : getter.apply(index);
}

// Usage examples
LongColumnVector longVector = (LongColumnVector) batch.columns[0];
BytesColumnVector stringVector = (BytesColumnVector) batch.columns[1];

for (int i = 0; i < batch.getNumRows(); i++) {
    Long id = safeGet(longVector, i, longVector::getLong, null);
    String name = safeGet(stringVector, i,
        idx -> {
            // Decode only the slice that belongs to this row
            BytesColumnVector.Bytes b = stringVector.getBytes(idx);
            return new String(b.data, b.offset, b.len, StandardCharsets.UTF_8);
        },
        "UNKNOWN");

    if (id != null) {
        processRecord(id, name);
    }
}
```

## Performance Considerations

### Memory Management

- **Vector Reuse**: Vectors are reused across batches to minimize allocations
- **Lazy Conversion**: Values are converted from ORC format only when accessed
- **Batch Size**: Larger batches improve throughput but use more memory
- **Null Handling**: Optimized null checking avoids unnecessary object creation

### Access Patterns

```java
// Efficient: Sequential access within batch
for (int i = 0; i < batch.getNumRows(); i++) {
    processValue(vector.getLong(i));
}

// Less efficient: Random access pattern
for (int i : randomIndices) {
    processValue(vector.getLong(i));
}

// Efficient: Bulk null checking. Flink's ColumnVector exposes only isNullAt(i),
// so the batch-level flag is read from the underlying ORC vector (orcVector).
if (!orcVector.noNulls) {
    // Handle nulls explicitly
    for (int i = 0; i < batch.getNumRows(); i++) {
        if (!vector.isNullAt(i)) {
            processValue(vector.getLong(i));
        }
    }
} else {
    // No nulls, skip null checks
    for (int i = 0; i < batch.getNumRows(); i++) {
        processValue(vector.getLong(i));
    }
}
```

## Error Handling

```java
try {
    ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

    for (int i = 0; i < batchSize; i++) {
        if (!flinkVector.isNullAt(i)) {
            // Type-safe access
            if (flinkVector instanceof LongColumnVector) {
                long value = ((LongColumnVector) flinkVector).getLong(i);
            }
        }
    }
} catch (UnsupportedOperationException e) {
    // Handle unsupported vector types
    logger.error("Unsupported ORC vector type: " + orcVector.getClass(), e);
} catch (ClassCastException e) {
    // Handle type mismatches
    logger.error("Vector type mismatch during access", e);
}
```