# Protocol Buffers Integration

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data with schema evolution and cross-language compatibility.

## Capabilities

### ParquetProtoWriters

Factory class for creating ParquetWriterFactory instances that can write Protocol Buffers messages to Parquet files.

```java { .api }
/**
 * Convenience builder for creating ParquetWriterFactory instances for Protobuf classes
 */
public class ParquetProtoWriters {

    /**
     * Creates a ParquetWriterFactory for Protocol Buffers message types
     * @param <T> Protocol Buffers message type extending Message
     * @param type Class of the Protocol Buffers message to write
     * @return ParquetWriterFactory for writing protobuf messages
     */
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

### ParquetProtoWriterBuilder

Internal builder class for creating Protocol Buffers-specific ParquetWriter instances with proper WriteSupport configuration.

```java { .api }
/**
 * Builder for Protocol Buffers ParquetWriter instances
 * @param <T> Protocol Buffers message type
 */
public static class ParquetProtoWriterBuilder<T extends Message>
        extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {

    /**
     * Creates a new ParquetProtoWriterBuilder
     * @param outputFile OutputFile to write to
     * @param clazz Class of the Protocol Buffers message
     */
    public ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz);

    /**
     * Returns self reference for builder pattern
     * @return This builder instance
     */
    protected ParquetProtoWriterBuilder<T> self();

    /**
     * Creates WriteSupport for Protocol Buffers messages
     * @param conf Hadoop configuration
     * @return ProtoWriteSupport instance for the message type
     */
    protected WriteSupport<T> getWriteSupport(Configuration conf);
}
```
## Usage Examples

### Basic Protocol Buffers Writing

```java
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
import org.apache.flink.connector.file.sink.FileSink;
import com.google.protobuf.Message;

// MyMessage is a class generated by the protobuf compiler (protoc) from your
// .proto file. Generated message classes implement com.google.protobuf.Message;
// do not hand-write Message subclasses yourself.

// Create writer factory for protobuf messages
ParquetWriterFactory<MyMessage> protoWriterFactory =
    ParquetProtoWriters.forType(MyMessage.class);

// Create FileSink with Protocol Buffers writer
FileSink<MyMessage> protoSink = FileSink
    .forBulkFormat(
        new Path("/output/protobuf-data"),
        protoWriterFactory
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(10))
            .withInactivityInterval(Duration.ofMinutes(2))
            .build()
    )
    .build();

// Write protobuf messages to Parquet
DataStream<MyMessage> messageStream = env.addSource(new MyMessageSource());
messageStream.sinkTo(protoSink);
```
### Complex Protocol Buffers Schema

```java
// Example .proto file:
// syntax = "proto3";
//
// message UserEvent {
//   int64 user_id = 1;
//   string event_type = 2;
//   int64 timestamp = 3;
//   UserProfile profile = 4;
//   repeated string tags = 5;
//   map<string, string> properties = 6;
// }
//
// message UserProfile {
//   string name = 1;
//   string email = 2;
//   int32 age = 3;
// }

// Generated classes: UserEvent, UserProfile

// Create writer for complex nested protobuf messages
ParquetWriterFactory<UserEvent> userEventWriterFactory =
    ParquetProtoWriters.forType(UserEvent.class);

// Process and write complex events
DataStream<UserEvent> userEventStream = rawEventStream
    .map(rawEvent -> {
        UserEvent.Builder eventBuilder = UserEvent.newBuilder()
            .setUserId(rawEvent.getUserId())
            .setEventType(rawEvent.getType())
            .setTimestamp(rawEvent.getTimestamp());

        // Build nested profile
        UserProfile profile = UserProfile.newBuilder()
            .setName(rawEvent.getProfile().getName())
            .setEmail(rawEvent.getProfile().getEmail())
            .setAge(rawEvent.getProfile().getAge())
            .build();

        eventBuilder.setProfile(profile);

        // Add repeated fields
        rawEvent.getTags().forEach(eventBuilder::addTags);

        // Add map fields
        eventBuilder.putAllProperties(rawEvent.getProperties());

        return eventBuilder.build();
    });

FileSink<UserEvent> complexSink = FileSink
    .forBulkFormat(new Path("/events/protobuf"), userEventWriterFactory)
    .build();

userEventStream.sinkTo(complexSink);
```
### Protocol Buffers with Custom Configuration

```java
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Create a custom ParquetBuilder for protobuf with specific settings.
// ParquetProtoWriterBuilder is instantiated via its public constructor
// (see the API section above); there is no static builder() factory method.
ParquetBuilder<MyMessage> customProtoBuilder = (OutputFile out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .withPageSize(2 * 1024 * 1024) // 2MB pages
        .withRowGroupSize(128 * 1024 * 1024) // 128MB row groups
        .build();

ParquetWriterFactory<MyMessage> customFactory =
    new ParquetWriterFactory<>(customProtoBuilder);
```
### Schema Evolution Handling

```java
// Handle protobuf schema evolution gracefully
// Original message v1:
// message ProductV1 {
//   int64 id = 1;
//   string name = 2;
//   double price = 3;
// }

// Evolved message v2 (backward compatible):
// message ProductV2 {
//   int64 id = 1;
//   string name = 2;
//   double price = 3;
//   string category = 4;       // New optional field
//   repeated string tags = 5;  // New repeated field
// }

// Writer can handle both versions
ParquetWriterFactory<ProductV2> evolvedWriterFactory =
    ParquetProtoWriters.forType(ProductV2.class);

// Convert from v1 to v2 during processing
DataStream<ProductV2> evolvedStream = v1Stream.map(v1Product -> {
    return ProductV2.newBuilder()
        .setId(v1Product.getId())
        .setName(v1Product.getName())
        .setPrice(v1Product.getPrice())
        .setCategory("UNKNOWN") // Default for new field
        .build();
});

evolvedStream.sinkTo(FileSink.forBulkFormat(outputPath, evolvedWriterFactory).build());
```
### Integration with Flink SQL/Table API

```java
// Note: Direct SQL integration with protobuf requires custom deserialization
// This example shows the DataStream to Table conversion approach

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Schema;

// Convert protobuf stream to Table for SQL processing
Table protoTable = tableEnv.fromDataStream(
    messageStream,
    Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("event_type", DataTypes.STRING())
        .column("timestamp", DataTypes.TIMESTAMP(3))
        .column("profile_name", DataTypes.STRING())
        .column("profile_email", DataTypes.STRING())
        .build()
);

// Register table for SQL queries
tableEnv.createTemporaryView("user_events", protoTable);

// Query with SQL
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        event_type,
        COUNT(*) as event_count
    FROM user_events
    WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    GROUP BY user_id, event_type
    """);

// Convert back to DataStream and write as protobuf.
// Row.getField returns Object, so explicit casts are required.
DataStream<Row> resultStream = tableEnv.toDataStream(result);
DataStream<SummaryMessage> summaryStream = resultStream.map(row -> {
    return SummaryMessage.newBuilder()
        .setUserId((Long) row.getField(0))
        .setEventType((String) row.getField(1))
        .setCount((Long) row.getField(2))
        .build();
});

summaryStream.sinkTo(FileSink
    .forBulkFormat(summaryPath, ParquetProtoWriters.forType(SummaryMessage.class))
    .build());
```
### Performance Optimization

```java
// Optimize protobuf writing performance with explicit writer settings
ParquetBuilder<MyMessage> optimizedBuilder = (out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withDictionaryEncoding(true)
        .withPageSize(2 * 1024 * 1024) // larger pages for sequential scans
        .build();

// Configure for high-throughput scenarios
Configuration hadoopConf = new Configuration();
hadoopConf.set("parquet.proto.writeInt96AsFixedLenByteArray", "false");
hadoopConf.set("parquet.proto.add-string-annotations", "true");
hadoopConf.setInt("parquet.page.size", 1024 * 1024); // 1MB pages
hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024); // 256MB blocks

// Use configuration in custom builder
ParquetBuilder<MyMessage> configuredBuilder = (out) ->
    new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withConf(hadoopConf)
        .withCompressionCodec(CompressionCodecName.LZ4)
        .build();
```
### Error Handling and Validation

```java
// Handle protobuf serialization errors
DataStream<MyMessage> validatedStream = rawDataStream
    .map(new RichMapFunction<RawData, MyMessage>() {
        private transient Counter invalidMessages;

        @Override
        public void open(Configuration parameters) {
            invalidMessages = getRuntimeContext()
                .getMetricGroup()
                .counter("invalid_protobuf_messages");
        }

        @Override
        public MyMessage map(RawData raw) throws Exception {
            try {
                MyMessage.Builder builder = MyMessage.newBuilder();

                // Validate required fields
                if (raw.getId() <= 0) {
                    throw new IllegalArgumentException("Invalid ID: " + raw.getId());
                }

                // Build message with validation
                MyMessage message = builder
                    .setId(raw.getId())
                    .setName(validateAndCleanString(raw.getName()))
                    .setTimestamp(raw.getTimestamp())
                    .build();

                // Validate the built message
                if (!message.isInitialized()) {
                    throw new IllegalStateException("Message not properly initialized");
                }

                return message;

            } catch (Exception e) {
                invalidMessages.inc();
                LOG.warn("Failed to create protobuf message from raw data: {}", raw, e);
                // Return default message or rethrow based on requirements
                throw e;
            }
        }

        private String validateAndCleanString(String input) {
            return input != null ? input.trim() : "";
        }
    })
    .filter(Objects::nonNull);

validatedStream.sinkTo(protoSink);
```
## Protocol Buffers Advantages

### Schema Evolution
- **Backward Compatibility**: New optional fields don't break existing readers
- **Forward Compatibility**: Old readers can handle new data by ignoring unknown fields
- **Field Numbering**: Stable field IDs enable safe schema changes

### Cross-Language Support
- **Language Agnostic**: Same Parquet files readable from Python, C++, Go, etc.
- **Code Generation**: Strongly-typed classes generated from .proto definitions
- **Standardized Serialization**: Consistent binary format across platforms

### Performance Benefits
- **Compact Encoding**: Variable-length encoding reduces storage size
- **Fast Serialization**: Optimized binary format for quick read/write operations
- **Schema Registry**: Centralized schema management for large organizations

The Protocol Buffers integration provides efficient, type-safe serialization with excellent schema evolution capabilities, making it ideal for large-scale data processing systems that need to handle changing data structures over time.