or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writing.md, columnar-reading.md, index.md, orc-integration.md, vector-processing.md

docs/orc-integration.md

0

# ORC Integration

1

2

Low-level ORC integration providing record readers and batch wrappers for direct ORC file access without Hive dependencies. Handles ORC file structure, metadata, and provides the foundation for higher-level reading and writing operations.

3

4

## Capabilities

5

6

### ORC No-Hive Shim

7

8

Shim implementation for ORC operations without Hive dependencies, providing record readers and batch management.

9

10

```java { .api }

11

/**

12

* Shim for ORC reader without Hive dependencies

13

* Implements OrcShim interface for ORC file operations using standalone ORC library

14

*/

15

public class OrcNoHiveShim implements OrcShim<VectorizedRowBatch> {

16

17

/**

18

* Create ORC record reader for specified file and split

19

* @param conf Hadoop configuration for ORC settings

20

* @param schema ORC type description for the file schema

21

* @param selectedFields Array of field indices to read (column projection)

22

* @param conjunctPredicates List of filter predicates for pushdown

23

* @param path Path to the ORC file to read

24

* @param splitStart Byte offset where split starts in file

25

* @param splitLength Number of bytes to read in this split

26

* @return ORC RecordReader configured for the specified parameters

27

* @throws IOException if reader creation fails

28

*/

29

public RecordReader createRecordReader(

30

Configuration conf,

31

TypeDescription schema,

32

int[] selectedFields,

33

List<OrcFilters.Predicate> conjunctPredicates,

34

org.apache.flink.core.fs.Path path,

35

long splitStart,

36

long splitLength

37

) throws IOException;

38

39

/**

40

* Create batch wrapper for ORC vectorized row batches

41

* @param schema ORC type description for creating appropriately sized batch

42

* @param batchSize Maximum number of rows per batch

43

* @return Batch wrapper containing initialized VectorizedRowBatch

44

*/

45

public OrcNoHiveBatchWrapper createBatchWrapper(TypeDescription schema, int batchSize);

46

47

/**

48

* Read next batch of data from ORC record reader

49

* @param reader ORC record reader to read from

50

* @param rowBatch Vectorized row batch to populate with data

51

* @return true if batch was populated with data, false if end of data

52

* @throws IOException if read operation fails

53

*/

54

public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException;

55

}

56

```

57

58

### Batch Wrapper

59

60

Wrapper class for ORC VectorizedRowBatch that provides size information and batch access.

61

62

```java { .api }

63

/**

64

* Wrapper for ORC VectorizedRowBatch providing additional functionality

65

* Implements OrcVectorizedBatchWrapper interface for batch management

66

*/

67

public class OrcNoHiveBatchWrapper implements OrcVectorizedBatchWrapper<VectorizedRowBatch> {

68

69

/**

70

* Create batch wrapper for the given VectorizedRowBatch

71

* @param batch ORC vectorized row batch to wrap

72

*/

73

public OrcNoHiveBatchWrapper(VectorizedRowBatch batch);

74

75

/**

76

* Get the wrapped ORC vectorized row batch

77

* @return VectorizedRowBatch instance

78

*/

79

public VectorizedRowBatch getBatch();

80

81

/**

82

* Get the number of rows currently in the batch

83

* @return Current row count in the batch

84

*/

85

public int size();

86

}

87

```

88

89

**Usage Examples:**

90

91

```java

92

import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;

93

import org.apache.orc.TypeDescription;

94

import org.apache.orc.RecordReader;

95

96

// Create ORC schema

97

TypeDescription schema = TypeDescription.fromString(

98

"struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>"

99

);

100

101

// Configure Hadoop settings

102

Configuration conf = new Configuration();

103

conf.set("orc.compress", "ZLIB");

104

conf.setBoolean("orc.use.zerocopy", true);

105

106

// Create shim instance

107

OrcNoHiveShim shim = new OrcNoHiveShim();

108

109

// Create record reader for entire file

110

org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path("hdfs://cluster/data/users.orc");

111

int[] selectedFields = {0, 1, 2, 3, 4}; // Read all fields

112

List<OrcFilters.Predicate> predicates = Arrays.asList(

113

OrcFilters.greaterThan("age", 18),

114

OrcFilters.isNotNull("email")

115

);

116

117

RecordReader reader = shim.createRecordReader(

118

conf,

119

schema,

120

selectedFields,

121

predicates,

122

filePath,

123

0, // Start at beginning

124

Long.MAX_VALUE // Read entire file

125

);

126

127

// Create batch wrapper

128

OrcNoHiveBatchWrapper batchWrapper = shim.createBatchWrapper(schema, 2048);

129

VectorizedRowBatch batch = batchWrapper.getBatch();

130

131

// Read data in batches

132

while (shim.nextBatch(reader, batch)) {

133

System.out.println("Read batch with " + batch.size + " rows");

134

135

// Process batch data

136

for (int i = 0; i < batch.size; i++) {

137

if (!batch.cols[0].isNull[i]) {

138

long id = ((LongColumnVector) batch.cols[0]).vector[i];

139

// Process row...

140

}

141

}

142

143

// Reset batch for next read

144

batch.reset();

145

}

146

147

reader.close();

148

```

149

150

### Record Reader Operations

151

152

The shim creates ORC RecordReader instances with advanced configuration:

153

154

```java

155

// Create reader with split-specific configuration

156

public RecordReader createAdvancedReader(

157

Configuration conf,

158

TypeDescription schema,

159

Path filePath,

160

long splitStart,

161

long splitLength) throws IOException {

162

163

OrcNoHiveShim shim = new OrcNoHiveShim();

164

165

// Configure column projection (read only columns 0, 2, 4)

166

int[] selectedFields = {0, 2, 4};

167

168

// Configure predicate pushdown

169

List<OrcFilters.Predicate> predicates = Arrays.asList(

170

OrcFilters.between("timestamp_col", startTime, endTime),

171

OrcFilters.in("status", Arrays.asList("ACTIVE", "PENDING"))

172

);

173

174

return shim.createRecordReader(

175

conf, schema, selectedFields, predicates,

176

filePath, splitStart, splitLength

177

);

178

}

179

```

180

181

### Batch Processing with Shim

182

183

```java

184

import org.apache.flink.orc.nohive.vector.OrcNoHiveBatchWrapper;

185

186

// Process ORC file with custom batch size and error handling

187

public long processOrcFile(Path filePath, TypeDescription schema) throws IOException {

188

OrcNoHiveShim shim = new OrcNoHiveShim();

189

long totalRows = 0;

190

191

try {

192

// Create reader

193

RecordReader reader = shim.createRecordReader(

194

new Configuration(), schema, null, null,

195

filePath, 0, Long.MAX_VALUE

196

);

197

198

// Create larger batch for better throughput

199

OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 4096);

200

VectorizedRowBatch batch = wrapper.getBatch();

201

202

// Process all batches

203

while (shim.nextBatch(reader, batch)) {

204

totalRows += batch.size;

205

206

// Log progress every 100K rows

207

if (totalRows % 100000 == 0) {

208

System.out.println("Processed " + totalRows + " rows");

209

}

210

211

// Process batch data here

212

processBatch(batch);

213

214

// Reset for next batch

215

batch.reset();

216

}

217

218

reader.close();

219

220

} catch (IOException e) {

221

System.err.println("Error processing ORC file: " + e.getMessage());

222

throw e;

223

}

224

225

return totalRows;

226

}

227

```

228

229

## ORC File Structure and Metadata

230

231

### Schema Handling

232

233

```java

234

import org.apache.orc.TypeDescription;

235

236

// Parse ORC schema from string

237

TypeDescription schema = TypeDescription.fromString(

238

"struct<" +

239

"user_id:bigint," +

240

"profile:struct<name:string,age:int>," +

241

"tags:array<string>," +

242

"metrics:map<string,double>" +

243

">"

244

);

245

246

// Inspect schema structure

247

System.out.println("Root type: " + schema.getCategory());

248

System.out.println("Field count: " + schema.getChildren().size());

249

250

for (int i = 0; i < schema.getChildren().size(); i++) {

251

TypeDescription field = schema.getChildren().get(i);

252

String fieldName = schema.getFieldNames().get(i);

253

System.out.println("Field " + i + ": " + fieldName + " (" + field.getCategory() + ")");

254

}

255

```

256

257

### File Split Processing

258

259

```java

260

// Process specific byte range of large ORC file

261

public void processFileSplit(Path filePath, long splitStart, long splitLength) throws IOException {

262

TypeDescription schema = getSchemaFromFile(filePath);

263

OrcNoHiveShim shim = new OrcNoHiveShim();

264

265

// Create reader for specific split

266

RecordReader reader = shim.createRecordReader(

267

new Configuration(),

268

schema,

269

null, // Read all columns

270

null, // No predicates

271

filePath,

272

splitStart, // Start byte offset

273

splitLength // Bytes to read

274

);

275

276

OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);

277

VectorizedRowBatch batch = wrapper.getBatch();

278

279

while (shim.nextBatch(reader, batch)) {

280

System.out.println("Split batch: " + batch.size + " rows");

281

// Process split data

282

}

283

284

reader.close();

285

}

286

```

287

288

## Configuration Options

289

290

### ORC Reader Configuration

291

292

```java

293

Configuration conf = new Configuration();

294

295

// Performance settings

296

conf.setBoolean("orc.use.zerocopy", true); // Enable zero-copy reads

297

conf.setInt("orc.row.batch.size", 2048); // Rows per batch

298

conf.setBoolean("orc.skip.corrupt.data", false); // Fail on corrupt data

299

conf.setBoolean("orc.tolerate.missing.schema", false); // Strict schema validation

300

301

// Compression settings

302

conf.set("orc.compress", "ZLIB"); // Compression algorithm

303

conf.setInt("orc.compress.size", 262144); // 256KB compression blocks

304

305

// Memory settings

306

conf.setLong("orc.max.file.length", 1024 * 1024 * 1024L); // 1GB max file size

307

conf.setInt("orc.buffer.size", 262144); // 256KB I/O buffer

308

309

// Create shim with configuration

310

OrcNoHiveShim shim = new OrcNoHiveShim();

311

RecordReader reader = shim.createRecordReader(conf, schema, /* other params */);

312

```

313

314

### Predicate Configuration

315

316

```java

317

import org.apache.flink.orc.OrcFilters;

318

319

// Configure complex predicates for pushdown

320

List<OrcFilters.Predicate> complexPredicates = Arrays.asList(

321

// Date range filter

322

OrcFilters.between("created_date",

323

Date.valueOf("2023-01-01"),

324

Date.valueOf("2023-12-31")),

325

326

// Numeric comparisons

327

OrcFilters.and(

328

OrcFilters.greaterThanEquals("age", 18),

329

OrcFilters.lessThan("age", 65)

330

),

331

332

// String operations

333

OrcFilters.or(

334

OrcFilters.startsWith("email", "admin@"),

335

OrcFilters.in("role", Arrays.asList("admin", "moderator"))

336

),

337

338

// Null handling

339

OrcFilters.isNotNull("last_login"),

340

341

// Complex logical combinations

342

OrcFilters.or(

343

OrcFilters.and(

344

OrcFilters.equals("status", "premium"),

345

OrcFilters.greaterThan("subscription_end", new Date())

346

),

347

OrcFilters.equals("status", "free")

348

)

349

);

350

```

351

352

## Stripe and Split Management

353

354

### Stripe-Level Processing

355

356

```java

357

import org.apache.orc.OrcFile;

358

import org.apache.orc.Reader;

359

import org.apache.orc.StripeInformation;

360

361

// Analyze file stripes for optimal split planning

362

public void analyzeOrcStripes(Path filePath) throws IOException {

363

Configuration conf = new Configuration();

364

365

// Open ORC file reader

366

Reader orcReader = OrcFile.createReader(

367

new org.apache.hadoop.fs.Path(filePath.toUri()),

368

OrcFile.readerOptions(conf)

369

);

370

371

// Examine stripe structure

372

List<StripeInformation> stripes = orcReader.getStripes();

373

System.out.println("File has " + stripes.size() + " stripes");

374

375

for (int i = 0; i < stripes.size(); i++) {

376

StripeInformation stripe = stripes.get(i);

377

System.out.println("Stripe " + i + ":");

378

System.out.println(" Offset: " + stripe.getOffset());

379

System.out.println(" Length: " + stripe.getLength());

380

System.out.println(" Rows: " + stripe.getNumberOfRows());

381

System.out.println(" Data Length: " + stripe.getDataLength());

382

}

383

384

orcReader.close();

385

}

386

```

387

388

## Error Handling and Recovery

389

390

### Robust Reading Pattern

391

392

```java

393

import java.util.concurrent.TimeUnit;

394

395

public class RobustOrcReader {

396

private static final int MAX_RETRIES = 3;

397

private static final long RETRY_DELAY_MS = 1000;

398

399

public long readOrcFileWithRetry(Path filePath, TypeDescription schema) {

400

OrcNoHiveShim shim = new OrcNoHiveShim();

401

long totalRows = 0;

402

int retryCount = 0;

403

404

while (retryCount < MAX_RETRIES) {

405

try {

406

RecordReader reader = shim.createRecordReader(

407

new Configuration(), schema, null, null,

408

filePath, 0, Long.MAX_VALUE

409

);

410

411

OrcNoHiveBatchWrapper wrapper = shim.createBatchWrapper(schema, 1024);

412

VectorizedRowBatch batch = wrapper.getBatch();

413

414

while (shim.nextBatch(reader, batch)) {

415

totalRows += batch.size;

416

batch.reset();

417

}

418

419

reader.close();

420

return totalRows; // Success

421

422

} catch (IOException e) {

423

retryCount++;

424

System.err.println("Read attempt " + retryCount + " failed: " + e.getMessage());

425

426

if (retryCount >= MAX_RETRIES) {

427

throw new RuntimeException("Failed to read ORC file after " + MAX_RETRIES + " attempts", e);

428

}

429

430

// Wait before retry

431

try {

432

TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MS * retryCount);

433

} catch (InterruptedException ie) {

434

Thread.currentThread().interrupt();

435

throw new RuntimeException("Interrupted during retry delay", ie);

436

}

437

}

438

}

439

440

return totalRows;

441

}

442

}

443

```

444

445

### Schema Validation

446

447

```java

448

// Validate schema compatibility before reading

449

public boolean validateSchema(Path filePath, TypeDescription expectedSchema) {

450

try {

451

Configuration conf = new Configuration();

452

Reader orcReader = OrcFile.createReader(

453

new org.apache.hadoop.fs.Path(filePath.toUri()),

454

OrcFile.readerOptions(conf)

455

);

456

457

TypeDescription fileSchema = orcReader.getSchema();

458

459

// Compare schemas

460

if (!isSchemaCompatible(fileSchema, expectedSchema)) {

461

System.err.println("Schema mismatch:");

462

System.err.println("Expected: " + expectedSchema);

463

System.err.println("Found: " + fileSchema);

464

return false;

465

}

466

467

orcReader.close();

468

return true;

469

470

} catch (IOException e) {

471

System.err.println("Failed to read schema from file: " + e.getMessage());

472

return false;

473

}

474

}

475

476

private boolean isSchemaCompatible(TypeDescription fileSchema, TypeDescription expectedSchema) {

477

// Implement schema compatibility logic

478

return fileSchema.toString().equals(expectedSchema.toString());

479

}

480

```

481

482

## Performance Optimization

483

484

### Batch Size Tuning

485

486

```java

487

// Optimize batch size based on data characteristics

488

public int calculateOptimalBatchSize(TypeDescription schema, long availableMemory) {

489

// Estimate bytes per row based on schema

490

long estimatedBytesPerRow = estimateRowSize(schema);

491

492

// Target 10% of available memory for batch

493

long targetBatchMemory = availableMemory / 10;

494

495

// Calculate optimal batch size

496

int optimalBatchSize = (int) (targetBatchMemory / estimatedBytesPerRow);

497

498

// Clamp to reasonable bounds

499

return Math.max(512, Math.min(optimalBatchSize, 8192));

500

}

501

502

private long estimateRowSize(TypeDescription schema) {

503

// Simplified row size estimation

504

long size = 0;

505

for (TypeDescription child : schema.getChildren()) {

506

switch (child.getCategory()) {

507

case BOOLEAN:

508

case BYTE:

509

size += 1;

510

break;

511

case SHORT:

512

size += 2;

513

break;

514

case INT:

515

case FLOAT:

516

size += 4;

517

break;

518

case LONG:

519

case DOUBLE:

520

case DATE:

521

case TIMESTAMP:

522

size += 8;

523

break;

524

case STRING:

525

case VARCHAR:

526

case CHAR:

527

size += 50; // Average string length estimate

528

break;

529

case DECIMAL:

530

size += 16; // Decimal storage estimate

531

break;

532

default:

533

size += 32; // Complex type estimate

534

}

535

}

536

return size;

537

}

538

```