tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications


Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between Flink and Parquet type systems.

Capabilities

ParquetSchemaConverter

Utility class for converting between Flink's type system and Parquet's schema representation, covering primitive, decimal, temporal, and nested (array, map, row) types.

/**
 * Converts between Flink and Parquet schemas
 */
public class ParquetSchemaConverter {
    
    // Schema naming constants
    static final String MAP_REPEATED_NAME = "key_value";
    static final String LIST_ELEMENT_NAME = "element";
    
    /**
     * Converts Flink RowType to Parquet MessageType schema
     * @param name Schema name for the MessageType
     * @param rowType Flink RowType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return MessageType representing the Parquet schema
     */
    public static MessageType convertToParquetMessageType(
        String name, 
        RowType rowType, 
        Configuration conf
    );
    
    /**
     * Converts individual Flink LogicalType to Parquet Type
     * @param name Field name for the Type
     * @param logicalType Flink LogicalType to convert
     * @param conf Hadoop configuration for conversion settings
     * @return Parquet Type representation
     */
    public static Type convertToParquetType(
        String name, 
        LogicalType logicalType, 
        Configuration conf
    );
    
    /**
     * Computes the minimum number of bytes required to store an unscaled decimal value of the given precision
     * @param precision Decimal precision (number of digits)
     * @return Minimum number of bytes needed
     */
    public static int computeMinBytesForDecimalPrecision(int precision);
    
    /**
     * Checks if decimal precision fits in 32 bits (4 bytes)
     * @param precision Decimal precision to check
     * @return true if precision fits in 32 bits
     */
    public static boolean is32BitDecimal(int precision);
    
    /**
     * Checks if decimal precision fits in 64 bits (8 bytes)
     * @param precision Decimal precision to check
     * @return true if precision fits in 64 bits
     */
    public static boolean is64BitDecimal(int precision);
}
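
The single-type entry point can be exercised directly. A minimal sketch using the convertToParquetType signature above; the field name and decimal type are illustrative:

import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.Type;

// Convert a single Flink field to its Parquet counterpart
LogicalType decimalType = DataTypes.DECIMAL(10, 2).getLogicalType();
Type parquetType = ParquetSchemaConverter.convertToParquetType(
    "price", decimalType, new Configuration());
System.out.println(parquetType);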

SerializableConfiguration

Serializable wrapper for Hadoop Configuration objects. Hadoop's Configuration implements Writable but not java.io.Serializable, so it cannot be shipped with Flink operators directly; this wrapper lets configuration pass through Flink's serialization system.

/**
 * Serializable wrapper for Hadoop Configuration
 */
public class SerializableConfiguration implements Serializable {
    
    /**
     * Creates a new SerializableConfiguration wrapping the provided Configuration
     * @param configuration Hadoop Configuration to wrap
     */
    public SerializableConfiguration(Configuration configuration);
    
    /**
     * Returns the wrapped Hadoop Configuration
     * @return Hadoop Configuration instance
     */
    public Configuration conf();
}

ParquetFormatStatisticsReportUtil

Utility class for extracting and reporting statistics from Parquet file metadata for query optimization.

/**
 * Utilities for extracting and reporting Parquet file statistics
 */
public class ParquetFormatStatisticsReportUtil {
    
    /**
     * Extracts table statistics from Parquet file metadata
     * @param files List of Parquet files to analyze
     * @param producedDataType Expected output data type
     * @param conf Hadoop configuration
     * @param utcTimestamp Whether timestamps use UTC timezone
     * @return TableStats containing row counts and column statistics
     */
    public static TableStats getTableStatistics(
        List<Path> files, 
        DataType producedDataType, 
        Configuration conf, 
        boolean utcTimestamp
    );
    
    /**
     * Additional utility methods for statistics extraction
     */
    // ... other static methods for detailed statistics processing
}

ParquetInputFile

InputFile implementation that bridges Flink's file system abstraction with Parquet's input requirements.

/**
 * InputFile implementation for Parquet using Flink file system abstraction
 */
public class ParquetInputFile implements InputFile {
    
    /**
     * Creates a new ParquetInputFile
     * @param inputStream FSDataInputStream to read from
     * @param length Total length of the file
     */
    public ParquetInputFile(FSDataInputStream inputStream, long length);
    
    /**
     * Returns the length of the file
     * @return File length in bytes
     */
    public long getLength();
    
    /**
     * Creates a new SeekableInputStream for reading
     * @return SeekableInputStream for reading file data
     * @throws IOException if stream creation fails
     */
    public SeekableInputStream newStream() throws IOException;
}

NestedPositionUtil

Utility class for handling nested data positions in vectorized reading operations.

/**
 * Utilities for handling nested data positions in vectorized reading
 */
public class NestedPositionUtil {
    
    /**
     * Calculates positions for nested array elements
     * @param definitionLevels Definition levels for null handling
     * @param repetitionLevels Repetition levels for nested structures
     * @param maxDefinitionLevel Maximum definition level
     * @param maxRepetitionLevel Maximum repetition level
     * @return Position information for nested elements
     */
    public static PositionInfo calculateNestedPositions(
        int[] definitionLevels,
        int[] repetitionLevels,
        int maxDefinitionLevel,
        int maxRepetitionLevel
    );
    
    /**
     * Additional utility methods for nested position calculations
     */
    // ... other static methods for position handling
}

Position Tracking Classes

RowPosition

/**
 * Position tracking for row data in vectorized reading
 */
public class RowPosition {
    
    /**
     * Creates a new RowPosition
     * @param currentPosition Current position in the row
     */
    public RowPosition(int currentPosition);
    
    /**
     * Updates the current position
     * @param newPosition New position value
     */
    public void updatePosition(int newPosition);
    
    /**
     * Gets the current position
     * @return Current position value
     */
    public int getCurrentPosition();
}

CollectionPosition

/**
 * Position tracking for collection data (arrays, maps) in vectorized reading
 */
public class CollectionPosition {
    
    /**
     * Creates a new CollectionPosition
     * @param startPosition Start position of the collection
     * @param length Length of the collection
     */
    public CollectionPosition(int startPosition, int length);
    
    /**
     * Gets the start position of the collection
     * @return Start position
     */
    public int getStartPosition();
    
    /**
     * Gets the length of the collection
     * @return Collection length
     */
    public int getLength();
    
    /**
     * Checks if the collection is empty
     * @return true if collection is empty
     */
    public boolean isEmpty();
}
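
A minimal sketch of how these trackers compose during a vectorized read, based only on the signatures above (illustrative values; the actual classes in Flink's source may differ):

// Track the reader's place within the current batch of rows
RowPosition rowPos = new RowPosition(0);
rowPos.updatePosition(rowPos.getCurrentPosition() + 1);

// Describe the slice of a values vector that belongs to one array instance
CollectionPosition tags = new CollectionPosition(5, 3);
if (!tags.isEmpty()) {
    // Elements for this array occupy vector positions [5, 8)
    int end = tags.getStartPosition() + tags.getLength();
}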

Type Field Definitions

ParquetField

/**
 * Base class for Parquet field representations
 */
public abstract class ParquetField {
    
    /**
     * Gets the field name
     * @return Field name
     */
    public abstract String getName();
    
    /**
     * Gets the field type
     * @return Parquet Type for this field
     */
    public abstract Type getType();
    
    /**
     * Checks if this field is repeated (array)
     * @return true if field is repeated
     */
    public abstract boolean isRepeated();
}

/**
 * Primitive field implementation
 */
public class ParquetPrimitiveField extends ParquetField {
    
    /**
     * Creates a new ParquetPrimitiveField
     * @param name Field name
     * @param primitiveType Parquet primitive type
     * @param repetition Repetition type (required, optional, repeated)
     */
    public ParquetPrimitiveField(String name, PrimitiveType primitiveType, Type.Repetition repetition);
    
    /**
     * Gets the primitive type
     * @return PrimitiveType for this field
     */
    public PrimitiveType getPrimitiveType();
}

/**
 * Group field implementation for nested structures
 */
public class ParquetGroupField extends ParquetField {
    
    /**
     * Creates a new ParquetGroupField
     * @param name Field name
     * @param groupType Parquet group type
     * @param children Child fields in the group
     */
    public ParquetGroupField(String name, GroupType groupType, List<ParquetField> children);
    
    /**
     * Gets the child fields
     * @return List of child ParquetField instances
     */
    public List<ParquetField> getChildren();
    
    /**
     * Gets a child field by name
     * @param name Child field name
     * @return ParquetField instance or null if not found
     */
    public ParquetField getChild(String name);
}
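
A sketch of assembling a small field tree with the constructors documented above; the Parquet schema objects come from org.apache.parquet.schema.Types:

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.Arrays;

// Leaf field: required int64 named "id"
PrimitiveType idType = Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("id");
ParquetPrimitiveField id = new ParquetPrimitiveField("id", idType, Type.Repetition.REQUIRED);

// Group field wrapping the leaf
GroupType recordType = Types.optionalGroup().addField(idType).named("record");
ParquetGroupField record = new ParquetGroupField("record", recordType, Arrays.asList(id));

ParquetField found = record.getChild("id");  // the leaf, or null if absent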

Usage Examples

Schema Conversion

import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

// Define Flink schema
RowType flinkSchema = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.DECIMAL(10, 2).getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType(),
        new ArrayType(DataTypes.STRING().getLogicalType())
    },
    new String[] {"id", "name", "price", "created_at", "tags"}
);

// Convert to Parquet schema
Configuration conf = new Configuration();
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
    "MyRecord", 
    flinkSchema, 
    conf
);

System.out.println(parquetSchema);
// Prints the schema in Parquet's message format; the exact physical types
// chosen (e.g. for timestamps and decimals) depend on the configuration

Configuration Management

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hadoop.conf.Configuration;

// Create Hadoop configuration with Parquet settings
Configuration hadoopConf = new Configuration();
hadoopConf.set("parquet.compression", "SNAPPY");
hadoopConf.set("parquet.page.size", "1048576");
hadoopConf.set("parquet.block.size", "134217728");
hadoopConf.setBoolean("parquet.enable.dictionary", true);

// Wrap for serialization in Flink jobs
SerializableConfiguration serializableConf = new SerializableConfiguration(hadoopConf);

// Use in distributed operations; the wrapper is captured in the closure
DataStream<ProcessedData> processedStream = dataStream
    .map(new RichMapFunction<MyData, ProcessedData>() {
        private transient Configuration conf;

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) {
            // Re-obtain the Hadoop Configuration on the worker
            this.conf = serializableConf.conf();
        }

        @Override
        public ProcessedData map(MyData value) throws Exception {
            // Use configuration for processing
            String compression = conf.get("parquet.compression");
            return processWithCompression(value, compression);
        }
    });
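
Note that only the SerializableConfiguration travels with the serialized operator; the Hadoop Configuration field is marked transient and rebuilt on each task via conf() in open(). This is the standard pattern for carrying non-serializable dependencies into distributed functions.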

Statistics Extraction

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.utils.ParquetFormatStatisticsReportUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.List;

// Extract statistics from Parquet files
List<Path> parquetFiles = Arrays.asList(
    new Path("/data/part-00000.parquet"),
    new Path("/data/part-00001.parquet"),
    new Path("/data/part-00002.parquet")
);

DataType outputType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.BIGINT()),
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("amount", DataTypes.DECIMAL(10, 2))
);

Configuration hadoopConf = new Configuration();
TableStats stats = ParquetFormatStatisticsReportUtil.getTableStatistics(
    parquetFiles,
    outputType,
    hadoopConf,
    true  // interpret timestamps as UTC
);

System.out.println("Row count: " + stats.getRowCount());
System.out.println("Column stats: " + stats.getColumnStats());

Custom Input File

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetInputFile;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Create custom input file for Parquet library
Path filePath = new Path("hdfs://cluster/data/file.parquet");
FileSystem fileSystem = filePath.getFileSystem();

try (FSDataInputStream inputStream = fileSystem.open(filePath)) {
    long fileLength = fileSystem.getFileStatus(filePath).getLen();
    
    // Create ParquetInputFile for use with Parquet library
    ParquetInputFile inputFile = new ParquetInputFile(inputStream, fileLength);
    
    // Use with Parquet readers; close the reader to release its resources
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
        ParquetMetadata metadata = reader.getFooter();
        
        System.out.println("Schema: " + metadata.getFileMetaData().getSchema());
        System.out.println("Row groups: " + metadata.getBlocks().size());
    }
}

Decimal Precision Handling

import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;

// Check decimal precision requirements
int[] precisions = {5, 10, 15, 20, 25, 30, 35};

for (int precision : precisions) {
    int minBytes = ParquetSchemaConverter.computeMinBytesForDecimalPrecision(precision);
    boolean is32Bit = ParquetSchemaConverter.is32BitDecimal(precision);
    boolean is64Bit = ParquetSchemaConverter.is64BitDecimal(precision);
    
    System.out.printf("Precision %d: %d bytes, 32-bit: %b, 64-bit: %b%n", 
                     precision, minBytes, is32Bit, is64Bit);
}

// Output (first four precisions shown; the remaining values follow the same pattern):
// Precision 5: 3 bytes, 32-bit: true, 64-bit: true
// Precision 10: 5 bytes, 32-bit: false, 64-bit: true
// Precision 15: 7 bytes, 32-bit: false, 64-bit: true
// Precision 20: 9 bytes, 32-bit: false, 64-bit: false
// ...

Nested Position Calculation

import org.apache.flink.formats.parquet.utils.NestedPositionUtil;

// Handle nested array positions
int[] definitionLevels = {3, 3, 2, 3, 3, 3, 1, 3, 3};
int[] repetitionLevels = {0, 1, 0, 0, 1, 1, 0, 0, 1};

PositionInfo positions = NestedPositionUtil.calculateNestedPositions(
    definitionLevels,
    repetitionLevels,
    3,  // max definition level
    1   // max repetition level
);

// Use positions for vectorized nested data processing
// (processNestedData is a user-defined placeholder, not part of the API)
processNestedData(positions);

Integration Examples

Custom Format with Utilities

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.formats.parquet.utils.*;
import org.apache.flink.table.types.logical.RowType;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

public class CustomParquetFormat implements BulkFormat<CustomRecord, FileSourceSplit> {
    
    private final SerializableConfiguration hadoopConf;
    private final RowType schema;
    
    public CustomParquetFormat(org.apache.hadoop.conf.Configuration conf, RowType schema) {
        this.hadoopConf = new SerializableConfiguration(conf);
        this.schema = schema;
    }
    
    @Override
    public Reader<CustomRecord> createReader(Configuration config, FileSourceSplit split) throws IOException {
        // Convert the Flink row schema to a Parquet MessageType
        MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
            "CustomRecord", 
            schema, 
            hadoopConf.conf()
        );
        
        // Bridge Flink's file system to Parquet's InputFile abstraction
        // (openInputStream is a user-supplied helper)
        ParquetInputFile inputFile = new ParquetInputFile(
            openInputStream(split.path()), 
            split.length()
        );
        
        return new CustomParquetReader(inputFile, parquetSchema);
    }
    
    @Override
    public Reader<CustomRecord> restoreReader(Configuration config, FileSourceSplit split) throws IOException {
        // A production implementation should seek to the checkpointed
        // position carried by the split instead of restarting from scratch
        return createReader(config, split);
    }
    
    @Override
    public boolean isSplittable() {
        return true;
    }
    
    @Override
    public TypeInformation<CustomRecord> getProducedType() {
        return TypeInformation.of(CustomRecord.class);
    }
}
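
A sketch of wiring this format into a file source, assuming the hypothetical CustomRecord and CustomParquetFormat above plus an existing Hadoop configuration and row schema:

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;

// Build a bulk-format file source over a directory of Parquet files
FileSource<CustomRecord> source = FileSource
    .forBulkFileFormat(new CustomParquetFormat(hadoopConf, schema), new Path("/data"))
    .build();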

The utilities package provides essential infrastructure for seamless integration between Flink's type system and Parquet's columnar format, handling the complexity of schema conversion and configuration management.
