
# Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between the Flink and Parquet type systems.

## Capabilities

### ParquetSchemaConverter

Utility class for converting between Flink's type system and Parquet's schema representation, with support for all data types and configurations.

```java { .api }
/**
 * Converts between Flink and Parquet schemas
 */
public class ParquetSchemaConverter {

    // Schema naming constants
    static final String MAP_REPEATED_NAME = "key_value";
    static final String LIST_ELEMENT_NAME = "element";

    /**
     * Converts a Flink RowType to a Parquet MessageType schema
     * @param name Schema name for the MessageType
     * @param rowType Flink RowType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return MessageType representing the Parquet schema
     */
    public static MessageType convertToParquetMessageType(
            String name,
            RowType rowType,
            Configuration conf
    );

    /**
     * Converts an individual Flink LogicalType to a Parquet Type
     * @param name Field name for the Type
     * @param logicalType Flink LogicalType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return Parquet Type representation
     */
    public static Type convertToParquetType(
            String name,
            LogicalType logicalType,
            Configuration conf
    );

    /**
     * Computes the minimum number of bytes required for a given decimal precision
     * @param precision Decimal precision (number of digits)
     * @return Minimum bytes needed to store the precision
     */
    public static int computeMinBytesForDecimalPrecision(int precision);

    /**
     * Checks if a decimal precision fits in 32 bits (4 bytes)
     * @param precision Decimal precision to check
     * @return true if the precision fits in 32 bits
     */
    public static boolean is32BitDecimal(int precision);

    /**
     * Checks if a decimal precision fits in 64 bits (8 bytes)
     * @param precision Decimal precision to check
     * @return true if the precision fits in 64 bits
     */
    public static boolean is64BitDecimal(int precision);
}
```

### SerializableConfiguration

Serializable wrapper for Hadoop Configuration objects, enabling the configuration to be passed through Flink's serialization system.

```java { .api }
/**
 * Serializable wrapper for Hadoop Configuration
 */
public class SerializableConfiguration implements Serializable {

    /**
     * Creates a new SerializableConfiguration wrapping the provided Configuration
     * @param configuration Hadoop Configuration to wrap
     */
    public SerializableConfiguration(Configuration configuration);

    /**
     * Returns the wrapped Hadoop Configuration
     * @return Hadoop Configuration instance
     */
    public Configuration conf();
}
```

### ParquetFormatStatisticsReportUtil

Utility class for extracting and reporting statistics from Parquet file metadata for query optimization.

```java { .api }
/**
 * Utilities for extracting and reporting Parquet file statistics
 */
public class ParquetFormatStatisticsReportUtil {

    /**
     * Extracts table statistics from Parquet file metadata
     * @param files List of Parquet files to analyze
     * @param producedDataType Expected output data type
     * @param conf Hadoop configuration
     * @param utcTimestamp Whether timestamps use the UTC timezone
     * @return TableStats containing row counts and column statistics
     * @throws IOException if statistics collection fails
     */
    public static TableStats getTableStatistics(
            List<Path> files,
            DataType producedDataType,
            Configuration conf,
            boolean utcTimestamp
    ) throws IOException;

    // ... other static methods for detailed statistics processing
}
```

### ParquetInputFile

InputFile implementation that bridges Flink's file system abstraction with Parquet's input requirements.

```java { .api }
/**
 * InputFile implementation for Parquet using Flink's file system abstraction
 */
public class ParquetInputFile implements InputFile {

    /**
     * Creates a new ParquetInputFile
     * @param inputStream FSDataInputStream to read from
     * @param length Total length of the file
     */
    public ParquetInputFile(FSDataInputStream inputStream, long length);

    /**
     * Returns the length of the file
     * @return File length in bytes
     */
    public long getLength();

    /**
     * Creates a new SeekableInputStream for reading
     * @return SeekableInputStream for reading file data
     * @throws IOException if stream creation fails
     */
    public SeekableInputStream newStream() throws IOException;
}
```

### NestedPositionUtil

Utility class for handling nested data positions in vectorized reading operations.

```java { .api }
/**
 * Utilities for handling nested data positions in vectorized reading
 */
public class NestedPositionUtil {

    /**
     * Calculates positions for nested array elements
     * @param definitionLevels Definition levels for null handling
     * @param repetitionLevels Repetition levels for nested structures
     * @param maxDefinitionLevel Maximum definition level
     * @param maxRepetitionLevel Maximum repetition level
     * @return Position information for the nested elements
     */
    public static PositionInfo calculateNestedPositions(
            int[] definitionLevels,
            int[] repetitionLevels,
            int maxDefinitionLevel,
            int maxRepetitionLevel
    );

    // ... other static methods for position handling
}
```

## Position Tracking Classes

### RowPosition

```java { .api }
/**
 * Position tracking for row data in vectorized reading
 */
public class RowPosition {

    /**
     * Creates a new RowPosition
     * @param currentPosition Current position in the row
     */
    public RowPosition(int currentPosition);

    /**
     * Updates the current position
     * @param newPosition New position value
     */
    public void updatePosition(int newPosition);

    /**
     * Gets the current position
     * @return Current position value
     */
    public int getCurrentPosition();
}
```

### CollectionPosition

```java { .api }
/**
 * Position tracking for collection data (arrays, maps) in vectorized reading
 */
public class CollectionPosition {

    /**
     * Creates a new CollectionPosition
     * @param startPosition Start position of the collection
     * @param length Length of the collection
     */
    public CollectionPosition(int startPosition, int length);

    /**
     * Gets the start position of the collection
     * @return Start position
     */
    public int getStartPosition();

    /**
     * Gets the length of the collection
     * @return Collection length
     */
    public int getLength();

    /**
     * Checks if the collection is empty
     * @return true if the collection is empty
     */
    public boolean isEmpty();
}
```
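
A minimal sketch of how these position trackers could cooperate when assembling an array column during a vectorized read. The batch loop, `arrayLengths`, and `copyElements` are hypothetical stand-ins for surrounding reader state, not part of the library API:

```java
// Hypothetical assembly of an array column from a flattened element vector.
void assembleArrayColumn(int batchSize, int[] arrayLengths) {
    RowPosition rowPosition = new RowPosition(0);
    int elementOffset = 0;

    for (int row = 0; row < batchSize; row++) {
        // Each array value occupies a contiguous slice of the element vector.
        CollectionPosition slice = new CollectionPosition(elementOffset, arrayLengths[row]);

        if (!slice.isEmpty()) {
            // copyElements is a placeholder for the actual element transfer.
            copyElements(slice.getStartPosition(), slice.getLength());
        }

        elementOffset += slice.getLength();
        rowPosition.updatePosition(row + 1); // advance the row cursor
    }
}
```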

## Type Field Definitions

### ParquetField

```java { .api }
/**
 * Base class for Parquet field representations
 */
public abstract class ParquetField {

    /**
     * Gets the field name
     * @return Field name
     */
    public abstract String getName();

    /**
     * Gets the field type
     * @return Parquet Type for this field
     */
    public abstract Type getType();

    /**
     * Checks if this field is repeated (array)
     * @return true if the field is repeated
     */
    public abstract boolean isRepeated();
}

/**
 * Primitive field implementation
 */
public class ParquetPrimitiveField extends ParquetField {

    /**
     * Creates a new ParquetPrimitiveField
     * @param name Field name
     * @param primitiveType Parquet primitive type
     * @param repetition Repetition type (required, optional, repeated)
     */
    public ParquetPrimitiveField(String name, PrimitiveType primitiveType, Type.Repetition repetition);

    /**
     * Gets the primitive type
     * @return PrimitiveType for this field
     */
    public PrimitiveType getPrimitiveType();
}

/**
 * Group field implementation for nested structures
 */
public class ParquetGroupField extends ParquetField {

    /**
     * Creates a new ParquetGroupField
     * @param name Field name
     * @param groupType Parquet group type
     * @param children Child fields in the group
     */
    public ParquetGroupField(String name, GroupType groupType, List<ParquetField> children);

    /**
     * Gets the child fields
     * @return List of child ParquetField instances
     */
    public List<ParquetField> getChildren();

    /**
     * Gets a child field by name
     * @param name Child field name
     * @return ParquetField instance, or null if not found
     */
    public ParquetField getChild(String name);
}
```
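
A brief sketch of how these wrappers compose into a field tree mirroring a Parquet schema, using the constructors declared above; the concrete types and repetitions are illustrative only:

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.Arrays;

// Primitive leaf fields: a required id and an optional name.
ParquetPrimitiveField idField = new ParquetPrimitiveField(
    "id",
    Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("id"),
    Type.Repetition.REQUIRED);

ParquetPrimitiveField nameField = new ParquetPrimitiveField(
    "name",
    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("name"),
    Type.Repetition.OPTIONAL);

// Group the leaves under a "user" struct field.
GroupType userType = Types.optionalGroup()
    .addField(idField.getType())
    .addField(nameField.getType())
    .named("user");

ParquetGroupField userField =
    new ParquetGroupField("user", userType, Arrays.asList(idField, nameField));

// Navigate the tree by child name.
ParquetField idChild = userField.getChild("id");
```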

## Usage Examples

### Schema Conversion

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

// Define the Flink schema
RowType flinkSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.DECIMAL(10, 2).getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType(),
        DataTypes.ARRAY(DataTypes.STRING()).getLogicalType()
    },
    new String[] {"id", "name", "price", "created_at", "tags"}
);

// Convert to a Parquet schema
Configuration conf = new Configuration();
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
    "MyRecord",
    flinkSchema,
    conf
);

// Prints the Parquet schema with the corresponding type mappings
System.out.println(parquetSchema);
```

### Configuration Management

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hadoop.conf.Configuration;

// Create a Hadoop configuration with Parquet settings
Configuration hadoopConf = new Configuration();
hadoopConf.set("parquet.compression", "SNAPPY");
hadoopConf.set("parquet.page.size", "1048576");
hadoopConf.set("parquet.block.size", "134217728");
hadoopConf.setBoolean("parquet.enable.dictionary", true);

// Wrap it for serialization in Flink jobs
SerializableConfiguration serializableConf = new SerializableConfiguration(hadoopConf);

// Use it in distributed operations
DataStream<ProcessedData> processedStream = dataStream
    .map(new RichMapFunction<MyData, ProcessedData>() {
        private transient Configuration conf;

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) {
            // Unwrap the Hadoop configuration in the distributed context
            this.conf = serializableConf.conf();
        }

        @Override
        public ProcessedData map(MyData value) throws Exception {
            // Use the configuration for processing
            String compression = conf.get("parquet.compression");
            return processWithCompression(value, compression);
        }
    });
```

### Statistics Extraction

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.ParquetFormatStatisticsReportUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.List;

// Extract statistics from a set of Parquet files
List<Path> parquetFiles = Arrays.asList(
    new Path("/data/part-00000.parquet"),
    new Path("/data/part-00001.parquet"),
    new Path("/data/part-00002.parquet")
);

DataType outputType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.BIGINT()),
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("amount", DataTypes.DECIMAL(10, 2))
);

Configuration hadoopConf = new Configuration();
TableStats stats = ParquetFormatStatisticsReportUtil.getTableStatistics(
    parquetFiles,
    outputType,
    hadoopConf,
    true // UTC timestamps
);

System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());
```

### Custom Input File

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetInputFile;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Create a custom input file for the Parquet library
Path filePath = new Path("hdfs://cluster/data/file.parquet");
FileSystem fileSystem = filePath.getFileSystem();

try (FSDataInputStream inputStream = fileSystem.open(filePath)) {
    long fileLength = fileSystem.getFileStatus(filePath).getLen();

    // Bridge Flink's stream into Parquet's InputFile abstraction
    ParquetInputFile inputFile = new ParquetInputFile(inputStream, fileLength);

    // Use it with the Parquet readers
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
        ParquetMetadata metadata = reader.getFooter();

        System.out.println("Schema: " + metadata.getFileMetaData().getSchema());
        System.out.println("Row groups: " + metadata.getBlocks().size());
    }
}
```

### Decimal Precision Handling

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;

// Check decimal precision requirements
int[] precisions = {5, 10, 15, 20, 25, 30, 35};

for (int precision : precisions) {
    int minBytes = ParquetSchemaConverter.computeMinBytesForDecimalPrecision(precision);
    boolean is32Bit = ParquetSchemaConverter.is32BitDecimal(precision);
    boolean is64Bit = ParquetSchemaConverter.is64BitDecimal(precision);

    System.out.printf("Precision %d: %d bytes, 32-bit: %b, 64-bit: %b%n",
        precision, minBytes, is32Bit, is64Bit);
}

// Output:
// Precision 5: 3 bytes, 32-bit: true, 64-bit: true
// Precision 10: 5 bytes, 32-bit: false, 64-bit: true
// Precision 15: 7 bytes, 32-bit: false, 64-bit: true
// Precision 20: 9 bytes, 32-bit: false, 64-bit: false
```

### Nested Position Calculation

```java
import org.apache.flink.formats.parquet.utils.NestedPositionUtil;

// Definition and repetition levels for a nested array column
int[] definitionLevels = {3, 3, 2, 3, 3, 3, 1, 3, 3};
int[] repetitionLevels = {0, 1, 0, 0, 1, 1, 0, 0, 1};

PositionInfo positions = NestedPositionUtil.calculateNestedPositions(
    definitionLevels,
    repetitionLevels,
    3, // max definition level
    1  // max repetition level
);

// Use the positions for vectorized nested data processing
processNestedData(positions);
```

## Integration Examples

### Custom Format with Utilities

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.formats.parquet.utils.*;
import org.apache.flink.table.types.logical.RowType;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

public class CustomParquetFormat implements BulkFormat<CustomRecord, FileSourceSplit> {

    private final SerializableConfiguration hadoopConf;
    private final RowType schema;

    public CustomParquetFormat(org.apache.hadoop.conf.Configuration conf, RowType schema) {
        this.hadoopConf = new SerializableConfiguration(conf);
        this.schema = schema;
    }

    @Override
    public Reader<CustomRecord> createReader(Configuration config, FileSourceSplit split) throws IOException {
        // Convert the Flink schema to a Parquet schema
        MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
            "CustomRecord",
            schema,
            hadoopConf.conf()
        );

        // Create the input file (openInputStream is a helper elided here)
        ParquetInputFile inputFile = new ParquetInputFile(
            openInputStream(split.path()),
            split.length()
        );

        return new CustomParquetReader(inputFile, parquetSchema);
    }

    @Override
    public Reader<CustomRecord> restoreReader(Configuration config, FileSourceSplit split) throws IOException {
        return createReader(config, split);
    }

    @Override
    public boolean isSplittable() {
        return true;
    }

    @Override
    public TypeInformation<CustomRecord> getProducedType() {
        return TypeInformation.of(CustomRecord.class);
    }
}
```

The utilities package provides essential infrastructure for seamless integration between Flink's type system and Parquet's columnar format, handling the complexity of schema conversion and configuration management.