or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.mdcolumnar-reading.mdindex.mdpredicate-pushdown.mdtable-api.mdvector-processing.md

columnar-reading.mddocs/

# Columnar Reading

The ORC format provides high-performance columnar reading through the `OrcColumnarRowInputFormat`, enabling vectorized processing with partition support, column projection, and statistics reporting.

## Input Format

```java { .api }
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public OrcColumnarRowInputFormat(
            OrcShim<BatchT> shim,
            Configuration hadoopConfig,
            TypeDescription schema,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            ColumnBatchFactory<BatchT, SplitT> batchFactory,
            TypeInformation<RowData> producedTypeInfo
    );

    public OrcReaderBatch<RowData, BatchT> createReaderBatch(
            SplitT split,
            OrcVectorizedBatchWrapper<BatchT> orcBatch,
            Pool.Recycler<OrcReaderBatch<RowData, BatchT>> recycler,
            int batchSize
    );

    public TypeInformation<RowData> getProducedType();

    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```

## Factory Methods

```java { .api }
public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
        createPartitionedFormat(
            OrcShim<VectorizedRowBatch> shim,
            Configuration hadoopConfig,
            RowType tableType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
        );
```

## Usage Examples

### Basic Columnar Reading

```java
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

// Define table schema
RowType tableType = RowType.of(
    new LogicalType[] {
        new BigIntType(),     // id
        new VarCharType(255), // name
        new IntType(),        // age
        new BooleanType()     // active
    },
    new String[] {"id", "name", "age", "active"}
);

// Configure reading
Configuration hadoopConfig = new Configuration();
int[] selectedFields = {0, 1, 2, 3}; // All fields
List<OrcFilters.Predicate> predicates = new ArrayList<>();
int batchSize = 1024;

// Create input format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,
        Collections.emptyList(), // No partitions
        null,                    // No partition extractor
        selectedFields,
        predicates,
        batchSize,
        InternalTypeInfo::of     // RowType -> TypeInformation<RowData>
    );
```

### Reading with Column Projection

```java
// Read only specific columns (id, name)
int[] projectedFields = {0, 1}; // Only id and name columns

RowType projectedType = RowType.of(
    new LogicalType[] {
        new BigIntType(),    // id
        new VarCharType(255) // name
    },
    new String[] {"id", "name"}
);

OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> projectedFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,       // Full table type
        Collections.emptyList(),
        null,
        projectedFields, // Only selected fields
        predicates,
        batchSize,
        InternalTypeInfo::of
    );
```

### Reading Partitioned Data

```java
import org.apache.flink.connector.file.table.PartitionFieldExtractor;

// Define partitioned table
List<String> partitionKeys = Arrays.asList("year", "month");

RowType partitionedTableType = RowType.of(
    new LogicalType[] {
        new BigIntType(),     // id
        new VarCharType(255), // name
        new IntType(),        // age
        new BooleanType(),    // active
        new IntType(),        // year (partition)
        new IntType()         // month (partition)
    },
    new String[] {"id", "name", "age", "active", "year", "month"}
);

// Partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    // Extract partition values from file path
    String path = split.path().toString();
    if ("year".equals(fieldName)) {
        // Extract year from path like /data/year=2023/month=01/file.orc
        return extractYearFromPath(path);
    } else if ("month".equals(fieldName)) {
        return extractMonthFromPath(path);
    }
    return null;
};

// Create partitioned format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> partitionedFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        partitionedTableType,
        partitionKeys,
        extractor,
        selectedFields,
        predicates,
        batchSize,
        InternalTypeInfo::of
    );
```

### Reading with Predicates

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
    // age > 25
    new OrcFilters.Not(new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 25)),
    // active = true
    new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true),
    // name IS NOT NULL
    new OrcFilters.Not(new OrcFilters.IsNull("name", PredicateLeaf.Type.STRING))
);

// Use predicates in format
OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> filteredFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),
        hadoopConfig,
        tableType,
        Collections.emptyList(),
        null,
        selectedFields,
        predicates, // Apply filters
        batchSize,
        InternalTypeInfo::of
    );
```

## Column Batch Factory

```java { .api }
@FunctionalInterface
public interface ColumnBatchFactory<BatchT, SplitT extends FileSourceSplit> {
    VectorizedColumnBatch create(SplitT split, BatchT batch);
}
```

Custom batch factory for specialized processing:

```java
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> customFactory =
    (split, orcBatch) -> {
        // Create Flink column vectors from ORC vectors
        ColumnVector[] vectors = new ColumnVector[selectedFields.length];

        for (int i = 0; i < vectors.length; i++) {
            int fieldIndex = selectedFields[i];
            LogicalType fieldType = tableType.getTypeAt(fieldIndex);

            vectors[i] = AbstractOrcColumnVector.createFlinkVector(
                orcBatch.cols[i],
                fieldType
            );
        }

        return new VectorizedColumnBatch(vectors);
    };
```

## Statistics Reporting

```java { .api }
public TableStats reportStatistics(List<Path> files, DataType producedDataType);
```

Extract statistics from ORC files:

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;

List<Path> orcFiles = Arrays.asList(
    new Path("/data/file1.orc"),
    new Path("/data/file2.orc")
);

// Get statistics from ORC metadata
TableStats stats = inputFormat.reportStatistics(orcFiles, dataType);

System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());
```

### OrcFormatStatisticsReportUtil

Utility class for extracting comprehensive table statistics from ORC files.

```java { .api }
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(
            List<Path> files,
            DataType producedDataType
    );

    public static TableStats getTableStatistics(
            List<Path> files,
            DataType producedDataType,
            Configuration hadoopConfig
    );
}
```

**Usage Examples:**

```java
import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
import org.apache.hadoop.conf.Configuration;

// Get statistics with default configuration
List<Path> orcFiles = Arrays.asList(
    new Path("/warehouse/users/part1.orc"),
    new Path("/warehouse/users/part2.orc")
);

TableStats stats = OrcFormatStatisticsReportUtil.getTableStatistics(
    orcFiles,
    producedDataType
);

// Get statistics with custom Hadoop configuration
Configuration customConfig = new Configuration();
customConfig.set("orc.read.buffer.size", "262144");
customConfig.setBoolean("orc.read.include.file.footer", true);

TableStats detailedStats = OrcFormatStatisticsReportUtil.getTableStatistics(
    orcFiles,
    producedDataType,
    customConfig
);

// Access statistics information
System.out.println("Total row count: " + stats.getRowCount());
Map<String, ColumnStats> columnStats = stats.getColumnStats();
for (Map.Entry<String, ColumnStats> entry : columnStats.entrySet()) {
    String columnName = entry.getKey();
    ColumnStats colStats = entry.getValue();
    System.out.println(columnName + " - Null count: " + colStats.getNullCount());
    System.out.println(columnName + " - Min: " + colStats.getMin());
    System.out.println(columnName + " - Max: " + colStats.getMax());
}
```

## Reader Batch Processing

```java { .api }
public abstract class OrcReaderBatch<RowDataT, BatchT> implements Pool.Recyclable {
    public abstract RecordIterator<RowDataT> convertAndGetIterator(
            OrcVectorizedBatchWrapper<BatchT> orcBatch,
            long startingOffset
    );
}
```

The reader processes data in vectorized batches for optimal performance:

1. **Batch Creation**: `createReaderBatch()` creates vectorized column batch
2. **Vector Conversion**: ORC vectors converted to Flink column vectors
3. **Iterator Generation**: `convertAndGetIterator()` provides row-by-row access
4. **Memory Management**: Batches are recyclable for memory efficiency

## Performance Configuration

### Batch Size Tuning

```java
// Smaller batches for memory-constrained environments
int smallBatchSize = 512;

// Larger batches for high-throughput scenarios
int largeBatchSize = 4096;

// Default ORC batch size
int defaultBatchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 1024
```

### Hadoop Configuration

```java
Configuration hadoopConfig = new Configuration();

// ORC reader settings
hadoopConfig.setBoolean("orc.use.zerocopy", true);
hadoopConfig.setInt("orc.read.buffer.size", 262144); // 256KB
hadoopConfig.setBoolean("orc.read.include.file.footer", true);

// Compression buffer settings
hadoopConfig.setInt("io.compression.codec.lzo.buffersize", 65536);
hadoopConfig.setInt("io.compression.codec.snappy.buffersize", 65536);
```

## Integration with DataStream API

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.file.src.FileSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create file source with ORC format
FileSource<RowData> orcSource = FileSource
    .forBulkFileFormat(inputFormat, new Path("/path/to/orc/files"))
    .build();

DataStream<RowData> orcStream = env.fromSource(
    orcSource,
    WatermarkStrategy.noWatermarks(),
    "ORC Source"
);

// Process the stream
orcStream
    .filter(row -> row.getInt(2) > 25) // age > 25
    .map(row -> row.getString(1))      // extract name
    .print();
```

## Split Reader Utilities

### OrcSplitReaderUtil

Core utility class for ORC split reading operations and type conversions.

```java { .api }
public class OrcSplitReaderUtil {
    public static <SplitT extends FileSourceSplit> Function<SplitT, RecordIterator<RowData>>
            genPartColumnarRowReader(
                Configuration hadoopConfig,
                String[] fullFieldNames,
                DataType[] fullFieldTypes,
                TypeInformation<RowData> typeInfo,
                int[] selectedFields,
                List<OrcFilters.Predicate> conjunctPredicates,
                int batchSize,
                OrcShim<VectorizedRowBatch> shim,
                List<String> partitionKeys,
                PartitionFieldExtractor<SplitT> extractor
            );

    public static int[] getSelectedOrcFields(
            RowType tableType,
            int[] selectedFields,
            List<String> partitionKeys
    );

    public static String[] getNonPartNames(
            String[] fullNames,
            List<String> partitionKeys
    );

    public static String[] getNonPartNames(
            RowType rowType,
            List<String> partitionKeys
    );

    public static TypeDescription convertToOrcTypeWithPart(
            RowType rowType,
            List<String> partitionKeys
    );

    public static TypeDescription convertToOrcTypeWithPart(
            String[] fieldNames,
            DataType[] fieldTypes,
            List<String> partitionKeys
    );

    public static TypeDescription logicalTypeToOrcType(LogicalType logicalType);
}
```

**Usage Examples:**

```java
import org.apache.flink.orc.OrcSplitReaderUtil;

// Get selected ORC field indices
int[] selectedOrcFields = OrcSplitReaderUtil.getSelectedOrcFields(
    tableType,
    selectedFields,
    partitionKeys
);

// Convert Flink types to ORC schema
TypeDescription orcSchema = OrcSplitReaderUtil.convertToOrcTypeWithPart(
    tableType,
    partitionKeys
);

// Convert individual logical type
LogicalType stringType = new VarCharType(255);
TypeDescription orcStringType = OrcSplitReaderUtil.logicalTypeToOrcType(stringType);
```