CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

Pending
Overview
Eval results
Files

rowdata-integration.mddocs/

RowData Integration

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

Capabilities

ParquetRowDataBuilder

Builder class for creating RowData-based Parquet writers with proper schema conversion and configuration.

/**
 * Builder for creating RowData ParquetWriters with schema conversion
 */
public class ParquetRowDataBuilder {
    
    /**
     * Creates a new ParquetRowDataBuilder
     * @param outputFile OutputFile to write to
     * @param rowType Flink RowType defining the schema
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     */
    public ParquetRowDataBuilder(OutputFile outputFile, RowType rowType, boolean utcTimestamp);
    
    /**
     * Creates a ParquetWriterFactory for RowData with automatic schema conversion
     * @param rowType Flink RowType schema
     * @param conf Hadoop configuration for Parquet settings
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @return ParquetWriterFactory for writing RowData records
     */
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType, 
        Configuration conf, 
        boolean utcTimestamp
    );
}

ParquetRowDataWriter

Writer implementation that converts Flink RowData to Parquet columnar format with full type support.

/**
 * Writes RowData records to Parquet columnar format
 */
public class ParquetRowDataWriter {
    
    /**
     * Creates a new ParquetRowDataWriter
     * @param recordConsumer Parquet RecordConsumer for writing
     * @param rowType Flink RowType schema
     * @param schema Parquet MessageType schema
     * @param utcTimestamp Whether to use UTC timezone for timestamps
     * @param conf Hadoop configuration
     */
    public ParquetRowDataWriter(
        RecordConsumer recordConsumer,
        RowType rowType,
        MessageType schema,
        boolean utcTimestamp,
        Configuration conf
    );
    
    /**
     * Writes a RowData record to Parquet
     * @param record RowData record to write
     */
    public void write(RowData record);
}

ParquetColumnarRowInputFormat

Vectorized input format specifically designed for reading Parquet files as RowData with partition support and statistics reporting.

/**
 * Vectorized input format for reading Parquet files as RowData
 * @param <SplitT> Type of file split
 */
public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT> 
    implements FileBasedStatisticsReportableInputFormat {
    
    /**
     * Creates a new ParquetColumnarRowInputFormat
     * @param hadoopConfig Hadoop configuration
     * @param projectedType Flink RowType for the projected output schema
     * @param producedTypeInfo TypeInformation for RowData
     * @param batchSize Batch size for vectorized reading
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether field names are case sensitive
     */
    public ParquetColumnarRowInputFormat(
        Configuration hadoopConfig,
        RowType projectedType,
        TypeInformation<RowData> producedTypeInfo,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
    
    /**
     * Creates a partitioned format with partition field support
     * @param <SplitT> Type of file split extending FileSourceSplit
     * @param hadoopConfig Hadoop configuration
     * @param producedRowType Output RowType schema
     * @param producedTypeInfo TypeInformation for RowData
     * @param partitionKeys List of partition field names
     * @param extractor Partition field extractor for the split type
     * @param batchSize Batch size for vectorized reading
     * @param isUtcTimestamp Whether to use UTC timezone for timestamps
     * @param isCaseSensitive Whether field names are case sensitive
     * @return ParquetColumnarRowInputFormat with partition support
     */
    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
        Configuration hadoopConfig,
        RowType producedRowType,
        TypeInformation<RowData> producedTypeInfo,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
    
    /**
     * Returns the produced type information
     * @return TypeInformation for RowData
     */
    public TypeInformation<RowData> getProducedType();
    
    /**
     * Reports statistics from Parquet file metadata
     * @param files List of files to analyze
     * @param producedDataType Expected output data type
     * @return TableStats with row counts and column statistics
     */
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}

Usage Examples

Basic RowData Writing

import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.data.RowData;

// Define schema using Flink types
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),        // id
        DataTypes.STRING().getLogicalType(),        // name  
        DataTypes.DOUBLE().getLogicalType(),        // price
        DataTypes.TIMESTAMP(3).getLogicalType()     // created_at
    },
    new String[] {"id", "name", "price", "created_at"}
);

// Create writer factory
Configuration conf = new Configuration();
ParquetWriterFactory<RowData> writerFactory = 
    ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);

// Use with FileSink
FileSink<RowData> sink = FileSink
    .forBulkFormat(new Path("/output/products"), writerFactory)
    .build();

Reading with Vectorized Format

import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.connector.file.src.FileSource;

// Create vectorized input format
ParquetColumnarRowInputFormat<FileSourceSplit> inputFormat = 
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        new Configuration(),        // Hadoop config
        rowType,                   // Output schema
        TypeInformation.of(RowData.class), // Type info
        Arrays.asList("date"),     // Partition keys
        "__DEFAULT_PARTITION__",   // Default partition name
        2048,                      // Batch size
        true,                      // UTC timezone
        true                       // Case sensitive
    );

// Create file source with vectorized reading
FileSource<RowData> source = FileSource
    .forBulkFormat(inputFormat, new Path("/data/partitioned"))
    .build();

DataStream<RowData> rowDataStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "parquet-rowdata-source"
);

Complex Type Support

import org.apache.flink.table.types.logical.*;

// Define complex schema with nested types
RowType nestedRowType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),                    // order_id
        RowType.of(                                            // customer (nested)
            new LogicalType[] {
                DataTypes.BIGINT().getLogicalType(),           // customer.id
                DataTypes.STRING().getLogicalType()            // customer.name
            },
            new String[] {"id", "name"}
        ),
        ArrayType.newBuilder()                                  // items (array)
            .elementType(RowType.of(
                new LogicalType[] {
                    DataTypes.STRING().getLogicalType(),       // item.product_id
                    DataTypes.INT().getLogicalType(),          // item.quantity
                    DataTypes.DECIMAL(10, 2).getLogicalType()  // item.price
                },
                new String[] {"product_id", "quantity", "price"}
            ))
            .build(),
        MapType.newBuilder()                                    // metadata (map)
            .keyType(DataTypes.STRING().getLogicalType())
            .valueType(DataTypes.STRING().getLogicalType())
            .build()
    },
    new String[] {"order_id", "customer", "items", "metadata"}
);

// Create writer for complex types
ParquetWriterFactory<RowData> complexWriterFactory = 
    ParquetRowDataBuilder.createWriterFactory(nestedRowType, conf, true);

Partition Field Handling

// Reading partitioned data with automatic partition field injection
List<String> partitionKeys = Arrays.asList("year", "month", "day");

ParquetColumnarRowInputFormat<FileSourceSplit> partitionedFormat = 
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf,
        producedRowType,           // Schema including partition fields
        typeInfo,
        partitionKeys,             // Partition field names
        "UNKNOWN",                 // Default for null partitions
        4096,                      // Larger batch for partitioned data
        true,                      // UTC timestamps
        false                      // Case insensitive partition names
    );

// File structure: /data/year=2023/month=01/day=15/file.parquet
// Partition fields are automatically added to RowData

Column Projection

// Only read specific columns for better performance
RowType projectedType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),    // id
        DataTypes.STRING().getLogicalType()     // name
    },
    new String[] {"id", "name"}
);

ParquetColumnarRowInputFormat<FileSourceSplit> projectedFormat = 
    new ParquetColumnarRowInputFormat<>(
        conf,
        projectedType,                          // Only projected fields
        TypeInformation.of(RowData.class),
        Arrays.asList("id", "name"),           // Selected fields
        null,                                   // No field ID mapping
        2048,                                   // Batch size
        true,                                   // UTC timezone
        true                                    // Case sensitive
    );

Integration with Table API

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Create table descriptor for RowData integration
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("order_id", DataTypes.BIGINT())
        .column("customer_name", DataTypes.STRING())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("order_time", DataTypes.TIMESTAMP(3))
        .watermark("order_time", "order_time - INTERVAL '5' SECOND")
        .build())
    .option("path", "/data/orders")
    .option("format", "parquet")
    .option("parquet.batch-size", "4096")
    .option("parquet.utc-timezone", "true")
    .build();

Table ordersTable = tableEnv.from(descriptor);

Type Conversion Examples

// Supported Flink to Parquet type mappings:

// Primitive types
DataTypes.BOOLEAN()     → BOOLEAN
DataTypes.TINYINT()     → INT32 (INT_8)
DataTypes.SMALLINT()    → INT32 (INT_16) 
DataTypes.INT()         → INT32
DataTypes.BIGINT()      → INT64
DataTypes.FLOAT()       → FLOAT
DataTypes.DOUBLE()      → DOUBLE
DataTypes.STRING()      → BINARY (UTF8)
DataTypes.BYTES()       → BINARY

// Temporal types  
DataTypes.DATE()        → INT32 (DATE)
DataTypes.TIME()        → INT32 (TIME_MILLIS)
DataTypes.TIMESTAMP(3)  → INT64 (TIMESTAMP_MILLIS)
DataTypes.TIMESTAMP(6)  → INT64 (TIMESTAMP_MICROS)

// Decimal types
DataTypes.DECIMAL(p,s)  → FIXED_LEN_BYTE_ARRAY or INT32/INT64 (DECIMAL)

// Complex types
DataTypes.ARRAY(T)      → LIST with element type conversion
DataTypes.MAP(K,V)      → MAP with key/value type conversion  
DataTypes.ROW(...)      → GROUP with nested field conversions

Performance Optimization

Batch Size Tuning

// Adjust batch size based on memory and performance requirements
int batchSize = calculateOptimalBatchSize(
    availableMemory,
    numberOfColumns,
    avgRowSize
);

ParquetColumnarRowInputFormat<FileSourceSplit> optimizedFormat = 
    ParquetColumnarRowInputFormat.createPartitionedFormat(
        conf, rowType, typeInfo, partitions, defaultPart,
        batchSize,  // Tuned batch size
        utcTime, caseSensitive
    );

Memory Management

// Configure Parquet memory settings
Configuration conf = new Configuration();
conf.set("parquet.memory.min.chunk.size", "1048576");      // 1MB
conf.set("parquet.memory.pool.ratio", "0.95");             // 95% of available memory
conf.set("parquet.page.size", "1048576");                  // 1MB pages
conf.set("parquet.block.size", "134217728");               // 128MB blocks

The RowData integration provides the most efficient path for Table API operations by directly using Flink's internal row representation without additional serialization overhead.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet

docs

avro-integration.md

index.md

protobuf-integration.md

rowdata-integration.md

table-integration.md

utilities.md

vectorized-reading.md

writing-support.md

tile.json