# Format Factory Integration

The Parquet format factory integrates with Flink's table ecosystem, enabling automatic format discovery and configuration for SQL table definitions.

## Capabilities

### ParquetFileFormatFactory

The main factory class implements both the bulk reader and bulk writer format factory interfaces, providing complete integration with Flink's dynamic table system.

```java { .api }
/**
 * Parquet format factory for file system connector integration.
 * Implements both reading and writing capabilities for SQL tables.
 */
public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {

    /**
     * Creates a bulk decoding format for reading Parquet files.
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format
     * @return BulkDecodingFormat for RowData processing
     */
    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Creates an encoding format for writing Parquet files.
     * @param context Dynamic table factory context with catalog information
     * @param formatOptions Configuration options for the format
     * @return EncodingFormat producing a BulkWriter factory
     */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    /**
     * Returns the unique identifier for this format factory.
     * @return the "parquet" identifier string
     */
    public String factoryIdentifier();

    /**
     * Returns the set of required configuration options.
     * @return empty set (no required options)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the set of optional configuration options.
     * @return set containing UTC_TIMEZONE and other optional configs
     */
    public Set<ConfigOption<?>> optionalOptions();
}
```
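
As a rough illustration of the contract above, the following self-contained sketch mimics the identifier and option-set methods. The `MiniParquetFactory` class is hypothetical, and plain `String`s stand in for Flink's `ConfigOption` type so the sketch runs without Flink on the classpath:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the factory contract above; plain Strings
// replace Flink's ConfigOption so no Flink dependency is needed.
class MiniParquetFactory {
    // Matches the "parquet" identifier used in the 'format' table option.
    public String factoryIdentifier() {
        return "parquet";
    }

    // The Parquet format declares no required options.
    public Set<String> requiredOptions() {
        return Collections.emptySet();
    }

    // Optional options include the utc-timezone switch.
    public Set<String> optionalOptions() {
        return Set.of("utc-timezone");
    }
}
```

The planner matches the `'format' = 'parquet'` table option against `factoryIdentifier()`, then validates the supplied options against the required and optional sets.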

### Configuration Options

```java { .api }
/**
 * Controls timezone handling for timestamp conversion.
 * Default: false (use local timezone).
 * When true: use UTC timezone for epoch time to LocalDateTime conversion.
 */
public static final ConfigOption<Boolean> UTC_TIMEZONE =
        key("utc-timezone")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Use UTC timezone or local timezone to the conversion between epoch time and "
                                + "LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone");
```
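
To make the effect of this option concrete, the sketch below uses plain `java.time` (no Flink dependency; the class and method names are hypothetical) to convert the same epoch timestamp with and without UTC handling:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class TimestampConversion {
    // Mirrors what the utc-timezone option controls: which zone is used
    // when turning an epoch instant into a LocalDateTime.
    static LocalDateTime fromEpochMillis(long epochMillis, boolean utcTimezone) {
        ZoneId zone = utcTimezone ? ZoneOffset.UTC : ZoneId.systemDefault();
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }
}
```

With `utcTimezone = true`, epoch 0 always maps to `1970-01-01T00:00`; with `false`, the result shifts by the local offset. This is why data written by different Hive versions requires setting the option consistently.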

## Usage Examples

### SQL Table Definition

```sql
-- Create a Parquet table with UTC timezone handling
CREATE TABLE orders (
    order_id BIGINT,
    customer_name STRING,
    order_date TIMESTAMP(3),
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'filesystem',
    'path' = '/data/orders',
    'format' = 'parquet',
    'parquet.utc-timezone' = 'true'
);

-- Query the table (the format factory handles Parquet reading automatically)
SELECT customer_name, SUM(amount)
FROM orders
WHERE order_date >= TIMESTAMP '2023-01-01 00:00:00'
GROUP BY customer_name;
```

### Programmatic Table Creation

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// The format factory is automatically discovered via the ServiceLoader mechanism
tableEnv.executeSql("""
        CREATE TABLE parquet_table (
            id BIGINT,
            name STRING,
            created_at TIMESTAMP(3)
        ) WITH (
            'connector' = 'filesystem',
            'path' = '/path/to/data',
            'format' = 'parquet',
            'parquet.utc-timezone' = 'false'
        )
        """);
```

### Service Registration

The format factory is automatically registered via Java's ServiceLoader mechanism:

```
# File: META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.parquet.ParquetFileFormatFactory
```
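
Provider-configuration files like the one above list one fully qualified class name per line, with `#` starting a comment. The hypothetical parser below reproduces that parsing rule, as a sketch of what `ServiceLoader` does when it scans the classpath:

```java
import java.util.List;

// Hypothetical helper mimicking how ServiceLoader parses a provider file:
// strip comments introduced by '#', trim whitespace, skip blank lines.
class ServiceFileParser {
    static List<String> providers(String fileContents) {
        return fileContents.lines()
                .map(line -> {
                    int hash = line.indexOf('#');
                    return (hash >= 0 ? line.substring(0, hash) : line).trim();
                })
                .filter(line -> !line.isEmpty())
                .toList();
    }
}
```

Applied to the file above, only `org.apache.flink.formats.parquet.ParquetFileFormatFactory` survives as a provider; the comment line is discarded.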

## Internal Implementation Details

The factory creates different decoders and encoders based on the table context:

- **Decoding**: creates `ParquetColumnarRowInputFormat` with vectorized reading
- **Encoding**: creates `ParquetRowDataBuilder` with RowData writing support
- **Partitioning**: automatically handles partitioned tables with proper field extraction
- **Schema Conversion**: converts Flink logical types to the Parquet schema format
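
For the schema conversion step, a few of the straightforward primitive mappings can be sketched as a lookup table. This is a simplification (the class is hypothetical; the real converter also attaches logical-type annotations such as UTF8 for `STRING`, and handles decimals and nested types):

```java
import java.util.Map;

// Simplified sketch of Flink-to-Parquet primitive type mappings.
class TypeMappingSketch {
    static final Map<String, String> FLINK_TO_PARQUET = Map.of(
            "BOOLEAN", "BOOLEAN",
            "INT", "INT32",
            "BIGINT", "INT64",
            "FLOAT", "FLOAT",
            "DOUBLE", "DOUBLE",
            "STRING", "BINARY");
}
```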

## Error Handling

The format factory handles common configuration errors:

- Invalid timezone configuration values
- Data types incompatible with the Parquet format
- Missing required Hadoop configuration
- Schema conversion failures between Flink and Parquet types
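
For the first class of errors, boolean option parsing is strict: values other than true/false are rejected at validation time. The helper below is a hedged sketch of that behavior, not Flink's actual parser:

```java
// Hypothetical validation sketch for the 'utc-timezone' option value.
class OptionValidation {
    static boolean parseUtcTimezone(String raw) {
        if (raw == null) {
            return false; // option absent: fall back to the default
        }
        if (raw.equalsIgnoreCase("true")) {
            return true;
        }
        if (raw.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException(
                "Invalid boolean value for 'utc-timezone': " + raw);
    }
}
```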