# Columnar Reading

Helper utilities for creating columnar input formats and split readers, with partition support, that read ORC files efficiently without a Hive dependency. They provide vectorized reading with predicate pushdown and column projection.

## Capabilities

### Columnar Row Input Format

Helper class for creating partitioned ORC columnar input formats without Hive dependencies.

```java { .api }
/**
 * Helper class to create OrcColumnarRowFileInputFormat for no-hive usage.
 * Provides static factory methods for creating partitioned input formats.
 */
public class OrcNoHiveColumnarRowInputFormat {

    /**
     * Create a partitioned OrcColumnarRowFileInputFormat where partition columns
     * can be generated by split metadata.
     *
     * @param hadoopConfig Hadoop configuration for ORC reading
     * @param tableType Row type describing the complete table schema
     * @param partitionKeys List of partition column names
     * @param extractor Extracts partition values from file splits
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @return Configured columnar input format for partitioned reading
     */
    public static <SplitT extends FileSourceSplit>
            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                    Configuration hadoopConfig,
                    RowType tableType,
                    List<String> partitionKeys,
                    PartitionFieldExtractor<SplitT> extractor,
                    int[] selectedFields,
                    List<OrcFilters.Predicate> conjunctPredicates,
                    int batchSize);
}
```

**Usage Examples:**

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
// VectorizedRowBatch comes from the storage-api shaded into orc-core:nohive
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.util.Arrays;
import java.util.List;

// Define the table schema, including the partition columns
RowType tableType = RowType.of(
        new LogicalType[] {
            new BigIntType(),     // user_id
            new VarCharType(255), // name
            new VarCharType(100), // email
            new IntType(),        // age
            new VarCharType(50),  // country (partition)
            new VarCharType(10)   // year (partition)
        },
        new String[] {"user_id", "name", "email", "age", "country", "year"});

// Define partition keys
List<String> partitionKeys = Arrays.asList("country", "year");

// Select only specific fields to read (column projection)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create a partition extractor that derives partition values from paths
// such as /data/country=US/year=2023/file.orc. extractFromPath is a
// user-supplied helper; see "Partition Handling" below for one way to write it.
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    String path = split.path().toString();
    if (fieldName.equals("country")) {
        return extractFromPath(path, "country=");
    } else if (fieldName.equals("year")) {
        return extractFromPath(path, "year=");
    }
    return null;
};

// Create filter predicates; entries of the conjunct list are combined with AND
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.Equals("age", PredicateLeaf.Type.LONG, 25L),
        new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 10000L));

// Create the columnar input format
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
        OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                new Configuration(),
                tableType,
                partitionKeys,
                extractor,
                selectedFields,
                predicates,
                1024); // batch size
```
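
The created format is a `BulkFormat<RowData, FileSourceSplit>`, so it can be handed directly to Flink's `FileSource`. A minimal wiring sketch, assuming the `inputFormat` built above and an illustrative input path:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build a FileSource backed by the columnar ORC format
FileSource<RowData> source = FileSource
        .forBulkFileFormat(inputFormat, new Path("hdfs://cluster/data"))
        .build();

// Rows arrive already projected to selectedFields, with partition values filled in
env.fromSource(source, WatermarkStrategy.noWatermarks(), "orc-no-hive-source").print();

env.execute("read-orc");
```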

### Split Reader Utility

Utility for generating ORC split readers with partition support and predicate pushdown.

```java { .api }
/**
 * Utility for generating OrcColumnarRowSplitReader instances.
 * Provides factory methods for creating split readers with partition support.
 */
public class OrcNoHiveSplitReaderUtil {

    /**
     * Generate a partitioned columnar row reader for ORC files.
     *
     * @param conf Hadoop configuration
     * @param fullFieldNames Complete array of field names in table schema
     * @param fullFieldTypes Complete array of field types in table schema
     * @param partitionSpec Map of partition column names to values
     * @param selectedFields Array of field indices to read from files
     * @param conjunctPredicates List of filter predicates for pushdown
     * @param batchSize Number of rows per vectorized batch
     * @param path Path to the ORC file to read
     * @param splitStart Byte offset where the split starts in the file
     * @param splitLength Number of bytes to read in this split
     * @return Configured columnar row split reader
     * @throws IOException if reader creation fails
     */
    public static OrcColumnarRowSplitReader<VectorizedRowBatch> genPartColumnarRowReader(
            Configuration conf,
            String[] fullFieldNames,
            DataType[] fullFieldTypes,
            Map<String, Object> partitionSpec,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            org.apache.flink.core.fs.Path path,
            long splitStart,
            long splitLength) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowSplitReader;
import org.apache.flink.orc.OrcFilters;
import org.apache.flink.orc.nohive.OrcNoHiveSplitReaderUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
// VectorizedRowBatch comes from the storage-api shaded into orc-core:nohive
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Define the complete table schema
String[] fieldNames = {"user_id", "name", "email", "age", "country", "year"};
DataType[] fieldTypes = {
    DataTypes.BIGINT(),
    DataTypes.VARCHAR(255),
    DataTypes.VARCHAR(100),
    DataTypes.INT(),
    DataTypes.VARCHAR(50), // partition column
    DataTypes.VARCHAR(10)  // partition column
};

// Define the partition values for this split
Map<String, Object> partitionSpec = new HashMap<>();
partitionSpec.put("country", "US");
partitionSpec.put("year", "2023");

// Select the fields to read (excluding age to reduce I/O)
int[] selectedFields = {0, 1, 2, 4, 5}; // user_id, name, email, country, year

// Create filter predicates
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 50000L));

// Create the split reader
Path filePath = new Path("hdfs://cluster/data/country=US/year=2023/part-00001.orc");
OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
        OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                new Configuration(),
                fieldNames,
                fieldTypes,
                partitionSpec,
                selectedFields,
                predicates,
                2048,         // batch size
                filePath,
                0,            // split start
                1024 * 1024); // split length (1 MB)

// Read the split row by row; the reader iterates its vectorized batches
// internally and exposes each row as RowData in selectedFields order
while (!reader.reachedEnd()) {
    RowData row = reader.nextRecord(null);
    long userId = row.getLong(0);              // user_id
    String name = row.getString(1).toString(); // name
    // ... process row
}
reader.close();
```

### Column Batch Factory

The input format uses a `ColumnBatchFactory` to create Flink `VectorizedColumnBatch` instances from ORC `VectorizedRowBatch` objects:

```java { .api }
/**
 * Factory interface for creating column batches from ORC row batches.
 * Used internally by input formats to convert ORC vectors to Flink vectors.
 */
interface ColumnBatchFactory<T, SplitT extends FileSourceSplit> {

    /**
     * Create a VectorizedColumnBatch from an ORC VectorizedRowBatch.
     *
     * @param split File split containing partition metadata
     * @param rowBatch ORC vectorized row batch with column data
     * @return Flink VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch createBatch(SplitT split, T rowBatch);
}
```
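
You normally do not implement this interface yourself; `createPartitionedFormat` installs a factory that wraps the ORC vectors without copying. Purely as an illustration of the contract, here is a minimal copying sketch, assuming a single BIGINT column and Flink's heap vector classes (package locations for the vector classes vary across Flink versions):

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

// Illustrative only: copy one BIGINT column from the ORC batch into a
// Flink heap vector. The bundled factories wrap the ORC memory instead.
ColumnBatchFactory<VectorizedRowBatch, FileSourceSplit> factory = (split, orcBatch) -> {
    int rows = orcBatch.size;
    HeapLongVector userIds = new HeapLongVector(rows);
    long[] source = ((LongColumnVector) orcBatch.cols[0]).vector;
    for (int i = 0; i < rows; i++) {
        userIds.setLong(i, source[i]);
    }
    VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {userIds});
    batch.setNumRows(rows);
    return batch;
};
```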

## Predicate Pushdown

`OrcFilters` predicates enable pushdown, letting the ORC reader skip stripes and row groups whose statistics cannot match. All predicates in the conjunct list are combined with AND; OR and NOT are expressed by nesting predicates:

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

import java.util.Arrays;
import java.util.List;

// Comparison predicates. Every entry in the conjunct list must hold: the
// reader combines them with AND. Literal types are declared explicitly via
// PredicateLeaf.Type (integers map to LONG, floating point to FLOAT).
List<OrcFilters.Predicate> predicates = Arrays.asList(
        new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
        new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 65L),
        // greater-than is expressed as NOT(less-than-or-equals)
        new OrcFilters.Not(
                new OrcFilters.LessThanEquals("salary", PredicateLeaf.Type.FLOAT, 50000.0)),
        new OrcFilters.LessThanEquals("score", PredicateLeaf.Type.LONG, 100L),
        // greater-than-or-equals is expressed as NOT(less-than)
        new OrcFilters.Not(
                new OrcFilters.LessThan("rating", PredicateLeaf.Type.FLOAT, 4.0)),
        new OrcFilters.IsNull("deleted_at", PredicateLeaf.Type.TIMESTAMP),
        // is-not-null is expressed as NOT(is-null)
        new OrcFilters.Not(new OrcFilters.IsNull("email", PredicateLeaf.Type.STRING)),
        new OrcFilters.Between("created_date", PredicateLeaf.Type.DATE, startDate, endDate));

// Set membership is a disjunction of equality predicates. Note that ORC
// search arguments have no prefix (startsWith) predicate; string prefix
// filters must be applied after reading.
OrcFilters.Predicate countryIn = new OrcFilters.Or(
        new OrcFilters.Or(
                new OrcFilters.Equals("country", PredicateLeaf.Type.STRING, "US"),
                new OrcFilters.Equals("country", PredicateLeaf.Type.STRING, "CA")),
        new OrcFilters.Equals("country", PredicateLeaf.Type.STRING, "UK"));

// Logical combinations: AND via the conjunct list, OR and NOT via nesting.
// status = 'active' AND (age > 18 OR age IS NULL) becomes a two-entry list:
List<OrcFilters.Predicate> combined = Arrays.asList(
        new OrcFilters.Equals("status", PredicateLeaf.Type.STRING, "active"),
        new OrcFilters.Or(
                new OrcFilters.Not(
                        new OrcFilters.LessThanEquals("age", PredicateLeaf.Type.LONG, 18L)),
                new OrcFilters.IsNull("age", PredicateLeaf.Type.LONG)));
```

## Column Projection

Optimize performance by reading only required columns:

```java
// The table has 10 columns but only 4 are needed
String[] allFields = {"id", "name", "email", "age", "salary", "dept", "manager", "created", "updated", "status"};

// Project only the required columns (indices 0, 1, 2, 9)
int[] selectedFields = {0, 1, 2, 9}; // id, name, email, status

// Reading fewer columns significantly reduces I/O and memory usage
OrcColumnarRowFileInputFormat<VectorizedRowBatch, FileSourceSplit> format =
        OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                hadoopConfig,
                fullTableType,
                partitionKeys,
                extractor,
                selectedFields, // only these columns are read
                predicates,
                batchSize);
```
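
If the projected columns are known by name, the index array can be derived instead of hard-coded. A small convenience sketch; the `wanted` array is hypothetical:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: look up each wanted column's index in the full schema
String[] wanted = {"id", "name", "email", "status"};
List<String> all = Arrays.asList(allFields);
int[] selectedFields = Arrays.stream(wanted)
        .mapToInt(all::indexOf)
        .toArray();
```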

## Partition Handling

Handle partitioned ORC datasets efficiently:

```java
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;

import java.sql.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Partition extractor implementation
PartitionFieldExtractor<FileSourceSplit> extractor = (split, fieldName, fieldType) -> {
    String path = split.path().toString();

    // Parse Hive-style partition paths: /table/year=2023/month=12/file.orc
    Pattern pattern = Pattern.compile(fieldName + "=([^/]+)");
    Matcher matcher = pattern.matcher(path);

    if (matcher.find()) {
        String value = matcher.group(1);

        // Convert the string value to the column's logical type
        switch (fieldType.getTypeRoot()) {
            case INTEGER:
                return Integer.parseInt(value);
            case BIGINT:
                return Long.parseLong(value);
            case VARCHAR:
                return value;
            case DATE:
                return Date.valueOf(value); // java.sql.Date, expects yyyy-[m]m-[d]d
            default:
                return value;
        }
    }
    return null;
};
```
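
The same parsing approach can build the `partitionSpec` map that `genPartColumnarRowReader` expects. A hypothetical helper:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: collect every key=value path segment into a map,
// e.g. "/data/country=US/year=2023/f.orc" -> {country=US, year=2023}
static Map<String, Object> parsePartitionSpec(String path) {
    Map<String, Object> spec = new LinkedHashMap<>();
    Matcher m = Pattern.compile("([^/=]+)=([^/]+)").matcher(path);
    while (m.find()) {
        spec.put(m.group(1), m.group(2));
    }
    return spec;
}
```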

## Performance Optimization

Key strategies for optimal columnar reading performance:

1. **Column Projection**: Only read required columns using `selectedFields`
2. **Predicate Pushdown**: Use `conjunctPredicates` to skip non-matching stripes and row groups at the ORC level
3. **Batch Size Tuning**: Adjust `batchSize` to balance memory use against throughput
4. **Partition Pruning**: Let Flink's partition pruning eliminate unnecessary splits
5. **Compression**: Choose an ORC compression codec at write time so reads do less I/O

```java
import org.apache.hadoop.conf.Configuration;

// Reader-side tuning via the Hadoop configuration passed to the factory methods
Configuration optimizedConfig = new Configuration();
optimizedConfig.setBoolean("orc.use.zerocopy", true); // zero-copy HDFS reads where supported

// The vectorized batch size is controlled by the batchSize argument of
// createPartitionedFormat / genPartColumnarRowReader, not by a Hadoop key;
// larger batches (e.g. 2048) trade memory for throughput.

// Compression ("orc.compress", e.g. ZSTD) is a writer-side choice: smaller
// files written with a fast codec mean less read I/O later.
```

## Error Handling

Handle common reading errors:

```java
// Declare the reader outside the try block so it is visible in finally
OrcColumnarRowSplitReader<VectorizedRowBatch> reader = null;
try {
    reader = OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */);

    while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord(null);
        // Process row
    }
} catch (IOException e) {
    // Handle file system errors, corrupt files, or read failures
    logger.error("Failed to read ORC file: " + path, e);
} catch (IllegalArgumentException e) {
    // Handle schema mismatches or invalid column selections
    logger.error("Invalid schema or column selection", e);
} finally {
    if (reader != null) {
        try {
            reader.close(); // close() itself may throw IOException
        } catch (IOException e) {
            logger.warn("Failed to close ORC reader", e);
        }
    }
}
```
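
Because the split reader is `Closeable`, the same handling can be written more compactly with try-with-resources (a sketch, with the reader parameters elided as above):

```java
try (OrcColumnarRowSplitReader<VectorizedRowBatch> reader =
        OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(/* parameters */)) {
    while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord(null);
        // Process row
    }
} catch (IOException e) {
    logger.error("Failed to read ORC file", e);
}
```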