Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
—
Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data with schema evolution and cross-language compatibility.
Factory class for creating ParquetWriterFactory instances that can write Protocol Buffers messages to Parquet files.
/**
* Convenience builder for creating ParquetWriterFactory instances for Protobuf classes
*/
public class ParquetProtoWriters {
/**
* Creates a ParquetWriterFactory for Protocol Buffers message types
* @param <T> Protocol Buffers message type extending Message
* @param type Class of the Protocol Buffers message to write
* @return ParquetWriterFactory for writing protobuf messages
*/
public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
Internal builder class for creating Protocol Buffers-specific ParquetWriter instances with proper WriteSupport configuration.
/**
* Builder for Protocol Buffers ParquetWriter instances
* @param <T> Protocol Buffers message type
*/
public static class ParquetProtoWriterBuilder<T extends Message>
extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {
/**
* Creates a new ParquetProtoWriterBuilder
* @param outputFile OutputFile to write to
* @param clazz Class of the Protocol Buffers message
*/
public ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz);
/**
* Returns self reference for builder pattern
* @return This builder instance
*/
protected ParquetProtoWriterBuilder<T> self();
/**
* Creates WriteSupport for Protocol Buffers messages
* @param conf Hadoop configuration
* @return ProtoWriteSupport instance for the message type
*/
protected WriteSupport<T> getWriteSupport(Configuration conf);
}
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import com.google.protobuf.Message;
// MyMessage is assumed to be a message class generated by protoc from your .proto file.
// Generated Java classes extend GeneratedMessageV3, which implements the
// com.google.protobuf.Message interface expected by ParquetProtoWriters.forType().
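Because Parquet is a bulk format, the streaming examples below assume checkpointing is enabled on the execution environment; FileSink only commits bulk-encoded part files when a checkpoint completes. A minimal setup sketch (the 60-second interval is an arbitrary choice):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bulk-format part files are only finalized on checkpoint completion
env.enableCheckpointing(60_000); // checkpoint interval in milliseconds (arbitrary)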
// Create writer factory for protobuf messages
ParquetWriterFactory<MyMessage> protoWriterFactory =
ParquetProtoWriters.forType(MyMessage.class);
// Create FileSink with Protocol Buffers writer.
// Bulk formats such as Parquet only support rolling policies that roll on
// checkpoint, so part files are finalized when a checkpoint completes.
FileSink<MyMessage> protoSink = FileSink
    .forBulkFormat(
        new Path("/output/protobuf-data"),
        protoWriterFactory
    )
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
// Write protobuf messages to Parquet
DataStream<MyMessage> messageStream = env.addSource(new MyMessageSource());
messageStream.sinkTo(protoSink);
// Example .proto file:
// syntax = "proto3";
//
// message UserEvent {
// int64 user_id = 1;
// string event_type = 2;
// int64 timestamp = 3;
// UserProfile profile = 4;
// repeated string tags = 5;
// map<string, string> properties = 6;
// }
//
// message UserProfile {
// string name = 1;
// string email = 2;
// int32 age = 3;
// }
// Generated classes: UserEvent, UserProfile
// Create writer for complex nested protobuf messages
ParquetWriterFactory<UserEvent> userEventWriterFactory =
ParquetProtoWriters.forType(UserEvent.class);
// Process and write complex events
DataStream<UserEvent> userEventStream = rawEventStream
.map(rawEvent -> {
UserEvent.Builder eventBuilder = UserEvent.newBuilder()
.setUserId(rawEvent.getUserId())
.setEventType(rawEvent.getType())
.setTimestamp(rawEvent.getTimestamp());
// Build nested profile
UserProfile profile = UserProfile.newBuilder()
.setName(rawEvent.getProfile().getName())
.setEmail(rawEvent.getProfile().getEmail())
.setAge(rawEvent.getProfile().getAge())
.build();
eventBuilder.setProfile(profile);
// Add repeated fields
rawEvent.getTags().forEach(eventBuilder::addTags);
// Add map fields
eventBuilder.putAllProperties(rawEvent.getProperties());
return eventBuilder.build();
});
FileSink<UserEvent> complexSink = FileSink
.forBulkFormat(new Path("/events/protobuf"), userEventWriterFactory)
.build();
userEventStream.sinkTo(complexSink);
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
// Create custom ParquetBuilder for protobuf with specific settings.
// ParquetProtoWriterBuilder is constructed directly (see constructor above).
ParquetBuilder<MyMessage> customProtoBuilder = (OutputFile out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .withPageSize(2 * 1024 * 1024) // 2MB pages
        .withRowGroupSize(128 * 1024 * 1024) // 128MB row groups
        .build();
ParquetWriterFactory<MyMessage> customFactory =
    new ParquetWriterFactory<>(customProtoBuilder);
// Handle protobuf schema evolution gracefully
// Original message v1:
// message ProductV1 {
// int64 id = 1;
// string name = 2;
// double price = 3;
// }
// Evolved message v2 (backward compatible):
// message ProductV2 {
// int64 id = 1;
// string name = 2;
// double price = 3;
// string category = 4; // New optional field
// repeated string tags = 5; // New repeated field
// }
// Writer can handle both versions
ParquetWriterFactory<ProductV2> evolvedWriterFactory =
ParquetProtoWriters.forType(ProductV2.class);
// Convert from v1 to v2 during processing
DataStream<ProductV2> evolvedStream = v1Stream.map(v1Product -> {
return ProductV2.newBuilder()
.setId(v1Product.getId())
.setName(v1Product.getName())
.setPrice(v1Product.getPrice())
.setCategory("UNKNOWN") // Default for new field
.build();
});
evolvedStream.sinkTo(FileSink.forBulkFormat(outputPath, evolvedWriterFactory).build());
// Note: Direct SQL integration with protobuf requires custom deserialization
// This example shows the DataStream to Table conversion approach
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
// Convert the protobuf data to a Table for SQL processing.
// Protobuf Message classes are not POJOs, so first map messageStream to a Row or
// POJO stream whose fields match the columns declared below (flattenedStream is
// assumed to be such a stream for this example).
Table protoTable = tableEnv.fromDataStream(
    flattenedStream,
Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("event_type", DataTypes.STRING())
.column("timestamp", DataTypes.TIMESTAMP(3))
.column("profile_name", DataTypes.STRING())
.column("profile_email", DataTypes.STRING())
.build()
);
// Register table for SQL queries
tableEnv.createTemporaryView("user_events", protoTable);
// Query with SQL
Table result = tableEnv.sqlQuery("""
SELECT
user_id,
event_type,
COUNT(*) as event_count
FROM user_events
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY user_id, event_type
""");
// Convert back to DataStream and write as protobuf.
// The grouped aggregation produces an updating result, so use toChangelogStream
// rather than toDataStream (which only supports insert-only results).
DataStream<Row> resultStream = tableEnv.toChangelogStream(result);
DataStream<SummaryMessage> summaryStream = resultStream.map(row -> {
    return SummaryMessage.newBuilder()
        .setUserId((Long) row.getField("user_id"))
        .setEventType((String) row.getField("event_type"))
        .setCount((Long) row.getField("event_count"))
        .build();
});
summaryStream.sinkTo(FileSink
.forBulkFormat(summaryPath, ParquetProtoWriters.forType(SummaryMessage.class))
    .build());
// Optimize protobuf writing performance
ParquetBuilder<MyMessage> optimizedBuilder = (OutputFile out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .build();
// Configure for high-throughput scenarios
Configuration hadoopConf = new Configuration();
// ProtoWriteSupport reads its options from the Hadoop configuration, e.g. writing
// repeated and map fields as spec-compliant LIST/MAP groups
hadoopConf.setBoolean("parquet.proto.writeSpecsCompliant", true);
hadoopConf.setInt("parquet.page.size", 1024 * 1024); // 1MB pages
hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024); // 256MB row groups (blocks)
// Use configuration in custom builder
ParquetBuilder<MyMessage> configuredBuilder = (OutputFile out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withConf(hadoopConf)
        .withCompressionCodec(CompressionCodecName.LZ4)
        .build();
// Handle protobuf serialization errors
DataStream<MyMessage> validatedStream = rawDataStream
.map(new RichMapFunction<RawData, MyMessage>() {
private transient Counter invalidMessages;
@Override
public void open(Configuration parameters) {
invalidMessages = getRuntimeContext()
.getMetricGroup()
.counter("invalid_protobuf_messages");
}
@Override
public MyMessage map(RawData raw) throws Exception {
try {
MyMessage.Builder builder = MyMessage.newBuilder();
// Validate required fields
if (raw.getId() <= 0) {
throw new IllegalArgumentException("Invalid ID: " + raw.getId());
}
// Build message with validation
MyMessage message = builder
.setId(raw.getId())
.setName(validateAndCleanString(raw.getName()))
.setTimestamp(raw.getTimestamp())
.build();
// Validate the built message
if (!message.isInitialized()) {
throw new IllegalStateException("Message not properly initialized");
}
return message;
} catch (Exception e) {
invalidMessages.inc();
LOG.warn("Failed to create protobuf message from raw data: {}", raw, e);
// Return default message or rethrow based on requirements
throw e;
}
}
private String validateAndCleanString(String input) {
return input != null ? input.trim() : "";
}
})
.filter(Objects::nonNull);
validatedStream.sinkTo(protoSink);
The Protocol Buffers integration provides efficient, type-safe serialization with excellent schema evolution capabilities, making it ideal for large-scale data processing systems that need to handle changing data structures over time.
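The same writer factories work in batch execution mode, where FileSink commits all part files when the bounded job finishes. A minimal sketch, assuming a bounded source and the messageStream and protoSink defined earlier:
import org.apache.flink.api.common.RuntimeExecutionMode;
// Run the pipeline as a bounded (batch) job; files are finalized when the job completes
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
messageStream.sinkTo(protoSink);
env.execute("protobuf-parquet-batch-write");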
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parquet