# Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

## Capabilities

### AvroParquetReaders

Factory methods for creating StreamFormat readers that read Avro records from Parquet files.

```java { .api }
/**
 * Convenience builder to create AvroParquetRecordFormat instances for different Avro record types
 */
@Experimental
public class AvroParquetReaders {

    /**
     * Creates a StreamFormat for reading Avro SpecificRecord types from Parquet files
     * @param <T> SpecificRecord type
     * @param typeClass Class of the SpecificRecord to read
     * @return StreamFormat for reading SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);

    /**
     * Creates a StreamFormat for reading Avro GenericRecord types from Parquet files.
     * Requires an explicit schema, since Flink needs the schema information for serialization.
     * @param schema Avro schema for the GenericRecord
     * @return StreamFormat for reading GenericRecord instances
     */
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a StreamFormat for reading POJOs from Parquet files using Avro reflection
     * @param <T> POJO type (not SpecificRecord or GenericRecord)
     * @param typeClass Class of the POJO to read via reflection
     * @return StreamFormat for reading POJO instances
     * @throws IllegalArgumentException if typeClass is SpecificRecord or GenericRecord
     */
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}
```

### AvroParquetWriters

Factory methods for creating ParquetWriterFactory instances that write Avro records to Parquet files.

```java { .api }
/**
 * Convenience builder to create ParquetWriterFactory instances for different Avro types
 */
@Experimental
public class AvroParquetWriters {

    /**
     * Creates a ParquetWriterFactory for Avro SpecificRecord types
     * @param <T> SpecificRecord type
     * @param type Class of the SpecificRecord to write
     * @return ParquetWriterFactory for writing SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

    /**
     * Creates a ParquetWriterFactory for Avro GenericRecord types
     * @param schema Avro schema for the GenericRecord
     * @return ParquetWriterFactory for writing GenericRecord instances
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a ParquetWriterFactory for POJOs using Avro reflection
     * @param <T> POJO type
     * @param type Class of the POJO to write via reflection
     * @return ParquetWriterFactory for writing POJO instances
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

### AvroParquetRecordFormat

StreamFormat implementation for reading Avro records from Parquet files with proper type information handling.

```java { .api }
/**
 * StreamFormat implementation for reading Avro records from Parquet files
 * @param <E> Avro record type
 */
public class AvroParquetRecordFormat<E> implements StreamFormat<E> {

    /**
     * Creates a new AvroParquetRecordFormat
     * @param avroTypeInfo Type information for the Avro records
     * @param dataModelSupplier Supplier for the GenericData instance
     */
    public AvroParquetRecordFormat(
        TypeInformation<E> avroTypeInfo,
        SerializableSupplier<GenericData> dataModelSupplier
    );

    /**
     * Creates a reader for the given file split
     * @param config Flink configuration
     * @param split File source split to read
     * @return Reader for Avro records
     * @throws IOException if reader creation fails
     */
    public StreamFormat.Reader<E> createReader(Configuration config, FileSourceSplit split) throws IOException;

    /**
     * Restores a reader from checkpointed state
     * @param config Flink configuration
     * @param split File source split to read
     * @return Reader for Avro records
     * @throws IOException if reader restoration fails
     */
    public StreamFormat.Reader<E> restoreReader(Configuration config, FileSourceSplit split) throws IOException;

    /**
     * Indicates whether this format supports splitting (it does not)
     * @return false - Avro-Parquet reads are not splittable
     */
    public boolean isSplittable();

    /**
     * Returns the produced type information
     * @return TypeInformation for the Avro record type
     */
    public TypeInformation<E> getProducedType();
}
```
132
133
## Usage Examples
134
135
### Reading SpecificRecord
136
137
```java
138
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
139
import org.apache.flink.connector.file.src.FileSource;
140
import org.apache.flink.core.fs.Path;
141
import org.apache.avro.specific.SpecificRecordBase;
142
143
// Define your Avro SpecificRecord class
144
public class User extends SpecificRecordBase {
145
private String name;
146
private int age;
147
private String email;
148
// ... constructors, getters, setters
149
}
150
151
// Create file source for reading User records
152
FileSource<User> source = FileSource
153
.forRecordStreamFormat(
154
AvroParquetReaders.forSpecificRecord(User.class),
155
new Path("hdfs://path/to/user/parquet/files")
156
)
157
.build();
158
159
// Create DataStream
160
DataStream<User> userStream = env.fromSource(
161
source,
162
WatermarkStrategy.noWatermarks(),
163
"user-parquet-source"
164
);
165
```

### Reading GenericRecord

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Define the Avro schema
String schemaString = """
    {
      "type": "record",
      "name": "Product",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "category", "type": ["null", "string"], "default": null}
      ]
    }
    """;

Schema schema = new Schema.Parser().parse(schemaString);

// Create a file source for GenericRecord
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("/data/products")
    )
    .build();

DataStream<GenericRecord> productStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "product-parquet-source"
);

// Process GenericRecord fields (string values are Utf8, hence toString())
productStream.map(record -> {
    Long id = (Long) record.get("id");
    String name = record.get("name").toString();
    Double price = (Double) record.get("price");
    return new ProcessedProduct(id, name, price);
});
```

### Reading with Reflection

```java
// POJO class for reflection-based reading
public class Event {
    public long timestamp;
    public String eventType;
    public Map<String, Object> properties;

    // Default constructor required by Avro reflection
    public Event() {}

    // Constructor, getters, setters...
}

// Create a file source using reflection
FileSource<Event> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forReflectRecord(Event.class),
        new Path("/events/parquet")
    )
    .build();
```

### Writing SpecificRecord

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Create a FileSink for writing User records.
// Bulk formats such as Parquet can only roll files on checkpoint, so the
// sink requires a CheckpointRollingPolicy (time-based policies like
// DefaultRollingPolicy are not accepted for bulk formats).
FileSink<User> sink = FileSink
    .forBulkFormat(
        new Path("/output/users"),
        AvroParquetWriters.forSpecificRecord(User.class)
    )
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

// Write the user stream to Parquet
userStream.sinkTo(sink);
```

### Writing GenericRecord

```java
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

// Create GenericRecord instances.
// Flink cannot infer type information for GenericRecord, so the schema
// is supplied explicitly via GenericRecordAvroTypeInfo.
DataStream<GenericRecord> genericStream = originalStream
    .map(data -> {
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", data.getId());
        record.put("name", data.getName());
        record.put("price", data.getPrice());
        record.put("category", data.getCategory());
        return record;
    })
    .returns(new GenericRecordAvroTypeInfo(schema));

// Create a sink for GenericRecord
FileSink<GenericRecord> genericSink = FileSink
    .forBulkFormat(
        new Path("/output/products"),
        AvroParquetWriters.forGenericRecord(schema)
    )
    .build();

genericStream.sinkTo(genericSink);
```

### Schema Evolution Handling

```java
import org.apache.avro.Schema;

// Handle schema evolution by reading with the newer schema
Schema oldSchema = getOldSchema();              // schema the files were written with
Schema newSchema = getNewSchemaWithDefaults();  // newer schema with defaults for added fields

// The reader fills in missing fields using the defaults from the new schema
FileSource<GenericRecord> evolutionSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(newSchema),
        new Path("/data/evolved-records")
    )
    .build();

DataStream<GenericRecord> evolvedStream = env.fromSource(
    evolutionSource,
    WatermarkStrategy.noWatermarks(),
    "evolved-source"
);
```
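
What "a newer schema with defaults" means concretely: every field added in the new schema must declare a default value. A hypothetical sketch of `getNewSchemaWithDefaults()` for the `Product` schema from earlier (the class and method names are assumptions for illustration, not part of the Flink API):

```java
import org.apache.avro.Schema;

public final class EvolvedSchemas {

    // Hypothetical evolved schema: the added "category" field declares a
    // default, so files written before the field existed remain readable.
    public static Schema getNewSchemaWithDefaults() {
        return new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"Product\", \"fields\": ["
                + "{\"name\": \"id\", \"type\": \"long\"},"
                + "{\"name\": \"name\", \"type\": \"string\"},"
                + "{\"name\": \"category\","
                + " \"type\": [\"null\", \"string\"], \"default\": null}"
                + "]}");
    }

    private EvolvedSchemas() {}
}
```

Records read from old files then surface `category` as `null` rather than failing with a missing-field error.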

### Complex Nested Types

```java
// Avro schema with nested record and array types
String nestedSchema = """
    {
      "type": "record",
      "name": "Order",
      "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "customer", "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"}
          ]
        }},
        {"name": "items", "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "OrderItem",
            "fields": [
              {"name": "productId", "type": "string"},
              {"name": "quantity", "type": "int"},
              {"name": "unitPrice", "type": "double"}
            ]
          }
        }}
      ]
    }
    """;

Schema orderSchema = new Schema.Parser().parse(nestedSchema);

// Read complex nested structures
FileSource<GenericRecord> orderSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(orderSchema),
        new Path("/orders/nested")
    )
    .build();
```

## Performance Considerations

### Memory Usage

The Avro-Parquet integration keeps schema information in memory for serialization and deserialization. When working with GenericRecord, parse each schema once and reuse the instance to minimize memory overhead.
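
One way to follow this advice is to parse the schema a single time into a shared constant; a minimal sketch, assuming Avro is on the classpath (the `ProductSchema` holder is hypothetical):

```java
import org.apache.avro.Schema;

// Hypothetical holder that parses the schema exactly once; every
// reader/writer factory then shares the same Schema instance.
public final class ProductSchema {

    public static final Schema INSTANCE = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Product\", \"fields\": ["
            + "{\"name\": \"id\", \"type\": \"long\"},"
            + "{\"name\": \"name\", \"type\": \"string\"}"
            + "]}");

    private ProductSchema() {}
}
```

`AvroParquetReaders.forGenericRecord(ProductSchema.INSTANCE)` and `AvroParquetWriters.forGenericRecord(ProductSchema.INSTANCE)` then share one parsed schema instead of re-parsing the JSON for each operator.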

### Type Safety

SpecificRecord provides compile-time type safety and better performance thanks to generated code; GenericRecord offers more flexibility but requires runtime type checks and casts.
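
The runtime side of that trade-off fits in a few lines; a sketch assuming only the Avro library (the single-field `User` schema here is a stand-in, not the generated class from earlier):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class TypeSafetyDemo {

    // Reads the "age" field back from a GenericRecord with a runtime cast.
    static int readAge() {
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
                + "{\"name\": \"age\", \"type\": \"int\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("age", 42);

        // Field name and value type are only checked at runtime - a typo in
        // "age" or a wrong cast fails here, not at compile time.
        return (Integer) record.get("age");
    }

    public static void main(String[] args) {
        // A generated SpecificRecord would expose user.getAge() instead,
        // so the same mistakes would be caught by the compiler.
        System.out.println(readAge());  // prints 42
    }
}
```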

### Schema Registry Integration

```java
// Example with a schema registry (conceptual - schemaRegistry is a placeholder client)
Schema schema = schemaRegistry.getLatestSchema("user-events");
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        inputPath
    )
    .build();
```

The Avro integration provides seamless compatibility with existing Avro-based data pipelines while leveraging Parquet's columnar storage for improved query performance.