CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parquet-2-12

Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.

Pending
Overview
Eval results
Files

docs/schema-utilities.md

Schema Utilities

Utilities for converting between Flink logical types and Parquet schema definitions, along with serializable configuration wrappers for distributed processing.

Capabilities

ParquetSchemaConverter

Utility class for converting between Flink's logical type system and Parquet's schema representation with full type mapping support.

/**
 * Schema converter for translating between Parquet and Flink internal types
 * Handles complex type mappings including nested structures, arrays, and maps
 */
public class ParquetSchemaConverter {
    
    /**
     * Converts a Flink RowType to a Parquet MessageType schema
     * Creates a complete Parquet schema definition from Flink logical types
     * @param name Schema name for the Parquet MessageType
     * @param rowType Flink RowType defining the data structure
     * @return MessageType representing the equivalent Parquet schema
     * @throws IllegalArgumentException if the row contains a logical type that
     *         has no Parquet mapping (e.g. MULTISET, RAW)
     */
    public static MessageType convertToParquetMessageType(String name, RowType rowType);
}

SerializableConfiguration

Wrapper class that makes Hadoop Configuration serializable for use in distributed Flink environments.

/**
 * Serializable wrapper for Hadoop Configuration
 * Enables Configuration objects to be distributed across Flink cluster nodes
 * (plain Hadoop Configuration instances are not Java-serializable on their own)
 */
public class SerializableConfiguration implements Serializable {
    
    /**
     * Creates a new SerializableConfiguration wrapping the provided Configuration
     * @param conf Hadoop Configuration to wrap
     */
    public SerializableConfiguration(Configuration conf);
    
    /**
     * Returns the wrapped Configuration instance
     * Typically called in open() on each task manager after deserialization
     * @return Hadoop Configuration object
     */
    public Configuration conf();
}

Usage Examples

Basic Schema Conversion

import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
// DataTypes is required for the factory calls below — it was missing from the
// original import list even though the example uses it throughout.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.parquet.schema.MessageType;

// Define Flink schema
RowType flinkSchema = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.DOUBLE().getLogicalType(),
        DataTypes.BOOLEAN().getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[]{"id", "name", "price", "active", "created_at"}
);

// Convert to Parquet schema
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
    "user_events", flinkSchema
);

System.out.println(parquetSchema);
// Output:
// message user_events {
//   optional int64 id;
//   optional binary name (UTF8);
//   optional double price;
//   optional boolean active;
//   optional int64 created_at (TIMESTAMP(MILLIS,true));
// }

Complex Nested Schema Conversion

// DataTypes lives in org.apache.flink.table.api, not under table.types.logical,
// so the wildcard import alone does not cover the factory calls below.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.*;

// Define complex nested schema
RowType addressType = RowType.of(
    new LogicalType[]{
        DataTypes.STRING().getLogicalType(),    // street
        DataTypes.STRING().getLogicalType(),    // city
        DataTypes.STRING().getLogicalType(),    // state
        DataTypes.STRING().getLogicalType()     // zip_code
    },
    new String[]{"street", "city", "state", "zip_code"}
);

RowType userType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),                    // user_id
        DataTypes.STRING().getLogicalType(),                    // name
        addressType,                                            // address (nested)
        DataTypes.ARRAY(DataTypes.STRING()).getLogicalType(),   // interests (array)
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()).getLogicalType() // metadata (map)
    },
    new String[]{"user_id", "name", "address", "interests", "metadata"}
);

// Convert complex schema
MessageType complexParquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
    "complex_user", userType
);

// Parquet schema will include nested groups, lists, and maps
System.out.println(complexParquetSchema);

SerializableConfiguration Usage

import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.hadoop.conf.Configuration;

// Create Hadoop configuration with custom settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("parquet.compression", "SNAPPY");
hadoopConfig.setInt("parquet.block.size", 134217728);
hadoopConfig.setBoolean("parquet.enable.dictionary", true);
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:9000");

// Wrap in serializable wrapper for distribution
SerializableConfiguration serializableConfig = new SerializableConfiguration(hadoopConfig);

// Use in Flink operations that need to serialize configuration
DataStream<RowData> processedData = inputStream
    .map(new RichMapFunction<RowData, RowData>() {
        private transient Configuration config;

        // Flink's own Configuration type must be fully qualified here: the
        // unqualified name resolves to org.apache.hadoop.conf.Configuration
        // imported above, and a Hadoop-typed parameter would not override
        // RichFunction.open(org.apache.flink.configuration.Configuration).
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) {
            // Extract Hadoop configuration on each task manager
            this.config = serializableConfig.conf();
        }

        @Override
        public RowData map(RowData value) throws Exception {
            // Use Hadoop configuration in processing
            String compressionType = config.get("parquet.compression");
            // ... processing logic
            return value;
        }
    });

Integration with Writer Builders

// Schema conversion in writer creation
RowType rowType = RowType.of(
    new LogicalType[]{
        DataTypes.STRING().getLogicalType(),
        DataTypes.DECIMAL(10, 2).getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[]{"product_id", "price", "updated_at"}
);

// The schema converter is used internally by ParquetRowDataBuilder
Configuration config = new Configuration();
// NOTE(review): the trailing boolean presumably toggles UTC timestamp handling —
// confirm against the ParquetRowDataBuilder.createWriterFactory signature
ParquetWriterFactory<RowData> writerFactory = 
    ParquetRowDataBuilder.createWriterFactory(rowType, config, true);

// Schema conversion happens automatically:
// Flink RowType -> Parquet MessageType -> Parquet Writer configuration

Custom Schema Naming

// Give each converted schema a descriptive name; the name is embedded in the
// resulting Parquet metadata, which makes debugging and tooling output clearer.
MessageType orderSchema =
        ParquetSchemaConverter.convertToParquetMessageType("order_events", orderRowType);

MessageType customerSchema =
        ParquetSchemaConverter.convertToParquetMessageType("customer_profiles", customerRowType);

MessageType productSchema =
        ParquetSchemaConverter.convertToParquetMessageType("product_catalog", productRowType);

// Schema names appear in Parquet metadata and can be used for debugging

Type Mapping Reference

Primitive Types

| Flink LogicalType | Parquet Type | Physical Type | Logical Type |
| --- | --- | --- | --- |
| BOOLEAN | BOOLEAN | BOOLEAN | - |
| TINYINT | INT32 | INT32 | INT(8,true) |
| SMALLINT | INT32 | INT32 | INT(16,true) |
| INTEGER | INT32 | INT32 | INT(32,true) |
| BIGINT | INT64 | INT64 | INT(64,true) |
| FLOAT | FLOAT | FLOAT | - |
| DOUBLE | DOUBLE | DOUBLE | - |
| DECIMAL(p,s) | FIXED_LEN_BYTE_ARRAY or BINARY | Depends on precision | DECIMAL(p,s) |
| CHAR(n) | BINARY | BINARY | UTF8 |
| VARCHAR(n) | BINARY | BINARY | UTF8 |
| BINARY(n) | BINARY | BINARY | - |
| VARBINARY(n) | BINARY | BINARY | - |

Temporal Types

| Flink LogicalType | Parquet Type | Physical Type | Logical Type |
| --- | --- | --- | --- |
| DATE | INT32 | INT32 | DATE |
| TIME | INT32 | INT32 | TIME(MILLIS,true) |
| TIMESTAMP(p) | INT64 | INT64 | TIMESTAMP(MILLIS,true) |
| TIMESTAMP_WITH_TIME_ZONE(p) | INT64 | INT64 | TIMESTAMP(MILLIS,true) |

Complex Types

| Flink LogicalType | Parquet Type | Structure |
| --- | --- | --- |
| ARRAY&lt;T&gt; | LIST | `optional group list { repeated group element { <T> element; } }` |
| MAP&lt;K,V&gt; | MAP | `optional group map { repeated group key_value { <K> key; <V> value; } }` |
| ROW&lt;...&gt; | GROUP | `optional group { <fields...> }` |

Null Handling

The Parquet repetition of each field is derived from the nullability of the corresponding Flink type:

  • Non-null types: required <type> field_name
  • Nullable types: optional <type> field_name

Repetition Levels

The schema converter handles repetition automatically:

  • Regular fields: OPTIONAL repetition
  • Array elements: REPEATED repetition
  • Required fields: REQUIRED repetition (when explicitly marked)

Advanced Usage

Schema Validation

// Validate schema conversion
// (reminder: JVM assertions are disabled by default — run with -ea for the
// assert statements below to actually fire)
try {
    MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
        "test_schema", rowType
    );
    
    // Check field count
    assert parquetSchema.getFieldCount() == rowType.getFieldCount();
    
    // Validate specific fields: converted schema must preserve field order and names
    for (int i = 0; i < rowType.getFieldCount(); i++) {
        String expectedName = rowType.getFieldNames().get(i);
        String actualName = parquetSchema.getFieldName(i);
        assert expectedName.equals(actualName);
    }
    
} catch (IllegalArgumentException e) {
    // Unsupported type conversion
    logger.error("Schema conversion failed: " + e.getMessage());
}

Configuration Serialization Patterns

// Pattern for operations requiring Hadoop configuration
public class ParquetProcessFunction extends ProcessFunction<RowData, RowData> {
    private final SerializableConfiguration serializableConfig;
    private transient Configuration hadoopConfig;
    
    public ParquetProcessFunction(Configuration config) {
        this.serializableConfig = new SerializableConfiguration(config);
    }
    
    @Override
    public void open(Configuration parameters) {
        this.hadoopConfig = serializableConfig.conf();
    }
    
    @Override
    public void processElement(RowData value, Context ctx, Collector<RowData> out) {
        // Use hadoopConfig for processing
        String setting = hadoopConfig.get("custom.setting");
        // ... processing logic
        out.collect(value);
    }
}

Error Handling

Common error scenarios and solutions:

// Catch the specific unsupported-type failure first, then fall back to a
// broad handler for any other conversion error.
try {
    MessageType schema = ParquetSchemaConverter.convertToParquetMessageType(
        "my_schema", rowType
    );
} catch (IllegalArgumentException e) {
    // Unsupported Flink type for Parquet conversion
    logger.error("Unsupported type in schema: " + e.getMessage());
    // Check for unsupported types like MULTISET, RAW, etc.
} catch (Exception e) {
    // Other conversion errors
    logger.error("Schema conversion failed", e);
}

// Configuration serialization issues
try {
    SerializableConfiguration serializable = new SerializableConfiguration(config);  
    Configuration deserialized = serializable.conf();
} catch (Exception e) {
    // Serialization/deserialization errors
    // Usually due to non-serializable configuration values
    logger.error("Configuration serialization failed", e);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12

docs

avro-integration.md

format-factory.md

index.md

protobuf-integration.md

rowdata-writers.md

schema-utilities.md

vectorized-input.md

tile.json