Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
High-performance writers for Flink's internal RowData format, providing comprehensive schema mapping and configuration support for writing Parquet files.
Builder class for creating Parquet writers specifically designed for Flink's RowData format with full schema conversion and configuration support.
/**
* Builder for creating Parquet writers that handle Flink RowData
* Extends ParquetWriter.Builder with RowData-specific functionality
*/
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {
/**
* Creates a new ParquetRowDataBuilder instance
* @param path Output file path for the Parquet writer
* @param rowType Flink logical row type defining the schema
* @param utcTimestamp Whether to use UTC timezone for timestamp conversion
*/
public ParquetRowDataBuilder(OutputFile path, RowType rowType, boolean utcTimestamp);
/**
* Creates a complete ParquetWriterFactory for RowData
* @param rowType Flink logical row type defining the data schema
* @param conf Hadoop configuration for Parquet settings
* @param utcTimestamp Whether to use UTC timezone for timestamp conversion
* @return ParquetWriterFactory configured for RowData writing
*/
public static ParquetWriterFactory<RowData> createWriterFactory(
RowType rowType,
Configuration conf,
boolean utcTimestamp
);
}

Generic factory for creating Parquet bulk writers using user-provided builder configurations.
/**
* Factory that creates Parquet BulkWriter instances
* Uses user-supplied ParquetBuilder to configure the underlying ParquetWriter
*/
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
/**
* Creates a new ParquetWriterFactory
* @param writerBuilder Builder to construct the ParquetWriter
*/
public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
/**
* Creates a BulkWriter for the given output stream
* @param stream FSDataOutputStream to write Parquet data to
* @return BulkWriter instance wrapping a ParquetWriter
* @throws IOException if writer creation fails
*/
public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
}

BulkWriter implementation that wraps Parquet writers for integration with Flink's streaming sinks.
/**
* BulkWriter implementation wrapping a ParquetWriter
* Provides the interface between Flink's streaming framework and Parquet writing
*/
public class ParquetBulkWriter<T> implements BulkWriter<T> {
/**
* Creates a new ParquetBulkWriter
* @param parquetWriter The underlying ParquetWriter to wrap
*/
public ParquetBulkWriter(ParquetWriter<T> parquetWriter);
/**
* Adds an element to the Parquet file
* @param datum The data element to write
* @throws IOException if writing fails
*/
public void addElement(T datum) throws IOException;
/**
* Flushes any buffered data (no-op for Parquet)
*/
public void flush();
/**
* Finishes writing and closes the Parquet file
* @throws IOException if closing fails
*/
public void finish() throws IOException;
}

Internal builder implementation for creating RowData-specific Parquet writers with proper configuration.
/**
* Internal ParquetBuilder implementation for RowData
* Handles Hadoop configuration and Parquet writer setup
*/
public static class FlinkParquetBuilder implements ParquetBuilder<RowData> {
/**
* Creates a new FlinkParquetBuilder
* @param rowType Flink logical row type
* @param conf Hadoop configuration
* @param utcTimestamp UTC timezone flag
*/
public FlinkParquetBuilder(RowType rowType, Configuration conf, boolean utcTimestamp);
/**
* Creates a configured ParquetWriter for the given output file
* @param out OutputFile to write to
* @return Configured ParquetWriter instance
* @throws IOException if writer creation fails
*/
public ParquetWriter<RowData> createWriter(OutputFile out) throws IOException;
}

Creating a writer factory from a RowType schema:

import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
// Define schema
RowType rowType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(255), new TimestampType()},
    new String[] {"id", "name", "timestamp"});
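// Illustrative only: a row matching this schema can be built with GenericRowData
// (GenericRowData, StringData and TimestampData come from org.apache.flink.table.data);
// the values below are hypothetical.
GenericRowData exampleRow = GenericRowData.of(
    1L,
    StringData.fromString("alice"),
    TimestampData.fromEpochMillis(System.currentTimeMillis()));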
// Create writer factory
Configuration hadoopConfig = new Configuration();
boolean useUtcTimezone = true;
ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, useUtcTimezone);

Writing a DataStream of RowData to Parquet files with a FileSink:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
DataStream<RowData> dataStream = // ... your data stream
// Create file sink with Parquet writer
FileSink<RowData> parquetSink = FileSink
    .forBulkFormat(new Path("/output/path"), writerFactory)
    // Bulk formats such as Parquet can only roll files on checkpoints, so a
    // checkpoint-based rolling policy is used rather than a time-based one.
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
dataStream.sinkTo(parquetSink);

Tuning Parquet output through the Hadoop configuration:

import org.apache.hadoop.conf.Configuration;
import static org.apache.parquet.hadoop.ParquetOutputFormat.*;
// Configure Hadoop settings for Parquet
Configuration conf = new Configuration();
conf.set("parquet.compression", "SNAPPY");
conf.setInt("parquet.block.size", 134217728); // 128MB
conf.setInt("parquet.page.size", 1048576); // 1MB
conf.setBoolean("parquet.enable.dictionary", true);
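// The string keys above match constants on ParquetOutputFormat (made available by the
// static import above), so the same settings can also be written as, for example:
// conf.set(COMPRESSION, "SNAPPY");
// conf.setInt(BLOCK_SIZE, 134217728);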
// Create writer with custom configuration
ParquetWriterFactory<RowData> customWriterFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, conf, false);

Building a ParquetWriter for RowData manually:

import org.apache.flink.formats.parquet.StreamOutputFile;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
// Create output stream
FSDataOutputStream outputStream = // ... create output stream
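// One way to obtain such a stream (illustrative; the path is hypothetical):
//   Path file = new Path("/output/path/part-0.parquet");   // org.apache.flink.core.fs.Path
//   FSDataOutputStream outputStream =
//       file.getFileSystem().create(file, FileSystem.WriteMode.OVERWRITE);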
StreamOutputFile outputFile = new StreamOutputFile(outputStream);
// Build writer manually
ParquetWriter<RowData> writer = new ParquetRowDataBuilder(outputFile, rowType, true)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withRowGroupSize(134217728)
.withPageSize(1048576)
.withDictionaryEncoding(true)
.build();
// Write data
RowData rowData = // ... create row data
writer.write(rowData);
writer.close();

// Available compression codecs
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
.withCompressionCodec(CompressionCodecName.SNAPPY)       // Good balance of speed and size
.withCompressionCodec(CompressionCodecName.GZIP)         // High compression
.withCompressionCodec(CompressionCodecName.LZO)
.withCompressionCodec(CompressionCodecName.BROTLI)       // Very high compression
.withCompressionCodec(CompressionCodecName.LZ4)          // Fast compression
.withCompressionCodec(CompressionCodecName.ZSTD)         // Good compression + speed

// Row group size (default: 128 MB / 134217728 bytes)
.withRowGroupSize(268435456) // 256MB for better compression
// Page size (default: 1MB)
.withPageSize(2097152) // 2MB for reduced metadata overhead
// Dictionary encoding (default: true)
.withDictionaryEncoding(false) // Disable for high-cardinality data
// Dictionary page size (default: 1MB)
.withDictionaryPageSize(2097152)

Common exceptions and error scenarios:
try {
ParquetWriterFactory<RowData> factory =
ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);
} catch (IllegalArgumentException e) {
// Invalid row type or unsupported data type
} catch (RuntimeException e) {
// Configuration errors or schema conversion failures
}
try {
writer.write(rowData);
} catch (IOException e) {
// File system errors, disk full, permission issues
} catch (RuntimeException e) {
// Data conversion errors, schema mismatches
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12