# Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling integration between Flink's and Parquet's type systems.

## Capabilities

### ParquetSchemaConverter

Utility class for converting between Flink's type system and Parquet's schema representation, covering primitive, decimal, temporal, and nested types.

```java { .api }
/**
 * Converts between Flink and Parquet schemas
 */
public class ParquetSchemaConverter {

    // Schema naming constants
    static final String MAP_REPEATED_NAME = "key_value";
    static final String LIST_ELEMENT_NAME = "element";

    /**
     * Converts a Flink RowType to a Parquet MessageType schema
     * @param name Schema name for the MessageType
     * @param rowType Flink RowType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return MessageType representing the Parquet schema
     */
    public static MessageType convertToParquetMessageType(
            String name,
            RowType rowType,
            Configuration conf
    );

    /**
     * Converts an individual Flink LogicalType to a Parquet Type
     * @param name Field name for the Type
     * @param logicalType Flink LogicalType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return Parquet Type representation
     */
    public static Type convertToParquetType(
            String name,
            LogicalType logicalType,
            Configuration conf
    );

    /**
     * Computes the minimum number of bytes required to store a decimal of the given precision
     * @param precision Decimal precision (number of digits)
     * @return Minimum number of bytes needed
     */
    public static int computeMinBytesForDecimalPrecision(int precision);

    /**
     * Checks if a decimal of the given precision fits in 32 bits (4 bytes)
     * @param precision Decimal precision to check
     * @return true if the precision fits in 32 bits
     */
    public static boolean is32BitDecimal(int precision);

    /**
     * Checks if a decimal of the given precision fits in 64 bits (8 bytes)
     * @param precision Decimal precision to check
     * @return true if the precision fits in 64 bits
     */
    public static boolean is64BitDecimal(int precision);
}
```
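
To make the decimal sizing concrete, here is a self-contained sketch of the standard arithmetic behind these checks (a hedged illustration, not Flink's actual source): a decimal of precision `p` needs the smallest `n` bytes such that an `n`-byte signed integer can hold `10^p - 1`, and it fits in INT32 or INT64 when the precision is at most 9 or 18 digits respectively.

```java
public class DecimalBytesSketch {

    // Smallest n such that an n-byte signed integer can hold any value
    // with the given number of decimal digits: 2^(8n-1) >= 10^precision
    static int computeMinBytesForDecimalPrecision(int precision) {
        int numBytes = 1;
        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
            numBytes++;
        }
        return numBytes;
    }

    // INT32 holds up to 2147483647, i.e. any 9-digit decimal
    static boolean is32BitDecimal(int precision) {
        return precision <= 9;
    }

    // INT64 holds up to 9223372036854775807, i.e. any 18-digit decimal
    static boolean is64BitDecimal(int precision) {
        return precision <= 18;
    }

    public static void main(String[] args) {
        System.out.println(computeMinBytesForDecimalPrecision(5));  // 3
        System.out.println(computeMinBytesForDecimalPrecision(10)); // 5
        System.out.println(is32BitDecimal(9) + " " + is32BitDecimal(10)); // true false
    }
}
```

This is why `DECIMAL(5, 2)` can be stored in a 3-byte `FIXED_LEN_BYTE_ARRAY` while `DECIMAL(20, 2)` needs 9 bytes and can no longer use an INT64 encoding.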
### SerializableConfiguration

Serializable wrapper for Hadoop Configuration objects, enabling configuration to be passed through Flink's serialization system.

```java { .api }
/**
 * Serializable wrapper for Hadoop Configuration
 */
public class SerializableConfiguration implements Serializable {

    /**
     * Creates a new SerializableConfiguration wrapping the provided Configuration
     * @param configuration Hadoop Configuration to wrap
     */
    public SerializableConfiguration(Configuration configuration);

    /**
     * Returns the wrapped Hadoop Configuration
     * @return Hadoop Configuration instance
     */
    public Configuration conf();
}
```
### ParquetFormatStatisticsReportUtil

Utility class for extracting and reporting statistics from Parquet file metadata for query optimization.

```java { .api }
/**
 * Utilities for extracting and reporting Parquet file statistics
 */
public class ParquetFormatStatisticsReportUtil {

    /**
     * Extracts table statistics from Parquet file metadata
     * @param files List of Parquet files to analyze
     * @param producedDataType Expected output data type
     * @param conf Hadoop configuration
     * @param utcTimestamp Whether timestamps use the UTC time zone
     * @return TableStats containing row counts and column statistics
     * @throws IOException if statistics collection fails
     */
    public static TableStats getTableStatistics(
            List<Path> files,
            DataType producedDataType,
            Configuration conf,
            boolean utcTimestamp
    ) throws IOException;

    // ... other static methods for detailed statistics processing
}
```
### ParquetInputFile

InputFile implementation that bridges Flink's file system abstraction with Parquet's input requirements.

```java { .api }
/**
 * InputFile implementation for Parquet using Flink's file system abstraction
 */
public class ParquetInputFile implements InputFile {

    /**
     * Creates a new ParquetInputFile
     * @param inputStream FSDataInputStream to read from
     * @param length Total length of the file
     */
    public ParquetInputFile(FSDataInputStream inputStream, long length);

    /**
     * Returns the length of the file
     * @return File length in bytes
     */
    public long getLength();

    /**
     * Creates a new SeekableInputStream for reading
     * @return SeekableInputStream for reading file data
     * @throws IOException if stream creation fails
     */
    public SeekableInputStream newStream() throws IOException;
}
```
### NestedPositionUtil

Utility class for handling nested data positions in vectorized reading operations.

```java { .api }
/**
 * Utilities for handling nested data positions in vectorized reading
 */
public class NestedPositionUtil {

    /**
     * Calculates positions for nested array elements
     * @param definitionLevels Definition levels for null handling
     * @param repetitionLevels Repetition levels for nested structures
     * @param maxDefinitionLevel Maximum definition level
     * @param maxRepetitionLevel Maximum repetition level
     * @return Position information for nested elements
     */
    public static PositionInfo calculateNestedPositions(
            int[] definitionLevels,
            int[] repetitionLevels,
            int maxDefinitionLevel,
            int maxRepetitionLevel
    );

    // ... other static methods for position handling
}
```
## Position Tracking Classes

### RowPosition

```java { .api }
/**
 * Position tracking for row data in vectorized reading
 */
public class RowPosition {

    /**
     * Creates a new RowPosition
     * @param currentPosition Current position in the row
     */
    public RowPosition(int currentPosition);

    /**
     * Updates the current position
     * @param newPosition New position value
     */
    public void updatePosition(int newPosition);

    /**
     * Gets the current position
     * @return Current position value
     */
    public int getCurrentPosition();
}
```

### CollectionPosition

```java { .api }
/**
 * Position tracking for collection data (arrays, maps) in vectorized reading
 */
public class CollectionPosition {

    /**
     * Creates a new CollectionPosition
     * @param startPosition Start position of the collection
     * @param length Length of the collection
     */
    public CollectionPosition(int startPosition, int length);

    /**
     * Gets the start position of the collection
     * @return Start position
     */
    public int getStartPosition();

    /**
     * Gets the length of the collection
     * @return Collection length
     */
    public int getLength();

    /**
     * Checks if the collection is empty
     * @return true if the collection is empty
     */
    public boolean isEmpty();
}
```
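
To illustrate how a reader consumes such (start, length) pairs, here is a hedged, stdlib-only sketch (the flat-buffer layout and `slice` helper are illustrative, not the actual class): vectorized readers store all collection elements in one flat buffer, and each position pair slices one collection out of it.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectionSliceSketch {

    // Rebuild nested lists from a flat element buffer and (start, length)
    // pairs -- the same information a CollectionPosition carries
    static List<List<Integer>> slice(int[] elements, int[][] positions) {
        List<List<Integer>> result = new ArrayList<>();
        for (int[] pos : positions) {
            List<Integer> collection = new ArrayList<>();
            for (int i = pos[0]; i < pos[0] + pos[1]; i++) {
                collection.add(elements[i]);
            }
            result.add(collection);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] elements = {10, 20, 30, 40, 50};
        int[][] positions = {{0, 2}, {2, 3}}; // (start, length) per collection
        System.out.println(slice(elements, positions)); // [[10, 20], [30, 40, 50]]
    }
}
```
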
## Type Field Definitions

### ParquetField

```java { .api }
/**
 * Base class for Parquet field representations
 */
public abstract class ParquetField {

    /**
     * Gets the field name
     * @return Field name
     */
    public abstract String getName();

    /**
     * Gets the field type
     * @return Parquet Type for this field
     */
    public abstract Type getType();

    /**
     * Checks if this field is repeated (array)
     * @return true if the field is repeated
     */
    public abstract boolean isRepeated();
}

/**
 * Primitive field implementation
 */
public class ParquetPrimitiveField extends ParquetField {

    /**
     * Creates a new ParquetPrimitiveField
     * @param name Field name
     * @param primitiveType Parquet primitive type
     * @param repetition Repetition type (required, optional, repeated)
     */
    public ParquetPrimitiveField(String name, PrimitiveType primitiveType, Type.Repetition repetition);

    /**
     * Gets the primitive type
     * @return PrimitiveType for this field
     */
    public PrimitiveType getPrimitiveType();
}

/**
 * Group field implementation for nested structures
 */
public class ParquetGroupField extends ParquetField {

    /**
     * Creates a new ParquetGroupField
     * @param name Field name
     * @param groupType Parquet group type
     * @param children Child fields in the group
     */
    public ParquetGroupField(String name, GroupType groupType, List<ParquetField> children);

    /**
     * Gets the child fields
     * @return List of child ParquetField instances
     */
    public List<ParquetField> getChildren();

    /**
     * Gets a child field by name
     * @param name Child field name
     * @return ParquetField instance, or null if not found
     */
    public ParquetField getChild(String name);
}
```
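
The group/primitive split forms a tree, and the main thing consumers do with it is resolve field names level by level. Here is a hedged, stdlib-only sketch of that lookup using a minimal stand-in `Field` class (not the real hierarchy), mirroring the null-on-miss contract of `getChild`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldTreeSketch {

    // Minimal stand-in for the ParquetField hierarchy: a named node whose
    // children map mirrors ParquetGroupField.getChild(name) lookup
    static class Field {
        final String name;
        final Map<String, Field> children = new LinkedHashMap<>();

        Field(String name) { this.name = name; }

        Field addChild(Field child) { children.put(child.name, child); return this; }

        // Resolve a dotted path like "address.city" against the tree
        Field resolve(String path) {
            Field current = this;
            for (String part : path.split("\\.")) {
                current = current.children.get(part);
                if (current == null) return null; // mirrors the null-on-miss contract
            }
            return current;
        }
    }

    public static void main(String[] args) {
        Field root = new Field("record")
                .addChild(new Field("id"))
                .addChild(new Field("address")
                        .addChild(new Field("city"))
                        .addChild(new Field("zip")));

        System.out.println(root.resolve("address.city").name); // city
        System.out.println(root.resolve("address.country"));   // null
    }
}
```
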
## Usage Examples

### Schema Conversion

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

// Define the Flink schema
RowType flinkSchema = RowType.of(
        new LogicalType[] {
            DataTypes.BIGINT().getLogicalType(),
            DataTypes.STRING().getLogicalType(),
            DataTypes.DECIMAL(10, 2).getLogicalType(),
            DataTypes.TIMESTAMP(3).getLogicalType(),
            new ArrayType(DataTypes.STRING().getLogicalType())
        },
        new String[] {"id", "name", "price", "created_at", "tags"});

// Convert to a Parquet schema
Configuration conf = new Configuration();
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
        "MyRecord", flinkSchema, conf);

// Prints the Parquet message type with the mapped column types
System.out.println(parquetSchema);
```
### Configuration Management

```java
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.hadoop.conf.Configuration;

// Create a Hadoop configuration with Parquet settings
Configuration hadoopConf = new Configuration();
hadoopConf.set("parquet.compression", "SNAPPY");
hadoopConf.set("parquet.page.size", "1048576");
hadoopConf.set("parquet.block.size", "134217728");
hadoopConf.setBoolean("parquet.enable.dictionary", true);

// Wrap for serialization in Flink jobs
SerializableConfiguration serializableConf = new SerializableConfiguration(hadoopConf);

// Use in distributed operations
DataStream<ProcessedData> processedStream = dataStream
        .map(new RichMapFunction<MyData, ProcessedData>() {

            private transient Configuration conf;

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) {
                // Unwrap the Hadoop configuration on the task manager
                this.conf = serializableConf.conf();
            }

            @Override
            public ProcessedData map(MyData value) throws Exception {
                // Use the configuration during processing
                String compression = conf.get("parquet.compression");
                return processWithCompression(value, compression);
            }
        });
```
### Statistics Extraction

```java
import org.apache.flink.formats.parquet.utils.ParquetFormatStatisticsReportUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.Arrays;
import java.util.List;

// Extract statistics from Parquet files
List<Path> parquetFiles = Arrays.asList(
        new Path("/data/part-00000.parquet"),
        new Path("/data/part-00001.parquet"),
        new Path("/data/part-00002.parquet"));

DataType outputType = DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("amount", DataTypes.DECIMAL(10, 2)));

Configuration hadoopConf = new Configuration();
TableStats stats = ParquetFormatStatisticsReportUtil.getTableStatistics(
        parquetFiles, outputType, hadoopConf, true /* UTC timestamps */);

System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());
```
### Custom Input File

```java
import org.apache.flink.formats.parquet.ParquetInputFile;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Bridge a Flink file system stream to the Parquet library
Path filePath = new Path("hdfs://cluster/data/file.parquet");
FileSystem fileSystem = filePath.getFileSystem();
long fileLength = fileSystem.getFileStatus(filePath).getLen();

try (FSDataInputStream inputStream = fileSystem.open(filePath)) {
    // Create a ParquetInputFile for use with Parquet readers
    ParquetInputFile inputFile = new ParquetInputFile(inputStream, fileLength);

    try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
        ParquetMetadata metadata = reader.getFooter();
        System.out.println("Schema: " + metadata.getFileMetaData().getSchema());
        System.out.println("Row groups: " + metadata.getBlocks().size());
    }
}
```
### Decimal Precision Handling

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;

// Check decimal precision storage requirements
int[] precisions = {5, 10, 15, 20, 25, 30, 35};

for (int precision : precisions) {
    int minBytes = ParquetSchemaConverter.computeMinBytesForDecimalPrecision(precision);
    boolean is32Bit = ParquetSchemaConverter.is32BitDecimal(precision);
    boolean is64Bit = ParquetSchemaConverter.is64BitDecimal(precision);

    System.out.printf("Precision %d: %d bytes, 32-bit: %b, 64-bit: %b%n",
            precision, minBytes, is32Bit, is64Bit);
}

// Output (first four lines):
// Precision 5: 3 bytes, 32-bit: true, 64-bit: true
// Precision 10: 5 bytes, 32-bit: false, 64-bit: true
// Precision 15: 7 bytes, 32-bit: false, 64-bit: true
// Precision 20: 9 bytes, 32-bit: false, 64-bit: false
// ...
```
### Nested Position Calculation

```java
import org.apache.flink.formats.parquet.utils.NestedPositionUtil;

// Handle nested array positions
int[] definitionLevels = {3, 3, 2, 3, 3, 3, 1, 3, 3};
int[] repetitionLevels = {0, 1, 0, 0, 1, 1, 0, 0, 1};

PositionInfo positions = NestedPositionUtil.calculateNestedPositions(
        definitionLevels,
        repetitionLevels,
        3,  // max definition level
        1); // max repetition level

// Use the positions for vectorized nested data processing
processNestedData(positions);
```
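
The repetition levels above can be decoded by hand: every 0 starts a new top-level collection, and non-zero levels continue the current one. Here is a hedged, stdlib-only sketch of just that step (the real utility additionally consults definition levels to distinguish nulls and empty arrays, which this sketch ignores):

```java
import java.util.ArrayList;
import java.util.List;

public class RepetitionLevelSketch {

    // Count values per top-level collection: repetition level 0 starts a
    // new collection, higher levels append to the current one
    static List<Integer> collectionLengths(int[] repetitionLevels) {
        List<Integer> lengths = new ArrayList<>();
        int count = 0;
        for (int level : repetitionLevels) {
            if (level == 0) {
                if (count > 0) lengths.add(count);
                count = 1;
            } else {
                count++;
            }
        }
        if (count > 0) lengths.add(count);
        return lengths;
    }

    public static void main(String[] args) {
        int[] repetitionLevels = {0, 1, 0, 0, 1, 1, 0, 0, 1};
        System.out.println(collectionLengths(repetitionLevels)); // [2, 1, 3, 1, 2]
    }
}
```
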
## Integration Examples

### Custom Format with Utilities

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.formats.parquet.utils.*;
import org.apache.flink.table.types.logical.RowType;
import org.apache.parquet.schema.MessageType;

public class CustomParquetFormat implements BulkFormat<CustomRecord, FileSourceSplit> {

    private final SerializableConfiguration hadoopConf;
    private final RowType schema;

    public CustomParquetFormat(org.apache.hadoop.conf.Configuration conf, RowType schema) {
        this.hadoopConf = new SerializableConfiguration(conf);
        this.schema = schema;
    }

    @Override
    public Reader<CustomRecord> createReader(
            org.apache.flink.configuration.Configuration config, FileSourceSplit split) {
        // Convert the Flink schema to a Parquet message type
        MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
                "CustomRecord", schema, hadoopConf.conf());

        // Wrap the split's file in a Parquet InputFile
        // (openInputStream is a helper that opens an FSDataInputStream for the path)
        ParquetInputFile inputFile = new ParquetInputFile(
                openInputStream(split.path()), split.length());

        return new CustomParquetReader(inputFile, parquetSchema);
    }

    @Override
    public Reader<CustomRecord> restoreReader(
            org.apache.flink.configuration.Configuration config, FileSourceSplit split) {
        return createReader(config, split);
    }

    @Override
    public boolean isSplittable() {
        return true;
    }

    @Override
    public TypeInformation<CustomRecord> getProducedType() {
        return TypeInformation.of(CustomRecord.class);
    }
}
```

The utilities package provides the infrastructure for integrating Flink's type system with Parquet's columnar format, handling the details of schema conversion, configuration serialization, and statistics reporting.