
tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

Capabilities

ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet file reading with pluggable batch creation strategies.

/**
 * Base class for vectorized Parquet file reading
 * @param <T> Type of records produced
 * @param <SplitT> Type of file split
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit> implements BulkFormat<T, SplitT> {
    
    /**
     * Creates a reader for the given configuration and split
     * @param config Hadoop configuration for Parquet settings
     * @param split File split to read
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader creation fails
     */
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
    
    /**
     * Restores a reader from checkpoint state
     * @param config Hadoop configuration
     * @param split File split to read  
     * @return RecordReaderIterator for reading records
     * @throws IOException if reader restoration fails
     */
    public RecordReaderIterator<T> restoreReader(Configuration config, SplitT split) throws IOException;
    
    /**
     * Indicates whether this format supports file splitting
     * @return true - vectorized reading supports splitting
     */
    public boolean isSplittable();
    
    /**
     * Creates reader batch implementation for vectorized processing (abstract method)
     * @param writableVectors Array of writable column vectors
     * @param columnarBatch Vectorized column batch for processing
     * @param recycler Pool recycler for batch reuse
     * @return ParquetReaderBatch implementation for the specific type
     */
    protected abstract ParquetReaderBatch<T> createReaderBatch(
        WritableColumnVector[] writableVectors,
        VectorizedColumnBatch columnarBatch,
        Pool.Recycler<ParquetReaderBatch<T>> recycler
    );
}
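
The `recycler` parameter of `createReaderBatch` suggests that batches are pooled and reused rather than allocated per read. The following is a minimal, self-contained sketch of that idea, not Flink's implementation: `ReusableBatch` and `BatchPool` are hypothetical names, and a plain `long[]` stands in for the column vectors.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy stand-in for a reusable columnar batch (hypothetical, not ParquetReaderBatch). */
final class ReusableBatch {
    final long[] values;                               // stands in for the column vectors
    int numRows;                                       // rows currently held by the batch
    private final BlockingQueue<ReusableBatch> home;   // pool this batch returns to

    ReusableBatch(int capacity, BlockingQueue<ReusableBatch> home) {
        this.values = new long[capacity];
        this.home = home;
    }

    /** Resets the batch and returns it to its pool so the reader can fill it again. */
    void recycle() {
        numRows = 0;
        home.add(this);
    }
}

/** Hypothetical fixed-size pool of batches. */
final class BatchPool {
    private final BlockingQueue<ReusableBatch> free;

    BatchPool(int poolSize, int batchCapacity) {
        free = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            free.add(new ReusableBatch(batchCapacity, free));
        }
    }

    /** Returns a free batch, or null if every batch is still in use downstream. */
    ReusableBatch tryAcquire() {
        return free.poll();
    }
}
```

With a pool of size one, the reader cannot fill the next batch until the consumer has called `recycle()` on the previous one, which bounds memory use to a single batch per reader.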

ColumnBatchFactory

Functional interface for creating vectorized column batches from file splits and column vectors.

/**
 * Factory for creating vectorized column batches
 * @param <SplitT> Type of file split
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
    
    /**
     * Creates a VectorizedColumnBatch from split and column vectors
     * @param split File split containing metadata
     * @param vectors Array of column vectors containing data
     * @return VectorizedColumnBatch for processing
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);
    
    /**
     * Creates a default factory that doesn't add extra fields
     * @return ColumnBatchFactory without partition field injection
     */
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
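
To illustrate how a factory of this shape can either pass the data columns through or append a per-split constant column (such as a partition value), here is a small sketch using toy types. `ToyColumn`, `ToyBatchFactory`, and `appendingConstant` are assumptions made for the example; the split is modeled as a plain `String`.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Toy column: returns the value at a row index (stand-in for a column vector). */
interface ToyColumn {
    Object get(int row);
}

/** Toy analogue of a batch factory: maps (split, data columns) to the final column set. */
@FunctionalInterface
interface ToyBatchFactory<S> {
    ToyColumn[] create(S split, ToyColumn[] dataColumns);

    /** Passes the data columns through unchanged. */
    static <S> ToyBatchFactory<S> withoutExtraFields() {
        return (split, dataColumns) -> dataColumns;
    }

    /** Appends one column whose value is constant for the whole split. */
    static <S> ToyBatchFactory<S> appendingConstant(Function<S, Object> valueOfSplit) {
        return (split, dataColumns) -> {
            Object constant = valueOfSplit.apply(split);
            ToyColumn[] all = Arrays.copyOf(dataColumns, dataColumns.length + 1);
            all[dataColumns.length] = row -> constant;   // same value for every row
            return all;
        };
    }
}
```

The constant column stores a single value regardless of batch size, which is why partition columns add almost no memory cost in this model.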

ParquetColumnarRowSplitReader

Specialized split reader for columnar RowData reading with vectorized processing and partition support.

/**
 * Split reader for columnar RowData reading with vectorization
 */
public class ParquetColumnarRowSplitReader implements RecordReader<RowData> {
    
    /**
     * Creates a new ParquetColumnarRowSplitReader
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param caseSensitive Whether field names are case sensitive
     * @param conf Hadoop configuration
     * @param rowType Row type describing the output fields
     * @param fieldNames Array of field names for output schema
     * @param selectedFields Array of selected field names (null for all)
     * @param batchSize Batch size for vectorized reading
     * @param rowDataWrapper Function to transform raw RowData
     * @param splitStart Start offset within the split
     * @param splitLength Length of data to read from split
     * @param fileLength Total length of the file
     * @param footer Parquet file metadata
     * @param blocks Array of column chunks to read
     */
    public ParquetColumnarRowSplitReader(
        boolean utcTimestamp,
        boolean caseSensitive,
        Configuration conf,
        RowType rowType,
        String[] fieldNames,
        String[] selectedFields,
        int batchSize,
        Function<RowData, RowData> rowDataWrapper,
        long splitStart,
        long splitLength,
        long fileLength,
        ParquetMetadata footer,
        ColumnChunk[] blocks
    );
    
    /**
     * Reads next batch of records
     * @return RecordIterator for the batch, null if no more data
     * @throws IOException if reading fails
     */
    public RecordIterator<RowData> readBatch() throws IOException;
    
    /**
     * Closes the reader and releases resources
     * @throws IOException if close fails
     */
    public void close() throws IOException;
}
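
The `readBatch()` contract described above (an iterator per batch, `null` when the data is exhausted) leads to a simple consumption loop. The sketch below shows that loop against a toy reader; `ToyBatchReader` is a hypothetical class over an in-memory `long[]`, not the Flink reader.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Toy reader that hands out fixed-size batches of an in-memory array. */
final class ToyBatchReader implements AutoCloseable {
    private final long[] data;
    private final int batchSize;
    private int position;
    private boolean closed;

    ToyBatchReader(long[] data, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.data = data;
        this.batchSize = batchSize;
    }

    /** Returns an iterator over the next batch, or null once the data is exhausted. */
    Iterator<Long> readBatch() {
        if (closed) {
            throw new IllegalStateException("reader is closed");
        }
        if (position >= data.length) {
            return null;
        }
        final int start = position;
        final int end = Math.min(start + batchSize, data.length);
        position = end;
        return new Iterator<Long>() {
            private int cursor = start;

            @Override
            public boolean hasNext() {
                return cursor < end;
            }

            @Override
            public Long next() {
                if (cursor >= end) {
                    throw new NoSuchElementException();
                }
                return data[cursor++];
            }
        };
    }

    @Override
    public void close() {
        closed = true;
    }

    /** Typical consumption loop: drain batches until readBatch() returns null. */
    static long sumAll(long[] data, int batchSize) {
        long sum = 0;
        try (ToyBatchReader reader = new ToyBatchReader(data, batchSize)) {
            Iterator<Long> batch;
            while ((batch = reader.readBatch()) != null) {
                while (batch.hasNext()) {
                    sum += batch.next();
                }
            }
        }
        return sum;
    }
}
```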

ParquetSplitReaderUtil

Utility class providing helper methods for vectorized Parquet reading operations.

/**
 * Utilities for vectorized Parquet file reading
 */
public class ParquetSplitReaderUtil {
    
    /**
     * Creates column readers for the specified schema and configuration
     * @param utcTimestamp Whether to use UTC timezone
     * @param caseSensitive Whether names are case sensitive
     * @param conf Hadoop configuration
     * @param fieldTypes Array of field logical types
     * @param fieldNames Array of field names
     * @param footer Parquet metadata
     * @param blocks Column chunks to read
     * @param batchSize Batch size for reading
     * @return Array of ColumnReader instances
     */
    public static ColumnReader[] createColumnReaders(
        boolean utcTimestamp,
        boolean caseSensitive,
        Configuration conf,
        LogicalType[] fieldTypes,
        String[] fieldNames,
        ParquetMetadata footer,
        ColumnChunk[] blocks,
        int batchSize
    );
    
    /**
     * Additional utility methods for split reading operations
     */
    // ... other static utility methods
}

Column Vector Types

Specialized column vectors for different data types in vectorized processing.

/**
 * Specialized decimal vector for high-precision numeric data
 */
public class ParquetDecimalVector implements ColumnVector {
    
    /**
     * Creates a new ParquetDecimalVector
     * @param capacity Maximum number of values to store
     */
    public ParquetDecimalVector(int capacity);
    
    /**
     * Sets decimal value at specified position
     * @param index Position to set
     * @param value Decimal value to set
     */
    public void setDecimal(int index, DecimalData value);
    
    /**
     * Gets decimal value at specified position
     * @param index Position to get
     * @return DecimalData value at position
     */
    public DecimalData getDecimal(int index);
}

/**
 * Dictionary support for vectorized reading
 */
public class ParquetDictionary {
    
    /**
     * Creates dictionary from Parquet dictionary page
     * @param dictionaryPage Dictionary page from Parquet file
     * @param descriptor Column descriptor for type information
     * @return ParquetDictionary for decoding values
     */
    public static ParquetDictionary create(
        DictionaryPage dictionaryPage,
        ColumnDescriptor descriptor
    );
    
    /**
     * Decodes dictionary ID to actual value
     * @param id Dictionary ID to decode
     * @return Decoded value
     */
    public Object decode(int id);
}
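
Dictionary encoding stores each distinct value once and represents the column as a sequence of small integer IDs, so `decode(int id)` is essentially an array lookup. The following sketch shows the idea with a hypothetical `ToyDictionary` over strings; it does not parse a real Parquet dictionary page.

```java
/** Toy dictionary: maps integer IDs to the distinct values of a column. */
final class ToyDictionary {
    private final String[] values;

    ToyDictionary(String... values) {
        this.values = values.clone();
    }

    /** Decodes a single dictionary ID. */
    String decode(int id) {
        return values[id];
    }

    /** Decodes a whole batch of IDs into their values. */
    static String[] decodeAll(ToyDictionary dictionary, int[] ids) {
        String[] out = new String[ids.length];
        for (int i = 0; i < ids.length; i++) {
            out[i] = dictionary.decode(ids[i]);
        }
        return out;
    }
}
```

A reader can also keep the IDs in the vector and decode lazily, which avoids materializing values that are never accessed.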

Column Reader Architecture

Base Column Reader Interface

/**
 * Base interface for vectorized column readers
 */
public interface ColumnReader {
    
    /**
     * Reads a batch of values into the provided column vector
     * @param num Number of values to read
     * @param vector Column vector to populate
     */
    void readBatch(int num, WritableColumnVector vector);
    
    /**
     * Returns the current repetition level
     * @return Repetition level for nested data
     */
    int getCurrentRepetitionLevel();
    
    /**
     * Returns the current definition level  
     * @return Definition level for null handling
     */
    int getCurrentDefinitionLevel();
}
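
Definition levels are how Parquet encodes nulls: for a flat optional column, a value is present only where the definition level equals the column's maximum definition level, and only present values are stored. The sketch below expands such a column into a nullable array. `DefinitionLevelDecoder` is a hypothetical helper that handles the flat (non-nested) case only.

```java
/** Expands densely stored non-null values into a nullable column using definition levels. */
final class DefinitionLevelDecoder {

    static Long[] materialize(int[] definitionLevels, long[] nonNullValues, int maxDefinitionLevel) {
        Long[] out = new Long[definitionLevels.length];
        int valueIndex = 0;
        for (int row = 0; row < definitionLevels.length; row++) {
            // A level below the maximum means the value is null at this row.
            if (definitionLevels[row] == maxDefinitionLevel) {
                out[row] = nonNullValues[valueIndex++];
            }
        }
        if (valueIndex != nonNullValues.length) {
            throw new IllegalArgumentException("definition levels do not match the number of values");
        }
        return out;
    }
}
```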

Specialized Column Readers

/**
 * Column reader implementations for different primitive types
 */

// Boolean column reader
public class BooleanColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Integer type readers  
public class ByteColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class ShortColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class IntColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class LongColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Floating point readers
public class FloatColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class DoubleColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// String and binary readers
public class BytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class FixedLenBytesColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Temporal type readers
public class TimestampColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

// Nested type readers
public class NestedColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

public class NestedPrimitiveColumnReader extends AbstractColumnReader {
    public void readBatch(int num, WritableColumnVector vector);
}

Usage Examples

Basic Vectorized Reading

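import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// rowType (the RowType to read) and env (the StreamExecutionEnvironment) are assumed to be defined elsewhere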

// Create vectorized input format with optimal batch size
ParquetColumnarRowInputFormat<FileSourceSplit> vectorizedFormat = 
    new ParquetColumnarRowInputFormat<>(
        new Configuration(),
        rowType,
        TypeInformation.of(RowData.class),
        null,                      // Read all fields
        null,                      // No field ID mapping
        4096,                      // Large batch size for performance
        true,                      // UTC timestamps
        true                       // Case sensitive
    );

// Use with FileSource for high-throughput reading
FileSource<RowData> source = FileSource
    .forBulkFormat(vectorizedFormat, new Path("/large-dataset"))
    .build();

DataStream<RowData> highThroughputStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "vectorized-parquet-source"
);

Custom Column Batch Factory

import org.apache.flink.formats.parquet.vector.ColumnBatchFactory;

// Custom factory that appends partition columns to the data columns.
// extractPartitionValues and createPartitionVector are hypothetical helpers,
// and batchSize is the batch size the input format was configured with.
ColumnBatchFactory<FileSourceSplit> customFactory = (split, vectors) -> {
    // Extract partition information from split
    String[] partitionValues = extractPartitionValues(split);

    // VectorizedColumnBatch wraps a ColumnVector[], so assemble the full array first
    ColumnVector[] allVectors = new ColumnVector[vectors.length + partitionValues.length];

    // Add data columns
    System.arraycopy(vectors, 0, allVectors, 0, vectors.length);

    // Add partition columns
    for (int i = 0; i < partitionValues.length; i++) {
        allVectors[vectors.length + i] = createPartitionVector(partitionValues[i], batchSize);
    }

    return new VectorizedColumnBatch(allVectors);
};

Performance Tuning

// Optimize batch size based on available memory and data characteristics.
// calculateBatchSize is an application-defined helper, not part of Flink.
int optimalBatchSize = calculateBatchSize(
    Runtime.getRuntime().maxMemory(),  // Available memory
    numberOfColumns,                    // Schema width
    averageRowSize,                    // Data density
    parquetBlockSize                   // File block size
);

// Configure for high-throughput scenarios
Configuration perfConfig = new Configuration();
perfConfig.setInt("parquet.read.batch.size", optimalBatchSize);
perfConfig.setBoolean("parquet.read.vectorized.enable", true);
perfConfig.setInt("parquet.read.allocation.size", 8 * 1024 * 1024); // 8MB

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat = 
    new ParquetColumnarRowInputFormat<>(
        perfConfig,
        rowType,
        typeInfo,
        selectedFields,
        null,
        optimalBatchSize,
        utcTimestamp,
        caseSensitive
    );
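
The `calculateBatchSize` helper used above is not defined in this document. One possible heuristic, offered purely as an illustration, is sketched below: the memory fraction (1/64 of the heap), the clamp range (256 to 8192 rows), and the one-byte-per-column null-flag overhead are all arbitrary assumptions, not recommendations from Flink or Parquet.

```java
/** Hypothetical batch-size heuristic; every constant here is an assumption. */
final class BatchSizing {
    private static final int MIN_BATCH = 256;
    private static final int MAX_BATCH = 8192;

    static int calculateBatchSize(
            long maxMemoryBytes,
            int numberOfColumns,
            long averageRowSizeBytes,
            long parquetBlockSizeBytes) {
        // Give one batch at most 1/64 of the heap, and never more than one Parquet block.
        long budget = Math.min(maxMemoryBytes / 64, parquetBlockSizeBytes);
        // Assume one extra byte per column and row for null flags.
        long bytesPerRow = Math.max(1L, averageRowSizeBytes) + Math.max(0, numberOfColumns);
        long rows = budget / bytesPerRow;
        long clamped = Math.max(MIN_BATCH, Math.min(MAX_BATCH, rows));
        // Round down to a power of two.
        return Integer.highestOneBit((int) clamped);
    }
}
```

Whatever formula is used, it is worth validating the result by measuring throughput and GC behavior on representative data, since the best batch size depends on the schema and the workload.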

Nested Data Reading

// Schema with nested structures
RowType nestedSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),           // id
        RowType.of(                                    // address (nested)
            new LogicalType[] {
                DataTypes.STRING().getLogicalType(),   // street
                DataTypes.STRING().getLogicalType(),   // city
                DataTypes.STRING().getLogicalType()    // country
            },
            new String[] {"street", "city", "country"}
        ),
        new ArrayType(DataTypes.STRING().getLogicalType())   // phone_numbers (array)
    },
    new String[] {"id", "address", "phone_numbers"}
);

// Vectorized reading handles nested structures efficiently
ParquetColumnarRowInputFormat<FileSourceSplit> nestedFormat = 
    new ParquetColumnarRowInputFormat<>(
        conf, nestedSchema, typeInfo, null, null,
        2048,  // Smaller batches for complex data
        utcTimestamp, caseSensitive
    );
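
For repeated fields such as `phone_numbers`, Parquet stores the elements of all rows in one flat column and uses repetition levels to mark where each row's list begins. The sketch below reassembles lists from that encoding under simplifying assumptions: one level of repetition, and no null or empty lists. `RepetitionLevelAssembler` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Rebuilds per-row lists from a flat value column and its repetition levels. */
final class RepetitionLevelAssembler {

    static List<List<String>> assemble(int[] repetitionLevels, String[] values) {
        if (repetitionLevels.length != values.length) {
            throw new IllegalArgumentException("levels and values must have the same length");
        }
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            // Repetition level 0 starts a new row; level 1 continues the current list.
            if (repetitionLevels[i] == 0) {
                rows.add(new ArrayList<>());
            }
            if (rows.isEmpty()) {
                throw new IllegalArgumentException("first repetition level must be 0");
            }
            rows.get(rows.size() - 1).add(values[i]);
        }
        return rows;
    }
}
```

For example, levels `{0, 1, 1, 0, 0, 1}` over six values yield three rows with three, one, and two elements respectively.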

Memory-Efficient Reading

// Configure for memory-constrained environments
Configuration memoryConfig = new Configuration();
memoryConfig.setInt("parquet.read.batch.size", 1024);           // Smaller batches
memoryConfig.setFloat("parquet.memory.pool.ratio", 0.7f);       // Conservative memory usage
memoryConfig.setBoolean("parquet.strings.signed-min-max", false); // Reduce string overhead

// Use column projection to reduce memory footprint
List<String> essentialFields = Arrays.asList("id", "timestamp", "value");

ParquetColumnarRowInputFormat<FileSourceSplit> memoryEfficientFormat = 
    new ParquetColumnarRowInputFormat<>(
        memoryConfig,
        projectedRowType,                // Only essential fields
        typeInfo,
        essentialFields,                 // Column projection
        null,
        1024,                           // Conservative batch size
        utcTimestamp,
        caseSensitive
    );

Parallel Reading with Multiple Splits

// Configure for parallel reading across multiple splits.
// Splittable formats are divided into splits by the source's file enumerator.
FileSource<RowData> parallelSource = FileSource
    .forBulkFormat(vectorizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1))  // Monitor for new files
    .build();

// Process with appropriate parallelism
DataStream<RowData> parallelStream = env
    .fromSource(parallelSource, WatermarkStrategy.noWatermarks(), "parallel-source")
    .setParallelism(numberOfCores * 2);  // CPU-bound processing

ParquetSplitReaderUtil

Utility class providing helper methods for creating column readers and vectors in vectorized Parquet reading.

/**
 * Utility methods for Parquet vectorized reading components
 */
public class ParquetSplitReaderUtil {
    
    /**
     * Builds a list of ParquetField representations from RowType fields
     * @param fields List of RowType fields to convert
     * @param fieldNames List of field names for mapping
     * @param columnIO MessageColumnIO for schema information
     * @return List of ParquetField objects for vectorized reading
     */
    public static List<ParquetField> buildFieldsList(
        List<RowType.RowField> fields, 
        List<String> fieldNames, 
        MessageColumnIO columnIO
    );
    
    /**
     * Creates a column reader for the specified field and configuration
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param logicalType Flink logical type for the column
     * @param physicalType Parquet physical type representation
     * @param columnDescriptors List of column descriptors for the field
     * @param pageReadStore Page read store for accessing data
     * @param field ParquetField definition
     * @param depth Nesting depth of the field
     * @return ColumnReader instance for reading the field data
     */
    public static ColumnReader createColumnReader(
        boolean utcTimestamp, 
        LogicalType logicalType, 
        Type physicalType, 
        List<ColumnDescriptor> columnDescriptors, 
        PageReadStore pageReadStore, 
        ParquetField field, 
        int depth
    );
    
    /**
     * Creates a writable column vector for the specified type and configuration
     * @param batchSize Batch size for the vector
     * @param logicalType Flink logical type
     * @param physicalType Parquet physical type
     * @param columnDescriptors Column descriptors for metadata
     * @param depth Nesting depth
     * @return WritableColumnVector for storing read data
     */
    public static WritableColumnVector createWritableColumnVector(
        int batchSize, 
        LogicalType logicalType, 
        Type physicalType, 
        List<ColumnDescriptor> columnDescriptors, 
        int depth
    );
    
    /**
     * Creates a constant-value column vector
     * @param type Logical type of the constant
     * @param value Constant value to fill the vector
     * @param batchSize Size of the vector
     * @return ColumnVector filled with the constant value
     */
    public static ColumnVector createVectorFromConstant(
        LogicalType type, 
        Object value, 
        int batchSize
    );
}

Performance Characteristics

Throughput Optimization

  • Batch Processing: Processes rows in columnar batches rather than one at a time, which amortizes per-row overhead and keeps inner loops simple enough for the JIT compiler to optimize
  • Column Pruning: Only reads required columns from storage, reducing I/O
  • Dictionary Compression: Efficient handling of dictionary-encoded columns
  • Lazy Evaluation: Defers expensive operations until data is actually needed

Memory Management

  • Vectorized Memory Layout: Contiguous memory access patterns for better CPU cache utilization
  • Controlled Memory Usage: Configurable batch sizes prevent memory overflow
  • Off-heap Storage: Column vectors can use off-heap memory to reduce GC pressure

I/O Efficiency

  • Block-level Reading: Aligns with Parquet row group boundaries for optimal disk access
  • Parallel I/O: Multiple threads can read different column chunks simultaneously
  • Compression Handling: Reads files written with the Parquet compression codecs whose libraries are available on the classpath
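
To make the row-group alignment concrete, the sketch below assigns row groups to byte-range splits by midpoint: a row group belongs to the split that contains the middle of its byte range, so each row group is read exactly once even when it straddles a split boundary. This midpoint rule is an assumption about how range filtering commonly works in Parquet readers; `RowGroupAssignment` and `RowGroup` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Assigns row groups to byte-range splits using the midpoint rule (assumed). */
final class RowGroupAssignment {

    /** Minimal description of a row group's position in the file. */
    static final class RowGroup {
        final long startOffset;
        final long compressedSize;

        RowGroup(long startOffset, long compressedSize) {
            this.startOffset = startOffset;
            this.compressedSize = compressedSize;
        }

        long midpoint() {
            return startOffset + compressedSize / 2;
        }
    }

    /** Returns the indices of row groups whose midpoint lies in [splitStart, splitStart + splitLength). */
    static List<Integer> rowGroupsForSplit(List<RowGroup> groups, long splitStart, long splitLength) {
        long splitEnd = splitStart + splitLength;
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < groups.size(); i++) {
            long mid = groups.get(i).midpoint();
            if (mid >= splitStart && mid < splitEnd) {
                selected.add(i);
            }
        }
        return selected;
    }
}
```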

The vectorized reading infrastructure provides significant performance improvements over row-based processing, especially for analytical workloads with wide schemas and large datasets.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet
