tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

docs/table-integration.md

Table Integration

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options and statistics support.

Capabilities

ParquetFileFormatFactory

Main factory class implementing both bulk reader and writer format factories for seamless Table API integration.

/**
 * Parquet format factory for file system connectors
 */
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    
    /**
     * Creates a bulk decoding format for reading Parquet files
     * @param context Dynamic table factory context
     * @param formatOptions Configuration options
     * @return BulkDecodingFormat for reading RowData from Parquet files
     */
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    /**
     * Creates an encoding format for writing Parquet files  
     * @param context Dynamic table factory context
     * @param formatOptions Configuration options
     * @return EncodingFormat for writing RowData to Parquet files
     */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    /**
     * Returns the format identifier for this factory
     * @return "parquet"
     */
    public String factoryIdentifier();
    
    /**
     * Required configuration options (none for Parquet)
     * @return Empty set
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Optional configuration options 
     * @return Set of supported configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
}

Configuration Options

Essential configuration options for controlling Parquet reading and writing behavior.

/**
 * Format identifier constant
 */
public static final String IDENTIFIER = "parquet";

/**
 * Use UTC timezone or local timezone for timestamp conversion
 * Default: false (use local timezone)
 */
public static final ConfigOption<Boolean> UTC_TIMEZONE = 
    key("utc-timezone")
        .booleanType()
        .defaultValue(false);

/**
 * Time unit for storing Parquet timestamps
 * Values: "nanos", "micros", "millis"
 * Default: "micros"
 */
public static final ConfigOption<String> TIMESTAMP_TIME_UNIT = 
    key("timestamp.time.unit")
        .stringType()
        .defaultValue("micros");

/**
 * Write timestamps as INT64 with Parquet logical type annotations instead of
 * the legacy INT96 encoding
 * Default: false (use INT96)
 */
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP = 
    key("write.int64.timestamp")
        .booleanType()
        .defaultValue(false);

/**
 * Batch size for vectorized reading
 * Default: 2048 rows per batch
 */
public static final ConfigOption<Integer> BATCH_SIZE = 
    key("batch-size")
        .intType()
        .defaultValue(2048);
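The same options can also be supplied when defining a table programmatically. A minimal sketch using Flink's `TableDescriptor`/`FormatDescriptor` builder API (the path and schema here are illustrative, not from this package):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Illustrative table definition; format options are nested under the
// "parquet" format descriptor, so they do not need the "parquet." prefix
// that the SQL WITH clause requires.
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("event_id", DataTypes.BIGINT())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .build())
    .option("path", "/data/events")
    .format(FormatDescriptor.forFormat("parquet")
        .option("utc-timezone", "true")
        .option("batch-size", "4096")
        .build())
    .build();

// Register it, e.g.: tableEnv.createTemporaryTable("events", descriptor);
```

This configuration fragment requires a Flink Table API dependency on the classpath; it builds the same table definition as the SQL examples below.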

ParquetBulkDecodingFormat

Bulk decoding format implementation with statistics reporting and projection support.

/**
 * Bulk decoding format for reading Parquet files with statistics support
 */
public static class ParquetBulkDecodingFormat 
    implements ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
               BulkDecodingFormat<RowData>,
               FileBasedStatisticsReportableInputFormat {
    
    /**
     * Creates runtime decoder with projection support
     * @param sourceContext Table source context
     * @param producedDataType Output data type
     * @param projections Column projections 
     * @return BulkFormat for reading projected RowData
     */
    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections
    );
    
    /**
     * Returns supported changelog mode (insert-only)
     * @return ChangelogMode.insertOnly()
     */
    public ChangelogMode getChangelogMode();
    
    /**
     * Reports table statistics from Parquet file metadata
     * @param files List of Parquet files to analyze
     * @param producedDataType Expected output data type
     * @return TableStats with row counts and column statistics
     */
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
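The `int[][] projections` argument to `createRuntimeDecoder` encodes nested field paths: each inner array is one path into the produced row type. A minimal illustration (the column names in the comments are hypothetical):

```java
// Each inner array is a field path into the produced row type.
// {0} selects top-level field 0; a path like {3, 1} would select
// field 1 nested inside top-level field 3.
int[][] projections = new int[][] {
    {0},  // e.g. order_id
    {2},  // e.g. price
};
System.out.println(projections.length + " projected field paths");
```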

Usage Examples

Basic Table Creation

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create table with Parquet format
tableEnv.executeSql("""
    CREATE TABLE orders (
        order_id BIGINT,
        customer_id STRING,
        product_name STRING,
        quantity INT,
        price DECIMAL(10,2),
        order_date TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/orders',
        'format' = 'parquet'
    )
""");

Advanced Configuration

// Table with custom Parquet settings
tableEnv.executeSql("""
    CREATE TABLE events (
        event_id BIGINT,
        timestamp_col TIMESTAMP(3),
        payload STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/events',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'true',
        'parquet.timestamp.time.unit' = 'nanos',
        'parquet.write.int64.timestamp' = 'true',
        'parquet.batch-size' = '4096'
    )
""");

Reading with Projections

// Only read specific columns for better performance
Table result = tableEnv.sqlQuery("""
    SELECT order_id, customer_id, price 
    FROM orders 
    WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
""");

Partition Support

// Partitioned Parquet table
tableEnv.executeSql("""
    CREATE TABLE sales_partitioned (
        transaction_id BIGINT,
        amount DECIMAL(10,2),
        product_category STRING,
        sale_date DATE
    ) PARTITIONED BY (sale_date) WITH (
        'connector' = 'filesystem',
        'path' = '/data/sales',
        'format' = 'parquet',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""");

Statistics and Performance

The Parquet format factory automatically extracts statistics from Parquet file metadata:

  • Row counts: Exact counts from file metadata
  • Column statistics: Min/max values, null counts where available
  • File-level metrics: Used for query planning and optimization
  • Projection pushdown: Only reads required columns from storage
  • Predicate pushdown: Filters applied at file level when possible

This enables Flink's cost-based optimizer to make intelligent decisions about query execution plans.
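One way to check that these optimizations are applied is to inspect the query plan. A sketch using `TableEnvironment.explainSql`, reusing the `orders` table from the examples above:

```java
// Inspect the optimized plan; the TableSourceScan node should list only
// the projected columns, confirming projection pushdown into the
// Parquet source.
String plan = tableEnv.explainSql("""
    SELECT order_id, customer_id, price
    FROM orders
    WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
""");
System.out.println(plan);
```

This fragment requires a running `TableEnvironment` with the `orders` table registered, so it is shown here as a sketch rather than a standalone program.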

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet
