Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.
—
Complete Apache Avro integration for Parquet format, supporting specific records, generic records, and reflection-based serialization with full schema compatibility.
Utility class providing convenience methods for creating Parquet writer factories for various Avro data types.
/**
 * Convenience builder for creating {@link ParquetWriterFactory} instances for Avro types.
 * Supports specific records, generic records, and reflection-based serialization.
 */
public class ParquetAvroWriters {

    /**
     * Creates a ParquetWriterFactory for Avro specific record types.
     * Uses the record's built-in schema for the Parquet column definition.
     *
     * @param type the specific record class extending SpecificRecordBase
     * @return ParquetWriterFactory configured for the specific record type
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

    /**
     * Creates a ParquetWriterFactory for Avro generic records.
     * Uses the provided schema for the Parquet column definition and data serialization.
     *
     * @param schema the Avro schema defining the record structure
     * @return ParquetWriterFactory configured for generic records with the given schema
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a ParquetWriterFactory using reflection to derive the schema from a POJO.
     * Automatically generates an Avro schema from the Java class structure.
     *
     * @param type the Java class to use for reflection-based schema generation
     * @return ParquetWriterFactory configured for the reflected type
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}

// --- Example: writing Avro specific records ---
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.avro.specific.SpecificRecordBase;

// Define an Avro specific record (typically generated from an .avsc schema)
public class User extends SpecificRecordBase {
    public String name;
    public int age;
    public String email;
    // ... getters, setters, and Avro-generated methods
}

// Create a writer factory for the specific record type
ParquetWriterFactory<User> userWriterFactory =
        ParquetAvroWriters.forSpecificRecord(User.class);

// Use with FileSink
FileSink<User> userSink = FileSink
        .forBulkFormat(new Path("/output/users"), userWriterFactory)
        .build();

// --- Example: writing Avro generic records ---
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;

// Define the schema programmatically (JSON text block; content is the Avro schema)
String schemaJson = """
{
"type": "record",
"name": "Order",
"fields": [
{"name": "orderId", "type": "long"},
{"name": "customerId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
""";
Schema orderSchema = new Schema.Parser().parse(schemaJson);

// Create a writer factory for generic records using that schema
ParquetWriterFactory<GenericRecord> orderWriterFactory =
        ParquetAvroWriters.forGenericRecord(orderSchema);

// Create and populate a generic record conforming to the schema
GenericRecord order = new GenericData.Record(orderSchema);
order.put("orderId", 12345L);
order.put("customerId", "CUST001");
order.put("amount", 99.95);
order.put("timestamp", System.currentTimeMillis());

// --- Example: reflection-based writing of a plain Java class (POJO) ---
// Plain Java class (POJO); the Avro schema is derived via reflection
public class Product {
    private String productId;
    private String name;
    private double price;
    private String category;

    // Standard constructors, getters, and setters
    public Product() {}

    public Product(String productId, String name, double price, String category) {
        this.productId = productId;
        this.name = name;
        this.price = price;
        this.category = category;
    }
    // ... getters and setters
}

// Create a writer factory using reflection; the schema is automatically
// derived from the class structure
ParquetWriterFactory<Product> productWriterFactory =
        ParquetAvroWriters.forReflectRecord(Product.class);

DataStream<Product> productStream = // ... your product data stream
productStream.sinkTo(FileSink
        .forBulkFormat(new Path("/output/products"), productWriterFactory)
        .build());

// --- Example: complex nested schemas ---
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Complex nested schema with a nested record, an enum, and an array of records
String complexSchemaJson = """
{
"type": "record",
"name": "Transaction",
"fields": [
{"name": "transactionId", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "customer", "type": {
"type": "record",
"name": "Customer",
"fields": [
{"name": "customerId", "type": "string"},
{"name": "name", "type": "string"},
{"name": "tier", "type": {"type": "enum", "name": "Tier", "symbols": ["BRONZE", "SILVER", "GOLD"]}}
]
}},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "Item",
"fields": [
{"name": "itemId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unitPrice", "type": "double"}
]
}
}},
{"name": "totalAmount", "type": "double"}
]
}
""";
Schema transactionSchema = new Schema.Parser().parse(complexSchemaJson);
ParquetWriterFactory<GenericRecord> complexWriterFactory =
        ParquetAvroWriters.forGenericRecord(transactionSchema);

// --- Example: schema evolution ---
// Original schema v1
String schemaV1 = """
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
""";

// Evolved schema v2 adds an optional field with a null default,
// which keeps it backward-compatible under Avro's resolution rules
String schemaV2 = """
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "userId", "type": "string"},
{"name": "eventType", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "sessionId", "type": ["null", "string"], "default": null}
]
}
""";

// Both schemas can be used for reading/writing with Avro's compatibility rules
Schema schemaV1Parsed = new Schema.Parser().parse(schemaV1);
Schema schemaV2Parsed = new Schema.Parser().parse(schemaV2);
ParquetWriterFactory<GenericRecord> writerV2 =
        ParquetAvroWriters.forGenericRecord(schemaV2Parsed);

// --- Data models used by the writers ---
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.reflect.ReflectData;
// The writers automatically use appropriate data models:
// - forSpecificRecord() uses SpecificData.get()
// - forGenericRecord() uses GenericData.get()
// - forReflectRecord() uses ReflectData.get()
// For custom data model configurations, you may need to modify
// the underlying AvroParquetWriter configuration (advanced usage)// For high-throughput scenarios with specific records
ParquetWriterFactory<MySpecificRecord> optimizedFactory =
ParquetAvroWriters.forSpecificRecord(MySpecificRecord.class);
// Configure the underlying Parquet settings through FileSink
FileSink<MySpecificRecord> optimizedSink = FileSink
.forBulkFormat(outputPath, optimizedFactory)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(256))
.build())
.build();Common exceptions and troubleshooting:
// Construction-time errors: schema parsing failures, invalid schema format
try {
    ParquetWriterFactory<GenericRecord> factory =
            ParquetAvroWriters.forGenericRecord(schema);
} catch (Exception e) {
    // Schema parsing errors, invalid schema format
}

// Construction-time errors for reflection-based factories
try {
    ParquetWriterFactory<MyPOJO> factory =
            ParquetAvroWriters.forReflectRecord(MyPOJO.class);
} catch (Exception e) {
    // Reflection errors, unsupported field types,
    // missing default constructors
}

// Runtime writing errors
try {
    bulkWriter.addElement(avroRecord);
} catch (IOException e) {
    // File system errors, serialization issues
} catch (ClassCastException e) {
    // Type mismatch between record and expected schema
}

// --- Schema registry integration (production environments) ---
// Conceptual integration with Confluent Schema Registry
// (requires additional dependencies and configuration)
/*
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient schemaRegistry = // ... initialize client
Schema schema = schemaRegistry.getByID(schemaId);
ParquetWriterFactory<GenericRecord> registryBasedFactory =
        ParquetAvroWriters.forGenericRecord(schema);
*/

// Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12