Apache Flink SQL Parquet is a format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-parquet-2-12@1.14.0
```

Apache Flink SQL Parquet provides comprehensive support for reading and writing Parquet files in Flink SQL applications. The package bundles the Parquet format libraries with the shading configuration needed to avoid dependency conflicts in SQL client environments, and it supports both batch and streaming data processing workflows.

Maven coordinates:

```
org.apache.flink:flink-sql-parquet_2.12:1.14.6
```

The main entry points:

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```

A complete example of writing RowData records to Parquet files with Flink's FileSink:

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.data.RowData;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.table.types.logical.LogicalType;
// Create a Parquet writer factory for RowData
RowType rowType = RowType.of(/* field types */);
Configuration hadoopConfig = new Configuration();
boolean utcTimezone = false;
ParquetWriterFactory<RowData> writerFactory =
        ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, utcTimezone);

// Use with Flink's file sink
Path outputPath = new Path("/output/path");
FileSink<RowData> sink = FileSink
        .forBulkFormat(outputPath, writerFactory)
        .build();
```

Flink SQL Parquet is organized around several key components:
SQL table factory that automatically integrates Parquet format with Flink's table ecosystem. Provides transparent support for CREATE TABLE statements with Parquet format.
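A hedged Table API sketch of how the factory is resolved through the "parquet" format identifier; the table name, schema, and path below are hypothetical:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Declaring 'parquet' as the format causes Flink to load ParquetFileFormatFactory
// through the table factory discovery mechanism.
tableEnv.createTemporaryTable(
        "my_parquet_table",
        TableDescriptor.forConnector("filesystem")
                .schema(Schema.newBuilder()
                        .column("id", DataTypes.BIGINT())
                        .column("name", DataTypes.STRING())
                        .build())
                .option("path", "/path/to/parquet/files")
                .format("parquet")
                .build());
```

The factory interface: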
```java
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    public String factoryIdentifier();
}
```

High-performance writers for Flink's internal RowData format with comprehensive schema mapping and configuration support.
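To make the quick-start above concrete, a sketch of building the RowType by hand; the field names and types are illustrative, not prescribed by the library:

```java
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Illustrative schema: (id BIGINT, name STRING, ts TIMESTAMP(3))
RowType rowType = RowType.of(
        new LogicalType[] {
            new BigIntType(),
            new VarCharType(VarCharType.MAX_LENGTH),
            new TimestampType(3)
        },
        new String[] {"id", "name", "ts"});

ParquetWriterFactory<RowData> writerFactory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true);
```

The builder and factory signatures: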
```java
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {

    public static ParquetWriterFactory<RowData> createWriterFactory(
            RowType rowType,
            Configuration conf,
            boolean utcTimestamp);
}

public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {

    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);

    public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
}
```

Complete Avro integration supporting specific records, generic records, and reflection-based serialization with full schema compatibility.
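A sketch of writing Avro GenericRecord values, assuming the Avro dependency is on the classpath; the schema and output path are hypothetical:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

// Hypothetical Avro schema describing the records to be written
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":\"string\"}]}");

ParquetWriterFactory<GenericRecord> writerFactory = ParquetAvroWriters.forGenericRecord(schema);

// Attach to a DataStream<GenericRecord> via stream.sinkTo(sink)
FileSink<GenericRecord> sink = FileSink
        .forBulkFormat(new Path("/output/avro-parquet"), writerFactory)
        .build();
```

The available Avro writer factories: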
```java
public class ParquetAvroWriters {

    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

Seamless Protocol Buffers integration for writing Protobuf messages to Parquet format with automatic schema generation.
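Assuming a generated Protobuf class (MyProtoMessage below is hypothetical), the resulting factory plugs into FileSink the same way:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// MyProtoMessage stands in for any generated com.google.protobuf.Message subclass
ParquetWriterFactory<MyProtoMessage> writerFactory = ParquetProtoWriters.forType(MyProtoMessage.class);

FileSink<MyProtoMessage> sink = FileSink
        .forBulkFormat(new Path("/output/proto-parquet"), writerFactory)
        .build();
```

The Protobuf writer factory: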
```java
public class ParquetProtoWriters {

    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

High-performance columnar input formats optimized for analytical workloads with vectorized processing and partition support.
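A sketch of wiring the columnar format into a FileSource for reading; the path and field layout are hypothetical, the partition key list is left empty, and the no-op extractor lambda is a placeholder that should never be consulted when there are no partition keys:

```java
import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

RowType producedRowType = RowType.of(
        new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});

ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat =
        ParquetColumnarRowInputFormat.<FileSourceSplit>createPartitionedFormat(
                new Configuration(),
                producedRowType,
                Collections.emptyList(),               // no partition keys in this sketch
                (split, fieldName, fieldType) -> null, // placeholder partition field extractor
                500,                                   // rows per vectorized batch
                true,                                  // interpret timestamps as UTC
                true);                                 // case-sensitive field matching

FileSource<RowData> source = FileSource
        .forBulkFileFormat(inputFormat, new Path("/path/to/parquet/files"))
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> rows =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");
```

The partitioned-format factory method: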
```java
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
        extends ParquetVectorizedInputFormat<RowData, SplitT> {

    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
            Configuration hadoopConfig,
            RowType producedRowType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int batchSize,
            boolean isUtcTimestamp,
            boolean isCaseSensitive);
}
```

Utilities for converting between Flink logical types and Parquet schema definitions with full type mapping support.
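A small sketch of converting a Flink row schema into the corresponding Parquet message type; the field names and the "flink_schema" message name are illustrative:

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.parquet.schema.MessageType;

RowType rowType = RowType.of(
        new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});

// Derive the Parquet schema that the writers use for this row layout
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
System.out.println(parquetSchema);
```

The utility classes: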
```java
public class ParquetSchemaConverter {

    public static MessageType convertToParquetMessageType(String name, RowType rowType);
}

public class SerializableConfiguration {

    public SerializableConfiguration(Configuration conf);

    public Configuration conf();
}
```

The format defines the following configuration option:

```java
public static final ConfigOption<Boolean> UTC_TIMEZONE =
        key("utc-timezone")
                .booleanType()
                .defaultValue(false)
                .withDescription("Use UTC timezone or local timezone to the conversion between epoch" +
                        " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" +
                        " use UTC timezone");
```

Setting the option from SQL DDL:

```sql
CREATE TABLE my_parquet_table (
    id BIGINT,
    name STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/parquet/files',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);
```

Creating writer factories for the different data models:

```java
// For RowData
ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
        rowType, hadoopConfig, true);

// For Avro specific records
ParquetWriterFactory<MyAvroRecord> avroFactory =
        ParquetAvroWriters.forSpecificRecord(MyAvroRecord.class);

// For Protobuf messages
ParquetWriterFactory<MyProtoMessage> protoFactory =
        ParquetProtoWriters.forType(MyProtoMessage.class);
```

The lower-level writer and reader building blocks:

```java
@FunctionalInterface
public interface ParquetBuilder<T> extends Serializable {

    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```
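ParquetBuilder is the extension point behind the factories above. A hedged sketch of supplying a custom builder to ParquetWriterFactory, assuming parquet-avro is available; the schema is hypothetical, and it is captured as a String because Avro's Schema is not Serializable while the builder must be:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical record schema, kept as a String so the lambda stays serializable
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";

// Build a ParquetWriter with a custom compression codec for each output file
ParquetBuilder<GenericRecord> builder = out -> AvroParquetWriter.<GenericRecord>builder(out)
        .withSchema(new Schema.Parser().parse(schemaString))
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build();

ParquetWriterFactory<GenericRecord> factory = new ParquetWriterFactory<>(builder);
```

The remaining supporting classes: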
```java
public class ParquetBulkWriter<T> implements BulkWriter<T> {

    public ParquetBulkWriter(ParquetWriter<T> parquetWriter);

    public void addElement(T datum) throws IOException;

    public void flush();

    public void finish() throws IOException;
}

public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    // Abstract base class for vectorized input formats
}
```