# Vectorized Input

High-performance columnar input formats optimized for analytical workloads, with vectorized processing, partition support, and efficient memory usage.

## Capabilities

### ParquetColumnarRowInputFormat

Columnar input format that provides RowData iterators using vectorized column batches for maximum performance in analytical queries.

```java { .api }
/**
 * Parquet input format providing RowData iterator using columnar row data.
 * Extends ParquetVectorizedInputFormat with RowData-specific functionality.
 */
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
        extends ParquetVectorizedInputFormat<RowData, SplitT> {

    /**
     * Creates a basic columnar row input format without extra fields
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Row type defining the projected schema
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
            Configuration hadoopConfig,
            RowType projectedType,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Creates a columnar row input format with extra fields support
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Projected row type (excludes extra fields)
     * @param producedType Produced row type (includes extra fields)
     * @param batchFactory Factory for creating column batches with extra fields
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
            Configuration hadoopConfig,
            RowType projectedType,
            RowType producedType,
            ColumnBatchFactory<SplitT> batchFactory,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Creates a partitioned columnar row input format.
     * Automatically handles partition columns derived from file paths.
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param producedRowType Complete row type including partition columns
     * @param partitionKeys List of partition column names
     * @param extractor Extractor for deriving partition values from splits
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     * @return Configured ParquetColumnarRowInputFormat for partitioned data
     */
    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
            Configuration hadoopConfig,
            RowType producedRowType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive
    );

    /**
     * Returns the type information for the produced RowData
     * @return TypeInformation for RowData output
     */
    public TypeInformation<RowData> getProducedType();
}
```

### ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet input formats providing the foundation for high-performance columnar processing.

```java { .api }
/**
 * Abstract base class for vectorized Parquet input formats.
 * Provides vectorized column batch processing with configurable batch sizes.
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {

    /**
     * Creates a reader for the given file split
     * @param config Flink configuration
     * @param split File split to read from
     * @return BulkFormat.Reader for processing the split
     * @throws IOException if reader creation fails
     */
    public Reader<T> createReader(Configuration config, SplitT split) throws IOException;

    /**
     * Restores a reader from a checkpointed position
     * @param config Flink configuration
     * @param split File split to read from
     * @param restoredOffset Checkpointed position to restore from
     * @return BulkFormat.Reader restored at the specified position
     * @throws IOException if reader restoration fails
     */
    public Reader<T> restoreReader(Configuration config, SplitT split, CheckpointedPosition restoredOffset)
            throws IOException;

    /**
     * Checks if the format supports splitting
     * @return true if the format can be split across multiple readers
     */
    public boolean isSplittable();
}
```
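
The `createReader`/`readBatch` contract can also be driven by hand. The sketch below is a minimal loop, assuming a `format` and `split` obtained elsewhere; the `process` call is a hypothetical per-record handler standing in for your own logic. Inside a Flink job, the source operator drives this loop for you:

```java
import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;

// Drain every record from one split, batch by batch.
static void drainSplit(
        BulkFormat<RowData, FileSourceSplit> format,
        Configuration config,
        FileSourceSplit split) throws IOException {
    try (BulkFormat.Reader<RowData> reader = format.createReader(config, split)) {
        BulkFormat.RecordIterator<RowData> batch;
        while ((batch = reader.readBatch()) != null) { // null signals end of split
            RecordAndPosition<RowData> record;
            while ((record = batch.next()) != null) {
                // record.getOffset() / getRecordSkipCount() can seed a CheckpointedPosition
                process(record.getRecord()); // hypothetical handler
            }
            batch.releaseBatch(); // recycle the underlying vectorized batch
        }
    }
}
```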

### ColumnBatchFactory

Factory interface for creating vectorized column batches with support for extra fields such as partition columns.

```java { .api }
/**
 * Factory for creating vectorized column batches.
 * Supports adding extra fields beyond those present in Parquet files.
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT extends FileSourceSplit> {

    /**
     * Creates a vectorized column batch from Parquet column vectors
     * @param split File split being processed (for extracting partition info)
     * @param parquetVectors Column vectors read from the Parquet file
     * @return VectorizedColumnBatch with original and extra columns
     * @throws IOException if batch creation fails
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] parquetVectors) throws IOException;

    /**
     * Creates a factory that doesn't add extra fields
     * @return ColumnBatchFactory that passes through Parquet columns unchanged
     */
    static <SplitT extends FileSourceSplit> ColumnBatchFactory<SplitT> withoutExtraFields() {
        return (split, vectors) -> new VectorizedColumnBatch(vectors);
    }
}
```

## Usage Examples

### Basic Vectorized Reading

```java
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;

// Define schema for reading
RowType rowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[]{"id", "name", "created_at"}
);

// Create input format
Configuration hadoopConfig = new Configuration();
int batchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 2048
boolean utcTimezone = true;
boolean caseSensitive = false;

ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, rowType, batchSize, utcTimezone, caseSensitive
    );

// Use with FileSource
FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(inputFormat, new Path("/input/parquet/files"))
    .build();

DataStream<RowData> dataStream = env.fromSource(parquetSource,
    WatermarkStrategy.noWatermarks(), "parquet-source");
```

### Partitioned Data Reading

```java
import org.apache.flink.table.filesystem.PartitionFieldExtractor;

// Schema including partition columns
RowType producedRowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),  // id
        DataTypes.STRING().getLogicalType(),  // name
        DataTypes.DATE().getLogicalType(),    // partition: date
        DataTypes.STRING().getLogicalType()   // partition: region
    },
    new String[]{"id", "name", "date", "region"}
);

// Define partition columns
List<String> partitionKeys = Arrays.asList("date", "region");

// Create partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor =
    PartitionFieldExtractor.forFileSystem("__HIVE_DEFAULT_PARTITION__");

// Create partitioned input format
ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        hadoopConfig,
        producedRowType,
        partitionKeys,
        extractor,
        4096,  // Larger batch size for partitioned data
        true,  // UTC timezone
        false  // Case insensitive
    );

// File structure: /data/date=2023-01-01/region=us-west/part-0000.parquet
FileSource<RowData> partitionedSource = FileSource
    .forBulkFileFormat(partitionedFormat, new Path("/data"))
    .build();
```

### Performance Tuning

```java
// Custom configuration for high-throughput reading
Configuration optimizedConfig = new Configuration();

// Parquet-specific optimizations
optimizedConfig.setBoolean("parquet.enable.dictionary", true);
optimizedConfig.setInt("parquet.page.size", 1048576);    // 1MB page size
optimizedConfig.setInt("parquet.block.size", 134217728); // 128MB block size

// Larger batch sizes for analytical workloads
int largeBatchSize = 8192; // 4x the default size of 2048

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
    new ParquetColumnarRowInputFormat<>(
        optimizedConfig,
        rowType,
        largeBatchSize,
        true,  // UTC timestamps
        false  // Case insensitive
    );

// Configure FileSource with continuous file discovery
FileSource<RowData> optimizedSource = FileSource
    .forBulkFileFormat(optimizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1))
    .build();

DataStream<RowData> stream = env
    .fromSource(optimizedSource, WatermarkStrategy.noWatermarks(), "optimized-parquet")
    .setParallelism(Runtime.getRuntime().availableProcessors()); // Scale with CPU cores
```

### Column Projection

```java
// Original file schema: id, name, email, age, created_at, updated_at
// Project only the needed columns for better performance
RowType projectedSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),  // id
        DataTypes.STRING().getLogicalType(),  // name
        DataTypes.INT().getLogicalType()      // age
    },
    new String[]{"id", "name", "age"}
);

// Only the projected columns are read from the Parquet files
ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, projectedSchema, batchSize, true, false
    );

// This reads only 3 columns instead of 6, significantly reducing I/O
```

### Custom Column Batch Factory

```java
// Custom factory that adds a computed column.
// Original Parquet columns: id, amount; computed column: tax (amount * 0.1)
ColumnBatchFactory<FileSourceSplit> customFactory = (split, parquetVectors) -> {
    ColumnVector[] allVectors = new ColumnVector[parquetVectors.length + 1];
    System.arraycopy(parquetVectors, 0, allVectors, 0, parquetVectors.length);

    // Create the computed tax column, sized to the configured batch
    HeapDoubleVector taxVector = new HeapDoubleVector(batchSize);
    DoubleColumnVector amountVector = (DoubleColumnVector) parquetVectors[1]; // amount is the second column

    for (int i = 0; i < batchSize; i++) {
        if (!amountVector.isNullAt(i)) {
            taxVector.setDouble(i, amountVector.getDouble(i) * 0.1);
        } else {
            taxVector.setNullAt(i);
        }
    }

    allVectors[parquetVectors.length] = taxVector;
    return new VectorizedColumnBatch(allVectors);
};

// Schema including the computed column
RowType schemaWithTax = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),  // id
        DataTypes.DOUBLE().getLogicalType(),  // amount
        DataTypes.DOUBLE().getLogicalType()   // tax (computed)
    },
    new String[]{"id", "amount", "tax"}
);

RowType parquetSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),  // id
        DataTypes.DOUBLE().getLogicalType()   // amount
    },
    new String[]{"id", "amount"}
);

ParquetColumnarRowInputFormat<FileSourceSplit> customFormat =
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig,
        parquetSchema,   // Schema present in the Parquet file
        schemaWithTax,   // Final produced schema
        customFactory,   // Custom batch factory
        batchSize,
        true,
        false
    );
```

## Performance Characteristics

### Memory Usage

- **Batch Size**: Larger batches (4096-8192) improve throughput but use more memory
- **Column Vectors**: Memory usage scales with batch size × number of columns × data type size
- **Dictionary Compression**: Reduces memory usage for repeated values
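
The scaling rule above lends itself to back-of-the-envelope sizing. The sketch below uses illustrative fixed per-column widths (8-byte BIGINT/DOUBLE, 4-byte INT); variable-length types such as STRING depend on the actual data, and null bitmaps and dictionary structures add overhead on top:

```java
// Rough per-batch heap estimate for fixed-width columns.
public class BatchMemoryEstimate {
    // batchSize rows, each column contributing a fixed width in bytes
    static long estimateBatchBytes(int batchSize, int[] columnWidthsBytes) {
        long perRow = 0;
        for (int width : columnWidthsBytes) {
            perRow += width;
        }
        return (long) batchSize * perRow;
    }

    public static void main(String[] args) {
        int[] widths = {8, 8, 4}; // e.g. BIGINT, DOUBLE, INT
        System.out.println(estimateBatchBytes(2048, widths)); // default batch: 40960 bytes (~40 KB)
        System.out.println(estimateBatchBytes(8192, widths)); // large batch: 163840 bytes (~160 KB)
    }
}
```

Multiply by the operator parallelism and any batches held in flight to approximate total reader footprint.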

### I/O Optimization

- **Column Projection**: Only read required columns to minimize I/O
- **Predicate Pushdown**: Filter row groups at the Parquet level
- **Vectorized Processing**: Process multiple rows per operation

### Parallelization

- **Split-level Parallelism**: Each Parquet file can be processed by a separate parallel instance
- **Row Group Parallelism**: Large files can be split at row group boundaries
- **CPU Vectorization**: Modern CPUs process vectorized operations efficiently

## Error Handling

Common error scenarios and how to handle them:

```java
try {
    ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(hadoopConfig, rowType, batchSize, true, false);
} catch (IllegalArgumentException e) {
    // Invalid row type, negative batch size, etc.
    logger.error("Invalid configuration for Parquet input format", e);
} catch (Exception e) {
    // Other initialization errors
    logger.error("Failed to create Parquet input format", e);
}

// Runtime reading errors
try {
    BulkFormat.Reader<RowData> reader = format.createReader(config, split);
    BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
} catch (IOException e) {
    // File not found, permission errors, corrupted files
    logger.error("Failed to read Parquet file", e);
} catch (RuntimeException e) {
    // Schema mismatches, unsupported data types
    logger.error("Runtime error during Parquet reading", e);
}
```