Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-parquet@2.1.0
```

This library enables efficient processing of Apache Parquet files within the Flink ecosystem, offering vectorized reading, multiple serialization format support, and seamless integration with Flink's Table API and DataStream API.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
```

For Avro integration:

```java
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
```

For RowData integration:

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
```

For Protocol Buffers integration:

```java
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;

// Read Avro records from Parquet
FileSource<MyAvroRecord> source = FileSource
        .forRecordStreamFormat(
                AvroParquetReaders.forSpecificRecord(MyAvroRecord.class),
                new Path("path/to/parquet/files")
        )
        .build();

DataStream<MyAvroRecord> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "parquet-source"
);
```
```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Write Avro records to Parquet
FileSink<MyAvroRecord> sink = FileSink
        .forBulkFormat(
                new Path("path/to/output"),
                AvroParquetWriters.forSpecificRecord(MyAvroRecord.class)
        )
        .build();

stream.sinkTo(sink);
```
```java
import org.apache.flink.table.api.TableEnvironment;

// Create Parquet table
tableEnv.executeSql("""
    CREATE TABLE parquet_table (
        id BIGINT,
        name STRING,
        timestamp_col TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/parquet/files',
        'format' = 'parquet'
    )
    """);
```
The Flink Parquet module is built around several key architectural components. ParquetFileFormatFactory provides the main entry point for Table API integration, creating bulk reading and writing formats. The design enables efficient large-scale data processing by leveraging Parquet's columnar storage format while maintaining full compatibility with Flink's streaming and batch processing capabilities.
Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options.
```java
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);
}
```
Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

```java
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

@FunctionalInterface
public interface ParquetBuilder<T> {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```
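Because ParquetBuilder is a functional interface, a custom ParquetWriterFactory can be assembled from a lambda. A minimal sketch, assuming Avro GenericRecord payloads and a hypothetical schemaJson string (parquet-avro's AvroParquetWriter stands in here for any ParquetWriter builder):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// schemaJson is a hypothetical Avro schema definition; keeping it as a String
// keeps the lambda serializable for distributed execution
String schemaJson = "...";

// The lambda receives the Parquet OutputFile and returns a configured ParquetWriter
ParquetBuilder<GenericRecord> builder = out -> AvroParquetWriter
        .<GenericRecord>builder(out)
        .withSchema(new Schema.Parser().parse(schemaJson))
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build();

ParquetWriterFactory<GenericRecord> customFactory = new ParquetWriterFactory<>(builder);
```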
Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

```java
public class AvroParquetReaders {
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}

public class AvroParquetWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```
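For schemas that are only known at runtime, the GenericRecord variants avoid generated Avro classes. A sketch, assuming a hypothetical schemaJson string and the paths used in the earlier examples:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

Schema schema = new Schema.Parser().parse(schemaJson);

// Read GenericRecords without generated Avro classes
FileSource<GenericRecord> genericSource = FileSource
        .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema), new Path("path/to/parquet/files"))
        .build();

// Write GenericRecords back out as Parquet
FileSink<GenericRecord> genericSink = FileSink
        .forBulkFormat(new Path("path/to/output"), AvroParquetWriters.forGenericRecord(schema))
        .build();
```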
Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

```java
public class ParquetRowDataBuilder {
    public static ParquetWriterFactory<RowData> createWriterFactory(
            RowType rowType,
            Configuration conf,
            boolean utcTimestamp);
}

public class ParquetColumnarRowInputFormat<SplitT>
        extends ParquetVectorizedInputFormat<RowData, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public static ParquetColumnarRowInputFormat<FileSourceSplit> createPartitionedFormat(
            Configuration conf,
            RowType producedRowType,
            TypeInformation<RowData> producedTypeInfo,
            List<String> partitionKeys,
            String defaultPartName,
            int batchSize,
            boolean utcTimestamp,
            boolean caseSensitive);
}
```
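A sketch of writing RowData directly, assuming a two-column row schema (id BIGINT, name STRING) and the createWriterFactory signature shown above:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Describe the columns to write
RowType rowType = RowType.of(
        new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});

// Build a Parquet writer factory for RowData and hand it to a FileSink
ParquetWriterFactory<RowData> rowDataWriterFactory =
        ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true /* utcTimestamp */);

FileSink<RowData> rowDataSink = FileSink
        .forBulkFormat(new Path("path/to/output"), rowDataWriterFactory)
        .build();
```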
Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data.

```java
public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```
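A sketch of sinking protobuf messages, where MyProtoMessage stands for any generated protobuf class and protoStream for an existing DataStream of such messages:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// MyProtoMessage is a hypothetical generated protobuf class
FileSink<MyProtoMessage> protoSink = FileSink
        .forBulkFormat(new Path("path/to/output"), ParquetProtoWriters.forType(MyProtoMessage.class))
        .build();

protoStream.sinkTo(protoSink);
```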
High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

```java
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
    public boolean isSplittable();
}

@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);

    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
```
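For bulk, vectorized reading outside the Table API, the columnar format can be plugged into a FileSource. A sketch following the createPartitionedFormat signature listed above and reusing the rowType from the RowData sketch; the default-partition string and batch size are illustrative:

```java
import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.hadoop.conf.Configuration;

ParquetColumnarRowInputFormat<FileSourceSplit> columnarFormat =
        ParquetColumnarRowInputFormat.createPartitionedFormat(
                new Configuration(),
                rowType,                      // columns to produce
                InternalTypeInfo.of(rowType), // TypeInformation<RowData>
                Collections.emptyList(),      // no partition keys
                "__DEFAULT_PARTITION__",      // default partition name
                2048,                         // batch size
                true,                         // utcTimestamp
                true);                        // caseSensitive

FileSource<RowData> columnarSource = FileSource
        .forBulkFileFormat(columnarFormat, new Path("path/to/parquet/files"))
        .build();
```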
Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between Flink and Parquet type systems.

```java
public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(
            String name,
            RowType rowType,
            Configuration conf);

    public static Type convertToParquetType(
            String name,
            LogicalType logicalType,
            Configuration conf);
}

public class SerializableConfiguration implements Serializable {
    public SerializableConfiguration(Configuration configuration);
    public Configuration conf();
}
```
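A sketch of converting a Flink row schema into a Parquet message type, assuming the converter lives in the module's utils package and reusing the rowType defined earlier:

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

// Derive the Parquet schema that the writers above would produce for this row type
MessageType parquetSchema =
        ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType, new Configuration());
```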
```java
// Format factory configuration constants
public static final ConfigOption<Boolean> UTC_TIMEZONE;          // default: false
public static final ConfigOption<String>  TIMESTAMP_TIME_UNIT;   // default: "micros"
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP; // default: false
public static final ConfigOption<Integer> BATCH_SIZE;            // default: 2048
```
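In Table API DDL these options are set in the WITH clause, prefixed with the format name. A sketch assuming the usual 'parquet.' prefix convention; the table name and values are illustrative, and 'parquet.compression' is a passthrough option rather than one of the constants above:

```java
// 'parquet.utc-timezone' corresponds to UTC_TIMEZONE above; 'parquet.compression'
// is passed through to the underlying Parquet writer
tableEnv.executeSql("""
    CREATE TABLE parquet_sink (
        id BIGINT,
        name STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/output',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'true',
        'parquet.compression' = 'SNAPPY'
    )
    """);
```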
For reference, the Flink connector interfaces implemented or produced by the factories above:

```java
public interface BulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;

    interface Factory<T> extends Serializable {
        BulkWriter<T> create(FSDataOutputStream out) throws IOException;
    }
}
```
```java
public interface StreamFormat<T> extends Serializable {
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
    Reader<T> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
    boolean isSplittable();
    TypeInformation<T> getProducedType();
}
```