# Table Integration

Primary integration point for Flink's Table API, providing format factories for reading and writing Parquet files, along with format-specific configuration options and statistics support.

## Capabilities

### ParquetFileFormatFactory

Main factory class implementing both the bulk reader and bulk writer format factories, which makes Parquet available as a `format` for filesystem connector tables in the Table API.
```java { .api }
/**
 * Parquet format factory for file system connectors.
 */
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

    /**
     * Creates a bulk decoding format for reading Parquet files.
     *
     * @param context Dynamic table factory context
     * @param formatOptions Configuration options
     * @return BulkDecodingFormat for reading RowData from Parquet files
     */
    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Creates an encoding format for writing Parquet files.
     *
     * @param context Dynamic table factory context
     * @param formatOptions Configuration options
     * @return EncodingFormat for writing RowData to Parquet files
     */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Returns the format identifier for this factory.
     *
     * @return "parquet"
     */
    public String factoryIdentifier();

    /**
     * Required configuration options (none for Parquet).
     *
     * @return Empty set
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Optional configuration options.
     *
     * @return Set of supported configuration options
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```
### Configuration Options

Essential configuration options for controlling Parquet reading and writing behavior. In SQL DDL these keys are prefixed with the format identifier, e.g. `'parquet.utc-timezone'`.
```java { .api }
/**
 * Format identifier constant.
 */
public static final String IDENTIFIER = "parquet";

/**
 * Use UTC timezone or local timezone for timestamp conversion.
 * Default: false (use local timezone)
 */
public static final ConfigOption<Boolean> UTC_TIMEZONE =
        key("utc-timezone")
                .booleanType()
                .defaultValue(false);

/**
 * Time unit for storing Parquet timestamps.
 * Values: "nanos", "micros", "millis"
 * Default: "micros"
 */
public static final ConfigOption<String> TIMESTAMP_TIME_UNIT =
        key("timestamp.time.unit")
                .stringType()
                .defaultValue("micros");

/**
 * Write timestamps as int64 with logical types instead of int96.
 * Default: false (use int96 format)
 */
public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP =
        key("write.int64.timestamp")
                .booleanType()
                .defaultValue(false);

/**
 * Number of rows per batch for vectorized reading.
 * Default: 2048
 */
public static final ConfigOption<Integer> BATCH_SIZE =
        key("batch-size")
                .intType()
                .defaultValue(2048);
```
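To make the `timestamp.time.unit` choice concrete, the sketch below (plain Java, no Flink dependency; class and method names are illustrative, not part of the Flink API) converts one instant into the int64 value that would be stored under each unit. Note that `"millis"` silently drops sub-millisecond precision:

```java
import java.time.Instant;

public class TimestampUnits {

    // Convert an Instant to the int64 value a Parquet writer would store
    // for each supported time unit ("millis", "micros", "nanos").
    static long toInt64(Instant ts, String unit) {
        long seconds = ts.getEpochSecond();
        int nanos = ts.getNano();
        switch (unit) {
            case "millis": return seconds * 1_000L + nanos / 1_000_000;
            case "micros": return seconds * 1_000_000L + nanos / 1_000;
            case "nanos":  return seconds * 1_000_000_000L + nanos;
            default: throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2023-01-01T00:00:00.123456789Z");
        System.out.println(toInt64(ts, "millis")); // 1672531200123 (sub-ms digits lost)
        System.out.println(toInt64(ts, "micros")); // 1672531200123456
        System.out.println(toInt64(ts, "nanos"));  // 1672531200123456789
    }
}
```

This is also why `write.int64.timestamp` matters: int64 storage commits to one of these units, whereas the legacy int96 layout stores nanos-of-day plus a Julian day.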
### ParquetBulkDecodingFormat

Bulk decoding format implementation with statistics reporting and projection support.
```java { .api }
/**
 * Bulk decoding format for reading Parquet files with statistics support.
 */
public static class ParquetBulkDecodingFormat
        implements ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
                BulkDecodingFormat<RowData>,
                FileBasedStatisticsReportableInputFormat {

    /**
     * Creates the runtime decoder with projection support.
     *
     * @param sourceContext Table source context
     * @param producedDataType Output data type
     * @param projections Column projections (one field index path per projected column)
     * @return BulkFormat for reading projected RowData
     */
    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
            DynamicTableSource.Context sourceContext,
            DataType producedDataType,
            int[][] projections);

    /**
     * Returns the supported changelog mode (insert-only).
     *
     * @return ChangelogMode.insertOnly()
     */
    public ChangelogMode getChangelogMode();

    /**
     * Reports table statistics derived from Parquet file metadata.
     *
     * @param files List of Parquet files to analyze
     * @param producedDataType Expected output data type
     * @return TableStats with row counts and column statistics
     */
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```
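The `int[][] projections` argument encodes one field index path per projected column: a top-level column is a single-element path, and nested fields extend the path (so `{1, 1}` selects field 1 inside top-level field 1). A minimal, Flink-free sketch of how such paths resolve against a nested row (class and helper names here are illustrative):

```java
import java.util.List;

public class ProjectionPaths {

    // Resolve one projection path against a nested row, where a row is a
    // List<Object> and nested rows are themselves List<Object>.
    @SuppressWarnings("unchecked")
    static Object resolve(List<Object> row, int[] path) {
        Object current = row;
        for (int index : path) {
            current = ((List<Object>) current).get(index);
        }
        return current;
    }

    public static void main(String[] args) {
        // row shape: (id, (street, city))
        List<Object> row = List.of(42L, List.of("Main St", "Springfield"));
        System.out.println(resolve(row, new int[] {0}));    // 42
        System.out.println(resolve(row, new int[] {1, 1})); // Springfield
    }
}
```

In the real format, these paths are not applied to materialized rows; they are pushed down so the reader only decodes the referenced columns from storage.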
## Usage Examples

### Basic Table Creation
```java
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create a table with the Parquet format
tableEnv.executeSql("""
    CREATE TABLE orders (
        order_id BIGINT,
        customer_id STRING,
        product_name STRING,
        quantity INT,
        price DECIMAL(10, 2),
        order_date TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/orders',
        'format' = 'parquet'
    )
    """);
```
### Advanced Configuration
```java
// Table with custom Parquet settings
tableEnv.executeSql("""
    CREATE TABLE events (
        event_id BIGINT,
        timestamp_col TIMESTAMP(3),
        payload STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/events',
        'format' = 'parquet',
        'parquet.utc-timezone' = 'true',
        'parquet.timestamp.time.unit' = 'nanos',
        'parquet.write.int64.timestamp' = 'true',
        'parquet.batch-size' = '4096'
    )
    """);
```
### Reading with Projections
```java
// Select only the columns you need; projection pushdown ensures that
// only these columns are read from storage
Table result = tableEnv.sqlQuery("""
    SELECT order_id, customer_id, price
    FROM orders
    WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
    """);
```
### Partition Support
```java
// Partitioned Parquet table
tableEnv.executeSql("""
    CREATE TABLE sales_partitioned (
        transaction_id BIGINT,
        amount DECIMAL(10, 2),
        product_category STRING,
        sale_date DATE
    ) PARTITIONED BY (sale_date) WITH (
        'connector' = 'filesystem',
        'path' = '/data/sales',
        'format' = 'parquet',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
    """);
```
## Statistics and Performance

The Parquet format factory automatically extracts statistics from Parquet file metadata and applies read-time optimizations:

- **Row counts**: exact counts from file metadata
- **Column statistics**: min/max values and null counts, where available
- **File-level metrics**: used for query planning and optimization
- **Projection pushdown**: only required columns are read from storage
- **Predicate pushdown**: filters applied at the file level when possible

This enables Flink's cost-based optimizer to make informed decisions about query execution plans.
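The statistics side of this can be pictured as merging per-file footer summaries into table-level totals: row and null counts add up, while min/max bounds widen. A self-contained sketch (the `FileStats` record is illustrative, not Flink's `TableStats` API):

```java
public class StatsMerge {

    // Per-file column statistics as they might be read from Parquet footers.
    record FileStats(long rowCount, long min, long max, long nullCount) {}

    // Combine two files' statistics: counts are additive, bounds widen.
    static FileStats merge(FileStats a, FileStats b) {
        return new FileStats(
                a.rowCount() + b.rowCount(),
                Math.min(a.min(), b.min()),
                Math.max(a.max(), b.max()),
                a.nullCount() + b.nullCount());
    }

    public static void main(String[] args) {
        FileStats f1 = new FileStats(1000, 5, 90, 3);
        FileStats f2 = new FileStats(500, 1, 70, 0);
        System.out.println(merge(f1, f2));
        // FileStats[rowCount=1500, min=1, max=90, nullCount=3]
    }
}
```

Because this merge only reads footers, `reportStatistics` can summarize large datasets without scanning any row data.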