Apache Flink Parquet Format

Apache Flink Parquet format support provides high-performance columnar file reading and writing for both batch and streaming applications. This library enables efficient processing of Apache Parquet files within the Flink ecosystem, offering vectorized reading, multiple serialization format support, and seamless integration with Flink's Table API and DataStream API.

Package Information

  • Package Name: flink-parquet
  • Package Type: Maven
  • Group ID: org.apache.flink
  • Artifact ID: flink-parquet
  • Language: Java
  • Installation: Add to Maven pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;

For Avro integration:

import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

For RowData integration:

import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;

For Protocol Buffers integration:

import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

Basic Usage

Reading Parquet Files

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read Avro records from Parquet (MyAvroRecord is a generated SpecificRecord class)
FileSource<MyAvroRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(MyAvroRecord.class),
        new Path("path/to/parquet/files")
    )
    .build();

DataStream<MyAvroRecord> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "parquet-source"
);

Writing Parquet Files

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Write Avro records to Parquet
FileSink<MyAvroRecord> sink = FileSink
    .forBulkFormat(
        new Path("path/to/output"),
        AvroParquetWriters.forSpecificRecord(MyAvroRecord.class)
    )
    .build();

stream.sinkTo(sink);

Table API Integration

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Create Parquet table
tableEnv.executeSql("""
    CREATE TABLE parquet_table (
        id BIGINT,
        name STRING,
        timestamp_col TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/parquet/files',
        'format' = 'parquet'
    )
    """);

Architecture

The Flink Parquet module is built around several key architectural components:

  • Format Factory: ParquetFileFormatFactory provides the main entry point for Table API integration, creating bulk reading/writing formats
  • Vectorized Reading: High-performance columnar readers that process multiple rows in batches for improved throughput
  • Multi-Format Support: Seamless integration with Avro, Protocol Buffers, and Flink's native RowData serialization
  • Schema Conversion: Automatic conversion between Flink types and Parquet schema with support for nested data structures
  • Statistics Integration: Built-in support for collecting and reporting file-level statistics for query optimization

The design enables efficient large-scale data processing by leveraging Parquet's columnar storage format while maintaining full compatibility with Flink's streaming and batch processing capabilities.

Capabilities

Table API Format Factory

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options.

public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
}

Table Integration

Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

@FunctionalInterface
public interface ParquetBuilder<T> {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
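
The ParquetBuilder hook lets any underlying ParquetWriter be plugged in. As a minimal sketch, the factory below wraps parquet-avro's AvroParquetWriter with Snappy compression; MyAvroRecord is a hypothetical generated Avro class used only for illustration.

import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// The lambda implements ParquetBuilder<MyAvroRecord>: it receives the OutputFile
// and returns a configured ParquetWriter for that file.
ParquetWriterFactory<MyAvroRecord> customFactory = new ParquetWriterFactory<>(
    out -> AvroParquetWriter.<MyAvroRecord>builder(out)
        .withSchema(MyAvroRecord.getClassSchema())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
);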

Writing Support

Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

public class AvroParquetReaders {
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}

public class AvroParquetWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
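
Beyond generated SpecificRecord classes, GenericRecord reading works without compile-time classes. A minimal sketch, assuming the schema is built in code with Avro's SchemaBuilder rather than obtained from the files or a registry:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;

// Illustrative schema; in practice it usually mirrors the writer's schema
Schema schema = SchemaBuilder.record("User").fields()
    .requiredLong("id")
    .requiredString("name")
    .endRecord();

FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("path/to/parquet/files")
    )
    .build();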

Avro Integration

RowData Integration

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

public class ParquetRowDataBuilder {
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType, 
        Configuration conf, 
        boolean utcTimestamp
    );
}

public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT> 
    implements FileBasedStatisticsReportableInputFormat {
    public static ParquetColumnarRowInputFormat<FileSourceSplit> createPartitionedFormat(
        Configuration conf,
        RowType producedRowType,
        TypeInformation<RowData> producedTypeInfo,
        List<String> partitionKeys,
        String defaultPartName,
        int batchSize,
        boolean utcTimestamp,
        boolean caseSensitive
    );
}
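
For writing RowData directly, the factory can be handed to a FileSink. A sketch under the assumption that the Configuration parameter above is Hadoop's org.apache.hadoop.conf.Configuration:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Row schema: (id BIGINT, name STRING)
RowType rowType = RowType.of(
    new LogicalType[]{new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[]{"id", "name"});

ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true /* utcTimestamp */);

FileSink<RowData> sink = FileSink
    .forBulkFormat(new Path("path/to/output"), writerFactory)
    .build();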

RowData Integration

Protocol Buffers Integration

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data.

public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
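
A minimal write-side sketch; MyProtoMessage stands in for any protoc-generated Message class:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// MyProtoMessage is a hypothetical generated protobuf class
FileSink<MyProtoMessage> sink = FileSink
    .forBulkFormat(new Path("path/to/output"), ParquetProtoWriters.forType(MyProtoMessage.class))
    .build();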

Protocol Buffers Integration

Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
    public boolean isSplittable();
}

@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
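
The columnar RowData reader plugs into FileSource as a bulk format. A sketch following the createPartitionedFormat signature above, assuming a non-partitioned table (empty partition key list) and Hadoop Configuration:

import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Row schema: (id BIGINT, name STRING)
RowType rowType = RowType.of(
    new LogicalType[]{new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[]{"id", "name"});

ParquetColumnarRowInputFormat<FileSourceSplit> format =
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        new Configuration(),
        rowType,                       // produced row type
        InternalTypeInfo.of(rowType),  // TypeInformation<RowData>
        Collections.emptyList(),       // no partition keys
        "__DEFAULT_PARTITION__",       // default partition name placeholder
        2048,                          // batch size
        true,                          // utcTimestamp
        true);                         // caseSensitive

FileSource<RowData> source = FileSource
    .forBulkFileFormat(format, new Path("path/to/parquet/files"))
    .build();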

Vectorized Reading

Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between Flink and Parquet type systems.

public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(
        String name, 
        RowType rowType, 
        Configuration conf
    );
    
    public static Type convertToParquetType(
        String name, 
        LogicalType logicalType, 
        Configuration conf
    );
}

public class SerializableConfiguration implements Serializable {
    public SerializableConfiguration(Configuration configuration);
    public Configuration conf();
}
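
As a sketch, the converter can turn the same (id BIGINT, name STRING) row type into a Parquet MessageType; this assumes the Configuration argument shown above is Hadoop's org.apache.hadoop.conf.Configuration:

import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

RowType rowType = RowType.of(
    new LogicalType[]{new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[]{"id", "name"});

// Produces a Parquet schema named "flink_schema" mirroring the Flink row type
MessageType parquetSchema =
    ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType, new Configuration());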

Utilities

Configuration Options

// Format factory configuration constants
public static final ConfigOption<Boolean> UTC_TIMEZONE;              // default: false
public static final ConfigOption<String> TIMESTAMP_TIME_UNIT;        // default: "micros"
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP;     // default: false
public static final ConfigOption<Integer> BATCH_SIZE;                // default: 2048
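
In SQL DDL these surface as format-scoped options. A hedged example, assuming the usual 'parquet.' option prefix used by the filesystem connector (exact option keys may vary by Flink version); 'parquet.compression' is one of the options passed through to the underlying ParquetOutputFormat. tableEnv is created as in the Table API example above.

tableEnv.executeSql("""
    CREATE TABLE parquet_sink (
        id BIGINT,
        name STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/output',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'true',
        'parquet.compression' = 'SNAPPY'
    )
    """);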

Common Types

BulkWriter Factory

public interface BulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;
    
    interface Factory<T> extends Serializable {
        BulkWriter<T> create(FSDataOutputStream out) throws IOException;
    }
}

Stream Format

public interface StreamFormat<T> extends Serializable {
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
    Reader<T> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
    boolean isSplittable();
    TypeInformation<T> getProducedType();
}