Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
—
Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.
Factory methods for creating StreamFormat readers that can read Avro records from Parquet files.
/**
 * Convenience builder to create AvroParquetRecordFormat instances for different Avro record types.
 *
 * <p>Utility class: all factory methods are static.
 */
@Experimental
public class AvroParquetReaders {
/**
 * Creates a StreamFormat for reading Avro SpecificRecord types from Parquet files.
 *
 * <p>The record schema is derived from the generated SpecificRecord class itself,
 * so no explicit schema argument is needed.
 *
 * @param <T> SpecificRecord type
 * @param typeClass Class of the SpecificRecord to read
 * @return StreamFormat for reading SpecificRecord instances
 */
public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
/**
 * Creates a StreamFormat for reading Avro GenericRecord types from Parquet files.
 *
 * <p>Requires an explicit schema since Flink needs schema information for serialization;
 * a GenericRecord carries no compile-time type information.
 *
 * @param schema Avro schema for the GenericRecord
 * @return StreamFormat for reading GenericRecord instances
 */
public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
/**
 * Creates a StreamFormat for reading POJOs from Parquet files using Avro reflection.
 *
 * <p>The schema is inferred from the POJO's fields via Avro's reflect data model.
 *
 * @param <T> POJO type (not SpecificRecord or GenericRecord)
 * @param typeClass Class of the POJO to read via reflection
 * @return StreamFormat for reading POJO instances
 * @throws IllegalArgumentException if typeClass is SpecificRecord or GenericRecord
 */
public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}
Factory methods for creating ParquetWriterFactory instances that can write Avro records to Parquet files.
/**
 * Convenience builder to create ParquetWriterFactory instances for different Avro types.
 *
 * <p>Utility class: all factory methods are static. Mirrors {@code AvroParquetReaders}
 * on the write side.
 */
@Experimental
public class AvroParquetWriters {
/**
 * Creates a ParquetWriterFactory for Avro SpecificRecord types.
 *
 * <p>The write schema is derived from the generated SpecificRecord class.
 *
 * @param <T> SpecificRecord type
 * @param type Class of the SpecificRecord to write
 * @return ParquetWriterFactory for writing SpecificRecord instances
 */
public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
/**
 * Creates a ParquetWriterFactory for Avro GenericRecord types.
 *
 * @param schema Avro schema for the GenericRecord
 * @return ParquetWriterFactory for writing GenericRecord instances
 */
public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
/**
 * Creates a ParquetWriterFactory for POJOs using Avro reflection.
 *
 * <p>NOTE(review): unlike {@code AvroParquetReaders.forReflectRecord}, no
 * IllegalArgumentException is documented for SpecificRecord/GenericRecord
 * arguments here — confirm whether that restriction applies on the write side.
 *
 * @param <T> POJO type
 * @param type Class of the POJO to write via reflection
 * @return ParquetWriterFactory for writing POJO instances
 */
public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
StreamFormat implementation for reading Avro records from Parquet files with proper type information handling.
/**
 * StreamFormat implementation for reading Avro records from Parquet files.
 *
 * <p>Backs the factory methods of {@code AvroParquetReaders}; not normally
 * constructed directly by user code.
 *
 * @param <E> Avro record type
 */
public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
/**
 * Creates a new AvroParquetRecordFormat.
 *
 * @param avroTypeInfo Type information for the Avro records
 * @param dataModelSupplier Supplier for the GenericData instance (the Avro data
 *     model — e.g. specific, generic, or reflect — used to materialize records);
 *     serializable so the format can be shipped to Flink workers
 */
public AvroParquetRecordFormat(
TypeInformation<E> avroTypeInfo,
SerializableSupplier<GenericData> dataModelSupplier
);
/**
 * Creates a reader for the given file split.
 *
 * @param config Hadoop configuration
 * @param split File source split to read
 * @return Reader iterator for Avro records
 * @throws IOException if reader creation fails
 */
public RecordReaderIterator<E> createReader(Configuration config, FileSourceSplit split) throws IOException;
/**
 * Restores a reader from checkpoint state.
 *
 * <p>NOTE(review): the signature carries no explicit restored offset, so the
 * resume position presumably comes from the split itself — confirm against the
 * StreamFormat contract.
 *
 * @param config Hadoop configuration
 * @param split File source split to read
 * @return Reader iterator for Avro records
 * @throws IOException if reader restoration fails
 */
public RecordReaderIterator<E> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
/**
 * Indicates whether this format supports splitting (it does not).
 *
 * @return false - Parquet files read via this Avro format are not splittable
 */
public boolean isSplittable();
/**
 * Returns the produced type information.
 *
 * @return TypeInformation for the Avro record type
 */
public TypeInformation<E> getProducedType();
}
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.avro.specific.SpecificRecordBase;
// Define your Avro SpecificRecord class.
// NOTE(review): in practice this class is generated by the Avro compiler from an
// .avsc schema; a hand-written subclass would also have to implement the abstract
// SpecificRecordBase methods (getSchema, get, put).
public class User extends SpecificRecordBase {
private String name;
private int age;
private String email;
// ... constructors, getters, setters
}
// Create file source for reading User records; the schema comes from User.class.
FileSource<User> source = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forSpecificRecord(User.class),
new Path("hdfs://path/to/user/parquet/files")
)
.build();
// Create DataStream (assumes an existing StreamExecutionEnvironment `env`).
DataStream<User> userStream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"user-parquet-source"
);
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// Define the Avro schema as a JSON text block (Java 15+).
String schemaString = """
{
  "type": "record",
  "name": "Product",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "category", "type": ["null", "string"], "default": null}
  ]
}
""";
Schema schema = new Schema.Parser().parse(schemaString);
// Create file source for GenericRecord — the explicit schema is required here.
FileSource<GenericRecord> source = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(schema),
new Path("/data/products")
)
.build();
DataStream<GenericRecord> productStream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"product-parquet-source"
);
// Process GenericRecord fields by name.
// NOTE(review): Avro typically decodes "string" fields as org.apache.avro.util.Utf8,
// which is why `name` is read via toString() rather than a String cast.
productStream.map(record -> {
Long id = (Long) record.get("id");
String name = record.get("name").toString();
Double price = (Double) record.get("price");
return new ProcessedProduct(id, name, price);
});
// POJO class for reflection-based reading
public class Event {
public long timestamp;
public String eventType;
public Map<String, Object> properties;
// Default (no-arg) constructor required so Avro reflection can instantiate records.
public Event() {}
// Constructor, getters, setters...
}
// Create file source using reflection; the Avro schema is inferred from Event's fields.
FileSource<Event> source = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forReflectRecord(Event.class),
new Path("/events/parquet")
)
.build();
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.connector.file.sink.FileSink;
// Create FileSink for writing User records as Parquet (bulk format).
FileSink<User> sink = FileSink
.forBulkFormat(
new Path("/output/users"),
AvroParquetWriters.forSpecificRecord(User.class)
)
// Roll files after 10 minutes, or after 2 minutes of inactivity.
// NOTE(review): bulk formats also roll on checkpoint regardless of this policy —
// confirm against the FileSink documentation for the Flink version in use.
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(10))
.withInactivityInterval(Duration.ofMinutes(2))
.build()
)
.build();
// Write user stream to Parquet
userStream.sinkTo(sink);
// Create GenericRecord instances
// Map domain objects to GenericRecord instances built against the shared schema.
DataStream<GenericRecord> genericStream = originalStream.map(data -> {
GenericRecord record = new GenericData.Record(schema);
record.put("id", data.getId());
record.put("name", data.getName());
record.put("price", data.getPrice());
record.put("category", data.getCategory());
return record;
});
// Create sink for GenericRecord; the same schema must be supplied to the writer.
FileSink<GenericRecord> genericSink = FileSink
.forBulkFormat(
new Path("/output/products"),
AvroParquetWriters.forGenericRecord(schema)
)
.build();
genericStream.sinkTo(genericSink);
import org.apache.avro.Schema;
// Handle schema evolution by reading old files with a newer (compatible) schema.
Schema oldSchema = getOldSchema();
Schema newSchema = getNewSchemaWithDefaults();
// The reader fills fields missing from old files using the new schema's defaults.
// NOTE(review): this relies on the new schema being backward-compatible with the
// written data (every added field has a default) — verify with an Avro
// compatibility check before deploying.
FileSource<GenericRecord> evolutionSource = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(newSchema),
new Path("/data/evolved-records")
)
.build();
DataStream<GenericRecord> evolvedStream = env.fromSource(
evolutionSource,
WatermarkStrategy.noWatermarks(),
"evolved-source"
);
// Avro schema with nested types
// Avro schema combining a nested record (customer) and an array of records (items).
String nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "OrderItem",
        "fields": [
          {"name": "productId", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "unitPrice", "type": "double"}
        ]
      }
    }}
  ]
}
""";
Schema orderSchema = new Schema.Parser().parse(nestedSchema);
// Read complex nested structures — nested fields arrive as nested GenericRecords
// and arrays as Avro collections.
FileSource<GenericRecord> orderSource = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(orderSchema),
new Path("/orders/nested")
)
.build();
Avro-Parquet integration requires schema information in memory for serialization/deserialization. For GenericRecord usage, schemas should be reused when possible to minimize memory overhead.
SpecificRecord provides compile-time type safety and better performance due to generated code. GenericRecord offers more flexibility but requires runtime type checking.
// Example with a schema registry (conceptual — `schemaRegistry` and `inputPath`
// are assumed to exist in the surrounding application code).
Schema schema = schemaRegistry.getLatestSchema("user-events");
FileSource<GenericRecord> source = FileSource
.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(schema),
inputPath
)
.build();
The Avro integration provides seamless compatibility with existing Avro-based data pipelines while leveraging Parquet's columnar storage benefits for improved query performance.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parquet