Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
—
Utilities for converting between Flink logical types and Parquet schema definitions, along with serializable configuration wrappers for distributed processing.
Utility class for converting between Flink's logical type system and Parquet's schema representation with full type mapping support.
/**
 * Utility for translating between Flink's logical type system and Parquet's
 * schema representation.
 *
 * <p>Handles primitive mappings (see the type-mapping tables below) as well as
 * complex types: nested rows become Parquet groups, arrays become LIST groups,
 * and maps become MAP groups.
 */
public class ParquetSchemaConverter {
/**
 * Converts a Flink RowType to a Parquet MessageType schema.
 *
 * <p>Every field is emitted with OPTIONAL repetition by default
 * (e.g. {@code optional int64 id}); see the repetition notes below.
 *
 * @param name schema name recorded in the resulting Parquet MessageType;
 *             appears in Parquet metadata and is useful for debugging
 * @param rowType Flink RowType defining the field names and logical types
 * @return MessageType representing the equivalent Parquet schema
 * @throws IllegalArgumentException if the RowType contains a logical type with
 *         no Parquet equivalent (e.g. MULTISET, RAW)
 */
public static MessageType convertToParquetMessageType(String name, RowType rowType);
}Wrapper class that makes Hadoop Configuration serializable for use in distributed Flink environments.
/**
 * Serializable wrapper around Hadoop's {@code Configuration}, which is not
 * itself {@code Serializable}.
 *
 * <p>Wrap a Configuration in this class so Flink can ship it inside function
 * closures/operator state to task managers across the cluster.
 */
public class SerializableConfiguration implements Serializable {
/**
 * Creates a new SerializableConfiguration wrapping the provided Configuration.
 *
 * @param conf Hadoop Configuration to wrap; retrievable later via {@link #conf()}
 */
public SerializableConfiguration(Configuration conf);
/**
 * Returns the wrapped Configuration instance.
 *
 * <p>Call this on the task side (typically in {@code open(...)}) to recover
 * the configuration after deserialization.
 *
 * @return the wrapped Hadoop Configuration object
 */
public Configuration conf();
}import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.parquet.schema.MessageType;
// Build the Flink row schema: five columns with their logical types
LogicalType[] fieldTypes = {
    DataTypes.BIGINT().getLogicalType(),
    DataTypes.STRING().getLogicalType(),
    DataTypes.DOUBLE().getLogicalType(),
    DataTypes.BOOLEAN().getLogicalType(),
    DataTypes.TIMESTAMP(3).getLogicalType()
};
String[] fieldNames = {"id", "name", "price", "active", "created_at"};
RowType flinkSchema = RowType.of(fieldTypes, fieldNames);
// Translate the Flink row schema into the equivalent Parquet MessageType
MessageType parquetSchema =
    ParquetSchemaConverter.convertToParquetMessageType("user_events", flinkSchema);
System.out.println(parquetSchema);
// Output:
// message user_events {
// optional int64 id;
// optional binary name (UTF8);
// optional double price;
// optional boolean active;
// optional int64 created_at (TIMESTAMP(MILLIS,true));
// }
import org.apache.flink.table.types.logical.*;
// Nested address record: four string columns
LogicalType[] addressFieldTypes = {
    DataTypes.STRING().getLogicalType(),
    DataTypes.STRING().getLogicalType(),
    DataTypes.STRING().getLogicalType(),
    DataTypes.STRING().getLogicalType()
};
String[] addressFieldNames = {"street", "city", "state", "zip_code"};
RowType addressType = RowType.of(addressFieldTypes, addressFieldNames);
// Top-level user record combining scalars, the nested row, an array, and a map
RowType userType = RowType.of(
    new LogicalType[]{
        DataTypes.BIGINT().getLogicalType(),                                   // user_id
        DataTypes.STRING().getLogicalType(),                                   // name
        addressType,                                                           // address (nested row)
        DataTypes.ARRAY(DataTypes.STRING()).getLogicalType(),                  // interests (array)
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()).getLogicalType() // metadata (map)
    },
    new String[]{"user_id", "name", "address", "interests", "metadata"}
);
// Nested rows become Parquet groups; arrays and maps become LIST/MAP groups
MessageType complexParquetSchema =
    ParquetSchemaConverter.convertToParquetMessageType("complex_user", userType);
System.out.println(complexParquetSchema);
import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
import org.apache.hadoop.conf.Configuration;
// Create Hadoop configuration with custom settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("parquet.compression", "SNAPPY");
hadoopConfig.setInt("parquet.block.size", 134217728);
hadoopConfig.setBoolean("parquet.enable.dictionary", true);
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:9000");
// Wrap in serializable wrapper so Flink can ship it inside the function closure
SerializableConfiguration serializableConfig = new SerializableConfiguration(hadoopConfig);
// The anonymous function below captures serializableConfig (effectively final);
// it is Serializable, whereas a raw Hadoop Configuration could not be captured this way.
DataStream<RowData> processedData = inputStream
.map(new RichMapFunction<RowData, RowData>() {
// Transient: rebuilt in open() on each task manager, never serialized
private transient Configuration config;
@Override
// FIX: the parameter must be Flink's Configuration, fully qualified because
// org.apache.hadoop.conf.Configuration is imported in this file — the
// unqualified name would not override RichFunction#open and the @Override
// annotation would fail to compile.
public void open(org.apache.flink.configuration.Configuration parameters) {
// Recover the Hadoop configuration after deserialization on the task side
this.config = serializableConfig.conf();
}
@Override
public RowData map(RowData value) throws Exception {
// Use Hadoop configuration in processing
String compressionType = config.get("parquet.compression");
// ... processing logic
return value;
}
});
// Schema conversion in writer creation
// Writer-side schema: product_id, price (DECIMAL(10,2)), updated_at
LogicalType[] writerFieldTypes = {
    DataTypes.STRING().getLogicalType(),
    DataTypes.DECIMAL(10, 2).getLogicalType(),
    DataTypes.TIMESTAMP(3).getLogicalType()
};
String[] writerFieldNames = {"product_id", "price", "updated_at"};
RowType rowType = RowType.of(writerFieldTypes, writerFieldNames);
// ParquetRowDataBuilder invokes the schema converter internally
Configuration config = new Configuration();
ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, config, true);
// Schema conversion happens automatically:
// Flink RowType -> Parquet MessageType -> Parquet Writer configuration
// Create schemas with meaningful names for debugging and metadata
// Use a descriptive, per-dataset name for each converted schema — the name is
// recorded in the resulting MessageType and shows up in Parquet metadata.
MessageType orderSchema =
    ParquetSchemaConverter.convertToParquetMessageType("order_events", orderRowType);
MessageType customerSchema =
    ParquetSchemaConverter.convertToParquetMessageType("customer_profiles", customerRowType);
MessageType productSchema =
    ParquetSchemaConverter.convertToParquetMessageType("product_catalog", productRowType);
// Schema names appear in Parquet metadata and can be used for debugging

| Flink LogicalType | Parquet Type | Physical Type | Logical Type |
|---|---|---|---|
| BOOLEAN | BOOLEAN | BOOLEAN | - |
| TINYINT | INT32 | INT32 | INT(8,true) |
| SMALLINT | INT32 | INT32 | INT(16,true) |
| INTEGER | INT32 | INT32 | INT(32,true) |
| BIGINT | INT64 | INT64 | INT(64,true) |
| FLOAT | FLOAT | FLOAT | - |
| DOUBLE | DOUBLE | DOUBLE | - |
| DECIMAL(p,s) | FIXED_LEN_BYTE_ARRAY or BINARY | Depends on precision | DECIMAL(p,s) |
| CHAR(n) | BINARY | BINARY | UTF8 |
| VARCHAR(n) | BINARY | BINARY | UTF8 |
| BINARY(n) | BINARY | BINARY | - |
| VARBINARY(n) | BINARY | BINARY | - |
| Flink LogicalType | Parquet Type | Physical Type | Logical Type |
|---|---|---|---|
| DATE | INT32 | INT32 | DATE |
| TIME | INT32 | INT32 | TIME(MILLIS,true) |
| TIMESTAMP(p) | INT64 | INT64 | TIMESTAMP(MILLIS,true) |
| TIMESTAMP_WITH_TIME_ZONE(p) | INT64 | INT64 | TIMESTAMP(MILLIS,true) |
| Flink LogicalType | Parquet Type | Structure |
|---|---|---|
| ARRAY&lt;T&gt; | LIST | optional group list { repeated group element { &lt;T&gt; element; } } |
| MAP&lt;K,V&gt; | MAP | optional group map { repeated group key_value { &lt;K&gt; key; &lt;V&gt; value; } } |
| ROW&lt;...&gt; | GROUP | optional group { &lt;fields...&gt; } |
All Flink types are converted to optional Parquet fields by default: fields are
emitted as `optional <type> field_name` rather than `required <type> field_name`.

The schema converter handles repetition automatically: nullable fields receive
OPTIONAL repetition, repeated elements (array and map entries) receive REPEATED
repetition, and REQUIRED repetition is used only when explicitly marked.

// Validate schema conversion
try {
MessageType parquetSchema = ParquetSchemaConverter.convertToParquetMessageType(
"test_schema", rowType
);
// Check field count — conversion must preserve the number of top-level fields
// NOTE: `assert` statements only run when the JVM is started with -ea
assert parquetSchema.getFieldCount() == rowType.getFieldCount();
// Validate that every field name survived the conversion, position by position
for (int i = 0; i < rowType.getFieldCount(); i++) {
String expectedName = rowType.getFieldNames().get(i);
String actualName = parquetSchema.getFieldName(i);
assert expectedName.equals(actualName);
}
} catch (IllegalArgumentException e) {
// Thrown for Flink types with no Parquet mapping (e.g. MULTISET, RAW)
logger.error("Schema conversion failed: " + e.getMessage());
}// Pattern for operations requiring Hadoop configuration
public class ParquetProcessFunction extends ProcessFunction<RowData, RowData> {
private final SerializableConfiguration serializableConfig;
private transient Configuration hadoopConfig;
public ParquetProcessFunction(Configuration config) {
this.serializableConfig = new SerializableConfiguration(config);
}
@Override
public void open(Configuration parameters) {
this.hadoopConfig = serializableConfig.conf();
}
@Override
public void processElement(RowData value, Context ctx, Collector<RowData> out) {
// Use hadoopConfig for processing
String setting = hadoopConfig.get("custom.setting");
// ... processing logic
out.collect(value);
}
}Common error scenarios and solutions:
// Scenario 1: schema conversion failures
try {
MessageType schema = ParquetSchemaConverter.convertToParquetMessageType(
"my_schema", rowType
);
} catch (IllegalArgumentException e) {
// Unsupported Flink type for Parquet conversion (narrowest catch first)
logger.error("Unsupported type in schema: " + e.getMessage());
// Check for unsupported types like MULTISET, RAW, etc.
} catch (Exception e) {
// Other conversion errors — pass the exception so the stack trace is logged
logger.error("Schema conversion failed", e);
}
// Scenario 2: Configuration serialization issues
try {
SerializableConfiguration serializable = new SerializableConfiguration(config);
Configuration deserialized = serializable.conf();
} catch (Exception e) {
// Serialization/deserialization errors
// Usually due to non-serializable configuration values
logger.error("Configuration serialization failed", e);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12