Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
---
High-performance columnar input formats optimized for analytical workloads with vectorized processing, partition support, and efficient memory usage.
Columnar input format that provides RowData iterators using vectorized column batches for maximum performance in analytical queries.
/**
 * Parquet input format providing RowData iterator using columnar row data
 * Extends ParquetVectorizedInputFormat with RowData-specific functionality
 *
 * <p>NOTE(review): rows are produced from vectorized column batches; whether a
 * returned RowData instance may be reused across batches is not visible here —
 * confirm against the base class before relying on object identity.
 */
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
extends ParquetVectorizedInputFormat<RowData, SplitT> {
/**
 * Creates a basic columnar row input format without extra fields
 * The produced type equals the projected type (no partition or computed columns).
 * @param hadoopConfig Hadoop configuration for Parquet reading
 * @param projectedType Row type defining the projected schema; field names are
 *        matched against the Parquet file schema (subject to isCaseSensitive)
 * @param batchSize Number of rows per vectorized batch (must be positive)
 * @param isUtcTimestamp Whether to use UTC timezone for timestamps
 * @param isCaseSensitive Whether column name matching is case sensitive
 */
public ParquetColumnarRowInputFormat(
Configuration hadoopConfig,
RowType projectedType,
int batchSize,
boolean isUtcTimestamp,
boolean isCaseSensitive
);
/**
 * Creates a columnar row input format with extra fields support
 * Use this overload when the produced rows contain columns that are not stored
 * in the Parquet file (e.g. partition or computed columns) supplied by batchFactory.
 * @param hadoopConfig Hadoop configuration for Parquet reading
 * @param projectedType Projected row type (excludes extra fields; read from the file)
 * @param producedType Produced row type (includes extra fields; what downstream sees)
 * @param batchFactory Factory for creating column batches with extra fields
 * @param batchSize Number of rows per vectorized batch (must be positive)
 * @param isUtcTimestamp Whether to use UTC timezone for timestamps
 * @param isCaseSensitive Whether column name matching is case sensitive
 */
public ParquetColumnarRowInputFormat(
Configuration hadoopConfig,
RowType projectedType,
RowType producedType,
ColumnBatchFactory<SplitT> batchFactory,
int batchSize,
boolean isUtcTimestamp,
boolean isCaseSensitive
);
/**
 * Creates a partitioned columnar row input format
 * Automatically handles partition columns generated from file paths
 * (Hive-style {@code key=value} directories); partition values come from the
 * extractor, all remaining columns are read from the Parquet file.
 * @param hadoopConfig Hadoop configuration for Parquet reading
 * @param producedRowType Complete row type including partition columns
 * @param partitionKeys List of partition column names (must be field names of producedRowType)
 * @param extractor Extractor for deriving partition values from splits
 * @param batchSize Number of rows per vectorized batch (must be positive)
 * @param isUtcTimestamp Whether to use UTC timezone for timestamps
 * @param isCaseSensitive Whether column name matching is case sensitive
 * @return Configured ParquetColumnarRowInputFormat for partitioned data
 */
public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
Configuration hadoopConfig,
RowType producedRowType,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
int batchSize,
boolean isUtcTimestamp,
boolean isCaseSensitive
);
/**
 * Returns the type information for the produced RowData
 * Reflects the produced row type, including any extra/partition columns.
 * @return TypeInformation for RowData output
 */
public TypeInformation<RowData> getProducedType();
}

Abstract base class for vectorized Parquet input formats providing the foundation for high-performance columnar processing.
/**
 * Abstract base class for vectorized Parquet input formats
 * Provides vectorized column batch processing with configurable batch sizes
 * Implements Flink's BulkFormat contract so it can be plugged into FileSource.
 */
public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
implements BulkFormat<T, SplitT> {
/**
 * Creates a reader for the given file split
 * The reader starts at the beginning of the split.
 * @param config Flink configuration
 * @param split File split to read from
 * @return BulkFormat.Reader for processing the split
 * @throws IOException if reader creation fails
 */
public Reader<T> createReader(Configuration config, SplitT split) throws IOException;
/**
 * Restores a reader from a checkpointed position
 * Used during failure recovery so reading resumes where the checkpoint left off
 * instead of re-reading the whole split.
 * @param config Flink configuration
 * @param split File split to read from
 * @param restoredOffset Checkpointed position to restore from
 * @return BulkFormat.Reader restored at the specified position
 * @throws IOException if reader restoration fails
 */
public Reader<T> restoreReader(Configuration config, SplitT split, CheckpointedPosition restoredOffset)
throws IOException;
/**
 * Checks if the format supports splitting
 * @return true if the format can be split across multiple readers
 */
public boolean isSplittable();
}

Factory interface for creating vectorized column batches with support for extra fields like partition columns.
/**
 * Factory for creating vectorized column batches
 * Supports adding extra fields beyond those present in Parquet files
 * Typical use: append partition columns (constant per split) or computed columns
 * to the vectors read from the file before they are exposed as RowData.
 */
@FunctionalInterface
public interface ColumnBatchFactory<SplitT extends FileSourceSplit> {
/**
 * Creates a vectorized column batch from Parquet column vectors
 * @param split File split being processed (for extracting partition info)
 * @param parquetVectors Column vectors read from Parquet file
 * @return VectorizedColumnBatch with original and extra columns
 * @throws IOException if batch creation fails
 */
VectorizedColumnBatch create(SplitT split, ColumnVector[] parquetVectors) throws IOException;
/**
 * Creates a factory that doesn't add extra fields
 * The returned factory wraps the Parquet vectors as-is (identity factory).
 * @return ColumnBatchFactory that passes through Parquet columns unchanged
 */
static <SplitT extends FileSourceSplit> ColumnBatchFactory<SplitT> withoutExtraFields() {
return (split, vectors) -> new VectorizedColumnBatch(vectors);
}
}

import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
// --- Example: basic read of Parquet files into a DataStream<RowData> ---
// NOTE(review): the snippet also relies on DataTypes, VectorizedColumnBatch,
// FileSource, Path, RowData, WatermarkStrategy and env being imported/in scope.
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
// Define schema for reading: field types and names are given in parallel arrays
// and must line up index-by-index.
RowType rowType = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType(),
DataTypes.TIMESTAMP(3).getLogicalType()
},
new String[]{"id", "name", "created_at"}
);
// Create input format (hadoopConfig, rowType and batchSize are reused by the
// later examples in this document).
Configuration hadoopConfig = new Configuration();
int batchSize = VectorizedColumnBatch.DEFAULT_SIZE; // 2048
boolean utcTimezone = true;
boolean caseSensitive = false;
ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
new ParquetColumnarRowInputFormat<>(
hadoopConfig, rowType, batchSize, utcTimezone, caseSensitive
);
// Use with FileSource: the format is a BulkFormat, so it plugs into
// forBulkFileFormat directly.
FileSource<RowData> parquetSource = FileSource
.forBulkFileFormat(inputFormat, new Path("/input/parquet/files"))
.build();
DataStream<RowData> dataStream = env.fromSource(parquetSource,
WatermarkStrategy.noWatermarks(), "parquet-source");

import org.apache.flink.table.filesystem.PartitionFieldExtractor;
// --- Example: reading Hive-style partitioned Parquet data ---
// Schema including partition columns: "date" and "region" are not stored in the
// Parquet files; they are derived from the directory path per split.
RowType producedRowType = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.STRING().getLogicalType(), // name
DataTypes.DATE().getLogicalType(), // partition: date
DataTypes.STRING().getLogicalType() // partition: region
},
new String[]{"id", "name", "date", "region"}
);
// Define partition columns (must be field names of producedRowType)
List<String> partitionKeys = Arrays.asList("date", "region");
// Create partition field extractor; the string argument is the sentinel value
// Hive writes for null partition values.
PartitionFieldExtractor<FileSourceSplit> extractor =
PartitionFieldExtractor.forFileSystem("__HIVE_DEFAULT_PARTITION__");
// Create partitioned input format (hadoopConfig comes from the first example)
ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =
ParquetColumnarRowInputFormat.createPartitionedFormat(
hadoopConfig,
producedRowType,
partitionKeys,
extractor,
4096, // Larger batch size for partitioned data
true, // UTC timezone
false // Case insensitive
);
// File structure: /data/date=2023-01-01/region=us-west/part-0000.parquet
// NOTE(review): FileEnumerator.create() is not a visible factory in Flink's
// public API (FileSource expects a FileEnumerator.Provider) — verify this call.
FileSource<RowData> partitionedSource = FileSource
.forBulkFileFormat(partitionedFormat, new Path("/data"))
.setFileEnumerator(FileEnumerator.create())
.build();

// Custom configuration for high-throughput reading
// --- Example: tuning the format for high-throughput analytical reads ---
// (rowType, inputPath and env come from the surrounding examples)
Configuration optimizedConfig = new Configuration();
// Parquet-specific optimizations
// NOTE(review): parquet.page.size / parquet.block.size are writer-side settings;
// their effect on the read path should be confirmed against the Parquet docs.
optimizedConfig.setBoolean("parquet.enable.dictionary", true);
optimizedConfig.setInt("parquet.page.size", 1048576); // 1MB page size
optimizedConfig.setInt("parquet.block.size", 134217728); // 128MB block size
// Larger batch sizes for analytical workloads: fewer, bigger vectorized batches
// amortize per-batch overhead at the cost of more memory per reader.
int largeBatchSize = 8192; // 4x default size
ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
new ParquetColumnarRowInputFormat<>(
optimizedConfig,
rowType,
largeBatchSize,
true, // UTC timestamps
false // Case insensitive
);
// Configure FileSource for optimal throughput; monitorContinuously turns the
// source into an unbounded stream that picks up new files every minute.
FileSource<RowData> optimizedSource = FileSource
.forBulkFileFormat(optimizedFormat, inputPath)
.monitorContinuously(Duration.ofMinutes(1))
.build();
DataStream<RowData> stream = env
.fromSource(optimizedSource, WatermarkStrategy.noWatermarks(), "optimized-parquet")
.setParallelism(Runtime.getRuntime().availableProcessors()); // Scale with CPU cores

// Original file schema: id, name, email, age, created_at, updated_at
// --- Example: column projection (projection pushdown) ---
// Project only needed columns for better performance: Parquet is columnar, so
// unselected columns are never read from disk.
RowType projectedSchema = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.STRING().getLogicalType(), // name
DataTypes.INT().getLogicalType() // age
},
new String[]{"id", "name", "age"}
);
// Only projected columns are read from Parquet files
// (hadoopConfig and batchSize come from the first example)
ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =
new ParquetColumnarRowInputFormat<>(
hadoopConfig, projectedSchema, batchSize, true, false
);
// This will only read 3 columns instead of 6, significantly improving I/O performance

// Custom factory that adds computed columns
// --- Example: ColumnBatchFactory that appends a computed column ---
ColumnBatchFactory<FileSourceSplit> customFactory = (split, parquetVectors) -> {
// Original Parquet columns: id, amount
// Add computed column: tax (amount * 0.1)
ColumnVector[] allVectors = new ColumnVector[parquetVectors.length + 1];
System.arraycopy(parquetVectors, 0, allVectors, 0, parquetVectors.length);
// Create computed tax column
// NOTE(review): in Flink, WritableDoubleVector is an interface and the heap
// implementation is HeapDoubleVector; ColumnVector also does not expose
// getSize()/getDouble() directly (DoubleColumnVector does) — verify these
// calls against the flink-table column vector API before copying this snippet.
WritableColumnVector taxVector = new WritableDoubleVector(parquetVectors[0].getSize());
ColumnVector amountVector = parquetVectors[1]; // Assuming amount is second column
for (int i = 0; i < amountVector.getSize(); i++) {
if (!amountVector.isNullAt(i)) {
double amount = amountVector.getDouble(i);
taxVector.putDouble(i, amount * 0.1);
} else {
taxVector.putNull(i);
}
}
// Computed column goes last, so it must also be last in the produced schema.
allVectors[parquetVectors.length] = taxVector;
return new VectorizedColumnBatch(allVectors);
};
// Schema including computed column (what downstream operators see)
RowType schemaWithTax = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.DOUBLE().getLogicalType(), // amount
DataTypes.DOUBLE().getLogicalType() // tax (computed)
},
new String[]{"id", "amount", "tax"}
);
// Schema of the columns physically stored in the Parquet file
RowType parquetSchema = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.DOUBLE().getLogicalType() // amount
},
new String[]{"id", "amount"}
);
// Wire projected (file) schema, produced schema and factory together
// (hadoopConfig and batchSize come from the first example)
ParquetColumnarRowInputFormat<FileSourceSplit> customFormat =
new ParquetColumnarRowInputFormat<>(
hadoopConfig,
parquetSchema, // Schema in Parquet file
schemaWithTax, // Final produced schema
customFactory, // Custom batch factory
batchSize,
true,
false
);
);

Common error scenarios and solutions:
// --- Example: common error scenarios and how to handle them ---
// Construction-time errors. The format is declared OUTSIDE the try block so it
// is still in scope for the read section below (the original snippet declared
// it inside the try, where it was out of scope — and hence a compile error —
// at the createReader call).
ParquetColumnarRowInputFormat<FileSourceSplit> format = null;
try {
    format = new ParquetColumnarRowInputFormat<>(hadoopConfig, rowType, batchSize, true, false);
} catch (IllegalArgumentException e) {
    // Invalid row type, negative batch size, etc.
    logger.error("Invalid configuration for Parquet input format", e);
} catch (Exception e) {
    // Other initialization errors
    logger.error("Failed to create Parquet input format", e);
}
// Runtime reading errors — only attempt to read if construction succeeded.
// try-with-resources closes the reader even on failure (BulkFormat.Reader is
// Closeable; the original snippet leaked it).
if (format != null) {
    try (BulkFormat.Reader<RowData> reader = format.createReader(config, split)) {
        RecordIterator<RowData> iterator = reader.readBatch();
    } catch (IOException e) {
        // File not found, permission errors, corrupted files
        logger.error("Failed to read Parquet file", e);
    } catch (RuntimeException e) {
        // Schema mismatches, unsupported data types
        logger.error("Runtime error during Parquet reading", e);
    }
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12