Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
—
Seamless Protocol Buffers integration for writing Protobuf messages to Parquet format with automatic schema generation and type-safe serialization.
Utility class providing convenient methods for creating Parquet writer factories that handle Protocol Buffers messages.
/**
 * Convenience builder for creating {@link ParquetWriterFactory} instances for Protobuf message types.
 * Schema generation and message serialization are handled automatically.
 */
public class ParquetProtoWriters {
    /**
     * Creates a {@link ParquetWriterFactory} for a Protocol Buffers message type.
     * The Parquet schema is derived automatically from the Protobuf message descriptor.
     *
     * @param <T>  the concrete generated message type
     * @param type the Protobuf message class extending {@code com.google.protobuf.Message}
     * @return a {@code ParquetWriterFactory} configured for the specified message type
     */
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}

import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import com.google.protobuf.Message;
// Assume you have a generated Protobuf message class.
// Generated from user.proto:
// message User {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
//   string email = 4;
//   repeated string interests = 5;
// }

// Create a writer factory for the Protobuf message type.
ParquetWriterFactory<UserProto.User> userWriterFactory =
        ParquetProtoWriters.forType(UserProto.User.class);

// Use the factory with Flink's FileSink (bulk format = one Parquet file per roll).
FileSink<UserProto.User> userSink = FileSink
        .forBulkFormat(new Path("/output/users"), userWriterFactory)
        .build();

// Build a small bounded stream of Protobuf messages to write.
DataStream<UserProto.User> userStream = env
        .fromCollection(Arrays.asList(
                UserProto.User.newBuilder()
                        .setUserId("user123")
                        .setName("Alice Johnson")
                        .setAge(28)
                        .setEmail("alice@example.com")
                        .addInterests("technology")
                        .addInterests("music")
                        .build(),
                UserProto.User.newBuilder()
                        .setUserId("user456")
                        .setName("Bob Smith")
                        .setAge(35)
                        .setEmail("bob@example.com")
                        .addInterests("sports")
                        .build()
        ));

userStream.sinkTo(userSink);

// Complex nested Protobuf schema
// order.proto:
// message Order {
//   string order_id = 1;
//   int64 timestamp = 2;
//   Customer customer = 3;
//   repeated OrderItem items = 4;
//   double total_amount = 5;
//   OrderStatus status = 6;
// }
//
// message Customer {
//   string customer_id = 1;
//   string name = 2;
//   Address address = 3;
// }
//
// message Address {
//   string street = 1;
//   string city = 2;
//   string state = 3;
//   string zip_code = 4;
// }
//
// message OrderItem {
//   string product_id = 1;
//   string product_name = 2;
//   int32 quantity = 3;
//   double unit_price = 4;
// }
//
// enum OrderStatus {
//   PENDING = 0;
//   CONFIRMED = 1;
//   SHIPPED = 2;
//   DELIVERED = 3;
//   CANCELLED = 4;
// }

// Nested messages, repeated fields, and enums are all supported by the writer factory.
ParquetWriterFactory<OrderProto.Order> orderWriterFactory =
        ParquetProtoWriters.forType(OrderProto.Order.class);

// Create a complex nested message.
OrderProto.Order order = OrderProto.Order.newBuilder()
        .setOrderId("ORD-001")
        .setTimestamp(System.currentTimeMillis())
        .setCustomer(OrderProto.Customer.newBuilder()
                .setCustomerId("CUST-123")
                .setName("John Doe")
                .setAddress(OrderProto.Address.newBuilder()
                        .setStreet("123 Main St")
                        .setCity("San Francisco")
                        .setState("CA")
                        .setZipCode("94105")
                        .build())
                .build())
        .addItems(OrderProto.OrderItem.newBuilder()
                .setProductId("PROD-001")
                .setProductName("Laptop")
                .setQuantity(1)
                .setUnitPrice(999.99)
                .build())
        .addItems(OrderProto.OrderItem.newBuilder()
                .setProductId("PROD-002")
                .setProductName("Mouse")
                .setQuantity(2)
                .setUnitPrice(29.99)
                .build())
        .setTotalAmount(1059.97)
        .setStatus(OrderProto.OrderStatus.CONFIRMED)
        .build();

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
// Deserialize Protobuf messages from Kafka.
// NOTE(review): ProtobufDeserializationSchema is assumed to be a user-provided
// DeserializationSchema implementation — it is not part of Flink's Kafka connector.
KafkaSource<UserProto.User> kafkaSource = KafkaSource.<UserProto.User>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("user-events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new ProtobufDeserializationSchema<>(UserProto.User.class))
        .build();

DataStream<UserProto.User> userEvents = env.fromSource(kafkaSource,
        WatermarkStrategy.noWatermarks(), "kafka-source");

// Write the consumed messages to Parquet files.
ParquetWriterFactory<UserProto.User> protoWriterFactory =
        ParquetProtoWriters.forType(UserProto.User.class);

FileSink<UserProto.User> parquetSink = FileSink
        .forBulkFormat(new Path("/data/users"), protoWriterFactory)
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(10))
                .withInactivityInterval(Duration.ofMinutes(2))
                .build())
        .build();

userEvents.sinkTo(parquetSink);

// Original message definition (v1)
// message UserV1 {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
// }

// Evolved message definition (v2) - backward compatible
// message UserV2 {
//   string user_id = 1;
//   string name = 2;
//   int32 age = 3;
//   string email = 4;          // New optional field
//   repeated string tags = 5;  // New repeated field
// }

// Both versions can be written to Parquet with their respective writers.
ParquetWriterFactory<UserV1Proto.UserV1> writerV1 =
        ParquetProtoWriters.forType(UserV1Proto.UserV1.class);
ParquetWriterFactory<UserV2Proto.UserV2> writerV2 =
        ParquetProtoWriters.forType(UserV2Proto.UserV2.class);

// Parquet files written with different schema versions can coexist.
// Schema evolution rules follow Protobuf compatibility guidelines.

// Configure FileSink for high-throughput Protobuf writing
ParquetWriterFactory<MyProtoMessage> optimizedFactory =
        ParquetProtoWriters.forType(MyProtoMessage.class);

// Tune the rolling policy for larger files and fewer rolls; bucket output by hour.
FileSink<MyProtoMessage> highThroughputSink = FileSink
        .forBulkFormat(outputPath, optimizedFactory)
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15))   // Larger files
                .withInactivityInterval(Duration.ofMinutes(5))  // Longer wait
                .withMaxPartSize(MemorySize.ofMebiBytes(512))   // Larger parts
                .build())
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy/MM/dd/HH"))
        .build();

// Transform Protobuf messages before writing
// FIX: the original line ended with "= // ... source stream", leaving the
// assignment dangling and the snippet syntactically invalid.
DataStream<OrderProto.Order> orders = /* ... source stream ... */ null;

DataStream<OrderProto.Order> enrichedOrders = orders
        .map(order -> {
            // Enrich or transform the Protobuf message; messages are immutable,
            // so mutate via toBuilder() and rebuild.
            return order.toBuilder()
                    .setTimestamp(System.currentTimeMillis())
                    .setStatus(calculateStatus(order))
                    .build();
        })
        // Explicit type hint: lambdas erase the generic type information.
        .returns(OrderProto.Order.class);

// Write enriched messages to Parquet.
ParquetWriterFactory<OrderProto.Order> enrichedWriterFactory =
        ParquetProtoWriters.forType(OrderProto.Order.class);

enrichedOrders.sinkTo(FileSink
        .forBulkFormat(new Path("/data/enriched-orders"), enrichedWriterFactory)
        .build());

Protobuf types are automatically mapped to Parquet types:
| Protobuf Type | Parquet Type | Notes |
|---|---|---|
| bool | BOOLEAN | Direct mapping |
| int32, sint32, sfixed32 | INT32 | 32-bit signed integers |
| int64, sint64, sfixed64 | INT64 | 64-bit signed integers |
| uint32, fixed32 | INT32 | Stored as signed, interpretation differs |
| uint64, fixed64 | INT64 | Stored as signed, interpretation differs |
| float | FLOAT | IEEE 754 single precision |
| double | DOUBLE | IEEE 754 double precision |
| string | BINARY (UTF8) | UTF-8 encoded strings |
| bytes | BINARY | Raw binary data |
| enum | BINARY (ENUM) | Stored as string representation |
| message | GROUP | Nested structure |
| repeated | LIST | Repeated fields as lists |
| map | MAP | Key-value mappings |
Common error scenarios and troubleshooting:
// Factory-creation errors: surface at job-graph construction time.
try {
    ParquetWriterFactory<MyProtoMessage> factory =
            ParquetProtoWriters.forType(MyProtoMessage.class);
} catch (IllegalArgumentException e) {
    // Invalid message class or unsupported Protobuf features.
    logger.error("Failed to create Protobuf writer factory", e);
} catch (Exception e) {
    // Other initialization errors (missing dependencies, etc.).
    logger.error("Unexpected error creating writer factory", e);
}

// Runtime writing errors: surface while the sink is running.
try {
    bulkWriter.addElement(protoMessage);
} catch (IOException e) {
    // File system errors, serialization failures.
    logger.error("Failed to write Protobuf message", e);
} catch (RuntimeException e) {
    // Message validation errors, schema incompatibilities.
    logger.error("Runtime error writing message", e);
}

Required dependencies for Protobuf integration:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parquet_2.12</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>1.12.2</version>
</dependency>Generate Java classes from .proto files:
# Using protoc compiler
protoc --java_out=src/main/java user.proto order.proto
# Using Maven plugin
# Add to pom.xml:
# <plugin>
# <groupId>org.xolstice.maven.plugins</groupId>
# <artifactId>protobuf-maven-plugin</artifactId>
# <version>0.6.1</version>
# <configuration>
# <protocArtifact>com.google.protobuf:protoc:3.19.4:exe:${os.detected.classifier}</protocArtifact>
# </configuration>
# <executions>
# <execution>
# <goals>
# <goal>compile</goal>
# </goals>
# </execution>
# </executions>
# </plugin>

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12