or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.md, columnar-reading.md, index.md, predicate-pushdown.md, table-api.md, vector-processing.md

docs/vector-processing.md

0

# Vector Processing

1

2

The Flink ORC connector provides a comprehensive vector processing system for high-performance columnar data operations. The vector system handles the conversion between ORC's native column vectors and Flink's column vector format, enabling efficient vectorized processing.

3

4

## Abstract Base Vector

5

6

```java { .api }

7

public abstract class AbstractOrcColumnVector {

8

public static ColumnVector createFlinkVector(

9

org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,

10

LogicalType type

11

);

12

13

public static ColumnVector createFlinkVectorFromConstant(

14

LogicalType type,

15

Object value,

16

int batchSize

17

);

18

}

19

```

20

21

## Column Vector Types

22

23

### Primitive Type Vectors

24

25

```java { .api }

26

// Long values (integers, dates, timestamps as long)

27

public class OrcLongColumnVector extends AbstractOrcColumnVector {

28

public OrcLongColumnVector(LongColumnVector vector);

29

}

30

31

// Double values (floats, doubles)

32

public class OrcDoubleColumnVector extends AbstractOrcColumnVector {

33

public OrcDoubleColumnVector(DoubleColumnVector vector);

34

}

35

36

// String and binary data

37

public class OrcBytesColumnVector extends AbstractOrcColumnVector {

38

public OrcBytesColumnVector(BytesColumnVector vector);

39

}

40

41

// Decimal values with precision/scale

42

public class OrcDecimalColumnVector extends AbstractOrcColumnVector {

43

public OrcDecimalColumnVector(DecimalColumnVector vector);

44

}

45

```

46

47

### Temporal Type Vectors

48

49

```java { .api }

50

// Timestamp values

51

public class OrcTimestampColumnVector extends AbstractOrcColumnVector {

52

public OrcTimestampColumnVector(TimestampColumnVector vector);

53

}

54

55

// Legacy timestamp support

56

public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector {

57

public OrcLegacyTimestampColumnVector(TimestampColumnVector vector);

58

}

59

```

60

61

### Complex Type Vectors

62

63

```java { .api }

64

// Array/List values

65

public class OrcArrayColumnVector extends AbstractOrcColumnVector {

66

public OrcArrayColumnVector(ListColumnVector vector, ColumnVector child);

67

}

68

69

// Map values

70

public class OrcMapColumnVector extends AbstractOrcColumnVector {

71

public OrcMapColumnVector(MapColumnVector vector, ColumnVector keyVector, ColumnVector valueVector);

72

}

73

74

// Struct/Row values

75

public class OrcRowColumnVector extends AbstractOrcColumnVector {

76

public OrcRowColumnVector(StructColumnVector vector, ColumnVector[] children);

77

}

78

```

79

80

## Vectorized Batch Processing

81

82

### Batch Wrapper

83

84

```java { .api }

85

public interface OrcVectorizedBatchWrapper<BatchT> {

86

public BatchT getBatch();

87

public int size();

88

public void reset();

89

}

90

91

public class HiveOrcBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

92

public HiveOrcBatchWrapper(VectorizedRowBatch batch);

93

public VectorizedRowBatch getBatch();

94

}

95

```

96

97

### Column Batch Factory

98

99

```java { .api }

100

@FunctionalInterface

101

public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {

102

VectorizedColumnBatch create(SplitT split, BatchT batch);

103

}

104

```

105

106

## Usage Examples

107

108

### Creating Flink Vectors from ORC Vectors

109

110

```java

111

import org.apache.flink.orc.vector.AbstractOrcColumnVector;

112

import org.apache.hadoop.hive.ql.exec.vector.*;

113

import org.apache.flink.table.types.logical.*;

114

115

// Convert ORC long vector to Flink vector

116

LongColumnVector orcLongVector = // ... from ORC batch

117

LogicalType intType = new IntType();

118

ColumnVector flinkVector = AbstractOrcColumnVector.createFlinkVector(orcLongVector, intType);

119

120

// Convert ORC string vector

121

BytesColumnVector orcStringVector = // ... from ORC batch

122

LogicalType varcharType = new VarCharType(255);

123

ColumnVector stringVector = AbstractOrcColumnVector.createFlinkVector(orcStringVector, varcharType);

124

125

// Convert ORC decimal vector

126

DecimalColumnVector orcDecimalVector = // ... from ORC batch

127

LogicalType decimalType = new DecimalType(10, 2);

128

ColumnVector decimalVector = AbstractOrcColumnVector.createFlinkVector(orcDecimalVector, decimalType);

129

```

130

131

### Creating Constant Vectors

132

133

```java

134

// Create constant integer vector

135

LogicalType intType = new IntType();

136

ColumnVector constantIntVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(

137

intType,

138

42, // constant value

139

1024 // batch size

140

);

141

142

// Create constant string vector

143

LogicalType stringType = new VarCharType(100);

144

ColumnVector constantStringVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(

145

stringType,

146

"default_value",

147

1024

148

);

149

150

// Create constant null vector

151

ColumnVector constantNullVector = AbstractOrcColumnVector.createFlinkVectorFromConstant(

152

intType,

153

null, // null value

154

1024

155

);

156

```

157

158

### Complex Type Vector Processing

159

160

```java

161

// Process array column

162

ListColumnVector orcListVector = // ... from ORC batch

163

ArrayType arrayType = new ArrayType(new VarCharType(100));

164

165

// Create child vector for array elements

166

ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(

167

orcListVector.child,

168

arrayType.getElementType()

169

);

170

171

// Create array vector

172

OrcArrayColumnVector arrayVector = new OrcArrayColumnVector(orcListVector, childVector);

173

174

// Process map column

175

MapColumnVector orcMapVector = // ... from ORC batch

176

MapType mapType = new MapType(new VarCharType(50), new IntType());

177

178

ColumnVector keyVector = AbstractOrcColumnVector.createFlinkVector(

179

orcMapVector.keys,

180

mapType.getKeyType()

181

);

182

ColumnVector valueVector = AbstractOrcColumnVector.createFlinkVector(

183

orcMapVector.values,

184

mapType.getValueType()

185

);

186

187

OrcMapColumnVector mapVector = new OrcMapColumnVector(orcMapVector, keyVector, valueVector);

188

```

189

190

### Custom Batch Factory

191

192

```java

193

// Custom batch factory for specialized processing

194

ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =

195

(split, orcBatch) -> {

196

int numFields = orcBatch.numCols;

197

ColumnVector[] flinkVectors = new ColumnVector[numFields];

198

199

for (int i = 0; i < numFields; i++) {

200

org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = orcBatch.cols[i];

201

LogicalType fieldType = getFieldType(i); // your logic to get field type

202

203

// Handle different ORC vector types

204

if (orcVector instanceof LongColumnVector) {

205

flinkVectors[i] = new OrcLongColumnVector((LongColumnVector) orcVector);

206

} else if (orcVector instanceof DoubleColumnVector) {

207

flinkVectors[i] = new OrcDoubleColumnVector((DoubleColumnVector) orcVector);

208

} else if (orcVector instanceof BytesColumnVector) {

209

flinkVectors[i] = new OrcBytesColumnVector((BytesColumnVector) orcVector);

210

} else if (orcVector instanceof DecimalColumnVector) {

211

flinkVectors[i] = new OrcDecimalColumnVector((DecimalColumnVector) orcVector);

212

} else if (orcVector instanceof TimestampColumnVector) {

213

flinkVectors[i] = new OrcTimestampColumnVector((TimestampColumnVector) orcVector);

214

} else if (orcVector instanceof ListColumnVector) {

215

// Handle complex array type

216

ListColumnVector listVector = (ListColumnVector) orcVector;

217

ColumnVector childVector = AbstractOrcColumnVector.createFlinkVector(

218

listVector.child,

219

((ArrayType) fieldType).getElementType()

220

);

221

flinkVectors[i] = new OrcArrayColumnVector(listVector, childVector);

222

}

223

// ... handle other complex types

224

}

225

226

return new VectorizedColumnBatch(flinkVectors);

227

};

228

```

229

230

## Type System Integration

231

232

### Supported Type Mappings

233

234

| Flink LogicalType | ORC Vector Type | Flink Vector Class |

235

|------------------|-----------------|-------------------|

236

| `BooleanType` | `LongColumnVector` | `OrcLongColumnVector` |

237

| `TinyIntType` | `LongColumnVector` | `OrcLongColumnVector` |

238

| `SmallIntType` | `LongColumnVector` | `OrcLongColumnVector` |

239

| `IntType` | `LongColumnVector` | `OrcLongColumnVector` |

240

| `BigIntType` | `LongColumnVector` | `OrcLongColumnVector` |

241

| `FloatType` | `DoubleColumnVector` | `OrcDoubleColumnVector` |

242

| `DoubleType` | `DoubleColumnVector` | `OrcDoubleColumnVector` |

243

| `VarCharType` | `BytesColumnVector` | `OrcBytesColumnVector` |

244

| `CharType` | `BytesColumnVector` | `OrcBytesColumnVector` |

245

| `BinaryType` | `BytesColumnVector` | `OrcBytesColumnVector` |

246

| `VarBinaryType` | `BytesColumnVector` | `OrcBytesColumnVector` |

247

| `DecimalType` | `DecimalColumnVector` | `OrcDecimalColumnVector` |

248

| `DateType` | `LongColumnVector` | `OrcLongColumnVector` |

249

| `TimestampType` | `TimestampColumnVector` | `OrcTimestampColumnVector` |

250

| `ArrayType` | `ListColumnVector` | `OrcArrayColumnVector` |

251

| `MapType` | `MapColumnVector` | `OrcMapColumnVector` |

252

| `RowType` | `StructColumnVector` | `OrcRowColumnVector` |

253

254

### Vector Creation Logic

255

256

```java

257

public static ColumnVector createFlinkVector(

258

org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector,

259

LogicalType type) {

260

261

switch (type.getTypeRoot()) {

262

case BOOLEAN:

263

case TINYINT:

264

case SMALLINT:

265

case INTEGER:

266

case BIGINT:

267

case DATE:

268

return new OrcLongColumnVector((LongColumnVector) orcVector);

269

270

case FLOAT:

271

case DOUBLE:

272

return new OrcDoubleColumnVector((DoubleColumnVector) orcVector);

273

274

case CHAR:

275

case VARCHAR:

276

case BINARY:

277

case VARBINARY:

278

return new OrcBytesColumnVector((BytesColumnVector) orcVector);

279

280

case DECIMAL:

281

return new OrcDecimalColumnVector((DecimalColumnVector) orcVector);

282

283

case TIMESTAMP_WITHOUT_TIME_ZONE:

284

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:

285

return new OrcTimestampColumnVector((TimestampColumnVector) orcVector);

286

287

case ARRAY:

288

ArrayType arrayType = (ArrayType) type;

289

ListColumnVector listVector = (ListColumnVector) orcVector;

290

ColumnVector childVector = createFlinkVector(listVector.child, arrayType.getElementType());

291

return new OrcArrayColumnVector(listVector, childVector);

292

293

case MAP:

294

MapType mapType = (MapType) type;

295

MapColumnVector mapVector = (MapColumnVector) orcVector;

296

ColumnVector keyVector = createFlinkVector(mapVector.keys, mapType.getKeyType());

297

ColumnVector valueVector = createFlinkVector(mapVector.values, mapType.getValueType());

298

return new OrcMapColumnVector(mapVector, keyVector, valueVector);

299

300

case ROW:

301

RowType rowType = (RowType) type;

302

StructColumnVector structVector = (StructColumnVector) orcVector;

303

ColumnVector[] childVectors = new ColumnVector[structVector.fields.length];

304

for (int i = 0; i < childVectors.length; i++) {

305

childVectors[i] = createFlinkVector(structVector.fields[i], rowType.getTypeAt(i));

306

}

307

return new OrcRowColumnVector(structVector, childVectors);

308

309

default:

310

throw new UnsupportedOperationException("Unsupported type: " + type);

311

}

312

}

313

```

314

315

## Performance Considerations

316

317

### Memory Management

318

319

```java

320

// Vectors share underlying data arrays with ORC vectors - no copying

321

OrcLongColumnVector flinkVector = new OrcLongColumnVector(orcLongVector);

322

// flinkVector.vector points to same array as orcLongVector.vector

323

```

324

325

### Batch Size Optimization

326

327

```java

328

// Default ORC batch size

329

int defaultBatchSize = VectorizedRowBatch.DEFAULT_SIZE; // 1024

330

331

// Custom batch size for memory optimization

332

VectorizedRowBatch customBatch = schema.createRowBatch(2048);

333

```

334

335

### Null Handling

336

337

```java

338

// Check for null values in vector

339

if (orcVector.noNulls) {

340

// No null values in this vector - optimized processing

341

processNonNullVector(flinkVector);

342

} else {

343

// Check isNull array for each row

344

for (int i = 0; i < batchSize; i++) {

345

if (!orcVector.isNull[i]) {

346

processValue(flinkVector, i);

347

}

348

}

349

}

350

```

351

352

## Timestamp Utilities

353

354

### TimestampUtil

355

356

Specialized utility class for handling timestamp operations and vector creation.

357

358

```java { .api }

359

public class TimestampUtil {

360

public static boolean isHiveTimestampColumnVector(

361

org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector

362

);

363

364

public static ColumnVector createVectorFromConstant(

365

LogicalType type,

366

Object value,

367

int batchSize

368

);

369

}

370

```

371

372

**Usage Examples:**

373

374

```java

375

import org.apache.flink.orc.TimestampUtil;

376

import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;

377

378

// Check if ORC vector is timestamp type

379

org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector = // ... from ORC batch

380

boolean isTimestamp = TimestampUtil.isHiveTimestampColumnVector(orcVector);

381

382

// Create constant timestamp vector

383

LogicalType timestampType = new TimestampType(3);

384

ColumnVector constantTimestampVector = TimestampUtil.createVectorFromConstant(

385

timestampType,

386

Timestamp.valueOf("2023-01-01 12:00:00"),

387

1024

388

);

389

390

// Create timestamp vector with null value

391

ColumnVector nullTimestampVector = TimestampUtil.createVectorFromConstant(

392

timestampType,

393

null,

394

1024

395

);

396

```

397

398

## Integration with Reading Pipeline

399

400

The vector processing system integrates seamlessly with the ORC reading pipeline:

401

402

1. **ORC File Reading**: Native ORC vectors loaded from file

403

2. **Vector Conversion**: ORC vectors wrapped in Flink vector implementations

404

3. **Batch Creation**: `ColumnBatchFactory` creates `VectorizedColumnBatch`

405

4. **Row Iteration**: `ColumnarRowIterator` provides row-by-row access

406

5. **Type Safety**: Full type system integration ensures correctness