CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parquet-2-12

Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.

Pending
Overview
Eval results
Files

format-factory.mddocs/

Format Factory Integration

The Parquet format factory provides seamless integration with Flink's table ecosystem, enabling automatic format detection and configuration for SQL table definitions.

Capabilities

ParquetFileFormatFactory

Main factory class that implements both bulk reader and writer format factories, providing complete integration with Flink's dynamic table system.

/**
 * Parquet format factory for file system connector integration
 * Implements both reading and writing capabilities for SQL tables
 */
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    
    /**
     * Creates a bulk decoding format for reading Parquet files
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format
     * @return BulkDecodingFormat for RowData processing
     */
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    /**
     * Creates an encoding format for writing Parquet files
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format  
     * @return EncodingFormat for BulkWriter factory creation
     */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    /**
     * Returns the unique identifier for this format factory
     * @return "parquet" identifier string
     */
    public String factoryIdentifier();
    
    /**
     * Returns the set of required configuration options
     * @return Empty set (no required options)
     */
    public Set<ConfigOption<?>> requiredOptions();
    
    /**
     * Returns the set of optional configuration options
     * @return Set containing UTC_TIMEZONE and other optional configs
     */
    public Set<ConfigOption<?>> optionalOptions();
}

Configuration Options

/**
 * Controls timezone handling for timestamp conversion
 * Default: false (use local timezone)
 * When true: use UTC timezone for epoch time to LocalDateTime conversion
 */
public static final ConfigOption<Boolean> UTC_TIMEZONE = 
    key("utc-timezone")
        .booleanType()
        .defaultValue(false)
        .withDescription(
            "Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. " +
            "Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone"
        );

Usage Examples

SQL Table Definition

-- Create a Parquet table with UTC timezone handling
CREATE TABLE orders (
    order_id BIGINT,
    customer_name STRING,
    order_date TIMESTAMP(3),
    amount DECIMAL(10,2)
) WITH (
    'connector' = 'filesystem',
    'path' = '/data/orders',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);

-- Query the table (format factory handles Parquet reading automatically)
SELECT customer_name, SUM(amount) 
FROM orders 
WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
GROUP BY customer_name;

Programmatic Table Creation

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Parquet;

TableEnvironment tableEnv = // ... get table environment

// The format factory is automatically discovered via service loader
tableEnv.executeSql("""
    CREATE TABLE parquet_table (
        id BIGINT,
        name STRING,
        created_at TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/path/to/data',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'false'
    )
""");

Service Registration

The format factory is automatically registered via Java's ServiceLoader mechanism:

# File: META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.parquet.ParquetFileFormatFactory

Internal Implementation Details

The factory creates different decoders and encoders based on the table context:

  • Decoding: Creates ParquetColumnarRowInputFormat with vectorized reading
  • Encoding: Creates ParquetRowDataBuilder with RowData writing support
  • Partitioning: Automatically handles partitioned tables with proper field extraction
  • Schema Conversion: Converts Flink logical types to Parquet schema format

Error Handling

The format factory handles common configuration errors:

  • Invalid timezone configuration values
  • Incompatible data types for Parquet format
  • Missing required Hadoop configuration
  • Schema conversion failures between Flink and Parquet types

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12

docs

avro-integration.md

format-factory.md

index.md

protobuf-integration.md

rowdata-writers.md

schema-utilities.md

vectorized-input.md

tile.json