
# Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

## Capabilities

### ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet file reading with pluggable batch creation strategies.

```java { .api }
/**
 * Base class for vectorized Parquet file reading
 * @param <T> Type of records produced
 * @param <SplitT> Type of file split
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {

    /**
     * Creates a reader for the given configuration and split
     * @param config Hadoop configuration for Parquet settings
     * @param split File split to read
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader creation fails
     */
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Restores a reader from checkpoint state
     * @param config Hadoop configuration
     * @param split File split to read
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader restoration fails
     */
    public RecordReaderIterator<T> restoreReader(Configuration config, SplitT split) throws IOException;

    /**
     * Indicates whether this format supports file splitting
     * @return true - vectorized reading supports splitting
     */
    public boolean isSplittable();

    /**
     * Creates the reader batch implementation for vectorized processing (abstract method)
     * @param writableVectors Array of writable column vectors
     * @param columnarBatch Vectorized column batch for processing
     * @param recycler Pool recycler for batch reuse
     * @return ParquetReaderBatch implementation for the specific type
     */
    protected abstract ParquetReaderBatch<T> createReaderBatch(
        WritableColumnVector[] writableVectors,
        VectorizedColumnBatch columnarBatch,
        Pool.Recycler<ParquetReaderBatch<T>> recycler
    );
}
```

### ColumnBatchFactory

Functional interface for creating vectorized column batches from file splits and column vectors.

```java { .api }
/**
 * Factory for creating vectorized column batches
 * @param <SplitT> Type of file split
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {

    /**
     * Creates a VectorizedColumnBatch from a split and column vectors
     * @param split File split containing metadata
     * @param vectors Array of column vectors containing data
     * @return VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);

    /**
     * Creates a default factory that doesn't add extra fields
     * @return ColumnBatchFactory without partition field injection
     */
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
```

### ParquetColumnarRowSplitReader

Specialized split reader for columnar RowData reading with vectorized processing and partition support.

```java { .api }
/**
 * Split reader for columnar RowData reading with vectorization
 */
public class ParquetColumnarRowSplitReader implements RecordReader<RowData> {

    /**
     * Creates a new ParquetColumnarRowSplitReader
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param caseSensitive Whether field names are case sensitive
     * @param conf Hadoop configuration
     * @param rowType Row type describing the output fields
     * @param fieldNames Array of field names for output schema
     * @param selectedFields Array of selected field names (null for all)
     * @param batchSize Batch size for vectorized reading
     * @param rowDataWrapper Function to transform raw RowData
     * @param splitStart Start offset within the split
     * @param splitLength Length of data to read from split
     * @param fileLength Total length of the file
     * @param footer Parquet file metadata
     * @param blocks Array of column chunks to read
     */
    public ParquetColumnarRowSplitReader(
        boolean utcTimestamp,
        boolean caseSensitive,
        Configuration conf,
        RowType rowType,
        String[] fieldNames,
        String[] selectedFields,
        int batchSize,
        Function<RowData, RowData> rowDataWrapper,
        long splitStart,
        long splitLength,
        long fileLength,
        ParquetMetadata footer,
        ColumnChunk[] blocks
    );

    /**
     * Reads the next batch of records
     * @return RecordIterator for the batch, null if no more data
     * @throws IOException if reading fails
     */
    public RecordIterator<RowData> readBatch() throws IOException;

    /**
     * Closes the reader and releases resources
     * @throws IOException if close fails
     */
    public void close() throws IOException;
}
```

### ParquetSplitReaderUtil

Utility class providing helper methods for vectorized Parquet reading operations.

```java { .api }
/**
 * Utilities for vectorized Parquet file reading
 */
public class ParquetSplitReaderUtil {

    /**
     * Creates column readers for the specified schema and configuration
     * @param utcTimestamp Whether to use UTC timezone
     * @param caseSensitive Whether names are case sensitive
     * @param conf Hadoop configuration
     * @param fieldTypes Array of field logical types
     * @param fieldNames Array of field names
     * @param footer Parquet metadata
     * @param blocks Column chunks to read
     * @param batchSize Batch size for reading
     * @return Array of ColumnReader instances
     */
    public static ColumnReader[] createColumnReaders(
        boolean utcTimestamp,
        boolean caseSensitive,
        Configuration conf,
        LogicalType[] fieldTypes,
        String[] fieldNames,
        ParquetMetadata footer,
        ColumnChunk[] blocks,
        int batchSize
    );

    /**
     * Additional utility methods for split reading operations
     */
    // ... other static utility methods
}
```

### Column Vector Types

Specialized column vectors for different data types in vectorized processing.

```java { .api }
/**
 * Specialized decimal vector for high-precision numeric data
 */
public class ParquetDecimalVector extends ColumnVector {

    /**
     * Creates a new ParquetDecimalVector
     * @param capacity Maximum number of values to store
     */
    public ParquetDecimalVector(int capacity);

    /**
     * Sets the decimal value at the specified position
     * @param index Position to set
     * @param value Decimal value to set
     */
    public void setDecimal(int index, DecimalData value);

    /**
     * Gets the decimal value at the specified position
     * @param index Position to get
     * @return DecimalData value at position
     */
    public DecimalData getDecimal(int index);
}

/**
 * Dictionary support for vectorized reading
 */
public class ParquetDictionary {

    /**
     * Creates a dictionary from a Parquet dictionary page
     * @param dictionaryPage Dictionary page from the Parquet file
     * @param descriptor Column descriptor for type information
     * @return ParquetDictionary for decoding values
     */
    public static ParquetDictionary create(
        DictionaryPage dictionaryPage,
        ColumnDescriptor descriptor
    );

    /**
     * Decodes a dictionary ID to its actual value
     * @param id Dictionary ID to decode
     * @return Decoded value
     */
    public Object decode(int id);
}
```
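Dictionary-encoded columns store each distinct value once in a dictionary page and reference it by integer ID, which is why `decode(int id)` is the hot path above. A minimal, self-contained sketch of the decoding step (plain Java with illustrative names, not the Flink/Parquet API):

```java
import java.util.Arrays;

public class DictionaryDecodeSketch {
    /** Decodes an array of dictionary IDs into their string values. */
    static String[] decodeAll(String[] dictionary, int[] ids) {
        String[] out = new String[ids.length];
        for (int i = 0; i < ids.length; i++) {
            out[i] = dictionary[ids[i]]; // each ID indexes into the dictionary page
        }
        return out;
    }

    public static void main(String[] args) {
        String[] dict = {"US", "DE", "FR"};
        int[] encoded = {0, 0, 2, 1, 0}; // five rows, three distinct values
        System.out.println(Arrays.toString(decodeAll(dict, encoded)));
        // → [US, US, FR, DE, US]
    }
}
```

Because a whole batch of IDs is decoded in one tight loop, dictionary columns vectorize particularly well.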

## Column Reader Architecture

### Base Column Reader Interface

```java { .api }
/**
 * Base interface for vectorized column readers
 */
public interface ColumnReader {

    /**
     * Reads a batch of values into the provided column vector
     * @param num Number of values to read
     * @param vector Column vector to populate
     */
    void readBatch(int num, WritableColumnVector vector);

    /**
     * Returns the current repetition level
     * @return Repetition level for nested data
     */
    int getCurrentRepetitionLevel();

    /**
     * Returns the current definition level
     * @return Definition level for null handling
     */
    int getCurrentDefinitionLevel();
}
```
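The definition level reported by `getCurrentDefinitionLevel()` is what tells the reader whether a slot holds a value or a null: a value is only materialized when the level reaches the column's maximum. A self-contained sketch of that decode loop for a flat optional column (plain Java, with an `Integer[]` standing in for a `WritableColumnVector`; names are illustrative):

```java
public class DefinitionLevelSketch {
    /**
     * Expands a dense, nulls-stripped value stream into positional slots.
     * A slot is null when its definition level is below maxDefLevel.
     */
    static Integer[] readBatch(int[] defLevels, int maxDefLevel, int[] values) {
        Integer[] slots = new Integer[defLevels.length];
        int v = 0; // index into the dense value stream
        for (int i = 0; i < defLevels.length; i++) {
            slots[i] = (defLevels[i] == maxDefLevel) ? values[v++] : null;
        }
        return slots;
    }

    public static void main(String[] args) {
        // For a flat optional column: level 1 = value present, 0 = null
        Integer[] batch = readBatch(new int[]{1, 0, 1, 1, 0}, 1, new int[]{10, 20, 30});
        System.out.println(java.util.Arrays.toString(batch));
        // → [10, null, 20, 30, null]
    }
}
```

Repetition levels extend the same idea to nested lists: they mark where a new list (rather than a new element) begins.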

### Specialized Column Readers

```java { .api }
/**
 * Column reader implementations for different primitive types
 */

// Boolean column reader
public class BooleanColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Integer type readers
public class ByteColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class ShortColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class IntColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class LongColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Floating point readers
public class FloatColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class DoubleColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// String and binary readers
public class BytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class FixedLenBytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Temporal type readers
public class TimestampColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Nested type readers
public class NestedColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class NestedPrimitiveColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}
```

## Usage Examples

### Basic Vectorized Reading

```java
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.connector.file.src.FileSource;

// Create vectorized input format with a large batch size
ParquetColumnarRowInputFormat<FileSourceSplit> vectorizedFormat =
    new ParquetColumnarRowInputFormat<>(
        new Configuration(),
        rowType,
        TypeInformation.of(RowData.class),
        null, // Read all fields
        null, // No field ID mapping
        4096, // Large batch size for performance
        true, // UTC timestamps
        true  // Case sensitive
    );

// Use with FileSource for high-throughput reading
FileSource<RowData> source = FileSource
    .forBulkFormat(vectorizedFormat, new Path("/large-dataset"))
    .build();

DataStream<RowData> highThroughputStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "vectorized-parquet-source"
);
```

### Custom Column Batch Factory

```java
import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;

// Custom factory that adds partition information
ColumnBatchFactory<FileSourceSplit> customFactory = (split, vectors) -> {
    // Extract partition information from the split
    String[] partitionValues = extractPartitionValues(split);

    // Combine data columns and partition columns into one vector array
    ColumnVector[] allVectors = new ColumnVector[vectors.length + partitionValues.length];
    System.arraycopy(vectors, 0, allVectors, 0, vectors.length);

    // Append constant partition columns after the data columns
    for (int i = 0; i < partitionValues.length; i++) {
        allVectors[vectors.length + i] = createPartitionVector(partitionValues[i], batchSize);
    }

    return new VectorizedColumnBatch(allVectors);
};
```

### Performance Tuning

```java
// Optimize batch size based on available memory and data characteristics
int optimalBatchSize = calculateBatchSize(
    Runtime.getRuntime().maxMemory(), // Available memory
    numberOfColumns,                  // Schema width
    averageRowSize,                   // Data density
    parquetBlockSize                  // File block size
);

// Configure for high-throughput scenarios
Configuration perfConfig = new Configuration();
perfConfig.setInt("parquet.read.batch.size", optimalBatchSize);
perfConfig.setBoolean("parquet.read.vectorized.enable", true);
perfConfig.setInt("parquet.read.allocation.size", 8 * 1024 * 1024); // 8MB

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
    new ParquetColumnarRowInputFormat<>(
        perfConfig,
        rowType,
        typeInfo,
        selectedFields,
        null,
        optimalBatchSize,
        utcTimestamp,
        caseSensitive
    );
```
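`calculateBatchSize` above is a user-supplied helper, not a Flink utility. One plausible heuristic (an assumption, not the library's behavior) budgets a fixed fraction of the heap for a single in-flight batch and never sizes a batch past one row group:

```java
public class BatchSizeSketch {
    /**
     * Heuristic: one batch may occupy ~5% of available memory, may not
     * span more than one Parquet row group, and is clamped to [256, 16384].
     * All constants are illustrative assumptions.
     */
    static int calculateBatchSize(long maxMemoryBytes, int numColumns,
                                  int avgRowSizeBytes, long parquetBlockSize) {
        double budgetRatio = 0.05; // fraction of heap for one batch (assumption)
        long perRow = avgRowSizeBytes + 8L * numColumns; // payload + per-column slot overhead
        long memoryRows = (long) (maxMemoryBytes * budgetRatio) / Math.max(1L, perRow);
        long blockRows = parquetBlockSize / Math.max(1L, (long) avgRowSizeBytes);
        long rows = Math.min(memoryRows, blockRows);
        return (int) Math.max(256L, Math.min(rows, 16_384L));
    }

    public static void main(String[] args) {
        // 1 GiB heap, 20 columns, 200-byte rows, 128 MiB row groups
        System.out.println(calculateBatchSize(1L << 30, 20, 200, 128L * 1024 * 1024));
        // → 16384 (memory allows more, so the upper clamp wins)
    }
}
```

The exact ratio and clamps should be tuned per workload; the point is that batch size trades memory pressure against per-batch overhead.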

### Nested Data Reading

```java
// Schema with nested structures
RowType nestedSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),         // id
        RowType.of(                                  // address (nested)
            new LogicalType[] {
                DataTypes.STRING().getLogicalType(), // street
                DataTypes.STRING().getLogicalType(), // city
                DataTypes.STRING().getLogicalType()  // country
            },
            new String[] {"street", "city", "country"}
        ),
        new ArrayType(                               // phone_numbers (array)
            DataTypes.STRING().getLogicalType()
        )
    },
    new String[] {"id", "address", "phone_numbers"}
);

// Vectorized reading handles nested structures efficiently
ParquetColumnarRowInputFormat<FileSourceSplit> nestedFormat =
    new ParquetColumnarRowInputFormat<>(
        conf, nestedSchema, typeInfo, null, null,
        2048, // Smaller batches for complex data
        utcTimestamp, caseSensitive
    );
```

### Memory-Efficient Reading

```java
// Configure for memory-constrained environments
Configuration memoryConfig = new Configuration();
memoryConfig.setInt("parquet.read.batch.size", 1024);             // Smaller batches
memoryConfig.setDouble("parquet.memory.pool.ratio", 0.7);         // Conservative memory usage
memoryConfig.setBoolean("parquet.strings.signed-min-max", false); // Reduce string overhead

// Use column projection to reduce the memory footprint
String[] essentialFields = {"id", "timestamp", "value"};

ParquetColumnarRowInputFormat<FileSourceSplit> memoryEfficientFormat =
    new ParquetColumnarRowInputFormat<>(
        memoryConfig,
        projectedRowType, // Only essential fields
        typeInfo,
        essentialFields,  // Column projection
        null,
        1024,             // Conservative batch size
        utcTimestamp,
        caseSensitive
    );
```

### Parallel Reading with Multiple Splits

```java
// Configure for parallel reading across multiple splits; split
// boundaries follow Parquet row groups, so each parallel reader
// instance works on its own set of row groups
FileSource<RowData> parallelSource = FileSource
    .forBulkFormat(vectorizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1)) // Monitor for new files
    .build();

// Process with appropriate parallelism
DataStream<RowData> parallelStream = env
    .fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "parallel-source")
    .setParallelism(numberOfCores * 2); // CPU-bound processing
```

### ParquetSplitReaderUtil

Utility class providing helper methods for creating column readers and vectors in vectorized Parquet reading.

```java { .api }
/**
 * Utility methods for Parquet vectorized reading components
 */
public class ParquetSplitReaderUtil {

    /**
     * Builds a list of ParquetField representations from RowType fields
     * @param fields List of RowType fields to convert
     * @param fieldNames List of field names for mapping
     * @param columnIO MessageColumnIO for schema information
     * @return List of ParquetField objects for vectorized reading
     */
    public static List<ParquetField> buildFieldsList(
        List<RowType.RowField> fields,
        List<String> fieldNames,
        MessageColumnIO columnIO
    );

    /**
     * Creates a column reader for the specified field and configuration
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param logicalType Flink logical type for the column
     * @param physicalType Parquet physical type representation
     * @param columnDescriptors List of column descriptors for the field
     * @param pageReadStore Page read store for accessing data
     * @param field ParquetField definition
     * @param depth Nesting depth of the field
     * @return ColumnReader instance for reading the field data
     */
    public static ColumnReader createColumnReader(
        boolean utcTimestamp,
        LogicalType logicalType,
        Type physicalType,
        List<ColumnDescriptor> columnDescriptors,
        PageReadStore pageReadStore,
        ParquetField field,
        int depth
    );

    /**
     * Creates a writable column vector for the specified type and configuration
     * @param batchSize Batch size for the vector
     * @param logicalType Flink logical type
     * @param physicalType Parquet physical type
     * @param columnDescriptors Column descriptors for metadata
     * @param depth Nesting depth
     * @return WritableColumnVector for storing read data
     */
    public static WritableColumnVector createWritableColumnVector(
        int batchSize,
        LogicalType logicalType,
        Type physicalType,
        List<ColumnDescriptor> columnDescriptors,
        int depth
    );

    /**
     * Creates a constant-value column vector
     * @param type Logical type of the constant
     * @param value Constant value to fill the vector
     * @param batchSize Size of the vector
     * @return ColumnVector filled with the constant value
     */
    public static ColumnVector createVectorFromConstant(
        LogicalType type,
        Object value,
        int batchSize
    );
}
```

## Performance Characteristics

### Throughput Optimization

- **Batch Processing**: Processes multiple rows simultaneously, using SIMD operations where possible
- **Column Pruning**: Only reads required columns from storage, reducing I/O
- **Dictionary Compression**: Efficient handling of dictionary-encoded columns
- **Lazy Evaluation**: Defers expensive operations until data is actually needed

### Memory Management

- **Vectorized Memory Layout**: Contiguous memory access patterns for better CPU cache utilization
- **Controlled Memory Usage**: Configurable batch sizes prevent memory overflow
- **Off-heap Storage**: Column vectors can use off-heap memory to reduce GC pressure
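The "controlled memory usage" point is simple arithmetic: a batch's resident size is roughly rows × the summed per-row widths of the projected columns, which makes batch size the main memory knob. A quick back-of-the-envelope check (column widths are illustrative):

```java
public class BatchMemorySketch {
    /** Rough resident size of one batch: rows times the sum of column widths. */
    static long batchBytes(int batchSize, int[] columnWidthsBytes) {
        long perRow = 0;
        for (int w : columnWidthsBytes) perRow += w;
        return (long) batchSize * perRow;
    }

    public static void main(String[] args) {
        // 4096 rows × (8B long + 8B double + ~24B string) = 160 KiB per batch
        System.out.println(batchBytes(4096, new int[]{8, 8, 24}));
        // → 163840
    }
}
```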

### I/O Efficiency

- **Block-level Reading**: Aligns with Parquet row group boundaries for optimal disk access
- **Parallel I/O**: Multiple threads can read different column chunks simultaneously
- **Compression Handling**: Native support for all Parquet compression codecs

The vectorized reading infrastructure provides significant performance improvements over row-based processing, especially for analytical workloads with wide schemas and large datasets.