Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
—
Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.
Builder class for creating RowData-based Parquet writers with proper schema conversion and configuration.
/**
* Builder for creating RowData ParquetWriters with schema conversion
*/
public class ParquetRowDataBuilder {
/**
* Creates a new ParquetRowDataBuilder
* @param outputFile OutputFile to write to
* @param rowType Flink RowType defining the schema
* @param utcTimestamp Whether to use UTC timezone for timestamps
*/
public ParquetRowDataBuilder(OutputFile outputFile, RowType rowType, boolean utcTimestamp);
/**
* Creates a ParquetWriterFactory for RowData with automatic schema conversion
* @param rowType Flink RowType schema
* @param conf Hadoop configuration for Parquet settings
* @param utcTimestamp Whether to use UTC timezone for timestamps
* @return ParquetWriterFactory for writing RowData records
*/
public static ParquetWriterFactory<RowData> createWriterFactory(
RowType rowType,
Configuration conf,
boolean utcTimestamp
);
}
Writer implementation that converts Flink RowData to Parquet columnar format with full type support.
/**
* Writes RowData records to Parquet columnar format
*/
public class ParquetRowDataWriter {
/**
* Creates a new ParquetRowDataWriter
* @param recordConsumer Parquet RecordConsumer for writing
* @param rowType Flink RowType schema
* @param schema Parquet MessageType schema
* @param utcTimestamp Whether to use UTC timezone for timestamps
* @param conf Hadoop configuration
*/
public ParquetRowDataWriter(
RecordConsumer recordConsumer,
RowType rowType,
MessageType schema,
boolean utcTimestamp,
Configuration conf
);
/**
* Writes a RowData record to Parquet
* @param record RowData record to write
*/
public void write(RowData record);
}
Vectorized input format specifically designed for reading Parquet files as RowData with partition support and statistics reporting.
/**
* Vectorized input format for reading Parquet files as RowData
* @param <SplitT> Type of file split
*/
public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT>
implements FileBasedStatisticsReportableInputFormat {
/**
* Creates a new ParquetColumnarRowInputFormat
* @param hadoopConfig Hadoop configuration
* @param projectedType Flink RowType for the projected output schema
* @param producedTypeInfo TypeInformation for RowData
* @param batchSize Batch size for vectorized reading
* @param isUtcTimestamp Whether to use UTC timezone for timestamps
* @param isCaseSensitive Whether field names are case sensitive
*/
public ParquetColumnarRowInputFormat(
Configuration hadoopConfig,
RowType projectedType,
TypeInformation<RowData> producedTypeInfo,
int batchSize,
boolean isUtcTimestamp,
boolean isCaseSensitive
);
/**
* Creates a partitioned format with partition field support
* @param <SplitT> Type of file split extending FileSourceSplit
* @param hadoopConfig Hadoop configuration
* @param producedRowType Output RowType schema
* @param producedTypeInfo TypeInformation for RowData
* @param partitionKeys List of partition field names
* @param extractor Partition field extractor for the split type
* @param batchSize Batch size for vectorized reading
* @param isUtcTimestamp Whether to use UTC timezone for timestamps
* @param isCaseSensitive Whether field names are case sensitive
* @return ParquetColumnarRowInputFormat with partition support
*/
public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
Configuration hadoopConfig,
RowType producedRowType,
TypeInformation<RowData> producedTypeInfo,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
int batchSize,
boolean isUtcTimestamp,
boolean isCaseSensitive
);
/**
* Returns the produced type information
* @return TypeInformation for RowData
*/
public TypeInformation<RowData> getProducedType();
/**
* Reports statistics from Parquet file metadata
* @param files List of files to analyze
* @param producedDataType Expected output data type
* @return TableStats with row counts and column statistics
*/
public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.data.RowData;
// Define schema using Flink types
RowType rowType = RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.STRING().getLogicalType(), // name
DataTypes.DOUBLE().getLogicalType(), // price
DataTypes.TIMESTAMP(3).getLogicalType() // created_at
},
new String[] {"id", "name", "price", "created_at"}
);
// Create writer factory
Configuration conf = new Configuration();
ParquetWriterFactory<RowData> writerFactory =
ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);
// Use with FileSink
FileSink<RowData> sink = FileSink
.forBulkFormat(new Path("/output/products"), writerFactory)
    .build();
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.connector.file.src.FileSource;
// Create vectorized input format
ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        new Configuration(),                        // Hadoop config
        rowType,                                    // Output schema
        TypeInformation.of(RowData.class),          // Type info
        Arrays.asList("date"),                      // Partition keys
        PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__"), // Partition extractor (default partition name)
        2048,                                       // Batch size
        true,                                       // UTC timezone
        true                                        // Case sensitive
    );
// Create file source with vectorized reading
FileSource<RowData> source = FileSource
.forBulkFormat(inputFormat, new Path("/data/partitioned"))
.build();
DataStream<RowData> rowDataStream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"parquet-rowdata-source"
);
import org.apache.flink.table.types.logical.*;
// Define complex schema with nested types
RowType nestedRowType = RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(), // order_id
RowType.of( // customer (nested)
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(), // customer.id
DataTypes.STRING().getLogicalType() // customer.name
},
new String[] {"id", "name"}
),
        new ArrayType(                              // items (array)
            RowType.of(
                new LogicalType[] {
                    DataTypes.STRING().getLogicalType(),      // item.product_id
                    DataTypes.INT().getLogicalType(),         // item.quantity
                    DataTypes.DECIMAL(10, 2).getLogicalType() // item.price
                },
                new String[] {"product_id", "quantity", "price"}
            )
        ),
        new MapType(                                // metadata (map)
            DataTypes.STRING().getLogicalType(),
            DataTypes.STRING().getLogicalType()
        )
},
new String[] {"order_id", "customer", "items", "metadata"}
);
// Create writer for complex types
ParquetWriterFactory<RowData> complexWriterFactory =
    ParquetRowDataBuilder.createWriterFactory(nestedRowType, conf, true);
// Reading partitioned data with automatic partition field injection
List<String> partitionKeys = Arrays.asList("year", "month", "day");
ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf,
        producedRowType,                      // Schema including partition fields
        typeInfo,
        partitionKeys,                        // Partition field names
        PartitionFieldExtractor.forFileSystem("UNKNOWN"), // Extractor; "UNKNOWN" used for null partitions
        4096,                                 // Larger batch for partitioned data
        true,                                 // UTC timestamps
        false                                 // Case insensitive partition names
    );
// File structure: /data/year=2023/month=01/day=15/file.parquet
// Partition fields are automatically added to RowData
// Only read specific columns for better performance
RowType projectedType = RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(), // id
DataTypes.STRING().getLogicalType() // name
},
new String[] {"id", "name"}
);
ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat =
    new ParquetColumnarRowInputFormat<>(
        conf,
        projectedType,                     // Projection is driven by this reduced RowType
        TypeInformation.of(RowData.class),
        2048,                              // Batch size
        true,                              // UTC timezone
        true                               // Case sensitive
    );
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
// Create table descriptor for RowData integration
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("order_id", DataTypes.BIGINT())
.column("customer_name", DataTypes.STRING())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("order_time", DataTypes.TIMESTAMP(3))
.watermark("order_time", "order_time - INTERVAL '5' SECOND")
.build())
.option("path", "/data/orders")
.option("format", "parquet")
.option("parquet.batch-size", "4096")
.option("parquet.utc-timezone", "true")
.build();
Table ordersTable = tableEnv.from(descriptor);
// Supported Flink to Parquet type mappings:
// Primitive types
DataTypes.BOOLEAN() → BOOLEAN
DataTypes.TINYINT() → INT32 (INT_8)
DataTypes.SMALLINT() → INT32 (INT_16)
DataTypes.INT() → INT32
DataTypes.BIGINT() → INT64
DataTypes.FLOAT() → FLOAT
DataTypes.DOUBLE() → DOUBLE
DataTypes.STRING() → BINARY (UTF8)
DataTypes.BYTES() → BINARY
// Temporal types
DataTypes.DATE() → INT32 (DATE)
DataTypes.TIME() → INT32 (TIME_MILLIS)
DataTypes.TIMESTAMP(3) → INT64 (TIMESTAMP_MILLIS)
DataTypes.TIMESTAMP(6) → INT64 (TIMESTAMP_MICROS)
// Decimal types
DataTypes.DECIMAL(p,s) → FIXED_LEN_BYTE_ARRAY or INT32/INT64 (DECIMAL)
// Complex types
DataTypes.ARRAY(T) → LIST with element type conversion
DataTypes.MAP(K,V) → MAP with key/value type conversion
DataTypes.ROW(...) → GROUP with nested field conversions
// Adjust batch size based on memory and performance requirements
int batchSize = calculateOptimalBatchSize(
availableMemory,
numberOfColumns,
avgRowSize
);
ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat =
ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf, rowType, typeInfo, partitions,
        PartitionFieldExtractor.forFileSystem(defaultPart), // Extractor supplying default for missing partitions
        batchSize,                     // Tuned batch size
        utcTime, caseSensitive
    );
// Configure Parquet memory settings
Configuration conf = new Configuration();
conf.set("parquet.memory.min.chunk.size", "1048576"); // 1MB
conf.set("parquet.memory.pool.ratio", "0.95"); // 95% of available memory
conf.set("parquet.page.size", "1048576"); // 1MB pages
conf.set("parquet.block.size", "134217728"); // 128MB blocks
The RowData integration provides the most efficient path for Table API operations by directly using Flink's internal row representation without additional serialization overhead.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parquet