or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-integration.mdindex.mdprotobuf-integration.mdrowdata-integration.mdtable-integration.mdutilities.mdvectorized-reading.mdwriting-support.md

protobuf-integration.mddocs/

0

# Protocol Buffers Integration

1

2

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data with schema evolution and cross-language compatibility.

3

4

## Capabilities

5

6

### ParquetProtoWriters

7

8

Factory class for creating ParquetWriterFactory instances that can write Protocol Buffers messages to Parquet files.

9

10

```java { .api }

11

/**

12

* Convenience builder for creating ParquetWriterFactory instances for Protobuf classes

13

*/

14

public class ParquetProtoWriters {

15

16

/**

17

* Creates a ParquetWriterFactory for Protocol Buffers message types

18

* @param <T> Protocol Buffers message type extending Message

19

* @param type Class of the Protocol Buffers message to write

20

* @return ParquetWriterFactory for writing protobuf messages

21

*/

22

public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);

23

}

24

```

25

26

### ParquetProtoWriterBuilder

27

28

Internal builder class for creating Protocol Buffers-specific ParquetWriter instances with proper WriteSupport configuration.

29

30

```java { .api }

31

/**

32

* Builder for Protocol Buffers ParquetWriter instances

33

* @param <T> Protocol Buffers message type

34

*/

35

public static class ParquetProtoWriterBuilder<T extends Message>

36

extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {

37

38

/**

39

* Creates a new ParquetProtoWriterBuilder

40

* @param outputFile OutputFile to write to

41

* @param clazz Class of the Protocol Buffers message

42

*/

43

public ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz);

44

45

/**

46

* Returns self reference for builder pattern

47

* @return This builder instance

48

*/

49

protected ParquetProtoWriterBuilder<T> self();

50

51

/**

52

* Creates WriteSupport for Protocol Buffers messages

53

* @param conf Hadoop configuration

54

* @return ProtoWriteSupport instance for the message type

55

*/

56

protected WriteSupport<T> getWriteSupport(Configuration conf);

57

}

58

```

59

60

## Usage Examples

61

62

### Basic Protocol Buffers Writing

63

64

```java

65

import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

66

import org.apache.flink.connector.file.sink.FileSink;

67

import com.google.protobuf.Message;

68

69

// Define your Protocol Buffers message class

70

// Assuming you have a generated class MyMessage from your .proto file

71

// MyMessage is generated by protoc from your .proto file — do not write it by
// hand. The generated class extends com.google.protobuf.Message and provides
// newBuilder(), parseFrom(), and typed accessors.

74

75

// Create writer factory for protobuf messages

76

ParquetWriterFactory<MyMessage> protoWriterFactory =

77

ParquetProtoWriters.forType(MyMessage.class);

78

79

// Create FileSink with Protocol Buffers writer

80

FileSink<MyMessage> protoSink = FileSink

81

.forBulkFormat(

82

new Path("/output/protobuf-data"),

83

protoWriterFactory

84

)

85

.withRollingPolicy(

86

DefaultRollingPolicy.builder()

87

.withRolloverInterval(Duration.ofMinutes(10))

88

.withInactivityInterval(Duration.ofMinutes(2))

89

.build()

90

)

91

.build();

92

93

// Write protobuf messages to Parquet

94

DataStream<MyMessage> messageStream = env.addSource(new MyMessageSource());

95

messageStream.sinkTo(protoSink);

96

```

97

98

### Complex Protocol Buffers Schema

99

100

```java

101

// Example .proto file:

102

// syntax = "proto3";

103

//

104

// message UserEvent {

105

// int64 user_id = 1;

106

// string event_type = 2;

107

// int64 timestamp = 3;

108

// UserProfile profile = 4;

109

// repeated string tags = 5;

110

// map<string, string> properties = 6;

111

// }

112

//

113

// message UserProfile {

114

// string name = 1;

115

// string email = 2;

116

// int32 age = 3;

117

// }

118

119

// Generated classes: UserEvent, UserProfile

120

121

// Create writer for complex nested protobuf messages

122

ParquetWriterFactory<UserEvent> userEventWriterFactory =

123

ParquetProtoWriters.forType(UserEvent.class);

124

125

// Process and write complex events

126

DataStream<UserEvent> userEventStream = rawEventStream

127

.map(rawEvent -> {

128

UserEvent.Builder eventBuilder = UserEvent.newBuilder()

129

.setUserId(rawEvent.getUserId())

130

.setEventType(rawEvent.getType())

131

.setTimestamp(rawEvent.getTimestamp());

132

133

// Build nested profile

134

UserProfile profile = UserProfile.newBuilder()

135

.setName(rawEvent.getProfile().getName())

136

.setEmail(rawEvent.getProfile().getEmail())

137

.setAge(rawEvent.getProfile().getAge())

138

.build();

139

140

eventBuilder.setProfile(profile);

141

142

// Add repeated fields

143

rawEvent.getTags().forEach(eventBuilder::addTags);

144

145

// Add map fields

146

eventBuilder.putAllProperties(rawEvent.getProperties());

147

148

return eventBuilder.build();

149

});

150

151

FileSink<UserEvent> complexSink = FileSink

152

.forBulkFormat(new Path("/events/protobuf"), userEventWriterFactory)

153

.build();

154

155

userEventStream.sinkTo(complexSink);

156

```

157

158

### Protocol Buffers with Custom Configuration

159

160

```java

161

import org.apache.flink.formats.parquet.ParquetBuilder;

162

import org.apache.parquet.hadoop.ParquetWriter;

163

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

164

165

// Create custom ParquetBuilder for protobuf with specific settings

166

ParquetBuilder<MyMessage> customProtoBuilder = (OutputFile out) -> {

167

return new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)

169

.withCompressionCodec(CompressionCodecName.SNAPPY)

170

.withDictionaryEncoding(true)

171

.withPageSize(2 * 1024 * 1024) // 2MB pages

172

.withRowGroupSize(128 * 1024 * 1024) // 128MB row groups

173

.build();

174

};

175

176

ParquetWriterFactory<MyMessage> customFactory =

177

new ParquetWriterFactory<>(customProtoBuilder);

178

```

179

180

### Schema Evolution Handling

181

182

```java

183

// Handle protobuf schema evolution gracefully

184

// Original message v1:

185

// message ProductV1 {

186

// int64 id = 1;

187

// string name = 2;

188

// double price = 3;

189

// }

190

191

// Evolved message v2 (backward compatible):

192

// message ProductV2 {

193

// int64 id = 1;

194

// string name = 2;

195

// double price = 3;

196

// string category = 4; // New optional field

197

// repeated string tags = 5; // New repeated field

198

// }

199

200

// Writer can handle both versions

201

ParquetWriterFactory<ProductV2> evolvedWriterFactory =

202

ParquetProtoWriters.forType(ProductV2.class);

203

204

// Convert from v1 to v2 during processing

205

DataStream<ProductV2> evolvedStream = v1Stream.map(v1Product -> {

206

return ProductV2.newBuilder()

207

.setId(v1Product.getId())

208

.setName(v1Product.getName())

209

.setPrice(v1Product.getPrice())

210

.setCategory("UNKNOWN") // Default for new field

211

.build();

212

});

213

214

evolvedStream.sinkTo(FileSink.forBulkFormat(outputPath, evolvedWriterFactory).build());

215

```

216

217

### Integration with Flink SQL/Table API

218

219

```java

220

// Note: Direct SQL integration with protobuf requires custom deserialization

221

// This example shows the DataStream to Table conversion approach

222

223

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

224

import org.apache.flink.table.api.Schema;

225

226

// Convert protobuf stream to Table for SQL processing

227

Table protoTable = tableEnv.fromDataStream(

228

messageStream,

229

Schema.newBuilder()

230

.column("user_id", DataTypes.BIGINT())

231

.column("event_type", DataTypes.STRING())

232

.column("timestamp", DataTypes.TIMESTAMP(3))

233

.column("profile_name", DataTypes.STRING())

234

.column("profile_email", DataTypes.STRING())

235

.build()

236

);

237

238

// Register table for SQL queries

239

tableEnv.createTemporaryView("user_events", protoTable);

240

241

// Query with SQL

242

Table result = tableEnv.sqlQuery("""

243

SELECT

244

user_id,

245

event_type,

246

COUNT(*) as event_count

247

FROM user_events

248

WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR

249

GROUP BY user_id, event_type

250

""");

251

252

// Convert back to DataStream and write as protobuf

253

DataStream<Row> resultStream = tableEnv.toDataStream(result);

254

DataStream<SummaryMessage> summaryStream = resultStream.map(row -> {

255

return SummaryMessage.newBuilder()

256

.setUserId((Long) row.getField(0))       // Row.getField returns Object — cast required
.setEventType((String) row.getField(1))
.setCount((Long) row.getField(2))

259

.build();

260

});

261

262

summaryStream.sinkTo(FileSink

263

.forBulkFormat(summaryPath, ParquetProtoWriters.forType(SummaryMessage.class))

264

.build());

265

```

266

267

### Performance Optimization

268

269

```java

270

// Optimize protobuf writing performance

271

ParquetBuilder<MyMessage> optimizedBuilder = (OutputFile out) -> {
    return new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .build();
};

278

279

// Configure for high-throughput scenarios

280

Configuration hadoopConf = new Configuration();

281

hadoopConf.set("parquet.proto.writeSpecsCompliant", "true"); // emit spec-compliant LIST/MAP groups

283

hadoopConf.setInt("parquet.page.size", 1024 * 1024); // 1MB pages

284

hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024); // 256MB blocks

285

286

// Use configuration in custom builder

287

ParquetBuilder<MyMessage> configuredBuilder = (out) -> {

288

return new ParquetProtoWriters.ParquetProtoWriterBuilder<>(out, MyMessage.class)

290

.withConf(hadoopConf)

291

.withCompressionCodec(CompressionCodecName.LZ4)

292

.build();

293

};

294

```

295

296

### Error Handling and Validation

297

298

```java

299

// Handle protobuf serialization errors

300

DataStream<MyMessage> validatedStream = rawDataStream

301

.map(new RichMapFunction<RawData, MyMessage>() {

302

private transient Counter invalidMessages;

303

304

@Override

305

public void open(Configuration parameters) {

306

invalidMessages = getRuntimeContext()

307

.getMetricGroup()

308

.counter("invalid_protobuf_messages");

309

}

310

311

@Override

312

public MyMessage map(RawData raw) throws Exception {

313

try {

314

MyMessage.Builder builder = MyMessage.newBuilder();

315

316

// Validate required fields

317

if (raw.getId() <= 0) {

318

throw new IllegalArgumentException("Invalid ID: " + raw.getId());

319

}

320

321

// Build message with validation

322

MyMessage message = builder

323

.setId(raw.getId())

324

.setName(validateAndCleanString(raw.getName()))

325

.setTimestamp(raw.getTimestamp())

326

.build();

327

328

// Validate the built message

329

if (!message.isInitialized()) {

330

throw new IllegalStateException("Message not properly initialized");

331

}

332

333

return message;

334

335

} catch (Exception e) {

336

invalidMessages.inc();

337

LOG.warn("Failed to create protobuf message from raw data: {}", raw, e);

338

// Return default message or rethrow based on requirements

339

throw e;

340

}

341

}

342

343

private String validateAndCleanString(String input) {

344

return input != null ? input.trim() : "";

345

}

346

})

347

.filter(Objects::nonNull);

348

349

validatedStream.sinkTo(protoSink);

350

```

351

352

## Protocol Buffers Advantages

353

354

### Schema Evolution

355

- **Backward Compatibility**: New optional fields don't break existing readers

356

- **Forward Compatibility**: Old readers can handle new data by ignoring unknown fields

357

- **Field Numbering**: Stable field IDs enable safe schema changes

358

359

### Cross-Language Support

360

- **Language Agnostic**: Same Parquet files readable from Python, C++, Go, etc.

361

- **Code Generation**: Strongly-typed classes generated from .proto definitions

362

- **Standardized Serialization**: Consistent binary format across platforms

363

364

### Performance Benefits

365

- **Compact Encoding**: Variable-length encoding reduces storage size

366

- **Fast Serialization**: Optimized binary format for quick read/write operations

367

- **Schema Registry**: Centralized schema management for large organizations

368

369

The Protocol Buffers integration provides efficient, type-safe serialization with excellent schema evolution capabilities, making it ideal for large-scale data processing systems that need to handle changing data structures over time.