CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-parquet-2-12

Apache Flink SQL Parquet format support package that provides SQL client integration for reading and writing Parquet files in Flink applications.

Pending
Overview
Eval results
Files

docs/avro-integration.md

Avro Integration

Complete Apache Avro integration for Parquet format, supporting specific records, generic records, and reflection-based serialization with full schema compatibility.

Capabilities

ParquetAvroWriters

Utility class providing convenience methods for creating Parquet writer factories for various Avro data types.

/**
 * Convenience builder for creating ParquetWriterFactory instances for Avro types
 * Supports specific records, generic records, and reflection-based serialization
 */
public class ParquetAvroWriters {
    
    /**
     * Creates a ParquetWriterFactory for Avro specific record types
     * Uses the record's built-in schema for Parquet column definition
     * @param type The specific record class extending SpecificRecordBase
     * @return ParquetWriterFactory configured for the specific record type
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    
    /**
     * Creates a ParquetWriterFactory for Avro generic records
     * Uses the provided schema for Parquet column definition and data serialization
     * @param schema The Avro schema defining the record structure
     * @return ParquetWriterFactory configured for generic records with the given schema
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    
    /**
     * Creates a ParquetWriterFactory using reflection to derive schema from POJO
     * Automatically generates Avro schema from Java class structure
     * @param type The Java class to use for reflection-based schema generation
     * @return ParquetWriterFactory configured for the reflected type
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}

Usage Examples

Specific Record Integration

import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.avro.specific.SpecificRecordBase;

// Define Avro specific record (typically generated from .avsc schema)
public class User extends SpecificRecordBase {
    public String name;
    public int age;
    public String email;
    // ... getters, setters, and Avro-generated methods
}

// Create writer factory for specific record
ParquetWriterFactory<User> userWriterFactory = 
    ParquetAvroWriters.forSpecificRecord(User.class);

// Use with FileSink
FileSink<User> userSink = FileSink
    .forBulkFormat(new Path("/output/users"), userWriterFactory)
    .build();

Generic Record Usage

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;

// Define schema programmatically
String schemaJson = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "long"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "timestamp", "type": "long"}
  ]
}
""";

Schema orderSchema = new Schema.Parser().parse(schemaJson);

// Create writer factory for generic records
ParquetWriterFactory<GenericRecord> orderWriterFactory = 
    ParquetAvroWriters.forGenericRecord(orderSchema);

// Create and populate generic records
GenericRecord order = new GenericData.Record(orderSchema);
order.put("orderId", 12345L);
order.put("customerId", "CUST001");
order.put("amount", 99.95);
order.put("timestamp", System.currentTimeMillis());

Reflection-Based Usage

// Plain Java class (POJO)
public class Product {
    private String productId;
    private String name;
    private double price;
    private String category;
    
    // Standard constructors, getters, and setters
    public Product() {}
    
    public Product(String productId, String name, double price, String category) {
        this.productId = productId;
        this.name = name;
        this.price = price;
        this.category = category;
    }
    
    // ... getters and setters
}

// Create writer factory using reflection
ParquetWriterFactory<Product> productWriterFactory = 
    ParquetAvroWriters.forReflectRecord(Product.class);

// The schema is automatically derived from the class structure
DataStream<Product> productStream = ...; // your product data stream
productStream.sinkTo(FileSink
    .forBulkFormat(new Path("/output/products"), productWriterFactory)
    .build());

Complex Schema Integration

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Complex nested schema with arrays and nested records
String complexSchemaJson = """
{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "transactionId", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer",
      "fields": [
        {"name": "customerId", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "tier", "type": {"type": "enum", "name": "Tier", "symbols": ["BRONZE", "SILVER", "GOLD"]}}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Item",
        "fields": [
          {"name": "itemId", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "unitPrice", "type": "double"}
        ]
      }
    }},
    {"name": "totalAmount", "type": "double"}
  ]
}
""";

Schema transactionSchema = new Schema.Parser().parse(complexSchemaJson);
ParquetWriterFactory<GenericRecord> complexWriterFactory = 
    ParquetAvroWriters.forGenericRecord(transactionSchema);

Schema Evolution and Compatibility

// Original schema v1
String schemaV1 = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "eventType", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
""";

// Evolved schema v2 with optional field
String schemaV2 = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "userId", "type": "string"},
    {"name": "eventType", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "sessionId", "type": ["null", "string"], "default": null}
  ]
}
""";

// Both schemas can be used for reading/writing with Avro's compatibility rules
Schema schemaV1Parsed = new Schema.Parser().parse(schemaV1);
Schema schemaV2Parsed = new Schema.Parser().parse(schemaV2);

ParquetWriterFactory<GenericRecord> writerV2 = 
    ParquetAvroWriters.forGenericRecord(schemaV2Parsed);

Advanced Configuration

Custom Avro Data Models

import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.reflect.ReflectData;

// The writers automatically use appropriate data models:
// - forSpecificRecord() uses SpecificData.get()
// - forGenericRecord() uses GenericData.get() 
// - forReflectRecord() uses ReflectData.get()

// For custom data model configurations, you may need to modify
// the underlying AvroParquetWriter configuration (advanced usage)

Performance Considerations

// For high-throughput scenarios with specific records
ParquetWriterFactory<MySpecificRecord> optimizedFactory = 
    ParquetAvroWriters.forSpecificRecord(MySpecificRecord.class);

// Configure the underlying Parquet settings through FileSink
FileSink<MySpecificRecord> optimizedSink = FileSink
    .forBulkFormat(outputPath, optimizedFactory)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .withMaxPartSize(MemorySize.ofMebiBytes(256))
        .build())
    .build();

Error Handling

Common exceptions and troubleshooting:

try {
    ParquetWriterFactory<GenericRecord> factory = 
        ParquetAvroWriters.forGenericRecord(schema);
} catch (Exception e) {
    // Schema parsing errors, invalid schema format
}

try {
    ParquetWriterFactory<MyPOJO> factory = 
        ParquetAvroWriters.forReflectRecord(MyPOJO.class);
} catch (Exception e) {
    // Reflection errors, unsupported field types, 
    // missing default constructors
}

// Runtime writing errors
try {
    bulkWriter.addElement(avroRecord);
} catch (IOException e) {
    // File system errors, serialization issues
} catch (ClassCastException e) {
    // Type mismatch between record and expected schema
}

Schema Registry Integration

For production environments using schema registries:

// Conceptual integration with Confluent Schema Registry
// (requires additional dependencies and configuration)

/*
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

SchemaRegistryClient schemaRegistry = // ... initialize client
Schema schema = schemaRegistry.getById(schemaId);

ParquetWriterFactory<GenericRecord> registryBasedFactory = 
    ParquetAvroWriters.forGenericRecord(schema);
*/

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-parquet-2-12

docs

avro-integration.md

format-factory.md

index.md

protobuf-integration.md

rowdata-writers.md

schema-utilities.md

vectorized-input.md

tile.json