CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-parquet

Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications

Pending
Overview
Eval results
Files

writing-support.mddocs/

Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

Capabilities

ParquetWriterFactory

Generic factory class for creating BulkWriter instances that wrap ParquetWriter functionality.

/**
 * Factory that creates {@code BulkWriter} instances wrapping a Parquet writer.
 *
 * <p>All Parquet-specific configuration comes from the {@code ParquetBuilder}
 * passed to the constructor.
 *
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    
    /**
     * Creates a new ParquetWriterFactory.
     *
     * @param writerBuilder ParquetBuilder that creates configured ParquetWriter instances
     */
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    
    /**
     * Creates a BulkWriter instance that writes to the given output stream.
     *
     * @param out FSDataOutputStream to write to
     * @return BulkWriter instance wrapping a ParquetWriter
     * @throws IOException if writer creation fails
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

ParquetBuilder Interface

Functional interface for creating configured ParquetWriter instances with custom settings.

/**
 * Functional interface for creating ParquetWriter instances.
 *
 * <p>The interface is {@code Serializable} because the builder is shipped to the
 * tasks as part of the writer factory. A lambda implementing it must therefore
 * capture only serializable state.
 *
 * @param <T> Type of records to write
 */
@FunctionalInterface
public interface ParquetBuilder<T> extends java.io.Serializable {
    
    /**
     * Creates and configures a ParquetWriter for the given output file.
     *
     * @param out OutputFile to write to
     * @return Configured ParquetWriter instance
     * @throws IOException if writer creation fails
     */
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}

ParquetBulkWriter

BulkWriter implementation that wraps ParquetWriter with Flink's bulk writing interface.

/**
 * {@code BulkWriter} implementation that delegates to a wrapped ParquetWriter.
 *
 * @param <T> Type of records to write
 */
@PublicEvolving
public class ParquetBulkWriter<T> implements BulkWriter<T> {
    
    /**
     * Creates a new ParquetBulkWriter.
     *
     * @param writer ParquetWriter instance to wrap
     */
    public ParquetBulkWriter(ParquetWriter<T> writer);
    
    /**
     * Writes one element to the Parquet file.
     *
     * @param element Element to write
     * @throws IOException if writing fails
     */
    public void addElement(T element) throws IOException;
    
    /**
     * Flush hook required by {@code BulkWriter}; a no-op for Parquet.
     *
     * @throws IOException if flush fails
     */
    public void flush() throws IOException;
    
    /**
     * Finishes writing and closes the wrapped ParquetWriter.
     *
     * @throws IOException if finishing fails
     */
    public void finish() throws IOException;
}

StreamOutputFile

Internal OutputFile implementation for Flink's streaming file system abstraction.

/**
 * {@code OutputFile} implementation that lets Parquet write to a Flink
 * {@code FSDataOutputStream}.
 *
 * <p>Marked {@code @Internal}: not part of the supported public API.
 */
@Internal
public class StreamOutputFile implements OutputFile {
    
    /**
     * Creates a new StreamOutputFile.
     *
     * @param out FSDataOutputStream to write to
     */
    public StreamOutputFile(FSDataOutputStream out);
    
    /**
     * Creates a position output stream for writing.
     *
     * @param blockSizeHint block size hint supplied by Parquet; not applied,
     *        since {@link #supportsBlockSize()} returns false
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream create(long blockSizeHint) throws IOException;
    
    /**
     * Creates a position output stream in overwrite mode.
     *
     * @param blockSizeHint block size hint supplied by Parquet; not applied,
     *        since {@link #supportsBlockSize()} returns false
     * @return PositionOutputStream for writing data
     * @throws IOException if stream creation fails
     */
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException;
    
    /**
     * Checks if the output file supports block size setting.
     *
     * @return false (not supported for streaming)
     */
    public boolean supportsBlockSize();
    
    /**
     * Gets the default block size reported to Parquet.
     *
     * @return a fixed default (64 MiB in Flink's implementation); it does not
     *         affect file layout because block sizes are not supported
     */
    public long defaultBlockSize();
}

Usage Examples

Basic Writer Factory

import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

// Create a custom ParquetBuilder.
// MyRecordParquetWriter is a placeholder for your own ParquetWriter.Builder
// implementation for MyRecord; it is not part of Flink or Parquet.
ParquetBuilder<MyRecord> builder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withPageSize(1024 * 1024)             // 1 MiB pages
        // long literal selects withRowGroupSize(long); the int overload is deprecated
        .withRowGroupSize(128L * 1024 * 1024)  // 128 MiB row groups
        .build();
};

// Create factory
ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(builder);

File Sink Integration

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// writerFactory is a ParquetWriterFactory<MyRecord>, e.g.
// new ParquetWriterFactory<>(builder) for some ParquetBuilder<MyRecord>.

// Create FileSink with Parquet writer factory.
// Bulk formats only accept a CheckpointRollingPolicy: part files roll on every
// checkpoint, so size/time based DefaultRollingPolicy cannot be used here.
FileSink<MyRecord> sink = FileSink
    .forBulkFormat(new Path("/output/path"), writerFactory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

// Use in DataStream
dataStream.sinkTo(sink);

Custom Configuration Example

import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

// Builder with custom Parquet settings.
// MyRecordParquetWriter is a placeholder for your own ParquetWriter.Builder
// implementation for MyRecord.
ParquetBuilder<MyRecord> customBuilder = (OutputFile out) -> {
    return MyRecordParquetWriter.builder(out)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withDictionaryEncoding(true)
        .withValidation(false)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withPageSize(2 * 1024 * 1024)         // 2MB pages
        // long literal selects withRowGroupSize(long); the int overload is deprecated
        .withRowGroupSize(256L * 1024 * 1024)  // 256MB row groups
        .build();
};

ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(customBuilder);

Error Handling

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Handle conversion errors before records reach the sink.
// flatMap emits zero records for a failed conversion, which avoids sending
// null through the pipeline (null records are not safe to copy or serialize
// with many Flink serializers).
dataStream
    .flatMap(new FlatMapFunction<InputType, MyRecord>() {
        @Override
        public void flatMap(InputType input, Collector<MyRecord> out) throws Exception {
            MyRecord record;
            try {
                record = convertToMyRecord(input);
            } catch (Exception e) {
                // Log error and drop the record (or emit a default value instead)
                LOG.warn("Failed to convert record: " + input, e);
                return;
            }
            // Collect outside the try block so downstream failures are not swallowed
            out.collect(record);
        }
    })
    .sinkTo(sink);

Bulk Writing with Checkpointing

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configure checkpointing for reliable writing (all durations in milliseconds)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // at least 0.5 s between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(60000);        // abort a checkpoint after 60 s

// FileSink automatically handles checkpointing.
// parquetSink is a FileSink built with a Parquet writer factory.
dataStream.sinkTo(parquetSink);

Performance Considerations

Row Group Sizing

// Tune the row group size to suit your data
ParquetBuilder<T> builder = out -> {
    // 128MB - good for analytics
    final int rowGroupBytes = 128 * 1024 * 1024;
    return writerBuilder(out)
        .withRowGroupSize(rowGroupBytes)
        .build();
};

Compression Selection

// Choose compression based on use case (pick ONE of the assignments below)
CompressionCodecName compression;

// For write-heavy workloads (fast compression, larger files)
compression = CompressionCodecName.SNAPPY;

// For storage-constrained workloads (better compression ratio, slower writes)
compression = CompressionCodecName.GZIP;

// For balanced performance.
// ZSTD is used rather than LZ4: the Parquet format spec deprecates the LZ4
// codec because its framing is not interoperable across implementations.
compression = CompressionCodecName.ZSTD;

Memory Management

// Set page sizes with memory efficiency in mind
ParquetBuilder<T> builder = out -> {
    final int pageBytes = 1024 * 1024;           // 1MB pages
    final int dictionaryPageBytes = 512 * 1024;  // 512KB dictionary pages
    return writerBuilder(out)
        .withPageSize(pageBytes)
        .withDictionaryPageSize(dictionaryPageBytes)
        .build();
};

The writing support provides flexible factory patterns that integrate seamlessly with Flink's bulk writing infrastructure while giving full control over Parquet-specific configurations.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parquet

docs

avro-integration.md

index.md

protobuf-integration.md

rowdata-integration.md

table-integration.md

utilities.md

vectorized-reading.md

writing-support.md

tile.json