CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parquet-2-12

Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.

Pending
Overview
Eval results
Files

vectorized-input.mddocs/

Vectorized Input

High-performance columnar input formats optimized for analytical workloads with vectorized processing, partition support, and efficient memory usage.

Capabilities

ParquetColumnarRowInputFormat

Columnar input format that provides RowData iterators using vectorized column batches for maximum performance in analytical queries.

/**
 * Parquet input format providing RowData iterator using columnar row data
 * Extends ParquetVectorizedInputFormat with RowData-specific functionality
 */
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit> 
    extends ParquetVectorizedInputFormat<RowData, SplitT> {
    
    /**
     * Creates a basic columnar row input format without extra fields
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Row type defining the projected schema
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
        Configuration hadoopConfig,
        RowType projectedType,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
    
    /**
     * Creates a columnar row input format with extra fields support
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param projectedType Projected row type (excludes extra fields)
     * @param producedType Produced row type (includes extra fields)
     * @param batchFactory Factory for creating column batches with extra fields
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     */
    public ParquetColumnarRowInputFormat(
        Configuration hadoopConfig,
        RowType projectedType,
        RowType producedType,
        ColumnBatchFactory<SplitT> batchFactory,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
    
    /**
     * Creates a partitioned columnar row input format
     * Automatically handles partition columns generated from file paths
     * @param hadoopConfig Hadoop configuration for Parquet reading
     * @param producedRowType Complete row type including partition columns
     * @param partitionKeys List of partition column names
     * @param extractor Extractor for deriving partition values from splits
     * @param batchSize Number of rows per vectorized batch
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether column name matching is case sensitive
     * @return Configured ParquetColumnarRowInputFormat for partitioned data
     */
    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
        Configuration hadoopConfig,
        RowType producedRowType,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
    
    /**
     * Returns the type information for the produced RowData
     * @return TypeInformation for RowData output
     */
    public TypeInformation<RowData> getProducedType();
}

ParquetVectorizedInputFormat

Abstract base class for vectorized Parquet input formats providing the foundation for high-performance columnar processing.

/**
 * Abstract base class for vectorized Parquet input formats
 * Provides vectorized column batch processing with configurable batch sizes
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit> 
    implements BulkFormat<T, SplitT> {
    
    /**
     * Creates a reader for the given file split
     * @param config Flink configuration
     * @param split File split to read from
     * @return BulkFormat.Reader for processing the split
     * @throws IOException if reader creation fails
     */
    public Reader<T> createReader(Configuration config, SplitT split) throws IOException;
    
    /**
     * Restores a reader from a checkpointed position
     * @param config Flink configuration
     * @param split File split to read from
     * @param restoredOffset Checkpointed position to restore from
     * @return BulkFormat.Reader restored at the specified position
     * @throws IOException if reader restoration fails
     */
    public Reader<T> restoreReader(Configuration config, SplitT split, CheckpointedPosition restoredOffset) 
        throws IOException;
    
    /**
     * Checks if the format supports splitting
     * @return true if the format can be split across multiple readers
     */
    public boolean isSplittable();
}

ColumnBatchFactory

Factory interface for creating vectorized column batches with support for extra fields like partition columns.

/**
 * Factory for creating vectorized column batches
 * Supports adding extra fields beyond those present in Parquet files
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT extends FileSourceSplit> {
    
    /**
     * Creates a vectorized column batch from Parquet column vectors
     * @param split File split being processed (for extracting partition info)
     * @param parquetVectors Column vectors read from Parquet file
     * @return VectorizedColumnBatch with original and extra columns
     * @throws IOException if batch creation fails
     */
    VectorizedColumnBatch create(SplitT split, ColumnVector[] parquetVectors) throws IOException;
    
    /**
     * Creates a factory that doesn't add extra fields
     * @return ColumnBatchFactory that passes through Parquet columns unchanged
     */
    static <SplitT extends FileSourceSplit> ColumnBatchFactory<SplitT> withoutExtraFields() {
        return (split, vectors) -> new VectorizedColumnBatch(vectors);
    }
}

Usage Examples

Basic Vectorized Reading

import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;

// Define schema for reading
RowType rowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[]{"id", "name", "created_at"}
);

// Create input format
Configuration hadoopConfig = new Configuration();
int batchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 2048
boolean utcTimezone = true;
boolean caseSensitive = false;

ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat = 
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, rowType, batchSize, utcTimezone, caseSensitive
    );

// Use with FileSource
FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(inputFormat, new Path("/input/parquet/files"))
    .build();

DataStream<RowData> dataStream = env.fromSource(parquetSource, 
    WatermarkStrategy.noWatermarks(), "parquet-source");

Partitioned Data Reading

import org.apache.flink.table.filesystem.PartitionFieldExtractor;

// Schema including partition columns
RowType producedRowType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),     // id
        DataTypes.STRING().getLogicalType(),     // name  
        DataTypes.DATE().getLogicalType(),       // partition: date
        DataTypes.STRING().getLogicalType()      // partition: region
    },
    new String[]{"id", "name", "date", "region"}
);

// Define partition columns
List<String> partitionKeys = Arrays.asList("date", "region");

// Create partition field extractor
PartitionFieldExtractor<FileSourceSplit> extractor = 
    PartitionFieldExtractor.forFileSystem("__HIVE_DEFAULT_PARTITION__");

// Create partitioned input format
ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat = 
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        hadoopConfig,
        producedRowType,
        partitionKeys,
        extractor,
        4096,  // Larger batch size for partitioned data
        true,  // UTC timezone
        false  // Case insensitive
    );

// File structure: /data/date=2023-01-01/region=us-west/part-0000.parquet
FileSource<RowData> partitionedSource = FileSource
    .forBulkFileFormat(partitionedFormat, new Path("/data"))
    .setFileEnumerator(FileEnumerator.create())
    .build();

Performance Tuning

// Custom configuration for high-throughput reading
Configuration optimizedConfig = new Configuration();

// Parquet-specific optimizations
optimizedConfig.setBoolean("parquet.enable.dictionary", true);
optimizedConfig.setInt("parquet.page.size", 1048576);      // 1MB page size
optimizedConfig.setInt("parquet.block.size", 134217728);   // 128MB block size

// Larger batch sizes for analytical workloads
int largeBatchSize = 8192;  // 4x default size

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat = 
    new ParquetColumnarRowInputFormat<>(
        optimizedConfig, 
        rowType, 
        largeBatchSize, 
        true,   // UTC timestamps
        false   // Case insensitive
    );

// Configure FileSource for optimal throughput
FileSource<RowData> optimizedSource = FileSource
    .forBulkFileFormat(optimizedFormat, inputPath)
    .monitorContinuously(Duration.ofMinutes(1))
    .build();

DataStream<RowData> stream = env
    .fromSource(optimizedSource, WatermarkStrategy.noWatermarks(), "optimized-parquet")
    .setParallelism(Runtime.getRuntime().availableProcessors()); // Scale with CPU cores

Column Projection

// Original file schema: id, name, email, age, created_at, updated_at
// Project only needed columns for better performance
RowType projectedSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),     // id
        DataTypes.STRING().getLogicalType(),     // name
        DataTypes.INT().getLogicalType()         // age
    },
    new String[]{"id", "name", "age"}
);

// Only projected columns are read from Parquet files
ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat = 
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig, projectedSchema, batchSize, true, false
    );

// This will only read 3 columns instead of 6, significantly improving I/O performance

Custom Column Batch Factory

// Custom factory that adds computed columns
ColumnBatchFactory<FileSourceSplit> customFactory = (split, parquetVectors) -> {
    // Original Parquet columns: id, amount
    // Add computed column: tax (amount * 0.1)
    
    ColumnVector[] allVectors = new ColumnVector[parquetVectors.length + 1];
    System.arraycopy(parquetVectors, 0, allVectors, 0, parquetVectors.length);
    
    // Create computed tax column
    WritableColumnVector taxVector = new WritableDoubleVector(parquetVectors[0].getSize());
    ColumnVector amountVector = parquetVectors[1]; // Assuming amount is second column
    
    for (int i = 0; i < amountVector.getSize(); i++) {
        if (!amountVector.isNullAt(i)) {
            double amount = amountVector.getDouble(i);
            taxVector.putDouble(i, amount * 0.1);
        } else {
            taxVector.putNull(i);
        }
    }
    
    allVectors[parquetVectors.length] = taxVector;
    return new VectorizedColumnBatch(allVectors);
};

// Schema including computed column
RowType schemaWithTax = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),    // id
        DataTypes.DOUBLE().getLogicalType(),    // amount
        DataTypes.DOUBLE().getLogicalType()     // tax (computed)
    },
    new String[]{"id", "amount", "tax"}
);

RowType parquetSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),    // id
        DataTypes.DOUBLE().getLogicalType()     // amount
    },
    new String[]{"id", "amount"}
);

ParquetColumnarRowInputFormat<FileSourceSplit> customFormat = 
    new ParquetColumnarRowInputFormat<>(
        hadoopConfig,
        parquetSchema,      // Schema in Parquet file
        schemaWithTax,      // Final produced schema
        customFactory,      // Custom batch factory
        batchSize,
        true,
        false
    );

Performance Characteristics

Memory Usage

  • Batch Size: Larger batches (4096-8192) improve throughput but use more memory
  • Column Vectors: Memory usage scales with batch size × number of columns × data type size
  • Dictionary Compression: Reduces memory usage for repeated values

I/O Optimization

  • Column Projection: Only read required columns to minimize I/O
  • Predicate Pushdown: Filter row groups at the Parquet level
  • Vectorized Processing: Process multiple rows per operation

Parallelization

  • Split-level Parallelism: Each Parquet file can be processed by separate parallel instances
  • Row Group Parallelism: Large files can be split at row group boundaries
  • CPU Vectorization: Modern CPUs can process vectorized operations efficiently

Error Handling

Common error scenarios and solutions:

try {
    ParquetColumnarRowInputFormat<FileSourceSplit> format = 
        new ParquetColumnarRowInputFormat<>(hadoopConfig, rowType, batchSize, true, false);
} catch (IllegalArgumentException e) {
    // Invalid row type, negative batch size, etc.
    logger.error("Invalid configuration for Parquet input format", e);
} catch (Exception e) {
    // Other initialization errors
    logger.error("Failed to create Parquet input format", e);
}

// Runtime reading errors
try {
    BulkFormat.Reader<RowData> reader = format.createReader(config, split);
    RecordIterator<RowData> iterator = reader.readBatch();
} catch (IOException e) {
    // File not found, permission errors, corrupted files
    logger.error("Failed to read Parquet file", e);
} catch (RuntimeException e) {
    // Schema mismatches, unsupported data types
    logger.error("Runtime error during Parquet reading", e);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12

docs

avro-integration.md

format-factory.md

index.md

protobuf-integration.md

rowdata-writers.md

schema-utilities.md

vectorized-input.md

tile.json