Apache Flink Parquet format support, providing high-performance columnar file reading and writing for both batch and streaming applications.
—
Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.
ParquetWriterFactory is a generic factory class for creating BulkWriter instances that wrap ParquetWriter functionality.
/**
* Factory for creating Parquet BulkWriter instances
* @param <T> Type of records to write
*/
@PublicEvolving
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
/**
* Creates a new ParquetWriterFactory
* @param writerBuilder ParquetBuilder that creates configured ParquetWriter instances
*/
public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
/**
* Creates a BulkWriter instance for the given output stream
* @param out FSDataOutputStream to write to
* @return BulkWriter instance wrapping a ParquetWriter
* @throws IOException if writer creation fails
*/
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

ParquetBuilder is a functional interface for creating configured ParquetWriter instances with custom settings.
/**
* Functional interface for creating ParquetWriter instances
* @param <T> Type of records to write
*/
@FunctionalInterface
public interface ParquetBuilder<T> {
/**
* Creates and configures a ParquetWriter for the given output file
* @param out OutputFile to write to
* @return Configured ParquetWriter instance
* @throws IOException if writer creation fails
*/
ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}

ParquetBulkWriter is a BulkWriter implementation that wraps a ParquetWriter in Flink's bulk writing interface.
/**
* BulkWriter implementation wrapping ParquetWriter
* @param <T> Type of records to write
*/
@PublicEvolving
public class ParquetBulkWriter<T> implements BulkWriter<T> {
/**
* Creates a new ParquetBulkWriter
* @param writer ParquetWriter instance to wrap
*/
public ParquetBulkWriter(ParquetWriter<T> writer);
/**
* Writes an element to the Parquet file
* @param element Element to write
* @throws IOException if writing fails
*/
public void addElement(T element) throws IOException;
/**
* Flushes any buffered data (no-op for Parquet)
* @throws IOException if flush fails
*/
public void flush() throws IOException;
/**
* Finishes writing and closes the ParquetWriter
* @throws IOException if finishing fails
*/
public void finish() throws IOException;
}

StreamOutputFile is an internal OutputFile implementation for Flink's streaming file system abstraction.
/**
* OutputFile implementation for streaming file systems
*/
@Internal
public class StreamOutputFile implements OutputFile {
/**
* Creates a new StreamOutputFile
* @param out FSDataOutputStream to write to
*/
public StreamOutputFile(FSDataOutputStream out);
/**
* Creates a position output stream for writing
* @return PositionOutputStream for writing data
* @throws IOException if stream creation fails
*/
public PositionOutputStream create() throws IOException;
/**
* Creates a position output stream in overwrite mode
* @return PositionOutputStream for writing data
* @throws IOException if stream creation fails
*/
public PositionOutputStream createOrOverwrite() throws IOException;
/**
* Checks if the output file supports block size setting
* @return false (not supported for streaming)
*/
public boolean supportsBlockSize();
/**
* Gets the default block size (not applicable)
* @return 0
*/
public long defaultBlockSize();
}
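To show how these pieces fit together, here is a minimal, hand-driven sketch; in a real job the FileSink calls create(), addElement(), flush(), and finish() for you. It assumes writerFactory is a ParquetWriterFactory built as in the examples below, and MyRecord and record are placeholders.
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
// Open a local stream; the factory wraps it in a StreamOutputFile and builds a ParquetBulkWriter
FSDataOutputStream out = FileSystem.getLocalFileSystem()
.create(new Path("/tmp/records.parquet"), FileSystem.WriteMode.OVERWRITE);
try {
BulkWriter<MyRecord> writer = writerFactory.create(out);
writer.addElement(record); // 'record' is a placeholder MyRecord instance
writer.finish(); // closes the underlying ParquetWriter
} finally {
out.close();
}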
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
// Create a custom ParquetBuilder
ParquetBuilder<MyRecord> builder = (OutputFile out) -> {
return MyRecordParquetWriter.builder(out)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(1024 * 1024)
.withRowGroupSize(128 * 1024 * 1024)
.build();
};
// Create factory
ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(builder);
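You usually do not have to write the builder by hand: for Avro data the flink-parquet module ships convenience factories that return a ready-made ParquetWriterFactory. The class name below assumes a recent Flink release (older versions expose the same methods on ParquetAvroWriters); MyAvroRecord stands in for an Avro-generated class.
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
// Factory for an Avro specific record type (schema comes from the generated class)
ParquetWriterFactory<MyAvroRecord> avroFactory =
AvroParquetWriters.forSpecificRecord(MyAvroRecord.class);
// Or, for plain POJOs, derive the schema via Avro reflection
ParquetWriterFactory<MyRecord> reflectFactory =
AvroParquetWriters.forReflectRecord(MyRecord.class);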
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
// Create FileSink with Parquet writer factory
FileSink<MyRecord> sink = FileSink
.forBulkFormat(new Path("/output/path"), factory)
// Bulk formats can only roll part files on checkpoint; size- and time-based
// policies such as DefaultRollingPolicy apply to row formats only
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
// Use in DataStream
dataStream.sinkTo(sink);
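To give the part files a recognizable extension, the bulk format builder also accepts an OutputFileConfig; a short sketch (the prefix and suffix values are arbitrary examples):
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
FileSink<MyRecord> suffixedSink = FileSink
.forBulkFormat(new Path("/output/path"), factory)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("records")
.withPartSuffix(".parquet")
.build())
.build();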
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.column.ParquetProperties;
// Builder with custom Parquet settings
ParquetBuilder<MyRecord> customBuilder = (OutputFile out) -> {
return MyRecordParquetWriter.builder(out)
.withCompressionCodec(CompressionCodecName.GZIP)
.withDictionaryEncoding(true)
.withValidation(false)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withPageSize(2 * 1024 * 1024) // 2MB pages
.withRowGroupSize(256 * 1024 * 1024) // 256MB row groups
.build();
};
ParquetWriterFactory<MyRecord> factory = new ParquetWriterFactory<>(customBuilder);

import org.apache.flink.api.common.functions.MapFunction;
import java.util.Objects;
// Handle writing errors gracefully; LOG is assumed to be an SLF4J Logger on the enclosing class
dataStream
.map(new MapFunction<InputType, MyRecord>() {
@Override
public MyRecord map(InputType input) throws Exception {
try {
return convertToMyRecord(input);
} catch (Exception e) {
// Log error and handle appropriately
LOG.warn("Failed to convert record: " + input, e);
return null; // or default value
}
}
})
.filter(Objects::nonNull) // Remove failed conversions
.sinkTo(sink);
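If dropping failed records via null-filtering is too lossy, a ProcessFunction with a side output keeps the failed inputs available for separate handling; a sketch using the same placeholder types (InputType, MyRecord, convertToMyRecord) as above:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
final OutputTag<InputType> failedTag = new OutputTag<InputType>("conversion-failures") {};
SingleOutputStreamOperator<MyRecord> converted = dataStream
.process(new ProcessFunction<InputType, MyRecord>() {
@Override
public void processElement(InputType input, Context ctx, Collector<MyRecord> out) {
try {
out.collect(convertToMyRecord(input));
} catch (Exception e) {
// Route the raw input to a side output instead of dropping it
ctx.output(failedTag, input);
}
}
});
converted.sinkTo(sink);
// Failed inputs can be logged or written elsewhere
DataStream<InputType> failures = converted.getSideOutput(failedTag);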
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Configure checkpointing for reliable writing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
// FileSink automatically handles checkpointing
dataStream.sinkTo(parquetSink);

// Optimize row group size based on your data
ParquetBuilder<T> builder = (out) ->
writerBuilder(out)
.withRowGroupSize(128 * 1024 * 1024) // 128MB - good for analytics
.build();

// Choose compression based on use case
CompressionCodecName compression;
// For write-heavy workloads (faster compression)
compression = CompressionCodecName.SNAPPY;
// For read-heavy workloads (better compression ratio)
compression = CompressionCodecName.GZIP;
// For balanced performance
compression = CompressionCodecName.LZ4;

// Configure page size for memory efficiency
ParquetBuilder<T> builder = (out) ->
writerBuilder(out)
.withPageSize(1024 * 1024) // 1MB pages
.withDictionaryPageSize(512 * 1024) // 512KB dictionary pages
.build();

The writing support provides flexible factory patterns that integrate seamlessly with Flink's bulk writing infrastructure while giving full control over Parquet-specific configurations.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parquet