Apache Flink Parquet format support providing high-performance columnar file reading and writing capabilities for both batch and streaming applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-parquet@2.1.0
# Apache Flink Parquet Format

Apache Flink's Parquet format support provides high-performance columnar file reading and writing for both batch and streaming applications. This library enables efficient processing of Apache Parquet files within the Flink ecosystem, offering vectorized reading, multiple serialization format support, and seamless integration with Flink's Table API and DataStream API.

## Package Information

- **Package Name**: flink-parquet
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-parquet
- **Language**: Java
- **Installation**: Add to Maven pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetBuilder;
```

For Avro integration:

```java
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
```

For RowData integration:

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
```

For Protocol Buffers integration:

```java
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```
## Basic Usage

### Reading Parquet Files

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;

// Read Avro records from Parquet; MyAvroRecord is a generated Avro specific record class
FileSource<MyAvroRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(MyAvroRecord.class),
        new Path("path/to/parquet/files")
    )
    .build();

// env is an existing StreamExecutionEnvironment
DataStream<MyAvroRecord> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "parquet-source"
);
```

### Writing Parquet Files

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Write Avro records to Parquet
FileSink<MyAvroRecord> sink = FileSink
    .forBulkFormat(
        new Path("path/to/output"),
        AvroParquetWriters.forSpecificRecord(MyAvroRecord.class)
    )
    .build();

stream.sinkTo(sink);
```

### Table API Integration

```java
import org.apache.flink.table.api.TableEnvironment;

// Create Parquet table
tableEnv.executeSql("""
    CREATE TABLE parquet_table (
        id BIGINT,
        name STRING,
        timestamp_col TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/parquet/files',
        'format' = 'parquet'
    )
    """);
```
## Architecture

The Flink Parquet module is built around several key architectural components:

- **Format Factory**: `ParquetFileFormatFactory` provides the main entry point for Table API integration, creating bulk reading/writing formats
- **Vectorized Reading**: High-performance columnar readers that process multiple rows in batches for improved throughput
- **Multi-Format Support**: Seamless integration with Avro, Protocol Buffers, and Flink's native RowData serialization
- **Schema Conversion**: Automatic conversion between Flink types and Parquet schema with support for nested data structures
- **Statistics Integration**: Built-in support for collecting and reporting file-level statistics for query optimization

The design enables efficient large-scale data processing by leveraging Parquet's columnar storage format while maintaining full compatibility with Flink's streaming and batch processing capabilities.
## Capabilities

### Table API Format Factory

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files with comprehensive configuration options.

```java { .api }
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
}
```

[Table Integration](./table-integration.md)

### Writing Support

Factory-based writers for creating Parquet files from various data formats, with support for custom ParquetWriter configurations and bulk writing operations.

```java { .api }
public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

@FunctionalInterface
public interface ParquetBuilder<T> {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}
```

[Writing Support](./writing-support.md)
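
Because `ParquetBuilder` is a functional interface taking an `OutputFile`, any `ParquetWriter` builder can be plugged into a `ParquetWriterFactory`. The sketch below assumes parquet-avro is on the classpath and that `AVRO_SCHEMA` is an existing Avro `Schema`; the compression codec is illustrative:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Sketch: a factory with a custom compression codec.
// AVRO_SCHEMA is assumed to be defined elsewhere.
ParquetBuilder<GenericRecord> builder = out -> AvroParquetWriter
    .<GenericRecord>builder(out)
    .withSchema(AVRO_SCHEMA)
    .withCompressionCodec(CompressionCodecName.SNAPPY)
    .build();

ParquetWriterFactory<GenericRecord> factory = new ParquetWriterFactory<>(builder);
```

The factory is serializable, so it can be shipped to task managers and handed to `FileSink.forBulkFormat`.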

### Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

```java { .api }
public class AvroParquetReaders {
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}

public class AvroParquetWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

[Avro Integration](./avro-integration.md)
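
When the schema is only known at runtime, the `GenericRecord` variants avoid the need for generated classes. A sketch, assuming the schema string and paths are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

// Parse an Avro schema at runtime (illustrative schema)
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

// Read generic records from Parquet
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schema), new Path("path/to/input"))
    .build();

// Write generic records back out as Parquet
FileSink<GenericRecord> sink = FileSink
    .forBulkFormat(new Path("path/to/output"), AvroParquetWriters.forGenericRecord(schema))
    .build();
```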

### RowData Integration

Native Flink RowData support for optimal performance in Table API and SQL operations, with automatic schema conversion and type mapping.

```java { .api }
public class ParquetRowDataBuilder {
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType,
        Configuration conf,
        boolean utcTimestamp
    );
}

public class ParquetColumnarRowInputFormat<SplitT> extends ParquetVectorizedInputFormat<RowData, SplitT>
        implements FileBasedStatisticsReportableInputFormat {
    public static ParquetColumnarRowInputFormat<FileSourceSplit> createPartitionedFormat(
        Configuration conf,
        RowType producedRowType,
        TypeInformation<RowData> producedTypeInfo,
        List<String> partitionKeys,
        String defaultPartName,
        int batchSize,
        boolean utcTimestamp,
        boolean caseSensitive
    );
}
```

[RowData Integration](./rowdata-integration.md)
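
Writing `RowData` directly skips the Avro/protobuf serialization layer. The sketch below assumes the `Configuration` parameter is a Hadoop configuration (as used by Parquet) and that the field names and path are illustrative:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

// Describe the row layout Flink should write
RowType rowType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
    new String[] {"id", "name"});

// utcTimestamp = true writes timestamps in UTC rather than local time
ParquetWriterFactory<RowData> factory =
    ParquetRowDataBuilder.createWriterFactory(rowType, new Configuration(), true);

FileSink<RowData> sink = FileSink.forBulkFormat(new Path("path/to/output"), factory).build();
```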

### Protocol Buffers Integration

Native support for Google Protocol Buffers messages stored in Parquet format, enabling efficient serialization of strongly-typed protobuf data.

```java { .api }
public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

[Protocol Buffers Integration](./protobuf-integration.md)
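
Usage mirrors the Avro writers: pass the factory to a `FileSink`. In this sketch, `MyProtoMessage` is a hypothetical class generated by `protoc` and `protoStream` an existing `DataStream<MyProtoMessage>`:

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// MyProtoMessage is a hypothetical protoc-generated message class
FileSink<MyProtoMessage> sink = FileSink
    .forBulkFormat(new Path("path/to/output"), ParquetProtoWriters.forType(MyProtoMessage.class))
    .build();

protoStream.sinkTo(sink);
```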

### Vectorized Reading

High-performance vectorized readers that process data in columnar batches, supporting various column types and nested data structures for optimal throughput.

```java { .api }
public abstract class ParquetVectorizedInputFormat<T, SplitT> implements FileInputFormat<T, SplitT> {
    public RecordReaderIterator<T> createReader(Configuration config, SplitT split) throws IOException;
    public boolean isSplittable();
}

@FunctionalInterface
public interface ColumnBatchFactory<SplitT> {
    VectorizedColumnBatch create(SplitT split, ColumnVector[] vectors);
    static ColumnBatchFactory<FileSourceSplit> withoutExtraFields();
}
```

[Vectorized Reading](./vectorized-reading.md)

### Utilities and Schema Conversion

Utility classes for schema conversion, configuration management, and statistics reporting, enabling seamless integration between Flink and Parquet type systems.

```java { .api }
public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(
        String name,
        RowType rowType,
        Configuration conf
    );

    public static Type convertToParquetType(
        String name,
        LogicalType logicalType,
        Configuration conf
    );
}

public class SerializableConfiguration implements Serializable {
    public SerializableConfiguration(Configuration configuration);
    public Configuration conf();
}
```

[Utilities](./utilities.md)
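
The schema converter maps a Flink `RowType` to a Parquet `MessageType`, which is useful when constructing writers or inspecting output schemas. A minimal sketch, assuming the converter lives in `org.apache.flink.formats.parquet.utils` and takes a Hadoop `Configuration` as shown in the signature above:

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;

// Convert a single-field Flink row type into the equivalent Parquet message type
RowType rowType = RowType.of(new BigIntType());
MessageType parquetSchema =
    ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType, new Configuration());
```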

## Configuration Options

```java { .api }
// Format factory configuration constants
public static final ConfigOption<Boolean> UTC_TIMEZONE; // default: false
public static final ConfigOption<String> TIMESTAMP_TIME_UNIT; // default: "micros"
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP; // default: false
public static final ConfigOption<Integer> BATCH_SIZE; // default: 2048
```
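
In the Table API these options are supplied in the table's `WITH` clause, prefixed with the format name. A sketch, assuming the option keys `parquet.utc-timezone` and `parquet.timestamp.time.unit` (verify the exact keys against your Flink version):

```java
// Configure Parquet format options on a filesystem table (option keys are assumptions)
tableEnv.executeSql("""
    CREATE TABLE parquet_out (
        id BIGINT,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'path/to/output',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'true',
        'parquet.timestamp.time.unit' = 'micros'
    )
    """);
```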

## Common Types

### BulkWriter Factory

```java { .api }
public interface BulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;

    interface Factory<T> extends Serializable {
        BulkWriter<T> create(FSDataOutputStream out) throws IOException;
    }
}
```

### Stream Format

```java { .api }
public interface StreamFormat<T> extends Serializable {
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
    Reader<T> restoreReader(Configuration config, FileSourceSplit split) throws IOException;
    boolean isSplittable();
    TypeInformation<T> getProducedType();
}
```