CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

Pending
Overview
Eval results
Files

protobuf-integration.mddocs/

Protocol Buffers Integration

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data with schema evolution and cross-language compatibility.

Capabilities

ParquetProtoWriters

Factory class for creating ParquetWriterFactory instances that can write Protocol Buffers messages to Parquet files.

/**
 * Convenience builder for creating ParquetWriterFactory instances for Protobuf classes
 */
public class ParquetProtoWriters {
    
    /**
     * Creates a ParquetWriterFactory for Protocol Buffers message types
     * @param <T> Protocol Buffers message type extending Message
     * @param type Class of the Protocol Buffers message to write
     * @return ParquetWriterFactory for writing protobuf messages
     */
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}

ParquetProtoWriterBuilder

Internal builder class for creating Protocol Buffers-specific ParquetWriter instances with proper WriteSupport configuration.

/**
 * Builder for Protocol Buffers ParquetWriter instances
 * @param <T> Protocol Buffers message type
 */
public static class ParquetProtoWriterBuilder<T extends Message>
        extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {
    
    /**
     * Creates a new ParquetProtoWriterBuilder
     * @param outputFile OutputFile to write to
     * @param clazz Class of the Protocol Buffers message
     */
    public ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz);
    
    /**
     * Returns self reference for builder pattern
     * @return This builder instance
     */
    protected ParquetProtoWriterBuilder<T> self();
    
    /**
     * Creates WriteSupport for Protocol Buffers messages
     * @param conf Hadoop configuration
     * @return ProtoWriteSupport instance for the message type
     */
    protected WriteSupport<T> getWriteSupport(Configuration conf);
}

Usage Examples

Basic Protocol Buffers Writing

import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import org.apache.flink.connector.file.sink.FileSink;
import com.google.protobuf.Message;

// Define your Protocol Buffers message class
// Assuming you have a generated class MyMessage from your .proto file
public class MyMessage extends Message {
    // Generated protobuf code
}

// Create writer factory for protobuf messages
ParquetWriterFactory<MyMessage> protoWriterFactory = 
    ParquetProtoWriters.forType(MyMessage.class);

// Create FileSink with Protocol Buffers writer
FileSink<MyMessage> protoSink = FileSink
    .forBulkFormat(
        new Path("/output/protobuf-data"),
        protoWriterFactory
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(10))
            .withInactivityInterval(Duration.ofMinutes(2))
            .build()
    )
    .build();

// Write protobuf messages to Parquet
DataStream<MyMessage> messageStream = env.addSource(new MyMessageSource());
messageStream.sinkTo(protoSink);

Complex Protocol Buffers Schema

// Example .proto file:
// syntax = "proto3";
// 
// message UserEvent {
//   int64 user_id = 1;
//   string event_type = 2;
//   int64 timestamp = 3;
//   UserProfile profile = 4;
//   repeated string tags = 5;
//   map<string, string> properties = 6;
// }
// 
// message UserProfile {
//   string name = 1;
//   string email = 2;
//   int32 age = 3;
// }

// Generated classes: UserEvent, UserProfile

// Create writer for complex nested protobuf messages
ParquetWriterFactory<UserEvent> userEventWriterFactory = 
    ParquetProtoWriters.forType(UserEvent.class);

// Process and write complex events
DataStream<UserEvent> userEventStream = rawEventStream
    .map(rawEvent -> {
        UserEvent.Builder eventBuilder = UserEvent.newBuilder()
            .setUserId(rawEvent.getUserId())
            .setEventType(rawEvent.getType())
            .setTimestamp(rawEvent.getTimestamp());
            
        // Build nested profile
        UserProfile profile = UserProfile.newBuilder()
            .setName(rawEvent.getProfile().getName())
            .setEmail(rawEvent.getProfile().getEmail())
            .setAge(rawEvent.getProfile().getAge())
            .build();
        
        eventBuilder.setProfile(profile);
        
        // Add repeated fields
        rawEvent.getTags().forEach(eventBuilder::addTags);
        
        // Add map fields
        eventBuilder.putAllProperties(rawEvent.getProperties());
        
        return eventBuilder.build();
    });

FileSink<UserEvent> complexSink = FileSink
    .forBulkFormat(new Path("/events/protobuf"), userEventWriterFactory)
    .build();

userEventStream.sinkTo(complexSink);

Protocol Buffers with Custom Configuration

import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Create custom ParquetBuilder for protobuf with specific settings
ParquetBuilder<MyMessage> customProtoBuilder = (OutputFile out) -> {
    return ParquetProtoWriters.ParquetProtoWriterBuilder
        .builder(out, MyMessage.class)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .withPageSize(2 * 1024 * 1024)      // 2MB pages
        .withRowGroupSize(128 * 1024 * 1024) // 128MB row groups
        .build();
};

ParquetWriterFactory<MyMessage> customFactory = 
    new ParquetWriterFactory<>(customProtoBuilder);

Schema Evolution Handling

// Handle protobuf schema evolution gracefully
// Original message v1:
// message ProductV1 {
//   int64 id = 1;
//   string name = 2;
//   double price = 3;
// }

// Evolved message v2 (backward compatible):
// message ProductV2 {
//   int64 id = 1;
//   string name = 2;
//   double price = 3;
//   string category = 4;        // New optional field
//   repeated string tags = 5;   // New repeated field
// }

// Writer can handle both versions
ParquetWriterFactory<ProductV2> evolvedWriterFactory = 
    ParquetProtoWriters.forType(ProductV2.class);

// Convert from v1 to v2 during processing
DataStream<ProductV2> evolvedStream = v1Stream.map(v1Product -> {
    return ProductV2.newBuilder()
        .setId(v1Product.getId())
        .setName(v1Product.getName())
        .setPrice(v1Product.getPrice())
        .setCategory("UNKNOWN")  // Default for new field
        .build();
});

evolvedStream.sinkTo(FileSink.forBulkFormat(outputPath, evolvedWriterFactory).build());

Integration with Flink SQL/Table API

// Note: Direct SQL integration with protobuf requires custom deserialization
// This example shows the DataStream to Table conversion approach

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Schema;

// Convert protobuf stream to Table for SQL processing
Table protoTable = tableEnv.fromDataStream(
    messageStream,
    Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("event_type", DataTypes.STRING())
        .column("timestamp", DataTypes.TIMESTAMP(3))
        .column("profile_name", DataTypes.STRING())
        .column("profile_email", DataTypes.STRING())
        .build()
);

// Register table for SQL queries
tableEnv.createTemporaryView("user_events", protoTable);

// Query with SQL
Table result = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        event_type,
        COUNT(*) as event_count
    FROM user_events 
    WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    GROUP BY user_id, event_type
""");

// Convert back to DataStream and write as protobuf
DataStream<Row> resultStream = tableEnv.toDataStream(result);
DataStream<SummaryMessage> summaryStream = resultStream.map(row -> {
    return SummaryMessage.newBuilder()
        .setUserId(row.getField(0))
        .setEventType(row.getField(1))
        .setCount(row.getField(2))
        .build();
});

summaryStream.sinkTo(FileSink
    .forBulkFormat(summaryPath, ParquetProtoWriters.forType(SummaryMessage.class))
    .build());

Performance Optimization

// Optimize protobuf writing performance
ParquetBuilder<MyMessage> optimizedBuilder = (out) -> {
    return MyMessage.class
        .getDeclaredConstructor()  // Use custom configuration
        .newInstance()
        .toBuilder()
        .build();
};

// Configure for high-throughput scenarios
Configuration hadoopConf = new Configuration();
hadoopConf.set("parquet.proto.writeInt96AsFixedLenByteArray", "false");
hadoopConf.set("parquet.proto.add-string-annotations", "true");
hadoopConf.setInt("parquet.page.size", 1024 * 1024);        // 1MB pages
hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024); // 256MB blocks

// Use configuration in custom builder
ParquetBuilder<MyMessage> configuredBuilder = (out) -> {
    return ParquetProtoWriters.ParquetProtoWriterBuilder
        .builder(out, MyMessage.class)
        .withConf(hadoopConf)
        .withCompressionCodec(CompressionCodecName.LZ4)
        .build();
};

Error Handling and Validation

// Handle protobuf serialization errors
DataStream<MyMessage> validatedStream = rawDataStream
    .map(new RichMapFunction<RawData, MyMessage>() {
        private transient Counter invalidMessages;
        
        @Override
        public void open(Configuration parameters) {
            invalidMessages = getRuntimeContext()
                .getMetricGroup()
                .counter("invalid_protobuf_messages");
        }
        
        @Override
        public MyMessage map(RawData raw) throws Exception {
            try {
                MyMessage.Builder builder = MyMessage.newBuilder();
                
                // Validate required fields
                if (raw.getId() <= 0) {
                    throw new IllegalArgumentException("Invalid ID: " + raw.getId());
                }
                
                // Build message with validation
                MyMessage message = builder
                    .setId(raw.getId())
                    .setName(validateAndCleanString(raw.getName()))
                    .setTimestamp(raw.getTimestamp())
                    .build();
                
                // Validate the built message
                if (!message.isInitialized()) {
                    throw new IllegalStateException("Message not properly initialized");
                }
                
                return message;
                
            } catch (Exception e) {
                invalidMessages.inc();
                LOG.warn("Failed to create protobuf message from raw data: {}", raw, e);
                // Return default message or rethrow based on requirements
                throw e;
            }
        }
        
        private String validateAndCleanString(String input) {
            return input != null ? input.trim() : "";
        }
    })
    .filter(Objects::nonNull);

validatedStream.sinkTo(protoSink);

Protocol Buffers Advantages

Schema Evolution

  • Backward Compatibility: New optional fields don't break existing readers
  • Forward Compatibility: Old readers can handle new data by ignoring unknown fields
  • Field Numbering: Stable field IDs enable safe schema changes

Cross-Language Support

  • Language Agnostic: Same Parquet files readable from Python, C++, Go, etc.
  • Code Generation: Strongly-typed classes generated from .proto definitions
  • Standardized Serialization: Consistent binary format across platforms

Performance Benefits

  • Compact Encoding: Variable-length encoding reduces storage size
  • Fast Serialization: Optimized binary format for quick read/write operations
  • Schema Registry: Centralized schema management for large organizations

The Protocol Buffers integration provides efficient, type-safe serialization with excellent schema evolution capabilities, making it ideal for large-scale data processing systems that need to handle changing data structures over time.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet

docs

avro-integration.md

index.md

protobuf-integration.md

rowdata-integration.md

table-integration.md

utilities.md

vectorized-reading.md

writing-support.md

tile.json