Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
—
High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.
Abstract base class for vectorized Parquet file reading with pluggable batch creation strategies.
/**
* Base class for vectorized Parquet file reading
* @param <T> Type of records produced
* @param <SplitT> Type of file split
*/
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {
/**
* Creates a reader for the given configuration and split
* @param config Hadoop configuration for Parquet settings
* @param split File split to read
* @return RecordReaderIterator for reading records
* @throws IOException if reader creation fails
*/
public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
/**
* Restores a reader from checkpoint state
* @param config Hadoop configuration
* @param split File split to read
* @return RecordReaderIterator for reading records
* @throws IOException if reader restoration fails
*/
public RecordReaderIterator<T> restoreReader(Configuration config, SplitT split) throws IOException;
/**
* Indicates whether this format supports file splitting
* @return true - vectorized reading supports splitting
*/
public boolean isSplittable();
/**
* Creates reader batch implementation for vectorized processing (abstract method)
* @param writableVectors Array of writable column vectors
* @param columnarBatch Vectorized column batch for processing
* @param recycler Pool recycler for batch reuse
* @return ParquetReaderBatch implementation for the specific type
*/
protected abstract ParquetReaderBatch<T> createReaderBatch(
WritableColumnVector[] writableVectors,
VectorizedColumnBatch columnarBatch,
Pool.Recycler<ParquetReaderBatch<T>> recycler
);
}
Functional interface for creating vectorized column batches from file splits and column vectors.
/**
* Factory for creating vectorized column batches
* @param <SplitT> Type of file split
*/
@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
/**
* Creates a VectorizedColumnBatch from split and column vectors
* @param split File split containing metadata
* @param vectors Array of column vectors containing data
* @return VectorizedColumnBatch for processing
*/
VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);
/**
* Creates a default factory that doesn't add extra fields
* @return ColumnBatchFactory without partition field injection
*/
static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
Specialized split reader for columnar RowData reading with vectorized processing and partition support.
/**
* Split reader for columnar RowData reading with vectorization
*/
public class ParquetColumnarRowSplitReader implements RecordReader<RowData> {
/**
* Creates a new ParquetColumnarRowSplitReader
* @param utcTimestamp Whether to use UTC timezone for timestamps
* @param caseSensitive Whether field names are case sensitive
* @param conf Hadoop configuration
* @param rowType Row type describing the logical types and names of the output fields
* @param fieldNames Array of field names for output schema
* @param selectedFields Array of selected field names (null for all)
* @param batchSize Batch size for vectorized reading
* @param rowDataWrapper Function to transform raw RowData
* @param splitStart Start offset within the split
* @param splitLength Length of data to read from split
* @param fileLength Total length of the file
* @param footer Parquet file metadata
* @param blocks Array of column chunks to read
*/
public ParquetColumnarRowSplitReader(
boolean utcTimestamp,
boolean caseSensitive,
Configuration conf,
RowType rowType,
String[] fieldNames,
String[] selectedFields,
int batchSize,
Function<RowData, RowData> rowDataWrapper,
long splitStart,
long splitLength,
long fileLength,
ParquetMetadata footer,
ColumnChunk[] blocks
);
/**
* Reads next batch of records
* @return RecordIterator for the batch, null if no more data
* @throws IOException if reading fails
*/
public RecordIterator<RowData> readBatch() throws IOException;
/**
* Closes the reader and releases resources
* @throws IOException if close fails
*/
public void close() throws IOException;
}
Utility class providing helper methods for vectorized Parquet reading operations.
/**
* Utilities for vectorized Parquet file reading
*/
public class ParquetSplitReaderUtil {
/**
* Creates column readers for the specified schema and configuration
* @param utcTimestamp Whether to use UTC timezone
* @param caseSensitive Whether names are case sensitive
* @param conf Hadoop configuration
* @param fieldTypes Array of field logical types
* @param fieldNames Array of field names
* @param footer Parquet metadata
* @param blocks Column chunks to read
* @param batchSize Batch size for reading
* @return Array of ColumnReader instances
*/
public static ColumnReader[] createColumnReaders(
boolean utcTimestamp,
boolean caseSensitive,
Configuration conf,
LogicalType[] fieldTypes,
String[] fieldNames,
ParquetMetadata footer,
ColumnChunk[] blocks,
int batchSize
);
/**
* Additional utility methods for split reading operations
*/
// ... other static utility methods
}
Specialized column vectors for different data types in vectorized processing.
/**
* Specialized decimal vector for high-precision numeric data
*/
public class ParquetDecimalVector extends ColumnVector {
/**
* Creates a new ParquetDecimalVector
* @param capacity Maximum number of values to store
*/
public ParquetDecimalVector(int capacity);
/**
* Sets decimal value at specified position
* @param index Position to set
* @param value Decimal value to set
*/
public void setDecimal(int index, DecimalData value);
/**
* Gets decimal value at specified position
* @param index Position to get
* @return DecimalData value at position
*/
public DecimalData getDecimal(int index);
}
/**
* Dictionary support for vectorized reading
*/
public class ParquetDictionary {
/**
* Creates dictionary from Parquet dictionary page
* @param dictionaryPage Dictionary page from Parquet file
* @param descriptor Column descriptor for type information
* @return ParquetDictionary for decoding values
*/
public static ParquetDictionary create(
DictionaryPage dictionaryPage,
ColumnDescriptor descriptor
);
/**
* Decodes dictionary ID to actual value
* @param id Dictionary ID to decode
* @return Decoded value
*/
public Object decode(int id);
}
/**
* Base interface for vectorized column readers
*/
public interface ColumnReader {
/**
* Reads a batch of values into the provided column vector
* @param num Number of values to read
* @param vector Column vector to populate
*/
void readBatch(int num, WritableColumnVector vector);
/**
* Returns the current repetition level
* @return Repetition level for nested data
*/
int getCurrentRepetitionLevel();
/**
* Returns the current definition level
* @return Definition level for null handling
*/
int getCurrentDefinitionLevel();
}
/**
* Column reader implementations for different primitive types
*/
// Boolean column reader
public class BooleanColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
// Integer type readers
public class ByteColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class ShortColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class IntColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class LongColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
// Floating point readers
public class FloatColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class DoubleColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
// String and binary readers
public class BytesColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class FixedLenBytesColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
// Temporal type readers
public class TimestampColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
// Nested type readers
public class NestedColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
public class NestedPrimitiveColumnReader extends AbstractColumnReader {
public void readBatch(int num, WritableColumnVector vector);
}
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.connector.file.src.FileSource;
// Create vectorized input format with optimal batch size
ParquetColumnarRowInputFormat<FileSourceSplit> vectorizedFormat =
new ParquetColumnarRowInputFormat<>(
new Configuration(),
rowType,
TypeInformation.of(RowData.class),
null, // Read all fields
null, // No field ID mapping
4096, // Large batch size for performance
true, // UTC timestamps
true // Case sensitive
);
// Use with FileSource for high-throughput reading
FileSource<RowData> source = FileSource
.forBulkFormat(vectorizedFormat, new Path("/large-dataset"))
.build();
DataStream<RowData> highThroughputStream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"vectorized-parquet-source"
);
import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;
// Custom factory that adds partition information
ColumnBatchFactory<FileSourceSplit> customFactory = (split, vectors) -> {
// Extract partition information from split
String[] partitionValues = extractPartitionValues(split);
// Create batch with partition fields
VectorizedColumnBatch batch = new VectorizedColumnBatch(vectors.length + partitionValues.length);
// Add data columns
for (int i = 0; i < vectors.length; i++) {
batch.cols[i] = vectors[i];
}
// Add partition columns
for (int i = 0; i < partitionValues.length; i++) {
batch.cols[vectors.length + i] = createPartitionVector(partitionValues[i], batch.size);
}
return batch;
};
// Optimize batch size based on available memory and data characteristics
int optimalBatchSize = calculateBatchSize(
Runtime.getRuntime().maxMemory(), // Available memory
numberOfColumns, // Schema width
averageRowSize, // Data density
parquetBlockSize // File block size
);
// Configure for high-throughput scenarios
Configuration perfConfig = new Configuration();
perfConfig.setInt("parquet.read.batch.size", optimalBatchSize);
perfConfig.setBoolean("parquet.read.vectorized.enable", true);
perfConfig.setInt("parquet.read.allocation.size", 8 * 1024 * 1024); // 8MB
ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
new ParquetColumnarRowInputFormat<>(
perfConfig,
rowType,
typeInfo,
selectedFields,
null,
optimalBatchSize,
utcTimestamp,
caseSensitive
);
// Schema with nested structures
RowType nestedSchema = RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(), // id
RowType.of( // address (nested)
new LogicalType[] {
DataTypes.STRING().getLogicalType(), // street
DataTypes.STRING().getLogicalType(), // city
DataTypes.STRING().getLogicalType() // country
},
new String[] {"street", "city", "country"}
),
new ArrayType(DataTypes.STRING().getLogicalType()) // phone_numbers (array)
},
new String[] {"id", "address", "phone_numbers"}
);
// Vectorized reading handles nested structures efficiently
ParquetColumnarRowInputFormat<FileSourceSplit> nestedFormat =
new ParquetColumnarRowInputFormat<>(
conf, nestedSchema, typeInfo, null, null,
2048, // Smaller batches for complex data
utcTimestamp, caseSensitive
);
// Configure for memory-constrained environments
Configuration memoryConfig = new Configuration();
memoryConfig.setInt("parquet.read.batch.size", 1024); // Smaller batches
memoryConfig.setDouble("parquet.memory.pool.ratio", 0.7); // Conservative memory usage
memoryConfig.setBoolean("parquet.strings.signed-min-max", false); // Reduce string overhead
// Use column projection to reduce memory footprint
List<String> essentialFields = Arrays.asList("id", "timestamp", "value");
ParquetColumnarRowInputFormat<FileSourceSplit> memoryEfficientFormat =
new ParquetColumnarRowInputFormat<>(
memoryConfig,
projectedRowType, // Only essential fields
typeInfo,
essentialFields, // Column projection
null,
1024, // Conservative batch size
utcTimestamp,
caseSensitive
);
// Configure for parallel reading across multiple splits
FileSource<RowData> parallelSource = FileSource
.forBulkFormat(vectorizedFormat, inputPath)
.monitorContinuously(Duration.ofMinutes(1)) // Monitor for new files
.setSplitEnumerator( // Custom split strategy
ContinuousFileSplitEnumerator.builder()
.setSplitSize(64 * 1024 * 1024) // 64MB splits
.build()
)
.build();
// Process with appropriate parallelism
DataStream<RowData> parallelStream = env
.fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "parallel-source")
.setParallelism(numberOfCores * 2); // CPU-bound processing
Utility class providing helper methods for creating column readers and vectors in vectorized Parquet reading.
/**
* Utility methods for Parquet vectorized reading components
*/
public class ParquetSplitReaderUtil {
/**
* Builds a list of ParquetField representations from RowType fields
* @param fields List of RowType fields to convert
* @param fieldNames List of field names for mapping
* @param columnIO MessageColumnIO for schema information
* @return List of ParquetField objects for vectorized reading
*/
public static List<ParquetField> buildFieldsList(
List<RowType.RowField> fields,
List<String> fieldNames,
MessageColumnIO columnIO
);
/**
* Creates a column reader for the specified field and configuration
* @param utcTimestamp Whether to use UTC timezone for timestamps
* @param logicalType Flink logical type for the column
* @param physicalType Parquet physical type representation
* @param columnDescriptors List of column descriptors for the field
* @param pageReadStore Page read store for accessing data
* @param field ParquetField definition
* @param depth Nesting depth of the field
* @return ColumnReader instance for reading the field data
*/
public static ColumnReader createColumnReader(
boolean utcTimestamp,
LogicalType logicalType,
Type physicalType,
List<ColumnDescriptor> columnDescriptors,
PageReadStore pageReadStore,
ParquetField field,
int depth
);
/**
* Creates a writable column vector for the specified type and configuration
* @param batchSize Batch size for the vector
* @param logicalType Flink logical type
* @param physicalType Parquet physical type
* @param columnDescriptors Column descriptors for metadata
* @param depth Nesting depth
* @return WritableColumnVector for storing read data
*/
public static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType logicalType,
Type physicalType,
List<ColumnDescriptor> columnDescriptors,
int depth
);
/**
* Creates a constant-value column vector
* @param type Logical type of the constant
* @param value Constant value to fill the vector
* @param batchSize Size of the vector
* @return ColumnVector filled with the constant value
*/
public static ColumnVector createVectorFromConstant(
LogicalType type,
Object value,
int batchSize
);
}
The vectorized reading infrastructure provides significant performance improvements over row-based processing, especially for analytical workloads with wide schemas and large datasets.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parquet