# Flink SQL Parquet

Apache Flink SQL Parquet provides comprehensive support for reading and writing Parquet files in Flink SQL applications. This package bundles Parquet format libraries with proper shading configuration to avoid dependency conflicts in SQL client environments, supporting both batch and streaming data processing workflows.

## Package Information

- **Package Name**: flink-sql-parquet_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**: Maven dependency `org.apache.flink:flink-sql-parquet_2.12:1.14.6`
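The coordinate above can be declared in a project's `pom.xml`; this fragment simply restates the coordinate already given:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-parquet_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```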

## Core Imports

```java
import org.apache.flink.formats.parquet.ParquetFileFormatFactory;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
```

## Basic Usage

```java
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.data.RowData;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

// Create a Parquet writer factory for RowData
// (example schema: one BIGINT field and one STRING field)
RowType rowType = RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
Configuration hadoopConfig = new Configuration();
boolean utcTimezone = false;

ParquetWriterFactory<RowData> writerFactory =
    ParquetRowDataBuilder.createWriterFactory(rowType, hadoopConfig, utcTimezone);

// Use with Flink's file sink
Path outputPath = new Path("/output/path");
FileSink<RowData> sink = FileSink
    .forBulkFormat(outputPath, writerFactory)
    .build();
```

## Architecture

Flink SQL Parquet is organized around several key components:

- **Format Factory**: SQL table factory integration for automatic format detection
- **Writer APIs**: Multiple writer implementations for different data types (RowData, Avro, Protobuf)
- **Input Formats**: Vectorized and columnar input formats for high-performance reading
- **Schema Conversion**: Utilities for converting between Flink and Parquet schemas
- **Vectorized Processing**: Column-oriented data processing for improved performance

## Capabilities

### Format Factory Integration

SQL table factory that automatically integrates Parquet format with Flink's table ecosystem. Provides transparent support for CREATE TABLE statements with Parquet format.

```java { .api }
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
    public String factoryIdentifier();
}
```

[Format Factory Integration](./format-factory.md)
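As a minimal sketch of the factory in action, a table program only needs to declare `'format' = 'parquet'` in its DDL; the table name and path below are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParquetTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // The 'parquet' identifier is resolved to ParquetFileFormatFactory at runtime
        tEnv.executeSql(
            "CREATE TABLE sales (id BIGINT, amount DOUBLE) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = '/tmp/sales',"
                + " 'format' = 'parquet')");
    }
}
```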

### RowData Writer APIs

High-performance writers for Flink's internal RowData format with comprehensive schema mapping and configuration support.

```java { .api }
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {
    public static ParquetWriterFactory<RowData> createWriterFactory(
        RowType rowType,
        Configuration conf,
        boolean utcTimestamp
    );
}

public class ParquetWriterFactory<T> implements BulkWriter.Factory<T> {
    public ParquetWriterFactory(ParquetBuilder<T> writerBuilder);
    public BulkWriter<T> create(FSDataOutputStream stream) throws IOException;
}
```

[RowData Writers](./rowdata-writers.md)

### Avro Integration

Complete Avro integration supporting specific records, generic records, and reflection-based serialization with full schema compatibility.

```java { .api }
public class ParquetAvroWriters {
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

[Avro Integration](./avro-integration.md)
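As an illustrative sketch, a generic-record writer can be wired into a `FileSink`; the record schema and output path here are invented for the example:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

public class AvroParquetSinkExample {
    // Build a writer factory from a hypothetical two-field record schema
    static ParquetWriterFactory<GenericRecord> createFactory() {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":\"string\"}]}");
        return ParquetAvroWriters.forGenericRecord(schema);
    }

    public static void main(String[] args) {
        // Attach the factory to a bulk-format file sink; the path is illustrative
        FileSink<GenericRecord> sink =
            FileSink.forBulkFormat(new Path("/tmp/users"), createFactory()).build();
    }
}
```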

### Protobuf Integration

Seamless Protocol Buffers integration for writing Protobuf messages to Parquet format with automatic schema generation.

```java { .api }
public class ParquetProtoWriters {
    public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type);
}
```

[Protobuf Integration](./protobuf-integration.md)
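A minimal sketch of the call shape, assuming a protoc-generated message class (no generated class is shipped with this package, so the example stays generic over `Message`):

```java
import com.google.protobuf.Message;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

public class ProtoParquetExample {
    // In a real job, T would be a concrete protoc-generated class,
    // e.g. a hypothetical MyEvent: factoryFor(MyEvent.class)
    static <T extends Message> ParquetWriterFactory<T> factoryFor(Class<T> type) {
        return ParquetProtoWriters.forType(type);
    }
}
```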

### Vectorized Input Formats

High-performance columnar input formats optimized for analytical workloads with vectorized processing and partition support.

```java { .api }
public class ParquetColumnarRowInputFormat<SplitT extends FileSourceSplit>
        extends ParquetVectorizedInputFormat<RowData, SplitT> {

    public static <SplitT extends FileSourceSplit> ParquetColumnarRowInputFormat<SplitT> createPartitionedFormat(
        Configuration hadoopConfig,
        RowType producedRowType,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        int batchSize,
        boolean isUtcTimestamp,
        boolean isCaseSensitive
    );
}
```

[Vectorized Input](./vectorized-input.md)
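A sketch of feeding the columnar format into a `FileSource` for an unpartitioned table; the one-field schema, path, batch size, and the `PartitionFieldExtractor.forFileSystem` helper (with its default-partition name) are assumptions for the example:

```java
import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.PartitionFieldExtractor;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class ParquetSourceExample {
    static ParquetColumnarRowInputFormat<FileSourceSplit> createFormat() {
        // Illustrative one-column schema: id BIGINT
        RowType producedRowType = RowType.of(
            new LogicalType[]{new BigIntType()}, new String[]{"id"});

        return ParquetColumnarRowInputFormat.createPartitionedFormat(
            new Configuration(),
            producedRowType,
            Collections.emptyList(),                        // no partition keys
            PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__"),
            2048,                                           // batch size (rows per batch)
            false,                                          // isUtcTimestamp
            true);                                          // isCaseSensitive
    }

    public static void main(String[] args) {
        // Hand the bulk format to a FileSource over an illustrative directory
        FileSource<RowData> source =
            FileSource.forBulkFileFormat(createFormat(), new Path("/tmp/parquet-data")).build();
    }
}
```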

### Schema Utilities

Utilities for converting between Flink logical types and Parquet schema definitions with full type mapping support.

```java { .api }
public class ParquetSchemaConverter {
    public static MessageType convertToParquetMessageType(String name, RowType rowType);
}

public class SerializableConfiguration {
    public SerializableConfiguration(Configuration conf);
    public Configuration conf();
}
```

[Schema Utilities](./schema-utilities.md)
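A small sketch of the conversion, assuming a two-field row type invented for the example (the `org.apache.flink.formats.parquet.utils` package location is taken from the Flink codebase):

```java
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.parquet.schema.MessageType;

public class SchemaConversionExample {
    static MessageType convert() {
        // Flink row type: id BIGINT, name STRING
        RowType rowType = RowType.of(
            new LogicalType[]{new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
            new String[]{"id", "name"});

        // Produce the equivalent Parquet MessageType, named "flink_schema"
        return ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
    }

    public static void main(String[] args) {
        System.out.println(convert());
    }
}
```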

## Configuration Options

```java { .api }
public static final ConfigOption<Boolean> UTC_TIMEZONE =
    key("utc-timezone")
        .booleanType()
        .defaultValue(false)
        .withDescription("Use UTC timezone or local timezone to the conversion between epoch"
            + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
            + " use UTC timezone");
```

## Common Patterns

### Creating a Table with Parquet Format

```sql
CREATE TABLE my_parquet_table (
    id BIGINT,
    name STRING,
    timestamp_col TIMESTAMP(3)
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/parquet/files',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);
```

### Programmatic Writer Creation

```java
// For RowData
ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory(
    rowType, hadoopConfig, true);

// For Avro specific records
ParquetWriterFactory<MyAvroRecord> avroFactory =
    ParquetAvroWriters.forSpecificRecord(MyAvroRecord.class);

// For Protobuf messages
ParquetWriterFactory<MyProtoMessage> protoFactory =
    ParquetProtoWriters.forType(MyProtoMessage.class);
```

## Types

```java { .api }
@FunctionalInterface
public interface ParquetBuilder<T> extends Serializable {
    ParquetWriter<T> createWriter(OutputFile out) throws IOException;
}

public class ParquetBulkWriter<T> implements BulkWriter<T> {
    public ParquetBulkWriter(ParquetWriter<T> parquetWriter);
    public void addElement(T datum) throws IOException;
    public void flush();
    public void finish() throws IOException;
}

public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    // Abstract base class for vectorized input formats
}
```