CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parquet-2-12

Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.

Pending
Overview
Eval results
Files

docs/protobuf-integration.md

Protobuf Integration

Seamless Protocol Buffers integration for writing Protobuf messages to Parquet format with automatic schema generation and type-safe serialization.

Capabilities

ParquetProtoWriters

Utility class providing convenient methods for creating Parquet writer factories that handle Protocol Buffers messages.

/**
 * Convenience builder for creating ParquetWriterFactory instances for Protobuf message types
 * Automatically handles schema generation and message serialization
 */
public class ParquetProtoWriters {
    
    /**
     * Creates a ParquetWriterFactory for Protocol Buffers message types
     * Automatically derives the Parquet schema from the Protobuf message definition
     * @param type The Protobuf message class extending com.google.protobuf.Message
     * @return ParquetWriterFactory configured for the specified message type
     */
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}

Usage Examples

Basic Protobuf Message Writing

import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import com.google.protobuf.Message;

// Assume you have a generated Protobuf message class
// Generated from user.proto:
// message User {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
//   string email = 4;
//   repeated string interests = 5;
// }

// Create writer factory for the Protobuf message
ParquetWriterFactory<UserProto.User> userWriterFactory = 
    ParquetProtoWriters.forType(UserProto.User.class);

// Use with Flink's FileSink
FileSink<UserProto.User> userSink = FileSink
    .forBulkFormat(new Path("/output/users"), userWriterFactory)
    .build();

// Write Protobuf messages
DataStream<UserProto.User> userStream = env
    .fromCollection(Arrays.asList(
        UserProto.User.newBuilder()
            .setUserId("user123")
            .setName("Alice Johnson")
            .setAge(28)
            .setEmail("alice@example.com")
            .addInterests("technology")
            .addInterests("music")
            .build(),
        UserProto.User.newBuilder()
            .setUserId("user456")
            .setName("Bob Smith")
            .setAge(35)
            .setEmail("bob@example.com")
            .addInterests("sports")
            .build()
    ));

userStream.sinkTo(userSink);

Complex Nested Messages

// Complex nested Protobuf schema
// order.proto:
// message Order {
//   string order_id = 1;
//   int64 timestamp = 2;
//   Customer customer = 3;
//   repeated OrderItem items = 4;
//   double total_amount = 5;
//   OrderStatus status = 6;
// }
//
// message Customer {
//   string customer_id = 1;
//   string name = 2;
//   Address address = 3;
// }
//
// message Address {
//   string street = 1;
//   string city = 2;
//   string state = 3;
//   string zip_code = 4;
// }
//
// message OrderItem {
//   string product_id = 1;
//   string product_name = 2;
//   int32 quantity = 3;
//   double unit_price = 4;
// }
//
// enum OrderStatus {
//   PENDING = 0;
//   CONFIRMED = 1;
//   SHIPPED = 2;
//   DELIVERED = 3;
//   CANCELLED = 4;
// }

ParquetWriterFactory<OrderProto.Order> orderWriterFactory = 
    ParquetProtoWriters.forType(OrderProto.Order.class);

// Create complex nested message
OrderProto.Order order = OrderProto.Order.newBuilder()
    .setOrderId("ORD-001")
    .setTimestamp(System.currentTimeMillis())
    .setCustomer(OrderProto.Customer.newBuilder()
        .setCustomerId("CUST-123")
        .setName("John Doe")
        .setAddress(OrderProto.Address.newBuilder()
            .setStreet("123 Main St")
            .setCity("San Francisco")
            .setState("CA")
            .setZipCode("94105")
            .build())
        .build())
    .addItems(OrderProto.OrderItem.newBuilder()
        .setProductId("PROD-001")
        .setProductName("Laptop")
        .setQuantity(1)
        .setUnitPrice(999.99)
        .build())
    .addItems(OrderProto.OrderItem.newBuilder()
        .setProductId("PROD-002")
        .setProductName("Mouse")
        .setQuantity(2)
        .setUnitPrice(29.99)
        .build())
    .setTotalAmount(1059.97)
    .setStatus(OrderProto.OrderStatus.CONFIRMED)
    .build();

Streaming Integration with Kafka

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Deserialize Protobuf messages from Kafka
// (ProtobufDeserializationSchema is a user-supplied custom DeserializationSchema,
// not a class provided by this package)
KafkaSource<UserProto.User> kafkaSource = KafkaSource.<UserProto.User>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("user-events")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new ProtobufDeserializationSchema<>(UserProto.User.class))
    .build();

DataStream<UserProto.User> userEvents = env.fromSource(kafkaSource, 
    WatermarkStrategy.noWatermarks(), "kafka-source");

// Write to Parquet files
ParquetWriterFactory<UserProto.User> protoWriterFactory = 
    ParquetProtoWriters.forType(UserProto.User.class);

FileSink<UserProto.User> parquetSink = FileSink
    .forBulkFormat(new Path("/data/users"), protoWriterFactory)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(10))
        .withInactivityInterval(Duration.ofMinutes(2))
        .build())
    .build();

userEvents.sinkTo(parquetSink);

Schema Evolution with Protobuf

// Original message definition (v1)
// message UserV1 {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
// }

// Evolved message definition (v2) - backward compatible
// message UserV2 {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
//   string email = 4;        // New optional field
//   repeated string tags = 5; // New repeated field
// }

// Both versions can be written to Parquet with their respective writers
ParquetWriterFactory<UserV1Proto.UserV1> writerV1 = 
    ParquetProtoWriters.forType(UserV1Proto.UserV1.class);

ParquetWriterFactory<UserV2Proto.UserV2> writerV2 = 
    ParquetProtoWriters.forType(UserV2Proto.UserV2.class);

// Parquet files written with different schema versions can coexist
// Schema evolution rules follow Protobuf compatibility guidelines

Performance Optimization

// Configure FileSink for high-throughput Protobuf writing
ParquetWriterFactory<MyProtoMessage> optimizedFactory = 
    ParquetProtoWriters.forType(MyProtoMessage.class);

FileSink<MyProtoMessage> highThroughputSink = FileSink
    .forBulkFormat(outputPath, optimizedFactory)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))    // Larger files
        .withInactivityInterval(Duration.ofMinutes(5))   // Longer wait
        .withMaxPartSize(MemorySize.ofMebiBytes(512))    // Larger parts
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy/MM/dd/HH"))
    .build();

Custom Protobuf Message Processing

// Transform Protobuf messages before writing
DataStream<OrderProto.Order> orders = ...; // source stream (e.g. from Kafka)

DataStream<OrderProto.Order> enrichedOrders = orders
    .map(order -> {
        // Enrich or transform the Protobuf message
        return order.toBuilder()
            .setTimestamp(System.currentTimeMillis())
            .setStatus(calculateStatus(order))
            .build();
    })
    .returns(OrderProto.Order.class);

// Write enriched messages to Parquet
ParquetWriterFactory<OrderProto.Order> enrichedWriterFactory = 
    ParquetProtoWriters.forType(OrderProto.Order.class);

enrichedOrders.sinkTo(FileSink
    .forBulkFormat(new Path("/data/enriched-orders"), enrichedWriterFactory)
    .build());

Type Mapping

Protobuf types are automatically mapped to Parquet types:

| Protobuf Type | Parquet Type | Notes |
| --- | --- | --- |
| `bool` | BOOLEAN | Direct mapping |
| `int32`, `sint32`, `sfixed32` | INT32 | 32-bit signed integers |
| `int64`, `sint64`, `sfixed64` | INT64 | 64-bit signed integers |
| `uint32`, `fixed32` | INT32 | Stored as signed; interpretation differs |
| `uint64`, `fixed64` | INT64 | Stored as signed; interpretation differs |
| `float` | FLOAT | IEEE 754 single precision |
| `double` | DOUBLE | IEEE 754 double precision |
| `string` | BINARY (UTF8) | UTF-8 encoded strings |
| `bytes` | BINARY | Raw binary data |
| `enum` | BINARY (ENUM) | Stored as string representation |
| `message` | GROUP | Nested structure |
| `repeated` | LIST | Repeated fields as lists |
| `map` | MAP | Key-value mappings |

Error Handling

Common error scenarios and troubleshooting:

try {
    ParquetWriterFactory<MyProtoMessage> factory = 
        ParquetProtoWriters.forType(MyProtoMessage.class);
} catch (IllegalArgumentException e) {
    // Invalid message class or unsupported Protobuf features
    logger.error("Failed to create Protobuf writer factory", e);
} catch (Exception e) {
    // Other initialization errors (missing dependencies, etc.)
    logger.error("Unexpected error creating writer factory", e);
}

// Runtime writing errors
try {
    bulkWriter.addElement(protoMessage);
} catch (IOException e) {
    // File system errors, serialization failures
    logger.error("Failed to write Protobuf message", e);
} catch (RuntimeException e) {
    // Message validation errors, schema incompatibilities
    logger.error("Runtime error writing message", e);
}

Dependencies

Required dependencies for Protobuf integration:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-parquet_2.12</artifactId>
    <version>1.14.6</version>
</dependency>

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.19.4</version>
</dependency>

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-protobuf</artifactId>
    <version>1.12.2</version>
</dependency>

Protobuf Code Generation

Generate Java classes from .proto files:

# Using protoc compiler
protoc --java_out=src/main/java user.proto order.proto

# Using Maven plugin
# (note: the ${os.detected.classifier} property below requires the
# os-maven-plugin build extension to be registered in pom.xml)
# Add to pom.xml:
# <plugin>
#   <groupId>org.xolstice.maven.plugins</groupId>
#   <artifactId>protobuf-maven-plugin</artifactId>
#   <version>0.6.1</version>
#   <configuration>
#     <protocArtifact>com.google.protobuf:protoc:3.19.4:exe:${os.detected.classifier}</protocArtifact>
#   </configuration>
#   <executions>
#     <execution>
#       <goals>
#         <goal>compile</goal>
#       </goals>
#     </execution>
#   </executions>
# </plugin>

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12

docs

avro-integration.md

format-factory.md

index.md

protobuf-integration.md

rowdata-writers.md

schema-utilities.md

vectorized-input.md

tile.json