CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

Pending
Overview
Eval results
Files

avro-integration.mddocs/

Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

Capabilities

AvroParquetReaders

Factory methods for creating StreamFormat readers that can read Avro records from Parquet files.

/**
 * Convenience builder to create AvroParquetRecordFormat instances for different Avro record types
 */
@Experimental
public class AvroParquetReaders {
    
    /**
     * Creates a StreamFormat for reading Avro SpecificRecord types from Parquet files
     * @param <T> SpecificRecord type
     * @param typeClass Class of the SpecificRecord to read
     * @return StreamFormat for reading SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
    
    /**
     * Creates a StreamFormat for reading Avro GenericRecord types from Parquet files
     * Requires explicit schema since Flink needs schema information for serialization
     * @param schema Avro schema for the GenericRecord
     * @return StreamFormat for reading GenericRecord instances
     */
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
    
    /**
     * Creates a StreamFormat for reading POJOs from Parquet files using Avro reflection
     * @param <T> POJO type (not SpecificRecord or GenericRecord)
     * @param typeClass Class of the POJO to read via reflection
     * @return StreamFormat for reading POJO instances
     * @throws IllegalArgumentException if typeClass is SpecificRecord or GenericRecord
     */
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}

AvroParquetWriters

Factory methods for creating ParquetWriterFactory instances that can write Avro records to Parquet files.

/**
 * Convenience builder to create ParquetWriterFactory instances for different Avro types
 */
@Experimental
public class AvroParquetWriters {
    
    /**
     * Creates a ParquetWriterFactory for Avro SpecificRecord types
     * @param <T> SpecificRecord type
     * @param type Class of the SpecificRecord to write
     * @return ParquetWriterFactory for writing SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    
    /**
     * Creates a ParquetWriterFactory for Avro GenericRecord types
     * @param schema Avro schema for the GenericRecord
     * @return ParquetWriterFactory for writing GenericRecord instances
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    
    /**
     * Creates a ParquetWriterFactory for POJOs using Avro reflection
     * @param <T> POJO type
     * @param type Class of the POJO to write via reflection
     * @return ParquetWriterFactory for writing POJO instances
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}

AvroParquetRecordFormat

StreamFormat implementation for reading Avro records from Parquet files with proper type information handling.

/**
 * StreamFormat implementation for reading Avro records from Parquet files
 * @param <E> Avro record type
 */
public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
    
    /**
     * Creates a new AvroParquetRecordFormat
     * @param avroTypeInfo Type information for the Avro records
     * @param dataModelSupplier Supplier for GenericData instance
     */
    public AvroParquetRecordFormat(
        TypeInformation<E> avroTypeInfo, 
        SerializableSupplier<GenericData> dataModelSupplier
    );
    
    /**
     * Creates a reader for the given file split
     * @param config Hadoop configuration
     * @param split File source split to read
     * @return Reader iterator for Avro records
     * @throws IOException if reader creation fails
     */
    public RecordReaderIterator<E> createReader(Configuration config, FileSourceSplit split) throws IOException;
    
    /**
     * Restores a reader from checkpoint state
     * @param config Hadoop configuration  
     * @param split File source split to read
     * @return Reader iterator for Avro records
     * @throws IOException if reader restoration fails
     */
    public RecordReaderIterator<E> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
    
    /**
     * Indicates whether this format supports splitting (it does not)
     * @return false - Parquet files with Avro are not splittable
     */
    public boolean isSplittable();
    
    /**
     * Returns the produced type information
     * @return TypeInformation for the Avro record type
     */
    public TypeInformation<E> getProducedType();
}

Usage Examples

Reading SpecificRecord

import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.avro.specific.SpecificRecordBase;

// Define your Avro SpecificRecord class
public class User extends SpecificRecordBase {
    private String name;
    private int age;
    private String email;
    // ... constructors, getters, setters
}

// Create file source for reading User records
FileSource<User> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(User.class),
        new Path("hdfs://path/to/user/parquet/files")
    )
    .build();

// Create DataStream
DataStream<User> userStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "user-parquet-source"
);

Reading GenericRecord

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Define Avro schema
String schemaString = """
{
  "type": "record",
  "name": "Product",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "category", "type": ["null", "string"], "default": null}
  ]
}
""";

Schema schema = new Schema.Parser().parse(schemaString);

// Create file source for GenericRecord
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("/data/products")
    )
    .build();

DataStream<GenericRecord> productStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "product-parquet-source"
);

// Process GenericRecord
productStream.map(record -> {
    Long id = (Long) record.get("id");
    String name = record.get("name").toString();
    Double price = (Double) record.get("price");
    return new ProcessedProduct(id, name, price);
});

Reading with Reflection

// POJO class for reflection-based reading
public class Event {
    public long timestamp;
    public String eventType;
    public Map<String, Object> properties;
    
    // Default constructor required
    public Event() {}
    
    // Constructor, getters, setters...
}

// Create file source using reflection
FileSource<Event> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forReflectRecord(Event.class),
        new Path("/events/parquet")
    )
    .build();

Writing SpecificRecord

import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.connector.file.sink.FileSink;

// Create FileSink for writing User records
FileSink<User> sink = FileSink
    .forBulkFormat(
        new Path("/output/users"),
        AvroParquetWriters.forSpecificRecord(User.class)
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(10))
            .withInactivityInterval(Duration.ofMinutes(2))
            .build()
    )
    .build();

// Write user stream to Parquet
userStream.sinkTo(sink);

Writing GenericRecord

// Create GenericRecord instances
DataStream<GenericRecord> genericStream = originalStream.map(data -> {
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", data.getId());
    record.put("name", data.getName());
    record.put("price", data.getPrice());
    record.put("category", data.getCategory());
    return record;
});

// Create sink for GenericRecord
FileSink<GenericRecord> genericSink = FileSink
    .forBulkFormat(
        new Path("/output/products"),
        AvroParquetWriters.forGenericRecord(schema)
    )
    .build();

genericStream.sinkTo(genericSink);

Schema Evolution Handling

import org.apache.avro.Schema;

// Handle schema evolution by reading with newer schema
Schema oldSchema = getOldSchema();
Schema newSchema = getNewSchemaWithDefaults();

// Reader will handle missing fields using defaults
FileSource<GenericRecord> evolutionSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(newSchema),
        new Path("/data/evolved-records")
    )
    .build();

DataStream<GenericRecord> evolvedStream = env.fromSource(
    evolutionSource,
    WatermarkStrategy.noWatermarks(),
    "evolved-source"
);

Complex Nested Types

// Avro schema with nested types
String nestedSchema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customer", "type": {
      "type": "record",
      "name": "Customer", 
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
      ]
    }},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "OrderItem",
        "fields": [
          {"name": "productId", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "unitPrice", "type": "double"}
        ]
      }
    }}
  ]
}
""";

Schema orderSchema = new Schema.Parser().parse(nestedSchema);

// Read complex nested structures
FileSource<GenericRecord> orderSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(orderSchema),
        new Path("/orders/nested")
    )
    .build();

Performance Considerations

Memory Usage

Avro-Parquet integration requires schema information in memory for serialization/deserialization. For GenericRecord usage, schemas should be reused when possible to minimize memory overhead.

Type Safety

SpecificRecord provides compile-time type safety and better performance due to generated code. GenericRecord offers more flexibility but requires runtime type checking.

Schema Registry Integration

// Example with schema registry (conceptual)
Schema schema = schemaRegistry.getLatestSchema("user-events");
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        inputPath
    )
    .build();

The Avro integration provides seamless compatibility with existing Avro-based data pipelines while leveraging Parquet's columnar storage benefits for improved query performance.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet

docs

avro-integration.md

index.md

protobuf-integration.md

rowdata-integration.md

table-integration.md

utilities.md

vectorized-reading.md

writing-support.md

tile.json