# Avro Integration

Complete Apache Avro integration for Parquet format, supporting specific records, generic records, and reflection-based serialization with full schema compatibility.

## Capabilities

### ParquetAvroWriters

Utility class providing convenience methods for creating Parquet writer factories for various Avro data types.

```java { .api }
/**
 * Convenience builder for creating ParquetWriterFactory instances for Avro types.
 * Supports specific records, generic records, and reflection-based serialization.
 */
public class ParquetAvroWriters {

    /**
     * Creates a ParquetWriterFactory for Avro specific record types.
     * Uses the record's built-in schema for the Parquet column definition.
     * @param type the specific record class extending SpecificRecordBase
     * @return ParquetWriterFactory configured for the specific record type
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

    /**
     * Creates a ParquetWriterFactory for Avro generic records.
     * Uses the provided schema for the Parquet column definition and data serialization.
     * @param schema the Avro schema defining the record structure
     * @return ParquetWriterFactory configured for generic records with the given schema
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a ParquetWriterFactory using reflection to derive the schema from a POJO.
     * Automatically generates an Avro schema from the Java class structure.
     * @param type the Java class to use for reflection-based schema generation
     * @return ParquetWriterFactory configured for the reflected type
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

## Usage Examples

### Specific Record Integration

```java
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.avro.specific.SpecificRecordBase;

// Define an Avro specific record (typically generated from an .avsc schema)
public class User extends SpecificRecordBase {
    public String name;
    public int age;
    public String email;
    // ... getters, setters, and Avro-generated methods
}

// Create a writer factory for the specific record
ParquetWriterFactory<User> userWriterFactory =
    ParquetAvroWriters.forSpecificRecord(User.class);

// Use with FileSink
FileSink<User> userSink = FileSink
    .forBulkFormat(new Path("/output/users"), userWriterFactory)
    .build();
```

### Generic Record Usage

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;

// Define the schema programmatically
String schemaJson = """
    {
      "type": "record",
      "name": "Order",
      "fields": [
        {"name": "orderId", "type": "long"},
        {"name": "customerId", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "timestamp", "type": "long"}
      ]
    }
    """;

Schema orderSchema = new Schema.Parser().parse(schemaJson);

// Create a writer factory for generic records
ParquetWriterFactory<GenericRecord> orderWriterFactory =
    ParquetAvroWriters.forGenericRecord(orderSchema);

// Create and populate generic records
GenericRecord order = new GenericData.Record(orderSchema);
order.put("orderId", 12345L);
order.put("customerId", "CUST001");
order.put("amount", 99.95);
order.put("timestamp", System.currentTimeMillis());
```

### Reflection-Based Usage

```java
// Plain Java class (POJO)
public class Product {
    private String productId;
    private String name;
    private double price;
    private String category;

    // Standard constructors, getters, and setters
    public Product() {}

    public Product(String productId, String name, double price, String category) {
        this.productId = productId;
        this.name = name;
        this.price = price;
        this.category = category;
    }

    // ... getters and setters
}

// Create a writer factory using reflection
ParquetWriterFactory<Product> productWriterFactory =
    ParquetAvroWriters.forReflectRecord(Product.class);

// The schema is automatically derived from the class structure
DataStream<Product> productStream = // ... your product data stream
productStream.sinkTo(FileSink
    .forBulkFormat(new Path("/output/products"), productWriterFactory)
    .build());
```

### Complex Schema Integration

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Complex nested schema with arrays and nested records
String complexSchemaJson = """
    {
      "type": "record",
      "name": "Transaction",
      "fields": [
        {"name": "transactionId", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "customer", "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "customerId", "type": "string"},
            {"name": "name", "type": "string"},
            {"name": "tier", "type": {"type": "enum", "name": "Tier", "symbols": ["BRONZE", "SILVER", "GOLD"]}}
          ]
        }},
        {"name": "items", "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "Item",
            "fields": [
              {"name": "itemId", "type": "string"},
              {"name": "quantity", "type": "int"},
              {"name": "unitPrice", "type": "double"}
            ]
          }
        }},
        {"name": "totalAmount", "type": "double"}
      ]
    }
    """;

Schema transactionSchema = new Schema.Parser().parse(complexSchemaJson);
ParquetWriterFactory<GenericRecord> complexWriterFactory =
    ParquetAvroWriters.forGenericRecord(transactionSchema);
```

### Schema Evolution and Compatibility

```java
// Original schema v1
String schemaV1 = """
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "userId", "type": "string"},
        {"name": "eventType", "type": "string"},
        {"name": "timestamp", "type": "long"}
      ]
    }
    """;

// Evolved schema v2 with an optional field
String schemaV2 = """
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "userId", "type": "string"},
        {"name": "eventType", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "sessionId", "type": ["null", "string"], "default": null}
      ]
    }
    """;

// Both schemas can be used for reading/writing under Avro's compatibility rules
Schema schemaV1Parsed = new Schema.Parser().parse(schemaV1);
Schema schemaV2Parsed = new Schema.Parser().parse(schemaV2);

ParquetWriterFactory<GenericRecord> writerV2 =
    ParquetAvroWriters.forGenericRecord(schemaV2Parsed);
```

## Advanced Configuration

### Custom Avro Data Models

```java
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.reflect.ReflectData;

// The writers automatically use the appropriate data models:
// - forSpecificRecord() uses SpecificData.get()
// - forGenericRecord() uses GenericData.get()
// - forReflectRecord() uses ReflectData.get()

// For custom data model configurations, you may need to modify
// the underlying AvroParquetWriter configuration (advanced usage)
```

### Performance Considerations

```java
// For high-throughput scenarios with specific records
ParquetWriterFactory<MySpecificRecord> optimizedFactory =
    ParquetAvroWriters.forSpecificRecord(MySpecificRecord.class);

// Configure the underlying Parquet settings through FileSink
FileSink<MySpecificRecord> optimizedSink = FileSink
    .forBulkFormat(outputPath, optimizedFactory)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .withMaxPartSize(MemorySize.ofMebiBytes(256))
        .build())
    .build();
```

## Error Handling

Common exceptions and troubleshooting:

```java
try {
    ParquetWriterFactory<GenericRecord> factory =
        ParquetAvroWriters.forGenericRecord(schema);
} catch (Exception e) {
    // Schema parsing errors, invalid schema format
}

try {
    ParquetWriterFactory<MyPOJO> factory =
        ParquetAvroWriters.forReflectRecord(MyPOJO.class);
} catch (Exception e) {
    // Reflection errors, unsupported field types,
    // missing default constructors
}

// Runtime writing errors
try {
    bulkWriter.addElement(avroRecord);
} catch (IOException e) {
    // File system errors, serialization issues
} catch (ClassCastException e) {
    // Type mismatch between record and expected schema
}
```

## Schema Registry Integration

For production environments using schema registries:

```java
// Conceptual integration with Confluent Schema Registry
// (requires additional dependencies and configuration)

/*
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient schemaRegistry = // ... initialize client
Schema schema = schemaRegistry.getByID(schemaId);

ParquetWriterFactory<GenericRecord> registryBasedFactory =
    ParquetAvroWriters.forGenericRecord(schema);
*/
```