or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.md · columnar-reading.md · index.md · orc-integration.md · vector-processing.md

docs/vector-processing.md

0

# Vector Processing

1

2

High-performance vector implementations that adapt ORC column vectors to Flink's vector API for efficient columnar data processing. The adapters provide type-safe access to vectorized data for the boolean, integer, floating-point, string/binary, decimal, date, and timestamp types listed in the Type Mapping table below.

3

4

## Capabilities

5

6

### Abstract Vector Base

7

8

Base class for all ORC-to-Flink vector adapters, providing common null handling and factory methods.

9

10

```java { .api }

11

/**

12

* Base class for adapting ORC column vectors to Flink column vectors

13

* Provides common functionality for null handling and vector creation

14

*/

15

public abstract class AbstractOrcNoHiveVector implements ColumnVector {

16

17

/**

18

* Check if value at given index is null

19

* @param i Row index to check

20

* @return true if value is null, false otherwise

21

*/

22

public boolean isNullAt(int i);

23

24

/**

25

* Create appropriate Flink vector from ORC column vector

26

* Automatically detects vector type and creates corresponding adapter

27

* @param vector ORC column vector to adapt

28

* @return Flink column vector implementation

29

* @throws UnsupportedOperationException for unsupported vector types

30

*/

31

public static ColumnVector createFlinkVector(ColumnVector vector);

32

33

/**

34

* Create Flink vector from constant value for partition columns

35

* @param type Logical type of the constant value

36

* @param value Constant value to fill vector with

37

* @param batchSize Number of rows in the vector

38

* @return Flink column vector filled with constant value

39

* @throws UnsupportedOperationException for unsupported types

40

*/

41

public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);

42

}

43

```

44

45

**Usage Examples:**

46

47

```java

48

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;

49

import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

50

51

// Create Flink vector from ORC vector

52

LongColumnVector orcVector = new LongColumnVector(1024);

53

ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

54

55

// Create constant vector for partition column

56

LogicalType stringType = new VarCharType(50);

57

ColumnVector constantVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(

58

stringType,

59

"US", // partition value

60

1024 // batch size

61

);

62

63

// Check for nulls

64

for (int i = 0; i < batchSize; i++) {

65

if (!flinkVector.isNullAt(i)) {

66

// Process non-null value

67

long value = ((LongColumnVector) flinkVector).getLong(i);

68

}

69

}

70

```

71

72

### Long Vector Adapter

73

74

Adapter for ORC LongColumnVector supporting multiple Flink integer and boolean types.

75

76

```java { .api }

77

/**

78

* Adapter for ORC LongColumnVector to Flink's numeric column vectors

79

* Supports boolean, byte, short, int, and long data types

80

*/

81

public class OrcNoHiveLongVector extends AbstractOrcNoHiveVector

82

implements LongColumnVector, BooleanColumnVector, ByteColumnVector,

83

ShortColumnVector, IntColumnVector {

84

85

/**

86

* Create long vector adapter

87

* @param vector ORC LongColumnVector to adapt

88

*/

89

public OrcNoHiveLongVector(LongColumnVector vector);

90

91

/**

92

* Get long value at specified index

93

* @param i Row index

94

* @return Long value at index

95

*/

96

public long getLong(int i);

97

98

/**

99

* Get boolean value at specified index (1 = true, 0 = false)

100

* @param i Row index

101

* @return Boolean value at index

102

*/

103

public boolean getBoolean(int i);

104

105

/**

106

* Get byte value at specified index

107

* @param i Row index

108

* @return Byte value at index

109

*/

110

public byte getByte(int i);

111

112

/**

113

* Get short value at specified index

114

* @param i Row index

115

* @return Short value at index

116

*/

117

public short getShort(int i);

118

119

/**

120

* Get int value at specified index

121

* @param i Row index

122

* @return Int value at index

123

*/

124

public int getInt(int i);

125

}

126

```

127

128

### Double Vector Adapter

129

130

Adapter for ORC DoubleColumnVector supporting float and double types.

131

132

```java { .api }

133

/**

134

* Adapter for ORC DoubleColumnVector to Flink's floating-point column vectors

135

* Supports both float and double data types

136

*/

137

public class OrcNoHiveDoubleVector extends AbstractOrcNoHiveVector

138

implements DoubleColumnVector, FloatColumnVector {

139

140

/**

141

* Create double vector adapter

142

* @param vector ORC DoubleColumnVector to adapt

143

*/

144

public OrcNoHiveDoubleVector(DoubleColumnVector vector);

145

146

/**

147

* Get double value at specified index

148

* @param i Row index

149

* @return Double value at index

150

*/

151

public double getDouble(int i);

152

153

/**

154

* Get float value at specified index (cast from double)

155

* @param i Row index

156

* @return Float value at index

157

*/

158

public float getFloat(int i);

159

}

160

```

161

162

### Bytes Vector Adapter

163

164

Adapter for ORC BytesColumnVector supporting string and binary types.

165

166

```java { .api }

167

/**

168

* Adapter for ORC BytesColumnVector to Flink's bytes column vector

169

* Supports string, char, varchar, binary, and varbinary types

170

*/

171

public class OrcNoHiveBytesVector extends AbstractOrcNoHiveVector

172

implements BytesColumnVector {

173

174

/**

175

* Create bytes vector adapter

176

* @param vector ORC BytesColumnVector to adapt

177

*/

178

public OrcNoHiveBytesVector(BytesColumnVector vector);

179

180

/**

181

* Get Bytes value at specified index

182

* @param i Row index

183

* @return Bytes object containing byte data, start offset, and length

184

*/

185

public Bytes getBytes(int i);

186

}

187

```

188

189

### Decimal Vector Adapter

190

191

Adapter for ORC DecimalColumnVector supporting high-precision decimal types.

192

193

```java { .api }

194

/**

195

* Adapter for ORC DecimalColumnVector to Flink's decimal column vector

196

* Supports decimal types with configurable precision and scale

197

*/

198

public class OrcNoHiveDecimalVector extends AbstractOrcNoHiveVector

199

implements DecimalColumnVector {

200

201

/**

202

* Create decimal vector adapter

203

* @param vector ORC DecimalColumnVector to adapt

204

*/

205

public OrcNoHiveDecimalVector(DecimalColumnVector vector);

206

207

/**

208

* Get decimal value at specified index

209

* @param i Row index

210

* @param precision Decimal precision (total digits)

211

* @param scale Decimal scale (digits after decimal point)

212

* @return DecimalData value at index

213

*/

214

public DecimalData getDecimal(int i, int precision, int scale);

215

}

216

```

217

218

### Timestamp Vector Adapter

219

220

Adapter for ORC TimestampColumnVector supporting timestamp types.

221

222

```java { .api }

223

/**

224

* Adapter for ORC TimestampColumnVector to Flink's timestamp column vector

225

* Supports timestamp with and without timezone

226

*/

227

public class OrcNoHiveTimestampVector extends AbstractOrcNoHiveVector

228

implements TimestampColumnVector {

229

230

/**

231

* Create timestamp vector adapter

232

* @param vector ORC TimestampColumnVector to adapt

233

*/

234

public OrcNoHiveTimestampVector(TimestampColumnVector vector);

235

236

/**

237

* Get timestamp value at specified index

238

* @param i Row index

239

* @param precision Timestamp precision (digits in fractional seconds)

240

* @return TimestampData value at index

241

*/

242

public TimestampData getTimestamp(int i, int precision);

243

}

244

```

245

246

### Batch Wrapper

247

248

Wrapper for ORC VectorizedRowBatch providing size information and batch access.

249

250

```java { .api }

251

/**

252

* Wrapper for ORC VectorizedRowBatch

253

* Provides access to the underlying batch and size information

254

*/

255

public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

256

257

/**

258

* Create batch wrapper

259

* @param batch ORC VectorizedRowBatch to wrap

260

*/

261

public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);

262

263

/**

264

* Get the wrapped ORC batch

265

* @return Underlying VectorizedRowBatch

266

*/

267

public VectorizedRowBatch getBatch();

268

269

/**

270

* Get number of rows in the batch

271

* @return Number of rows currently in batch

272

*/

273

public int size();

274

}

275

```

276

277

## Vector Creation Examples

278

279

### Automatic Vector Creation

280

281

```java

282

import org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector;

283

import org.apache.orc.storage.ql.exec.vector.*;

284

285

// Create ORC vectors

286

LongColumnVector longVector = new LongColumnVector(1024);

287

DoubleColumnVector doubleVector = new DoubleColumnVector(1024);

288

BytesColumnVector bytesVector = new BytesColumnVector(1024);

289

DecimalColumnVector decimalVector = new DecimalColumnVector(1024, 10, 2);

290

TimestampColumnVector timestampVector = new TimestampColumnVector(1024);

291

292

// Automatically create appropriate Flink vectors

293

ColumnVector[] flinkVectors = new ColumnVector[] {

294

AbstractOrcNoHiveVector.createFlinkVector(longVector), // OrcNoHiveLongVector

295

AbstractOrcNoHiveVector.createFlinkVector(doubleVector), // OrcNoHiveDoubleVector

296

AbstractOrcNoHiveVector.createFlinkVector(bytesVector), // OrcNoHiveBytesVector

297

AbstractOrcNoHiveVector.createFlinkVector(decimalVector), // OrcNoHiveDecimalVector

298

AbstractOrcNoHiveVector.createFlinkVector(timestampVector) // OrcNoHiveTimestampVector

299

};

300

```

301

302

### Constant Vector Creation for Partitions

303

304

```java

305

import org.apache.flink.table.types.logical.*;

306

307

int batchSize = 1024;

308

309

// Create constant vectors for partition values

310

ColumnVector countryVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(

311

new VarCharType(50), "US", batchSize

312

);

313

314

ColumnVector yearVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(

315

new IntType(), 2023, batchSize

316

);

317

318

ColumnVector isActiveVector = AbstractOrcNoHiveVector.createFlinkVectorFromConstant(

319

new BooleanType(), true, batchSize

320

);

321

322

// All rows in batch will have the same partition values

323

for (int i = 0; i < batchSize; i++) {

324

assert countryVector.getString(i).toString().equals("US");

325

assert yearVector.getInt(i) == 2023;

326

assert isActiveVector.getBoolean(i) == true;

327

}

328

```

329

330

## Type Mapping

331

332

| Flink Logical Type | ORC Vector Type | Flink Vector Interface | Notes |

333

|-------------------|----------------|----------------------|--------|

334

| BOOLEAN | LongColumnVector | BooleanColumnVector | 1=true, 0=false |

335

| TINYINT | LongColumnVector | ByteColumnVector | Cast from long |

336

| SMALLINT | LongColumnVector | ShortColumnVector | Cast from long |

337

| INTEGER | LongColumnVector | IntColumnVector | Cast from long |

338

| BIGINT | LongColumnVector | LongColumnVector | Direct mapping |

339

| FLOAT | DoubleColumnVector | FloatColumnVector | Cast from double |

340

| DOUBLE | DoubleColumnVector | DoubleColumnVector | Direct mapping |

341

| CHAR, VARCHAR | BytesColumnVector | BytesColumnVector | UTF-8 bytes |

342

| BINARY, VARBINARY | BytesColumnVector | BytesColumnVector | Raw bytes |

343

| DECIMAL | DecimalColumnVector | DecimalColumnVector | HiveDecimal format |

344

| DATE | LongColumnVector | IntColumnVector | Days since epoch |

345

| TIMESTAMP_* | TimestampColumnVector | TimestampColumnVector | Nanosecond precision (epoch millis + nanos) |

346

347

## Vectorized Processing Patterns

348

349

### Batch Processing with Type Safety

350

351

```java

352

import org.apache.flink.table.data.vector.VectorizedColumnBatch;

353

354

// Process vectorized batch with mixed types

355

public void processBatch(VectorizedColumnBatch batch) {

356

int numRows = batch.getNumRows();

357

358

// Get typed column vectors

359

LongColumnVector idVector = (LongColumnVector) batch.getColumn(0);

360

BytesColumnVector nameVector = (BytesColumnVector) batch.getColumn(1);

361

IntColumnVector ageVector = (IntColumnVector) batch.getColumn(2);

362

DecimalColumnVector salaryVector = (DecimalColumnVector) batch.getColumn(3);

363

364

// Process rows in batch

365

for (int i = 0; i < numRows; i++) {

366

if (!idVector.isNullAt(i)) {

367

long id = idVector.getLong(i);

368

String name = nameVector.isNullAt(i) ? null :

369

new String(nameVector.getBytes(i).getData(), StandardCharsets.UTF_8);

370

int age = ageVector.isNullAt(i) ? 0 : ageVector.getInt(i);

371

DecimalData salary = salaryVector.isNullAt(i) ? null :

372

salaryVector.getDecimal(i, 10, 2);

373

374

// Process row data

375

processRow(id, name, age, salary);

376

}

377

}

378

}

379

```

380

381

### Null-Safe Vector Access

382

383

```java

384

// Safe access pattern for nullable columns

385

public <T> T safeGet(ColumnVector vector, int index, Function<Integer, T> getter, T defaultValue) {

386

return vector.isNullAt(index) ? defaultValue : getter.apply(index);

387

}

388

389

// Usage examples

390

LongColumnVector longVector = (LongColumnVector) batch.getColumn(0);

391

BytesColumnVector stringVector = (BytesColumnVector) batch.getColumn(1);

392

393

for (int i = 0; i < batch.getNumRows(); i++) {

394

Long id = safeGet(longVector, i, longVector::getLong, null);

395

String name = safeGet(stringVector, i,

396

idx -> new String(stringVector.getBytes(idx).getData(), StandardCharsets.UTF_8),

397

"UNKNOWN");

398

399

if (id != null) {

400

processRecord(id, name);

401

}

402

}

403

```

404

405

## Performance Considerations

406

407

### Memory Management

408

409

- **Vector Reuse**: Vectors are reused across batches to minimize allocations

410

- **Lazy Conversion**: Values are converted from ORC format only when accessed

411

- **Batch Size**: Larger batches improve throughput but use more memory

412

- **Null Handling**: Optimized null checking avoids unnecessary object creation

413

414

### Access Patterns

415

416

```java

417

// Efficient: Sequential access within batch

418

for (int i = 0; i < batch.getNumRows(); i++) {

419

processValue(vector.getLong(i));

420

}

421

422

// Less efficient: Random access pattern

423

for (int i : randomIndices) {

424

processValue(vector.getLong(i));

425

}

426

427

// Efficient: Bulk null checking

428

if (vector.hasNulls()) {

429

// Handle nulls explicitly

430

for (int i = 0; i < batch.getNumRows(); i++) {

431

if (!vector.isNullAt(i)) {

432

processValue(vector.getLong(i));

433

}

434

}

435

} else {

436

// No nulls, skip null checks

437

for (int i = 0; i < batch.getNumRows(); i++) {

438

processValue(vector.getLong(i));

439

}

440

}

441

```

442

443

## Error Handling

444

445

```java

446

try {

447

ColumnVector flinkVector = AbstractOrcNoHiveVector.createFlinkVector(orcVector);

448

449

for (int i = 0; i < batchSize; i++) {

450

if (!flinkVector.isNullAt(i)) {

451

// Type-safe access

452

if (flinkVector instanceof LongColumnVector) {

453

long value = ((LongColumnVector) flinkVector).getLong(i);

454

}

455

}

456

}

457

} catch (UnsupportedOperationException e) {

458

// Handle unsupported vector types

459

logger.error("Unsupported ORC vector type: " + orcVector.getClass(), e);

460

} catch (ClassCastException e) {

461

// Handle type mismatches

462

logger.error("Vector type mismatch during access", e);

463

}

464

```